/*
 * Decompiled with CFR 0.152.
 */
package org.influxdb.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.impl.BatchWriter;
import org.influxdb.impl.InfluxDBImpl;
import org.influxdb.impl.OneShotBatchWriter;
import org.influxdb.impl.Preconditions;
import org.influxdb.impl.RetryCapableBatchWriter;

/**
 * Buffers {@link Point}s in a queue and hands them to a {@link BatchWriter} (HTTP entries)
 * or directly to {@link InfluxDBImpl} (UDP entries) in batches.
 *
 * <p>A flush is triggered in three ways: periodically by a single-threaded scheduler
 * (flush interval plus an optional random jitter), when the queue size reaches
 * {@code actions}, or explicitly through {@link #flush()} / {@link #flushAndShutdown()}.
 *
 * <p>Instances are created through {@link #builder(InfluxDB)}.
 */
public final class BatchProcessor {
    private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
    /** Entries waiting to be written; bounded to {@code actions} when that is a usable capacity. */
    protected final BlockingQueue<AbstractBatchEntry> queue;
    private final ScheduledExecutorService scheduler;
    private final BiConsumer<Iterable<Point>, Throwable> exceptionHandler;
    final InfluxDBImpl influxDB;
    /** Queue size at which {@link #put(AbstractBatchEntry)} requests an immediate flush. */
    final int actions;
    private final TimeUnit flushIntervalUnit;
    private final int flushInterval;
    private final InfluxDB.ConsistencyLevel consistencyLevel;
    private final int jitterInterval;
    private final BatchWriter batchWriter;

    /**
     * Creates a builder for a new processor.
     *
     * @param influxDB the client to write through; the builder casts it to {@link InfluxDBImpl}
     * @return a new {@link Builder}
     */
    public static Builder builder(InfluxDB influxDB) {
        return new Builder(influxDB);
    }

    /**
     * Creates the processor and schedules the first periodic flush.
     *
     * @param influxDB          client used for UDP writes
     * @param batchWriter       writer that receives the grouped HTTP batches
     * @param threadFactory     factory for the single scheduler thread
     * @param actions           queue size that triggers a flush; also the queue capacity when
     *                          it is greater than 1 and less than {@code Integer.MAX_VALUE}
     * @param flushIntervalUnit unit of {@code flushInterval} and {@code jitterInterval}
     * @param flushInterval     base delay between periodic flushes
     * @param jitterInterval    upper bound (exclusive) of the random delay added to each flush
     * @param exceptionHandler  callback invoked with the affected points when a write fails
     * @param consistencyLevel  consistency level applied to every HTTP batch
     */
    BatchProcessor(InfluxDBImpl influxDB, BatchWriter batchWriter, ThreadFactory threadFactory, int actions, TimeUnit flushIntervalUnit, int flushInterval, int jitterInterval, BiConsumer<Iterable<Point>, Throwable> exceptionHandler, InfluxDB.ConsistencyLevel consistencyLevel) {
        this.influxDB = influxDB;
        this.batchWriter = batchWriter;
        this.actions = actions;
        this.flushIntervalUnit = flushIntervalUnit;
        this.flushInterval = flushInterval;
        this.jitterInterval = jitterInterval;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
        this.exceptionHandler = exceptionHandler;
        this.consistencyLevel = consistencyLevel;
        this.queue = actions > 1 && actions < Integer.MAX_VALUE
                ? new LinkedBlockingQueue<AbstractBatchEntry>(actions)
                : new LinkedBlockingQueue<AbstractBatchEntry>();

        // The task re-schedules itself (instead of using a fixed-rate schedule) so that a
        // fresh jitter value is drawn for every cycle.
        Runnable flushRunnable = new Runnable() {

            @Override
            public void run() {
                try {
                    BatchProcessor.this.write();
                } finally {
                    // Re-arm even when write() threw (e.g. a failing exception handler);
                    // otherwise one failure would silently end periodic flushing.
                    if (!BatchProcessor.this.scheduler.isShutdown()) {
                        BatchProcessor.this.scheduler.schedule(this, BatchProcessor.this.nextFlushDelay(), BatchProcessor.this.flushIntervalUnit);
                    }
                }
            }
        };
        this.scheduler.schedule(flushRunnable, this.nextFlushDelay(), this.flushIntervalUnit);
    }

