/*
 * Decompiled with CFR 0.152.
 */
package com.salesforce.datacloud.jdbc.core.partial;

import com.salesforce.datacloud.jdbc.exception.DataCloudJDBCException;
import com.salesforce.datacloud.jdbc.util.StreamUtilities;
import com.salesforce.datacloud.jdbc.util.Unstable;
import com.salesforce.datacloud.query.v3.DataCloudQueryStatus;
import com.salesforce.datacloud.shaded.dev.failsafe.Failsafe;
import com.salesforce.datacloud.shaded.dev.failsafe.FailsafeException;
import com.salesforce.datacloud.shaded.dev.failsafe.Policy;
import com.salesforce.datacloud.shaded.dev.failsafe.RetryPolicy;
import com.salesforce.datacloud.shaded.dev.failsafe.RetryPolicyBuilder;
import com.salesforce.datacloud.shaded.io.grpc.StatusRuntimeException;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import salesforce.cdp.hyperdb.v1.HyperServiceGrpc;
import salesforce.cdp.hyperdb.v1.QueryInfo;
import salesforce.cdp.hyperdb.v1.QueryInfoParam;

@Unstable
public final class DataCloudQueryPolling {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(DataCloudQueryPolling.class);

    public static DataCloudQueryStatus waitForChunksAvailable(HyperServiceGrpc.HyperServiceBlockingStub stub, String queryId, long offset, long limit, Duration duration, boolean allowLessThan) throws DataCloudJDBCException {
        return DataCloudQueryPolling.waitForCountAvailabile(stub, queryId, offset, limit, duration, allowLessThan, DataCloudQueryStatus::getChunkCount);
    }

    public static DataCloudQueryStatus waitForRowsAvailable(HyperServiceGrpc.HyperServiceBlockingStub stub, String queryId, long offset, long limit, Duration duration, boolean allowLessThan) throws DataCloudJDBCException {
        return DataCloudQueryPolling.waitForCountAvailabile(stub, queryId, offset, limit, duration, allowLessThan, DataCloudQueryStatus::getRowCount);
    }

    private static DataCloudQueryStatus waitForCountAvailabile(HyperServiceGrpc.HyperServiceBlockingStub stub, String queryId, long offset, long limit, Duration timeout2, boolean allowLessThan, Function<DataCloudQueryStatus, Long> countSelector) throws DataCloudJDBCException {
        Predicate<DataCloudQueryStatus> predicate = status -> {
            Long count = (Long)countSelector.apply((DataCloudQueryStatus)status);
            if (allowLessThan) {
                return count > offset;
            }
            return count >= offset + limit;
        };
        DataCloudQueryStatus result = DataCloudQueryPolling.waitForQueryStatus(stub, queryId, timeout2, predicate);
        if (predicate.test(result)) {
            return result;
        }
        if (allowLessThan) {
            throw new DataCloudJDBCException("Timed out waiting for new items to be available. queryId=" + queryId + ", status=" + result);
        }
        throw new DataCloudJDBCException("Timed out waiting for enough items to be available. queryId=" + queryId + ", status=" + result);
    }

    public static DataCloudQueryStatus waitForQueryStatus(HyperServiceGrpc.HyperServiceBlockingStub stub, String queryId, Duration timeoutDuration, Predicate<DataCloudQueryStatus> predicate) throws DataCloudJDBCException {
        AtomicReference last = new AtomicReference();
        Instant deadline = Instant.now().plus(timeoutDuration);
        AtomicInteger attempts = new AtomicInteger(0);
        RetryPolicy retryPolicy = ((RetryPolicyBuilder)RetryPolicy.builder().withMaxDuration(timeoutDuration).handleIf(e -> {
            if (!(e instanceof StatusRuntimeException)) {
                log.error("Got an unexpected exception when getting query status for queryId={}", (Object)queryId, e);
                return false;
            }
            if (last.get() == null) {
                log.error("Failed to get query status response, will not try again. queryId={}, attempts={}", queryId, attempts.get(), e);
                return false;
            }
            if (Instant.now().isAfter(deadline)) {
                log.error("Reached deadline for polling query status, will not try again. queryId={}, attempts={}, lastStatus={}", queryId, attempts.get(), last.get(), e);
                return false;
            }
            log.warn("We think this error was a server timeout, will try again. queryId={}, attempts={}, lastStatus={}", queryId, attempts.get(), last.get());
            return true;
        })).build();
        try {
            return Failsafe.with(retryPolicy, (Policy[])new RetryPolicy[0]).get(() -> DataCloudQueryPolling.waitForQueryStatusWithoutRetry(stub, queryId, deadline, last, attempts, predicate));
        }
        catch (FailsafeException ex) {
            throw new DataCloudJDBCException("Failed to get query status response. queryId=" + queryId + ", attempts=" + attempts.get() + ", lastStatus=" + last.get(), ex.getCause());
        }
        catch (StatusRuntimeException ex) {
            throw new DataCloudJDBCException("Failed to get query status response. queryId=" + queryId, ex);
        }
    }

    static DataCloudQueryStatus waitForQueryStatusWithoutRetry(HyperServiceGrpc.HyperServiceBlockingStub stub, String queryId, Instant deadline, AtomicReference<DataCloudQueryStatus> last, AtomicInteger times, Predicate<DataCloudQueryStatus> predicate) {
        times.getAndIncrement();
        QueryInfoParam param = QueryInfoParam.newBuilder().setQueryId(queryId).setStreaming(true).build();
        while (Instant.now().isBefore(deadline)) {
            Iterator<QueryInfo> info = stub.getQueryInfo(param);
            Optional<DataCloudQueryStatus> matched = StreamUtilities.toStream(info).map(DataCloudQueryStatus::of).filter(Optional::isPresent).map(Optional::get).peek(last::set).filter(predicate).findFirst();
            if (matched.isPresent()) {
                return matched.get();
            }
            if (Optional.ofNullable(last.get()).map(DataCloudQueryStatus::allResultsProduced).orElse(false).booleanValue()) {
                log.warn("predicate did not match but all results were produced. last={}", (Object)last.get());
                return last.get();
            }
            log.info("end of info stream, starting a new one if the timeout allows. last={}, remaining={}", (Object)last.get(), (Object)DataCloudQueryPolling.remaining(deadline));
        }
        log.warn("exceeded deadline getting query info. last={}", (Object)last.get());
        return last.get();
    }

    private static Duration remaining(Instant deadline) {
        return Duration.between(Instant.now(), deadline);
    }

    @Generated
    private DataCloudQueryPolling() {
        throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
    }
}

