package com.hazelcast.jet.impl.connector;

import com.hazelcast.cluster.Address;
import com.hazelcast.internal.nio.IOUtil;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamFilesPTest.class */
public class StreamFilesPTest extends JetTestSupport {
    private static final long LINE_COUNT = 1000;
    private static final int ASSERT_COUNT_TIMEOUT_SECONDS = 10;
    private File workDir;
    private StreamFilesP processor;
    private Thread driverThread;
    private TestOutbox outbox;
    private volatile int fileOffsetsSize;
    private volatile boolean completedNormally;
    private volatile Throwable driverException;

    @Rule
    public TestName testName = new TestName();
    private final List<Map.Entry<String, String>> outboxLines = new CopyOnWriteArrayList();

    @Before
    public void before() throws Exception {
        assumeThatNoWindowsOS();
        this.workDir = Files.createTempDirectory("jet-test-streamFilesPTest", new FileAttribute[0]).toFile();
        this.driverThread = new Thread(this::driveProcessor, "driver@" + this.testName.getMethodName());
    }

    @After
    public void after() throws Throwable {
        if (this.driverException != null) {
            throw this.driverException;
        }
        if (this.driverThread != null) {
            this.driverThread.interrupt();
            this.driverThread.join();
        }
        if (this.workDir != null) {
            IOUtil.delete(this.workDir);
        }
    }

    @Test
    public void when_metaSupplier_then_returnsCorrectProcessors() throws Exception {
        ProcessorMetaSupplier streamFilesP = SourceProcessors.streamFilesP(this.workDir.getAbsolutePath(), StandardCharsets.UTF_8, "*", false, (v0, v1) -> {
            return Util.entry(v0, v1);
        });
        Address address = new Address();
        ProcessorSupplier processorSupplier = (ProcessorSupplier) streamFilesP.get(Collections.singletonList(address)).apply(address);
        processorSupplier.init(new TestProcessorContext());
        Assert.assertEquals(1L, processorSupplier.get(1).size());
        processorSupplier.close((Throwable) null);
    }

