package com.hazelcast.jet.impl.connector;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.core.test.TestInbox;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.core.test.TestSupport;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamEventJournalPTest.class */
public class StreamEventJournalPTest extends JetTestSupport {
    private static final int NUM_PARTITIONS = 2;
    private static final int CAPACITY_PER_PARTITION = 5;
    private static final int JOURNAL_CAPACITY = 10;
    private MapProxyImpl<String, Integer> map;
    private SupplierEx<Processor> supplier;
    private HazelcastInstance instance;
    private String key0;
    private String key1;

    @Before
    public void setUp() {
        Config smallInstanceConfig = smallInstanceConfig();
        smallInstanceConfig.setProperty(ClusterProperty.PARTITION_COUNT.getName(), String.valueOf(2));
        smallInstanceConfig.getMapConfig("*").getEventJournalConfig().setEnabled(true).setCapacity(10);
        this.instance = createHazelcastInstance(smallInstanceConfig);
        this.map = this.instance.getMap("test");
        List list = (List) IntStream.range(0, 2).boxed().collect(Collectors.toList());
        this.supplier = () -> {
            return new StreamEventJournalP(this.map, list, eventJournalMapEvent -> {
                return true;
            }, (v0) -> {
                return v0.getNewValue();
            }, JournalInitialPosition.START_FROM_OLDEST, false, EventTimePolicy.noEventTime());
        };
        this.key0 = generateKeyForPartition(this.instance, 0);
        this.key1 = generateKeyForPartition(this.instance, 1);
    }

    @Test
    public void smokeTest() {
        fillJournal(2);
        TestSupport.verifyProcessor(this.supplier).disableProgressAssertion().runUntilOutputMatches(60000L, 100L).outputChecker(TestSupport.SAME_ITEMS_ANY_ORDER).hazelcastInstance(this.instance).expectOutput(Arrays.asList(0, 1, 2, 3));
    }

    @Test
    public void when_newData() throws Exception {
        TestOutbox testOutbox = new TestOutbox(new int[]{16}, 16);
        ArrayList arrayList = new ArrayList();
        Processor processor = (Processor) this.supplier.get();
        processor.init(testOutbox, new TestProcessorContext().setHazelcastInstance(this.instance));
        fillJournal(5);
        assertTrueEventually(() -> {
            Assert.assertFalse("Processor should never complete", processor.complete());
            testOutbox.drainQueueAndReset(0, arrayList, true);
            Assert.assertEquals("consumed different number of items than expected", 10L, arrayList.size());
            Assert.assertEquals(IntStream.range(0, 10).boxed().collect(Collectors.toSet()), new HashSet(arrayList));
        }, 3L);
        fillJournal(5);
        assertTrueEventually(() -> {
            Assert.assertFalse("Processor should never complete", processor.complete());
            testOutbox.drainQueueAndReset(0, arrayList, true);
            Assert.assertEquals("consumed different number of items than expected", 12L, arrayList.size());
            Assert.assertEquals(IntStream.range(0, 10).boxed().collect(Collectors.toSet()), new HashSet(arrayList));
        }, 3L);
    }

    @Test
    public void when_lostItems() throws Exception {
        TestOutbox testOutbox = new TestOutbox(new int[]{16}, 16);
        Processor processor = (Processor) this.supplier.get();
        processor.init(testOutbox, new TestProcessorContext().setHazelcastInstance(this.instance));
        fillJournal(6);
        ArrayList arrayList = new ArrayList();
        assertTrueEventually(() -> {
            Assert.assertFalse("Processor should never complete", processor.complete());
            testOutbox.drainQueueAndReset(0, arrayList, true);
            Assert.assertTrue("consumed different number of items than expected", arrayList.size() == 10);
        }, 3L);
    }

    @Test
    public void when_lostItems_afterRestore() throws Exception {
        TestOutbox testOutbox = new TestOutbox(new int[]{16}, 16);
        Processor processor = (Processor) this.supplier.get();
        processor.init(testOutbox, new TestProcessorContext().setHazelcastInstance(this.instance));
        ArrayList arrayList = new ArrayList();
        assertTrueEventually(() -> {
            Assert.assertFalse("Processor should never complete", processor.complete());
            testOutbox.drainQueueAndReset(0, arrayList, true);
            Assert.assertTrue("consumed different number of items than expected", arrayList.size() == 0);
        }, 3L);
        assertTrueEventually(() -> {
            Assert.assertTrue("Processor did not finish snapshot", processor.saveToSnapshot());
        }, 3L);
        fillJournal(6);
        ArrayList arrayList2 = new ArrayList();
        testOutbox.drainSnapshotQueueAndReset(arrayList2, false);
        this.logger.info("Restoring journal");
        assertRestore(arrayList2);
    }

