/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.edc.connector.controlplane.services.transferprocess;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.time.Clock;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.eclipse.edc.connector.controlplane.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.controlplane.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.controlplane.contract.spi.validation.ContractValidationService;
import org.eclipse.edc.connector.controlplane.services.spi.protocol.ProtocolTokenValidator;
import org.eclipse.edc.connector.controlplane.services.spi.transferprocess.TransferProcessProtocolService;
import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessObservable;
import org.eclipse.edc.connector.controlplane.transfer.spi.observe.TransferProcessStartedData;
import org.eclipse.edc.connector.controlplane.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.TransferProcessStates;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferCompletionMessage;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferRemoteMessage;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferRequestMessage;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferStartMessage;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferSuspensionMessage;
import org.eclipse.edc.connector.controlplane.transfer.spi.types.protocol.TransferTerminationMessage;
import org.eclipse.edc.participant.spi.ParticipantAgent;
import org.eclipse.edc.policy.context.request.spi.RequestTransferProcessPolicyContext;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.result.ServiceResult;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;
import org.eclipse.edc.transaction.spi.TransactionContext;
import org.eclipse.edc.validator.spi.DataAddressValidatorRegistry;
import org.eclipse.edc.validator.spi.ValidationResult;
import org.jetbrains.annotations.NotNull;

