/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class TimestampITCase
extends TestLogger {
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int NUM_TASK_SLOTS = 3;
    private static final int PARALLELISM = 6;
    static MultiShotLatch latch;
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER;

    private static Configuration getConfiguration() {
        Configuration config = new Configuration();
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"12m"));
        return config;
    }

    @Before
    public void setupLatch() {
        latch = new MultiShotLatch();
    }

    @Test
    public void testWatermarkPropagation() throws Exception {
        int numWatermarks = 10;
        long initialTime = 0L;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(6);
        DataStreamSource source1 = env.addSource((SourceFunction)new MyTimestampSource(initialTime, 10));
        DataStreamSource source2 = env.addSource((SourceFunction)new MyTimestampSource(initialTime, 5));
        source1.union(new DataStream[]{source2}).map((MapFunction)new IdentityMap()).connect((DataStream)source2).map((CoMapFunction)new IdentityCoMap()).transform("Custom Operator", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new CustomOperator(true)).addSink((SinkFunction)new DiscardingSink());
        env.execute();
        for (int i = 0; i < 6; ++i) {
            for (int j = 0; j < 5; ++j) {
                if (CustomOperator.finalWatermarks[i].get(j).equals((Object)new Watermark(initialTime + (long)j))) continue;
                System.err.println("All Watermarks: ");
                for (int k = 0; k <= 5; ++k) {
                    System.err.println(CustomOperator.finalWatermarks[i].get(k));
                }
                Assert.fail((String)"Wrong watermark.");
            }
            Assert.assertEquals((Object)Watermark.MAX_WATERMARK, (Object)CustomOperator.finalWatermarks[i].get(CustomOperator.finalWatermarks[i].size() - 1));
        }
    }

    @Test
    public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
        final ClusterClient clusterClient = CLUSTER.getClusterClient();
        while (!TimestampITCase.getRunningJobs(clusterClient).isEmpty()) {
            Thread.sleep(100L);
        }
        int numWatermarks = 10;
        long initialTime = 0L;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(6);
        DataStreamSource source1 = env.addSource((SourceFunction)new MyTimestampSourceInfinite(initialTime, 10));
        DataStreamSource source2 = env.addSource((SourceFunction)new MyTimestampSourceInfinite(initialTime, 5));
        source1.union(new DataStream[]{source2}).map((MapFunction)new IdentityMap()).connect((DataStream)source2).map((CoMapFunction)new IdentityCoMap()).transform("Custom Operator", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new CustomOperator(true)).addSink((SinkFunction)new DiscardingSink());
        Thread t = new Thread("stopper"){

            @Override
            public void run() {
                try {
                    List running = TimestampITCase.getRunningJobs(clusterClient);
                    while (running.isEmpty()) {
                        Thread.sleep(10L);
                        running = TimestampITCase.getRunningJobs(clusterClient);
                    }
                    JobID id = (JobID)running.get(0);
                    do {
                        block6: {
                            try {
                                clusterClient.stopWithSavepoint(id, false, "test").get();
                            }
                            catch (Exception e) {
                                boolean ignoreException = ExceptionUtils.findThrowable((Throwable)e, CheckpointException.class).map(CheckpointException::getCheckpointFailureReason).map(reason -> reason == CheckpointFailureReason.NOT_ALL_REQUIRED_TASKS_RUNNING).orElse(false);
                                if (ignoreException) break block6;
                                throw e;
                            }
                        }
                        Thread.sleep(10L);
                    } while (!TimestampITCase.getRunningJobs(clusterClient).isEmpty());
                }
                catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        };
        t.start();
        env.execute();
        for (List<Watermark> subtaskWatermarks : CustomOperator.finalWatermarks) {
            for (int j = 0; j < subtaskWatermarks.size(); ++j) {
                if (subtaskWatermarks.get(j).getTimestamp() == initialTime + (long)j) continue;
                System.err.println("All Watermarks: ");
                for (int k = 0; k <= 5; ++k) {
                    System.err.println(subtaskWatermarks.get(k));
                }
                Assert.fail((String)"Wrong watermark.");
            }
            if (subtaskWatermarks.size() <= 0) continue;
            Assert.assertNotEquals((Object)Watermark.MAX_WATERMARK, (Object)subtaskWatermarks.get(subtaskWatermarks.size() - 1));
        }
        t.join();
    }

    @Test
    public void testTimestampHandling() throws Exception {
        int numElements = 10;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(6);
        DataStreamSource source1 = env.addSource((SourceFunction)new MyTimestampSource(0L, 10));
        DataStreamSource source2 = env.addSource((SourceFunction)new MyTimestampSource(0L, 10));
        source1.map((MapFunction)new IdentityMap()).connect((DataStream)source2).map((CoMapFunction)new IdentityCoMap()).transform("Custom Operator", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new TimestampCheckingOperator()).addSink((SinkFunction)new DiscardingSink());
        env.execute();
    }

    @Test
    public void testDisabledTimestamps() throws Exception {
        int numElements = 10;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(6);
        DataStreamSource source1 = env.addSource((SourceFunction)new MyNonWatermarkingSource(10));
        DataStreamSource source2 = env.addSource((SourceFunction)new MyNonWatermarkingSource(10));
        source1.map((MapFunction)new IdentityMap()).connect((DataStream)source2).map((CoMapFunction)new IdentityCoMap()).transform("Custom Operator", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new DisabledTimestampCheckingOperator()).addSink((SinkFunction)new DiscardingSink());
        env.execute();
    }

    @Test
    public void testTimestampExtractorWithAutoInterval() throws Exception {
        int numElements = 10;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(10L);
        env.setParallelism(1);
        DataStreamSource source1 = env.addSource((SourceFunction)new SourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                for (int index = 1; index <= 10; ++index) {
                    ctx.collect((Object)index);
                    latch.await();
                }
            }

            public void cancel() {
            }
        });
        SingleOutputStreamOperator extractOp = source1.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks)new AscendingTimestampExtractor<Integer>(){

            public long extractAscendingTimestamp(Integer element) {
                return element.intValue();
            }
        });
        extractOp.transform("Watermark Check", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new CustomOperator(true)).transform("Timestamp Check", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new TimestampCheckingOperator());
        Assert.assertEquals((long)extractOp.getTransformation().getParallelism(), (long)source1.getTransformation().getParallelism());
        env.execute();
        for (int j = 0; j < 10; ++j) {
            if (CustomOperator.finalWatermarks[0].get(j).equals((Object)new Watermark((long)j))) continue;
            long wm = CustomOperator.finalWatermarks[0].get(j).getTimestamp();
            Assert.fail((String)("Wrong watermark. Expected: " + j + " Found: " + wm + " All: " + CustomOperator.finalWatermarks[0]));
        }
        Assert.assertEquals((Object)Watermark.MAX_WATERMARK, (Object)CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
    }

    @Test
    public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
        int numElements = 10;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(10L);
        env.setParallelism(1);
        DataStreamSource source1 = env.addSource((SourceFunction)new SourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                for (int index = 1; index <= 10; ++index) {
                    ctx.collect((Object)index);
                    latch.await();
                }
            }

            public void cancel() {
            }
        });
        source1.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new AssignerWithPunctuatedWatermarks<Integer>(){

            public long extractTimestamp(Integer element, long currentTimestamp) {
                return element.intValue();
            }

            public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
                return new Watermark(extractedTimestamp - 1L);
            }
        }).transform("Watermark Check", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new CustomOperator(true)).transform("Timestamp Check", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new TimestampCheckingOperator());
        env.execute();
        for (int j = 0; j < 10; ++j) {
            if (CustomOperator.finalWatermarks[0].get(j).equals((Object)new Watermark((long)j))) continue;
            Assert.fail((String)"Wrong watermark.");
        }
        Assert.assertEquals((Object)Watermark.MAX_WATERMARK, (Object)CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
    }

    @Test
    public void testTimestampExtractorWithDecreasingCustomWatermarkEmit() throws Exception {
        int numElements = 10;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1L);
        env.setParallelism(1);
        DataStreamSource source1 = env.addSource((SourceFunction)new SourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                for (int index = 1; index <= 10; ++index) {
                    ctx.collect((Object)index);
                    Thread.sleep(100L);
                    ctx.collect((Object)(index - 1));
                    latch.await();
                }
            }

            public void cancel() {
            }
        });
        source1.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new AssignerWithPunctuatedWatermarks<Integer>(){

            public long extractTimestamp(Integer element, long previousTimestamp) {
                return element.intValue();
            }

            public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
                return new Watermark(extractedTimestamp - 1L);
            }
        }).transform("Watermark Check", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new CustomOperator(true)).transform("Timestamp Check", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new TimestampCheckingOperator());
        env.execute();
        for (int j = 0; j < 10; ++j) {
            if (CustomOperator.finalWatermarks[0].get(j).equals((Object)new Watermark((long)j))) continue;
            Assert.fail((String)"Wrong watermark.");
        }
        Assert.assertEquals((Object)Watermark.MAX_WATERMARK, (Object)CustomOperator.finalWatermarks[0].get(CustomOperator.finalWatermarks[0].size() - 1));
    }

    @Test
    public void testTimestampExtractorWithLongMaxWatermarkFromSource() throws Exception {
        int numElements = 10;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1L);
        env.setParallelism(2);
        DataStreamSource source1 = env.addSource((SourceFunction)new SourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                int index = 1;
                while (index <= 10) {
                    ctx.collectWithTimestamp((Object)index, (long)index);
                    ctx.collectWithTimestamp((Object)(index - 1), (long)(index - 1));
                    ctx.emitWatermark(new Watermark((long)(++index - 2)));
                }
                ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
                ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
            }

            public void cancel() {
            }
        });
        source1.assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new AssignerWithPunctuatedWatermarks<Integer>(){

            public long extractTimestamp(Integer element, long currentTimestamp) {
                return element.intValue();
            }

            public Watermark checkAndGetNextWatermark(Integer element, long extractedTimestamp) {
                return null;
            }
        }).transform("Watermark Check", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new CustomOperator(true));
        env.execute();
        Assert.assertTrue((CustomOperator.finalWatermarks[0].size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE ? 1 : 0) != 0);
    }

    @Test
    public void testTimestampExtractorWithLongMaxWatermarkFromSource2() throws Exception {
        int numElements = 10;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(10L);
        env.setParallelism(2);
        DataStreamSource source1 = env.addSource((SourceFunction)new SourceFunction<Integer>(){

            public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
                int index = 1;
                while (index <= 10) {
                    ctx.collectWithTimestamp((Object)index, (long)index);
                    ctx.collectWithTimestamp((Object)(index - 1), (long)(index - 1));
                    ctx.emitWatermark(new Watermark((long)(++index - 2)));
                }
                ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
                ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
            }

            public void cancel() {
            }
        });
        source1.assignTimestampsAndWatermarks((AssignerWithPeriodicWatermarks)new AssignerWithPeriodicWatermarks<Integer>(){

            public long extractTimestamp(Integer element, long currentTimestamp) {
                return element.intValue();
            }

            public Watermark getCurrentWatermark() {
                return null;
            }
        }).transform("Watermark Check", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new CustomOperator(true));
        env.execute();
        Assert.assertTrue((CustomOperator.finalWatermarks[0].size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE ? 1 : 0) != 0);
    }

    @Test
    public void testEventTimeSourceWithProcessingTime() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStreamSource source1 = env.addSource((SourceFunction)new MyTimestampSource(0L, 10));
        source1.map((MapFunction)new IdentityMap()).transform("Watermark Check", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (OneInputStreamOperator)new CustomOperator(false));
        env.execute();
        Assert.assertTrue((CustomOperator.finalWatermarks[0].size() == 0 ? 1 : 0) != 0);
    }

    @Test
    public void testErrorOnEventTimeOverProcessingTime() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStreamSource source1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)"a", (Object)1), new Tuple2((Object)"b", (Object)2)});
        source1.keyBy(new int[]{0}).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.seconds((long)5L))).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                return value1;
            }
        }).print();
        try {
            env.execute();
            Assert.fail((String)"this should fail with an exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testErrorOnEventTimeWithoutTimestamps() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStreamSource source1 = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)"a", (Object)1), new Tuple2((Object)"b", (Object)2)});
        source1.keyBy(new int[]{0}).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.seconds((long)5L))).reduce((ReduceFunction)new ReduceFunction<Tuple2<String, Integer>>(){

            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                return value1;
            }
        }).print();
        try {
            env.execute();
            Assert.fail((String)"this should fail with an exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
        Collection statusMessages = (Collection)client.listJobs().get();
        return statusMessages.stream().filter(status -> !status.getJobState().isGloballyTerminalState()).map(JobStatusMessage::getJobId).collect(Collectors.toList());
    }

    static {
        CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(TimestampITCase.getConfiguration()).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(3).build());
    }

    private static class MyNonWatermarkingSource
    implements SourceFunction<Integer> {
        int numWatermarks;

        public MyNonWatermarkingSource(int numWatermarks) {
            this.numWatermarks = numWatermarks;
        }

        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            for (int i = 0; i < this.numWatermarks; ++i) {
                ctx.collect((Object)i);
            }
        }

        public void cancel() {
        }
    }

    private static class MyTimestampSourceInfinite
    implements SourceFunction<Integer> {
        private final long initialTime;
        private final int numWatermarks;
        private volatile boolean running = true;

        public MyTimestampSourceInfinite(long initialTime, int numWatermarks) {
            this.initialTime = initialTime;
            this.numWatermarks = numWatermarks;
        }

        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            for (int i = 0; i < this.numWatermarks; ++i) {
                ctx.collectWithTimestamp((Object)i, this.initialTime + (long)i);
                ctx.emitWatermark(new Watermark(this.initialTime + (long)i));
            }
            while (this.running) {
                Thread.sleep(20L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    private static class MyTimestampSource
    implements SourceFunction<Integer> {
        private final long initialTime;
        private final int numWatermarks;

        public MyTimestampSource(long initialTime, int numWatermarks) {
            this.initialTime = initialTime;
            this.numWatermarks = numWatermarks;
        }

        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            for (int i = 0; i < this.numWatermarks; ++i) {
                ctx.collectWithTimestamp((Object)i, this.initialTime + (long)i);
                ctx.emitWatermark(new Watermark(this.initialTime + (long)i));
            }
        }

        public void cancel() {
        }
    }

    private static class IdentityMap
    implements MapFunction<Integer, Integer> {
        private IdentityMap() {
        }

        public Integer map(Integer value) throws Exception {
            return value;
        }
    }

    private static class IdentityCoMap
    implements CoMapFunction<Integer, Integer, Integer> {
        private IdentityCoMap() {
        }

        public Integer map1(Integer value) throws Exception {
            return value;
        }

        public Integer map2(Integer value) throws Exception {
            return value;
        }
    }

    private static class DisabledTimestampCheckingOperator
    extends AbstractStreamOperator<Integer>
    implements OneInputStreamOperator<Integer, Integer> {
        private DisabledTimestampCheckingOperator() {
        }

        public void processElement(StreamRecord<Integer> element) throws Exception {
            if (element.hasTimestamp()) {
                Assert.fail((String)"Timestamps are not properly handled.");
            }
            this.output.collect(element);
        }
    }

    private static class TimestampCheckingOperator
    extends AbstractStreamOperator<Integer>
    implements OneInputStreamOperator<Integer, Integer> {
        public TimestampCheckingOperator() {
            this.setChainingStrategy(ChainingStrategy.ALWAYS);
        }

        public void processElement(StreamRecord<Integer> element) throws Exception {
            if (element.getTimestamp() != (long)((Integer)element.getValue()).intValue()) {
                Assert.fail((String)"Timestamps are not properly handled.");
            }
            this.output.collect(element);
        }
    }

    private static class CustomOperator
    extends AbstractStreamOperator<Integer>
    implements OneInputStreamOperator<Integer, Integer> {
        List<Watermark> watermarks;
        public static List<Watermark>[] finalWatermarks = new List[6];
        private final boolean timestampsEnabled;

        public CustomOperator(boolean timestampsEnabled) {
            this.setChainingStrategy(ChainingStrategy.ALWAYS);
            this.timestampsEnabled = timestampsEnabled;
        }

        public void processElement(StreamRecord<Integer> element) throws Exception {
            if (this.timestampsEnabled && element.getTimestamp() != (long)((Integer)element.getValue()).intValue()) {
                Assert.fail((String)"Timestamps are not properly handled.");
            }
            this.output.collect(element);
        }

        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            for (Watermark previousMark : this.watermarks) {
                Assert.assertTrue((previousMark.getTimestamp() < mark.getTimestamp() ? 1 : 0) != 0);
            }
            this.watermarks.add(mark);
            latch.trigger();
            this.output.emitWatermark(mark);
        }

        public void open() throws Exception {
            super.open();
            this.watermarks = new ArrayList<Watermark>();
        }

        public void close() throws Exception {
            super.close();
            CustomOperator.finalWatermarks[this.getRuntimeContext().getIndexOfThisSubtask()] = this.watermarks;
        }
    }
}

