/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.binder.kafka.streams;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsRegistry;
import org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsVersionAgnosticTopologyInfoFacade;
import org.springframework.cloud.stream.binder.kafka.streams.StoreQueryParametersCustomizer;
import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties;
import org.springframework.core.retry.RetryException;
import org.springframework.core.retry.RetryPolicy;
import org.springframework.core.retry.RetryTemplate;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/**
 * Looks up Kafka Streams state stores and key/host metadata across every
 * {@link KafkaStreams} instance known to the {@link KafkaStreamsRegistry}.
 *
 * <p>Store and host lookups are wrapped in a retry loop configured from the binder's
 * state-store retry properties (see {@link #getRetryTemplate()}).
 */
public class InteractiveQueryService {
    private static final Log LOG = LogFactory.getLog(InteractiveQueryService.class);
    private final KafkaStreamsRegistry kafkaStreamsRegistry;
    private final KafkaStreamsBinderConfigurationProperties binderConfigurationProperties;
    private final KafkaStreamsVersionAgnosticTopologyInfoFacade topologyInfoFacade;
    private StoreQueryParametersCustomizer<?> storeQueryParametersCustomizer;

    /**
     * Creates the service.
     *
     * @param kafkaStreamsRegistry registry that supplies the known {@link KafkaStreams} instances
     * @param binderConfigurationProperties binder properties read for {@code application.server}
     *        and the state-store retry settings
     */
    public InteractiveQueryService(KafkaStreamsRegistry kafkaStreamsRegistry, KafkaStreamsBinderConfigurationProperties binderConfigurationProperties) {
        this.kafkaStreamsRegistry = kafkaStreamsRegistry;
        this.binderConfigurationProperties = binderConfigurationProperties;
        this.topologyInfoFacade = new KafkaStreamsVersionAgnosticTopologyInfoFacade();
    }

    /**
     * Retrieves a queryable state store, retrying according to the binder's state-store
     * retry settings.
     *
     * <p>Each attempt first asks the Streams instance whose {@code application.id} appears in
     * the current thread's name (if any), then falls back to every registered instance.
     *
     * @param storeName name of the state store
     * @param storeType type of the queryable store
     * @param <T> store type
     * @return the resolved store
     * @throws RuntimeException the cause of the {@link RetryException} raised once retries are
     *         exhausted, rethrown via {@link ReflectionUtils#rethrowRuntimeException(Throwable)}
     */
    public <T> T getQueryableStore(String storeName, QueryableStoreType<T> storeType) {
        KafkaStreams contextSpecificKafkaStreams = this.getThreadContextSpecificKafkaStreams();
        // Resolved once up front so every retry attempt queries with the same parameters.
        StoreQueryParameters<T> storeQueryParams = this.resolveStoreQueryParameters(storeName, storeType);
        try {
            return this.getRetryTemplate().execute(() -> this.locateStore(contextSpecificKafkaStreams, storeName, storeQueryParams));
        }
        catch (RetryException ex) {
            ReflectionUtils.rethrowRuntimeException(ex.getCause());
            // Only present to satisfy the compiler; the call above is expected to always throw.
            return null;
        }
    }

    /** Builds the store query parameters and applies the optional customizer. */
    private <T> StoreQueryParameters<T> resolveStoreQueryParameters(String storeName, QueryableStoreType<T> storeType) {
        StoreQueryParameters<T> storeQueryParams = StoreQueryParameters.fromNameAndType(storeName, storeType);
        if (this.storeQueryParametersCustomizer == null) {
            return storeQueryParams;
        }
        // The customizer is stored with a wildcard type; the caller-supplied store type decides T.
        @SuppressWarnings("unchecked")
        StoreQueryParametersCustomizer<T> customizer = (StoreQueryParametersCustomizer<T>) this.storeQueryParametersCustomizer;
        return customizer.customize(storeQueryParams);
    }

