/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.net.URISyntaxException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectArrayAssert;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExtendWith(value={TestLoggerExtension.class})
class UnifiedSinkMigrationITCase {
    private static final Logger LOG = LoggerFactory.getLogger(UnifiedSinkMigrationITCase.class);
    private static final String SAVEPOINT_FOLDER_NAME = "unified-sink-migration-test";
    @RegisterExtension
    private static final MiniClusterExtension miniClusterExtension = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().build());
    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
    private static final int WRITER_STATE = 1;
    private static final int COMMITTER_STATE = 2;
    private static final int GLOBAL_COMMITTER_STATE = 3;
    private static final String SINK_UUID = "1c4ec0f9-2d96-46e9-99ea-45e8c3df5202";

    UnifiedSinkMigrationITCase() {
    }

    @Disabled
    @Test
    void prepareSinkSavepoint() throws Exception {
        LOG.warn("Deleting the previous savepoints.");
        Path basePath = Paths.get("src/test/resources/", new String[0]).resolve(SAVEPOINT_FOLDER_NAME);
        Files.walk(basePath, new FileVisitOption[0]).skip(1L).sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
        JobClient jobClient = this.executeJob(false);
        CompletableFuture savepointFuture = jobClient.stopWithSavepoint(false, basePath.toString(), SavepointFormatType.CANONICAL);
        String savepointPath = (String)savepointFuture.get();
        LOG.info("Savepoint path: {}", (Object)savepointPath);
        Assertions.assertThat((String)savepointPath).contains(new CharSequence[]{basePath.toString()});
    }

    @Test
    void testRestoreSinkState() throws Exception {
        JobClient jobClient = this.executeJob(true);
        jobClient.cancel();
    }

    private JobClient executeJob(boolean restore) throws Exception {
        Configuration conf = new Configuration();
        if (restore) {
            conf.set(StateRecoveryOptions.SAVEPOINT_PATH, (Object)this.findSavepointPath());
        }
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)conf);
        SharedReference latch = this.sharedObjects.add((Object)new OneShotLatch());
        SharedReference commitLatch = this.sharedObjects.add((Object)new CountDownLatch(2));
        env.enableCheckpointing(100L);
        env.setParallelism(1);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.fromSequence(1L, Long.MAX_VALUE).map((MapFunction & Serializable)value -> {
            Thread.sleep(10L);
            return value;
        }).sinkTo((Sink)new StateFulSinkV1(restore, (SharedReference<OneShotLatch>)latch, (SharedReference<CountDownLatch>)commitLatch)).disableChaining().uid(SINK_UUID);
        JobClient jobClient = env.executeAsync();
        ((OneShotLatch)latch.get()).await();
        Assertions.assertThat((Comparable)((Comparable)jobClient.getJobStatus().get())).isEqualTo((Object)JobStatus.RUNNING);
        return jobClient;
    }

    private File buildSavepointPath() throws URISyntaxException {
        return new File(this.getClass().getResource("/unified-sink-migration-test").toURI());
    }

    private String findSavepointPath() throws URISyntaxException {
        File basePath = this.buildSavepointPath();
        LOG.info("Base path: {}", (Object)basePath.getAbsolutePath());
        Object[] savepointDirectories = basePath.listFiles(File::isDirectory);
        ((ObjectArrayAssert)Assertions.assertThat((Object[])savepointDirectories).isNotNull()).hasSize(1);
        Object[] stateFiles = ((File)savepointDirectories[0]).listFiles();
        ((ObjectArrayAssert)Assertions.assertThat((Object[])stateFiles).isNotNull()).hasSize(1);
        return ((File)stateFiles[0]).getAbsolutePath();
    }

    private static class IntegerSerializer
    implements SimpleVersionedSerializer<Integer> {
        private IntegerSerializer() {
        }

        public int getVersion() {
            return 0;
        }

        public byte[] serialize(Integer obj) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(8);
            out.writeInt(obj.intValue());
            return out.getCopyOfBuffer();
        }

        public Integer deserialize(int version, byte[] serialized) throws IOException {
            DataInputDeserializer in = new DataInputDeserializer(serialized);
            return in.readInt();
        }
    }

    private static class StringSerializer
    implements SimpleVersionedSerializer<String> {
        private StringSerializer() {
        }

        public int getVersion() {
            return 0;
        }

        public byte[] serialize(String obj) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(8);
            out.writeUTF(obj);
            return out.getCopyOfBuffer();
        }

        public String deserialize(int version, byte[] serialized) throws IOException {
            DataInputDeserializer in = new DataInputDeserializer(serialized);
            return in.readUTF();
        }
    }

    private static class TestGlobalCommitter
    implements GlobalCommitter<Integer, String> {
        private final boolean recover;
        private final SharedReference<OneShotLatch> latch;
        private final SharedReference<CountDownLatch> commitLatch;
        private boolean firstCommitAfterRecover;

        TestGlobalCommitter(boolean recover, SharedReference<OneShotLatch> latch, SharedReference<CountDownLatch> commitLatch) {
            this.recover = recover;
            this.firstCommitAfterRecover = recover;
            this.latch = latch;
            this.commitLatch = commitLatch;
        }

        public List<String> filterRecoveredCommittables(List<String> globalCommittables) throws IOException {
            if (this.recover) {
                Assertions.assertThat(globalCommittables).containsExactly((Object[])new String[]{String.valueOf(3)});
            }
            return globalCommittables;
        }

        public String combine(List<Integer> committables) throws IOException {
            Assertions.assertThat(committables).hasSize(1);
            return String.valueOf(committables.get(0));
        }

        public List<String> commit(List<String> globalCommittables) throws IOException, InterruptedException {
            LOG.info("Global committing {}", globalCommittables);
            LOG.info("Latch count: {}", (Object)((CountDownLatch)this.commitLatch.get()).getCount());
            if (!this.firstCommitAfterRecover && ((CountDownLatch)this.commitLatch.get()).getCount() <= 0L) {
                ((OneShotLatch)this.latch.get()).trigger();
                Assertions.assertThat(globalCommittables).containsExactly((Object[])new String[]{String.valueOf(3)});
            }
            this.firstCommitAfterRecover = false;
            return this.recover ? Collections.emptyList() : globalCommittables;
        }

        public void endOfInput() throws IOException, InterruptedException {
        }

        public void close() throws Exception {
        }
    }

    private static class TestCommitter
    implements Committer<Integer> {
        private final boolean recovered;
        private final SharedReference<CountDownLatch> commitLatch;
        boolean firstCommit = true;

        TestCommitter(boolean recovered, SharedReference<CountDownLatch> commitLatch) {
            this.recovered = recovered;
            this.commitLatch = commitLatch;
        }

        public List<Integer> commit(List<Integer> committables) throws IOException, InterruptedException {
            if (this.firstCommit) {
                if (!this.recovered) {
                    Assertions.assertThat(committables).containsExactly((Object[])new Integer[]{2, 3});
                } else if (this.recovered) {
                    Assertions.assertThat(committables).containsExactly((Object[])new Integer[]{2});
                }
            }
            LOG.info("Committing {}", committables);
            ((CountDownLatch)this.commitLatch.get()).countDown();
            if (this.recovered && !this.firstCommit) {
                return Collections.emptyList();
            }
            this.firstCommit = false;
            return Collections.singletonList(2);
        }

        public void close() throws Exception {
        }
    }

    private static class TestWriter
    implements SinkWriter<Long, Integer, Integer> {
        private final boolean recovered;
        private boolean emitted = false;

        TestWriter(boolean recovered, List<Integer> recoveredState) {
            this.recovered = recovered;
            if (recovered) {
                Assertions.assertThat(recoveredState).containsExactly((Object[])new Integer[]{1});
            } else {
                Assertions.assertThat(recoveredState).isEmpty();
            }
        }

        public void write(Long element, SinkWriter.Context context) throws IOException, InterruptedException {
        }

        public List<Integer> prepareCommit(boolean flush) throws IOException, InterruptedException {
            if (this.emitted || this.recovered) {
                return Collections.emptyList();
            }
            this.emitted = true;
            return Arrays.asList(2, 3);
        }

        public List<Integer> snapshotState(long checkpointId) throws IOException {
            return Collections.singletonList(1);
        }

        public void close() throws Exception {
        }
    }

    private static class StateFulSinkV1
    implements Sink<Long, Integer, Integer, String> {
        private final boolean recovered;
        private final SharedReference<OneShotLatch> latch;
        private final SharedReference<CountDownLatch> commitLatch;

        StateFulSinkV1(boolean recovered, SharedReference<OneShotLatch> latch, SharedReference<CountDownLatch> commitLatch) {
            this.recovered = recovered;
            this.latch = latch;
            this.commitLatch = commitLatch;
        }

        public SinkWriter<Long, Integer, Integer> createWriter(Sink.InitContext context, List<Integer> states) throws IOException {
            return new TestWriter(this.recovered, states);
        }

        public Optional<SimpleVersionedSerializer<Integer>> getWriterStateSerializer() {
            return Optional.of(new IntegerSerializer());
        }

        public Optional<Committer<Integer>> createCommitter() throws IOException {
            return Optional.of(new TestCommitter(this.recovered, this.commitLatch));
        }

        public Optional<GlobalCommitter<Integer, String>> createGlobalCommitter() throws IOException {
            return Optional.of(new TestGlobalCommitter(this.recovered, this.latch, this.commitLatch));
        }

        public Optional<SimpleVersionedSerializer<Integer>> getCommittableSerializer() {
            return Optional.of(new IntegerSerializer());
        }

        public Optional<SimpleVersionedSerializer<String>> getGlobalCommittableSerializer() {
            return Optional.of(new StringSerializer());
        }
    }
}

