/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.paimon.sink.v2;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PreCommitOperator
extends AbstractStreamOperator<CommittableMessage<MultiTableCommittable>>
implements OneInputStreamOperator<CommittableMessage<MultiTableCommittable>, CommittableMessage<MultiTableCommittable>> {
    protected static final Logger LOGGER = LoggerFactory.getLogger(PreCommitOperator.class);
    private final String commitUser;
    private final Options catalogOptions;
    private Catalog catalog;
    private StoreMultiCommitter storeMultiCommitter;
    private final List<MultiTableCommittable> multiTableCommittables = new ArrayList<MultiTableCommittable>();

    public PreCommitOperator(Options catalogOptions, String commitUser) {
        this.catalogOptions = catalogOptions;
        this.commitUser = commitUser;
    }

    public void open() throws Exception {
        super.open();
    }

    public void processElement(StreamRecord<CommittableMessage<MultiTableCommittable>> element) {
        if (this.catalog == null) {
            this.catalog = FlinkCatalogFactory.createPaimonCatalog(this.catalogOptions);
            this.storeMultiCommitter = new StoreMultiCommitter(() -> FlinkCatalogFactory.createPaimonCatalog(this.catalogOptions), Committer.createContext(this.commitUser, this.getMetricGroup(), true, false, null));
        }
        if (element.getValue() instanceof CommittableWithLineage) {
            this.multiTableCommittables.add((MultiTableCommittable)((CommittableWithLineage)element.getValue()).getCommittable());
        }
    }

    public void finish() {
        this.prepareSnapshotPreBarrier(Long.MAX_VALUE);
    }

    public void prepareSnapshotPreBarrier(long checkpointId) {
        for (int i = 0; i < this.multiTableCommittables.size(); ++i) {
            MultiTableCommittable multiTableCommittable = this.multiTableCommittables.get(i);
            this.multiTableCommittables.set(i, new MultiTableCommittable(multiTableCommittable.getDatabase(), multiTableCommittable.getTable(), checkpointId, multiTableCommittable.kind(), multiTableCommittable.wrappedCommittable()));
        }
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        long checkpointId = context.getCheckpointId();
        if (!this.multiTableCommittables.isEmpty()) {
            this.multiTableCommittables.forEach(multiTableCommittable -> LOGGER.debug("Try to commit for {}.{} : {} in checkpoint {}", new Object[]{multiTableCommittable.getDatabase(), multiTableCommittable.getTable(), this.multiTableCommittables, checkpointId}));
            Object wrappedManifestCommittable = this.storeMultiCommitter.combine(checkpointId, checkpointId, (List)this.multiTableCommittables);
            this.storeMultiCommitter.commit((List<WrappedManifestCommittable>)Collections.singletonList(wrappedManifestCommittable));
            this.multiTableCommittables.clear();
        }
    }

    public void close() throws Exception {
        super.close();
        if (this.storeMultiCommitter != null) {
            this.storeMultiCommitter.close();
        }
    }
}