    /**
     * Computes the delay until the next periodic flush: the flush interval plus a random
     * jitter in {@code [0, jitterInterval)}. The sum is computed as {@code long} so it
     * cannot overflow.
     */
    private long nextFlushDelay() {
        int jitter = (int) (Math.random() * (double) this.jitterInterval);
        return (long) this.flushInterval + jitter;
    }

    /**
     * Drains the queue and writes everything that was drained.
     *
     * <p>HTTP entries are grouped into one {@link BatchPoints} per (database, retention policy)
     * pair and passed to the {@link BatchWriter}; UDP entries are converted to line protocol
     * and written per port through {@link InfluxDBImpl}. When the queue is empty the writer is
     * still invoked with an empty collection (presumably so a buffering writer can retry
     * earlier failures - confirm against the {@code BatchWriter} implementations).
     *
     * <p>Any {@link Throwable} raised while writing is logged and reported to the exception
     * handler together with the drained points (an empty list if nothing had been drained yet).
     * An exception thrown by the handler itself propagates to the caller.
     */
    void write() {
        // Never hand null to the handler: failures before the drain report an empty batch.
        List<Point> currentBatch = Collections.emptyList();
        try {
            if (this.queue.isEmpty()) {
                this.batchWriter.write(Collections.emptyList());
                return;
            }
            List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());
            this.queue.drainTo(batchEntries);
            currentBatch = new ArrayList<>(batchEntries.size());

            // database -> retention policy -> batch. Nested maps are used instead of a
            // concatenated "db_rp" string key, which is ambiguous when either part contains
            // an underscore ("a_b" + "c" vs. "a" + "b_c").
            Map<String, Map<String, BatchPoints>> httpBatchesByDbAndRp = new HashMap<>();
            List<BatchPoints> httpBatches = new ArrayList<>();
            Map<Integer, List<String>> udpPortToLines = new HashMap<>();

            for (AbstractBatchEntry batchEntry : batchEntries) {
                Point point = batchEntry.getPoint();
                currentBatch.add(point);
                if (batchEntry instanceof HttpBatchEntry) {
                    HttpBatchEntry httpEntry = (HttpBatchEntry) batchEntry;
                    String dbName = httpEntry.getDb();
                    String rp = httpEntry.getRp();
                    Map<String, BatchPoints> batchesByRp =
                            httpBatchesByDbAndRp.computeIfAbsent(dbName, db -> new HashMap<>());
                    BatchPoints batchPoints = batchesByRp.get(rp);
                    if (batchPoints == null) {
                        batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).consistency(this.getConsistencyLevel()).build();
                        batchesByRp.put(rp, batchPoints);
                        httpBatches.add(batchPoints);
                    }
                    batchPoints.point(point);
                } else if (batchEntry instanceof UdpBatchEntry) {
                    UdpBatchEntry udpEntry = (UdpBatchEntry) batchEntry;
                    udpPortToLines
                            .computeIfAbsent(udpEntry.getUdpPort(), port -> new ArrayList<>())
                            .add(point.lineProtocol());
                }
            }

