package org.apache.iceberg.flink.sink.shuffle;

import java.lang.Thread;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ThrowableCatchingRunnable;
import org.apache.flink.util.function.ThrowingRunnable;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@Internal
/* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator.class */
public class DataStatisticsCoordinator implements OperatorCoordinator {
    /** Class-wide SLF4J logger; the nested {@code SubtaskGateways} helper logs through it as well. */
    private static final Logger LOG = LoggerFactory.getLogger(DataStatisticsCoordinator.class);
    // Operator name; used in log lines, error messages and as the suffix of the coordinator thread name.
    private final String operatorName;
    // Coordinator context; used here for currentParallelism(), getUserCodeClassloader() and failJob().
    private final OperatorCoordinator.Context context;
    // Schema and sort order are handed to the SortKeySerializer and to the AggregatedStatisticsTracker.
    private final Schema schema;
    private final SortOrder sortOrder;
    // Passed through unchanged to the AggregatedStatisticsTracker created in start().
    private final int downstreamParallelism;
    private final StatisticsType statisticsType;
    // Single-thread executor (built from coordinatorThreadFactory) on which coordinator work is scheduled.
    private final ExecutorService coordinatorExecutor;
    // Registry of gateways per subtask, sized with the parallelism reported by the context at construction.
    private final SubtaskGateways subtaskGateways;
    // Creates the coordinator thread and answers "is the current thread the coordinator thread?".
    private final CoordinatorExecutorThreadFactory coordinatorThreadFactory;
    // Serializer used for checkpoint snapshots/restores and for the events sent to subtasks.
    private final TypeSerializer<AggregatedStatistics> aggregatedStatisticsSerializer;
    // Lifecycle flag: set to true in start(), back to false in close().
    private transient boolean started;
    // Created in start(), cleared (set to null) in close().
    private transient AggregatedStatisticsTracker aggregatedStatisticsTracker;
    // Most recent completed aggregation; written by handleDataStatisticRequest() and resetToCheckpoint(),
    // read when taking a checkpoint. Stays null until one of those writes happens.
    private transient AggregatedStatistics completedStatistics;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator$CoordinatorExecutorThreadFactory.class */
    /**
     * Thread factory for the coordinator executor that doubles as the uncaught-exception handler
     * of every thread it creates.
     *
     * <p>The factory remembers the thread it created most recently so that callers can ask
     * whether they are currently running on it.
     */
    public static class CoordinatorExecutorThreadFactory implements ThreadFactory, Thread.UncaughtExceptionHandler {
        private final String coordinatorThreadName;
        private final ClassLoader classLoader;
        private final Thread.UncaughtExceptionHandler errorHandler;

        /** Thread handed out by the latest {@link #newThread} call; {@code null} before the first call. */
        @Nullable
        private Thread thread;

        /**
         * Creates a factory that delegates uncaught exceptions to
         * {@link FatalExitExceptionHandler#INSTANCE}.
         *
         * @param str name given to the threads this factory creates
         * @param classLoader context class loader installed on the created threads
         */
        CoordinatorExecutorThreadFactory(String str, ClassLoader classLoader) {
            this(str, classLoader, FatalExitExceptionHandler.INSTANCE);
        }

        /**
         * Creates a factory with an explicit delegate for uncaught exceptions.
         *
         * @param str name given to the threads this factory creates
         * @param classLoader context class loader installed on the created threads
         * @param uncaughtExceptionHandler delegate invoked from {@link #uncaughtException}
         */
        @VisibleForTesting
        CoordinatorExecutorThreadFactory(String str, ClassLoader classLoader, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
            this.coordinatorThreadName = str;
            this.classLoader = classLoader;
            this.errorHandler = uncaughtExceptionHandler;
        }

        /**
         * Creates a new, not-yet-started thread for {@code runnable}, records it as the current
         * coordinator thread and registers this factory as its uncaught-exception handler.
         */
        @Override
        public synchronized Thread newThread(@NotNull Runnable runnable) {
            Thread created = new Thread(runnable, coordinatorThreadName);
            // Record the thread before configuring it, so the field always tracks the latest thread.
            thread = created;
            created.setContextClassLoader(classLoader);
            created.setUncaughtExceptionHandler(this);
            return created;
        }

