package com.hazelcast.jet.impl.connector;

import com.hazelcast.cache.ICache;
import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.cluster.Address;
import com.hazelcast.collection.IList;
import com.hazelcast.config.CacheSimpleConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.projection.Projections;
import com.hazelcast.query.Predicates;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/connector/HazelcastRemoteConnectorTest.class */
public class HazelcastRemoteConnectorTest extends JetTestSupport {
    // Number of consecutive integer keys (0 .. ITEM_COUNT - 1) that populateMap/populateCache insert,
    // each mapped to itself.
    private static final int ITEM_COUNT = 20;
    // Member created through the test factory in setUp(); jobs are submitted through it and
    // list sinks are read back from it.
    private static HazelcastInstance localHz;
    // Member started via Hazelcast.newHazelcastInstance under a random cluster name in setUp();
    // clientConfig is pointed at its address.
    private static HazelcastInstance remoteHz;
    // Client configuration handed to the remote source/sink processors; built in setUp().
    private static ClientConfig clientConfig;
    // Not final despite the constant-style names: setup() assigns fresh random names before every test.
    private static String SOURCE_NAME = randomName() + "-source";
    private static String SINK_NAME = randomName() + "-sink";
    // Creates the local members in setUp() and terminates them in teardown().
    private static final TestHazelcastFactory factory = new TestHazelcastFactory();

    /**
     * Starts the instances shared by all tests of this class and prepares the client
     * configuration used by the remote connectors.
     *
     * <p>Two members are created through the test factory from one config (the first is kept
     * as {@code localHz}); two more are started through {@link Hazelcast} from a second config
     * that has a random cluster name and the event journal enabled for all maps and caches
     * (the first is kept as {@code remoteHz}).
     */
    @BeforeClass
    public static void setUp() {
        // Local side: wildcard cache config, no event journal.
        Config localConfig = smallInstanceConfig();
        localConfig.addCacheConfig(new CacheSimpleConfig().setName("*"));
        localHz = factory.newHazelcastInstance(localConfig);
        HazelcastInstance secondLocalMember = factory.newHazelcastInstance(localConfig);

        // Remote side: wildcard cache and map configs with the event journal switched on.
        Config remoteConfig = smallInstanceConfig();
        CacheSimpleConfig journaledCacheConfig = new CacheSimpleConfig().setName("*");
        journaledCacheConfig.getEventJournalConfig().setEnabled(true);
        remoteConfig.addCacheConfig(journaledCacheConfig);
        remoteConfig.setClusterName(randomName());
        MapConfig journaledMapConfig = new MapConfig();
        journaledMapConfig.setName("*").getEventJournalConfig().setEnabled(true);
        remoteConfig.addMapConfig(journaledMapConfig);
        remoteHz = Hazelcast.newHazelcastInstance(remoteConfig);
        HazelcastInstance secondRemoteMember = Hazelcast.newHazelcastInstance(remoteConfig);

        // The client connects to the remote cluster by name and by the address of remoteHz.
        clientConfig = new ClientConfig();
        clientConfig.setClusterName(remoteConfig.getClusterName());
        Address remoteAddress = remoteHz.getCluster().getLocalMember().getAddress();
        clientConfig.getNetworkConfig().addAddress(remoteAddress.getHost() + ':' + remoteAddress.getPort());

        // Look up the source and sink caches on the second member of each cluster.
        // NOTE(review): presumably this forces the cache proxies to exist on every member - confirm.
        secondRemoteMember.getCacheManager().getCache(SOURCE_NAME);
        secondRemoteMember.getCacheManager().getCache(SINK_NAME);
        secondLocalMember.getCacheManager().getCache(SOURCE_NAME);
        secondLocalMember.getCacheManager().getCache(SINK_NAME);
    }

    /**
     * Releases everything started for this test class: shuts down all Hazelcast clients,
     * terminates all instances through {@link HazelcastInstanceFactory}, and terminates the
     * instances created by the test factory.
     */
    @AfterClass
    public static void teardown() {
        // Clients first, then the members they may be connected to.
        HazelcastClient.shutdownAll();
        HazelcastInstanceFactory.terminateAll();
        factory.terminateAll();
    }

