/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.catchup.tx;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.CatchupServerProtocol;
import org.neo4j.causalclustering.catchup.ResponseMessageType;
import org.neo4j.causalclustering.catchup.tx.TxPullRequest;
import org.neo4j.causalclustering.catchup.tx.TxPullRequestsMonitor;
import org.neo4j.causalclustering.catchup.tx.TxPullResponse;
import org.neo4j.causalclustering.catchup.tx.TxStreamFinishedResponse;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.kernel.impl.transaction.CommittedTransactionRepresentation;
import org.neo4j.kernel.impl.transaction.log.LogicalTransactionStore;
import org.neo4j.kernel.impl.transaction.log.NoSuchTransactionException;
import org.neo4j.kernel.impl.transaction.log.TransactionCursor;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
 * Channel handler that answers a {@link TxPullRequest} by writing committed
 * transactions from the local {@link LogicalTransactionStore} to the channel,
 * followed by a {@link TxStreamFinishedResponse} describing how the pull ended.
 *
 * <p>At most {@code batchSize} transactions are written per request.
 */
public class TxPullRequestHandler extends SimpleChannelInboundHandler<TxPullRequest> {
    /** Protocol state holder; told to expect a message type after each request is served. */
    private final CatchupServerProtocol protocol;
    /** Source of the local store id, queried on every request. */
    private final Supplier<StoreId> storeIdSupplier;
    /** Queried on every request; {@code false} yields {@code E_STORE_UNAVAILABLE}. */
    private final BooleanSupplier databaseAvailable;
    /** Upper bound on the number of transactions written for one request. */
    private final int batchSize;
    private final TransactionIdStore transactionIdStore;
    private final LogicalTransactionStore logicalTransactionStore;
    /** Incremented once for every request that reaches the end of {@code channelRead0}. */
    private final TxPullRequestsMonitor monitor;
    private final Log log;

    /**
     * Creates the handler.
     *
     * <p>Both store suppliers are resolved exactly once, here, and the resulting
     * instances are reused for every request; {@code storeIdSupplier} and
     * {@code databaseAvailable} are kept and queried per request.
     *
     * @param protocol protocol state to update once a request has been answered
     * @param storeIdSupplier supplies the local store id
     * @param databaseAvailable reports whether the local database can serve transactions
     * @param transactionIdStoreSupplier supplies the store used to read the last committed transaction id
     * @param logicalTransactionStoreSupplier supplies the store the transactions are read from
     * @param batchSize maximum number of transactions written per request
     * @param monitors factory for the {@link TxPullRequestsMonitor}
     * @param logProvider factory for this handler's log
     */
    public TxPullRequestHandler(CatchupServerProtocol protocol, Supplier<StoreId> storeIdSupplier, BooleanSupplier databaseAvailable, Supplier<TransactionIdStore> transactionIdStoreSupplier, Supplier<LogicalTransactionStore> logicalTransactionStoreSupplier, int batchSize, Monitors monitors, LogProvider logProvider) {
        this.protocol = protocol;
        this.storeIdSupplier = storeIdSupplier;
        this.databaseAvailable = databaseAvailable;
        this.batchSize = batchSize;
        this.transactionIdStore = transactionIdStoreSupplier.get();
        this.logicalTransactionStore = logicalTransactionStoreSupplier.get();
        this.monitor = monitors.newMonitor(TxPullRequestsMonitor.class);
        this.log = logProvider.getLog(getClass());
    }

    /**
     * Serves a single pull request.
     *
     * <p>The outcome reported in the closing {@link TxStreamFinishedResponse} is:
     * <ul>
     *   <li>{@code E_STORE_ID_MISMATCH} - the local store id is {@code null} or differs
     *       from the one the request expects;</li>
     *   <li>{@code E_STORE_UNAVAILABLE} - {@code databaseAvailable} returned {@code false};</li>
     *   <li>{@code E_TRANSACTION_PRUNED} - reading from the transaction store threw
     *       {@link NoSuchTransactionException};</li>
     *   <li>{@code SUCCESS_END_OF_BATCH} - {@code batchSize} transactions were written
     *       without the cursor running out;</li>
     *   <li>{@code SUCCESS_END_OF_STREAM} - the cursor ran out first, or the last committed
     *       transaction id is below the first requested one so nothing was read.</li>
     * </ul>
     *
     * <p>The finish marker and response are always written and flushed, the monitor is
     * incremented and the protocol is switched to {@code MESSAGE_TYPE}, unless an exception
     * other than {@link NoSuchTransactionException} escapes the streaming step.
     *
     * @param ctx channel context the responses are written to
     * @param msg the pull request being served
     * @throws Exception anything thrown while reading transactions, other than
     *         {@link NoSuchTransactionException}, propagates to the caller
     */
    protected void channelRead0(ChannelHandlerContext ctx, TxPullRequest msg) throws Exception {
        // Streaming starts at the transaction after the one the requester already has,
        // clamped so it is never below 2 (1L is presumably the base transaction id - confirm).
        long firstTxId = Math.max(msg.previousTxId(), 1L) + 1L;
        // Tracks the id used in log messages; advances as transactions are written.
        long lastTxId = firstTxId;

        StoreId localStoreId = storeIdSupplier.get();
        // Read before streaming, so the value reported back may be older than the
        // store's state by the time the response is written.
        long lastCommittedTransactionId = transactionIdStore.getLastCommittedTransactionId();

        boolean storeIdMatches = localStoreId != null && localStoreId.equals(msg.expectedStoreId());

        CatchupResult status;
        if (!storeIdMatches) {
            status = CatchupResult.E_STORE_ID_MISMATCH;
            log.info("Failed to serve TxPullRequest for tx %d and storeId %s because that storeId is different from this machine with %s", lastTxId, msg.expectedStoreId(), localStoreId);
        } else if (!databaseAvailable.getAsBoolean()) {
            status = CatchupResult.E_STORE_UNAVAILABLE;
            log.info("Failed to serve TxPullRequest for tx %d because the local database is unavailable.", lastTxId);
        } else if (lastCommittedTransactionId < firstTxId) {
            // Nothing newer than what the requester already has.
            status = CatchupResult.SUCCESS_END_OF_STREAM;
        } else {
            try (TransactionCursor cursor = logicalTransactionStore.getTransactions(firstTxId)) {
                int written = 0;
                // The size check comes first so the cursor is not advanced once the batch is full.
                while (written < batchSize && cursor.next()) {
                    ctx.write(ResponseMessageType.TX);
                    CommittedTransactionRepresentation tx = cursor.get();
                    lastTxId = tx.getCommitEntry().getTxId();
                    ctx.write(new TxPullResponse(localStoreId, tx));
                    written++;
                }
                // Stopping short of a full batch means the cursor had nothing more to give.
                status = written < batchSize
                        ? CatchupResult.SUCCESS_END_OF_STREAM
                        : CatchupResult.SUCCESS_END_OF_BATCH;
                ctx.flush();
            } catch (NoSuchTransactionException e) {
                status = CatchupResult.E_TRANSACTION_PRUNED;
                log.info("Failed to serve TxPullRequest for tx %d because the transaction does not exist.", lastTxId);
            }
        }

        // Every request is closed with a finish marker plus the outcome, whatever happened above.
        ctx.write(ResponseMessageType.TX_STREAM_FINISHED);
        ctx.write(new TxStreamFinishedResponse(status, lastCommittedTransactionId));
        ctx.flush();

        monitor.increment();
        protocol.expect(CatchupServerProtocol.State.MESSAGE_TYPE);
    }
}

