/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.dataspacetck.dsp.system.pipeline;

import java.io.InputStream;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.eclipse.dataspacetck.core.api.message.MessageSerializer;
import org.eclipse.dataspacetck.core.api.system.CallbackEndpoint;
import org.eclipse.dataspacetck.core.spi.boot.Monitor;
import org.eclipse.dataspacetck.dsp.system.api.connector.Connector;
import org.eclipse.dataspacetck.dsp.system.api.connector.NegotiationListener;
import org.eclipse.dataspacetck.dsp.system.api.message.JsonLdFunctions;
import org.eclipse.dataspacetck.dsp.system.api.message.NegotiationFunctions;
import org.eclipse.dataspacetck.dsp.system.api.pipeline.ConsumerNegotiationPipeline;
import org.eclipse.dataspacetck.dsp.system.api.statemachine.ContractNegotiation;
import org.eclipse.dataspacetck.dsp.system.client.cn.ConsumerNegotiationClient;
import org.eclipse.dataspacetck.dsp.system.pipeline.AbstractNegotiationPipeline;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;

public class ConsumerNegotiationPipelineImpl
extends AbstractNegotiationPipeline<ConsumerNegotiationPipeline>
implements ConsumerNegotiationPipeline {
    private static final String REQUEST_INITIAL_PATH = "/negotiations/request";
    private static final String REQUEST_PATH = "/negotiations/[^/]+/request";
    private static final String NEGOTIATION_EVENT_PATH = "/negotiations/[^/]+/events";
    private static final String VERIFICATION_PATH = "/negotiations/[^/]+/agreement/verification";
    private final ConsumerNegotiationClient negotiationClient;
    private final Connector providerConnector;
    private final CallbackEndpoint endpoint;
    private final String consumerConnectorId;

    public ConsumerNegotiationPipelineImpl(ConsumerNegotiationClient negotiationClient, CallbackEndpoint endpoint, Connector providerConnector, String consumerConnectorId, Monitor monitor, long waitTime) {
        super(negotiationClient, endpoint, monitor, waitTime);
        this.negotiationClient = negotiationClient;
        this.providerConnector = providerConnector;
        this.endpoint = endpoint;
        this.consumerConnectorId = consumerConnectorId;
    }

    public ConsumerNegotiationPipeline initiateRequest(String datasetId, String offerId) {
        this.stages.add(() -> {
            this.providerConnector.getProviderNegotiationManager().registerListener(new NegotiationListener(){

                public void contractRequested(ContractNegotiation negotiation) {
                    ConsumerNegotiationPipelineImpl.this.providerNegotiation = negotiation;
                    ConsumerNegotiationPipelineImpl.this.providerConnector.getProviderNegotiationManager().deregisterListener((NegotiationListener)this);
                }
            });
            this.negotiationClient.initiateRequest(datasetId, offerId);
        });
        return this;
    }

    public ConsumerNegotiationPipeline sendOfferMessage(boolean expectError) {
        this.stages.add(() -> {
            String providerId = this.providerNegotiation.getId();
            String consumerId = this.providerNegotiation.getCorrelationId();
            String offerId = this.providerNegotiation.getOfferId();
            String datasetId = this.providerNegotiation.getDatasetId();
            String assignee = this.providerNegotiation.getCounterPartyId();
            Map offerMessage = NegotiationFunctions.createOffer((String)providerId, (String)consumerId, (String)offerId, (String)"TCK_PARTICIPANT", (String)assignee, (String)datasetId);
            this.monitor.debug("Sending offer");
            String consumerAddress = this.providerNegotiation.getCallbackAddress();
            this.negotiationClient.contractOffer(consumerId, offerMessage, consumerAddress, expectError);
            if (!expectError) {
                this.providerConnector.getProviderNegotiationManager().offered(providerId);
            }
        });
        return this;
    }

    public ConsumerNegotiationPipeline sendAgreementMessage(boolean expectError) {
        this.stages.add(() -> {
            String providerId = this.providerNegotiation.getId();
            String consumerId = this.providerNegotiation.getCorrelationId();
            String agreementId = UUID.randomUUID().toString();
            String datasetId = this.providerNegotiation.getDatasetId();
            Map agreement = NegotiationFunctions.createAgreement((String)providerId, (String)consumerId, (String)agreementId, (String)"TCK_PARTICIPANT", (String)this.consumerConnectorId, (String)datasetId, (String)this.endpoint.getAddress());
            String callbackAddress = this.providerNegotiation.getCallbackAddress();
            this.monitor.debug("Sending agreement");
            this.negotiationClient.contractAgreement(consumerId, agreement, callbackAddress, expectError);
            if (!expectError) {
                this.providerConnector.getProviderNegotiationManager().agreed(providerId);
            }
        });
        return this;
    }

    public ConsumerNegotiationPipeline sendFinalizedEvent(boolean expectError) {
        this.stages.add(() -> {
            String providerId = this.providerNegotiation.getId();
            String consumerId = this.providerNegotiation.getCorrelationId();
            Map event = NegotiationFunctions.createFinalizedEvent((String)providerId, (String)consumerId);
            String callbackAddress = this.providerNegotiation.getCallbackAddress();
            this.monitor.debug("Sending finalized event");
            this.negotiationClient.finalize(consumerId, event, callbackAddress, expectError);
            if (!expectError) {
                this.providerConnector.getProviderNegotiationManager().finalized(providerId);
            }
        });
        return this;
    }

    public ConsumerNegotiationPipeline expectInitialRequest(BiFunction<Map<String, Object>, String, Map<String, Object>> action) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectLatches.add(latch);
        this.stages.add(() -> this.endpoint.registerHandler(REQUEST_INITIAL_PATH, event -> {
            Map expanded = MessageSerializer.processJsonLd((InputStream)event);
            Map negotiation = (Map)action.apply(expanded, this.consumerConnectorId);
            this.endpoint.deregisterHandler(REQUEST_INITIAL_PATH);
            latch.countDown();
            return MessageSerializer.serialize((Object)MessageSerializer.processJsonLd((Map)negotiation));
        }));
        return this;
    }

    public ConsumerNegotiationPipeline expectRequest(BiFunction<Map<String, Object>, String, Map<String, Object>> action) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectLatches.add(latch);
        this.stages.add(() -> this.endpoint.registerHandler(REQUEST_PATH, event -> {
            Map expanded = MessageSerializer.processJsonLd((InputStream)event);
            Map negotiation = (Map)action.apply(expanded, this.consumerConnectorId);
            this.endpoint.deregisterHandler(REQUEST_PATH);
            latch.countDown();
            return MessageSerializer.serialize((Object)MessageSerializer.processJsonLd((Map)negotiation));
        }));
        return this;
    }

    public ConsumerNegotiationPipeline expectAcceptedEvent(Consumer<Map<String, Object>> action) {
        return this.expectResponse(NEGOTIATION_EVENT_PATH, action);
    }

    public ConsumerNegotiationPipeline expectVerifiedMessage(Consumer<Map<String, Object>> action) {
        return this.expectResponse(VERIFICATION_PATH, action);
    }

    public ConsumerNegotiationPipeline thenVerifyConsumerState(ContractNegotiation.State state) {
        this.stages.add(() -> {
            this.pause();
            String callbackAddress = this.providerNegotiation.getCallbackAddress();
            String processId = this.providerNegotiation.getCorrelationId();
            Map<String, Object> negotiation = this.negotiationClient.getNegotiation(processId, callbackAddress);
            String actual = JsonLdFunctions.stringIdProperty((String)"https://w3id.org/dspace/2025/1/state", negotiation);
            Assertions.assertEquals((Object)("https://w3id.org/dspace/2025/1/" + state.toString()), (Object)actual);
        });
        return this;
    }

    @NotNull
    private ConsumerNegotiationPipeline expectResponse(String path, Consumer<Map<String, Object>> action) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectLatches.add(latch);
        this.stages.add(() -> this.endpoint.registerHandler(path, event -> {
            Map expanded = MessageSerializer.processJsonLd((InputStream)event);
            action.accept(expanded);
            this.endpoint.deregisterHandler(path);
            latch.countDown();
            return null;
        }));
        return this;
    }

    @Override
    protected ConsumerNegotiationPipeline self() {
        return this;
    }

    @Override
    protected void terminated(String id) {
        this.providerConnector.getProviderNegotiationManager().terminated(id);
    }
}

