/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(value={IntegrationTest.class})
public class AdjustStreamThreadCountTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    @Rule
    public TestName testName = new TestName();
    private final List<KafkaStreams.State> stateTransitionHistory = new ArrayList<KafkaStreams.State>();
    private static String inputTopic;
    private static StreamsBuilder builder;
    private static Properties properties;
    private static String appId;
    public static final Duration DEFAULT_DURATION;

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void setup() {
        String testId = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        appId = "appId_" + testId;
        inputTopic = "input" + testId;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic);
        builder = new StreamsBuilder();
        builder.stream(inputTopic);
        properties = Utils.mkObjectProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"bootstrap.servers", (Object)CLUSTER.bootstrapServers()), Utils.mkEntry((Object)"application.id", (Object)appId), Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath()), Utils.mkEntry((Object)"num.stream.threads", (Object)2), Utils.mkEntry((Object)"default.key.serde", Serdes.StringSerde.class), Utils.mkEntry((Object)"default.value.serde", Serdes.StringSerde.class), Utils.mkEntry((Object)"session.timeout.ms", (Object)10000)}));
    }

    private void startStreamsAndWaitForRunning(KafkaStreams kafkaStreams) throws InterruptedException {
        kafkaStreams.start();
        this.waitForRunning();
    }

    @After
    public void teardown() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(properties);
    }

    private void addStreamStateChangeListener(KafkaStreams kafkaStreams) {
        kafkaStreams.setStateListener((newState, oldState) -> this.stateTransitionHistory.add(newState));
    }

    private void waitForRunning() throws InterruptedException {
        TestUtils.waitForCondition(() -> !this.stateTransitionHistory.isEmpty() && this.stateTransitionHistory.get(this.stateTransitionHistory.size() - 1).equals((Object)KafkaStreams.State.RUNNING), (long)DEFAULT_DURATION.toMillis(), () -> String.format("Client did not transit to state %s in %d seconds", KafkaStreams.State.RUNNING, DEFAULT_DURATION.toMillis() / 1000L));
    }

    private void waitForTransitionFromRebalancingToRunning() throws InterruptedException {
        this.waitForRunning();
        int historySize = this.stateTransitionHistory.size();
        MatcherAssert.assertThat((String)("Client did not transit from REBALANCING to RUNNING. The observed state transitions are: " + this.stateTransitionHistory), (Object)(historySize >= 2 && this.stateTransitionHistory.get(historySize - 2).equals((Object)KafkaStreams.State.REBALANCING) && this.stateTransitionHistory.get(historySize - 1).equals((Object)KafkaStreams.State.RUNNING) ? 1 : 0), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldAddStreamThread() throws Exception {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            this.addStreamStateChangeListener(kafkaStreams);
            this.startStreamsAndWaitForRunning(kafkaStreams);
            int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), (Matcher)CoreMatchers.equalTo((Object)new String[]{"1", "2"}));
            this.stateTransitionHistory.clear();
            Optional name = kafkaStreams.addStreamThread();
            MatcherAssert.assertThat((Object)name, (Matcher)CoreMatchers.not(Optional.empty()));
            TestUtils.waitForCondition(() -> ((Stream)kafkaStreams.metadataForLocalThreads().stream().sequential()).map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.orElse(""))), (String)"Wait for the thread to be added");
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)(oldThreadCount + 1)));
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), (Matcher)CoreMatchers.equalTo((Object)new String[]{"1", "2", "3"}));
            this.waitForTransitionFromRebalancingToRunning();
        }
    }

    @Test
    public void shouldRemoveStreamThread() throws Exception {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            this.addStreamStateChangeListener(kafkaStreams);
            this.startStreamsAndWaitForRunning(kafkaStreams);
            int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            MatcherAssert.assertThat((Object)((String)kafkaStreams.removeStreamThread().get()).split("-")[0], (Matcher)CoreMatchers.equalTo((Object)appId));
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)(oldThreadCount - 1)));
            this.waitForTransitionFromRebalancingToRunning();
        }
    }

    @Test
    public void shouldRemoveStreamThreadWithStaticMembership() throws Exception {
        properties.put("group.instance.id", "member-A");
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            this.addStreamStateChangeListener(kafkaStreams);
            this.startStreamsAndWaitForRunning(kafkaStreams);
            int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            MatcherAssert.assertThat((Object)((String)kafkaStreams.removeStreamThread().get()).split("-")[0], (Matcher)CoreMatchers.equalTo((Object)appId));
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)(oldThreadCount - 1)));
            this.waitForTransitionFromRebalancingToRunning();
        }
    }

    @Test
    public void shouldnNotRemoveStreamThreadWithinTimeout() throws Exception {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            this.addStreamStateChangeListener(kafkaStreams);
            this.startStreamsAndWaitForRunning(kafkaStreams);
            Assert.assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));
        }
    }

    @Test
    public void shouldAddAndRemoveThreadsMultipleTimes() throws InterruptedException {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            this.addStreamStateChangeListener(kafkaStreams);
            this.startStreamsAndWaitForRunning(kafkaStreams);
            int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            CountDownLatch latch = new CountDownLatch(2);
            Thread one = this.adjustCountHelperThread(kafkaStreams, 4, latch);
            Thread two = this.adjustCountHelperThread(kafkaStreams, 6, latch);
            two.start();
            one.start();
            latch.await(30L, TimeUnit.SECONDS);
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)oldThreadCount));
            this.waitForTransitionFromRebalancingToRunning();
        }
    }

    private Thread adjustCountHelperThread(KafkaStreams kafkaStreams, int count, CountDownLatch latch) {
        return new Thread(() -> {
            for (int i = 0; i < count; ++i) {
                kafkaStreams.addStreamThread();
                kafkaStreams.removeStreamThread();
            }
            latch.countDown();
        });
    }

    @Test
    public void shouldAddAndRemoveStreamThreadsWhileKeepingNamesCorrect() throws Exception {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            this.addStreamStateChangeListener(kafkaStreams);
            this.startStreamsAndWaitForRunning(kafkaStreams);
            int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), (Matcher)CoreMatchers.equalTo((Object)new String[]{"1", "2"}));
            Optional name = kafkaStreams.addStreamThread();
            MatcherAssert.assertThat((String)"New thread has index 3", (boolean)"3".equals(((String)name.get()).split("-StreamThread-")[1]));
            TestUtils.waitForCondition(() -> ((Stream)kafkaStreams.metadataForLocalThreads().stream().sequential()).map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name.get())), (String)"Stream thread has not been added");
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)(oldThreadCount + 1)));
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), (Matcher)CoreMatchers.equalTo((Object)new String[]{"1", "2", "3"}));
            this.waitForTransitionFromRebalancingToRunning();
            oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            this.stateTransitionHistory.clear();
            Optional removedThread = kafkaStreams.removeStreamThread();
            MatcherAssert.assertThat((Object)removedThread, (Matcher)CoreMatchers.not(Optional.empty()));
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)(oldThreadCount - 1)));
            this.waitForTransitionFromRebalancingToRunning();
            this.stateTransitionHistory.clear();
            Optional name2 = kafkaStreams.addStreamThread();
            MatcherAssert.assertThat((Object)name2, (Matcher)CoreMatchers.not(Optional.empty()));
            TestUtils.waitForCondition(() -> ((Stream)kafkaStreams.metadataForLocalThreads().stream().sequential()).map(ThreadMetadata::threadName).anyMatch(t -> t.equals(name2.orElse(""))), (String)"Wait for the thread to be added");
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().size(), (Matcher)CoreMatchers.equalTo((Object)oldThreadCount));
            MatcherAssert.assertThat((Object)kafkaStreams.metadataForLocalThreads().stream().map(t -> t.threadName().split("-StreamThread-")[1]).sorted().toArray(), (Matcher)CoreMatchers.equalTo((Object)new String[]{"1", "2", "3"}));
            MatcherAssert.assertThat((String)"the new thread should have received the old threads name", (boolean)name2.equals(removedThread));
            this.waitForTransitionFromRebalancingToRunning();
        }
    }

    @Test
    public void testConcurrentlyAccessThreads() throws InterruptedException {
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);){
            this.addStreamStateChangeListener(kafkaStreams);
            this.startStreamsAndWaitForRunning(kafkaStreams);
            int oldThreadCount = kafkaStreams.metadataForLocalThreads().size();
            int threadCount = 5;
            int loop = 3;
            AtomicReference lastException = new AtomicReference();
            ExecutorService executor = Executors.newFixedThreadPool(5);
            for (int threadIndex = 0; threadIndex < 5; ++threadIndex) {
                executor.execute(() -> {
                    try {
                        for (int i = 0; i < 4; ++i) {
                            if (!kafkaStreams.addStreamThread().isPresent()) {
                                throw new RuntimeException("failed to create stream thread");
                            }
                            kafkaStreams.metadataForLocalThreads();
                            if (kafkaStreams.removeStreamThread().isPresent()) continue;
                            throw new RuntimeException("failed to delete a stream thread");
                        }
                    }
                    catch (Exception e) {
                        lastException.set(e);
                    }
                });
            }
            executor.shutdown();
            Assert.assertTrue((boolean)executor.awaitTermination(60L, TimeUnit.SECONDS));
            Assert.assertNull(lastException.get());
            Assert.assertEquals((long)oldThreadCount, (long)kafkaStreams.metadataForLocalThreads().size());
        }
    }

    @Test
    public void shouldResizeCacheAfterThreadRemovalTimesOut() throws InterruptedException {
        long totalCacheBytes = 10L;
        Properties props = new Properties();
        props.putAll((Map<?, ?>)properties);
        props.put("num.stream.threads", (Object)2);
        props.put("cache.max.bytes.buffering", (Object)10L);
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);){
            this.addStreamStateChangeListener(kafkaStreams);
            this.startStreamsAndWaitForRunning(kafkaStreams);
            try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KafkaStreams.class);){
                Assert.assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ofSeconds(0L)));
                Iterator<String> iterator = appender.getMessages().iterator();
                while (true) {
                    if (iterator.hasNext()) {
                        String log = iterator.next();
                        if (!log.endsWith("Resizing thread cache due to thread removal, new cache size per thread is 10")) continue;
                        return;
                        continue;
                    }
                    break;
                }
            }
        }
        Assert.fail();
    }

    @Test
    public void shouldResizeCacheAfterThreadReplacement() throws InterruptedException {
        long totalCacheBytes = 10L;
        Properties props = new Properties();
        props.putAll((Map<?, ?>)properties);
        props.put("num.stream.threads", (Object)2);
        props.put("cache.max.bytes.buffering", (Object)10L);
        final AtomicBoolean injectError = new AtomicBoolean(false);
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream(inputTopic);
        stream.transform(() -> new Transformer<String, String, KeyValue<String, String>>(){

            public void init(ProcessorContext context) {
                context.schedule(Duration.ofSeconds(1L), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                    if (Thread.currentThread().getName().endsWith("StreamThread-1") && injectError.get()) {
                        injectError.set(false);
                        throw new RuntimeException("BOOM");
                    }
                });
            }

            public KeyValue<String, String> transform(String key, String value) {
                return new KeyValue((Object)key, (Object)value);
            }

            public void close() {
            }
        }, new String[0]);
        try (KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);){
            this.addStreamStateChangeListener(kafkaStreams);
            kafkaStreams.setUncaughtExceptionHandler(e -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
            this.startStreamsAndWaitForRunning(kafkaStreams);
            this.stateTransitionHistory.clear();
            try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister();){
                injectError.set(true);
                TestUtils.waitForCondition(() -> !injectError.get(), (String)"StreamThread did not hit and reset the injected error");
                this.waitForTransitionFromRebalancingToRunning();
                Iterator<String> iterator = appender.getMessages().iterator();
                while (true) {
                    if (iterator.hasNext()) {
                        String log = iterator.next();
                        if (!log.endsWith("Adding StreamThread-3, there will now be 2 live threads and the new cache size per thread is 5")) continue;
                        return;
                        continue;
                    }
                    break;
                }
            }
        }
        Assert.fail();
    }

    static {
        appId = "";
        DEFAULT_DURATION = Duration.ofSeconds(30L);
    }
}

