package com.azure.messaging.eventhubs.checkpointstore.blob;

import com.azure.core.http.rest.Response;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.ListBlobsOptions;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/messaging/eventhubs/checkpointstore/blob/BlobCheckpointStore.class */
public class BlobCheckpointStore implements CheckpointStore {
    private static final String SEQUENCE_NUMBER = "sequencenumber";
    private static final String OFFSET = "offset";
    private static final String OWNER_ID = "ownerid";
    private static final String ETAG = "eTag";
    private static final String BLOB_PATH_SEPARATOR = "/";
    private static final String CHECKPOINT_PATH = "/checkpoint/";
    private static final String OWNERSHIP_PATH = "/ownership/";
    private static final String PARTITION_ID_LOG_KEY = "partitionId";
    private static final String OWNER_ID_LOG_KEY = "ownerId";
    private static final String SEQUENCE_NUMBER_LOG_KEY = "sequenceNumber";
    private static final String BLOB_NAME_LOG_KEY = "blobName";
    private static final String OFFSET_LOG_KEY = "offset";
    public static final String EMPTY_STRING = "";
    private static final ByteBuffer UPLOAD_DATA = ByteBuffer.wrap(EMPTY_STRING.getBytes(StandardCharsets.UTF_8));
    private static final ClientLogger LOGGER = new ClientLogger(BlobCheckpointStore.class);
    private final BlobContainerAsyncClient blobContainerAsyncClient;
    private final Map<String, BlobAsyncClient> blobClients = new ConcurrentHashMap();

    public BlobCheckpointStore(BlobContainerAsyncClient blobContainerAsyncClient) {
        this.blobContainerAsyncClient = blobContainerAsyncClient;
    }

    public Flux<PartitionOwnership> listOwnership(String str, String str2, String str3) {
        return listBlobs(getBlobPrefix(str, str2, str3, OWNERSHIP_PATH), this::convertToPartitionOwnership);
    }

    public Flux<Checkpoint> listCheckpoints(String str, String str2, String str3) {
        return listBlobs(getBlobPrefix(str, str2, str3, CHECKPOINT_PATH), this::convertToCheckpoint);
    }

    private <T> Flux<T> listBlobs(String str, Function<BlobItem, Mono<T>> function) {
        return this.blobContainerAsyncClient.listBlobs(new ListBlobsOptions().setPrefix(str).setDetails(new BlobListDetails().setRetrieveMetadata(true))).flatMap(function).filter(Objects::nonNull);
    }

