/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.clone.CloneFileInfo;
import org.apache.paimon.flink.clone.CloneSourceBuilder;
import org.apache.paimon.flink.clone.CopyFileOperator;
import org.apache.paimon.flink.clone.PickFilesForCloneOperator;
import org.apache.paimon.flink.clone.SnapshotHintChannelComputer;
import org.apache.paimon.flink.clone.SnapshotHintOperator;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

/**
 * Action that assembles and runs a Flink job made of three chained operators, named
 * "Pick Files", "Copy Files" and "Recreate Snapshot Hint", fed by a {@link CloneSourceBuilder}
 * source and terminated by a discarding sink.
 *
 * <p>The source side is described by a catalog configuration plus a database / table name; the
 * target side is described the same way. Both catalog configurations always carry the
 * corresponding warehouse under {@code CatalogOptions.WAREHOUSE}.
 */
public class CloneAction extends ActionBase {
    /** Parallelism applied to the "Copy Files" and "Recreate Snapshot Hint" operators. */
    private final int parallelism;

    /** Private copy of the source catalog options, including the source warehouse. */
    private final Map<String, String> sourceCatalogConfig;
    private final String database;
    private final String tableName;

    /** Private copy of the target catalog options, including the target warehouse. */
    private final Map<String, String> targetCatalogConfig;
    private final String targetDatabase;
    private final String targetTableName;

    /**
     * Creates the action.
     *
     * @param warehouse source warehouse; must not be null
     * @param database source database name, forwarded to the clone source
     * @param tableName source table name, forwarded to the clone source
     * @param sourceCatalogConfig extra source catalog options; never modified by this class
     * @param targetWarehouse target warehouse; must not be null
     * @param targetDatabase target database name, forwarded to the clone source
     * @param targetTableName target table name, forwarded to the clone source
     * @param targetCatalogConfig extra target catalog options; may be null or empty, never
     *     modified by this class
     * @param parallelismStr parallelism as a decimal string; when blank, the parallelism of the
     *     inherited execution environment is used
     * @throws NumberFormatException if {@code parallelismStr} is non-blank and not an integer
     */
    public CloneAction(String warehouse, String database, String tableName, Map<String, String> sourceCatalogConfig, String targetWarehouse, String targetDatabase, String targetTableName, Map<String, String> targetCatalogConfig, String parallelismStr) {
        // NOTE(review): the superclass receives the caller's map as-is; whether it tolerates a
        // null map is not visible from this file.
        super(warehouse, sourceCatalogConfig);
        Preconditions.checkNotNull(warehouse, "warehouse must not be null.");
        Preconditions.checkNotNull(targetWarehouse, "targetWarehouse must not be null.");
        this.parallelism = StringUtils.isBlank(parallelismStr) ? this.env.getParallelism() : Integer.parseInt(parallelismStr);

        // Always work on private copies: storing the caller's map and then calling put() on it
        // would leak the warehouse entry into the caller's state and would fail outright on
        // unmodifiable maps.
        this.sourceCatalogConfig = CloneAction.copyWithWarehouse(sourceCatalogConfig, warehouse);
        this.database = database;
        this.tableName = tableName;
        this.targetCatalogConfig = CloneAction.copyWithWarehouse(targetCatalogConfig, targetWarehouse);
        this.targetDatabase = targetDatabase;
        this.targetTableName = targetTableName;
    }

    /**
     * Returns a new mutable map holding every entry of {@code config} (if any) plus the given
     * warehouse under {@code CatalogOptions.WAREHOUSE}; the warehouse entry wins over an existing
     * one, as it did before.
     */
    private static Map<String, String> copyWithWarehouse(Map<String, String> config, String warehouse) {
        Map<String, String> copy = new HashMap<String, String>();
        if (config != null) {
            copy.putAll(config);
        }
        copy.put(CatalogOptions.WAREHOUSE.key(), warehouse);
        return copy;
    }

    /**
     * Builds the job topology on the inherited environment.
     *
     * @throws RuntimeException wrapping any exception raised while assembling the topology
     */
    @Override
    public void build() {
        try {
            this.buildCloneFlinkJob(this.env);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Wires source, the three operators and the sink together on {@code env}.
     *
     * <p>NOTE(review): the raw {@code OneInputStreamOperator} / {@code SinkFunction} casts are
     * kept exactly as found; the generic signatures of the project operators are not visible
     * here, so they were not tightened.
     */
    private void buildCloneFlinkJob(StreamExecutionEnvironment env) throws Exception {
        DataStream<Tuple2<String, String>> cloneSource =
                new CloneSourceBuilder(env, this.sourceCatalogConfig, this.database, this.tableName, this.targetDatabase, this.targetTableName).build();

        // Stage 1: forced to run non-parallel.
        SingleOutputStreamOperator pickFilesForClone =
                cloneSource
                        .transform("Pick Files", TypeInformation.of(CloneFileInfo.class), (OneInputStreamOperator)new PickFilesForCloneOperator(this.sourceCatalogConfig, this.targetCatalogConfig))
                        .forceNonParallel();

        // Stage 2: input is rebalanced, then processed with the configured parallelism.
        SingleOutputStreamOperator copyFiles =
                pickFilesForClone
                        .rebalance()
                        .transform("Copy Files", TypeInformation.of(CloneFileInfo.class), (OneInputStreamOperator)new CopyFileOperator(this.sourceCatalogConfig, this.targetCatalogConfig))
                        .setParallelism(this.parallelism);

        // Stage 3: input is partitioned through SnapshotHintChannelComputer before the operator.
        SingleOutputStreamOperator snapshotHintOperator =
                FlinkStreamPartitioner.partition(copyFiles, new SnapshotHintChannelComputer(), this.parallelism)
                        .transform("Recreate Snapshot Hint", TypeInformation.of(CloneFileInfo.class), (OneInputStreamOperator)new SnapshotHintOperator(this.targetCatalogConfig))
                        .setParallelism(this.parallelism);

        // The job output is discarded; the sink only terminates the topology.
        snapshotHintOperator.addSink((SinkFunction)new DiscardingSink()).name("end").setParallelism(1);
    }

    /**
     * Builds the topology and submits it under the job name "Clone job".
     *
     * @throws Exception if building or executing the job fails
     */
    @Override
    public void run() throws Exception {
        this.build();
        this.execute("Clone job");
    }
}

