/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.edc.connector.dataplane.util.sink;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.AbstractResult;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.edc.util.async.AsyncUtils;
import org.eclipse.edc.util.stream.PartitionIterator;
import org.jetbrains.annotations.NotNull;

public abstract class ParallelSink
implements DataSink {
    protected String requestId;
    protected int partitionSize = 5;
    protected ExecutorService executorService;
    protected Monitor monitor;
    protected Telemetry telemetry;

    @WithSpan
    public CompletableFuture<StreamResult<Object>> transfer(DataSource source) {
        return ((CompletableFuture)CompletableFuture.supplyAsync(() -> ((DataSource)source).openPartStream(), this.executorService).thenCompose(result -> (CompletionStage)result.map(this::process).orElse(f -> CompletableFuture.completedFuture(this.error(f.getFailureDetail()))))).exceptionally(throwable -> this.error(throwable.getMessage()));
    }

    protected abstract StreamResult<Object> transferParts(List<DataSource.Part> var1);

    @NotNull
    private CompletableFuture<StreamResult<Object>> process(Stream<DataSource.Part> parts) {
        try (Stream<DataSource.Part> stream = parts;){
            CompletionStage completionStage = ((CompletableFuture)PartitionIterator.streamOf(parts, (int)this.partitionSize).map(this::processPartitionAsync).collect(AsyncUtils.asyncAllOf())).thenApply(results -> results.stream().filter(AbstractResult::failed).findFirst().map(r -> StreamResult.failure((StreamFailure)((StreamFailure)r.getFailure()))).orElseGet(this::complete));
            return completionStage;
        }
    }

    @NotNull
    private CompletableFuture<StreamResult<Object>> processPartitionAsync(List<DataSource.Part> parts) {
        return CompletableFuture.supplyAsync(this.transfer(parts), this.executorService);
    }

    private Supplier<StreamResult<Object>> transfer(List<DataSource.Part> parts) {
        return this.telemetry.contextPropagationMiddleware(() -> this.transferParts(parts), this.telemetry.getTraceCarrierWithCurrentContext());
    }

    @NotNull
    private StreamResult<Object> error(String message) {
        return StreamResult.error((String)"Error processing data transfer request - Request ID: %s. Message: %s".formatted(this.requestId, message));
    }

    protected StreamResult<Object> complete() {
        return StreamResult.success();
    }

    protected static abstract class Builder<B extends Builder<B, T>, T extends ParallelSink> {
        protected T sink;

        protected Builder(T sink) {
            this.sink = sink;
            ((ParallelSink)this.sink).telemetry = new Telemetry();
        }

        public B requestId(String requestId) {
            ((ParallelSink)this.sink).requestId = requestId;
            return this.self();
        }

        public B partitionSize(int partitionSize) {
            ((ParallelSink)this.sink).partitionSize = partitionSize;
            return this.self();
        }

        public B executorService(ExecutorService executorService) {
            ((ParallelSink)this.sink).executorService = executorService;
            return this.self();
        }

        public B monitor(Monitor monitor) {
            ((ParallelSink)this.sink).monitor = monitor;
            return this.self();
        }

        public B telemetry(Telemetry telemetry) {
            ((ParallelSink)this.sink).telemetry = telemetry;
            return this.self();
        }

        public T build() {
            Objects.requireNonNull(((ParallelSink)this.sink).requestId, "requestId");
            Objects.requireNonNull(((ParallelSink)this.sink).executorService, "executorService");
            this.validate();
            return this.sink;
        }

        protected abstract void validate();

        private B self() {
            return (B)this;
        }
    }
}

