package org.springframework.cloud.stream.binder.kafka.streams;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Predicate;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.boot.context.properties.bind.BindContext;
import org.springframework.boot.context.properties.bind.BindHandler;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.bind.PropertySourcesPlaceholdersResolver;
import org.springframework.boot.context.properties.source.ConfigurationPropertyName;
import org.springframework.boot.context.properties.source.ConfigurationPropertySources;
import org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.GlobalKTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.KTableBoundElementFactory;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsConsumerProperties;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsExtendedBindingProperties;
import org.springframework.cloud.stream.config.BindingProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.ResolvableType;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MutablePropertySources;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.streams.RecoveringDeserializationExceptionHandler;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor.class */
public abstract class AbstractKafkaStreamsBinderProcessor implements ApplicationContextAware {
    private static final Log LOG = LogFactory.getLog(AbstractKafkaStreamsBinderProcessor.class);
    private final KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue;
    private final BindingServiceProperties bindingServiceProperties;
    private final KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties;
    private final CleanupConfig cleanupConfig;
    private final KeyValueSerdeResolver keyValueSerdeResolver;
    protected ConfigurableApplicationContext applicationContext;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor$4, reason: invalid class name */
    /* loaded from: input_file:org/springframework/cloud/stream/binder/kafka/streams/AbstractKafkaStreamsBinderProcessor$4.class */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$org$springframework$cloud$stream$binder$kafka$properties$KafkaConsumerProperties$StartOffset = new int[KafkaConsumerProperties.StartOffset.values().length];