        /** Forwards the uncaught throwable unchanged to the configured delegate handler. */
        @Override
        public synchronized void uncaughtException(Thread thread, Throwable th) {
            errorHandler.uncaughtException(thread, th);
        }

        /**
         * @return {@code true} if the calling thread is the thread most recently created by this
         *     factory
         */
        boolean isCurrentThreadCoordinatorThread() {
            Thread caller = Thread.currentThread();
            return caller == thread;
        }
    }

    /* loaded from: input_file:org/apache/iceberg/flink/sink/shuffle/DataStatisticsCoordinator$SubtaskGateways.class */
    /**
     * Registry of the gateways used to send events to subtasks, kept as one
     * {@code attemptNumber -> gateway} map per subtask index.
     *
     * <p>The class does no synchronization of its own; within this file the enclosing coordinator
     * only touches it from work it runs on its coordinator thread.
     */
    private static class SubtaskGateways {
        private final String operatorName;
        private final Map<Integer, OperatorCoordinator.SubtaskGateway>[] gateways;

        /**
         * @param str operator name, used only in log and error messages
         * @param i number of subtasks, i.e. the number of per-subtask maps to allocate
         */
        @SuppressWarnings("unchecked") // generic array creation; every slot is filled with a typed map below
        private SubtaskGateways(String str, int i) {
            this.operatorName = str;
            this.gateways = new Map[i];
            for (int subtask = 0; subtask < i; subtask++) {
                this.gateways[subtask] = Maps.newHashMap();
            }
        }

        /**
         * Registers the gateway under its own subtask index and attempt number.
         *
         * @throws IllegalStateException if that subtask already has a gateway for the same attempt
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void registerSubtaskGateway(OperatorCoordinator.SubtaskGateway subtaskGateway) {
            int subtask = subtaskGateway.getSubtask();
            int attemptNumber = subtaskGateway.getExecution().getAttemptNumber();
            // Preconditions templates only substitute %s; a %d would be left in the message verbatim
            // and the arguments appended in brackets instead.
            Preconditions.checkState(
                    !this.gateways[subtask].containsKey(attemptNumber),
                    "Coordinator of %s already has a subtask gateway for %s (#%s)",
                    this.operatorName,
                    subtask,
                    attemptNumber);
            DataStatisticsCoordinator.LOG.debug(
                    "Coordinator of {} registers gateway for subtask {} attempt {}",
                    this.operatorName,
                    subtask,
                    attemptNumber);
            this.gateways[subtask].put(attemptNumber, subtaskGateway);
        }

        /**
         * Removes the gateway of one execution attempt; a no-op if none is registered.
         *
         * @param i subtask index
         * @param i2 attempt number
         */
        /* JADX INFO: Access modifiers changed from: private */
        public void unregisterSubtaskGateway(int i, int i2) {
            DataStatisticsCoordinator.LOG.debug(
                    "Coordinator of {} unregisters gateway for subtask {} attempt {}",
                    this.operatorName,
                    i,
                    i2);
            this.gateways[i].remove(i2);
        }

        /**
         * Returns the single gateway registered for subtask {@code i}.
         *
         * @throws IllegalStateException if no gateway is registered for the subtask
         */
        /* JADX INFO: Access modifiers changed from: private */
        public OperatorCoordinator.SubtaskGateway getSubtaskGateway(int i) {
            Preconditions.checkState(
                    !this.gateways[i].isEmpty(),
                    "Coordinator of %s subtask %s is not ready yet to receive events",
                    this.operatorName,
                    i);
            // getOnlyElement expects exactly one registered attempt for the subtask at this point.
            return Iterables.getOnlyElement(this.gateways[i].values());
        }

