/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table.stream.compact;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.compact.AbstractCompactTestBase;
import org.apache.flink.connector.file.table.stream.compact.CompactBulkReader;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.connector.file.table.stream.compact.CompactOperator;
import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
import org.apache.flink.connector.file.table.stream.compact.TestByteFormat;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link CompactOperator}: compaction of units into target files, cleanup of the
 * uncompacted inputs, end-of-input handling and distribution of units across parallel subtasks.
 */
class CompactOperatorTest
extends AbstractCompactTestBase {

    /**
     * Feeds four compaction units plus an {@code EndCompaction(1)} marker to a single-subtask
     * operator, snapshots it, and then restores a second operator from that snapshot to verify
     * that the uncompacted inputs are gone once checkpoint 2 is reported as complete.
     */
    @Test
    void testCompactOperator() throws Exception {
        AtomicReference<OperatorSubtaskState> state = new AtomicReference<>();
        Path f0 = this.newFile(".uncompacted-f0", 3);
        Path f1 = this.newFile(".uncompacted-f1", 2);
        Path f2 = this.newFile(".uncompacted-f2", 2);
        Path f3 = this.newFile(".uncompacted-f3", 5);
        Path f4 = this.newFile(".uncompacted-f4", 1);
        Path f5 = this.newFile(".uncompacted-f5", 5);
        Path f6 = this.newFile(".uncompacted-f6", 4);
        FileSystem fs = f0.getFileSystem();

        // First run: compact everything and capture the operator state.
        this.runCompact(harness -> {
            harness.setup();
            harness.open();

            harness.processElement(new CompactMessages.CompactionUnit(0, "p0", Arrays.asList(f0, f1, f4)), 0L);
            harness.processElement(new CompactMessages.CompactionUnit(1, "p0", Collections.singletonList(f3)), 0L);
            harness.processElement(new CompactMessages.CompactionUnit(2, "p1", Arrays.asList(f2, f5)), 0L);
            harness.processElement(new CompactMessages.CompactionUnit(3, "p0", Collections.singletonList(f6)), 0L);
            harness.processElement(new CompactMessages.EndCompaction(1L), 0L);

            state.set(harness.snapshot(2L, 0L));

            // Exactly one commit message is expected, for checkpoint 1, covering both partitions.
            List<PartitionCommitInfo> outputs = harness.extractOutputValues();
            Assertions.assertThat(outputs).hasSize(1);
            Assertions.assertThat(outputs.get(0).getCheckpointId()).isEqualTo(1L);
            Assertions.assertThat(outputs.get(0).getPartitions()).isEqualTo(Arrays.asList("p0", "p1"));

            // One compacted file per unit, named after the first input file of the unit.
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f0"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f2"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f3"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f6"))).isTrue();

            // Spot-check the content of unit 0 (f0 + f1 + f4). The bytes are sorted first, so the
            // check does not depend on the order in which the inputs were written.
            byte[] bytes = FileUtils.readAllBytes(new File(this.folder.getPath(), "compacted-f0").toPath());
            Arrays.sort(bytes);
            Assertions.assertThat(bytes).isEqualTo(new byte[]{0, 0, 0, 1, 1, 2});
        });

        // Second run: restore from the snapshot and complete checkpoint 2.
        this.runCompact(harness -> {
            harness.setup();
            harness.initializeState(state.get());
            harness.open();

            harness.notifyOfCompletedCheckpoint(2L);

            // All uncompacted input files must have been removed.
            Assertions.assertThat(fs.exists(f0)).isFalse();
            Assertions.assertThat(fs.exists(f1)).isFalse();
            Assertions.assertThat(fs.exists(f2)).isFalse();
            Assertions.assertThat(fs.exists(f3)).isFalse();
            Assertions.assertThat(fs.exists(f4)).isFalse();
            Assertions.assertThat(fs.exists(f5)).isFalse();
            Assertions.assertThat(fs.exists(f6)).isFalse();
        });
    }

    /**
     * Verifies that {@code endInput()} alone (no {@code EndCompaction} marker, no checkpoint)
     * leaves the compacted files in place and the uncompacted inputs removed.
     */
    @Test
    void testEndInput() throws Exception {
        Path f0 = this.newFile(".uncompacted-f0", 3);
        Path f1 = this.newFile(".uncompacted-f1", 4);
        Path f2 = this.newFile(".uncompacted-f2", 2);
        FileSystem fs = f0.getFileSystem();

        this.runCompact(harness -> {
            harness.setup();
            harness.open();

            harness.processElement(new CompactMessages.CompactionUnit(0, "p0", Arrays.asList(f0, f1)), 0L);
            harness.processElement(new CompactMessages.CompactionUnit(1, "p0", Collections.singletonList(f2)), 0L);

            harness.endInput();

            // Compacted outputs exist ...
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f0"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f2"))).isTrue();

            // ... and the uncompacted inputs are gone.
            Assertions.assertThat(fs.exists(f0)).isFalse();
            Assertions.assertThat(fs.exists(f1)).isFalse();
            Assertions.assertThat(fs.exists(f2)).isFalse();
        });
    }

    /**
     * Sends the same four units to two subtasks of a parallelism-2 operator and checks which
     * compacted files each subtask produces: subtask 0 is expected to handle units 0 and 2,
     * subtask 1 units 1 and 3.
     */
    @Test
    void testUnitSelection() throws Exception {
        // try-with-resources guarantees both harnesses are closed even when an assertion fails.
        try (OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorOutput, PartitionCommitInfo> harness0 = this.create(2, 0);
                OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorOutput, PartitionCommitInfo> harness1 = this.create(2, 1)) {
            harness0.setup();
            harness0.open();
            harness1.setup();
            harness1.open();

            Path f0 = this.newFile(".uncompacted-f0", 3);
            Path f1 = this.newFile(".uncompacted-f1", 2);
            Path f2 = this.newFile(".uncompacted-f2", 2);
            Path f3 = this.newFile(".uncompacted-f3", 5);
            Path f4 = this.newFile(".uncompacted-f4", 1);
            Path f5 = this.newFile(".uncompacted-f5", 5);
            Path f6 = this.newFile(".uncompacted-f6", 4);
            FileSystem fs = f0.getFileSystem();

            // Subtask 0 sees every unit but must only compact its own share.
            harness0.processElement(new CompactMessages.CompactionUnit(0, "p0", Arrays.asList(f0, f1, f4)), 0L);
            harness0.processElement(new CompactMessages.CompactionUnit(1, "p0", Collections.singletonList(f3)), 0L);
            harness0.processElement(new CompactMessages.CompactionUnit(2, "p0", Arrays.asList(f2, f5)), 0L);
            harness0.processElement(new CompactMessages.CompactionUnit(3, "p0", Collections.singletonList(f6)), 0L);
            harness0.processElement(new CompactMessages.EndCompaction(1L), 0L);

            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f0"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f2"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f3"))).isFalse();
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f6"))).isFalse();

            // Subtask 1 receives the same units and is expected to produce the remaining files.
            harness1.processElement(new CompactMessages.CompactionUnit(0, "p0", Arrays.asList(f0, f1, f4)), 0L);
            harness1.processElement(new CompactMessages.CompactionUnit(1, "p0", Collections.singletonList(f3)), 0L);
            harness1.processElement(new CompactMessages.CompactionUnit(2, "p0", Arrays.asList(f2, f5)), 0L);
            harness1.processElement(new CompactMessages.CompactionUnit(3, "p0", Collections.singletonList(f6)), 0L);
            harness1.processElement(new CompactMessages.EndCompaction(1L), 0L);

            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f3"))).isTrue();
            Assertions.assertThat(fs.exists(new Path(this.folder, "compacted-f6"))).isTrue();
        }
    }

    /**
     * Runs {@code consumer} against a fresh single-subtask harness and closes the harness
     * afterwards, whether or not the consumer throws.
     *
     * @param consumer test body that drives the harness
     * @throws Exception anything thrown while creating, driving or closing the harness
     */
    private void runCompact(ThrowingConsumer<OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorOutput, PartitionCommitInfo>, Exception> consumer) throws Exception {
        try (OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorOutput, PartitionCommitInfo> harness = this.create(1, 0)) {
            consumer.accept(harness);
        }
    }

    /**
     * Builds a test harness around a {@link CompactOperator} that reads with
     * {@link TestByteFormat} and writes each compacted file through a hidden temporary file
     * ({@code "." + name}) which is renamed to the target path on commit.
     *
     * @param parallelism used as both max parallelism and parallelism of the harness
     * @param subtaskIndex index of the subtask the harness represents
     * @return the harness, not yet set up or opened
     * @throws Exception if the harness cannot be created
     */
    private OneInputStreamOperatorTestHarness<CompactMessages.CoordinatorOutput, PartitionCommitInfo> create(int parallelism, int subtaskIndex) throws Exception {
        CompactOperator<Byte> operator = new CompactOperator<>(
                () -> this.folder.getFileSystem(),
                CompactBulkReader.factory(TestByteFormat.bulkFormat()),
                (CompactWriter.Factory<Byte> & Serializable) context -> {
                    final Path path = context.getPath();
                    final Path tempPath = new Path(path.getParent(), "." + path.getName());
                    final FSDataOutputStream out = context.getFileSystem().create(tempPath, FileSystem.WriteMode.OVERWRITE);
                    return new CompactWriter<Byte>() {

                        @Override
                        public void write(Byte record) throws IOException {
                            out.write(record.byteValue());
                        }

                        @Override
                        public void commit() throws IOException {
                            out.close();
                            // Fail loudly instead of letting a failed rename show up later as a
                            // confusing "compacted file does not exist" assertion.
                            if (!context.getFileSystem().rename(tempPath, path)) {
                                throw new IOException("Failed to rename " + tempPath + " to " + path);
                            }
                        }
                    };
                });
        return new OneInputStreamOperatorTestHarness<>(operator, parallelism, parallelism, subtaskIndex);
    }
}

