/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.connector.sink;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.TestManagedCommittable;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.TestManagedTableFactory;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.assertj.core.api.Assertions;

public class TestManagedSinkCommitter
implements Committer<TestManagedCommittable> {
    private final ObjectIdentifier tableIdentifier;
    private final Path basePath;
    private final RowDataEncoder encoder = new RowDataEncoder();

    public TestManagedSinkCommitter(ObjectIdentifier tableIdentifier, Path basePath) {
        this.tableIdentifier = tableIdentifier;
        this.basePath = basePath;
    }

    public void commit(Collection<Committer.CommitRequest<TestManagedCommittable>> committables) throws IOException, InterruptedException {
        ArrayList<TestManagedCommittable> committableList = new ArrayList<TestManagedCommittable>();
        for (Committer.CommitRequest<TestManagedCommittable> commitRequest : committables) {
            committableList.add((TestManagedCommittable)commitRequest.getCommittable());
        }
        TestManagedCommittable combinedCommittable = TestManagedCommittable.combine(committableList);
        AtomicReference<Map<CatalogPartitionSpec, List<Path>>> reference = TestManagedTableFactory.MANAGED_TABLE_FILE_ENTRIES.get(this.tableIdentifier);
        Assertions.assertThat(reference).isNotNull();
        Map<CatalogPartitionSpec, List<Path>> managedTableFileEntries = reference.get();
        this.commitAdd(combinedCommittable.getToAdd(), managedTableFileEntries);
        this.commitDelete(combinedCommittable.getToDelete(), managedTableFileEntries);
        reference.set(managedTableFileEntries);
    }

    public void close() throws Exception {
    }

    private void commitAdd(Map<CatalogPartitionSpec, List<RowData>> toAdd, Map<CatalogPartitionSpec, List<Path>> managedTableFileEntries) throws IOException {
        HashMap<CatalogPartitionSpec, String> processedPartitions = new HashMap<CatalogPartitionSpec, String>();
        for (Map.Entry<CatalogPartitionSpec, List<RowData>> entry : toAdd.entrySet()) {
            CatalogPartitionSpec partitionSpec = entry.getKey();
            String partition = processedPartitions.computeIfAbsent(partitionSpec, spec -> PartitionPathUtils.generatePartitionPath(new LinkedHashMap(spec.getPartitionSpec())));
            List<RowData> elements = entry.getValue();
            Path compactFilePath = new Path(this.basePath, new Path(String.format("%scompact-%s-file-0", partition, UUID.randomUUID())));
            FSDataOutputStream outputStream = compactFilePath.getFileSystem().create(compactFilePath, FileSystem.WriteMode.NO_OVERWRITE);
            for (RowData element : elements) {
                this.encoder.encode(element, (OutputStream)outputStream);
            }
            outputStream.flush();
            outputStream.close();
            List<Path> fileEntries = managedTableFileEntries.get(partitionSpec);
            fileEntries.add(compactFilePath);
            managedTableFileEntries.put(partitionSpec, fileEntries);
        }
    }

    private void commitDelete(Map<CatalogPartitionSpec, Set<Path>> toDelete, Map<CatalogPartitionSpec, List<Path>> managedTableFileEntries) throws IOException {
        for (Map.Entry<CatalogPartitionSpec, Set<Path>> entry : toDelete.entrySet()) {
            CatalogPartitionSpec partitionSpec = entry.getKey();
            Set<Path> pathsToDelete = entry.getValue();
            for (Path path : pathsToDelete) {
                path.getFileSystem().delete(path, false);
            }
            List<Path> paths = managedTableFileEntries.get(partitionSpec);
            paths.removeAll(pathsToDelete);
            managedTableFileEntries.put(partitionSpec, paths);
        }
    }

    private static class RowDataEncoder
    implements Encoder<RowData> {
        private static final long serialVersionUID = 1L;
        private static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0];

        private RowDataEncoder() {
        }

        public void encode(RowData rowData, OutputStream stream) throws IOException {
            stream.write(rowData.getString(0).toBytes());
            stream.write(LINE_DELIMITER);
        }
    }
}

