/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.dataspacetck.dsp.system.pipeline.tp;

import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.eclipse.dataspacetck.core.api.message.MessageSerializer;
import org.eclipse.dataspacetck.core.api.pipeline.AbstractAsyncPipeline;
import org.eclipse.dataspacetck.core.api.system.CallbackEndpoint;
import org.eclipse.dataspacetck.core.api.system.ProtocolHandler;
import org.eclipse.dataspacetck.core.spi.boot.Monitor;
import org.eclipse.dataspacetck.dsp.system.api.http.FallibleDspHandler;
import org.eclipse.dataspacetck.dsp.system.api.message.tp.TransferFunctions;
import org.eclipse.dataspacetck.dsp.system.api.pipeline.tp.TransferProcessPipeline;
import org.eclipse.dataspacetck.dsp.system.api.service.Result;
import org.eclipse.dataspacetck.dsp.system.api.statemachine.TransferProcess;
import org.eclipse.dataspacetck.dsp.system.client.tp.TransferProcessClient;

public abstract class AbstractTransferProcessPipeline<P extends TransferProcessPipeline<P>>
extends AbstractAsyncPipeline<P>
implements TransferProcessPipeline<P> {
    private static final String TRANSFER_START_PATH = "/transfers/[^/]+/start";
    private static final String TRANSFER_TERMINATION_PATH = "/transfers/[^/]+/termination";
    private static final String TRANSFER_COMPLETION_PATH = "/transfers/[^/]+/completion";
    private static final String TRANSFER_SUSPENSION_PATH = "/transfers/[^/]+/suspension";
    protected final TransferProcessClient transferProcessClient;
    protected TransferProcess transferProcess;

    public AbstractTransferProcessPipeline(TransferProcessClient transferProcessClient, CallbackEndpoint endpoint, Monitor monitor, long waitTime) {
        super(endpoint, monitor, waitTime);
        this.transferProcessClient = transferProcessClient;
    }

    public P thenWaitForState(TransferProcess.State state) {
        return (P)((TransferProcessPipeline)this.thenWait("state to transition to " + String.valueOf(state), () -> this.transferProcess != null && state == this.transferProcess.getState()));
    }

    public P thenPause() {
        this.stages.add(() -> this.pause());
        return this.self();
    }

    public P sendTermination(boolean expectError) {
        this.stages.add(() -> {
            String id = this.transferProcess.getId();
            String correlationId = this.transferProcess.getCorrelationId();
            Map terminationMessage = TransferFunctions.createTermination((String)this.transferProcess.providerPid(), (String)this.transferProcess.consumerPid(), (String)"1", (String[])new String[0]);
            this.monitor.debug("Sending transfer termination");
            String consumerAddress = this.transferProcess.getCallbackAddress();
            this.transferProcessClient.terminateTransfer(correlationId, terminationMessage, consumerAddress, expectError);
            this.terminated(id);
        });
        return this.self();
    }

    public P sendCompletion(boolean expectError) {
        this.stages.add(() -> {
            String id = this.transferProcess.getId();
            String correlationId = this.transferProcess.getCorrelationId();
            Map completion = TransferFunctions.createCompletion((String)this.transferProcess.providerPid(), (String)this.transferProcess.consumerPid());
            this.monitor.debug("Sending transfer completion");
            String consumerAddress = this.transferProcess.getCallbackAddress();
            this.transferProcessClient.completeTransfer(correlationId, completion, consumerAddress, expectError);
            if (!expectError) {
                this.completed(id);
            }
        });
        return this.self();
    }

    public P sendSuspension(boolean expectError) {
        this.stages.add(() -> {
            String id = this.transferProcess.getId();
            String correlationId = this.transferProcess.getCorrelationId();
            Map suspension = TransferFunctions.createSuspension((String)this.transferProcess.providerPid(), (String)this.transferProcess.consumerPid(), (String)"1", (String[])new String[0]);
            this.monitor.debug("Sending transfer suspension");
            String consumerAddress = this.transferProcess.getCallbackAddress();
            this.transferProcessClient.suspendTransfer(correlationId, suspension, consumerAddress, expectError);
            if (!expectError) {
                this.suspended(id);
            }
        });
        return this.self();
    }

    public P sendStarted(Map<String, Object> dataAddress, boolean expectError) {
        this.stages.add(() -> {
            String id = this.transferProcess.getId();
            String correlationId = this.transferProcess.getCorrelationId();
            Map startMessage = TransferFunctions.createStartRequest((String)this.transferProcess.providerPid(), (String)this.transferProcess.consumerPid(), (Map)dataAddress);
            this.monitor.debug("Sending transfer start");
            String consumerAddress = this.transferProcess.getCallbackAddress();
            this.transferProcessClient.startTransfer(correlationId, startMessage, consumerAddress, expectError);
            if (!expectError) {
                this.started(id);
            }
        });
        return this.self();
    }

    public P expectStartMessage(Function<Map<String, Object>, Result<Map<String, Object>, Map<String, Object>>> action) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectLatches.add(latch);
        this.stages.add(() -> this.endpoint.registerProtocolHandler(TRANSFER_START_PATH, (ProtocolHandler)new FallibleDspHandler(msg -> {
            Result result = (Result)action.apply(MessageSerializer.processJsonLd((InputStream)msg));
            this.endpoint.deregisterHandler(TRANSFER_START_PATH);
            latch.countDown();
            return result;
        })));
        return this.self();
    }

    public P expectTerminationMessage(Function<Map<String, Object>, Map<String, Object>> action) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectLatches.add(latch);
        this.stages.add(() -> this.endpoint.registerHandler(TRANSFER_TERMINATION_PATH, offer -> {
            Map transfer = (Map)action.apply(MessageSerializer.processJsonLd((InputStream)offer));
            this.endpoint.deregisterHandler(TRANSFER_TERMINATION_PATH);
            latch.countDown();
            return MessageSerializer.serialize((Object)transfer);
        }));
        return this.self();
    }

    public P expectCompletionMessage(Function<Map<String, Object>, Result<Map<String, Object>, Map<String, Object>>> action) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectLatches.add(latch);
        this.stages.add(() -> this.endpoint.registerProtocolHandler(TRANSFER_COMPLETION_PATH, (ProtocolHandler)new FallibleDspHandler(offer -> {
            Result result = (Result)action.apply(MessageSerializer.processJsonLd((InputStream)offer));
            this.endpoint.deregisterHandler(TRANSFER_COMPLETION_PATH);
            latch.countDown();
            return result;
        })));
        return this.self();
    }

    public P expectSuspensionMessage(Function<Map<String, Object>, Result<Map<String, Object>, Map<String, Object>>> action) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectLatches.add(latch);
        this.stages.add(() -> this.endpoint.registerProtocolHandler(TRANSFER_SUSPENSION_PATH, (ProtocolHandler)new FallibleDspHandler(offer -> {
            Result result = (Result)action.apply(MessageSerializer.processJsonLd((InputStream)offer));
            this.endpoint.deregisterHandler(TRANSFER_SUSPENSION_PATH);
            latch.countDown();
            return result;
        })));
        return this.self();
    }

    protected abstract void started(String var1);

    protected abstract void completed(String var1);

    protected abstract void terminated(String var1);

    protected abstract void suspended(String var1);

    protected abstract P self();
}