    /**
     * Performs a single store lookup attempt: the thread-context Streams instance first,
     * then all registered instances.
     *
     * @throws IllegalStateException when no unique store could be resolved; the cause is the
     *         last failure observed, or {@code null} if none was recorded
     */
    private <T> T locateStore(KafkaStreams contextSpecificKafkaStreams, String storeName, StoreQueryParameters<T> storeQueryParams) {
        T store = null;
        Throwable throwable = null;
        if (contextSpecificKafkaStreams != null) {
            try {
                store = contextSpecificKafkaStreams.store(storeQueryParams);
            }
            catch (InvalidStateStoreException e) {
                throwable = e;
            }
        }
        if (store != null) {
            return store;
        }
        if (contextSpecificKafkaStreams != null) {
            LOG.warn("Store (" + storeName + ") could not be found in Streams context, falling back to all known Streams instances");
        }

        // Collect every Streams instance that hands back a store for these parameters.
        Map<KafkaStreams, T> candidateStores = new HashMap<>();
        for (KafkaStreams kafkaStreamApp : this.kafkaStreamsRegistry.getKafkaStreams()) {
            try {
                candidateStores.put(kafkaStreamApp, kafkaStreamApp.store(storeQueryParams));
            }
            catch (Exception ex) {
                // Remember the most recent failure so it can be reported as the cause.
                throwable = ex;
            }
        }

        // Exactly one instance answered: nothing further to disambiguate.
        if (candidateStores.size() == 1) {
            return candidateStores.values().iterator().next();
        }

        // Several instances answered: keep only those whose topology really contains the store.
        if (candidateStores.size() > 1) {
            Map<KafkaStreams, T> actualOwners = candidateStores.entrySet().stream()
                    .filter(e -> this.topologyInfoFacade.streamsAppActuallyHasStore(e.getKey(), storeName))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            if (actualOwners.size() == 1) {
                return actualOwners.values().iterator().next();
            }
            throwable = actualOwners.isEmpty()
                    ? new UnknownStateStoreException("Store (" + storeName + ") not available to Streams instance")
                    : new InvalidStateStoreException("Store (" + storeName + ") available to more than one Streams instance");
        }
        throw new IllegalStateException("Error retrieving state store: " + storeName, throwable);
    }

    /**
     * Returns any registered Streams instance whose {@code application.id} is contained in the
     * current thread's name, or {@code null} when there is none.
     */
    private KafkaStreams getThreadContextSpecificKafkaStreams() {
        return this.kafkaStreamsRegistry.getKafkaStreams().stream()
                .filter(this::filterByThreadName)
                .findAny()
                .orElse(null);
    }

    /** Tests whether the current thread's name contains the given instance's {@code application.id}. */
    private boolean filterByThreadName(KafkaStreams streams) {
        String applicationId = Objects.requireNonNull(this.kafkaStreamsRegistry.streamBuilderFactoryBean(streams).getStreamsConfiguration()).getProperty("application.id");
        return Thread.currentThread().getName().contains(applicationId);
    }

