/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.io.SimpleVersionedSerializerAdapter;
import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;

@Deprecated
public class TestSink<T>
implements Sink<T, String, Integer, String> {
    public static final SimpleVersionedSerializerAdapter<String> COMMITTABLE_SERIALIZER = TestSinkV2.COMMITTABLE_SERIALIZER;
    public static final SimpleVersionedSerializerAdapter<Integer> WRITER_SERIALIZER = TestSinkV2.WRITER_SERIALIZER;
    public static final String END_OF_INPUT_STR = "end of input";
    private final DefaultSinkWriter<T> writer;
    @Nullable
    private final SimpleVersionedSerializer<Integer> writerStateSerializer;
    @Nullable
    private final Committer<String> committer;
    @Nullable
    private final SimpleVersionedSerializer<String> committableSerializer;
    @Nullable
    private final GlobalCommitter<String, String> globalCommitter;
    @Nullable
    private final SimpleVersionedSerializer<String> globalCommittableSerializer;
    private final Collection<String> compatibleStateNames;

    /**
     * Creates a sink from already-assembled parts.
     *
     * <p>All arguments are stored as given; nothing is copied or validated here. The
     * {@code @Nullable} arguments are later exposed through {@link Optional}-returning accessors.
     */
    private TestSink(
            DefaultSinkWriter<T> writer,
            @Nullable SimpleVersionedSerializer<Integer> writerStateSerializer,
            @Nullable Committer<String> committer,
            @Nullable SimpleVersionedSerializer<String> committableSerializer,
            @Nullable GlobalCommitter<String, String> globalCommitter,
            @Nullable SimpleVersionedSerializer<String> globalCommittableSerializer,
            Collection<String> compatibleStateNames) {
        // Writer-related parts.
        this.writer = writer;
        this.writerStateSerializer = writerStateSerializer;
        this.compatibleStateNames = compatibleStateNames;

        // Committer-related parts.
        this.committer = committer;
        this.committableSerializer = committableSerializer;
        this.globalCommitter = globalCommitter;
        this.globalCommittableSerializer = globalCommittableSerializer;
    }

    /**
     * Prepares and returns the writer held by this sink.
     *
     * <p>The same writer instance is returned on every call. Before returning, it is initialised
     * with {@code context}, told which {@code states} it was restored from, and handed the
     * processing time service obtained from {@code context} - in exactly that order.
     *
     * @param context the init context passed on to the writer
     * @param states the writer states to restore from
     * @return the writer instance owned by this sink
     */
    public SinkWriter<T, String, Integer> createWriter(Sink.InitContext context, List<Integer> states) {
        final DefaultSinkWriter<T> sinkWriter = writer;
        // Order matters: init() is overridable and runs before state and timer service are set.
        sinkWriter.init(context);
        sinkWriter.restoredFrom(states);
        sinkWriter.setProcessingTimerService(context.getProcessingTimeService());
        return sinkWriter;
    }

    /** Returns the configured committer, or an empty {@link Optional} if none was set. */
    public Optional<Committer<String>> createCommitter() {
        return Optional.ofNullable(committer);
    }

    /** Returns the configured global committer, or an empty {@link Optional} if none was set. */
    public Optional<GlobalCommitter<String, String>> createGlobalCommitter() {
        return Optional.ofNullable(globalCommitter);
    }

    /** Returns the committable serializer, or an empty {@link Optional} if this sink has none. */
    public Optional<SimpleVersionedSerializer<String>> getCommittableSerializer() {
        return Optional.ofNullable(committableSerializer);
    }

    /**
     * Returns the global committable serializer, or an empty {@link Optional} if this sink has
     * none.
     */
    public Optional<SimpleVersionedSerializer<String>> getGlobalCommittableSerializer() {
        return Optional.ofNullable(globalCommittableSerializer);
    }

    /** Returns the writer state serializer, or an empty {@link Optional} if this sink has none. */
    public Optional<SimpleVersionedSerializer<Integer>> getWriterStateSerializer() {
        return Optional.ofNullable(writerStateSerializer);
    }

    /**
     * Returns the compatible state names this sink was built with.
     *
     * <p>The stored collection itself is returned, not a copy.
     */
    public Collection<String> getCompatibleStateNames() {
        return compatibleStateNames;
    }

    /** Creates a fresh {@link Builder} whose element type is {@link Integer}. */
    public static Builder<Integer> newBuilder() {
        return new Builder<>();
    }

    /**
     * Wraps this V1 sink with {@link SinkV1Adapter#wrap} and returns the result.
     *
     * <p>{@code this} is passed with its full generic type rather than through a raw {@code Sink}
     * cast, so the call is type-checked instead of relying on an unchecked raw-type conversion.
     *
     * @return the value produced by {@code SinkV1Adapter.wrap} for this sink
     */
    public org.apache.flink.api.connector.sink2.Sink<T> asV2() {
        return SinkV1Adapter.wrap(this);
    }

    /** Returns the writer instance owned by this sink (the same one {@code createWriter} returns). */
    public DefaultSinkWriter<T> getWriter() {
        return writer;
    }

    /**
     * A {@link DefaultCommitter} that additionally acts as a {@link GlobalCommitter} over
     * {@code String} committables.
     *
     * <p>Committables are combined by sorting and joining them with {@code "+"}. On end of input
     * the {@link TestSink#END_OF_INPUT_STR} marker is committed.
     */
    static class DefaultGlobalCommitter
    extends DefaultCommitter
    implements GlobalCommitter<String, String> {
        /**
         * Joins the given strings with {@code "+"} in sorted order, so the result does not depend
         * on the order of the input.
         *
         * <p>A private copy is sorted: the caller's list is never modified, and unmodifiable
         * lists are accepted.
         */
        static final Function<List<String>, String> COMBINER = strings -> {
            List<String> sorted = new ArrayList<>(strings);
            Collections.sort(sorted);
            return String.join("+", sorted);
        };

        /** Value dropped by {@link #filterRecoveredCommittables}; {@code null} disables filtering. */
        private final String committedSuccessData;

        /**
         * Creates a global committer backed by its own queue.
         *
         * @param committedSuccessData the committable value to drop when filtering recovered
         *     committables, or {@code null} to keep everything
         */
        DefaultGlobalCommitter(String committedSuccessData) {
            this.committedSuccessData = committedSuccessData;
        }

        /**
         * Creates a global committer whose queue is obtained lazily from {@code queueSupplier}.
         * The value dropped during filtering is the empty string.
         *
         * @param queueSupplier supplier of the queue that receives committed data
         */
        DefaultGlobalCommitter(Supplier<Queue<String>> queueSupplier) {
            super(queueSupplier);
            this.committedSuccessData = "";
        }

        /**
         * Removes every committable equal to the configured "already committed" value.
         *
         * @param globalCommittables the recovered committables
         * @return the input list itself when no value is configured, otherwise a new filtered list
         */
        public List<String> filterRecoveredCommittables(List<String> globalCommittables) {
            if (this.committedSuccessData == null) {
                return globalCommittables;
            }
            // Compare from the known non-null side so a null element cannot cause an NPE.
            return globalCommittables.stream()
                    .filter(s -> !this.committedSuccessData.equals(s))
                    .collect(Collectors.toList());
        }

        /**
         * Combines the committables into a single string using {@link #COMBINER}.
         *
         * @param committables the committables to combine; left unmodified
         * @return the sorted committables joined with {@code "+"}
         */
        public String combine(List<String> committables) {
            return COMBINER.apply(committables);
        }

        /** Commits the single {@link TestSink#END_OF_INPUT_STR} marker. */
        public void endOfInput() {
            this.commit(Collections.singletonList(TestSink.END_OF_INPUT_STR));
        }
    }

    /**
     * A committer that only records a committable the second time it is offered.
     *
     * <p>A committable seen for the first time is remembered and handed back for retry; when it is
     * offered again it is moved into the committed data.
     */
    static class RetryOnceCommitter
    extends DefaultCommitter
    implements Committer<String> {
        /** Committables offered once so far and still awaiting their retry, in insertion order. */
        private final Set<String> seen = new LinkedHashSet<>();

        RetryOnceCommitter() {
        }

        /**
         * Processes the committables one by one.
         *
         * @param committables the committables to process
         * @return a new list of all committables that are still waiting for their retry
         */
        @Override
        public List<String> commit(List<String> committables) {
            for (String committable : committables) {
                boolean wasPending = seen.remove(committable);
                if (!wasPending) {
                    // First sighting: remember it so that it is returned for a retry.
                    seen.add(committable);
                    continue;
                }
                // Second sighting: actually record it as committed.
                Preconditions.checkNotNull(committedData);
                committedData.add(committable);
            }
            return new ArrayList<>(seen);
        }
    }

    /**
     * A simple serializable committer that collects every committable it receives in a queue.
     *
     * <p>The queue is either created eagerly (no-arg constructor) or obtained from a supplier on
     * the first {@link #commit} call.
     */
    static class DefaultCommitter
    implements Committer<String>,
    Serializable {
        /** Collected committables; {@code null} until the first commit when a supplier is used. */
        @Nullable
        protected Queue<String> committedData;
        /** Set once {@link #close()} has been called. */
        private boolean isClosed;
        /** Source of the queue for the lazy variant; {@code null} for the eager variant. */
        @Nullable
        private final Supplier<Queue<String>> queueSupplier;

        /** Creates a committer that owns a new {@link ConcurrentLinkedQueue}. */
        public DefaultCommitter() {
            this.queueSupplier = null;
            this.isClosed = false;
            this.committedData = new ConcurrentLinkedQueue<>();
        }

        /**
         * Creates a committer that fetches its queue from {@code queueSupplier} on first commit.
         *
         * @param queueSupplier supplier of the queue that receives committed data
         */
        public DefaultCommitter(@Nullable Supplier<Queue<String>> queueSupplier) {
            this.committedData = null;
            this.isClosed = false;
            this.queueSupplier = queueSupplier;
        }

        /**
         * Returns a snapshot of the committed data.
         *
         * @return a new list with the queue contents, or an empty list if no queue exists yet
         */
        public List<String> getCommittedData() {
            if (committedData == null) {
                return Collections.emptyList();
            }
            return new ArrayList<>(committedData);
        }

        /**
         * Appends all committables to the queue, obtaining the queue from the supplier first if
         * it does not exist yet.
         *
         * @param committables the committables to record
         * @return always an empty list
         */
        public List<String> commit(List<String> committables) {
            Queue<String> target = committedData;
            if (target == null) {
                // Lazy variant: the supplier must be present to obtain the queue.
                Assertions.assertThat(queueSupplier).isNotNull();
                target = queueSupplier.get();
                committedData = target;
            }
            target.addAll(committables);
            return Collections.emptyList();
        }

        /** Marks this committer as closed. */
        public void close() throws Exception {
            isClosed = true;
        }

        /** Returns whether {@link #close()} has been called. */
        public boolean isClosed() {
            return isClosed;
        }
    }

    /**
     * A serializable writer that buffers a string rendering of every element and remembers the
     * watermarks it receives.
     *
     * @param <T> type of the written elements
     */
    public static class DefaultSinkWriter<T>
    implements SinkWriter<T, String, Integer>,
    Serializable {
        /** Rendered elements written since the last {@link #prepareCommit}. */
        protected List<String> elements = new ArrayList<>();
        /** Every watermark passed to {@link #writeWatermark}. */
        protected List<Watermark> watermarks = new ArrayList<>();
        /** Set via {@code setProcessingTimerService}. */
        protected Sink.ProcessingTimeService processingTimerService;
        /** Number of elements written; restored from and snapshotted as writer state. */
        private int recordCount;
        /** Id of the most recent snapshot, or {@code -1} if none was taken. */
        protected long lastCheckpointId = -1L;

        protected DefaultSinkWriter() {
        }

        /**
         * Buffers the string form of the tuple (element, timestamp, current watermark) and
         * increments the record count.
         */
        public void write(T element, SinkWriter.Context context) {
            String rendered =
                    Tuple3.of(element, context.timestamp(), context.currentWatermark()).toString();
            elements.add(rendered);
            recordCount++;
        }

        /** Records the watermark. */
        public void writeWatermark(Watermark watermark) throws IOException {
            watermarks.add(watermark);
        }

        /**
         * Hands over everything buffered so far and starts a new, empty buffer.
         *
         * @param flush not used by this implementation
         * @return the list of elements buffered since the previous call
         */
        public List<String> prepareCommit(boolean flush) {
            final List<String> pending = elements;
            elements = new ArrayList<>();
            return pending;
        }

        /**
         * Remembers the checkpoint id and returns the record count as the only state entry.
         *
         * @param checkpointId id stored as the last checkpoint id
         * @return a singleton list holding the current record count
         */
        public List<Integer> snapshotState(long checkpointId) throws IOException {
            lastCheckpointId = checkpointId;
            return Collections.singletonList(recordCount);
        }

        /** Does nothing. */
        public void close() throws Exception {
        }

        /** Resets the record count to the first restored state entry, or to zero if there is none. */
        void restoredFrom(List<Integer> states) {
            if (states.isEmpty()) {
                recordCount = 0;
            } else {
                recordCount = states.get(0);
            }
        }

        /** Stores the processing time service for later use. */
        void setProcessingTimerService(Sink.ProcessingTimeService processingTimerService) {
            this.processingTimerService = processingTimerService;
        }

        /** Hook invoked by the sink before the writer is handed out; does nothing by default. */
        public void init(Sink.InitContext context) {
        }

        /** Returns the number of records counted so far. */
        public int getRecordCount() {
            return recordCount;
        }
    }

    /**
     * Fluent builder for {@link TestSink}.
     *
     * <p>By default the sink gets a plain {@link DefaultSinkWriter}, no writer state serializer,
     * no committers and no compatible state names.
     *
     * @param <T> element type of the sink being built
     */
    public static class Builder<T> {
        private DefaultSinkWriter<T> writer = new DefaultSinkWriter<>();
        private SimpleVersionedSerializer<Integer> writerStateSerializer;
        private Committer<String> committer;
        private GlobalCommitter<String, String> globalCommitter;
        private Collection<String> compatibleStateNames = Collections.emptyList();

        /**
         * Replaces the writer and re-types this builder to the writer's element type.
         *
         * @param writer the writer to use; must not be {@code null}
         * @param <W> element type of the new writer
         * @return this builder, viewed with element type {@code W}
         */
        // Safe: the builder stores nothing typed with T except the writer, which is replaced
        // here, so viewing this instance as Builder<W> cannot produce a heap-polluted read.
        @SuppressWarnings("unchecked")
        public <W> Builder<W> setWriter(DefaultSinkWriter<W> writer) {
            this.writer = (DefaultSinkWriter<T>) Preconditions.checkNotNull(writer);
            return (Builder<W>) this;
        }

        /** Enables writer state by installing {@link TestSink#WRITER_SERIALIZER}. */
        public Builder<T> withWriterState() {
            this.writerStateSerializer = WRITER_SERIALIZER;
            return this;
        }

        /** Uses the given committer. */
        public Builder<T> setCommitter(Committer<String> committer) {
            this.committer = committer;
            return this;
        }

        /** Uses a new {@link DefaultCommitter} that owns its own queue. */
        public Builder<T> setDefaultCommitter() {
            this.committer = new DefaultCommitter();
            return this;
        }

        /** Uses a new {@link DefaultCommitter} that takes its queue from {@code queueSupplier}. */
        public Builder<T> setDefaultCommitter(Supplier<Queue<String>> queueSupplier) {
            this.committer = new DefaultCommitter(queueSupplier);
            return this;
        }

        /** Uses a new {@link DefaultGlobalCommitter} that owns its own queue. */
        public Builder<T> setDefaultGlobalCommitter() {
            this.globalCommitter = new DefaultGlobalCommitter("");
            return this;
        }

        /**
         * Uses a new {@link DefaultGlobalCommitter} that takes its queue from
         * {@code queueSupplier}.
         */
        public Builder<T> setGlobalCommitter(Supplier<Queue<String>> queueSupplier) {
            this.globalCommitter = new DefaultGlobalCommitter(queueSupplier);
            return this;
        }

        /** Sets the compatible state names; the collection is stored as given. */
        public Builder<T> setCompatibleStateNames(Collection<String> compatibleStateNames) {
            this.compatibleStateNames = compatibleStateNames;
            return this;
        }

        /** Sets the compatible state names from the given values. */
        public Builder<T> setCompatibleStateNames(String ... compatibleStateNames) {
            return this.setCompatibleStateNames(Arrays.asList(compatibleStateNames));
        }

        /**
         * Builds the sink.
         *
         * <p>The committable serializer is set as soon as either committer is present; the global
         * committable serializer is set only when a global committer is present. Both use
         * {@link TestSink#COMMITTABLE_SERIALIZER}.
         *
         * @return a new sink built from the current builder state
         */
        public TestSink<T> build() {
            boolean hasAnyCommitter = this.committer != null || this.globalCommitter != null;
            boolean hasGlobalCommitter = this.globalCommitter != null;
            return new TestSink<>(
                    this.writer,
                    this.writerStateSerializer,
                    this.committer,
                    hasAnyCommitter ? COMMITTABLE_SERIALIZER : null,
                    this.globalCommitter,
                    hasGlobalCommitter ? COMMITTABLE_SERIALIZER : null,
                    this.compatibleStateNames);
        }
    }
}

