/*
 * Decompiled with CFR 0.152.
 */
package org.red5.server.stream.consumer;

import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.mina.core.buffer.IoBuffer;
import org.red5.io.ITag;
import org.red5.io.ITagWriter;
import org.red5.io.flv.impl.FLVWriter;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IStreamFilenameGenerator;
import org.red5.server.api.stream.consumer.IFileConsumer;
import org.red5.server.messaging.IMessage;
import org.red5.server.messaging.IMessageComponent;
import org.red5.server.messaging.IPipe;
import org.red5.server.messaging.IPipeConnectionListener;
import org.red5.server.messaging.IPushableConsumer;
import org.red5.server.messaging.OOBControlMessage;
import org.red5.server.messaging.PipeConnectionEvent;
import org.red5.server.net.rtmp.event.IRTMPEvent;
import org.red5.server.net.rtmp.event.VideoData;
import org.red5.server.net.rtmp.message.Constants;
import org.red5.server.stream.DefaultStreamFilenameGenerator;
import org.red5.server.stream.IStreamData;
import org.red5.server.stream.consumer.ImmutableTag;
import org.red5.server.stream.consumer.QueuedMediaData;
import org.red5.server.stream.message.RTMPMessage;
import org.red5.server.stream.message.ResetMessage;
import org.red5.server.util.ScopeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

/**
 * Consumer that queues pushed RTMP stream data and writes it to a file through an
 * {@link FLVWriter}.
 * <p>
 * Incoming messages are wrapped in {@link QueuedMediaData} and held in a priority queue.
 * Slices of that queue are handed to an executor for writing, either up to the timestamp
 * of the latest video message or, for other messages, once the queue reaches
 * {@link #setQueueThreshold(int) the configured threshold}. Whatever is still queued when
 * {@link #uninit()} runs is written before the writer is closed.
 * <p>
 * The executor created during initialisation is only shut down by {@link #destroy()}.
 */
public class SlicedFileConsumer implements Constants, IPushableConsumer, IPipeConnectionListener, DisposableBean, IFileConsumer {

    private static final Logger log = LoggerFactory.getLogger(SlicedFileConsumer.class);

    /** Set once {@link #init()} has run; it is never reset. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /** Executor running the slice writes; created in {@link #init()}. */
    private ScheduledExecutorService scheduledExecutorService;

    /** Pool size used when the executor is created. */
    private int schedulerThreadSize = 1;

    /** Pending media data; created lazily on the first pushed RTMP message. */
    private PriorityQueue<QueuedMediaData> queue;

    private final ReentrantReadWriteLock reentrantLock = new ReentrantReadWriteLock();

    /** Guards every mutation of {@link #queue}. */
    private final Lock writeLock = this.reentrantLock.writeLock();

    /** Guards size reads of {@link #queue}. */
    private final Lock readLock = this.reentrantLock.readLock();

    private IScope scope;

    /** Output file location; cleared by {@link #uninit()}. */
    private Path path;

    /** Tag writer; only created for the "record" and "append" modes. */
    private ITagWriter writer;

    /** Operation mode; {@link #init()} distinguishes "append", "record" and anything else. */
    private String mode = "none";

    /** Timestamp of the first written tag, or -1 when not yet known / after a reset. */
    private int startTimestamp = -1;

    private ITag videoConfigurationTag;

    private ITag audioConfigurationTag;

    /** Queue size at which non-video messages trigger a slice write; negative disables it. */
    private int queueThreshold = -1;

    /** Percentage of the threshold that makes up one fixed-length slice. */
    private int percentage = 25;

    /** Timestamp of the most recently written entry. */
    private volatile int lastWrittenTs = -1;

    /** Handle of the most recently submitted slice write. */
    private volatile Future<?> writerFuture;

    private boolean waitForVideoKeyframe = true;

    private volatile boolean gotVideoKeyframe;

    /**
     * Creates an unconfigured consumer; scope, file and mode are expected to be supplied
     * through the setters.
     */
    public SlicedFileConsumer() {
    }

