/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.query.calcite.exec.rel;

import java.util.Comparator;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.processors.query.calcite.exec.ExecutionContext;
import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeHashIndex;
import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeIndex;
import org.apache.ignite.internal.processors.query.calcite.exec.RuntimeSortedIndex;
import org.apache.ignite.internal.processors.query.calcite.exec.exp.RangeIterable;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.Downstream;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.HashAggregateNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.MemoryTrackingNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.ScanNode;
import org.apache.ignite.internal.processors.query.calcite.exec.rel.SingleNode;
import org.apache.ignite.internal.util.typedef.F;
import org.jetbrains.annotations.Nullable;

public class IndexSpoolNode<Row>
extends MemoryTrackingNode<Row>
implements SingleNode<Row>,
Downstream<Row> {
    private final ScanNode<Row> scan;
    private final RuntimeIndex<Row> idx;
    private int requested;
    private int waiting;

    private IndexSpoolNode(ExecutionContext<Row> ctx, RelDataType rowType, RuntimeIndex<Row> idx, ScanNode<Row> scan) {
        super(ctx, rowType, idx instanceof HashAggregateNode ? HASH_MAP_ROW_OVERHEAD : ARRAY_ROW_OVERHEAD);
        this.idx = idx;
        this.scan = scan;
    }

    @Override
    public void onRegister(Downstream<Row> downstream) {
        this.scan.onRegister(downstream);
    }

    @Override
    public Downstream<Row> downstream() {
        return this.scan.downstream();
    }

    @Override
    protected void rewindInternal() {
        this.scan.rewind();
    }

    @Override
    public void rewind() {
        this.rewindInternal();
    }

    @Override
    protected Downstream<Row> requestDownstream(int idx) {
        if (idx != 0) {
            throw new IndexOutOfBoundsException();
        }
        return this;
    }

    @Override
    public void request(int rowsCnt) throws Exception {
        assert (!F.isEmpty(this.sources()) && this.sources().size() == 1);
        assert (rowsCnt > 0);
        this.checkState();
        if (!this.indexReady()) {
            this.requested = rowsCnt;
            this.requestSource();
        } else {
            this.scan.request(rowsCnt);
        }
    }

    private void requestSource() throws Exception {
        this.waiting = IN_BUFFER_SIZE;
        this.source().request(IN_BUFFER_SIZE);
    }

    @Override
    public void push(Row row) throws Exception {
        this.checkState();
        this.idx.push(row);
        this.nodeMemoryTracker.onRowAdded(row);
        --this.waiting;
        if (this.waiting == 0) {
            this.context().execute(this::requestSource, this::onError);
        }
    }

    @Override
    public void end() throws Exception {
        this.checkState();
        this.waiting = -1;
        this.scan.request(this.requested);
    }

    @Override
    protected void closeInternal() {
        try {
            this.scan.close();
        }
        catch (Exception ex) {
            this.onError(ex);
        }
        try {
            this.idx.close();
        }
        catch (Exception ex) {
            this.onError(ex);
        }
        super.closeInternal();
    }

    private boolean indexReady() {
        return this.waiting == -1;
    }

    public static <Row> IndexSpoolNode<Row> createTreeSpool(ExecutionContext<Row> ctx, RelDataType rowType, RelCollation collation, Comparator<Row> comp, Predicate<Row> filter, RangeIterable<Row> ranges) {
        RuntimeSortedIndex<Row> idx = new RuntimeSortedIndex<Row>(ctx, collation, comp);
        ScanNode<Row> scan = new ScanNode<Row>(ctx, rowType, idx.scan(ctx, rowType, filter, ranges));
        return new IndexSpoolNode<Row>(ctx, rowType, idx, scan);
    }

    public static <Row> IndexSpoolNode<Row> createHashSpool(ExecutionContext<Row> ctx, RelDataType rowType, ImmutableBitSet keys, @Nullable Predicate<Row> filter, Supplier<Row> searchRow, boolean allowNulls) {
        RuntimeHashIndex<Row> idx = new RuntimeHashIndex<Row>(ctx, keys, allowNulls);
        ScanNode<Row> scan = new ScanNode<Row>(ctx, rowType, idx.scan(searchRow, filter));
        return new IndexSpoolNode<Row>(ctx, rowType, idx, scan);
    }
}

