/*
 * Decompiled with CFR 0.152.
 */
package org.gridkit.zerormi.hub;

import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.gridkit.util.concurrent.FutureEx;
import org.gridkit.zerormi.DuplexStream;
import org.gridkit.zerormi.DuplexStreamConnector;
import org.gridkit.zerormi.RmiGateway;
import org.gridkit.zerormi.SocketStream;
import org.gridkit.zerormi.hub.Ping;
import org.gridkit.zerormi.zlog.LogLevel;
import org.gridkit.zerormi.zlog.LogStream;
import org.gridkit.zerormi.zlog.ZLogFactory;
import org.gridkit.zerormi.zlog.ZLogger;

/**
 * Runnable end point that keeps an {@link RmiGateway} connected to a master
 * through a {@link DuplexStreamConnector} and periodically submits a
 * {@link Ping} to the gateway's remote executor service as a heartbeat.
 *
 * <p>Two system properties tune the heartbeat, both in milliseconds:
 * {@link #HEARTBEAT_PERIOD} (default 1000) is how long the loop waits between
 * pings, and {@link #HEARTBEAT_TIMEOUT} (default 60000) is how long the end
 * point may go without a successful ping before it is considered stale.
 *
 * <p>{@link #run()} is expected to be executed on its own thread; the optional
 * death watch started by {@link #enableHeartbeatDeatchWatch()} runs on a
 * separate daemon thread and reads the heartbeat timestamp concurrently.
 */
