/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table.batch.compact;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.connector.file.table.batch.compact.BatchCompactOperator;
import org.apache.flink.connector.file.table.stream.compact.AbstractCompactTestBase;
import org.apache.flink.connector.file.table.stream.compact.CompactBulkReader;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
import org.apache.flink.connector.file.table.stream.compact.TestByteFormat;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.FileUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class BatchCompactOperatorTest
extends AbstractCompactTestBase {
    @Test
    public void testCompact() throws Exception {
        BatchCompactOperator<Byte> compactOperator = this.createBatchCompactOperator();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness(compactOperator);){
            testHarness.setup();
            testHarness.open();
            Path f0 = this.newFile("uncompacted-f0", 3);
            Path f1 = this.newFile("uncompacted-f1", 2);
            Path f2 = this.newFile("uncompacted-f2", 2);
            Path f3 = this.newFile("uncompacted-f3", 10);
            testHarness.processElement(new StreamRecord((Object)new CompactMessages.CompactionUnit(1, "p=p1/", Arrays.asList(f0, f1, f2))));
            testHarness.processElement(new StreamRecord((Object)new CompactMessages.CompactionUnit(2, "p=p2/", Collections.singletonList(f3))));
            testHarness.endInput();
            List compactOutputs = testHarness.extractOutputValues();
            HashMap<String, List<Path>> expectCompactedFiles = new HashMap<String, List<Path>>();
            expectCompactedFiles.put("p=p1/", Collections.singletonList(new Path(this.folder + "/compacted-attempt-0-f0")));
            expectCompactedFiles.put("p=p2/", Collections.singletonList(new Path(this.folder + "/uncompacted-f3")));
            byte[] bytes = FileUtils.readAllBytes((java.nio.file.Path)new File(this.folder.getPath(), "compacted-attempt-0-f0").toPath());
            Arrays.sort(bytes);
            Assertions.assertThat((byte[])bytes).isEqualTo((Object)new byte[]{0, 0, 0, 1, 1, 1, 2});
            bytes = FileUtils.readAllBytes((java.nio.file.Path)new File(this.folder.getPath(), "uncompacted-f3").toPath());
            Assertions.assertThat((byte[])bytes).isEqualTo((Object)new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
            this.assertCompactOutput(compactOutputs, Collections.singletonList(new CompactMessages.CompactOutput(expectCompactedFiles)));
        }
    }

    private void assertCompactOutput(List<CompactMessages.CompactOutput> actualCompactOutputs, List<CompactMessages.CompactOutput> expectCompactOutputs) {
        Assertions.assertThat((int)actualCompactOutputs.size()).isEqualTo(expectCompactOutputs.size());
        for (int i = 0; i < actualCompactOutputs.size(); ++i) {
            CompactMessages.CompactOutput actualCompactOutput = actualCompactOutputs.get(i);
            CompactMessages.CompactOutput expectCompactOutput = expectCompactOutputs.get(i);
            Assertions.assertThat((Map)actualCompactOutput.getCompactedFiles()).isEqualTo((Object)expectCompactOutput.getCompactedFiles());
        }
    }

    private BatchCompactOperator<Byte> createBatchCompactOperator() {
        return new BatchCompactOperator(() -> this.folder.getFileSystem(), CompactBulkReader.factory(TestByteFormat.bulkFormat()), (CompactWriter.Factory & Serializable)context -> {
            final Path path = context.getPath();
            final Path tempPath = new Path(path.getParent(), "." + path.getName());
            final FSDataOutputStream out = context.getFileSystem().create(tempPath, FileSystem.WriteMode.OVERWRITE);
            return new CompactWriter<Byte>(){

                public void write(Byte record) throws IOException {
                    out.write((int)record.byteValue());
                }

                public void commit() throws IOException {
                    out.close();
                    context.getFileSystem().rename(tempPath, path);
                }
            };
        });
    }
}

