/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.io.EOFException;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import org.apache.commons.io.IOUtils;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.util.NetUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class SocketTextStreamFunctionTest {
    private static final String LOCALHOST = "127.0.0.1";

    SocketTextStreamFunctionTest() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Test
    void testSocketSourceSimpleOutput() throws Exception {
        ServerSocket server;
        block3: {
            server = new ServerSocket(0);
            Socket channel = null;
            try {
                SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 0L);
                SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
                runner.start();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
                writer.write("test1\n");
                writer.write("check\n");
                writer.flush();
                runner.waitForNumElements(2);
                runner.cancel();
                runner.interrupt();
                runner.waitUntilDone();
                channel.close();
                if (channel == null) break block3;
            }
            catch (Throwable throwable) {
                if (channel != null) {
                    IOUtils.closeQuietly(channel);
                }
                IOUtils.closeQuietly((ServerSocket)server);
                throw throwable;
            }
            IOUtils.closeQuietly((Socket)channel);
        }
        IOUtils.closeQuietly((ServerSocket)server);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Test
    void testExitNoRetries() throws Exception {
        ServerSocket server;
        block5: {
            server = new ServerSocket(0);
            Socket channel = null;
            try {
                SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 0L);
                SocketSourceThread runner = new SocketSourceThread(source, new String[0]);
                runner.start();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                channel.close();
                try {
                    runner.waitUntilDone();
                }
                catch (Exception e) {
                    Assertions.assertThat((Throwable)e).hasCauseInstanceOf(EOFException.class);
                }
                if (channel == null) break block5;
            }
            catch (Throwable throwable) {
                if (channel != null) {
                    IOUtils.closeQuietly(channel);
                }
                IOUtils.closeQuietly((ServerSocket)server);
                throw throwable;
            }
            IOUtils.closeQuietly((Socket)channel);
        }
        IOUtils.closeQuietly((ServerSocket)server);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Test
    void testSocketSourceOutputWithRetries() throws Exception {
        ServerSocket server;
        block3: {
            server = new ServerSocket(0);
            Socket channel = null;
            try {
                SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 10L, 100L);
                SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
                runner.start();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                channel.close();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
                writer.write("test1\n");
                writer.close();
                channel.close();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                channel.close();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                writer = new OutputStreamWriter(channel.getOutputStream());
                writer.write("check\n");
                writer.flush();
                runner.waitForNumElements(2);
                runner.cancel();
                runner.waitUntilDone();
                if (channel == null) break block3;
            }
            catch (Throwable throwable) {
                if (channel != null) {
                    IOUtils.closeQuietly(channel);
                }
                IOUtils.closeQuietly((ServerSocket)server);
                throw throwable;
            }
            IOUtils.closeQuietly((Socket)channel);
        }
        IOUtils.closeQuietly((ServerSocket)server);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Test
    void testSocketSourceOutputInfiniteRetries() throws Exception {
        ServerSocket server;
        block3: {
            server = new ServerSocket(0);
            Socket channel = null;
            try {
                SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", -1L, 100L);
                SocketSourceThread runner = new SocketSourceThread(source, "test1", "check");
                runner.start();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                channel.close();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
                writer.write("test1\n");
                writer.close();
                channel.close();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                channel.close();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                writer = new OutputStreamWriter(channel.getOutputStream());
                writer.write("check\n");
                writer.flush();
                runner.waitForNumElements(2);
                runner.cancel();
                runner.waitUntilDone();
                if (channel == null) break block3;
            }
            catch (Throwable throwable) {
                if (channel != null) {
                    IOUtils.closeQuietly(channel);
                }
                IOUtils.closeQuietly((ServerSocket)server);
                throw throwable;
            }
            IOUtils.closeQuietly((Socket)channel);
        }
        IOUtils.closeQuietly((ServerSocket)server);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Test
    void testSocketSourceOutputAcrossRetries() throws Exception {
        ServerSocket server;
        block3: {
            server = new ServerSocket(0);
            Socket channel = null;
            try {
                SocketTextStreamFunction source = new SocketTextStreamFunction(LOCALHOST, server.getLocalPort(), "\n", 10L, 100L);
                SocketSourceThread runner = new SocketSourceThread(source, "test1", "check1", "check2");
                runner.start();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                channel.close();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                OutputStreamWriter writer = new OutputStreamWriter(channel.getOutputStream());
                writer.write("te");
                writer.close();
                channel.close();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                channel.close();
                channel = NetUtils.acceptWithoutTimeout((ServerSocket)server);
                writer = new OutputStreamWriter(channel.getOutputStream());
                writer.write("st1\n");
                writer.write("check1\n");
                writer.write("check2\n");
                writer.flush();
                runner.waitForNumElements(2);
                runner.cancel();
                runner.waitUntilDone();
                if (channel == null) break block3;
            }
            catch (Throwable throwable) {
                if (channel != null) {
                    IOUtils.closeQuietly(channel);
                }
                IOUtils.closeQuietly((ServerSocket)server);
                throw throwable;
            }
            IOUtils.closeQuietly((Socket)channel);
        }
        IOUtils.closeQuietly((ServerSocket)server);
    }

    private static class SocketSourceThread
    extends Thread {
        private final Object sync = new Object();
        private final SocketTextStreamFunction socketSource;
        private final String[] expectedData;
        private volatile Throwable error;
        private volatile int numElementsReceived;
        private volatile boolean canceled;
        private volatile boolean done;

        public SocketSourceThread(SocketTextStreamFunction socketSource, String ... expectedData) {
            this.socketSource = socketSource;
            this.expectedData = expectedData;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Object ctx;
            try {
                ctx = new SourceFunction.SourceContext<String>(){
                    private final Object lock = new Object();

                    /*
                     * WARNING - Removed try catching itself - possible behaviour change.
                     */
                    public void collect(String element) {
                        int pos = numElementsReceived;
                        Object object = sync;
                        synchronized (object) {
                            numElementsReceived++;
                            sync.notifyAll();
                        }
                        if (expectedData != null && expectedData.length > pos) {
                            Assertions.assertThat((String)element).isEqualTo(expectedData[pos]);
                        }
                    }

                    public void collectWithTimestamp(String element, long timestamp) {
                        this.collect(element);
                    }

                    public void emitWatermark(Watermark mark) {
                        throw new UnsupportedOperationException();
                    }

                    public void markAsTemporarilyIdle() {
                        throw new UnsupportedOperationException();
                    }

                    public Object getCheckpointLock() {
                        return this.lock;
                    }

                    public void close() {
                    }
                };
                this.socketSource.run((SourceFunction.SourceContext)ctx);
            }
            catch (Throwable t) {
                Object object = this.sync;
                synchronized (object) {
                    if (!this.canceled) {
                        this.error = t;
                    }
                    this.sync.notifyAll();
                }
            }
            finally {
                ctx = this.sync;
                synchronized (ctx) {
                    this.done = true;
                    this.sync.notifyAll();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() {
            Object object = this.sync;
            synchronized (object) {
                this.canceled = true;
                this.socketSource.cancel();
                this.interrupt();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void waitForNumElements(int numElements) throws InterruptedException {
            Object object = this.sync;
            synchronized (object) {
                while (this.error == null && !this.canceled && !this.done && this.numElementsReceived < numElements) {
                    this.sync.wait();
                }
                if (this.error != null) {
                    throw new RuntimeException("Error in source thread", this.error);
                }
                if (this.canceled) {
                    throw new RuntimeException("canceled");
                }
                if (this.done) {
                    throw new RuntimeException("Exited cleanly before expected number of elements");
                }
            }
        }

        public void waitUntilDone() throws InterruptedException {
            this.join();
            if (this.error != null) {
                throw new RuntimeException("Error in source thread", this.error);
            }
        }
    }
}

