/*
 * Decompiled with CFR 0.152.
 */
package org.reaktivity.nukleus.tcp.internal.stream;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.net.UnknownHostException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import java.util.regex.Matcher;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.LangUtil;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.reaktivity.nukleus.buffer.BufferPool;
import org.reaktivity.nukleus.function.MessageConsumer;
import org.reaktivity.nukleus.function.MessageFunction;
import org.reaktivity.nukleus.function.MessagePredicate;
import org.reaktivity.nukleus.route.RouteManager;
import org.reaktivity.nukleus.stream.StreamFactory;
import org.reaktivity.nukleus.tcp.internal.TcpConfiguration;
import org.reaktivity.nukleus.tcp.internal.TcpCounters;
import org.reaktivity.nukleus.tcp.internal.TcpRouteCounters;
import org.reaktivity.nukleus.tcp.internal.poller.Poller;
import org.reaktivity.nukleus.tcp.internal.poller.PollerKey;
import org.reaktivity.nukleus.tcp.internal.stream.TcpState;
import org.reaktivity.nukleus.tcp.internal.types.Flyweight;
import org.reaktivity.nukleus.tcp.internal.types.OctetsFW;
import org.reaktivity.nukleus.tcp.internal.types.TcpAddressFW;
import org.reaktivity.nukleus.tcp.internal.types.control.RouteFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.AbortFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.BeginFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.DataFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.EndFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.ResetFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.TcpBeginExFW;
import org.reaktivity.nukleus.tcp.internal.types.stream.WindowFW;
import org.reaktivity.nukleus.tcp.internal.util.CIDR;
import org.reaktivity.nukleus.tcp.internal.util.IpUtil;

