package io.axoniq.axonserver.connector.event.transformation.impl.grpc;

import io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService;
import io.axoniq.axonserver.connector.impl.StreamClosedException;
import io.axoniq.axonserver.grpc.event.DeletedEvent;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.TransformRequest;
import io.axoniq.axonserver.grpc.event.TransformationId;
import io.axoniq.axonserver.grpc.event.TransformedEvent;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/event/transformation/impl/grpc/GrpcTransformationStream.class */
/**
 * {@link EventTransformationService.TransformationStream} implementation that sends transformation
 * actions over a gRPC request stream.
 *
 * <p>Every action sent through {@link #deleteEvent(long, long)} or {@link #replaceEvent(long, Event, long)}
 * registers a {@link CompletableFuture} keyed by its sequence. The future completes normally when the
 * acknowledgement for that sequence is received, and exceptionally when the server closes the stream
 * before acknowledging it.
 */
public class GrpcTransformationStream implements EventTransformationService.TransformationStream {

    private static final Logger logger = LoggerFactory.getLogger(GrpcTransformationStream.class);

    private final String transformationId;
    private final StreamObserver<TransformRequest> requestStreamObserver;
    /** Futures of actions that were sent but not yet acknowledged, keyed by action sequence. */
    private final Map<Long, CompletableFuture<Void>> pendingRequests = new ConcurrentHashMap<>();
    private final AtomicReference<Consumer<Throwable>> onCompletedByServerListener = new AtomicReference<>();
    /** Holds the reason the stream was closed; {@code null} while the stream is still open. */
    private final AtomicReference<Throwable> completed = new AtomicReference<>();

    /**
     * Creates the stream and hooks it up to the acknowledgement and completion notifications.
     *
     * @param transformationId        identifier placed in every request sent through this stream
     * @param requestStreamObserver   observer used to send the requests
     * @param registerAckListener     receives the callback to invoke with the sequence of each acknowledged action
     * @param registerCompletedByServerListener receives the callback to invoke with the cause when the server
     *                                completes the stream
     */
    public GrpcTransformationStream(String transformationId,
                                    StreamObserver<TransformRequest> requestStreamObserver,
                                    Consumer<Consumer<Long>> registerAckListener,
                                    Consumer<Consumer<Throwable>> registerCompletedByServerListener) {
        this.transformationId = transformationId;
        this.requestStreamObserver = requestStreamObserver;
        registerAckListener.accept(this::onTransformationActionAck);
        registerCompletedByServerListener.accept(this::completedByServer);
    }

    /**
     * Builds the message describing the replacement of the event at the given token.
     *
     * @param token the token of the event to replace
     * @param event the replacement event
     * @return the {@link TransformedEvent} message
     */
    @Nonnull
    private static TransformedEvent transformedEvent(long token, Event event) {
        return TransformedEvent.newBuilder().setEvent(event).setToken(token).m4715build();
    }

    /**
     * Sends a request to delete the event at the given token.
     *
     * @param token    the token of the event to delete
     * @param sequence the sequence of this action, used to match its acknowledgement
     * @return a future completed when the action is acknowledged, or completed exceptionally when the
     *         server closes the stream first
     * @throws StreamClosedException if the stream has already been closed
     */
    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService.TransformationStream
    public CompletableFuture<Void> deleteEvent(long token, long sequence) {
        checkCompleted();
        CompletableFuture<Void> result = new CompletableFuture<>();
        pendingRequests.put(sequence, result);
        logger.trace("Sending delete event {} to Axon Server.", token);
        requestStreamObserver.onNext(TransformRequest.newBuilder()
                                                     .setTransformationId(transformationId())
                                                     .setSequence(sequence)
                                                     .setDeleteEvent(deleteEvent(token))
                                                     .m4524build());
        return result;
    }

