/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.tx.statement;

import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import org.neo4j.bolt.dbapi.BoltQueryExecution;
import org.neo4j.bolt.event.CopyOnWriteEventPublisher;
import org.neo4j.bolt.event.EventPublisher;
import org.neo4j.bolt.protocol.common.fsm.response.NoopRecordHandler;
import org.neo4j.bolt.protocol.common.fsm.response.RecordHandler;
import org.neo4j.bolt.protocol.common.fsm.response.ResponseHandler;
import org.neo4j.bolt.protocol.common.message.Error;
import org.neo4j.bolt.tx.TransactionType;
import org.neo4j.bolt.tx.error.statement.StatementException;
import org.neo4j.bolt.tx.error.statement.StatementStreamingException;
import org.neo4j.bolt.tx.statement.Statement;
import org.neo4j.bolt.tx.statement.StatementQuerySubscriber;
import org.neo4j.function.ThrowingConsumer;
import org.neo4j.graphdb.ExecutionPlanDescription;
import org.neo4j.graphdb.GqlStatusObject;
import org.neo4j.graphdb.Notification;
import org.neo4j.graphdb.QueryExecutionType;
import org.neo4j.graphdb.QueryStatistics;
import org.neo4j.kernel.database.DatabaseReference;
import org.neo4j.kernel.impl.query.QueryExecution;
import org.neo4j.values.AnyValue;
import org.neo4j.values.virtual.MapValue;

