package io.trino.plugin.exchange.filesystem.s3;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageBatch;
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.base.Strings;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.concurrent.MoreFutures;
import io.airlift.concurrent.Threads;
import io.airlift.log.Logger;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.Slices;
import io.airlift.units.Duration;
import io.trino.annotation.NotThreadSafe;
import io.trino.plugin.exchange.filesystem.ExchangeSourceFile;
import io.trino.plugin.exchange.filesystem.ExchangeStorageReader;
import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
import io.trino.plugin.exchange.filesystem.FileStatus;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeFutures;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeManager;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
import io.trino.plugin.exchange.filesystem.MetricsBuilder;
import io.trino.plugin.exchange.filesystem.s3.ExchangeS3Config;
import io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper;
import jakarta.annotation.PreDestroy;
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.awscore.endpoint.AwsClientEndpointProvider;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.internal.retry.SdkDefaultRetryStrategy;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.StorageClass;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.StsClientBuilder;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;

/* loaded from: input_file:io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage.class */
public class S3FileSystemExchangeStorage implements FileSystemExchangeStorage {
    private static final Logger log = Logger.get(S3FileSystemExchangeStorage.class);
    private final S3FileSystemExchangeStorageStats stats;
    private final Optional<Region> region;
    private final Optional<String> endpoint;
    private final int multiUploadPartSize;
    private final S3AsyncClient s3AsyncClient;
    private final StorageClass storageClass;
    private final CompatibilityMode compatibilityMode;
    private final S3SseContext s3SseContext;
    private final Optional<Storage> gcsClient;
    private final Optional<ListeningExecutorService> gcsDeleteExecutor;

    /* loaded from: input_file:io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$CompatibilityMode.class */
    public enum CompatibilityMode {
        AWS,
        GCP
    }

    @ThreadSafe
    /* loaded from: input_file:io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$S3ExchangeStorageReader.class */
    private static class S3ExchangeStorageReader implements ExchangeStorageReader {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(S3ExchangeStorageReader.class);
        private final S3FileSystemExchangeStorageStats stats;
        private final S3AsyncClient s3AsyncClient;
        MetricsBuilder.CounterMetricBuilder sourceFilesProcessedMetric;
        MetricsBuilder.DistributionMetricBuilder s3GetObjectRequestsSuccessMetric;
        MetricsBuilder.DistributionMetricBuilder s3GetObjectRequestsFailedMetric;
        private final int partSize;
        private final int bufferSize;

        @GuardedBy("this")
        private final Queue<ExchangeSourceFile> sourceFiles;

        @GuardedBy("this")
        private ExchangeSourceFile currentFile;

        @GuardedBy("this")
        private long fileOffset;

        @GuardedBy("this")
        private SliceInput sliceInput;
        private volatile boolean closed;
        private volatile long bufferRetainedSize;

        @GuardedBy("this")
        private int sliceSize = -1;
        private volatile ListenableFuture<Void> inProgressReadFuture = Futures.immediateVoidFuture();

