/*
 * Decompiled with CFR 0.152.
 */
package org.datasyslab.geospark.joinJudgement;

import com.vividsolutions.jts.geom.Envelope;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.index.SpatialIndex;
import com.vividsolutions.jts.index.quadtree.Quadtree;
import com.vividsolutions.jts.index.strtree.STRtree;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.FlatMapFunction2;
import org.datasyslab.geospark.enums.IndexType;
import org.datasyslab.geospark.enums.JoinBuildSide;
import org.datasyslab.geospark.joinJudgement.DedupParams;
import org.datasyslab.geospark.joinJudgement.JudgementBase;
import org.datasyslab.geospark.monitoring.GeoSparkMetric;
import org.datasyslab.geospark.utils.TimeUtils;

/**
 * Per-partition spatial join function that indexes one side of the join on the fly.
 *
 * <p>For each pair of co-located partitions, all geometries of the "build" side
 * (selected by {@link JoinBuildSide}) are loaded into a freshly created spatial
 * index. The geometries of the other ("stream") side are then consumed lazily:
 * each one is used to query the index by envelope, and every candidate is passed
 * to {@code match(left, right)} inherited from {@link JudgementBase}. Matching
 * pairs are always emitted as {@code (left, right)}, regardless of which side was
 * indexed.
 *
 * <p>The four {@link GeoSparkMetric} instances are updated as a side effect:
 * number of indexed shapes, number of streamed shapes, number of index candidates
 * inspected, and number of result pairs produced.
 *
 * @param <T> geometry type of the right-hand input
 * @param <U> geometry type of the left-hand input
 */
