/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.ArrayList;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector;
import org.apache.flink.streaming.runtime.tasks.ChainingOutput;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.OutputWithChainingCheck;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.RegularOperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.WatermarkGaugeExposingOutput;
import org.apache.flink.streaming.util.MockStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

public class OperatorChainTest {
    @Test
    public void testPrepareCheckpointPreBarrier() throws Exception {
        AtomicInteger intRef = new AtomicInteger();
        ValidatingOperator one = new ValidatingOperator(intRef, 0);
        ValidatingOperator two = new ValidatingOperator(intRef, 1);
        ValidatingOperator three = new ValidatingOperator(intRef, 2);
        OperatorChain chain = OperatorChainTest.setupOperatorChain(one, two, three);
        chain.prepareSnapshotPreBarrier(5765167L);
        Assert.assertEquals((long)3L, (long)intRef.get());
    }

    @SafeVarargs
    public static <T, OP extends StreamOperator<T>> OperatorChain<T, OP> setupOperatorChain(OneInputStreamOperator<T, T> ... operators) throws Exception {
        Preconditions.checkNotNull(operators);
        Preconditions.checkArgument((operators.length > 0 ? 1 : 0) != 0);
        try (MockEnvironment env = MockEnvironment.builder().build();){
            MockStreamTask containingTask = new MockStreamTaskBuilder((Environment)env).build();
            StreamConfig cfg = new StreamConfig(new Configuration());
            cfg.setOperatorID(new OperatorID());
            cfg.setStateKeySerializer((TypeSerializer)new StringSerializer());
            cfg.serializeAllConfigs();
            ArrayList<StreamOperatorWrapper> operatorWrappers = new ArrayList<StreamOperatorWrapper>();
            BroadcastingOutputCollector lastWriter = new BroadcastingOutputCollector(new OutputWithChainingCheck[0], (Counter)new SimpleCounter());
            for (int i = 0; i < operators.length; ++i) {
                int operatorIndex = operators.length - i - 1;
                OneInputStreamOperator<T, T> op = operators[operatorIndex];
                if (op instanceof SetupableStreamOperator) {
                    ((SetupableStreamOperator)op).setup((StreamTask)containingTask, cfg, (Output)lastWriter);
                }
                lastWriter = new ChainingOutput(op, null, op.getMetricGroup(), null);
                ProcessingTimeService processingTimeService = null;
                if (op instanceof AbstractStreamOperator) {
                    processingTimeService = ((AbstractStreamOperator)op).getProcessingTimeService();
                }
                operatorWrappers.add(new StreamOperatorWrapper(op, Optional.ofNullable(processingTimeService), containingTask.getMailboxExecutorFactory().createExecutor(i), operatorIndex == 0));
            }
            StreamOperatorWrapper headOperatorWrapper = (StreamOperatorWrapper)operatorWrappers.get(operatorWrappers.size() - 1);
            RegularOperatorChain regularOperatorChain = new RegularOperatorChain(operatorWrappers, new RecordWriterOutput[0], (WatermarkGaugeExposingOutput)lastWriter, headOperatorWrapper);
            return regularOperatorChain;
        }
    }

    private static class ValidatingOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        private static final long serialVersionUID = 1L;
        static final long CHECKPOINT_ID = 5765167L;
        final AtomicInteger toUpdate;
        final int expected;

        public ValidatingOperator(AtomicInteger toUpdate, int expected) {
            this.toUpdate = toUpdate;
            this.expected = expected;
        }

        public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
            Assert.assertEquals((String)"wrong checkpointId", (long)5765167L, (long)checkpointId);
            Assert.assertEquals((String)"wrong order", (long)this.expected, (long)this.toUpdate.getAndIncrement());
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            throw new UnsupportedOperationException();
        }

        public OperatorID getOperatorID() {
            return new OperatorID();
        }
    }
}

