/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.Closeable;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.changelog.fs.ChangelogStorageMetricGroup;
import org.apache.flink.changelog.fs.ChangelogStreamHandleReader;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorageForRecovery;
import org.apache.flink.changelog.fs.FsStateChangelogWriter;
import org.apache.flink.changelog.fs.StateChangeFsUploader;
import org.apache.flink.changelog.fs.StateChangeUploadScheduler;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistry;
import org.apache.flink.runtime.state.changelog.LocalChangelogRegistryImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
@ThreadSafe
public class FsStateChangelogStorage
extends FsStateChangelogStorageForRecovery
implements StateChangelogStorage<ChangelogStateHandleStreamImpl> {
    private static final Logger LOG = LoggerFactory.getLogger(FsStateChangelogStorage.class);
    private final StateChangeUploadScheduler uploader;
    private final long preEmptivePersistThresholdInBytes;
    private final AtomicInteger logIdGenerator = new AtomicInteger(0);
    private final TaskChangelogRegistry changelogRegistry;
    @Nullable
    private LocalChangelogRegistry localChangelogRegistry = LocalChangelogRegistry.NO_OP;
    @Nonnull
    private final LocalRecoveryConfig localRecoveryConfig;

    public FsStateChangelogStorage(JobID jobID, Configuration config, TaskManagerJobMetricGroup metricGroup, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        this(jobID, config, metricGroup, TaskChangelogRegistry.defaultChangelogRegistry((Integer)config.get(FsStateChangelogOptions.NUM_DISCARD_THREADS)), localRecoveryConfig);
    }

    public FsStateChangelogStorage(JobID jobID, Configuration config, TaskManagerJobMetricGroup metricGroup, TaskChangelogRegistry changelogRegistry, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        this(StateChangeUploadScheduler.fromConfig(jobID, (ReadableConfig)config, new ChangelogStorageMetricGroup((MetricGroup)metricGroup), changelogRegistry, localRecoveryConfig), ((MemorySize)config.get(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD)).getBytes(), changelogRegistry, localRecoveryConfig);
    }

    @VisibleForTesting
    public FsStateChangelogStorage(JobID jobID, Path basePath, boolean compression, int bufferSize, ChangelogStorageMetricGroup metricGroup, TaskChangelogRegistry changelogRegistry, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        this(StateChangeUploadScheduler.directScheduler(new StateChangeFsUploader(jobID, basePath, basePath.getFileSystem(), compression, bufferSize, metricGroup, changelogRegistry)), ((MemorySize)FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD.defaultValue()).getBytes(), changelogRegistry, localRecoveryConfig);
    }

    @VisibleForTesting
    public FsStateChangelogStorage(StateChangeUploadScheduler uploader, long preEmptivePersistThresholdInBytes, TaskChangelogRegistry changelogRegistry, LocalRecoveryConfig localRecoveryConfig) {
        super(ChangelogStreamHandleReader.DIRECT_READER);
        this.preEmptivePersistThresholdInBytes = preEmptivePersistThresholdInBytes;
        this.changelogRegistry = changelogRegistry;
        this.uploader = uploader;
        this.localRecoveryConfig = localRecoveryConfig;
        if (localRecoveryConfig.isLocalRecoveryEnabled()) {
            this.localChangelogRegistry = new LocalChangelogRegistryImpl(Executors.newSingleThreadExecutor());
        }
    }

    public FsStateChangelogWriter createWriter(String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
        UUID logId = new UUID(0L, this.logIdGenerator.getAndIncrement());
        LOG.info("createWriter for operator {}/{}: {}", new Object[]{operatorID, keyGroupRange, logId});
        return new FsStateChangelogWriter(logId, keyGroupRange, this.uploader, this.preEmptivePersistThresholdInBytes, mailboxExecutor, this.changelogRegistry, this.localRecoveryConfig, this.localChangelogRegistry);
    }

    @Override
    public void close() throws Exception {
        this.uploader.close();
        IOUtils.closeQuietly((Closeable)this.localChangelogRegistry);
    }

    public AvailabilityProvider getAvailabilityProvider() {
        return this.uploader.getAvailabilityProvider();
    }
}