public class TcpClientFactory
implements StreamFactory {
    // Reusable flyweights that are re-wrapped over incoming frames in order to read them.
    private final RouteFW routeRO = new RouteFW();
    private final BeginFW beginRO = new BeginFW();
    private final DataFW dataRO = new DataFW();
    private final EndFW endRO = new EndFW();
    private final AbortFW abortRO = new AbortFW();
    private final ResetFW resetRO = new ResetFW();
    private final WindowFW windowRO = new WindowFW();
    // Reusable builders that encode outgoing frames into writeBuffer.
    private final BeginFW.Builder beginRW = new BeginFW.Builder();
    private final DataFW.Builder dataRW = new DataFW.Builder();
    private final EndFW.Builder endRW = new EndFW.Builder();
    private final AbortFW.Builder abortRW = new AbortFW.Builder();
    private final ResetFW.Builder resetRW = new ResetFW.Builder();
    private final WindowFW.Builder windowRW = new WindowFW.Builder();
    // Flyweight and builder for the TCP-specific BEGIN extension.
    private final TcpBeginExFW beginExRO = new TcpBeginExFW();
    private final TcpBeginExFW.Builder beginExRW = new TcpBeginExFW.Builder();
    // Adapts routeRO.wrap to the MessageFunction shape passed to router.resolve.
    private final MessageFunction<RouteFW> wrapRoute = (t, b, i, l) -> this.routeRO.wrap(b, i, i + l);
    private final BufferPool bufferPool;
    private Poller poller;
    private final RouteManager router;
    // Direct buffer handed to SocketChannel.read; readBuffer is a view over the same memory.
    private final ByteBuffer readByteBuffer;
    private final MutableDirectBuffer readBuffer;
    // Shared scratch buffer that every do* frame builder wraps at offset 0.
    private final MutableDirectBuffer writeBuffer;
    // Staging buffer that application payload is copied into before it is written to the socket.
    private final ByteBuffer writeByteBuffer;
    private final LongUnaryOperator supplyReplyId;
    private final LongSupplier supplyTraceId;
    private final int tcpTypeId;
    // Caches one address predicate per route target name (CIDR block or single host).
    private final Map<String, Predicate<? super InetAddress>> targetToCidrMatch;
    private final TcpCounters counters;
    // Number of flushed bytes that must accumulate before a WINDOW is sent back to the application.
    private final int windowThreshold;
    // Value applied to SO_KEEPALIVE on every outbound socket.
    private final boolean keepalive;

    /**
     * Creates a factory for outbound TCP streams.
     *
     * @param config        supplies the window threshold percentage and the keepalive flag
     * @param router        resolves routes and registers throttles; must not be null
     * @param poller        registers socket channels for readiness callbacks (stored as given)
     * @param writeBuffer   scratch buffer used to encode outgoing frames; must not be null
     * @param bufferPool    pool of slots used to hold bytes that could not be written yet; must not be null
     * @param supplyReplyId maps an initial stream id to its reply stream id; must not be null
     * @param supplyTraceId supplies trace ids for frames originated here; must not be null
     * @param supplyTypeId  resolves the type id for the name {@code "tcp"}
     * @param counters      supplies per-route counters
     */
    public TcpClientFactory(TcpConfiguration config, RouteManager router, Poller poller, MutableDirectBuffer writeBuffer, BufferPool bufferPool, LongUnaryOperator supplyReplyId, LongSupplier supplyTraceId, ToIntFunction<String> supplyTypeId, TcpCounters counters) {
        this.router = Objects.requireNonNull(router);
        this.poller = poller;
        this.writeBuffer = Objects.requireNonNull(writeBuffer);

        final int writeCapacity = writeBuffer.capacity();
        final ByteOrder nativeOrder = ByteOrder.nativeOrder();
        this.writeByteBuffer = ByteBuffer.allocateDirect(writeCapacity).order(nativeOrder);

        this.bufferPool = Objects.requireNonNull(bufferPool);
        this.supplyReplyId = Objects.requireNonNull(supplyReplyId);
        this.supplyTraceId = Objects.requireNonNull(supplyTraceId);
        this.tcpTypeId = supplyTypeId.applyAsInt("tcp");

        // NOTE(review): 57 is presumably the fixed DATA frame overhead that precedes the payload,
        // so that a full read still fits in writeBuffer once framed - confirm against DataFW.
        final int readCapacity = writeCapacity - 57;
        this.readByteBuffer = ByteBuffer.allocateDirect(readCapacity).order(ByteOrder.nativeOrder());
        this.readBuffer = new UnsafeBuffer(this.readByteBuffer);

        this.targetToCidrMatch = new HashMap<>();
        this.counters = counters;

        // The configured threshold is a percentage of one pool slot.
        this.windowThreshold = bufferPool.slotCapacity() * config.windowThreshold() / 100;
        this.keepalive = config.keepalive();
    }

    /**
     * Creates the handler for a new stream described by a BEGIN frame.
     *
     * @param msgTypeId type id of the frame (not inspected here)
     * @param buffer    buffer holding the BEGIN frame
     * @param index     offset of the frame within {@code buffer}
     * @param length    length of the frame
     * @param throttle  consumer that receives frames sent back to the application
     * @return the stream handler when the stream id is odd (an initial stream) and a route
     *         resolves; otherwise {@code null}
     */
    public MessageConsumer newStream(int msgTypeId, DirectBuffer buffer, int index, int length, MessageConsumer throttle) {
        final BeginFW begin = beginRO.wrap(buffer, index, index + length);
        final boolean initial = (begin.streamId() & 1L) != 0L;
        return initial ? newInitialStream(begin, throttle) : null;
    }

    /**
     * Resolves a route for an initial stream, opens a socket towards the route's remote address
     * and starts connecting.
     *
     * <p>When the BEGIN frame carries an extension, only routes whose address pattern matches and
     * for which {@link #resolveRemoteAddressExt} yields an address are accepted. Without an
     * extension every route passes the filter, so the address is validated again after
     * resolution.
     *
     * @param begin       the BEGIN frame of the initial stream
     * @param application consumer that receives frames sent back to the application
     * @return the handler for subsequent frames on this stream, or {@code null} when no route
     *         resolves, the route's address does not match the expected host-and-port pattern,
     *         or the extension address can no longer be resolved
     */
    private MessageConsumer newInitialStream(BeginFW begin, MessageConsumer application) {
        final long routeId = begin.routeId();
        final long initialId = begin.streamId();
        final OctetsFW extension = begin.extension();
        final boolean hasExtension = extension.sizeof() > 0;

        final MessagePredicate filter = (t, b, o, l) -> {
            final RouteFW candidate = this.routeRO.wrap(b, o, o + l);
            final Matcher candidateMatcher = IpUtil.CONNECT_HOST_AND_PORT_PATTERN.matcher(candidate.remoteAddress().asString());
            return !hasExtension || candidateMatcher.matches() && this.resolveRemoteAddressExt(extension, candidateMatcher.group(1), Integer.parseInt(candidateMatcher.group(2))) != null;
        };

        final RouteFW route = (RouteFW)this.router.resolve(routeId, begin.authorization(), filter, this.wrapRoute);
        if (route == null) {
            return null;
        }

        final Matcher matcher = IpUtil.CONNECT_HOST_AND_PORT_PATTERN.matcher(route.remoteAddress().asString());
        if (!matcher.matches()) {
            // Matcher.group throws IllegalStateException after a failed match, and the filter
            // above does not check the pattern when there is no extension.
            return null;
        }
        final String remoteHost = matcher.group(1);
        final int remotePort = Integer.parseInt(matcher.group(2));

        final InetSocketAddress remoteAddress = hasExtension
            ? this.resolveRemoteAddressExt(extension, remoteHost, remotePort)
            : new InetSocketAddress(remoteHost, remotePort);
        if (remoteAddress == null) {
            // The extension is resolved a second time here (name lookup may now fail); reject the
            // stream before a socket is opened rather than connecting to a null address.
            return null;
        }

        final SocketChannel channel = this.newSocketChannel();
        final TcpRouteCounters routeCounters = this.counters.supplyRoute(route.correlationId());
        final TcpClient client = new TcpClient(application, routeId, initialId, channel, routeCounters);
        client.doNetworkConnect(remoteAddress);
        return client::onApplication;
    }

    /**
     * Resolves the remote address requested by a TCP BEGIN extension, restricted to the addresses
     * permitted by a route target.
     *
     * @param extension  octets holding the TCP BEGIN extension
     * @param targetName route target, either a CIDR block (contains {@code '/'}) or a host name
     * @param targetRef  route port; {@code 0} accepts any requested port, otherwise it must equal
     *                   the port in the extension
     * @return the requested address and port when permitted by the target, otherwise {@code null}
     *         (port mismatch, address outside the target, or a host name that cannot be resolved)
     * @throws RuntimeException if the extension carries an unexpected address kind
     */
    private InetSocketAddress resolveRemoteAddressExt(OctetsFW extension, String targetName, long targetRef) {
        final TcpBeginExFW beginEx = extension.get(this.beginExRO::wrap);
        final TcpAddressFW remoteAddress = beginEx.remoteAddress();
        final int remotePort = beginEx.remotePort();
        InetAddress address = null;
        try {
            final Predicate<? super InetAddress> subnetFilter = this.extensionMatcher(targetName);
            if (targetRef == 0L || targetRef == (long)remotePort) {
                switch (remoteAddress.kind()) {
                    case 3: {
                        // Host name: take the first resolved address that the target permits.
                        final String requestedAddressName = remoteAddress.host().asString();
                        address = Arrays.stream(InetAddress.getAllByName(requestedAddressName))
                            .filter(subnetFilter)
                            .findFirst()
                            .orElse(null);
                        break;
                    }
                    case 1: {
                        address = permittedLiteralAddress(remoteAddress.ipv4Address(), subnetFilter);
                        break;
                    }
                    case 2: {
                        address = permittedLiteralAddress(remoteAddress.ipv6Address(), subnetFilter);
                        break;
                    }
                    default: {
                        throw new RuntimeException("Unexpected address kind");
                    }
                }
            }
        }
        catch (UnknownHostException ignored) {
            // Deliberately ignored: a name that cannot be resolved is reported to the caller
            // as "no address" through the null return below.
        }
        return address != null ? new InetSocketAddress(address, remotePort) : null;
    }

    /**
     * Decodes a raw IPv4 or IPv6 address and returns it only when the filter accepts it.
     */
    private static InetAddress permittedLiteralAddress(OctetsFW octets, Predicate<? super InetAddress> subnetFilter) throws UnknownHostException {
        final byte[] raw = new byte[octets.sizeof()];
        octets.buffer().getBytes(octets.offset(), raw, 0, raw.length);
        final InetAddress candidate = InetAddress.getByAddress(raw);
        return subnetFilter.test(candidate) ? candidate : null;
    }

    /**
     * Returns the cached address predicate for a route target, creating it on first use.
     *
     * @param targetName a CIDR block when it contains {@code '/'}, otherwise a host name
     * @return predicate accepting the addresses permitted by the target
     * @throws UnknownHostException if a host-name target cannot be resolved
     */
    private Predicate<? super InetAddress> extensionMatcher(String targetName) throws UnknownHostException {
        if (targetName.contains("/")) {
            return targetToCidrMatch.computeIfAbsent(targetName, this::inetMatchesCIDR);
        }
        // Resolve up front so that an unknown host is raised here as the checked exception;
        // inetMatchesInet would otherwise rethrow it unchecked from inside computeIfAbsent.
        InetAddress.getByName(targetName);
        return targetToCidrMatch.computeIfAbsent(targetName, this::inetMatchesInet);
    }

    /**
     * Builds a predicate that accepts addresses whose textual host address lies within the
     * given CIDR block.
     *
     * @param targetName the CIDR block
     * @return predicate testing membership of the block
     */
    private Predicate<InetAddress> inetMatchesCIDR(String targetName) {
        // Parsed once, eagerly, and captured by the returned predicate.
        final CIDR range = new CIDR(targetName);
        return address -> {
            final String hostAddress = address.getHostAddress();
            return range.isInRange(hostAddress);
        };
    }

    /**
     * Builds a predicate that accepts only the address the given host name resolves to.
     *
     * @param targetName host name or literal address to resolve
     * @return predicate testing equality with the resolved address; an unknown host is rethrown
     *         unchecked
     */
    private Predicate<InetAddress> inetMatchesInet(String targetName) {
        Predicate<InetAddress> matcher;
        try {
            final InetAddress expected = InetAddress.getByName(targetName);
            matcher = expected::equals;
        }
        catch (UnknownHostException ex) {
            LangUtil.rethrowUnchecked(ex);
            // Only reached if rethrowUnchecked returns normally; needed for definite assignment.
            matcher = address -> false;
        }
        return matcher;
    }

    /**
     * Opens a new socket channel in non-blocking mode with {@code TCP_NODELAY} enabled.
     *
     * <p>If configuring the freshly opened channel fails, the channel is closed before the
     * failure is propagated so that its descriptor does not leak.
     *
     * @return the configured channel; an {@link IOException} is rethrown unchecked
     */
    private SocketChannel newSocketChannel() {
        SocketChannel channel = null;
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            return channel;
        }
        catch (IOException ex) {
            if (channel != null) {
                // open() succeeded but configuration failed: release the half-configured channel.
                this.doCloseNetwork(channel);
            }
            LangUtil.rethrowUnchecked(ex);
            return null;
        }
    }

    /**
     * Closes the given channel through {@code CloseHelper.quietClose}.
     *
     * @param network the channel to close
     */
    private void doCloseNetwork(SocketChannel network) {
        final AutoCloseable closeable = network;
        CloseHelper.quietClose(closeable);
    }

    /**
     * Encodes a BEGIN frame carrying a TCP extension with both socket addresses and delivers it.
     *
     * @param receiver      consumer that receives the frame
     * @param routeId       route id written to the frame
     * @param streamId      stream id written to the frame; also used as the affinity
     * @param traceId       trace id written to the frame
     * @param localAddress  local socket address placed in the extension
     * @param remoteAddress remote socket address placed in the extension
     */
    private void doBegin(MessageConsumer receiver, long routeId, long streamId, long traceId, InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
        final Flyweight.Builder.Visitor beginEx = tcpBeginEx(localAddress, remoteAddress);
        final BeginFW frame = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .traceId(traceId)
            .affinity(streamId)
            .extension(b -> b.set(beginEx))
            .build();
        receiver.accept(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Encodes a DATA frame and delivers it.
     *
     * @param stream   consumer that receives the frame
     * @param routeId  route id written to the frame
     * @param streamId stream id written to the frame
     * @param traceId  trace id written to the frame
     * @param budgetId budget id written to the frame
     * @param reserved reserved amount written to the frame
     * @param payload  buffer holding the payload bytes
     * @param offset   offset of the payload within {@code payload}
     * @param length   number of payload bytes
     */
    private void doData(MessageConsumer stream, long routeId, long streamId, long traceId, long budgetId, int reserved, DirectBuffer payload, int offset, int length) {
        final DataFW frame = dataRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .traceId(traceId)
            .budgetId(budgetId)
            .reserved(reserved)
            .payload(payload, offset, length)
            .build();
        stream.accept(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Encodes an END frame and delivers it.
     *
     * @param receiver consumer that receives the frame
     * @param routeId  route id written to the frame
     * @param streamId stream id written to the frame
     * @param traceId  trace id written to the frame
     */
    private void doEnd(MessageConsumer receiver, long routeId, long streamId, long traceId) {
        final EndFW frame = endRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .traceId(traceId)
            .build();
        receiver.accept(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Encodes an ABORT frame and delivers it.
     *
     * @param receiver consumer that receives the frame
     * @param routeId  route id written to the frame
     * @param streamId stream id written to the frame
     * @param traceId  trace id written to the frame
     */
    private void doAbort(MessageConsumer receiver, long routeId, long streamId, long traceId) {
        final AbortFW frame = abortRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .traceId(traceId)
            .build();
        receiver.accept(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Encodes a RESET frame and delivers it.
     *
     * @param receiver consumer that receives the frame
     * @param routeId  route id written to the frame
     * @param streamId stream id written to the frame
     * @param traceId  trace id written to the frame
     */
    private void doReset(MessageConsumer receiver, long routeId, long streamId, long traceId) {
        final ResetFW frame = resetRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .traceId(traceId)
            .build();
        receiver.accept(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Encodes a WINDOW frame and delivers it.
     *
     * @param sender   consumer that receives the frame
     * @param routeId  route id written to the frame
     * @param streamId stream id written to the frame
     * @param traceId  trace id written to the frame
     * @param budgetId budget id written to the frame
     * @param credit   credit written to the frame
     * @param padding  padding written to the frame
     */
    private void doWindow(MessageConsumer sender, long routeId, long streamId, long traceId, int budgetId, int credit, int padding) {
        final WindowFW frame = windowRW.wrap(writeBuffer, 0, writeBuffer.capacity())
            .routeId(routeId)
            .streamId(streamId)
            .traceId(traceId)
            .budgetId(budgetId)
            .credit(credit)
            .padding(padding)
            .build();
        sender.accept(frame.typeId(), frame.buffer(), frame.offset(), frame.sizeof());
    }

    /**
     * Creates a visitor that encodes the TCP BEGIN extension for the given socket addresses.
     *
     * @param localAddress  local socket address and port
     * @param remoteAddress remote socket address and port
     * @return visitor that writes the extension into the supplied buffer range and returns the
     *         number of bytes written
     */
    private Flyweight.Builder.Visitor tcpBeginEx(InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
        return (buffer, offset, limit) -> {
            final TcpBeginExFW beginEx = beginExRW.wrap(buffer, offset, limit)
                .typeId(tcpTypeId)
                .localAddress(a -> IpUtil.socketAddress(localAddress, a::ipv4Address, a::ipv6Address))
                .localPort(localAddress.getPort())
                .remoteAddress(a -> IpUtil.socketAddress(remoteAddress, a::ipv4Address, a::ipv6Address))
                .remotePort(remoteAddress.getPort())
                .build();
            return beginEx.sizeof();
        };
    }

    private final class TcpClient {
        // Consumer that receives every frame this client sends to the application.
        private final MessageConsumer application;
        private final long routeId;
        // Stream id of the application-to-network direction.
        private final long initialId;
        // Stream id of the network-to-application direction, derived from initialId.
        private final long replyId;
        private final SocketChannel network;
        private final TcpRouteCounters counters;
        // Poller registration for the socket; assigned when the channel is registered for connect.
        private PollerKey networkKey;
        // Budget id, credit and padding most recently granted by the application for reply data.
        private long replyBudgetId;
        private int replyBudget;
        private int replyPadding;
        // Credit granted to the application that it has not yet consumed with DATA frames.
        private int initialBudget;
        // Bit set maintained through the TcpState helpers.
        private int state;
        // Pool slot holding bytes not yet written to the socket, or -1 when there is none.
        private int networkSlot = -1;
        // Number of pending bytes stored in the slot.
        private int networkSlotOffset;
        // Bytes written to the socket since the last WINDOW was sent to the application.
        private int bytesFlushed;

        /**
         * Creates the state for one outbound connection.
         *
         * @param application consumer that receives frames sent to the application
         * @param routeId     id of the resolved route
         * @param initialId   id of the initial stream; the reply id is derived from it
         * @param network     the socket channel used for this connection
         * @param counters    counters of the resolved route
         */
        private TcpClient(MessageConsumer application, long routeId, long initialId, SocketChannel network, TcpRouteCounters counters) {
            this.application = application;
            this.network = network;
            this.counters = counters;
            this.routeId = routeId;
            this.initialId = initialId;
            this.replyId = TcpClientFactory.this.supplyReplyId.applyAsLong(initialId);
        }

        /**
         * Starts connecting the socket to the given address.
         *
         * <p>The channel is always registered with the poller, including when the non-blocking
         * connect completes immediately. {@code onNetworkConnected} installs its read and write
         * handlers on {@code networkKey}, so leaving the key unassigned on the immediate path
         * would fail with a {@link NullPointerException}.
         *
         * <p>A failure to connect is reported to the application through
         * {@code onNetworkRejected}.
         *
         * @param remoteAddress the address to connect to
         */
        private void doNetworkConnect(InetSocketAddress remoteAddress) {
            try {
                this.state = TcpState.openingInitial(this.state);
                this.counters.opensWritten.getAsLong();
                this.network.setOption(StandardSocketOptions.SO_KEEPALIVE, TcpClientFactory.this.keepalive);

                final boolean connected = this.network.connect(remoteAddress);

                // 8 is the connect interest; onNetworkConnect completes a pending connect.
                this.networkKey = TcpClientFactory.this.poller.doRegister(this.network, 8, this::onNetworkConnect);
                if (connected) {
                    // Already connected: drop the connect interest again, exactly as
                    // onNetworkConnect does, and continue as for a completed connect.
                    // NOTE(review): assumes the poller dispatches on this same thread, so the
                    // interest cannot fire between registration and clear - confirm.
                    this.networkKey.clear(8);
                    this.onNetworkConnected();
                }
            }
            catch (IOException | UnresolvedAddressException ex) {
                this.onNetworkRejected();
            }
        }

        /**
         * Poller callback for connect readiness: clears the connect interest, finishes the
         * connect and continues with {@code onNetworkConnected}; a failure is routed to
         * {@code onNetworkRejected}.
         *
         * @param key the poller key that became ready
         * @return always {@code 1}
         */
        private int onNetworkConnect(PollerKey key) {
            try {
                key.clear(8);
                network.finishConnect();
                onNetworkConnected();
            }
            catch (IOException | UnresolvedAddressException ex) {
                onNetworkRejected();
            }
            return 1;
        }

        /**
         * Completes connection setup: marks the initial direction open, installs the read and
         * write handlers on the poller key, sends BEGIN on the reply stream and grants the
         * application an initial window of one pool slot. An I/O failure triggers cleanup.
         */
        private void onNetworkConnected() {
            final long traceId = TcpClientFactory.this.supplyTraceId.getAsLong();

            state = TcpState.openInitial(state);
            this.counters.opensRead.getAsLong();

            try {
                // 1 and 4 are the read and write interests respectively.
                networkKey.handler(1, this::onNetworkReadable);
                networkKey.handler(4, this::onNetworkWritable);

                doApplicationBegin(traceId);
                doApplicationWindow(traceId, TcpClientFactory.this.bufferPool.slotCapacity());
            }
            catch (IOException ex) {
                doCleanup(traceId);
            }
        }

        /**
         * Handles a failed connection attempt: counts the reset, resets the application's
         * initial stream if it is still open, and closes the socket channel.
         *
         * <p>The channel is closed here because no other path releases it after a rejection; in
         * particular an {@code UnresolvedAddressException} from {@code connect} leaves the
         * channel open, which would leak its descriptor.
         */
        private void onNetworkRejected() {
            final long traceId = TcpClientFactory.this.supplyTraceId.getAsLong();
            this.counters.resetsRead.getAsLong();
            this.doApplicationResetIfNecessary(traceId);
            TcpClientFactory.this.doCloseNetwork(this.network);
        }

        /**
         * Poller callback for read readiness: reads as many bytes as the reply budget (less
         * padding) and the read buffer allow, then forwards them to the application. End of
         * stream shuts down input, sends END, and closes the socket if output is already shut
         * down. An I/O failure triggers cleanup.
         *
         * @param key the poller key that became ready
         * @return always {@code 1}
         */
        private int onNetworkReadable(PollerKey key) {
            assert (replyBudget > replyPadding);

            final int available = replyBudget - replyPadding;
            final int limit = Math.min(available, TcpClientFactory.this.readBuffer.capacity());

            // Typed as Buffer so position/limit resolve to the Buffer methods.
            final Buffer target = TcpClientFactory.this.readByteBuffer;
            target.position(0);
            target.limit(limit);

            try {
                final int bytesRead = network.read(TcpClientFactory.this.readByteBuffer);
                if (bytesRead == 0) {
                    // nothing available right now
                }
                else if (bytesRead == -1) {
                    key.clear(1);
                    CloseHelper.close(network::shutdownInput);
                    this.counters.closesRead.getAsLong();
                    doApplicationEnd(TcpClientFactory.this.supplyTraceId.getAsLong());
                    if (network.socket().isOutputShutdown()) {
                        TcpClientFactory.this.doCloseNetwork(network);
                    }
                }
                else {
                    this.counters.bytesRead.accept(bytesRead);
                    doApplicationData(TcpClientFactory.this.readBuffer, 0, bytesRead);
                }
            }
            catch (IOException ex) {
                doCleanup(TcpClientFactory.this.supplyTraceId.getAsLong());
            }
            return 1;
        }

        /**
         * Poller callback for write readiness: retries writing the bytes held in the network
         * slot.
         *
         * @param key the poller key that became ready
         * @return the number of bytes written, or {@code 0} when there is no slot to flush
         */
        private int onNetworkWritable(PollerKey key) {
            if (networkSlot != -1) {
                final long traceId = TcpClientFactory.this.supplyTraceId.getAsLong();
                final MutableDirectBuffer pending = TcpClientFactory.this.bufferPool.buffer(networkSlot);
                final ByteBuffer pendingBytes = TcpClientFactory.this.bufferPool.byteBuffer(networkSlot);
                // Expose exactly the pending bytes to the socket write.
                pendingBytes.limit(pendingBytes.position() + networkSlotOffset);
                return doNetworkWrite(pending, 0, networkSlotOffset, pendingBytes, traceId);
            }

            this.counters.writeopsNoSlot.getAsLong();
            assert (key == networkKey);
            return 0;
        }

        /**
         * Writes bytes to the socket and buffers whatever could not be written.
         *
         * @param buffer     buffer view of the bytes being written; used to copy the unwritten
         *                   remainder into the network slot
         * @param offset     offset of the bytes within {@code buffer}
         * @param length     number of bytes expected to be written
         * @param byteBuffer the same bytes as a {@link ByteBuffer}, positioned for the socket write
         * @param traceId    trace id used for any frames sent as a consequence
         * @return the number of bytes written by the last write attempt
         */
        private int doNetworkWrite(DirectBuffer buffer, int offset, int length, ByteBuffer byteBuffer, long traceId) {
            int bytesWritten = 0;
            try {
                // Retry up to 16 times while the socket accepts nothing.
                for (int i = 16; bytesWritten == 0 && i > 0; --i) {
                    bytesWritten = this.network.write(byteBuffer);
                }
                this.counters.bytesWritten.accept(bytesWritten);
                this.bytesFlushed += bytesWritten;
                if (bytesWritten < length) {
                    // Partial write: keep the remainder in a pool slot, acquiring one if needed.
                    if (this.networkSlot == -1) {
                        this.networkSlot = TcpClientFactory.this.bufferPool.acquire(this.initialId);
                    }
                    if (this.networkSlot == -1) {
                        // No slot available: count the overflow and tear the connection down.
                        this.counters.overflows.getAsLong();
                        this.doApplicationResetIfNecessary(traceId);
                        this.doCleanup(traceId);
                    } else {
                        // Move the unwritten tail to the start of the slot and wait for write readiness.
                        // NOTE(review): when called with the slot's own buffer (see onNetworkWritable),
                        // source and destination are the same buffer with overlapping ranges - confirm
                        // that putBytes tolerates this.
                        MutableDirectBuffer slotBuffer = TcpClientFactory.this.bufferPool.buffer(this.networkSlot);
                        slotBuffer.putBytes(0, buffer, offset + bytesWritten, length - bytesWritten);
                        this.networkSlotOffset = length - bytesWritten;
                        this.networkKey.register(4);
                        this.counters.writeops.getAsLong();
                    }
                } else {
                    // Everything was written: release the slot and stop waiting for write readiness.
                    this.cleanupNetworkSlotIfNecessary();
                    this.networkKey.clear(4);
                    if (TcpState.initialClosing(this.state)) {
                        // The application already ended its stream; finish the deferred shutdown.
                        this.doNetworkShutdownOutput(traceId);
                    } else if (this.bytesFlushed >= TcpClientFactory.this.windowThreshold) {
                        // Return the flushed bytes to the application as credit once the threshold is reached.
                        this.doApplicationWindow(traceId, this.bytesFlushed);
                        this.bytesFlushed = 0;
                    }
                }
            }
            catch (IOException ex) {
                this.doCleanup(traceId);
            }
            return bytesWritten;
        }

        /**
         * Closes the application-to-network direction: marks the initial stream closed, releases
         * any network slot, then either closes the socket outright (connect still pending) or
         * shuts down its output, closing the socket fully once input is shut down too. An I/O
         * failure triggers cleanup.
         *
         * @param traceId trace id used if cleanup has to send frames
         */
        private void doNetworkShutdownOutput(long traceId) {
            state = TcpState.closeInitial(state);
            cleanupNetworkSlotIfNecessary();

            try {
                final boolean connectPending = network.isConnectionPending();
                if (!connectPending) {
                    networkKey.clear(4);
                    network.shutdownOutput();
                    this.counters.closesWritten.getAsLong();
                    if (network.socket().isInputShutdown()) {
                        TcpClientFactory.this.doCloseNetwork(network);
                    }
                }
                else {
                    networkKey.clear(8);
                    TcpClientFactory.this.doCloseNetwork(network);
                    this.counters.closesWritten.getAsLong();
                }
            }
            catch (IOException ex) {
                doCleanup(traceId);
            }
        }

        /**
         * Releases the network slot back to the pool, if one is held, and resets the slot state.
         */
        private void cleanupNetworkSlotIfNecessary() {
            if (networkSlot == -1) {
                return;
            }
            TcpClientFactory.this.bufferPool.release(networkSlot);
            networkSlot = -1;
            networkSlotOffset = 0;
        }

        /**
         * Dispatches a frame received from the application to the matching handler, based on its
         * message type id. Unknown type ids are ignored.
         *
         * @param msgTypeId type id of the frame
         * @param buffer    buffer holding the frame
         * @param index     offset of the frame within {@code buffer}
         * @param length    length of the frame
         */
        private void onApplication(int msgTypeId, DirectBuffer buffer, int index, int length) {
            final int limit = index + length;
            switch (msgTypeId) {
                case 1:
                    onApplicationBegin(TcpClientFactory.this.beginRO.wrap(buffer, index, limit));
                    break;
                case 2:
                    onApplicationData(TcpClientFactory.this.dataRO.wrap(buffer, index, limit));
                    break;
                case 3:
                    onApplicationEnd(TcpClientFactory.this.endRO.wrap(buffer, index, limit));
                    break;
                case 4:
                    onApplicationAbort(TcpClientFactory.this.abortRO.wrap(buffer, index, limit));
                    break;
                case 0x40000001:
                    onApplicationReset(TcpClientFactory.this.resetRO.wrap(buffer, index, limit));
                    break;
                case 0x40000002:
                    onApplicationWindow(TcpClientFactory.this.windowRO.wrap(buffer, index, limit));
                    break;
                default:
                    break;
            }
        }

        /**
         * Handles the application's BEGIN frame. Nothing is done with the frame itself; the
         * method only asserts that the initial direction is already marked as opening.
         *
         * @param begin the BEGIN frame (unused)
         */
        private void onApplicationBegin(BeginFW begin) {
            assert (TcpState.initialOpening(this.state));
        }

        /**
         * Handles a DATA frame from the application by writing its payload to the socket.
         *
         * <p>The frame's reserved amount is deducted from the initial budget; exceeding the
         * budget resets the application stream and cleans up. When a network slot already holds
         * pending bytes, the payload is appended to it and the whole slot is written; otherwise
         * the payload is staged in the shared write byte buffer and written directly.
         *
         * @param data the DATA frame
         */
        private void onApplicationData(DataFW data) {
            final long traceId = data.traceId();
            final int reserved = data.reserved();

            initialBudget -= reserved;
            if (initialBudget < 0) {
                doApplicationResetIfNecessary(traceId);
                doCleanup(traceId);
                return;
            }

            final OctetsFW payload = data.payload();
            final DirectBuffer payloadBuffer = payload.buffer();
            final int payloadOffset = payload.offset();
            final int payloadLength = payload.sizeof();
            assert (reserved == payloadLength);
            assert (payloadLength > 0);

            if (networkSlot == -1) {
                final ByteBuffer staging = TcpClientFactory.this.writeByteBuffer;
                staging.clear();
                payloadBuffer.getBytes(payloadOffset, staging, payloadLength);
                staging.flip();
                doNetworkWrite(payloadBuffer, payloadOffset, payloadLength, staging, traceId);
            }
            else {
                // Earlier bytes are still pending: append behind them to preserve ordering.
                final MutableDirectBuffer slotBuffer = TcpClientFactory.this.bufferPool.buffer(networkSlot);
                slotBuffer.putBytes(networkSlotOffset, payloadBuffer, payloadOffset, payloadLength);
                networkSlotOffset += payloadLength;

                final ByteBuffer slotByteBuffer = TcpClientFactory.this.bufferPool.byteBuffer(networkSlot);
                slotByteBuffer.limit(slotByteBuffer.position() + networkSlotOffset);
                doNetworkWrite(slotBuffer, 0, networkSlotOffset, slotByteBuffer, traceId);
            }
        }

        /**
         * Handles an END frame from the application: marks the initial direction as closing and
         * shuts down socket output right away unless bytes are still pending in the network slot
         * (in which case {@code doNetworkWrite} performs the shutdown once they are flushed).
         *
         * @param end the END frame
         */
        private void onApplicationEnd(EndFW end) {
            final long traceId = end.traceId();
            state = TcpState.closingInitial(state);
            if (networkSlot != -1) {
                return;
            }
            doNetworkShutdownOutput(traceId);
        }

        /**
         * Handles an ABORT frame from the application by shutting down socket output.
         *
         * @param abort the ABORT frame
         */
        private void onApplicationAbort(AbortFW abort) {
            doNetworkShutdownOutput(abort.traceId());
        }

        /**
         * Handles a RESET frame from the application: marks the reply direction closed, shuts
         * down socket input (ignoring failures) and cleans up the connection.
         *
         * @param reset the RESET frame
         */
        private void onApplicationReset(ResetFW reset) {
            state = TcpState.closeReply(state);
            CloseHelper.quietClose(network::shutdownInput);
            doCleanup(reset.traceId());
        }

        /**
         * Handles a WINDOW frame from the application: records the granted budget and padding,
         * marks the reply direction open, attempts a read immediately when there is usable
         * budget, and then (re)registers read interest if budget remains and the reply direction
         * has not been closed.
         *
         * @param window the WINDOW frame
         */
        private void onApplicationWindow(WindowFW window) {
            final long grantedBudgetId = window.budgetId();
            final int grantedCredit = window.credit();
            final int grantedPadding = window.padding();

            replyBudgetId = grantedBudgetId;
            replyBudget += grantedCredit;
            replyPadding = grantedPadding;
            state = TcpState.openReply(state);

            if (replyBudget <= replyPadding) {
                networkKey.clear(1);
            }
            else {
                onNetworkReadable(networkKey);
            }

            // Re-checked on purpose: the read above may have consumed budget or closed the reply.
            if (replyBudget > replyPadding) {
                if (!TcpState.replyClosed(state)) {
                    networkKey.register(1);
                    this.counters.readops.getAsLong();
                }
            }
        }

        /**
         * Sends BEGIN on the reply stream, carrying the socket's local and remote addresses, after
         * registering this client as the throttle for the reply stream.
         *
         * @param traceId trace id written to the frame
         * @throws IOException if the socket addresses cannot be obtained
         */
        private void doApplicationBegin(long traceId) throws IOException {
            final InetSocketAddress local = (InetSocketAddress)network.getLocalAddress();
            final InetSocketAddress remote = (InetSocketAddress)network.getRemoteAddress();

            TcpClientFactory.this.router.setThrottle(replyId, this::onApplication);
            TcpClientFactory.this.doBegin(application, routeId, replyId, traceId, local, remote);
            state = TcpState.openingReply(state);
        }

        /**
         * Sends bytes read from the socket to the application as a DATA frame and deducts the
         * payload plus padding from the reply budget; read interest is cleared once the
         * remaining budget no longer exceeds the padding.
         *
         * @param buffer buffer holding the bytes
         * @param offset offset of the bytes within {@code buffer}
         * @param length number of bytes
         */
        private void doApplicationData(DirectBuffer buffer, int offset, int length) {
            final long traceId = TcpClientFactory.this.supplyTraceId.getAsLong();
            final int reserved = length + replyPadding;

            TcpClientFactory.this.doData(application, routeId, replyId, traceId, replyBudgetId, reserved, buffer, offset, length);

            replyBudget -= reserved;
            final boolean exhausted = replyBudget <= replyPadding;
            if (exhausted) {
                networkKey.clear(1);
            }
        }

        /**
         * Sends END on the reply stream and marks the reply direction closed.
         *
         * @param traceId trace id written to the frame
         */
        private void doApplicationEnd(long traceId) {
            TcpClientFactory.this.doEnd(application, routeId, replyId, traceId);
            state = TcpState.closeReply(state);
        }

        /**
         * Sends ABORT on the reply stream and marks the reply direction closed.
         *
         * @param traceId trace id written to the frame
         */
        private void doApplicationAbort(long traceId) {
            TcpClientFactory.this.doAbort(application, routeId, replyId, traceId);
            state = TcpState.closeReply(state);
        }

        /**
         * Sends RESET on the initial stream and marks the initial direction closed.
         *
         * @param traceId trace id written to the frame
         */
        private void doApplicationReset(long traceId) {
            TcpClientFactory.this.doReset(application, routeId, initialId, traceId);
            state = TcpState.closeInitial(state);
        }

        /**
         * Grants the application additional credit on the initial stream and records it in the
         * initial budget. The frame is sent with budget id 0 and padding 0.
         *
         * @param traceId trace id written to the frame
         * @param credit  number of bytes of credit to grant
         */
        private void doApplicationWindow(long traceId, int credit) {
            initialBudget += credit;
            TcpClientFactory.this.doWindow(application, routeId, initialId, traceId, 0, credit, 0);
        }

        /**
         * Sends RESET on the initial stream only while that direction is opening and not yet
         * closing.
         *
         * @param traceId trace id written to the frame
         */
        private void doApplicationResetIfNecessary(long traceId) {
            if (!TcpState.initialOpening(state) || TcpState.initialClosing(state)) {
                return;
            }
            doApplicationReset(traceId);
        }

        /**
         * Sends ABORT on the reply stream only while that direction is opened and not yet closed.
         *
         * @param traceId trace id written to the frame
         */
        private void doApplicationAbortIfNecessary(long traceId) {
            if (!TcpState.replyOpened(state) || TcpState.replyClosed(state)) {
                return;
            }
            doApplicationAbort(traceId);
        }

        /**
         * Tears the connection down: aborts and resets the application streams where still
         * needed, counts a reset and/or abort for each socket direction that was not shut down,
         * closes the socket and releases any network slot.
         *
         * @param traceId trace id written to any frames sent
         */
        private void doCleanup(long traceId) {
            doApplicationAbortIfNecessary(traceId);
            doApplicationResetIfNecessary(traceId);

            final boolean inputOpen = !network.socket().isInputShutdown();
            if (inputOpen) {
                this.counters.resetsRead.getAsLong();
            }
            final boolean outputOpen = !network.socket().isOutputShutdown();
            if (outputOpen) {
                this.counters.abortsWritten.getAsLong();
            }

            TcpClientFactory.this.doCloseNetwork(network);
            cleanupNetworkSlotIfNecessary();
        }
    }
}