public class RemotingEndPoint
implements Runnable,
RmiGateway.StreamErrorHandler {
    /** System property: interval between pings, in milliseconds. */
    public static final String HEARTBEAT_PERIOD = "org.gridkit.telecontrol.slave.heart-beat-period";
    /** System property: maximum tolerated time without a successful ping, in milliseconds. */
    public static final String HEARTBEAT_TIMEOUT = "org.gridkit.telecontrol.slave.heart-beat-timeout";

    private static final ZLogger LROOT = ZLogFactory.getDefaultRootLogger().getLogger("RemotingEndPoint");
    private static final LogStream LTRACE = LROOT.get("", LogLevel.TRACE);
    private static final LogStream LVERBOSE = LROOT.get("", LogLevel.VERBOSE);
    private static final LogStream LINFO = LROOT.get("", LogLevel.INFO);
    private static final LogStream LWARN = LROOT.get("", LogLevel.WARN);
    private static final LogStream LERROR = LROOT.get("", LogLevel.CRITICAL);
    private static final LogStream LFATAL = LROOT.get("", LogLevel.FATAL);

    /** Identifier written to the stream right after connecting; {@code null} disables that handshake. */
    private final String uid;
    private final RmiGateway gateway;
    private final DuplexStreamConnector connector;
    private final long pingInterval = Long.parseLong(System.getProperty(HEARTBEAT_PERIOD, "1000"));
    private final long heartBeatTimeout = Long.parseLong(System.getProperty(HEARTBEAT_TIMEOUT, "60000"));
    /** Monitor used to cut the inter-ping wait short when the stream fails or closes. */
    private final Object pingSignal = new Object();
    /**
     * {@link System#nanoTime()} of the last successful ping (or of construction).
     * Volatile because it is written by the {@link #run()} thread and read by the
     * death-watch thread; a plain {@code long} gives neither visibility nor
     * atomicity guarantees across threads.
     */
    private volatile long lastHeartBeat = System.nanoTime();

    /**
     * Creates an end point that connects to the master with a plain socket.
     *
     * @param uid  identifier sent to the master after connecting, or {@code null} to send nothing
     * @param addr address the socket connects to
     */
    public RemotingEndPoint(String uid, SocketAddress addr) {
        this(uid, new ConnectSocketConnector(addr));
    }

    /**
     * Creates an end point that obtains its stream from the given connector.
     *
     * @param uid       identifier sent to the master after connecting, or {@code null} to send nothing
     * @param connector source of the duplex stream to the master
     */
    public RemotingEndPoint(String uid, DuplexStreamConnector connector) {
        this.uid = uid;
        this.connector = connector;
        this.gateway = new RmiGateway("master");
        this.gateway.setStreamErrorHandler(this);
    }

    /**
     * Starts a daemon thread that halts the JVM once no ping has succeeded for
     * longer than the heartbeat timeout. Does nothing when the timeout is
     * configured as {@link Integer#MAX_VALUE}.
     *
     * <p>The (misspelled) method name is part of the public interface and is
     * kept for compatibility.
     */
    public void enableHeartbeatDeatchWatch() {
        if (this.heartBeatTimeout == Integer.MAX_VALUE) {
            return;
        }
        Thread watchdog = new Thread(){

            @Override
            public void run() {
                RemotingEndPoint.this.deathWatchLoop();
            }
        };
        watchdog.setDaemon(true);
        watchdog.setName("HeartbeatDeathWatch");
        watchdog.start();
    }

    /**
     * Body of the death-watch thread: once a second compares the age of the last
     * heartbeat with the timeout and halts the JVM when it is exceeded. Any
     * unexpected failure of the watch itself also halts the JVM.
     */
    private void deathWatchLoop() {
        try {
            while (true) {
                // The thread name doubles as a "last checked at" marker in thread dumps.
                Thread.currentThread().setName("HeartbeatDeathWatch-" + SimpleDateFormat.getDateTimeInstance().format(new Date()));
                long stale = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.lastHeartBeat);
                if (stale > this.heartBeatTimeout) {
                    System.err.println("Terminating process due to heartbeat timeout");
                    System.err.flush();
                    Runtime.getRuntime().halt(0);
                }
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ignored) {
                    // An interrupt must not stop the watch; just re-check immediately.
                    // The flag is intentionally not restored, otherwise sleep() would spin.
                }
            }
        }
        catch (Throwable e) {
            System.err.println("Unexpected exception in death watch thread " + e.toString());
            System.err.flush();
            Runtime.getRuntime().halt(0);
        }
    }

    /**
     * Main loop: (re)connects the gateway when it is not connected, waits for the
     * ping interval (or an early wake-up from a stream event) and pings the master.
     *
     * <p>The loop ends silently-but-logged ({@code FATAL}) when the connector
     * cannot connect, and ends with an {@code INFO} message when the ping is
     * rejected or fails while the gateway is no longer connected. Every other
     * exception, including an interrupt during the wait or the ping, is logged
     * and the loop carries on.
     */
    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException ignored) {
                // Best-effort pause between iterations; an interrupt only shortens it.
            }
            try {
                if (!this.gateway.isConnected()) {
                    LINFO.log("Connecting to master socket " + this.connector);
                    DuplexStream stream;
                    try {
                        stream = this.connector.connect();
                    }
                    catch (IOException e) {
                        LFATAL.log("Connection has failed " + this.connector, e);
                        return;
                    }
                    if (this.uid != null) {
                        this.sendUid(stream);
                    }
                    LVERBOSE.log("Master socket connected");
                    this.gateway.connect(stream);
                    LVERBOSE.log("Gateway connected");
                }
                synchronized (this.pingSignal) {
                    this.pingSignal.wait(this.pingInterval);
                }
                LTRACE.log("Ping");
                try {
                    FutureEx<?> pending = this.gateway.getRemoteExecutorService().submit((Callable<?>)new Ping());
                    // Only a ping that actually completed counts as a heartbeat;
                    // refreshing the timestamp after a timeout would hide the outage
                    // from the death watch.
                    if (this.awaitPing(pending)) {
                        this.lastHeartBeat = System.nanoTime();
                    }
                }
                catch (RejectedExecutionException e) {
                    break;
                }
                catch (ExecutionException e) {
                    if (!this.gateway.isConnected()) {
                        break;
                    }
                    // Concatenation instead of getCause().toString() so a missing cause cannot NPE.
                    LWARN.log("Ping failed: " + e.getCause());
                }
            }
            catch (Exception e) {
                LERROR.log("Communication error %s", e);
            }
        }
        LINFO.log("Slave is disconting");
    }

    /**
     * Writes the uid bytes to the freshly connected stream and flushes them.
     * If the write fails the stream is closed before the exception propagates,
     * because the caller will open a new one on its next iteration.
     *
     * @param stream stream returned by the connector
     * @throws IOException if the uid could not be written
     */
    private void sendUid(DuplexStream stream) throws IOException {
        try {
            // NOTE(review): platform default charset is kept on purpose - the bytes are
            // part of the handshake and must match whatever the master side expects.
            byte[] magic = this.uid.getBytes();
            stream.getOutput().write(magic);
            stream.getOutput().flush();
        }
        catch (IOException e) {
            RemotingEndPoint.closeStream(stream);
            throw e;
        }
    }

    /**
     * Waits for the ping in 5 second slices until it completes or the heartbeat
     * has been stale for longer than the timeout.
     *
     * @param pending future of the submitted ping
     * @return {@code true} if the ping completed, {@code false} if waiting was
     *         abandoned because the heartbeat timeout was exceeded
     * @throws InterruptedException if the waiting thread is interrupted
     * @throws ExecutionException   if the ping itself failed
     */
    private boolean awaitPing(FutureEx<?> pending) throws InterruptedException, ExecutionException {
        while (true) {
            try {
                pending.get(5L, TimeUnit.SECONDS);
                return true;
            }
            catch (TimeoutException e) {
                long stale = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - this.lastHeartBeat);
                if (stale > this.heartBeatTimeout) {
                    // Only reports the condition; the actual halt is performed by the
                    // death watch thread when it has been enabled.
                    System.err.println("Terminating process due to heartbeat timeout");
                    System.err.flush();
                    return false;
                }
            }
        }
    }

    /**
     * Called by the gateway on a stream error: logs it, wakes the main loop and
     * closes the stream.
     *
     * @param socket stream the error occurred on, may be {@code null}
     * @param stream not used by this handler
     * @param error  the reported error
     */
    @Override
    public void streamError(DuplexStream socket, Object stream, Exception error) {
        LWARN.log("Slave read error: " + error.toString());
        this.wakeAndClose(socket);
    }

    /**
     * Called by the gateway when a stream was closed: wakes the main loop and
     * closes the stream.
     *
     * @param socket stream that was closed, may be {@code null}
     * @param stream not used by this handler
     */
    @Override
    public void streamClosed(DuplexStream socket, Object stream) {
        this.wakeAndClose(socket);
    }

    /** Wakes the main loop out of its inter-ping wait, then closes the given stream. */
    private void wakeAndClose(DuplexStream socket) {
        synchronized (this.pingSignal) {
            this.pingSignal.notifyAll();
        }
        RemotingEndPoint.closeStream(socket);
    }

    /** Closes the stream if it is not {@code null}; a failure to close is logged, not thrown. */
    private static void closeStream(DuplexStream socket) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        }
        catch (IOException e) {
            LERROR.log("Stream error " + socket, e);
        }
    }

    /** Connector that opens a new TCP socket to a fixed address on every call. */
    private static class ConnectSocketConnector
    implements DuplexStreamConnector {
        private final SocketAddress address;

        public ConnectSocketConnector(SocketAddress address) {
            this.address = address;
        }

        /**
         * Opens a socket to the configured address and wraps it in a {@link SocketStream}.
         * The socket is closed again if connecting or wrapping fails, so a failed
         * attempt does not leak a file descriptor.
         *
         * @return stream backed by the connected socket
         * @throws IOException if the connection could not be established
         */
        @Override
        public DuplexStream connect() throws IOException {
            Socket socket = new Socket();
            boolean handedOver = false;
            try {
                socket.connect(this.address);
                DuplexStream stream = new SocketStream(socket);
                handedOver = true;
                return stream;
            }
            finally {
                if (!handedOver) {
                    try {
                        socket.close();
                    }
                    catch (IOException ignored) {
                        // The original failure is already propagating; a close error adds nothing.
                    }
                }
            }
        }

        @Override
        public String toString() {
            return String.valueOf(this.address);
        }
    }
}

