package com.hazelcast.jet.core;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.collection.IList;
import com.hazelcast.config.Config;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.instance.impl.Node;
import com.hazelcast.internal.serialization.SerializationService;
import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Util;
import com.hazelcast.jet.function.RunnableEx;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.impl.JobExecutionRecord;
import com.hazelcast.jet.impl.JobExecutionService;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.pipeline.transform.BatchSourceTransform;
import com.hazelcast.jet.pipeline.BatchSource;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.test.Accessors;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.OverridePropertyRule;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.rules.Timeout;

/* loaded from: input_file:com/hazelcast/jet/core/JetTestSupport.class */
public abstract class JetTestSupport extends HazelcastTestSupport {
    /** Serialization service built from a {@link DefaultSerializationServiceBuilder} with no extra settings. */
    public static final SerializationService TEST_SS = new DefaultSerializationServiceBuilder().build();

    /** JUnit class rule applying a 900-second timeout. */
    @ClassRule
    public static Timeout globalTimeout = Timeout.seconds(900);

    /** JUnit class rule that overrides the {@code hz.jet.enabled} property with {@code "true"}. */
    @ClassRule
    public static OverridePropertyRule enableJetRule = OverridePropertyRule.set("hz.jet.enabled", "true");
    /** Logger used by the helper methods of this support class itself. */
    private static final ILogger SUPPORT_LOGGER = Logger.getLogger(JetTestSupport.class);
    /** Logger named after the concrete (sub)class of the test instance. */
    protected ILogger logger = Logger.getLogger(getClass());
    /** Stays {@code null} until one of the {@code createHazelcastInstance(s)} methods creates it. */
    private TestHazelcastFactory instanceFactory;

    /**
     * Runs after each test: cancels remaining jobs, terminates every instance
     * created through the instance factory and fails the test if any job
     * classloader was left behind.
     *
     * <p>Does nothing when no instance factory was ever created.
     *
     * @throws Exception if terminating the instances fails or does not finish
     *                   within one minute
     */
    @After
    public void shutdownFactory() throws Exception {
        if (instanceFactory == null) {
            return;
        }
        // Collect the leaks first; they are reported only after the instances are gone.
        Map<Long, String> leakedClassLoaders = shutdownJobsAndGetLeakedClassLoaders();
        SUPPORT_LOGGER.info("Terminating instanceFactory in JetTestSupport.@After");
        spawn(() -> {
            instanceFactory.terminateAll();
        }).get(1L, TimeUnit.MINUTES);
        if (!leakedClassLoaders.isEmpty()) {
            String leakedJobs = leakedClassLoaders.entrySet().stream()
                    .map(leak -> Util.idToString(leak.getKey()) + "[" + leak.getValue() + "]")
                    .collect(Collectors.joining(", "));
            Assert.fail("There are one or more leaked job classloaders. "
                    + "This is a bug, but it is not necessarily related to this test. "
                    + "The classloader was leaked for the following jobIds: " + leakedJobs);
        }
    }