public class StatementImpl
implements Statement {
    private static final long DEFAULT_BATCH_SIZE = Long.MAX_VALUE;
    private final long id;
    private final DatabaseReference database;
    private final Clock clock;
    private final StatementQuerySubscriber subscriber;
    private final BoltQueryExecution execution;
    private final EventPublisher<Statement.Listener> eventPublisher = new CopyOnWriteEventPublisher<Statement.Listener>();
    private final Lock executionLock = new ReentrantLock();
    private final AtomicReference<State> state = new AtomicReference<State>(State.RUNNING);
    private long timeSpentStreaming;
    private final List<String> fieldNames;
    private QueryStatistics statistics;

    public StatementImpl(long id, DatabaseReference database, Clock clock, BoltQueryExecution execution, StatementQuerySubscriber subscriber) {
        this.id = id;
        this.database = database;
        this.clock = clock;
        this.execution = execution;
        this.subscriber = subscriber;
        this.fieldNames = Arrays.asList(execution.queryExecution().fieldNames());
    }

    @Override
    public long id() {
        return this.id;
    }

    @Override
    public List<String> fieldNames() {
        return this.fieldNames;
    }

    @Override
    public long executionTime() {
        return this.timeSpentStreaming;
    }

    @Override
    public Optional<QueryStatistics> statistics() {
        return Optional.ofNullable(this.statistics);
    }

    @Override
    public boolean hasRemaining() {
        return this.state.get() == State.RUNNING;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void consume(ResponseHandler responseHandler, long n) throws StatementException {
        State state = this.state.get();
        if (state != State.RUNNING) {
            return;
        }
        this.executionLock.lock();
        try {
            RecordHandler recordHandler = responseHandler.onBeginStreaming(this.fieldNames);
            this.subscriber.setHandler(recordHandler);
            long start = this.clock.millis();
            QueryExecution query = this.execution.queryExecution();
            if (n == -1L) {
                try {
                    do {
                        query.request(Long.MAX_VALUE);
                        remaining = query.await();
                        this.subscriber.assertSuccess();
                    } while (remaining);
                }
                catch (Exception ex) {
                    throw new StatementStreamingException("Failed to consume all statement results", ex);
                }
                this.timeSpentStreaming += this.clock.millis() - start;
                this.complete(responseHandler, this.subscriber.getStatistics());
                responseHandler.onCompleteStreaming(false);
            } else {
                try {
                    query.request(n);
                    remaining = query.await();
                    this.subscriber.assertSuccess();
                }
                catch (Exception ex) {
                    throw new StatementStreamingException("Failed to consume statement results", ex);
                }
                this.timeSpentStreaming += this.clock.millis() - start;
                if (!remaining) {
                    this.complete(responseHandler, this.subscriber.getStatistics());
                }
                responseHandler.onCompleteStreaming(remaining);
            }
            this.subscriber.setHandler(null);
            Throwable pendingException = this.subscriber.getPendingException();
            if (pendingException != null) {
                throw new StatementStreamingException(pendingException);
            }
        }
        finally {
            this.executionLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void discard(ResponseHandler responseHandler, long n) throws StatementException {
        block7: {
            State state = this.state.get();
            if (state != State.RUNNING) {
                return;
            }
            this.executionLock.lock();
            try {
                long start = this.clock.millis();
                QueryExecution query = this.execution.queryExecution();
                if (n == -1L && query.executionMetadataAvailable() && query.executionType().queryType() == QueryExecutionType.QueryType.READ_ONLY) {
                    responseHandler.onBeginStreaming(this.fieldNames);
                    try {
                        query.cancel();
                        query.await();
                    }
                    catch (Exception ex) {
                        throw new StatementStreamingException("Failed to discard results", ex);
                    }
                    this.timeSpentStreaming += this.clock.millis() - start;
                    this.complete(responseHandler, QueryStatistics.EMPTY);
                    responseHandler.onCompleteStreaming(false);
                    break block7;
                }
                this.consume(new DiscardingRecordConsumer(responseHandler), n);
            }
            finally {
                this.executionLock.unlock();
            }
        }
    }

    private void complete(ResponseHandler handler, QueryStatistics statistics) {
        this.statistics = statistics;
        QueryExecution execution = this.execution.queryExecution();
        handler.onStreamingMetadata(this.timeSpentStreaming, execution.executionType(), this.database, this.statistics, execution.getNotifications(), execution.getGqlStatusObjects());
        QueryExecutionType executionType = execution.executionType();
        if (executionType.requestedExecutionPlanDescription()) {
            handler.onStreamingExecutionPlan(execution.executionPlanDescription());
        }
        if (this.state.compareAndSet(State.RUNNING, State.COMPLETED)) {
            this.eventPublisher.dispatch(l -> l.onCompleted(this));
        }
    }

    private boolean updateState(State targetState, Predicate<State> filter) {
        State previousState;
        do {
            if (filter.test(previousState = this.state.get())) continue;
            return false;
        } while (!this.state.compareAndSet(previousState, targetState));
        return true;
    }

    @Override
    public void terminate() {
        if (!this.updateState(State.TERMINATED, state -> state != State.TERMINATED && state != State.CLOSED)) {
            return;
        }
        this.execution.terminate();
        this.eventPublisher.dispatchSafe((ThrowingConsumer<Statement.Listener, Exception>)((ThrowingConsumer)l -> l.onTerminated(this)));
    }

    @Override
    public void close() {
        if (!this.updateState(State.CLOSED, state -> state != State.CLOSED)) {
            return;
        }
        try {
            this.execution.queryExecution().cancel();
            this.execution.queryExecution().awaitCleanup();
        }
        catch (Exception exception) {
            // empty catch block
        }
        this.executionLock.lock();
        try {
            this.execution.close();
        }
        finally {
            this.executionLock.unlock();
        }
        this.eventPublisher.dispatchSafe((ThrowingConsumer<Statement.Listener, Exception>)((ThrowingConsumer)l -> l.onClosed(this)));
    }

    @Override
    public void registerListener(Statement.Listener listener) {
        this.eventPublisher.registerListener(listener);
    }

    @Override
    public void removeListener(Statement.Listener listener) {
        this.eventPublisher.removeListener(listener);
    }

    static enum State {
        RUNNING,
        COMPLETED,
        TERMINATED,
        CLOSED;

    }

    static final class DiscardingRecordConsumer
    implements ResponseHandler {
        private final ResponseHandler delegate;

        public DiscardingRecordConsumer(ResponseHandler delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onStatementPrepared(TransactionType transactionType, long statementId, long timeSpentPreparingResults, List<String> fieldNames) {
        }

        @Override
        public void onTransactionDatabase(String database) {
        }

        @Override
        public void onMetadata(String key, AnyValue value) {
            this.delegate.onMetadata(key, value);
        }

        @Override
        public RecordHandler onBeginStreaming(List<String> fieldNames) {
            this.delegate.onBeginStreaming(fieldNames);
            return NoopRecordHandler.getInstance();
        }

        @Override
        public void onStreamingMetadata(long timeSpentStreaming, QueryExecutionType executionType, DatabaseReference database, QueryStatistics statistics, Iterable<Notification> notifications, Iterable<GqlStatusObject> statuses) {
            this.delegate.onStreamingMetadata(timeSpentStreaming, executionType, database, statistics, notifications, statuses);
        }

        @Override
        public void onStreamingExecutionPlan(ExecutionPlanDescription plan) {
        }

        @Override
        public void onCompleteStreaming(boolean hasRemaining) {
            this.delegate.onCompleteStreaming(hasRemaining);
        }

        @Override
        public void onRoutingTable(String databaseName, MapValue routingTable) {
        }

        @Override
        public void onBookmark(String encodedBookmark) {
        }

        @Override
        public void onFailure(Error error) {
            this.delegate.onFailure(error);
        }

        @Override
        public void onIgnored() {
            this.delegate.onIgnored();
        }

        @Override
        public void onSuccess() {
            this.delegate.onSuccess();
        }
    }
}

