/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle.graph;

import java.lang.invoke.LambdaMetafactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.InputEndedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorFinishedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.runtime.operators.lifecycle.event.WatermarkReceivedEvent;
import org.apache.flink.runtime.operators.lifecycle.graph.TestDataElement;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.BoundedMultiInput;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

class MultiInputTestOperator
extends AbstractStreamOperatorV2<TestDataElement>
implements MultipleInputStreamOperator<TestDataElement>,
ProcessingTimeService.ProcessingTimeCallback,
BoundedMultiInput {
    private final String operatorId;
    private final List<Input> inputs;
    private final TestEventQueue eventQueue;
    private final AtomicLong lastDataSent;
    private final Map<String, OperatorFinishedEvent.LastVertexDataInfo> lastDataReceived = new HashMap<String, OperatorFinishedEvent.LastVertexDataInfo>();

    public MultiInputTestOperator(int numInputs, StreamOperatorParameters<TestDataElement> params, TestEventQueue eventQueue, String operatorId) {
        super(params, numInputs);
        this.lastDataSent = new AtomicLong(0L);
        this.inputs = IntStream.rangeClosed(1, numInputs).mapToObj(id -> new TestEventInput(id, eventQueue, (Output<StreamRecord<TestDataElement>>)this.output, operatorId, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber(), this.lastDataReceived, this.lastDataSent)).collect(Collectors.toList());
        this.eventQueue = eventQueue;
        this.operatorId = operatorId;
    }

    public void open() throws Exception {
        super.open();
        this.registerTimer();
        this.eventQueue.add(new OperatorStartedEvent(this.operatorId, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber()));
    }

    public List<Input> getInputs() {
        return this.inputs;
    }

    public void finish() throws Exception {
        this.eventQueue.add(new OperatorFinishedEvent(this.operatorId, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber(), this.lastDataSent.get(), new OperatorFinishedEvent.LastReceivedVertexDataInfo(this.lastDataReceived)));
        super.finish();
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        this.eventQueue.add(new CheckpointStartedEvent(this.operatorId, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber(), context.getCheckpointId()));
        super.snapshotState(context);
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.eventQueue.add(new CheckpointCompletedEvent(this.operatorId, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber(), checkpointId));
        super.notifyCheckpointComplete(checkpointId);
    }

    public void onProcessingTime(long timestamp) {
        this.registerTimer();
    }

    private void registerTimer() {
        this.getProcessingTimeService().registerTimer(this.getProcessingTimeService().getCurrentProcessingTime() + 1L, (ProcessingTimeService.ProcessingTimeCallback)this);
    }

    public void endInput(int inputId) throws Exception {
        this.eventQueue.add(new InputEndedEvent(this.operatorId, this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.getRuntimeContext().getTaskInfo().getAttemptNumber(), inputId));
    }

    private static class TestEventInput
    implements Input<TestDataElement> {
        private final int id;
        private final TestEventQueue eventQueue;
        private final Output<StreamRecord<TestDataElement>> output;
        private final String operatorId;
        private final int subtaskIndex;
        private final Map<String, OperatorFinishedEvent.LastVertexDataInfo> lastDataReceived;
        private final AtomicLong lastDataSent;
        private final int attemptNumber;

        public TestEventInput(int id, TestEventQueue eventQueue, Output<StreamRecord<TestDataElement>> output, String operatorId, int subtaskIndex, int attemptNumber, Map<String, OperatorFinishedEvent.LastVertexDataInfo> lastDataReceived, AtomicLong lastDataSent) {
            this.id = id;
            this.eventQueue = eventQueue;
            this.output = output;
            this.operatorId = operatorId;
            this.subtaskIndex = subtaskIndex;
            this.lastDataReceived = lastDataReceived;
            this.lastDataSent = lastDataSent;
            this.attemptNumber = attemptNumber;
        }

        public void processElement(StreamRecord<TestDataElement> element) throws Exception {
            TestDataElement e = (TestDataElement)element.getValue();
            this.lastDataReceived.computeIfAbsent((String)e.operatorId, (Function<String, OperatorFinishedEvent.LastVertexDataInfo>)LambdaMetafactory.metafactory(null, null, null, (Ljava/lang/Object;)Ljava/lang/Object;, lambda$processElement$0(java.lang.String ), (Ljava/lang/String;)Lorg/apache/flink/runtime/operators/lifecycle/event/OperatorFinishedEvent$LastVertexDataInfo;)()).bySubtask.put(e.subtaskIndex, e.seq);
            this.output.collect((Object)new StreamRecord((Object)new TestDataElement(this.operatorId, this.subtaskIndex, this.lastDataSent.incrementAndGet())));
        }

        public void processWatermark(Watermark mark) throws Exception {
            this.eventQueue.add(new WatermarkReceivedEvent(this.operatorId, this.subtaskIndex, this.attemptNumber, mark.getTimestamp(), this.id));
            this.output.emitWatermark(mark);
        }

        public void processWatermarkStatus(WatermarkStatus watermarkStatus) {
        }

        public void processLatencyMarker(LatencyMarker latencyMarker) {
        }

        public void setKeyContextElement(StreamRecord<TestDataElement> record) {
        }

        private static /* synthetic */ OperatorFinishedEvent.LastVertexDataInfo lambda$processElement$0(String ign) {
            return new OperatorFinishedEvent.LastVertexDataInfo();
        }
    }
}

