/*
 * Decompiled with CFR 0.152.
 */
package org.flowable.eventregistry.spring.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Field;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.flowable.common.engine.api.FlowableException;
import org.flowable.common.engine.api.FlowableIllegalArgumentException;
import org.flowable.eventregistry.api.ChannelModelProcessor;
import org.flowable.eventregistry.api.EventRegistry;
import org.flowable.eventregistry.api.EventRepositoryService;
import org.flowable.eventregistry.model.ChannelModel;
import org.flowable.eventregistry.model.InboundChannelModel;
import org.flowable.eventregistry.model.KafkaInboundChannelModel;
import org.flowable.eventregistry.model.KafkaOutboundChannelModel;
import org.flowable.eventregistry.spring.kafka.EventPayloadKafkaPartitionProvider;
import org.flowable.eventregistry.spring.kafka.KafkaChannelMessageListenerAdapter;
import org.flowable.eventregistry.spring.kafka.KafkaOperationsOutboundEventChannelAdapter;
import org.flowable.eventregistry.spring.kafka.KafkaPartitionProvider;
import org.flowable.eventregistry.spring.kafka.RoundRobinKafkaPartitionProvider;
import org.flowable.eventregistry.spring.kafka.SimpleKafkaListenerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.BeanExpressionResolver;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.EmbeddedValueResolver;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.KafkaAdminOperations;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BackOffHandler;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
import org.springframework.kafka.listener.ContainerPausingBackOffHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.ExceptionClassifier;
import org.springframework.kafka.listener.FailedRecordProcessor;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.ListenerContainerPauseService;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
import org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor;
import org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver;
import org.springframework.kafka.retrytopic.DestinationTopic;
import org.springframework.kafka.retrytopic.DestinationTopicProcessor;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
import org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationBuilder;
import org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper;
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
import org.springframework.kafka.support.Suffixer;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.backoff.SleepingBackOffPolicy;
import org.springframework.retry.backoff.UniformRandomBackOffPolicy;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.StringValueResolver;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

