/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.sink;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.sink.ArrayListAsyncSink;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Integration tests that run small streaming jobs writing into an {@link ArrayListAsyncSink}
 * on a mini cluster configured with a single task manager and a single slot.
 */
class AsyncSinkBaseITCase {
    /** Job name shared by every test in this class. */
    private static final String JOB_NAME = "Integration Test: AsyncSinkBaseITCase";

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(1)
                            .build());

    /** Execution environment the tests build their pipelines on. */
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /**
     * Sends the sequence 1..20000 (as strings) to a default-configured sink and expects the
     * job to finish without throwing.
     *
     * @throws Exception if job execution fails
     */
    @Test
    void testWriteTwentyThousandRecordsToGenericSink() throws Exception {
        env.fromSequence(1L, 20_000L)
                .map(Object::toString)
                .sinkTo(new ArrayListAsyncSink());
        env.execute(JOB_NAME).getJobExecutionResult();
    }

    /**
     * Sends the sequence 999999..1000100 (as strings) to the sink and expects the job to fail
     * with a {@link JobExecutionException} whose root cause reports the intentional error for
     * element 1_000_000.
     */
    @Test
    void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
        // NOTE(review): the six constructor arguments are presumably batching/buffering
        // limits of the sink - confirm against ArrayListAsyncSink.
        env.fromSequence(999_999L, 1_000_100L)
                .map(Object::toString)
                .sinkTo(new ArrayListAsyncSink(1, 1, 2, 10L, 1000L, 10L));

        Assertions.assertThatThrownBy(() -> env.execute(JOB_NAME))
                .isInstanceOf(JobExecutionException.class)
                .rootCause()
                .hasMessageContaining(
                        "Intentional error on persisting 1_000_000 to ArrayListDestination");
    }

    /**
     * Runs the pipeline over the sequence 1..10000 with checkpointing enabled and a
     * fixed-delay restart strategy configured, and expects the job to finish without throwing.
     *
     * @throws Exception if job execution fails
     */
    @Test
    void testThatNoIssuesOccurWhenCheckpointingIsEnabled() throws Exception {
        env.enableCheckpointing(20L);
        RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 200L);
        env.fromSequence(1L, 10_000L)
                .map(Object::toString)
                .sinkTo(new ArrayListAsyncSink());
        env.execute(JOB_NAME);
    }
}