    /**
     * Builds the {@link HostInfo} of this application from the binder's
     * {@code application.server} configuration entry.
     *
     * @return the host info, or {@code null} when {@code application.server} is not configured
     * @throws NullPointerException if the configured value contains no {@code ':'} separator
     * @throws NumberFormatException if the part after the first {@code ':'} is not an integer
     */
    public HostInfo getCurrentHostInfo() {
        Map<?, ?> configuration = this.binderConfigurationProperties.getConfiguration();
        if (!configuration.containsKey("application.server")) {
            return null;
        }
        String applicationServer = (String) configuration.get("application.server");
        // StringUtils.split cuts at the first ':' and yields null when no ':' is present.
        String[] hostAndPort = StringUtils.split(applicationServer, ":");
        Objects.requireNonNull(hostAndPort,
                () -> "Property 'application.server' must have the form host:port but was: " + applicationServer);
        return new HostInfo(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
    }

    /**
     * Resolves the active host for a key of a store, retrying according to the binder's
     * state-store retry settings.
     *
     * @param store store name
     * @param key key to locate
     * @param serializer serializer for the key
     * @param <K> key type
     * @return the active host reported by the first Streams instance that returns metadata
     * @throws RuntimeException the cause of the {@link RetryException} raised once retries are
     *         exhausted, rethrown via {@link ReflectionUtils#rethrowRuntimeException(Throwable)}
     */
    public <K> HostInfo getHostInfo(String store, K key, Serializer<K> serializer) {
        RetryTemplate retryTemplate = this.getRetryTemplate();
        try {
            return retryTemplate.execute(() -> {
                Exception failure = null;
                try {
                    KeyQueryMetadata keyQueryMetadata = this.findKeyQueryMetadata(store, key, serializer);
                    if (keyQueryMetadata != null) {
                        return keyQueryMetadata.activeHost();
                    }
                }
                catch (Exception e) {
                    failure = e;
                }
                // No metadata found or the lookup failed: signal the retry template to try again.
                throw new IllegalStateException("Error when retrieving state store.", failure != null ? failure : new Throwable("Kafka Streams is not ready."));
            });
        }
        catch (RetryException ex) {
            ReflectionUtils.rethrowRuntimeException(ex.getCause());
            // Only present to satisfy the compiler; the call above is expected to always throw.
            return null;
        }
    }

    /**
     * Creates a retry template from the binder's state-store retry properties.
     *
     * <p>NOTE(review): the configured {@code maxAttempts} value is passed as the policy's
     * {@code maxRetries}; confirm against the RetryPolicy documentation whether this yields the
     * intended total number of invocations.
     */
    private RetryTemplate getRetryTemplate() {
        KafkaStreamsBinderConfigurationProperties.StateStoreRetry stateStoreRetry = this.binderConfigurationProperties.getStateStoreRetry();
        RetryPolicy retryPolicy = RetryPolicy.builder()
                .maxRetries(stateStoreRetry.getMaxAttempts())
                .delay(Duration.ofMillis(stateStoreRetry.getBackoffPeriod()))
                .build();
        return new RetryTemplate(retryPolicy);
    }

    /**
     * Returns the key metadata from the first registered Streams instance that provides it.
     *
     * @param store store name
     * @param key key to locate
     * @param serializer serializer for the key
     * @param <K> key type
     * @return the metadata, or {@code null} when no instance returns any
     */
    public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer) {
        return this.findKeyQueryMetadata(store, key, serializer);
    }

    /** Queries the registered Streams instances in order and returns the first non-null key metadata. */
    private <K> KeyQueryMetadata findKeyQueryMetadata(String store, K key, Serializer<K> serializer) {
        return this.kafkaStreamsRegistry.getKafkaStreams().stream()
                .map(streams -> streams.queryMetadataForKey(store, key, serializer))
                .filter(Objects::nonNull)
                .findFirst()
                .orElse(null);
    }

    /**
     * Returns a registered Streams instance that reports key metadata for the given store and key.
     *
     * <p>Every instance is queried; when several report metadata, the last one in registry
     * iteration order is returned.
     *
     * @param store store name
     * @param key key to locate
     * @param serializer serializer for the key
     * @param <K> key type
     * @return the matching instance, or {@code null} when none reports metadata
     */
    public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer) {
        KafkaStreams match = null;
        for (KafkaStreams candidate : this.kafkaStreamsRegistry.getKafkaStreams()) {
            if (candidate.queryMetadataForKey(store, key, serializer) != null) {
                match = candidate;
            }
        }
        return match;
    }

    /**
     * Collects the host info of every Streams metadata entry that the registered instances
     * report for the given store.
     *
     * @param store store name
     * @return host infos in registry iteration order; empty when nothing is reported
     */
    public List<HostInfo> getAllHostsInfo(String store) {
        return this.kafkaStreamsRegistry.getKafkaStreams().stream()
                .flatMap(k -> k.streamsMetadataForStore(store).stream())
                .filter(Objects::nonNull)
                .map(StreamsMetadata::hostInfo)
                .collect(Collectors.toList());
    }

    /**
     * Sets the customizer applied to store query parameters in
     * {@link #getQueryableStore(String, QueryableStoreType)}.
     *
     * @param storeQueryParametersCustomizer the customizer; {@code null} disables customization
     */
    public void setStoreQueryParametersCustomizer(StoreQueryParametersCustomizer<?> storeQueryParametersCustomizer) {
        this.storeQueryParametersCustomizer = storeQueryParametersCustomizer;
    }
}

