package com.kdgregory.logging.aws.kinesis;

import com.kdgregory.logging.aws.facade.KinesisFacade;
import com.kdgregory.logging.aws.facade.KinesisFacadeException;
import com.kdgregory.logging.aws.internal.AbstractLogWriter;
import com.kdgregory.logging.aws.internal.AbstractWriterConfig;
import com.kdgregory.logging.aws.kinesis.KinesisConstants;
import com.kdgregory.logging.common.LogMessage;
import com.kdgregory.logging.common.util.InternalLogger;
import com.kdgregory.logging.common.util.RetryManager2;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/* loaded from: input_file:com/kdgregory/logging/aws/kinesis/KinesisLogWriter.class */
public class KinesisLogWriter extends AbstractLogWriter<KinesisWriterConfig, KinesisWriterStatistics> {
    public static final String RANDOM_PARTITION_KEY_CONFIG = "{random}";
    private KinesisFacade facade;
    protected RetryManager2 describeRetry;
    protected RetryManager2 createRetry;
    protected RetryManager2 postCreateRetry;
    protected Duration sendTimeout;
    protected RetryManager2 sendRetry;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/kdgregory/logging/aws/kinesis/KinesisLogWriter$DefaultExceptionHandler.class */
    public static class DefaultExceptionHandler implements Consumer<RuntimeException> {
        private DefaultExceptionHandler() {
        }

        @Override // java.util.function.Consumer
        public void accept(RuntimeException runtimeException) {
            if (!(runtimeException instanceof KinesisFacadeException) || !((KinesisFacadeException) runtimeException).isRetryable()) {
                throw runtimeException;
            }
        }
    }

    public KinesisLogWriter(KinesisWriterConfig kinesisWriterConfig, KinesisWriterStatistics kinesisWriterStatistics, InternalLogger internalLogger, KinesisFacade kinesisFacade) {
        super(kinesisWriterConfig, kinesisWriterStatistics, internalLogger);
        this.describeRetry = new RetryManager2("describe", Duration.ofMillis(50L));
        this.createRetry = new RetryManager2("create", Duration.ofMillis(200L));
        this.postCreateRetry = new RetryManager2("describe", Duration.ofMillis(200L), false, true);
        this.sendTimeout = Duration.ofMillis(AbstractWriterConfig.DEFAULT_BATCH_DELAY);
        this.sendRetry = new RetryManager2("send", Duration.ofMillis(200L));
        this.facade = kinesisFacade;
        kinesisWriterStatistics.setActualStreamName(kinesisWriterConfig.getStreamName());
    }

    @Override // com.kdgregory.logging.common.LogWriter
    public int maxMessageSize() {
        return 1048576 - ((KinesisWriterConfig) this.config).getPartitionKeyHelper().getLength();
    }

    @Override // com.kdgregory.logging.aws.internal.AbstractLogWriter
    protected boolean ensureDestinationAvailable() {
        List<String> validate = ((KinesisWriterConfig) this.config).validate();
        if (!validate.isEmpty()) {
            Iterator<String> it = validate.iterator();
            while (it.hasNext()) {
                reportError("configuration error: " + it.next(), null);
            }
            return false;
        }
        Instant plusMillis = Instant.now().plusMillis(((KinesisWriterConfig) this.config).getInitializationTimeout());
        try {
            this.logger.debug("checking status of stream: " + ((KinesisWriterConfig) this.config).getStreamName());
            KinesisConstants.StreamStatus streamStatus = (KinesisConstants.StreamStatus) this.describeRetry.invoke(plusMillis, () -> {
                return this.facade.retrieveStreamStatus();
            });
            if (streamStatus == KinesisConstants.StreamStatus.ACTIVE) {
                return true;
            }
            if (streamStatus != KinesisConstants.StreamStatus.DOES_NOT_EXIST) {
                return waitForStreamToBeActive(plusMillis);
            }
            if (((KinesisWriterConfig) this.config).getAutoCreate()) {
                return createStream(plusMillis) && setRetentionPeriod(plusMillis);
            }
            reportError("stream \"" + ((KinesisWriterConfig) this.config).getStreamName() + "\" does not exist and auto-create not enabled", null);
            return false;
        } catch (Exception e) {
            reportError("exception during initialization", e);
            return false;
        }
    }