    private Mono<Checkpoint> convertToCheckpoint(BlobItem blobItem) {
        String[] split = blobItem.getName().split(BLOB_PATH_SEPARATOR);
        LOGGER.atVerbose().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).log(Messages.FOUND_BLOB_FOR_PARTITION);
        if (split.length != 5) {
            return Mono.empty();
        }
        if (CoreUtils.isNullOrEmpty(blobItem.getMetadata())) {
            LOGGER.atWarning().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).log(Messages.NO_METADATA_AVAILABLE_FOR_BLOB);
            return Mono.empty();
        }
        Map metadata = blobItem.getMetadata();
        LOGGER.atVerbose().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).addKeyValue(SEQUENCE_NUMBER_LOG_KEY, (String) metadata.get(SEQUENCE_NUMBER)).addKeyValue("offset", (String) metadata.get("offset")).log(Messages.CHECKPOINT_INFO);
        Long l = null;
        Long l2 = null;
        if (!CoreUtils.isNullOrEmpty((CharSequence) metadata.get(SEQUENCE_NUMBER))) {
            l = Long.valueOf(Long.parseLong((String) metadata.get(SEQUENCE_NUMBER)));
        }
        if (!CoreUtils.isNullOrEmpty((CharSequence) metadata.get("offset"))) {
            l2 = Long.valueOf(Long.parseLong((String) metadata.get("offset")));
        }
        return Mono.just(new Checkpoint().setFullyQualifiedNamespace(split[0]).setEventHubName(split[1]).setConsumerGroup(split[2]).setPartitionId(split[4]).setSequenceNumber(l).setOffset(l2));
    }

    public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> list) {
        return Flux.fromIterable(list).flatMap(partitionOwnership -> {
            try {
                String partitionId = partitionOwnership.getPartitionId();
                String blobName = getBlobName(partitionOwnership.getFullyQualifiedNamespace(), partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId, OWNERSHIP_PATH);
                if (!this.blobClients.containsKey(blobName)) {
                    this.blobClients.put(blobName, this.blobContainerAsyncClient.getBlobAsyncClient(blobName));
                }
                BlobAsyncClient blobAsyncClient = this.blobClients.get(blobName);
                HashMap hashMap = new HashMap();
                hashMap.put(OWNER_ID, partitionOwnership.getOwnerId());
                BlobRequestConditions blobRequestConditions = new BlobRequestConditions();
                if (CoreUtils.isNullOrEmpty(partitionOwnership.getETag())) {
                    blobRequestConditions.setIfNoneMatch("*");
                    return blobAsyncClient.getBlockBlobAsyncClient().uploadWithResponse(Flux.just(UPLOAD_DATA), 0L, (BlobHttpHeaders) null, hashMap, (AccessTier) null, (byte[]) null, blobRequestConditions).flatMapMany(response -> {
                        return updateOwnershipETag(response, partitionOwnership);
                    }, th -> {
                        LOGGER.atVerbose().addKeyValue(PARTITION_ID_LOG_KEY, partitionId).log(Messages.CLAIM_ERROR, new Object[]{th});
                        return Mono.error(th);
                    }, Mono::empty);
                }
                blobRequestConditions.setIfMatch(partitionOwnership.getETag());
                return blobAsyncClient.setMetadataWithResponse(hashMap, blobRequestConditions).flatMapMany(response2 -> {
                    return updateOwnershipETag(response2, partitionOwnership);
                }, th2 -> {
                    LOGGER.atVerbose().addKeyValue(PARTITION_ID_LOG_KEY, partitionId).log(Messages.CLAIM_ERROR, new Object[]{th2});
                    return Mono.error(th2);
                }, Mono::empty);
            } catch (Exception e) {
                LOGGER.atWarning().addKeyValue(PARTITION_ID_LOG_KEY, partitionOwnership.getPartitionId()).log(Messages.CLAIM_ERROR, new Object[]{e});
                return Mono.error(e);
            }
        });
    }

    private Mono<PartitionOwnership> updateOwnershipETag(Response<?> response, PartitionOwnership partitionOwnership) {
        return Mono.just(partitionOwnership.setETag(response.getHeaders().get(ETAG).getValue()));
    }

    public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
        if (checkpoint == null || (checkpoint.getSequenceNumber() == null && checkpoint.getOffset() == null)) {
            throw LOGGER.logExceptionAsWarning(Exceptions.propagate(new IllegalStateException("Both sequence number and offset cannot be null when updating a checkpoint")));
        }
        String blobName = getBlobName(checkpoint.getFullyQualifiedNamespace(), checkpoint.getEventHubName(), checkpoint.getConsumerGroup(), checkpoint.getPartitionId(), CHECKPOINT_PATH);
        if (!this.blobClients.containsKey(blobName)) {
            this.blobClients.put(blobName, this.blobContainerAsyncClient.getBlobAsyncClient(blobName));
        }
        HashMap hashMap = new HashMap();
        String valueOf = checkpoint.getSequenceNumber() == null ? null : String.valueOf(checkpoint.getSequenceNumber());
        String valueOf2 = checkpoint.getOffset() == null ? null : String.valueOf(checkpoint.getOffset());
        hashMap.put(SEQUENCE_NUMBER, valueOf);
        hashMap.put("offset", valueOf2);
        BlobAsyncClient blobAsyncClient = this.blobClients.get(blobName);
        return blobAsyncClient.exists().flatMap(bool -> {
            return bool.booleanValue() ? blobAsyncClient.setMetadata(hashMap) : blobAsyncClient.getBlockBlobAsyncClient().uploadWithResponse(Flux.just(UPLOAD_DATA), 0L, (BlobHttpHeaders) null, hashMap, (AccessTier) null, (byte[]) null, (BlobRequestConditions) null).then();
        });
    }

    private String getBlobPrefix(String str, String str2, String str3, String str4) {
        return str + BLOB_PATH_SEPARATOR + str2 + BLOB_PATH_SEPARATOR + str3 + str4;
    }

    private String getBlobName(String str, String str2, String str3, String str4, String str5) {
        return str + BLOB_PATH_SEPARATOR + str2 + BLOB_PATH_SEPARATOR + str3 + str5 + str4;
    }

    private Mono<PartitionOwnership> convertToPartitionOwnership(BlobItem blobItem) {
        LOGGER.atVerbose().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).log(Messages.FOUND_BLOB_FOR_PARTITION);
        String[] split = blobItem.getName().split(BLOB_PATH_SEPARATOR);
        if (split.length != 5) {
            return Mono.empty();
        }
        if (CoreUtils.isNullOrEmpty(blobItem.getMetadata())) {
            LOGGER.atWarning().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).log(Messages.NO_METADATA_AVAILABLE_FOR_BLOB);
            return Mono.empty();
        }
        BlobItemProperties properties = blobItem.getProperties();
        String str = (String) blobItem.getMetadata().getOrDefault(OWNER_ID, EMPTY_STRING);
        if (str == null) {
            str = EMPTY_STRING;
        }
        LOGGER.atVerbose().addKeyValue(BLOB_NAME_LOG_KEY, blobItem.getName()).addKeyValue(OWNER_ID_LOG_KEY, str).log(Messages.BLOB_OWNER_INFO);
        return Mono.just(new PartitionOwnership().setFullyQualifiedNamespace(split[0]).setEventHubName(split[1]).setConsumerGroup(split[2]).setPartitionId(split[4]).setOwnerId(str).setLastModifiedTime(Long.valueOf(properties.getLastModified().toInstant().toEpochMilli())).setETag(properties.getETag()));
    }
}