    /**
     * Runs before every test: destroys all distributed objects on both shared instances and
     * assigns fresh random source and sink names.
     */
    @Before
    public void setup() {
        for (HazelcastInstance instance : new HazelcastInstance[] {localHz, remoteHz}) {
            destroyObjects(instance);
        }
        SOURCE_NAME = randomName() + "-source";
        SINK_NAME = randomName() + "-sink";
    }

    /**
     * Destroys every distributed object currently reported by the given instance.
     *
     * @param hazelcastInstance the instance whose distributed objects are destroyed
     */
    public void destroyObjects(HazelcastInstance hazelcastInstance) {
        hazelcastInstance.getDistributedObjects()
                .forEach(distributedObject -> distributedObject.destroy());
    }

    /**
     * Reads the populated remote map with a native predicate ({@code this > "0"}) and a native
     * projection of the {@code value} attribute into a local list, then checks that the list
     * holds all items but one, lacks 0 and contains 1.
     */
    @Test
    public void when_readRemoteMap_withNativePredicateAndProjection() {
        populateMap(remoteHz.getMap(SOURCE_NAME));

        DAG dag = new DAG();
        Edge sourceToSink = Edge.between(
                dag.newVertex("source", SourceProcessors.readRemoteMapP(SOURCE_NAME, clientConfig,
                        Predicates.greaterThan("this", "0"), Projections.singleAttribute("value")))
                        .localParallelism(4),
                dag.newVertex(SINK_NAME, SinkProcessors.writeListP(SINK_NAME)).localParallelism(1));
        dag.edge(sourceToSink);
        executeAndWait(dag);

        IList<Object> sinkList = localHz.getList(SINK_NAME);
        // One of the ITEM_COUNT entries is filtered out by the predicate.
        Assert.assertEquals(ITEM_COUNT - 1, sinkList.size());
        Assert.assertFalse(sinkList.contains(0));
        Assert.assertTrue(sinkList.contains(1));
    }

    /**
     * Reads the populated remote map with a lambda predicate that rejects key 0 and a
     * projection to the entry value, then checks that the local sink list holds all items
     * but one, lacks 0 and contains 1.
     */
    @Test
    public void when_readRemoteMap_withPredicateAndFunction() {
        populateMap(remoteHz.getMap(SOURCE_NAME));

        DAG dag = new DAG();
        Edge sourceToSink = Edge.between(
                dag.newVertex(SOURCE_NAME, SourceProcessors.readRemoteMapP(SOURCE_NAME, clientConfig,
                        entry -> !entry.getKey().equals(0), Map.Entry::getValue))
                        .localParallelism(4),
                dag.newVertex(SINK_NAME, SinkProcessors.writeListP(SINK_NAME)).localParallelism(1));
        dag.edge(sourceToSink);
        executeAndWait(dag);

        IList<Object> sinkList = localHz.getList(SINK_NAME);
        // The entry with key 0 is rejected by the predicate.
        Assert.assertEquals(ITEM_COUNT - 1, sinkList.size());
        Assert.assertFalse(sinkList.contains(0));
        Assert.assertTrue(sinkList.contains(1));
    }

    /**
     * Copies a populated local map into a map on the remote cluster and checks that all
     * items arrived.
     */
    @Test
    public void when_writeRemoteMap() {
        populateMap(localHz.getMap(SOURCE_NAME));

        DAG dag = new DAG();
        Edge sourceToSink = Edge.between(
                dag.newVertex(SOURCE_NAME, SourceProcessors.readMapP(SOURCE_NAME)),
                dag.newVertex(SINK_NAME, SinkProcessors.writeRemoteMapP(SINK_NAME, clientConfig)));
        dag.edge(sourceToSink);
        executeAndWait(dag);

        Assert.assertEquals(ITEM_COUNT, remoteHz.getMap(SINK_NAME).size());
    }

