/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.hamcrest.core.IsEqual;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.experimental.categories.Category;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Base class for join tests that run a topology through a {@link TopologyTestDriver}.
 *
 * <p>Subclasses assign {@link #builder} (and typically {@link #appID}), call
 * {@link #prepareEnvironment()} and then one of the {@code runTestWithDriver} methods. A fixed
 * sequence of records, all sharing the key {@link #ANY_UNIQUE_KEY}, is piped alternately into
 * {@link #INPUT_TOPIC_LEFT} and {@link #INPUT_TOPIC_RIGHT}; the records written to
 * {@link #OUTPUT_TOPIC} are compared with the expectations supplied by the subclass.
 *
 * <p>The class is parameterized on whether caching is enabled. {@link #STREAMS_CONFIG} is a
 * static object shared by every test instance, so per-instance settings are (re)applied in
 * {@link #prepareEnvironment()}.
 */
@Category({IntegrationTest.class})
@RunWith(Parameterized.class)
public abstract class AbstractJoinIntegrationTest {
    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());

    static String appID;

    private final MockTime time = new MockTime();

    private static final Long COMMIT_INTERVAL = 100L;
    static final Properties STREAMS_CONFIG = new Properties();
    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
    static final String OUTPUT_TOPIC = "outputTopic";
    static final long ANY_UNIQUE_KEY = 0L;

    StreamsBuilder builder;

    /** The records piped into the driver, in order; a {@code null} value is piped as-is. */
    private final List<Input<String>> input = Arrays.asList(
        new Input<>(INPUT_TOPIC_LEFT, null),
        new Input<>(INPUT_TOPIC_RIGHT, null),
        new Input<>(INPUT_TOPIC_LEFT, "A"),
        new Input<>(INPUT_TOPIC_RIGHT, "a"),
        new Input<>(INPUT_TOPIC_LEFT, "B"),
        new Input<>(INPUT_TOPIC_RIGHT, "b"),
        new Input<>(INPUT_TOPIC_LEFT, null),
        new Input<>(INPUT_TOPIC_RIGHT, null),
        new Input<>(INPUT_TOPIC_LEFT, "C"),
        new Input<>(INPUT_TOPIC_RIGHT, "c"),
        new Input<>(INPUT_TOPIC_RIGHT, null),
        new Input<>(INPUT_TOPIC_LEFT, null),
        new Input<>(INPUT_TOPIC_RIGHT, null),
        new Input<>(INPUT_TOPIC_RIGHT, "d"),
        new Input<>(INPUT_TOPIC_LEFT, "D")
    );

    /** Joins two values as {@code "<value1>-<value2>"}. */
    final ValueJoiner<String, String, String> valueJoiner = (value1, value2) -> value1 + "-" + value2;

    final boolean cacheEnabled;

    /**
     * Supplies the parameter sets for the runner: caching enabled, then caching disabled.
     *
     * @return one single-element {@code Object[]} per run
     */
    @Parameterized.Parameters(name = "caching enabled = {0}")
    public static Collection<Object[]> data() {
        final List<Object[]> values = new ArrayList<>();
        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
            values.add(new Object[]{cacheEnabled});
        }
        return values;
    }

    /**
     * @param cacheEnabled whether this run leaves record caching enabled
     */
    AbstractJoinIntegrationTest(final boolean cacheEnabled) {
        this.cacheEnabled = cacheEnabled;
    }

    /** Populates the settings in {@link #STREAMS_CONFIG} that are the same for every run. */
    @BeforeClass
    public static void setupConfigsAndUtils() {
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("default.key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
    }

    /**
     * Applies the per-instance settings to the shared {@link #STREAMS_CONFIG}: the cache size
     * (forced to zero when caching is disabled) and the state directory.
     *
     * @throws InterruptedException declared for callers; nothing in this method throws it
     */
    void prepareEnvironment() throws InterruptedException {
        if (cacheEnabled) {
            // STREAMS_CONFIG is static and outlives this instance: drop a zero cache size left
            // behind by an earlier cache-disabled run so this run really uses the default cache.
            STREAMS_CONFIG.remove("cache.max.bytes.buffering");
        } else {
            STREAMS_CONFIG.put("cache.max.bytes.buffering", 0);
        }
        STREAMS_CONFIG.put("state.dir", testFolder.getRoot().getPath());
    }

    /**
     * Runs the input sequence and verifies the output after every input record, without
     * checking any state store.
     *
     * @param expectedResult see {@link #runTestWithDriver(List, String)}
     */
    void runTestWithDriver(final List<List<TestRecord<Long, String>>> expectedResult) {
        runTestWithDriver(expectedResult, null);
    }

    /**
     * Pipes the input records one at a time and, after each one, compares the records newly
     * written to {@link #OUTPUT_TOPIC} with the matching entry of {@code expectedResult}.
     *
     * @param expectedResult one entry per input record, in input order. A {@code null} entry
     *                       skips the check (and the output read) for that input record. The
     *                       timestamps of the expected records are offsets that are added to the
     *                       clock reading taken before the first record is piped; the n-th input
     *                       record (1-based) is piped with offset {@code n}.
     * @param storeName      if non-null, the timestamped key-value store that must contain
     *                       exactly the last expected record once all input has been processed
     */
    void runTestWithDriver(final List<List<TestRecord<Long, String>>> expectedResult, final String storeName) {
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
            final HashMap<String, TestInputTopic<Long, String>> inputTopics = createInputTopics(driver);
            final TestOutputTopic<Long, String> outputTopic = createOutputTopic(driver);

            final long firstTimestamp = time.milliseconds();
            long eventTimestamp = firstTimestamp;
            TestRecord<Long, String> expectedFinalResult = null;
            final Iterator<List<TestRecord<Long, String>>> resultIterator = expectedResult.iterator();

            for (final Input<String> singleInputRecord : input) {
                inputTopics.get(singleInputRecord.topic)
                    .pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, ++eventTimestamp);

                // Fail with a readable message instead of a bare NoSuchElementException.
                MatcherAssert.assertThat(
                    "expectedResult must contain one entry per input record (" + input.size() + ")",
                    resultIterator.hasNext());
                final List<TestRecord<Long, String>> expected = resultIterator.next();
                if (expected == null) {
                    continue;
                }

                final List<TestRecord<Long, String>> updatedExpected = new ArrayList<>(expected.size());
                for (final TestRecord<Long, String> record : expected) {
                    updatedExpected.add(shiftTimestamp(record, firstTimestamp));
                }

                final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();
                MatcherAssert.assertThat(output, IsEqual.equalTo(updatedExpected));

                // An empty expectation asserts "no output" and leaves the final result untouched.
                if (!updatedExpected.isEmpty()) {
                    expectedFinalResult = updatedExpected.get(updatedExpected.size() - 1);
                }
            }

            if (storeName != null) {
                MatcherAssert.assertThat(
                    "cannot verify store " + storeName + " without at least one expected output record",
                    expectedFinalResult != null);
                checkQueryableStore(storeName, expectedFinalResult, driver);
            }
        }
    }

    /**
     * Pipes all input records and verifies only the last record written to
     * {@link #OUTPUT_TOPIC}.
     *
     * @param expectedFinalResult the expected last output record; its timestamp is an offset
     *                            added to the clock reading taken before the first record is piped
     * @param storeName           if non-null, the timestamped key-value store that must contain
     *                            exactly the expected final record
     * @throws InterruptedException declared for callers; nothing in this method throws it
     */
    void runTestWithDriver(final TestRecord<Long, String> expectedFinalResult, final String storeName) throws InterruptedException {
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(STREAMS_CONFIG), STREAMS_CONFIG)) {
            final HashMap<String, TestInputTopic<Long, String>> inputTopics = createInputTopics(driver);
            final TestOutputTopic<Long, String> outputTopic = createOutputTopic(driver);

            final long firstTimestamp = time.milliseconds();
            long eventTimestamp = firstTimestamp;
            for (final Input<String> singleInputRecord : input) {
                inputTopics.get(singleInputRecord.topic)
                    .pipeInput(singleInputRecord.record.key, singleInputRecord.record.value, ++eventTimestamp);
            }

            final TestRecord<Long, String> updatedExpectedFinalResult = shiftTimestamp(expectedFinalResult, firstTimestamp);

            final List<TestRecord<Long, String>> output = outputTopic.readRecordsToList();
            // Report missing output as an assertion failure rather than an index error.
            MatcherAssert.assertThat("no records were written to " + OUTPUT_TOPIC, !output.isEmpty());
            MatcherAssert.assertThat(output.get(output.size() - 1), IsEqual.equalTo(updatedExpectedFinalResult));

            if (storeName != null) {
                checkQueryableStore(storeName, updatedExpectedFinalResult, driver);
            }
        }
    }

    /** Creates the right and left input topics on the driver, keyed by topic name. */
    private HashMap<String, TestInputTopic<Long, String>> createInputTopics(final TopologyTestDriver driver) {
        final HashMap<String, TestInputTopic<Long, String>> topics = new HashMap<>();
        topics.put(INPUT_TOPIC_RIGHT, driver.createInputTopic(INPUT_TOPIC_RIGHT, new LongSerializer(), new StringSerializer()));
        topics.put(INPUT_TOPIC_LEFT, driver.createInputTopic(INPUT_TOPIC_LEFT, new LongSerializer(), new StringSerializer()));
        return topics;
    }

    /** Creates the output topic on the driver. */
    private TestOutputTopic<Long, String> createOutputTopic(final TopologyTestDriver driver) {
        return driver.createOutputTopic(OUTPUT_TOPIC, new LongDeserializer(), new StringDeserializer());
    }

    /** Copies {@code record} without headers, adding {@code firstTimestamp} to its timestamp. */
    private static TestRecord<Long, String> shiftTimestamp(final TestRecord<Long, String> record, final long firstTimestamp) {
        return new TestRecord<>(record.key(), record.value(), null, firstTimestamp + record.timestamp());
    }

    /**
     * Asserts that the named timestamped key-value store holds exactly one entry and that its
     * key, value and timestamp equal those of {@code expectedFinalResult}.
     */
    private void checkQueryableStore(final String queryableName, final TestRecord<Long, String> expectedFinalResult, final TopologyTestDriver driver) {
        final KeyValueStore<Long, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore(queryableName);
        MatcherAssert.assertThat("no timestamped key-value store named " + queryableName, store != null);

        // The iterator is opened by try-with-resources so it is closed even when the store is
        // empty or an assertion fails.
        try (KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all()) {
            MatcherAssert.assertThat("store " + queryableName + " is empty", all.hasNext());
            final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();

            MatcherAssert.assertThat(onlyEntry.key, Is.is(expectedFinalResult.key()));
            MatcherAssert.assertThat(onlyEntry.value.value(), Is.is(expectedFinalResult.value()));
            MatcherAssert.assertThat(onlyEntry.value.timestamp(), Is.is(expectedFinalResult.timestamp()));
            MatcherAssert.assertThat(all.hasNext(), Is.is(false));
        }
    }

    /** One input record: the topic to pipe into and the key/value pair to pipe. */
    private static final class Input<V> {
        final String topic;
        final KeyValue<Long, V> record;

        Input(final String topic, final V value) {
            this.topic = topic;
            // Every record uses the same key so that all records are join candidates.
            this.record = KeyValue.pair(ANY_UNIQUE_KEY, value);
        }
    }
}

