/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandleStreamImpl;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public class TaskExecutorStateChangelogStoragesManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorStateChangelogStoragesManager.class);
    @GuardedBy(value="lock")
    private final Map<JobID, Optional<StateChangelogStorage<?>>> changelogStoragesByJobId;
    @GuardedBy(value="lock")
    private final Map<JobID, StateChangelogStorageView<ChangelogStateHandleStreamImpl>> changelogStorageViewsByJobId;
    @GuardedBy(value="lock")
    private boolean closed = false;
    private final Object lock = new Object();
    private final Thread shutdownHook;

    public TaskExecutorStateChangelogStoragesManager() {
        this.changelogStoragesByJobId = new HashMap();
        this.changelogStorageViewsByJobId = new HashMap<JobID, StateChangelogStorageView<ChangelogStateHandleStreamImpl>>();
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, this.getClass().getSimpleName(), LOG);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    public StateChangelogStorage<?> stateChangelogStorageForJob(@Nonnull JobID jobId, Configuration configuration, TaskManagerJobMetricGroup metricGroup, LocalRecoveryConfig localRecoveryConfig) throws IOException {
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                throw new IllegalStateException("TaskExecutorStateChangelogStoragesManager is already closed and cannot register a new StateChangelogStorage.");
            }
            Optional<StateChangelogStorage<?>> stateChangelogStorage = this.changelogStoragesByJobId.get(jobId);
            if (stateChangelogStorage == null) {
                StateChangelogStorage<?> loaded = StateChangelogStorageLoader.load(jobId, configuration, metricGroup, localRecoveryConfig);
                stateChangelogStorage = Optional.ofNullable(loaded);
                this.changelogStoragesByJobId.put(jobId, stateChangelogStorage);
                if (loaded != null) {
                    LOG.debug("Registered new state changelog storage for job {} : {}.", (Object)jobId, loaded);
                } else {
                    LOG.info("Try to registered new state changelog storage for job {}, but result is null.", (Object)jobId);
                }
            } else if (stateChangelogStorage.isPresent()) {
                LOG.debug("Found existing state changelog storage for job {}: {}.", (Object)jobId, stateChangelogStorage.get());
            } else {
                LOG.debug("Found a previously loaded NULL state changelog storage for job {}.", (Object)jobId);
            }
            return stateChangelogStorage.orElse(null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void releaseStateChangelogStorageForJob(@Nonnull JobID jobId) {
        Optional<StateChangelogStorage<?>> cleanupChangelogStorage;
        LOG.debug("Releasing state changelog storage under job id {}.", (Object)jobId);
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            cleanupChangelogStorage = this.changelogStoragesByJobId.remove(jobId);
        }
        if (cleanupChangelogStorage != null) {
            cleanupChangelogStorage.ifPresent(this::doRelease);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    StateChangelogStorageView<?> stateChangelogStorageViewForJob(@Nonnull JobID jobID, Configuration configuration, ChangelogStateHandle changelogStateHandle) throws IOException {
        if (this.closed) {
            throw new IllegalStateException("TaskExecutorStateChangelogStoragesManager is already closed and cannot register a new StateChangelogStorageView.");
        }
        Object object = this.lock;
        synchronized (object) {
            StateChangelogStorageView<ChangelogStateHandleStreamImpl> storageView = this.changelogStorageViewsByJobId.get(jobID);
            if (storageView == null) {
                StateChangelogStorageView<?> loaded = StateChangelogStorageLoader.loadFromStateHandle(configuration, changelogStateHandle);
                storageView = loaded;
                this.changelogStorageViewsByJobId.put(jobID, storageView);
                LOG.debug("Registered new state changelog storage view for job {} : {}.", (Object)jobID, loaded);
            } else {
                LOG.debug("Found existing state changelog storage view for job {}: {}.", (Object)jobID, storageView);
            }
            return storageView;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void releaseStateChangelogStorageViewForJob(@Nonnull JobID jobID) {
        StateChangelogStorageView<ChangelogStateHandleStreamImpl> cleanupStorageView;
        LOG.debug("Releasing state changelog storage view under job id {}.", (Object)jobID);
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            cleanupStorageView = this.changelogStorageViewsByJobId.remove(jobID);
        }
        if (cleanupStorageView != null) {
            this.doRelease(cleanupStorageView);
        }
    }

    public void releaseResourcesForJob(@Nonnull JobID jobID) {
        this.releaseStateChangelogStorageForJob(jobID);
        this.releaseStateChangelogStorageViewForJob(jobID);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        HashMap<JobID, StateChangelogStorageView<ChangelogStateHandleStreamImpl>> toReleaseStorageView;
        HashMap toReleaseStorage;
        Iterator<Map.Entry<JobID, Object>> iterator = this.lock;
        synchronized (iterator) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            toReleaseStorage = new HashMap(this.changelogStoragesByJobId);
            toReleaseStorageView = new HashMap<JobID, StateChangelogStorageView<ChangelogStateHandleStreamImpl>>(this.changelogStorageViewsByJobId);
            this.changelogStoragesByJobId.clear();
            this.changelogStorageViewsByJobId.clear();
        }
        ShutdownHookUtil.removeShutdownHook(this.shutdownHook, this.getClass().getSimpleName(), LOG);
        LOG.info("Shutting down TaskExecutorStateChangelogStoragesManager.");
        for (Map.Entry<JobID, Optional<StateChangelogStorage<?>>> entry : toReleaseStorage.entrySet()) {
            entry.getValue().ifPresent(this::doRelease);
        }
        for (Map.Entry<JobID, Object> entry : toReleaseStorageView.entrySet()) {
            this.doRelease((StateChangelogStorageView)entry.getValue());
        }
    }

    private void doRelease(StateChangelogStorageView<?> storage) {
        if (storage != null) {
            try {
                storage.close();
            }
            catch (Exception e) {
                LOG.warn("Exception while disposing state changelog storage {}.", storage, (Object)e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    @VisibleForTesting
    public Optional<StateChangelogStorage<?>> getChangelogStoragesByJobId(JobID jobId) {
        Object object = this.lock;
        synchronized (object) {
            return this.changelogStoragesByJobId.get(jobId);
        }
    }
}