public class KafkaChannelDefinitionProcessor
implements BeanFactoryAware,
ApplicationContextAware,
ApplicationListener<ContextRefreshedEvent>,
DisposableBean,
ChannelModelProcessor {
    // Prefix constant for channel listener endpoint ids.
    // NOTE(review): the code that applies it is not visible in this part of the file - confirm usage.
    public static final String CHANNEL_ID_PREFIX = "org.flowable.eventregistry.kafka.ChannelKafkaListenerEndpointContainer#";
    // Partition 0; presumably the fallback partition for manually assigned retry topics - TODO confirm.
    protected static final int DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT = 0;
    // Logger named after the runtime class, so subclasses log under their own name.
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    // Handed to the outbound event channel adapter created in processOutboundDefinition.
    protected KafkaOperations<Object, Object> kafkaOperations;
    // Used by createNewTopics; must be set when retry topics are to be auto created.
    protected KafkaAdminOperations kafkaAdminOperations;
    // Passed to the back-off manager factory and the listener container pause service.
    protected KafkaListenerEndpointRegistry endpointRegistry;
    // NOTE(review): presumably the bean name used to look up the container factory - lookup code not visible here.
    protected String containerFactoryBeanName = "kafkaListenerContainerFactory";
    // Container factory that decorateFactory wraps for retry topic handling.
    protected KafkaListenerContainerFactory<?> containerFactory;
    // Cached scheduler returned by getOrCreateRetryTopicTaskScheduler.
    protected TaskScheduler retryTopicTaskScheduler;
    // Scheduler created by this class when no scheduler bean is available in the application context.
    protected ThreadPoolTaskScheduler retryTopicThreadPoolTaskScheduler;
    protected BeanFactory beanFactory;
    protected ApplicationContext applicationContext;
    protected boolean contextRefreshed;
    // Assigned from the constructor argument.
    protected ObjectMapper objectMapper;
    protected BeanExpressionResolver resolver = new StandardBeanExpressionResolver();
    protected StringValueResolver embeddedValueResolver;
    protected BeanExpressionContext expressionContext;
    // Main endpoint id -> ids of all endpoints registered for it; filled by processAndRegisterEndpoints.
    protected Map<String, Collection<String>> retryEndpointsByMainEndpointId = new HashMap<String, Collection<String>>();

    /**
     * Creates the processor and stores the given Jackson mapper in {@link #objectMapper}.
     *
     * @param objectMapper the mapper kept on this instance
     */
    public KafkaChannelDefinitionProcessor(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /**
     * Tells whether this processor handles the given channel model.
     *
     * @param channelModel the model to inspect
     * @return {@code true} for Kafka inbound and Kafka outbound channel models, {@code false} otherwise
     */
    public boolean canProcess(ChannelModel channelModel) {
        if (channelModel instanceof KafkaInboundChannelModel) {
            return true;
        }
        return channelModel instanceof KafkaOutboundChannelModel;
    }

    /**
     * Tells whether an already registered channel model should still be processed.
     *
     * @param channelModel the model to inspect
     * @return {@code true} only for Kafka outbound channel models
     */
    public boolean canProcessIfChannelModelAlreadyRegistered(ChannelModel channelModel) {
        boolean isKafkaOutbound = channelModel instanceof KafkaOutboundChannelModel;
        return isKafkaOutbound;
    }

    /**
     * Registers a Kafka channel model.
     * <p>
     * Inbound models get their listener endpoints created and registered; outbound models get an
     * outbound event channel adapter. Any other model type is ignored. Start and end of each
     * registration are logged at info level.
     *
     * @param channelModel the channel model to register
     * @param tenantId the tenant the channel belongs to
     * @param eventRegistry the event registry handed to the inbound message listener
     * @param eventRepositoryService not used by this implementation
     * @param fallbackToDefaultTenant not used by this implementation
     */
    public void registerChannelModel(ChannelModel channelModel, String tenantId, EventRegistry eventRegistry, EventRepositoryService eventRepositoryService, boolean fallbackToDefaultTenant) {
        if (channelModel instanceof KafkaInboundChannelModel) {
            KafkaInboundChannelModel inboundModel = (KafkaInboundChannelModel) channelModel;
            this.logger.info("Starting to register inbound channel {} in tenant {}", channelModel.getKey(), tenantId);
            this.processAndRegisterEndpoints(inboundModel, tenantId, eventRegistry);
            this.logger.info("Finished registering inbound channel {} in tenant {}", channelModel.getKey(), tenantId);
            return;
        }
        if (channelModel instanceof KafkaOutboundChannelModel) {
            KafkaOutboundChannelModel outboundModel = (KafkaOutboundChannelModel) channelModel;
            this.logger.info("Starting to register outbound channel {} in tenant {}", channelModel.getKey(), tenantId);
            this.processOutboundDefinition(outboundModel);
            this.logger.info("Finished registering outbound channel {} in tenant {}", channelModel.getKey(), tenantId);
        }
    }

    /**
     * Builds a listener endpoint for an inbound Kafka channel.
     * <p>
     * The endpoint id doubles as the main listener id. Topics, topic pattern, topic partitions,
     * client id prefix, concurrency and consumer properties are resolved from the channel model
     * before being applied.
     *
     * @param channelModel the inbound channel model describing the listener
     * @param tenantId the tenant used when computing the endpoint id
     * @param eventRegistry the registry passed to the created message listener
     * @return the configured endpoint
     */
    protected KafkaListenerEndpoint createKafkaListenerEndpoint(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) {
        String id = this.getEndpointId((ChannelModel)channelModel, tenantId);
        SimpleKafkaListenerEndpoint<Object, Object> listenerEndpoint = new SimpleKafkaListenerEndpoint<Object, Object>();

        // Identity first: the group id is derived from the id that was just set.
        listenerEndpoint.setId(id);
        listenerEndpoint.setMainListenerId(id);
        String groupId = this.getEndpointGroupId(channelModel, listenerEndpoint.getId());
        listenerEndpoint.setGroupId(groupId);

        // Subscription: explicit topics, a pattern and/or manually assigned partitions.
        listenerEndpoint.setTopics(this.resolveTopics(channelModel));
        listenerEndpoint.setTopicPattern(this.resolvePattern(channelModel));
        listenerEndpoint.setTopicPartitions(this.resolveTopicPartitions(channelModel));

        // Consumer tuning.
        String clientIdPrefix = this.resolveExpressionAsString(channelModel.getClientIdPrefix(), "clientIdPrefix");
        listenerEndpoint.setClientIdPrefix(clientIdPrefix);
        Integer concurrency = this.resolveExpressionAsInteger(channelModel.getConcurrency(), "concurrency");
        listenerEndpoint.setConcurrency(concurrency);
        listenerEndpoint.setConsumerProperties(this.resolveProperties(channelModel.getCustomProperties()));

        listenerEndpoint.setMessageListener(this.createMessageListener(eventRegistry, (InboundChannelModel)channelModel));
        return listenerEndpoint;
    }

    /**
     * Creates the main endpoint for an inbound channel together with its endpoint configurations,
     * and registers every resulting endpoint with its container factory.
     * <p>
     * The ids of all created endpoints are recorded in {@link #retryEndpointsByMainEndpointId}
     * under the main endpoint id before any endpoint is registered.
     *
     * @param channelModel the inbound channel model
     * @param tenantId the tenant the channel belongs to
     * @param eventRegistry the registry passed to the message listeners
     */
    protected void processAndRegisterEndpoints(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry) {
        KafkaListenerEndpoint mainEndpoint = this.createKafkaListenerEndpoint(channelModel, tenantId, eventRegistry);
        KafkaListenerContainerFactory<?> resolvedFactory = this.resolveContainerFactory(mainEndpoint, null);
        Collection<Configuration> endpointConfigurations = this.createEndpointConfigurations(channelModel, tenantId, eventRegistry, mainEndpoint, resolvedFactory);

        List<String> endpointIds = new ArrayList<String>();
        for (Configuration endpointConfiguration : endpointConfigurations) {
            endpointIds.add(endpointConfiguration.getEndpoint().getId());
        }
        this.retryEndpointsByMainEndpointId.put(mainEndpoint.getId(), endpointIds);

        for (Configuration endpointConfiguration : endpointConfigurations) {
            this.registerEndpoint(endpointConfiguration.getEndpoint(), endpointConfiguration.getFactory());
        }
    }

    /**
     * Builds the endpoint / container factory pairs that have to be registered for an inbound channel.
     * <p>
     * Three outcomes are possible:
     * <ul>
     * <li>a retry topic configuration exists: one configuration per destination topic that gets an
     * endpoint (the main endpoint, plus retry endpoints when the retry configuration uses retry topics),
     * each with a decorated container factory;</li>
     * <li>no retry topic configuration but an attempts value is set: the main endpoint with the
     * container factory wrapped in a {@code RetryTopicContainerFactoryDecorator};</li>
     * <li>otherwise: the main endpoint with the given container factory as is.</li>
     * </ul>
     *
     * @param channelModel the inbound channel model
     * @param tenantId the tenant the channel belongs to
     * @param eventRegistry the registry passed to additionally created endpoints
     * @param mainEndpoint the already created main endpoint
     * @param containerFactory the container factory resolved for the main endpoint
     * @return the configurations to register
     * @throws FlowableException when a retry topic configuration exists but the main endpoint has
     * neither topics nor topic partitions
     */
    protected Collection<Configuration> createEndpointConfigurations(KafkaInboundChannelModel channelModel, String tenantId, EventRegistry eventRegistry, KafkaListenerEndpoint mainEndpoint, KafkaListenerContainerFactory<?> containerFactory) {
        ResolvedRetryConfiguration retryConfiguration = this.resolveRetryConfiguration(channelModel);
        // Blocking retries: FixedBackOff with interval 0 and (attempts - 1) as max attempts, only when attempts is configured.
        FixedBackOff backOff = retryConfiguration != null && retryConfiguration.attempts != null ? new FixedBackOff(0L, (long)(retryConfiguration.attempts - 1)) : null;
        RetryTopicConfiguration retryTopicConfiguration = this.createRetryTopicConfiguration(retryConfiguration);
        if (retryTopicConfiguration != null) {
            TopicPartitionOffset[] topicPartitionsToAssign;
            // Use the endpoint topics; when there are none, fall back to the distinct topics
            // (insertion ordered) of the manually assigned partitions, or to an empty list.
            Collection<Object> topics = mainEndpoint.getTopics().isEmpty() ? ((topicPartitionsToAssign = mainEndpoint.getTopicPartitionsToAssign()) != null && topicPartitionsToAssign.length > 0 ? (Collection)Arrays.stream(topicPartitionsToAssign).map(TopicPartitionOffset::getTopic).collect(Collectors.toCollection(LinkedHashSet::new)) : Collections.emptyList()) : mainEndpoint.getTopics();
            if (topics.isEmpty()) {
                throw new FlowableException("Channel model " + channelModel.getKey() + " in tenant " + tenantId + " has retry configuration but no topics have been provided for it");
            }
            ArrayList<Configuration> configurations = new ArrayList<Configuration>(retryTopicConfiguration.getDestinationTopicProperties().size());
            DefaultDestinationTopicResolver topicResolver = new DefaultDestinationTopicResolver(Clock.systemUTC());
            topicResolver.setApplicationContext(this.applicationContext);
            DefaultDestinationTopicProcessor processor = new DefaultDestinationTopicProcessor((DestinationTopicResolver)topicResolver);
            ListenerContainerFactoryConfigurer factoryConfigurer = this.createListenerContainerFactoryConfigurer(retryConfiguration, (BackOff)backOff, topicResolver);
            String id = mainEndpoint.getId();
            if (id == null) {
                id = "no.id.provided";
            }
            DestinationTopicProcessor.Context context = new DestinationTopicProcessor.Context(id, retryTopicConfiguration.getDestinationTopicProperties());
            processor.processDestinationTopicProperties(destinationTopicProperties -> {
                Object endpoint;
                Suffixer suffixer = new Suffixer(destinationTopicProperties.suffix());
                // Destination topics are registered for the main topic, the DLT, and - only when
                // retry topics are in use - the retry destinations.
                if (destinationTopicProperties.isMainEndpoint() || destinationTopicProperties.isDltTopic() || retryConfiguration.hasRetryTopic()) {
                    for (String topic : topics) {
                        String destinationTopic = suffixer.maybeAddTo(topic);
                        processor.registerDestinationTopic(topic, destinationTopic, destinationTopicProperties, context);
                    }
                }
                // The main destination reuses mainEndpoint; a non-DLT destination gets a fresh endpoint
                // when retry topics are in use; everything else yields null and is skipped by the instanceof check.
                if ((endpoint = destinationTopicProperties.isMainEndpoint() ? mainEndpoint : (!destinationTopicProperties.isDltTopic() && retryConfiguration.hasRetryTopic() ? this.createKafkaListenerEndpoint(channelModel, tenantId, eventRegistry) : null)) instanceof SimpleKafkaListenerEndpoint) {
                    SimpleKafkaListenerEndpoint simpleEndpoint = (SimpleKafkaListenerEndpoint)endpoint;
                    // Id, group id, topics / partitions and client id prefix all go through the destination suffixer.
                    simpleEndpoint.setId(suffixer.maybeAddTo(simpleEndpoint.getId()));
                    simpleEndpoint.setMainListenerId(mainEndpoint.getMainListenerId());
                    simpleEndpoint.setGroupId(suffixer.maybeAddTo(simpleEndpoint.getGroupId()));
                    TopicPartitionOffset[] topicPartitionsToAssign = endpoint.getTopicPartitionsToAssign();
                    if (endpoint.getTopics().isEmpty() && topicPartitionsToAssign != null) {
                        simpleEndpoint.setTopicPartitions(KafkaChannelDefinitionProcessor.getTopicPartitions(destinationTopicProperties, suffixer, endpoint.getTopicPartitionsToAssign()));
                    } else {
                        simpleEndpoint.setTopics(suffixer.maybeAddTo(simpleEndpoint.getTopics()));
                    }
                    simpleEndpoint.setClientIdPrefix(suffixer.maybeAddTo(simpleEndpoint.getClientIdPrefix()));
                    configurations.add(new Configuration(simpleEndpoint, this.decorateFactory((DestinationTopic.Properties)destinationTopicProperties, factoryConfigurer, retryTopicConfiguration)));
                }
            }, context);
            // Hands the registered destination topic names to the topic creation function (a no-op unless auto creation is enabled).
            processor.processRegisteredDestinations(this.getTopicCreationFunction(retryConfiguration), context);
            // The topic resolver is notified manually with a ContextRefreshedEvent for the application context.
            topicResolver.onApplicationEvent(new ContextRefreshedEvent(this.applicationContext));
            return configurations;
        }
        if (backOff != null) {
            // NOTE(review): lambda$createEndpointConfigurations$1 is a decompiler-synthesized name; its body is not
            // visible here. Presumably it builds the error handler from the back-off - confirm against the original source.
            return Collections.singleton(new Configuration(mainEndpoint, new RetryTopicContainerFactoryDecorator(containerFactory, () -> KafkaChannelDefinitionProcessor.lambda$createEndpointConfigurations$1((BackOff)backOff))));
        }
        return Collections.singleton(new Configuration(mainEndpoint, containerFactory));
    }

    /**
     * Maps the manually assigned partitions of an endpoint to the partitions of a destination topic.
     *
     * @param properties the destination topic properties; decides between main and retry topic handling
     * @param suffixer the suffixer applied to each topic name
     * @param topicPartitionOffsets the partitions assigned to the original endpoint
     * @return one suffixed topic partition offset per input element, in input order
     */
    protected static Collection<TopicPartitionOffset> getTopicPartitions(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset[] topicPartitionOffsets) {
        List<TopicPartitionOffset> suffixed = new ArrayList<TopicPartitionOffset>();
        for (TopicPartitionOffset original : topicPartitionOffsets) {
            if (properties.isMainEndpoint()) {
                suffixed.add(KafkaChannelDefinitionProcessor.getTPOForMainTopic(suffixer, original));
            } else {
                suffixed.add(KafkaChannelDefinitionProcessor.getTPOForRetryTopics(properties, suffixer, original));
            }
        }
        return suffixed;
    }

    /**
     * Creates the topic partition offset for a retry destination: the topic name is suffixed and the
     * partition is kept unless it is greater than the destination's number of partitions, in which
     * case {@link #DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT} is used.
     *
     * @param properties the destination topic properties supplying the number of partitions
     * @param suffixer the suffixer applied to the topic name
     * @param tpo the partition assigned on the original endpoint
     * @return the topic partition offset for the retry destination
     */
    protected static TopicPartitionOffset getTPOForRetryTopics(DestinationTopic.Properties properties, Suffixer suffixer, TopicPartitionOffset tpo) {
        String retryTopic = suffixer.maybeAddTo(tpo.getTopic());
        int partition = tpo.getPartition();
        // NOTE(review): a partition equal to numPartitions() is kept as is; if partitions are
        // zero-based this looks like an off-by-one - confirm before changing.
        if (partition > properties.numPartitions()) {
            partition = DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT;
        }
        return new TopicPartitionOffset(retryTopic, partition);
    }

    /**
     * Copies a topic partition offset for the main topic, applying the suffixer to the topic name
     * and carrying over partition, offset, seek position and the relative-to-current flag.
     *
     * @param suffixer the suffixer applied to the topic name
     * @param tpo the source topic partition offset
     * @return the copied topic partition offset
     */
    protected static TopicPartitionOffset getTPOForMainTopic(Suffixer suffixer, TopicPartitionOffset tpo) {
        String mainTopic = suffixer.maybeAddTo(tpo.getTopic());
        TopicPartitionOffset copy = new TopicPartitionOffset(mainTopic, tpo.getPartition(), tpo.getOffset(), tpo.getPosition());
        copy.setRelativeToCurrent(tpo.isRelativeToCurrent());
        return copy;
    }

    /**
     * Returns the callback that receives the destination topic names.
     *
     * @param retryConfiguration the resolved retry configuration
     * @return a callback creating the topics when auto creation is enabled, otherwise a no-op
     * @throws FlowableException when auto creation is enabled but no Kafka admin operations are configured
     */
    protected Consumer<Collection<String>> getTopicCreationFunction(ResolvedRetryConfiguration retryConfiguration) {
        if (!retryConfiguration.autoCreateTopics) {
            return topicNames -> {};
        }
        if (this.kafkaAdminOperations == null) {
            throw new FlowableException("It is not possible to auto create new topics when no kafka admin operations have been configured");
        }
        // Partition count and replication factor are read when the callback runs, not here.
        return topicNames -> this.createNewTopics(topicNames, retryConfiguration.numPartitions, retryConfiguration.replicationFactor);
    }

    /**
     * Creates or modifies the given topics through the configured Kafka admin operations.
     *
     * @param topics the names of the topics
     * @param numPartitions the number of partitions for every topic
     * @param replicationFactor the replication factor for every topic
     */
    protected void createNewTopics(Collection<String> topics, int numPartitions, short replicationFactor) {
        NewTopic[] newTopics = new NewTopic[topics.size()];
        int index = 0;
        for (String topicName : topics) {
            newTopics[index++] = new NewTopic(topicName, numPartitions, replicationFactor);
        }
        this.kafkaAdminOperations.createOrModifyTopics(newTopics);
    }

    /**
     * Creates the configurer used to decorate listener container factories for retry handling.
     * <p>
     * The configurer is built from a partition pausing back-off manager and a dead letter publishing
     * recoverer factory bound to the given topic resolver. When the retry configuration reports that
     * it has no retry topic, an error handler customizer is installed and, if provided, the blocking
     * retries back-off is set.
     *
     * @param retryConfiguration the resolved retry configuration
     * @param backOff the back-off for blocking retries, may be {@code null}
     * @param topicResolver the destination topic resolver handed to the recoverer factory
     * @return the configured factory configurer
     */
    protected ListenerContainerFactoryConfigurer createListenerContainerFactoryConfigurer(ResolvedRetryConfiguration retryConfiguration, BackOff backOff, DefaultDestinationTopicResolver topicResolver) {
        DeadLetterPublishingRecovererFactory recovererFactory = new DeadLetterPublishingRecovererFactory((DestinationTopicResolver)topicResolver);
        ContainerPartitionPausingBackOffManagerFactory managerFactory = new ContainerPartitionPausingBackOffManagerFactory((ListenerContainerRegistry)this.endpointRegistry, this.applicationContext);
        this.configurePartitionPausingFactory(managerFactory);
        KafkaConsumerBackoffManager manager = managerFactory.create();
        ListenerContainerFactoryConfigurer factoryConfigurer = new ListenerContainerFactoryConfigurer(manager, recovererFactory, Clock.systemUTC());
        if (retryConfiguration.hasNoRetryTopic()) {
            factoryConfigurer.setErrorHandlerCustomizer(errorHandler -> {
                // NOTE(review): the decompiler dropped the casts to ExceptionClassifier / FailedRecordProcessor
                // on the calls below; the checked type is the intended receiver.
                if (errorHandler instanceof ExceptionClassifier) {
                    // The listed exception types are classified as false with a default of true -
                    // presumably "do not retry these, retry everything else"; confirm against ExceptionClassifier.
                    HashMap<Class, Boolean> classified = new HashMap<Class, Boolean>();
                    classified.put(DeserializationException.class, false);
                    classified.put(MessageConversionException.class, false);
                    classified.put(ConversionException.class, false);
                    classified.put(MethodArgumentResolutionException.class, false);
                    classified.put(NoSuchMethodException.class, false);
                    classified.put(ClassCastException.class, false);
                    errorHandler.setClassifications(classified, true);
                }
                if (errorHandler instanceof FailedRecordProcessor) {
                    errorHandler.setCommitRecovered(false);
                }
            });
            if (backOff != null) {
                factoryConfigurer.setBlockingRetriesBackOff(backOff);
            }
        }
        return factoryConfigurer;
    }

    /**
     * Installs a container pausing back-off handler on the given factory.
     * <p>
     * The handler is backed by a {@code ListenerContainerPauseService} that uses the endpoint
     * registry and the retry topic task scheduler. The scheduler is obtained once, validated, and
     * that same instance is handed to the pause service.
     *
     * @param factory the back-off manager factory to configure
     * @throws IllegalArgumentException when no task scheduler could be obtained
     */
    protected void configurePartitionPausingFactory(ContainerPartitionPausingBackOffManagerFactory factory) {
        TaskScheduler scheduler = this.getOrCreateRetryTopicTaskScheduler();
        Assert.notNull(scheduler, "Either a RetryTopicSchedulerWrapper or TaskScheduler bean is required");
        // Reuse the validated scheduler instead of looking it up a second time.
        ListenerContainerPauseService pauseService = new ListenerContainerPauseService(this.endpointRegistry, scheduler);
        factory.setBackOffHandler(new ContainerPausingBackOffHandler(pauseService));
    }

    /**
     * Returns the task scheduler used for retry topic handling, resolving and caching it on first use.
     * <p>
     * Lookup order: the cached scheduler, the scheduler of a {@code RetryTopicSchedulerWrapper} bean,
     * a {@code TaskScheduler} bean, and finally a {@code ThreadPoolTaskScheduler} created and
     * initialized by this method (also kept in {@link #retryTopicThreadPoolTaskScheduler}).
     *
     * @return the scheduler to use
     */
    protected TaskScheduler getOrCreateRetryTopicTaskScheduler() {
        TaskScheduler cached = this.retryTopicTaskScheduler;
        if (cached != null) {
            return cached;
        }
        ObjectProvider<RetryTopicSchedulerWrapper> wrapperProvider = this.applicationContext.getBeanProvider(RetryTopicSchedulerWrapper.class);
        RetryTopicSchedulerWrapper wrapper = wrapperProvider.getIfAvailable();
        if (wrapper != null) {
            this.retryTopicTaskScheduler = wrapper.getScheduler();
        } else {
            this.retryTopicTaskScheduler = this.applicationContext.getBeanProvider(TaskScheduler.class).getIfAvailable();
        }
        if (this.retryTopicTaskScheduler == null) {
            // Nothing available in the context: fall back to a scheduler owned by this processor.
            this.retryTopicThreadPoolTaskScheduler = new ThreadPoolTaskScheduler();
            this.retryTopicThreadPoolTaskScheduler.afterPropertiesSet();
            this.retryTopicTaskScheduler = this.retryTopicThreadPoolTaskScheduler;
        }
        return this.retryTopicTaskScheduler;
    }

    /**
     * Decorates the {@link #containerFactory} field for the given destination topic.
     * <p>
     * The main endpoint is decorated without setting container properties; every other destination
     * gets the full decoration.
     *
     * @param destinationTopicProperties the properties of the destination topic
     * @param factoryConfigurer the configurer performing the decoration
     * @param retryTopicConfiguration the retry topic configuration supplying the configurer settings
     * @return the decorated container factory
     */
    protected KafkaListenerContainerFactory<?> decorateFactory(DestinationTopic.Properties destinationTopicProperties, ListenerContainerFactoryConfigurer factoryConfigurer, RetryTopicConfiguration retryTopicConfiguration) {
        if (destinationTopicProperties.isMainEndpoint()) {
            return factoryConfigurer.decorateFactoryWithoutSettingContainerProperties((ConcurrentKafkaListenerContainerFactory)this.containerFactory, retryTopicConfiguration.forContainerFactoryConfigurer());
        }
        return factoryConfigurer.decorateFactory((ConcurrentKafkaListenerContainerFactory)this.containerFactory, retryTopicConfiguration.forContainerFactoryConfigurer());
    }

    /**
     * Attaches a Kafka based outbound event channel adapter to the channel model.
     * <p>
     * Nothing happens when the model already has an adapter or when its topic has no text.
     *
     * @param channelModel the outbound channel model to process
     */
    protected void processOutboundDefinition(KafkaOutboundChannelModel channelModel) {
        String topicExpression = channelModel.getTopic();
        if (channelModel.getOutboundEventChannelAdapter() != null || !StringUtils.hasText(topicExpression)) {
            return;
        }
        String targetTopic = this.resolve(topicExpression);
        KafkaPartitionProvider partitionProvider = this.resolveKafkaPartitionProvider(channelModel);
        KafkaOperationsOutboundEventChannelAdapter adapter = new KafkaOperationsOutboundEventChannelAdapter(this.kafkaOperations, partitionProvider, targetTopic, channelModel.getRecordKey());
        channelModel.setOutboundEventChannelAdapter((Object)adapter);
    }

    /**
     * Resolves an expression to an {@link Integer}, using {@code null} as the default.
     *
     * @param value the expression to resolve
     * @param attribute the attribute name used in error messages
     * @return the resolved integer, or {@code null} when the expression resolves to nothing
     */
    protected Integer resolveExpressionAsInteger(String value, String attribute) {
        Integer noDefault = null;
        return this.resolveExpressionAsInteger(value, attribute, noDefault);
    }

    /**
     * Resolves an expression to an {@link Integer}.
     *
     * @param value the expression to resolve
     * @param attribute the attribute name used in the error message
     * @param defaultValue the value returned when the expression resolves to {@code null}
     * @return the parsed string, the narrowed number, or {@code defaultValue}
     * @throws IllegalStateException when the expression resolves to anything other than a string or number
     * @throws NumberFormatException when a resolved string is not a valid integer
     */
    protected Integer resolveExpressionAsInteger(String value, String attribute, Integer defaultValue) {
        Object resolved = this.resolveExpression(value);
        if (resolved == null) {
            return defaultValue;
        }
        if (resolved instanceof String) {
            return Integer.parseInt((String) resolved);
        }
        if (resolved instanceof Number) {
            return ((Number) resolved).intValue();
        }
        throw new IllegalStateException("The [" + attribute + "] must resolve to an Number or a String that can be parsed as an Integer. Resolved to [" + resolved.getClass() + "] for [" + value + "]");
    }

    /**
     * Resolves an expression to a {@link Long}.
     *
     * @param value the expression to resolve
     * @param attribute the attribute name used in the error message
     * @return the parsed string, the converted number, or {@code null} when the expression resolves to {@code null}
     * @throws IllegalStateException when the expression resolves to anything other than a string or number
     * @throws NumberFormatException when a resolved string is not a valid long
     */
    protected Long resolveExpressionAsLong(String value, String attribute) {
        Object resolved = this.resolveExpression(value);
        if (resolved == null) {
            return null;
        }
        if (resolved instanceof String) {
            return Long.parseLong((String) resolved);
        }
        if (resolved instanceof Number) {
            return ((Number) resolved).longValue();
        }
        throw new IllegalStateException("The [" + attribute + "] must resolve to an Number or a String that can be parsed as a Long. Resolved to [" + resolved.getClass() + "] for [" + value + "]");
    }

    /**
     * Resolves an expression to a {@link Double}.
     *
     * @param value the expression to resolve
     * @param attribute the attribute name used in the error message
     * @return the parsed string, the converted number, or {@code null} when the expression resolves to {@code null}
     * @throws IllegalStateException when the expression resolves to anything other than a string or number
     * @throws NumberFormatException when a resolved string is not a valid double
     */
    protected Double resolveExpressionAsDouble(String value, String attribute) {
        Object resolved = this.resolveExpression(value);
        if (resolved == null) {
            return null;
        }
        if (resolved instanceof String) {
            return Double.parseDouble((String) resolved);
        }
        if (resolved instanceof Number) {
            return ((Number) resolved).doubleValue();
        }
        throw new IllegalStateException("The [" + attribute + "] must resolve to an Number or a String that can be parsed as a Double. Resolved to [" + resolved.getClass() + "] for [" + value + "]");
    }

    /**
     * Resolves an expression to a {@link Boolean}, using {@code null} as the default.
     *
     * @param value the expression to resolve
     * @param attribute the attribute name used in error messages
     * @return the resolved boolean, or {@code null} when the expression resolves to nothing
     */
    protected Boolean resolveExpressionAsBoolean(String value, String attribute) {
        Boolean noDefault = null;
        return this.resolveExpressionAsBoolean(value, attribute, noDefault);
    }

    /**
     * Resolves an expression to a {@link Boolean}.
     *
     * @param value the expression to resolve
     * @param attribute the attribute name used in the error message
     * @param defaultValue the value returned when the expression resolves to {@code null}
     * @return the parsed string, the resolved boolean, or {@code defaultValue}
     * @throws IllegalStateException when the expression resolves to anything other than a string or boolean
     */
    protected Boolean resolveExpressionAsBoolean(String value, String attribute, Boolean defaultValue) {
        Object resolved = this.resolveExpression(value);
        if (resolved == null) {
            return defaultValue;
        }
        if (resolved instanceof Boolean) {
            return (Boolean) resolved;
        }
        if (resolved instanceof String) {
            return Boolean.parseBoolean((String) resolved);
        }
        throw new IllegalStateException("The [" + attribute + "] must resolve to a Boolean or a String that can be parsed as a Boolean. Resolved to [" + resolved.getClass() + "] for [" + value + "]");
    }

    /**
     * Resolves an expression to a {@link String}.
     * <p>
     * A {@code null} resolution result yields {@code null}, in line with the other
     * {@code resolveExpressionAs*} methods, instead of failing with a
     * {@link NullPointerException} while building the error message.
     *
     * @param value the expression to resolve; {@code null} or empty yields {@code null}
     * @param attribute the attribute name used in the error message
     * @return the resolved string, or {@code null} when there is nothing to resolve or the
     * expression resolves to {@code null}
     * @throws IllegalStateException when the expression resolves to a non-string value
     */
    protected String resolveExpressionAsString(String value, String attribute) {
        if (!StringUtils.hasLength(value)) {
            return null;
        }
        Object resolved = this.resolveExpression(value);
        if (resolved == null) {
            // Nothing was resolved: report "no value" rather than dereferencing null below.
            return null;
        }
        if (resolved instanceof String) {
            return (String) resolved;
        }
        throw new IllegalStateException("The [" + attribute + "] must resolve to a String. Resolved to [" + resolved.getClass() + "] for [" + value + "]");
    }

    /**
     * Resolves the topic expressions of an inbound channel into concrete topic names.
     *
     * @param channelDefinition the inbound channel model
     * @return the resolved topic names, or an empty list when the model defines no topics
     * @throws IllegalArgumentException when an expression resolves to an unsupported value
     */
    protected Collection<String> resolveTopics(KafkaInboundChannelModel channelDefinition) {
        Collection<String> topicExpressions = channelDefinition.getTopics();
        if (topicExpressions == null || topicExpressions.isEmpty()) {
            return Collections.emptyList();
        }
        List<String> resolvedTopics = new ArrayList<String>();
        for (String topicExpression : topicExpressions) {
            Object resolvedValue = this.resolveExpression(topicExpression);
            this.resolveTopics(resolvedValue, resolvedTopics, channelDefinition);
        }
        return resolvedTopics;
    }

    /**
     * Flattens a resolved topic value into the result list.
     * <p>
     * A string is added directly; a string array or an {@link Iterable} is processed element by
     * element through this same method, so nested values are supported.
     *
     * @param resolvedValue the value an expression resolved to
     * @param result the list collecting the topic names
     * @param channelDefinition the channel model, used in the error message
     * @throws IllegalArgumentException when the value (or a nested element) is of any other type,
     * including {@code null}
     */
    protected void resolveTopics(Object resolvedValue, List<String> result, KafkaInboundChannelModel channelDefinition) {
        if (resolvedValue instanceof String) {
            result.add((String) resolvedValue);
            return;
        }
        Iterable<?> elements;
        if (resolvedValue instanceof String[]) {
            elements = Arrays.asList((String[]) resolvedValue);
        } else if (resolvedValue instanceof Iterable) {
            elements = (Iterable<?>) resolvedValue;
        } else {
            throw new IllegalArgumentException("Channel definition " + channelDefinition + " cannot resolve " + resolvedValue + " as a String[] or a String");
        }
        for (Object element : elements) {
            this.resolveTopics(element, result, channelDefinition);
        }
    }

    /**
     * Resolves the topic pattern of an inbound channel.
     *
     * @param channelModel the inbound channel model
     * @return the resolved pattern, or {@code null} when the model has no topic pattern text or the
     * expression resolves to {@code null}
     * @throws IllegalStateException when the expression resolves to something other than a
     * {@link Pattern} or a string
     */
    protected Pattern resolvePattern(KafkaInboundChannelModel channelModel) {
        String patternExpression = channelModel.getTopicPattern();
        if (!StringUtils.hasText(patternExpression)) {
            return null;
        }
        Object resolved = this.resolveExpression(patternExpression);
        if (resolved == null) {
            return null;
        }
        if (resolved instanceof Pattern) {
            return (Pattern) resolved;
        }
        if (resolved instanceof String) {
            return Pattern.compile((String) resolved);
        }
        throw new IllegalStateException("topicPattern in channel model [ " + channelModel + " ] must resolve to a Pattern or String, not " + resolved.getClass());
    }

    /**
     * Resolves the manually assigned topic partitions of an inbound channel.
     *
     * @param channelModel the inbound channel model
     * @return one topic partition offset per resolved partition, or an empty list when the model
     * defines no topic partitions
     * @throws FlowableIllegalArgumentException when a topic resolves to no text or a topic
     * partition has no partitions
     */
    protected Collection<TopicPartitionOffset> resolveTopicPartitions(KafkaInboundChannelModel channelModel) {
        Collection<KafkaInboundChannelModel.TopicPartition> definitions = channelModel.getTopicPartitions();
        if (definitions == null || definitions.isEmpty()) {
            return Collections.emptyList();
        }
        List<TopicPartitionOffset> resolvedOffsets = new ArrayList<TopicPartitionOffset>();
        for (KafkaInboundChannelModel.TopicPartition definition : definitions) {
            String topic = this.resolveExpressionAsString(definition.getTopic(), "topicPartitions[].topic");
            if (!StringUtils.hasText(topic)) {
                throw new FlowableIllegalArgumentException("topic in topic partition in channel model [ " + channelModel.getKey() + " ] must resolve to a non empty string");
            }
            Collection<String> partitionExpressions = definition.getPartitions();
            if (partitionExpressions == null || partitionExpressions.isEmpty()) {
                throw new FlowableIllegalArgumentException("partitions in topic partition in channel model [ " + channelModel.getKey() + " ] must not be empty");
            }
            for (String partitionExpression : partitionExpressions) {
                Object resolvedPartition = this.resolveExpression(partitionExpression);
                this.resolvePartitionAsInteger(topic, resolvedPartition, resolvedOffsets);
            }
        }
        return resolvedOffsets;
    }

    /**
     * Converts a resolved partition value into topic partition offsets for the given topic.
     * <p>
     * Supported values: a string (parsed with {@link #parsePartitions(String)}), an
     * {@link Integer}, an {@code Integer[]}, and a {@code String[]} or {@link Iterable} whose
     * elements are processed through this same method.
     *
     * @param topic the topic the partitions belong to
     * @param resolvedValue the value a partition expression resolved to
     * @param result the list collecting the topic partition offsets
     * @throws IllegalStateException when a string value has no text
     * @throws IllegalArgumentException when the value is of any other type, including {@code null}
     */
    protected void resolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionOffset> result) {
        if (resolvedValue instanceof String) {
            String partitionText = (String) resolvedValue;
            Assert.state(StringUtils.hasText(partitionText), () -> "partition in TopicPartition for topic '" + topic + "' cannot be empty");
            // Collect first, then append, so the result only grows once all partitions are built.
            List<TopicPartitionOffset> parsed = this.parsePartitions(partitionText)
                    .map(partition -> new TopicPartitionOffset(topic, partition.intValue()))
                    .collect(Collectors.toList());
            result.addAll(parsed);
            return;
        }
        if (resolvedValue instanceof Integer) {
            result.add(new TopicPartitionOffset(topic, ((Integer) resolvedValue).intValue()));
            return;
        }
        if (resolvedValue instanceof Integer[]) {
            for (Integer partition : (Integer[]) resolvedValue) {
                result.add(new TopicPartitionOffset(topic, partition.intValue()));
            }
            return;
        }
        Iterable<?> elements;
        if (resolvedValue instanceof String[]) {
            elements = Arrays.asList((String[]) resolvedValue);
        } else if (resolvedValue instanceof Iterable) {
            elements = (Iterable<?>) resolvedValue;
        } else {
            throw new IllegalArgumentException("partition in TopicPartition for topic '" + topic + "' can't resolve '" + resolvedValue + "' as an Integer or String");
        }
        for (Object element : elements) {
            this.resolvePartitionAsInteger(topic, element, result);
        }
    }

    /**
     * Parses a partition specification such as {@code "0"}, {@code "0,2,5"} or {@code "0-3,7"}.
     * <p>
     * The input is split on commas; each part is either a single partition number or an inclusive
     * {@code start-end} range. Surrounding whitespace of numbers is ignored. A specification made of
     * one single number is returned as is; otherwise the partitions are returned sorted and
     * without duplicates.
     *
     * @param partsString the partition specification
     * @return the parsed partition numbers
     * @throws IllegalStateException when a range does not have exactly two bounds or its end is
     * smaller than its start
     * @throws NumberFormatException when a number cannot be parsed
     */
    protected Stream<Integer> parsePartitions(String partsString) {
        String[] segments = partsString.split(",");
        if (segments.length == 1 && !segments[0].contains("-")) {
            return Stream.of(Integer.valueOf(Integer.parseInt(segments[0].trim())));
        }
        List<Integer> partitions = new ArrayList<Integer>();
        for (String segment : segments) {
            if (!segment.contains("-")) {
                // Single number: reuse the single-value path above.
                this.parsePartitions(segment).forEach(partitions::add);
                continue;
            }
            String[] bounds = segment.split("-");
            Assert.state(bounds.length == 2, "Only one hyphen allowed for a range of partitions: " + segment);
            int first = Integer.parseInt(bounds[0].trim());
            int last = Integer.parseInt(bounds[1].trim());
            Assert.state(last >= first, "Invalid range: " + segment);
            for (int partition = first; partition <= last; partition++) {
                partitions.add(partition);
            }
        }
        return partitions.stream().sorted().distinct();
    }

    /**
     * Determines how the partition of outgoing records is chosen for the given outbound channel.
     * <p>
     * The partition settings are checked in this order, the first one that has text wins:
     * <ol>
     * <li>{@code eventField}: an {@link EventPayloadKafkaPartitionProvider} for that field</li>
     * <li>{@code delegateExpression}: the expression must resolve to a {@link KafkaPartitionProvider}</li>
     * <li>{@code roundRobin}: the expression is resolved to a set of partitions that is handed
     * to a {@link RoundRobinKafkaPartitionProvider}</li>
     * </ol>
     *
     * @param channelModel the outbound channel model
     * @return the provider to use, or {@code null} when the model has no partition settings
     * @throws FlowableException if partition settings exist but none of the three options is set
     */
    protected KafkaPartitionProvider resolveKafkaPartitionProvider(KafkaOutboundChannelModel channelModel) {
        KafkaOutboundChannelModel.KafkaPartition partition = channelModel.getPartition();
        if (partition == null) {
            return null;
        }
        if (StringUtils.hasText(partition.getEventField())) {
            return new EventPayloadKafkaPartitionProvider(partition.getEventField());
        }
        if (StringUtils.hasText(partition.getDelegateExpression())) {
            return this.resolveExpression(partition.getDelegateExpression(), KafkaPartitionProvider.class);
        }
        if (StringUtils.hasText(partition.getRoundRobin())) {
            // Reuse the partition parsing of the inbound side and keep only the partition numbers.
            List<TopicPartitionOffset> tpo = new ArrayList<>();
            this.resolvePartitionAsInteger(channelModel.getTopic(), this.resolveExpression(partition.getRoundRobin()), tpo);
            List<Integer> partitions = new ArrayList<>(tpo.size());
            for (TopicPartitionOffset offset : tpo) {
                partitions.add(offset.getPartition());
            }
            return new RoundRobinKafkaPartitionProvider(partitions);
        }
        // roundRobin is a valid option as well, so the message has to mention it.
        throw new FlowableException("The kafka partition value was not found for the channel model with key " + channelModel.getKey() + ". One of eventField, delegateExpression, roundRobin should be set.");
    }

    /**
     * Resolves the expression and makes sure the outcome is of the requested type.
     *
     * @param expression the expression to resolve
     * @param type the type the resolved value has to be an instance of
     * @param <T> the expected result type
     * @return the resolved value, cast to {@code type}
     * @throws FlowableException if the resolved value is not an instance of {@code type}
     */
    protected <T> T resolveExpression(String expression, Class<T> type) {
        Object resolved = this.resolveExpression(expression);
        if (!type.isInstance(resolved)) {
            throw new FlowableException("expected expression " + expression + " to resolve to " + type + " but it did not. Resolved value is " + resolved);
        }
        return type.cast(resolved);
    }

    /**
     * Resolves a raw expression string: the value is first passed through {@link #resolve(String)}
     * and the outcome is then evaluated by the bean expression resolver in the current
     * expression context.
     *
     * @param value the raw expression
     * @return whatever the expression resolver evaluates the value to
     */
    protected Object resolveExpression(String value) {
        return this.resolver.evaluate(this.resolve(value), this.expressionContext);
    }

    /**
     * Creates the listener that receives the Kafka records of an inbound channel.
     *
     * @param eventRegistry the event registry passed to the listener adapter
     * @param inboundChannelModel the channel model passed to the listener adapter
     * @return a new {@link KafkaChannelMessageListenerAdapter}
     */
    protected GenericMessageListener<ConsumerRecord<Object, Object>> createMessageListener(EventRegistry eventRegistry, InboundChannelModel inboundChannelModel) {
        return new KafkaChannelMessageListenerAdapter(eventRegistry, inboundChannelModel);
    }

    /**
     * Unregisters every listener endpoint that belongs to the given channel.
     * <p>
     * The endpoints are looked up in {@code retryEndpointsByMainEndpointId} under the channel's
     * main endpoint id; when nothing is recorded there, only the main endpoint itself is
     * unregistered.
     *
     * @param channelModel the channel to unregister
     * @param tenantId the tenant the channel belongs to, may be empty
     * @param eventRepositoryService not used by this implementation
     */
    public void unregisterChannelModel(ChannelModel channelModel, String tenantId, EventRepositoryService eventRepositoryService) {
        this.logger.info("Starting to unregister channel {} in tenant {}", channelModel.getKey(), tenantId);

        String mainId = this.getEndpointId(channelModel, tenantId);
        Collection<String> endpointIds = this.retryEndpointsByMainEndpointId.getOrDefault(mainId, Collections.singleton(mainId));
        for (String id : endpointIds) {
            this.unregisterEndpoint(id, channelModel, tenantId);
        }

        this.logger.info("Finished unregistering channel {} in tenant {}", channelModel.getKey(), tenantId);
    }

    /**
     * Stops, destroys and removes the listener container registered under {@code endpointId}.
     * <p>
     * The container is removed from the registry by reaching into its {@code listenerContainers}
     * field via reflection.
     *
     * @param endpointId the id of the endpoint to remove
     * @param channelModel the channel the endpoint belongs to (used for logging)
     * @param tenantId the tenant of the channel (used for logging)
     * @throws RuntimeException if destroying the container fails
     * @throws IllegalStateException if the registry has no {@code listenerContainers} field
     */
    protected void unregisterEndpoint(String endpointId, ChannelModel channelModel, String tenantId) {
        this.logger.info("Unregistering endpoint {}", endpointId);

        MessageListenerContainer container = this.endpointRegistry.getListenerContainer(endpointId);
        if (container != null) {
            this.logger.debug("Stopping message listener {} for channel {} in tenant {}", container, channelModel.getKey(), tenantId);
            container.stop();

            if (container instanceof DisposableBean) {
                try {
                    this.logger.debug("Destroying message listener {} for channel {} in tenant {}", container, channelModel.getKey(), tenantId);
                    ((DisposableBean) container).destroy();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to destroy listener container", e);
                }
            }
        }

        Field containersField = ReflectionUtils.findField(this.endpointRegistry.getClass(), "listenerContainers");
        if (containersField == null) {
            throw new IllegalStateException("Endpoint registry " + this.endpointRegistry + " does not have listenerContainers field");
        }
        containersField.setAccessible(true);
        Map<?, ?> registeredContainers = (Map<?, ?>) ReflectionUtils.getField(containersField, this.endpointRegistry);
        if (registeredContainers != null) {
            registeredContainers.remove(endpointId);
        }

        this.logger.info("Finished unregistering endpoint {}", endpointId);
    }

    /**
     * Registers a listener endpoint with the endpoint registry.
     * <p>
     * The container is started immediately when the application context has already been
     * refreshed or the registry reports that it is running.
     *
     * @param endpoint the endpoint to register, must not be {@code null} and must have an id
     * @param factory the container factory to use, may be {@code null} to fall back to the
     *        defaults of {@link #resolveContainerFactory}
     */
    protected void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
        Assert.notNull(endpoint, "Endpoint must not be null");
        Assert.hasText(endpoint.getId(), "Endpoint id must be set");
        Assert.state(this.endpointRegistry != null, "No KafkaListenerEndpointRegistry set");

        boolean startImmediately = this.contextRefreshed || this.endpointRegistry.isRunning();

        this.logger.info("Registering endpoint {}", endpoint);
        KafkaListenerContainerFactory<?> factoryToUse = this.resolveContainerFactory(endpoint, factory);
        this.endpointRegistry.registerListenerContainer(endpoint, factoryToUse, startImmediately);
        this.logger.info("Finished registering endpoint {}", endpoint);
    }

    /**
     * Picks the container factory for an endpoint.
     * <p>
     * Precedence: the factory passed in, then the factory configured on this instance, then the
     * bean named by {@code containerFactoryBeanName}. A factory obtained by bean name is stored
     * on this instance so later calls reuse it.
     *
     * @param endpoint the endpoint the factory is needed for (used in the error message)
     * @param containerFactory an explicitly chosen factory, may be {@code null}
     * @return the factory to use, never {@code null}
     * @throws IllegalStateException if no factory can be determined
     */
    protected KafkaListenerContainerFactory<?> resolveContainerFactory(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> containerFactory) {
        if (containerFactory != null) {
            return containerFactory;
        }
        if (this.containerFactory == null) {
            if (this.containerFactoryBeanName == null) {
                throw new IllegalStateException("Could not resolve the " + KafkaListenerContainerFactory.class.getSimpleName() + " to use for [" + endpoint + "] no factory was given and no default is set.");
            }
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            this.containerFactory = this.beanFactory.getBean(this.containerFactoryBeanName, KafkaListenerContainerFactory.class);
        }
        return this.containerFactory;
    }

    /**
     * Builds the listener endpoint id of a channel.
     * <p>
     * The id is {@code CHANNEL_ID_PREFIX + key}; when the tenant id has text it becomes
     * {@code CHANNEL_ID_PREFIX + tenantId + "#" + key}.
     *
     * @param channelModel the channel whose key is used
     * @param tenantId the tenant id, may be {@code null} or blank
     * @return the endpoint id
     */
    protected String getEndpointId(ChannelModel channelModel, String tenantId) {
        String key = channelModel.getKey();
        return StringUtils.hasText(tenantId)
                ? CHANNEL_ID_PREFIX + tenantId + "#" + key
                : CHANNEL_ID_PREFIX + key;
    }

    /**
     * Determines the consumer group id of an inbound channel.
     *
     * @param channelDefinition the inbound channel whose {@code groupId} is resolved
     * @param id the fallback used when the resolved group id is {@code null}
     * @return the resolved group id, or {@code id} when none was resolved
     */
    protected String getEndpointGroupId(KafkaInboundChannelModel channelDefinition, String id) {
        String resolvedGroupId = this.resolveExpressionAsString(channelDefinition.getGroupId(), "groupId");
        return resolvedGroupId != null ? resolvedGroupId : id;
    }

    /**
     * Passes a value through the embedded value resolver, if one is available.
     *
     * @param value the value to resolve, may be {@code null}
     * @return the resolved value; the value itself when it is {@code null} or no embedded value
     *         resolver has been set
     */
    protected String resolve(String value) {
        if (value == null || this.embeddedValueResolver == null) {
            return value;
        }
        return this.embeddedValueResolver.resolveStringValue(value);
    }

    /**
     * Converts the custom properties of a channel into a {@link Properties} object, resolving
     * every value as a string expression.
     *
     * @param consumerProperties the custom properties, may be {@code null} or empty
     * @return the resolved properties, or {@code null} when there are no custom properties
     */
    protected Properties resolveProperties(List<KafkaInboundChannelModel.CustomProperty> consumerProperties) {
        if (consumerProperties == null || consumerProperties.isEmpty()) {
            return null;
        }
        Properties resolved = new Properties();
        for (KafkaInboundChannelModel.CustomProperty property : consumerProperties) {
            // NOTE(review): Properties rejects null values; confirm resolveExpressionAsString
            // cannot return null for a configured property.
            resolved.put(property.getName(), this.resolveExpressionAsString(property.getValue(), property.getName()));
        }
        return resolved;
    }

    /**
     * Stores the bean factory. When it is a {@code ConfigurableListableBeanFactory}, the embedded
     * value resolver, the bean expression resolver and the expression context used for resolving
     * expressions are derived from it as well.
     *
     * @param beanFactory the owning bean factory
     */
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
        if (!(beanFactory instanceof ConfigurableListableBeanFactory)) {
            return;
        }
        ConfigurableListableBeanFactory configurableFactory = (ConfigurableListableBeanFactory) beanFactory;
        this.embeddedValueResolver = new EmbeddedValueResolver(configurableFactory);
        this.resolver = configurableFactory.getBeanExpressionResolver();
        this.expressionContext = new BeanExpressionContext(configurableFactory, null);
    }

    /**
     * Stores the application context; {@link #onApplicationEvent} compares it with the context
     * of incoming refresh events.
     *
     * @param applicationContext the application context to remember
     */
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    /**
     * Marks the context as refreshed once the refresh event of the stored application context
     * arrives. Refresh events of any other context are ignored.
     *
     * @param event the refresh event
     */
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext() != this.applicationContext) {
            return;
        }
        this.contextRefreshed = true;
    }

    /**
     * Destroys the retry topic thread pool task scheduler, if one is set.
     */
    public void destroy() throws Exception {
        if (this.retryTopicThreadPoolTaskScheduler == null) {
            return;
        }
        this.retryTopicThreadPoolTaskScheduler.destroy();
    }

    /**
     * @return the configured {@code KafkaOperations}, may be {@code null} if none was set
     */
    public KafkaOperations<Object, Object> getKafkaOperations() {
        return this.kafkaOperations;
    }

    /**
     * @param kafkaOperations the {@code KafkaOperations} to use; it is also the instance handed
     *        to the retry topic configuration builder in {@code createRetryTopicConfiguration}
     */
    public void setKafkaOperations(KafkaOperations<Object, Object> kafkaOperations) {
        this.kafkaOperations = kafkaOperations;
    }

    /**
     * @return the configured {@code KafkaAdminOperations}, may be {@code null} if none was set
     */
    public KafkaAdminOperations getKafkaAdminOperations() {
        return this.kafkaAdminOperations;
    }

    /**
     * @param kafkaAdminOperations the {@code KafkaAdminOperations} to store on this instance
     */
    public void setKafkaAdminOperations(KafkaAdminOperations kafkaAdminOperations) {
        this.kafkaAdminOperations = kafkaAdminOperations;
    }

    /**
     * @return the registry that listener endpoints are registered with and removed from
     */
    public KafkaListenerEndpointRegistry getEndpointRegistry() {
        return this.endpointRegistry;
    }

    /**
     * @param endpointRegistry the registry to use; {@code registerEndpoint} requires it to be
     *        non-null
     */
    public void setEndpointRegistry(KafkaListenerEndpointRegistry endpointRegistry) {
        this.endpointRegistry = endpointRegistry;
    }

    /**
     * @return the bean name used to look up the default container factory, may be {@code null}
     */
    public String getContainerFactoryBeanName() {
        return this.containerFactoryBeanName;
    }

    /**
     * @param containerFactoryBeanName the bean name {@code resolveContainerFactory} looks up in
     *        the bean factory when no factory instance is available
     */
    public void setContainerFactoryBeanName(String containerFactoryBeanName) {
        this.containerFactoryBeanName = containerFactoryBeanName;
    }

    /**
     * @return the default container factory, may be {@code null} when none was set or resolved yet
     */
    public KafkaListenerContainerFactory<?> getContainerFactory() {
        return this.containerFactory;
    }

    /**
     * @param containerFactory the default container factory, used by
     *        {@code resolveContainerFactory} when no factory is passed explicitly
     */
    public void setContainerFactory(KafkaListenerContainerFactory<?> containerFactory) {
        this.containerFactory = containerFactory;
    }

    /**
     * @return the task scheduler configured for retry topics, may be {@code null}
     */
    public TaskScheduler getRetryTopicTaskScheduler() {
        return this.retryTopicTaskScheduler;
    }

    /**
     * @param retryTopicTaskScheduler the task scheduler to store for retry topics
     */
    public void setRetryTopicTaskScheduler(TaskScheduler retryTopicTaskScheduler) {
        this.retryTopicTaskScheduler = retryTopicTaskScheduler;
    }

    /**
     * Builds the Spring Kafka retry topic configuration from the resolved retry settings.
     * <p>
     * No configuration is created when there are no retry settings at all or when neither a
     * dead letter topic suffix nor a retry topic suffix is set. Without a dead letter suffix the
     * DLT is not configured; without a retry topic no back-off is applied.
     *
     * @param retryConfiguration the resolved retry settings, may be {@code null}
     * @return the retry topic configuration, or {@code null} when none is needed
     */
    protected RetryTopicConfiguration createRetryTopicConfiguration(ResolvedRetryConfiguration retryConfiguration) {
        if (retryConfiguration == null) {
            return null;
        }
        String dltSuffix = retryConfiguration.dltTopicSuffix;
        String retrySuffix = retryConfiguration.retryTopicSuffix;
        if (dltSuffix == null && retrySuffix == null) {
            return null;
        }

        RetryTopicConfigurationBuilder builder = RetryTopicConfigurationBuilder.newInstance()
                .autoStartDltHandler(Boolean.FALSE)
                .autoCreateTopics(retryConfiguration.autoCreateTopics, retryConfiguration.numPartitions, retryConfiguration.replicationFactor)
                .dltSuffix(dltSuffix)
                .retryTopicSuffix(retrySuffix)
                .useSingleTopicForFixedDelays(retryConfiguration.fixedDelayTopicStrategy)
                .setTopicSuffixingStrategy(retryConfiguration.topicSuffixingStrategy);

        if (dltSuffix == null) {
            builder.doNotConfigureDlt();
        }

        if (retryConfiguration.hasRetryTopic()) {
            builder.customBackoff(retryConfiguration.nonBlockingBackOff);
        } else {
            builder.noBackoff();
        }

        Integer maxAttempts = retryConfiguration.attempts;
        if (maxAttempts != null) {
            builder.maxAttempts(maxAttempts.intValue());
        }

        return builder.create(this.kafkaOperations);
    }

    /**
     * Resolves the retry settings of an inbound channel into concrete values.
     * <p>
     * Defaults applied when a setting resolves to nothing: {@code SINGLE_TOPIC} for the fixed
     * delay strategy, {@code SUFFIX_WITH_INDEX_VALUE} for the topic suffixing strategy,
     * {@code true} for auto-creating topics, and {@code 1} for both the number of partitions and
     * the replication factor.
     *
     * @param channelModel the inbound channel model
     * @return the resolved settings, or {@code null} when the channel has no retry settings
     */
    protected ResolvedRetryConfiguration resolveRetryConfiguration(KafkaInboundChannelModel channelModel) {
        KafkaInboundChannelModel.RetryConfiguration retryModel = channelModel.getRetry();
        if (retryModel == null) {
            return null;
        }

        ResolvedRetryConfiguration resolved = new ResolvedRetryConfiguration();
        resolved.attempts = this.resolveExpressionAsInteger(retryModel.getAttempts(), "retry.attempts");
        resolved.dltTopicSuffix = this.resolveExpressionAsString(retryModel.getDltTopicSuffix(), "retry.dltTopicSuffix");
        resolved.retryTopicSuffix = this.resolveExpressionAsString(retryModel.getRetryTopicSuffix(), "retry.retryTopicSuffix");

        String fixedDelayName = this.resolveExpressionAsString(retryModel.getFixedDelayTopicStrategy(), "retry.fixedDelayTopicStrategy");
        if (fixedDelayName == null) {
            resolved.fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC;
        } else {
            resolved.fixedDelayTopicStrategy = FixedDelayStrategy.valueOf(fixedDelayName);
        }

        String suffixingName = this.resolveExpressionAsString(retryModel.getTopicSuffixingStrategy(), "retry.topicSuffixingStrategy");
        if (suffixingName == null) {
            resolved.topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE;
        } else {
            resolved.topicSuffixingStrategy = TopicSuffixingStrategy.valueOf(suffixingName);
        }

        resolved.nonBlockingBackOff = this.createNonBlockingBackOffPolicy(retryModel.getNonBlockingBackOff());
        resolved.autoCreateTopics = this.resolveExpressionAsBoolean(retryModel.getAutoCreateTopics(), "retry.autoCreateTopics", true);
        resolved.numPartitions = this.resolveExpressionAsInteger(retryModel.getNumPartitions(), "retry.numPartitions", 1);
        resolved.replicationFactor = this.resolveExpressionAsInteger(retryModel.getReplicationFactor(), "retry.replicationFactor", 1).shortValue();
        return resolved;
    }

    /**
     * Creates the back-off policy for non-blocking retries.
     * <ul>
     * <li>No back-off settings: a {@code FixedBackOffPolicy} left at its defaults.</li>
     * <li>Multiplier greater than zero: an exponential policy (the random variant when
     * {@code random} resolves to {@code true}); the delay, if set, is the initial interval and
     * the max delay is applied only when it exceeds the initial interval.</li>
     * <li>Otherwise, both delays set and max greater than min: a {@code UniformRandomBackOffPolicy}
     * between the two.</li>
     * <li>Otherwise: a {@code FixedBackOffPolicy} using the delay, if set.</li>
     * </ul>
     *
     * @param backOff the configured back-off settings, may be {@code null}
     * @return the back-off policy, never {@code null}
     */
    protected SleepingBackOffPolicy<?> createNonBlockingBackOffPolicy(KafkaInboundChannelModel.NonBlockingRetryBackOff backOff) {
        if (backOff == null) {
            return new FixedBackOffPolicy();
        }

        Long minDelay = this.resolveExpressionAsLong(backOff.getDelay(), "retry.nonBlockingBackOff.delay");
        Long maxDelay = this.resolveExpressionAsLong(backOff.getMaxDelay(), "retry.nonBlockingBackOff.maxDelay");
        Double multiplier = this.resolveExpressionAsDouble(backOff.getMultiplier(), "retry.nonBlockingBackOff.multiplier");

        if (multiplier != null && multiplier > 0.0) {
            Boolean random = this.resolveExpressionAsBoolean(backOff.getRandom(), "retry.nonBlockingBackOff.random");
            ExponentialBackOffPolicy exponential;
            if (Boolean.TRUE.equals(random)) {
                exponential = new ExponentialRandomBackOffPolicy();
            } else {
                exponential = new ExponentialBackOffPolicy();
            }
            if (minDelay != null) {
                exponential.setInitialInterval(minDelay.longValue());
            }
            exponential.setMultiplier(multiplier.doubleValue());
            // Compared against the effective initial interval, which is the policy's own
            // default when no delay was configured.
            if (maxDelay != null && maxDelay > exponential.getInitialInterval()) {
                exponential.setMaxInterval(maxDelay.longValue());
            }
            return exponential;
        }

        if (minDelay != null && maxDelay != null && maxDelay > minDelay) {
            UniformRandomBackOffPolicy uniform = new UniformRandomBackOffPolicy();
            uniform.setMinBackOffPeriod(minDelay.longValue());
            uniform.setMaxBackOffPeriod(maxDelay.longValue());
            return uniform;
        }

        FixedBackOffPolicy fixed = new FixedBackOffPolicy();
        if (minDelay != null) {
            fixed.setBackOffPeriod(minDelay.longValue());
        }
        return fixed;
    }

    /*
     * Compiler-generated lambda body that the decompiler emitted as a method: wraps the given
     * back-off in a new DefaultErrorHandler. Judging by its name it presumably belongs to
     * createEndpointConfigurations (not verifiable from this part of the file).
     */
    private static /* synthetic */ CommonErrorHandler lambda$createEndpointConfigurations$1(BackOff backOff) {
        return new DefaultErrorHandler(backOff);
    }

    /**
     * Immutable pair of a listener endpoint and the container factory to create its container
     * with.
     */
    protected static class Configuration {
        /** The listener endpoint of this configuration. */
        protected final KafkaListenerEndpoint endpoint;
        /** The container factory paired with the endpoint. */
        protected final KafkaListenerContainerFactory<?> factory;

        /**
         * @param endpoint the listener endpoint
         * @param factory the container factory paired with the endpoint
         */
        protected Configuration(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
            this.endpoint = endpoint;
            this.factory = factory;
        }

        /**
         * @return the listener endpoint
         */
        public KafkaListenerEndpoint getEndpoint() {
            return this.endpoint;
        }

        /**
         * @return the container factory paired with the endpoint
         */
        public KafkaListenerContainerFactory<?> getFactory() {
            return this.factory;
        }
    }

    /**
     * Holder for the retry settings of an inbound channel after all expressions have been
     * resolved. It is filled by {@code resolveRetryConfiguration} and read by
     * {@code createRetryTopicConfiguration}.
     */
    protected static class ResolvedRetryConfiguration {
        /** Maximum number of attempts; only applied to the builder when non-null. */
        protected Integer attempts;
        /** Suffix of the dead letter topic; {@code null} means no DLT is configured. */
        protected String dltTopicSuffix;
        /** Suffix of the retry topic; {@code null} means there is no retry topic. */
        protected String retryTopicSuffix;
        /** Strategy for fixed delay topics. */
        protected FixedDelayStrategy fixedDelayTopicStrategy;
        /** Strategy for suffixing the retry topic names. */
        protected TopicSuffixingStrategy topicSuffixingStrategy;
        /** Back-off policy applied when a retry topic is used. */
        protected SleepingBackOffPolicy<?> nonBlockingBackOff;
        /** Whether the topics should be created automatically. */
        protected boolean autoCreateTopics;
        /** Number of partitions passed along with the auto-create setting. */
        protected int numPartitions;
        /** Replication factor passed along with the auto-create setting. */
        protected short replicationFactor;

        protected ResolvedRetryConfiguration() {
        }

        /**
         * @return {@code true} when a retry topic suffix is set
         */
        protected boolean hasRetryTopic() {
            return this.retryTopicSuffix != null;
        }

        /**
         * @return the negation of {@link #hasRetryTopic()}
         */
        protected boolean hasNoRetryTopic() {
            return !this.hasRetryTopic();
        }
    }

    /**
     * Container factory that delegates the creation of listener containers to another factory
     * and installs a freshly supplied {@code CommonErrorHandler} on every container it returns.
     */
    protected static class RetryTopicContainerFactoryDecorator
    implements KafkaListenerContainerFactory<MessageListenerContainer> {
        private final KafkaListenerContainerFactory<?> delegate;
        private final Supplier<CommonErrorHandler> errorHandlerProvider;

        /**
         * @param delegate the factory that actually creates the containers
         * @param errorHandlerProvider supplies the error handler for each created container
         */
        private RetryTopicContainerFactoryDecorator(KafkaListenerContainerFactory<?> delegate, Supplier<CommonErrorHandler> errorHandlerProvider) {
            this.delegate = delegate;
            this.errorHandlerProvider = errorHandlerProvider;
        }

        public MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint) {
            return this.decorate(this.delegate.createListenerContainer(endpoint));
        }

        public MessageListenerContainer createContainer(TopicPartitionOffset ... topicPartitions) {
            return this.decorate(this.delegate.createContainer(topicPartitions));
        }

        public MessageListenerContainer createContainer(String ... topics) {
            return this.decorate(this.delegate.createContainer(topics));
        }

        public MessageListenerContainer createContainer(Pattern topicPattern) {
            return this.decorate(this.delegate.createContainer(topicPattern));
        }

        /**
         * Installs the error handler on the container when it is an
         * {@code AbstractMessageListenerContainer}; other container types are returned untouched.
         *
         * @param listenerContainer the container created by the delegate
         * @return the same container instance
         */
        protected MessageListenerContainer decorate(MessageListenerContainer listenerContainer) {
            if (listenerContainer instanceof AbstractMessageListenerContainer) {
                // Cast to the type that was checked: casting to ConcurrentMessageListenerContainer
                // fails with a ClassCastException for any other AbstractMessageListenerContainer.
                AbstractMessageListenerContainer<?, ?> container = (AbstractMessageListenerContainer<?, ?>) listenerContainer;
                container.setCommonErrorHandler(this.errorHandlerProvider.get());
            }
            return listenerContainer;
        }
    }
}

