/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.paimon.sink.v2;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.Committer;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.options.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PaimonCommitter
implements org.apache.flink.api.connector.sink2.Committer<MultiTableCommittable> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PaimonCommitter.class);
    private final StoreMultiCommitter storeMultiCommitter;

    public PaimonCommitter(Options catalogOptions, String commitUser) {
        this.storeMultiCommitter = new StoreMultiCommitter(() -> FlinkCatalogFactory.createPaimonCatalog(catalogOptions), Committer.createContext(commitUser, null, true, false, null));
    }

    public void commit(Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests) throws IOException {
        if (commitRequests.isEmpty()) {
            return;
        }
        List committables = commitRequests.stream().map(Committer.CommitRequest::getCommittable).collect(Collectors.toList());
        long checkpointId = ((MultiTableCommittable)committables.get(0)).checkpointId();
        Object wrappedManifestCommittable = this.storeMultiCommitter.combine(checkpointId, 1L, committables);
        try {
            this.storeMultiCommitter.filterAndCommit(Collections.singletonList(wrappedManifestCommittable));
            commitRequests.forEach(Committer.CommitRequest::signalAlreadyCommitted);
            LOGGER.info("Commit succeeded for {} with {} committable", (Object)checkpointId, (Object)committables.size());
        }
        catch (Exception e) {
            commitRequests.forEach(Committer.CommitRequest::retryLater);
            LOGGER.warn("Commit failed for {} with {} committable", new Object[]{checkpointId, committables.size(), e});
        }
    }

    public void close() throws Exception {
        this.storeMultiCommitter.close();
    }
}