    @Override // com.kdgregory.logging.aws.internal.AbstractLogWriter
    protected List<LogMessage> sendBatch(List<LogMessage> list) {
        ((KinesisWriterStatistics) this.stats).setLastBatchSize(list.size());
        if (((KinesisWriterConfig) this.config).getEnableBatchLogging()) {
            this.logger.debug("about to write batch of " + list.size() + " message(s)");
        }
        if (list.isEmpty()) {
            return list;
        }
        try {
            List<LogMessage> list2 = (List) this.sendRetry.invoke(this.sendTimeout, () -> {
                try {
                    List<LogMessage> putRecords = this.facade.putRecords(list);
                    if (((KinesisWriterConfig) this.config).getEnableBatchLogging()) {
                        this.logger.debug("wrote batch of " + list.size() + " message(s); " + putRecords.size() + " rejected");
                    }
                    return putRecords;
                } catch (KinesisFacadeException e) {
                    if (e.getReason() == KinesisFacadeException.ReasonCode.THROTTLING) {
                        ((KinesisWriterStatistics) this.stats).incrementThrottledWrites();
                        return null;
                    }
                    if (e.isRetryable()) {
                        return null;
                    }
                    throw e;
                }
            });
            if (list2 != null) {
                return list2;
            }
            this.logger.warn("timeout while sending batch");
            return list;
        } catch (Exception e) {
            this.logger.error("exception while sending batch", e);
            return list;
        }
    }

    @Override // com.kdgregory.logging.aws.internal.AbstractLogWriter
    protected int effectiveSize(LogMessage logMessage) {
        return logMessage.size() + ((KinesisWriterConfig) this.config).getPartitionKeyHelper().getLength();
    }

    @Override // com.kdgregory.logging.aws.internal.AbstractLogWriter
    protected boolean withinServiceLimits(int i, int i2) {
        return i < 5242880 && i2 <= 500;
    }

    @Override // com.kdgregory.logging.aws.internal.AbstractLogWriter
    protected void stopAWSClient() {
        this.facade.shutdown();
    }

    private boolean createStream(Instant instant) {
        this.logger.debug("creating kinesis stream: " + ((KinesisWriterConfig) this.config).getStreamName());
        this.createRetry.invoke(instant, () -> {
            this.facade.createStream();
            return Boolean.TRUE;
        }, new DefaultExceptionHandler());
        return waitForStreamToBeActive(instant);
    }

    private boolean setRetentionPeriod(Instant instant) {
        if (((KinesisWriterConfig) this.config).getRetentionPeriod() == null) {
            return true;
        }
        this.logger.debug("setting retention period on stream \"" + ((KinesisWriterConfig) this.config).getStreamName() + "\" to " + ((KinesisWriterConfig) this.config).getRetentionPeriod() + " hours");
        this.createRetry.invoke(instant, () -> {
            this.facade.setRetentionPeriod();
            return Boolean.TRUE;
        }, new DefaultExceptionHandler());
        return waitForStreamToBeActive(instant);
    }

    private boolean waitForStreamToBeActive(Instant instant) {
        this.logger.debug("waiting for stream " + ((KinesisWriterConfig) this.config).getStreamName() + " to become active");
        if (((KinesisConstants.StreamStatus) this.postCreateRetry.invoke(instant, () -> {
            KinesisConstants.StreamStatus retrieveStreamStatus = this.facade.retrieveStreamStatus();
            if (retrieveStreamStatus == KinesisConstants.StreamStatus.ACTIVE) {
                return retrieveStreamStatus;
            }
            return null;
        })) == KinesisConstants.StreamStatus.ACTIVE) {
            return true;
        }
        this.logger.error("timeout waiting for stream " + ((KinesisWriterConfig) this.config).getStreamName() + " to become active", null);
        return false;
    }
}