public class DynamicIndexLookupJudgement<T extends Geometry, U extends Geometry>
extends JudgementBase
implements FlatMapFunction2<Iterator<U>, Iterator<T>, Pair<U, T>>,
Serializable {
    private static final Logger log = LogManager.getLogger(DynamicIndexLookupJudgement.class);
    private final IndexType indexType;
    private final JoinBuildSide joinBuildSide;
    private final GeoSparkMetric buildCount;
    private final GeoSparkMetric streamCount;
    private final GeoSparkMetric resultCount;
    private final GeoSparkMetric candidateCount;

    /**
     * Creates the join function.
     *
     * @param considerBoundaryIntersection forwarded unchanged to {@link JudgementBase}
     * @param indexType kind of spatial index to build for the build side
     * @param joinBuildSide which input (left or right) is loaded into the index
     * @param dedupParams forwarded unchanged to {@link JudgementBase}; may be {@code null}
     * @param buildCount metric incremented by the number of indexed shapes
     * @param streamCount metric incremented once per streamed shape
     * @param resultCount metric incremented once per emitted pair
     * @param candidateCount metric incremented once per index candidate inspected
     */
    public DynamicIndexLookupJudgement(boolean considerBoundaryIntersection, IndexType indexType, JoinBuildSide joinBuildSide, @Nullable DedupParams dedupParams, GeoSparkMetric buildCount, GeoSparkMetric streamCount, GeoSparkMetric resultCount, GeoSparkMetric candidateCount) {
        super(considerBoundaryIntersection, dedupParams);
        this.indexType = indexType;
        this.joinBuildSide = joinBuildSide;
        this.buildCount = buildCount;
        this.streamCount = streamCount;
        this.resultCount = resultCount;
        this.candidateCount = candidateCount;
    }

    /**
     * Joins one partition of left shapes with one partition of right shapes.
     *
     * <p>If either input is empty, all metrics are touched with {@code 0} and an
     * empty iterator is returned without building an index. Otherwise the build
     * side is indexed eagerly and the returned iterator lazily consumes the stream
     * side, producing results one stream shape at a time.
     *
     * @param leftShapes geometries of the left input for this partition
     * @param rightShapes geometries of the right input for this partition
     * @return lazily evaluated iterator of matching {@code (left, right)} pairs
     * @throws Exception declared by the {@link FlatMapFunction2} contract
     */
    public Iterator<Pair<U, T>> call(Iterator<U> leftShapes, Iterator<T> rightShapes) throws Exception {
        if (!leftShapes.hasNext() || !rightShapes.hasNext()) {
            this.buildCount.add(0L);
            this.streamCount.add(0L);
            this.resultCount.add(0L);
            this.candidateCount.add(0L);
            return Collections.emptyIterator();
        }
        this.initPartition();

        final boolean buildLeft = this.joinBuildSide == JoinBuildSide.LEFT;
        final Iterator<? extends Geometry> buildShapes;
        final Iterator<? extends Geometry> streamShapes;
        if (buildLeft) {
            buildShapes = leftShapes;
            streamShapes = rightShapes;
        } else {
            buildShapes = rightShapes;
            streamShapes = leftShapes;
        }

        final SpatialIndex spatialIndex = this.buildIndex(buildShapes);
        return new Iterator<Pair<U, T>>() {
            // Results for the stream shape currently being drained; null when exhausted
            // or when nothing has been fetched yet.
            private List<Pair<U, T>> batch = null;
            // Position of the next element of `batch` to hand out.
            private int nextIndex = 0;
            // Number of stream shapes consumed so far (used for milestone logging only).
            private long shapeCnt = 0L;

            @Override
            public boolean hasNext() {
                if (this.batch != null) {
                    return true;
                }
                return this.populateNextBatch();
            }

            @Override
            public Pair<U, T> next() {
                if (this.batch == null && !this.populateNextBatch()) {
                    throw new NoSuchElementException();
                }
                final Pair<U, T> result = this.batch.get(this.nextIndex);
                ++this.nextIndex;
                if (this.nextIndex >= this.batch.size()) {
                    // Current batch is drained: eagerly fetch the next one (or clear
                    // `batch` if the stream side is exhausted) and restart at its head.
                    this.populateNextBatch();
                    this.nextIndex = 0;
                }
                return result;
            }

            /**
             * Advances the stream side until a shape yields at least one match.
             *
             * @return {@code true} if {@code batch} now holds a non-empty result list;
             *         {@code false} if the stream side is exhausted, in which case
             *         {@code batch} is reset to {@code null}
             */
            private boolean populateNextBatch() {
                if (!streamShapes.hasNext()) {
                    this.batch = null;
                    return false;
                }
                this.batch = new ArrayList<>();
                while (streamShapes.hasNext()) {
                    ++this.shapeCnt;
                    DynamicIndexLookupJudgement.this.streamCount.add(1L);
                    final Geometry streamShape = streamShapes.next();
                    // The index is queried by envelope only; each candidate still has
                    // to pass match() before it is emitted.
                    final List<?> candidates = spatialIndex.query(streamShape.getEnvelopeInternal());
                    for (Object candidate : candidates) {
                        DynamicIndexLookupJudgement.this.candidateCount.add(1L);
                        final Geometry buildShape = (Geometry) candidate;
                        // Pairs are always oriented (left, right), independent of
                        // which side was indexed.
                        final Geometry left = buildLeft ? buildShape : streamShape;
                        final Geometry right = buildLeft ? streamShape : buildShape;
                        if (DynamicIndexLookupJudgement.this.match(left, right)) {
                            this.batch.add(this.toPair(left, right));
                            DynamicIndexLookupJudgement.this.resultCount.add(1L);
                        }
                    }
                    DynamicIndexLookupJudgement.this.logMilestone(this.shapeCnt, 100000L, "Streaming shapes");
                    if (!this.batch.isEmpty()) {
                        return true;
                    }
                }
                this.batch = null;
                return false;
            }

            /**
             * Wraps a left/right geometry into a typed pair.
             *
             * <p>The casts are unchecked: the left geometry always originates from
             * {@code leftShapes} (type {@code U}) and the right one from
             * {@code rightShapes} (type {@code T}), but that information is lost when
             * objects pass through the untyped spatial index.
             */
            @SuppressWarnings("unchecked")
            private Pair<U, T> toPair(Geometry left, Geometry right) {
                return Pair.of((U) left, (T) right);
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Drains {@code geometries} into a new spatial index keyed by each geometry's
     * envelope, logs the load time, and adds the number of indexed shapes to
     * {@code buildCount}.
     *
     * @param geometries build-side geometries; fully consumed by this call
     * @return the populated index
     */
    private SpatialIndex buildIndex(Iterator<? extends Geometry> geometries) {
        final long startTime = System.currentTimeMillis();
        long count = 0L;
        final SpatialIndex index = this.newIndex();
        while (geometries.hasNext()) {
            final Geometry geometry = geometries.next();
            index.insert(geometry.getEnvelopeInternal(), geometry);
            ++count;
        }
        // Throwaway query whose result is ignored.
        // NOTE(review): presumably this forces lazily-constructed indexes (STRtree
        // builds its tree on first query) to be built here, so the cost is included
        // in the timing below rather than in the first real lookup - confirm.
        index.query(new Envelope(0.0, 0.0, 0.0, 0.0));
        this.log("Loaded %d shapes into an index in %d ms", count, TimeUtils.elapsedSince(startTime));
        // Pass the full long value; narrowing to int would truncate large counts.
        this.buildCount.add(count);
        return index;
    }

    /**
     * Instantiates an empty index matching the configured {@link IndexType}.
     *
     * @return a new {@link STRtree} for {@code RTREE} or {@link Quadtree} for {@code QUADTREE}
     * @throws IllegalArgumentException if the index type is neither of the above
     */
    private SpatialIndex newIndex() {
        switch (this.indexType) {
            case RTREE: {
                return new STRtree();
            }
            case QUADTREE: {
                return new Quadtree();
            }
        }
        throw new IllegalArgumentException("Unsupported index type: " + this.indexType);
    }

    /**
     * Logs a formatted message at INFO level, prefixed with the current thread id
     * and Spark partition id. The message is only formatted when INFO is enabled
     * for this logger.
     *
     * @param message {@link String#format} pattern
     * @param params arguments for the pattern
     */
    private void log(String message, Object ... params) {
        if (Level.INFO.isGreaterOrEqual(log.getEffectiveLevel())) {
            final int partitionId = TaskContext.getPartitionId();
            final long threadId = Thread.currentThread().getId();
            log.info("[" + threadId + ", PID=" + partitionId + "] " + String.format(message, params));
        }
    }

    /**
     * Logs a progress message when {@code cnt} is greater than 1 and leaves a
     * remainder of exactly 1 when divided by {@code threshold}, i.e. on the first
     * item after each full multiple of {@code threshold}.
     *
     * @param cnt number of items processed so far
     * @param threshold milestone interval
     * @param name label included in the log message
     */
    private void logMilestone(long cnt, long threshold, String name) {
        if (cnt > 1L && cnt % threshold == 1L) {
            this.log("[%s] Reached a milestone: %d", name, cnt);
        }
    }
}

