/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.edc.connector.dataplane.util.sink;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.AbstractResult;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.edc.spi.telemetry.TraceCarrier;
import org.eclipse.edc.util.async.AsyncUtils;
import org.eclipse.edc.util.stream.PartitionIterator;
import org.jetbrains.annotations.NotNull;

public abstract class ParallelSink
implements DataSink {
    protected String requestId;
    protected int partitionSize = 5;
    protected ExecutorService executorService;
    protected Monitor monitor;
    protected Telemetry telemetry;

    @WithSpan
    public CompletableFuture<StreamResult<Void>> transfer(DataSource source) {
        CompletionStage completionStage;
        block9: {
            StreamResult streamResult = source.openPartStream();
            if (streamResult.failed()) {
                return CompletableFuture.completedFuture(StreamResult.failure((StreamFailure)((StreamFailure)streamResult.getFailure())));
            }
            Stream partStream = (Stream)streamResult.getContent();
            try {
                Stream partitioned = PartitionIterator.streamOf((Stream)partStream, (int)this.partitionSize);
                TraceCarrier traceCarrier = this.telemetry.getTraceCarrierWithCurrentContext();
                List futures = partitioned.map(parts -> this.processPartsAsync((List<DataSource.Part>)parts, traceCarrier)).collect(Collectors.toList());
                completionStage = ((CompletableFuture)((CompletableFuture)futures.stream().collect(AsyncUtils.asyncAllOf())).thenApply(results -> results.stream().filter(AbstractResult::failed).findFirst().map(r -> StreamResult.error((String)String.join((CharSequence)",", r.getFailureMessages()))).orElseGet(this::complete))).exceptionally(throwable -> StreamResult.error((String)("Unhandled exception raised when transferring data: " + throwable.getMessage())));
                if (partStream == null) break block9;
            }
            catch (Throwable throwable2) {
                try {
                    if (partStream != null) {
                        try {
                            partStream.close();
                        }
                        catch (Throwable throwable3) {
                            throwable2.addSuppressed(throwable3);
                        }
                    }
                    throw throwable2;
                }
                catch (Exception e) {
                    String errorMessage = String.format("Error processing data transfer request - Request ID: %s", this.requestId);
                    this.monitor.severe(errorMessage, new Throwable[]{e});
                    return CompletableFuture.completedFuture(StreamResult.error((String)errorMessage));
                }
            }
            partStream.close();
        }
        return completionStage;
    }

    @NotNull
    private CompletableFuture<StreamResult<Void>> processPartsAsync(List<DataSource.Part> parts, TraceCarrier traceCarrier) {
        Supplier<StreamResult> supplier = () -> this.transferParts(parts);
        return CompletableFuture.supplyAsync(this.telemetry.contextPropagationMiddleware(supplier, traceCarrier), this.executorService);
    }

    protected abstract StreamResult<Void> transferParts(List<DataSource.Part> var1);

    protected StreamResult<Void> complete() {
        return StreamResult.success();
    }

    protected static abstract class Builder<B extends Builder<B, T>, T extends ParallelSink> {
        protected T sink;

        protected Builder(T sink) {
            this.sink = sink;
            ((ParallelSink)this.sink).telemetry = new Telemetry();
        }

        public B requestId(String requestId) {
            ((ParallelSink)this.sink).requestId = requestId;
            return this.self();
        }

        public B partitionSize(int partitionSize) {
            ((ParallelSink)this.sink).partitionSize = partitionSize;
            return this.self();
        }

        public B executorService(ExecutorService executorService) {
            ((ParallelSink)this.sink).executorService = executorService;
            return this.self();
        }

        public B monitor(Monitor monitor) {
            ((ParallelSink)this.sink).monitor = monitor;
            return this.self();
        }

        public B telemetry(Telemetry telemetry) {
            ((ParallelSink)this.sink).telemetry = telemetry;
            return this.self();
        }

        public T build() {
            Objects.requireNonNull(((ParallelSink)this.sink).requestId, "requestId");
            Objects.requireNonNull(((ParallelSink)this.sink).executorService, "executorService");
            this.validate();
            return this.sink;
        }

        protected abstract void validate();

        private B self() {
            return (B)this;
        }
    }
}

