/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.dynamodb.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.aws.util.AWSCredentialFatalExceptionClassifiers;
import org.apache.flink.connector.aws.util.AWSGeneralUtil;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkFatalExceptionClassifiers;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbSinkException;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
import org.apache.flink.connector.dynamodb.sink.client.SdkClientProvider;
import org.apache.flink.connector.dynamodb.util.PrimaryKeyBuilder;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableMap;
import org.apache.flink.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.PutRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/**
 * Sink writer that turns buffered {@link DynamoDbWriteRequest} entries into a single
 * {@code BatchWriteItem} call against one DynamoDB table.
 *
 * <p>When {@code overwriteByPartitionKeys} is non-empty, entries of a batch are de-duplicated by
 * the key produced by {@link PrimaryKeyBuilder}; the entry that appears last in the batch wins.
 *
 * <p>Failed batches are counted in {@code numRecordsSendErrors}; entries DynamoDB returned as
 * unprocessed are additionally counted in {@code numRecordsSendPartialFailure} and handed back to
 * the result consumer.
 *
 * @param <InputT> type of the elements accepted by the sink
 */
@Internal
class DynamoDbSinkWriter<InputT> extends AsyncSinkWriter<InputT, DynamoDbWriteRequest> {
    private static final Logger LOG = LoggerFactory.getLogger(DynamoDbSinkWriter.class);

    private static final FatalExceptionClassifier RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER =
            FatalExceptionClassifier.withRootCauseOfType(
                    ResourceNotFoundException.class,
                    err ->
                            new DynamoDbSinkException(
                                    "Encountered non-recoverable exception relating to not being able to find the specified resources",
                                    err));

    private static final FatalExceptionClassifier CONDITIONAL_CHECK_FAILED_EXCEPTION_CLASSIFIER =
            FatalExceptionClassifier.withRootCauseOfType(
                    ConditionalCheckFailedException.class,
                    err ->
                            new DynamoDbSinkException(
                                    "Encountered non-recoverable exception relating to failed conditional check",
                                    err));

    private static final FatalExceptionClassifier VALIDATION_EXCEPTION_CLASSIFIER =
            new FatalExceptionClassifier(
                    DynamoDbSinkWriter::isValidationException,
                    err ->
                            new DynamoDbSinkException(
                                    "Encountered non-recoverable exception because of DynamoDB request validation",
                                    err));

    /** Classifiers are consulted in the order listed here. */
    private static final FatalExceptionClassifier DYNAMODB_FATAL_EXCEPTION_CLASSIFIER =
            FatalExceptionClassifier.createChain(
                    new FatalExceptionClassifier[] {
                        AsyncSinkFatalExceptionClassifiers.getInterruptedExceptionClassifier(),
                        AWSCredentialFatalExceptionClassifiers
                                .getInvalidCredentialsExceptionClassifier(),
                        RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
                        CONDITIONAL_CHECK_FAILED_EXCEPTION_CLASSIFIER,
                        VALIDATION_EXCEPTION_CLASSIFIER,
                        AWSCredentialFatalExceptionClassifiers
                                .getSdkClientMisconfiguredExceptionClassifier()
                    });

    /** Counts every entry of a failed batch as well as every unprocessed entry. */
    private final Counter numRecordsSendErrorsCounter;

    /** Counts only entries that DynamoDB returned as unprocessed. */
    private final Counter numRecordsSendPartialFailure;

    private final SinkWriterMetricGroup metrics;
    private final SdkClientProvider<DynamoDbAsyncClient> clientProvider;
    private final boolean failOnError;
    private final String tableName;
    private final List<String> overwriteByPartitionKeys;

