/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for aggregations over a grouped {@link KTable} ({@code aggregate} and {@code count}).
 *
 * <p>Every test builds a topology with {@link StreamsBuilder}, drives it through a
 * {@link TopologyTestDriver}, and compares the records captured by a {@link MockProcessor}
 * against an exact, ordered list of expected {@link KeyValueTimestamp}s.
 */
public class KTableAggregateTest {
    /** Driver configuration; only the {@code state.dir} setting is supplied, pointing at a temp directory. */
    private static final Properties CONFIG = Utils.mkProperties(Utils.mkMap(
        Utils.mkEntry("state.dir", TestUtils.tempDirectory("kafka-test").getAbsolutePath())));

    private final Serde<String> stringSerde = Serdes.String();
    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
    private final Grouped<String, String> stringSerialized = Grouped.with(stringSerde, stringSerde);
    private final MockProcessorSupplier<String, Object> supplier = new MockProcessorSupplier<>();

    /**
     * Aggregates a table grouped through {@code MockMapper.noOpKeyValueMapper()}.
     *
     * <p>The expected output shows that an update of an existing key yields two results
     * (a removal of the old value followed by an addition of the new one), and that the
     * expected result timestamp for a key does not move backwards when a later input
     * carries an older timestamp (e.g. {@code B@15} piped after {@code B@18}).
     */
    @Test
    public void testAggBasic() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTable<String, String> table1 = builder.table(topic1, consumed);
        final KTable<String, String> table2 = table1
            .groupBy(MockMapper.noOpKeyValueMapper(), stringSerialized)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                MockAggregator.TOSTRING_REMOVER,
                // Explicit type witness: without it the chained withValueSerde() call
                // cannot accept a Serde<String>.
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic1-Canonized")
                    .withValueSerde(stringSerde));
        table2.toStream().process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), CONFIG, Instant.ofEpochMilli(0L))) {
            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);

            inputTopic.pipeInput("A", "1", 10L);
            inputTopic.pipeInput("B", "2", 15L);
            inputTopic.pipeInput("A", "3", 20L);
            inputTopic.pipeInput("B", "4", 18L);
            inputTopic.pipeInput("C", "5", 5L);
            inputTopic.pipeInput("D", "6", 25L);
            inputTopic.pipeInput("B", "7", 15L);
            inputTopic.pipeInput("C", "8", 10L);

            Assert.assertEquals(
                Arrays.asList(
                    new KeyValueTimestamp<>("A", "0+1", 10L),
                    new KeyValueTimestamp<>("B", "0+2", 15L),
                    new KeyValueTimestamp<>("A", "0+1-1", 20L),
                    new KeyValueTimestamp<>("A", "0+1-1+3", 20L),
                    new KeyValueTimestamp<>("B", "0+2-2", 18L),
                    new KeyValueTimestamp<>("B", "0+2-2+4", 18L),
                    new KeyValueTimestamp<>("C", "0+5", 5L),
                    new KeyValueTimestamp<>("D", "0+6", 25L),
                    new KeyValueTimestamp<>("B", "0+2-2+4-4", 18L),
                    new KeyValueTimestamp<>("B", "0+2-2+4-4+7", 18L),
                    new KeyValueTimestamp<>("C", "0+5-5", 10L),
                    new KeyValueTimestamp<>("C", "0+5-5+8", 10L)),
                supplier.theCapturedProcessor().processed());
        }
    }

    /**
     * Aggregates a table that is re-keyed by its value before grouping.
     *
     * <p>The grouping mapper returns a pair with a {@code null} key for source key
     * {@code "null"} and returns {@code null} altogether for source key {@code "NULL"};
     * the expected output contains no result for either of those inputs. A {@code null}
     * input value for key {@code "A"} is expected to produce only a removal.
     */
    @Test
    public void testAggRepartition() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String topic1 = "topic1";

        final KTable<String, String> table1 = builder.table(topic1, consumed);
        final KTable<String, String> table2 = table1
            .groupBy(
                (key, value) -> {
                    switch (key) {
                        case "null":
                            // Mapped pair with a null key.
                            return KeyValue.pair(null, value);
                        case "NULL":
                            // No mapped pair at all.
                            return null;
                        default:
                            return KeyValue.pair(value, value);
                    }
                },
                stringSerialized)
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                MockAggregator.TOSTRING_REMOVER,
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic1-Canonized")
                    .withValueSerde(stringSerde));
        table2.toStream().process(supplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), CONFIG, Instant.ofEpochMilli(0L))) {
            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                topic1, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);

            inputTopic.pipeInput("A", "1", 10L);
            inputTopic.pipeInput("A", (String) null, 15L);
            inputTopic.pipeInput("A", "1", 12L);
            inputTopic.pipeInput("B", "2", 20L);
            inputTopic.pipeInput("null", "3", 25L);
            inputTopic.pipeInput("B", "4", 23L);
            inputTopic.pipeInput("NULL", "5", 24L);
            inputTopic.pipeInput("B", "7", 22L);

            Assert.assertEquals(
                Arrays.asList(
                    new KeyValueTimestamp<>("1", "0+1", 10L),
                    new KeyValueTimestamp<>("1", "0+1-1", 15L),
                    new KeyValueTimestamp<>("1", "0+1-1+1", 15L),
                    new KeyValueTimestamp<>("2", "0+2", 20L),
                    new KeyValueTimestamp<>("2", "0+2-2", 23L),
                    new KeyValueTimestamp<>("4", "0+4", 23L),
                    new KeyValueTimestamp<>("4", "0+4-4", 23L),
                    new KeyValueTimestamp<>("7", "0+7", 22L)),
                supplier.theCapturedProcessor().processed());
        }
    }

    /**
     * Shared driver for the count tests: pipes five colour records into {@code input} and
     * asserts the exact sequence of per-colour counts captured by {@code supplier}.
     *
     * @param builder  builder holding the already-defined count topology
     * @param input    name of the source topic the topology reads from
     * @param supplier processor supplier attached to the end of the topology
     */
    private static void testCountHelper(final StreamsBuilder builder,
                                        final String input,
                                        final MockProcessorSupplier<String, Object> supplier) {
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), CONFIG, Instant.ofEpochMilli(0L))) {
            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);

            inputTopic.pipeInput("A", "green", 10L);
            inputTopic.pipeInput("B", "green", 9L);
            inputTopic.pipeInput("A", "blue", 12L);
            inputTopic.pipeInput("C", "yellow", 15L);
            inputTopic.pipeInput("D", "green", 11L);

            Assert.assertEquals(
                Arrays.asList(
                    new KeyValueTimestamp<>("green", 1L, 10L),
                    new KeyValueTimestamp<>("green", 2L, 10L),
                    new KeyValueTimestamp<>("green", 1L, 12L),
                    new KeyValueTimestamp<>("blue", 1L, 12L),
                    new KeyValueTimestamp<>("yellow", 1L, 15L),
                    new KeyValueTimestamp<>("green", 2L, 12L)),
                supplier.theCapturedProcessor().processed());
        }
    }

    /** Counts records per value using a store explicitly named {@code "count"}. */
    @Test
    public void testCount() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "count-test-input";

        builder
            .table(input, consumed)
            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized)
            .count(Materialized.as("count"))
            .toStream()
            .process(supplier);

        testCountHelper(builder, input, supplier);
    }

    /** Same scenario as {@link #testCount()}, but {@code count()} is called without a {@code Materialized}. */
    @Test
    public void testCountWithInternalStore() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "count-test-input";

        builder
            .table(input, consumed)
            .groupBy(MockMapper.selectValueKeyValueMapper(), stringSerialized)
            .count()
            .toStream()
            .process(supplier);

        testCountHelper(builder, input, supplier);
    }

    /**
     * Checks the order in which an update is applied to the aggregate.
     *
     * <p>Records are grouped by the first character of the source key, with the second
     * character as the grouped value. The adder appends the value and the subtractor strips
     * every occurrence of it. For the final update of key {@code "12"} the expected results
     * are {@code ""} followed by {@code "2"}, i.e. the old value is removed before the new
     * one is added; the opposite order would leave the aggregate empty.
     */
    @Test
    public void testRemoveOldBeforeAddNew() {
        final StreamsBuilder builder = new StreamsBuilder();
        final String input = "count-test-input";
        // Dedicated String-valued supplier (distinct from the shared field) for this topology.
        final MockProcessorSupplier<String, String> stringSupplier = new MockProcessorSupplier<>();

        builder
            .table(input, consumed)
            .groupBy(
                (key, value) -> KeyValue.pair(
                    String.valueOf(key.charAt(0)),
                    String.valueOf(key.charAt(1))),
                stringSerialized)
            .aggregate(
                () -> "",
                (aggKey, value, aggregate) -> aggregate + value,
                (key, value, aggregate) -> aggregate.replaceAll(value, ""),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("someStore")
                    .withValueSerde(Serdes.String()))
            .toStream()
            .process(stringSupplier);

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), CONFIG, Instant.ofEpochMilli(0L))) {
            final TestInputTopic<String, String> inputTopic = driver.createInputTopic(
                input, new StringSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            final MockProcessor<String, String> proc = stringSupplier.theCapturedProcessor();

            inputTopic.pipeInput("11", "A", 10L);
            inputTopic.pipeInput("12", "B", 8L);
            inputTopic.pipeInput("11", (String) null, 12L);
            inputTopic.pipeInput("12", "C", 6L);

            Assert.assertEquals(
                Arrays.asList(
                    new KeyValueTimestamp<>("1", "1", 10L),
                    new KeyValueTimestamp<>("1", "12", 10L),
                    new KeyValueTimestamp<>("1", "2", 12L),
                    new KeyValueTimestamp<>("1", "", 12L),
                    new KeyValueTimestamp<>("1", "2", 12L)),
                proc.processed());
        }
    }
}

