/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.consistency.checking.full;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.neo4j.consistency.checking.full.QueueDistribution;
import org.neo4j.consistency.checking.full.RecordCheckWorker;
import org.neo4j.consistency.checking.full.RecordProcessor;
import org.neo4j.internal.batchimport.cache.idmapping.string.Workers;
import org.neo4j.internal.helpers.progress.ProgressListener;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;

/**
 * Static helpers for fanning records out from a single iterator to a set of
 * {@link RecordCheckWorker}s, each fed through its own bounded queue.
 *
 * <p>This class only has static members and cannot be instantiated.
 */
public class RecordDistributor {
    private RecordDistributor() {
    }

    /**
     * Starts {@code numberOfThreads} workers, each with its own queue of capacity {@code queueSize},
     * and feeds every record from {@code records} to {@code idDistributor}, which decides which
     * queue(s) the record is put on. After the iterator is exhausted (or the calling thread is
     * interrupted while distributing) every worker is marked as done and this method waits for
     * the workers to complete.
     *
     * <p>Returns immediately, without starting any worker, if {@code records} has no elements.
     *
     * @param numberOfThreads number of workers (and queues) to create
     * @param workerNames name passed to the {@link Workers} instance managing the workers
     * @param queueSize capacity of each per-worker queue; putting on a full queue blocks
     * @param records source of records to distribute
     * @param progress receives {@code add(1)} once per record taken from {@code records}
     * @param processor handed to every worker that is created
     * @param idDistributor chooses the target queue index(es) for each record
     * @param pageCacheTracer handed to every worker that is created
     * @param <RECORD> type of record being distributed
     * @throws RuntimeException if the calling thread is interrupted while waiting for the workers
     *         to complete; the thread's interrupt flag is restored and the
     *         {@link InterruptedException} is attached as the cause
     */
    public static <RECORD> void distributeRecords(int numberOfThreads, String workerNames, int queueSize, Iterator<RECORD> records, ProgressListener progress, RecordProcessor<RECORD> processor, QueueDistribution.QueueDistributor<RECORD> idDistributor, PageCacheTracer pageCacheTracer) {
        if (!records.hasNext()) {
            return;
        }

        // Generic arrays cannot be created directly; every element is assigned an
        // ArrayBlockingQueue<RECORD> below, so the unchecked conversion is safe.
        @SuppressWarnings("unchecked")
        ArrayBlockingQueue<RECORD>[] recordQ = new ArrayBlockingQueue[numberOfThreads];
        Workers<RecordCheckWorker<RECORD>> workers = new Workers<>(workerNames);
        AtomicInteger idGroup = new AtomicInteger(-1);
        for (int threadId = 0; threadId < numberOfThreads; ++threadId) {
            recordQ[threadId] = new ArrayBlockingQueue<>(queueSize);
            workers.start(new RecordCheckWorker<>(threadId, idGroup, recordQ[threadId], processor, pageCacheTracer));
        }

        // Blocks while the chosen queue is full.
        RecordConsumer<RECORD> recordConsumer = (record, qIndex) -> recordQ[qIndex].put(record);

        try {
            try {
                while (records.hasNext()) {
                    idDistributor.distribute(records.next(), recordConsumer);
                    progress.add(1L);
                }
            }
            catch (InterruptedException e) {
                // Stop distributing but keep the interrupt visible to the await below.
                Thread.currentThread().interrupt();
            }
            finally {
                // Mark every worker as done on all exit paths, including an unchecked exception
                // from the iterator or the distributor, so started workers are always signalled.
                for (RecordCheckWorker<RECORD> worker : workers) {
                    worker.done();
                }
            }
            workers.awaitAndThrowOnError();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Was interrupted while awaiting completion", e);
        }
    }

    /**
     * Divides {@code highId} by {@code numberOfThreads}, adding one to the quotient when the
     * remainder is positive (i.e. rounds up for non-negative {@code highId}).
     *
     * @param highId total to split
     * @param numberOfThreads divisor
     * @return the quotient, incremented by one if {@code highId % numberOfThreads > 0}
     * @throws ArithmeticException if {@code numberOfThreads} is zero
     */
    public static long calculateRecordsPerCpu(long highId, int numberOfThreads) {
        long quotient = highId / (long)numberOfThreads;
        boolean hasRest = highId % (long)numberOfThreads > 0L;
        return hasRest ? quotient + 1L : quotient;
    }

    /**
     * Callback through which a distributor hands a record to the queue with a given index.
     *
     * @param <RECORD> type of record being accepted
     */
    @FunctionalInterface
    static interface RecordConsumer<RECORD> {
        /**
         * Accepts {@code record} for the queue identified by {@code qIndex}.
         *
         * @param record the record to hand over
         * @param qIndex index of the target queue
         * @throws InterruptedException if interrupted while handing the record over
         */
        public void accept(RECORD record, int qIndex) throws InterruptedException;
    }
}

