/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tinkerpop.gremlin.server.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang.time.StopWatch;
import org.apache.tinkerpop.gremlin.driver.message.RequestMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.server.Settings;
import org.javatuples.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
@ChannelHandler.Sharable
public class IteratorHandler
extends ChannelOutboundHandlerAdapter {
    private static final Logger logger = LoggerFactory.getLogger(IteratorHandler.class);
    private final Settings settings;

    public IteratorHandler(Settings settings) {
        this.settings = settings;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (msg instanceof Pair) {
            try {
                Pair pair = (Pair)msg;
                Iterator itty = (Iterator)pair.getValue1();
                RequestMessage requestMessage = (RequestMessage)pair.getValue0();
                int resultIterationBatchSize = requestMessage.optionalArgs("batchSize").orElse(this.settings.resultIterationBatchSize);
                StopWatch stopWatch = new StopWatch();
                EventExecutor executorService = ctx.executor();
                Future iteration = executorService.submit(() -> {
                    logger.debug("Preparing to iterate results from - {} - in thread [{}]", (Object)requestMessage, (Object)Thread.currentThread().getName());
                    stopWatch.start();
                    ArrayList aggregate = new ArrayList(resultIterationBatchSize);
                    while (itty.hasNext()) {
                        aggregate.add(itty.next());
                        if (aggregate.size() == resultIterationBatchSize || !itty.hasNext()) {
                            ResponseStatusCode code = itty.hasNext() ? ResponseStatusCode.PARTIAL_CONTENT : ResponseStatusCode.SUCCESS;
                            ctx.writeAndFlush((Object)ResponseMessage.build((RequestMessage)requestMessage).code(code).result(aggregate).create());
                            aggregate = new ArrayList(resultIterationBatchSize);
                        }
                        stopWatch.split();
                        if (stopWatch.getSplitTime() > this.settings.serializedResponseTimeout) {
                            throw new TimeoutException("Serialization of the entire response exceeded the serializeResponseTimeout setting");
                        }
                        stopWatch.unsplit();
                    }
                    return null;
                });
                iteration.addListener(f -> {
                    stopWatch.stop();
                    if (!f.isSuccess()) {
                        String errorMessage = String.format("Response iteration and serialization exceeded the configured threshold for request [%s] - %s", msg, f.cause().getMessage());
                        logger.warn(errorMessage);
                        ctx.writeAndFlush((Object)ResponseMessage.build((RequestMessage)requestMessage).code(ResponseStatusCode.SERVER_ERROR_TIMEOUT).statusAttributeException(f.cause()).statusMessage(errorMessage).create());
                    }
                });
            }
            finally {
                ReferenceCountUtil.release((Object)msg);
            }
        } else {
            ctx.write(msg, promise);
        }
    }
}

