/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.transport.nio;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.transport.ProtocolOutboundHandler;
import org.opensearch.transport.TcpChannel;
import org.opensearch.transport.TcpTransportChannel;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportMessageListener;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.nio.MockStreamTransportResponse;
import org.opensearch.transport.stream.StreamErrorCode;
import org.opensearch.transport.stream.StreamException;
import org.opensearch.transport.stream.StreamingTransportChannel;

class MockStreamingTransportChannel
extends TcpTransportChannel
implements StreamingTransportChannel {
    private static final Logger logger = LogManager.getLogger(MockStreamingTransportChannel.class);
    private final AtomicBoolean streamOpen = new AtomicBoolean(true);
    private final Transport.ResponseHandlers responseHandlers;
    private final TransportMessageListener messageListener;
    private final Queue<TransportResponse> bufferedResponses = new ConcurrentLinkedQueue<TransportResponse>();

    public MockStreamingTransportChannel(ProtocolOutboundHandler outboundHandler, TcpChannel channel, String action, long requestId, Version version, Set<String> features, boolean compressResponse, boolean isHandshake, Releasable breakerRelease, Transport.ResponseHandlers responseHandlers, TransportMessageListener messageListener) {
        super(outboundHandler, channel, action, requestId, version, features, compressResponse, isHandshake, breakerRelease);
        this.responseHandlers = responseHandlers;
        this.messageListener = messageListener;
    }

    public void sendResponseBatch(TransportResponse response) throws StreamException {
        if (!this.streamOpen.get()) {
            throw new StreamException(StreamErrorCode.UNAVAILABLE, "Stream is closed for requestId [" + this.requestId + "]");
        }
        try {
            this.bufferedResponses.add(response);
            logger.debug("Buffered response {} for action[{}] and requestId[{}]. Total buffered: {}", (Object)response.getClass().getSimpleName(), (Object)this.action, (Object)this.requestId, (Object)this.bufferedResponses.size());
        }
        catch (Exception e) {
            this.streamOpen.set(false);
            this.release(true);
            throw new StreamException(StreamErrorCode.INTERNAL, "Error buffering response batch", (Throwable)e);
        }
    }

    public void completeStream() {
        if (this.streamOpen.compareAndSet(true, false)) {
            logger.debug("Completing stream for action[{}] and requestId[{}]. Processing {} buffered responses", (Object)this.action, (Object)this.requestId, (Object)this.bufferedResponses.size());
            try {
                TransportResponseHandler handler = this.responseHandlers.onResponseReceived(this.requestId, this.messageListener);
                if (handler == null) {
                    throw new StreamException(StreamErrorCode.INTERNAL, "No response handler found for requestId [" + this.requestId + "]");
                }
                ArrayList<TransportResponse> responsesCopy = new ArrayList<TransportResponse>(this.bufferedResponses);
                MockStreamTransportResponse<TransportResponse> streamResponse = new MockStreamTransportResponse<TransportResponse>(responsesCopy);
                TransportResponseHandler typedHandler = handler;
                logger.debug("Calling handleStreamResponse for action[{}] and requestId[{}] with {} responses", (Object)this.action, (Object)this.requestId, (Object)responsesCopy.size());
                typedHandler.handleStreamResponse(streamResponse);
            }
            catch (Exception e) {
                this.release(true);
                throw new StreamException(StreamErrorCode.INTERNAL, "Error completing stream", (Throwable)e);
            }
            finally {
                this.release(false);
            }
        } else {
            logger.warn("CompleteStream called on already closed stream with action[{}] and requestId[{}]", (Object)this.action, (Object)this.requestId);
            throw new StreamException(StreamErrorCode.UNAVAILABLE, "MockStreamingTransportChannel stream already closed.");
        }
    }

    public void sendResponse(TransportResponse response) throws IOException {
        throw new UnsupportedOperationException("sendResponse() is not supported for streaming requests in MockStreamingTransportChannel. Use sendResponseBatch() instead.");
    }

    public String getChannelType() {
        return "mock-stream-transport";
    }
}

