/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.edc.connector.dataplane.util.sink;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSink;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.response.ResponseStatus;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.result.AbstractResult;
import org.eclipse.edc.util.async.AsyncUtils;
import org.jetbrains.annotations.NotNull;

public class AsyncStreamingDataSink
implements DataSink {
    private final AsyncResponseContext asyncContext;
    private final ExecutorService executorService;
    private final Monitor monitor;

    public AsyncStreamingDataSink(AsyncResponseContext asyncContext, ExecutorService executorService, Monitor monitor) {
        this.asyncContext = asyncContext;
        this.executorService = executorService;
        this.monitor = monitor;
    }

    public CompletableFuture<StreamResult<Object>> transfer(DataSource source) {
        StreamResult streamResult = source.openPartStream();
        if (streamResult.failed()) {
            return CompletableFuture.completedFuture(StreamResult.failure((StreamFailure)((StreamFailure)streamResult.getFailure())));
        }
        Stream partStream = (Stream)streamResult.getContent();
        return ((CompletableFuture)partStream.map(part -> CompletableFuture.supplyAsync(() -> this.transferPart((DataSource.Part)part), this.executorService)).collect(AsyncUtils.asyncAllOf())).thenApply(r -> this.processResults((List<? extends StatusResult<?>>)r, partStream));
    }

    @NotNull
    private StreamResult<Object> processResults(List<? extends StatusResult<?>> results, Stream<DataSource.Part> partStream) {
        this.close(partStream);
        if (results.stream().anyMatch(AbstractResult::failed)) {
            return StreamResult.error((String)"Error transferring data");
        }
        return StreamResult.success();
    }

    @NotNull
    private StatusResult<?> transferPart(DataSource.Part part) {
        boolean result = this.asyncContext.register(outputStream -> {
            try {
                part.openStream().transferTo((OutputStream)outputStream);
            }
            catch (IOException e) {
                throw new EdcException((Throwable)e);
            }
        });
        return result ? StatusResult.success() : StatusResult.failure((ResponseStatus)ResponseStatus.FATAL_ERROR, (String)"Could not resume output stream write");
    }

    private void close(AutoCloseable closeable) {
        try {
            closeable.close();
        }
        catch (Exception e) {
            this.monitor.warning("Error closing stream", new Throwable[]{e});
        }
    }

    @FunctionalInterface
    public static interface AsyncResponseContext {
        public boolean register(Consumer<OutputStream> var1);
    }
}