        /** Drops every registered attempt of subtask {@code i}. */
        /* JADX INFO: Access modifiers changed from: private */
        public void reset(int i) {
            this.gateways[i].clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DataStatisticsCoordinator(String str, OperatorCoordinator.Context context, Schema schema, SortOrder sortOrder, int i, StatisticsType statisticsType) {
        this.operatorName = str;
        this.context = context;
        this.schema = schema;
        this.sortOrder = sortOrder;
        this.downstreamParallelism = i;
        this.statisticsType = statisticsType;
        this.coordinatorThreadFactory = new CoordinatorExecutorThreadFactory("DataStatisticsCoordinator-" + str, context.getUserCodeClassloader());
        this.coordinatorExecutor = Executors.newSingleThreadExecutor(this.coordinatorThreadFactory);
        this.subtaskGateways = new SubtaskGateways(str, context.currentParallelism());
        this.aggregatedStatisticsSerializer = new AggregatedStatisticsSerializer(new SortKeySerializer(schema, sortOrder));
    }

    /**
     * Marks the coordinator as started and creates the statistics tracker, seeding it with
     * whatever completed statistics are currently held (possibly {@code null}).
     */
    public void start() throws Exception {
        LOG.info("Starting data statistics coordinator: {}.", operatorName);
        started = true;
        int parallelism = context.currentParallelism();
        // NOTE(review): the meaning of the literal 100000 is defined by the
        // AggregatedStatisticsTracker constructor, which is not visible in this file.
        aggregatedStatisticsTracker =
                new AggregatedStatisticsTracker(
                        operatorName,
                        parallelism,
                        schema,
                        sortOrder,
                        downstreamParallelism,
                        statisticsType,
                        100000,
                        completedStatistics);
    }

    /**
     * Shuts the coordinator down: stops the executor from accepting new work, drops the
     * statistics tracker and clears the started flag.
     */
    public void close() throws Exception {
        // shutdown() only initiates an orderly shutdown; it does not wait for queued tasks to finish.
        this.coordinatorExecutor.shutdown();
        this.aggregatedStatisticsTracker = null;
        this.started = false;
        LOG.info("Closed data statistics coordinator: {}.", this.operatorName);
    }

    /**
     * Runs {@code callable} on the coordinator thread and blocks until it has finished.
     *
     * <p>If the caller is already the coordinator thread the callable runs inline; otherwise it
     * is submitted to the coordinator executor and the caller waits for the result.
     *
     * @param callable work to execute on the coordinator thread
     * @param str message of the {@link FlinkRuntimeException} thrown when the work fails
     * @throws IllegalStateException if the coordinator has not been started
     * @throws FlinkRuntimeException if the callable throws, or if waiting for it is interrupted
     *     (in which case the caller's interrupt flag is restored before throwing)
     */
    @org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting
    void callInCoordinatorThread(Callable<Void> callable, String str) {
        ensureStarted();
        if (this.coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
            // Already on the coordinator thread: run inline. Submitting to the single-threaded
            // executor and blocking on the result from its own thread would never complete.
            try {
                callable.call();
                return;
            } catch (Throwable th) {
                LOG.error("Uncaught Exception in data statistics coordinator: {} executor", this.operatorName, th);
                throw new FlinkRuntimeException(str, th);
            }
        }
        Callable<Void> guardedCallable = () -> {
            try {
                return callable.call();
            } catch (Throwable th2) {
                LOG.error("Uncaught Exception in data statistics coordinator: {} executor", this.operatorName, th2);
                ExceptionUtils.rethrowException(th2);
                return null;
            }
        };
        try {
            this.coordinatorExecutor.submit(guardedCallable).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new FlinkRuntimeException(str, e);
        } catch (ExecutionException e) {
            throw new FlinkRuntimeException(str, e);
        }
    }

    /**
     * Schedules {@code runnable} on the coordinator executor without waiting for it. The runnable
     * is wrapped in a {@link ThrowableCatchingRunnable} whose handler forwards any throwable to
     * the coordinator thread factory's uncaught-exception handling.
     *
     * @param runnable work to execute asynchronously on the coordinator thread
     */
    public void runInCoordinatorThread(Runnable runnable) {
        ThrowableCatchingRunnable guarded =
                new ThrowableCatchingRunnable(
                        failure -> coordinatorThreadFactory.uncaughtException(Thread.currentThread(), failure),
                        runnable);
        coordinatorExecutor.execute(guarded);
    }

    /**
     * Schedules {@code throwingRunnable} on the coordinator thread; if it throws, the failure is
     * logged and reported through {@code context.failJob}. The throwable is first passed to
     * {@code ExceptionUtils.rethrowIfFatalErrorOrOOM}.
     *
     * @param throwingRunnable action to execute on the coordinator thread
     * @param str short description of the action, used in the error log line
     * @throws IllegalStateException if the coordinator has not been started
     */
    private void runInCoordinatorThread(ThrowingRunnable<Throwable> throwingRunnable, String str) {
        ensureStarted();
        Runnable failJobOnError = () -> {
            try {
                throwingRunnable.run();
            } catch (Throwable failure) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(failure);
                LOG.error(
                        "Uncaught exception in the data statistics coordinator: {} while {}. Triggering job failover",
                        operatorName,
                        str,
                        failure);
                context.failJob(failure);
            }
        };
        runInCoordinatorThread(failJobOnError);
    }

