/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.firehose.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.util.AWSAsyncSinkUtil;
import org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.firehose.sink.KinesisFirehoseException;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.awscore.client.builder.AwsAsyncClientBuilder;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.services.firehose.FirehoseAsyncClient;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchRequest;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponse;
import software.amazon.awssdk.services.firehose.model.PutRecordBatchResponseEntry;
import software.amazon.awssdk.services.firehose.model.Record;
import software.amazon.awssdk.services.firehose.model.ResourceNotFoundException;
import software.amazon.awssdk.utils.SdkAutoCloseable;

@Internal
class KinesisFirehoseSinkWriter<InputT>
extends AsyncSinkWriter<InputT, Record> {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisFirehoseSinkWriter.class);
    private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER = FatalExceptionClassifier.withRootCauseOfType(ResourceNotFoundException.class, err -> new KinesisFirehoseException("Encountered non-recoverable exception relating to not being able to find the specified resources", (Throwable)err));
    private static final FatalExceptionClassifier FIREHOSE_FATAL_EXCEPTION_CLASSIFIER = FatalExceptionClassifier.createChain((FatalExceptionClassifier[])new FatalExceptionClassifier[]{AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier(), AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier(), RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER, AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier()});
    private final Counter numRecordsOutErrorsCounter;
    private final String deliveryStreamName;
    private final SinkWriterMetricGroup metrics;
    private final SdkAsyncHttpClient httpClient;
    private final FirehoseAsyncClient firehoseClient;
    private final boolean failOnError;

    private static SdkAsyncHttpClient createHttpClient(Properties firehoseClientProperties) {
        return AWSGeneralUtil.createAsyncHttpClient((Properties)firehoseClientProperties);
    }

    private static FirehoseAsyncClient createFirehoseClient(Properties firehoseClientProperties, SdkAsyncHttpClient httpClient) {
        AWSGeneralUtil.validateAwsCredentials((Properties)firehoseClientProperties);
        return (FirehoseAsyncClient)AWSAsyncSinkUtil.createAwsAsyncClient((Properties)firehoseClientProperties, (SdkAsyncHttpClient)httpClient, (AwsAsyncClientBuilder)FirehoseAsyncClient.builder(), (String)"Apache Flink %s (%s) Firehose Connector", (String)"aws.firehose.client.user-agent-prefix");
    }

    KinesisFirehoseSinkWriter(ElementConverter<InputT, Record> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, boolean failOnError, String deliveryStreamName, Properties firehoseClientProperties) {
        this(elementConverter, context, maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, failOnError, deliveryStreamName, firehoseClientProperties, Collections.emptyList());
    }

    KinesisFirehoseSinkWriter(ElementConverter<InputT, Record> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, boolean failOnError, String deliveryStreamName, Properties firehoseClientProperties, Collection<BufferedRequestState<Record>> initialStates) {
        super(elementConverter, context, AsyncSinkWriterConfiguration.builder().setMaxBatchSize(maxBatchSize).setMaxBatchSizeInBytes(maxBatchSizeInBytes).setMaxInFlightRequests(maxInFlightRequests).setMaxBufferedRequests(maxBufferedRequests).setMaxTimeInBufferMS(maxTimeInBufferMS).setMaxRecordSizeInBytes(maxRecordSizeInBytes).build(), initialStates);
        this.failOnError = failOnError;
        this.deliveryStreamName = deliveryStreamName;
        this.metrics = context.metricGroup();
        this.numRecordsOutErrorsCounter = this.metrics.getNumRecordsOutErrorsCounter();
        this.httpClient = KinesisFirehoseSinkWriter.createHttpClient(firehoseClientProperties);
        this.firehoseClient = KinesisFirehoseSinkWriter.createFirehoseClient(firehoseClientProperties, this.httpClient);
    }

    protected void submitRequestEntries(List<Record> requestEntries, Consumer<List<Record>> requestResult) {
        PutRecordBatchRequest batchRequest = (PutRecordBatchRequest)PutRecordBatchRequest.builder().records(requestEntries).deliveryStreamName(this.deliveryStreamName).build();
        CompletableFuture future = this.firehoseClient.putRecordBatch(batchRequest);
        future.whenComplete((response, err) -> {
            if (err != null) {
                this.handleFullyFailedRequest((Throwable)err, requestEntries, requestResult);
            } else if (response.failedPutCount() > 0) {
                this.handlePartiallyFailedRequest((PutRecordBatchResponse)response, requestEntries, requestResult);
            } else {
                requestResult.accept(Collections.emptyList());
            }
        });
    }

    protected long getSizeInBytes(Record requestEntry) {
        return requestEntry.data().asByteArrayUnsafe().length;
    }

    public void close() {
        AWSGeneralUtil.closeResources((SdkAutoCloseable[])new SdkAutoCloseable[]{this.httpClient, this.firehoseClient});
    }

    private void handleFullyFailedRequest(Throwable err, List<Record> requestEntries, Consumer<List<Record>> requestResult) {
        LOG.debug("KDF Sink failed to write and will retry {} entries to KDF first request was {}", new Object[]{requestEntries.size(), requestEntries.get(0).toString(), err});
        this.numRecordsOutErrorsCounter.inc((long)requestEntries.size());
        if (this.isRetryable(err)) {
            requestResult.accept(requestEntries);
        }
    }

    private void handlePartiallyFailedRequest(PutRecordBatchResponse response, List<Record> requestEntries, Consumer<List<Record>> requestResult) {
        LOG.debug("KDF Sink failed to write and will retry {} entries to KDF first request was {}", (Object)requestEntries.size(), (Object)requestEntries.get(0).toString());
        this.numRecordsOutErrorsCounter.inc((long)response.failedPutCount().intValue());
        if (this.failOnError) {
            this.getFatalExceptionCons().accept(new KinesisFirehoseException.KinesisFirehoseFailFastException());
            return;
        }
        ArrayList<Record> failedRequestEntries = new ArrayList<Record>(response.failedPutCount());
        List records = response.requestResponses();
        for (int i = 0; i < records.size(); ++i) {
            if (((PutRecordBatchResponseEntry)records.get(i)).errorCode() == null) continue;
            failedRequestEntries.add(requestEntries.get(i));
        }
        requestResult.accept(failedRequestEntries);
    }

    private boolean isRetryable(Throwable err) {
        if (!FIREHOSE_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, this.getFatalExceptionCons())) {
            return false;
        }
        if (this.failOnError) {
            this.getFatalExceptionCons().accept(new KinesisFirehoseException.KinesisFirehoseFailFastException(err));
            return false;
        }
        return true;
    }
}

