/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.util.Collection;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.delegate.DelegatingStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class AbstractChangelogStateBackend
implements DelegatingStateBackend,
StateBackend {
    private static final long serialVersionUID = 1000L;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractChangelogStateBackend.class);
    protected final StateBackend delegatedStateBackend;

    AbstractChangelogStateBackend(StateBackend stateBackend) {
        this.delegatedStateBackend = (StateBackend)Preconditions.checkNotNull((Object)stateBackend);
        Preconditions.checkArgument((!(stateBackend instanceof DelegatingStateBackend) ? 1 : 0) != 0, (Object)"Recursive Delegation is not supported.");
        LOG.info("ChangelogStateBackend is used, delegating {}.", (Object)this.delegatedStateBackend.getClass().getSimpleName());
    }

    public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
        return this.restore(env, operatorIdentifier, keyGroupRange, ttlTimeProvider, metricGroup, this.castHandles(stateHandles), baseHandles -> (AbstractKeyedStateBackend)this.delegatedStateBackend.createKeyedStateBackend(env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry, ttlTimeProvider, metricGroup, baseHandles, cancelStreamRegistry));
    }

    public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment env, JobID jobID, String operatorIdentifier, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, TaskKvStateRegistry kvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry, double managedMemoryFraction) throws Exception {
        return this.restore(env, operatorIdentifier, keyGroupRange, ttlTimeProvider, metricGroup, this.castHandles(stateHandles), baseHandles -> (AbstractKeyedStateBackend)this.delegatedStateBackend.createKeyedStateBackend(env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry, ttlTimeProvider, metricGroup, baseHandles, cancelStreamRegistry, managedMemoryFraction));
    }

    public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier, @Nonnull Collection<OperatorStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) throws Exception {
        return this.delegatedStateBackend.createOperatorStateBackend(env, operatorIdentifier, stateHandles, cancelStreamRegistry);
    }

    public boolean useManagedMemory() {
        return this.delegatedStateBackend.useManagedMemory();
    }

    public StateBackend getDelegatedStateBackend() {
        return this.delegatedStateBackend;
    }

    protected abstract <K> CheckpointableKeyedStateBackend<K> restore(Environment var1, String var2, KeyGroupRange var3, TtlTimeProvider var4, MetricGroup var5, Collection<ChangelogStateBackendHandle> var6, ChangelogBackendRestoreOperation.BaseBackendBuilder<K> var7) throws Exception;

    private Collection<ChangelogStateBackendHandle> castHandles(Collection<KeyedStateHandle> stateHandles) {
        if (stateHandles.stream().anyMatch(h -> !(h instanceof ChangelogStateBackendHandle))) {
            LOG.warn("Some state handles do not contain changelog: {}.", stateHandles);
        }
        return stateHandles.stream().filter(Objects::nonNull).map(ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl::getChangelogStateBackendHandle).collect(Collectors.toList());
    }
}