    @Test
    public void when_futureSequence_thenResetOffset() throws Exception {
        TestOutbox testOutbox = new TestOutbox(new int[]{16}, 16);
        StreamEventJournalP streamEventJournalP = (StreamEventJournalP) this.supplier.get();
        fillJournal(6);
        streamEventJournalP.init(testOutbox, new TestProcessorContext().setHazelcastInstance(this.instance));
        this.map.destroy();
        ArrayList arrayList = new ArrayList();
        assertTrueFiveSeconds(() -> {
            Assert.assertFalse("Processor should never complete", streamEventJournalP.complete());
            testOutbox.drainQueueAndReset(0, arrayList, true);
            Assert.assertTrue("consumed different number of items than expected", arrayList.size() == 0);
        });
        fillJournal(1);
        assertTrueEventually(() -> {
            Assert.assertFalse("Processor should never complete", streamEventJournalP.complete());
            testOutbox.drainQueueAndReset(0, arrayList, true);
            Assert.assertTrue("consumed different number of items than expected", arrayList.size() == 2);
        });
    }

    @Test
    public void when_processorsWithNoPartitions_then_snapshotRestoreWorks() {
        DAG dag = new DAG();
        Assert.assertTrue("partition count should be lower than local parallelism", dag.newVertex("src", SourceProcessors.streamMapP(this.map.getName(), JournalInitialPosition.START_FROM_OLDEST, EventTimePolicy.noEventTime())).localParallelism(8).getLocalParallelism() > this.instance.getPartitionService().getPartitions().size());
        Job newJob = this.instance.getJet().newJob(dag, new JobConfig().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE).setSnapshotIntervalMillis(200000L));
        assertJobStatusEventually(newJob, JobStatus.RUNNING, 25);
        newJob.restart();
        sleepMillis(3000);
        assertJobStatusEventually(newJob, JobStatus.RUNNING, 10);
    }

    private void fillJournal(int i) {
        for (int i2 = 0; i2 < i; i2++) {
            this.map.put(this.key0, Integer.valueOf(i2 * 2));
            this.map.put(this.key1, Integer.valueOf((i2 * 2) + 1));
        }
    }

    private void assertRestore(List<Map.Entry> list) throws Exception {
        Processor processor = (Processor) this.supplier.get();
        TestOutbox testOutbox = new TestOutbox(new int[]{16}, 16);
        ArrayList arrayList = new ArrayList();
        processor.init(testOutbox, new TestProcessorContext().setHazelcastInstance(this.instance));
        TestInbox testInbox = new TestInbox();
        testInbox.addAll(list);
        processor.restoreFromSnapshot(testInbox);
        processor.finishSnapshotRestore();
        assertTrueEventually(() -> {
            Assert.assertFalse("Processor should never complete", processor.complete());
            testOutbox.drainQueueAndReset(0, arrayList, true);
            Assert.assertEquals("consumed different number of items than expected", 10L, arrayList.size());
        }, 3L);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2111000804:
                if (implMethodName.equals("lambda$setUp$ddf855e$1")) {
                    z = false;
                    break;
                }
                break;
            case -848611033:
                if (implMethodName.equals("getNewValue")) {
                    z = true;
                    break;
                }
                break;
            case 135889156:
                if (implMethodName.equals("lambda$null$b5447638$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalPTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;)Lcom/hazelcast/jet/core/Processor;")) {
                    StreamEventJournalPTest streamEventJournalPTest = (StreamEventJournalPTest) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    return () -> {
                        return new StreamEventJournalP(this.map, list, eventJournalMapEvent -> {
                            return true;
                        }, (v0) -> {
                            return v0.getNewValue();
                        }, JournalInitialPosition.START_FROM_OLDEST, false, EventTimePolicy.noEventTime());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/map/EventJournalMapEvent") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getNewValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/PredicateEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("testEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalPTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/map/EventJournalMapEvent;)Z")) {
                    return eventJournalMapEvent -> {
                        return true;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
