/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.execution;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.ClassRule;
import org.junit.Test;

public class JobListenerITCase
extends TestLogger {
    @ClassRule
    public static MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().build());

    private static Configuration getClientConfiguration() {
        Configuration result = new Configuration((Configuration)miniClusterResource.getClientConfiguration());
        result.set(DeploymentOptions.TARGET, (Object)"remote");
        return result;
    }

    @Test
    public void testExecuteCallsJobListenerOnBatchEnvironment() throws Exception {
        final AtomicReference jobIdReference = new AtomicReference();
        final OneShotLatch submissionLatch = new OneShotLatch();
        final OneShotLatch executionLatch = new OneShotLatch();
        ExecutionEnvironment env = new ExecutionEnvironment(JobListenerITCase.getClientConfiguration());
        env.registerJobListener(new JobListener(){

            public void onJobSubmitted(JobClient jobClient, Throwable t) {
                jobIdReference.set(jobClient.getJobID());
                submissionLatch.trigger();
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
                executionLatch.trigger();
            }
        });
        env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5}).output((OutputFormat)new DiscardingOutputFormat());
        JobExecutionResult jobExecutionResult = env.execute();
        submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
        executionLatch.await(2000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat((Object)jobExecutionResult.getJobID(), (Matcher)CoreMatchers.is(jobIdReference.get()));
    }

    @Test
    public void testExecuteAsyncCallsJobListenerOnBatchEnvironment() throws Exception {
        final AtomicReference jobIdReference = new AtomicReference();
        final OneShotLatch submissionLatch = new OneShotLatch();
        ExecutionEnvironment env = new ExecutionEnvironment(JobListenerITCase.getClientConfiguration());
        env.registerJobListener(new JobListener(){

            public void onJobSubmitted(JobClient jobClient, Throwable t) {
                jobIdReference.set(jobClient.getJobID());
                submissionLatch.trigger();
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
            }
        });
        env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5}).output((OutputFormat)new DiscardingOutputFormat());
        JobClient jobClient = env.executeAsync();
        submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat((Object)jobClient.getJobID(), (Matcher)CoreMatchers.is(jobIdReference.get()));
    }

    @Test
    public void testExecuteCallsJobListenerOnMainThreadOnBatchEnvironment() throws Exception {
        final AtomicReference threadReference = new AtomicReference();
        ExecutionEnvironment env = new ExecutionEnvironment(JobListenerITCase.getClientConfiguration());
        env.registerJobListener(new JobListener(){

            public void onJobSubmitted(JobClient jobClient, Throwable t) {
                threadReference.set(Thread.currentThread());
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
            }
        });
        env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5}).output((OutputFormat)new DiscardingOutputFormat());
        env.execute();
        MatcherAssert.assertThat((Object)Thread.currentThread(), (Matcher)CoreMatchers.is(threadReference.get()));
    }

    @Test
    public void testExecuteAsyncCallsJobListenerOnMainThreadOnBatchEnvironment() throws Exception {
        final AtomicReference threadReference = new AtomicReference();
        ExecutionEnvironment env = new ExecutionEnvironment(JobListenerITCase.getClientConfiguration());
        env.registerJobListener(new JobListener(){

            public void onJobSubmitted(JobClient jobClient, Throwable t) {
                threadReference.set(Thread.currentThread());
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
            }
        });
        env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5}).output((OutputFormat)new DiscardingOutputFormat());
        env.executeAsync();
        MatcherAssert.assertThat((Object)Thread.currentThread(), (Matcher)CoreMatchers.is(threadReference.get()));
    }

    @Test
    public void testExecuteCallsJobListenerOnStreamingEnvironment() throws Exception {
        final AtomicReference jobIdReference = new AtomicReference();
        final OneShotLatch submissionLatch = new OneShotLatch();
        final OneShotLatch executionLatch = new OneShotLatch();
        StreamExecutionEnvironment env = new StreamExecutionEnvironment(JobListenerITCase.getClientConfiguration());
        env.registerJobListener(new JobListener(){

            public void onJobSubmitted(JobClient jobClient, Throwable t) {
                jobIdReference.set(jobClient.getJobID());
                submissionLatch.trigger();
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
                executionLatch.trigger();
            }
        });
        env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5}).addSink((SinkFunction)new DiscardingSink());
        JobExecutionResult jobExecutionResult = env.execute();
        submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
        executionLatch.await(2000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat((Object)jobExecutionResult.getJobID(), (Matcher)CoreMatchers.is(jobIdReference.get()));
    }

    @Test
    public void testExecuteAsyncCallsJobListenerOnStreamingEnvironment() throws Exception {
        final AtomicReference jobIdReference = new AtomicReference();
        final OneShotLatch submissionLatch = new OneShotLatch();
        StreamExecutionEnvironment env = new StreamExecutionEnvironment(JobListenerITCase.getClientConfiguration());
        env.registerJobListener(new JobListener(){

            public void onJobSubmitted(JobClient jobClient, Throwable t) {
                jobIdReference.set(jobClient.getJobID());
                submissionLatch.trigger();
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
            }
        });
        env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5}).addSink((SinkFunction)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        submissionLatch.await(2000L, TimeUnit.MILLISECONDS);
        MatcherAssert.assertThat((Object)jobClient.getJobID(), (Matcher)CoreMatchers.is(jobIdReference.get()));
    }

    @Test
    public void testExecuteCallsJobListenerOnMainThreadOnStreamEnvironment() throws Exception {
        final AtomicReference threadReference = new AtomicReference();
        StreamExecutionEnvironment env = new StreamExecutionEnvironment(JobListenerITCase.getClientConfiguration());
        env.registerJobListener(new JobListener(){

            public void onJobSubmitted(JobClient jobClient, Throwable t) {
                threadReference.set(Thread.currentThread());
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
            }
        });
        env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5}).addSink((SinkFunction)new DiscardingSink());
        env.execute();
        MatcherAssert.assertThat((Object)Thread.currentThread(), (Matcher)CoreMatchers.is(threadReference.get()));
    }

    @Test
    public void testExecuteAsyncCallsJobListenerOnMainThreadOnStreamEnvironment() throws Exception {
        final AtomicReference threadReference = new AtomicReference();
        StreamExecutionEnvironment env = new StreamExecutionEnvironment(JobListenerITCase.getClientConfiguration());
        env.registerJobListener(new JobListener(){

            public void onJobSubmitted(JobClient jobClient, Throwable t) {
                threadReference.set(Thread.currentThread());
            }

            public void onJobExecuted(JobExecutionResult jobExecutionResult, Throwable throwable) {
            }
        });
        env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5}).addSink((SinkFunction)new DiscardingSink());
        env.executeAsync();
        MatcherAssert.assertThat((Object)Thread.currentThread(), (Matcher)CoreMatchers.is(threadReference.get()));
    }
}