    /**
     * Creates a consumer writing to the given file.
     *
     * @param scope scope the consumer belongs to
     * @param file  output file
     */
    public SlicedFileConsumer(IScope scope, File file) {
        this.scope = scope;
        this.path = file.toPath();
    }

    /**
     * Creates a consumer whose output path is derived from a stream name.
     *
     * @param scope    scope used to resolve the output path
     * @param fileName name handed to the filename generator
     * @param mode     operation mode
     */
    public SlicedFileConsumer(IScope scope, String fileName, String mode) {
        this.scope = scope;
        this.mode = mode;
        this.setupOutputPath(fileName);
    }

    /**
     * Queues the body of an RTMP message and, once a writer exists, submits a slice of the
     * queue for writing. A {@link ResetMessage} resets the start timestamp; any other
     * message type is ignored.
     * <p>
     * Video data arriving before the first keyframe is dropped while
     * {@code waitForVideoKeyframe} is set.
     *
     * @param pipe    pipe the message arrived on (not used)
     * @param message pushed message
     * @throws IOException if initialising the writer fails
     */
    @Override
    public void pushMessage(IPipe pipe, IMessage message) throws IOException {
        if (message instanceof RTMPMessage) {
            IRTMPEvent msg = ((RTMPMessage) message).getBody();
            byte dataType = msg.getDataType();
            int timestamp = msg.getTimestamp();
            log.trace("Data type: {} timestamp: {}", dataType, timestamp);
            if (this.queue == null) {
                // 11 is the default initial capacity of PriorityQueue
                this.queue = new PriorityQueue<QueuedMediaData>(this.queueThreshold <= 0 ? 11 : this.queueThreshold);
            }
            QueuedMediaData queued;
            if (msg instanceof IStreamData) {
                if (log.isTraceEnabled()) {
                    log.trace("Stream data, body saved. Data type: {} class type: {}", dataType, msg.getClass().getName());
                }
                if (msg instanceof VideoData) {
                    log.debug("pushMessage video - waitForVideoKeyframe: {} gotVideoKeyframe: {}", this.waitForVideoKeyframe, this.gotVideoKeyframe);
                    if (!this.gotVideoKeyframe) {
                        if (((VideoData) msg).getFrameType() == VideoData.FrameType.KEYFRAME) {
                            log.debug("Got our first keyframe");
                            this.gotVideoKeyframe = true;
                        }
                        if (this.waitForVideoKeyframe && !this.gotVideoKeyframe) {
                            log.debug("Skipping video data since keyframe has not been written yet");
                            return;
                        }
                    }
                }
                queued = new QueuedMediaData(timestamp, dataType, (IStreamData) msg);
            } else {
                if (log.isTraceEnabled()) {
                    log.trace("Non-stream data, body not saved. Data type: {} class type: {}", dataType, msg.getClass().getName());
                }
                queued = new QueuedMediaData(timestamp, dataType);
            }
            this.writeLock.lock();
            try {
                this.queue.add(queued);
            } finally {
                this.writeLock.unlock();
            }
            int queueSize;
            this.readLock.lock();
            try {
                queueSize = this.queue.size();
            } finally {
                this.readLock.unlock();
            }
            if (this.writer == null) {
                this.init();
            }
            // Flush on every message once a writer exists. Previously this was nested in the
            // "writer == null" branch, so it ran only while there was no writer to write to.
            if (this.writer != null) {
                if (msg instanceof VideoData) {
                    this.writeQueuedDataSlice(this.createTimestampLimitedSlice(msg.getTimestamp()));
                } else if (this.queueThreshold >= 0 && queueSize >= this.queueThreshold) {
                    this.writeQueuedDataSlice(this.createFixedLengthSlice(this.queueThreshold / (100 / this.percentage)));
                }
            }
        } else if (message instanceof ResetMessage) {
            this.startTimestamp = -1;
        } else if (log.isDebugEnabled()) {
            log.debug("Ignoring pushed message: {}", message);
        }
    }

