/*
 * Decompiled with CFR 0.152.
 */
package com.marklogic.client.dataservices.impl;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.dataservices.IOEndpoint;
import com.marklogic.client.dataservices.InputOutputCaller;
import com.marklogic.client.dataservices.impl.CallContextImpl;
import com.marklogic.client.dataservices.impl.HandleProvider;
import com.marklogic.client.dataservices.impl.IOEndpointImpl;
import com.marklogic.client.dataservices.impl.InputOutputCallerImpl;
import com.marklogic.client.io.marker.BufferableContentHandle;
import com.marklogic.client.io.marker.BufferableHandle;
import com.marklogic.client.io.marker.JSONWriteHandle;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InputOutputEndpointImpl<I, O>
extends IOEndpointImpl<I, O>
implements InputOutputCaller<I, O> {
    private static final Logger logger = LoggerFactory.getLogger(InputOutputEndpointImpl.class);
    private final InputOutputCallerImpl<I, O> caller;
    private final int batchSize;

    public InputOutputEndpointImpl(DatabaseClient client, JSONWriteHandle apiDecl, HandleProvider<I, O> handleProvider) {
        this(client, new InputOutputCallerImpl<I, O>(apiDecl, handleProvider));
    }

    private InputOutputEndpointImpl(DatabaseClient client, InputOutputCallerImpl<I, O> caller) {
        super(client, caller);
        this.caller = caller;
        this.batchSize = this.initBatchSize(caller);
    }

    private InputOutputCallerImpl<I, O> getCaller() {
        return this.caller;
    }

    private int getBatchSize() {
        return this.batchSize;
    }

    @Override
    public O[] call(I[] input) {
        return this.getResponseData(this.newCallContext(true), input);
    }

    @Override
    public O[] call(IOEndpoint.CallContext callContext, I[] input) {
        return this.getResponseData(callContext, input);
    }

    @Override
    public InputOutputCaller.BulkInputOutputCaller<I, O> bulkCaller() {
        return new BulkInputOutputCallerImpl(this);
    }

    @Override
    public InputOutputCaller.BulkInputOutputCaller<I, O> bulkCaller(IOEndpoint.CallContext callContext) {
        return new BulkInputOutputCallerImpl(this, this.getBatchSize(), this.checkAllowedArgs(callContext));
    }

    @Override
    public InputOutputCaller.BulkInputOutputCaller<I, O> bulkCaller(IOEndpoint.CallContext[] callContexts) {
        if (callContexts == null || callContexts.length == 0) {
            throw new IllegalArgumentException("CallContext cannot be null or empty");
        }
        return this.bulkCaller(callContexts, callContexts.length);
    }

    @Override
    public InputOutputCaller.BulkInputOutputCaller<I, O> bulkCaller(IOEndpoint.CallContext[] callContexts, int threadCount) {
        if (callContexts == null) {
            throw new IllegalArgumentException("CallContext cannot be null");
        }
        if (threadCount > callContexts.length) {
            throw new IllegalArgumentException("Thread count cannot be more than the callContext count.");
        }
        switch (callContexts.length) {
            case 0: {
                throw new IllegalArgumentException("CallContext cannot be empty");
            }
            case 1: {
                return new BulkInputOutputCallerImpl(this, this.getBatchSize(), this.checkAllowedArgs(callContexts[0]));
            }
        }
        return new BulkInputOutputCallerImpl(this, this.getBatchSize(), this.checkAllowedArgs(callContexts), threadCount);
    }

    private O[] getResponseData(IOEndpoint.CallContext callContext, I[] input) {
        InputOutputCallerImpl callerImpl = this.getCaller();
        BufferableContentHandle<?, ?>[] inputHandles = callerImpl.bufferableInputHandleOn(input);
        return callerImpl.arrayCall(this.getClient(), this.checkAllowedArgs(callContext), inputHandles);
    }

    public static class BulkInputOutputCallerImpl<I, O>
    extends IOEndpointImpl.BulkIOEndpointCallerImpl<I, O>
    implements InputOutputCaller.BulkInputOutputCaller<I, O> {
        private final InputOutputEndpointImpl<I, O> endpoint;
        private final int batchSize;
        private final LinkedBlockingQueue<I> inputQueue;
        private Consumer<O> outputListener;
        private InputOutputCaller.BulkInputOutputCaller.ErrorListener errorListener;

        public BulkInputOutputCallerImpl(InputOutputEndpointImpl<I, O> endpoint) {
            this(endpoint, ((InputOutputEndpointImpl)endpoint).getBatchSize(), endpoint.checkAllowedArgs(endpoint.newCallContext()));
        }

        private BulkInputOutputCallerImpl(InputOutputEndpointImpl<I, O> endpoint, int batchSize, CallContextImpl<I, O> callContext) {
            super(endpoint, callContext);
            this.checkEndpoint(endpoint, "InputOutputEndpointImpl");
            this.endpoint = endpoint;
            this.batchSize = batchSize;
            this.inputQueue = new LinkedBlockingQueue();
        }

        private BulkInputOutputCallerImpl(InputOutputEndpointImpl<I, O> endpoint, int batchSize, CallContextImpl<I, O>[] callContexts, int threadCount) {
            super(endpoint, callContexts, threadCount, 2 * callContexts.length);
            this.endpoint = endpoint;
            this.batchSize = batchSize;
            this.inputQueue = new LinkedBlockingQueue();
        }

        private InputOutputEndpointImpl<I, O> getEndpoint() {
            return this.endpoint;
        }

        private int getBatchSize() {
            return this.batchSize;
        }

        private LinkedBlockingQueue<I> getInputQueue() {
            return this.inputQueue;
        }

        private Consumer<O> getOutputListener() {
            return this.outputListener;
        }

        @Override
        public void setOutputListener(Consumer<O> listener) {
            this.outputListener = listener;
        }

        @Override
        public void accept(I input) {
            if (this.getOutputListener() == null) {
                throw new IllegalStateException("Must configure output consumer before providing input");
            }
            boolean hasBatch = this.queueInput(input, this.getInputQueue(), this.getBatchSize());
            if (hasBatch) {
                this.processInput();
            }
        }

        @Override
        public void acceptAll(I[] input) {
            if (this.getOutputListener() == null) {
                throw new IllegalStateException("Must configure output consumer before providing input");
            }
            boolean hasBatch = this.queueAllInput(input, this.getInputQueue(), this.getBatchSize());
            if (hasBatch) {
                this.processInput();
            }
        }

        @Override
        public void setErrorListener(InputOutputCaller.BulkInputOutputCaller.ErrorListener errorListener) {
            this.errorListener = errorListener;
        }

        private InputOutputCaller.BulkInputOutputCaller.ErrorListener getErrorListener() {
            return this.errorListener;
        }

        private void processInput() {
            I[] inputBatch = this.getInputBatch(this.getInputQueue(), this.getBatchSize());
            if (this.getCallContext() != null) {
                this.processInput(this.getCallContext(), inputBatch);
            } else if (this.getCallContextQueue() != null) {
                BulkCallableImpl bulkCallableImpl = new BulkCallableImpl(this, inputBatch);
                this.submitTask(bulkCallableImpl);
            } else {
                throw new IllegalArgumentException("Cannot process input without Callcontext.");
            }
        }

        private void processInput(CallContextImpl<I, O> callContext, I[] inputBatch) {
            logger.trace("input endpoint running endpoint={} count={} state={}", new Object[]{callContext.getEndpoint().getEndpointPath(), this.getCallCount(), callContext.getEndpointState()});
            InputOutputCallerImpl callerImpl = ((InputOutputEndpointImpl)this.getEndpoint()).getCaller();
            IOEndpoint.BulkIOEndpointCaller.ErrorDisposition error = IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.RETRY;
            BufferableHandle[] inputHandles = callerImpl.bufferableInputHandleOn(inputBatch);
            for (int retryCount = 0; retryCount < 100 && error == IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.RETRY; ++retryCount) {
                Throwable throwable = null;
                O[] output = null;
                try {
                    output = callerImpl.arrayCall(callContext.getClient(), callContext, (BufferableContentHandle<?, ?>[])inputHandles);
                    this.incrementCallCount();
                    this.processOutputBatch(output, this.getOutputListener());
                    return;
                }
                catch (Throwable catchedThrowable) {
                    throwable = catchedThrowable;
                    if (throwable == null) continue;
                    if (this.getErrorListener() != null) {
                        try {
                            error = retryCount < 99 ? this.getErrorListener().processError(retryCount, throwable, callContext, inputHandles) : IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.SKIP_CALL;
                        }
                        catch (Throwable throwable1) {
                            logger.error("Error Listener failed with ", throwable1);
                            error = IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.STOP_ALL_CALLS;
                        }
                        switch (error) {
                            case RETRY: {
                                break;
                            }
                            case SKIP_CALL: {
                                return;
                            }
                            case STOP_ALL_CALLS: {
                                if (this.getCallerThreadPoolExecutor() != null) {
                                    this.getCallerThreadPoolExecutor().shutdown();
                                } else {
                                    break;
                                }
                            }
                        }
                        continue;
                    }
                    if (this.getCallContext() != null) {
                        throw new RuntimeException("Failed to produce output from input", throwable);
                    }
                    logger.error("No error listener set. Stop all calls. " + this.getEndpoint().getEndpointPath(), throwable);
                    error = IOEndpoint.BulkIOEndpointCaller.ErrorDisposition.STOP_ALL_CALLS;
                    continue;
                }
            }
        }

        @Override
        public void awaitCompletion() {
            try {
                if (this.getInputQueue() != null) {
                    while (!this.getInputQueue().isEmpty()) {
                        this.processInput();
                    }
                }
                if (this.getCallerThreadPoolExecutor() != null) {
                    this.getCallerThreadPoolExecutor().shutdown();
                    this.getCallerThreadPoolExecutor().awaitTermination();
                }
            }
            catch (Throwable throwable) {
                throw new RuntimeException("Error occurred while awaiting termination " + throwable.getMessage());
            }
        }

        private static class BulkCallableImpl<I, O>
        implements Callable<Boolean> {
            private final BulkInputOutputCallerImpl<I, O> bulkInputOutputCallerImpl;
            private final I[] inputBatch;

            BulkCallableImpl(BulkInputOutputCallerImpl<I, O> BulkInputOutputCallerImpl2, I[] inputBatch) {
                this.bulkInputOutputCallerImpl = BulkInputOutputCallerImpl2;
                this.inputBatch = inputBatch;
            }

            @Override
            public Boolean call() {
                try {
                    CallContextImpl callContext = this.bulkInputOutputCallerImpl.getCallContextQueue().take();
                    ((BulkInputOutputCallerImpl)this.bulkInputOutputCallerImpl).processInput(callContext, this.inputBatch);
                    this.bulkInputOutputCallerImpl.getCallContextQueue().put(callContext);
                }
                catch (Exception ex) {
                    throw new InternalError("Error occurred while processing CallContext - " + ex.getMessage());
                }
                return true;
            }
        }
    }
}

