/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.runtime.partitioning;

import java.io.Serializable;
import java.time.Duration;
import java.util.Optional;
import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.FlushEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.function.HashFunction;
import org.apache.flink.cdc.common.function.HashFunctionProvider;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
import org.apache.flink.cdc.runtime.partitioning.PartitioningEvent;
import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

@Internal
public class PrePartitionOperator
extends AbstractStreamOperator<PartitioningEvent>
implements OneInputStreamOperator<Event, PartitioningEvent>,
Serializable {
    private static final long serialVersionUID = 1L;
    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1L);
    private final OperatorID schemaOperatorId;
    private final int downstreamParallelism;
    private final HashFunctionProvider<DataChangeEvent> hashFunctionProvider;
    private transient SchemaEvolutionClient schemaEvolutionClient;
    private transient LoadingCache<TableId, HashFunction<DataChangeEvent>> cachedHashFunctions;

    public PrePartitionOperator(OperatorID schemaOperatorId, int downstreamParallelism, HashFunctionProvider<DataChangeEvent> hashFunctionProvider) {
        this.chainingStrategy = ChainingStrategy.ALWAYS;
        this.schemaOperatorId = schemaOperatorId;
        this.downstreamParallelism = downstreamParallelism;
        this.hashFunctionProvider = hashFunctionProvider;
    }

    public void open() throws Exception {
        super.open();
        TaskOperatorEventGateway toCoordinator = this.getContainingTask().getEnvironment().getOperatorCoordinatorEventGateway();
        this.schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, this.schemaOperatorId);
        this.cachedHashFunctions = this.createCache();
    }

    public void processElement(StreamRecord<Event> element) throws Exception {
        Event event = (Event)element.getValue();
        if (event instanceof SchemaChangeEvent) {
            TableId tableId = ((SchemaChangeEvent)event).tableId();
            this.cachedHashFunctions.put((Object)tableId, this.recreateHashFunction(tableId));
            this.broadcastEvent(event);
        } else if (event instanceof FlushEvent) {
            this.broadcastEvent(event);
        } else if (event instanceof DataChangeEvent) {
            this.partitionBy((DataChangeEvent)event);
        }
    }

    private void partitionBy(DataChangeEvent dataChangeEvent) throws Exception {
        this.output.collect((Object)new StreamRecord((Object)new PartitioningEvent((Event)dataChangeEvent, ((HashFunction)this.cachedHashFunctions.get((Object)dataChangeEvent.tableId())).hashcode((Object)dataChangeEvent) % this.downstreamParallelism)));
    }

    private void broadcastEvent(Event toBroadcast) {
        for (int i = 0; i < this.downstreamParallelism; ++i) {
            Event copiedEvent = EventSerializer.INSTANCE.copy(toBroadcast);
            this.output.collect((Object)new StreamRecord((Object)new PartitioningEvent(copiedEvent, i)));
        }
    }

    private Schema loadLatestSchemaFromRegistry(TableId tableId) {
        Optional<Schema> schema;
        try {
            schema = this.schemaEvolutionClient.getLatestEvolvedSchema(tableId);
        }
        catch (Exception e) {
            throw new RuntimeException(String.format("Failed to request latest schema for table \"%s\"", tableId), e);
        }
        if (!schema.isPresent()) {
            throw new IllegalStateException(String.format("Schema is never registered or outdated for table \"%s\"", tableId));
        }
        return schema.get();
    }

    private HashFunction<DataChangeEvent> recreateHashFunction(TableId tableId) {
        return this.hashFunctionProvider.getHashFunction(tableId, this.loadLatestSchemaFromRegistry(tableId));
    }

    private LoadingCache<TableId, HashFunction<DataChangeEvent>> createCache() {
        return CacheBuilder.newBuilder().expireAfterAccess(CACHE_EXPIRE_DURATION).build((CacheLoader)new CacheLoader<TableId, HashFunction<DataChangeEvent>>(){

            public HashFunction<DataChangeEvent> load(TableId key) {
                return PrePartitionOperator.this.recreateHashFunction(key);
            }
        });
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
    }
}

