/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.shaded.com.google.common.collect.Lists;
import org.apache.flink.shaded.com.google.common.collect.Maps;
import org.apache.flink.shaded.com.google.common.collect.Sets;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.JSONGenerator;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.sling.commons.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory description of a streaming topology: a set of {@link StreamNode}s keyed by
 * vertex id, connected by {@link StreamEdge}s, plus job-level settings (job name,
 * chaining, checkpointing, state backend).
 *
 * <p>Besides real nodes the graph keeps two kinds of <em>virtual</em> nodes (select and
 * partition). A virtual node never appears in {@link #getStreamNodes()}; it only records
 * the id of the node it stands in front of together with the output names or partitioner
 * to apply. Virtual nodes are resolved when an edge is added, see
 * {@link #addEdge(Integer, Integer, int)}.
 *
 * <p>The graph is turned into a {@link JobGraph} through {@link #getJobGraph()}.
 */
public class StreamGraph extends StreamingPlan {
    /** Default value of the checkpointing interval, in milliseconds. */
    public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);

    private String jobName = "Flink Streaming Job";
    private final StreamExecutionEnvironment environment;
    private final ExecutionConfig executionConfig;
    private CheckpointingMode checkpointingMode;
    private boolean checkpointingEnabled = false;
    private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
    private boolean chaining = true;

    /** Real nodes of the graph, keyed by vertex id. */
    private Map<Integer, StreamNode> streamNodes;
    /** Ids of the nodes registered as sources. */
    private Set<Integer> sources;
    /** Ids of the nodes registered as sinks. */
    private Set<Integer> sinks;
    /** Virtual select nodes: virtual id -> (id of the wrapped node, selected output names). */
    private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
    /** Virtual partition nodes: virtual id -> (id of the wrapped node, partitioner). */
    private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;
    /** Broker id of each iteration source/sink vertex. */
    protected Map<Integer, String> vertexIDtoBrokerID;
    /** Loop timeout of each iteration source/sink vertex. */
    protected Map<Integer, Long> vertexIDtoLoopTimeout;
    private StateBackend<?> stateBackend;
    /** (source, sink) node pairs created by {@link #createIterationSourceAndSink}. */
    private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
    private boolean forceCheckpoint = false;

    /**
     * Creates an empty graph bound to the given environment.
     *
     * @param environment environment whose {@link ExecutionConfig} is used for serializer
     *                    creation and which is handed to every created {@link StreamNode}
     */
    public StreamGraph(StreamExecutionEnvironment environment) {
        this.environment = environment;
        this.executionConfig = environment.getConfig();
        this.clear();
    }

    /**
     * Discards all nodes, virtual nodes, source/sink registrations and iteration
     * bookkeeping by replacing the backing collections with fresh empty ones.
     * Job-level settings (name, chaining, checkpointing, state backend) are not touched.
     */
    public void clear() {
        this.streamNodes = Maps.newHashMap();
        this.virtualSelectNodes = Maps.newHashMap();
        this.virtualPartitionNodes = Maps.newHashMap();
        this.vertexIDtoBrokerID = Maps.newHashMap();
        this.vertexIDtoLoopTimeout = Maps.newHashMap();
        this.iterationSourceSinkPairs = Sets.newHashSet();
        this.sources = Sets.newHashSet();
        this.sinks = Sets.newHashSet();
    }

    /** @return the execution config obtained from the environment at construction time */
    protected ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    /** @param jobName name used by {@link #getJobGraph()} */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }

    /** @param chaining value subsequently reported by {@link #isChainingEnabled()} */
    public void setChaining(boolean chaining) {
        this.chaining = chaining;
    }

    /** @param checkpointingEnabled value subsequently reported by {@link #isCheckpointingEnabled()} */
    public void setCheckpointingEnabled(boolean checkpointingEnabled) {
        this.checkpointingEnabled = checkpointingEnabled;
    }

    /** @param checkpointingInterval value subsequently reported by {@link #getCheckpointingInterval()} */
    public void setCheckpointingInterval(long checkpointingInterval) {
        this.checkpointingInterval = checkpointingInterval;
    }

    /**
     * Allows {@link #getJobGraph(String)} to proceed for an iterative graph that has
     * checkpointing enabled, instead of throwing.
     */
    public void forceCheckpoint() {
        this.forceCheckpoint = true;
    }

    /** @param backend state backend to store on this graph */
    public void setStateBackend(StateBackend<?> backend) {
        this.stateBackend = backend;
    }

    /** @return the state backend previously set, or {@code null} if none was set */
    public StateBackend<?> getStateBackend() {
        return this.stateBackend;
    }

    /** @return the configured checkpointing interval */
    public long getCheckpointingInterval() {
        return this.checkpointingInterval;
    }

    /** @return the chaining flag ({@code true} unless changed via {@link #setChaining}) */
    public boolean isChainingEnabled() {
        return this.chaining;
    }

    /** @return the checkpointing flag ({@code false} unless changed) */
    public boolean isCheckpointingEnabled() {
        return this.checkpointingEnabled;
    }

    /** @return the checkpointing mode previously set, or {@code null} if none was set */
    public CheckpointingMode getCheckpointingMode() {
        return this.checkpointingMode;
    }

    /** @param checkpointingMode checkpointing mode to store on this graph */
    public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
        this.checkpointingMode = checkpointingMode;
    }

    /**
     * @return {@code true} if at least one iteration source/sink pair has been created,
     *         i.e. a loop timeout is registered for some vertex
     */
    public boolean isIterative() {
        return !this.vertexIDtoLoopTimeout.isEmpty();
    }

    /**
     * Adds an operator exactly like {@link #addOperator} and additionally registers the
     * vertex id as a source.
     */
    public <IN, OUT> void addSource(Integer vertexID, StreamOperator<OUT> operatorObject, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
        this.addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
        this.sources.add(vertexID);
    }

    /**
     * Adds an operator exactly like {@link #addOperator} and additionally registers the
     * vertex id as a sink.
     */
    public <IN, OUT> void addSink(Integer vertexID, StreamOperator<OUT> operatorObject, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
        this.addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
        this.sinks.add(vertexID);
    }

    /**
     * Adds a node for a single-input operator and configures its serializers.
     *
     * <p>The node runs as a {@link SourceStreamTask} when the operator is a
     * {@link StreamSource} and as a {@link OneInputStreamTask} otherwise. A serializer is
     * created for each type information that is neither {@code null} nor a
     * {@link MissingTypeInfo}; otherwise the corresponding serializer is {@code null}.
     * Operators implementing {@link OutputTypeConfigurable} / {@link InputTypeConfigurable}
     * are handed the respective type information as given (possibly {@code null}).
     *
     * @param vertexID       id of the new node
     * @param operatorObject operator executed by the node
     * @param inTypeInfo     input type, may be {@code null}
     * @param outTypeInfo    output type, may be {@code null}
     * @param operatorName   display name of the operator
     * @throws RuntimeException if a node with {@code vertexID} already exists
     */
    public <IN, OUT> void addOperator(Integer vertexID, StreamOperator<OUT> operatorObject, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
        if (operatorObject instanceof StreamSource) {
            this.addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName);
        } else {
            this.addNode(vertexID, OneInputStreamTask.class, operatorObject, operatorName);
        }

        TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo)
                ? inTypeInfo.createSerializer(this.executionConfig)
                : null;
        TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo)
                ? outTypeInfo.createSerializer(this.executionConfig)
                : null;
        this.setSerializers(vertexID, inSerializer, null, outSerializer);

        if (operatorObject instanceof OutputTypeConfigurable) {
            // The operator produces OUT, so it can only be configurable for OUT.
            @SuppressWarnings("unchecked")
            OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) operatorObject;
            outputTypeConfigurable.setOutputType(outTypeInfo, this.executionConfig);
        }
        if (operatorObject instanceof InputTypeConfigurable) {
            InputTypeConfigurable inputTypeConfigurable = (InputTypeConfigurable) operatorObject;
            inputTypeConfigurable.setInputType(inTypeInfo, this.executionConfig);
        }

        LOG.debug("Vertex: {}", vertexID);
    }

    /**
     * Adds a node for a two-input operator, run as a {@link TwoInputStreamTask}, and
     * configures its serializers.
     *
     * <p>Both input type informations are dereferenced unconditionally and therefore must
     * not be {@code null}. The output serializer is {@code null} when {@code outTypeInfo}
     * is {@code null} or a {@link MissingTypeInfo}.
     *
     * @param vertexID           id of the new node
     * @param taskOperatorObject operator executed by the node
     * @param in1TypeInfo        type of the first input
     * @param in2TypeInfo        type of the second input
     * @param outTypeInfo        output type, may be {@code null}
     * @param operatorName       display name of the operator
     * @throws RuntimeException if a node with {@code vertexID} already exists
     */
    public <IN1, IN2, OUT> void addCoOperator(Integer vertexID, TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject, TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
        this.addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, operatorName);

        TypeSerializer<OUT> outSerializer = outTypeInfo != null && !(outTypeInfo instanceof MissingTypeInfo)
                ? outTypeInfo.createSerializer(this.executionConfig)
                : null;
        this.setSerializers(vertexID, in1TypeInfo.createSerializer(this.executionConfig), in2TypeInfo.createSerializer(this.executionConfig), outSerializer);

        if (taskOperatorObject instanceof OutputTypeConfigurable) {
            // The operator produces OUT, so it can only be configurable for OUT.
            @SuppressWarnings("unchecked")
            OutputTypeConfigurable<OUT> outputTypeConfigurable = (OutputTypeConfigurable<OUT>) taskOperatorObject;
            outputTypeConfigurable.setOutputType(outTypeInfo, this.executionConfig);
        }

        LOG.debug("CO-TASK: {}", vertexID);
    }

    /**
     * Creates a {@link StreamNode} with an empty output-selector list and registers it
     * under {@code vertexID}.
     *
     * @return the newly created node
     * @throws RuntimeException if a node with {@code vertexID} already exists
     */
    protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass, StreamOperator<?> operatorObject, String operatorName) {
        if (this.streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }
        StreamNode vertex = new StreamNode(this.environment, vertexID, operatorObject, operatorName, new ArrayList<OutputSelector<?>>(), vertexClass);
        this.streamNodes.put(vertexID, vertex);
        return vertex;
    }

    /**
     * Registers a virtual select node.
     *
     * @param originalId    id of the (real or virtual) node the selection is applied to
     * @param virtualId     id of the virtual node
     * @param selectedNames output names stored for the virtual node
     * @throws IllegalStateException if a virtual select node with {@code virtualId} exists
     */
    public void addVirtualSelectNode(Integer originalId, Integer virtualId, List<String> selectedNames) {
        if (this.virtualSelectNodes.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual select node with id " + virtualId);
        }
        this.virtualSelectNodes.put(virtualId, new Tuple2<Integer, List<String>>(originalId, selectedNames));
    }

    /**
     * Registers a virtual partition node.
     *
     * @param originalId  id of the (real or virtual) node the partitioning is applied to
     * @param virtualId   id of the virtual node
     * @param partitioner partitioner stored for the virtual node
     * @throws IllegalStateException if a virtual partition node with {@code virtualId} exists
     */
    public void addVirtualPartitionNode(Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner) {
        if (this.virtualPartitionNodes.containsKey(virtualId)) {
            throw new IllegalStateException("Already has virtual partition node with id " + virtualId);
        }
        this.virtualPartitionNodes.put(virtualId, new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
    }

    /**
     * Connects two nodes. If {@code upStreamVertexID} refers to a virtual node, the chain
     * of virtual nodes is followed until a real node is reached, picking up output names
     * and a partitioner on the way.
     *
     * @param upStreamVertexID   id of the upstream (real or virtual) node
     * @param downStreamVertexID id of the downstream node
     * @param typeNumber         type number stored on the created {@link StreamEdge}
     * @throws UnsupportedOperationException if a forward partitioner would connect nodes
     *                                       of different parallelism
     */
    public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
        // Explicit type witness: without it the argument is only typed as List<String>
        // by compilers that apply target typing to method arguments.
        this.addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, null, Lists.<String>newArrayList());
    }

    /**
     * Recursive worker behind {@link #addEdge}. Values already collected for
     * {@code partitioner} / {@code outputNames} take precedence over those stored on a
     * virtual node visited later in the recursion.
     */
    private void addEdgeInternal(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber, StreamPartitioner<?> partitioner, List<String> outputNames) {
        if (this.virtualSelectNodes.containsKey(upStreamVertexID)) {
            Tuple2<Integer, List<String>> selectNode = this.virtualSelectNodes.get(upStreamVertexID);
            if (outputNames.isEmpty()) {
                outputNames = selectNode.f1;
            }
            this.addEdgeInternal(selectNode.f0, downStreamVertexID, typeNumber, partitioner, outputNames);
        } else if (this.virtualPartitionNodes.containsKey(upStreamVertexID)) {
            Tuple2<Integer, StreamPartitioner<?>> partitionNode = this.virtualPartitionNodes.get(upStreamVertexID);
            if (partitioner == null) {
                partitioner = partitionNode.f1;
            }
            this.addEdgeInternal(partitionNode.f0, downStreamVertexID, typeNumber, partitioner, outputNames);
        } else {
            StreamNode upstreamNode = this.getStreamNode(upStreamVertexID);
            StreamNode downstreamNode = this.getStreamNode(downStreamVertexID);

            // No partitioner requested anywhere: forward if parallelism matches, else rebalance.
            if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
                partitioner = new ForwardPartitioner<Object>();
            } else if (partitioner == null) {
                partitioner = new RebalancePartitioner<Object>();
            }

            // An explicitly requested forward partitioner must not change parallelism.
            if (partitioner instanceof ForwardPartitioner && upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow change of parallelism. Upstream operation: " + upstreamNode + " parallelism: " + upstreamNode.getParallelism() + ", downstream operation: " + downstreamNode + " parallelism: " + downstreamNode.getParallelism() + " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }

            StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner);
            this.getStreamNode(edge.getSourceId()).addOutEdge(edge);
            this.getStreamNode(edge.getTargetId()).addInEdge(edge);
        }
    }

    /**
     * Attaches an output selector to a node. If {@code vertexID} refers to a virtual node,
     * the selector is attached to the real node at the end of the virtual chain.
     */
    public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
        if (this.virtualPartitionNodes.containsKey(vertexID)) {
            this.addOutputSelector(this.virtualPartitionNodes.get(vertexID).f0, outputSelector);
        } else if (this.virtualSelectNodes.containsKey(vertexID)) {
            this.addOutputSelector(this.virtualSelectNodes.get(vertexID).f0, outputSelector);
        } else {
            this.getStreamNode(vertexID).addOutputSelector(outputSelector);
            LOG.debug("Outputselector set for {}", vertexID);
        }
    }

    /** Sets the parallelism of a node; does nothing if no node has the given id. */
    public void setParallelism(Integer vertexID, int parallelism) {
        StreamNode node = this.getStreamNode(vertexID);
        if (node != null) {
            node.setParallelism(parallelism);
        }
    }

    /**
     * Sets the state partitioner and state key serializer of a node.
     * The node must exist; no null check is performed.
     */
    public void setKey(Integer vertexID, KeySelector<?, ?> keySelector, TypeSerializer<?> keySerializer) {
        StreamNode node = this.getStreamNode(vertexID);
        node.setStatePartitioner(keySelector);
        node.setStateKeySerializer(keySerializer);
    }

    /** Sets the buffer timeout of a node; does nothing if no node has the given id. */
    public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
        StreamNode node = this.getStreamNode(vertexID);
        if (node != null) {
            node.setBufferTimeout(bufferTimeout);
        }
    }

    /**
     * Sets all three serializers of a node. The node must exist; no null check is
     * performed.
     */
    public void setSerializers(Integer vertexID, TypeSerializer<?> in1, TypeSerializer<?> in2, TypeSerializer<?> out) {
        StreamNode vertex = this.getStreamNode(vertexID);
        vertex.setSerializerIn1(in1);
        vertex.setSerializerIn2(in2);
        vertex.setSerializerOut(out);
    }

    /**
     * Derives the serializers of node {@code to} from node {@code from}, crossing them
     * over: the first-input serializer of {@code to} becomes the output serializer of
     * {@code from}, and the output serializer of {@code to} becomes the first-input
     * serializer of {@code from}.
     */
    public void setSerializersFrom(Integer from, Integer to) {
        StreamNode fromVertex = this.getStreamNode(from);
        StreamNode toVertex = this.getStreamNode(to);
        toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
        toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
    }

    /** Replaces the output serializer of a node with one created from {@code outType}. */
    public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
        this.getStreamNode(vertexID).setSerializerOut(outType.createSerializer(this.executionConfig));
    }

    /** Replaces the operator of an existing node. */
    public <IN, OUT> void setOperator(Integer vertexID, StreamOperator<OUT> operatorObject) {
        this.getStreamNode(vertexID).setOperator(operatorObject);
    }

    /** Sets the input format of an existing node. */
    public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
        this.getStreamNode(vertexID).setInputFormat(inputFormat);
    }

    /**
     * Applies a resource strategy to a node; does nothing if no node has the given id.
     *
     * <p>Only {@link ResourceStrategy#ISOLATE} and {@link ResourceStrategy#NEWGROUP} are
     * handled. Any other value, including {@link ResourceStrategy#DEFAULT}, is rejected.
     *
     * @throws IllegalArgumentException if the node exists and {@code strategy} is neither
     *                                  {@code ISOLATE} nor {@code NEWGROUP}
     */
    public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
        StreamNode node = this.getStreamNode(vertexID);
        if (node == null) {
            return;
        }
        switch (strategy) {
            case ISOLATE:
                node.isolateSlot();
                break;
            case NEWGROUP:
                node.startNewSlotSharingGroup();
                break;
            default:
                throw new IllegalArgumentException("Unknown resource strategy");
        }
    }

    /** @return the node with the given id, or {@code null} if there is none */
    public StreamNode getStreamNode(Integer vertexID) {
        return this.streamNodes.get(vertexID);
    }

    /** @return the key set of the node map (a live view, not a copy) */
    protected Collection<? extends Integer> getVertexIDs() {
        return this.streamNodes.keySet();
    }

    /**
     * Returns every edge leading from {@code sourceId} to {@code targetId}.
     *
     * @return a new, non-empty list of matching edges
     * @throws RuntimeException if there is no such edge
     */
    public List<StreamEdge> getStreamEdges(int sourceId, int targetId) {
        List<StreamEdge> result = new ArrayList<StreamEdge>();
        for (StreamEdge edge : this.getStreamNode(sourceId).getOutEdges()) {
            if (edge.getTargetId() == targetId) {
                result.add(edge);
            }
        }
        if (result.isEmpty()) {
            throw new RuntimeException("No such edge in stream graph: " + sourceId + " -> " + targetId);
        }
        return result;
    }

    /** @return the ids registered as sources (the internal set, not a copy) */
    public Collection<Integer> getSourceIDs() {
        return this.sources;
    }

    /** @return the ids registered as sinks (the internal set, not a copy) */
    public Collection<Integer> getSinkIDs() {
        return this.sinks;
    }

    /** @return the values view of the node map (a live view, not a copy) */
    public Collection<StreamNode> getStreamNodes() {
        return this.streamNodes.values();
    }

    /** @return a new set holding one (node id, operator) pair per node */
    public Set<Tuple2<Integer, StreamOperator<?>>> getOperators() {
        Set<Tuple2<Integer, StreamOperator<?>>> operatorSet = new HashSet<Tuple2<Integer, StreamOperator<?>>>();
        for (StreamNode vertex : this.streamNodes.values()) {
            operatorSet.add(new Tuple2<Integer, StreamOperator<?>>(vertex.getId(), vertex.getOperator()));
        }
        return operatorSet;
    }

    /** @return the broker id registered for the vertex, or {@code null} if none */
    public String getBrokerID(Integer vertexID) {
        return this.vertexIDtoBrokerID.get(vertexID);
    }

    /**
     * @return the loop timeout registered for the vertex
     * @throws NullPointerException if no timeout is registered for {@code vertexID}
     *                              (the stored {@code Long} is unboxed without a check)
     */
    public long getLoopTimeout(Integer vertexID) {
        return this.vertexIDtoLoopTimeout.get(vertexID);
    }

    /**
     * Creates the head and tail nodes of an iteration.
     *
     * <p>The head is added as a {@link StreamIterationHead} node and registered as a
     * source; the tail is added as a {@link StreamIterationTail} node and registered as a
     * sink. Both get the given parallelism, the same broker id {@code "broker-" + loopId}
     * and the same loop timeout, and are named {@code "IterationSource-" + loopId} and
     * {@code "IterationSink-" + loopId} respectively.
     *
     * @return a new (source, sink) tuple; a separate tuple instance is stored in
     *         {@link #getIterationSourceSinkPairs()}
     * @throws RuntimeException if a node with {@code sourceId} or {@code sinkId} exists
     */
    public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
        StreamNode source = this.addNode(sourceId, StreamIterationHead.class, null, null);
        this.sources.add(source.getId());
        this.setParallelism(source.getId(), parallelism);

        StreamNode sink = this.addNode(sinkId, StreamIterationTail.class, null, null);
        this.sinks.add(sink.getId());
        this.setParallelism(sink.getId(), parallelism);

        this.iterationSourceSinkPairs.add(new Tuple2<StreamNode, StreamNode>(source, sink));

        source.setOperatorName("IterationSource-" + loopId);
        sink.setOperatorName("IterationSink-" + loopId);
        this.vertexIDtoBrokerID.put(source.getId(), "broker-" + loopId);
        this.vertexIDtoBrokerID.put(sink.getId(), "broker-" + loopId);
        this.vertexIDtoLoopTimeout.put(source.getId(), timeout);
        this.vertexIDtoLoopTimeout.put(sink.getId(), timeout);

        return new Tuple2<StreamNode, StreamNode>(source, sink);
    }

    /** @return the iteration (source, sink) pairs (the internal set, not a copy) */
    public Set<Tuple2<StreamNode, StreamNode>> getIterationSourceSinkPairs() {
        return this.iterationSourceSinkPairs;
    }

    /** Detaches an edge from the out-edges of its source and the in-edges of its target. */
    protected void removeEdge(StreamEdge edge) {
        edge.getSourceVertex().getOutEdges().remove(edge);
        edge.getTargetVertex().getInEdges().remove(edge);
    }

    /** Removes a node together with all of its incoming and outgoing edges. */
    protected void removeVertex(StreamNode toRemove) {
        // Copy first: removeEdge mutates the very edge lists we would otherwise iterate.
        Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
        edgesToRemove.addAll(toRemove.getInEdges());
        edgesToRemove.addAll(toRemove.getOutEdges());
        for (StreamEdge edge : edgesToRemove) {
            this.removeEdge(edge);
        }
        this.streamNodes.remove(toRemove.getId());
    }

    /** Builds the job graph using the currently configured job name. */
    public JobGraph getJobGraph() {
        return this.getJobGraph(this.jobName);
    }

    /**
     * Builds the job graph under the given name; the name also becomes this graph's job
     * name.
     *
     * @throws UnsupportedOperationException if the graph is iterative, checkpointing is
     *                                       enabled and {@link #forceCheckpoint()} was
     *                                       not called
     */
    public JobGraph getJobGraph(String jobGraphName) {
        if (this.isIterative() && this.isCheckpointingEnabled() && !this.forceCheckpoint) {
            throw new UnsupportedOperationException("Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. \nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
        }
        this.setJobName(jobGraphName);
        StreamingJobGraphGenerator jobgraphGenerator = new StreamingJobGraphGenerator(this);
        return jobgraphGenerator.createJobGraph(jobGraphName);
    }

    /**
     * Renders this graph as JSON.
     *
     * @return the JSON text, or the empty string if generation failed with a
     *         {@link JSONException} (the failure is logged at debug level)
     */
    public String getStreamingPlanAsJSON() {
        try {
            return new JSONGenerator(this).getJSON();
        } catch (JSONException e) {
            // No "{}" placeholder: with a Throwable as the sole argument SLF4J logs the
            // stack trace and would print the placeholder literally.
            LOG.debug("JSON plan creation failed", e);
            return "";
        }
    }

    /**
     * Writes the JSON rendering of this graph to {@code file}, replacing its content.
     * The text is encoded with the platform default charset.
     *
     * @param file destination file
     * @throws IOException if the file cannot be opened or writing to it fails
     */
    public void dumpStreamingPlanAsJSON(File file) throws IOException {
        // The writer must be opened in the resource specification: a resource variable
        // is implicitly final and a null resource would never be closed.
        try (PrintWriter pw = new PrintWriter(new FileOutputStream(file), false)) {
            pw.write(this.getStreamingPlanAsJSON());
            // PrintWriter swallows IOExceptions; checkError() flushes and reports them.
            if (pw.checkError()) {
                throw new IOException("Failed to write streaming plan to " + file);
            }
        }
    }

    /** Resource strategies accepted by {@link StreamGraph#setResourceStrategy}. */
    public enum ResourceStrategy {
        DEFAULT,
        ISOLATE,
        NEWGROUP
    }
}

