/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.session.subscription;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.session.subscription.ConsumerHeartbeatWorker;
import org.apache.iotdb.session.subscription.SubscriptionEndpointsSyncer;
import org.apache.iotdb.session.subscription.SubscriptionProvider;
import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.SubscriptionPushConsumer;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SubscriptionConsumer
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionConsumer.class);
    private static final IoTDBConnectionException NO_PROVIDERS_EXCEPTION = new IoTDBConnectionException("Cluster has no available subscription providers to connect");
    private final List<TEndPoint> initialEndpoints;
    private final String username;
    private final String password;
    private String consumerId;
    private String consumerGroupId;
    private final long heartbeatIntervalMs;
    private final long endpointsSyncIntervalMs;
    private final SortedMap<Integer, SubscriptionProvider> subscriptionProviders = new ConcurrentSkipListMap<Integer, SubscriptionProvider>();
    private final ReentrantReadWriteLock subscriptionProvidersLock = new ReentrantReadWriteLock(true);
    private ScheduledExecutorService heartbeatWorkerExecutor;
    private ScheduledExecutorService endpointsSyncerExecutor;
    private final AtomicBoolean isClosed = new AtomicBoolean(true);

    public String getConsumerId() {
        return this.consumerId;
    }

    public String getConsumerGroupId() {
        return this.consumerGroupId;
    }

    protected SubscriptionConsumer(Builder builder) {
        this.initialEndpoints = new ArrayList<TEndPoint>();
        if (Objects.nonNull(builder.host)) {
            this.initialEndpoints.add(new TEndPoint(builder.host, builder.port));
        } else {
            this.initialEndpoints.addAll(SessionUtils.parseSeedNodeUrls(builder.nodeUrls));
        }
        this.username = builder.username;
        this.password = builder.password;
        this.consumerId = builder.consumerId;
        this.consumerGroupId = builder.consumerGroupId;
        this.heartbeatIntervalMs = builder.heartbeatIntervalMs;
        this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs;
    }

    protected SubscriptionConsumer(Builder builder, Properties properties) {
        this(builder.host((String)properties.getOrDefault((Object)"host", "localhost")).port((Integer)properties.getOrDefault((Object)"port", (Object)6667)).nodeUrls((List)properties.get("node-urls")).username((String)properties.getOrDefault((Object)"username", "root")).password((String)properties.getOrDefault((Object)"password", "root")).consumerId((String)properties.get("consumer-id")).consumerGroupId((String)properties.get("group-id")).heartbeatIntervalMs((Long)properties.getOrDefault((Object)"heartbeat-interval-ms", (Object)5000L)).endpointsSyncIntervalMs((Long)properties.getOrDefault((Object)"endpoints-sync-interval-ms", (Object)30000L)));
    }

    public synchronized void open() throws TException, IoTDBConnectionException, IOException, StatementExecutionException {
        if (!this.isClosed.get()) {
            return;
        }
        this.acquireWriteLock();
        try {
            this.openProviders();
        }
        finally {
            this.releaseWriteLock();
        }
        this.launchHeartbeatWorker();
        this.launchEndpointsSyncer();
        this.isClosed.set(false);
    }

    @Override
    public synchronized void close() throws IoTDBConnectionException {
        if (this.isClosed.get()) {
            return;
        }
        try {
            this.shutdownEndpointsSyncer();
            this.shutdownHeartbeatWorker();
            this.acquireWriteLock();
            try {
                this.closeProviders();
            }
            finally {
                this.releaseWriteLock();
            }
        }
        finally {
            this.isClosed.set(true);
        }
    }

    boolean isClosed() {
        return this.isClosed.get();
    }

    void acquireReadLock() {
        this.subscriptionProvidersLock.readLock().lock();
    }

    void releaseReadLock() {
        this.subscriptionProvidersLock.readLock().unlock();
    }

    void acquireWriteLock() {
        this.subscriptionProvidersLock.writeLock().lock();
    }

    void releaseWriteLock() {
        this.subscriptionProvidersLock.writeLock().unlock();
    }

    public void subscribe(String topicName) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        this.subscribe(Collections.singleton(topicName));
    }

    public void subscribe(String ... topicNames) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        this.subscribe(new HashSet<String>(Arrays.asList(topicNames)));
    }

    public void subscribe(Set<String> topicNames) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        this.acquireReadLock();
        try {
            this.subscribeWithRedirection(topicNames);
        }
        finally {
            this.releaseReadLock();
        }
    }

    public void unsubscribe(String topicName) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        this.unsubscribe(Collections.singleton(topicName));
    }

    public void unsubscribe(String ... topicNames) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        this.unsubscribe(new HashSet<String>(Arrays.asList(topicNames)));
    }

    public void unsubscribe(Set<String> topicNames) throws TException, IOException, StatementExecutionException, IoTDBConnectionException {
        this.acquireReadLock();
        try {
            this.unsubscribeWithRedirection(topicNames);
        }
        finally {
            this.releaseReadLock();
        }
    }

    private void launchHeartbeatWorker() {
        this.heartbeatWorkerExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "ConsumerHeartbeatWorker", 0L);
            if (!t.isDaemon()) {
                t.setDaemon(true);
            }
            if (t.getPriority() != 5) {
                t.setPriority(5);
            }
            return t;
        });
        this.heartbeatWorkerExecutor.scheduleAtFixedRate(new ConsumerHeartbeatWorker(this), 0L, this.heartbeatIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void shutdownHeartbeatWorker() {
        this.heartbeatWorkerExecutor.shutdown();
        this.heartbeatWorkerExecutor = null;
    }

    private void launchEndpointsSyncer() {
        this.endpointsSyncerExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, "SubscriptionEndpointsSyncer", 0L);
            if (!t.isDaemon()) {
                t.setDaemon(true);
            }
            if (t.getPriority() != 5) {
                t.setPriority(5);
            }
            return t;
        });
        this.endpointsSyncerExecutor.scheduleAtFixedRate(new SubscriptionEndpointsSyncer(this), 0L, this.endpointsSyncIntervalMs, TimeUnit.MILLISECONDS);
    }

    private void shutdownEndpointsSyncer() {
        this.endpointsSyncerExecutor.shutdown();
        this.endpointsSyncerExecutor = null;
    }

    SubscriptionProvider constructProviderAndHandshake(TEndPoint endPoint) throws TException, IoTDBConnectionException, IOException, StatementExecutionException {
        SubscriptionProvider provider = new SubscriptionProvider(endPoint, this.username, this.password, this.consumerId, this.consumerGroupId);
        provider.handshake();
        if (Objects.isNull(this.consumerId)) {
            this.consumerId = provider.getConsumerId();
        }
        if (Objects.isNull(this.consumerGroupId)) {
            this.consumerGroupId = provider.getConsumerGroupId();
        }
        return provider;
    }

    void openProviders() throws IoTDBConnectionException {
        this.closeProviders();
        for (TEndPoint endPoint : this.initialEndpoints) {
            Map<Integer, TEndPoint> allEndPoints;
            SubscriptionProvider defaultProvider;
            try {
                defaultProvider = this.constructProviderAndHandshake(endPoint);
            }
            catch (Exception e) {
                LOGGER.warn("Failed to create connection with {}, exception: {}", (Object)endPoint, (Object)e.getMessage());
                continue;
            }
            int defaultDataNodeId = defaultProvider.getDataNodeId();
            this.addProvider(defaultDataNodeId, defaultProvider);
            try {
                allEndPoints = defaultProvider.getSessionConnection().fetchAllEndPoints();
            }
            catch (Exception e) {
                LOGGER.warn("Failed to fetch all endpoints from {}, exception: {}, will retry later...", (Object)endPoint, (Object)e.getMessage());
                break;
            }
            for (Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
                SubscriptionProvider provider;
                if (defaultDataNodeId == entry.getKey()) continue;
                try {
                    provider = this.constructProviderAndHandshake(entry.getValue());
                }
                catch (Exception e) {
                    LOGGER.warn("Failed to create connection with {}, exception: {}, will retry later...", (Object)entry.getValue(), (Object)e.getMessage());
                    continue;
                }
                this.addProvider(entry.getKey(), provider);
            }
        }
        if (this.hasNoProviders()) {
            throw NO_PROVIDERS_EXCEPTION;
        }
    }

    private void closeProviders() throws IoTDBConnectionException {
        for (SubscriptionProvider provider : this.getAllProviders()) {
            provider.close();
        }
        this.subscriptionProviders.clear();
    }

    void addProvider(int dataNodeId, SubscriptionProvider provider) {
        LOGGER.info("add new subscription provider {}", (Object)provider);
        this.subscriptionProviders.put(dataNodeId, provider);
    }

    void closeAndRemoveProvider(int dataNodeId) throws IoTDBConnectionException {
        if (!this.containsProvider(dataNodeId)) {
            return;
        }
        SubscriptionProvider provider = (SubscriptionProvider)this.subscriptionProviders.get(dataNodeId);
        try {
            provider.close();
        }
        finally {
            LOGGER.info("close and remove stale subscription provider {}", (Object)provider);
            this.subscriptionProviders.remove(dataNodeId);
        }
    }

    boolean hasNoProviders() {
        return this.subscriptionProviders.isEmpty();
    }

    boolean containsProvider(int dataNodeId) {
        return this.subscriptionProviders.containsKey(dataNodeId);
    }

    List<SubscriptionProvider> getAllAvailableProviders() {
        return this.subscriptionProviders.values().stream().filter(SubscriptionProvider::isAvailable).collect(Collectors.toList());
    }

    List<SubscriptionProvider> getAllProviders() {
        return new ArrayList<SubscriptionProvider>(this.subscriptionProviders.values());
    }

    SubscriptionProvider getProvider(int dataNodeId) {
        return this.containsProvider(dataNodeId) ? (SubscriptionProvider)this.subscriptionProviders.get(dataNodeId) : null;
    }

    public void subscribeWithRedirection(Set<String> topicNames) throws IoTDBConnectionException {
        for (SubscriptionProvider provider : this.getAllAvailableProviders()) {
            try {
                provider.getSessionConnection().subscribe(topicNames);
                return;
            }
            catch (Exception e) {
                LOGGER.warn("Failed to subscribe topics {} from subscription provider {}, exception: {}, try next subscription provider...", new Object[]{topicNames, provider, e.getMessage()});
            }
        }
        throw NO_PROVIDERS_EXCEPTION;
    }

    public void unsubscribeWithRedirection(Set<String> topicNames) throws IoTDBConnectionException {
        for (SubscriptionProvider provider : this.getAllAvailableProviders()) {
            try {
                provider.getSessionConnection().unsubscribe(topicNames);
                return;
            }
            catch (Exception e) {
                LOGGER.warn("Failed to unsubscribe topics {} from subscription provider {}, exception: {}, try next subscription provider...", new Object[]{topicNames, provider, e.getMessage()});
            }
        }
        throw NO_PROVIDERS_EXCEPTION;
    }

    public Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection() throws IoTDBConnectionException {
        Map<Integer, TEndPoint> endPoints = null;
        for (SubscriptionProvider provider : this.getAllAvailableProviders()) {
            try {
                endPoints = provider.getSessionConnection().fetchAllEndPoints();
                break;
            }
            catch (Exception e) {
                LOGGER.warn("Failed to fetch all endpoints from subscription provider {}, exception: {}, try next subscription provider...", (Object)provider, (Object)e.getMessage());
            }
        }
        if (Objects.isNull(endPoints)) {
            throw NO_PROVIDERS_EXCEPTION;
        }
        return endPoints;
    }

    public static abstract class Builder {
        protected String host = "localhost";
        protected int port = 6667;
        protected List<String> nodeUrls = null;
        protected String username = "root";
        protected String password = "root";
        protected String consumerId;
        protected String consumerGroupId;
        protected long heartbeatIntervalMs = 5000L;
        protected long endpointsSyncIntervalMs = 30000L;

        public Builder host(String host) {
            this.host = host;
            return this;
        }

        public Builder port(int port) {
            this.port = port;
            return this;
        }

        public Builder nodeUrls(List<String> nodeUrls) {
            this.nodeUrls = nodeUrls;
            return this;
        }

        public Builder username(String username) {
            this.username = username;
            return this;
        }

        public Builder password(String password) {
            this.password = password;
            return this;
        }

        public Builder consumerId(String consumerId) {
            this.consumerId = consumerId;
            return this;
        }

        public Builder consumerGroupId(String consumerGroupId) {
            this.consumerGroupId = consumerGroupId;
            return this;
        }

        public Builder heartbeatIntervalMs(long heartbeatIntervalMs) {
            this.heartbeatIntervalMs = Math.max(heartbeatIntervalMs, 1000L);
            return this;
        }

        public Builder endpointsSyncIntervalMs(long endpointsSyncIntervalMs) {
            this.endpointsSyncIntervalMs = Math.max(endpointsSyncIntervalMs, 5000L);
            return this;
        }

        public abstract SubscriptionPullConsumer buildPullConsumer();

        public abstract SubscriptionPushConsumer buildPushConsumer();
    }
}

