/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.async.AsyncCleanerService;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.com.codahale.metrics.Timer;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.FlinkHoodieIndexFactory;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.io.FlinkAppendHandle;
import org.apache.hudi.io.FlinkConcatAndReplaceHandle;
import org.apache.hudi.io.FlinkConcatHandle;
import org.apache.hudi.io.FlinkCreateHandle;
import org.apache.hudi.io.FlinkMergeAndReplaceHandle;
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.compact.CompactHelpers;
import org.apache.hudi.table.marker.WriteMarkersFactory;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngradeHelper;
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoodieFlinkWriteClient<T extends HoodieRecordPayload>
extends BaseHoodieWriteClient<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
    private static final Logger LOG = LoggerFactory.getLogger(HoodieFlinkWriteClient.class);
    private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles = new HashMap();

    public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
        super(context, writeConfig, FlinkUpgradeDowngradeHelper.getInstance());
    }

    @Override
    protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
        return FlinkHoodieIndexFactory.createIndex((HoodieFlinkEngineContext)this.context, this.config);
    }

    @Override
    public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
        List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
        return this.commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds);
    }

    @Override
    protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
        return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext)this.context);
    }

    @Override
    public List<HoodieRecord<T>> filterExists(List<HoodieRecord<T>> hoodieRecords) {
        HoodieFlinkTable<T> table = this.getHoodieTable();
        Timer.Context indexTimer = this.metrics.getIndexCtx();
        List recordsWithLocation = this.getIndex().tagLocation(HoodieListData.eager(hoodieRecords), this.context, (HoodieTable)table).collectAsList();
        this.metrics.updateIndexMetrics("lookup", this.metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
        return recordsWithLocation.stream().filter(v1 -> !v1.isCurrentLocationKnown()).collect(Collectors.toList());
    }

    @Override
    public void bootstrap(Option<Map<String, String>> extraMetadata) {
        throw new HoodieNotSupportedException("Bootstrap operation is not supported yet");
    }

    @Override
    public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
        table.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
        HoodieWriteHandle<?, ?, ?, ?> writeHandle = this.getOrCreateWriteHandle(records.get(0), this.getConfig(), instantTime, table, records.listIterator());
        HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable)table).upsert(this.context, writeHandle, instantTime, records);
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics("lookup", result.getIndexLookupDuration().get().toMillis());
        }
        return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
    }

    @Override
    public List<WriteStatus> upsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
        table.validateUpsertSchema();
        this.preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, table.getMetaClient());
        Map<String, List<HoodieRecord>> preppedRecordsByFileId = ((Stream)preppedRecords.stream().parallel()).collect(Collectors.groupingBy(r -> r.getCurrentLocation().getFileId()));
        return ((Stream)preppedRecordsByFileId.values().stream().parallel()).map(records -> {
            HoodieWriteHandle<?, ?, ?, ?> writeHandle = this.getOrCreateWriteHandle((HoodieRecord)records.get(0), this.getConfig(), instantTime, table, records.listIterator());
            HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable)table).upsertPrepped(this.context, writeHandle, instantTime, records);
            return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
        }).flatMap(Collection::stream).collect(Collectors.toList());
    }

    @Override
    public List<WriteStatus> insert(List<HoodieRecord<T>> records, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
        HoodieWriteHandle<?, ?, ?, ?> writeHandle = this.getOrCreateWriteHandle(records.get(0), this.getConfig(), instantTime, table, records.listIterator());
        HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable)table).insert(this.context, writeHandle, instantTime, records);
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics("lookup", result.getIndexLookupDuration().get().toMillis());
        }
        return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
    }

    public List<WriteStatus> insertOverwrite(List<HoodieRecord<T>> records, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.INSERT_OVERWRITE, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, table.getMetaClient());
        HoodieWriteHandle<?, ?, ?, ?> writeHandle = this.getOrCreateWriteHandle(records.get(0), this.getConfig(), instantTime, table, records.listIterator());
        HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable)table).insertOverwrite(this.context, writeHandle, instantTime, records);
        return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
    }

    public List<WriteStatus> insertOverwriteTable(List<HoodieRecord<T>> records, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, Option.ofNullable(instantTime));
        table.validateInsertSchema();
        this.preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, table.getMetaClient());
        HoodieWriteHandle<?, ?, ?, ?> writeHandle = this.getOrCreateWriteHandle(records.get(0), this.getConfig(), instantTime, table, records.listIterator());
        HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable)table).insertOverwriteTable(this.context, writeHandle, instantTime, records);
        return this.postWrite((HoodieWriteMetadata)result, instantTime, table);
    }

    @Override
    public List<WriteStatus> insertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime) {
        throw new HoodieNotSupportedException("InsertPrepped operation is not supported yet");
    }

    @Override
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime) {
        throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
    }

    @Override
    public List<WriteStatus> bulkInsert(List<HoodieRecord<T>> records, String instantTime, Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
        throw new HoodieNotSupportedException("BulkInsert operation is not supported yet");
    }

    @Override
    public List<WriteStatus> bulkInsertPreppedRecords(List<HoodieRecord<T>> preppedRecords, String instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
        throw new HoodieNotSupportedException("BulkInsertPrepped operation is not supported yet");
    }

    @Override
    public List<WriteStatus> delete(List<HoodieKey> keys2, String instantTime) {
        HoodieTable table = this.initTable(WriteOperationType.DELETE, Option.ofNullable(instantTime));
        this.preWrite(instantTime, WriteOperationType.DELETE, table.getMetaClient());
        HoodieWriteMetadata result = table.delete(this.context, instantTime, keys2);
        return this.postWrite(result, instantTime, table);
    }

    @Override
    public void preWrite(String instantTime, WriteOperationType writeOperationType, HoodieTableMetaClient metaClient) {
        this.setOperationType(writeOperationType);
    }

    @Override
    protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
        try (HoodieBackedTableMetadataWriter metadataWriter = this.initMetadataWriter();){
            metadataWriter.update(metadata, instantTime, this.getHoodieTable().isTableServiceAction(actionType, instantTime));
        }
        catch (Exception e) {
            throw new HoodieException("Failed to update metadata", e);
        }
    }

    public HoodieBackedTableMetadataWriter initMetadataWriter() {
        return (HoodieBackedTableMetadataWriter)FlinkHoodieBackedTableMetadataWriter.create(FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
    }

    public void initMetadataTable() {
        HoodieFlinkTable<T> table = this.getHoodieTable();
        if (this.config.isMetadataTableEnabled()) {
            try {
                HoodieBackedTableMetadataWriter metadataWriter = this.initMetadataWriter();
                Throwable throwable = null;
                if (metadataWriter != null) {
                    if (throwable != null) {
                        try {
                            metadataWriter.close();
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                        }
                    } else {
                        metadataWriter.close();
                    }
                }
            }
            catch (Exception e) {
                throw new HoodieException("Failed to initialize metadata table", e);
            }
            table.deleteMetadataIndexIfNecessary();
        } else {
            table.maybeDeleteMetadataTable();
        }
    }

    public void startAsyncCleaning() {
        if (this.asyncCleanerService == null) {
            this.asyncCleanerService = AsyncCleanerService.startAsyncCleaningIfEnabled(this);
        } else {
            this.asyncCleanerService.start(null);
        }
    }

    public void waitForCleaningFinish() {
        if (this.asyncCleanerService != null) {
            LOG.info("Cleaner has been spawned already. Waiting for it to finish");
            AsyncCleanerService.waitForCompletion(this.asyncCleanerService);
            LOG.info("Cleaner has finished");
        }
    }

    @Override
    protected List<WriteStatus> postWrite(HoodieWriteMetadata<List<WriteStatus>> result, String instantTime, HoodieTable hoodieTable) {
        if (result.getIndexLookupDuration().isPresent()) {
            this.metrics.updateIndexMetrics(this.getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());
        }
        return result.getWriteStatuses();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata, String instantTime, Option<Map<String, String>> extraMetadata, boolean acquireLockForArchival) {
        try {
            WriteMarkersFactory.get(this.config.getMarkersType(), this.createTable(this.config, this.hadoopConf), instantTime).quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
            this.autoArchiveOnCommit(table, acquireLockForArchival);
        }
        finally {
            this.heartbeatClient.stop(instantTime);
        }
    }

    @Override
    public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
        HoodieFlinkTable<T> table = this.getHoodieTable();
        extraMetadata.ifPresent(m -> m.forEach(metadata::addMetadata));
        this.completeCompaction(metadata, table, compactionInstantTime);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + this.config.getTableName());
        List<HoodieWriteStat> writeStats = metadata.getWriteStats();
        HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
        try {
            this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
            this.finalizeWrite(table, compactionCommitTime, writeStats);
            this.writeTableMetadata(table, compactionCommitTime, compactionInstant.getAction(), metadata);
            LOG.info("Committing Compaction {} finished with result {}.", (Object)compactionCommitTime, (Object)metadata);
            CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
        }
        finally {
            this.txnManager.endTransaction(Option.of(compactionInstant));
        }
        WriteMarkersFactory.get(this.config.getMarkersType(), table, compactionCommitTime).quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        if (this.compactionTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.compactionTimer.stop());
            try {
                this.metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(), durationInMs, metadata, "compaction");
            }
            catch (ParseException e) {
                throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + this.config.getBasePath() + " at time " + compactionCommitTime, e);
            }
        }
        LOG.info("Compacted successfully on commit " + compactionCommitTime);
    }

    @Override
    protected HoodieWriteMetadata<List<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
        HoodieWriteMetadata<List<WriteStatus>> compactionMetadata = this.getHoodieTable().compact(this.context, compactionInstantTime);
        this.commitCompaction(compactionInstantTime, compactionMetadata.getCommitMetadata().get(), Option.empty());
        return compactionMetadata;
    }

    @Override
    public HoodieWriteMetadata<List<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
        throw new HoodieNotSupportedException("Clustering is not supported yet");
    }

    private void completeClustering(HoodieReplaceCommitMetadata metadata, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String clusteringCommitTime) {
        this.context.setJobStatus(this.getClass().getSimpleName(), "Collect clustering write status and commit clustering");
        HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, "replacecommit", clusteringCommitTime);
        List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e -> ((List)e.getValue()).stream()).collect(Collectors.toList());
        if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0L) {
            throw new HoodieClusteringException("Clustering failed to write to files:" + writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
        }
        try {
            this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
            this.finalizeWrite(table, clusteringCommitTime, writeStats);
            this.writeTableMetadata(table, clusteringCommitTime, clusteringInstant.getAction(), metadata);
            LOG.info("Committing Clustering {} finished with result {}.", (Object)clusteringCommitTime, (Object)metadata);
            table.getActiveTimeline().transitionReplaceInflightToComplete(HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime), Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
        }
        catch (IOException e2) {
            throw new HoodieClusteringException("Failed to commit " + table.getMetaClient().getBasePath() + " at time " + clusteringCommitTime, e2);
        }
        finally {
            this.txnManager.endTransaction(Option.of(clusteringInstant));
        }
        WriteMarkersFactory.get(this.config.getMarkersType(), table, clusteringCommitTime).quietDeleteMarkerDir(this.context, this.config.getMarkersDeleteParallelism());
        if (this.clusteringTimer != null) {
            long durationInMs = this.metrics.getDurationInMs(this.clusteringTimer.stop());
            try {
                this.metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(), durationInMs, metadata, "replacecommit");
            }
            catch (ParseException e3) {
                throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " + this.config.getBasePath() + " at time " + clusteringCommitTime, e3);
            }
        }
        LOG.info("Clustering successfully on commit " + clusteringCommitTime);
    }

    @Override
    protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<String> instantTime, boolean initialMetadataTableIfNecessary) {
        return this.getHoodieTable();
    }

    @Override
    protected void tryUpgrade(HoodieTableMetaClient metaClient, Option<String> instantTime) {
    }

    public void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, String commitInstant) {
        switch (tableServiceType) {
            case CLUSTER: {
                this.completeClustering((HoodieReplaceCommitMetadata)metadata, table, commitInstant);
                break;
            }
            case COMPACT: {
                this.completeCompaction(metadata, table, commitInstant);
                break;
            }
            default: {
                throw new IllegalArgumentException("This table service is not valid " + (Object)((Object)tableServiceType));
            }
        }
    }

    public void upgradeDowngrade(String instantTime, HoodieTableMetaClient metaClient) {
        new UpgradeDowngrade(metaClient, this.config, this.context, FlinkUpgradeDowngradeHelper.getInstance()).run(HoodieTableVersion.current(), instantTime);
    }

    public void cleanHandles() {
        this.bucketToHandles.clear();
    }

    public void cleanHandlesGracefully() {
        this.bucketToHandles.values().forEach(handle2 -> ((MiniBatchHandle)((Object)handle2)).closeGracefully());
        this.bucketToHandles.clear();
    }

    private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(HoodieRecord<T> record, HoodieWriteConfig config, String instantTime, HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, Iterator<HoodieRecord<T>> recordItr) {
        MiniBatchHandle lastHandle;
        HoodieRecordLocation loc = record.getCurrentLocation();
        String fileID = loc.getFileId();
        String partitionPath = record.getPartitionPath();
        boolean insertClustering = config.allowDuplicateInserts();
        if (this.bucketToHandles.containsKey(fileID) && (lastHandle = (MiniBatchHandle)((Object)this.bucketToHandles.get(fileID))).shouldReplace()) {
            FlinkMergeAndReplaceHandle writeHandle = insertClustering ? new FlinkConcatAndReplaceHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(), lastHandle.getWritePath()) : new FlinkMergeAndReplaceHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier(), lastHandle.getWritePath());
            this.bucketToHandles.put(fileID, writeHandle);
            return writeHandle;
        }
        boolean isDelta = table.getMetaClient().getTableType().equals((Object)HoodieTableType.MERGE_ON_READ);
        HoodieWriteHandle writeHandle = isDelta ? new FlinkAppendHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, partitionPath, fileID, recordItr, table.getTaskContextSupplier()) : (loc.getInstantTime().equals("I") ? new FlinkCreateHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, partitionPath, fileID, table.getTaskContextSupplier()) : (insertClustering ? new FlinkConcatHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier()) : new FlinkMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>(config, instantTime, table, recordItr, partitionPath, fileID, table.getTaskContextSupplier())));
        this.bucketToHandles.put(fileID, writeHandle);
        return writeHandle;
    }

    public HoodieFlinkTable<T> getHoodieTable() {
        return HoodieFlinkTable.create(this.config, (HoodieFlinkEngineContext)this.context);
    }

    public Map<String, List<String>> getPartitionToReplacedFileIds(WriteOperationType writeOperationType, List<WriteStatus> writeStatuses) {
        HoodieFlinkTable<T> table = this.getHoodieTable();
        switch (writeOperationType) {
            case INSERT_OVERWRITE: {
                return writeStatuses.stream().map(status -> status.getStat().getPartitionPath()).distinct().collect(Collectors.toMap(partition -> partition, partitionPath -> this.getAllExistingFileIds(table, (String)partitionPath)));
            }
            case INSERT_OVERWRITE_TABLE: {
                Map<String, List<String>> partitionToExistingFileIds = new HashMap<String, List<String>>();
                List<String> partitionPaths = FSUtils.getAllPartitionPaths(this.context, this.config.getMetadataConfig(), table.getMetaClient().getBasePath());
                if (partitionPaths != null && partitionPaths.size() > 0) {
                    this.context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions: " + this.config.getTableName());
                    partitionToExistingFileIds = ((Stream)partitionPaths.stream().parallel()).collect(Collectors.toMap(partition -> partition, partition -> this.getAllExistingFileIds(table, (String)partition)));
                }
                return partitionToExistingFileIds;
            }
        }
        throw new AssertionError();
    }

    private List<String> getAllExistingFileIds(HoodieFlinkTable<T> table, String partitionPath) {
        return table.getSliceView().getLatestFileSlices(partitionPath).map(FileSlice::getFileId).distinct().collect(Collectors.toList());
    }
}

