/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import org.apache.flink.state.forst.ForStDBOperation;
import org.apache.flink.state.forst.ForStDBPutRequest;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

/**
 * A {@link ForStDBOperation} that collects a list of {@link ForStDBPutRequest}s into a single
 * {@link WriteBatch} and writes that batch to the database on the supplied {@link Executor}.
 *
 * <p>A request whose value is null is added to the batch as a delete of its key; every other
 * request is added as a put of its serialized key and value.
 */
public class ForStWriteBatchOperation
implements ForStDBOperation {
    /**
     * Per-request multiplier used to compute the size argument handed to the {@link WriteBatch}
     * constructor (request count times this value).
     */
    private static final int PER_RECORD_ESTIMATE_BYTES = 100;
    private final RocksDB db;
    private final List<ForStDBPutRequest<?, ?>> batchRequest;
    private final WriteOptions writeOptions;
    private final Executor executor;

    /**
     * Creates a write-batch operation.
     *
     * @param db the database the batch is written to
     * @param batchRequest the put/delete requests that make up the batch
     * @param writeOptions the options passed to the database write call
     * @param executor the executor on which the batch is built and written
     */
    ForStWriteBatchOperation(RocksDB db, List<ForStDBPutRequest<?, ?>> batchRequest, WriteOptions writeOptions, Executor executor) {
        this.db = db;
        this.batchRequest = batchRequest;
        this.writeOptions = writeOptions;
        this.executor = executor;
    }

    /**
     * Builds the write batch from all requests, writes it to the database and then completes the
     * state future of every request. All of this runs asynchronously on the configured executor.
     *
     * <p>If anything in that sequence throws, every request in the batch has its state future
     * completed exceptionally and the returned future fails with a {@link CompletionException}
     * wrapping the original cause.
     *
     * @return a future that completes once the batch has been written and all request futures
     *     have been completed, or completes exceptionally on failure
     */
    @Override
    public CompletableFuture<Void> process() {
        return CompletableFuture.runAsync(() -> {
            // Size the batch from the named per-record estimate rather than a bare literal.
            try (WriteBatch writeBatch = new WriteBatch(this.batchRequest.size() * PER_RECORD_ESTIMATE_BYTES)) {
                for (ForStDBPutRequest<?, ?> request : this.batchRequest) {
                    if (request.valueIsNull()) {
                        // A put with a null value is recorded as a delete of the key.
                        writeBatch.delete(request.getColumnFamilyHandle(), request.buildSerializedKey());
                    } else {
                        writeBatch.put(request.getColumnFamilyHandle(), request.buildSerializedKey(), request.buildSerializedValue());
                    }
                }
                this.db.write(this.writeOptions, writeBatch);
                // Only signal success to the callers after the write call has returned.
                for (ForStDBPutRequest<?, ?> request : this.batchRequest) {
                    request.completeStateFuture();
                }
            } catch (Exception e) {
                String msg = "Error while write batch data to ForStDB.";
                // Fail every state request that was part of this batch.
                for (ForStDBPutRequest<?, ?> request : this.batchRequest) {
                    request.completeStateFutureExceptionally(msg, e);
                }
                // Fail the batch operation itself so the returned future reports the error.
                throw new CompletionException(msg, e);
            }
        }, this.executor);
    }
}

