/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.session.subscription;

import java.util.Map;
import java.util.Objects;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.session.subscription.SubscriptionConsumer;
import org.apache.iotdb.session.subscription.SubscriptionProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Runnable} that reconciles the subscription providers held by a {@link
 * SubscriptionConsumer} with the endpoints the consumer reports via {@code
 * fetchAllEndPointsWithRedirection()}.
 *
 * <p>One invocation of {@link #run()}, performed while holding the consumer's write lock:
 *
 * <ol>
 *   <li>opens providers if the consumer currently has none;
 *   <li>fetches the current endpoint map (data node id to endpoint);
 *   <li>for every reported endpoint, either connects a new provider or sends a heartbeat to the
 *       existing one, closing and removing providers whose heartbeat failed;
 *   <li>closes and removes providers whose data node id is no longer reported.
 * </ol>
 *
 * <p>Failures are logged at WARN level and never propagated for the exception types caught below;
 * the log messages indicate the work is expected to be retried by a later invocation (presumably
 * this task is scheduled repeatedly; confirm against the code that submits it).
 */
public class SubscriptionEndpointsSyncer implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SubscriptionEndpointsSyncer.class);

    /** Consumer whose providers are kept in sync by this task. */
    private final SubscriptionConsumer consumer;

    /**
     * Creates a syncer bound to the given consumer.
     *
     * @param consumer the consumer whose providers this task maintains
     */
    public SubscriptionEndpointsSyncer(SubscriptionConsumer consumer) {
        this.consumer = consumer;
    }

    /**
     * Runs one synchronization pass under the consumer's write lock. Does nothing if the consumer
     * is closed, either before or after the lock has been acquired.
     */
    @Override
    public void run() {
        // Cheap fast path: avoid contending for the write lock when already closed.
        if (this.consumer.isClosed()) {
            return;
        }
        this.consumer.acquireWriteLock();
        try {
            // Re-check under the lock: the consumer may have been closed while this thread was
            // waiting for the lock, and syncing a closed consumer could reopen providers that
            // nobody will close again.
            // NOTE(review): assumes the consumer's close path runs under the same write lock;
            // confirm in SubscriptionConsumer. The re-check is harmless otherwise.
            if (this.consumer.isClosed()) {
                return;
            }
            this.syncInternal();
        }
        finally {
            this.consumer.releaseWriteLock();
        }
    }

    /** Performs one full reconciliation pass; the caller must hold the consumer's write lock. */
    private void syncInternal() {
        if (this.consumer.hasNoProviders() && !this.tryOpenProviders()) {
            return;
        }

        final Map<Integer, TEndPoint> allEndPoints;
        try {
            allEndPoints = this.consumer.fetchAllEndPointsWithRedirection();
        }
        catch (Exception e) {
            LOGGER.warn("Failed to fetch all endpoints, exception: {}, will retry later...", e.getMessage());
            return;
        }

        for (Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
            this.syncEndPoint(entry.getKey(), entry.getValue());
        }
        this.removeProvidersNotIn(allEndPoints);
    }

    /**
     * Asks the consumer to open its providers.
     *
     * @return {@code true} if the providers were opened, {@code false} if opening failed with an
     *     {@link IoTDBConnectionException} (which is logged)
     */
    private boolean tryOpenProviders() {
        try {
            this.consumer.openProviders();
            return true;
        }
        catch (IoTDBConnectionException e) {
            LOGGER.warn("something unexpected happened when syncing subscription endpoints...", e);
            return false;
        }
    }

    /**
     * Reconciles a single reported endpoint: connects a new provider when none is registered for
     * the data node id, otherwise verifies the existing provider with a heartbeat.
     *
     * @param dataNodeId key of the endpoint in the fetched endpoint map
     * @param endPoint endpoint reported for that data node id
     */
    private void syncEndPoint(Integer dataNodeId, TEndPoint endPoint) {
        final SubscriptionProvider provider = this.consumer.getProvider(dataNodeId);
        if (Objects.isNull(provider)) {
            this.connectNewProvider(dataNodeId, endPoint);
            return;
        }
        this.heartbeat(provider);
        if (!provider.isAvailable()) {
            this.closeAndRemoveProvider(dataNodeId);
        }
    }

    /**
     * Constructs a provider for the endpoint, performs the handshake and registers it with the
     * consumer. On failure the endpoint is skipped for this pass and nothing is registered.
     */
    private void connectNewProvider(Integer dataNodeId, TEndPoint endPoint) {
        final SubscriptionProvider newProvider;
        try {
            newProvider = this.consumer.constructProviderAndHandshake(endPoint);
        }
        catch (Exception e) {
            LOGGER.warn("Failed to create connection with endpoint {}, exception: {}, will retry later...", endPoint, e.getMessage());
            return;
        }
        this.consumer.addProvider(dataNodeId, newProvider);
    }

    /**
     * Sends a heartbeat over the provider's session connection and marks the provider available
     * on success or unavailable on any exception.
     */
    private void heartbeat(SubscriptionProvider provider) {
        try {
            provider.getSessionConnection().heartbeat();
            provider.setAvailable();
        }
        catch (Exception e) {
            LOGGER.warn("something unexpected happened when sending heartbeat to subscription provider {}, exception: {}, set subscription provider unavailable", provider, e.getMessage());
            provider.setUnavailable();
        }
    }

    /** Closes and removes every provider whose data node id is absent from the endpoint map. */
    private void removeProvidersNotIn(Map<Integer, TEndPoint> allEndPoints) {
        for (SubscriptionProvider provider : this.consumer.getAllProviders()) {
            final int dataNodeId = provider.getDataNodeId();
            if (!allEndPoints.containsKey(dataNodeId)) {
                this.closeAndRemoveProvider(dataNodeId);
            }
        }
    }

    /**
     * Closes and removes the provider registered for the data node id, logging (not propagating)
     * an {@link IoTDBConnectionException} raised while doing so.
     */
    private void closeAndRemoveProvider(Integer dataNodeId) {
        try {
            this.consumer.closeAndRemoveProvider(dataNodeId);
        }
        catch (IoTDBConnectionException e) {
            LOGGER.warn("Exception occurred when closing and removing subscription provider with data node id {}: {}", dataNodeId, e.getMessage());
        }
    }
}