        static {
            try {
                $SwitchMap$org$springframework$cloud$stream$binder$kafka$properties$KafkaConsumerProperties$StartOffset[KafkaConsumerProperties.StartOffset.earliest.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$springframework$cloud$stream$binder$kafka$properties$KafkaConsumerProperties$StartOffset[KafkaConsumerProperties.StartOffset.latest.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public AbstractKafkaStreamsBinderProcessor(BindingServiceProperties bindingServiceProperties, KafkaStreamsBindingInformationCatalogue kafkaStreamsBindingInformationCatalogue, KafkaStreamsExtendedBindingProperties kafkaStreamsExtendedBindingProperties, KeyValueSerdeResolver keyValueSerdeResolver, CleanupConfig cleanupConfig) {
        this.bindingServiceProperties = bindingServiceProperties;
        this.kafkaStreamsBindingInformationCatalogue = kafkaStreamsBindingInformationCatalogue;
        this.kafkaStreamsExtendedBindingProperties = kafkaStreamsExtendedBindingProperties;
        this.keyValueSerdeResolver = keyValueSerdeResolver;
        this.cleanupConfig = cleanupConfig;
    }

    public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Topology.AutoOffsetReset getAutoOffsetReset(String str, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties) {
        KafkaConsumerProperties.StartOffset startOffset = kafkaStreamsConsumerProperties.getStartOffset();
        Topology.AutoOffsetReset autoOffsetReset = null;
        if (startOffset != null) {
            switch (AnonymousClass4.$SwitchMap$org$springframework$cloud$stream$binder$kafka$properties$KafkaConsumerProperties$StartOffset[startOffset.ordinal()]) {
                case 1:
                    autoOffsetReset = Topology.AutoOffsetReset.EARLIEST;
                    break;
                case 2:
                    autoOffsetReset = Topology.AutoOffsetReset.LATEST;
                    break;
            }
        }
        if (kafkaStreamsConsumerProperties.isResetOffsets()) {
            LOG.warn("Detected resetOffsets configured on binding " + str + ". Setting resetOffsets in Kafka Streams binder does not have any effect.");
        }
        return autoOffsetReset;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void handleKTableGlobalKTableInputs(Object[] objArr, int i, String str, Class<?> cls, Object obj, StreamsBuilderFactoryBean streamsBuilderFactoryBean, StreamsBuilder streamsBuilder, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, Serde<?> serde, Serde<?> serde2, Topology.AutoOffsetReset autoOffsetReset, boolean z) {
        if (z) {
            addStateStoreBeans(streamsBuilder);
        }
        if (cls.isAssignableFrom(KTable.class)) {
            KTable<?, ?> kTable = getKTable(kafkaStreamsConsumerProperties, streamsBuilder, serde, serde2, kafkaStreamsConsumerProperties.getMaterializedAs(), this.bindingServiceProperties.getBindingDestination(str), autoOffsetReset);
            ((KTableBoundElementFactory.KTableWrapper) obj).wrap(kTable);
            this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(str, streamsBuilderFactoryBean);
            this.kafkaStreamsBindingInformationCatalogue.addConsumerPropertiesPerSbfb(streamsBuilderFactoryBean, this.bindingServiceProperties.getConsumerProperties(str));
            objArr[i] = kTable;
            return;
        }
        if (cls.isAssignableFrom(GlobalKTable.class)) {
            GlobalKTable<?, ?> globalKTable = getGlobalKTable(kafkaStreamsConsumerProperties, streamsBuilder, serde, serde2, kafkaStreamsConsumerProperties.getMaterializedAs(), this.bindingServiceProperties.getBindingDestination(str), autoOffsetReset);
            ((GlobalKTableBoundElementFactory.GlobalKTableWrapper) obj).wrap(globalKTable);
            this.kafkaStreamsBindingInformationCatalogue.addStreamBuilderFactoryPerBinding(str, streamsBuilderFactoryBean);
            this.kafkaStreamsBindingInformationCatalogue.addConsumerPropertiesPerSbfb(streamsBuilderFactoryBean, this.bindingServiceProperties.getConsumerProperties(str));
            objArr[i] = globalKTable;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamsBuilderFactoryBean buildStreamsBuilderAndRetrieveConfig(String str, ApplicationContext applicationContext, String str2, KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties, StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer, ConfigurableEnvironment configurableEnvironment, BindingProperties bindingProperties) {
        KafkaStreamsBinderConfigurationProperties.Functions functions;
        BeanDefinitionRegistry beanFactory = this.applicationContext.getBeanFactory();
        Map<String, Object> hashMap = new HashMap<>((Map<? extends String, ? extends Object>) applicationContext.getBean("streamConfigGlobalProperties", Map.class));
        if (kafkaStreamsBinderConfigurationProperties != null) {
            Map<String, KafkaStreamsBinderConfigurationProperties.Functions> functions2 = kafkaStreamsBinderConfigurationProperties.getFunctions();
            if (!CollectionUtils.isEmpty(functions2) && (functions = functions2.get(str)) != null) {
                Map<? extends String, ? extends Object> configuration = functions.getConfiguration();
                if (!CollectionUtils.isEmpty(configuration)) {
                    hashMap.putAll(configuration);
                }
                Object applicationId = functions.getApplicationId();
                if (!StringUtils.isEmpty(applicationId)) {
                    hashMap.put("application.id", applicationId);
                }
            }
        }
        MutablePropertySources propertySources = configurableEnvironment.getPropertySources();
        if (!StringUtils.isEmpty(bindingProperties.getBinder())) {
            KafkaStreamsBinderConfigurationProperties kafkaStreamsBinderConfigurationProperties2 = (KafkaStreamsBinderConfigurationProperties) applicationContext.getBean(bindingProperties.getBinder() + "-KafkaStreamsBinderConfigurationProperties", KafkaStreamsBinderConfigurationProperties.class);
            Object kafkaConnectionString = kafkaStreamsBinderConfigurationProperties2.getKafkaConnectionString();
            if (StringUtils.isEmpty(kafkaConnectionString)) {
                kafkaConnectionString = (String) propertySources.get(bindingProperties.getBinder() + "-kafkaStreamsBinderEnv").getProperty("spring.cloud.stream.kafka.binder.brokers");
            }
            hashMap.put("bootstrap.servers", kafkaConnectionString);
            String applicationId2 = kafkaStreamsBinderConfigurationProperties2.getApplicationId();
            if (StringUtils.hasText(applicationId2)) {
                hashMap.put("application.id", applicationId2);
            }
            if (kafkaStreamsBinderConfigurationProperties2.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndContinue) {
                hashMap.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
            } else if (kafkaStreamsBinderConfigurationProperties2.getDeserializationExceptionHandler() == DeserializationExceptionHandler.logAndFail) {
                hashMap.put("default.deserialization.exception.handler", LogAndFailExceptionHandler.class);
            } else if (kafkaStreamsBinderConfigurationProperties2.getDeserializationExceptionHandler() == DeserializationExceptionHandler.sendToDlq) {
                hashMap.put("default.deserialization.exception.handler", RecoveringDeserializationExceptionHandler.class);
                hashMap.put("spring.deserialization.recoverer", (SendToDlqAndContinue) applicationContext.getBean(SendToDlqAndContinue.class));
            }
            if (!ObjectUtils.isEmpty(kafkaStreamsBinderConfigurationProperties2.getConfiguration())) {
                hashMap.putAll(kafkaStreamsBinderConfigurationProperties2.getConfiguration());
            }
            if (!hashMap.containsKey("replication.factor")) {
                hashMap.put("replication.factor", Integer.valueOf(kafkaStreamsBinderConfigurationProperties2.getReplicationFactor()));
            }
        }
        KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties = (KafkaStreamsConsumerProperties) this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(str2);
        Map<? extends String, ? extends Object> configuration2 = kafkaStreamsConsumerProperties.getConfiguration();
        Assert.state(!configuration2.containsKey("bootstrap.servers"), "bootstrap.servers cannot be overridden at the binding level; use multiple binders instead");
        hashMap.putAll(configuration2);
        String applicationId3 = kafkaStreamsConsumerProperties.getApplicationId();
        if (StringUtils.hasText(applicationId3)) {
            hashMap.put("application.id", applicationId3);
        }
        hashMap.computeIfAbsent("application.id", str3 -> {
            String str3 = str + "-applicationId";
            LOG.info("Binder Generated Kafka Streams Application ID: " + str3);
            LOG.info("Use the binder generated application ID only for development and testing. ");
            LOG.info("For production deployments, please consider explicitly setting an application ID using a configuration property.");
            LOG.info("The generated applicationID is static and will be preserved over application restarts.");
            return str3;
        });
        handleConcurrency(applicationContext, str2, hashMap);
        DeserializationExceptionHandler deserializationExceptionHandler = kafkaStreamsConsumerProperties.getDeserializationExceptionHandler();
        if (deserializationExceptionHandler == DeserializationExceptionHandler.logAndFail) {
            hashMap.put("default.deserialization.exception.handler", LogAndFailExceptionHandler.class);
        } else if (deserializationExceptionHandler == DeserializationExceptionHandler.logAndContinue) {
            hashMap.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        } else if (deserializationExceptionHandler == DeserializationExceptionHandler.sendToDlq) {
            hashMap.put("default.deserialization.exception.handler", RecoveringDeserializationExceptionHandler.class);
            hashMap.put("spring.deserialization.recoverer", applicationContext.getBean(SendToDlqAndContinue.class));
        } else if (deserializationExceptionHandler == DeserializationExceptionHandler.skipAndContinue) {
            hashMap.put("default.deserialization.exception.handler", SkipAndContinueExceptionHandler.class);
        }
        KafkaStreamsConfiguration kafkaStreamsConfiguration = new KafkaStreamsConfiguration(hashMap);
        StreamsBuilderFactoryBean streamsBuilderFactoryBean = this.cleanupConfig == null ? new StreamsBuilderFactoryBean(kafkaStreamsConfiguration) : new StreamsBuilderFactoryBean(kafkaStreamsConfiguration, this.cleanupConfig);
        streamsBuilderFactoryBean.setAutoStartup(false);
        beanFactory.registerBeanDefinition("stream-builder-" + str, BeanDefinitionBuilder.genericBeanDefinition(streamsBuilderFactoryBean.getClass(), () -> {
            return streamsBuilderFactoryBean;
        }).getRawBeanDefinition());
        kafkaStreamsConsumerProperties.setApplicationId((String) hashMap.get("application.id"));
        StreamsBuilderFactoryBean streamsBuilderFactoryBean2 = (StreamsBuilderFactoryBean) applicationContext.getBean("&stream-builder-" + str, StreamsBuilderFactoryBean.class);
        if (streamsBuilderFactoryBeanConfigurer != null) {
            streamsBuilderFactoryBeanConfigurer.configure(streamsBuilderFactoryBean);
        }
        return streamsBuilderFactoryBean2;
    }

    private void handleConcurrency(ApplicationContext applicationContext, final String str, Map<String, Object> map) {
        final boolean[] zArr = {false};
        try {
            new Binder(ConfigurationPropertySources.get(applicationContext.getEnvironment()), new PropertySourcesPlaceholdersResolver(applicationContext.getEnvironment()), IntegrationUtils.getConversionService(this.applicationContext.getBeanFactory()), (Consumer) null).bind("spring.cloud.stream", Bindable.ofInstance(new BindingServiceProperties()), new BindHandler() { // from class: org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor.1
                public Object onSuccess(ConfigurationPropertyName configurationPropertyName, Bindable<?> bindable, BindContext bindContext, Object obj) {
                    if (!zArr[0]) {
                        zArr[0] = configurationPropertyName.getLastElement(ConfigurationPropertyName.Form.UNIFORM).equals("concurrency") && ConfigurationPropertyName.of(new StringBuilder().append("spring.cloud.stream.bindings.").append(str.toLowerCase()).append(".consumer").toString()).isAncestorOf(configurationPropertyName);
                    }
                    return obj;
                }
            });
        } catch (Exception e) {
        }
        int concurrency = this.bindingServiceProperties.getConsumerProperties(str).getConcurrency();
        if (concurrency < 1 || !zArr[0]) {
            return;
        }
        map.put("num.stream.threads", Integer.valueOf(concurrency));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Serde<?> getValueSerde(String str, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, ResolvableType resolvableType) {
        if (!this.bindingServiceProperties.getConsumerProperties(str).isUseNativeDecoding()) {
            return Serdes.ByteArray();
        }
        return this.keyValueSerdeResolver.getInboundValueSerde(this.bindingServiceProperties.getBindingProperties(str).getConsumer(), kafkaStreamsConsumerProperties, resolvableType);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KStream<?, ?> getKStream(String str, BindingProperties bindingProperties, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, StreamsBuilder streamsBuilder, Serde<?> serde, Serde<?> serde2, Topology.AutoOffsetReset autoOffsetReset, boolean z) {
        if (z) {
            addStateStoreBeans(streamsBuilder);
        }
        boolean isUseNativeDecoding = this.bindingServiceProperties.getConsumerProperties(str).isUseNativeDecoding();
        if (isUseNativeDecoding) {
            LOG.info("Native decoding is enabled for " + str + ". Inbound deserialization done at the broker.");
        } else {
            LOG.info("Native decoding is disabled for " + str + ". Inbound message conversion done by Spring Cloud Stream.");
        }
        Consumed consumed = getConsumed(kafkaStreamsConsumerProperties, serde, StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes()) ? new Serdes.BytesSerde<>() : serde2, autoOffsetReset);
        KStream<?, ?> stream = ((KafkaStreamsConsumerProperties) this.kafkaStreamsExtendedBindingProperties.getExtendedConsumerProperties(str)).isDestinationIsPattern() ? streamsBuilder.stream(Pattern.compile(this.bindingServiceProperties.getBindingDestination(str)), consumed) : streamsBuilder.stream(Arrays.asList(StringUtils.commaDelimitedListToStringArray(this.bindingServiceProperties.getBindingDestination(str))), consumed);
        if (!StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
            return getkStream(bindingProperties, stream, isUseNativeDecoding);
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        stream.process(() -> {
            return eventTypeProcessor(kafkaStreamsConsumerProperties, atomicBoolean, atomicReference, atomicReference2);
        }, new String[0]);
        return getkStream(bindingProperties, stream.branch(new Predicate[]{(obj, obj2) -> {
            return atomicBoolean.getAndSet(false);
        }})[0].mapValues(obj3 -> {
            return serde2.deserializer().deserialize((String) atomicReference.get(), (Headers) atomicReference2.get(), ((Bytes) obj3).get());
        }), isUseNativeDecoding);
    }

    private KStream<?, ?> getkStream(BindingProperties bindingProperties, KStream<?, ?> kStream, boolean z) {
        if (!z) {
            AtomicReference atomicReference = new AtomicReference();
            kStream.process(() -> {
                return new Processor<Object, Object, Void, Void>() { // from class: org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor.2
                    public void init(ProcessorContext<Void, Void> processorContext) {
                        super.init(processorContext);
                    }

                    public void process(Record<Object, Object> record) {
                        atomicReference.set(record.headers());
                    }

                    public void close() {
                        super.close();
                    }
                };
            }, new String[0]);
            kStream = kStream.mapValues(obj -> {
                Object obj;
                String contentType = bindingProperties.getContentType();
                if (obj == null || !StringUtils.hasText(contentType)) {
                    obj = obj;
                } else {
                    Headers headers = (Headers) atomicReference.get();
                    HashMap hashMap = new HashMap();
                    headers.forEach(header -> {
                        hashMap.put(header.key(), header.value());
                    });
                    obj = MessageBuilder.withPayload(obj).copyHeaders(hashMap).setHeader("contentType", contentType).build();
                }
                return obj;
            });
        }
        return kStream;
    }

    private void addStateStoreBeans(StreamsBuilder streamsBuilder) {
        try {
            Map beansOfType = this.applicationContext.getBeansOfType(StoreBuilder.class);
            if (!CollectionUtils.isEmpty(beansOfType)) {
                beansOfType.values().forEach(storeBuilder -> {
                    streamsBuilder.addStateStore(storeBuilder);
                    if (LOG.isInfoEnabled()) {
                        LOG.info("state store " + storeBuilder.name() + " added to topology");
                    }
                });
            }
        } catch (Exception e) {
        }
    }

    private <K, V> KTable<K, V> materializedAs(StreamsBuilder streamsBuilder, String str, String str2, Serde<K> serde, Serde<V> serde2, Topology.AutoOffsetReset autoOffsetReset, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties) {
        return streamsBuilder.table(this.bindingServiceProperties.getBindingDestination(str), getConsumed(kafkaStreamsConsumerProperties, serde, serde2, autoOffsetReset), getMaterialized(str2, serde, serde2));
    }

    private <K, V> Materialized<K, V, KeyValueStore<Bytes, byte[]>> getMaterialized(String str, Serde<K> serde, Serde<V> serde2) {
        return Materialized.as(str).withKeySerde(serde).withValueSerde(serde2);
    }

    private <K, V> GlobalKTable<K, V> materializedAsGlobalKTable(StreamsBuilder streamsBuilder, String str, String str2, Serde<K> serde, Serde<V> serde2, Topology.AutoOffsetReset autoOffsetReset, KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties) {
        return streamsBuilder.globalTable(this.bindingServiceProperties.getBindingDestination(str), getConsumed(kafkaStreamsConsumerProperties, serde, serde2, autoOffsetReset), getMaterialized(str2, serde, serde2));
    }

    private GlobalKTable<?, ?> getGlobalKTable(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, StreamsBuilder streamsBuilder, Serde<?> serde, Serde<?> serde2, String str, String str2, Topology.AutoOffsetReset autoOffsetReset) {
        return str != null ? materializedAsGlobalKTable(streamsBuilder, str2, str, serde, serde2, autoOffsetReset, kafkaStreamsConsumerProperties) : streamsBuilder.globalTable(str2, getConsumed(kafkaStreamsConsumerProperties, serde, serde2, autoOffsetReset));
    }

    private KTable<?, ?> getKTable(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, StreamsBuilder streamsBuilder, Serde<?> serde, Serde<?> serde2, String str, String str2, Topology.AutoOffsetReset autoOffsetReset) {
        Serde<?> bytesSerde = StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes()) ? new Serdes.BytesSerde<>() : serde2;
        KTable<?, ?> materializedAs = str != null ? materializedAs(streamsBuilder, str2, str, serde, bytesSerde, autoOffsetReset, kafkaStreamsConsumerProperties) : streamsBuilder.table(str2, getConsumed(kafkaStreamsConsumerProperties, serde, bytesSerde, autoOffsetReset));
        if (!StringUtils.hasText(kafkaStreamsConsumerProperties.getEventTypes())) {
            return materializedAs;
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AtomicReference atomicReference = new AtomicReference();
        AtomicReference atomicReference2 = new AtomicReference();
        KStream stream = materializedAs.toStream();
        stream.process(() -> {
            return eventTypeProcessor(kafkaStreamsConsumerProperties, atomicBoolean, atomicReference, atomicReference2);
        }, new String[0]);
        return stream.branch(new Predicate[]{(obj, obj2) -> {
            return atomicBoolean.getAndSet(false);
        }})[0].mapValues(obj3 -> {
            return serde2.deserializer().deserialize((String) atomicReference.get(), (Headers) atomicReference2.get(), ((Bytes) obj3).get());
        }).toTable();
    }

    private <K, V> Consumed<K, V> getConsumed(KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, Serde<K> serde, Serde<V> serde2, Topology.AutoOffsetReset autoOffsetReset) {
        TimestampExtractor timestampExtractor = null;
        if (!StringUtils.isEmpty(kafkaStreamsConsumerProperties.getTimestampExtractorBeanName())) {
            timestampExtractor = (TimestampExtractor) this.applicationContext.getBean(kafkaStreamsConsumerProperties.getTimestampExtractorBeanName(), TimestampExtractor.class);
        }
        Consumed<K, V> withOffsetResetPolicy = Consumed.with(serde, serde2).withOffsetResetPolicy(autoOffsetReset);
        if (timestampExtractor != null) {
            withOffsetResetPolicy.withTimestampExtractor(timestampExtractor);
        }
        if (StringUtils.hasText(kafkaStreamsConsumerProperties.getConsumedAs())) {
            withOffsetResetPolicy.withName(kafkaStreamsConsumerProperties.getConsumedAs());
        }
        return withOffsetResetPolicy;
    }

    private <K, V> Processor<K, V, Void, Void> eventTypeProcessor(final KafkaStreamsConsumerProperties kafkaStreamsConsumerProperties, final AtomicBoolean atomicBoolean, final AtomicReference<String> atomicReference, final AtomicReference<Headers> atomicReference2) {
        return new Processor<K, V, Void, Void>() { // from class: org.springframework.cloud.stream.binder.kafka.streams.AbstractKafkaStreamsBinderProcessor.3
            ProcessorContext<?, ?> context;

            public void init(ProcessorContext<Void, Void> processorContext) {
                super.init(processorContext);
                this.context = processorContext;
            }

            public void process(Record<K, V> record) {
                Headers headers = record.headers();
                atomicReference2.set(headers);
                Optional recordMetadata = this.context.recordMetadata();
                if (recordMetadata.isPresent()) {
                    atomicReference.set(((RecordMetadata) recordMetadata.get()).topic());
                }
                Iterable headers2 = headers.headers(kafkaStreamsConsumerProperties.getEventTypeHeaderKey());
                if (headers2 == null || !headers2.iterator().hasNext()) {
                    return;
                }
                String str = new String(((Header) headers2.iterator().next()).value());
                for (String str2 : StringUtils.commaDelimitedListToStringArray(kafkaStreamsConsumerProperties.getEventTypes())) {
                    if (str.equals(str2)) {
                        atomicBoolean.set(true);
                        return;
                    }
                }
            }

            public void close() {
            }
        };
    }
}
