/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class KStreamKStreamIntegrationTest {
    private static final int NUM_BROKERS = 1;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final MockTime MOCK_TIME = KStreamKStreamIntegrationTest.CLUSTER.time;
    private static final String LEFT_STREAM = "leftStream";
    private static final String RIGHT_STREAM = "rightStream";
    private static final String OUTPUT = "output";
    private Properties streamsConfig;
    private KafkaStreams streams;
    private static final Properties CONSUMER_CONFIG = new Properties();
    private static final Properties PRODUCER_CONFIG = new Properties();

    @BeforeAll
    public static void startCluster() throws Exception {
        CLUSTER.start();
        CLUSTER.createTopic(LEFT_STREAM, 4, 1);
        CLUSTER.createTopic(RIGHT_STREAM, 4, 1);
        CLUSTER.createTopic(OUTPUT, 4, 1);
        CONSUMER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        CONSUMER_CONFIG.put("group.id", "result-consumer");
        CONSUMER_CONFIG.put("key.deserializer", StringDeserializer.class);
        CONSUMER_CONFIG.put("value.deserializer", StringDeserializer.class);
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void before(TestInfo testInfo) throws IOException {
        String stateDirBasePath = TestUtils.tempDirectory().getPath();
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        this.streamsConfig = this.getStreamsConfig(safeTestName);
        this.streamsConfig.put("state.dir", stateDirBasePath);
        this.streamsConfig.put("__emit.interval.ms.kstreams.outer.join.spurious.results.fix__", (Object)0L);
    }

    @AfterEach
    public void after() throws IOException {
        if (this.streams != null) {
            this.streams.close();
            this.streams = null;
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfig);
    }

    @Test
    public void shouldOuterJoin() throws Exception {
        HashSet<KeyValue<String, String>> expected = new HashSet<KeyValue<String, String>>();
        expected.add(new KeyValue((Object)"Key-1", (Object)"value1=left-1a,value2=null"));
        expected.add(new KeyValue((Object)"Key-2", (Object)"value1=left-2a,value2=null"));
        expected.add(new KeyValue((Object)"Key-3", (Object)"value1=left-3a,value2=null"));
        expected.add(new KeyValue((Object)"Key-4", (Object)"value1=left-4a,value2=null"));
        expected.add(new KeyValue(null, (Object)"value1=left-5a,value2=null"));
        this.verifyKStreamKStreamOuterJoin(expected);
    }

    private void verifyKStreamKStreamOuterJoin(Set<KeyValue<String, String>> expectedResult) throws Exception {
        this.streams = KStreamKStreamIntegrationTest.prepareTopology(this.streamsConfig);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(this.streams), Duration.ofSeconds(120L));
        PRODUCER_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        PRODUCER_CONFIG.put("acks", "all");
        PRODUCER_CONFIG.put("key.serializer", StringSerializer.class);
        PRODUCER_CONFIG.put("value.serializer", StringSerializer.class);
        List left1 = Arrays.asList(new KeyValue((Object)"Key-1", (Object)"left-1a"), new KeyValue((Object)"Key-2", (Object)"left-2a"), new KeyValue((Object)"Key-3", (Object)"left-3a"), new KeyValue((Object)"Key-4", (Object)"left-4a"), new KeyValue(null, (Object)"left-5a"));
        List left2 = Arrays.asList(new KeyValue((Object)"Key-1", (Object)"left-1b"), new KeyValue((Object)"Key-2", (Object)"left-2b"), new KeyValue((Object)"Key-3", (Object)"left-3b"), new KeyValue((Object)"Key-4", (Object)"left-4b"));
        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left1, PRODUCER_CONFIG, (Time)MOCK_TIME);
        MOCK_TIME.sleep(10000L);
        IntegrationTestUtils.produceKeyValuesSynchronously(LEFT_STREAM, left2, PRODUCER_CONFIG, (Time)MOCK_TIME);
        HashSet result = new HashSet(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(CONSUMER_CONFIG, OUTPUT, expectedResult.size()));
        MatcherAssert.assertThat(expectedResult, (Matcher)IsEqual.equalTo(result));
    }

    private Properties getStreamsConfig(String testName) {
        Properties streamsConfig = new Properties();
        streamsConfig.put("application.id", "KStream-KStream-join" + testName);
        streamsConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfig.put("auto.offset.reset", "earliest");
        streamsConfig.put("commit.interval.ms", (Object)100L);
        streamsConfig.put("default.key.serde", Serdes.StringSerde.class);
        streamsConfig.put("default.value.serde", Serdes.StringSerde.class);
        return streamsConfig;
    }

    private static KafkaStreams prepareTopology(Properties streamsConfig) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream1 = builder.stream(LEFT_STREAM);
        KStream stream2 = builder.stream(RIGHT_STREAM);
        ValueJoiner joiner = (value1, value2) -> "value1=" + value1 + ",value2=" + value2;
        stream1.outerJoin(stream2, joiner, JoinWindows.ofTimeDifferenceWithNoGrace((Duration)Duration.ofMillis(10L))).to(OUTPUT);
        return new KafkaStreams(builder.build(streamsConfig), streamsConfig);
    }
}

