package com.amazonaws.services.kinesis.multilang;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.multilang.messages.CheckpointMessage;
import com.amazonaws.services.kinesis.multilang.messages.InitializeMessage;
import com.amazonaws.services.kinesis.multilang.messages.Message;
import com.amazonaws.services.kinesis.multilang.messages.ProcessRecordsMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownMessage;
import com.amazonaws.services.kinesis.multilang.messages.ShutdownRequestedMessage;
import com.amazonaws.services.kinesis.multilang.messages.StatusMessage;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/amazonaws/services/kinesis/multilang/MultiLangProtocol.class */
class MultiLangProtocol {
    private static final Log log = LogFactory.getLog(MultiLangProtocol.class);
    private MessageReader messageReader;
    private MessageWriter messageWriter;
    private final InitializationInput initializationInput;
    private KinesisClientLibConfiguration configuration;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/amazonaws/services/kinesis/multilang/MultiLangProtocol$FutureMethod.class */
    public interface FutureMethod<T> {
        T get() throws InterruptedException, TimeoutException, ExecutionException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MultiLangProtocol(MessageReader messageReader, MessageWriter messageWriter, InitializationInput initializationInput, KinesisClientLibConfiguration kinesisClientLibConfiguration) {
        this.messageReader = messageReader;
        this.messageWriter = messageWriter;
        this.initializationInput = initializationInput;
        this.configuration = kinesisClientLibConfiguration;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean initialize() {
        return waitForStatusMessage(InitializeMessage.ACTION, null, this.messageWriter.writeInitializeMessage(this.initializationInput));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean processRecords(ProcessRecordsInput processRecordsInput) {
        return waitForStatusMessage(ProcessRecordsMessage.ACTION, processRecordsInput.getCheckpointer(), this.messageWriter.writeProcessRecordsMessage(processRecordsInput));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean shutdown(IRecordProcessorCheckpointer iRecordProcessorCheckpointer, ShutdownReason shutdownReason) {
        return waitForStatusMessage(ShutdownMessage.ACTION, iRecordProcessorCheckpointer, this.messageWriter.writeShutdownMessage(shutdownReason));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean shutdownRequested(IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
        return waitForStatusMessage(ShutdownRequestedMessage.ACTION, iRecordProcessorCheckpointer, this.messageWriter.writeShutdownRequestedMessage());
    }

    private boolean waitForStatusMessage(String str, IRecordProcessorCheckpointer iRecordProcessorCheckpointer, Future<Boolean> future) {
        try {
            return waitForStatusMessage(str, iRecordProcessorCheckpointer) && future.get().booleanValue();
        } catch (InterruptedException e) {
            log.error(String.format("Interrupted while writing %s message for shard %s", str, this.initializationInput.getShardId()));
            return false;
        } catch (ExecutionException e2) {
            log.error(String.format("Failed to write %s message for shard %s", str, this.initializationInput.getShardId()), e2);
            return false;
        }
    }

    boolean waitForStatusMessage(String str, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
        Optional empty = Optional.empty();
        while (true) {
            Optional optional = empty;
            if (optional.isPresent()) {
                return validateStatusMessage((StatusMessage) optional.get(), str);
            }
            Future<Message> nextMessageFromSTDOUT = this.messageReader.getNextMessageFromSTDOUT();
            Optional<U> map = this.configuration.getTimeoutInSeconds().map(num -> {
                return futureMethod(() -> {
                    return (Message) nextMessageFromSTDOUT.get(num.intValue(), TimeUnit.SECONDS);
                }, str);
            });
            nextMessageFromSTDOUT.getClass();
            Optional optional2 = (Optional) map.orElse(futureMethod(nextMessageFromSTDOUT::get, str));
            if (!optional2.isPresent() || ((Boolean) optional2.filter(message -> {
                return message instanceof CheckpointMessage;
            }).map(message2 -> {
                return (CheckpointMessage) message2;
            }).flatMap(checkpointMessage -> {
                return futureMethod(() -> {
                    return checkpoint(checkpointMessage, iRecordProcessorCheckpointer).get();
                }, "Checkpoint");
            }).map(bool -> {
                return Boolean.valueOf(!bool.booleanValue());
            }).orElse(false)).booleanValue()) {
                return false;
            }
            empty = optional2.filter(message3 -> {
                return message3 instanceof StatusMessage;
            }).map(message4 -> {
                return (StatusMessage) message4;
            });
        }
    }

    private <T> Optional<T> futureMethod(FutureMethod<T> futureMethod, String str) {
        try {
            return Optional.of(futureMethod.get());
        } catch (InterruptedException e) {
            log.error(String.format("Interrupted while waiting for %s message for shard %s", str, this.initializationInput.getShardId()), e);
            return Optional.empty();
        } catch (ExecutionException e2) {
            log.error(String.format("Failed to get status message for %s action for shard %s", str, this.initializationInput.getShardId()), e2);
            return Optional.empty();
        } catch (TimeoutException e3) {
            log.error(String.format("Timedout to get status message for %s action for shard %s. Terminating...", str, this.initializationInput.getShardId()), e3);
            haltJvm(1);
            return Optional.empty();
        }
    }

    protected void haltJvm(int i) {
        Runtime.getRuntime().halt(i);
    }

    private boolean validateStatusMessage(StatusMessage statusMessage, String str) {
        log.info("Received response " + statusMessage + " from subprocess while waiting for " + str + " while processing shard " + this.initializationInput.getShardId());
        return (statusMessage == null || statusMessage.getResponseFor() == null || !statusMessage.getResponseFor().equals(str)) ? false : true;
    }

    private Future<Boolean> checkpoint(CheckpointMessage checkpointMessage, IRecordProcessorCheckpointer iRecordProcessorCheckpointer) {
        String sequenceNumber = checkpointMessage.getSequenceNumber();
        Long subSequenceNumber = checkpointMessage.getSubSequenceNumber();
        try {
            if (iRecordProcessorCheckpointer == null) {
                String format = String.format("Was asked to checkpoint at %s but no checkpointer was provided for shard %s", sequenceNumber, this.initializationInput.getShardId());
                log.error(format);
                return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, new InvalidStateException(format));
            }
            log.debug(logCheckpointMessage(sequenceNumber, subSequenceNumber));
            if (sequenceNumber == null) {
                iRecordProcessorCheckpointer.checkpoint();
            } else if (subSequenceNumber != null) {
                iRecordProcessorCheckpointer.checkpoint(sequenceNumber, subSequenceNumber.longValue());
            } else {
                iRecordProcessorCheckpointer.checkpoint(sequenceNumber);
            }
            return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, null);
        } catch (Throwable th) {
            return this.messageWriter.writeCheckpointMessageWithError(sequenceNumber, subSequenceNumber, th);
        }
    }

    private String logCheckpointMessage(String str, Long l) {
        return String.format("Attempting to checkpoint shard %s @ sequence number %s, and sub sequence number %s", this.initializationInput.getShardId(), str, l);
    }
}
