/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.values.source;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.source.DataSource;
import org.apache.flink.cdc.common.source.EventSourceProvider;
import org.apache.flink.cdc.common.source.FlinkSourceProvider;
import org.apache.flink.cdc.common.source.MetadataAccessor;
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
import org.apache.flink.cdc.connectors.values.ValuesDatabase;
import org.apache.flink.cdc.connectors.values.source.OpTsMetadataColumn;
import org.apache.flink.cdc.connectors.values.source.ValuesDataSourceHelper;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.table.api.TableException;

@Internal
public class ValuesDataSource
implements DataSource {
    /** Identifier of the event set that is registered with {@link ValuesDataSourceHelper}. */
    private final ValuesDataSourceHelper.EventSetId eventSetId;

    /** Threshold handed to the reader, which compares it against its poll counter. */
    private final int failAtPos;

    /**
     * Creates a data source whose failure threshold is {@link Integer#MAX_VALUE}.
     *
     * @param eventSetId identifier of the event set to emit
     */
    public ValuesDataSource(ValuesDataSourceHelper.EventSetId eventSetId) {
        this(eventSetId, Integer.MAX_VALUE);
    }

    /**
     * Creates a data source with an explicit failure threshold.
     *
     * @param eventSetId identifier of the event set to emit
     * @param failAtPos threshold forwarded to the reader created by the underlying source
     */
    public ValuesDataSource(ValuesDataSourceHelper.EventSetId eventSetId, int failAtPos) {
        this.failAtPos = failAtPos;
        this.eventSetId = eventSetId;
    }

    /**
     * Registers this source's event set with {@link ValuesDataSourceHelper} and wraps a freshly
     * created {@code ValuesSource} in a {@link FlinkSourceProvider}.
     *
     * <p>The wrapped source is always created with {@code isInSnapshotPhase} set to {@code false}.
     *
     * @return the provider wrapping the new source
     */
    @Override
    public EventSourceProvider getEventSourceProvider() {
        ValuesDataSourceHelper.setSourceEvents(this.eventSetId);
        final ValuesSource valuesSource = new ValuesSource(this.failAtPos, this.eventSetId, false);
        return FlinkSourceProvider.of(valuesSource);
    }

    /**
     * Creates a new {@link ValuesDatabase.ValuesMetadataAccessor} on every call.
     *
     * @return a fresh metadata accessor
     */
    @Override
    public MetadataAccessor getMetadataAccessor() {
        final MetadataAccessor accessor = new ValuesDatabase.ValuesMetadataAccessor();
        return accessor;
    }

    /**
     * Lists the metadata columns offered by this source.
     *
     * @return a new one-element array holding a new {@link OpTsMetadataColumn}
     */
    public SupportedMetadataColumn[] supportedMetadataColumns() {
        final SupportedMetadataColumn[] columns = new SupportedMetadataColumn[1];
        columns[0] = new OpTsMetadataColumn();
        return columns;
    }

    private static class EventIteratorSplit
    implements IteratorSourceSplit<Event, Iterator<Event>> {
        private final List<Event> events;
        private final int splitId;
        private int pos;

        public EventIteratorSplit(int splitId, int pos) {
            this.splitId = splitId;
            this.pos = pos;
            List<Event> eventOfSplit = ValuesDataSourceHelper.getSourceEvents().get(splitId);
            this.events = eventOfSplit.subList(pos, eventOfSplit.size());
        }

        public Iterator<Event> getIterator() {
            final Iterator<Event> inner = this.events.iterator();
            return new Iterator<Event>(){

                @Override
                public boolean hasNext() {
                    return inner.hasNext();
                }

                @Override
                public Event next() {
                    pos++;
                    return (Event)inner.next();
                }
            };
        }

        public IteratorSourceSplit<Event, Iterator<Event>> getUpdatedSplitForIterator(Iterator<Event> iterator) {
            return new EventIteratorSplit(this.splitId, this.pos);
        }

        public String splitId() {
            return "split_" + this.splitId;
        }
    }

    private static class EventIteratorReader
    extends IteratorSourceReader<Event, Iterator<Event>, EventIteratorSplit> {
        private static volatile boolean checkpointed = false;
        private static volatile boolean failedBefore = false;
        private final int failAtPos;
        private final ValuesDataSourceHelper.EventSetId eventSetId;
        private int numberOfEventsEmit = 0;

        public EventIteratorReader(SourceReaderContext context, int failAtPos, ValuesDataSourceHelper.EventSetId eventSetId) {
            super(context);
            this.failAtPos = failAtPos;
            this.eventSetId = eventSetId;
        }

        public void start() {
            ValuesDataSourceHelper.setSourceEvents(this.eventSetId);
            super.start();
        }

        public InputStatus pollNext(ReaderOutput<Event> output) {
            if (!failedBefore && this.numberOfEventsEmit >= this.failAtPos) {
                if (!checkpointed) {
                    try {
                        Thread.sleep(100L);
                    }
                    catch (Exception e) {
                        throw new TableException("Failed to wait checkpoint finishes.", (Throwable)e);
                    }
                    return InputStatus.MORE_AVAILABLE;
                }
                failedBefore = true;
                throw new RuntimeException("Artificial Exception.");
            }
            ++this.numberOfEventsEmit;
            return super.pollNext(output);
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            super.notifyCheckpointComplete(checkpointId);
            checkpointed = true;
        }
    }

    private static class ValuesSource
    implements Source<Event, EventIteratorSplit, Collection<EventIteratorSplit>> {
        private static final long serialVersionUID = 1L;

        /** Threshold passed on to every reader created by this source. */
        private final int failAtPos;

        /** Event set registered with {@link ValuesDataSourceHelper} before splits are created. */
        private final ValuesDataSourceHelper.EventSetId eventSetId;

        /**
         * Selects which event lists {@code createEnumerator} turns into splits: all but the last
         * when {@code true}, only the last when {@code false}.
         */
        private final boolean isInSnapshotPhase;

        /**
         * @param failAtPos threshold forwarded to readers
         * @param eventSetId identifier of the event set to read
         * @param isInSnapshotPhase whether the enumerator should cover the leading event lists
         *     instead of the last one
         */
        public ValuesSource(int failAtPos, ValuesDataSourceHelper.EventSetId eventSetId, boolean isInSnapshotPhase) {
            this.isInSnapshotPhase = isInSnapshotPhase;
            this.eventSetId = eventSetId;
            this.failAtPos = failAtPos;
        }

        /** Always reports {@link Boundedness#BOUNDED}. */
        @Override
        public Boundedness getBoundedness() {
            final Boundedness boundedness = Boundedness.BOUNDED;
            return boundedness;
        }

        /**
         * Registers the event set with {@link ValuesDataSourceHelper} and creates an enumerator
         * over freshly built splits, each starting at position 0.
         *
         * <p>In the snapshot phase one split is created for every event list except the last one;
         * otherwise a single split for the last event list is created.
         *
         * @param enumContext context handed to the enumerator
         * @return an enumerator over the selected splits
         */
        @Override
        public SplitEnumerator<EventIteratorSplit, Collection<EventIteratorSplit>> createEnumerator(SplitEnumeratorContext<EventIteratorSplit> enumContext) {
            ValuesDataSourceHelper.setSourceEvents(this.eventSetId);
            final List<EventIteratorSplit> eventIteratorSplits = new ArrayList<>();
            final List<List<Event>> eventWithSplits = ValuesDataSourceHelper.getSourceEvents();
            final int lastSplitIndex = eventWithSplits.size() - 1;
            if (this.isInSnapshotPhase) {
                for (int i = 0; i < lastSplitIndex; ++i) {
                    eventIteratorSplits.add(new EventIteratorSplit(i, 0));
                }
            } else {
                eventIteratorSplits.add(new EventIteratorSplit(lastSplitIndex, 0));
            }
            // Parameterized instead of a raw type so the return needs no unchecked conversion.
            return new IteratorSourceEnumerator<EventIteratorSplit>(enumContext, eventIteratorSplits);
        }

        /**
         * Creates an enumerator over the splits contained in a restored checkpoint.
         *
         * @param enumContext context handed to the enumerator
         * @param checkpoint splits recovered from the enumerator checkpoint
         * @return an enumerator over {@code checkpoint}
         */
        @Override
        public SplitEnumerator<EventIteratorSplit, Collection<EventIteratorSplit>> restoreEnumerator(SplitEnumeratorContext<EventIteratorSplit> enumContext, Collection<EventIteratorSplit> checkpoint) {
            // Parameterized instead of a raw type so the return needs no unchecked conversion.
            return new IteratorSourceEnumerator<EventIteratorSplit>(enumContext, checkpoint);
        }

        /** Creates a new serializer for individual {@code EventIteratorSplit} instances. */
        @Override
        public SimpleVersionedSerializer<EventIteratorSplit> getSplitSerializer() {
            final SimpleVersionedSerializer<EventIteratorSplit> splitSerializer = new EventIteratorSplitSerializer();
            return splitSerializer;
        }

        /** Creates a new serializer for the enumerator checkpoint, a collection of splits. */
        @Override
        public SimpleVersionedSerializer<Collection<EventIteratorSplit>> getEnumeratorCheckpointSerializer() {
            final SimpleVersionedSerializer<Collection<EventIteratorSplit>> checkpointSerializer = new EventIteratorEnumeratorSerializer();
            return checkpointSerializer;
        }

        /**
         * Creates a reader configured with this source's failure threshold and event set id.
         *
         * @param readerContext context handed to the reader
         * @return a new {@code EventIteratorReader}
         */
        @Override
        public SourceReader<Event, EventIteratorSplit> createReader(SourceReaderContext readerContext) {
            final EventIteratorReader reader = new EventIteratorReader(readerContext, this.failAtPos, this.eventSetId);
            return reader;
        }

        /**
         * Writes a split to {@code view} as two ints: the split id followed by the position.
         *
         * @throws IOException if writing to the view fails
         */
        private static void serializeEventIteratorSplit(DataOutputViewStreamWrapper view, EventIteratorSplit split) throws IOException {
            final int id = split.splitId;
            final int position = split.pos;
            view.writeInt(id);
            view.writeInt(position);
        }

        /**
         * Reads two ints from {@code view} (split id, then position) and builds a split from them.
         *
         * @throws IOException if reading from the view fails
         */
        private static EventIteratorSplit deserializeEventIteratorSplit(DataInputViewStreamWrapper view) throws IOException {
            // Constructor arguments are evaluated left to right: id first, position second.
            return new EventIteratorSplit(view.readInt(), view.readInt());
        }

        private static class EventIteratorEnumeratorSerializer
        implements SimpleVersionedSerializer<Collection<EventIteratorSplit>> {
            private static final int ENUMERATOR_VERSION = 1;

            private EventIteratorEnumeratorSerializer() {
            }

            public int getVersion() {
                return 1;
            }

            /*
             * Exception decompiling
             */
            public byte[] serialize(Collection<EventIteratorSplit> splits) throws IOException {
                /*
                 * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
                 * 
                 * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
                 *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
                 *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
                 *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
                 *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
                 *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
                 *     at org.benf.cfr.reader.Main.main(Main.java:54)
                 */
                throw new IllegalStateException("Decompilation failed");
            }

            /*
             * Exception decompiling
             */
            public Collection<EventIteratorSplit> deserialize(int version, byte[] serialized) throws IOException {
                /*
                 * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
                 * 
                 * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
                 *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
                 *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
                 *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
                 *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
                 *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
                 *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
                 *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
                 *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
                 *     at org.benf.cfr.reader.Main.main(Main.java:54)
                 */
                throw new IllegalStateException("Decompilation failed");
            }
        }

        private static class EventIteratorSplitSerializer
        implements SimpleVersionedSerializer<EventIteratorSplit> {
            private static final int SPLIT_VERSION = 1;

            private EventIteratorSplitSerializer() {
            }

            public int getVersion() {
                return 1;
            }

            public byte[] serialize(EventIteratorSplit split) throws IOException {
                try (ByteArrayOutputStream bao = new ByteArrayOutputStream(256);){
                    DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper((OutputStream)bao);
                    ValuesSource.serializeEventIteratorSplit(view, split);
                    byte[] byArray = bao.toByteArray();
                    return byArray;
                }
            }

            public EventIteratorSplit deserialize(int version, byte[] serialized) throws IOException {
                if (version != 1) {
                    throw new TableException(String.format("Can't serialized data with version %d because the serializer version is %d.", version, 1));
                }
                try (ByteArrayInputStream bis = new ByteArrayInputStream(serialized);){
                    DataInputViewStreamWrapper view = new DataInputViewStreamWrapper((InputStream)bis);
                    EventIteratorSplit eventIteratorSplit = ValuesSource.deserializeEventIteratorSplit(view);
                    return eventIteratorSplit;
                }
            }
        }
    }
}