        public S3ExchangeStorageReader(S3FileSystemExchangeStorageStats s3FileSystemExchangeStorageStats, S3AsyncClient s3AsyncClient, int i, List<ExchangeSourceFile> list, MetricsBuilder metricsBuilder, int i2) {
            this.stats = (S3FileSystemExchangeStorageStats) Objects.requireNonNull(s3FileSystemExchangeStorageStats, "stats is null");
            this.s3AsyncClient = (S3AsyncClient) Objects.requireNonNull(s3AsyncClient, "s3AsyncClient is null");
            this.partSize = i;
            this.sourceFiles = new ArrayDeque((Collection) Objects.requireNonNull(list, "sourceFiles is null"));
            Objects.requireNonNull(metricsBuilder, "metricsBuilder is null");
            this.sourceFilesProcessedMetric = metricsBuilder.getCounterMetric(MetricsBuilder.SOURCE_FILES_PROCESSED);
            this.s3GetObjectRequestsSuccessMetric = metricsBuilder.getDistributionMetric("FileSystemExchangeSource.s3GetObjectRequestsSuccess");
            this.s3GetObjectRequestsFailedMetric = metricsBuilder.getDistributionMetric("FileSystemExchangeSource.s3GetObjectRequestsFailed");
            this.bufferSize = i2 + i;
            fillBuffer();
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader
        public synchronized Slice read() throws IOException {
            if (this.closed || !this.inProgressReadFuture.isDone()) {
                return null;
            }
            try {
                MoreFutures.getFutureValue(this.inProgressReadFuture);
                if (this.sliceSize < 0) {
                    this.sliceSize = this.sliceInput.readInt();
                }
                Slice readSlice = this.sliceInput.readSlice(this.sliceSize);
                if (this.sliceInput.available() > 4) {
                    this.sliceSize = this.sliceInput.readInt();
                    if (this.sliceInput.available() < this.sliceSize) {
                        fillBuffer();
                    }
                } else {
                    this.sliceSize = -1;
                    fillBuffer();
                }
                return readSlice;
            } catch (RuntimeException e) {
                throw new IOException(e);
            }
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader
        public ListenableFuture<Void> isBlocked() {
            return this.inProgressReadFuture;
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader
        public long getRetainedSize() {
            return INSTANCE_SIZE + this.bufferRetainedSize;
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader
        public boolean isFinished() {
            return this.closed;
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader, java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.currentFile = null;
            this.sliceInput = null;
            this.bufferRetainedSize = 0L;
            this.inProgressReadFuture.cancel(true);
            this.inProgressReadFuture = Futures.immediateVoidFuture();
        }

        @GuardedBy("this")
        private void fillBuffer() {
            if (this.currentFile == null || this.fileOffset == this.currentFile.getFileSize()) {
                this.currentFile = this.sourceFiles.poll();
                if (this.currentFile == null) {
                    close();
                    return;
                }
                this.fileOffset = 0L;
            }
            byte[] bArr = new byte[this.bufferSize];
            int i = 0;
            if (this.sliceInput != null) {
                int available = this.sliceInput.available();
                this.sliceInput.readBytes(bArr, 0, available);
                i = 0 + available;
            }
            ImmutableList.Builder builder = ImmutableList.builder();
            while (true) {
                long fileSize = this.currentFile.getFileSize();
                int length = (bArr.length - i) / this.partSize;
                if (length == 0) {
                    if (bArr.length - i < fileSize - this.fileOffset) {
                        break;
                    } else {
                        length = 1;
                    }
                }
                String keyFromUri = S3FileSystemExchangeStorage.keyFromUri(this.currentFile.getFileUri());
                String bucketName = S3FileSystemExchangeStorage.getBucketName(this.currentFile.getFileUri());
                for (int i2 = 0; i2 < length && this.fileOffset < fileSize; i2++) {
                    int min = (int) Math.min(this.partSize, fileSize - this.fileOffset);
                    GetObjectRequest.Builder bucket = GetObjectRequest.builder().key(keyFromUri).bucket(bucketName);
                    long j = this.fileOffset;
                    long j2 = (this.fileOffset + min) - 1;
                    ListenableFuture listenableFuture = MoreFutures.toListenableFuture(this.s3AsyncClient.getObject((GetObjectRequest) bucket.range("bytes=" + j + "-" + bucket).build(), BufferWriteAsyncResponseTransformer.toBufferWrite(bArr, i)));
                    this.stats.getGetObject().record(listenableFuture);
                    this.stats.getGetObjectDataSizeInBytes().add(min);
                    S3FileSystemExchangeStorage.recordDistributionMetric(listenableFuture, this.s3GetObjectRequestsSuccessMetric, this.s3GetObjectRequestsFailedMetric);
                    builder.add(listenableFuture);
                    i += min;
                    this.fileOffset += min;
                }
                if (this.fileOffset == fileSize) {
                    this.sourceFilesProcessedMetric.increment();
                    this.currentFile = this.sourceFiles.poll();
                    if (this.currentFile == null) {
                        break;
                    } else {
                        this.fileOffset = 0L;
                    }
                }
            }
            this.inProgressReadFuture = MoreFutures.asVoid(Futures.allAsList(builder.build()));
            this.sliceInput = Slices.wrappedBuffer(bArr, 0, i).getInput();
            this.bufferRetainedSize = this.sliceInput.getRetainedSize();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @NotThreadSafe
    /* loaded from: input_file:io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$S3ExchangeStorageWriter.class */
    public static class S3ExchangeStorageWriter implements ExchangeStorageWriter {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(S3ExchangeStorageWriter.class);
        private final S3FileSystemExchangeStorageStats stats;
        private final S3AsyncClient s3AsyncClient;
        private final String bucketName;
        private final String key;
        private final int partSize;
        private final StorageClass storageClass;
        private final S3SseContext s3SseContext;
        private int currentPartNumber;
        private ListenableFuture<Void> directUploadFuture;
        private ListenableFuture<String> multiPartUploadIdFuture;
        private final List<ListenableFuture<CompletedPart>> multiPartUploadFutures = new ArrayList();
        private volatile boolean closed;

        public S3ExchangeStorageWriter(S3FileSystemExchangeStorageStats s3FileSystemExchangeStorageStats, S3AsyncClient s3AsyncClient, String str, String str2, int i, StorageClass storageClass, S3SseContext s3SseContext) {
            this.stats = (S3FileSystemExchangeStorageStats) Objects.requireNonNull(s3FileSystemExchangeStorageStats, "stats is null");
            this.s3AsyncClient = (S3AsyncClient) Objects.requireNonNull(s3AsyncClient, "s3AsyncClient is null");
            this.bucketName = (String) Objects.requireNonNull(str, "bucketName is null");
            this.key = (String) Objects.requireNonNull(str2, "key is null");
            this.partSize = i;
            this.storageClass = (StorageClass) Objects.requireNonNull(storageClass, "storageClass is null");
            this.s3SseContext = (S3SseContext) Objects.requireNonNull(s3SseContext, "s3SseContext is null");
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageWriter
        public ListenableFuture<Void> write(Slice slice) {
            Preconditions.checkState(this.directUploadFuture == null, "Direct upload already started");
            if (this.closed) {
                return Futures.immediateVoidFuture();
            }
            if (slice.length() < this.partSize && this.multiPartUploadIdFuture == null) {
                this.directUploadFuture = FileSystemExchangeFutures.translateFailures(MoreFutures.toListenableFuture(this.s3AsyncClient.putObject((PutObjectRequest) PutObjectRequest.builder().bucket(this.bucketName).key(this.key).storageClass(this.storageClass).applyMutation(builder -> {
                    switch (this.s3SseContext.sseType()) {
                        case NONE:
                        default:
                            return;
                        case S3:
                            builder.serverSideEncryption(ServerSideEncryption.AES256);
                            return;
                        case KMS:
                            builder.serverSideEncryption(ServerSideEncryption.AWS_KMS).ssekmsKeyId(this.s3SseContext.sseKmsKeyId().get());
                            return;
                    }
                }).build(), AsyncRequestBody.fromByteBufferUnsafe(slice.toByteBuffer()))));
                this.stats.getPutObject().record(this.directUploadFuture);
                this.stats.getPutObjectDataSizeInBytes().add(slice.length());
                return this.directUploadFuture;
            }
            if (this.multiPartUploadIdFuture == null) {
                this.multiPartUploadIdFuture = Futures.transform(createMultipartUpload(), (v0) -> {
                    return v0.uploadId();
                }, MoreExecutors.directExecutor());
            }
            int i = this.currentPartNumber + 1;
            this.currentPartNumber = i;
            ListenableFuture<CompletedPart> transformAsync = Futures.transformAsync(this.multiPartUploadIdFuture, str -> {
                return uploadPart(str, slice, i);
            }, MoreExecutors.directExecutor());
            this.multiPartUploadFutures.add(transformAsync);
            return FileSystemExchangeFutures.translateFailures(transformAsync);
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageWriter
        public ListenableFuture<Void> finish() {
            if (this.closed) {
                return Futures.immediateVoidFuture();
            }
            if (this.multiPartUploadIdFuture == null) {
                return (ListenableFuture) Objects.requireNonNullElseGet(this.directUploadFuture, Futures::immediateVoidFuture);
            }
            ListenableFuture<Void> translateFailures = FileSystemExchangeFutures.translateFailures(Futures.transformAsync(Futures.allAsList(this.multiPartUploadFutures), list -> {
                return completeMultipartUpload((String) MoreFutures.getFutureValue(this.multiPartUploadIdFuture), list);
            }, MoreExecutors.directExecutor()));
            Futures.addCallback(translateFailures, new FutureCallback<Void>() { // from class: io.trino.plugin.exchange.filesystem.s3.S3FileSystemExchangeStorage.S3ExchangeStorageWriter.1
                public void onSuccess(Void r4) {
                    S3ExchangeStorageWriter.this.closed = true;
                }

                public void onFailure(Throwable th) {
                }
            }, MoreExecutors.directExecutor());
            return translateFailures;
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageWriter
        public ListenableFuture<Void> abort() {
            if (this.closed) {
                return Futures.immediateVoidFuture();
            }
            this.closed = true;
            if (this.multiPartUploadIdFuture == null) {
                if (this.directUploadFuture != null) {
                    this.directUploadFuture.cancel(true);
                }
                return Futures.immediateVoidFuture();
            }
            Verify.verify(this.directUploadFuture == null);
            this.multiPartUploadFutures.forEach(listenableFuture -> {
                listenableFuture.cancel(true);
            });
            return FileSystemExchangeFutures.translateFailures(Futures.transformAsync(this.multiPartUploadIdFuture, this::abortMultipartUpload, MoreExecutors.directExecutor()));
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageWriter
        public long getRetainedSize() {
            return INSTANCE_SIZE;
        }

        private ListenableFuture<CreateMultipartUploadResponse> createMultipartUpload() {
            return this.stats.getCreateMultipartUpload().record(MoreFutures.toListenableFuture(this.s3AsyncClient.createMultipartUpload((CreateMultipartUploadRequest) CreateMultipartUploadRequest.builder().bucket(this.bucketName).key(this.key).storageClass(this.storageClass).applyMutation(builder -> {
                switch (this.s3SseContext.sseType()) {
                    case NONE:
                    default:
                        return;
                    case S3:
                        builder.serverSideEncryption(ServerSideEncryption.AES256);
                        return;
                    case KMS:
                        builder.serverSideEncryption(ServerSideEncryption.AWS_KMS).ssekmsKeyId(this.s3SseContext.sseKmsKeyId().get());
                        return;
                }
            }).build())));
        }

        private ListenableFuture<CompletedPart> uploadPart(String str, Slice slice, int i) {
            UploadPartRequest uploadPartRequest = (UploadPartRequest) UploadPartRequest.builder().bucket(this.bucketName).key(this.key).uploadId(str).partNumber(Integer.valueOf(i)).build();
            this.stats.getUploadPartDataSizeInBytes().add(slice.length());
            return this.stats.getUploadPart().record(Futures.transform(MoreFutures.toListenableFuture(this.s3AsyncClient.uploadPart(uploadPartRequest, AsyncRequestBody.fromByteBufferUnsafe(slice.toByteBuffer()))), uploadPartResponse -> {
                return (CompletedPart) CompletedPart.builder().eTag(uploadPartResponse.eTag()).partNumber(Integer.valueOf(i)).build();
            }, MoreExecutors.directExecutor()));
        }

        private ListenableFuture<CompleteMultipartUploadResponse> completeMultipartUpload(String str, List<CompletedPart> list) {
            CompleteMultipartUploadRequest completeMultipartUploadRequest = (CompleteMultipartUploadRequest) CompleteMultipartUploadRequest.builder().bucket(this.bucketName).key(this.key).uploadId(str).multipartUpload((CompletedMultipartUpload) CompletedMultipartUpload.builder().parts(list).build()).build();
            this.stats.getCompleteMultipartUploadPartsCount().add(list.size());
            return this.stats.getCompleteMultipartUpload().record(MoreFutures.toListenableFuture(this.s3AsyncClient.completeMultipartUpload(completeMultipartUploadRequest)));
        }

        private ListenableFuture<AbortMultipartUploadResponse> abortMultipartUpload(String str) {
            return this.stats.getAbortMultipartUpload().record(MoreFutures.toListenableFuture(this.s3AsyncClient.abortMultipartUpload((AbortMultipartUploadRequest) AbortMultipartUploadRequest.builder().bucket(this.bucketName).key(this.key).uploadId(str).build())));
        }
    }

    /* loaded from: input_file:io/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$S3SseContext.class */
    private static final class S3SseContext extends Record {
        private final ExchangeS3Config.S3SseType sseType;
        private final Optional<String> sseKmsKeyId;

        S3SseContext(ExchangeS3Config.S3SseType s3SseType, Optional<String> optional) {
            Objects.requireNonNull(s3SseType, "sseType is null");
            Preconditions.checkArgument((s3SseType == ExchangeS3Config.S3SseType.KMS) ^ optional.isEmpty(), "sseKmsKeyId is supposed to be set only when sseType is KMS");
            this.sseType = s3SseType;
            this.sseKmsKeyId = optional;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, S3SseContext.class), S3SseContext.class, "sseType;sseKmsKeyId", "FIELD:Lio/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$S3SseContext;->sseType:Lio/trino/plugin/exchange/filesystem/s3/ExchangeS3Config$S3SseType;", "FIELD:Lio/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$S3SseContext;->sseKmsKeyId:Ljava/util/Optional;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, S3SseContext.class), S3SseContext.class, "sseType;sseKmsKeyId", "FIELD:Lio/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$S3SseContext;->sseType:Lio/trino/plugin/exchange/filesystem/s3/ExchangeS3Config$S3SseType;", "FIELD:Lio/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$S3SseContext;->sseKmsKeyId:Ljava/util/Optional;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, S3SseContext.class, Object.class), S3SseContext.class, "sseType;sseKmsKeyId", "FIELD:Lio/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$S3SseContext;->sseType:Lio/trino/plugin/exchange/filesystem/s3/ExchangeS3Config$S3SseType;", "FIELD:Lio/trino/plugin/exchange/filesystem/s3/S3FileSystemExchangeStorage$S3SseContext;->sseKmsKeyId:Ljava/util/Optional;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public ExchangeS3Config.S3SseType sseType() {
            return this.sseType;
        }

        public Optional<String> sseKmsKeyId() {
            return this.sseKmsKeyId;
        }
    }

    @Inject
    public S3FileSystemExchangeStorage(final S3FileSystemExchangeStorageStats s3FileSystemExchangeStorageStats, ExchangeS3Config exchangeS3Config, CompatibilityMode compatibilityMode) throws IOException {
        this.stats = (S3FileSystemExchangeStorageStats) Objects.requireNonNull(s3FileSystemExchangeStorageStats, "stats is null");
        this.region = exchangeS3Config.getS3Region();
        this.endpoint = exchangeS3Config.getS3Endpoint();
        this.multiUploadPartSize = Math.toIntExact(exchangeS3Config.getS3UploadPartSize().toBytes());
        this.storageClass = exchangeS3Config.getStorageClass();
        this.compatibilityMode = (CompatibilityMode) Objects.requireNonNull(compatibilityMode, "compatibilityMode is null");
        this.s3SseContext = new S3SseContext(exchangeS3Config.getSseType(), exchangeS3Config.getSseKmsKeyId());
        this.s3AsyncClient = new S3AsyncClientWrapper(this, createS3AsyncClient(createAwsCredentialsProvider(exchangeS3Config), (ClientOverrideConfiguration) ClientOverrideConfiguration.builder().retryStrategy(SdkDefaultRetryStrategy.forRetryMode(exchangeS3Config.getRetryMode()).toBuilder().maxAttempts(exchangeS3Config.getS3MaxErrorRetries()).build()).putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_PREFIX, "").putAdvancedOption(SdkAdvancedClientOption.USER_AGENT_SUFFIX, "Trino-exchange").build(), exchangeS3Config.isS3PathStyleAccess(), exchangeS3Config.getAsyncClientConcurrency(), exchangeS3Config.getAsyncClientMaxPendingConnectionAcquires(), exchangeS3Config.getConnectionAcquisitionTimeout())) { // from class: io.trino.plugin.exchange.filesystem.s3.S3FileSystemExchangeStorage.1
            @Override // io.trino.plugin.exchange.filesystem.s3.S3AsyncClientWrapper
            protected void handle(S3AsyncClientWrapper.RequestType requestType, CompletableFuture<?> completableFuture) {
                s3FileSystemExchangeStorageStats.requestStarted(requestType);
                S3FileSystemExchangeStorageStats s3FileSystemExchangeStorageStats2 = s3FileSystemExchangeStorageStats;
                completableFuture.whenComplete((obj, th) -> {
                    if (th != null && th.getMessage() != null && th.getMessage().contains("Maximum pending connection acquisitions exceeded")) {
                        S3FileSystemExchangeStorage.log.error(th, "Encountered 'Maximum pending connection acquisitions exceeded' error. Active requests: %s", new Object[]{s3FileSystemExchangeStorageStats2.getActiveRequestsSummary()});
                    }
                    s3FileSystemExchangeStorageStats2.requestCompleted(requestType);
                });
            }
        };
        if (compatibilityMode != CompatibilityMode.GCP) {
            this.gcsClient = Optional.empty();
            this.gcsDeleteExecutor = Optional.empty();
            return;
        }
        Optional<String> gcsJsonKeyFilePath = exchangeS3Config.getGcsJsonKeyFilePath();
        Optional<String> gcsJsonKey = exchangeS3Config.getGcsJsonKey();
        Verify.verify((gcsJsonKeyFilePath.isPresent() && gcsJsonKey.isPresent()) ? false : true, "gcsJsonKeyFilePath and gcsJsonKey shouldn't be set at the same time", new Object[0]);
        if (gcsJsonKeyFilePath.isPresent()) {
            this.gcsClient = Optional.of(StorageOptions.newBuilder().setCredentials(GoogleCredentials.fromStream(new FileInputStream(gcsJsonKeyFilePath.get()))).build().getService());
        } else if (gcsJsonKey.isPresent()) {
            this.gcsClient = Optional.of(StorageOptions.newBuilder().setCredentials(GoogleCredentials.fromStream(new ByteArrayInputStream(gcsJsonKey.get().getBytes(StandardCharsets.UTF_8)))).build().getService());
        } else {
            this.gcsClient = Optional.of(StorageOptions.getDefaultInstance().getService());
        }
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(100, 100, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(), Threads.threadsNamed("gcs-delete-%s"));
        threadPoolExecutor.allowCoreThreadTimeOut(true);
        this.gcsDeleteExecutor = Optional.of(MoreExecutors.listeningDecorator(threadPoolExecutor));
    }

    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public void createDirectories(URI uri) {
    }

    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> list, int i, MetricsBuilder metricsBuilder) {
        return new S3ExchangeStorageReader(this.stats, this.s3AsyncClient, this.multiUploadPartSize, list, metricsBuilder, i);
    }

    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ExchangeStorageWriter createExchangeStorageWriter(URI uri) {
        return new S3ExchangeStorageWriter(this.stats, this.s3AsyncClient, getBucketName(uri), keyFromUri(uri), this.multiUploadPartSize, this.storageClass, this.s3SseContext);
    }

    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ListenableFuture<Void> createEmptyFile(URI uri) {
        return this.stats.getCreateEmptyFile().record(FileSystemExchangeFutures.translateFailures(MoreFutures.toListenableFuture(this.s3AsyncClient.putObject((PutObjectRequest) PutObjectRequest.builder().bucket(getBucketName(uri)).key(keyFromUri(uri)).build(), AsyncRequestBody.empty()))));
    }

    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ListenableFuture<Void> deleteRecursively(List<URI> list) {
        if (this.compatibilityMode == CompatibilityMode.GCP) {
            return deleteRecursivelyGcp(list);
        }
        ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
        for (URI uri : list) {
            ImmutableList.Builder builder2 = ImmutableList.builder();
            builder.put(getBucketName(uri), Futures.transform(MoreFutures.toListenableFuture(listObjectsRecursively(uri).subscribe(listObjectsV2Response -> {
                Stream map = listObjectsV2Response.contents().stream().map((v0) -> {
                    return v0.key();
                });
                Objects.requireNonNull(builder2);
                map.forEach((v1) -> {
                    r1.add(v1);
                });
            })), r3 -> {
                return builder2.build();
            }, MoreExecutors.directExecutor()));
        }
        ImmutableMultimap build = builder.build();
        ImmutableList.Builder builder3 = ImmutableList.builder();
        for (String str : build.keySet()) {
            builder3.add(Futures.transformAsync(Futures.allAsList(build.get(str)), list2 -> {
                return deleteObjects(str, (List) list2.stream().flatMap((v0) -> {
                    return v0.stream();
                }).collect(ImmutableList.toImmutableList()));
            }, MoreExecutors.directExecutor()));
        }
        return FileSystemExchangeFutures.translateFailures(Futures.allAsList(builder3.build()));
    }

    private ListenableFuture<Void> deleteRecursivelyGcp(List<URI> list) {
        Storage orElseThrow = this.gcsClient.orElseThrow(() -> {
            return new IllegalStateException("gcsClient is expected to be initialized");
        });
        return this.stats.getDeleteRecursively().record(FileSystemExchangeFutures.translateFailures(this.gcsDeleteExecutor.orElseThrow(() -> {
            return new IllegalStateException("gcsDeleteExecutor is expected to be initialized");
        }).submit(() -> {
            StorageBatch batch = orElseThrow.batch();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                URI uri = (URI) it.next();
                Iterator it2 = orElseThrow.list(getBucketName(uri), new Storage.BlobListOption[]{Storage.BlobListOption.prefix(keyFromUri(uri))}).iterateAll().iterator();
                while (it2.hasNext()) {
                    batch.delete(((Blob) it2.next()).getBlobId(), new Storage.BlobSourceOption[0]);
                }
            }
            batch.submit();
        })));
    }

    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ListenableFuture<List<FileStatus>> listFilesRecursively(URI uri) {
        ImmutableList.Builder builder = ImmutableList.builder();
        return this.stats.getListFilesRecursively().record(Futures.transform(MoreFutures.toListenableFuture(listObjectsRecursively(uri).subscribe(listObjectsV2Response -> {
            for (S3Object s3Object : listObjectsV2Response.contents()) {
                try {
                    builder.add(new FileStatus(new URI(uri.getScheme(), uri.getHost(), "/" + s3Object.key(), uri.getFragment()).toString(), s3Object.size().longValue()));
                } catch (URISyntaxException e) {
                    throw new IllegalArgumentException(e);
                }
            }
        })), r3 -> {
            return builder.build();
        }, MoreExecutors.directExecutor()));
    }

    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public int getWriteBufferSize() {
        return this.multiUploadPartSize;
    }

    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage, java.lang.AutoCloseable
    @PreDestroy
    public void close() throws IOException {
        Closer create = Closer.create();
        try {
            S3AsyncClient s3AsyncClient = this.s3AsyncClient;
            Objects.requireNonNull(s3AsyncClient);
            create.register(s3AsyncClient::close);
            this.gcsDeleteExecutor.ifPresent(listeningExecutorService -> {
                Objects.requireNonNull(listeningExecutorService);
                create.register(listeningExecutorService::shutdown);
            });
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private ListObjectsV2Publisher listObjectsRecursively(URI uri) {
        Preconditions.checkArgument(isDirectory(uri), "listObjectsRecursively called on file uri %s", uri);
        return this.s3AsyncClient.listObjectsV2Paginator((ListObjectsV2Request) ListObjectsV2Request.builder().bucket(getBucketName(uri)).prefix(keyFromUri(uri)).build());
    }

    private ListenableFuture<List<DeleteObjectsResponse>> deleteObjects(String str, List<String> list) {
        List partition = Lists.partition(list, 1000);
        this.stats.getDeleteObjectsEntriesCount().add(list.size());
        return this.stats.getDeleteObjects().record(Futures.allAsList((Iterable) partition.stream().map(list2 -> {
            return MoreFutures.toListenableFuture(this.s3AsyncClient.deleteObjects((DeleteObjectsRequest) DeleteObjectsRequest.builder().bucket(str).delete((Delete) Delete.builder().objects((Collection) list2.stream().map(str2 -> {
                return (ObjectIdentifier) ObjectIdentifier.builder().key(str2).build();
            }).collect(ImmutableList.toImmutableList())).build()).build()));
        }).collect(ImmutableList.toImmutableList())));
    }

    private static String getBucketName(URI uri) {
        if (uri.getHost() != null) {
            return uri.getHost();
        }
        if (uri.getUserInfo() == null) {
            return uri.getAuthority();
        }
        throw new IllegalArgumentException("Unable to determine S3 bucket from URI.");
    }

    private static String keyFromUri(URI uri) {
        Preconditions.checkArgument(uri.isAbsolute(), "Uri is not absolute: %s", uri);
        String nullToEmpty = Strings.nullToEmpty(uri.getPath());
        if (nullToEmpty.startsWith(FileSystemExchangeManager.PATH_SEPARATOR)) {
            nullToEmpty = nullToEmpty.substring(FileSystemExchangeManager.PATH_SEPARATOR.length());
        }
        if (nullToEmpty.endsWith(FileSystemExchangeManager.PATH_SEPARATOR)) {
            nullToEmpty = nullToEmpty.substring(0, nullToEmpty.length() - FileSystemExchangeManager.PATH_SEPARATOR.length());
        }
        return nullToEmpty;
    }

    private static boolean isDirectory(URI uri) {
        return uri.toString().endsWith(FileSystemExchangeManager.PATH_SEPARATOR);
    }

    private static AwsCredentialsProvider createAwsCredentialsProvider(ExchangeS3Config exchangeS3Config) {
        String s3AwsAccessKey = exchangeS3Config.getS3AwsAccessKey();
        String s3AwsSecretKey = exchangeS3Config.getS3AwsSecretKey();
        if (s3AwsAccessKey == null && s3AwsSecretKey != null) {
            throw new IllegalArgumentException("AWS access key set but secret is not set; make sure you set exchange.s3.aws-secret-key config property");
        }
        if (s3AwsAccessKey != null && s3AwsSecretKey == null) {
            throw new IllegalArgumentException("AWS secret key set but access is not set; make sure you set exchange.s3.aws-access-key config property");
        }
        if (s3AwsAccessKey != null) {
            Preconditions.checkArgument(exchangeS3Config.getS3IamRole().isEmpty(), "IAM role is not compatible with access key based authentication; make sure you set only one of exchange.s3.aws-access-key, exchange.s3.iam-role config properties");
            Preconditions.checkArgument(exchangeS3Config.getS3ExternalId().isEmpty(), "External ID is not compatible with access key based authentication; make sure you set only one of exchange.s3.aws-access-key, exchange.s3.external-id config properties");
            return StaticCredentialsProvider.create(AwsBasicCredentials.create(s3AwsAccessKey, s3AwsSecretKey));
        }
        if (exchangeS3Config.getS3ExternalId().isPresent() && exchangeS3Config.getS3IamRole().isEmpty()) {
            throw new IllegalArgumentException("External ID can only be used with IAM role based authentication; make sure you set exchange.s3.iam-role config property");
        }
        if (!exchangeS3Config.getS3IamRole().isPresent()) {
            return DefaultCredentialsProvider.create();
        }
        AssumeRoleRequest.Builder roleSessionName = AssumeRoleRequest.builder().roleArn(exchangeS3Config.getS3IamRole().get()).roleSessionName("trino-exchange");
        Optional<String> s3ExternalId = exchangeS3Config.getS3ExternalId();
        Objects.requireNonNull(roleSessionName);
        s3ExternalId.ifPresent(roleSessionName::externalId);
        StsClientBuilder builder = StsClient.builder();
        Optional<Region> s3Region = exchangeS3Config.getS3Region();
        Objects.requireNonNull(builder);
        s3Region.ifPresent(builder::region);
        return StsAssumeRoleCredentialsProvider.builder().stsClient((StsClient) builder.build()).refreshRequest((AssumeRoleRequest) roleSessionName.build()).asyncCredentialUpdateEnabled(true).build();
    }

    private S3AsyncClient createS3AsyncClient(AwsCredentialsProvider awsCredentialsProvider, ClientOverrideConfiguration clientOverrideConfiguration, boolean z, int i, int i2, Duration duration) {
        S3AsyncClientBuilder endpointOverride = S3AsyncClient.builder().credentialsProvider(awsCredentialsProvider).overrideConfiguration(clientOverrideConfiguration).serviceConfiguration((S3Configuration) S3Configuration.builder().pathStyleAccessEnabled(Boolean.valueOf(z)).build()).httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.valueOf(i)).maxPendingConnectionAcquires(Integer.valueOf(i2)).connectionAcquisitionTimeout(java.time.Duration.ofMillis(duration.toMillis()))).endpointOverride((URI) this.endpoint.map(URI::create).orElseGet(() -> {
            return AwsClientEndpointProvider.builder().serviceEndpointPrefix("s3").defaultProtocol("http").region(this.region.orElseThrow(() -> {
                return new IllegalArgumentException("region is expected to be set");
            })).build().clientEndpoint();
        }));
        Optional<Region> optional = this.region;
        Objects.requireNonNull(endpointOverride);
        optional.ifPresent(endpointOverride::region);
        return (S3AsyncClient) endpointOverride.build();
    }

    private static <T> void recordDistributionMetric(ListenableFuture<T> listenableFuture, final MetricsBuilder.DistributionMetricBuilder distributionMetricBuilder, final MetricsBuilder.DistributionMetricBuilder distributionMetricBuilder2) {
        final Stopwatch createStarted = Stopwatch.createStarted();
        Futures.addCallback(listenableFuture, new FutureCallback<T>() { // from class: io.trino.plugin.exchange.filesystem.s3.S3FileSystemExchangeStorage.2
            public void onSuccess(T t) {
                MetricsBuilder.DistributionMetricBuilder.this.add(createStarted.elapsed(TimeUnit.MILLISECONDS));
            }

            public void onFailure(Throwable th) {
                distributionMetricBuilder2.add(createStarted.elapsed(TimeUnit.MILLISECONDS));
            }
        }, MoreExecutors.directExecutor());
    }
}
