/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.compactor;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.FileSinkCommittableSerializer;
import org.apache.flink.connector.file.sink.compactor.AbstractCompactTestBase;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinator;
import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinatorStateHandler;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestSerializer;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.types.Either;
import org.apache.flink.util.function.SerializableSupplier;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class CompactCoordinatorTest
extends AbstractCompactTestBase {
    CompactCoordinatorTest() {
    }

    @Test
    void testSizeThreshold() throws Exception {
        FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10L).build();
        CompactCoordinator coordinator = new CompactCoordinator(null, strategy, this.getTestCommittableSerializer());
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);){
            harness.setup();
            harness.open();
            FileSinkCommittable committable0 = this.committable("0", ".0", 5);
            FileSinkCommittable committable1 = this.committable("0", ".1", 6);
            harness.processElement(this.message(committable0));
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
            harness.processElement(this.message(committable1));
            List results = harness.extractOutputValues();
            Assertions.assertThat((List)results).hasSize(1);
            this.assertToCompact((CompactorRequest)results.get(0), committable0, committable1);
            harness.processElement(this.message(this.committable("0", ".2", 5)));
            harness.processElement(this.message(this.committable("1", ".0", 5)));
            Assertions.assertThat((List)harness.extractOutputValues()).hasSize(1);
        }
    }

    @Test
    void testCompactOnCheckpoint() throws Exception {
        FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(1).build();
        CompactCoordinator coordinator = new CompactCoordinator(null, strategy, this.getTestCommittableSerializer());
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);){
            harness.setup();
            harness.open();
            FileSinkCommittable passThroughCommittable = this.committable("0", "4", 5);
            FileSinkCommittable committable0 = this.committable("0", ".0", 5);
            FileSinkCommittable committable1 = this.committable("0", ".1", 6);
            FileSinkCommittable committable2 = this.committable("0", ".2", 5);
            FileSinkCommittable committable3 = this.committable("1", ".0", 5);
            harness.processElement(this.message(passThroughCommittable));
            harness.processElement(this.message(committable0));
            harness.processElement(this.message(committable1));
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
            harness.prepareSnapshotPreBarrier(1L);
            harness.snapshot(1L, 1L);
            Assertions.assertThat((List)harness.extractOutputValues()).hasSize(1);
            harness.processElement(this.message(committable2));
            harness.processElement(this.message(committable3));
            Assertions.assertThat((List)harness.extractOutputValues()).hasSize(1);
            harness.prepareSnapshotPreBarrier(2L);
            harness.snapshot(2L, 2L);
            List results = harness.extractOutputValues();
            Assertions.assertThat((List)results).hasSize(3);
            this.assertToCompact((CompactorRequest)results.get(0), committable0, committable1);
            this.assertToPassthrough((CompactorRequest)results.get(0), passThroughCommittable);
            this.assertToCompact((CompactorRequest)results.get(1), committable2);
            this.assertToCompact((CompactorRequest)results.get(2), committable3);
        }
    }

    @Test
    void testCompactOverMultipleCheckpoints() throws Exception {
        FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder().enableCompactionOnCheckpoint(3).build();
        CompactCoordinator coordinator = new CompactCoordinator(null, strategy, this.getTestCommittableSerializer());
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);){
            harness.setup();
            harness.open();
            FileSinkCommittable committable0 = this.committable("0", ".0", 5);
            FileSinkCommittable committable1 = this.committable("0", ".1", 6);
            harness.processElement(this.message(committable0));
            harness.processElement(this.message(committable1));
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
            harness.prepareSnapshotPreBarrier(1L);
            harness.snapshot(1L, 1L);
            harness.prepareSnapshotPreBarrier(2L);
            harness.snapshot(2L, 2L);
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
            harness.prepareSnapshotPreBarrier(3L);
            harness.snapshot(3L, 3L);
            List results = harness.extractOutputValues();
            Assertions.assertThat((List)results).hasSize(1);
            this.assertToCompact((CompactorRequest)results.get(0), committable0, committable1);
        }
    }

    @Test
    void testCompactOnEndOfInput() throws Exception {
        FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10L).build();
        CompactCoordinator coordinator = new CompactCoordinator(null, strategy, this.getTestCommittableSerializer());
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);){
            harness.setup();
            harness.open();
            FileSinkCommittable committable0 = this.committable("0", ".0", 5);
            harness.processElement(this.message(committable0));
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
            harness.prepareSnapshotPreBarrier(1L);
            harness.snapshot(1L, 1L);
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
            harness.endInput();
            List results = harness.extractOutputValues();
            Assertions.assertThat((List)results).hasSize(1);
            this.assertToCompact((CompactorRequest)results.get(0), committable0);
        }
    }

    @Test
    void testPassthrough() throws Exception {
        FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10L).build();
        CompactCoordinator coordinator = new CompactCoordinator(null, strategy, this.getTestCommittableSerializer());
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);){
            harness.setup();
            harness.open();
            FileSinkCommittable cleanupToPassthrough = this.cleanupInprogress("0", ".0", 1);
            FileSinkCommittable sizeUnavailableToPassthrough = this.committable("0", ".1", -1);
            FileSinkCommittable pathNotHidToPassThrough = this.committable("0", "2", -1);
            FileSinkCommittable normalCommittable = this.committable("0", ".3", 10);
            harness.processElement(this.message(cleanupToPassthrough));
            harness.processElement(this.message(sizeUnavailableToPassthrough));
            harness.processElement(this.message(pathNotHidToPassThrough));
            harness.processElement(this.message(normalCommittable));
            List results = harness.extractOutputValues();
            Assertions.assertThat((List)results).hasSize(1);
            this.assertToCompact((CompactorRequest)results.get(0), normalCommittable);
            this.assertToPassthrough((CompactorRequest)results.get(0), cleanupToPassthrough, sizeUnavailableToPassthrough, pathNotHidToPassThrough);
        }
    }

    @Test
    void testRestore() throws Exception {
        OperatorSubtaskState state;
        FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10L).build();
        CompactCoordinator coordinator = new CompactCoordinator(null, strategy, this.getTestCommittableSerializer());
        FileSinkCommittable committable0 = this.committable("0", ".0", 5);
        FileSinkCommittable committable1 = this.committable("0", ".1", 6);
        FileSinkCommittable committable2 = this.committable("0", ".2", 5);
        FileSinkCommittable committable3 = this.committable("1", ".0", 5);
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);){
            harness.setup();
            harness.open();
            harness.processElement(this.message(committable0));
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
            harness.prepareSnapshotPreBarrier(1L);
            state = harness.snapshot(1L, 1L);
        }
        coordinator = new CompactCoordinator(null, strategy, this.getTestCommittableSerializer());
        harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);
        try {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(this.message(committable1));
            Assertions.assertThat((List)harness.extractOutputValues()).hasSize(1);
            harness.processElement(this.message(committable2));
            harness.processElement(this.message(committable3));
            Assertions.assertThat((List)harness.extractOutputValues()).hasSize(1);
            harness.endInput();
            List results = harness.extractOutputValues();
            Assertions.assertThat((List)results).hasSize(3);
            this.assertToCompact((CompactorRequest)results.get(0), committable0, committable1);
            this.assertToCompact((CompactorRequest)results.get(1), committable2);
            this.assertToCompact((CompactorRequest)results.get(2), committable3);
        }
        finally {
            harness.close();
        }
    }

    @Test
    void testRestoreWithChangedStrategy() throws Exception {
        OperatorSubtaskState state;
        FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder().setSizeThreshold(100L).build();
        CompactCoordinator coordinator = new CompactCoordinator(null, strategy, this.getTestCommittableSerializer());
        FileSinkCommittable committable0 = this.committable("0", ".0", 5);
        FileSinkCommittable committable1 = this.committable("0", ".1", 6);
        FileSinkCommittable committable2 = this.committable("0", ".2", 7);
        FileSinkCommittable committable3 = this.committable("0", ".3", 8);
        FileSinkCommittable committable4 = this.committable("0", ".4", 9);
        FileSinkCommittable committable5 = this.committable("0", ".5", 2);
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);){
            harness.setup();
            harness.open();
            harness.processElement(this.message(committable0));
            harness.processElement(this.message(committable1));
            harness.processElement(this.message(committable2));
            harness.processElement(this.message(committable3));
            harness.processElement(this.message(committable4));
            harness.prepareSnapshotPreBarrier(1L);
            state = harness.snapshot(1L, 1L);
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
        }
        FileCompactStrategy changedStrategy = FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10L).build();
        CompactCoordinator changedCoordinator = new CompactCoordinator(null, changedStrategy, this.getTestCommittableSerializer());
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)changedCoordinator);){
            harness.setup();
            harness.initializeState(state);
            harness.open();
            Assertions.assertThat((List)harness.extractOutputValues()).hasSize(2);
            harness.processElement(this.message(committable5));
            List results = harness.extractOutputValues();
            Assertions.assertThat((List)results).hasSize(3);
            this.assertToCompact((CompactorRequest)results.get(0), committable0, committable1);
            this.assertToCompact((CompactorRequest)results.get(1), committable2, committable3);
            this.assertToCompact((CompactorRequest)results.get(2), committable4, committable5);
        }
    }

    @Test
    void testStateHandler() throws Exception {
        OperatorSubtaskState state;
        FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder().setSizeThreshold(10L).build();
        CompactCoordinator coordinator = new CompactCoordinator(null, strategy, this.getTestCommittableSerializer());
        FileSinkCommittable committable0 = this.committable("0", ".0", 5);
        FileSinkCommittable committable1 = this.committable("0", ".1", 6);
        FileSinkCommittable committable2 = this.committable("0", "2", 6);
        FileSinkCommittable cleanup3 = this.cleanupInprogress("0", "3", 7);
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)coordinator);){
            harness.setup();
            harness.open();
            harness.processElement(this.message(committable0));
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
            harness.processElement(this.message(cleanup3));
            Assertions.assertThat((List)harness.extractOutputValues()).isEmpty();
            harness.prepareSnapshotPreBarrier(1L);
            state = harness.snapshot(1L, 1L);
        }
        CompactCoordinatorStateHandler handler = new CompactCoordinatorStateHandler(null, this.getTestCommittableSerializer());
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)handler);){
            harness.setup((TypeSerializer)new EitherSerializer((TypeSerializer)new SimpleVersionedSerializerTypeSerializerProxy((SerializableSupplier & Serializable)() -> new CommittableMessageSerializer(this.getTestCommittableSerializer())), (TypeSerializer)new SimpleVersionedSerializerTypeSerializerProxy((SerializableSupplier & Serializable)() -> new CompactorRequestSerializer(this.getTestCommittableSerializer()))));
            harness.initializeState(state);
            harness.open();
            Assertions.assertThat((List)harness.extractOutputValues()).hasSize(2);
            harness.processElement(this.message(committable1));
            harness.processElement(this.message(committable2));
            List results = harness.extractOutputValues();
            Assertions.assertThat((List)results).hasSize(4);
            Assertions.assertThat((boolean)((Either)results.get(0)).isRight()).isTrue();
            this.assertToCompact((CompactorRequest)((Either)results.get(0)).right(), committable0);
            this.assertToPassthrough((CompactorRequest)((Either)results.get(1)).right(), cleanup3);
            Assertions.assertThat((((Either)results.get(2)).isLeft() && ((Either)results.get(2)).left() instanceof CommittableWithLineage ? 1 : 0) != 0).isTrue();
            Assertions.assertThat((Object)((FileSinkCommittable)((CommittableWithLineage)((Either)results.get(2)).left()).getCommittable())).isEqualTo((Object)committable1);
            Assertions.assertThat((((Either)results.get(3)).isLeft() && ((Either)results.get(3)).left() instanceof CommittableWithLineage ? 1 : 0) != 0).isTrue();
            Assertions.assertThat((Object)((FileSinkCommittable)((CommittableWithLineage)((Either)results.get(3)).left()).getCommittable())).isEqualTo((Object)committable2);
        }
    }

    private StreamRecord<CommittableMessage<FileSinkCommittable>> message(FileSinkCommittable committable) {
        return new StreamRecord((Object)new CommittableWithLineage((Object)committable, 1L, 0), 0L);
    }

    private FileSinkCommittable committable(String bucketId, String name, int size) throws IOException {
        return new FileSinkCommittable(bucketId, (InProgressFileWriter.PendingFileRecoverable)new FileSinkTestUtils.TestPendingFileRecoverable(this.newFile(name + "_" + bucketId, size <= 0 ? 1 : size), size));
    }

    private FileSinkCommittable cleanupInprogress(String bucketId, String name, int size) throws IOException {
        Path toCleanup = this.newFile(name + "_" + bucketId, size);
        return new FileSinkCommittable(bucketId, (InProgressFileWriter.InProgressFileRecoverable)new FileSinkTestUtils.TestInProgressFileRecoverable(toCleanup, size));
    }

    private SimpleVersionedSerializer<FileSinkCommittable> getTestCommittableSerializer() {
        return new FileSinkCommittableSerializer(new FileSinkTestUtils.SimpleVersionedWrapperSerializer<InProgressFileWriter.PendingFileRecoverable>(FileSinkTestUtils.TestPendingFileRecoverable::new), new FileSinkTestUtils.SimpleVersionedWrapperSerializer<InProgressFileWriter.InProgressFileRecoverable>(FileSinkTestUtils.TestInProgressFileRecoverable::new));
    }

    private void assertToCompact(CompactorRequest request, FileSinkCommittable ... committables) {
        List committableToCompact = request.getCommittableToCompact();
        Assertions.assertThat((Object[])committableToCompact.toArray()).isEqualTo((Object)committables);
    }

    private void assertToPassthrough(CompactorRequest request, FileSinkCommittable ... committables) {
        List committableToCompact = request.getCommittableToPassthrough();
        Assertions.assertThat((Object[])committableToCompact.toArray()).isEqualTo((Object)committables);
    }
}