    /**
     * Guards entry points that require a running coordinator.
     *
     * @throws IllegalStateException if {@link #start()} has not been called or the coordinator
     *     has been closed since
     */
    private void ensureStarted() {
        Preconditions.checkState(started, "The coordinator of %s has not started yet.", operatorName);
    }

    /**
     * Feeds one statistics event into the tracker. When the tracker returns a completed
     * aggregation, it is remembered as the latest completed statistics and sent to the subtasks.
     *
     * @param i index of the subtask the event came from
     * @param statisticsEvent the statistics reported by that subtask
     */
    private void handleDataStatisticRequest(int i, StatisticsEvent statisticsEvent) {
        AggregatedStatistics aggregated = aggregatedStatisticsTracker.updateAndCheckCompletion(i, statisticsEvent);
        if (aggregated == null) {
            // The tracker returned nothing to publish for this event.
            return;
        }
        completedStatistics = aggregated;
        sendAggregatedStatisticsToSubtasks(aggregated.checkpointId(), aggregated);
    }

    /**
     * Sends the aggregated statistics as a single event to each subtask, through that subtask's
     * registered gateway, running on the coordinator thread.
     *
     * @param j checkpoint id the statistics belong to
     * @param aggregatedStatistics statistics to send
     */
    private void sendAggregatedStatisticsToSubtasks(long j, AggregatedStatistics aggregatedStatistics) {
        String failureMessage =
                String.format(
                        "Failed to send operator %s coordinator global data statistics for checkpoint %d",
                        operatorName,
                        j);
        Callable<Void> broadcast = () -> {
            // Build the event once and reuse it for every subtask.
            StatisticsEvent event =
                    StatisticsEvent.createAggregatedStatisticsEvent(j, aggregatedStatistics, aggregatedStatisticsSerializer);
            for (int subtask = 0; subtask < context.currentParallelism(); subtask++) {
                subtaskGateways.getSubtaskGateway(subtask).sendEvent(event);
            }
            return null;
        };
        callInCoordinatorThread(broadcast, failureMessage);
    }

    /**
     * Handles an event sent by a subtask. The handling itself is scheduled on the coordinator
     * thread, where the event must be a {@link StatisticsEvent}.
     *
     * @param i index of the sending subtask
     * @param i2 attempt number of the sending subtask
     * @param operatorEvent the received event
     */
    public void handleEventFromOperator(int i, int i2, OperatorEvent operatorEvent) {
        String actionDescription =
                String.format("handling operator event %s from subtask %d (#%d)", operatorEvent.getClass(), i, i2);
        ThrowingRunnable<Throwable> handler = () -> {
            LOG.debug("Handling event from subtask {} (#{}) of {}: {}", i, i2, operatorName, operatorEvent);
            Preconditions.checkArgument(operatorEvent instanceof StatisticsEvent);
            handleDataStatisticRequest(i, (StatisticsEvent) operatorEvent);
        };
        runInCoordinatorThread(handler, actionDescription);
    }

    /**
     * Snapshots the coordinator state: on the coordinator thread, serializes the latest completed
     * statistics and completes the future with the resulting bytes.
     *
     * @param j id of the checkpoint being taken
     * @param completableFuture future to complete with the serialized state
     */
    public void checkpointCoordinator(long j, CompletableFuture<byte[]> completableFuture) {
        String actionDescription = String.format("taking checkpoint %d", j);
        runInCoordinatorThread(() -> {
            LOG.debug("Snapshotting data statistics coordinator {} for checkpoint {}", operatorName, j);
            byte[] snapshot =
                    StatisticsUtil.serializeAggregatedStatistics(completedStatistics, aggregatedStatisticsSerializer);
            completableFuture.complete(snapshot);
        }, actionDescription);
    }

