/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.sqs.sink;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.sink.throwable.AWSExceptionHandler;
import org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.connector.sqs.sink.SqsExceptionClassifiers;
import org.apache.flink.connector.sqs.sink.SqsSinkException;
import org.apache.flink.connector.sqs.sink.client.SdkClientProvider;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/**
 * Sink writer that publishes buffered {@link SendMessageBatchRequestEntry} records to an SQS
 * queue through {@link SqsAsyncClient#sendMessageBatch}.
 *
 * <p>Each batch handed to {@link #submitRequestEntries} is sent as one
 * {@link SendMessageBatchRequest}. Depending on the outcome the writer either reports success,
 * hands the failed entries back for another attempt, or forwards an exception to the fatal
 * exception consumer obtained from {@code getFatalExceptionCons()}.
 *
 * @param <InputT> type of the elements accepted by the sink before conversion
 */
@Internal
class SqsSinkWriter<InputT>
extends AsyncSinkWriter<InputT, SendMessageBatchRequestEntry> {
    private static final Logger LOG = LoggerFactory.getLogger(SqsSinkWriter.class);

    /** Classifier chain deciding which send failures are treated as fatal (never retried). */
    private static final AWSExceptionHandler SQS_EXCEPTION_HANDLER =
            AWSExceptionHandler.withClassifier(
                    FatalExceptionClassifier.createChain(
                            new FatalExceptionClassifier[] {
                                AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier(),
                                AWSCredentialFatalExceptionClassifiers.getInvalidCredentialsExceptionClassifier(),
                                SqsExceptionClassifiers.getResourceNotFoundExceptionClassifier(),
                                SqsExceptionClassifiers.getAccessDeniedExceptionClassifier(),
                                SqsExceptionClassifiers.getNotAuthorizedExceptionClassifier(),
                                AWSCredentialFatalExceptionClassifiers.getSdkClientMisconfiguredExceptionClassifier()
                            }));

    private final SdkClientProvider<SqsAsyncClient> clientProvider;
    /** Incremented by the number of entries that failed in a request. */
    private final Counter numRecordsOutErrorsCounter;
    /** Queue URL set on every batch request. */
    private final String sqsUrl;
    private final SinkWriterMetricGroup metrics;
    /** When {@code true}, any non-fatal send failure is escalated instead of being retried. */
    private final boolean failOnError;

    /**
     * Creates the writer.
     *
     * @param elementConverter converts input elements into batch request entries
     * @param context sink init context; also the source of the metric group
     * @param maxBatchSize forwarded to {@code AsyncSinkWriterConfiguration#setMaxBatchSize}
     * @param maxInFlightRequests forwarded to {@code setMaxInFlightRequests}
     * @param maxBufferedRequests forwarded to {@code setMaxBufferedRequests}
     * @param maxBatchSizeInBytes forwarded to {@code setMaxBatchSizeInBytes}
     * @param maxTimeInBufferMS forwarded to {@code setMaxTimeInBufferMS}
     * @param maxRecordSizeInBytes forwarded to {@code setMaxRecordSizeInBytes}
     * @param failOnError whether a non-fatal failure should fail the sink rather than be retried
     * @param sqsUrl URL of the destination queue
     * @param clientProvider supplies the {@link SqsAsyncClient}; closed by {@link #close()}
     * @param initialStates buffered request state passed through to the base writer
     */
    SqsSinkWriter(ElementConverter<InputT, SendMessageBatchRequestEntry> elementConverter, Sink.InitContext context, int maxBatchSize, int maxInFlightRequests, int maxBufferedRequests, long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, boolean failOnError, String sqsUrl, SdkClientProvider<SqsAsyncClient> clientProvider, Collection<BufferedRequestState<SendMessageBatchRequestEntry>> initialStates) {
        super(
                elementConverter,
                context,
                AsyncSinkWriterConfiguration.builder()
                        .setMaxBatchSize(maxBatchSize)
                        .setMaxBatchSizeInBytes(maxBatchSizeInBytes)
                        .setMaxInFlightRequests(maxInFlightRequests)
                        .setMaxBufferedRequests(maxBufferedRequests)
                        .setMaxTimeInBufferMS(maxTimeInBufferMS)
                        .setMaxRecordSizeInBytes(maxRecordSizeInBytes)
                        .build(),
                initialStates);
        this.failOnError = failOnError;
        this.sqsUrl = sqsUrl;
        this.metrics = context.metricGroup();
        this.numRecordsOutErrorsCounter = this.metrics.getNumRecordsOutErrorsCounter();
        this.clientProvider = clientProvider;
    }

    /**
     * Sends {@code requestEntries} to the queue as a single batch request.
     *
     * <p>On completion, {@code requestResult} receives an empty list when every entry succeeded,
     * or the entries to be re-queued when the request failed fully or partially and the failure
     * is retryable. An exception thrown while handling the completion is wrapped in a
     * {@link SqsSinkException.SqsFailFastSinkException} and passed to the fatal exception
     * consumer.
     *
     * @param requestEntries entries forming the batch
     * @param requestResult callback receiving the entries that must be retried
     */
    @Override
    protected void submitRequestEntries(List<SendMessageBatchRequestEntry> requestEntries, Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
        SendMessageBatchRequest batchRequest =
                SendMessageBatchRequest.builder()
                        .entries(requestEntries)
                        .queueUrl(this.sqsUrl)
                        .build();

        // Parameterized (not raw) future so the callbacks below are type-checked.
        CompletableFuture<SendMessageBatchResponse> future =
                this.clientProvider.getClient().sendMessageBatch(batchRequest);

        future.whenComplete(
                        (response, err) -> {
                            if (err != null) {
                                handleFullyFailedRequest(err, requestEntries, requestResult);
                            } else if (response.failed() != null && response.failed().size() > 0) {
                                handlePartiallyFailedRequest(response, requestEntries, requestResult);
                            } else {
                                requestResult.accept(Collections.emptyList());
                            }
                        })
                .exceptionally(
                        ex -> {
                            // Reached when the request itself failed or a handler above threw.
                            getFatalExceptionCons()
                                    .accept(new SqsSinkException.SqsFailFastSinkException(ex.getMessage(), ex));
                            return null;
                        });
    }

    /**
     * Returns the size of an entry, measured as the UTF-8 encoded length of its message body.
     *
     * @param requestEntry entry to measure
     * @return number of bytes in the UTF-8 encoding of the message body
     */
    @Override
    protected long getSizeInBytes(SendMessageBatchRequestEntry requestEntry) {
        return requestEntry.messageBody().getBytes(StandardCharsets.UTF_8).length;
    }

    /** Closes the client provider. */
    @Override
    public void close() {
        AWSGeneralUtil.closeResources(new SdkAutoCloseable[] {this.clientProvider});
    }

    /**
     * Handles a request that failed as a whole.
     *
     * <p>All entries are counted as errors. A failure classified as fatal by
     * {@link #SQS_EXCEPTION_HANDLER} is consumed there; otherwise it is escalated when
     * {@code failOnError} is set, or the full batch is handed back for retry.
     */
    private void handleFullyFailedRequest(Throwable err, List<SendMessageBatchRequestEntry> requestEntries, Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
        this.numRecordsOutErrorsCounter.inc(requestEntries.size());

        if (SQS_EXCEPTION_HANDLER.consumeIfFatal(err, getFatalExceptionCons())) {
            return;
        }
        if (this.failOnError) {
            getFatalExceptionCons().accept(new SqsSinkException.SqsFailFastSinkException(err));
            return;
        }

        // Guard the lookup so an empty batch cannot throw from inside the completion callback.
        String firstRequest = requestEntries.isEmpty() ? "<none>" : requestEntries.get(0).toString();
        // Trailing Throwable is logged by SLF4J as the exception, not as a placeholder value.
        LOG.warn("SQS Sink failed to write and will retry {} entries to SQS,  First request was {}", requestEntries.size(), firstRequest, err);
        requestResult.accept(requestEntries);
    }

    /**
     * Handles a response in which some entries were rejected.
     *
     * <p>The rejected entries are counted as errors. When {@code failOnError} is set the sink is
     * failed; otherwise each rejected id is mapped back to its original request entry and those
     * entries are handed back for retry. An id that matches no submitted entry fails the sink.
     */
    private void handlePartiallyFailedRequest(SendMessageBatchResponse response, List<SendMessageBatchRequestEntry> requestEntries, Consumer<List<SendMessageBatchRequestEntry>> requestResult) {
        LOG.warn("handlePartiallyFailedRequest: SQS Sink failed to write and will retry {} entries to SQS", response.failed().size());
        this.numRecordsOutErrorsCounter.inc(response.failed().size());

        if (this.failOnError) {
            getFatalExceptionCons().accept(new SqsSinkException.SqsFailFastSinkException());
            return;
        }

        List<SendMessageBatchRequestEntry> failedRequestEntries = new ArrayList<>(response.failed().size());
        for (BatchResultErrorEntry failedEntry : response.failed()) {
            Optional<SendMessageBatchRequestEntry> retryEntry = getFailedRecord(requestEntries, failedEntry.id());
            if (!retryEntry.isPresent()) {
                LOG.error("handlePartiallyFailedRequest: SQS Sink failed to retry unsuccessful SQS publish request due to invalid failed requestId");
                getFatalExceptionCons().accept(new SqsSinkException.SqsFailFastSinkException("SQS Sink failed to retry unsuccessful SQS publish request due to invalid failed requestId"));
                return;
            }
            failedRequestEntries.add(retryEntry.get());
        }
        requestResult.accept(failedRequestEntries);
    }

    /**
     * Finds the first submitted entry whose id equals {@code selectedId}.
     *
     * @return the matching entry, or an empty {@link Optional} when none matches
     */
    private Optional<SendMessageBatchRequestEntry> getFailedRecord(List<SendMessageBatchRequestEntry> requestEntries, String selectedId) {
        for (SendMessageBatchRequestEntry entry : requestEntries) {
            if (entry.id().equals(selectedId)) {
                return Optional.of(entry);
            }
        }
        return Optional.empty();
    }
}

