/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.Semaphore;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.TimerException;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class StreamTaskTimerITCase
extends AbstractTestBase {
    private final TimeCharacteristic timeCharacteristic;

    public StreamTaskTimerITCase(TimeCharacteristic characteristic) {
        this.timeCharacteristic = characteristic;
    }

    @Test
    public void testOperatorChainedToSource() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(this.timeCharacteristic);
        env.setParallelism(1);
        DataStreamSource source = env.addSource((SourceFunction)new InfiniteTestSource());
        source.transform("Custom Operator", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (OneInputStreamOperator)new TimerOperator(ChainingStrategy.ALWAYS));
        boolean testSuccess = false;
        try {
            env.execute("Timer test");
        }
        catch (JobExecutionException e) {
            if (e.getCause() instanceof TimerException) {
                TimerException te = (TimerException)e.getCause();
                if (te.getCause() instanceof RuntimeException) {
                    RuntimeException re = (RuntimeException)te.getCause();
                    if (re.getMessage().equals("TEST SUCCESS")) {
                        testSuccess = true;
                    }
                    throw e;
                }
                throw e;
            }
            throw e;
        }
        Assert.assertTrue((boolean)testSuccess);
    }

    @Test
    public void testOneInputOperatorWithoutChaining() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(this.timeCharacteristic);
        env.setParallelism(1);
        DataStreamSource source = env.addSource((SourceFunction)new InfiniteTestSource());
        source.transform("Custom Operator", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (OneInputStreamOperator)new TimerOperator(ChainingStrategy.NEVER));
        boolean testSuccess = false;
        try {
            env.execute("Timer test");
        }
        catch (JobExecutionException e) {
            if (e.getCause() instanceof TimerException) {
                TimerException te = (TimerException)e.getCause();
                if (te.getCause() instanceof RuntimeException) {
                    RuntimeException re = (RuntimeException)te.getCause();
                    if (re.getMessage().equals("TEST SUCCESS")) {
                        testSuccess = true;
                    }
                    throw e;
                }
                throw e;
            }
            throw e;
        }
        Assert.assertTrue((boolean)testSuccess);
    }

    @Test
    public void testTwoInputOperatorWithoutChaining() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(this.timeCharacteristic);
        env.setParallelism(1);
        DataStreamSource source = env.addSource((SourceFunction)new InfiniteTestSource());
        source.connect((DataStream)source).transform("Custom Operator", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (TwoInputStreamOperator)new TwoInputTimerOperator(ChainingStrategy.NEVER));
        boolean testSuccess = false;
        try {
            env.execute("Timer test");
        }
        catch (JobExecutionException e) {
            if (e.getCause() instanceof TimerException) {
                TimerException te = (TimerException)e.getCause();
                if (te.getCause() instanceof RuntimeException) {
                    RuntimeException re = (RuntimeException)te.getCause();
                    if (re.getMessage().equals("TEST SUCCESS")) {
                        testSuccess = true;
                    }
                    throw e;
                }
                throw e;
            }
            throw e;
        }
        Assert.assertTrue((boolean)testSuccess);
    }

    @Parameterized.Parameters(name="Time Characteristic = {0}")
    public static Collection<Object[]> executionModes() {
        return Arrays.asList({TimeCharacteristic.ProcessingTime}, {TimeCharacteristic.IngestionTime}, {TimeCharacteristic.EventTime});
    }

    private static class InfiniteTestSource
    implements SourceFunction<String> {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;

        private InfiniteTestSource() {
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (this.running) {
                ctx.collect((Object)"hello");
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    private static class TwoInputTimerOperator
    extends AbstractStreamOperator<String>
    implements TwoInputStreamOperator<String, String, String>,
    ProcessingTimeCallback {
        private static final long serialVersionUID = 1L;
        int numTimers = 0;
        int numElements = 0;
        private boolean first = true;
        private Semaphore semaphore = new Semaphore(1);

        public TwoInputTimerOperator(ChainingStrategy chainingStrategy) {
            this.setChainingStrategy(chainingStrategy);
        }

        public void processElement1(StreamRecord<String> element) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            if (this.first) {
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100L, (ProcessingTimeCallback)this);
                this.first = false;
            }
            ++this.numElements;
            this.semaphore.release();
        }

        public void processElement2(StreamRecord<String> element) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            if (this.first) {
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100L, (ProcessingTimeCallback)this);
                this.first = false;
            }
            ++this.numElements;
            this.semaphore.release();
        }

        public void onProcessingTime(long time) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            try {
                ++this.numTimers;
                this.throwIfDone();
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1L, (ProcessingTimeCallback)this);
            }
            finally {
                this.semaphore.release();
            }
        }

        private void throwIfDone() {
            if (this.numTimers > 1000 && this.numElements > 10000) {
                throw new RuntimeException("TEST SUCCESS");
            }
        }

        public void processWatermark1(Watermark mark) throws Exception {
        }

        public void processWatermark2(Watermark mark) throws Exception {
        }
    }

    private static class TimerOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String>,
    ProcessingTimeCallback {
        private static final long serialVersionUID = 1L;
        int numTimers = 0;
        int numElements = 0;
        private boolean first = true;
        private Semaphore semaphore = new Semaphore(1);

        public TimerOperator(ChainingStrategy chainingStrategy) {
            this.setChainingStrategy(chainingStrategy);
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            if (this.first) {
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 100L, (ProcessingTimeCallback)this);
                this.first = false;
            }
            ++this.numElements;
            this.semaphore.release();
        }

        public void onProcessingTime(long time) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            try {
                ++this.numTimers;
                this.throwIfDone();
                this.getProcessingTimeService().registerTimer(System.currentTimeMillis() + 1L, (ProcessingTimeCallback)this);
            }
            finally {
                this.semaphore.release();
            }
        }

        private void throwIfDone() {
            if (this.numTimers > 1000 && this.numElements > 10000) {
                throw new RuntimeException("TEST SUCCESS");
            }
        }

        public void processWatermark(Watermark mark) throws Exception {
            if (!this.semaphore.tryAcquire()) {
                Assert.fail((String)"Concurrent invocation of operator functions.");
            }
            this.semaphore.release();
        }
    }
}