    /**
     * Reads a populated cache on the remote cluster into a local list and checks that all
     * items arrived.
     */
    @Test
    public void when_readRemoteCache() {
        populateCache(remoteHz.getCacheManager().getCache(SOURCE_NAME));

        DAG dag = new DAG();
        Edge sourceToSink = Edge.between(
                dag.newVertex(SOURCE_NAME, SourceProcessors.readRemoteCacheP(SOURCE_NAME, clientConfig)),
                dag.newVertex(SINK_NAME, SinkProcessors.writeListP(SINK_NAME)));
        dag.edge(sourceToSink);
        executeAndWait(dag);

        Assert.assertEquals(ITEM_COUNT, localHz.getList(SINK_NAME).size());
    }

    /**
     * Copies a populated local cache into a cache on the remote cluster and checks that all
     * items arrived.
     */
    @Test
    public void when_writeRemoteCache() {
        populateCache(localHz.getCacheManager().getCache(SOURCE_NAME));

        DAG dag = new DAG();
        Edge sourceToSink = Edge.between(
                dag.newVertex(SOURCE_NAME, SourceProcessors.readCacheP(SOURCE_NAME)),
                dag.newVertex(SINK_NAME, SinkProcessors.writeRemoteCacheP(SINK_NAME, clientConfig)));
        dag.edge(sourceToSink);
        executeAndWait(dag);

        Assert.assertEquals(ITEM_COUNT, remoteHz.getCacheManager().getCache(SINK_NAME).size());
    }

    /**
     * Streams the event journal of a remote map from its oldest entry into a local list and
     * waits until all items have arrived. The entry value is used as the event timestamp.
     */
    @Test
    public void when_streamRemoteMap() {
        DAG dag = new DAG();
        Edge sourceToSink = Edge.between(
                dag.newVertex(SOURCE_NAME, SourceProcessors.streamRemoteMapP(SOURCE_NAME, clientConfig,
                        JournalInitialPosition.START_FROM_OLDEST,
                        // Explicit type arguments let the compiler infer K and V for the source.
                        EventTimePolicy.eventTimePolicy(Map.Entry<Integer, Integer>::getValue,
                                WatermarkPolicy.limitingLag(0L), 1L, 0L, 10000L))),
                dag.newVertex(SINK_NAME, SinkProcessors.writeListP(SINK_NAME)));
        dag.edge(sourceToSink);

        Job job = localHz.getJet().newJob(dag);
        try {
            // The job is already running; the journal source picks the entries up as they are written.
            populateMap(remoteHz.getMap(SOURCE_NAME));
            IList<Object> sinkList = localHz.getList(SINK_NAME);
            assertSizeEventually(ITEM_COUNT, sinkList);
        } finally {
            // A streaming job never completes by itself: cancel it even when the assertion
            // fails so it does not keep running on the shared cluster.
            job.cancel();
        }
    }

    /**
     * Streams the event journal of a remote map from its oldest entry, dropping events for
     * key 0 and projecting each event to its key, into a local list. Waits until all items
     * but one have arrived and checks that 0 is absent and 1 is present.
     */
    @Test
    public void when_streamRemoteMap_withPredicateAndProjection() {
        DAG dag = new DAG();
        Edge sourceToSink = Edge.between(
                // The type witness fixes the projected item, key and value types to Integer,
                // so the lambdas need no casts.
                dag.newVertex(SOURCE_NAME, SourceProcessors.<Integer, Integer, Integer>streamRemoteMapP(
                        SOURCE_NAME, clientConfig,
                        event -> event.getKey() != 0,
                        event -> event.getKey(),
                        JournalInitialPosition.START_FROM_OLDEST,
                        EventTimePolicy.eventTimePolicy(item -> item.intValue(),
                                WatermarkPolicy.limitingLag(0L), 1L, 0L, 10000L))),
                dag.newVertex(SINK_NAME, SinkProcessors.writeListP(SINK_NAME)));
        dag.edge(sourceToSink);

        Job job = localHz.getJet().newJob(dag);
        try {
            populateMap(remoteHz.getMap(SOURCE_NAME));
            IList<Object> sinkList = localHz.getList(SINK_NAME);
            // The event for key 0 is rejected by the predicate.
            assertSizeEventually(ITEM_COUNT - 1, sinkList);
            Assert.assertFalse(sinkList.contains(0));
            Assert.assertTrue(sinkList.contains(1));
        } finally {
            // A streaming job never completes by itself: cancel it even when an assertion
            // fails so it does not keep running on the shared cluster.
            job.cancel();
        }
    }