    /**
     * Submits the slice to the executor, or puts its entries that still carry data back on
     * the queue when the previous write could not be awaited (or the slice is empty).
     *
     * @param slice entries removed from the queue
     */
    private void writeQueuedDataSlice(final QueuedMediaData[] slice) {
        if (this.acquireWriteFuture(slice.length)) {
            this.writerFuture = this.scheduledExecutorService.submit(new Runnable() {

                @Override
                public void run() {
                    log.trace("Spawning queue writer thread");
                    SlicedFileConsumer.this.doWrites(slice);
                }
            });
        } else {
            this.writeLock.lock();
            try {
                for (QueuedMediaData unwritten : slice) {
                    if (unwritten.hasData()) {
                        this.queue.add(unwritten);
                    }
                }
            } finally {
                this.writeLock.unlock();
            }
        }
    }

    /**
     * Removes up to {@code sliceLength} entries from the head of the queue.
     *
     * @param sliceLength requested number of entries
     * @return the removed entries; shorter than requested if the queue holds fewer
     */
    private QueuedMediaData[] createFixedLengthSlice(int sliceLength) {
        log.debug("Creating data slice to write of length {}.", sliceLength);
        QueuedMediaData[] slice;
        this.writeLock.lock();
        try {
            if (log.isTraceEnabled()) {
                log.trace("Queue length: {}", this.queue.size());
            }
            // never ask the queue for more than it holds; remove() would throw
            int count = Math.max(0, Math.min(sliceLength, this.queue.size()));
            slice = new QueuedMediaData[count];
            log.trace("Slice length: {}", slice.length);
            for (int i = 0; i < count; i++) {
                slice[i] = this.queue.poll();
            }
            if (log.isTraceEnabled()) {
                log.trace("Queue length (after removal): {}", this.queue.size());
            }
        } finally {
            this.writeLock.unlock();
        }
        return slice;
    }

    /**
     * Removes every entry from the head of the queue whose timestamp does not exceed the
     * given one.
     *
     * @param timestamp inclusive upper timestamp bound
     * @return the removed entries, possibly empty
     */
    private QueuedMediaData[] createTimestampLimitedSlice(int timestamp) {
        log.debug("Creating data slice up until timestamp {}", timestamp);
        ArrayList<QueuedMediaData> slice = new ArrayList<QueuedMediaData>();
        this.writeLock.lock();
        try {
            if (log.isTraceEnabled()) {
                log.trace("Queue length: {}", this.queue.size());
            }
            if (!this.queue.isEmpty()) {
                QueuedMediaData head = this.queue.peek();
                while (head != null && head.getTimestamp() <= timestamp) {
                    slice.add(this.queue.poll());
                    head = this.queue.peek();
                }
                if (log.isTraceEnabled()) {
                    log.trace("Queue length (after removal): {}", this.queue.size());
                }
            }
        } finally {
            this.writeLock.unlock();
        }
        return slice.toArray(new QueuedMediaData[slice.size()]);
    }

    /**
     * Waits for the previously submitted write, if any, allowing 500ms per slice entry.
     *
     * @param sliceLength number of entries about to be written
     * @return {@code true} if a new write may be submitted; {@code false} for an empty slice
     *         or when waiting for the previous write failed
     */
    private boolean acquireWriteFuture(int sliceLength) {
        if (sliceLength <= 0) {
            return false;
        }
        Object writeResult = null;
        int timeout = sliceLength * 500;
        Future<?> pending = this.writerFuture;
        if (pending != null) {
            try {
                writeResult = pending.get(timeout, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // keep the interruption visible to the caller
                    Thread.currentThread().interrupt();
                }
                log.warn("Exception waiting for write result. Timeout: {}ms", timeout, e);
                return false;
            }
        }
        log.debug("Write future result (expect null): {}", writeResult);
        return true;
    }

    /**
     * Out-of-band control messages are ignored.
     */
    @Override
    public void onOOBControlMessage(IMessageComponent source, IPipe pipe, OOBControlMessage oobCtrlMsg) {
    }

