/*
 * Decompiled with CFR 0.152.
 */
package org.rhq.server.metrics.aggregation;

import com.datastax.driver.core.ResultSet;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.joda.time.DateTime;
import org.rhq.server.metrics.StorageResultSetFuture;
import org.rhq.server.metrics.aggregation.AggregationType;
import org.rhq.server.metrics.aggregation.BaseAggregator;
import org.rhq.server.metrics.aggregation.CombinedMetricsIterator;
import org.rhq.server.metrics.aggregation.CombinedMetricsPair;
import org.rhq.server.metrics.aggregation.IndexAggregatesPair;
import org.rhq.server.metrics.aggregation.IndexEntriesLoader;
import org.rhq.server.metrics.aggregation.MetricsFuturesPair;
import org.rhq.server.metrics.aggregation.PersistFunctions;
import org.rhq.server.metrics.domain.AggregateNumericMetric;
import org.rhq.server.metrics.domain.CacheIndexEntry;
import org.rhq.server.metrics.domain.MetricsTable;
import org.rhq.server.metrics.domain.RawNumericMetric;
import org.rhq.server.metrics.domain.RawNumericMetricMapper;

class PastDataAggregator
extends BaseAggregator {
    private static final Log LOG = LogFactory.getLog(PastDataAggregator.class);
    /** Label returned by {@link #getDebugType()} and embedded in trace messages. */
    private static final String DEBUG_TYPE = "past data";
    /** Day handed to {@code IndexEntriesLoader.loadPastIndexEntries} when index entries are loaded. */
    private DateTime startingDay;
    /** Day handed to the {@code IndexEntriesLoader} constructor when index entries are loaded. */
    private DateTime currentDay;
    /** Source of the persist functions applied to the 1 hour, 6 hour and 24 hour aggregates. */
    private PersistFunctions persistFns;
    /** Running total reported under {@code AggregationType.RAW}; grows by the metric count of every finished batch. */
    private AtomicInteger rawSchedulesCount = new AtomicInteger();
    /** Running total reported under {@code AggregationType.ONE_HOUR}; grows only for batches whose 6 hour time slice was finished. */
    private AtomicInteger oneHourSchedulesCount = new AtomicInteger();
    /** Running total reported under {@code AggregationType.SIX_HOUR}; grows only for batches whose 24 hour time slice was finished. */
    private AtomicInteger sixHourScheduleCount = new AtomicInteger();

    /**
     * Creates an aggregator with no collaborators set; {@code startingDay}, {@code currentDay}
     * and {@code persistFns} are supplied afterwards through the setters.
     */
    PastDataAggregator() {
    }

    /**
     * Stores the day that is later passed to the index entries loader as the
     * starting point for loading past index entries.
     *
     * @param startingDay the day to start from
     */
    void setStartingDay(final DateTime startingDay) {
        this.startingDay = startingDay;
    }

    /**
     * Stores the day that is later passed to the index entries loader's constructor.
     *
     * @param currentDay the current day
     */
    void setCurrentDay(final DateTime currentDay) {
        this.currentDay = currentDay;
    }

    /**
     * Stores the provider of the persist functions used when writing computed aggregates.
     *
     * @param persistFns the persist function provider
     */
    void setPersistFns(final PersistFunctions persistFns) {
        this.persistFns = persistFns;
    }

    /**
     * Returns the label identifying this aggregator in log output.
     *
     * @return the constant {@code "past data"}
     */
    @Override
    protected String getDebugType() {
        return DEBUG_TYPE;
    }

    /**
     * Loads the past index entries, beginning at {@code startingDay}, through a
     * freshly constructed {@link IndexEntriesLoader}.
     *
     * @return the entries returned by the loader
     */
    @Override
    protected List<CacheIndexEntry> getIndexEntries() {
        return new IndexEntriesLoader(this.startTime, this.currentDay, this.dao)
            .loadPastIndexEntries(this.startingDay);
    }

    /**
     * Lazily collapses runs of adjacent index entries that share the same collection
     * time slice and start schedule id into one combined entry per run.
     * <p>
     * The combined entry keeps the collection time slice as its insert time slice when
     * at least one entry of the run has equal collection and insert time slices;
     * otherwise its insert time slice is {@code 0}.
     * <p>
     * Each call to {@code iterator()} on the result starts a new pass over
     * {@code indexEntries}, so the returned {@code Iterable} can be traversed repeatedly.
     * Only adjacent entries are merged.
     * NOTE(review): this presumably relies on the list being ordered so that matching
     * entries are neighbours; confirm against the loader.
     *
     * @param indexEntries the entries to reduce
     * @return a lazy view producing one combined entry per run
     */
    @Override
    protected Iterable<CacheIndexEntry> reduceIndexEntries(final List<CacheIndexEntry> indexEntries) {
        return new Iterable<CacheIndexEntry>(){

            @Override
            public Iterator<CacheIndexEntry> iterator() {
                // A new cursor per call: sharing one cursor across calls would leave every
                // traversal after the first with nothing to read.
                final PeekingIterator<CacheIndexEntry> cursor = Iterators.peekingIterator(indexEntries.iterator());
                return new Iterator<CacheIndexEntry>(){

                    @Override
                    public boolean hasNext() {
                        return cursor.hasNext();
                    }

                    @Override
                    public CacheIndexEntry next() {
                        CacheIndexEntry current = cursor.next();
                        List<CacheIndexEntry> run = new ArrayList<CacheIndexEntry>();
                        run.add(current);
                        // Peek ahead so the first entry of the following run is left unconsumed.
                        while (cursor.hasNext() && PastDataAggregator.this.isSameCollectionTimeSliceStartScheduleIdPair(current, cursor.peek())) {
                            current = cursor.next();
                            run.add(current);
                        }
                        long insertTimeSlice = PastDataAggregator.this.isDataInCache(run) ? current.getCollectionTimeSlice() : 0L;
                        return PastDataAggregator.this.combineEntries(run, insertTimeSlice);
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    /**
     * Tells whether two index entries agree on both collection time slice and
     * start schedule id.
     *
     * @param left  the first entry
     * @param right the second entry
     * @return {@code true} only when both values match
     */
    private boolean isSameCollectionTimeSliceStartScheduleIdPair(CacheIndexEntry left, CacheIndexEntry right) {
        if (left.getCollectionTimeSlice() != right.getCollectionTimeSlice()) {
            return false;
        }
        return left.getStartScheduleId() == right.getStartScheduleId();
    }

    /**
     * Reports whether any of the given entries has an insert time slice equal to its
     * collection time slice.
     *
     * @param indexEntries the entries to inspect
     * @return {@code true} as soon as one such entry is found, {@code false} otherwise
     */
    private boolean isDataInCache(List<CacheIndexEntry> indexEntries) {
        for (CacheIndexEntry candidate : indexEntries) {
            if (candidate.getCollectionTimeSlice() == candidate.getInsertTimeSlice()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Builds a single {@code RAW} bucket entry out of several index entries.
     * <p>
     * Day, start schedule id and collection time slice are copied from the first
     * entry; the schedule ids of all entries are merged into one set.
     *
     * @param indexEntries    the entries to merge; the first element is read unconditionally
     * @param insertTimeSlice the insert time slice to record on the merged entry
     * @return the newly created merged entry
     */
    private CacheIndexEntry combineEntries(List<CacheIndexEntry> indexEntries, long insertTimeSlice) {
        CacheIndexEntry first = indexEntries.get(0);
        CacheIndexEntry merged = new CacheIndexEntry();
        merged.setBucket(MetricsTable.RAW);
        merged.setDay(first.getDay());
        merged.setStartScheduleId(first.getStartScheduleId());
        merged.setCollectionTimeSlice(first.getCollectionTimeSlice());
        merged.setInsertTimeSlice(insertTimeSlice);
        // Install the empty set first, then fill it through the entry's own accessor.
        merged.setScheduleIds(new HashSet<Integer>());
        for (CacheIndexEntry source : indexEntries) {
            merged.getScheduleIds().addAll(source.getScheduleIds());
        }
        return merged;
    }

    /**
     * Creates the task that aggregates the raw data referenced by one index entry.
     * <p>
     * When the cache is active and the entry's collection and insert time slices are
     * equal, the task reads the cache block and hands it to
     * {@link #processRawDataCacheBlock}. Otherwise it queries the raw metrics of each
     * schedule id for the hour starting at the collection time slice and submits those
     * queries to {@code processBatch} in groups of at most five.
     *
     * @param indexEntry the index entry to aggregate
     * @return the task wrapping that work
     */
    @Override
    protected BaseAggregator.AggregationTask createAggregationTask(CacheIndexEntry indexEntry) {
        return new BaseAggregator.AggregationTask(indexEntry){

            @Override
            public void run(CacheIndexEntry indexEntry) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace((Object)("Executing " + PastDataAggregator.this.getDebugType() + " aggregation task for " + indexEntry));
                }
                if (PastDataAggregator.this.cacheActive && indexEntry.getCollectionTimeSlice() == indexEntry.getInsertTimeSlice()) {
                    StorageResultSetFuture cacheFuture = PastDataAggregator.this.dao.findCacheEntriesAsync(PastDataAggregator.this.aggregationType.getCacheTable(), indexEntry.getCollectionTimeSlice(), indexEntry.getStartScheduleId());
                    PastDataAggregator.this.processRawDataCacheBlock(indexEntry, cacheFuture);
                    return;
                }

                final int batchSize = 5;
                // The query window is the same for every schedule id, so compute its end once.
                final long startTime = indexEntry.getCollectionTimeSlice();
                final long endTime = new DateTime(startTime).plusHours(1).getMillis();

                List<StorageResultSetFuture> queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
                for (Integer scheduleId : indexEntry.getScheduleIds()) {
                    queryFutures.add(PastDataAggregator.this.dao.findRawMetricsAsync(scheduleId, startTime, endTime));
                    if (queryFutures.size() == batchSize) {
                        PastDataAggregator.this.processBatch(queryFutures, indexEntry);
                        // The submitted list is now owned by the batch; start a new one.
                        queryFutures = new ArrayList<StorageResultSetFuture>(batchSize);
                    }
                }
                if (!queryFutures.isEmpty()) {
                    PastDataAggregator.this.processBatch(queryFutures, indexEntry);
                }
            }
        };
    }

    /**
     * Returns a snapshot of the schedule counters accumulated so far.
     *
     * @return an immutable map with the current raw, one hour and six hour counts,
     *         keyed by {@code RAW}, {@code ONE_HOUR} and {@code SIX_HOUR} respectively
     */
    @Override
    protected Map<AggregationType, Integer> getAggregationCounts() {
        // No Object casts here: they would make the map infer as <Object, Object>,
        // which is not assignable to the declared return type.
        return ImmutableMap.of(
            AggregationType.RAW, this.rawSchedulesCount.get(),
            AggregationType.ONE_HOUR, this.oneHourSchedulesCount.get(),
            AggregationType.SIX_HOUR, this.sixHourScheduleCount.get());
    }

    /**
     * Aggregates raw data obtained from a batch of per-schedule raw metric queries.
     * <p>
     * Waits for all queries, maps the result sets with a {@link RawNumericMetricMapper},
     * then runs the shared aggregate/persist/clean-up pipeline.
     *
     * @param queryFutures the pending raw metric queries of the batch
     * @param indexEntry   the index entry the batch belongs to
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // helper signatures live in the base class; the chain is wired through raw futures
    private void processBatch(List<StorageResultSetFuture> queryFutures, CacheIndexEntry indexEntry) {
        ListenableFuture queriesFuture = Futures.allAsList(queryFutures);
        ListenableFuture iterableFuture = Futures.transform(queriesFuture, this.toIterable(new RawNumericMetricMapper()), (Executor)this.aggregationTasks);
        this.persistRawAggregatesAndCleanUp(indexEntry, iterableFuture);
    }

    /**
     * Aggregates raw data obtained from a single cache block query.
     * <p>
     * Maps the cache result set with the aggregation type's cache mapper, then runs
     * the shared aggregate/persist/clean-up pipeline.
     *
     * @param indexEntry  the index entry describing the cache block
     * @param cacheFuture the pending cache query
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // see processBatch
    protected void processRawDataCacheBlock(CacheIndexEntry indexEntry, StorageResultSetFuture cacheFuture) {
        ListenableFuture iterableFuture = Futures.transform((ListenableFuture)cacheFuture, this.toIterable(this.aggregationType.getCacheMapper()), (Executor)this.aggregationTasks);
        this.persistRawAggregatesAndCleanUp(indexEntry, iterableFuture);
    }

    /**
     * Pipeline shared by {@code processBatch} and {@code processRawDataCacheBlock}:
     * computes the 1 hour aggregates from the mapped raw data, persists them, rolls
     * them up to 6 hour (and then 24 hour) data when the corresponding time slices
     * are finished, deletes the cache entry and the cache index entries, and finally
     * registers the completion callback.
     *
     * @param indexEntry     the index entry being aggregated
     * @param iterableFuture future producing the mapped raw data to aggregate
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // see processBatch
    private void persistRawAggregatesAndCleanUp(CacheIndexEntry indexEntry, ListenableFuture iterableFuture) {
        ListenableFuture metricsFuture = Futures.transform(iterableFuture, this.computeAggregates(indexEntry.getCollectionTimeSlice(), RawNumericMetric.class), (Executor)this.aggregationTasks);
        ListenableFuture pairFuture = Futures.transform(metricsFuture, this.indexAggregatesPair(indexEntry));
        boolean is6HourTimeSliceFinished = this.dateTimeService.is6HourTimeSliceFinished(indexEntry.getCollectionTimeSlice());
        boolean is24HourTimeSliceFinished = this.dateTimeService.is24HourTimeSliceFinished(indexEntry.getCollectionTimeSlice());

        ListenableFuture<List<ResultSet>> insertsFuture;
        if (is6HourTimeSliceFinished) {
            ListenableFuture oneHourInsertsFuture = Futures.transform(pairFuture, this.persistFns.persist1HourMetrics(), (Executor)this.aggregationTasks);
            // Each roll-up stage starts only after the inserts of the previous stage have completed.
            MetricsFuturesPair sixHourFuturesPair = this.process1HourData(indexEntry, this.proceedWithMetricsAfterInserts(new MetricsFuturesPair((ListenableFuture<List<ResultSet>>)oneHourInsertsFuture, (ListenableFuture<List<AggregateNumericMetric>>)metricsFuture)));
            if (is24HourTimeSliceFinished) {
                MetricsFuturesPair twentyFourHourFuturesPair = this.process6HourData(indexEntry, this.proceedWithMetricsAfterInserts(sixHourFuturesPair));
                insertsFuture = twentyFourHourFuturesPair.resultSetsFuture;
            } else {
                insertsFuture = sixHourFuturesPair.resultSetsFuture;
            }
        } else {
            insertsFuture = Futures.transform(pairFuture, this.persistFns.persist1HourMetricsAndUpdateCache(), (Executor)this.aggregationTasks);
        }

        // Clean-up is chained behind the last insert stage.
        ListenableFuture deleteCacheFuture = Futures.transform((ListenableFuture)insertsFuture, this.deleteCacheEntry(indexEntry), (Executor)this.aggregationTasks);
        ListenableFuture deleteCacheIndexFuture = Futures.transform(deleteCacheFuture, this.deleteCacheIndexEntries(indexEntry), (Executor)this.aggregationTasks);
        this.aggregationTaskFinished((ListenableFuture<ResultSet>)deleteCacheIndexFuture, (ListenableFuture<IndexAggregatesPair>)pairFuture, is6HourTimeSliceFinished, is24HourTimeSliceFinished);
    }

    /**
     * Creates a function that exposes a list of combined metrics pairs as an
     * {@code Iterable}; every traversal is served by a new
     * {@link CombinedMetricsIterator} over the same list.
     *
     * @return the adapting function
     */
    private Function<List<CombinedMetricsPair>, Iterable<List<AggregateNumericMetric>>> toIterable() {
        return new Function<List<CombinedMetricsPair>, Iterable<List<AggregateNumericMetric>>>(){

            public Iterable<List<AggregateNumericMetric>> apply(final List<CombinedMetricsPair> pairs) {
                Iterable<List<AggregateNumericMetric>> view = new Iterable<List<AggregateNumericMetric>>(){

                    @Override
                    public Iterator<List<AggregateNumericMetric>> iterator() {
                        return new CombinedMetricsIterator(pairs);
                    }
                };
                return view;
            }
        };
    }

    /**
     * Creates an async function that deletes the cache index entries identified by
     * the given entry's day, partition, collection time slice and start schedule id.
     * The function's input (the result of the preceding step) is not inspected.
     *
     * @param indexEntry the entry whose cache index rows should be removed
     * @return the async function issuing the delete
     */
    protected AsyncFunction<ResultSet, ResultSet> deleteCacheIndexEntries(final CacheIndexEntry indexEntry) {
        return new AsyncFunction<ResultSet, ResultSet>(){

            public ListenableFuture<ResultSet> apply(ResultSet deleteCacheResultSet) throws Exception {
                return PastDataAggregator.this.dao.deleteCacheIndexEntries(
                    PastDataAggregator.this.aggregationType.getCacheTable(),
                    indexEntry.getDay(),
                    indexEntry.getPartition(),
                    indexEntry.getCollectionTimeSlice(),
                    indexEntry.getStartScheduleId());
            }
        };
    }

    /**
     * Registers the completion callback for one aggregation batch.
     * <p>
     * Once both the cache index delete and the aggregates pair are available, the
     * callback adds the number of aggregated metrics to the raw counter, and to the
     * one hour / six hour counters when the respective flag is set.
     *
     * @param deleteCacheIndexFuture future of the final cache index delete
     * @param pairFuture             future of the index entry / aggregates pair
     * @param oneHourDataAggregated  whether to count the batch towards the one hour total
     * @param sixHourDataAggregated  whether to count the batch towards the six hour total
     */
    private void aggregationTaskFinished(ListenableFuture<ResultSet> deleteCacheIndexFuture, ListenableFuture<IndexAggregatesPair> pairFuture, final boolean oneHourDataAggregated, final boolean sixHourDataAggregated) {
        ListenableFuture[] stages = new ListenableFuture[]{deleteCacheIndexFuture, pairFuture};
        ListenableFuture argsFuture = Futures.allAsList((ListenableFuture[])stages);
        BaseAggregator.AggregationTaskFinishedCallback<List<Object>> callback = new BaseAggregator.AggregationTaskFinishedCallback<List<Object>>(){

            @Override
            protected void onFinish(List<Object> args) {
                // Index 1 is the pair, matching its position in the array above.
                IndexAggregatesPair pair = (IndexAggregatesPair)args.get(1);
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Finished batch for " + pair.cacheIndexEntry));
                }
                int aggregated = pair.metrics.size();
                PastDataAggregator.this.rawSchedulesCount.addAndGet(aggregated);
                if (oneHourDataAggregated) {
                    PastDataAggregator.this.oneHourSchedulesCount.addAndGet(aggregated);
                }
                if (sixHourDataAggregated) {
                    PastDataAggregator.this.sixHourScheduleCount.addAndGet(aggregated);
                }
            }
        };
        Futures.addCallback(argsFuture, (FutureCallback)callback, (Executor)this.aggregationTasks);
    }

    /**
     * Rolls freshly computed 1 hour aggregates up into 6 hour aggregates.
     * <p>
     * For the 6 hour time slice containing the entry's collection time slice, fetches
     * the stored 1 hour data for each metric, combines it with the new aggregates,
     * computes the 6 hour aggregates and persists them. When the 24 hour time slice is
     * finished the plain 6 hour persist function is used; otherwise the variant that
     * also updates the cache.
     *
     * @param indexEntry    the index entry being aggregated
     * @param metricsFuture future of the 1 hour aggregates
     * @return the insert future paired with the future of the 6 hour aggregates
     */
    private MetricsFuturesPair process1HourData(CacheIndexEntry indexEntry, ListenableFuture<List<AggregateNumericMetric>> metricsFuture) {
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("Processing 1 hour data for " + indexEntry));
        }
        DateTime collectionTime = new DateTime(indexEntry.getCollectionTimeSlice());
        DateTime sixHourTimeSlice = this.dateTimeService.get6HourTimeSlice(collectionTime);
        boolean is24HourTimeSliceFinished = this.dateTimeService.is24HourTimeSliceFinished(new DateTime(indexEntry.getCollectionTimeSlice()));

        ListenableFuture pairFutures = Futures.transform(metricsFuture, this.fetch1HourData(sixHourTimeSlice), (Executor)this.aggregationTasks);
        ListenableFuture iterableFuture = Futures.transform(pairFutures, this.toIterable(), (Executor)this.aggregationTasks);
        ListenableFuture sixHourMetricsFuture = Futures.transform(iterableFuture, this.computeAggregates(sixHourTimeSlice.getMillis(), AggregateNumericMetric.class), (Executor)this.aggregationTasks);
        ListenableFuture pairFuture = Futures.transform(sixHourMetricsFuture, this.indexAggregatesPair(indexEntry));

        ListenableFuture insertsFuture;
        if (is24HourTimeSliceFinished) {
            insertsFuture = Futures.transform(pairFuture, this.persistFns.persist6HourMetrics(), (Executor)this.aggregationTasks);
        } else {
            insertsFuture = Futures.transform(pairFuture, this.persistFns.persist6HourMetricsAndUpdateCache(), (Executor)this.aggregationTasks);
        }
        return new MetricsFuturesPair((ListenableFuture<List<ResultSet>>)insertsFuture, (ListenableFuture<List<AggregateNumericMetric>>)sixHourMetricsFuture);
    }

    /**
     * Rolls freshly computed 6 hour aggregates up into 24 hour aggregates.
     * <p>
     * For the 24 hour time slice containing the entry's collection time slice, fetches
     * the stored 6 hour data for each metric, combines it with the new aggregates,
     * computes the 24 hour aggregates and persists them.
     * <p>
     * NOTE(review): the fetch step is transformed without an explicit executor, unlike
     * the equivalent step in {@code process1HourData}; confirm whether that is intended.
     *
     * @param indexEntry           the index entry being aggregated
     * @param sixHourMetricsFuture future of the 6 hour aggregates
     * @return the insert future paired with the future of the 24 hour aggregates
     */
    private MetricsFuturesPair process6HourData(CacheIndexEntry indexEntry, ListenableFuture<List<AggregateNumericMetric>> sixHourMetricsFuture) {
        if (LOG.isTraceEnabled()) {
            LOG.trace((Object)("Processing 6 hour data for " + indexEntry));
        }
        DateTime dayTimeSlice = this.dateTimeService.get24HourTimeSlice(indexEntry.getCollectionTimeSlice());

        ListenableFuture combinedFuture = Futures.transform(sixHourMetricsFuture, this.fetch6HourData(dayTimeSlice));
        ListenableFuture iterableFuture = Futures.transform(combinedFuture, this.toIterable(), (Executor)this.aggregationTasks);
        ListenableFuture dayMetricsFuture = Futures.transform(iterableFuture, this.computeAggregates(dayTimeSlice.getMillis(), AggregateNumericMetric.class), (Executor)this.aggregationTasks);
        ListenableFuture indexPairFuture = Futures.transform(dayMetricsFuture, this.indexAggregatesPair(indexEntry));
        ListenableFuture dayInsertsFuture = Futures.transform(indexPairFuture, this.persistFns.persist24HourMetrics(), (Executor)this.aggregationTasks);

        return new MetricsFuturesPair((ListenableFuture<List<ResultSet>>)dayInsertsFuture, (ListenableFuture<List<AggregateNumericMetric>>)dayMetricsFuture);
    }

    /**
     * Returns a future that yields the pair's metrics, but only after the pair's
     * inserts have completed as well.
     *
     * @param pair the inserts future and the metrics future to join
     * @return a future of the metrics that completes once both futures of the pair have completed
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // the joined list mixes result sets and metrics, so it is handled untyped
    private ListenableFuture<List<AggregateNumericMetric>> proceedWithMetricsAfterInserts(MetricsFuturesPair pair) {
        ListenableFuture futures = Futures.allAsList((ListenableFuture[])new ListenableFuture[]{pair.resultSetsFuture, pair.metricsFuture});
        return Futures.transform(futures, (Function)new Function<List<List<?>>, List<AggregateNumericMetric>>(){

            public List<AggregateNumericMetric> apply(List<List<?>> input) {
                // Index 1 is the metrics list, matching its position in the array above.
                // The cast is required: List<?> is not assignable to List<AggregateNumericMetric>.
                return (List<AggregateNumericMetric>)input.get(1);
            }
        });
    }

    /**
     * Creates an async function that, for each given aggregate, queries the stored
     * 1 hour metrics of its schedule between {@code timeSliceStart} and the end of
     * that 6 hour time slice, and pairs the query result with the aggregate.
     *
     * @param timeSliceStart start of the 6 hour time slice to query
     * @return async function producing one combined pair per input aggregate
     */
    private AsyncFunction<List<AggregateNumericMetric>, List<CombinedMetricsPair>> fetch1HourData(final DateTime timeSliceStart) {
        final DateTime timeSliceEnd = this.dateTimeService.get6HourTimeSliceEnd(timeSliceStart);
        return new AsyncFunction<List<AggregateNumericMetric>, List<CombinedMetricsPair>>(){

            public ListenableFuture<List<CombinedMetricsPair>> apply(List<AggregateNumericMetric> metrics) {
                // Typed element futures are needed for allAsList to yield List<CombinedMetricsPair>.
                List<ListenableFuture<CombinedMetricsPair>> pairFutures = new ArrayList<ListenableFuture<CombinedMetricsPair>>();
                for (AggregateNumericMetric metric : metrics) {
                    StorageResultSetFuture queryFuture = PastDataAggregator.this.dao.findOneHourMetricsAsync(metric.getScheduleId(), timeSliceStart.getMillis(), timeSliceEnd.getMillis());
                    ListenableFuture<CombinedMetricsPair> pairFuture = Futures.transform(queryFuture, PastDataAggregator.this.combineMetrics(metric));
                    pairFutures.add(pairFuture);
                }
                return Futures.allAsList(pairFutures);
            }
        };
    }

    /**
     * Creates an async function that, for each given aggregate, queries the stored
     * 6 hour metrics of its schedule between {@code timeSliceStart} and the end of
     * that 24 hour time slice, and pairs the query result with the aggregate.
     *
     * @param timeSliceStart start of the 24 hour time slice to query
     * @return async function producing one combined pair per input aggregate
     */
    private AsyncFunction<List<AggregateNumericMetric>, List<CombinedMetricsPair>> fetch6HourData(final DateTime timeSliceStart) {
        final DateTime timeSliceEnd = this.dateTimeService.get24HourTimeSliceEnd(timeSliceStart);
        return new AsyncFunction<List<AggregateNumericMetric>, List<CombinedMetricsPair>>(){

            public ListenableFuture<List<CombinedMetricsPair>> apply(List<AggregateNumericMetric> metrics) throws Exception {
                // Typed element futures are needed for allAsList to yield List<CombinedMetricsPair>.
                List<ListenableFuture<CombinedMetricsPair>> pairFutures = new ArrayList<ListenableFuture<CombinedMetricsPair>>();
                for (AggregateNumericMetric metric : metrics) {
                    StorageResultSetFuture queryFuture = PastDataAggregator.this.dao.findSixHourMetricsAsync(metric.getScheduleId(), timeSliceStart.getMillis(), timeSliceEnd.getMillis());
                    ListenableFuture<CombinedMetricsPair> pairFuture = Futures.transform(queryFuture, PastDataAggregator.this.combineMetrics(metric));
                    pairFutures.add(pairFuture);
                }
                return Futures.allAsList(pairFutures);
            }
        };
    }

    /**
     * Creates a function that wraps a query result together with the given aggregate
     * in a {@link CombinedMetricsPair}.
     *
     * @param metric the aggregate to attach to every result set passed to the function
     * @return the pairing function
     */
    private Function<ResultSet, CombinedMetricsPair> combineMetrics(final AggregateNumericMetric metric) {
        Function<ResultSet, CombinedMetricsPair> pairWithMetric = new Function<ResultSet, CombinedMetricsPair>(){

            public CombinedMetricsPair apply(ResultSet resultSet) {
                return new CombinedMetricsPair(resultSet, metric);
            }
        };
        return pairWithMetric;
    }
}

