/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.operators.schema.common;

import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetEvolvedSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetEvolvedSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSchemaRequest;
import org.apache.flink.cdc.runtime.operators.schema.common.event.GetOriginalSchemaResponse;
import org.apache.flink.cdc.runtime.operators.schema.common.event.SinkWriterRegisterEvent;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class SchemaRegistry
implements OperatorCoordinator,
CoordinationRequestHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistry.class);
    protected final OperatorCoordinator.Context context;
    protected final String operatorName;
    protected final ExecutorService coordinatorExecutor;
    protected final MetadataApplier metadataApplier;
    protected final Duration rpcTimeout;
    protected final List<RouteRule> routingRules;
    protected final SchemaChangeBehavior behavior;
    protected transient int currentParallelism;
    protected transient Set<Integer> activeSinkWriters;
    protected transient Map<Integer, Throwable> failedReasons;
    protected transient SchemaManager schemaManager;
    protected transient TableIdRouter router;

    protected SchemaRegistry(OperatorCoordinator.Context context, String operatorName, ExecutorService coordinatorExecutor, MetadataApplier metadataApplier, List<RouteRule> routingRules, SchemaChangeBehavior schemaChangeBehavior, Duration rpcTimeout) {
        this.context = context;
        this.operatorName = operatorName;
        this.coordinatorExecutor = coordinatorExecutor;
        this.metadataApplier = metadataApplier;
        this.routingRules = routingRules;
        this.rpcTimeout = rpcTimeout;
        this.behavior = schemaChangeBehavior;
    }

    public void start() throws Exception {
        LOG.info("Starting SchemaRegistry - {}.", (Object)this.operatorName);
        this.currentParallelism = this.context.currentParallelism();
        this.activeSinkWriters = ConcurrentHashMap.newKeySet();
        this.failedReasons = new ConcurrentHashMap<Integer, Throwable>();
        this.schemaManager = new SchemaManager();
        this.router = new TableIdRouter(this.routingRules);
    }

    public void close() throws Exception {
        LOG.info("Closing SchemaRegistry - {}.", (Object)this.operatorName);
        this.coordinatorExecutor.shutdown();
        try {
            this.metadataApplier.close();
        }
        catch (Exception e) {
            LOG.error("Failed to close metadata applier.", (Throwable)e);
            throw new IOException("Failed to close metadata applier.", e);
        }
    }

    protected abstract void snapshot(CompletableFuture<byte[]> var1) throws Exception;

    protected abstract void restore(byte[] var1) throws Exception;

    protected void handleSinkWriterRegisterEvent(SinkWriterRegisterEvent event) throws Exception {
        LOG.info("Sink subtask {} already registered.", (Object)event.getSubtask());
        this.activeSinkWriters.add(event.getSubtask());
    }

    protected abstract void handleFlushSuccessEvent(FlushSuccessEvent var1) throws Exception;

    protected void handleGetEvolvedSchemaRequest(GetEvolvedSchemaRequest request, CompletableFuture<CoordinationResponse> responseFuture) throws Exception {
        LOG.info("Handling evolved schema request: {}", (Object)request);
        int schemaVersion = request.getSchemaVersion();
        TableId tableId = request.getTableId();
        if (schemaVersion == -1) {
            responseFuture.complete(CoordinationResponseUtils.wrap(new GetEvolvedSchemaResponse(this.schemaManager.getLatestEvolvedSchema(tableId).orElse(null))));
        } else {
            try {
                responseFuture.complete(CoordinationResponseUtils.wrap(new GetEvolvedSchemaResponse(this.schemaManager.getEvolvedSchema(tableId, schemaVersion))));
            }
            catch (IllegalArgumentException iae) {
                LOG.warn("Some client is requesting an non-existed evolved schema for table {} with version {}", (Object)tableId, (Object)schemaVersion);
                responseFuture.complete(CoordinationResponseUtils.wrap(new GetEvolvedSchemaResponse(null)));
            }
        }
    }

    protected void handleGetOriginalSchemaRequest(GetOriginalSchemaRequest request, CompletableFuture<CoordinationResponse> responseFuture) throws Exception {
        LOG.info("Handling original schema request: {}", (Object)request);
        int schemaVersion = request.getSchemaVersion();
        TableId tableId = request.getTableId();
        if (schemaVersion == -1) {
            responseFuture.complete(CoordinationResponseUtils.wrap(new GetOriginalSchemaResponse(this.schemaManager.getLatestOriginalSchema(tableId).orElse(null))));
        } else {
            try {
                responseFuture.complete(CoordinationResponseUtils.wrap(new GetOriginalSchemaResponse(this.schemaManager.getOriginalSchema(tableId, schemaVersion))));
            }
            catch (IllegalArgumentException iae) {
                LOG.warn("Some client is requesting an non-existed original schema for table {} with version {}", (Object)tableId, (Object)schemaVersion);
                responseFuture.complete(CoordinationResponseUtils.wrap(new GetOriginalSchemaResponse(null)));
            }
        }
    }

    protected abstract void handleCustomCoordinationRequest(CoordinationRequest var1, CompletableFuture<CoordinationResponse> var2) throws Exception;

    protected void handleUnrecoverableError(String taskDescription, Throwable t) {
        LOG.error("Uncaught exception in the Schema Registry ({}) event loop for {}.", new Object[]{this.operatorName, taskDescription, t});
        LOG.error("\tCurrent schema manager state: {}", (Object)this.schemaManager);
    }

    public final CompletableFuture<CoordinationResponse> handleCoordinationRequest(CoordinationRequest request) {
        CompletableFuture<CoordinationResponse> future = new CompletableFuture<CoordinationResponse>();
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            if (request instanceof GetEvolvedSchemaRequest) {
                this.handleGetEvolvedSchemaRequest((GetEvolvedSchemaRequest)request, future);
            } else if (request instanceof GetOriginalSchemaRequest) {
                this.handleGetOriginalSchemaRequest((GetOriginalSchemaRequest)request, future);
            } else {
                this.handleCustomCoordinationRequest(request, future);
            }
        }), "Handling request - %s", request);
        return future;
    }

    public final void handleEventFromOperator(int subTaskId, int attemptNumber, OperatorEvent event) {
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> {
            if (event instanceof FlushSuccessEvent) {
                this.handleFlushSuccessEvent((FlushSuccessEvent)event);
            } else if (event instanceof SinkWriterRegisterEvent) {
                this.handleSinkWriterRegisterEvent((SinkWriterRegisterEvent)event);
            } else {
                throw new FlinkRuntimeException("Unrecognized Operator Event: " + event);
            }
        }), "Handling event - %s (from subTask %d)", event, subTaskId);
    }

    public final void subtaskReset(int subTaskId, long checkpointId) {
        Throwable rootCause = this.failedReasons.get(subTaskId);
        LOG.error("Subtask {} reset at checkpoint {}.", new Object[]{subTaskId, checkpointId, rootCause});
    }

    public final void executionAttemptFailed(int subTaskId, int attemptNumber, @Nullable Throwable reason) {
        if (reason != null) {
            this.failedReasons.put(subTaskId, reason);
        }
    }

    public final void executionAttemptReady(int subTaskId, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
    }

    public final void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> completableFuture) throws Exception {
        LOG.info("Going to start checkpoint No.{}", (Object)checkpointId);
        this.runInEventLoop((ThrowingRunnable<Throwable>)((ThrowingRunnable)() -> this.snapshot(completableFuture)), "Taking checkpoint - %d", checkpointId);
    }

    public final void notifyCheckpointComplete(long checkpointId) {
        LOG.info("Successfully completed checkpoint No.{}", (Object)checkpointId);
    }

    public final void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) throws Exception {
        LOG.info("Going to restore from checkpoint No.{}", (Object)checkpointId);
        if (checkpointData == null) {
            return;
        }
        this.restore(checkpointData);
    }

    protected void runInEventLoop(ThrowingRunnable<Throwable> action, String actionName, Object ... actionNameFormatParameters) {
        this.coordinatorExecutor.execute(() -> {
            try {
                action.run();
            }
            catch (Throwable t) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM((Throwable)t);
                this.handleUnrecoverableError(String.format(actionName, actionNameFormatParameters), t);
                this.context.failJob(t);
            }
        });
    }

    protected void loopUntil(BooleanSupplier conditionChecker, Runnable message, Duration timeout, Duration interval) throws TimeoutException {
        this.loopWhen(() -> !conditionChecker.getAsBoolean(), message, timeout, interval);
    }

    protected void loopWhen(BooleanSupplier conditionChecker, Runnable message, Duration timeout, Duration interval) throws TimeoutException {
        long deadline = System.currentTimeMillis() + timeout.toMillis();
        long intervalMs = interval.toMillis();
        while (conditionChecker.getAsBoolean()) {
            message.run();
            if (System.currentTimeMillis() > deadline) {
                throw new TimeoutException("Loop checking time limit has exceeded.");
            }
            try {
                Thread.sleep(intervalMs);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    protected <T extends Throwable> void failJob(String taskDescription, T t) {
        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
        LOG.error("An exception was triggered from {}. Job will fail now.", (Object)taskDescription, t);
        this.handleUnrecoverableError(taskDescription, t);
        this.context.failJob(t);
    }

    @VisibleForTesting
    public void emplaceOriginalSchema(TableId tableId, Schema schema) {
        this.schemaManager.registerNewOriginalSchema(tableId, schema);
    }

    @VisibleForTesting
    public void emplaceEvolvedSchema(TableId tableId, Schema schema) {
        this.schemaManager.registerNewEvolvedSchema(tableId, schema);
    }
}