    /**
     * Creates the writer. All batching/buffering limits and {@code states} are forwarded unchanged
     * to {@link AsyncSinkWriter}.
     *
     * @param elementConverter converts input elements into {@link DynamoDbWriteRequest}s
     * @param context sink init context; also supplies the metric group used by this writer
     * @param maxBatchSize forwarded to the base writer
     * @param maxInFlightRequests forwarded to the base writer
     * @param maxBufferedRequests forwarded to the base writer
     * @param maxBatchSizeInBytes forwarded to the base writer
     * @param maxTimeInBufferMS forwarded to the base writer
     * @param maxRecordSizeInBytes forwarded to the base writer
     * @param failOnError when {@code true}, an otherwise retryable failure is reported as fatal
     *     instead of being retried
     * @param tableName table every batch is written to
     * @param overwriteByPartitionKeys key attribute names used to de-duplicate a batch; {@code
     *     null} or empty disables de-duplication
     * @param clientProvider supplies the async DynamoDB client; closed by {@link #close()}
     * @param states buffered request state forwarded to the base writer
     */
    public DynamoDbSinkWriter(
            ElementConverter<InputT, DynamoDbWriteRequest> elementConverter,
            Sink.InitContext context,
            int maxBatchSize,
            int maxInFlightRequests,
            int maxBufferedRequests,
            long maxBatchSizeInBytes,
            long maxTimeInBufferMS,
            long maxRecordSizeInBytes,
            boolean failOnError,
            String tableName,
            List<String> overwriteByPartitionKeys,
            SdkClientProvider<DynamoDbAsyncClient> clientProvider,
            Collection<BufferedRequestState<DynamoDbWriteRequest>> states) {
        super(
                elementConverter,
                context,
                maxBatchSize,
                maxInFlightRequests,
                maxBufferedRequests,
                maxBatchSizeInBytes,
                maxTimeInBufferMS,
                maxRecordSizeInBytes,
                states);
        this.failOnError = failOnError;
        this.tableName = tableName;
        this.overwriteByPartitionKeys = overwriteByPartitionKeys;
        this.metrics = context.metricGroup();
        this.numRecordsSendErrorsCounter = this.metrics.getNumRecordsSendErrorsCounter();
        this.numRecordsSendPartialFailure = this.metrics.counter("numRecordsSendPartialFailure");
        this.clientProvider = clientProvider;
    }

    /**
     * Sends the given entries to DynamoDB as one {@code BatchWriteItem} request and reports the
     * outcome asynchronously.
     *
     * @param requestEntries entries to persist
     * @param requestResultConsumer receives the entries that still have to be written: an empty
     *     list on full success, the unprocessed entries on partial success, or the whole batch on
     *     a retryable failure. It is not invoked when the failure is not retryable.
     */
    @Override
    protected void submitRequestEntries(
            List<DynamoDbWriteRequest> requestEntries,
            Consumer<List<DynamoDbWriteRequest>> requestResultConsumer) {
        List<WriteRequest> items = new ArrayList<>();
        if (CollectionUtil.isNullOrEmpty(this.overwriteByPartitionKeys)) {
            for (DynamoDbWriteRequest request : requestEntries) {
                items.add(this.convertToWriteRequest(request));
            }
        } else {
            // De-duplicate by key: a later entry with the same key replaces the earlier one.
            Map<String, WriteRequest> container = new HashMap<>();
            PrimaryKeyBuilder keyBuilder = new PrimaryKeyBuilder(this.overwriteByPartitionKeys);
            for (DynamoDbWriteRequest request : requestEntries) {
                WriteRequest req = this.convertToWriteRequest(request);
                container.put(keyBuilder.build(req), req);
            }
            items.addAll(container.values());
        }

        CompletableFuture<BatchWriteItemResponse> future =
                this.clientProvider
                        .getClient()
                        .batchWriteItem(
                                BatchWriteItemRequest.builder()
                                        .requestItems(ImmutableMap.of(this.tableName, items))
                                        .build());

        future.whenComplete(
                (response, err) -> {
                    if (err != null) {
                        this.handleFullyFailedRequest(err, requestEntries, requestResultConsumer);
                    } else if (!CollectionUtil.isNullOrEmpty(response.unprocessedItems())) {
                        this.handlePartiallyUnprocessedRequest(response, requestResultConsumer);
                    } else {
                        requestResultConsumer.accept(Collections.emptyList());
                    }
                });
    }

    /**
     * Converts the unprocessed items reported for this writer's table back into {@link
     * DynamoDbWriteRequest}s, updates both error counters and hands the entries to the consumer.
     */
    private void handlePartiallyUnprocessedRequest(
            BatchWriteItemResponse response, Consumer<List<DynamoDbWriteRequest>> requestResult) {
        List<DynamoDbWriteRequest> unprocessed = new ArrayList<>();
        for (WriteRequest writeRequest : response.unprocessedItems().get(this.tableName)) {
            unprocessed.add(this.convertToDynamoDbWriteRequest(writeRequest));
        }
        LOG.warn(
                "DynamoDB Sink failed to persist and will retry {} entries.", unprocessed.size());
        this.numRecordsSendErrorsCounter.inc(unprocessed.size());
        this.numRecordsSendPartialFailure.inc(unprocessed.size());
        requestResult.accept(unprocessed);
    }

