/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.api;

import java.io.File;
import java.io.PrintWriter;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Test;

public class ContinuousFileReaderOperatorITCase
extends AbstractTestBase {
    @Test
    public void testEndInput() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        File sourceFile = TEMPORARY_FOLDER.newFile();
        int elementCount = 10000;
        try (PrintWriter printWriter = new PrintWriter(sourceFile);){
            for (int i = 0; i < 10000; ++i) {
                printWriter.println(i);
            }
        }
        DataStreamSource source = env.readTextFile(sourceFile.getAbsolutePath());
        TestBoundedOneInputStreamOperator checkingOperator = new TestBoundedOneInputStreamOperator(10000);
        SingleOutputStreamOperator endInputChecking = source.transform("EndInputChecking", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (OneInputStreamOperator)checkingOperator);
        endInputChecking.addSink((SinkFunction)new DiscardingSink());
        env.execute("ContinuousFileReaderOperatorITCase.testEndInput");
    }

    private static class TestBoundedOneInputStreamOperator
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String>,
    BoundedOneInput {
        private final int expectedProcessedElementCount;
        private boolean hasEnded = false;
        private int processedElementCount = 0;

        TestBoundedOneInputStreamOperator(int expectedProcessedElementCount) {
            this.chainingStrategy = ChainingStrategy.ALWAYS;
            this.expectedProcessedElementCount = expectedProcessedElementCount;
        }

        public void endInput() throws Exception {
            Assert.assertEquals((long)this.expectedProcessedElementCount, (long)this.processedElementCount);
            this.hasEnded = true;
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            Assert.assertFalse((boolean)this.hasEnded);
            this.output.collect(element);
            ++this.processedElementCount;
        }
    }
}

