/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.dataspacetck.dsp.system.pipeline;

import java.io.InputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;
import org.eclipse.dataspacetck.core.api.message.MessageSerializer;
import org.eclipse.dataspacetck.core.api.pipeline.AbstractAsyncPipeline;
import org.eclipse.dataspacetck.core.api.system.CallbackEndpoint;
import org.eclipse.dataspacetck.core.spi.boot.Monitor;
import org.eclipse.dataspacetck.dsp.system.api.message.NegotiationFunctions;
import org.eclipse.dataspacetck.dsp.system.api.pipeline.NegotiationPipeline;
import org.eclipse.dataspacetck.dsp.system.api.statemachine.ContractNegotiation;
import org.eclipse.dataspacetck.dsp.system.client.cn.NegotiationClient;
import org.jetbrains.annotations.NotNull;

/**
 * Base class for negotiation pipelines. It contributes three kinds of steps on top of
 * {@link AbstractAsyncPipeline}: waiting for the provider negotiation to reach a state,
 * expecting a termination callback, and sending a termination through the
 * {@link NegotiationClient}.
 *
 * <p>Subclasses are responsible for assigning {@link #providerNegotiation}; this class only
 * reads it.
 *
 * @param <P> the concrete pipeline type returned by the fluent methods
 */
public abstract class AbstractNegotiationPipeline<P extends NegotiationPipeline<P>>
        extends AbstractAsyncPipeline<P>
        implements NegotiationPipeline<P> {
    /** Path pattern under which termination callbacks are registered on the endpoint. */
    private static final String NEGOTIATIONS_TERMINATION_PATH = "/negotiations/[^/]+/termination/";

    private final NegotiationClient negotiationClient;

    /** Negotiation tracked by this pipeline; may be {@code null} until a subclass sets it. */
    protected ContractNegotiation providerNegotiation;

    /**
     * Creates the pipeline.
     *
     * @param negotiationClient client used to send termination messages
     * @param endpoint callback endpoint, passed to the superclass
     * @param monitor monitor, passed to the superclass
     * @param waitTime wait time, passed to the superclass
     */
    public AbstractNegotiationPipeline(NegotiationClient negotiationClient, CallbackEndpoint endpoint, Monitor monitor, long waitTime) {
        super(endpoint, monitor, waitTime);
        this.negotiationClient = negotiationClient;
    }

    /**
     * Adds a wait step whose condition holds once {@link #providerNegotiation} is non-null and
     * its state is identical to {@code state}.
     *
     * @param state the state to wait for
     * @return the value returned by {@code thenWait}
     */
    @SuppressWarnings("unchecked")
    public P thenWaitForState(ContractNegotiation.State state) {
        // NOTE(review): the cast is kept because thenWait's declared return type is not visible
        // from this file; drop it (and the suppression) if thenWait already returns P.
        return (P) this.thenWait("state to transition to " + state, () -> {
            // Read the field once so the null check and the state check see the same instance.
            ContractNegotiation negotiation = this.providerNegotiation;
            return negotiation != null && state == negotiation.getState();
        });
    }

    /**
     * Expects a termination message on the termination path and answers it with the result of
     * {@code action}.
     *
     * @param action receives the JSON-LD-processed request body and returns the response body
     * @return this pipeline
     */
    public P expectTerminationMessage(Function<Map<String, Object>, Map<String, Object>> action) {
        return this.expectResponse(NEGOTIATIONS_TERMINATION_PATH, action);
    }

    /**
     * Adds a stage that registers a one-shot handler for {@code path}. When invoked, the handler
     * processes the incoming body, applies {@code action}, deregisters itself, counts down the
     * latch recorded in {@code expectLatches}, and returns the serialized, processed response.
     *
     * <p>If {@code action} throws, the handler is neither deregistered nor is the latch counted
     * down.
     *
     * @param path path under which the handler is registered
     * @param action maps the processed request body to the response body
     * @return this pipeline
     */
    @NotNull
    private P expectResponse(String path, Function<Map<String, Object>, Map<String, Object>> action) {
        CountDownLatch latch = new CountDownLatch(1);
        this.expectLatches.add(latch);
        this.stages.add(() -> this.endpoint.registerHandler(path, event -> {
            // NOTE(review): the InputStream and Object casts are kept to pin the same
            // MessageSerializer variants as before; confirm whether they are still required.
            Map<String, Object> expanded = MessageSerializer.processJsonLd((InputStream) event);
            Map<String, Object> response = action.apply(expanded);
            this.endpoint.deregisterHandler(path);
            latch.countDown();
            return MessageSerializer.serialize((Object) MessageSerializer.processJsonLd(response));
        }));
        return this.self();
    }

    /**
     * Adds a stage that pauses, builds a termination message for {@link #providerNegotiation},
     * and sends it through the negotiation client. When no error is expected,
     * {@link #terminated(String)} is invoked afterwards with the negotiation id.
     *
     * @param expectError forwarded to the client; when {@code true}, {@code terminated} is skipped
     * @return this pipeline
     * @throws NullPointerException when the stage runs while {@link #providerNegotiation} is
     *         {@code null}
     */
    public P sendTermination(boolean expectError) {
        this.stages.add(() -> {
            this.pause();
            // Read the field once so every value below comes from the same negotiation, and
            // fail with an explanatory message instead of a bare NullPointerException.
            ContractNegotiation negotiation = Objects.requireNonNull(
                    this.providerNegotiation,
                    "providerNegotiation must be set before a termination can be sent");
            String id = negotiation.getId();
            String correlationId = negotiation.getCorrelationId();
            Map<String, Object> termination = NegotiationFunctions.createTermination(
                    negotiation.providerPid(), negotiation.consumerPid(), "1", new String[0]);
            this.monitor.debug("Sending termination: " + correlationId);
            this.negotiationClient.terminate(correlationId, termination, negotiation.getCallbackAddress(), expectError);
            if (!expectError) {
                this.terminated(id);
            }
        });
        return this.self();
    }

    /**
     * Called by the {@link #sendTermination(boolean)} stage after a termination was sent with
     * {@code expectError == false}.
     *
     * @param var1 the id of the negotiation that was terminated
     */
    protected abstract void terminated(String var1);

    /**
     * Returns this pipeline typed as {@code P}, used as the return value of the fluent methods.
     *
     * @return this pipeline
     */
    protected abstract P self();
}

