/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.dispatcher.ArchivedExecutionGraphStore;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.BlobServerResource;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.VoidMetricQueryServiceRetriever;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest;
import org.apache.flink.test.util.TestProcessBuilder;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.CheckedSupplier;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ProcessFailureCancelingITCase
extends TestLogger {
    @Rule
    public final BlobServerResource blobServerResource = new BlobServerResource();
    @Rule
    public final ZooKeeperResource zooKeeperResource = new ZooKeeperResource();
    @Rule
    public final TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testCancelingOnProcessFailure() throws Exception {
        Time timeout = Time.minutes((long)2L);
        RestClusterClient clusterClient = null;
        TestProcessBuilder.TestProcess taskManagerProcess = null;
        TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
        final Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        config.setString(AkkaOptions.ASK_TIMEOUT, "100 s");
        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, this.zooKeeperResource.getConnectString());
        config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, this.temporaryFolder.newFolder().getAbsolutePath());
        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"4m"));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, (Object)MemorySize.parse((String)"3200k"));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, (Object)MemorySize.parse((String)"3200k"));
        config.set(TaskManagerOptions.TASK_HEAP_MEMORY, (Object)MemorySize.parse((String)"128m"));
        config.set(TaskManagerOptions.CPU_CORES, (Object)1.0);
        config.setInteger(RestOptions.PORT, 0);
        AkkaRpcService rpcService = AkkaRpcServiceUtils.remoteServiceBuilder((Configuration)config, (String)"localhost", (int)0).createAndStart();
        int jobManagerPort = rpcService.getPort();
        config.setInteger(JobManagerOptions.PORT, jobManagerPort);
        DefaultDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory((ResourceManagerFactory)StandaloneResourceManagerFactory.getInstance());
        DispatcherResourceManagerComponent dispatcherResourceManagerComponent = null;
        ScheduledExecutorService ioExecutor = TestingUtils.defaultExecutor();
        HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices((Configuration)config, (Executor)ioExecutor, (HighAvailabilityServicesUtils.AddressResolution)HighAvailabilityServicesUtils.AddressResolution.NO_ADDRESS_RESOLUTION);
        try {
            if (CommonTestUtils.getJavaCommandPath() == null) {
                System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
                return;
            }
            dispatcherResourceManagerComponent = resourceManagerComponentFactory.create(config, (Executor)ioExecutor, (RpcService)rpcService, haServices, this.blobServerResource.getBlobServer(), new HeartbeatServices(100L, 1000L), NoOpMetricRegistry.INSTANCE, (ArchivedExecutionGraphStore)new MemoryArchivedExecutionGraphStore(), (MetricQueryServiceRetriever)VoidMetricQueryServiceRetriever.INSTANCE, (FatalErrorHandler)fatalErrorHandler);
            Map keyValues = config.toMap();
            ArrayList commands = new ArrayList((keyValues.size() << 1) + 8);
            TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(AbstractTaskManagerProcessFailureRecoveryTest.TaskExecutorProcessEntryPoint.class.getName());
            taskManagerProcessBuilder.addConfigAsMainClassArgs(config);
            taskManagerProcess = taskManagerProcessBuilder.start();
            final Throwable[] errorRef = new Throwable[1];
            Runnable programRunner = new Runnable(){

                @Override
                public void run() {
                    try {
                        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)1337, (Configuration)config, (String[])new String[0]);
                        env.setParallelism(2);
                        env.setRestartStrategy(RestartStrategies.noRestart());
                        env.generateSequence(0L, Long.MAX_VALUE).map((MapFunction)new MapFunction<Long, Long>(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            public Long map(Long value) throws Exception {
                                1 var2_2 = this;
                                synchronized (var2_2) {
                                    this.wait();
                                }
                                return 0L;
                            }
                        }).output((OutputFormat)new DiscardingOutputFormat());
                        env.execute();
                    }
                    catch (Throwable t) {
                        errorRef[0] = t;
                    }
                }
            };
            Thread programThread = new Thread(programRunner);
            programThread.start();
            DispatcherGateway dispatcherGateway = ProcessFailureCancelingITCase.retrieveDispatcherGateway((RpcService)rpcService, haServices);
            this.waitUntilAllSlotsAreUsed(dispatcherGateway, timeout);
            clusterClient = new RestClusterClient(config, (Object)"standalone");
            Collection<JobID> jobIds = this.waitForRunningJobs((ClusterClient<?>)clusterClient, timeout);
            Assert.assertThat(jobIds, (Matcher)Matchers.hasSize((int)1));
            JobID jobId = jobIds.iterator().next();
            taskManagerProcess.destroy();
            taskManagerProcess = null;
            clusterClient.cancel(jobId).get();
            programThread.join(120000L);
            Assert.assertFalse((String)"The program did not cancel in time (2 minutes)", (boolean)programThread.isAlive());
            Throwable error = errorRef[0];
            Assert.assertNotNull((String)"The program did not fail properly", (Object)error);
            Assert.assertTrue((boolean)(error.getCause() instanceof ProgramInvocationException));
        }
        catch (Exception e) {
            this.printProcessLog("TaskManager", taskManagerProcess.getErrorOutput().toString());
            throw e;
        }
        catch (Error e) {
            this.printProcessLog("TaskManager 1", taskManagerProcess.getErrorOutput().toString());
            throw e;
        }
        finally {
            if (taskManagerProcess != null) {
                taskManagerProcess.destroy();
            }
            if (clusterClient != null) {
                clusterClient.close();
            }
            if (dispatcherResourceManagerComponent != null) {
                dispatcherResourceManagerComponent.deregisterApplicationAndClose(ApplicationStatus.SUCCEEDED, null);
            }
            fatalErrorHandler.rethrowError();
            RpcUtils.terminateRpcService((RpcService)rpcService, (Time)Time.seconds((long)100L));
            haServices.closeAndCleanupAllData();
        }
    }

    static DispatcherGateway retrieveDispatcherGateway(RpcService rpcService, HighAvailabilityServices haServices) throws Exception {
        LeaderConnectionInfo leaderConnectionInfo = LeaderRetrievalUtils.retrieveLeaderConnectionInfo((LeaderRetrievalService)haServices.getDispatcherLeaderRetriever(), (Duration)Duration.ofSeconds(10L));
        return (DispatcherGateway)rpcService.connect(leaderConnectionInfo.getAddress(), (Serializable)DispatcherId.fromUuid((UUID)leaderConnectionInfo.getLeaderSessionId()), DispatcherGateway.class).get();
    }

    private void waitUntilAllSlotsAreUsed(DispatcherGateway dispatcherGateway, Time timeout) throws ExecutionException, InterruptedException {
        FutureUtils.retrySuccessfulWithDelay(() -> dispatcherGateway.requestClusterOverview(timeout), (Time)Time.milliseconds((long)50L), (Deadline)Deadline.fromNow((Duration)Duration.ofMillis(timeout.toMilliseconds())), clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= 1 && clusterOverview.getNumSlotsAvailable() == 0 && clusterOverview.getNumSlotsTotal() == 2, (ScheduledExecutor)TestingUtils.defaultScheduledExecutor()).get();
    }

    private Collection<JobID> waitForRunningJobs(ClusterClient<?> clusterClient, Time timeout) throws ExecutionException, InterruptedException {
        return ((Collection)FutureUtils.retrySuccessfulWithDelay((Supplier)CheckedSupplier.unchecked(() -> clusterClient.listJobs()), (Time)Time.milliseconds((long)50L), (Deadline)Deadline.fromNow((Duration)Duration.ofMillis(timeout.toMilliseconds())), jobs -> !jobs.isEmpty(), (ScheduledExecutor)TestingUtils.defaultScheduledExecutor()).get()).stream().map(JobStatusMessage::getJobId).collect(Collectors.toList());
    }

    private void printProcessLog(String processName, String log) {
        if (log == null || log.length() == 0) {
            return;
        }
        System.out.println("-----------------------------------------");
        System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
        System.out.println("-----------------------------------------");
        System.out.println(log);
        System.out.println("-----------------------------------------");
        System.out.println("\t\tEND SPAWNED PROCESS LOG");
        System.out.println("-----------------------------------------");
    }
}

