/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.runtime.concurrent.ClassLoadingUtils;
import org.apache.flink.runtime.rpc.Local;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
import org.apache.flink.runtime.rpc.exceptions.HandshakeException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
import org.apache.flink.runtime.rpc.messages.RemoteHandshakeMessage;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.runtime.rpc.pekko.ControlMessages;
import org.apache.flink.runtime.rpc.pekko.RpcSerializedValue;
import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Status;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

class PekkoRpcActor<T extends RpcEndpoint>
extends AbstractActor {
    protected final Logger log = LoggerFactory.getLogger(this.getClass());
    protected final T rpcEndpoint;
    private final ClassLoader flinkClassLoader;
    private final MainThreadValidatorUtil mainThreadValidator;
    private final CompletableFuture<Boolean> terminationFuture;
    private final int version;
    private final long maximumFramesize;
    private final AtomicBoolean rpcEndpointStopped;
    private final boolean forceSerialization;
    private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult;
    @Nonnull
    private State state;

    PekkoRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version, long maximumFramesize, boolean forceSerialization, ClassLoader flinkClassLoader) {
        Preconditions.checkArgument((maximumFramesize > 0L ? 1 : 0) != 0, (Object)"Maximum framesize must be positive.");
        this.rpcEndpoint = (RpcEndpoint)Preconditions.checkNotNull(rpcEndpoint, (String)"rpc endpoint");
        this.flinkClassLoader = (ClassLoader)Preconditions.checkNotNull((Object)flinkClassLoader);
        this.forceSerialization = forceSerialization;
        this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
        this.terminationFuture = (CompletableFuture)Preconditions.checkNotNull(terminationFuture);
        this.version = version;
        this.maximumFramesize = maximumFramesize;
        this.rpcEndpointStopped = new AtomicBoolean(false);
        this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure((Throwable)new RpcException(String.format("RpcEndpoint %s has not been properly stopped.", rpcEndpoint.getEndpointId())));
        this.state = StoppedState.STOPPED;
    }

    @Override
    public void postStop() throws Exception {
        super.postStop();
        if (this.rpcEndpointTerminationResult.isSuccess()) {
            this.log.debug("The RpcEndpoint {} terminated successfully.", (Object)this.rpcEndpoint.getEndpointId());
            this.terminationFuture.complete(null);
        } else {
            this.log.info("The RpcEndpoint {} failed.", (Object)this.rpcEndpoint.getEndpointId(), (Object)this.rpcEndpointTerminationResult.getFailureCause());
            this.terminationFuture.completeExceptionally(this.rpcEndpointTerminationResult.getFailureCause());
        }
        this.state = this.state.finishTermination();
    }

    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(RemoteHandshakeMessage.class, this::handleHandshakeMessage).match(ControlMessages.class, this::handleControlMessage).matchAny(this::handleMessage).build();
    }

    private void handleMessage(Object message) {
        if (this.state.isRunning()) {
            this.mainThreadValidator.enterMainThread();
            try {
                this.handleRpcMessage(message);
            }
            finally {
                this.mainThreadValidator.exitMainThread();
            }
        } else {
            this.log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", (Object)this.rpcEndpoint.getClass().getName(), message);
            this.sendErrorIfSender((Throwable)new EndpointNotStartedException(String.format("Discard message %s, because the rpc endpoint %s has not been started yet.", message, this.rpcEndpoint.getAddress())));
        }
    }

    private void handleControlMessage(ControlMessages controlMessage) {
        try {
            switch (controlMessage) {
                case START: {
                    this.state = this.state.start(this, this.flinkClassLoader);
                    break;
                }
                case STOP: {
                    this.state = this.state.stop();
                    break;
                }
                case TERMINATE: {
                    this.state = this.state.terminate(this, this.flinkClassLoader);
                    break;
                }
                default: {
                    this.handleUnknownControlMessage(controlMessage);
                    break;
                }
            }
        }
        catch (Exception e) {
            this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e);
            throw e;
        }
    }

    private void handleUnknownControlMessage(ControlMessages controlMessage) {
        String message = String.format("Received unknown control message %s. Dropping this message!", new Object[]{controlMessage});
        this.log.warn(message);
        this.sendErrorIfSender((Throwable)((Object)new UnknownMessageException(message)));
    }

    protected void handleRpcMessage(Object message) {
        if (message instanceof RunAsync) {
            this.handleRunAsync((RunAsync)message);
        } else if (message instanceof CallAsync) {
            this.handleCallAsync((CallAsync)message);
        } else if (message instanceof RpcInvocation) {
            this.handleRpcInvocation((RpcInvocation)message);
        } else {
            this.log.warn("Received message of unknown type {} with value {}. Dropping this message!", (Object)message.getClass().getName(), message);
            this.sendErrorIfSender((Throwable)((Object)new UnknownMessageException("Received unknown message " + message + " of type " + message.getClass().getSimpleName() + '.')));
        }
    }

    private void handleHandshakeMessage(RemoteHandshakeMessage handshakeMessage) {
        if (!this.isCompatibleVersion(handshakeMessage.getVersion())) {
            this.sendErrorIfSender((Throwable)new HandshakeException(String.format("Version mismatch between source (%s) and target (%s) rpc component. Please verify that all components have the same version.", handshakeMessage.getVersion(), this.getVersion())));
        } else if (!this.isGatewaySupported(handshakeMessage.getRpcGateway())) {
            this.sendErrorIfSender((Throwable)new HandshakeException(String.format("The rpc endpoint does not support the gateway %s.", handshakeMessage.getRpcGateway().getSimpleName())));
        } else {
            this.getSender().tell(new Status.Success(HandshakeSuccessMessage.INSTANCE), this.getSelf());
        }
    }

    private boolean isGatewaySupported(Class<?> rpcGateway) {
        return rpcGateway.isAssignableFrom(this.rpcEndpoint.getClass());
    }

    private boolean isCompatibleVersion(int sourceVersion) {
        return sourceVersion == this.getVersion();
    }

    private int getVersion() {
        return this.version;
    }

    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        block10: {
            Method rpcMethod = null;
            try {
                String methodName = rpcInvocation.getMethodName();
                Class[] parameterTypes = rpcInvocation.getParameterTypes();
                rpcMethod = this.lookupRpcMethod(methodName, parameterTypes);
            }
            catch (NoSuchMethodException e) {
                this.log.error("Could not find rpc method for rpc invocation.", (Throwable)e);
                RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", (Throwable)e);
                this.getSender().tell(new Status.Failure((Throwable)rpcException), this.getSelf());
            }
            if (rpcMethod != null) {
                try {
                    boolean isLocalRpcInvocation;
                    Object result;
                    rpcMethod.setAccessible(true);
                    Method capturedRpcMethod = rpcMethod;
                    if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                        ClassLoadingUtils.runWithContextClassLoader(() -> capturedRpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs()), (ClassLoader)this.flinkClassLoader);
                        break block10;
                    }
                    try {
                        result = ClassLoadingUtils.runWithContextClassLoader(() -> capturedRpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs()), (ClassLoader)this.flinkClassLoader);
                    }
                    catch (InvocationTargetException e) {
                        this.log.debug("Reporting back error thrown in remote procedure {}", (Object)rpcMethod, (Object)e);
                        this.getSender().tell(new Status.Failure(e.getTargetException()), this.getSelf());
                        return;
                    }
                    String methodName = rpcMethod.getName();
                    boolean bl = isLocalRpcInvocation = rpcMethod.getAnnotation(Local.class) != null;
                    if (result instanceof CompletableFuture) {
                        CompletableFuture responseFuture = (CompletableFuture)result;
                        this.sendAsyncResponse(responseFuture, methodName, isLocalRpcInvocation);
                    } else {
                        this.sendSyncResponse(result, methodName, isLocalRpcInvocation);
                    }
                }
                catch (Throwable e) {
                    this.log.error("Error while executing remote procedure call {}.", (Object)rpcMethod, (Object)e);
                    this.getSender().tell(new Status.Failure(e), this.getSelf());
                }
            }
        }
    }

    private void sendSyncResponse(Object response, String methodName, boolean isLocalRpcInvocation) {
        if (this.isRemoteSender(this.getSender()) || this.forceSerialization && !isLocalRpcInvocation) {
            Either<RpcSerializedValue, RpcException> serializedResult = this.serializeRemoteResultAndVerifySize(response, methodName);
            if (serializedResult.isLeft()) {
                this.getSender().tell(new Status.Success(serializedResult.left()), this.getSelf());
            } else {
                this.getSender().tell(new Status.Failure((Throwable)serializedResult.right()), this.getSelf());
            }
        } else {
            this.getSender().tell(new Status.Success(response), this.getSelf());
        }
    }

    private void sendAsyncResponse(CompletableFuture<?> asyncResponse, String methodName, boolean isLocalRpcInvocation) {
        ActorRef sender = this.getSender();
        Promise.DefaultPromise promise = new Promise.DefaultPromise();
        FutureUtils.assertNoException((CompletableFuture)asyncResponse.handle((value, throwable) -> {
            if (throwable != null) {
                promise.failure((Throwable)throwable);
            } else if (this.isRemoteSender(sender) || this.forceSerialization && !isLocalRpcInvocation) {
                Either<RpcSerializedValue, RpcException> serializedResult = this.serializeRemoteResultAndVerifySize(value, methodName);
                if (serializedResult.isLeft()) {
                    promise.success(serializedResult.left());
                } else {
                    promise.failure((Throwable)serializedResult.right());
                }
            } else {
                promise.success(new Status.Success(value));
            }
            return null;
        }));
        Patterns.pipe(promise.future(), (ExecutionContext)this.getContext().dispatcher()).to(sender);
    }

    private boolean isRemoteSender(ActorRef sender) {
        return !sender.path().address().hasLocalScope();
    }

    private Either<RpcSerializedValue, RpcException> serializeRemoteResultAndVerifySize(Object result, String methodName) {
        try {
            RpcSerializedValue serializedResult = RpcSerializedValue.valueOf(result);
            long resultSize = serializedResult.getSerializedDataLength();
            if (resultSize > this.maximumFramesize) {
                return Either.Right((Object)new RpcException("The method " + methodName + "'s result size " + resultSize + " exceeds the maximum size " + this.maximumFramesize + " ."));
            }
            return Either.Left((Object)serializedResult);
        }
        catch (IOException e) {
            return Either.Right((Object)new RpcException("Failed to serialize the result for RPC call : " + methodName + '.', (Throwable)e));
        }
    }

    private void handleCallAsync(CallAsync callAsync) {
        try {
            Object result = ClassLoadingUtils.runWithContextClassLoader(() -> callAsync.getCallable().call(), (ClassLoader)this.flinkClassLoader);
            this.getSender().tell(new Status.Success(result), this.getSelf());
        }
        catch (Throwable e) {
            this.getSender().tell(new Status.Failure(e), this.getSelf());
        }
    }

    private void handleRunAsync(RunAsync runAsync) {
        long delayNanos;
        long timeToRun = runAsync.getTimeNanos();
        if (timeToRun == 0L || (delayNanos = timeToRun - System.nanoTime()) <= 0L) {
            try {
                ClassLoadingUtils.runWithContextClassLoader(() -> runAsync.getRunnable().run(), (ClassLoader)this.flinkClassLoader);
            }
            catch (Throwable t) {
                this.log.error("Caught exception while executing runnable in main thread.", t);
                ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
            }
        } else {
            FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
            RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
            Object envelopedSelfMessage = this.envelopeSelfMessage(message);
            this.getContext().system().scheduler().scheduleOnce(delay, this.getSelf(), envelopedSelfMessage, (ExecutionContext)this.getContext().dispatcher(), ActorRef.noSender());
        }
    }

    private Method lookupRpcMethod(String methodName, Class<?>[] parameterTypes) throws NoSuchMethodException {
        return this.rpcEndpoint.getClass().getMethod(methodName, parameterTypes);
    }

    protected void sendErrorIfSender(Throwable throwable) {
        if (!this.getSender().equals(ActorRef.noSender())) {
            this.getSender().tell(new Status.Failure(throwable), this.getSelf());
        }
    }

    protected Object envelopeSelfMessage(Object message) {
        return message;
    }

    private void stop(RpcEndpointTerminationResult rpcEndpointTerminationResult) {
        if (this.rpcEndpointStopped.compareAndSet(false, true)) {
            this.rpcEndpointTerminationResult = rpcEndpointTerminationResult;
            this.getContext().stop(this.getSelf());
        }
    }

    private static final class RpcEndpointTerminationResult {
        private static final RpcEndpointTerminationResult SUCCESS = new RpcEndpointTerminationResult(null);
        @Nullable
        private final Throwable failureCause;

        private RpcEndpointTerminationResult(@Nullable Throwable failureCause) {
            this.failureCause = failureCause;
        }

        public boolean isSuccess() {
            return this.failureCause == null;
        }

        public Throwable getFailureCause() {
            Preconditions.checkState((this.failureCause != null ? 1 : 0) != 0);
            return this.failureCause;
        }

        private static RpcEndpointTerminationResult success() {
            return SUCCESS;
        }

        private static RpcEndpointTerminationResult failure(Throwable failureCause) {
            return new RpcEndpointTerminationResult(failureCause);
        }

        private static RpcEndpointTerminationResult of(@Nullable Throwable failureCause) {
            if (failureCause == null) {
                return RpcEndpointTerminationResult.success();
            }
            return RpcEndpointTerminationResult.failure(failureCause);
        }
    }

    static enum TerminatedState implements State
    {
        TERMINATED;

    }

    static enum TerminatingState implements State
    {
        TERMINATING;


        @Override
        public State terminate(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {
            return TERMINATING;
        }

        @Override
        public boolean isRunning() {
            return true;
        }
    }

    static enum StoppedState implements State
    {
        STOPPED;


        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public State start(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {
            ((PekkoRpcActor)pekkoRpcActor).mainThreadValidator.enterMainThread();
            try {
                ClassLoadingUtils.runWithContextClassLoader(() -> pekkoRpcActor.rpcEndpoint.internalCallOnStart(), (ClassLoader)flinkClassLoader);
            }
            catch (Throwable throwable) {
                ((PekkoRpcActor)pekkoRpcActor).stop(RpcEndpointTerminationResult.failure((Throwable)new RpcException(String.format("Could not start RpcEndpoint %s.", pekkoRpcActor.rpcEndpoint.getEndpointId()), throwable)));
            }
            finally {
                ((PekkoRpcActor)pekkoRpcActor).mainThreadValidator.exitMainThread();
            }
            return StartedState.STARTED;
        }

        @Override
        public State stop() {
            return STOPPED;
        }

        @Override
        public State terminate(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {
            ((PekkoRpcActor)pekkoRpcActor).stop(RpcEndpointTerminationResult.success());
            return TerminatingState.TERMINATING;
        }
    }

    static enum StartedState implements State
    {
        STARTED;


        @Override
        public State start(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {
            return STARTED;
        }

        @Override
        public State stop() {
            return StoppedState.STOPPED;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public State terminate(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {
            CompletableFuture terminationFuture;
            ((PekkoRpcActor)pekkoRpcActor).mainThreadValidator.enterMainThread();
            try {
                terminationFuture = (CompletableFuture)ClassLoadingUtils.runWithContextClassLoader(() -> pekkoRpcActor.rpcEndpoint.internalCallOnStop(), (ClassLoader)flinkClassLoader);
            }
            catch (Throwable t) {
                terminationFuture = FutureUtils.completedExceptionally((Throwable)new RpcException(String.format("Failure while stopping RpcEndpoint %s.", pekkoRpcActor.rpcEndpoint.getEndpointId()), t));
            }
            finally {
                ((PekkoRpcActor)pekkoRpcActor).mainThreadValidator.exitMainThread();
            }
            terminationFuture.whenComplete((ignored, throwable) -> ((PekkoRpcActor)pekkoRpcActor).stop(RpcEndpointTerminationResult.of(throwable)));
            return TerminatingState.TERMINATING;
        }

        @Override
        public boolean isRunning() {
            return true;
        }
    }

    static interface State {
        default public State start(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {
            throw new RpcInvalidStateException(this.invalidStateTransitionMessage(StartedState.STARTED));
        }

        default public State stop() {
            throw new RpcInvalidStateException(this.invalidStateTransitionMessage(StoppedState.STOPPED));
        }

        default public State terminate(PekkoRpcActor<?> pekkoRpcActor, ClassLoader flinkClassLoader) {
            throw new RpcInvalidStateException(this.invalidStateTransitionMessage(TerminatingState.TERMINATING));
        }

        default public State finishTermination() {
            return TerminatedState.TERMINATED;
        }

        default public boolean isRunning() {
            return false;
        }

        default public String invalidStateTransitionMessage(State targetState) {
            return String.format("RpcActor is currently in state %s and cannot go into state %s.", this, targetState);
        }
    }
}

