/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.clone.CloneFileInfo;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CopyFileOperator
extends AbstractStreamOperator<CloneFileInfo>
implements OneInputStreamOperator<CloneFileInfo, CloneFileInfo> {
    private static final Logger LOG = LoggerFactory.getLogger(CopyFileOperator.class);
    private final Map<String, String> sourceCatalogConfig;
    private final Map<String, String> targetCatalogConfig;
    private Catalog sourceCatalog;
    private Catalog targetCatalog;

    public CopyFileOperator(Map<String, String> sourceCatalogConfig, Map<String, String> targetCatalogConfig) {
        this.sourceCatalogConfig = sourceCatalogConfig;
        this.targetCatalogConfig = targetCatalogConfig;
    }

    public void open() throws Exception {
        this.sourceCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.sourceCatalogConfig));
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
    }

    public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
        CloneFileInfo cloneFileInfo = (CloneFileInfo)streamRecord.getValue();
        FileIO sourceTableFileIO = this.sourceCatalog.fileIO();
        FileIO targetTableFileIO = this.targetCatalog.fileIO();
        Path sourceTableRootPath = this.sourceCatalog.getTableLocation(Identifier.fromString(cloneFileInfo.getSourceIdentifier()));
        Path targetTableRootPath = this.targetCatalog.getTableLocation(Identifier.fromString(cloneFileInfo.getTargetIdentifier()));
        String filePathExcludeTableRoot = cloneFileInfo.getFilePathExcludeTableRoot();
        Path sourcePath = new Path(sourceTableRootPath + filePathExcludeTableRoot);
        Path targetPath = new Path(targetTableRootPath + filePathExcludeTableRoot);
        if (targetTableFileIO.exists(targetPath) && targetTableFileIO.getFileSize(targetPath) == sourceTableFileIO.getFileSize(sourcePath)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping clone target file {} because it already exists and has the same size.", (Object)targetPath);
            }
            this.output.collect(streamRecord);
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Begin copy file from {} to {}.", (Object)sourcePath, (Object)targetPath);
        }
        IOUtils.copyBytes(sourceTableFileIO.newInputStream(sourcePath), targetTableFileIO.newOutputStream(targetPath, true));
        if (LOG.isDebugEnabled()) {
            LOG.debug("End copy file from {} to {}.", (Object)sourcePath, (Object)targetPath);
        }
        this.output.collect(streamRecord);
    }

    public void close() throws Exception {
        if (this.sourceCatalog != null) {
            this.sourceCatalog.close();
        }
        if (this.targetCatalog != null) {
            this.targetCatalog.close();
        }
    }
}