            this.batchWriter.write(httpBatches);
            for (Map.Entry<Integer, List<String>> portAndLines : udpPortToLines.entrySet()) {
                int udpPort = portAndLines.getKey();
                for (String lineProtocol : portAndLines.getValue()) {
                    this.influxDB.write(udpPort, lineProtocol);
                }
            }
        }
        catch (Throwable t) {
            // Log first so the failure is recorded even if the handler throws.
            LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t);
            this.exceptionHandler.accept(currentBatch, t);
        }
    }

    /**
     * Enqueues an entry, blocking while a bounded queue is full, and asks the scheduler
     * thread to flush once the queue size has reached {@code actions}.
     *
     * @param batchEntry the entry to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread
     *                          is interrupted while waiting; the interrupt status is restored
     */
    void put(AbstractBatchEntry batchEntry) {
        try {
            this.queue.put(batchEntry);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for code further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (this.queue.size() >= this.actions) {
            this.scheduler.submit(new Runnable() {

                @Override
                public void run() {
                    BatchProcessor.this.write();
                }
            });
        }
    }

    /**
     * Writes whatever is queued on the calling thread, then shuts down the scheduler and
     * closes the batch writer. The scheduler and the writer are released even if the final
     * write throws.
     */
    void flushAndShutdown() {
        try {
            this.write();
        } finally {
            try {
                this.scheduler.shutdown();
            } finally {
                this.batchWriter.close();
            }
        }
    }

    /** Writes whatever is currently queued, on the calling thread. */
    void flush() {
        this.write();
    }

    /**
     * @return the consistency level applied to every HTTP batch built by this processor
     */
    public InfluxDB.ConsistencyLevel getConsistencyLevel() {
        return this.consistencyLevel;
    }

    /**
     * @return the writer that receives the grouped HTTP batches
     */
    BatchWriter getBatchWriter() {
        return this.batchWriter;
    }

    /** Queue entry for a point that is written as line protocol to a UDP port. */
    static class UdpBatchEntry
    extends AbstractBatchEntry {
        private final int udpPort;

        /**
         * @param point   the point to write
         * @param udpPort the UDP port the point is written to
         */
        public UdpBatchEntry(Point point, int udpPort) {
            super(point);
            this.udpPort = udpPort;
        }

        /**
         * @return the UDP port the point is written to
         */
        public int getUdpPort() {
            return this.udpPort;
        }
    }

    /** Queue entry for a point that is written to a database / retention policy pair. */
    static class HttpBatchEntry
    extends AbstractBatchEntry {
        private final String db;
        private final String rp;

        /**
         * @param point the point to write
         * @param db    the target database name
         * @param rp    the target retention policy name
         */
        public HttpBatchEntry(Point point, String db, String rp) {
            super(point);
            this.db = db;
            this.rp = rp;
        }

        /**
         * @return the target database name
         */
        public String getDb() {
            return this.db;
        }

        /**
         * @return the target retention policy name
         */
        public String getRp() {
            return this.rp;
        }
    }

    /** Base class of all queue entries; carries the point to be written. */
    static abstract class AbstractBatchEntry {
        private final Point point;

        /**
         * @param point the point to write
         */
        public AbstractBatchEntry(Point point) {
            this.point = point;
        }

        /**
         * @return the point carried by this entry
         */
        public Point getPoint() {
            return this.point;
        }
    }

    /**
     * Fluent builder for {@link BatchProcessor}. {@code actions}, a flush interval and its
     * unit must be set before {@link #build()}; the remaining settings have defaults.
     */
    public static final class Builder {
        private final InfluxDBImpl influxDB;
        private ThreadFactory threadFactory = Executors.defaultThreadFactory();
        private int actions;
        private TimeUnit flushIntervalUnit;
        private int flushInterval;
        private int jitterInterval;
        private int bufferLimit = 0;
        /** Default handler ignores failures (they are still logged by the processor). */
        private BiConsumer<Iterable<Point>, Throwable> exceptionHandler = (entries, throwable) -> {};
        private InfluxDB.ConsistencyLevel consistencyLevel;

        /**
         * Sets the factory used to create the scheduler thread.
         *
         * @param threadFactory the thread factory; must not be {@code null} at build time
         * @return this builder
         */
        public Builder threadFactory(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
            return this;
        }

        /**
         * @param influxDB the client to write through; must be an {@link InfluxDBImpl}
         * @throws ClassCastException if {@code influxDB} is not an {@link InfluxDBImpl}
         */
        public Builder(InfluxDB influxDB) {
            this.influxDB = (InfluxDBImpl) influxDB;
        }

        /**
         * Sets the number of queued points that triggers a flush.
         *
         * @param maxActions the threshold; must be positive at build time
         * @return this builder
         */
        public Builder actions(int maxActions) {
            this.actions = maxActions;
            return this;
        }

        /**
         * Sets the periodic flush interval. Any previously configured jitter is left unchanged.
         *
         * @param interval the flush interval; must be positive at build time
         * @param unit     the unit of {@code interval}
         * @return this builder
         */
        public Builder interval(int interval, TimeUnit unit) {
            this.flushInterval = interval;
            this.flushIntervalUnit = unit;
            return this;
        }

        /**
         * Sets the periodic flush interval together with a random jitter.
         *
         * @param flushInterval  the flush interval; must be positive at build time
         * @param jitterInterval upper bound of the random delay added to each flush;
         *                       must not be negative at build time
         * @param unit           the unit of both intervals
         * @return this builder
         */
        public Builder interval(int flushInterval, int jitterInterval, TimeUnit unit) {
            this.flushInterval = flushInterval;
            this.jitterInterval = jitterInterval;
            this.flushIntervalUnit = unit;
            return this;
        }

        /**
         * Sets the buffer limit. A retry-capable writer is used only when this value is
         * greater than {@code actions}.
         *
         * @param bufferLimit the buffer limit; must not be negative at build time
         * @return this builder
         */
        public Builder bufferLimit(int bufferLimit) {
            this.bufferLimit = bufferLimit;
            return this;
        }

        /**
         * Sets the callback invoked with the affected points when a write fails.
         *
         * @param handler the handler; must not be {@code null} at build time
         * @return this builder
         */
        public Builder exceptionHandler(BiConsumer<Iterable<Point>, Throwable> handler) {
            this.exceptionHandler = handler;
            return this;
        }

        /**
         * Sets the consistency level applied to every HTTP batch.
         *
         * @param consistencyLevel the consistency level
         * @return this builder
         */
        public Builder consistencyLevel(InfluxDB.ConsistencyLevel consistencyLevel) {
            this.consistencyLevel = consistencyLevel;
            return this;
        }

        /**
         * Validates the configuration and creates the processor. A
         * {@link RetryCapableBatchWriter} is used when {@code bufferLimit > actions},
         * otherwise a {@link OneShotBatchWriter}.
         *
         * @return the configured processor, with its first periodic flush already scheduled
         * @throws NullPointerException if the client, interval unit, thread factory or
         *                              exception handler is {@code null}
         */
        public BatchProcessor build() {
            Objects.requireNonNull(this.influxDB, "influxDB");
            Preconditions.checkPositiveNumber(this.actions, "actions");
            Preconditions.checkPositiveNumber(this.flushInterval, "flushInterval");
            Preconditions.checkNotNegativeNumber(this.jitterInterval, "jitterInterval");
            Preconditions.checkNotNegativeNumber(this.bufferLimit, "bufferLimit");
            Objects.requireNonNull(this.flushIntervalUnit, "flushIntervalUnit");
            Objects.requireNonNull(this.threadFactory, "threadFactory");
            Objects.requireNonNull(this.exceptionHandler, "exceptionHandler");
            BatchWriter batchWriter = this.bufferLimit > this.actions
                    ? new RetryCapableBatchWriter(this.influxDB, this.exceptionHandler, this.bufferLimit, this.actions)
                    : new OneShotBatchWriter(this.influxDB);
            return new BatchProcessor(this.influxDB, batchWriter, this.threadFactory, this.actions, this.flushIntervalUnit, this.flushInterval, this.jitterInterval, this.exceptionHandler, this.consistencyLevel);
        }
    }
}

