/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class KStreamWindowAggregateTest {
    private static final String WINDOW_STORE_NAME = "dummy-store-name";
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
    private final String threadId = Thread.currentThread().getName();
    @Parameterized.Parameter
    public EmitStrategy.StrategyType type;
    @Parameterized.Parameter(value=1)
    public boolean withCache;
    private EmitStrategy emitStrategy;
    private boolean emitFinal;

    @Parameterized.Parameters(name="{0}_cache:{1}")
    public static Collection<Object[]> getEmitStrategy() {
        return Arrays.asList({EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true}, {EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false}, {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true}, {EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false});
    }

    @Before
    public void before() {
        this.emitFinal = this.type.equals((Object)EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
        this.emitStrategy = EmitStrategy.StrategyType.forType((EmitStrategy.StrategyType)this.type);
    }

    @Test
    public void testAggBasic() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        KTable table2 = builder.stream("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(100L)).advanceBy(Duration.ofMillis(5L))).emitStrategy(this.emitStrategy).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.setMaterializedCache(Materialized.as((String)"topic1-Canonized").withValueSerde(Serdes.String())));
        MockApiProcessorSupplier supplier = new MockApiProcessorSupplier();
        table2.toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic1.pipeInput((Object)"A", (Object)"1", 0L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 1L);
            inputTopic1.pipeInput((Object)"C", (Object)"3", 2L);
            inputTopic1.pipeInput((Object)"D", (Object)"4", 3L);
            inputTopic1.pipeInput((Object)"A", (Object)"1", 4L);
            inputTopic1.pipeInput((Object)"A", (Object)"1", 5L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 6L);
            inputTopic1.pipeInput((Object)"D", (Object)"4", 7L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 8L);
            inputTopic1.pipeInput((Object)"C", (Object)"3", 9L);
            inputTopic1.pipeInput((Object)"A", (Object)"1", 10L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 11L);
            inputTopic1.pipeInput((Object)"D", (Object)"4", 12L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 13L);
            inputTopic1.pipeInput((Object)"C", (Object)"3", 14L);
            inputTopic1.pipeInput((Object)"B", (Object)"1", 3L);
            inputTopic1.pipeInput((Object)"B", (Object)"2", 2L);
            inputTopic1.pipeInput((Object)"B", (Object)"3", 9L);
        }
        if (this.emitFinal) {
            Assert.assertTrue((boolean)supplier.theCapturedProcessor().processed().isEmpty());
        } else {
            Assert.assertEquals(Arrays.asList(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3", 2L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4", 3L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1", 4L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1+1", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4+4", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3+3", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(5L, 15L)), "0+3", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1+1", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(10L, 20L)), "0+1", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2+2", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+2", 11L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4+4", 12L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(10L, 20L)), "0+4", 12L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2+2+2", 13L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), "0+2+2", 13L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(5L, 15L)), "0+3+3", 14L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(10L, 20L)), "0+3", 14L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2+1", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2+1+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2+1+2+3", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2+2+2+3", 13L)), supplier.theCapturedProcessor().processed());
        }
    }

    @Test
    public void testJoin() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic1 = "topic1";
        String topic2 = "topic2";
        long grace = this.emitFinal ? 5L : 100L;
        KTable table1 = builder.stream("topic1", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(grace)).advanceBy(Duration.ofMillis(5L))).emitStrategy(this.emitStrategy).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.setMaterializedCache(Materialized.as((String)"topic1-Canonized").withValueSerde(Serdes.String())));
        MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<Windowed<String>, String, Void, Void>();
        table1.toStream().process(supplier, new String[0]);
        KTable table2 = builder.stream("topic2", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(grace)).advanceBy(Duration.ofMillis(5L))).emitStrategy(this.emitStrategy).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.setMaterializedCache(Materialized.as((String)"topic2-Canonized").withValueSerde(Serdes.String())));
        table2.toStream().process(supplier, new String[0]);
        table1.join(table2, (p1, p2) -> p1 + "%" + p2).toStream().process(supplier, new String[0]);
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic1 = driver.createInputTopic("topic1", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            TestInputTopic inputTopic2 = driver.createInputTopic("topic2", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            if (this.emitFinal) {
                this.processEmitFinalJoin((TestInputTopic<String, String>)inputTopic1, (TestInputTopic<String, String>)inputTopic2, supplier);
            } else {
                this.processEmitUpdateJoin((TestInputTopic<String, String>)inputTopic1, (TestInputTopic<String, String>)inputTopic2, supplier);
            }
        }
    }

    private void processEmitFinalJoin(TestInputTopic<String, String> inputTopic1, TestInputTopic<String, String> inputTopic2, MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier) {
        inputTopic1.pipeInput((Object)"A", (Object)"1", 0L);
        inputTopic1.pipeInput((Object)"B", (Object)"2", 1L);
        inputTopic1.pipeInput((Object)"C", (Object)"3", 2L);
        inputTopic1.pipeInput((Object)"D", (Object)"4", 3L);
        inputTopic1.pipeInput((Object)"A", (Object)"1", 9L);
        inputTopic1.pipeInput((Object)"A", (Object)"1", 15L);
        List<MockApiProcessor<Windowed<String>, String, Void, Void>> processors = supplier.capturedProcessors(3);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3", 2L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4", 3L));
        processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        inputTopic1.pipeInput((Object)"A", (Object)"1", 10L);
        inputTopic1.pipeInput((Object)"B", (Object)"2", 11L);
        inputTopic1.pipeInput((Object)"D", (Object)"4", 12L);
        inputTopic1.pipeInput((Object)"B", (Object)"2", 13L);
        inputTopic1.pipeInput((Object)"C", (Object)"3", 14L);
        inputTopic1.pipeInput((Object)"A", (Object)"1", 20L);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1+1", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2", 13L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(5L, 15L)), "0+3", 14L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4", 12L));
        processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        inputTopic2.pipeInput((Object)"A", (Object)"a", 0L);
        inputTopic2.pipeInput((Object)"B", (Object)"b", 1L);
        inputTopic2.pipeInput((Object)"C", (Object)"c", 2L);
        inputTopic2.pipeInput((Object)"D", (Object)"d", 10L);
        inputTopic2.pipeInput((Object)"A", (Object)"a", 15L);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        if (this.withCache) {
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+a", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+b", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+c", 2L));
            processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1%0+a", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2%0+b", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3%0+c", 2L));
        } else {
            processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp[0]);
            processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        }
        inputTopic2.pipeInput((Object)"A", (Object)"a", 5L);
        inputTopic2.pipeInput((Object)"B", (Object)"b", 6L);
        inputTopic2.pipeInput((Object)"D", (Object)"d", 7L);
        inputTopic2.pipeInput((Object)"D", (Object)"d", 18L);
        inputTopic2.pipeInput((Object)"A", (Object)"a", 21L);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        if (this.withCache) {
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+a", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+b", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+d+d", 10L));
        } else {
            processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+a", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+b", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+c", 2L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+a", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+b", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+d+d", 10L));
        }
        processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1+1%0+a", 10L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2%0+b", 13L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4%0+d+d", 12L));
    }

    private void processEmitUpdateJoin(TestInputTopic<String, String> inputTopic1, TestInputTopic<String, String> inputTopic2, MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier) {
        inputTopic1.pipeInput((Object)"A", (Object)"1", 0L);
        inputTopic1.pipeInput((Object)"B", (Object)"2", 1L);
        inputTopic1.pipeInput((Object)"C", (Object)"3", 2L);
        inputTopic1.pipeInput((Object)"D", (Object)"4", 3L);
        inputTopic1.pipeInput((Object)"A", (Object)"1", 9L);
        List<MockApiProcessor<Windowed<String>, String, Void, Void>> processors = supplier.capturedProcessors(3);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3", 2L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4", 3L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1", 9L));
        processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        inputTopic1.pipeInput((Object)"A", (Object)"1", 5L);
        inputTopic1.pipeInput((Object)"B", (Object)"2", 6L);
        inputTopic1.pipeInput((Object)"D", (Object)"4", 7L);
        inputTopic1.pipeInput((Object)"B", (Object)"2", 8L);
        inputTopic1.pipeInput((Object)"C", (Object)"3", 9L);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1+1", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1+1", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4+4", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3+3", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(5L, 15L)), "0+3", 9L));
        processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        inputTopic2.pipeInput((Object)"A", (Object)"a", 0L);
        inputTopic2.pipeInput((Object)"B", (Object)"b", 1L);
        inputTopic2.pipeInput((Object)"C", (Object)"c", 2L);
        inputTopic2.pipeInput((Object)"D", (Object)"d", 20L);
        inputTopic2.pipeInput((Object)"A", (Object)"a", 20L);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+a", 0L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+b", 1L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+c", 2L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(15L, 25L)), "0+d", 20L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(20L, 30L)), "0+d", 20L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(15L, 25L)), "0+a", 20L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(20L, 30L)), "0+a", 20L));
        processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1+1%0+a", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2%0+b", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"C", (Window)new TimeWindow(0L, 10L)), "0+3+3%0+c", 9L));
        inputTopic2.pipeInput((Object)"A", (Object)"a", 5L);
        inputTopic2.pipeInput((Object)"B", (Object)"b", 6L);
        inputTopic2.pipeInput((Object)"D", (Object)"d", 7L);
        inputTopic2.pipeInput((Object)"D", (Object)"d", 18L);
        inputTopic2.pipeInput((Object)"A", (Object)"a", 21L);
        processors.get(0).checkAndClearProcessResult(new KeyValueTimestamp[0]);
        processors.get(1).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+a+a", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+a", 5L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+b+b", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+b", 6L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+d", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+d", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(10L, 20L)), "0+d", 18L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(15L, 25L)), "0+d+d", 20L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(15L, 25L)), "0+a+a", 21L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(20L, 30L)), "0+a+a", 21L));
        processors.get(2).checkAndClearProcessResult(new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), "0+1+1+1%0+a+a", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), "0+1+1%0+a", 9L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), "0+2+2+2%0+b+b", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), "0+2+2%0+b", 8L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(0L, 10L)), "0+4+4%0+d", 7L), new KeyValueTimestamp<Windowed, String>(new Windowed((Object)"D", (Window)new TimeWindow(5L, 15L)), "0+4%0+d", 7L));
    }

    @Test
    public void shouldLogAndMeterWhenSkippingNullKey() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        builder.stream("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.ofSizeWithNoGrace((Duration)Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L))).emitStrategy(this.emitStrategy).aggregate(MockInitializer.STRING_INIT, MockAggregator.toStringInstance("+"), this.setMaterializedCache(Materialized.as((String)"topic1-Canonicalized").withValueSerde(Serdes.String())));
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamWindowAggregate.class);
             TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput(null, (Object)"1");
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.hasItem((Object)"Skipping record due to null key. topic=[topic] partition=[0] offset=[0]"));
        }
    }

    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindow() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        builder.stream("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(90L)).advanceBy(Duration.ofMillis(5L))).emitStrategy(this.emitStrategy).aggregate(() -> "", MockAggregator.toStringInstance("+"), this.setMaterializedCache(Materialized.as((String)"topic1-Canonicalized").withValueSerde(Serdes.String()).withLoggingDisabled().withRetention(Duration.ofMillis(100L)))).toStream().map((key, value) -> new KeyValue((Object)key.toString(), value)).to("output");
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamWindowAggregate.class);
             TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput((Object)"k", (Object)"100", 100L);
            inputTopic.pipeInput((Object)"k", (Object)"0", 0L);
            inputTopic.pipeInput((Object)"k", (Object)"1", 1L);
            inputTopic.pipeInput((Object)"k", (Object)"2", 2L);
            inputTopic.pipeInput((Object)"k", (Object)"3", 3L);
            inputTopic.pipeInput((Object)"k", (Object)"4", 4L);
            inputTopic.pipeInput((Object)"k", (Object)"5", 5L);
            inputTopic.pipeInput((Object)"k", (Object)"6", 6L);
            inputTopic.pipeInput((Object)"k", (Object)"105", 105L);
            inputTopic.pipeInput((Object)"k", (Object)"106", 106L);
            this.assertLatenessMetrics(driver, (Matcher<Object>)CoreMatchers.is((Object)7.0), (Matcher<Object>)CoreMatchers.is((Object)100.0), (Matcher<Object>)CoreMatchers.is((Object)67.9));
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.hasItems((Object[])new String[]{"Skipping record for expired window. topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10] streamTime=[100]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10] streamTime=[100]"}));
            TestOutputTopic outputTopic = driver.createOutputTopic("output", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
            if (this.emitFinal) {
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@5/15]", (Object)"+5+6", null, Long.valueOf(6L))));
                this.assertEmittedMetrics(driver, (Matcher<Object>)CoreMatchers.is((Object)1.0));
            } else {
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@95/105]", (Object)"+100", null, Long.valueOf(100L))));
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@100/110]", (Object)"+100", null, Long.valueOf(100L))));
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@5/15]", (Object)"+5", null, Long.valueOf(5L))));
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@5/15]", (Object)"+5+6", null, Long.valueOf(6L))));
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@100/110]", (Object)"+100+105", null, Long.valueOf(105L))));
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@105/115]", (Object)"+105", null, Long.valueOf(105L))));
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@100/110]", (Object)"+100+105+106", null, Long.valueOf(106L))));
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@105/115]", (Object)"+105+106", null, Long.valueOf(106L))));
            }
            Assert.assertTrue((boolean)outputTopic.isEmpty());
        }
    }

    @Test
    public void shouldLogAndMeterWhenSkippingExpiredWindowByGrace() {
        StreamsBuilder builder = new StreamsBuilder();
        String topic = "topic";
        builder.stream("topic", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy((Windows)TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(90L)).advanceBy(Duration.ofMillis(10L))).emitStrategy(this.emitStrategy).aggregate(() -> "", MockAggregator.toStringInstance("+"), this.setMaterializedCache(Materialized.as((String)"topic1-Canonicalized").withValueSerde(Serdes.String()).withLoggingDisabled())).toStream().map((key, value) -> new KeyValue((Object)key.toString(), value)).to("output");
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamWindowAggregate.class);
             TopologyTestDriver driver = new TopologyTestDriver(builder.build(), this.props);){
            TestInputTopic inputTopic = driver.createInputTopic("topic", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            inputTopic.pipeInput((Object)"k", (Object)"100", 200L);
            inputTopic.pipeInput((Object)"k", (Object)"0", 100L);
            inputTopic.pipeInput((Object)"k", (Object)"1", 101L);
            inputTopic.pipeInput((Object)"k", (Object)"2", 102L);
            inputTopic.pipeInput((Object)"k", (Object)"3", 103L);
            inputTopic.pipeInput((Object)"k", (Object)"4", 104L);
            inputTopic.pipeInput((Object)"k", (Object)"5", 105L);
            inputTopic.pipeInput((Object)"k", (Object)"6", 6L);
            this.assertLatenessMetrics(driver, (Matcher<Object>)CoreMatchers.is((Object)7.0), (Matcher<Object>)CoreMatchers.is((Object)194.0), (Matcher<Object>)CoreMatchers.is((Object)97.375));
            MatcherAssert.assertThat((Object)appender.getMessages(), (Matcher)CoreMatchers.hasItems((Object[])new String[]{"Skipping record for expired window. topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110] streamTime=[200]", "Skipping record for expired window. topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110] streamTime=[200]"}));
            if (!this.emitFinal) {
                TestOutputTopic outputTopic = driver.createOutputTopic("output", (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
                MatcherAssert.assertThat((Object)outputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)"[k@200/210]", (Object)"+100", null, Long.valueOf(200L))));
                Assert.assertTrue((boolean)outputTopic.isEmpty());
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldNotEmitFinalIfNotProgressEnough() throws IOException {
        File stateDir = TestUtils.tempDirectory();
        long windowSize = 10L;
        TimeWindows windows = TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(5L)).advanceBy(Duration.ofMillis(5L));
        try {
            this.props.put("__emit.interval.ms.kstreams.windowed.aggregation__", (Object)0);
            MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = this.makeContext(stateDir, 10L);
            KStreamWindowAggregate processorSupplier = new KStreamWindowAggregate((Windows)windows, WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER);
            Processor processor = processorSupplier.get();
            processor.init(context);
            context.setSystemTimeMs(0L);
            processor.process(new Record((Object)"A", (Object)"1", 0L));
            processor.process(new Record((Object)"B", (Object)"2", 5L));
            processor.process(new Record((Object)"C", (Object)"3", 15L));
            List forwarded = context.forwarded();
            List<MockProcessorContext.CapturedForward> expected = this.emitFinal ? Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1", null), 0L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+2", null), 5L))) : Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1", null), 0L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+2", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+2", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"C", (Window)new TimeWindow(10L, 20L)), (Object)new Change((Object)"0+3", null), 15L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"C", (Window)new TimeWindow(15L, 25L)), (Object)new Change((Object)"0+3", null), 15L)));
            MatcherAssert.assertThat((Object)forwarded, (Matcher)CoreMatchers.is(expected));
            context.resetForwards();
            processor.process(new Record((Object)"D", (Object)"4", 15L));
            forwarded = context.forwarded();
            if (this.emitFinal) {
                Assert.assertTrue((boolean)forwarded.isEmpty());
            } else {
                expected = Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"D", (Window)new TimeWindow(10L, 20L)), (Object)new Change((Object)"0+4", null), 15L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"D", (Window)new TimeWindow(15L, 25L)), (Object)new Change((Object)"0+4", null), 15L)));
                MatcherAssert.assertThat((Object)forwarded, (Matcher)CoreMatchers.is(expected));
            }
            context.resetForwards();
            processor.process(new Record((Object)"E", (Object)"5", 19L));
            forwarded = context.forwarded();
            if (this.emitFinal) {
                Assert.assertTrue((boolean)forwarded.isEmpty());
            } else {
                expected = Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"E", (Window)new TimeWindow(10L, 20L)), (Object)new Change((Object)"0+5", null), 19L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"E", (Window)new TimeWindow(15L, 25L)), (Object)new Change((Object)"0+5", null), 19L)));
                MatcherAssert.assertThat((Object)forwarded, (Matcher)CoreMatchers.is(expected));
            }
            context.getStateStore(WINDOW_STORE_NAME).close();
        }
        finally {
            Utils.delete((File)stateDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldEmitWithInterval0() throws IOException {
        File stateDir = TestUtils.tempDirectory();
        long windowSize = 10L;
        TimeWindows windows = TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(5L)).advanceBy(Duration.ofMillis(5L));
        try {
            this.props.put("__emit.interval.ms.kstreams.windowed.aggregation__", (Object)0);
            MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = this.makeContext(stateDir, 10L);
            KStreamWindowAggregate processorSupplier = new KStreamWindowAggregate((Windows)windows, WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER);
            Processor processor = processorSupplier.get();
            processor.init(context);
            context.setSystemTimeMs(0L);
            processor.process(new Record((Object)"A", (Object)"1", 0L));
            processor.process(new Record((Object)"A", (Object)"1", 5L));
            processor.process(new Record((Object)"B", (Object)"2", 10L));
            processor.process(new Record((Object)"C", (Object)"3", 15L));
            processor.process(new Record((Object)"D", (Object)"4", 20L));
            List forwarded = context.forwarded();
            List<MockProcessorContext.CapturedForward> expected = this.emitFinal ? Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1+1", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+1", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+2", null), 10L))) : Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1", null), 0L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1+1", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+1", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+2", null), 10L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), (Object)new Change((Object)"0+2", null), 10L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"C", (Window)new TimeWindow(10L, 20L)), (Object)new Change((Object)"0+3", null), 15L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"C", (Window)new TimeWindow(15L, 25L)), (Object)new Change((Object)"0+3", null), 15L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"D", (Window)new TimeWindow(15L, 25L)), (Object)new Change((Object)"0+4", null), 20L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"D", (Window)new TimeWindow(20L, 30L)), (Object)new Change((Object)"0+4", null), 20L)));
            MatcherAssert.assertThat((Object)forwarded, (Matcher)CoreMatchers.is(expected));
            context.getStateStore(WINDOW_STORE_NAME).close();
        }
        finally {
            Utils.delete((File)stateDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldEmitWithLargeInterval() throws IOException {
        File stateDir = TestUtils.tempDirectory();
        long windowSize = 10L;
        TimeWindows windows = TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(5L)).advanceBy(Duration.ofMillis(5L));
        try {
            List<MockProcessorContext.CapturedForward> expected;
            this.props.put("__emit.interval.ms.kstreams.windowed.aggregation__", (Object)1000L);
            MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = this.makeContext(stateDir, 10L);
            KStreamWindowAggregate processorSupplier = new KStreamWindowAggregate((Windows)windows, WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER);
            Processor processor = processorSupplier.get();
            processor.init(context);
            context.setSystemTimeMs(0L);
            processor.process(new Record((Object)"A", (Object)"1", 0L));
            processor.process(new Record((Object)"A", (Object)"1", 5L));
            processor.process(new Record((Object)"B", (Object)"2", 10L));
            processor.process(new Record((Object)"C", (Object)"3", 15L));
            List forwarded = context.forwarded();
            if (this.emitFinal) {
                Assert.assertTrue((boolean)forwarded.isEmpty());
            } else {
                expected = Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1", null), 0L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1+1", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+1", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+2", null), 10L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(10L, 20L)), (Object)new Change((Object)"0+2", null), 10L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"C", (Window)new TimeWindow(10L, 20L)), (Object)new Change((Object)"0+3", null), 15L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"C", (Window)new TimeWindow(15L, 25L)), (Object)new Change((Object)"0+3", null), 15L)));
                MatcherAssert.assertThat((Object)forwarded, (Matcher)CoreMatchers.is(expected));
            }
            context.resetForwards();
            context.setSystemTimeMs(10000L);
            processor.process(new Record((Object)"D", (Object)"4", 20L));
            forwarded = context.forwarded();
            expected = this.emitFinal ? Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1+1", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+1", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+2", null), 10L))) : Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"D", (Window)new TimeWindow(15L, 25L)), (Object)new Change((Object)"0+4", null), 20L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"D", (Window)new TimeWindow(20L, 30L)), (Object)new Change((Object)"0+4", null), 20L)));
            MatcherAssert.assertThat((Object)forwarded, (Matcher)CoreMatchers.is(expected));
            context.resetForwards();
            context.setSystemTimeMs(10100L);
            processor.process(new Record((Object)"E", (Object)"5", 40L));
            forwarded = context.forwarded();
            if (this.emitFinal) {
                Assert.assertTrue((boolean)forwarded.isEmpty());
            } else {
                expected = Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"E", (Window)new TimeWindow(35L, 45L)), (Object)new Change((Object)"0+5", null), 40L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"E", (Window)new TimeWindow(40L, 50L)), (Object)new Change((Object)"0+5", null), 40L)));
                MatcherAssert.assertThat((Object)forwarded, (Matcher)CoreMatchers.is(expected));
            }
            context.getStateStore(WINDOW_STORE_NAME).close();
        }
        finally {
            Utils.delete((File)stateDir);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void shouldEmitFromLastEmitTime() throws IOException {
        File stateDir = TestUtils.tempDirectory();
        long windowSize = 10L;
        TimeWindows windows = TimeWindows.ofSizeAndGrace((Duration)Duration.ofMillis(10L), (Duration)Duration.ofMillis(5L)).advanceBy(Duration.ofMillis(5L));
        try {
            this.props.put("__emit.interval.ms.kstreams.windowed.aggregation__", (Object)0);
            MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = this.makeContext(stateDir, 10L);
            KStreamWindowAggregate processorSupplier = new KStreamWindowAggregate((Windows)windows, WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER);
            Processor processor = processorSupplier.get();
            processor.init(context);
            context.setSystemTimeMs(0L);
            processor.process(new Record((Object)"A", (Object)"1", 0L));
            processor.process(new Record((Object)"B", (Object)"2", 5L));
            processor.process(new Record((Object)"C", (Object)"3", 15L));
            List forwarded = context.forwarded();
            List<MockProcessorContext.CapturedForward> expected = this.emitFinal ? Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1", null), 0L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+2", null), 5L))) : Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"A", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+1", null), 0L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(0L, 10L)), (Object)new Change((Object)"0+2", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+2", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"C", (Window)new TimeWindow(10L, 20L)), (Object)new Change((Object)"0+3", null), 15L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"C", (Window)new TimeWindow(15L, 25L)), (Object)new Change((Object)"0+3", null), 15L)));
            MatcherAssert.assertThat((Object)forwarded, (Matcher)CoreMatchers.is(expected));
            context.resetForwards();
            Processor newProcessor = processorSupplier.get();
            newProcessor.init(context);
            newProcessor.process(new Record((Object)"D", (Object)"4", 25L));
            forwarded = context.forwarded();
            expected = this.emitFinal ? Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"B", (Window)new TimeWindow(5L, 15L)), (Object)new Change((Object)"0+2", null), 5L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"C", (Window)new TimeWindow(10L, 20L)), (Object)new Change((Object)"0+3", null), 15L))) : Arrays.asList(new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"D", (Window)new TimeWindow(20L, 30L)), (Object)new Change((Object)"0+4", null), 25L)), new MockProcessorContext.CapturedForward(new Record((Object)new Windowed((Object)"D", (Window)new TimeWindow(25L, 35L)), (Object)new Change((Object)"0+4", null), 25L)));
            MatcherAssert.assertThat((Object)forwarded, (Matcher)CoreMatchers.is(expected));
            context.resetForwards();
            context.getStateStore(WINDOW_STORE_NAME).close();
        }
        finally {
            Utils.delete((File)stateDir);
        }
    }

    @Test
    public void showThrowIfEmitFinalUsedWithUnlimitedWindow() {
        if (this.emitFinal) {
            IllegalArgumentException e = (IllegalArgumentException)Assert.assertThrows(IllegalArgumentException.class, () -> new KStreamWindowAggregate((Windows)UnlimitedWindows.of(), WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER));
            MatcherAssert.assertThat((Object)e.getMessage(), (Matcher)CoreMatchers.is((Object)"ON_WINDOW_CLOSE strategy is only supported for TimeWindows and SlidingWindows for TimeWindowedKStream"));
        } else {
            new KStreamWindowAggregate((Windows)UnlimitedWindows.of(), WINDOW_STORE_NAME, this.emitStrategy, MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER);
        }
    }

    private TimestampedWindowStore<String, String> getWindowStore(long windowSize) {
        Object supplier = this.emitFinal ? RocksDbIndexedTimeOrderedWindowBytesStoreSupplier.create((String)WINDOW_STORE_NAME, (Duration)Duration.ofDays(1L), (Duration)Duration.ofMillis(windowSize), (boolean)false, (boolean)false) : Stores.persistentTimestampedWindowStore((String)WINDOW_STORE_NAME, (Duration)Duration.ofDays(1L), (Duration)Duration.ofMillis(windowSize), (boolean)false);
        return (TimestampedWindowStore)Stores.timestampedWindowStoreBuilder((WindowBytesStoreSupplier)supplier, (Serde)Serdes.String(), (Serde)Serdes.String()).withLoggingDisabled().withCachingDisabled().build();
    }

    private MockInternalNewProcessorContext<Windowed<String>, Change<String>> makeContext(File stateDir, long windowSize) {
        MockInternalNewProcessorContext<Windowed<String>, Change<String>> context = new MockInternalNewProcessorContext<Windowed<String>, Change<String>>(this.props, new TaskId(0, 0), stateDir);
        context.setCurrentNode(new ProcessorNode("testNode"));
        TimestampedWindowStore<String, String> store = this.getWindowStore(windowSize);
        store.init(context.getStateStoreContext(), store);
        context.getStateStoreContext().register(store, null);
        return context;
    }

    private void assertLatenessMetrics(TopologyTestDriver driver, Matcher<Object> dropTotal, Matcher<Object> maxLateness, Matcher<Object> avgLateness) {
        MetricName dropTotalMetric = new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
        MetricName dropRateMetric = new MetricName("dropped-records-rate", "stream-task-metrics", "The average number of dropped records per second", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
        MetricName latenessMaxMetric = new MetricName("record-lateness-max", "stream-task-metrics", "The observed maximum lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
        MetricName latenessAvgMetric = new MetricName("record-lateness-avg", "stream-task-metrics", "The observed average lateness of records in milliseconds, measured by comparing the record timestamp with the current stream time", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(dropTotalMetric)).metricValue(), dropTotal);
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(dropRateMetric)).metricValue(), (Matcher)CoreMatchers.not((Object)0.0));
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(latenessMaxMetric)).metricValue(), maxLateness);
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(latenessAvgMetric)).metricValue(), avgLateness);
    }

    private void assertEmittedMetrics(TopologyTestDriver driver, Matcher<Object> emittedTotal) {
        MetricName emittedTotalMetric = new MetricName("window-aggregate-final-emit-total", "stream-processor-node-metrics", "The total number of emit final records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0"), Utils.mkEntry((Object)"processor-node-id", (Object)"KSTREAM-AGGREGATE-0000000001")}));
        MetricName emittedRateMetric = new MetricName("window-aggregate-final-emit-rate", "stream-processor-node-metrics", "The average number of emit final records per second", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)this.threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0"), Utils.mkEntry((Object)"processor-node-id", (Object)"KSTREAM-AGGREGATE-0000000001")}));
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(emittedTotalMetric)).metricValue(), emittedTotal);
        MatcherAssert.assertThat((Object)((Metric)driver.metrics().get(emittedRateMetric)).metricValue(), (Matcher)CoreMatchers.not((Object)0.0));
    }

    private <K, V, S extends StateStore> Materialized<K, V, S> setMaterializedCache(Materialized<K, V, S> materialized) {
        if (this.withCache) {
            return materialized.withCachingEnabled();
        }
        return materialized.withCachingDisabled();
    }
}

