/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.unsafe.impl.batchimport.input.csv;

import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.function.Supplier;
import org.neo4j.csv.reader.BufferedCharSeeker;
import org.neo4j.csv.reader.CharSeeker;
import org.neo4j.csv.reader.ProcessingSource;
import org.neo4j.csv.reader.Source;
import org.neo4j.csv.reader.SourceTraceability;
import org.neo4j.helpers.Exceptions;
import org.neo4j.helpers.collection.PrefetchingIterator;
import org.neo4j.kernel.impl.util.Validator;
import org.neo4j.kernel.impl.util.Validators;
import org.neo4j.kernel.impl.util.collection.ContinuableArrayCursor;
import org.neo4j.unsafe.impl.batchimport.InputIterator;
import org.neo4j.unsafe.impl.batchimport.executor.TaskExecutionPanicException;
import org.neo4j.unsafe.impl.batchimport.input.InputEntity;
import org.neo4j.unsafe.impl.batchimport.input.InputEntityDecorators;
import org.neo4j.unsafe.impl.batchimport.input.InputException;
import org.neo4j.unsafe.impl.batchimport.input.csv.Configuration;
import org.neo4j.unsafe.impl.batchimport.input.csv.Data;
import org.neo4j.unsafe.impl.batchimport.input.csv.Decorator;
import org.neo4j.unsafe.impl.batchimport.input.csv.Header;
import org.neo4j.unsafe.impl.batchimport.input.csv.IdType;
import org.neo4j.unsafe.impl.batchimport.input.csv.InputEntityDeserializer;
import org.neo4j.unsafe.impl.batchimport.input.csv.InputGroupsDeserializer;
import org.neo4j.unsafe.impl.batchimport.staging.TicketedProcessing;

/**
 * Deserializes input entities from a chunked character source using several processors.
 *
 * <p>The constructor reads the first chunk to create the {@link Header}. That chunk and every
 * following chunk is then handed, as a {@link CharSeeker}, to a {@link TicketedProcessing} which
 * turns each chunk into an array of entities. The arrays are consumed one element at a time through
 * a {@link ContinuableArrayCursor}; the most recently returned entity backs
 * {@link #sourceDescription()}, {@link #lineNumber()} and {@link #position()}.
 *
 * @param <ENTITY> type of {@link InputEntity} produced by this deserializer.
 */
