/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.db2;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.connectors.db2.Db2Source;
import org.apache.flink.cdc.connectors.db2.Db2TestBase;
import org.apache.flink.cdc.connectors.utils.AssertUtils;
import org.apache.flink.cdc.connectors.utils.TestSourceContext;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Test;

public class Db2SourceTest
extends Db2TestBase {
    /**
     * Starts a source on {@code DB2INST1.PRODUCTS}, consumes the initial snapshot and then checks
     * that each subsequent DML statement is emitted as the expected change record.
     */
    @Test
    public void testConsumingAllEvents() throws Exception {
        this.initializeDb2Table("inventory", "PRODUCTS");
        final DebeziumSourceFunction<SourceRecord> source = this.createDb2Source("DB2INST1.PRODUCTS");
        final TestSourceContext<SourceRecord> sourceContext = new TestSourceContext<>();
        Db2SourceTest.setupSource(source);

        try (Connection connection = this.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            // source.run(...) is executed on a separate thread so this thread can issue DML
            // and drain the collected output.
            CheckedThread runThread = new CheckedThread() {

                public void go() throws Exception {
                    source.run(sourceContext);
                }
            };
            runThread.start();

            // Initial snapshot: nine rows, expected with consecutive IDs starting at 101.
            List<SourceRecord> records = this.drain(sourceContext, 9);
            Assert.assertEquals(9, records.size());
            for (int i = 0; i < records.size(); ++i) {
                AssertUtils.assertRead(records.get(i), "ID", 101 + i);
            }

            // Insert with a generated ID; the test expects it to be 110.
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (default,'robot','Toy robot',1.304)");
            records = this.drain(sourceContext, 1);
            AssertUtils.assertInsert(records.get(0), "ID", 110);

            // Insert with an explicit ID.
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (1001,'roy','old robot',1234.56)");
            records = this.drain(sourceContext, 1);
            AssertUtils.assertInsert(records.get(0), "ID", 1001);

            // Changing the key column is expected to arrive as a delete of the old ID
            // followed by an insert of the new ID.
            statement.execute("UPDATE DB2INST1.PRODUCTS SET ID=2001, DESCRIPTION='really old robot' WHERE ID=1001");
            records = this.drain(sourceContext, 2);
            AssertUtils.assertDelete(records.get(0), "ID", 1001);
            AssertUtils.assertInsert(records.get(1), "ID", 2001);

            // Plain update of a non-key column.
            statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT=1345.67 WHERE ID=2001");
            records = this.drain(sourceContext, 1);
            AssertUtils.assertUpdate(records.get(0), "ID", 2001);

            // Add two columns, then update one of them; the update must still be captured.
            statement.execute("ALTER TABLE DB2INST1.PRODUCTS ADD COLUMN VOLUME FLOAT ADD COLUMN ALIAS VARCHAR(30) NULL");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET VOLUME=13.5 WHERE ID=2001");
            records = this.drain(sourceContext, 1);
            AssertUtils.assertUpdate(records.get(0), "ID", 2001);

            // Stop the source, then wait for the runner thread and surface any error it hit.
            source.close();
            runThread.sync();
        }
    }

    /**
     * Verifies that the source can be checkpointed and restored from the checkpointed state.
     *
     * <p>Three source instances are run one after another, all sharing the same offset and
     * history state objects:
     * <ol>
     *   <li>a fresh source that reads the snapshot and is then checkpointed (id 101);</li>
     *   <li>a source restored from that state, which must emit nothing until new DML is issued,
     *       and is checkpointed again (id 138) before two further statements are executed;</li>
     *   <li>a source restored from the second checkpoint, which must emit exactly the changes
     *       made after it and can be checkpointed once more (id 233).</li>
     * </ol>
     */
    @Test
    public void testCheckpointAndRestore() throws Exception {
        this.initializeDb2Table("inventory", "PRODUCTS");
        final TestingListState<byte[]> offsetState = new TestingListState<>();
        final TestingListState<String> historyState = new TestingListState<>();
        String prevLsn = "";

        // ------------------------------------------------------------------
        // Step 1: start a source with empty state.
        // ------------------------------------------------------------------
        final DebeziumSourceFunction<SourceRecord> source = this.createDb2Source("DB2INST1.PRODUCTS");
        // This context parks the emitting thread after it has collected its 8th record.
        final BlockingSourceContext<SourceRecord> sourceContext = new BlockingSourceContext<>(8);
        Db2SourceTest.setupSource(source, null, offsetState, historyState, true, 0, 1);
        CheckedThread runThread = new CheckedThread() {

            public void go() throws Exception {
                source.run(sourceContext);
            }
        };
        runThread.start();

        // Wait until the source has started emitting.
        int received = this.drain(sourceContext, 2).size();
        Assert.assertEquals(2, received);

        // While the snapshot is still in progress the checkpoint lock is expected to be
        // unavailable to other threads.
        Assert.assertFalse(this.waitForCheckpointLock(sourceContext.getCheckpointLock(), Duration.ofSeconds(3L)));

        // Let the source continue and collect the rest of the nine snapshot records.
        sourceContext.blocker.release();
        List<SourceRecord> records = this.drain(sourceContext, 9 - received);
        Assert.assertEquals(9, records.size() + received);

        // Nothing has been checkpointed yet, so both states are still empty.
        Assert.assertEquals(0, offsetState.list.size());
        Assert.assertEquals(0, historyState.list.size());

        // ------------------------------------------------------------------
        // Step 2: first checkpoint, taken under the checkpoint lock.
        // ------------------------------------------------------------------
        synchronized (sourceContext.getCheckpointLock()) {
            source.snapshotState(new StateSnapshotContextSynchronousImpl(101L, 101L));
        }
        Assert.assertEquals(1, offsetState.list.size());
        String state = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
        Assert.assertEquals("db2_cdc_source", JsonPath.read(state, "$.sourcePartition.server"));
        String lsn = JsonPath.read(state, "$.sourceOffset.commit_lsn");
        Assert.assertTrue(lsn.compareTo(prevLsn) > 0);
        // Later checkpoints are compared against this first LSN.
        prevLsn = lsn;

        source.close();
        runThread.sync();

        // ------------------------------------------------------------------
        // Step 3: restore a second source from the checkpointed state.
        // ------------------------------------------------------------------
        final DebeziumSourceFunction<SourceRecord> source2 = this.createDb2Source("DB2INST1.PRODUCTS");
        final TestSourceContext<SourceRecord> sourceContext2 = new TestSourceContext<>();
        Db2SourceTest.setupSource(source2, 1L, offsetState, historyState, true, 0, 1);
        CheckedThread runThread2 = new CheckedThread() {

            public void go() throws Exception {
                source2.run(sourceContext2);
            }
        };
        runThread2.start();

        // The restored source must not re-emit anything on its own.
        Assert.assertFalse(this.waitForAvailableRecords(Duration.ofSeconds(5L), sourceContext2));

        try (Connection connection = this.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (default,'robot','Toy robot',1.304)");
            List<SourceRecord> records2 = this.drain(sourceContext2, 1);
            Assert.assertEquals(1, records2.size());
            AssertUtils.assertInsert(records2.get(0), "ID", 110);

            // --------------------------------------------------------------
            // Step 4: second checkpoint, taken after the insert was consumed.
            // --------------------------------------------------------------
            synchronized (sourceContext2.getCheckpointLock()) {
                source2.snapshotState(new StateSnapshotContextSynchronousImpl(138L, 138L));
            }
            Assert.assertEquals(1, offsetState.list.size());
            String state2 = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
            Assert.assertEquals("db2_cdc_source", JsonPath.read(state2, "$.sourcePartition.server"));
            String lsn2 = JsonPath.read(state2, "$.sourceOffset.commit_lsn");
            Assert.assertTrue(lsn2.compareTo(prevLsn) > 0);

            // Two more changes made after the checkpoint; they are not drained here and are
            // expected to be delivered to the next restored source.
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (1001,'roy','old robot',1234.56)");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT=1345.67 WHERE ID=1001");
        }

        source2.close();
        runThread2.sync();

        // ------------------------------------------------------------------
        // Step 5: restore a third source from the second checkpoint.
        // ------------------------------------------------------------------
        final DebeziumSourceFunction<SourceRecord> source3 = this.createDb2Source("DB2INST1.PRODUCTS");
        final TestSourceContext<SourceRecord> sourceContext3 = new TestSourceContext<>();
        Db2SourceTest.setupSource(source3, 2L, offsetState, historyState, true, 0, 1);
        CheckedThread runThread3 = new CheckedThread() {

            public void go() throws Exception {
                source3.run(sourceContext3);
            }
        };
        runThread3.start();

        // Exactly the two changes made after checkpoint 138 must show up, and nothing else.
        List<SourceRecord> records3 = this.drain(sourceContext3, 2);
        AssertUtils.assertInsert(records3.get(0), "ID", 1001);
        AssertUtils.assertUpdate(records3.get(1), "ID", 1001);
        Assert.assertFalse(this.waitForAvailableRecords(Duration.ofSeconds(3L), sourceContext3));

        // New changes are still captured after the restore.
        try (Connection connection = this.getJdbcConnection();
             Statement statement = connection.createStatement()) {
            statement.execute("DELETE FROM DB2INST1.PRODUCTS WHERE ID=1001");
        }
        records3 = this.drain(sourceContext3, 1);
        AssertUtils.assertDelete(records3.get(0), "ID", 1001);

        // ------------------------------------------------------------------
        // Step 6: third checkpoint, to confirm checkpointing keeps working.
        // ------------------------------------------------------------------
        synchronized (sourceContext3.getCheckpointLock()) {
            source3.snapshotState(new StateSnapshotContextSynchronousImpl(233L, 233L));
        }
        Assert.assertEquals(1, offsetState.list.size());
        String state3 = new String(offsetState.list.get(0), StandardCharsets.UTF_8);
        Assert.assertEquals("db2_cdc_source", JsonPath.read(state3, "$.sourcePartition.server"));
        String lsn3 = JsonPath.read(state3, "$.sourceOffset.commit_lsn");
        Assert.assertTrue(lsn3.compareTo(prevLsn) > 0);

        source3.close();
        runThread3.sync();
    }

    /**
     * Builds a DB2 source function that connects to the shared {@code DB2_CONTAINER}, captures
     * the given table and forwards each raw {@link SourceRecord} unchanged.
     *
     * @param tableName name of the table to capture, as passed to the builder's table list
     * @return the configured source function
     */
    private DebeziumSourceFunction<SourceRecord> createDb2Source(String tableName) {
        // The explicit type witness keeps the builder chain fully typed; a raw deserializer
        // argument would turn the whole chain into unchecked calls.
        return Db2Source.<SourceRecord>builder()
                .hostname(DB2_CONTAINER.getHost())
                .port(DB2_CONTAINER.getMappedPort(50000))
                .database(DB2_CONTAINER.getDatabaseName())
                .username(DB2_CONTAINER.getUsername())
                .password(DB2_CONTAINER.getPassword())
                .tableList(new String[] {tableName})
                .deserializer(new ForwardDeserializeSchema())
                .build();
    }

    /**
     * Takes exactly {@code expectedRecordCount} values from the context's collected output,
     * blocking until each one becomes available.
     *
     * @param sourceContext context whose collected output queue is read
     * @param expectedRecordCount number of records to take
     * @return the record values, in the order they were polled
     * @throws RuntimeException if no record arrives within 100 seconds of a poll
     * @throws Exception if the waiting thread is interrupted
     */
    private <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount) throws Exception {
        List<T> allRecords = new ArrayList<>();
        LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
        while (allRecords.size() < expectedRecordCount) {
            // The timeout applies to each poll individually, not to the whole drain.
            StreamRecord<T> record = queue.poll(100L, TimeUnit.SECONDS);
            if (record == null) {
                throw new RuntimeException("Can't receive " + expectedRecordCount + " elements before timeout.");
            }
            allRecords.add(record.getValue());
        }
        return allRecords;
    }

    /**
     * Checks whether another thread can enter the monitor of {@code checkpointLock} within the
     * given timeout.
     *
     * @param checkpointLock object whose monitor a helper thread tries to enter
     * @param timeout maximum time to wait for the helper thread to get the monitor
     * @return {@code true} if the helper thread entered the monitor in time, {@code false}
     *     otherwise
     * @throws Exception if the calling thread is interrupted while waiting
     */
    private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout) throws Exception {
        final Semaphore semaphore = new Semaphore(0);
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // The helper signals through the semaphore as soon as it is inside the monitor.
            executor.execute(() -> {
                synchronized (checkpointLock) {
                    semaphore.release();
                }
            });
            return semaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            // Always shut the executor down, including when the wait above is interrupted.
            // A helper that is still blocked entering the monitor does not react to the
            // interrupt; it finishes once the monitor becomes free.
            executor.shutdownNow();
        }
    }

    /**
     * Polls the context's collected output every 10 ms until it is non-empty or the timeout
     * has elapsed.
     *
     * @param timeout maximum time to keep polling
     * @param sourceContext context whose collected output is inspected
     * @return {@code true} if the collected output is non-empty when polling stops
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private boolean waitForAvailableRecords(Duration timeout, TestSourceContext<?> sourceContext) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (sourceContext.getCollectedOutputs().isEmpty()) {
            if (System.currentTimeMillis() >= deadline) {
                break;
            }
            Thread.sleep(10L);
        }
        // Re-check after the loop so the answer reflects the queue at return time.
        boolean hasRecords = !sourceContext.getCollectedOutputs().isEmpty();
        return hasRecords;
    }

    /**
     * Prepares a source with no restored checkpoint and no state objects, checkpointing
     * enabled, running as subtask 0 of 1.
     *
     * @param source the source function to initialize and open
     */
    private static <T> void setupSource(DebeziumSourceFunction<T> source) throws Exception {
        final Long noRestoredCheckpoint = null;
        setupSource(source, noRestoredCheckpoint, null, null, true, 0, 1);
    }

    /**
     * Attaches a mock runtime context to the source, initializes its state from the given
     * list states and opens it.
     *
     * @param source the source function to prepare
     * @param restoredCheckpointId checkpoint id reported as restored, or {@code null} to report
     *     a fresh start
     * @param restoredOffsetState list state served for the offset state descriptor
     * @param restoredHistoryState list state served for the history state descriptor
     * @param isCheckpointingEnabled forwarded to the mock runtime context
     * @param subtaskIndex index of this subtask, forwarded to the mock runtime context
     * @param totalNumSubtasks parallelism, forwarded to the mock runtime context
     */
    private static <T, S1, S2> void setupSource(DebeziumSourceFunction<T> source, @Nullable Long restoredCheckpointId, ListState<S1> restoredOffsetState, ListState<S2> restoredHistoryState, boolean isCheckpointingEnabled, int subtaskIndex, int totalNumSubtasks) throws Exception {
        // Note the argument order of the mock: parallelism comes before the subtask index.
        RuntimeContext runtimeContext = new MockStreamingRuntimeContext(isCheckpointingEnabled, totalNumSubtasks, subtaskIndex);
        source.setRuntimeContext(runtimeContext);

        OperatorStateStore stateStore = new MockOperatorStateStore(restoredOffsetState, restoredHistoryState);
        FunctionInitializationContext initContext = new MockFunctionInitializationContext(restoredCheckpointId, stateStore);
        source.initializeState(initContext);

        source.open(new Configuration());
    }

    /**
     * {@link ListState} backed by a plain in-memory {@link ArrayList}, which also records
     * whether {@link #clear()} has ever been invoked.
     */
    private static final class TestingListState<T> implements ListState<T> {
        private final List<T> list = new ArrayList<T>();
        private boolean clearCalled = false;

        private TestingListState() {
        }

        /** Empties the backing list and remembers that a clear happened. */
        public void clear() {
            this.list.clear();
            this.clearCalled = true;
        }

        /** Returns the backing list itself, not a copy. */
        public Iterable<T> get() throws Exception {
            return this.list;
        }

        /** Appends a single value; {@code null} is rejected. */
        public void add(T value) throws Exception {
            Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            this.list.add(value);
        }

        /** Returns the backing list itself, not a copy. */
        public List<T> getList() {
            return this.list;
        }

        /** Tells whether {@link #clear()} was called, directly or through {@link #update}. */
        boolean isClearCalled() {
            return this.clearCalled;
        }

        /** Replaces the current content with {@code values}. */
        public void update(List<T> values) throws Exception {
            this.clear();
            this.addAll(values);
        }

        /**
         * Appends all given values. A {@code null} list is ignored; a {@code null} element is
         * rejected before anything is appended.
         */
        public void addAll(List<T> values) throws Exception {
            if (values == null) {
                return;
            }
            for (T value : values) {
                Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            }
            this.list.addAll(values);
        }
    }

    /**
     * Source context that parks the collecting thread right after it has collected its
     * {@code expectedCount}-th record, until a permit is released on {@code blocker}.
     */
    private static class BlockingSourceContext<T> extends TestSourceContext<T> {
        /** Starts with zero permits, so the collecting thread waits until a release. */
        private final Semaphore blocker = new Semaphore(0);
        private final int expectedCount;
        private int currentCount = 0;

        private BlockingSourceContext(int expectedCount) {
            this.expectedCount = expectedCount;
        }

        /**
         * Collects the record as usual, then blocks if this was the {@code expectedCount}-th
         * record.
         */
        public void collect(T t) {
            super.collect(t);
            ++this.currentCount;
            if (this.currentCount == this.expectedCount) {
                try {
                    this.blocker.acquire();
                }
                catch (InterruptedException interrupted) {
                    // Stop waiting, but keep the interrupt visible to the caller instead of
                    // silently discarding it.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Minimal {@link FunctionInitializationContext} that serves a fixed operator state store
     * and reports a restore exactly when a checkpoint id was supplied.
     */
    private static class MockFunctionInitializationContext implements FunctionInitializationContext {
        /** {@code null} means "not restored". */
        private final Long restoredCheckpointId;
        private final OperatorStateStore operatorStateStore;

        private MockFunctionInitializationContext(Long restoredCheckpointId, OperatorStateStore operatorStateStore) {
            this.restoredCheckpointId = restoredCheckpointId;
            this.operatorStateStore = operatorStateStore;
        }

        public boolean isRestored() {
            boolean hasCheckpointId = this.restoredCheckpointId != null;
            return hasCheckpointId;
        }

        public OptionalLong getRestoredCheckpointId() {
            return this.restoredCheckpointId == null
                    ? OptionalLong.empty()
                    : OptionalLong.of(this.restoredCheckpointId);
        }

        public OperatorStateStore getOperatorStateStore() {
            return this.operatorStateStore;
        }

        /** Keyed state is not supported by this mock. */
        public KeyedStateStore getKeyedStateStore() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * {@link OperatorStateStore} that only supports union list state and hands out one of two
     * pre-built list states depending on the descriptor name. Every other operation throws
     * {@link UnsupportedOperationException}.
     */
    private static class MockOperatorStateStore implements OperatorStateStore {
        private final ListState<?> restoredOffsetListState;
        private final ListState<?> restoredHistoryListState;

        private MockOperatorStateStore(ListState<?> restoredOffsetListState, ListState<?> restoredHistoryListState) {
            this.restoredOffsetListState = restoredOffsetListState;
            this.restoredHistoryListState = restoredHistoryListState;
        }

        /**
         * Returns the offset state for {@code "offset-states"} and the history state for
         * {@code "history-records-states"}.
         *
         * @throws IllegalStateException for any other descriptor name
         */
        // The stored states are wildcard-typed; the caller's descriptor decides the element
        // type, so the narrowing casts below cannot be checked by the compiler.
        @SuppressWarnings("unchecked")
        public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
            String stateName = stateDescriptor.getName();
            if ("offset-states".equals(stateName)) {
                return (ListState<S>) this.restoredOffsetListState;
            }
            if ("history-records-states".equals(stateName)) {
                return (ListState<S>) this.restoredHistoryListState;
            }
            throw new IllegalStateException("Unknown state.");
        }

        public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        public Set<String> getRegisteredStateNames() {
            throw new UnsupportedOperationException();
        }

        public Set<String> getRegisteredBroadcastStateNames() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Deserialization schema that performs no conversion: every incoming {@link SourceRecord}
     * is passed to the collector as-is.
     */
    private static class ForwardDeserializeSchema implements DebeziumDeserializationSchema<SourceRecord> {
        private static final long serialVersionUID = 1L;

        private ForwardDeserializeSchema() {
        }

        /** Emits {@code record} unchanged. */
        public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
            // Pass the record with its real type; an Object-typed argument does not match
            // Collector<SourceRecord>.
            out.collect(record);
        }

        public TypeInformation<SourceRecord> getProducedType() {
            return TypeInformation.of(SourceRecord.class);
        }
    }
}