    /**
     * Picks up the "mode" parameter when this consumer is connected to a pipe for push.
     * A missing parameter leaves the current mode untouched.
     *
     * @param event pipe connection event
     */
    @Override
    public void onPipeConnectionEvent(PipeConnectionEvent event) {
        switch (event.getType()) {
            case CONSUMER_CONNECT_PUSH:
                if (event.getConsumer() == this) {
                    Map<String, Object> paramMap = event.getParamMap();
                    if (paramMap != null) {
                        Object requestedMode = paramMap.get("mode");
                        // a null mode would make init() take the delete-only branch
                        if (requestedMode != null) {
                            this.mode = (String) requestedMode;
                        }
                    }
                }
                break;
            default:
                break;
        }
    }

    /**
     * One-time initialisation: creates the executor and, depending on the mode, the writer.
     * <ul>
     * <li>"append": the file must exist; a writer is opened on it in append mode.</li>
     * <li>"record": the file is re-created, a writer is opened and any stored decoder
     * configuration tags are written.</li>
     * <li>anything else: an existing file is deleted and no writer is created.</li>
     * </ul>
     *
     * @throws IOException if the file to append to is missing, the record file is not
     *                     writable, or opening the writer / writing the configuration fails
     */
    private void init() throws IOException {
        if (!this.initialized.compareAndSet(false, true)) {
            return;
        }
        log.debug("Init: {}", this.mode);
        this.scheduledExecutorService = Executors.newScheduledThreadPool(this.schedulerThreadSize, new CustomizableThreadFactory("FileConsumerExecutor-"));
        if (this.path == null) {
            log.warn("Consumer is uninitialized");
            log.debug("Init - complete");
            return;
        }
        Path parent = this.path.getParent();
        if (parent != null && log.isDebugEnabled()) {
            log.debug("Parent abs: {} dir: {}", parent.isAbsolute(), Files.isDirectory(parent));
        }
        if ("append".equals(this.mode)) {
            if (Files.notExists(this.path)) {
                throw new IOException("File to be appended doesnt exist, verify the record mode");
            }
            log.debug("Path: {}\nRead: {} write: {} size: {}", this.path, Files.isReadable(this.path), Files.isWritable(this.path), Files.size(this.path));
            this.writer = new FLVWriter(this.path, true);
        } else if ("record".equals(this.mode)) {
            try {
                if (Files.deleteIfExists(this.path)) {
                    log.debug("File deleted");
                }
                if (parent != null) {
                    Files.createDirectories(parent);
                }
                this.path = Files.createFile(this.path);
            } catch (IOException ioe) {
                // not rethrown: the writability check below reports the failure
                log.error("File creation error", ioe);
            }
            if (!Files.isWritable(this.path)) {
                throw new IOException("File is not writable");
            }
            log.debug("Path: {}\nRead: {} write: {}", this.path, Files.isReadable(this.path), Files.isWritable(this.path));
            this.writer = new FLVWriter(this.path, false);
            if (this.audioConfigurationTag != null) {
                this.writer.writeTag(this.audioConfigurationTag);
            }
            if (this.videoConfigurationTag != null) {
                this.writer.writeTag(this.videoConfigurationTag);
                this.gotVideoKeyframe = true;
            }
        } else {
            try {
                if (Files.deleteIfExists(this.path)) {
                    log.debug("File deleted");
                }
            } catch (IOException ioe) {
                log.error("File deletion error", ioe);
            }
        }
        log.debug("Init - complete");
    }