    @Test
    public void when_writeOneFile_then_seeAllLines() throws Exception {
        initializeProcessor(null);
        this.driverThread.start();
        PrintWriter printWriter = new PrintWriter(new FileWriter(new File(this.workDir, "a.txt")));
        Throwable th = null;
        try {
            for (int i = 0; i < 1000; i++) {
                printWriter.println(i);
            }
            printWriter.flush();
            assertEmittedCountEventually(1000L);
            if (printWriter != null) {
                if (0 == 0) {
                    printWriter.close();
                    return;
                }
                try {
                    printWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (printWriter != null) {
                if (0 != 0) {
                    try {
                        printWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    printWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void when_writeTwoFiles_then_seeAllLines() throws Exception {
        initializeProcessor(null);
        this.driverThread.start();
        PrintWriter printWriter = new PrintWriter(new FileWriter(new File(this.workDir, "a.txt")));
        Throwable th = null;
        try {
            PrintWriter printWriter2 = new PrintWriter(new FileWriter(new File(this.workDir, "b.txt")));
            Throwable th2 = null;
            for (int i = 0; i < 1000; i++) {
                try {
                    try {
                        printWriter.println(i);
                        printWriter.flush();
                        printWriter2.println(i);
                        printWriter2.flush();
                    } catch (Throwable th3) {
                        th2 = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (printWriter2 != null) {
                        if (th2 != null) {
                            try {
                                printWriter2.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            printWriter2.close();
                        }
                    }
                    throw th4;
                }
            }
            assertEmittedCountEventually(2000L);
            if (printWriter2 != null) {
                if (0 != 0) {
                    try {
                        printWriter2.close();
                    } catch (Throwable th6) {
                        th2.addSuppressed(th6);
                    }
                } else {
                    printWriter2.close();
                }
            }
            if (printWriter != null) {
                if (0 == 0) {
                    printWriter.close();
                    return;
                }
                try {
                    printWriter.close();
                } catch (Throwable th7) {
                    th.addSuppressed(th7);
                }
            }
        } catch (Throwable th8) {
            if (printWriter != null) {
                if (0 != 0) {
                    try {
                        printWriter.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    printWriter.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void when_glob_then_onlyMatchingProcessed() throws Exception {
        initializeProcessor("a.*");
        this.driverThread.start();
        PrintWriter printWriter = new PrintWriter(new FileWriter(new File(this.workDir, "a.txt")));
        Throwable th = null;
        try {
            PrintWriter printWriter2 = new PrintWriter(new FileWriter(new File(this.workDir, "b.txt")));
            Throwable th2 = null;
            for (int i = 0; i < 1000; i++) {
                try {
                    try {
                        printWriter.println(i);
                        printWriter.flush();
                        printWriter2.println(i);
                        printWriter2.flush();
                    } catch (Throwable th3) {
                        th2 = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (printWriter2 != null) {
                        if (th2 != null) {
                            try {
                                printWriter2.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            printWriter2.close();
                        }
                    }
                    throw th4;
                }
            }
            assertEmittedCountEventually(1000L);
            if (printWriter2 != null) {
                if (0 != 0) {
                    try {
                        printWriter2.close();
                    } catch (Throwable th6) {
                        th2.addSuppressed(th6);
                    }
                } else {
                    printWriter2.close();
                }
            }
            if (printWriter != null) {
                if (0 == 0) {
                    printWriter.close();
                    return;
                }
                try {
                    printWriter.close();
                } catch (Throwable th7) {
                    th.addSuppressed(th7);
                }
            }
        } catch (Throwable th8) {
            if (printWriter != null) {
                if (0 != 0) {
                    try {
                        printWriter.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    printWriter.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void when_preExistingFile_then_seeAppendedLines() throws Exception {
        assumeThatNoWindowsOS();
        PrintWriter printWriter = new PrintWriter(new FileWriter(new File(this.workDir, "a.txt")));
        Throwable th = null;
        try {
            for (int i = 0; i < 1000; i++) {
                printWriter.println(i);
            }
            printWriter.write("incomplete line");
            printWriter.flush();
            initializeProcessor(null);
            Thread.sleep(1000L);
            this.driverThread.start();
            printWriter.write(" still incomplete line ");
            printWriter.flush();
            Thread.sleep(2000L);
            for (int i2 = 0; i2 < 1000; i2++) {
                printWriter.println(i2);
            }
            printWriter.flush();
            assertEmittedCountEventually(999L);
            if (printWriter != null) {
                if (0 == 0) {
                    printWriter.close();
                    return;
                }
                try {
                    printWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (printWriter != null) {
                if (0 != 0) {
                    try {
                        printWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    printWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void when_fileModifiedButNotAppended_then_seeNoLines() throws Exception {
        File file = new File(this.workDir, "a.txt");
        PrintWriter printWriter = new PrintWriter(new FileWriter(file));
        Throwable th = null;
        for (int i = 0; i < 1000; i++) {
            try {
                try {
                    printWriter.println(i);
                } finally {
                }
            } catch (Throwable th2) {
                if (printWriter != null) {
                    if (th != null) {
                        try {
                            printWriter.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        printWriter.close();
                    }
                }
                throw th2;
            }
        }
        if (printWriter != null) {
            if (0 != 0) {
                try {
                    printWriter.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                printWriter.close();
            }
        }
        initializeProcessor(null);
        Thread.sleep(1000L);
        this.driverThread.start();
        RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
        randomAccessFile.write(255);
        randomAccessFile.close();
        Thread.sleep(5000L);
        Assert.assertEquals(0L, this.outboxLines.size());
    }

    @Test
    public void when_fileDeleted_then_seeDeletion() throws Exception {
        File file = new File(this.workDir, "a.txt");
        PrintWriter printWriter = new PrintWriter(new FileWriter(file));
        Throwable th = null;
        for (int i = 0; i < 1000; i++) {
            try {
                try {
                    printWriter.println(i);
                } finally {
                }
            } catch (Throwable th2) {
                if (printWriter != null) {
                    if (th != null) {
                        try {
                            printWriter.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        printWriter.close();
                    }
                }
                throw th2;
            }
        }
        if (printWriter != null) {
            if (0 != 0) {
                try {
                    printWriter.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                printWriter.close();
            }
        }
        initializeProcessor(null);
        updateFileOffsetsSize();
        Assert.assertEquals(1L, this.fileOffsetsSize);
        this.driverThread.start();
        Assert.assertTrue(file.delete());
        assertTrueEventually(() -> {
            Assert.assertEquals(0L, this.fileOffsetsSize);
        });
    }

    @Test
    public void when_watchedDirDeleted_then_complete() throws Exception {
        initializeProcessor(null);
        this.driverThread.start();
        Assert.assertTrue(this.workDir.delete());
        assertTrueEventually(() -> {
            Assert.assertTrue(this.completedNormally);
        });
    }

    @Test
    public void when_lineAddedInChunks_then_readAtOnce() throws Exception {
        initializeProcessor(null);
        this.driverThread.start();
        ILogger logger = Logger.getLogger(getClass());
        Path resolve = this.workDir.toPath().resolve("a.txt");
        Path resolve2 = this.workDir.toPath().resolve("b.txt");
        writeToFile(resolve, "incomplete1");
        writeToFile(resolve2, "incomplete2");
        Thread.sleep(2000L);
        logger.info("complete1");
        writeToFile(resolve, " complete1\n");
        Thread.sleep(2000L);
        logger.info("complete2");
        writeToFile(resolve2, " complete2\n");
        List asList = Arrays.asList(Util.entry("a.txt", "incomplete1 complete1"), Util.entry("b.txt", "incomplete2 complete2"));
        assertTrueEventually(() -> {
            Assert.assertEquals(asList, this.outboxLines);
        }, 10L);
    }

    @Test
    public void when_multipleLinesAdded_then_seenOneByOne() throws Exception {
        initializeProcessor(null);
        this.driverThread.start();
        Path resolve = this.workDir.toPath().resolve("a.txt");
        writeToFile(resolve, "line1\n");
        ArrayList arrayList = new ArrayList();
        arrayList.add(Util.entry("a.txt", "line1"));
        assertTrueEventually(() -> {
            Assert.assertEquals(arrayList, this.outboxLines);
        }, 10L);
        writeToFile(resolve, "line2\n");
        arrayList.add(Util.entry("a.txt", "line2"));
        assertTrueEventually(() -> {
            Assert.assertEquals(arrayList, this.outboxLines);
        }, 10L);
        writeToFile(resolve, "line3\n");
        arrayList.add(Util.entry("a.txt", "line3"));
        assertTrueEventually(() -> {
            Assert.assertEquals(arrayList, this.outboxLines);
        }, 10L);
    }

    private void writeToFile(Path path, String str) throws IOException {
        BufferedWriter newBufferedWriter = Files.newBufferedWriter(path, StandardOpenOption.APPEND, StandardOpenOption.CREATE);
        Throwable th = null;
        try {
            try {
                newBufferedWriter.append((CharSequence) str);
                if (newBufferedWriter != null) {
                    if (0 == 0) {
                        newBufferedWriter.close();
                        return;
                    }
                    try {
                        newBufferedWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (newBufferedWriter != null) {
                if (th != null) {
                    try {
                        newBufferedWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    newBufferedWriter.close();
                }
            }
            throw th4;
        }
    }

    private void driveProcessor() {
        while (!this.completedNormally && !Thread.interrupted()) {
            try {
                this.completedNormally = this.processor.complete();
                this.outbox.drainQueueAndReset(0, this.outboxLines, false);
                updateFileOffsetsSize();
            } catch (Throwable th) {
                this.driverException = th;
                return;
            }
        }
    }

    private void updateFileOffsetsSize() {
        this.fileOffsetsSize = this.processor.fileOffsets.size();
    }

    private void initializeProcessor(String str) throws Exception {
        if (str == null) {
            str = "*";
        }
        this.processor = new StreamFilesP(this.workDir.getAbsolutePath(), StandardCharsets.UTF_8, str, false, (v0, v1) -> {
            return Util.entry(v0, v1);
        });
        this.outbox = new TestOutbox(new int[]{1});
        this.processor.init(this.outbox, new TestProcessorContext().setLogger(Logger.getLogger(StreamFilesP.class)));
    }

    private void assertEmittedCountEventually(long j) throws Exception {
        assertTrueEventually(() -> {
            Assert.assertTrue("emittedCount=" + this.outboxLines.size() + ", expected=" + j, ((long) this.outboxLines.size()) >= j);
        }, 10L);
        Assert.assertEquals(j, this.outboxLines.size());
        Thread.sleep(2000L);
        Assert.assertEquals(j, this.outboxLines.size());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 96667762:
                if (implMethodName.equals("entry")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/Util") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/util/Map$Entry;")) {
                    return (v0, v1) -> {
                        return Util.entry(v0, v1);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/Util") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/util/Map$Entry;")) {
                    return (v0, v1) -> {
                        return Util.entry(v0, v1);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
