/*
 * Decompiled with CFR 0.152.
 */
package org.modeshape.jcr.journal;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.schematic.document.ThreadSafe;
import org.joda.time.DateTime;
import org.modeshape.common.SystemFailureException;
import org.modeshape.common.logging.Logger;
import org.modeshape.common.util.CheckArg;
import org.modeshape.jcr.JcrI18n;
import org.modeshape.jcr.cache.change.ChangeSet;
import org.modeshape.jcr.clustering.ClusteringService;
import org.modeshape.jcr.clustering.MessageConsumer;
import org.modeshape.jcr.journal.ChangeJournal;
import org.modeshape.jcr.journal.DeltaMessage;
import org.modeshape.jcr.journal.JournalRecord;
import org.modeshape.jcr.journal.LocalJournal;

/**
 * A {@link ChangeJournal} that wraps a {@link LocalJournal} and, when started inside a cluster with
 * more than one member, asks the other members for the records it is missing (a "delta") before
 * {@link #start()} returns.
 * <p>
 * All journal operations are delegated to the wrapped local journal. In addition, this class is a
 * {@link MessageConsumer} of {@link DeltaMessage}s: it answers delta requests coming from other
 * journals and applies the delta responses it receives.
 * </p>
 */
@ThreadSafe
public class ClusteredJournal
extends MessageConsumer<DeltaMessage>
implements ChangeJournal {
    private static final Logger LOGGER = Logger.getLogger(ClusteredJournal.class);
    /** Upper bound, in minutes, that {@link #start()} blocks waiting for the other members to answer. */
    private static final int MAX_MINUTES_TO_WAIT_FOR_RECONCILIATION = 5;
    private final LocalJournal localJournal;
    private final ClusteringService clusteringService;
    /** Number of answers expected for the delta request sent at startup; set once in {@link #start()}. */
    private final AtomicInteger expectedNumberOfDeltaResponses;
    /**
     * Counts down once per answer (delta response or "still reconciling") received from the cluster.
     * Written by {@link #start()} and read from {@link #consume(DeltaMessage)}, hence volatile.
     * It is {@code null} until {@link #start()} has been called.
     */
    private volatile CountDownLatch reconciliationLatch = null;

    /**
     * Creates a new clustered journal on top of the given local journal.
     *
     * @param localJournal the journal to which all operations are delegated; may not be null
     * @param clusteringService the service used to exchange delta messages; may not be null
     */
    public ClusteredJournal(LocalJournal localJournal, ClusteringService clusteringService) {
        super(DeltaMessage.class);
        CheckArg.isNotNull(localJournal, "localJournal");
        CheckArg.isNotNull(clusteringService, "clusteringService");
        this.clusteringService = clusteringService;
        this.localJournal = localJournal.withSearchTimeDelta(clusteringService.getMaxAllowedClockDelayMillis());
        this.expectedNumberOfDeltaResponses = new AtomicInteger(0);
    }

    @Override
    public void notify(ChangeSet changeSet) {
        this.localJournal.notify(changeSet);
    }

    /**
     * Starts the local journal, registers this instance as a message consumer and, if the cluster
     * has more than one member, sends a delta request and blocks until every expected answer has
     * arrived or the reconciliation timeout elapses.
     *
     * @throws IllegalStateException if the clustering service is not open
     * @throws SystemFailureException if reconciliation did not complete in time
     * @throws Exception if the local journal fails to start
     */
    @Override
    public void start() throws Exception {
        if (!this.clusteringService.isOpen()) {
            throw new IllegalStateException("The clustering service has not been started");
        }
        this.localJournal.start();

        boolean reconciliationRequired = this.clusteringService.multipleMembersInCluster();
        // Clamp at zero: a negative count would make the CountDownLatch constructor throw.
        int numberOfExpectedResponses = reconciliationRequired
                ? Math.max(0, this.clusteringService.membersInCluster() - 1)
                : 0;
        // The latch must exist before this instance is registered as a consumer, otherwise a
        // message delivered right after registration would find it null.
        this.reconciliationLatch = new CountDownLatch(numberOfExpectedResponses);
        this.clusteringService.addConsumer(this);

        if (!reconciliationRequired) {
            return;
        }
        this.expectedNumberOfDeltaResponses.compareAndSet(0, numberOfExpectedResponses);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Sending delta request from journal {0} as part of cluster {1}", new Object[]{this.journalId(), this.clusterName()});
        }
        JournalRecord lastRecord = this.lastRecord();
        DateTime lastChangeSetTimeMillis = lastRecord != null ? new DateTime(lastRecord.getChangeTimeMillis()) : null;
        this.clusteringService.sendMessage(DeltaMessage.request(this.journalId(), lastChangeSetTimeMillis));
        this.waitForReconciliationToComplete();
    }

    /**
     * Blocks until the reconciliation latch reaches zero, the timeout elapses or the thread is
     * interrupted.
     *
     * @throws SystemFailureException if the latch has not reached zero when waiting ends
     */
    private void waitForReconciliationToComplete() {
        CountDownLatch latch = this.reconciliationLatch;
        try {
            latch.await(MAX_MINUTES_TO_WAIT_FOR_RECONCILIATION, TimeUnit.MINUTES);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
        if (!this.deltaReconciliationCompleted()) {
            throw new SystemFailureException(JcrI18n.journalHasNotCompletedReconciliation.text(new Object[]{this.journalId(), this.clusterName(), MAX_MINUTES_TO_WAIT_FOR_RECONCILIATION}));
        }
    }

    @Override
    public void shutdown() {
        this.localJournal.shutdown();
    }

    @Override
    public void removeOldRecords() {
        this.localJournal.removeOldRecords();
    }

    @Override
    public ChangeJournal.Records allRecords(boolean descendingOrder) {
        return this.localJournal.allRecords(descendingOrder);
    }

    @Override
    public JournalRecord lastRecord() {
        return this.localJournal.lastRecord();
    }

    @Override
    public ChangeJournal.Records recordsNewerThan(DateTime changeSetTime, boolean inclusive, boolean descendingOrder) {
        return this.localJournal.recordsNewerThan(changeSetTime, inclusive, descendingOrder);
    }

    @Override
    public void addRecords(JournalRecord ... records) {
        this.localJournal.addRecords(records);
    }

    @Override
    public String journalId() {
        return this.localJournal.journalId();
    }

    /**
     * Dispatches an incoming delta message to the handler for its concrete type. Messages of any
     * other type are ignored.
     *
     * @param message the message received from the cluster
     */
    @Override
    public void consume(DeltaMessage message) {
        if (message instanceof DeltaMessage.DeltaRequest) {
            this.processDeltaRequest((DeltaMessage.DeltaRequest)message);
            return;
        }
        if (message instanceof DeltaMessage.DeltaResponse) {
            this.processDeltaResponse((DeltaMessage.DeltaResponse)message);
            return;
        }
        if (message instanceof DeltaMessage.DeltaStillReconciling) {
            this.processStillReconciling((DeltaMessage.DeltaStillReconciling)message);
        }
    }

    /**
     * @return {@code true} if the reconciliation latch exists and has reached zero; {@code false}
     *         if answers are still outstanding or {@link #start()} has not created the latch yet
     */
    private boolean deltaReconciliationCompleted() {
        CountDownLatch latch = this.reconciliationLatch;
        return latch != null && latch.getCount() == 0L;
    }

    /** Records that one answer to the delta request has arrived; a no-op if no latch exists yet. */
    private void answerReceived() {
        CountDownLatch latch = this.reconciliationLatch;
        if (latch != null) {
            latch.countDown();
        }
    }

    /**
     * Handles a "still reconciling" answer: the sender cannot provide a delta, but it still counts
     * as an answer to the request.
     */
    private void processStillReconciling(DeltaMessage.DeltaStillReconciling message) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Journal {0} received response from journal {1}: still reconciling", new Object[]{this.journalId(), message.getJournalId()});
        }
        this.answerReceived();
    }

    /**
     * Answers a delta request from another journal with the records newer than the requester's
     * last change set time, or with a "still reconciling" message if this journal has not finished
     * its own reconciliation. Requests that originate from this journal are discarded.
     */
    private void processDeltaRequest(DeltaMessage.DeltaRequest request) {
        if (request.getJournalId().equals(this.journalId())) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Journal {0} discarding delta request from itself", new Object[]{this.journalId()});
            }
            return;
        }
        if (!this.deltaReconciliationCompleted()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Journal {0} is still reconciling, cannot send delta to journal {1}", new Object[]{this.journalId(), request.getJournalId()});
            }
            this.clusteringService.sendMessage(DeltaMessage.stillReconciling(this.journalId()));
            return;
        }
        ChangeJournal.Records delta = this.recordsNewerThan(request.getLastChangeSetTime(), false, false);
        List<JournalRecord> deltaList = new ArrayList<JournalRecord>(delta.size());
        for (JournalRecord record : delta) {
            deltaList.add(record);
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Journal {0} sending delta response to journal {1} as part of cluster {2}. Delta size is {3}", new Object[]{this.journalId(), request.getJournalId(), this.clusterName(), delta.size()});
        }
        this.clusteringService.sendMessage(DeltaMessage.response(this.journalId(), deltaList));
    }

    /**
     * Applies the records carried by a delta response to the local journal and counts the response
     * as one received answer.
     */
    private void processDeltaResponse(DeltaMessage.DeltaResponse response) {
        List<JournalRecord> records = response.getRecords();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Journal {0} received delta response from journal {1} as part of cluster {2}. Delta size: {3} records.", new Object[]{this.journalId(), response.getJournalId(), this.clusterName(), records.size()});
        }
        if (!records.isEmpty()) {
            this.localJournal.addRecords(records.toArray(new JournalRecord[0]));
        }
        this.answerReceived();
    }

    /** @return the clustering service this journal exchanges messages through */
    protected ClusteringService clusteringService() {
        return this.clusteringService;
    }

    /** @return the wrapped local journal */
    protected LocalJournal localJournal() {
        return this.localJournal;
    }

    /** @return the cluster name reported by the clustering service */
    protected String clusterName() {
        return this.clusteringService().clusterName();
    }
}