    /**
     * Waits for the outstanding slice write, writes everything still queued, closes the
     * writer and clears the output path. Does nothing if the consumer was never initialised.
     */
    public void uninit() {
        if (!this.initialized.get()) {
            return;
        }
        log.debug("Uninit");
        boolean interrupted = false;
        if (this.writer != null) {
            Future<?> pending = this.writerFuture;
            if (pending != null) {
                try {
                    pending.get();
                } catch (Exception e) {
                    interrupted = e instanceof InterruptedException;
                    log.warn("Exception waiting for write result on uninit", e);
                }
                if (pending.cancel(false)) {
                    log.debug("Future completed");
                }
            }
            this.writerFuture = null;
            try {
                this.doWrites();
            } finally {
                // release the writer even if the final flush fails
                if (this.queue != null) {
                    this.queue.clear();
                    this.queue = null;
                }
                this.writer.close();
                this.writer = null;
            }
        }
        this.path = null;
        if (interrupted) {
            // restored only now so the final flush above is not affected by the flag
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Drains the whole queue and writes its entries in sorted order. Does nothing when no
     * queue exists.
     */
    public final void doWrites() {
        if (this.queue == null) {
            return;
        }
        QueuedMediaData[] slice;
        this.writeLock.lock();
        try {
            slice = this.queue.toArray(new QueuedMediaData[0]);
            this.queue.clear();
            if (slice.length > 0) {
                log.debug("Queued writes transfered, count: {}", slice.length);
            }
        } finally {
            this.writeLock.unlock();
        }
        Arrays.sort(slice);
        this.doWrites(slice);
    }

    /**
     * Writes the given entries in order. Entries older than the last written timestamp are
     * disposed without being written; entries without data are skipped.
     *
     * @param slice entries to write; {@code null} is treated as empty
     */
    public final void doWrites(QueuedMediaData[] slice) {
        if (slice == null) {
            return;
        }
        for (QueuedMediaData queued : slice) {
            int tmpTs = queued.getTimestamp();
            if (this.lastWrittenTs > tmpTs) {
                // older than what is already written
                queued.dispose();
            } else if (queued.hasData()) {
                this.write(queued);
                this.lastWrittenTs = tmpTs;
                queued.dispose();
            } else if (log.isTraceEnabled()) {
                log.trace("Queued data was not available");
            }
        }
    }

    /**
     * Writes a single entry with its timestamp rebased on the first written timestamp.
     * Entries rebased to a negative timestamp are skipped. The entry is disposed once
     * handled, whether or not the write succeeded.
     *
     * @param queued entry to write
     */
    private void write(QueuedMediaData queued) {
        byte dataType = queued.getDataType();
        int timestamp = queued.getTimestamp();
        log.debug("Write - timestamp: {} type: {}", timestamp, dataType);
        ImmutableTag tag = queued.getData();
        // empty bodies are only let through for data type 8 (presumably audio - TODO confirm against Constants)
        if (tag == null || (tag.getBodySize() <= 0 && dataType != 8)) {
            return;
        }
        if (this.startTimestamp == -1) {
            this.startTimestamp = timestamp;
            timestamp = 0;
        } else {
            timestamp -= this.startTimestamp;
        }
        tag.setTimestamp(timestamp);
        ITagWriter out = this.writer;
        try {
            if (timestamp < 0) {
                log.warn("Skipping message with negative timestamp.");
            } else if (out == null) {
                log.warn("Tag was not written, no writer available");
            } else if (!out.writeTag(tag)) {
                log.warn("Tag was not written");
            }
        } catch (ClosedChannelException cce) {
            this.logWriterClosed();
        } catch (IOException e) {
            log.warn("Error writing tag", e);
            if (e.getCause() instanceof ClosedChannelException) {
                this.logWriterClosed();
            }
        } finally {
            queued.dispose();
        }
    }

    /** Logs that the writer can no longer write to the output file. */
    private void logWriterClosed() {
        Path target = this.path;
        if (target != null) {
            log.error("The writer is no longer able to write to the file: {} writable: {}", target.getFileName(), target.toFile().canWrite());
        } else {
            log.error("The writer is no longer able to write to the file");
        }
    }

    /**
     * Resolves the output path for the given name through the scope's filename generator
     * and, in "append" mode, creates the file if it does not exist yet.
     *
     * @param name name handed to the filename generator
     */
    public void setupOutputPath(String name) {
        IStreamFilenameGenerator generator = (IStreamFilenameGenerator) ScopeUtils.getScopeService(this.scope, IStreamFilenameGenerator.class, DefaultStreamFilenameGenerator.class);
        String filePath = generator.generateFilename(this.scope, name, ".flv", IStreamFilenameGenerator.GenerationType.RECORD);
        if (generator.resolvesToAbsolutePath()) {
            this.path = Paths.get(filePath);
        } else {
            this.path = Paths.get(System.getProperty("red5.root"), "webapps", this.scope.getContextPath(), filePath);
        }
        File appendee = this.getFile();
        if ("append".equals(this.mode) && !appendee.exists()) {
            try {
                if (appendee.createNewFile()) {
                    log.debug("New file created for appending");
                } else {
                    log.debug("Failure to create new file for appending");
                }
            } catch (IOException e) {
                log.warn("Exception creating replacement file for append", e);
            }
        }
    }

    /**
     * Stores the video decoder configuration as a tag; in "record" mode it is written when
     * the writer is created.
     *
     * @param decoderConfig decoder configuration event; ignored unless it is stream data
     */
    @Override
    public void setVideoDecoderConfiguration(IRTMPEvent decoderConfig) {
        if (decoderConfig instanceof IStreamData) {
            IoBuffer data = ((IStreamData) decoderConfig).getData().asReadOnlyBuffer();
            this.videoConfigurationTag = ImmutableTag.build(decoderConfig.getDataType(), 0, data, 0);
        }
    }

    /**
     * Stores the audio decoder configuration as a tag; in "record" mode it is written when
     * the writer is created.
     *
     * @param decoderConfig decoder configuration event; ignored unless it is stream data
     */
    @Override
    public void setAudioDecoderConfiguration(IRTMPEvent decoderConfig) {
        if (decoderConfig instanceof IStreamData) {
            IoBuffer data = ((IStreamData) decoderConfig).getData().asReadOnlyBuffer();
            this.audioConfigurationTag = ImmutableTag.build(decoderConfig.getDataType(), 0, data, 0);
        }
    }

    /**
     * @param scope scope used to resolve output paths
     */
    public void setScope(IScope scope) {
        this.scope = scope;
    }

    /**
     * @param file output file
     */
    public void setFile(File file) {
        this.path = file.toPath();
    }

    /**
     * @return the output file, or {@code null} when no path is set (e.g. after {@link #uninit()})
     */
    public File getFile() {
        return this.path == null ? null : this.path.toFile();
    }

    /**
     * @param queueThreshold queue size at which non-video messages trigger a slice write;
     *                       a negative value disables threshold-based writes
     */
    public void setQueueThreshold(int queueThreshold) {
        this.queueThreshold = queueThreshold;
    }

    /**
     * @return the configured queue threshold
     */
    public int getQueueThreshold() {
        return this.queueThreshold;
    }

    /**
     * @return always {@code true}
     * @deprecated the value is fixed and cannot be configured
     */
    @Deprecated
    public boolean isDelayWrite() {
        return true;
    }

    /**
     * @param delayWrite ignored
     * @deprecated has no effect
     */
    @Deprecated
    public void setDelayWrite(boolean delayWrite) {
    }

    /**
     * @param waitForVideoKeyframe whether video data is dropped until a keyframe was seen
     */
    public void setWaitForVideoKeyframe(boolean waitForVideoKeyframe) {
        log.debug("setWaitForVideoKeyframe: {}", waitForVideoKeyframe);
        this.waitForVideoKeyframe = waitForVideoKeyframe;
    }

    /**
     * @return pool size used when the executor is created
     */
    public int getSchedulerThreadSize() {
        return this.schedulerThreadSize;
    }

    /**
     * @param schedulerThreadSize pool size used when the executor is created
     */
    public void setSchedulerThreadSize(int schedulerThreadSize) {
        this.schedulerThreadSize = schedulerThreadSize;
    }

    /**
     * @param mode operation mode
     */
    public void setMode(String mode) {
        this.mode = mode;
    }

    /**
     * Shuts down the executor if one was created.
     */
    @Override
    public void destroy() throws Exception {
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdown();
        }
    }
}

