/*
 * Decompiled with CFR 0.152.
 */
package org.gridkit.zerormi;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.StreamCorruptedException;
import java.rmi.Remote;
import java.rmi.RemoteException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.gridkit.util.concurrent.AdvancedExecutor;
import org.gridkit.util.concurrent.AdvancedExecutorAdapter;
import org.gridkit.util.concurrent.FutureEx;
import org.gridkit.zerormi.DuplexStream;
import org.gridkit.zerormi.EnvelopInputStream;
import org.gridkit.zerormi.EnvelopOutputStream;
import org.gridkit.zerormi.IOHelper;
import org.gridkit.zerormi.InboundCallError;
import org.gridkit.zerormi.RecoverableSerializationException;
import org.gridkit.zerormi.RemoteCall;
import org.gridkit.zerormi.RemoteInstance;
import org.gridkit.zerormi.RemoteMessage;
import org.gridkit.zerormi.RemoteMethodSignature;
import org.gridkit.zerormi.RemoteReturn;
import org.gridkit.zerormi.RmiChannel;
import org.gridkit.zerormi.RmiChannel1;
import org.gridkit.zerormi.RmiMarshaler;
import org.gridkit.zerormi.SmartRmiMarshaler;
import org.gridkit.zerormi.zlog.LogLevel;
import org.gridkit.zerormi.zlog.LogStream;
import org.gridkit.zerormi.zlog.ZLogFactory;
import org.gridkit.zerormi.zlog.ZLogger;

public class RmiGateway {
    // Logical RMI channel: inbound messages are handed to it by the reader thread,
    // outbound messages leave it through the MessageOut instance passed at construction.
    private final RmiChannel channel;
    // Worker pool created by createRmiExecutor(); shared with the channel and the remote execution service.
    private final ExecutorService executor;
    // Set to true at the end of a successful connect(), reset by disconnect().
    private boolean connected = false;
    // Set to true by shutdown() and never reset.
    private boolean terminated = false;
    // Gateway name; used in log messages and worker thread names.
    private String name;
    // Transport of the current connection; null while not connected.
    private DuplexStream socket;
    // Framed reader over the socket input; null while not connected.
    private InboundMessageStream in;
    // Framed writer over the socket output; null while not connected.
    private OutboundMessageStream out;
    // Executor facade that runs submitted tasks through the remote CounterAgent.
    private RemoteExecutionService service;
    // Agent object received from the other side during the connect() handshake.
    private CounterAgent remote;
    // Thread pumping inbound messages for the current connection.
    private Thread readerThread;
    private final LogStream logVerbose;
    private final LogStream logInfo;
    private final LogStream logCritical;
    // Default handler: any stream error or closure shuts the whole gateway down.
    // Can be replaced through setStreamErrorHandler().
    private StreamErrorHandler streamErrorHandler = new StreamErrorHandler(){

        @Override
        public void streamError(DuplexStream socket, Object stream, Exception error) {
            RmiGateway.this.shutdown();
        }

        @Override
        public void streamClosed(DuplexStream socket, Object stream) {
            RmiGateway.this.shutdown();
        }
    };
    // Message tags. Writers place them in the top byte of an 8-byte header (value << 56);
    // readers see them as the first byte of a message.
    static long TAG_CALL = 1L;
    static long TAG_RETURN = 2L;
    static long TAG_THROW = 3L;
    // Trailer tags, encoded the same way as message tags.
    static long TRAILER_SUCCESS = 10L;
    static long TRAILER_DISCARD = 20L;
    static long TRAILER_ERROR = 30L;
    // Not referenced anywhere in this class.
    static byte[] canary = new byte[0];

    /**
     * Creates a gateway with a {@link SmartRmiMarshaler}, no extra properties and a logger
     * derived from the default root logger under this class's package name.
     *
     * @param name gateway name, used in log messages and worker thread names
     */
    public RmiGateway(String name) {
        this(name, new SmartRmiMarshaler(), ZLogFactory.getDefaultRootLogger().getLogger(RmiGateway.class.getPackage().getName()), Collections.<String, Object>emptyMap());
    }

    /**
     * Creates a gateway with a {@link SmartRmiMarshaler} and no extra properties.
     *
     * @param name   gateway name, used in log messages and worker thread names
     * @param logger logger the gateway derives its log streams from
     */
    public RmiGateway(String name, ZLogger logger) {
        this(name, new SmartRmiMarshaler(), logger, Collections.<String, Object>emptyMap());
    }

