/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.jdbc.internal.executor;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public final class TableBufferReducedStatementExecutor
implements JdbcBatchStatementExecutor<RowData> {
    private final JdbcBatchStatementExecutor<RowData> upsertExecutor;
    private final JdbcBatchStatementExecutor<RowData> deleteExecutor;
    private final Function<RowData, RowData> keyExtractor;
    private final Map<RowData, Tuple2<Boolean, RowData>> reduceBuffer = new HashMap<RowData, Tuple2<Boolean, RowData>>();

    public TableBufferReducedStatementExecutor(JdbcBatchStatementExecutor<RowData> upsertExecutor, JdbcBatchStatementExecutor<RowData> deleteExecutor, Function<RowData, RowData> keyExtractor) {
        this.upsertExecutor = upsertExecutor;
        this.deleteExecutor = deleteExecutor;
        this.keyExtractor = keyExtractor;
    }

    @Override
    public void prepareStatements(Connection connection) throws SQLException {
        this.upsertExecutor.prepareStatements(connection);
        this.deleteExecutor.prepareStatements(connection);
    }

    @Override
    public void addToBatch(RowData record) throws SQLException {
        RowData key = this.keyExtractor.apply(record);
        boolean flag = this.changeFlag(record.getRowKind());
        this.reduceBuffer.put(key, (Tuple2<Boolean, RowData>)Tuple2.of((Object)flag, (Object)record));
    }

    private boolean changeFlag(RowKind rowKind) {
        switch (rowKind) {
            case INSERT: 
            case UPDATE_AFTER: {
                return true;
            }
            case DELETE: 
            case UPDATE_BEFORE: {
                return false;
            }
        }
        throw new UnsupportedOperationException(String.format("Unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE, but get: %s.", rowKind));
    }

    @Override
    public void executeBatch() throws SQLException {
        if (!this.reduceBuffer.isEmpty()) {
            for (Map.Entry<RowData, Tuple2<Boolean, RowData>> entry : this.reduceBuffer.entrySet()) {
                if (((Boolean)entry.getValue().f0).booleanValue()) {
                    this.upsertExecutor.addToBatch((RowData)entry.getValue().f1);
                    continue;
                }
                this.deleteExecutor.addToBatch(entry.getKey());
            }
            this.upsertExecutor.executeBatch();
            this.deleteExecutor.executeBatch();
            this.reduceBuffer.clear();
        }
    }

    @Override
    public void closeStatements() throws SQLException {
        this.upsertExecutor.closeStatements();
        this.deleteExecutor.closeStatements();
    }
}

