/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitEnumerator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Integration tests that run {@link MockBaseSource} pipelines end to end and verify the records
 * that reach the sink, including cases where creating or restoring the split enumerator fails
 * exactly once before the job is allowed to proceed.
 */
public class CoordinatedSourceITCase extends AbstractTestBaseJUnit4 {

    /** Name of the accumulator the verification sink collects all received records into. */
    private static final String RESULT_ACCUMULATOR = "result";

    /** Runs a single bounded source with 2 splits of 10 records each and expects 20 records. */
    @Test
    public void testEnumeratorReaderCommunication() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
        DataStream<Integer> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
        executeAndVerify(env, stream, 20);
    }

    /**
     * Unions two bounded sources and expects 40 records in total.
     *
     * <p>NOTE(review): the third argument (20) of the second source is presumably the value its
     * records start at, so that both sources together cover 0..39 -- confirm against
     * {@link MockBaseSource}.
     */
    @Test
    public void testMultipleSources() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
        MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
        DataStream<Integer> stream1 =
                env.fromSource(source1, WatermarkStrategy.noWatermarks(), "TestingSource1");
        DataStream<Integer> stream2 =
                env.fromSource(source2, WatermarkStrategy.noWatermarks(), "TestingSource2");
        executeAndVerify(env, stream1.union(stream2), 40);
    }

    /**
     * Uses a source whose first {@code createEnumerator} call throws, with a restart strategy
     * that retries without delay, and expects all 20 records to arrive nevertheless.
     */
    @Test
    public void testEnumeratorCreationFails() throws Exception {
        OnceFailingToCreateEnumeratorSource.reset();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
        OnceFailingToCreateEnumeratorSource source =
                new OnceFailingToCreateEnumeratorSource(2, 10, Boundedness.BOUNDED);
        DataStream<Integer> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
        executeAndVerify(env, stream, 20);
    }

    /**
     * Uses a source whose first {@code restoreEnumerator} call throws, with checkpointing
     * enabled (interval 10) and a restart strategy that retries without delay, and expects all
     * 20 records to arrive nevertheless.
     */
    @Test
    public void testEnumeratorRestoreFails() throws Exception {
        OnceFailingToRestoreEnumeratorSource.reset();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));
        env.enableCheckpointing(10L);
        OnceFailingToRestoreEnumeratorSource source =
                new OnceFailingToRestoreEnumeratorSource(2, 10, Boundedness.BOUNDED);
        DataStream<Integer> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "TestingSource");
        executeAndVerify(env, stream, 20);
    }

    /**
     * Attaches a collecting sink to {@code stream}, executes the job and checks the output.
     *
     * <p>The check asserts the number of records and, after sorting, that the smallest record is
     * {@code 0} and the largest is {@code numRecords - 1}; intermediate values are not compared
     * individually.
     *
     * @param env the environment the stream was built on; it is executed by this method
     * @param stream the stream whose records are verified
     * @param numRecords the expected number of records
     * @throws Exception if the job execution fails
     */
    private void executeAndVerify(
            StreamExecutionEnvironment env, DataStream<Integer> stream, int numRecords)
            throws Exception {
        stream.addSink(
                new RichSinkFunction<Integer>() {

                    @Override
                    public void open(OpenContext openContext) throws Exception {
                        getRuntimeContext()
                                .addAccumulator(
                                        RESULT_ACCUMULATOR, new ListAccumulator<Integer>());
                    }

                    @Override
                    public void invoke(Integer value, SinkFunction.Context context)
                            throws Exception {
                        getRuntimeContext().getAccumulator(RESULT_ACCUMULATOR).add(value);
                    }
                });
        List<Integer> result = env.execute().getAccumulatorResult(RESULT_ACCUMULATOR);
        Collections.sort(result);
        Assertions.assertThat(result).hasSize(numRecords);
        Assertions.assertThat((int) result.get(0)).isEqualTo(0);
        Assertions.assertThat((int) result.get(result.size() - 1)).isEqualTo(numRecords - 1);
    }

    /**
     * A source whose {@link #restoreEnumerator} throws on its first invocation.
     *
     * <p>Until that failure has happened, {@link #createEnumerator} hands out a
     * {@link NonAssigningEnumerator}, which ignores reader registrations and split requests and
     * raises an artificial failure when a checkpoint completes. Once the restore has failed,
     * both methods delegate to {@link MockBaseSource}.
     */
    private static class OnceFailingToRestoreEnumeratorSource extends MockBaseSource {
        private static final long serialVersionUID = 1L;

        /** Whether the one-time restore failure has already been thrown; static, so shared. */
        private static boolean hasFailed;

        OnceFailingToRestoreEnumeratorSource(
                int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
            super(numSplits, numRecordsPerSplit, boundedness);
        }

        /**
         * Returns the regular enumerator once the restore failure has occurred; before that,
         * returns a {@link NonAssigningEnumerator} seeded with the regular enumerator's
         * snapshot.
         *
         * @throws FlinkRuntimeException if snapshotting the regular enumerator fails
         */
        @Override
        public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(
                SplitEnumeratorContext<MockSourceSplit> enumContext) {
            SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> enumerator =
                    super.createEnumerator(enumContext);
            if (hasFailed) {
                return enumerator;
            }
            try {
                // Reuse the regular enumerator's state as the splits of the replacement.
                List<MockSourceSplit> splits = enumerator.snapshotState(1L);
                return new NonAssigningEnumerator(splits, enumContext);
            } catch (Exception e) {
                throw new FlinkRuntimeException(e.getMessage(), e);
            }
        }

        /**
         * Throws on the first call and delegates to the parent on every later call.
         *
         * @throws FlinkRuntimeException on the first invocation after {@link #reset()}
         */
        @Override
        public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> restoreEnumerator(
                SplitEnumeratorContext<MockSourceSplit> enumContext,
                List<MockSourceSplit> checkpoint)
                throws IOException {
            if (!hasFailed) {
                hasFailed = true;
                throw new FlinkRuntimeException("Test Failure");
            }
            return super.restoreEnumerator(enumContext, checkpoint);
        }

        /** Re-arms the one-time failure; called at the start of the test using this source. */
        static void reset() {
            hasFailed = false;
        }

        /**
         * An enumerator that does nothing when readers register or request splits, and that
         * throws from an async callback as soon as a checkpoint completes.
         */
        private static class NonAssigningEnumerator extends MockSplitEnumerator {
            private final SplitEnumeratorContext<?> context;

            NonAssigningEnumerator(
                    List<MockSourceSplit> splits,
                    SplitEnumeratorContext<MockSourceSplit> context) {
                super(splits, context);
                this.context = context;
            }

            /** Intentionally empty: reader registrations are ignored. */
            @Override
            public void addReader(int subtaskId) {}

            /** Intentionally empty: split requests are ignored. */
            @Override
            public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}

            /**
             * Schedules an async call whose result handler always throws.
             *
             * <p>NOTE(review): the exception message indicates this is meant to trigger a global
             * failover so that the enumerator gets restored from the completed checkpoint --
             * confirm against the {@code SplitEnumeratorContext#callAsync} contract.
             */
            @Override
            public void notifyCheckpointComplete(long checkpointId) throws Exception {
                context.callAsync(
                        () -> null,
                        (success, failure) -> {
                            throw new FlinkRuntimeException(
                                    "Artificial trigger for Global Failover");
                        });
            }
        }
    }

    /** A source whose {@link #createEnumerator} throws on its first invocation. */
    private static class OnceFailingToCreateEnumeratorSource extends MockBaseSource {
        private static final long serialVersionUID = 1L;

        /** Whether the one-time creation failure has already been thrown; static, so shared. */
        private static boolean hasFailed;

        OnceFailingToCreateEnumeratorSource(
                int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
            super(numSplits, numRecordsPerSplit, boundedness);
        }

        /**
         * Throws on the first call and delegates to the parent on every later call.
         *
         * @throws FlinkRuntimeException on the first invocation after {@link #reset()}
         */
        @Override
        public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(
                SplitEnumeratorContext<MockSourceSplit> enumContext) {
            if (!hasFailed) {
                hasFailed = true;
                throw new FlinkRuntimeException("Test Failure");
            }
            return super.createEnumerator(enumContext);
        }

        /** Re-arms the one-time failure; called at the start of the test using this source. */
        static void reset() {
            hasFailed = false;
        }
    }
}

