/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.dataspacetck.dsp.system.pipeline.tp;

import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiFunction;
import org.eclipse.dataspacetck.core.api.message.MessageSerializer;
import org.eclipse.dataspacetck.core.api.system.CallbackEndpoint;
import org.eclipse.dataspacetck.core.spi.boot.Monitor;
import org.eclipse.dataspacetck.dsp.system.api.connector.Connector;
import org.eclipse.dataspacetck.dsp.system.api.connector.tp.TransferProcessListener;
import org.eclipse.dataspacetck.dsp.system.api.message.JsonLdFunctions;
import org.eclipse.dataspacetck.dsp.system.api.pipeline.tp.ConsumerTransferProcessPipeline;
import org.eclipse.dataspacetck.dsp.system.api.statemachine.TransferProcess;
import org.eclipse.dataspacetck.dsp.system.client.tp.ConsumerTransferProcessClient;
import org.eclipse.dataspacetck.dsp.system.pipeline.tp.AbstractTransferProcessPipeline;

/**
 * Consumer-side implementation of {@link ConsumerTransferProcessPipeline}.
 *
 * <p>Builder-style methods queue work on the inherited {@code stages} list and return this
 * pipeline so calls can be chained. The state-transition hooks ({@code started},
 * {@code suspended}, {@code completed}, {@code terminated}) are forwarded to the transfer
 * process manager of the provider connector supplied at construction time.
 */
public class ConsumerTransferProcessPipelineImpl
        extends AbstractTransferProcessPipeline<ConsumerTransferProcessPipeline>
        implements ConsumerTransferProcessPipeline {
    /** Callback path on which the transfer request message is expected. */
    private static final String REQUEST_PATH = "/transfers/request";
    /** Namespace prefix used for the state property and for the expected state values. */
    private static final String DSPACE_NAMESPACE = "https://w3id.org/dspace/2025/1/";
    /** Fully qualified name of the property read from a fetched transfer process. */
    private static final String STATE_PROPERTY = DSPACE_NAMESPACE + "state";

    private final ConsumerTransferProcessClient transferProcessClient;
    private final Connector providerConnector;
    private final String consumerConnectorId;

    /**
     * Creates the pipeline.
     *
     * @param transferProcessClient client used to initiate requests and to fetch transfer processes
     * @param endpoint              callback endpoint on which message handlers are registered
     * @param providerConnector     connector whose provider transfer process manager receives
     *                              listener registrations and state-transition calls
     * @param consumerConnectorId   id passed as second argument to the action given to
     *                              {@link #expectTransferRequest(BiFunction)}
     * @param monitor               monitor handed to the superclass
     * @param waitTime              wait time handed to the superclass
     */
    public ConsumerTransferProcessPipelineImpl(ConsumerTransferProcessClient transferProcessClient,
                                               CallbackEndpoint endpoint,
                                               Connector providerConnector,
                                               String consumerConnectorId,
                                               Monitor monitor,
                                               long waitTime) {
        super(transferProcessClient, endpoint, monitor, waitTime);
        this.transferProcessClient = transferProcessClient;
        this.providerConnector = providerConnector;
        this.consumerConnectorId = consumerConnectorId;
    }

    /**
     * Queues a stage that registers a one-shot listener on the provider transfer process
     * manager and then initiates a transfer request through the client.
     *
     * <p>When the listener's {@code requested} callback fires, the reported transfer process is
     * stored in this pipeline and the listener deregisters itself.
     *
     * @param agreementId agreement id forwarded to the client
     * @param format      format forwarded to the client
     * @return this pipeline
     */
    public ConsumerTransferProcessPipeline initiateTransferRequest(String agreementId, String format) {
        this.stages.add(() -> {
            // The listener must be in place before the request is sent so the callback is not missed.
            this.providerConnector.getProviderTransferProcessManager().registerListener(new TransferProcessListener() {

                public void requested(TransferProcess transferProcess) {
                    ConsumerTransferProcessPipelineImpl.this.transferProcess = transferProcess;
                    // One-shot: remove ourselves once the process has been captured.
                    ConsumerTransferProcessPipelineImpl.this.providerConnector
                            .getProviderTransferProcessManager()
                            .deregisterListener(this);
                }
            });
            this.transferProcessClient.initiateTransferRequest(agreementId, format);
        });
        return this;
    }

    /**
     * Queues a stage that registers a handler for {@value #REQUEST_PATH} on the callback endpoint
     * and records a latch in the inherited {@code expectLatches} collection.
     *
     * <p>When invoked, the handler runs the incoming event through JSON-LD processing, applies
     * {@code action} to the result together with the consumer connector id, deregisters itself,
     * counts the latch down and returns the serialized, JSON-LD processed action result.
     * If {@code action} throws, the handler stays registered and the latch is not released.
     *
     * @param action function producing the response message from the processed request and the
     *               consumer connector id
     * @return this pipeline
     */
    public ConsumerTransferProcessPipeline expectTransferRequest(BiFunction<Map<String, Object>, String, Map<String, Object>> action) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectLatches.add(latch);
        this.stages.add(() -> this.endpoint.registerHandler(REQUEST_PATH, event -> {
            Map<String, Object> expanded = MessageSerializer.processJsonLd((InputStream) event);
            Map<String, Object> response = action.apply(expanded, this.consumerConnectorId);
            this.endpoint.deregisterHandler(REQUEST_PATH);
            latch.countDown();
            // Held as Object so the same serialize overload is selected as with an explicit (Object) cast.
            Object processed = MessageSerializer.processJsonLd(response);
            return MessageSerializer.serialize(processed);
        }));
        return this;
    }

    @Override
    protected ConsumerTransferProcessPipeline self() {
        return this;
    }

    /** Forwards the suspended transition for {@code id} to the provider transfer process manager. */
    @Override
    protected void suspended(String id) {
        this.providerConnector.getProviderTransferProcessManager().suspended(id);
    }

    /** Forwards the completed transition for {@code id} to the provider transfer process manager. */
    @Override
    protected void completed(String id) {
        this.providerConnector.getProviderTransferProcessManager().completed(id);
    }

    /** Forwards the terminated transition for {@code id} to the provider transfer process manager. */
    @Override
    protected void terminated(String id) {
        this.providerConnector.getProviderTransferProcessManager().terminated(id);
    }

    /** Forwards the started transition for {@code id} to the provider transfer process manager. */
    @Override
    protected void started(String id) {
        this.providerConnector.getProviderTransferProcessManager().started(id);
    }

    /**
     * Adds a wait condition that fetches the transfer process through the client, using the
     * correlation id and callback address of the currently tracked transfer process, and
     * compares its state property with {@code state} qualified by the DSP namespace.
     *
     * <p>The tracked transfer process is the one captured by the listener installed in
     * {@link #initiateTransferRequest(String, String)}, unless the superclass sets it elsewhere.
     *
     * @param state expected state
     * @return this pipeline
     */
    public ConsumerTransferProcessPipeline thenVerifyConsumerState(TransferProcess.State state) {
        this.thenWait("for consumer transfer process state to be " + state, () -> {
            String callbackAddress = this.transferProcess.getCallbackAddress();
            String processId = this.transferProcess.getCorrelationId();
            Map<String, Object> tp = this.transferProcessClient.getTransferProcess(processId, callbackAddress);
            String actual = JsonLdFunctions.stringIdProperty(STATE_PROPERTY, tp);
            return (DSPACE_NAMESPACE + state.toString()).equals(actual);
        });
        return this;
    }
}

