/*
 * Decompiled with CFR 0.152.
 */
package apoc.export.parquet;

import apoc.Pools;
import apoc.export.parquet.ExportParquetStrategy;
import apoc.export.parquet.ParquetBufferedWriter;
import apoc.export.parquet.ParquetConfig;
import apoc.export.parquet.ParquetExportType;
import apoc.result.ByteArrayResult;
import apoc.util.QueueBasedSpliterator;
import apoc.util.QueueUtil;
import apoc.util.Util;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.schema.MessageType;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.logging.Log;
import org.neo4j.procedure.TerminationGuard;

public abstract class ExportParquetStreamStrategy<TYPE, IN>
implements ExportParquetStrategy<IN, Stream<ByteArrayResult>> {
    private final GraphDatabaseService db;
    private final Pools pools;
    private final TerminationGuard terminationGuard;
    private final Log logger;
    private final ParquetExportType exportType;

    public ExportParquetStreamStrategy(GraphDatabaseService db, Pools pools, TerminationGuard terminationGuard, Log logger, ParquetExportType exportType) {
        this.db = db;
        this.pools = pools;
        this.terminationGuard = terminationGuard;
        this.logger = logger;
        this.exportType = exportType;
    }

    @Override
    public Stream<ByteArrayResult> export(IN data, ParquetConfig config) {
        ArrayBlockingQueue queue = new ArrayBlockingQueue(100);
        Util.inTxFuture((ExecutorService)this.pools.getDefaultExecutorService(), (GraphDatabaseService)this.db, tx -> {
            int batchCount = 0;
            ArrayList<TYPE> rows = new ArrayList<TYPE>(config.getBatchSize());
            try {
                byte[] bytes;
                Iterator<TYPE> it = this.toIterator(data);
                while (!Util.transactionIsTerminated((TerminationGuard)this.terminationGuard) && it.hasNext()) {
                    rows.add(it.next());
                    if (batchCount > 0 && batchCount % config.getBatchSize() == 0) {
                        bytes = this.writeBatch(rows, data, config);
                        QueueUtil.put((BlockingQueue)queue, (Object)new ByteArrayResult(bytes), (long)10L);
                    }
                    ++batchCount;
                }
                if (!rows.isEmpty()) {
                    bytes = this.writeBatch(rows, data, config);
                    QueueUtil.put((BlockingQueue)queue, (Object)new ByteArrayResult(bytes), (long)10L);
                }
                Boolean bl = true;
                return bl;
            }
            catch (Exception e) {
                this.logger.error("Exception while extracting Parquet data:", (Throwable)e);
            }
            finally {
                QueueUtil.put((BlockingQueue)queue, (Object)ByteArrayResult.NULL, (long)10L);
            }
            return true;
        });
        QueueBasedSpliterator spliterator = new QueueBasedSpliterator(queue, (Object)ByteArrayResult.NULL, this.terminationGuard, Integer.MAX_VALUE);
        return StreamSupport.stream(spliterator, false);
    }

    private byte[] writeBatch(List<TYPE> rows, IN data, ParquetConfig config) {
        byte[] byArray;
        List<Map<String, Object>> conf = this.exportType.createConfig(rows, data, config);
        MessageType schema = this.exportType.schemaFor(this.db, conf);
        ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
        try {
            ParquetBufferedWriter out = new ParquetBufferedWriter(bytesOut);
            try (ParquetWriter<Group> writer = this.getBuild(schema, ExampleParquetWriter.builder((OutputFile)out));){
                this.writeRows(rows, writer, this.exportType, schema);
            }
            byArray = bytesOut.toByteArray();
        }
        catch (Throwable throwable) {
            try {
                try {
                    bytesOut.close();
                }
                catch (Throwable throwable2) {
                    throwable.addSuppressed(throwable2);
                }
                throw throwable;
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        bytesOut.close();
        return byArray;
    }

    public abstract Iterator<TYPE> toIterator(IN var1);
}

