/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResolverJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.processor.MockProcessorContext;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.Test;

public class SubscriptionResolverJoinProcessorSupplierTest {
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final ValueJoiner<String, String, String> JOINER = (value1, value2) -> "(" + value1 + "," + value2 + ")";

    @Test
    public void shouldNotForwardWhenHashDoesNotMatch() {
        TestKTableValueGetterSupplier<String, String> valueGetterSupplier = new TestKTableValueGetterSupplier<String, String>();
        boolean leftJoin = false;
        SubscriptionResolverJoinProcessorSupplier processorSupplier = new SubscriptionResolverJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, false);
        Processor processor = processorSupplier.get();
        MockProcessorContext context = new MockProcessorContext();
        processor.init((ProcessorContext)context);
        context.setRecordMetadata("topic", 0, 0L, (Headers)new RecordHeaders(), 0L);
        valueGetterSupplier.put("lhs1", "lhsValue");
        long[] oldHash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "oldLhsValue"));
        processor.process((Object)"lhs1", (Object)new SubscriptionResponseWrapper(oldHash, (Object)"rhsValue"));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded, (Matcher)IsEmptyCollection.empty());
    }

    @Test
    public void shouldIgnoreUpdateWhenLeftHasBecomeNull() {
        TestKTableValueGetterSupplier<String, Object> valueGetterSupplier = new TestKTableValueGetterSupplier<String, Object>();
        boolean leftJoin = false;
        SubscriptionResolverJoinProcessorSupplier processorSupplier = new SubscriptionResolverJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, false);
        Processor processor = processorSupplier.get();
        MockProcessorContext context = new MockProcessorContext();
        processor.init((ProcessorContext)context);
        context.setRecordMetadata("topic", 0, 0L, (Headers)new RecordHeaders(), 0L);
        valueGetterSupplier.put("lhs1", null);
        long[] hash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
        processor.process((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, (Object)"rhsValue"));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded, (Matcher)IsEmptyCollection.empty());
    }

    @Test
    public void shouldForwardWhenHashMatches() {
        TestKTableValueGetterSupplier<String, String> valueGetterSupplier = new TestKTableValueGetterSupplier<String, String>();
        boolean leftJoin = false;
        SubscriptionResolverJoinProcessorSupplier processorSupplier = new SubscriptionResolverJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, false);
        Processor processor = processorSupplier.get();
        MockProcessorContext context = new MockProcessorContext();
        processor.init((ProcessorContext)context);
        context.setRecordMetadata("topic", 0, 0L, (Headers)new RecordHeaders(), 0L);
        valueGetterSupplier.put("lhs1", "lhsValue");
        long[] hash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
        processor.process((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, (Object)"rhsValue"));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((MockProcessorContext.CapturedForward)forwarded.get(0)).keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)"lhs1", (Object)"(lhsValue,rhsValue)")));
    }

    @Test
    public void shouldEmitTombstoneForInnerJoinWhenRightIsNull() {
        TestKTableValueGetterSupplier<String, String> valueGetterSupplier = new TestKTableValueGetterSupplier<String, String>();
        boolean leftJoin = false;
        SubscriptionResolverJoinProcessorSupplier processorSupplier = new SubscriptionResolverJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, false);
        Processor processor = processorSupplier.get();
        MockProcessorContext context = new MockProcessorContext();
        processor.init((ProcessorContext)context);
        context.setRecordMetadata("topic", 0, 0L, (Headers)new RecordHeaders(), 0L);
        valueGetterSupplier.put("lhs1", "lhsValue");
        long[] hash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
        processor.process((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, null));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((MockProcessorContext.CapturedForward)forwarded.get(0)).keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)"lhs1", null)));
    }

    @Test
    public void shouldEmitResultForLeftJoinWhenRightIsNull() {
        TestKTableValueGetterSupplier<String, String> valueGetterSupplier = new TestKTableValueGetterSupplier<String, String>();
        boolean leftJoin = true;
        SubscriptionResolverJoinProcessorSupplier processorSupplier = new SubscriptionResolverJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, true);
        Processor processor = processorSupplier.get();
        MockProcessorContext context = new MockProcessorContext();
        processor.init((ProcessorContext)context);
        context.setRecordMetadata("topic", 0, 0L, (Headers)new RecordHeaders(), 0L);
        valueGetterSupplier.put("lhs1", "lhsValue");
        long[] hash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
        processor.process((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, null));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((MockProcessorContext.CapturedForward)forwarded.get(0)).keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)"lhs1", (Object)"(lhsValue,null)")));
    }

    @Test
    public void shouldEmitTombstoneForLeftJoinWhenRightIsNullAndLeftIsNull() {
        TestKTableValueGetterSupplier<String, Object> valueGetterSupplier = new TestKTableValueGetterSupplier<String, Object>();
        boolean leftJoin = true;
        SubscriptionResolverJoinProcessorSupplier processorSupplier = new SubscriptionResolverJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, true);
        Processor processor = processorSupplier.get();
        MockProcessorContext context = new MockProcessorContext();
        processor.init((ProcessorContext)context);
        context.setRecordMetadata("topic", 0, 0L, (Headers)new RecordHeaders(), 0L);
        valueGetterSupplier.put("lhs1", null);
        long[] hash = null;
        processor.process((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, null));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((MockProcessorContext.CapturedForward)forwarded.get(0)).keyValue(), (Matcher)CoreMatchers.is((Object)new KeyValue((Object)"lhs1", null)));
    }

    private static class TestKTableValueGetterSupplier<K, V>
    implements KTableValueGetterSupplier<K, V> {
        private final Map<K, V> map = new HashMap();

        private TestKTableValueGetterSupplier() {
        }

        public KTableValueGetter<K, V> get() {
            return new KTableValueGetter<K, V>(){

                public void init(ProcessorContext context) {
                }

                public ValueAndTimestamp<V> get(K key) {
                    return ValueAndTimestamp.make(map.get(key), (long)-1L);
                }
            };
        }

        public String[] storeNames() {
            return new String[0];
        }

        void put(K key, V value) {
            this.map.put(key, value);
        }
    }
}

