/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.cluster.protocol;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.ProtocolContext;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolMessageMarshaller;
import org.apache.nifi.cluster.protocol.ProtocolMessageUnmarshaller;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.ConnectionRequestMessage;
import org.apache.nifi.cluster.protocol.message.ConnectionResponseMessage;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.io.socket.SocketConfiguration;
import org.apache.nifi.io.socket.SocketUtils;

public abstract class AbstractNodeProtocolSender
implements NodeProtocolSender {
    private final SocketConfiguration socketConfiguration;
    private final ProtocolContext<ProtocolMessage> protocolContext;

    public AbstractNodeProtocolSender(SocketConfiguration socketConfiguration, ProtocolContext<ProtocolMessage> protocolContext) {
        this.socketConfiguration = socketConfiguration;
        this.protocolContext = protocolContext;
    }

    @Override
    public ConnectionResponseMessage requestConnection(ConnectionRequestMessage msg) throws ProtocolException, UnknownServiceAddressException {
        Socket socket = null;
        try {
            ProtocolMessage response;
            socket = this.createSocket();
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, socket.getOutputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed marshalling '" + (Object)((Object)msg.getType()) + "' protocol message due to: " + ioe, ioe);
            }
            try {
                ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = this.protocolContext.createUnmarshaller();
                response = unmarshaller.unmarshal(socket.getInputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed unmarshalling '" + (Object)((Object)ProtocolMessage.MessageType.CONNECTION_RESPONSE) + "' protocol message from " + socket.getRemoteSocketAddress() + " due to: " + ioe, ioe);
            }
            if (ProtocolMessage.MessageType.CONNECTION_RESPONSE == response.getType()) {
                ConnectionResponseMessage connectionResponse;
                ConnectionResponseMessage connectionResponseMessage = connectionResponse = (ConnectionResponseMessage)response;
                return connectionResponseMessage;
            }
            throw new ProtocolException("Expected message type '" + (Object)((Object)ProtocolMessage.MessageType.CONNECTION_RESPONSE) + "' but found '" + (Object)((Object)response.getType()) + "'");
        }
        finally {
            SocketUtils.closeQuietly((Socket)socket);
        }
    }

    @Override
    public void heartbeat(HeartbeatMessage msg, String address) throws ProtocolException {
        int port;
        String hostname;
        try {
            String[] parts = address.split(":");
            hostname = parts[0];
            port = Integer.parseInt(parts[1]);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot send heartbeat to address [" + address + "]. Address must be in <hostname>:<port> format");
        }
        this.sendProtocolMessage(msg, hostname, port);
    }

    private Socket createSocket() {
        InetSocketAddress socketAddress = null;
        try {
            socketAddress = this.getServiceAddress();
            return SocketUtils.createSocket((InetSocketAddress)socketAddress, (SocketConfiguration)this.socketConfiguration);
        }
        catch (IOException ioe) {
            if (socketAddress == null) {
                throw new ProtocolException("Failed to create socket due to: " + ioe, ioe);
            }
            throw new ProtocolException("Failed to create socket to " + socketAddress + " due to: " + ioe, ioe);
        }
    }

    public SocketConfiguration getSocketConfiguration() {
        return this.socketConfiguration;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void sendProtocolMessage(ProtocolMessage msg, String hostname, int port) {
        Socket socket = null;
        try {
            try {
                socket = SocketUtils.createSocket((InetSocketAddress)new InetSocketAddress(hostname, port), (SocketConfiguration)this.socketConfiguration);
            }
            catch (IOException e) {
                throw new ProtocolException("Failed to send message to Cluster Coordinator due to: " + e, e);
            }
            try {
                ProtocolMessageMarshaller<ProtocolMessage> marshaller = this.protocolContext.createMarshaller();
                marshaller.marshal(msg, socket.getOutputStream());
            }
            catch (IOException ioe) {
                throw new ProtocolException("Failed marshalling '" + (Object)((Object)msg.getType()) + "' protocol message due to: " + ioe, ioe);
            }
        }
        catch (Throwable throwable) {
            SocketUtils.closeQuietly(socket);
            throw throwable;
        }
        SocketUtils.closeQuietly((Socket)socket);
    }

    protected abstract InetSocketAddress getServiceAddress() throws IOException;
}