    /**
     * Ditches every job on each Jet-enabled member created by the instance
     * factory and then collects the job classloaders that are still registered.
     *
     * <p>For each member the classloader map is polled up to 100 times with a
     * 100 ms pause (roughly 10 seconds) before the remaining entries are
     * treated as leaked.
     *
     * @return map from job id to the string form of the leaked classloader map
     *         entry; empty when nothing leaked
     */
    @Nonnull
    private Map<Long, String> shutdownJobsAndGetLeakedClassLoaders() {
        Map<Long, String> leakedClassLoaders = new HashMap<>();
        Collection<HazelcastInstance> instances = this.instanceFactory.getAllHazelcastInstances();
        for (HazelcastInstance instance : instances) {
            if (!instance.getConfig().getJetConfig().isEnabled()) {
                continue;
            }
            for (Job job : instance.getJet().getJobs()) {
                ditchJob(job, instances.toArray(new HazelcastInstance[0]));
            }
            // The node field is only available on the implementation class, hence the downcast.
            JetServiceBackend jetServiceBackend =
                    ((HazelcastInstanceImpl) instance).node.getNodeEngine().getService("hz:impl:jetService");
            Map<Long, ?> classLoaders = jetServiceBackend.getJobClassLoaderService().getClassLoaders();
            // Give the member some time to drop the classloaders before declaring them leaked.
            for (int attempt = 0; attempt < 100 && !classLoaders.isEmpty(); attempt++) {
                sleepMillis(100);
            }
            for (Map.Entry<Long, ?> entry : classLoaders.entrySet()) {
                leakedClassLoaders.put(entry.getKey(), entry.toString());
            }
        }
        return leakedClassLoaders;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a client through this test's instance factory.
     *
     * <p>The factory is only created by the {@code createHazelcastInstance(s)}
     * methods, so one of them must have been called first.
     *
     * @return the new client instance
     */
    public HazelcastInstance createHazelcastClient() {
        TestHazelcastFactory factory = instanceFactory;
        return factory.newHazelcastClient();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a client with the given configuration through this test's
     * instance factory.
     *
     * <p>The factory is only created by the {@code createHazelcastInstance(s)}
     * methods, so one of them must have been called first.
     *
     * @param clientConfig configuration handed to the factory
     * @return the new client instance
     */
    public HazelcastInstance createHazelcastClient(ClientConfig clientConfig) {
        TestHazelcastFactory factory = instanceFactory;
        return factory.newHazelcastClient(clientConfig);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a client configuration with smart routing disabled whose only
     * configured address is the local member of the given instance.
     *
     * @param hazelcastInstance member the client should connect to
     * @return a fresh client configuration
     */
    public ClientConfig configForNonSmartClientConnectingTo(HazelcastInstance hazelcastInstance) {
        Member target = hazelcastInstance.getCluster().getLocalMember();
        String hostAndPort = target.getAddress().getHost() + ':' + target.getAddress().getPort();

        ClientConfig config = new ClientConfig();
        config.getNetworkConfig()
                .addAddress(hostAndPort)
                .setSmartRouting(false);
        return config;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a member configured with {@code smallInstanceConfig()}.
     *
     * @return the new member instance
     */
    @Override
    public HazelcastInstance createHazelcastInstance() {
        Config config = smallInstanceConfig();
        return createHazelcastInstance(config);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a member with the given configuration, creating the instance
     * factory first if it does not exist yet.
     *
     * @param config member configuration
     * @return the new member instance
     */
    @Override
    public HazelcastInstance createHazelcastInstance(Config config) {
        TestHazelcastFactory factory = instanceFactory;
        if (factory == null) {
            factory = new TestHazelcastFactory();
            instanceFactory = factory;
        }
        return factory.newHazelcastInstance(config);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a member with the given configuration, forwarding the address
     * array to the instance factory. The factory is created first if it does
     * not exist yet.
     *
     * @param config     member configuration
     * @param addressArr addresses passed through to the factory
     * @return the new member instance
     */
    public HazelcastInstance createHazelcastInstance(Config config, Address[] addressArr) {
        TestHazelcastFactory factory = instanceFactory;
        if (factory == null) {
            factory = new TestHazelcastFactory();
            instanceFactory = factory;
        }
        return factory.newHazelcastInstance(config, addressArr);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates {@code i} members configured with {@code smallInstanceConfig()}.
     *
     * @param i number of members requested
     * @return the new member instances
     */
    @Override
    public HazelcastInstance[] createHazelcastInstances(int i) {
        Config config = smallInstanceConfig();
        return createHazelcastInstances(config, i);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates {@code i} members with the given configuration, creating the
     * instance factory first if it does not exist yet.
     *
     * @param config configuration passed to the factory
     * @param i      number of members requested
     * @return the new member instances
     */
    @Override
    public HazelcastInstance[] createHazelcastInstances(Config config, int i) {
        TestHazelcastFactory factory = instanceFactory;
        if (factory == null) {
            factory = new TestHazelcastFactory();
            instanceFactory = factory;
        }
        return factory.newInstances(config, i);
    }

    /**
     * Returns a map of the given instance under a name obtained from
     * {@code randomName()}.
     *
     * @param hazelcastInstance instance to obtain the map from
     * @return the map proxy
     */
    protected static <K, V> IMap<K, V> getMap(HazelcastInstance hazelcastInstance) {
        String mapName = randomName();
        return hazelcastInstance.getMap(mapName);
    }

    /**
     * Returns a list of the given instance under a name obtained from
     * {@code randomName()}.
     *
     * @param hazelcastInstance instance to obtain the list from
     * @return the list proxy
     */
    protected static <E> IList<E> getList(HazelcastInstance hazelcastInstance) {
        String listName = randomName();
        return hazelcastInstance.getList(listName);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Appends the integers {@code 0} (inclusive) to {@code i} (exclusive) to
     * the list, in ascending order, one {@code add} call per element.
     *
     * @param iList list to append to
     * @param i     number of elements to add; nothing is added when not positive
     */
    public static void fillListWithInts(IList<Integer> iList, int i) {
        int next = 0;
        while (next < i) {
            iList.add(next);
            next++;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Appends each given string, followed by a {@code '\n'}, to the file. The
     * file is opened in append mode and written with the platform default
     * charset.
     *
     * @param file   file to append to
     * @param strArr lines to append, in order; may be empty
     * @throws IOException if the file cannot be opened or a write error is
     *                     detected
     */
    public static void appendToFile(File file, String... strArr) throws IOException {
        try (PrintWriter writer = new PrintWriter(new FileOutputStream(file, true))) {
            for (String line : strArr) {
                writer.write(line + '\n');
            }
            // PrintWriter never throws on write; checkError() flushes and reports any failure.
            if (writer.checkError()) {
                throw new IOException("Failed to append to file " + file);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a new temporary directory whose name starts with
     * {@code jet-test-temp} and registers it via {@link File#deleteOnExit()}.
     *
     * @return the created directory
     * @throws IOException if the directory cannot be created
     */
    public static File createTempDirectory() throws IOException {
        File tempDirectory = Files.createTempDirectory("jet-test-temp").toFile();
        tempDirectory.deleteOnExit();
        return tempDirectory;
    }

    /**
     * Asserts that the job eventually reaches the given status, using
     * {@code ASSERT_TRUE_EVENTUALLY_TIMEOUT} as the timeout.
     *
     * @param job       job to observe
     * @param jobStatus status the job is expected to reach
     */
    public static void assertJobStatusEventually(Job job, @Nonnull JobStatus jobStatus) {
        int timeout = ASSERT_TRUE_EVENTUALLY_TIMEOUT;
        assertJobStatusEventually(job, jobStatus, timeout);
    }

    /**
     * Returns the configuration produced by {@code smallInstanceConfig()} with
     * Jet resource upload switched on.
     *
     * @return the adjusted configuration
     */
    public static Config smallInstanceWithResourceUploadConfig() {
        Config config = smallInstanceConfig();
        config.getJetConfig().setResourceUploadEnabled(true);
        return config;
    }

    /**
     * Returns a freshly constructed {@link Config} whose only modification is
     * that Jet is enabled.
     *
     * @return the new configuration
     */
    public static Config defaultInstanceConfigWithJetEnabled() {
        Config jetEnabledConfig = new Config();
        jetEnabledConfig.getJetConfig().setEnabled(true);
        return jetEnabledConfig;
    }

    /**
     * Waits until the job is {@code RUNNING} and the given member reports an
     * execution id for it that differs from {@code l}.
     *
     * @param hazelcastInstance member whose job execution service is queried
     * @param job               job to wait for
     * @param l                 execution id to ignore; may be {@code null}
     * @return the execution id found
     * @throws RuntimeException if the member keeps reporting no execution id
     *                          for more than 10 seconds
     */
    public static long assertJobRunningEventually(HazelcastInstance hazelcastInstance, Job job, Long l) {
        JobExecutionService executionService = getJetServiceBackend(hazelcastInstance).getJobExecutionService();
        // Long.MIN_VALUE marks "the execution id was not null on the previous check".
        long nullSince = Long.MIN_VALUE;
        Long executionId;
        do {
            assertJobStatusEventually(job, JobStatus.RUNNING);
            executionId = executionService.getExecutionIdForJobId(job.getId());
            if (executionId == null) {
                if (nullSince == Long.MIN_VALUE) {
                    nullSince = System.nanoTime();
                } else if (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - nullSince) > 10) {
                    throw new RuntimeException("The executionId is null for 10 secs - is the job running on all members?");
                }
            } else {
                nullSince = Long.MIN_VALUE;
            }
        } while (executionId == null || executionId.equals(l));
        return executionId;
    }

    /**
     * Asserts that the job is non-null and that its status eventually equals
     * the expected one. The assertion message carries the job id.
     *
     * @param job       job to observe; must not be {@code null}
     * @param jobStatus expected status
     * @param i         timeout handed to {@code assertTrueEventually}
     */
    public static void assertJobStatusEventually(Job job, JobStatus jobStatus, int i) {
        Assert.assertNotNull(job);
        assertTrueEventually(
                () -> Assert.assertEquals("jobId=" + Util.idToString(job.getId()), jobStatus, job.getStatus()),
                i);
    }

    /**
     * Looks up the service registered as {@code hz:impl:jetService} on the
     * node engine of the given instance.
     *
     * @param hazelcastInstance instance to inspect
     * @return the Jet service backend
     */
    public static JetServiceBackend getJetServiceBackend(HazelcastInstance hazelcastInstance) {
        NodeEngineImpl nodeEngine = getNodeEngineImpl(hazelcastInstance);
        return (JetServiceBackend) nodeEngine.getService("hz:impl:jetService");
    }

    /**
     * Delegates to {@link Accessors#getAddress(HazelcastInstance)}.
     *
     * @param hazelcastInstance instance to inspect
     * @return the address reported by {@code Accessors}
     */
    public static Address getAddress(HazelcastInstance hazelcastInstance) {
        Address address = Accessors.getAddress(hazelcastInstance);
        return address;
    }

    /**
     * Delegates to {@link Accessors#getNode(HazelcastInstance)}.
     *
     * @param hazelcastInstance instance to inspect
     * @return the node reported by {@code Accessors}
     */
    public static Node getNode(HazelcastInstance hazelcastInstance) {
        Node node = Accessors.getNode(hazelcastInstance);
        return node;
    }

    /**
     * Delegates to {@link Accessors#getNodeEngineImpl(HazelcastInstance)}.
     *
     * @param hazelcastInstance instance to inspect
     * @return the node engine reported by {@code Accessors}
     */
    public static NodeEngineImpl getNodeEngineImpl(HazelcastInstance hazelcastInstance) {
        NodeEngineImpl nodeEngine = Accessors.getNodeEngineImpl(hazelcastInstance);
        return nodeEngine;
    }

    /**
     * Returns the result of {@code nextAddress()} on this test's instance
     * factory.
     *
     * <p>The factory is only created by the {@code createHazelcastInstance(s)}
     * methods, so one of them must have been called first.
     *
     * @return the address supplied by the factory
     */
    public Address nextAddress() {
        TestHazelcastFactory factory = instanceFactory;
        return factory.nextAddress();
    }

    /**
     * Terminates the given instance through this test's instance factory.
     *
     * <p>The factory is only created by the {@code createHazelcastInstance(s)}
     * methods, so one of them must have been called first.
     *
     * @param hazelcastInstance instance to terminate
     */
    protected void terminateInstance(HazelcastInstance hazelcastInstance) {
        TestHazelcastFactory factory = instanceFactory;
        factory.terminate(hazelcastInstance);
    }

    /**
     * Spawns the given action via {@code spawn}. Anything the action throws is
     * caught and logged as a warning instead of being propagated.
     *
     * @param runnableEx action to run
     * @return the future returned by {@code spawn}
     */
    public Future spawnSafe(RunnableEx runnableEx) {
        Runnable guarded = () -> {
            try {
                runnableEx.runEx();
            } catch (Throwable failure) {
                SUPPORT_LOGGER.warning("Spawned Runnable failed", failure);
            }
        };
        return spawn(guarded);
    }

    /**
     * Shorthand for {@code new Watermark(j)}.
     *
     * @param j value passed to the {@link Watermark} constructor
     * @return the new watermark
     */
    public static Watermark wm(long j) {
        Watermark watermark = new Watermark(j);
        return watermark;
    }

    /**
     * Waits until the execution record of the job shows a snapshot, i.e. both
     * its data map index and its snapshot id are non-negative, and logs the
     * snapshot id found.
     *
     * @param jobRepository repository to read the execution record from
     * @param j             job id
     * @param i             timeout handed to {@code assertTrueEventually}
     * @param z             when {@code true}, the check that the snapshot stats
     *                      report more than zero bytes is skipped
     */
    public void waitForFirstSnapshot(JobRepository jobRepository, long j, int i, boolean z) {
        // One-element array so the lambda can hand the id back to the enclosing method.
        long[] foundSnapshotId = {-1};
        assertTrueEventually(() -> {
            JobExecutionRecord record = jobRepository.getJobExecutionRecord(j);
            Assert.assertNotNull("null JobExecutionRecord", record);
            Assert.assertTrue("No snapshot produced",
                    record.dataMapIndex() >= 0 && record.snapshotId() >= 0);
            Assert.assertTrue("stats are 0",
                    z || record.snapshotStats().numBytes() > 0);
            foundSnapshotId[0] = record.snapshotId();
        }, i);
        SUPPORT_LOGGER.info("First snapshot found (id=" + foundSnapshotId[0] + ")");
    }

    /**
     * Reads the job's current snapshot id and waits until the execution record
     * reports a greater one, then logs how long that took.
     *
     * @param jobRepository repository to read the execution record from
     * @param j             job id
     * @param i             timeout in seconds, handed to
     *                      {@code assertTrueEventually}
     * @param z             when {@code true}, the check that the snapshot stats
     *                      report more than zero bytes is skipped
     */
    public void waitForNextSnapshot(JobRepository jobRepository, long j, int i, boolean z) {
        long previousId = jobRepository.getJobExecutionRecord(j).snapshotId();
        // One-element array so the lambda can hand the id back to the enclosing method.
        long[] latestId = {-1};
        long startNanos = System.nanoTime();
        assertTrueEventually(() -> {
            JobExecutionRecord record = jobRepository.getJobExecutionRecord(j);
            Assert.assertNotNull("jobExecutionRecord is null", record);
            latestId[0] = record.snapshotId();
            Assert.assertTrue("No more snapshots produced in " + i + " seconds",
                    latestId[0] > previousId);
            Assert.assertTrue("stats are 0",
                    z || record.snapshotStats().numBytes() > 0);
        }, i);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
        SUPPORT_LOGGER.info("Next snapshot found after " + elapsedMillis + " ms (id=" + latestId[0]
                + ", previous id=" + previousId + ")");
    }

    /**
     * Ditches all jobs visible from the first given instance and then destroys
     * all distributed objects it reports.
     *
     * @param hazelcastInstanceArr cluster members; the first one is used for
     *                             the lookups and all of them are passed on to
     *                             {@code ditchJob}. Must contain at least one
     *                             element.
     */
    public void cleanUpCluster(HazelcastInstance... hazelcastInstanceArr) {
        for (Job job : hazelcastInstanceArr[0].getJet().getJobs()) {
            ditchJob(job, hazelcastInstanceArr);
        }
        for (DistributedObject distributedObject : hazelcastInstanceArr[0].getDistributedObjects()) {
            distributedObject.destroy();
        }
    }

    /**
     * Gets rid of the given job: returns at once if the job is gone, failed or
     * completed; otherwise cancels it and joins it, ignoring whatever the join
     * throws.
     *
     * <p>A failed cancellation is retried after 500 ms, up to 10 attempts in
     * total. If all attempts fail, the given instances are terminated and a
     * {@link RuntimeException} is thrown.
     *
     * @param job                  job to get rid of
     * @param hazelcastInstanceArr instances to terminate if the job cannot be
     *                             cancelled; may be empty
     * @throws RuntimeException if the job could not be cancelled in 10 attempts
     */
    public static void ditchJob(@Nonnull Job job, @Nonnull HazelcastInstance... hazelcastInstanceArr) {
        int attempts = 0;
        while (attempts < 10) {
            JobStatus jobStatus = null;
            try {
                jobStatus = job.getStatus();
            } catch (JobNotFoundException e) {
                SUPPORT_LOGGER.fine("Job " + job.getIdString() + " is gone.");
                return;
            } catch (Exception e) {
                // Status stays null; the job is cancelled regardless.
                SUPPORT_LOGGER.warning("Failure to read job status: " + e, e);
            }
            if (jobStatus == JobStatus.FAILED || jobStatus == JobStatus.COMPLETED) {
                return;
            }

            try {
                job.cancel();
            } catch (JobNotFoundException e) {
                // Must precede the general Exception handler, otherwise it is unreachable.
                SUPPORT_LOGGER.fine("Job " + job.getIdString() + " is gone.");
                return;
            } catch (Exception e) {
                sleepMillis(500);
                SUPPORT_LOGGER.warning("Failed to cancel the job and it is " + jobStatus + ", retrying. Failure: " + e, e);
                attempts++;
                continue;
            }

            try {
                job.join();
            } catch (JobNotFoundException e) {
                SUPPORT_LOGGER.fine("Job " + job.getIdString() + " is gone.");
            } catch (Exception ignored) {
                // Cancellation was requested successfully; any outcome of join() is acceptable here.
            }
            return;
        }

        // All attempts failed: terminate the given instances before reporting the failure.
        try {
            for (HazelcastInstance hazelcastInstance : hazelcastInstanceArr) {
                hazelcastInstance.getLifecycleService().terminate();
            }
        } catch (Exception e) {
            // Best effort only; the RuntimeException below is still thrown.
            SUPPORT_LOGGER.warning("Failed to terminate the instances: " + e, e);
        }
        throw new RuntimeException(attempts + " attempts to cancel the job failed"
                + (hazelcastInstanceArr.length > 0 ? ", shut down the cluster" : ""));
    }

    /**
     * Cancels the job and joins it, expecting the join to throw a
     * {@link CancellationException}. Fails the test if the join returns
     * normally; any other exception from the join propagates.
     *
     * @param job job to cancel
     */
    public static void cancelAndJoin(@Nonnull Job job) {
        job.cancel();
        boolean joinWasCancelled = false;
        try {
            job.join();
        } catch (CancellationException expected) {
            joinWasCancelled = true;
        }
        if (!joinWasCancelled) {
            Assert.fail("join didn't fail with CancellationException");
        }
    }

    /**
     * Returns the {@code metaSupplier} field of the given source after casting
     * it to {@link BatchSourceTransform}.
     *
     * @param batchSource source to unwrap; must be a {@code BatchSourceTransform}
     * @return the meta supplier held by the transform
     * @throws ClassCastException if the source is of another type
     */
    public static <T> ProcessorMetaSupplier processorFromPipelineSource(BatchSource<T> batchSource) {
        BatchSourceTransform transform = (BatchSourceTransform) batchSource;
        return transform.metaSupplier;
    }

    /**
     * Waits until exactly one of the jobs reported by the instance has status
     * {@code RUNNING} and returns that job.
     *
     * @param hazelcastInstance instance whose job list is inspected
     * @return the single running job
     */
    public static Job awaitSingleRunningJob(HazelcastInstance hazelcastInstance) {
        AtomicReference<Job> runningJob = new AtomicReference<>();
        assertTrueEventually(() -> {
            List<Job> candidates = hazelcastInstance.getJet().getJobs().stream()
                    .filter(job -> job.getStatus() == JobStatus.RUNNING)
                    .collect(Collectors.toList());
            Assert.assertEquals(1L, candidates.size());
            runningJob.set(candidates.get(0));
        });
        return runningJob.get();
    }

    /**
     * Asserts that the job execution service of the given member returns a
     * non-null execution context for the job.
     *
     * @param job               job expected to be executing
     * @param hazelcastInstance member to check
     */
    public static void assertJobExecuting(Job job, HazelcastInstance hazelcastInstance) {
        String message = "Job should be executing on member " + hazelcastInstance + ", but is not";
        Object executionContext = getJetServiceBackend(hazelcastInstance)
                .getJobExecutionService()
                .getExecutionContext(job.getId());
        Assert.assertNotNull(message, executionContext);
    }

    /**
     * Asserts that the job execution service of the given member returns no
     * execution context for the job.
     *
     * @param job               job expected not to be executing
     * @param hazelcastInstance member to check
     */
    public static void assertJobNotExecuting(Job job, HazelcastInstance hazelcastInstance) {
        String message = "Job should not be executing on member " + hazelcastInstance + ", but is";
        Object executionContext = getJetServiceBackend(hazelcastInstance)
                .getJobExecutionService()
                .getExecutionContext(job.getId());
        Assert.assertNull(message, executionContext);
    }
}