    /**
     * Streams the event journal of a remote cache from its oldest entry into a local list and
     * waits until all items have arrived. The entry value is used as the event timestamp.
     */
    @Test
    public void when_streamRemoteCache() {
        DAG dag = new DAG();
        Edge sourceToSink = Edge.between(
                dag.newVertex(SOURCE_NAME, SourceProcessors.streamRemoteCacheP(SOURCE_NAME, clientConfig,
                        JournalInitialPosition.START_FROM_OLDEST,
                        // Explicit type arguments let the compiler infer K and V for the source.
                        EventTimePolicy.eventTimePolicy(Map.Entry<Integer, Integer>::getValue,
                                WatermarkPolicy.limitingLag(0L), 1L, 0L, 10000L)))
                        .localParallelism(4),
                dag.newVertex(SINK_NAME, SinkProcessors.writeListP(SINK_NAME)).localParallelism(1));
        dag.edge(sourceToSink);

        Job job = localHz.getJet().newJob(dag);
        try {
            // The job is already running; the journal source picks the entries up as they are written.
            populateCache(remoteHz.getCacheManager().getCache(SOURCE_NAME));
            IList<Object> sinkList = localHz.getList(SINK_NAME);
            assertSizeEventually(ITEM_COUNT, sinkList);
        } finally {
            // A streaming job never completes by itself: cancel it even when the assertion
            // fails so it does not keep running on the shared cluster.
            job.cancel();
        }
    }

    /**
     * Streams the event journal of a remote cache from its oldest entry, dropping events for
     * key 0 and projecting each event to its key, into a local list. Waits until all items
     * but one have arrived and checks that 0 is absent and 1 is present.
     */
    @Test
    public void when_streamRemoteCache_withPredicateAndProjection() {
        DAG dag = new DAG();
        Edge sourceToSink = Edge.between(
                // The type witness fixes the projected item, key and value types to Integer,
                // so the lambdas need no casts.
                dag.newVertex(SOURCE_NAME, SourceProcessors.<Integer, Integer, Integer>streamRemoteCacheP(
                        SOURCE_NAME, clientConfig,
                        event -> !event.getKey().equals(0),
                        event -> event.getKey(),
                        JournalInitialPosition.START_FROM_OLDEST,
                        EventTimePolicy.eventTimePolicy(item -> item.intValue(),
                                WatermarkPolicy.limitingLag(0L), 1L, 0L, 10000L))),
                dag.newVertex(SINK_NAME, SinkProcessors.writeListP(SINK_NAME)));
        dag.edge(sourceToSink);

        Job job = localHz.getJet().newJob(dag);
        try {
            populateCache(remoteHz.getCacheManager().getCache(SOURCE_NAME));
            IList<Object> sinkList = localHz.getList(SINK_NAME);
            // The event for key 0 is rejected by the predicate.
            assertSizeEventually(ITEM_COUNT - 1, sinkList);
            Assert.assertFalse(sinkList.contains(0));
            Assert.assertTrue(sinkList.contains(1));
        } finally {
            // A streaming job never completes by itself: cancel it even when an assertion
            // fails so it does not keep running on the shared cluster.
            job.cancel();
        }
    }

    /**
     * Submits the DAG as a job through {@code localHz} and asserts that the job's future
     * eventually completes.
     *
     * @param dag the DAG to run
     */
    private void executeAndWait(DAG dag) {
        Job job = localHz.getJet().newJob(dag);
        assertCompletesEventually(job.getFuture());
    }

    /**
     * Writes the entries {@code i -> i} for {@code i} in {@code [0, ITEM_COUNT)} into the
     * given map with a single {@code putAll} call.
     *
     * @param map the map to fill
     */
    private static void populateMap(Map<Object, Object> map) {
        Map<Integer, Integer> identityEntries = IntStream.range(0, ITEM_COUNT)
                .boxed()
                .collect(Collectors.toMap(key -> key, value -> value));
        map.putAll(identityEntries);
    }