    /**
     * Handles a batch whose future completed exceptionally: logs, counts every entry as a send
     * error and, if the failure is retryable, hands the whole batch back to the consumer.
     */
    private void handleFullyFailedRequest(
            Throwable err,
            List<DynamoDbWriteRequest> requestEntries,
            Consumer<List<DynamoDbWriteRequest>> requestResult) {
        LOG.warn(
                "DynamoDB Sink failed to persist and will retry {} entries.",
                requestEntries.size(),
                err);
        this.numRecordsSendErrorsCounter.inc(requestEntries.size());

        // The failure normally arrives wrapped, so the cause is what gets classified. If there is
        // no wrapper, classify the throwable itself rather than passing null.
        Throwable cause = err.getCause() != null ? err.getCause() : err;
        if (this.isRetryable(cause)) {
            requestResult.accept(requestEntries);
        }
    }

    /**
     * Decides whether a failed batch should be retried.
     *
     * <p>NOTE(review): this method treats a {@code false} result from {@code isFatal} as "do not
     * retry", i.e. the classifier appears to return {@code false} when it DID recognise (and
     * report) a fatal error - confirm against {@code FatalExceptionClassifier}.
     *
     * @return {@code true} only if the classifier chain let the error through and {@code
     *     failOnError} is disabled
     */
    private boolean isRetryable(Throwable err) {
        if (!DYNAMODB_FATAL_EXCEPTION_CLASSIFIER.isFatal(err, this.getFatalExceptionCons())) {
            return false;
        }
        if (this.failOnError) {
            this.getFatalExceptionCons()
                    .accept(new DynamoDbSinkException.DynamoDbSinkFailFastException(err));
            return false;
        }
        return true;
    }

    /**
     * Reports a size of zero for every entry.
     *
     * <p>NOTE(review): presumably this means the byte-based limits passed to the base writer
     * never trigger - verify this is intended.
     */
    @Override
    protected long getSizeInBytes(DynamoDbWriteRequest requestEntry) {
        return 0L;
    }

    /** Closes the client provider. */
    @Override
    public void close() {
        AWSGeneralUtil.closeResources(new SdkAutoCloseable[] {this.clientProvider});
    }

    /**
     * Returns {@code true} if {@code err} is a {@link DynamoDbException} whose error code is
     * {@code ValidationException} (case-insensitive). Exceptions without error details or
     * without an error code are not treated as validation errors.
     */
    private static boolean isValidationException(Throwable err) {
        if (!(err instanceof DynamoDbException)) {
            return false;
        }
        DynamoDbException dynamoDbErr = (DynamoDbException) err;
        // Constant-first comparison also tolerates a null error code.
        return dynamoDbErr.awsErrorDetails() != null
                && "ValidationException"
                        .equalsIgnoreCase(dynamoDbErr.awsErrorDetails().errorCode());
    }

    /**
     * Maps a sink-level request onto the SDK {@link WriteRequest}: PUT uses the item as the item
     * to store, DELETE uses the item as the key to delete.
     *
     * @throws IllegalArgumentException if the request type is neither PUT nor DELETE
     */
    private WriteRequest convertToWriteRequest(DynamoDbWriteRequest dynamoDbWriteRequest) {
        if (dynamoDbWriteRequest.getType() == DynamoDbWriteRequestType.PUT) {
            return WriteRequest.builder()
                    .putRequest(PutRequest.builder().item(dynamoDbWriteRequest.getItem()).build())
                    .build();
        }
        if (dynamoDbWriteRequest.getType() == DynamoDbWriteRequestType.DELETE) {
            return WriteRequest.builder()
                    .deleteRequest(
                            DeleteRequest.builder().key(dynamoDbWriteRequest.getItem()).build())
                    .build();
        }
        throw new IllegalArgumentException(
                "Unsupported DynamoDb Write Request Type. consider updating the convertToWriteRequest method");
    }

    /**
     * Inverse of {@link #convertToWriteRequest(DynamoDbWriteRequest)}; used to hand unprocessed
     * SDK requests back to the base writer.
     *
     * @throws IllegalArgumentException if the request carries neither a put nor a delete request
     */
    private DynamoDbWriteRequest convertToDynamoDbWriteRequest(WriteRequest writeRequest) {
        if (writeRequest.putRequest() != null) {
            return DynamoDbWriteRequest.builder()
                    .setItem(writeRequest.putRequest().item())
                    .setType(DynamoDbWriteRequestType.PUT)
                    .build();
        }
        if (writeRequest.deleteRequest() != null) {
            return DynamoDbWriteRequest.builder()
                    .setItem(writeRequest.deleteRequest().key())
                    .setType(DynamoDbWriteRequestType.DELETE)
                    .build();
        }
        throw new IllegalArgumentException(
                "Unsupported Write Request, consider updating the convertToDynamoDbWriteRequest method");
    }
}

