/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperFlushEvent;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class FlushEventAlignmentOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event> {
    private transient int totalTasksNumber;
    private transient Map<Integer, Set<Integer>> sourceTaskIdToAssignBucketSubTaskIds;
    private transient int currentSubTaskId;

    public FlushEventAlignmentOperator() {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    public void open() throws Exception {
        super.open();
        this.totalTasksNumber = this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
        this.currentSubTaskId = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        this.sourceTaskIdToAssignBucketSubTaskIds = new HashMap<Integer, Set<Integer>>();
    }

    public void processElement(StreamRecord<Event> streamRecord) {
        Event event = (Event)streamRecord.getValue();
        if (event instanceof BucketWrapperFlushEvent) {
            BucketWrapperFlushEvent bucketWrapperFlushEvent = (BucketWrapperFlushEvent)event;
            int sourceSubTaskId = bucketWrapperFlushEvent.getSourceSubTaskId();
            Set subTaskIds = this.sourceTaskIdToAssignBucketSubTaskIds.getOrDefault(sourceSubTaskId, new HashSet());
            int subtaskId = bucketWrapperFlushEvent.getBucketAssignTaskId();
            subTaskIds.add(subtaskId);
            if (subTaskIds.size() == this.totalTasksNumber) {
                LOG.info("{} send FlushEvent of {}", (Object)this.currentSubTaskId, (Object)sourceSubTaskId);
                this.output.collect((Object)new StreamRecord((Object)new FlushEvent(sourceSubTaskId, bucketWrapperFlushEvent.getTableIds(), bucketWrapperFlushEvent.getSchemaChangeEventType())));
                this.sourceTaskIdToAssignBucketSubTaskIds.remove(sourceSubTaskId);
            } else {
                LOG.info("{} collect FlushEvent of {} with subtask {}", new Object[]{this.currentSubTaskId, sourceSubTaskId, subtaskId});
                this.sourceTaskIdToAssignBucketSubTaskIds.put(sourceSubTaskId, subTaskIds);
            }
        } else {
            this.output.collect(streamRecord);
        }
    }
}

