/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcActor;
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;

public class FencedPekkoRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F>>
extends PekkoRpcActor<T> {
    public FencedPekkoRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture, int version, long maximumFramesize, boolean forceSerialization, ClassLoader flinkClassLoader) {
        super(rpcEndpoint, terminationFuture, version, maximumFramesize, forceSerialization, flinkClassLoader);
    }

    @Override
    protected void handleRpcMessage(Object message) {
        if (message instanceof FencedMessage) {
            Serializable expectedFencingToken = ((FencedRpcEndpoint)this.rpcEndpoint).getFencingToken();
            if (expectedFencingToken == null) {
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Fencing token not set: Ignoring message {} because the fencing token is null.", message);
                }
                this.sendErrorIfSender((Throwable)new FencingTokenException(String.format("Fencing token not set: Ignoring message %s sent to %s because the fencing token is null.", message, ((FencedRpcEndpoint)this.rpcEndpoint).getAddress())));
            } else {
                FencedMessage fencedMessage = (FencedMessage)message;
                Serializable fencingToken = fencedMessage.getFencingToken();
                if (Objects.equals(expectedFencingToken, fencingToken)) {
                    super.handleRpcMessage(fencedMessage.getPayload());
                } else {
                    if (this.log.isDebugEnabled()) {
                        this.log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did not match the expected fencing token {}.", new Object[]{message, fencingToken, expectedFencingToken});
                    }
                    this.sendErrorIfSender((Throwable)new FencingTokenException("Fencing token mismatch: Ignoring message " + message + " because the fencing token " + fencingToken + " did not match the expected fencing token " + expectedFencingToken + '.'));
                }
            }
        } else {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Unknown message type: Ignoring message {} because it is not of type {}.", message, (Object)FencedMessage.class.getSimpleName());
            }
            this.sendErrorIfSender((Throwable)((Object)new UnknownMessageException("Unknown message type: Ignoring message " + message + " of type " + message.getClass().getSimpleName() + " because it is not of type " + FencedMessage.class.getSimpleName() + ".")));
        }
    }

    @Override
    protected Object envelopeSelfMessage(Object message) {
        Serializable fencingToken = ((FencedRpcEndpoint)this.rpcEndpoint).getFencingToken();
        return new LocalFencedMessage(fencingToken, message);
    }
}

