/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators;

import java.util.Optional;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.AsyncStateException;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for stream operators whose keyed state access is routed through an
 * {@link AsyncExecutionController}.
 *
 * <p>The controller is created in {@link #initializeState(StreamTaskStateInitializer)} and only
 * when the state handler exposes an {@link AsyncKeyedStateBackend}. For operators without any
 * keyed state backend the controller stays {@code null}; the lifecycle and watermark methods of
 * this class tolerate that and fall back to the behaviour of {@link AbstractStreamOperator}.
 *
 * @param <OUT> type of the records emitted by the operator
 */
@Internal
public abstract class AbstractAsyncStateStreamOperator<OUT>
        extends AbstractStreamOperator<OUT>
        implements AsyncStateProcessingOperator {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncStateStreamOperator.class);

    /** Created in {@code initializeState}; {@code null} when no async keyed state backend exists. */
    private AsyncExecutionController asyncExecutionController;

    /** Context of the record (or explicit key) currently being processed; {@code null} before the first one. */
    private RecordContext currentProcessingContext;

    /** Environment of the containing task; assigned in {@code initializeState}. */
    private Environment environment;

    /**
     * Initializes operator state and, if an async keyed state backend is available, builds the
     * {@link AsyncExecutionController} from the task's mailbox executor and the async-state
     * settings of the execution config.
     *
     * @param streamTaskStateManager initializer forwarded to the super class
     * @throws UnsupportedOperationException if a keyed state backend exists but no async keyed
     *     state backend is available for it
     * @throws Exception if the super class initialization fails
     */
    @Override
    public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {
        super.initializeState(streamTaskStateManager);
        this.getRuntimeContext().setKeyedStateStoreV2(this.stateHandler.getKeyedStateStoreV2().orElse(null));

        StreamTask containingTask = Preconditions.checkNotNull(this.getContainingTask());
        this.environment = containingTask.getEnvironment();

        MailboxExecutor mailboxExecutor = this.environment.getMainMailboxExecutor();
        int maxParallelism = this.environment.getTaskInfo().getMaxNumberOfParallelSubtasks();
        int inFlightRecordsLimit = this.environment.getExecutionConfig().getAsyncInflightRecordsLimit();
        int asyncBufferSize = this.environment.getExecutionConfig().getAsyncStateBufferSize();
        long asyncBufferTimeout = this.environment.getExecutionConfig().getAsyncStateBufferTimeout();

        AsyncKeyedStateBackend asyncKeyedStateBackend = this.stateHandler.getAsyncKeyedStateBackend();
        if (asyncKeyedStateBackend != null) {
            this.asyncExecutionController =
                    new AsyncExecutionController(
                            mailboxExecutor,
                            this::handleAsyncStateException,
                            asyncKeyedStateBackend.createStateExecutor(),
                            maxParallelism,
                            asyncBufferSize,
                            asyncBufferTimeout,
                            inFlightRecordsLimit,
                            asyncKeyedStateBackend);
            asyncKeyedStateBackend.setup(this.asyncExecutionController);
            if (asyncKeyedStateBackend instanceof AsyncKeyedStateBackendAdaptor) {
                LOG.warn("A normal KeyedStateBackend({}) is used when enabling the async state processing. Parallel asynchronous processing does not work. All state access will be processed synchronously.", this.stateHandler.getKeyedStateBackend());
            }
        } else if (this.stateHandler.getKeyedStateBackend() != null) {
            throw new UnsupportedOperationException("Current State Backend doesn't support async access, AsyncExecutionController could not work");
        }
        // Neither backend present: non-keyed operator, the controller intentionally stays null.
    }

    /** Wraps the failure in an {@link AsyncStateException} and fails the task through the environment. */
    private void handleAsyncStateException(String message, Throwable exception) {
        this.environment.failExternally(new AsyncStateException(message, exception));
    }

    /**
     * Always reports {@code true}. This deliberately does not depend on whether the controller
     * has been created, so the answer is the same before and after {@code initializeState}.
     */
    @Override
    public boolean isAsyncStateProcessingEnabled() {
        return true;
    }

    /** Returns {@link ElementOrder#RECORD_ORDER}. */
    @Override
    public ElementOrder getElementOrder() {
        return ElementOrder.RECORD_ORDER;
    }

    /**
     * Builds a record context for {@code record} keyed by {@code keySelector}, retains it and
     * makes it the controller's current context.
     *
     * @param record the record about to be processed
     * @param keySelector extracts the key from the record's value
     * @throws Exception if the key selector fails
     */
    @Override
    public final <T> void setAsyncKeyedContextElement(StreamRecord<T> record, KeySelector<T, ?> keySelector) throws Exception {
        T value = record.getValue();
        this.currentProcessingContext = this.asyncExecutionController.buildContext(value, keySelector.getKey(value));
        // Balanced by the release() in postProcessElement().
        this.currentProcessingContext.retain();
        this.asyncExecutionController.setCurrentContext(this.currentProcessingContext);
    }

    /** Releases the reference taken on the current context when it was installed. */
    @Override
    public final void postProcessElement() {
        this.currentProcessingContext.release();
    }

    /** Hands {@code processing} to the controller's {@code syncPointRequestWithCallback}. */
    @Override
    public final void preserveRecordOrderAndProcess(ThrowingRunnable<Exception> processing) {
        this.asyncExecutionController.syncPointRequestWithCallback(processing);
    }

    /**
     * Runs {@code processing} under a temporary context built for {@code key} (with a
     * {@code null} record), then switches back to the context that was current before the call.
     *
     * <p>The temporary context is released and the previous one restored even when submitting
     * {@code processing} throws, so a failure does not leave this operator and the controller
     * pointing at the temporary context.
     *
     * @param key the key to process under
     * @param processing the action to run
     */
    @Override
    public <K> void asyncProcessWithKey(K key, ThrowingRunnable<Exception> processing) {
        RecordContext previousContext = this.currentProcessingContext;
        this.currentProcessingContext = this.asyncExecutionController.buildContext(null, key);
        this.currentProcessingContext.retain();
        this.asyncExecutionController.setCurrentContext(this.currentProcessingContext);
        try {
            this.preserveRecordOrderAndProcess(processing);
        } finally {
            // Balance the retain() above, then switch back to the original context.
            this.postProcessElement();
            this.asyncExecutionController.setCurrentContext(previousContext);
            this.currentProcessingContext = previousContext;
        }
    }

    /**
     * Returns the record processor for the given input: {@code processElement1} /
     * {@code processElement2} for a {@link TwoInputStreamOperator} (input ids 1 and 2), or
     * {@code processElement} for an {@link Input} (input id 1).
     *
     * @param inputId 1-based input index
     * @throws IllegalArgumentException for any other operator type / input id combination
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public final <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(int inputId) {
        // The raw KeySelector casts are required: the selector fields are wildcard-typed and
        // cannot otherwise be matched against the type variable of makeRecordProcessor.
        if (this instanceof TwoInputStreamOperator) {
            switch (inputId) {
                case 1:
                    return AsyncStateProcessing.makeRecordProcessor(this, (KeySelector) this.stateKeySelector1, ((TwoInputStreamOperator) this)::processElement1);
                case 2:
                    return AsyncStateProcessing.makeRecordProcessor(this, (KeySelector) this.stateKeySelector2, ((TwoInputStreamOperator) this)::processElement2);
                default:
                    break;
            }
        } else if (this instanceof Input && inputId == 1) {
            return AsyncStateProcessing.makeRecordProcessor(this, (KeySelector) this.stateKeySelector1, ((Input) this)::processElement);
        }
        throw new IllegalArgumentException(String.format("Unsupported operator type %s with input id %d", this.getClass().getName(), inputId));
    }

    /**
     * Delegates to the state handler to look up or create the keyed state described by
     * {@code stateDescriptor}.
     *
     * @param defaultNamespace namespace passed through to the state handler
     * @param namespaceSerializer serializer for the namespace
     * @param stateDescriptor descriptor of the requested state
     * @return the state object returned by the state handler
     * @throws Exception if the state handler fails
     */
    protected <N, S extends State, T> S getOrCreateKeyedState(@Nonnull N defaultNamespace, @Nonnull TypeSerializer<N> namespaceSerializer, @Nonnull StateDescriptor<T> stateDescriptor) throws Exception {
        return this.stateHandler.getOrCreateKeyedState(defaultNamespace, namespaceSerializer, stateDescriptor);
    }

    /**
     * Calls {@code drainInflightRecords(0)} on the controller before the checkpoint barrier is
     * forwarded. Does nothing when async processing is disabled or no controller exists.
     */
    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (this.isAsyncStateProcessingEnabled() && this.asyncExecutionController != null) {
            this.asyncExecutionController.drainInflightRecords(0);
        }
    }

    /** Snapshots through the state handler, passing {@code true} as the final flag of the call. */
    @Override
    public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory) throws Exception {
        return this.stateHandler.snapshotState(this, Optional.ofNullable(this.timeServiceManager), this.getOperatorName(), checkpointId, timestamp, checkpointOptions, factory, this.isUsingCustomRawKeyedState(), true);
    }

    /**
     * Returns a timer service bound to the async execution controller, or the super class'
     * timer service when async processing is disabled.
     *
     * @throws RuntimeException if the timer service manager has not been initialized
     * @throws IllegalStateException if the operator has no keyed state backend
     */
    @Override
    public <K, N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<K, N> triggerable) {
        if (this.timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
        if (!this.isAsyncStateProcessingEnabled()) {
            return super.getInternalTimerService(name, namespaceSerializer, triggerable);
        }
        InternalTimeServiceManager keyedTimeServiceHandler = this.timeServiceManager;
        KeyedStateBackend keyedStateBackend = this.getKeyedStateBackend();
        Preconditions.checkState(keyedStateBackend != null, "Timers can only be used on keyed operators.");
        return keyedTimeServiceHandler.getAsyncInternalTimerService(name, keyedStateBackend.getKeySerializer(), namespaceSerializer, triggerable, this.asyncExecutionController);
    }

    /** Sets the key context in the super class and, if input 1 is keyed, the async context too. */
    @Override
    public void setKeyContextElement1(StreamRecord record) throws Exception {
        super.setKeyContextElement1(record);
        if (this.stateKeySelector1 != null) {
            this.setAsyncKeyedContextElement(record, this.stateKeySelector1);
        }
    }

    /** Sets the key context in the super class and, if input 2 is keyed, the async context too. */
    @Override
    public void setKeyContextElement2(StreamRecord record) throws Exception {
        super.setKeyContextElement2(record);
        if (this.stateKeySelector2 != null) {
            this.setAsyncKeyedContextElement(record, this.stateKeySelector2);
        }
    }

    /** Returns the key of the context currently being processed. */
    @Override
    public Object getCurrentKey() {
        return this.currentProcessingContext.getKey();
    }

    /**
     * Routes watermark handling through the controller's {@code processNonRecord}. Falls back
     * to the synchronous super implementation when async processing is disabled or no
     * controller exists.
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        if (!this.isAsyncStateProcessingEnabled() || this.asyncExecutionController == null) {
            super.processWatermark(mark);
            return;
        }
        ThrowingRunnable<Exception> forwardToSuper = () -> super.processWatermark(mark);
        this.asyncExecutionController.processNonRecord(forwardToSuper);
    }

    /**
     * Routes watermark-status handling through the controller's {@code processNonRecord}. Falls
     * back to the synchronous super implementation when async processing is disabled or no
     * controller exists.
     */
    @Override
    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        if (!this.isAsyncStateProcessingEnabled() || this.asyncExecutionController == null) {
            super.processWatermarkStatus(watermarkStatus);
            return;
        }
        ThrowingRunnable<Exception> forwardToSuper = () -> super.processWatermarkStatus(watermarkStatus);
        this.asyncExecutionController.processNonRecord(forwardToSuper);
    }

    /** Returns the controller; {@code null} if none was created during initialization. */
    @VisibleForTesting
    AsyncExecutionController<?> getAsyncExecutionController() {
        return this.asyncExecutionController;
    }

    /** Returns the context currently installed, possibly {@code null}. */
    @VisibleForTesting
    RecordContext getCurrentProcessingContext() {
        return this.currentProcessingContext;
    }

    /** Finishes the super class, then calls {@code drainInflightRecords(0)} if a controller exists. */
    @Override
    public void finish() throws Exception {
        super.finish();
        if (this.asyncExecutionController != null) {
            this.asyncExecutionController.drainInflightRecords(0);
        }
    }

    /**
     * Closes the super class and then the controller. The controller is closed even if the
     * super class throws, and is skipped when it was never created (non-keyed operator or
     * failed initialization) so that {@code close()} cannot fail with a secondary NPE.
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            if (this.asyncExecutionController != null) {
                this.asyncExecutionController.close();
            }
        }
    }
}

