/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.ip.tcp;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.Lifecycle;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionFailedCorrelationEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.integration.ip.tcp.connection.TcpListener;
import org.springframework.integration.ip.tcp.connection.TcpSender;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;

/**
 * Outbound gateway that sends a request message over a connection obtained from an
 * {@link AbstractClientConnectionFactory} and blocks the calling thread until a reply
 * arriving on the same connection is delivered through {@link #onMessage(Message)}, or
 * until the remote timeout elapses.
 *
 * <p>Replies are correlated to requests by connection id. When the connection factory is
 * not single-use, a single-permit fair semaphore is held for the duration of each request,
 * so only one request at a time is in flight through this gateway.
 */
public class TcpOutboundGateway
extends AbstractReplyProducingMessageHandler
implements TcpSender,
TcpListener,
Lifecycle {
    private volatile AbstractClientConnectionFactory connectionFactory;
    /** Cached copy of {@code connectionFactory.isSingleUse()}, captured in {@link #setConnectionFactory}. */
    private volatile boolean isSingleUse;
    /** Replies awaited by in-flight requests, keyed by connection id. */
    private final Map<String, AsyncReply> pendingReplies = new ConcurrentHashMap<String, AsyncReply>();
    /** One fair permit; acquired per request only when the connection is not single-use. */
    private final Semaphore semaphore = new Semaphore(1, true);
    private volatile Expression remoteTimeoutExpression = new LiteralExpression("10000");
    private volatile long requestTimeout = 10000L;
    private volatile EvaluationContext evaluationContext = new StandardEvaluationContext();

    /**
     * Sets how long a request waits to acquire the semaphore guarding a shared connection.
     *
     * @param requestTimeout the wait time in milliseconds
     */
    public void setRequestTimeout(long requestTimeout) {
        this.requestTimeout = requestTimeout;
    }

    /**
     * Sets a fixed time to wait for the reply; stored as a literal expression.
     *
     * @param remoteTimeout the wait time in milliseconds
     */
    public void setRemoteTimeout(long remoteTimeout) {
        this.remoteTimeoutExpression = new LiteralExpression("" + remoteTimeout);
    }

    /**
     * Sets the expression that is evaluated against each request message to obtain the
     * time (a {@code Long}, in milliseconds) to wait for the reply.
     *
     * @param remoteTimeoutExpression the timeout expression
     */
    public void setRemoteTimeoutExpression(Expression remoteTimeoutExpression) {
        this.remoteTimeoutExpression = remoteTimeoutExpression;
    }

    /**
     * Sets the evaluation context used when evaluating the remote timeout expression.
     *
     * @param evaluationContext the evaluation context
     */
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) {
        this.evaluationContext = evaluationContext;
    }

    /**
     * Completes initialization after the superclass has initialized. A context is created
     * from the bean factory only when the current one is {@code null}; because the field
     * starts out as a {@link StandardEvaluationContext}, that only happens if
     * {@link #setIntegrationEvaluationContext} was called with {@code null}.
     */
    protected void doInit() {
        super.doInit();
        if (this.evaluationContext == null) {
            this.evaluationContext = ExpressionUtils.createStandardEvaluationContext((BeanFactory)this.getBeanFactory());
        }
    }

    /**
     * Sends the request over a connection from the factory and waits for the correlated reply.
     *
     * <p>Cleanup (dropping the pending-reply entry, closing a single-use connection and
     * releasing the semaphore) is performed in a {@code finally} block so that it also runs
     * when something other than an {@link Exception} escapes the send/receive sequence;
     * otherwise the only semaphore permit would be lost and every later request on a shared
     * connection would time out.
     *
     * @param requestMessage the message to send
     * @return the reply message received on the connection
     * @throws MessageTimeoutException if the semaphore cannot be acquired within the request
     *         timeout, or no reply arrives within the remote timeout (the connection is
     *         force-closed in the latter case)
     * @throws MessagingException for any other failure; a non-messaging exception is wrapped
     *         with the message "Failed to send or receive"
     */
    protected Object handleRequestMessage(Message<?> requestMessage) {
        Assert.notNull(this.connectionFactory, this.getClass().getName() + " requires a client connection factory");
        boolean haveSemaphore = false;
        TcpConnectionSupport connection = null;
        String connectionId = null;
        try {
            if (!this.isSingleUse) {
                // A shared connection carries one request at a time; wait our turn.
                this.logger.debug("trying semaphore");
                if (!this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS)) {
                    throw new MessageTimeoutException(requestMessage, "Timed out waiting for connection");
                }
                haveSemaphore = true;
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("got semaphore");
                }
            }
            connection = this.connectionFactory.getConnection();
            AsyncReply reply = new AsyncReply(this.remoteTimeoutExpression.getValue(this.evaluationContext, requestMessage, Long.class));
            connectionId = connection.getConnectionId();
            // Register before sending so a fast reply always finds its pending entry.
            this.pendingReplies.put(connectionId, reply);
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Added pending reply " + connectionId);
            }
            connection.send(requestMessage);
            Message<?> replyMessage = reply.getReply();
            if (replyMessage == null) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Remote Timeout on " + connectionId);
                }
                // A late reply on this connection could no longer be correlated; drop the connection.
                this.connectionFactory.forceClose(connection);
                throw new MessageTimeoutException(requestMessage, "Timed out waiting for response");
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Response " + replyMessage);
            }
            return replyMessage;
        }
        catch (Exception e) {
            this.logger.error("Tcp Gateway exception", e);
            if (e instanceof InterruptedException) {
                // The exception is about to be wrapped; keep the interrupt visible to callers.
                Thread.currentThread().interrupt();
            }
            if (e instanceof MessagingException) {
                throw (MessagingException)e;
            }
            throw new MessagingException("Failed to send or receive", (Throwable)e);
        }
        finally {
            if (connectionId != null) {
                this.pendingReplies.remove(connectionId);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Removed pending reply " + connectionId);
                }
                if (this.isSingleUse) {
                    connection.close();
                }
            }
            if (haveSemaphore) {
                this.semaphore.release();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("released semaphore");
                }
            }
        }
    }

    /**
     * Receives a message from a connection and hands it to the request waiting on that
     * connection, identified by the {@code ip_connectionId} header.
     *
     * <p>If the header is missing, or no request is pending for the connection, the failure
     * is logged and a {@link TcpConnectionFailedCorrelationEvent} is published. An
     * {@link ErrorMessage} with no pending request is dropped without logging or an event.
     *
     * @param message the message received on a connection
     * @return always {@code false}
     */
    @Override
    public boolean onMessage(Message<?> message) {
        String connectionId = (String)message.getHeaders().get((Object)"ip_connectionId");
        if (connectionId == null) {
            this.logger.error("Cannot correlate response - no connection id");
            this.publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id");
            return false;
        }
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("onMessage: " + connectionId + "(" + message + ")");
        }
        AsyncReply reply = this.pendingReplies.get(connectionId);
        if (reply == null) {
            if (message instanceof ErrorMessage) {
                return false;
            }
            String errorMessage = "Cannot correlate response - no pending reply for " + connectionId;
            this.logger.error(errorMessage);
            this.publishNoConnectionEvent(message, connectionId, errorMessage);
            return false;
        }
        reply.setReply(message);
        return false;
    }

    /**
     * Publishes a {@link TcpConnectionFailedCorrelationEvent} through the connection
     * factory's event publisher, if the factory has one.
     *
     * @param message the message that could not be correlated
     * @param connectionId the connection id, or {@code null} when the message carried none
     * @param errorMessage the description placed in the event's {@link MessagingException}
     */
    private void publishNoConnectionEvent(Message<?> message, String connectionId, String errorMessage) {
        ApplicationEventPublisher applicationEventPublisher = this.connectionFactory.getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent((ApplicationEvent)new TcpConnectionFailedCorrelationEvent(this, connectionId, new MessagingException(message, errorMessage)));
        }
    }

    /**
     * Sets the client connection factory, registers this gateway with it as both listener
     * and sender, and caches the factory's single-use setting.
     *
     * @param connectionFactory the client connection factory
     */
    public void setConnectionFactory(AbstractClientConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
        connectionFactory.registerListener(this);
        connectionFactory.registerSender(this);
        this.isSingleUse = connectionFactory.isSingleUse();
    }

    /** No-op: this gateway does not track new connections. */
    @Override
    public void addNewConnection(TcpConnection connection) {
    }

    /** No-op: this gateway does not track closed connections. */
    @Override
    public void removeDeadConnection(TcpConnection connection) {
    }

    /**
     * Sets the channel replies are sent to; delegates to {@code setOutputChannel}.
     *
     * @param replyChannel the reply channel
     */
    public void setReplyChannel(MessageChannel replyChannel) {
        this.setOutputChannel(replyChannel);
    }

    /**
     * @return the component type name, {@code "ip:tcp-outbound-gateway"}
     */
    public String getComponentType() {
        return "ip:tcp-outbound-gateway";
    }

    /** Starts the underlying connection factory. */
    public void start() {
        this.connectionFactory.start();
    }

    /** Stops the underlying connection factory. */
    public void stop() {
        this.connectionFactory.stop();
    }

    /**
     * @return whether the underlying connection factory is running
     */
    public boolean isRunning() {
        return this.connectionFactory.isRunning();
    }

    /**
     * @return the connection factory used by this gateway
     */
    protected AbstractConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }

    /**
     * Holder on which a requesting thread waits for the reply to its request.
     *
     * <p>The first message delivered releases the waiter. If that message is an
     * {@link ErrorMessage}, the waiter grants a short "second chance" for a later message
     * to replace it before treating the error as the outcome.
     */
    private class AsyncReply {
        /** Released when the first message (reply or error) is delivered. */
        private final CountDownLatch latch = new CountDownLatch(1);
        /** Released when a message replaces a previously delivered error message. */
        private final CountDownLatch secondChanceLatch = new CountDownLatch(1);
        private final long remoteTimeout;
        private volatile Message<?> reply;

        /**
         * @param remoteTimeout the time to wait for the first message, in milliseconds
         */
        public AsyncReply(long remoteTimeout) {
            this.remoteTimeout = remoteTimeout;
        }

        /**
         * Waits for the reply.
         *
         * @return the reply, or {@code null} if none arrived within the remote timeout (or
         *         the wait was interrupted before any message was delivered)
         * @throws Exception the error message's payload if it is a {@link MessagingException},
         *         otherwise a {@link MessagingException} wrapping it, when an error message is
         *         still the stored reply after the second-chance wait; or an
         *         {@link InterruptedException} if the second-chance wait is interrupted
         */
        public Message<?> getReply() throws Exception {
            try {
                if (!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
                    return null;
                }
            }
            catch (InterruptedException e) {
                // Restore the flag and fall through with whatever has been delivered so far.
                Thread.currentThread().interrupt();
            }
            boolean waitForMessageAfterError = true;
            while (this.reply instanceof ErrorMessage) {
                if (waitForMessageAfterError) {
                    // Give a message that is already on its way up to two seconds to replace the error.
                    TcpOutboundGateway.this.logger.debug("second chance");
                    this.secondChanceLatch.await(2L, TimeUnit.SECONDS);
                    waitForMessageAfterError = false;
                    continue;
                }
                if (this.reply.getPayload() instanceof MessagingException) {
                    throw (MessagingException)this.reply.getPayload();
                }
                throw new MessagingException("Exception while awaiting reply", (Throwable)this.reply.getPayload());
            }
            return this.reply;
        }

        /**
         * Delivers a message to the waiter. The first message is always stored; a later
         * message is stored only if the one currently held is an {@link ErrorMessage}, and
         * is otherwise ignored.
         *
         * @param reply the message received for this request
         */
        public void setReply(Message<?> reply) {
            if (this.reply == null) {
                this.reply = reply;
                this.latch.countDown();
            } else if (this.reply instanceof ErrorMessage) {
                this.reply = reply;
                this.secondChanceLatch.countDown();
            }
        }
    }
}

