/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.sink.bulk;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.sink.bulk.BulkInsertWriterHelper;
import org.apache.hudi.sink.bulk.WriterHelpers;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.meta.CkpMetadataFactory;
import org.apache.hudi.sink.utils.TimeWait;
import org.apache.hudi.util.FlinkWriteClients;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Write function that hands every incoming record to a {@link BulkInsertWriterHelper}
 * and, when the input ends, ships the helper's write statuses to the operator
 * coordinator inside a single {@link WriteMetadataEvent}.
 *
 * <p>The writer helper is not created in {@link #open(Configuration)}; it is built on
 * the first record (or at end of input if no record arrived), after a pending instant
 * different from the one seen at open time has been observed.
 *
 * @param <I> type of the input record; each record is cast to {@link RowData} before
 *            being written
 */
public class BulkInsertWriteFunction<I> extends AbstractWriteFunction<I> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(BulkInsertWriteFunction.class);

    /** Lazily created writer; {@code null} until the first record or end of input. */
    private transient BulkInsertWriterHelper writerHelper;

    /** Job configuration handed in at construction time. */
    private final Configuration config;

    /** Row type forwarded to the writer helper factory. */
    private final RowType rowType;

    /** Index of this subtask, resolved in {@link #open(Configuration)}. */
    private int taskID;

    /** Write client created in {@link #open(Configuration)} and released in {@link #close()}. */
    private transient HoodieFlinkWriteClient writeClient;

    /** Pending instant observed at open time; writing waits for a different one. */
    private volatile String initInstant;

    /** Channel used to send events to the operator coordinator. */
    private transient OperatorEventGateway eventGateway;

    /** Source of the last pending instant. */
    private CkpMetadata ckpMetadata;

    /**
     * Creates the function; no runtime resources are acquired here.
     *
     * @param config  the job configuration
     * @param rowType the row type passed on to the writer helper
     */
    public BulkInsertWriteFunction(Configuration config, RowType rowType) {
        this.config = config;
        this.rowType = rowType;
    }

    /**
     * Resolves the subtask index, creates the write client and checkpoint metadata,
     * remembers the currently pending instant and sends the bootstrap event.
     *
     * @param parameters unused by this implementation
     * @throws IOException declared for callers; not thrown directly by the visible code
     */
    public void open(Configuration parameters) throws IOException {
        taskID = getRuntimeContext().getIndexOfThisSubtask();
        writeClient = FlinkWriteClients.createWriteClient(config, getRuntimeContext());
        ckpMetadata = CkpMetadataFactory.getCkpMetadata(writeClient.getConfig(), config);
        initInstant = lastPendingInstant();
        sendBootstrapEvent();
    }

    /**
     * Writes one record through the writer helper, creating the helper first if needed.
     *
     * @param value the record; must be a {@link RowData} instance
     * @param ctx   unused
     * @param out   unused; nothing is emitted downstream
     * @throws IOException if the writer helper fails to write the record
     */
    public void processElement(I value, ProcessFunction.Context ctx, Collector<Object> out) throws IOException {
        initWriterHelperIfNeeded();
        RowData row = (RowData) value;
        writerHelper.write(row);
    }

    /** Closes the write client if one was created. */
    public void close() {
        if (writeClient == null) {
            return;
        }
        writeClient.close();
    }

    /**
     * Collects the write statuses of this subtask and reports them to the coordinator
     * as the last batch of the input. The writer helper is created first when no record
     * has been processed yet.
     */
    @Override
    public void endInput() {
        initWriterHelperIfNeeded();
        List<WriteStatus> statuses = writerHelper.getWriteStatuses(taskID);
        WriteMetadataEvent finalEvent = WriteMetadataEvent.builder()
                .taskID(taskID)
                .instantTime(writerHelper.getInstantTime())
                .writeStatus(statuses)
                .lastBatch(true)
                .endInput(true)
                .build();
        eventGateway.sendEventToCoordinator((OperatorEvent) finalEvent);
    }

    /** Ignores every operator event; this function does not react to coordinator messages. */
    @Override
    public void handleOperatorEvent(OperatorEvent event) {
    }

    /**
     * Stores the gateway used to reach the operator coordinator.
     *
     * @param operatorEventGateway the gateway to use for outgoing events
     */
    @Override
    public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
        eventGateway = operatorEventGateway;
    }

    /**
     * Builds the writer helper on first use. May block inside {@link #instantToWrite()}
     * until a new pending instant shows up.
     */
    private void initWriterHelperIfNeeded() {
        if (writerHelper != null) {
            return;
        }
        String targetInstant = instantToWrite();
        writerHelper = WriterHelpers.getWriterHelper(
                config,
                writeClient.getHoodieTable(),
                writeClient.getConfig(),
                targetInstant,
                taskID,
                getRuntimeContext().getNumberOfParallelSubtasks(),
                getRuntimeContext().getAttemptNumber(),
                rowType);
    }

    /** Sends an empty bootstrap event for this subtask to the coordinator and logs it. */
    private void sendBootstrapEvent() {
        WriteMetadataEvent bootstrapEvent = WriteMetadataEvent.builder()
                .taskID(taskID)
                .writeStatus(Collections.emptyList())
                .instantTime("")
                .bootstrap(true)
                .build();
        eventGateway.sendEventToCoordinator((OperatorEvent) bootstrapEvent);
        LOG.info("Send bootstrap write metadata event to coordinator, task[{}].", taskID);
    }

    /**
     * Returns the last pending instant as reported by the checkpoint metadata.
     *
     * @return the last pending instant; callers in this class treat {@code null} as
     *         "no pending instant yet"
     */
    protected String lastPendingInstant() {
        return ckpMetadata.lastPendingInstant();
    }

    /**
     * Polls for a pending instant that is non-null and differs from the one recorded at
     * open time, pausing via {@link TimeWait} between polls.
     *
     * @return the instant to write with
     */
    private String instantToWrite() {
        String candidate = lastPendingInstant();
        // NOTE(review): presumably TimeWait fails once the configured ack timeout is exceeded — confirm.
        TimeWait timeWait = TimeWait.builder()
                .timeout(config.getLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT))
                .action("instant initialize")
                .build();
        while (true) {
            boolean isNewInstant = candidate != null && !candidate.equals(initInstant);
            if (isNewInstant) {
                return candidate;
            }
            timeWait.waitFor();
            candidate = lastPendingInstant();
        }
    }
}

