/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.paimon.sink.v2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonEvent;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
import org.apache.flink.cdc.connectors.paimon.sink.v2.StoreSinkWriteImpl;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.ExecutorThreadFactory;

public class PaimonWriter<InputT>
implements TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, MultiTableCommittable> {
    private static Catalog catalog;
    private final IOManager ioManager;
    private final String commitUser;
    private MemoryPoolFactory memoryPoolFactory;
    private final PaimonRecordSerializer<InputT> serializer;
    private final Map<Identifier, FileStoreTable> tables;
    private final Map<Identifier, StoreSinkWrite> writes;
    private final ExecutorService compactExecutor;
    private final MetricGroup metricGroup;
    private long lastCheckpointId;

    public PaimonWriter(Options catalogOptions, MetricGroup metricGroup, String commitUser, PaimonRecordSerializer<InputT> serializer, long lastCheckpointId) {
        catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
        this.metricGroup = metricGroup;
        this.commitUser = commitUser;
        this.tables = new HashMap<Identifier, FileStoreTable>();
        this.writes = new HashMap<Identifier, StoreSinkWrite>();
        this.ioManager = new IOManagerAsync();
        this.compactExecutor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory(Thread.currentThread().getName() + "-CdcMultiWrite-Compaction"));
        this.serializer = serializer;
        this.lastCheckpointId = lastCheckpointId;
    }

    public Collection<MultiTableCommittable> prepareCommit() throws IOException {
        ArrayList<MultiTableCommittable> committables = new ArrayList<MultiTableCommittable>();
        for (Map.Entry<Identifier, StoreSinkWrite> entry : this.writes.entrySet()) {
            Identifier key = entry.getKey();
            StoreSinkWrite write = entry.getValue();
            boolean waitCompaction = true;
            committables.addAll(write.prepareCommit(waitCompaction, this.lastCheckpointId + 1L).stream().map(committable -> MultiTableCommittable.fromCommittable(key, committable)).collect(Collectors.toList()));
        }
        ++this.lastCheckpointId;
        return committables;
    }

    public void write(InputT event, SinkWriter.Context context) throws IOException {
        PaimonEvent paimonEvent = this.serializer.serialize(event);
        Identifier tableId = paimonEvent.getTableId();
        if (paimonEvent.isShouldRefreshSchema()) {
            this.tables.remove(tableId);
            try {
                if (this.writes.containsKey(tableId)) {
                    this.writes.get(tableId).replace(this.getTable(tableId));
                }
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        if (paimonEvent.getGenericRow() != null) {
            FileStoreTable table = this.getTable(tableId);
            if (this.memoryPoolFactory == null) {
                this.memoryPoolFactory = new MemoryPoolFactory(new HeapMemorySegmentPool(table.coreOptions().writeBufferSize(), table.coreOptions().pageSize()));
            }
            StoreSinkWrite write = this.writes.computeIfAbsent(tableId, id -> {
                StoreSinkWriteImpl storeSinkWrite = new StoreSinkWriteImpl(table, this.commitUser, this.ioManager, false, false, true, this.memoryPoolFactory, this.metricGroup);
                storeSinkWrite.withCompactExecutor(this.compactExecutor);
                return storeSinkWrite;
            });
            try {
                write.write(paimonEvent.getGenericRow(), paimonEvent.getBucket());
            }
            catch (Exception e) {
                throw new IOException(e);
            }
        }
    }

    private FileStoreTable getTable(Identifier tableId) {
        return this.tables.computeIfAbsent(tableId, id -> {
            try {
                return (FileStoreTable)catalog.getTable(tableId);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void flush(boolean endOfInput) {
    }

    public void close() throws Exception {
        for (StoreSinkWrite write : this.writes.values()) {
            write.close();
        }
        if (this.compactExecutor != null) {
            this.compactExecutor.shutdownNow();
        }
    }
}

