/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.paimon.sink.v2.bucket;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.paimon.sink.v2.OperatorIDGenerator;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriterHelper;
import org.apache.flink.cdc.connectors.paimon.sink.v2.TableSchemaInfo;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperChangeEvent;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperFlushEvent;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.index.BucketAssigner;
import org.apache.paimon.index.HashBucketAssigner;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
import org.apache.paimon.utils.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BucketAssignOperator
extends AbstractStreamOperator<Event>
implements OneInputStreamOperator<Event, Event> {
    protected static final Logger LOGGER = LoggerFactory.getLogger(BucketAssignOperator.class);
    public final String commitUser;
    private final Options catalogOptions;
    private Catalog catalog;
    Map<TableId, Tuple4<BucketMode, RowKeyExtractor, BucketAssigner, RowPartitionKeyExtractor>> bucketAssignerMap;
    private Map<TableId, TableSchemaInfo> schemaMaps;
    private int totalTasksNumber;
    private int currentTaskNumber;
    public final String schemaOperatorUid;
    private transient SchemaEvolutionClient schemaEvolutionClient;
    private final ZoneId zoneId;

    public BucketAssignOperator(Options catalogOptions, String schemaOperatorUid, ZoneId zoneId, String commitUser) {
        this.catalogOptions = catalogOptions;
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.schemaOperatorUid = schemaOperatorUid;
        this.commitUser = commitUser;
        this.zoneId = zoneId;
    }

    public void open() throws Exception {
        super.open();
        this.catalog = FlinkCatalogFactory.createPaimonCatalog(this.catalogOptions);
        this.bucketAssignerMap = new HashMap<TableId, Tuple4<BucketMode, RowKeyExtractor, BucketAssigner, RowPartitionKeyExtractor>>();
        this.totalTasksNumber = this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
        this.currentTaskNumber = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        this.schemaMaps = new HashMap<TableId, TableSchemaInfo>();
    }

    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Event>> output) {
        super.setup(containingTask, config, output);
        TaskOperatorEventGateway toCoordinator = this.getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway();
        this.schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, new OperatorIDGenerator(this.schemaOperatorUid).generate());
    }

    public void processElement(StreamRecord<Event> streamRecord) throws Exception {
        Event event = (Event)streamRecord.getValue();
        if (event instanceof FlushEvent) {
            for (int i = 0; i < this.totalTasksNumber; ++i) {
                this.output.collect((Object)new StreamRecord((Object)new BucketWrapperFlushEvent(i, ((FlushEvent)event).getSourceSubTaskId(), this.currentTaskNumber, ((FlushEvent)event).getTableIds(), ((FlushEvent)event).getSchemaChangeEventType())));
            }
            return;
        }
        if (event instanceof DataChangeEvent) {
            int bucket;
            DataChangeEvent dataChangeEvent = (DataChangeEvent)event;
            if (!this.schemaMaps.containsKey(dataChangeEvent.tableId())) {
                Optional schema = this.schemaEvolutionClient.getLatestEvolvedSchema(dataChangeEvent.tableId());
                if (schema.isPresent()) {
                    this.schemaMaps.put(dataChangeEvent.tableId(), new TableSchemaInfo((Schema)schema.get(), this.zoneId));
                } else {
                    throw new RuntimeException("Could not find schema message from SchemaRegistry for " + dataChangeEvent.tableId());
                }
            }
            Tuple4 tuple4 = this.bucketAssignerMap.computeIfAbsent(dataChangeEvent.tableId(), this::getTableInfo);
            GenericRow genericRow = PaimonWriterHelper.convertEventToGenericRow(dataChangeEvent, this.schemaMaps.get(dataChangeEvent.tableId()).getFieldGetters());
            switch ((BucketMode)((Object)tuple4.f0)) {
                case HASH_DYNAMIC: {
                    bucket = ((BucketAssigner)tuple4.f2).assign(((RowPartitionKeyExtractor)tuple4.f3).partition(genericRow), ((RowPartitionKeyExtractor)tuple4.f3).trimmedPrimaryKey(genericRow).hashCode());
                    break;
                }
                case HASH_FIXED: {
                    ((RowKeyExtractor)tuple4.f1).setRecord(genericRow);
                    bucket = ((RowKeyExtractor)tuple4.f1).bucket();
                    break;
                }
                case BUCKET_UNAWARE: {
                    bucket = 0;
                    break;
                }
                default: {
                    throw new RuntimeException("Unsupported bucket mode: " + tuple4.f0);
                }
            }
            this.output.collect((Object)new StreamRecord((Object)new BucketWrapperChangeEvent(bucket, (ChangeEvent)event)));
        } else if (event instanceof SchemaChangeEvent) {
            SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent)event;
            Schema schema = SchemaUtils.applySchemaChangeEvent((Schema)Optional.ofNullable(this.schemaMaps.get(schemaChangeEvent.tableId())).map(TableSchemaInfo::getSchema).orElse(null), (SchemaChangeEvent)schemaChangeEvent);
            this.schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, this.zoneId));
            for (int index = 0; index < this.totalTasksNumber; ++index) {
                this.output.collect((Object)new StreamRecord((Object)new BucketWrapperChangeEvent(index, (ChangeEvent)event)));
            }
        }
    }

    private Tuple4<BucketMode, RowKeyExtractor, BucketAssigner, RowPartitionKeyExtractor> getTableInfo(TableId tableId) {
        FileStoreTable table;
        Preconditions.checkNotNull((Object)tableId, (String)"Invalid tableId in given event.");
        try {
            table = (FileStoreTable)this.catalog.getTable(Identifier.fromString(tableId.toString()));
        }
        catch (Catalog.TableNotExistException e) {
            throw new RuntimeException(e);
        }
        long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
        Integer numAssigners = table.coreOptions().dynamicBucketInitialBuckets();
        return new Tuple4((Object)table.bucketMode(), (Object)table.createRowKeyExtractor(), (Object)new HashBucketAssigner(table.snapshotManager(), this.commitUser, table.store().newIndexFileHandler(), this.totalTasksNumber, MathUtils.min(numAssigners, this.totalTasksNumber), this.currentTaskNumber, targetRowNum), (Object)new RowPartitionKeyExtractor(table.schema()));
    }
}