    /**
     * No-op: this coordinator takes no action when notified that a checkpoint has completed.
     *
     * @param j id of the completed checkpoint (unused)
     */
    public void notifyCheckpointComplete(long j) {
    }

    /**
     * Restores the completed statistics from checkpoint data. Only allowed while the coordinator
     * is not started; with {@code null} data the current state is left untouched.
     *
     * @param j id of the checkpoint being restored
     * @param bArr serialized statistics, or {@code null} if there is nothing to restore
     * @throws IllegalStateException if the coordinator has already been started
     */
    public void resetToCheckpoint(long j, byte[] bArr) {
        Preconditions.checkState(
                !started,
                "The coordinator %s can only be reset if it was not yet started",
                operatorName);
        if (bArr != null) {
            LOG.info("Restoring data statistic coordinator {} from checkpoint {}", operatorName, j);
            completedStatistics = StatisticsUtil.deserializeAggregatedStatistics(bArr, aggregatedStatisticsSerializer);
            return;
        }
        LOG.info("Data statistic coordinator {} has nothing to restore from checkpoint {}", operatorName, j);
    }

    /**
     * Clears all gateways registered for a subtask that is being reset; the work is scheduled on
     * the coordinator thread.
     *
     * @param i index of the subtask being reset
     * @param j id of the checkpoint the subtask is reset to
     */
    public void subtaskReset(int i, long j) {
        String actionDescription = String.format("handling subtask %d recovery to checkpoint %d", i, j);
        runInCoordinatorThread(() -> {
            LOG.info("Operator {} subtask {} is reset to checkpoint {}", operatorName, i, j);
            Preconditions.checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
            subtaskGateways.reset(i);
        }, actionDescription);
    }

    /**
     * Unregisters the gateway of a failed execution attempt; the work is scheduled on the
     * coordinator thread.
     *
     * @param i index of the subtask
     * @param i2 attempt number that failed
     * @param th failure cause, may be {@code null}; not used by this method
     */
    public void executionAttemptFailed(int i, int i2, @org.jetbrains.annotations.Nullable Throwable th) {
        String actionDescription = String.format("handling subtask %d (#%d) failure", i, i2);
        runInCoordinatorThread(() -> {
            LOG.info("Unregistering gateway after failure for subtask {} (#{}) of data statistic {}", i, i2, operatorName);
            Preconditions.checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
            subtaskGateways.unregisterSubtaskGateway(i, i2);
        }, actionDescription);
    }

    /**
     * Registers the gateway of an execution attempt that is ready to receive events. The indices
     * are validated on the calling thread; the registration is scheduled on the coordinator thread.
     *
     * @param i index of the subtask; must equal {@code subtaskGateway.getSubtask()}
     * @param i2 attempt number; must equal the attempt number of the gateway's execution
     * @param subtaskGateway gateway used to send events to that attempt
     * @throws IllegalArgumentException if {@code i} or {@code i2} does not match the gateway
     */
    public void executionAttemptReady(int i, int i2, OperatorCoordinator.SubtaskGateway subtaskGateway) {
        Preconditions.checkArgument(i == subtaskGateway.getSubtask());
        Preconditions.checkArgument(i2 == subtaskGateway.getExecution().getAttemptNumber());
        String actionDescription = String.format("making event gateway to subtask %d (#%d) available", i, i2);
        ThrowingRunnable<Throwable> registration = () -> {
            Preconditions.checkState(coordinatorThreadFactory.isCurrentThreadCoordinatorThread());
            subtaskGateways.registerSubtaskGateway(subtaskGateway);
        };
        runInCoordinatorThread(registration, actionDescription);
    }

    /**
     * Test-only accessor.
     *
     * @return the most recent completed aggregated statistics, or {@code null} if none has been
     *     produced or restored yet
     */
    @org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting
    AggregatedStatistics completedStatistics() {
        return this.completedStatistics;
    }
}