public class ParallelInputEntityDeserializer<ENTITY extends InputEntity>
extends InputIterator.Adapter<ENTITY> {
    private final ProcessingSource source;
    private final TicketedProcessing<CharSeeker, Header, ENTITY[]> processing;
    private final ContinuableArrayCursor<ENTITY> cursor;
    // Traceability of the entity most recently returned from fetchNextOrNull()
    private SourceTraceability last = SourceTraceability.EMPTY;
    private final Decorator<ENTITY> decorator;

    /**
     * Reads the header from the first chunk of {@code data} and starts parsing of all chunks.
     *
     * @param data provides the character stream to read and the decorator to apply to entities.
     * @param headerFactory creates the {@link Header} from the first chunk.
     * @param config supplies the buffer size for the source and the configuration for each seeker.
     * @param idType passed on to {@code headerFactory}.
     * @param maxProcessors passed to both the {@link ProcessingSource} and the {@link TicketedProcessing}.
     * @param initialProcessors processor count to start with; the processing is adjusted by the
     * difference between this value and its current processor count.
     * @param factory creates one {@link InputEntityDeserializer} per chunk.
     * @param validator validates entities, either per chunk or, for a mutable decorator, per batch
     * as batches are handed to the cursor.
     * @param entityClass component type of the entity arrays produced for each chunk.
     * @throws InputException if the first chunk is empty or cannot be read.
     */
    public ParallelInputEntityDeserializer(Data<ENTITY> data, Header.Factory headerFactory, Configuration config, IdType idType, int maxProcessors, int initialProcessors, InputGroupsDeserializer.DeserializerFactory<ENTITY> factory, Validator<ENTITY> validator, Class<ENTITY> entityClass) {
        this.source = new ProcessingSource(data.stream(), config.bufferSize(), maxProcessors);
        try {
            // The first chunk is read here, up front, because the header is created from it
            Source.Chunk firstChunk = this.source.nextChunk();
            if (firstChunk.length() == 0) {
                throw new InputException("No header defined");
            }
            CharSeeker firstSeeker = new BufferedCharSeeker(Source.singleChunk(firstChunk), config);
            Header dataHeader = headerFactory.create(firstSeeker, config, idType);

            Decorator<ENTITY> entityDecorator = data.decorator();
            this.decorator = entityDecorator;

            // A mutable decorator is not applied by the per-chunk deserializers; decoration and
            // validation are instead deferred to the batch supplier feeding the cursor below.
            boolean deferredValidation = entityDecorator.isMutable();
            Decorator<ENTITY> batchDecorator = deferredValidation ? InputEntityDecorators.<ENTITY>noDecorator() : entityDecorator;
            Validator<ENTITY> batchValidator = deferredValidation ? Validators.<ENTITY>emptyValidator() : validator;
            this.processing = new TicketedProcessing<CharSeeker, Header, ENTITY[]>("Parallel input parser", maxProcessors, (seeker, header) -> {
                InputEntityDeserializer<ENTITY> chunkDeserializer = factory.create(header, seeker, batchDecorator, batchValidator);
                chunkDeserializer.initialize();
                ArrayList<ENTITY> entities = new ArrayList<>();
                while (chunkDeserializer.hasNext()) {
                    entities.add(chunkDeserializer.next());
                }
                // Safe: the array is created with entityClass as its component type
                @SuppressWarnings("unchecked")
                ENTITY[] target = (ENTITY[])Array.newInstance(entityClass, entities.size());
                return entities.toArray(target);
            }, () -> dataHeader.clone()); // every supplied state is its own clone of the header
            this.processing.processors(initialProcessors - this.processing.processors(0));

            Supplier<ENTITY[]> batchSupplier = ParallelInputEntityDeserializer.rebaseBatches(this.processing);
            if (deferredValidation) {
                batchSupplier = this.decorateAndValidate(batchSupplier, entityDecorator, validator);
            }
            this.cursor = new ContinuableArrayCursor<>(batchSupplier);

            // Feed the first seeker followed by seekers for all remaining chunks into the processing
            this.processing.slurp(ParallelInputEntityDeserializer.seekers(firstSeeker, this.source, config), true);
        }
        catch (IOException e) {
            InputException failure = new InputException("Couldn't read first chunk from input", e);
            ParallelInputEntityDeserializer.closeAfterFailure(this.source, failure);
            throw failure;
        }
        catch (RuntimeException | Error e) {
            // The caller never gets an instance to close(), so release the opened source here
            ParallelInputEntityDeserializer.closeAfterFailure(this.source, e);
            throw e;
        }
    }

    /**
     * Closes {@code source} after construction has failed, keeping {@code failure} as the primary
     * exception and recording any exception from closing as suppressed on it.
     */
    private static void closeAfterFailure(ProcessingSource source, Throwable failure) {
        try {
            source.close();
        }
        catch (Exception closeFailure) {
            // Best effort: the original failure is the one worth reporting
            failure.addSuppressed(closeFailure);
        }
    }

    /**
     * Wraps {@code actual} so that every entity of each non-null batch is replaced by its decorated
     * version, after that decorated version has been validated.
     *
     * @param actual supplier of raw batches, may return {@code null}.
     * @param decorator applied to each entity, in array order.
     * @param validator validates each decorated entity.
     * @return supplier of decorated and validated batches; returns {@code null} when {@code actual} does.
     */
    private Supplier<ENTITY[]> decorateAndValidate(Supplier<ENTITY[]> actual, Decorator<ENTITY> decorator, Validator<ENTITY> validator) {
        return () -> {
            ENTITY[] entities = actual.get();
            if (entities != null) {
                for (int i = 0; i < entities.length; ++i) {
                    ENTITY entity = decorator.apply(entities[i]);
                    validator.validate(entity);
                    entities[i] = entity;
                }
            }
            return entities;
        };
    }

    /**
     * Advances the cursor and returns the entity it is positioned at, remembering it as the source
     * of traceability information.
     *
     * @return the next entity, or {@code null} if the cursor has no more entities.
     */
    @Override
    protected ENTITY fetchNextOrNull() {
        boolean hasNext;
        try {
            hasNext = this.cursor.next();
        }
        catch (TaskExecutionPanicException e) {
            // Surface the cause of the panic rather than the panic wrapper itself
            throw Exceptions.launderedException(e.getCause());
        }
        if (hasNext) {
            ENTITY next = this.cursor.get();
            this.last = next;
            return next;
        }
        return null;
    }

    /**
     * Creates a supplier which takes batches from {@code processing} and rebases every entity of
     * each non-empty batch onto the line number and position of the last entity of the previous
     * batch from the same source description. The bases are reset to zero whenever the source
     * description of a batch's last entity differs from the one currently tracked.
     *
     * @param processing where batches are taken from.
     * @return supplier of rebased batches; {@code null} and empty batches are passed through untouched.
     */
    private static <ENTITY extends InputEntity> Supplier<ENTITY[]> rebaseBatches(final TicketedProcessing<CharSeeker, Header, ENTITY[]> processing) {
        return new Supplier<ENTITY[]>(){
            private String currentSourceDescription;
            private long baseLineNumber;
            private long basePosition;

            @Override
            public ENTITY[] get() {
                ENTITY[] batch = processing.next();
                if (batch != null && batch.length > 0) {
                    ENTITY lastEntity = batch[batch.length - 1];
                    if (this.currentSourceDescription == null || !this.currentSourceDescription.equals(lastEntity.sourceDescription())) {
                        // Different source than the previous batch: start over from zero
                        this.currentSourceDescription = lastEntity.sourceDescription();
                        this.basePosition = 0L;
                        this.baseLineNumber = 0L;
                    }
                    // NOTE(review): rebase presumably offsets chunk-relative line numbers and
                    // positions by the given bases; confirm against InputEntity.rebase
                    for (ENTITY entity : batch) {
                        entity.rebase(this.baseLineNumber, this.basePosition);
                    }
                    if (lastEntity.sourceDescription().equals(this.currentSourceDescription)) {
                        // The end of this batch becomes the base of the next one
                        this.baseLineNumber = lastEntity.lineNumber();
                        this.basePosition = lastEntity.position();
                    }
                }
                return batch;
            }
        };
    }

    /**
     * Creates an iterator which returns {@code firstSeeker} first and after that one new
     * {@link BufferedCharSeeker} per chunk read from {@code source}, ending at the first empty chunk.
     *
     * @param firstSeeker seeker over the already read first chunk.
     * @param source where the remaining chunks are read from.
     * @param config configuration for the created seekers.
     * @return iterator of seekers, one per chunk.
     */
    private static Iterator<CharSeeker> seekers(final CharSeeker firstSeeker, final ProcessingSource source, final Configuration config) {
        return new PrefetchingIterator<CharSeeker>(){
            private boolean firstReturned;

            @Override
            protected CharSeeker fetchNextOrNull() {
                if (!this.firstReturned) {
                    this.firstReturned = true;
                    return firstSeeker;
                }
                try {
                    Source.Chunk chunk = source.nextChunk();
                    // An empty chunk ends the iteration
                    return chunk.length() > 0 ? new BufferedCharSeeker(Source.singleChunk(chunk), config) : null;
                }
                catch (IOException e) {
                    throw new InputException("Couldn't get chunk from source", e);
                }
            }
        };
    }

    /**
     * Closes the processing, the decorator and the source, in that order, and finally the super
     * class. Each step is attempted even if an earlier one threw; in that case the exception of the
     * latest failing step is the one that propagates.
     *
     * @throws InputException if closing fails with an {@link IOException}.
     */
    @Override
    public void close() {
        try {
            this.processing.close();
        }
        finally {
            try {
                try {
                    this.decorator.close();
                }
                finally {
                    this.source.close();
                }
            }
            catch (IOException e) {
                throw new InputException("Couldn't close source of data chunks", e);
            }
            finally {
                super.close();
            }
        }
    }

    /**
     * Delegates to the processing.
     *
     * @param delta passed through unchanged; the constructor uses {@code 0} to only query the count.
     * @return whatever the processing returns for {@code delta}.
     */
    @Override
    public int processors(int delta) {
        return this.processing.processors(delta);
    }

    /**
     * @return source description of the most recently returned entity, or that of
     * {@link SourceTraceability#EMPTY} if no entity has been returned yet.
     */
    @Override
    public String sourceDescription() {
        return this.last.sourceDescription();
    }

    /**
     * @return line number of the most recently returned entity, or that of
     * {@link SourceTraceability#EMPTY} if no entity has been returned yet.
     */
    @Override
    public long lineNumber() {
        return this.last.lineNumber();
    }

    /**
     * @return position of the most recently returned entity, or that of
     * {@link SourceTraceability#EMPTY} if no entity has been returned yet.
     */
    @Override
    public long position() {
        return this.last.position();
    }

    /**
     * Forwards a panic to the processing.
     *
     * @param cause the cause of the panic.
     */
    @Override
    public void receivePanic(Throwable cause) {
        this.processing.receivePanic(cause);
    }
}