    /**
     * Writes the entries {@code i -> i} for {@code i} in {@code [0, ITEM_COUNT)} into the
     * given cache with a single {@code putAll} call.
     *
     * @param iCache the cache to fill
     */
    private static void populateCache(ICache<Object, Object> iCache) {
        Map<Integer, Integer> identityEntries = IntStream.range(0, ITEM_COUNT)
                .boxed()
                .collect(Collectors.toMap(key -> key, value -> value));
        iCache.putAll(identityEntries);
    }

    /**
     * Synthetic lambda-deserialization hook for the serializable lambdas and method references
     * used by the tests in this class.
     *
     * <p>The first switch maps the serialized implementation method name (via its hash code and
     * an equals check) to a selector; the second switch verifies the method kind, functional
     * interface, interface method name and signature, implementation class and implementation
     * signature before returning the matching lambda.
     *
     * <p>NOTE(review): the {@code boolean} selector initialised with {@code -1} and the repeated
     * {@code case true} labels look like decompiler output rather than hand-written source;
     * presumably this method is generated by the compiler and should not be edited by hand - confirm.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if no branch matches the serialized form
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Stage 1: pick a selector from the implementation method name.
        switch (implMethodName.hashCode()) {
            case -1249358039:
                if (implMethodName.equals("getKey")) {
                    z = true;
                    break;
                }
                break;
            case -537549516:
                if (implMethodName.equals("lambda$when_streamRemoteCache_withPredicateAndProjection$9b903fa1$1")) {
                    z = 2;
                    break;
                }
                break;
            case -386712562:
                if (implMethodName.equals("lambda$when_streamRemoteMap_withPredicateAndProjection$9b903fa1$1")) {
                    z = 6;
                    break;
                }
                break;
            case 320266620:
                if (implMethodName.equals("lambda$when_streamRemoteCache_withPredicateAndProjection$cc54fb6f$1")) {
                    z = 5;
                    break;
                }
                break;
            case 471103574:
                if (implMethodName.equals("lambda$when_streamRemoteMap_withPredicateAndProjection$cc54fb6f$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1449763309:
                if (implMethodName.equals("lambda$when_readRemoteMap_withPredicateAndFunction$a89fc23a$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1967798203:
                if (implMethodName.equals("getValue")) {
                    z = false;
                    break;
                }
                break;
        }
        // Stage 2: verify the full serialized signature and return the matching lambda.
        switch (z) {
            case false:
                // "getValue" on java/util/Map$Entry: used as a Projection and as a ToLongFunctionEx.
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/projection/Projection") && serializedLambda.getFunctionalInterfaceMethodName().equals("transform") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/Map$Entry") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/util/Map$Entry") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/util/Map$Entry") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getValue();
                    };
                }
                break;
            case true:
                // "getKey" as a FunctionEx on EventJournalMapEvent or EventJournalCacheEvent.
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/map/EventJournalMapEvent") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/cache/EventJournalCacheEvent") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/HazelcastRemoteConnectorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)J")) {
                    return num -> {
                        return num.intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/PredicateEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("testEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/HazelcastRemoteConnectorTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/map/EventJournalMapEvent;)Z")) {
                    return eventJournalMapEvent -> {
                        return ((Integer) eventJournalMapEvent.getKey()).intValue() != 0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/query/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/util/Map$Entry;)Z") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/HazelcastRemoteConnectorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map$Entry;)Z")) {
                    return entry -> {
                        return !entry.getKey().equals(0);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/PredicateEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("testEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/HazelcastRemoteConnectorTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/cache/EventJournalCacheEvent;)Z")) {
                    return eventJournalCacheEvent -> {
                        return !((Integer) eventJournalCacheEvent.getKey()).equals(0);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/HazelcastRemoteConnectorTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;)J")) {
                    return num2 -> {
                        return num2.intValue();
                    };
                }
                break;
        }
        // No branch matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
