/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processor.util.listen;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.event.transport.message.ByteArrayMessage;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;

public abstract class EventBatcher<E extends ByteArrayMessage> {
    public static final int POLL_TIMEOUT_MS = 20;
    private volatile BlockingQueue<E> events;
    private volatile BlockingQueue<E> errorEvents;
    private final ComponentLog logger;

    public EventBatcher(ComponentLog logger, BlockingQueue events, BlockingQueue errorEvents) {
        this.logger = logger;
        this.events = events;
        this.errorEvents = errorEvents;
    }

    public Map<String, FlowFileEventBatch> getBatches(ProcessSession session, int totalBatchSize, final byte[] messageDemarcatorBytes) {
        E event;
        HashMap<String, FlowFileEventBatch> batches = new HashMap<String, FlowFileEventBatch>();
        for (int i = 0; i < totalBatchSize && (event = this.getMessage(true, true, session)) != null; ++i) {
            String batchKey = this.getBatchKey(event);
            FlowFileEventBatch batch = (FlowFileEventBatch)batches.get(batchKey);
            if (batch == null) {
                batch = new FlowFileEventBatch(session.create(), new ArrayList());
                batches.put(batchKey, batch);
            }
            batch.getEvents().add(event);
            final boolean writeDemarcator = i > 0;
            try {
                final byte[] rawMessage = event.getMessage();
                FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback(){

                    public void process(OutputStream out) throws IOException {
                        if (writeDemarcator) {
                            out.write(messageDemarcatorBytes);
                        }
                        out.write(rawMessage);
                    }
                });
                batch.setFlowFile(appendedFlowFile);
                continue;
            }
            catch (Exception e) {
                this.logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again", new Object[]{e.getMessage(), e});
                this.errorEvents.offer(event);
                break;
            }
        }
        return batches;
    }

    protected abstract String getBatchKey(E var1);

    protected E getMessage(boolean longPoll, boolean pollErrorQueue, ProcessSession session) {
        ByteArrayMessage event = null;
        if (pollErrorQueue) {
            event = (ByteArrayMessage)this.errorEvents.poll();
        }
        if (event != null) {
            return (E)event;
        }
        try {
            event = longPoll ? (ByteArrayMessage)this.events.poll(20L, TimeUnit.MILLISECONDS) : (ByteArrayMessage)this.events.poll();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        if (event != null) {
            session.adjustCounter("Messages Received", 1L, false);
        }
        return (E)event;
    }
}

