/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.table.stream.compact;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.connector.file.table.stream.compact.AbstractCompactTestBase;
import org.apache.flink.connector.file.table.stream.compact.CompactCoordinator;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link CompactCoordinator}, driven through a
 * {@link OneInputStreamOperatorTestHarness}.
 */
class CompactCoordinatorTest extends AbstractCompactTestBase {

    /**
     * Runs the coordinator in three separate harness lifecycles, handing the snapshot taken at the
     * end of one run to the next one as restored state, and verifies the compaction units and
     * end-of-compaction markers emitted by the final run.
     */
    @Test
    void testCoordinatorCrossCheckpoints() throws Exception {
        // Carries the operator snapshot from one harness run to the next.
        AtomicReference<OperatorSubtaskState> state = new AtomicReference<>();

        // Run 1: register seven files spread over partitions p0 and p1, then snapshot as checkpoint 1.
        runCoordinator(harness -> {
            harness.setup();
            harness.open();

            harness.processElement(new CompactMessages.InputFile("p0", newFile("f0", 3)), 0L);
            harness.processElement(new CompactMessages.InputFile("p0", newFile("f1", 2)), 0L);
            harness.processElement(new CompactMessages.InputFile("p1", newFile("f2", 2)), 0L);
            harness.processElement(new CompactMessages.InputFile("p0", newFile("f3", 5)), 0L);
            harness.processElement(new CompactMessages.InputFile("p0", newFile("f4", 1)), 0L);
            harness.processElement(new CompactMessages.InputFile("p1", newFile("f5", 5)), 0L);
            harness.processElement(new CompactMessages.InputFile("p1", newFile("f6", 4)), 0L);

            state.set(harness.snapshot(1L, 0L));
        });

        // Run 2: restore from checkpoint 1, register two more p0 files, snapshot as checkpoint 2.
        runCoordinator(harness -> {
            harness.setup();
            harness.initializeState(state.get());
            harness.open();

            harness.processElement(new CompactMessages.InputFile("p0", newFile("f7", 3)), 0L);
            harness.processElement(new CompactMessages.InputFile("p0", newFile("f8", 2)), 0L);

            state.set(harness.snapshot(2L, 0L));
        });

        // Run 3: restore from checkpoint 2, end checkpoint 2, add one more file and end the
        // Long.MAX_VALUE checkpoint, then inspect everything that was emitted.
        runCoordinator(harness -> {
            harness.setup();
            harness.initializeState(state.get());
            harness.open();

            harness.processElement(new CompactMessages.EndCheckpoint(2L, 0, 1), 0L);
            harness.processElement(new CompactMessages.InputFile("p2", newFile("f9", 4)), 0L);
            harness.processElement(new CompactMessages.EndCheckpoint(Long.MAX_VALUE, 0, 1), 0L);

            List<CompactMessages.CoordinatorOutput> outputs = harness.extractOutputValues();
            Assertions.assertThat(outputs).hasSize(9);

            // The first four outputs are the units built from the checkpoint-1 files.
            List<CompactMessages.CompactionUnit> cp1Units = new ArrayList<>();
            for (int i = 0; i < 4; ++i) {
                CompactMessages.CoordinatorOutput output = outputs.get(i);
                Assertions.assertThat(output).isInstanceOf(CompactMessages.CompactionUnit.class);
                cp1Units.add((CompactMessages.CompactionUnit) output);
            }
            // Sort before asserting so the checks do not depend on the order in which these four
            // units were emitted (presumably not guaranteed across partitions - confirm against
            // CompactCoordinator).
            cp1Units.sort(
                    Comparator.comparing(CompactMessages.CompactionUnit::getPartition)
                            .thenComparingInt(CompactMessages.CompactionUnit::getUnitId));

            assertUnit(cp1Units.get(0), 0, "p0", Arrays.asList("f0", "f1", "f4"));
            assertUnit(cp1Units.get(1), 1, "p0", Collections.singletonList("f3"));
            assertUnit(cp1Units.get(2), 2, "p1", Arrays.asList("f2", "f5"));
            assertUnit(cp1Units.get(3), 3, "p1", Collections.singletonList("f6"));
            assertEndCompaction(outputs.get(4), 1L);

            // Files registered between checkpoint 1 and checkpoint 2.
            assertUnit(outputs.get(5), 0, "p0", Arrays.asList("f7", "f8"));
            assertEndCompaction(outputs.get(6), 2L);

            // File registered after checkpoint 2, closed by the Long.MAX_VALUE checkpoint.
            assertUnit(outputs.get(7), 0, "p2", Collections.singletonList("f9"));
            assertEndCompaction(outputs.get(8), Long.MAX_VALUE);
        });
    }

    /**
     * Creates a fresh {@link CompactCoordinator} wrapped in a test harness, passes the harness to
     * {@code consumer}, and closes the harness afterwards.
     *
     * @param consumer test body to run against the harness
     * @throws Exception if creating the harness, the test body, or closing the harness fails
     */
    private void runCoordinator(
            ThrowingConsumer<
                            OneInputStreamOperatorTestHarness<
                                    CompactMessages.CoordinatorInput,
                                    CompactMessages.CoordinatorOutput>,
                            Exception>
                    consumer)
            throws Exception {
        // The second constructor argument is presumably the target file size - confirm against
        // CompactCoordinator.
        CompactCoordinator coordinator = new CompactCoordinator(() -> folder.getFileSystem(), 9L);
        try (OneInputStreamOperatorTestHarness<
                        CompactMessages.CoordinatorInput, CompactMessages.CoordinatorOutput>
                harness = new OneInputStreamOperatorTestHarness<>(coordinator)) {
            consumer.accept(harness);
        }
    }

    /**
     * Asserts that {@code output} is an {@link CompactMessages.EndCompaction} carrying the given
     * checkpoint id.
     *
     * @param output coordinator output to check
     * @param checkpointId expected checkpoint id
     */
    private void assertEndCompaction(CompactMessages.CoordinatorOutput output, long checkpointId) {
        Assertions.assertThat(output).isInstanceOf(CompactMessages.EndCompaction.class);
        CompactMessages.EndCompaction end = (CompactMessages.EndCompaction) output;
        Assertions.assertThat(end.getCheckpointId()).isEqualTo(checkpointId);
    }

    /**
     * Asserts that {@code output} is a {@link CompactMessages.CompactionUnit} with the given unit
     * id and partition, whose paths have exactly the given file names in the given order.
     *
     * @param output coordinator output to check
     * @param unitId expected unit id
     * @param partition expected partition
     * @param fileNames expected file names, in order
     */
    private void assertUnit(
            CompactMessages.CoordinatorOutput output,
            int unitId,
            String partition,
            List<String> fileNames) {
        Assertions.assertThat(output).isInstanceOf(CompactMessages.CompactionUnit.class);
        CompactMessages.CompactionUnit unit = (CompactMessages.CompactionUnit) output;
        Assertions.assertThat(unit.getUnitId()).isEqualTo(unitId);
        Assertions.assertThat(unit.getPartition()).isEqualTo(partition);
        Assertions.assertThat(unit.getPaths().stream().map(Path::getName))
                .containsExactlyElementsOf(fileNames);
    }
}

