/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

class RpcEndpointTest {
    // RPC service shared by all tests in this class; assigned in setup() and closed in teardown().
    private static RpcService rpcService = null;

    // Explicit no-argument constructor; it performs no initialization of its own.
    RpcEndpointTest() {
    }

    /**
     * Creates and starts the local {@link RpcService} that the tests in this class share, using a
     * fresh default {@link Configuration}.
     *
     * @throws Exception propagated from loading the RPC system or from creating/starting the service
     */
    @BeforeAll
    static void setup() throws Exception {
        rpcService = RpcSystem.load().localServiceBuilder(new Configuration()).createAndStart();
    }

    /**
     * Closes the shared {@link RpcService} and blocks until the asynchronous close has completed.
     *
     * <p>The field is still {@code null} when {@code setup()} failed before assigning it. That case
     * is skipped so the original setup failure is not masked by a {@link NullPointerException}
     * raised here.
     *
     * @throws Exception if waiting for the close future fails
     */
    @AfterAll
    static void teardown() throws Exception {
        if (rpcService == null) {
            return;
        }
        rpcService.closeAsync().get();
    }

    /**
     * Verifies that calling {@code foobar()} through the endpoint's own {@link BaseGateway} yields
     * the value the endpoint was constructed with.
     *
     * @throws Exception if starting, calling or terminating the endpoint fails
     */
    @Test
    void testSelfGateway() throws Exception {
        final int expectedValue = 1337;
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue);
        try {
            baseEndpoint.start();
            final BaseGateway baseGateway = (BaseGateway) baseEndpoint.getSelfGateway(BaseGateway.class);
            final CompletableFuture<Integer> foobar = baseGateway.foobar();
            Assertions.assertThat(foobar.get()).isEqualTo(expectedValue);
        } finally {
            // Runs on success and on failure alike, so the endpoint is always terminated.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {baseEndpoint});
            baseEndpoint.validateResourceClosed();
        }
    }

    /**
     * Verifies that requesting a self gateway of a type the endpoint does not implement
     * ({@link DifferentGateway} on a {@link BaseEndpoint}) fails with a {@link RuntimeException}.
     *
     * @throws ExecutionException if terminating the endpoint fails
     * @throws InterruptedException if the thread is interrupted while terminating the endpoint
     */
    @Test
    void testWrongSelfGateway() throws ExecutionException, InterruptedException {
        final int expectedValue = 1337;
        final BaseEndpoint baseEndpoint = new BaseEndpoint(rpcService, expectedValue);
        try {
            baseEndpoint.start();
            Assertions.assertThatThrownBy(
                            () -> {
                                // The result is irrelevant; only the failure of the call matters.
                                DifferentGateway ignored =
                                        (DifferentGateway) baseEndpoint.getSelfGateway(DifferentGateway.class);
                            })
                    .withFailMessage(
                            "Expected to fail with a RuntimeException since we requested the wrong gateway type.")
                    .isInstanceOf(RuntimeException.class);
        } finally {
            // Runs on success and on failure alike, so the endpoint is always terminated.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {baseEndpoint});
            baseEndpoint.validateResourceClosed();
        }
    }

    /**
     * Verifies that an {@link ExtendedEndpoint} can be addressed through each gateway type it
     * implements ({@link BaseGateway}, {@link ExtendedGateway}, {@link DifferentGateway}) and that
     * every gateway method returns the value passed to the endpoint's constructor.
     *
     * @throws Exception if starting, calling or terminating the endpoint fails
     */
    @Test
    void testEndpointInheritance() throws Exception {
        final int foobar = 1;
        final int barfoo = 2;
        final String foo = "foobar";
        final ExtendedEndpoint endpoint = new ExtendedEndpoint(rpcService, foobar, barfoo, foo);
        try {
            endpoint.start();
            final BaseGateway baseGateway = (BaseGateway) endpoint.getSelfGateway(BaseGateway.class);
            final ExtendedGateway extendedGateway =
                    (ExtendedGateway) endpoint.getSelfGateway(ExtendedGateway.class);
            final DifferentGateway differentGateway =
                    (DifferentGateway) endpoint.getSelfGateway(DifferentGateway.class);

            Assertions.assertThat(baseGateway.foobar().get()).isEqualTo(foobar);
            Assertions.assertThat(extendedGateway.foobar().get()).isEqualTo(foobar);
            Assertions.assertThat(extendedGateway.barfoo().get()).isEqualTo(barfoo);
            Assertions.assertThat(differentGateway.foo().get()).isEqualTo(foo);
        } finally {
            // Runs on success and on failure alike, so the endpoint is always terminated.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {endpoint});
            endpoint.validateResourceClosed();
        }
    }

    /**
     * Verifies that a started endpoint reports {@code isRunning() == true} when queried through
     * its own gateway.
     */
    @Test
    void testRunningState() throws InterruptedException, ExecutionException, TimeoutException {
        final RunningStateTestingEndpoint endpoint =
                new RunningStateTestingEndpoint(rpcService, CompletableFuture.completedFuture(null));
        final RunningStateTestingEndpointGateway gateway =
                (RunningStateTestingEndpointGateway)
                        endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);
        try {
            endpoint.start();
            Assertions.assertThat(gateway.queryIsRunningFlag().get()).isTrue();
        } finally {
            // Runs on success and on failure alike, so the endpoint is always terminated.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {endpoint});
            endpoint.validateResourceClosed();
        }
    }

    /**
     * Verifies that an endpoint reports {@code isRunning() == false} once {@code onStop()} has been
     * called, even though the future returned by {@code onStop()} has not completed yet.
     */
    @Test
    void testNotRunningState() throws InterruptedException, ExecutionException, TimeoutException {
        final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
        final RunningStateTestingEndpoint endpoint = new RunningStateTestingEndpoint(rpcService, stopFuture);
        final RunningStateTestingEndpointGateway gateway =
                (RunningStateTestingEndpointGateway)
                        endpoint.getSelfGateway(RunningStateTestingEndpointGateway.class);

        endpoint.start();
        final CompletableFuture<Void> terminationFuture = endpoint.closeAndWaitUntilOnStopCalled();
        try {
            Assertions.assertThat(gateway.queryIsRunningFlag().get()).isFalse();
        } finally {
            // Complete the future handed out by onStop() even when the assertion fails; otherwise it
            // would stay pending forever (presumably keeping the endpoint from terminating).
            stopFuture.complete(null);
        }
        terminationFuture.get();
        endpoint.validateResourceClosed();
    }

    /**
     * Verifies that a runnable submitted through the endpoint's main thread executor is run, and
     * that {@code validateRunsInMainThread()} can be called from inside it.
     */
    @Test
    void testExecute() throws InterruptedException, ExecutionException, TimeoutException {
        final BaseEndpoint endpoint = new BaseEndpoint(rpcService);
        final CompletableFuture<Void> asyncExecutionFuture = new CompletableFuture<>();
        try {
            endpoint.start();
            endpoint.getMainThreadExecutor()
                    .execute(
                            () -> {
                                endpoint.validateRunsInMainThread();
                                asyncExecutionFuture.complete(null);
                            });
            // Blocks until the runnable above has signalled completion.
            asyncExecutionFuture.get();
        } finally {
            // Runs on success and on failure alike, so the endpoint is always terminated.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {endpoint});
            endpoint.validateResourceClosed();
        }
    }

    /** Schedules a no-op {@link Runnable} with the delay expressed in milliseconds. */
    @Test
    void testScheduleRunnableWithDelayInMilliseconds() throws Exception {
        final Runnable noOp = () -> {};
        testScheduleWithDelay(
                (executor, delay) -> executor.schedule(noOp, delay.toMillis(), TimeUnit.MILLISECONDS));
    }

    /** Schedules a no-op {@link Runnable} with the delay expressed in whole seconds. */
    @Test
    void testScheduleRunnableWithDelayInSeconds() throws Exception {
        final Runnable noOp = () -> {};
        testScheduleWithDelay(
                (executor, delay) -> {
                    // Millisecond-to-second conversion truncates, exactly like dividing by 1000.
                    final long delaySeconds = TimeUnit.MILLISECONDS.toSeconds(delay.toMillis());
                    executor.schedule(noOp, delaySeconds, TimeUnit.SECONDS);
                });
    }

    /** Schedules a no-op {@link Runnable} on a main thread executor that was already closed. */
    @Test
    void testScheduleRunnableAfterClose() throws Exception {
        final Runnable noOp = () -> {};
        testScheduleAfterClose(
                (executor, delay) -> {
                    // Millisecond-to-second conversion truncates, exactly like dividing by 1000.
                    final long delaySeconds = TimeUnit.MILLISECONDS.toSeconds(delay.toMillis());
                    return executor.schedule(noOp, delaySeconds, TimeUnit.SECONDS);
                });
    }

    /**
     * Verifies that a scheduled {@link Runnable} which is cancelled before it is triggered never
     * runs, i.e. never completes the future it was given.
     */
    @Test
    void testCancelScheduledRunnable() throws Exception {
        testCancelScheduledTask(
                (mainThreadExecutor, future) -> {
                    final Duration delayDuration = Duration.ofMillis(2L);
                    // Typed as Runnable on purpose: an inline expression lambda calling
                    // complete(...) is also value-compatible, so overload resolution would pick
                    // schedule(Callable, ...) and this test would not cover the Runnable variant.
                    final Runnable completeFuture = () -> future.complete(null);
                    return mainThreadExecutor.schedule(
                            completeFuture, delayDuration.toMillis(), TimeUnit.MILLISECONDS);
                });
    }

    /** Schedules a constant-returning {@link Callable} with the delay expressed in milliseconds. */
    @Test
    void testScheduleCallableWithDelayInMilliseconds() throws Exception {
        final Callable<Integer> constantOne = () -> 1;
        testScheduleWithDelay(
                (executor, delay) ->
                        executor.schedule(constantOne, delay.toMillis(), TimeUnit.MILLISECONDS));
    }

    /** Schedules a constant-returning {@link Callable} with the delay expressed in whole seconds. */
    @Test
    void testScheduleCallableWithDelayInSeconds() throws Exception {
        final Callable<Integer> constantOne = () -> 1;
        testScheduleWithDelay(
                (executor, delay) -> {
                    // Millisecond-to-second conversion truncates, exactly like dividing by 1000.
                    final long delaySeconds = TimeUnit.MILLISECONDS.toSeconds(delay.toMillis());
                    executor.schedule(constantOne, delaySeconds, TimeUnit.SECONDS);
                });
    }

    /** Schedules a constant-returning {@link Callable} on an already closed main thread executor. */
    @Test
    void testScheduleCallableAfterClose() throws Exception {
        final Callable<Integer> constantOne = () -> 1;
        testScheduleAfterClose(
                (executor, delay) -> {
                    // Millisecond-to-second conversion truncates, exactly like dividing by 1000.
                    final long delaySeconds = TimeUnit.MILLISECONDS.toSeconds(delay.toMillis());
                    return executor.schedule(constantOne, delaySeconds, TimeUnit.SECONDS);
                });
    }

    /**
     * Verifies that a scheduled {@link Callable} which is cancelled before it is triggered never
     * runs, i.e. never completes the future it was given.
     */
    @Test
    void testCancelScheduledCallable() {
        testCancelScheduledTask(
                (executor, actionFuture) -> {
                    final Callable<Object> completingCallable =
                            () -> {
                                actionFuture.complete(null);
                                return null;
                            };
                    final long delayMillis = Duration.ofMillis(2L).toMillis();
                    return executor.schedule(completingCallable, delayMillis, TimeUnit.MILLISECONDS);
                });
    }

    /**
     * Shared driver for the "schedule with delay" tests.
     *
     * <p>Builds a {@link RpcEndpoint.MainThreadExecutor} on top of a stub executable that completes
     * a future as soon as it is handed any runnable (the runnable itself is not run), lets
     * {@code scheduler} schedule a task with a one second delay, and then blocks until the stub
     * has received that task.
     *
     * @param scheduler schedules a task on the given executor using the given delay
     * @throws Exception if waiting for the task hand-over fails
     */
    private static void testScheduleWithDelay(BiConsumer<RpcEndpoint.MainThreadExecutor, Duration> scheduler) throws Exception {
        final CompletableFuture<Void> taskCompletedFuture = new CompletableFuture<>();
        final String endpointId = "foobar";
        final MainThreadExecutable mainThreadExecutable =
                new TestMainThreadExecutable(runnable -> taskCompletedFuture.complete(null));
        final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId);
        try {
            final Duration expectedDelay = Duration.ofSeconds(1L);
            scheduler.accept(mainThreadExecutor, expectedDelay);
            taskCompletedFuture.get();
        } finally {
            // Close the executor even when scheduling or waiting fails.
            mainThreadExecutor.close();
        }
    }

    /**
     * Shared driver for the "schedule after close" tests.
     *
     * <p>Closes a freshly created {@link RpcEndpoint.MainThreadExecutor}, lets {@code scheduler}
     * schedule a task with zero delay on it, and asserts that neither the stub executable received
     * the task nor the returned {@link ScheduledFuture} is done.
     *
     * @param scheduler schedules a task on the given (already closed) executor and returns its future
     */
    private static void testScheduleAfterClose(BiFunction<RpcEndpoint.MainThreadExecutor, Duration, ScheduledFuture<?>> scheduler) {
        final CompletableFuture<Void> taskCompletedFuture = new CompletableFuture<>();
        final String endpointId = "foobar";
        final MainThreadExecutable mainThreadExecutable =
                new TestMainThreadExecutable(runnable -> taskCompletedFuture.complete(null));
        final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
                new RpcEndpoint.MainThreadExecutor(mainThreadExecutable, () -> {}, endpointId);
        mainThreadExecutor.close();

        final Duration expectedDelay = Duration.ofSeconds(0L);
        final ScheduledFuture<?> future = scheduler.apply(mainThreadExecutor, expectedDelay);

        Assertions.assertThat(taskCompletedFuture).isNotDone();
        Assertions.assertThat(future).isNotDone();
    }

    /**
     * Shared driver for the "cancel scheduled task" tests.
     *
     * <p>Uses a manually triggered scheduled executor and a stub executable that runs every
     * runnable inline. The task returned by {@code scheduler} is cancelled first and all
     * non-periodic tasks are triggered afterwards; the task must then be reported as cancelled
     * and must not have completed the action future.
     *
     * @param scheduler schedules a task that completes the given future and returns its handle
     */
    private static void testCancelScheduledTask(BiFunction<RpcEndpoint.MainThreadExecutor, CompletableFuture<Void>, ScheduledFuture<?>> scheduler) {
        final MainThreadExecutable mainThreadExecutable = new TestMainThreadExecutable(Runnable::run);
        final ManuallyTriggeredScheduledExecutorService manuallyTriggeredScheduledExecutorService =
                new ManuallyTriggeredScheduledExecutorService();
        final RpcEndpoint.MainThreadExecutor mainThreadExecutor =
                new RpcEndpoint.MainThreadExecutor(
                        mainThreadExecutable,
                        () -> {},
                        (ScheduledExecutorService) manuallyTriggeredScheduledExecutorService);
        try {
            final CompletableFuture<Void> actionFuture = new CompletableFuture<>();
            final ScheduledFuture<?> scheduledFuture = scheduler.apply(mainThreadExecutor, actionFuture);

            // Cancel before triggering, so that triggering must not run the task.
            scheduledFuture.cancel(true);
            manuallyTriggeredScheduledExecutorService.triggerAllNonPeriodicTasks();

            Assertions.assertThat(scheduledFuture).isCancelled();
            Assertions.assertThat(actionFuture).isNotDone();
        } finally {
            // Close the executor even when an assertion fails.
            mainThreadExecutor.close();
        }
    }

    /**
     * Verifies that {@code callAsync} runs the given callable (from which
     * {@code validateRunsInMainThread()} can be called) and completes the returned future with
     * the callable's result.
     */
    @Test
    void testCallAsync() throws InterruptedException, ExecutionException, TimeoutException {
        final BaseEndpoint endpoint = new BaseEndpoint(rpcService);
        final Integer expectedInteger = 12345;
        try {
            endpoint.start();
            final CompletableFuture<Integer> integerFuture =
                    endpoint.callAsync(
                            () -> {
                                endpoint.validateRunsInMainThread();
                                return expectedInteger;
                            },
                            Duration.ofSeconds(10L));
            Assertions.assertThat(integerFuture.get()).isEqualTo(expectedInteger);
        } finally {
            // Runs on success and on failure alike, so the endpoint is always terminated.
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {endpoint});
            endpoint.validateResourceClosed();
        }
    }

    /**
     * Verifies that the future returned by {@code callAsync} fails with a
     * {@link TimeoutException} when the callable is still blocked after the 100 ms timeout.
     */
    @Test
    void testCallAsyncTimeout() throws InterruptedException, ExecutionException, TimeoutException {
        final BaseEndpoint endpoint = new BaseEndpoint(rpcService);
        final Duration timeout = Duration.ofMillis(100L);
        final CountDownLatch latch = new CountDownLatch(1);
        try {
            endpoint.start();
            // handle(...) turns the failure into the future's value so it can be asserted on.
            final CompletableFuture<Throwable> throwableFuture =
                    endpoint.callAsync(
                                    () -> {
                                        endpoint.validateRunsInMainThread();
                                        // Block until the test releases the latch.
                                        latch.await();
                                        return 12345;
                                    },
                                    timeout)
                            .handle((ignored, failure) -> failure);
            final Throwable thrown = throwableFuture.get();
            Assertions.assertThat(thrown).isNotNull().isInstanceOf(TimeoutException.class);
        } finally {
            // Release the blocked callable first, then terminate the endpoint.
            latch.countDown();
            RpcUtils.terminateRpcEndpoint(new RpcEndpoint[] {endpoint});
            endpoint.validateResourceClosed();
        }
    }

    /**
     * {@link MainThreadExecutable} stub that forwards every submitted {@link Runnable} to a
     * caller-supplied consumer. Scheduling delays are ignored and {@code callAsync} is not
     * supported.
     */
    private static class TestMainThreadExecutable implements MainThreadExecutable {
        /** Receives each runnable passed to {@code runAsync} or {@code scheduleRunAsync}. */
        private final Consumer<Runnable> runnableHandler;

        private TestMainThreadExecutable(Consumer<Runnable> runnableHandler) {
            this.runnableHandler = runnableHandler;
        }

        public void runAsync(Runnable runnable) {
            runnableHandler.accept(runnable);
        }

        public <V> CompletableFuture<V> callAsync(Callable<V> callable, Duration callTimeout) {
            throw new UnsupportedOperationException();
        }

        public void scheduleRunAsync(Runnable runnable, long delay) {
            // The delay is deliberately ignored: the runnable is handed over immediately.
            runAsync(runnable);
        }
    }

    /**
     * Endpoint used to observe the running flag. Its {@code onStop()} returns a future supplied by
     * the test, and a latch records that {@code onStop()} has been entered.
     */
    private static final class RunningStateTestingEndpoint
    extends RpcEndpoint
    implements RunningStateTestingEndpointGateway {
        /** Counted down once {@link #onStop()} has been invoked. */
        private final CountDownLatch onStopCalled;
        /** Future returned from {@link #onStop()}; completed (or not) by the test. */
        private final CompletableFuture<Void> stopFuture;

        RunningStateTestingEndpoint(RpcService rpcService, CompletableFuture<Void> stopFuture) {
            super(rpcService);
            this.stopFuture = stopFuture;
            this.onStopCalled = new CountDownLatch(1);
        }

        /** Signals that stopping has begun and returns the test-controlled stop future. */
        @Override
        public CompletableFuture<Void> onStop() {
            this.onStopCalled.countDown();
            return this.stopFuture;
        }

        /**
         * Triggers {@code closeAsync()} and blocks until {@link #onStop()} has been called.
         *
         * @return the future returned by {@code closeAsync()}
         * @throws InterruptedException if interrupted while waiting for {@link #onStop()}
         */
        CompletableFuture<Void> closeAndWaitUntilOnStopCalled() throws InterruptedException {
            final CompletableFuture<Void> terminationFuture = this.closeAsync();
            this.onStopCalled.await();
            return terminationFuture;
        }

        /** Returns the endpoint's current {@code isRunning()} value as a completed future. */
        @Override
        public CompletableFuture<Boolean> queryIsRunningFlag() {
            return CompletableFuture.completedFuture(this.isRunning());
        }
    }

    public static interface RunningStateTestingEndpointGateway
    extends RpcGateway {
        public CompletableFuture<Boolean> queryIsRunningFlag();
    }

    /**
     * Endpoint that adds {@link ExtendedGateway} and {@link DifferentGateway} on top of
     * {@link BaseEndpoint}; each gateway method answers with a value fixed at construction time.
     */
    public static class ExtendedEndpoint extends BaseEndpoint implements ExtendedGateway, DifferentGateway {
        /** Value returned by {@link #barfoo()}. */
        private final int barfooResult;
        /** Value returned by {@link #foo()}. */
        private final String fooResult;

        protected ExtendedEndpoint(RpcService rpcService, int foobarValue, int barfooValue, String fooString) {
            super(rpcService, foobarValue);
            barfooResult = barfooValue;
            fooResult = fooString;
        }

        @Override
        public CompletableFuture<Integer> barfoo() {
            final int result = barfooResult;
            return CompletableFuture.completedFuture(result);
        }

        @Override
        public CompletableFuture<String> foo() {
            final String result = fooResult;
            return CompletableFuture.completedFuture(result);
        }
    }

    /**
     * Minimal endpoint implementing {@link BaseGateway}; {@link #foobar()} answers with a value
     * fixed at construction time.
     */
    public static class BaseEndpoint extends RpcEndpoint implements BaseGateway {
        /** Value returned by {@link #foobar()}. */
        private final int foobarResult;

        /** Creates an endpoint whose {@link #foobar()} value is {@link Integer#MAX_VALUE}. */
        protected BaseEndpoint(RpcService rpcService) {
            this(rpcService, Integer.MAX_VALUE);
        }

        /** Creates an endpoint whose {@link #foobar()} value is {@code foobarValue}. */
        protected BaseEndpoint(RpcService rpcService, int foobarValue) {
            super(rpcService);
            foobarResult = foobarValue;
        }

        @Override
        public CompletableFuture<Integer> foobar() {
            final int result = foobarResult;
            return CompletableFuture.completedFuture(result);
        }
    }

    public static interface DifferentGateway
    extends RpcGateway {
        public CompletableFuture<String> foo();
    }

    public static interface ExtendedGateway
    extends BaseGateway {
        public CompletableFuture<Integer> barfoo();
    }

    public static interface BaseGateway
    extends RpcGateway {
        public CompletableFuture<Integer> foobar();
    }
}