public class TransferProcessProtocolServiceImpl
implements TransferProcessProtocolService {
    private final TransferProcessStore transferProcessStore;
    private final TransactionContext transactionContext;
    private final ContractNegotiationStore negotiationStore;
    private final ContractValidationService contractValidationService;
    private final DataAddressValidatorRegistry dataAddressValidator;
    private final TransferProcessObservable observable;
    private final ProtocolTokenValidator protocolTokenValidator;
    private final Clock clock;
    private final Monitor monitor;
    private final Telemetry telemetry;

    public TransferProcessProtocolServiceImpl(TransferProcessStore transferProcessStore, TransactionContext transactionContext, ContractNegotiationStore negotiationStore, ContractValidationService contractValidationService, ProtocolTokenValidator protocolTokenValidator, DataAddressValidatorRegistry dataAddressValidator, TransferProcessObservable observable, Clock clock, Monitor monitor, Telemetry telemetry) {
        this.transferProcessStore = transferProcessStore;
        this.transactionContext = transactionContext;
        this.negotiationStore = negotiationStore;
        this.contractValidationService = contractValidationService;
        this.protocolTokenValidator = protocolTokenValidator;
        this.dataAddressValidator = dataAddressValidator;
        this.observable = observable;
        this.clock = clock;
        this.monitor = monitor;
        this.telemetry = telemetry;
    }

    @WithSpan
    @NotNull
    public ServiceResult<TransferProcess> notifyRequested(TransferRequestMessage message, TokenRepresentation tokenRepresentation) {
        return (ServiceResult)this.transactionContext.execute(() -> (ServiceResult)((ServiceResult)((ServiceResult)((ServiceResult)this.fetchNotifyRequestContext(message).compose(context -> this.verifyRequest(tokenRepresentation, (TransferRequestMessageContext)context, (RemoteMessage)message))).compose(context -> this.validateDestination(message, (ClaimTokenContext)context))).compose(context -> this.validateAgreement((TransferRemoteMessage)message, (ClaimTokenContext)context))).compose(context -> this.requestedAction(message, context.agreement().getAssetId())));
    }

    @WithSpan
    @NotNull
    public ServiceResult<TransferProcess> notifyStarted(TransferStartMessage message, TokenRepresentation tokenRepresentation) {
        return (ServiceResult)this.transactionContext.execute(() -> (ServiceResult)((ServiceResult)this.fetchRequestContext(message, this::findTransferProcess).compose(context -> this.verifyRequest(tokenRepresentation, (TransferRequestMessageContext)context, (RemoteMessage)message))).compose(context -> this.onMessageDo((TransferRemoteMessage)message, context.participantAgent(), context.agreement(), transferProcess -> this.startedAction(message, (TransferProcess)transferProcess))));
    }

    @WithSpan
    @NotNull
    public ServiceResult<TransferProcess> notifyCompleted(TransferCompletionMessage message, TokenRepresentation tokenRepresentation) {
        return (ServiceResult)this.transactionContext.execute(() -> (ServiceResult)((ServiceResult)this.fetchRequestContext(message, this::findTransferProcess).compose(context -> this.verifyRequest(tokenRepresentation, (TransferRequestMessageContext)context, (RemoteMessage)message))).compose(context -> this.onMessageDo((TransferRemoteMessage)message, context.participantAgent(), context.agreement(), transferProcess -> this.completedAction(message, (TransferProcess)transferProcess))));
    }

    @NotNull
    public ServiceResult<TransferProcess> notifySuspended(TransferSuspensionMessage message, TokenRepresentation tokenRepresentation) {
        return (ServiceResult)this.transactionContext.execute(() -> (ServiceResult)((ServiceResult)this.fetchRequestContext(message, this::findTransferProcess).compose(context -> this.verifyRequest(tokenRepresentation, (TransferRequestMessageContext)context, (RemoteMessage)message))).compose(context -> this.onMessageDo((TransferRemoteMessage)message, context.participantAgent(), context.agreement(), transferProcess -> this.suspendedAction(message, (TransferProcess)transferProcess))));
    }

    @WithSpan
    @NotNull
    public ServiceResult<TransferProcess> notifyTerminated(TransferTerminationMessage message, TokenRepresentation tokenRepresentation) {
        return (ServiceResult)this.transactionContext.execute(() -> (ServiceResult)((ServiceResult)this.fetchRequestContext(message, this::findTransferProcess).compose(context -> this.verifyRequest(tokenRepresentation, (TransferRequestMessageContext)context, (RemoteMessage)message))).compose(context -> this.onMessageDo((TransferRemoteMessage)message, context.participantAgent(), context.agreement(), transferProcess -> this.terminatedAction(message, (TransferProcess)transferProcess))));
    }

    @WithSpan
    @NotNull
    public ServiceResult<TransferProcess> findById(String id, TokenRepresentation tokenRepresentation) {
        return (ServiceResult)this.transactionContext.execute(() -> (ServiceResult)((ServiceResult)this.fetchRequestContext(id, this::findTransferProcessById).compose(context -> this.verifyRequest(tokenRepresentation, (TransferRequestMessageContext)context, null))).compose(context -> this.validateCounterParty(context.participantAgent(), context.agreement(), context.transferProcess())));
    }

    @NotNull
    private ServiceResult<TransferProcess> requestedAction(TransferRequestMessage message, String assetId) {
        TransferProcess existingTransferProcess = this.transferProcessStore.findForCorrelationId(message.getConsumerPid());
        if (existingTransferProcess != null) {
            return ServiceResult.success((Object)existingTransferProcess);
        }
        TransferProcess process = ((TransferProcess.Builder)((TransferProcess.Builder)((TransferProcess.Builder)TransferProcess.Builder.newInstance().id(UUID.randomUUID().toString())).protocol(message.getProtocol()).correlationId(message.getConsumerPid()).counterPartyAddress(message.getCallbackAddress()).dataDestination(message.getDataDestination()).assetId(assetId).contractId(message.getContractId()).transferType(message.getTransferType()).type(TransferProcess.Type.PROVIDER).clock(this.clock)).traceContext(this.telemetry.getCurrentTraceContext())).build();
        this.observable.invokeForEach(l -> l.preCreated(process));
        process.protocolMessageReceived(message.getId());
        this.update(process);
        this.observable.invokeForEach(l -> l.initiated(process));
        return ServiceResult.success((Object)process);
    }

    @NotNull
    private ServiceResult<TransferProcess> startedAction(TransferStartMessage message, TransferProcess transferProcess) {
        if (transferProcess.getType() == TransferProcess.Type.CONSUMER && transferProcess.canBeStartedConsumer()) {
            this.observable.invokeForEach(l -> l.preStarted(transferProcess));
            transferProcess.protocolMessageReceived(message.getId());
            transferProcess.transitionStarted(transferProcess.getDataPlaneId());
            this.update(transferProcess);
            TransferProcessStartedData transferStartedData = TransferProcessStartedData.Builder.newInstance().dataAddress(message.getDataAddress()).build();
            this.observable.invokeForEach(l -> l.started(transferProcess, transferStartedData));
            return ServiceResult.success((Object)transferProcess);
        }
        if (transferProcess.getType() == TransferProcess.Type.PROVIDER && transferProcess.currentStateIsOneOf(new TransferProcessStates[]{TransferProcessStates.SUSPENDED})) {
            transferProcess.protocolMessageReceived(message.getId());
            transferProcess.transitionStarting();
            this.update(transferProcess);
            return ServiceResult.success((Object)transferProcess);
        }
        return ServiceResult.conflict((String)String.format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be started"));
    }

    @NotNull
    private ServiceResult<TransferProcess> completedAction(TransferCompletionMessage message, TransferProcess transferProcess) {
        if (transferProcess.canBeCompleted()) {
            this.observable.invokeForEach(l -> l.preCompleted(transferProcess));
            transferProcess.protocolMessageReceived(message.getId());
            transferProcess.transitionCompletingRequested();
            this.update(transferProcess);
            this.observable.invokeForEach(l -> l.completed(transferProcess));
            return ServiceResult.success((Object)transferProcess);
        }
        return ServiceResult.conflict((String)String.format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be completed"));
    }

    @NotNull
    private ServiceResult<TransferProcess> suspendedAction(TransferSuspensionMessage message, TransferProcess transferProcess) {
        if (transferProcess.canBeSuspended()) {
            String reason = message.getReason().stream().map(Object::toString).collect(Collectors.joining(", "));
            transferProcess.transitionSuspendingRequested(reason);
            transferProcess.protocolMessageReceived(message.getId());
            this.update(transferProcess);
            return ServiceResult.success((Object)transferProcess);
        }
        return ServiceResult.conflict((String)String.format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be suspended"));
    }

    @NotNull
    private ServiceResult<TransferProcess> terminatedAction(TransferTerminationMessage message, TransferProcess transferProcess) {
        if (transferProcess.canBeTerminated()) {
            transferProcess.transitionTerminatingRequested();
            transferProcess.protocolMessageReceived(message.getId());
            this.update(transferProcess);
            return ServiceResult.success((Object)transferProcess);
        }
        return ServiceResult.conflict((String)String.format("Cannot process %s because %s", message.getClass().getSimpleName(), "transfer cannot be terminated"));
    }

    private ServiceResult<ClaimTokenContext> validateDestination(TransferRequestMessage message, ClaimTokenContext context) {
        ValidationResult validDestination;
        DataAddress destination = message.getDataDestination();
        if (destination != null && (validDestination = this.dataAddressValidator.validateDestination(destination)).failed()) {
            return ServiceResult.badRequest((List)validDestination.getFailureMessages());
        }
        return ServiceResult.success((Object)context);
    }

    private ServiceResult<ClaimTokenContext> validateAgreement(TransferRemoteMessage message, ClaimTokenContext context) {
        Result validationResult = this.contractValidationService.validateAgreement(context.participantAgent(), context.agreement());
        if (validationResult.failed()) {
            return ServiceResult.conflict((String)String.format("Cannot process %s because %s", message.getClass().getSimpleName(), "agreement not found or not valid"));
        }
        return ServiceResult.success((Object)context);
    }

    private ServiceResult<TransferRequestMessageContext> fetchNotifyRequestContext(TransferRequestMessage message) {
        return Optional.ofNullable(this.negotiationStore.findContractAgreement(message.getContractId())).map(contractAgreement -> new TransferRequestMessageContext((ContractAgreement)contractAgreement, null)).map(ServiceResult::success).orElseGet(() -> ServiceResult.notFound((String)String.format("Cannot process %s because %s", message.getClass().getSimpleName(), "agreement not found or not valid")));
    }

    private <T> ServiceResult<TransferRequestMessageContext> fetchRequestContext(T input, Function<T, ServiceResult<TransferProcess>> tpProvider) {
        return (ServiceResult)tpProvider.apply(input).compose(transferProcess -> (ServiceResult)this.findContractByTransferProcess((TransferProcess)transferProcess).map(agreement -> new TransferRequestMessageContext((ContractAgreement)agreement, (TransferProcess)transferProcess)));
    }

    private ServiceResult<ClaimTokenContext> verifyRequest(TokenRepresentation tokenRepresentation, TransferRequestMessageContext context, RemoteMessage message) {
        ServiceResult result = this.protocolTokenValidator.verify(tokenRepresentation, RequestTransferProcessPolicyContext::new, context.agreement().getPolicy(), message);
        if (result.failed()) {
            this.monitor.debug(() -> "Verification Failed: %s".formatted(result.getFailureDetail()), new Throwable[0]);
            return ServiceResult.notFound((String)"Not found");
        }
        return ServiceResult.success((Object)new ClaimTokenContext((ParticipantAgent)result.getContent(), context.agreement(), context.transferProcess()));
    }

    private ServiceResult<ContractAgreement> findContractByTransferProcess(TransferProcess transferProcess) {
        ContractAgreement agreement = this.negotiationStore.findContractAgreement(transferProcess.getContractId());
        if (agreement == null) {
            return ServiceResult.notFound((String)String.format("No transfer process with id %s found", transferProcess.getId()));
        }
        return ServiceResult.success((Object)agreement);
    }

    private ServiceResult<TransferProcess> onMessageDo(TransferRemoteMessage message, ParticipantAgent participantAgent, ContractAgreement agreement, Function<TransferProcess, ServiceResult<TransferProcess>> action) {
        return (ServiceResult)this.findAndLease(message).compose(transferProcess -> (ServiceResult)((ServiceResult)this.validateCounterParty(participantAgent, agreement, (TransferProcess)transferProcess).compose(p -> {
            if (p.shouldIgnoreIncomingMessage(message.getId())) {
                return ServiceResult.success((Object)p);
            }
            return (ServiceResult)action.apply((TransferProcess)p);
        })).onFailure(f -> this.breakLease((TransferProcess)transferProcess)));
    }

    private ServiceResult<TransferProcess> validateCounterParty(ParticipantAgent participantAgent, ContractAgreement agreement, TransferProcess transferProcess) {
        Result validation = this.contractValidationService.validateRequest(participantAgent, agreement);
        if (validation.failed()) {
            return ServiceResult.badRequest((List)validation.getFailureMessages());
        }
        return ServiceResult.success((Object)transferProcess);
    }

    private ServiceResult<TransferProcess> findAndLease(TransferRemoteMessage remoteMessage) {
        return (ServiceResult)this.transferProcessStore.findByIdAndLease(remoteMessage.getProcessId()).flatMap(ServiceResult::from);
    }

    private ServiceResult<TransferProcess> findTransferProcess(TransferRemoteMessage remoteMessage) {
        return this.findTransferProcessById(remoteMessage.getProcessId());
    }

    private ServiceResult<TransferProcess> findTransferProcessById(String id) {
        return Optional.ofNullable((TransferProcess)this.transferProcessStore.findById(id)).or(() -> Optional.ofNullable(this.transferProcessStore.findForCorrelationId(id))).map(ServiceResult::success).orElseGet(() -> this.notFound(id));
    }

    private ServiceResult<TransferProcess> notFound(String transferProcessId) {
        return ServiceResult.notFound((String)String.format("No transfer process with id %s found", transferProcessId));
    }

    private void breakLease(TransferProcess process) {
        this.transferProcessStore.save((Object)process);
    }

    private void update(TransferProcess transferProcess) {
        this.transferProcessStore.save((Object)transferProcess);
        this.monitor.debug(String.format("TransferProcess %s is now in state %s", transferProcess.getId(), TransferProcessStates.from((int)transferProcess.getState())), new Throwable[0]);
    }

    private record ClaimTokenContext(ParticipantAgent participantAgent, ContractAgreement agreement, TransferProcess transferProcess) {
    }

    private record TransferRequestMessageContext(ContractAgreement agreement, TransferProcess transferProcess) {
    }
}