    /**
     * Builds the worker pool used by this gateway: no core threads, unbounded maximum,
     * direct hand-off queue and a 100 ms idle timeout. Workers are daemon threads named
     * {@code RMI[<gateway name>]-worker-<n>}.
     *
     * @return a new executor service
     */
    private ExecutorService createRmiExecutor() {
        ThreadFactory workerFactory = new ThreadFactory() {
            int counter = 1;

            @Override
            public synchronized Thread newThread(Runnable r) {
                Thread worker = new Thread(r);
                // The gateway name is read when the thread is created, not when the pool is built.
                String workerName = "RMI[" + RmiGateway.this.name + "]-worker-" + this.counter++;
                worker.setName(workerName);
                worker.setDaemon(true);
                return worker;
            }
        };
        SynchronousQueue<Runnable> handOff = new SynchronousQueue<Runnable>();
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS, handOff, workerFactory);
    }

    public RmiGateway(String name, RmiMarshaler marshaler, ZLogger logger, Map<String, Object> props) {
        this.executor = this.createRmiExecutor();
        this.channel = new RmiChannel1(name, new MessageOut(), this.executor, marshaler, logger, props);
        this.service = new RemoteExecutionService();
        this.name = name;
        this.logVerbose = logger.get(this.getClass().getSimpleName(), LogLevel.VERBOSE);
        this.logInfo = logger.get(this.getClass().getSimpleName(), LogLevel.INFO);
        this.logCritical = logger.get(this.getClass().getSimpleName(), LogLevel.CRITICAL);
    }

    /**
     * Returns the executor whose tasks are forwarded to the agent on the other side of the gateway.
     *
     * @return the remote execution service created with this gateway
     */
    public AdvancedExecutor getRemoteExecutorService() {
        return service;
    }

    /**
     * Replaces the handler notified about stream errors and closures.
     * The default handler shuts the gateway down on either event.
     *
     * @param errorHandler handler to install; stored as given, without validation
     */
    public void setStreamErrorHandler(StreamErrorHandler errorHandler) {
        streamErrorHandler = errorHandler;
    }

    /**
     * Drops the current connection without terminating the gateway.
     *
     * <p>If connected, closes the outbound stream, the inbound stream and the socket
     * (best effort), clears the connection fields and marks the gateway as not connected.
     * Afterwards the reader thread of that connection is interrupted and joined, unless
     * the caller is the reader thread itself.
     *
     * <p>If the calling thread is interrupted while waiting for the reader, the wait is
     * abandoned and the caller's interrupt status is restored.
     */
    public void disconnect() {
        Thread reader = null;
        synchronized (this) {
            if (this.connected) {
                this.logInfo.log("RMI gateway [" + this.name + "] disconneted.");
                reader = this.readerThread;
                // Each close is best effort: one failure must not prevent the remaining cleanup.
                try {
                    this.out.close();
                }
                catch (Exception e) {
                    this.logVerbose.log("RMI gateway [" + this.name + "] failed to close outbound stream: " + e);
                }
                try {
                    this.in.close();
                }
                catch (Exception e) {
                    this.logVerbose.log("RMI gateway [" + this.name + "] failed to close inbound stream: " + e);
                }
                try {
                    this.socket.close();
                }
                catch (Exception e) {
                    this.logVerbose.log("RMI gateway [" + this.name + "] failed to close socket: " + e);
                }
                this.in = null;
                this.out = null;
                this.socket = null;
                this.connected = false;
            }
        }
        // Join outside the monitor so the reader can enter disconnect()/shutdown() while winding down.
        // A thread must not interrupt and join itself: that only leaves it with a consumed interrupt.
        if (reader != null && reader != Thread.currentThread()) {
            reader.interrupt();
            try {
                reader.join();
            }
            catch (InterruptedException e) {
                // Preserve the interruption for the caller instead of silently dropping it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Tells whether the gateway currently has a usable connection.
     *
     * @return {@code true} only if the gateway is connected, not terminated and its socket
     *         does not report itself closed
     */
    public synchronized boolean isConnected() {
        if (!this.connected || this.terminated) {
            return false;
        }
        return !this.socket.isClosed();
    }

    /**
     * Permanently terminates the gateway. Only the first call has any effect.
     *
     * <p>Marks the gateway terminated, then, best effort and in this order, closes the
     * outbound stream, the inbound stream and the socket (each only if present), closes
     * the RMI channel and shuts down the worker pool. A failure in one step is logged at
     * verbose level and does not prevent the remaining steps.
     */
    public synchronized void shutdown() {
        if (this.terminated) {
            return;
        }
        this.logInfo.log("RMI gateway [" + this.name + "] terminated.");
        this.terminated = true;
        // out/in/socket are null if the gateway was never connected or was already disconnected,
        // so check explicitly instead of relying on a swallowed NullPointerException.
        if (this.out != null) {
            try {
                this.out.close();
            }
            catch (Exception e) {
                this.logVerbose.log("RMI gateway [" + this.name + "] failed to close outbound stream: " + e);
            }
        }
        if (this.in != null) {
            try {
                this.in.close();
            }
            catch (Exception e) {
                this.logVerbose.log("RMI gateway [" + this.name + "] failed to close inbound stream: " + e);
            }
        }
        if (this.socket != null) {
            try {
                this.socket.close();
            }
            catch (Exception e) {
                this.logVerbose.log("RMI gateway [" + this.name + "] failed to close socket: " + e);
            }
        }
        // The remote execution service's shutdown() only delegates back to this method, which
        // would return immediately because 'terminated' is already set, so it is not called here.
        try {
            this.channel.close();
        }
        catch (Exception e) {
            this.logVerbose.log("RMI gateway [" + this.name + "] failed to close channel: " + e);
        }
        try {
            this.executor.shutdown();
        }
        catch (Exception e) {
            this.logVerbose.log("RMI gateway [" + this.name + "] failed to shut down executor: " + e);
        }
    }

    /**
     * Attaches the gateway to a transport and performs the handshake.
     *
     * <p>A local agent is exported and sent to the other side, the other side's agent is
     * read back, and a reader thread is started for inbound messages. On any failure the
     * partially built connection is released before the exception is propagated.
     *
     * @param socket transport to use for this connection
     * @throws IllegalStateException if a socket is already attached
     * @throws IOException           if an I/O failure occurs while connecting
     * @throws RuntimeException      wrapping any other exception raised while connecting
     */
    public synchronized void connect(DuplexStream socket) throws IOException {
        if (this.socket != null) {
            throw new IllegalStateException("Already connected");
        }
        try {
            this.socket = socket;
            this.out = new OutboundMessageStream(socket.getOutput());
            // Our agent goes out first; only then is the inbound side opened and the peer's agent read.
            LocalAgent agent = new LocalAgent();
            this.channel.exportObject(CounterAgent.class, agent);
            this.out.writeHandShake(agent);
            this.in = new InboundMessageStream(socket.getInput());
            this.remote = (CounterAgent)this.in.readHandShake();
            SocketReader reader = new SocketReader();
            this.readerThread = reader;
            reader.setName("RMI-Receiver: " + socket);
            reader.start();
            this.connected = true;
        }
        catch (IOException e) {
            releaseFailedConnection();
            throw e;
        }
        catch (Exception e) {
            releaseFailedConnection();
            throw new RuntimeException(e);
        }
    }

    /** Closes and clears whatever connect() managed to set up before it failed. */
    private void releaseFailedConnection() {
        if (this.in != null) {
            this.in.close();
        }
        if (this.out != null) {
            this.out.close();
        }
        try {
            if (this.socket != null) {
                this.socket.close();
            }
        }
        catch (Exception ignored) {
            // best effort: the original failure is the one reported to the caller
        }
        this.in = null;
        this.out = null;
        this.socket = null;
    }

    /**
     * Agent exported to the other side during the handshake; it runs whatever callable
     * it receives by invoking it directly.
     */
    private class LocalAgent implements CounterAgent {
        private LocalAgent() {
        }

        /**
         * Invokes the callable and returns its result.
         *
         * @throws Exception whatever the callable throws
         */
        @Override
        public <T> T remoteCall(Callable<T> callable) throws Exception {
            T outcome = callable.call();
            return outcome;
        }
    }

    /**
     * Remote interface each side exports to the other during the handshake.
     * Tasks submitted to the remote execution service are passed to the
     * other side's agent through {@link #remoteCall(Callable)}.
     */
    public static interface CounterAgent
    extends Remote {
        /**
         * Executes the given callable and returns its result.
         *
         * @param var1 task to execute
         * @return the value produced by the task
         * @throws RemoteException declared for remote invocation failures
         * @throws Exception       whatever the task itself throws
         */
        public <T> T remoteCall(Callable<T> var1) throws RemoteException, Exception;
    }

    /**
     * Serializable adapter that presents a {@link Runnable} as a {@link Callable}
     * returning a fixed, pre-supplied value.
     *
     * @param <T> type of the value returned after the runnable has run
     */
    public static class CallableRunnableWrapper<T>
    implements Callable<T>,
    Serializable {
        private static final long serialVersionUID = 1L;
        // Field names are part of the serialized form; do not rename.
        private Runnable runnable;
        private T result;

        /** Creates an empty wrapper; both fields stay {@code null}. */
        public CallableRunnableWrapper() {
        }

        /**
         * @param runnable action to run on {@link #call()}
         * @param result   value to return from {@link #call()} once the action has completed
         */
        public CallableRunnableWrapper(Runnable runnable, T result) {
            this.runnable = runnable;
            this.result = result;
        }

        /**
         * Runs the wrapped action, then hands back the stored value.
         *
         * @return the value supplied at construction
         * @throws Exception declared by {@link Callable}; anything the action throws propagates
         */
        @Override
        public T call() throws Exception {
            runnable.run();
            return result;
        }
    }

    /**
     * Executor facade whose tasks are executed by the agent on the other side of the gateway.
     *
     * <p>Each submitted task is wrapped in a callable that passes it to
     * {@code remote.remoteCall(...)}; the wrapper itself is run on the gateway's worker pool
     * through an {@link AdvancedExecutorAdapter}. Runnables are first converted to
     * {@link CallableRunnableWrapper} instances.
     *
     * <p>Lifecycle queries ({@link #isShutdown()}, {@link #isTerminated()},
     * {@link #awaitTermination(long, TimeUnit)}, {@link #shutdownNow()}) are not supported and
     * throw {@link UnsupportedOperationException}; {@link #shutdown()} shuts down the whole gateway.
     */
    private class RemoteExecutionService
    extends AbstractExecutorService
    implements AdvancedExecutor {
        private final ExecutorService threadPool;
        private final AdvancedExecutorAdapter adapter;

        private RemoteExecutionService() {
            this.threadPool = RmiGateway.this.executor;
            this.adapter = new AdvancedExecutorAdapter((Executor)this.threadPool);
        }

        /** Submits a runnable for remote execution; the future yields {@code result} on completion. */
        @Override
        public <T> Future<T> submit(Runnable task, T result) {
            return this.submit(new CallableRunnableWrapper<T>(task, result));
        }

        /** Submits a runnable for remote execution; the future yields {@code null} on completion. */
        @Override
        public FutureEx<Void> submit(Runnable task) {
            // The wrapper must be typed <Void> so the resulting future matches the declared return type.
            return this.submit(new CallableRunnableWrapper<Void>(task, null));
        }

        /** Submits a callable for remote execution. */
        @Override
        public <T> FutureEx<T> submit(Callable<T> task) {
            task = this.wrap(task);
            return this.adapter.submit(task);
        }

        /** Submits a runnable for remote execution and discards the resulting future. */
        @Override
        public void execute(Runnable command) {
            this.submit(new CallableRunnableWrapper<Object>(command, null));
        }

        /** Wraps a task so that invoking the wrapper forwards the task to the remote agent. */
        private <T> Callable<T> wrap(final Callable<T> task) {
            return new Callable<T>(){

                @Override
                public T call() throws Exception {
                    // 'remote' is read when the task runs, not when it is submitted.
                    return RmiGateway.this.remote.remoteCall(task);
                }
            };
        }

        @Override
        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isShutdown() {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean isTerminated() {
            throw new UnsupportedOperationException();
        }

        /** Shuts down the enclosing gateway. */
        @Override
        public void shutdown() {
            RmiGateway.this.shutdown();
        }

        @Override
        public List<Runnable> shutdownNow() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Callback notified when a gateway's transport fails or is closed.
     * The gateway invokes it after it has disconnected from the affected socket.
     */
    public static interface StreamErrorHandler {
        /**
         * Called when reading from or writing to the transport failed.
         *
         * @param var1 socket the gateway was attached to when the failure was handled (may be null)
         * @param var2 the underlying input or output stream involved
         * @param var3 the exception that was raised
         */
        public void streamError(DuplexStream var1, Object var2, Exception var3);

        /**
         * Called when the reader stopped because of an exception classified as a socket termination.
         *
         * @param var1 socket the gateway was attached to when the closure was handled (may be null)
         * @param var2 the underlying input stream
         */
        public void streamClosed(DuplexStream var1, Object var2);
    }

    /**
     * Outbound sink handed to the RMI channel: serializes channel messages onto the
     * gateway's current outbound stream.
     */
    private class MessageOut
    implements RmiChannel1.OutputChannel {
        private MessageOut() {
        }

        /**
         * Writes one message to the current connection. Writes are serialized by locking
         * on the outbound stream.
         *
         * <p>If the write fails with an {@link IOException}, the gateway is disconnected,
         * the stream error handler is notified and the exception is rethrown.
         *
         * @param message a {@link RemoteCall}; anything else is written as a {@link RemoteReturn}
         * @throws IOException if the gateway is not connected or the write fails
         */
        @Override
        public void send(RemoteMessage message) throws IOException {
            // Take a snapshot: disconnect() may null the field at any time from another thread.
            OutboundMessageStream stream = RmiGateway.this.out;
            if (stream == null) {
                throw new IOException("RMI gatway [" + RmiGateway.this.name + "] channel is not connected");
            }
            try {
                synchronized (stream) {
                    if (message instanceof RemoteCall) {
                        stream.writeMessage((RemoteCall)message);
                    } else {
                        stream.writeMessage((RemoteReturn)message);
                    }
                }
            }
            catch (IOException e) {
                // NOTE(review): if RecoverableSerializationException is an IOException subtype it also
                // lands here and tears the connection down - confirm that is intended.
                DuplexStream socket = RmiGateway.this.socket;
                // Use the snapshot: the field may already be null if another thread disconnected.
                OutputStream out = stream.tstream;
                RmiGateway.this.disconnect();
                RmiGateway.this.streamErrorHandler.streamError(socket, out, e);
                throw e;
            }
        }
    }

    /**
     * Object output stream that lets the RMI channel substitute objects as they are
     * written and that emits no serialization stream header.
     */
    private class RmiObjectOutputStream extends ObjectOutputStream {
        public RmiObjectOutputStream(OutputStream in) throws IOException {
            super(in);
            enableReplaceObject(true);
        }

        /** Delegates the replacement decision to the gateway's channel. */
        @Override
        protected Object replaceObject(Object obj) throws IOException {
            return RmiGateway.this.channel.streamReplaceObject(obj);
        }

        /** Intentionally writes nothing. */
        @Override
        protected void writeStreamHeader() throws IOException {
        }
    }

    /**
     * Object input stream that lets the RMI channel resolve objects as they are read
     * and that expects no serialization stream header.
     */
    private class RmiObjectInputStream extends ObjectInputStream {
        public RmiObjectInputStream(InputStream in) throws IOException {
            super(in);
            enableResolveObject(true);
        }

        /** Intentionally reads nothing. */
        @Override
        protected void readStreamHeader() throws IOException, StreamCorruptedException {
        }

        /** Delegates the resolution decision to the gateway's channel. */
        @Override
        protected Object resolveObject(Object obj) throws IOException {
            return RmiGateway.this.channel.streamResolveObject(obj);
        }

        @Override
        public String toString() {
            String gatewayName = RmiGateway.this.name;
            return "RmiObjectInputStream[" + gatewayName + "]";
        }
    }

    /**
     * Writer side of the gateway wire format.
     *
     * <p>Everything is written through one {@link EnvelopOutputStream}: 8-byte headers and
     * trailers via a {@link DataOutputStream}, payload objects via an
     * {@link RmiObjectOutputStream}. A header or trailer carries its tag in the top byte
     * ({@code tag << 56}) and, for headers, the call id in the remaining bits.
     */
    private class OutboundMessageStream {
        OutputStream tstream;
        EnvelopOutputStream estream;
        DataOutputStream dstream;
        RmiObjectOutputStream ostream;

        public OutboundMessageStream(OutputStream stream) throws IOException {
            tstream = stream;
            estream = new EnvelopOutputStream(stream);
            dstream = new DataOutputStream(estream);
            ostream = new RmiObjectOutputStream(estream);
        }

        /** Closes the underlying transport stream, ignoring I/O failures. */
        public void close() {
            try {
                tstream.close();
            }
            catch (IOException ignored) {
                // best effort
            }
        }

        /** Writes the handshake object as a single message. */
        public void writeHandShake(Object object) throws IOException {
            ostream.writeObject(object);
            finishPayload();
        }

        /**
         * Writes a call: header, then target, method signature and arguments, then a success trailer.
         * If the payload cannot be written, the message is closed, a discard trailer is sent and the
         * failure is reported as a {@link RecoverableSerializationException}.
         */
        public void writeMessage(RemoteCall call) throws IOException {
            long header = call.getCallId() | (TAG_CALL << 56);
            dstream.writeLong(header);
            try {
                ostream.writeObject(call.getRemoteInstance());
                ostream.writeObject(call.getMethod());
                ostream.writeObject(call.getArgs());
                finishPayload();
                writeSuccessTrailer();
            }
            catch (Exception e) {
                recover();
                discard();
                throw new RecoverableSerializationException(e);
            }
        }

        /**
         * Writes a return or throw message: header, then the returned object, then a success trailer.
         * If the payload cannot be written, the message is closed and an error follow-up is sent instead.
         */
        public void writeMessage(RemoteReturn result) throws IOException {
            long callId = result.getCallId();
            long tag = result.isThrowing() ? TAG_THROW : TAG_RETURN;
            dstream.writeLong(callId | (tag << 56));
            try {
                ostream.writeObject(result.getRet());
                finishPayload();
                writeSuccessTrailer();
            }
            catch (Exception e) {
                recover();
                followUp(result.callId, e);
            }
        }

        /** Sends an error trailer carrying a RemoteException that describes why the result was unwritable. */
        private void followUp(long callId, Exception e) throws IOException {
            dstream.writeLong(callId | (TRAILER_ERROR << 56));
            try {
                ostream.writeObject(new RemoteException("Unwritable result", e));
                finishPayload();
            }
            catch (Exception ee) {
                recover();
            }
        }

        /** Sends a discard trailer as its own message. */
        private void discard() throws IOException {
            dstream.writeLong(TRAILER_DISCARD << 56);
            estream.closeMessage();
        }

        /** Closes the current message and starts over with a fresh object stream. */
        private void recover() throws IOException {
            estream.closeMessage();
            ostream = new RmiObjectOutputStream(estream);
        }

        /** Resets the object stream, writes the terminating null, flushes and closes the current message. */
        private void finishPayload() throws IOException {
            ostream.reset();
            ostream.writeObject(null);
            ostream.flush();
            estream.closeMessage();
        }

        /** Sends a success trailer as its own message. */
        private void writeSuccessTrailer() throws IOException {
            dstream.writeLong(TRAILER_SUCCESS << 56);
            estream.closeMessage();
        }
    }

    /**
     * Reader side of the gateway wire format.
     *
     * <p>Everything is read through one {@link EnvelopInputStream}: the leading tag byte
     * directly, the 7-byte call id via a {@link DataInputStream}, and payload objects via
     * an {@link RmiObjectInputStream}.
     */
    private class InboundMessageStream {
        // Scratch buffer for the 7 call-id bytes that follow the tag byte.
        byte[] callId = new byte[7];
        InputStream tstream;
        EnvelopInputStream estream;
        DataInputStream dstream;
        RmiObjectInputStream ostream;

        public InboundMessageStream(InputStream stream) throws IOException {
            this.tstream = stream;
            this.estream = new EnvelopInputStream(this.tstream);
            this.dstream = new DataInputStream(this.estream);
            this.ostream = new RmiObjectInputStream(this.estream);
        }

        /** Closes the underlying transport stream, ignoring I/O failures. */
        public void close() {
            try {
                this.tstream.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }

        /**
         * Reads the handshake message: the handshake object followed by one more object
         * (the writer emits a trailing null), then advances to the next message.
         *
         * @return the handshake object
         */
        public Object readHandShake() throws IOException, ClassNotFoundException {
            Object obj = this.ostream.readObject();
            this.ostream.readObject();
            this.estream.nextMessage();
            return obj;
        }

        /**
         * Reads the next call or return message.
         *
         * <p>Stray success/discard trailers are skipped. If a payload cannot be deserialized,
         * the stream is resynchronized and the follow-up trailer decides the outcome: the error
         * is surfaced as a message, or - when the sender discarded the message - reading continues.
         *
         * @return the next message, or {@code null} if the first read returns -1
         * @throws IOException on I/O failure or when an unknown tag is encountered
         */
        public RemoteMessage readMessage() throws IOException {
            int tag;
            block12: {
                RemoteMessage msg;
                while (true) {
                    long callId;
                    // The first byte of a message is its tag.
                    if ((tag = this.estream.read()) == -1) {
                        return null;
                    }
                    // A trailer seen at this level carries nothing to deliver: move on and read again.
                    if ((long)tag == TRAILER_SUCCESS) {
                        this.estream.nextMessage();
                        return this.readMessage();
                    }
                    if ((long)tag == TRAILER_DISCARD) {
                        this.estream.nextMessage();
                        return this.readMessage();
                    }
                    if ((long)tag == TAG_CALL) {
                        callId = this.readCallId();
                        // Declared outside the try so a partially read call can still be reported.
                        RemoteInstance ri = null;
                        RemoteMethodSignature m = null;
                        Object[] args = null;
                        try {
                            ri = (RemoteInstance)this.ostream.readObject();
                            m = (RemoteMethodSignature)this.ostream.readObject();
                            args = (Object[])this.ostream.readObject();
                            // Consume the trailing object written after the payload.
                            this.ostream.readObject();
                            msg = new RemoteCall(callId, ri, m, args);
                            this.estream.nextMessage();
                        }
                        catch (NoClassDefFoundError e) {
                            this.recover();
                            msg = this.processFollowUp(new InboundCallError(callId, ri, m, new RemoteException("Unparsable call", e)));
                            // null means the sender discarded the call: keep reading.
                            if (msg == null) {
                                continue;
                            }
                        }
                        catch (Exception e) {
                            this.recover();
                            msg = this.processFollowUp(new InboundCallError(callId, ri, m, new RemoteException("Unparsable call", e)));
                            if (msg == null) continue;
                        }
                        return msg;
                    }
                    // Anything that is not a return/throw tag at this point is unknown.
                    if ((long)tag != TAG_RETURN && (long)tag != TAG_THROW) break block12;
                    callId = this.readCallId();
                    try {
                        Object obj = this.ostream.readObject();
                        // Consume the trailing object written after the payload.
                        this.ostream.readObject();
                        msg = (long)tag == TAG_RETURN ? new RemoteReturn(callId, false, obj) : new RemoteReturn(callId, true, obj);
                        this.estream.nextMessage();
                    }
                    catch (NoClassDefFoundError e) {
                        this.recover();
                        // An unreadable result is delivered to the caller as a thrown RemoteException.
                        msg = new RemoteReturn(callId, true, new RemoteException("Unparsable result", e));
                        msg = this.processFollowUp(msg);
                        if (msg != null) break;
                        continue;
                    }
                    catch (Exception e) {
                        this.recover();
                        msg = new RemoteReturn(callId, true, new RemoteException("Unparsable result", e));
                        msg = this.processFollowUp(msg);
                        if (msg != null) break;
                        continue;
                    }
                    break;
                }
                return msg;
            }
            throw new IOException("Stream corrupted, unknown tag: " + tag);
        }

        /**
         * Reads the trailer that follows a message whose payload could not be deserialized.
         *
         * @param lastError the error message prepared for the failed read
         * @return {@code lastError} on a success trailer or when the tag read is negative;
         *         {@code null} on a discard trailer; on an error trailer, a throwing
         *         {@link RemoteReturn} carrying the sender's error object, or {@code lastError}
         *         if that object cannot be read either
         * @throws IOException on I/O failure or when an unknown tag is encountered
         */
        private RemoteMessage processFollowUp(RemoteMessage lastError) throws IOException {
            int tag = this.estream.read();
            if (tag < 0) {
                this.estream.nextMessage();
                return lastError;
            }
            if ((long)tag == TRAILER_SUCCESS) {
                this.estream.nextMessage();
                return lastError;
            }
            if ((long)tag == TRAILER_DISCARD) {
                this.estream.nextMessage();
                return null;
            }
            if ((long)tag == TRAILER_ERROR) {
                RemoteMessage msg = lastError;
                try {
                    // The call id in the trailer is consumed but ignored; lastError's id is used instead.
                    this.readCallId();
                    Object error = this.ostream.readObject();
                    this.ostream.readObject();
                    msg = new RemoteReturn(lastError.getCallId(), true, error);
                    this.estream.nextMessage();
                }
                catch (NoClassDefFoundError e) {
                    this.recover();
                }
                catch (Exception e) {
                    this.recover();
                }
                return msg;
            }
            throw new IOException("Stream corrupted, unknown tag: " + tag);
        }

        /**
         * Resynchronizes after a failed read: skips remaining input, advances to the next
         * message and replaces the object stream with a fresh one.
         */
        private void recover() throws IOException {
            // presumably the skip is bounded by the current envelope - confirm against EnvelopInputStream
            this.estream.skip(Long.MAX_VALUE);
            this.estream.nextMessage();
            this.ostream = new RmiObjectInputStream(this.estream);
        }

        /**
         * Reads the 7 bytes following the tag and assembles them, most significant byte first,
         * into the call id.
         */
        private long readCallId() throws IOException {
            this.dstream.readFully(this.callId, 0, this.callId.length);
            return ((long)(this.callId[0] & 0xFF) << 48) + ((long)(this.callId[1] & 0xFF) << 40) + ((long)(this.callId[2] & 0xFF) << 32) + ((long)(this.callId[3] & 0xFF) << 24) + (long)((this.callId[4] & 0xFF) << 16) + (long)((this.callId[5] & 0xFF) << 8) + (long)((this.callId[6] & 0xFF) << 0);
        }
    }

    /**
     * Thread that pumps inbound messages from the current connection into the RMI channel.
     *
     * <p>Interrupting the thread also closes the gateway's inbound stream and socket so that
     * a blocked read is released.
     */
    private final class SocketReader
    extends Thread
    implements Closeable {
        private SocketReader() {
        }

        @Override
        public void interrupt() {
            super.interrupt();
            this.close();
        }

        /** Closes the gateway's current inbound stream and socket, if any; I/O failures are ignored. */
        @Override
        public void close() {
            // Snapshot both fields: disconnect() may null them between a check and the use.
            InboundMessageStream inbound = RmiGateway.this.in;
            DuplexStream sock = RmiGateway.this.socket;
            if (inbound != null) {
                inbound.close();
            }
            try {
                if (sock != null) {
                    sock.close();
                }
            }
            catch (IOException ignored) {
                // best effort: the connection is being torn down anyway
            }
        }

        /**
         * Reads messages until the gateway is terminated. A {@code null} message is treated as a
         * termination request from the other side and shuts the gateway down. Any exception ends
         * the loop: the gateway is disconnected and the stream error handler is notified.
         */
        @Override
        public void run() {
            InboundMessageStream ims = RmiGateway.this.in;
            try {
                while (!RmiGateway.this.terminated) {
                    RemoteMessage message = ims.readMessage();
                    if (message == null) {
                        RmiGateway.this.logInfo.log("RMI gateway [" + RmiGateway.this.name + "], remote side has requested termination");
                        RmiGateway.this.shutdown();
                        continue;
                    }
                    RmiGateway.this.channel.handleMessage(message);
                }
            }
            catch (Exception e) {
                DuplexStream socket = RmiGateway.this.socket;
                boolean socketTerminated = IOHelper.isSocketTerminationException(e);
                if (socketTerminated) {
                    RmiGateway.this.logVerbose.log("RMI stream, socket has been discontinued [" + socket + "] - " + e.toString());
                } else {
                    RmiGateway.this.logCritical.log("RMI stream read exception [" + socket + "]", e);
                }
                // Use the stream captured at start-up: a concurrent disconnect() may already have
                // nulled the gateway's field, and dereferencing it here would kill this thread with
                // a NullPointerException before the handler is notified.
                InputStream in = ims != null ? ims.tstream : null;
                // Cleared first so that disconnect() does not try to interrupt/join this very thread.
                RmiGateway.this.readerThread = null;
                RmiGateway.this.logVerbose.log("disconnecting");
                RmiGateway.this.disconnect();
                // NOTE(review): on socket termination both callbacks fire (there is no else) -
                // kept as is; confirm whether streamError is meant to follow streamClosed.
                if (socketTerminated) {
                    RmiGateway.this.streamErrorHandler.streamClosed(socket, in);
                }
                RmiGateway.this.streamErrorHandler.streamError(socket, in, e);
            }
        }
    }
}