    /**
     * Builds the message describing the deletion of the event at the given token.
     *
     * @param token the token of the event to delete
     * @return the {@link DeletedEvent} message
     */
    private DeletedEvent deleteEvent(long token) {
        return DeletedEvent.newBuilder().setToken(token).m3520build();
    }

    /**
     * Fails fast when the stream has already been closed, by the client or by the server.
     *
     * @throws StreamClosedException wrapping the reason of the closure
     */
    private void checkCompleted() {
        Throwable cause = completed.get();
        if (cause != null) {
            throw new StreamClosedException(cause);
        }
    }

    /**
     * Sends a request to replace the event at the given token.
     *
     * @param token    the token of the event to replace
     * @param event    the replacement event
     * @param sequence the sequence of this action, used to match its acknowledgement
     * @return a future completed when the action is acknowledged, or completed exceptionally when the
     *         server closes the stream first
     * @throws StreamClosedException if the stream has already been closed
     */
    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService.TransformationStream
    public CompletableFuture<Void> replaceEvent(long token, Event event, long sequence) {
        // Same guard as deleteEvent: do not register or send anything on a closed stream.
        checkCompleted();
        CompletableFuture<Void> result = new CompletableFuture<>();
        pendingRequests.put(sequence, result);
        logger.trace("Sending replace event {} to Axon Server.", token);
        requestStreamObserver.onNext(TransformRequest.newBuilder()
                                                     .setTransformationId(transformationId())
                                                     .setSequence(sequence)
                                                     .setReplaceEvent(transformedEvent(token, event))
                                                     .m4524build());
        return result;
    }

    /**
     * Wraps the identifier of this transformation in its gRPC message type.
     *
     * @return the {@link TransformationId} message
     */
    @Nonnull
    private TransformationId transformationId() {
        return TransformationId.newBuilder().setId(transformationId).m4666build();
    }

    /**
     * Marks the stream as closed by the client and completes the request stream. Subsequent actions
     * are rejected with a {@link StreamClosedException}.
     */
    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService.TransformationStream
    public void complete() {
        completed.compareAndSet(null, new RuntimeException("Completed by the client"));
        requestStreamObserver.onCompleted();
    }

    /**
     * Registers the listener to notify when the server completes the stream. A later registration
     * replaces an earlier one.
     *
     * @param consumer receives the cause reported for the completion
     */
    @Override // io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService.TransformationStream
    public void onCompletedByServer(Consumer<Throwable> consumer) {
        onCompletedByServerListener.set(consumer);
    }

    /**
     * Handles the completion of the stream by the server: records the cause (unless the stream was
     * already closed), fails every pending action and notifies the registered listener, if any.
     *
     * @param cause the cause reported for the completion
     */
    private void completedByServer(Throwable cause) {
        completed.compareAndSet(null, cause);
        logger.warn("Transformation stream completed by server with error: ", cause);
        completePending(cause);
        Consumer<Throwable> listener = onCompletedByServerListener.get();
        if (listener != null) {
            listener.accept(cause);
        }
    }

    /**
     * Fails every action that is still waiting for its acknowledgement.
     *
     * @param cause the exception the pending futures are completed with
     */
    private void completePending(Throwable cause) {
        pendingRequests.forEach((sequence, pending) -> {
            // Remove exactly this future, so a different one registered under the same key is untouched.
            pendingRequests.remove(sequence, pending);
            pending.completeExceptionally(cause);
        });
    }

    /**
     * Completes the pending action matching the acknowledged sequence, if there is one.
     *
     * @param sequence the sequence of the acknowledged action
     */
    private void onTransformationActionAck(long sequence) {
        logger.trace("Acknowledge received for transformation sequence {}. ", sequence);
        // Remove first and complete afterwards: dependent stages of the future run on completion and
        // must not execute inside a map remapping function, where updating this map is not allowed.
        CompletableFuture<Void> pending = pendingRequests.remove(sequence);
        if (pending != null) {
            pending.complete(null);
        }
    }
}
