package com.hazelcast.jet.core;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.SimpleTestInClusterSupport;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.core.processor.DiagnosticProcessors;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobExecutionService;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.util.ImdgUtil;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.test.PacketFiltersUtil;
import com.hazelcast.test.annotation.NightlyTest;
import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.concurrent.CancellationException;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({NightlyTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/OperationLossTest.class */
public class OperationLossTest extends SimpleTestInClusterSupport {
    @BeforeClass
    public static void beforeClass() {
        Config smallInstanceConfig = smallInstanceConfig();
        smallInstanceConfig.getJetConfig().setEnabled(true);
        smallInstanceConfig.setProperty(ClusterProperty.OPERATION_CALL_TIMEOUT_MILLIS.getName(), "2000");
        initialize(2, smallInstanceConfig);
    }

    @Before
    public void before() {
        TestProcessors.reset(1);
        for (HazelcastInstance hazelcastInstance : instances()) {
            PacketFiltersUtil.resetPacketFiltersFrom(hazelcastInstance);
        }
    }

    @Test
    public void when_initExecutionOperationLost_then_initRetried_lightJob() {
        PacketFiltersUtil.dropOperationsFrom(instance(), JetInitDataSerializerHook.FACTORY_ID, Collections.singletonList(5));
        DAG dag = new DAG();
        dag.edge(Edge.between(dag.newVertex("v1", () -> {
            return new TestProcessors.NoOutputSourceP();
        }), dag.newVertex("v2", Processors.mapP(FunctionEx.identity())).localParallelism(1)).distributed());
        Job newLightJob = instance().getJet().newLightJob(dag);
        JobExecutionService jobExecutionService = ((JetServiceBackend) getNodeEngineImpl(instances()[1]).getService("hz:impl:jetService")).getJobExecutionService();
        assertTrueAllTheTime(() -> {
            Assert.assertNull(jobExecutionService.getExecutionContext(newLightJob.getId()));
        }, 1L);
        PacketFiltersUtil.resetPacketFiltersFrom(instance());
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        newLightJob.join();
    }

    @Test
    public void when_initExecutionOperationLost_then_initOpRetried_normalJob() {
        when_operationLost_then_jobRestarts(5, JobStatus.STARTING);
    }

    @Test
    public void when_startExecutionOperationLost_then_jobRestarts() {
        when_operationLost_then_jobRestarts(6, JobStatus.RUNNING);
    }

    private void when_operationLost_then_jobRestarts(int i, JobStatus jobStatus) {
        PacketFiltersUtil.dropOperationsFrom(instance(), JetInitDataSerializerHook.FACTORY_ID, Collections.singletonList(Integer.valueOf(i)));
        DAG dag = new DAG();
        dag.edge(Edge.between(dag.newVertex("v1", () -> {
            return new TestProcessors.NoOutputSourceP();
        }).localParallelism(1), dag.newVertex("v2", Processors.mapP(FunctionEx.identity())).localParallelism(1)).distributed());
        Job newJob = instance().getJet().newJob(dag);
        assertJobStatusEventually(newJob, jobStatus);
        assertJobStatusEventually(newJob, JobStatus.NOT_RUNNING);
        PacketFiltersUtil.resetPacketFiltersFrom(instance());
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        newJob.join();
    }

    @Test
    public void when_snapshotOperationLost_then_retried() {
        PacketFiltersUtil.dropOperationsFrom(instance(), JetInitDataSerializerHook.FACTORY_ID, Collections.singletonList(10));
        DAG dag = new DAG();
        dag.edge(Edge.between(dag.newVertex("v1", () -> {
            return new TestProcessors.DummyStatefulP();
        }).localParallelism(1), dag.newVertex("v2", Processors.mapP(FunctionEx.identity())).localParallelism(1)).distributed());
        Job newJob = instance().getJet().newJob(dag, new JobConfig().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE).setSnapshotIntervalMillis(100L));
        assertJobStatusEventually(newJob, JobStatus.RUNNING);
        JobRepository jobRepository = new JobRepository(instance());
        assertTrueEventually(() -> {
            JobExecutionRecord jobExecutionRecord = jobRepository.getJobExecutionRecord(newJob.getId());
            Assert.assertNotNull("null JobExecutionRecord", jobExecutionRecord);
            Assert.assertEquals("ongoingSnapshotId", 0L, jobExecutionRecord.ongoingSnapshotId());
        }, 20L);
        sleepSeconds(1);
        this.logger.info("Lifting the packet filter...");
        PacketFiltersUtil.resetPacketFiltersFrom(instance());
        waitForFirstSnapshot(jobRepository, newJob.getId(), 10, false);
        cancelAndJoin(newJob);
    }

    @Test
    public void when_connectionDroppedWithoutMemberLeaving_then_jobRestarts_normalJob() {
        when_connectionDroppedWithoutMemberLeaving_then_jobRestarts(false);
    }

    @Test
    public void when_connectionDroppedWithoutMemberLeaving_then_jobFails_lightJob() {
        when_connectionDroppedWithoutMemberLeaving_then_jobRestarts(true);
    }

    private void when_connectionDroppedWithoutMemberLeaving_then_jobRestarts(boolean z) {
        DAG dag = new DAG();
        dag.edge(Edge.between(dag.newVertex("source", () -> {
            return new TestProcessors.NoOutputSourceP();
        }).localParallelism(1), dag.newVertex("sink", DiagnosticProcessors.writeLoggerP())).distributed());
        Job newLightJob = z ? instance().getJet().newLightJob(dag) : instance().getJet().newJob(dag);
        assertTrueEventually(() -> {
            Assert.assertEquals(2L, TestProcessors.NoOutputSourceP.initCount.get());
        });
        ImdgUtil.getMemberConnection(getNodeEngineImpl(instance()), getAddress(instances()[1])).close((String) null, (Throwable) null);
        System.out.println("connection closed");
        sleepSeconds(1);
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        if (z) {
            newLightJob.getClass();
            Assertions.assertThatThrownBy(newLightJob::join).hasMessageContaining("The member was reconnected");
        } else {
            newLightJob.join();
            Assert.assertEquals(4L, TestProcessors.NoOutputSourceP.initCount.get());
        }
    }

    @Test
    public void when_terminateExecutionOperationLost_then_jobTerminates() {
        PacketFiltersUtil.dropOperationsFrom(instance(), JetInitDataSerializerHook.FACTORY_ID, Collections.singletonList(17));
        DAG dag = new DAG();
        dag.edge(Edge.between(dag.newVertex("v1", () -> {
            return new TestProcessors.NoOutputSourceP();
        }).localParallelism(1), dag.newVertex("v2", Processors.mapP(FunctionEx.identity())).localParallelism(1)).distributed());
        Job newJob = instance().getJet().newJob(dag);
        assertJobStatusEventually(newJob, JobStatus.RUNNING);
        newJob.cancel();
        sleepSeconds(1);
        PacketFiltersUtil.resetPacketFiltersFrom(instance());
        try {
            newJob.join();
        } catch (CancellationException e) {
        }
    }

    @Test
    public void lightJob_when_terminateExecutionOperationLost_then_jobTerminates() {
        DAG dag = new DAG();
        dag.newVertex("v", () -> {
            return new TestProcessors.MockP().streaming();
        });
        Job newLightJob = instance().getJet().newLightJob(dag);
        PacketFiltersUtil.dropOperationsFrom(instance(), JetInitDataSerializerHook.FACTORY_ID, Collections.singletonList(17));
        newLightJob.cancel();
        newLightJob.getClass();
        Assertions.assertThatThrownBy(newLightJob::join).isInstanceOf(CancellationException.class);
    }

    @Test
    public void when_terminalSnapshotOperationLost_then_jobRestarts() {
        PacketFiltersUtil.dropOperationsFrom(instance(), JetInitDataSerializerHook.FACTORY_ID, Collections.singletonList(10));
        DAG dag = new DAG();
        dag.edge(Edge.between(dag.newVertex("v1", () -> {
            return new TestProcessors.NoOutputSourceP();
        }).localParallelism(1), dag.newVertex("v2", Processors.mapP(FunctionEx.identity())).localParallelism(1)).distributed());
        Job newJob = instance().getJet().newJob(dag, new JobConfig().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE));
        assertJobStatusEventually(newJob, JobStatus.RUNNING, 20);
        newJob.restart();
        sleepSeconds(1);
        PacketFiltersUtil.resetPacketFiltersFrom(instance());
        assertTrueEventually(() -> {
            Assert.assertEquals(4L, TestProcessors.NoOutputSourceP.initCount.get());
        });
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        newJob.join();
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -680189922:
                if (implMethodName.equals("lambda$when_snapshotOperationLost_then_retried$e9599fe$1")) {
                    z = 3;
                    break;
                }
                break;
            case -386984106:
                if (implMethodName.equals("lambda$when_initExecutionOperationLost_then_initRetried_lightJob$e9599fe$1")) {
                    z = 2;
                    break;
                }
                break;
            case -180659875:
                if (implMethodName.equals("lambda$when_operationLost_then_jobRestarts$7d91a43b$1")) {
                    z = false;
                    break;
                }
                break;
            case -83811057:
                if (implMethodName.equals("lambda$lightJob_when_terminateExecutionOperationLost_then_jobTerminates$fb1a34a4$1")) {
                    z = 5;
                    break;
                }
                break;
            case 934861092:
                if (implMethodName.equals("lambda$when_terminalSnapshotOperationLost_then_jobRestarts$e9599fe$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1592681065:
                if (implMethodName.equals("lambda$when_terminateExecutionOperationLost_then_jobTerminates$e9599fe$1")) {
                    z = true;
                    break;
                }
                break;
            case 2109941868:
                if (implMethodName.equals("lambda$when_connectionDroppedWithoutMemberLeaving_then_jobRestarts$3294c468$1")) {
                    z = 6;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/OperationLossTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.NoOutputSourceP();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/OperationLossTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.NoOutputSourceP();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/OperationLossTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.NoOutputSourceP();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/OperationLossTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.DummyStatefulP();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/OperationLossTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.NoOutputSourceP();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/OperationLossTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.MockP().streaming();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/OperationLossTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TestProcessors.NoOutputSourceP();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
