package io.axoniq.axonserver.connector.event.impl;

import io.axoniq.axonserver.connector.AxonServerException;
import io.axoniq.axonserver.connector.ErrorCategory;
import io.axoniq.axonserver.connector.ResultStream;
import io.axoniq.axonserver.connector.admin.AdminChannel;
import io.axoniq.axonserver.connector.event.AggregateEventStream;
import io.axoniq.axonserver.connector.event.AppendEventsTransaction;
import io.axoniq.axonserver.connector.event.EventChannel;
import io.axoniq.axonserver.connector.event.EventQueryResultEntry;
import io.axoniq.axonserver.connector.event.EventStream;
import io.axoniq.axonserver.connector.impl.AbstractAxonServerChannel;
import io.axoniq.axonserver.connector.impl.AbstractBufferedStream;
import io.axoniq.axonserver.connector.impl.AxonServerManagedChannel;
import io.axoniq.axonserver.connector.impl.FutureStreamObserver;
import io.axoniq.axonserver.grpc.FlowControl;
import io.axoniq.axonserver.grpc.InstructionAck;
import io.axoniq.axonserver.grpc.control.ClientIdentification;
import io.axoniq.axonserver.grpc.event.CancelScheduledEventRequest;
import io.axoniq.axonserver.grpc.event.Confirmation;
import io.axoniq.axonserver.grpc.event.Event;
import io.axoniq.axonserver.grpc.event.EventSchedulerGrpc;
import io.axoniq.axonserver.grpc.event.EventStoreGrpc;
import io.axoniq.axonserver.grpc.event.EventWithToken;
import io.axoniq.axonserver.grpc.event.GetAggregateEventsRequest;
import io.axoniq.axonserver.grpc.event.GetAggregateSnapshotsRequest;
import io.axoniq.axonserver.grpc.event.GetFirstTokenRequest;
import io.axoniq.axonserver.grpc.event.GetLastTokenRequest;
import io.axoniq.axonserver.grpc.event.GetTokenAtRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsRequest;
import io.axoniq.axonserver.grpc.event.QueryEventsResponse;
import io.axoniq.axonserver.grpc.event.QueryValue;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrRequest;
import io.axoniq.axonserver.grpc.event.ReadHighestSequenceNrResponse;
import io.axoniq.axonserver.grpc.event.RescheduleEventRequest;
import io.axoniq.axonserver.grpc.event.RowResponse;
import io.axoniq.axonserver.grpc.event.ScheduleEventRequest;
import io.axoniq.axonserver.grpc.event.ScheduleToken;
import io.axoniq.axonserver.grpc.event.TrackingToken;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * {@link EventChannel} implementation that talks to the server through the asynchronous gRPC stubs of the
 * {@code EventStore} and {@code EventScheduler} services.
 */
public class EventChannelImpl extends AbstractAxonServerChannel<Void> implements EventChannel {
    /**
     * Response with sequence number {@code -1}, handed to the observer created in
     * {@link #findHighestSequence(String)} as its fallback value.
     */
    private static final ReadHighestSequenceNrResponse UNKNOWN_HIGHEST_SEQ =
            ReadHighestSequenceNrResponse.newBuilder().setToSequenceNr(-1).build();

    /** Token with value {@code -1}, handed to the observers created by the token lookup methods as their fallback. */
    private static final TrackingToken NO_TOKEN_AVAILABLE = TrackingToken.newBuilder().setToken(-1).build();

    /** Asynchronous stub for the event store service. */
    private final EventStoreGrpc.EventStoreStub eventStore;

    /** Asynchronous stub for the event scheduler service. */
    private final EventSchedulerGrpc.EventSchedulerStub eventScheduler;

    /** Event streams opened via {@code openStream} that have not been closed yet. */
    private final Set<BufferedEventStream> buffers;

    /** Identification passed to every {@link BufferedEventStream} this channel creates. */
    private final ClientIdentification clientId;

    /**
     * Buffered, flow-controlled stream of {@link QueryEventsResponse} messages for an ad-hoc event query.
     * <p>
     * Only {@code ROW} responses are buffered for the consumer. A {@code COLUMNS} response is captured in
     * {@link #columnNames} and immediately marked as consumed; any other response is marked as consumed and dropped.
     */
    public static class EventQueryResponseStream extends AbstractBufferedStream<QueryEventsResponse, QueryEventsRequest> {

        /** Sentinel row (single id value {@code "__terminal__"}) returned from {@link #terminalMessage()}. */
        private static final QueryEventsResponse TERMINAL_MESSAGE = QueryEventsResponse.newBuilder()
                .setRow(RowResponse.newBuilder().addIdValues(QueryValue.newBuilder().setTextValue("__terminal__")))
                .build();

        private final String query;
        private final boolean liveStream;
        private final boolean querySnapshots;
        private final String contextName;

        /** Column names from the most recent {@code COLUMNS} response; holds {@code null} until one arrives. */
        private final AtomicReference<List<String>> columnNames = new AtomicReference<>();

        /**
         * Creates a stream for the given query.
         *
         * @param str  the query text sent in the initial request
         * @param z    value for the {@code liveEvents} flag of the initial request
         * @param z2   value for the {@code querySnapshots} flag of the initial request
         * @param str2 the context name to send; {@code null} is replaced by {@link AdminChannel#CHANNEL_CONTEXT}
         */
        public EventQueryResponseStream(String str, boolean z, boolean z2, String str2) {
            // NOTE(review): the arguments look like (client id, buffer size, refill batch) - confirm against
            // AbstractBufferedStream.
            super("unused", 100, 25);
            this.query = str;
            this.liveStream = z;
            this.querySnapshots = z2;
            this.contextName = str2 == null ? AdminChannel.CHANNEL_CONTEXT : str2;
        }

        /**
         * Dispatches an incoming response according to its data case.
         *
         * @param queryEventsResponse the response received from the server
         */
        @Override
        public void onNext(QueryEventsResponse queryEventsResponse) {
            switch (queryEventsResponse.getDataCase()) {
                case COLUMNS:
                    columnNames.set(queryEventsResponse.getColumns().getColumnList());
                    // Column metadata never reaches the buffer, so account for it right away.
                    markConsumed();
                    break;
                case ROW:
                    // Pass the message itself upward; casting it to the stream type can never succeed.
                    super.onNext(queryEventsResponse);
                    break;
                default:
                    markConsumed();
                    break;
            }
        }

        /**
         * Builds the first request on the stream: the query definition plus the initial permits.
         *
         * @param flowControl supplies the number of permits to request
         * @return the request that opens the query
         */
        @Override
        public QueryEventsRequest buildInitialFlowControlMessage(FlowControl flowControl) {
            return QueryEventsRequest.newBuilder()
                    .setQuery(query)
                    .setLiveEvents(liveStream)
                    .setNumberOfPermits(flowControl.getPermits())
                    .setQuerySnapshots(querySnapshots)
                    .setContextName(contextName)
                    .build();
        }

        /**
         * Builds a follow-up request that only carries additional permits.
         *
         * @param flowControl supplies the number of permits to request
         * @return the permit-only request
         */
        @Override
        public QueryEventsRequest buildFlowControlMessage(FlowControl flowControl) {
            return QueryEventsRequest.newBuilder().setNumberOfPermits(flowControl.getPermits()).build();
        }

        /**
         * @return the shared sentinel message for this stream type
         */
        @Override
        public QueryEventsResponse terminalMessage() {
            return TERMINAL_MESSAGE;
        }
    }

    /**
     * Presents a single {@code ROW} {@link QueryEventsResponse} as an {@link EventQueryResultEntry}, converting the
     * protobuf {@link QueryValue} wrappers to plain Java values.
     */
    public static class EventQueryResultEntryAdapter implements EventQueryResultEntry {
        private final QueryEventsResponse response;
        private final Supplier<List<String>> columnNames;

        /**
         * @param queryEventsResponse the response whose row is exposed
         * @param supplier            consulted on every {@link #columns()} call
         */
        public EventQueryResultEntryAdapter(QueryEventsResponse queryEventsResponse, Supplier<List<String>> supplier) {
            this.response = queryEventsResponse;
            this.columnNames = supplier;
        }

        /**
         * @return whatever the column-name supplier currently yields
         */
        @Override
        public List<String> columns() {
            return columnNames.get();
        }

        /**
         * Looks up a value of the row by column name.
         *
         * @param str the column name
         * @param <R> the type the caller expects; the cast is unchecked
         * @return the unwrapped value, or {@code null} when the column is absent or its value has no supported
         *         data case
         */
        @Override
        @SuppressWarnings("unchecked")
        public <R> R getValue(String str) {
            QueryValue wrapped = response.getRow().getValuesMap().getOrDefault(str, QueryValue.getDefaultInstance());
            return (R) unwrap(wrapped);
        }

        /**
         * @return the unwrapped id values of the row, in order
         */
        @Override
        public List<Object> getIdentifiers() {
            return unwrapAll(response.getRow().getIdValuesList());
        }

        /**
         * @return the unwrapped sort values of the row, in order
         */
        @Override
        public List<Object> getSortValues() {
            return unwrapAll(response.getRow().getSortValuesList());
        }

        /** Unwraps every element of {@code wrappedValues}, preserving order. */
        private List<Object> unwrapAll(List<QueryValue> wrappedValues) {
            return wrappedValues.stream().map(this::unwrap).collect(Collectors.toList());
        }

        /** Converts one {@link QueryValue} to String, Long, Boolean or Double; any other data case yields null. */
        private Object unwrap(QueryValue queryValue) {
            final Object plain;
            switch (queryValue.getDataCase()) {
                case TEXT_VALUE:
                    plain = queryValue.getTextValue();
                    break;
                case NUMBER_VALUE:
                    plain = queryValue.getNumberValue();
                    break;
                case BOOLEAN_VALUE:
                    plain = queryValue.getBooleanValue();
                    break;
                case DOUBLE_VALUE:
                    plain = queryValue.getDoubleValue();
                    break;
                default:
                    plain = null;
                    break;
            }
            return plain;
        }
    }

    /**
     * {@link ResultStream} view over another stream that converts each element with a mapping function.
     * A {@code null} element from the delegate is passed through as {@code null} without invoking the mapper;
     * every other operation is forwarded to the delegate unchanged.
     *
     * @param <I> element type of the wrapped stream
     * @param <T> element type exposed by this stream
     */
    public static class MappedResultStream<I, T> implements ResultStream<T> {
        private final ResultStream<I> delegate;
        private final Function<I, T> mapper;

        /**
         * @param resultStream the stream to wrap
         * @param function     applied to every non-null element read from {@code resultStream}
         */
        public MappedResultStream(ResultStream<I> resultStream, Function<I, T> function) {
            this.delegate = resultStream;
            this.mapper = function;
        }

        @Override
        public T peek() {
            I head = delegate.peek();
            return mapOrNull(head);
        }

        @Override
        public T nextIfAvailable() {
            I element = delegate.nextIfAvailable();
            return mapOrNull(element);
        }

        @Override
        public T nextIfAvailable(long j, TimeUnit timeUnit) throws InterruptedException {
            I element = delegate.nextIfAvailable(j, timeUnit);
            return mapOrNull(element);
        }

        @Override
        public T next() throws InterruptedException {
            I element = delegate.next();
            return mapOrNull(element);
        }

        @Override
        public void onAvailable(Runnable runnable) {
            delegate.onAvailable(runnable);
        }

        @Override
        public void close() {
            delegate.close();
        }

        @Override
        public boolean isClosed() {
            return delegate.isClosed();
        }

        @Override
        public Optional<Throwable> getError() {
            return delegate.getError();
        }

        /** Applies the mapper to {@code source}, or returns {@code null} when {@code source} is {@code null}. */
        private T mapOrNull(I source) {
            return source == null ? null : mapper.apply(source);
        }
    }

    /**
     * Creates the channel and its two asynchronous gRPC stubs on top of the given managed channel.
     *
     * @param clientIdentification     identification forwarded to the superclass and to every event stream opened later
     * @param scheduledExecutorService executor forwarded to the superclass
     * @param axonServerManagedChannel the channel the event store and event scheduler stubs are created on
     */
    public EventChannelImpl(ClientIdentification clientIdentification, ScheduledExecutorService scheduledExecutorService, AxonServerManagedChannel axonServerManagedChannel) {
        super(clientIdentification, scheduledExecutorService, axonServerManagedChannel);
        clientId = clientIdentification;
        eventStore = EventStoreGrpc.newStub(axonServerManagedChannel);
        eventScheduler = EventSchedulerGrpc.newStub(axonServerManagedChannel);
        // Concurrent set: streams are added on open and removed from close callbacks.
        buffers = ConcurrentHashMap.newKeySet();
    }

    /**
     * No-op: this implementation performs no work when asked to connect.
     */
    @Override
    public synchronized void connect() {
    }

    /**
     * Closes and forgets every event stream currently tracked by this channel.
     */
    @Override
    public void reconnect() {
        closeEventStreams();
    }

    /**
     * Closes and forgets every event stream currently tracked by this channel.
     */
    @Override
    public void disconnect() {
        closeEventStreams();
    }

    /**
     * Closes every tracked event stream and then empties the tracking set.
     */
    private void closeEventStreams() {
        buffers.forEach(BufferedEventStream::close);
        buffers.clear();
    }

    /**
     * @return always {@code true}
     */
    @Override
    public boolean isReady() {
        return true;
    }

    /**
     * Opens an {@code appendEvent} call on the event store and wraps it in a transaction.
     *
     * @return a transaction holding the request stream and the observer that receives the server's confirmation
     */
    @Override
    public AppendEventsTransaction startAppendEventsTransaction() {
        // The cast selects the Throwable-accepting constructor; a bare null would not name an overload explicitly.
        FutureStreamObserver<Confirmation> result = new FutureStreamObserver<>((Throwable) null);
        StreamObserver<Event> requestStream = eventStore.appendEvent(result);
        return new AppendEventsTransactionImpl(requestStream, result);
    }

    /**
     * Asks the event scheduler to schedule the given event.
     *
     * @param instant the moment to schedule for; sent to the server as epoch milliseconds
     * @param event   the event to schedule
     * @return a future completing with the token string taken from the server's response
     */
    @Override
    public CompletableFuture<String> scheduleEvent(Instant instant, Event event) {
        FutureStreamObserver<ScheduleToken> result = new FutureStreamObserver<>((Throwable) new AxonServerException(
                ErrorCategory.OTHER,
                "An unknown error occurred while scheduling an Event. No response received from Server.",
                AdminChannel.CHANNEL_CONTEXT));
        ScheduleEventRequest request = ScheduleEventRequest.newBuilder()
                .setEvent(event)
                .setInstant(instant.toEpochMilli())
                .build();
        eventScheduler.scheduleEvent(request, result);
        return result.thenApply(ScheduleToken::getToken);
    }

    /**
     * Asks the event scheduler to cancel a previously scheduled event.
     *
     * @param str the schedule token identifying the event to cancel
     * @return a future completing with the server's acknowledgement
     */
    @Override
    public CompletableFuture<InstructionAck> cancelSchedule(String str) {
        FutureStreamObserver<InstructionAck> result = new FutureStreamObserver<>((Throwable) new AxonServerException(
                ErrorCategory.OTHER,
                "An unknown error occurred while cancelling a scheduled Event. No response received from Server.",
                AdminChannel.CHANNEL_CONTEXT));
        CancelScheduledEventRequest request = CancelScheduledEventRequest.newBuilder().setToken(str).build();
        eventScheduler.cancelScheduledEvent(request, result);
        return result;
    }

    /**
     * Asks the event scheduler to reschedule an event.
     *
     * @param str     the schedule token sent with the request
     * @param instant the new moment to schedule for; sent to the server as epoch milliseconds
     * @param event   the event to schedule
     * @return a future completing with the token string taken from the server's response
     */
    @Override
    public CompletableFuture<String> reschedule(String str, Instant instant, Event event) {
        FutureStreamObserver<ScheduleToken> result = new FutureStreamObserver<>((Throwable) new AxonServerException(
                ErrorCategory.OTHER,
                "An unknown error occurred while rescheduling Event. No response received from Server.",
                AdminChannel.CHANNEL_CONTEXT));
        RescheduleEventRequest request = RescheduleEventRequest.newBuilder()
                .setToken(str)
                .setEvent(event)
                .setInstant(instant.toEpochMilli())
                .build();
        eventScheduler.rescheduleEvent(request, result);
        return result.thenApply(ScheduleToken::getToken);
    }

    /**
     * Requests the highest sequence number stored for an aggregate.
     *
     * @param str the aggregate identifier
     * @return a future completing with the {@code toSequenceNr} of the response; the observer is seeded with a
     *         fallback response whose sequence number is {@code -1}
     */
    @Override
    public CompletableFuture<Long> findHighestSequence(String str) {
        FutureStreamObserver<ReadHighestSequenceNrResponse> result = new FutureStreamObserver<>(UNKNOWN_HIGHEST_SEQ);
        ReadHighestSequenceNrRequest request = ReadHighestSequenceNrRequest.newBuilder().setAggregateId(str).build();
        eventStore.readHighestSequenceNr(request, result);
        return result.thenApply(ReadHighestSequenceNrResponse::getToSequenceNr);
    }

    /**
     * Opens a buffered event stream and registers it so that {@link #reconnect()} and {@link #disconnect()} can
     * close it.
     *
     * @param j  passed to the stream unchanged (presumably the tracking token to start from - confirm against
     *           {@code BufferedEventStream})
     * @param i  requested buffer size; raised to at least 64
     * @param i2 requested refill batch; capped at {@code i} and then raised to at least 16
     * @param z  passed to the stream unchanged (NOTE(review): meaning not visible here)
     * @return the opened stream, with flow control enabled
     */
    @Override
    public EventStream openStream(long j, int i, int i2, boolean z) {
        int bufferSize = Math.max(64, i);
        int refillBatch = Math.max(16, Math.min(i, i2));
        BufferedEventStream buffer = new BufferedEventStream(clientId, j, bufferSize, refillBatch, z);
        buffers.add(buffer);
        // Stop tracking the stream as soon as a close is requested on it.
        buffer.onCloseRequested(() -> buffers.remove(buffer));
        try {
            eventStore.listEvents(buffer);
            buffer.enableFlowControl();
            return buffer;
        } catch (Exception e) {
            // The stream never became usable; do not keep it registered.
            buffers.remove(buffer);
            throw e;
        }
    }

    /**
     * Opens a stream of the events of one aggregate.
     *
     * @param str the aggregate identifier
     * @param z   value for the {@code allowSnapshots} flag of the request
     * @return the stream receiving the aggregate's events
     */
    @Override
    public AggregateEventStream openAggregateStream(String str, boolean z) {
        GetAggregateEventsRequest request = GetAggregateEventsRequest.newBuilder()
                .setAggregateId(str)
                .setAllowSnapshots(z)
                .build();
        return doGetAggregateStream(request);
    }

    /**
     * Opens a stream of the events of one aggregate, bounded by sequence numbers.
     *
     * @param str the aggregate identifier
     * @param j   value for the {@code initialSequence} field of the request
     * @param j2  value for the {@code maxSequence} field of the request
     * @return the stream receiving the aggregate's events
     */
    @Override
    public AggregateEventStream openAggregateStream(String str, long j, long j2) {
        GetAggregateEventsRequest request = GetAggregateEventsRequest.newBuilder()
                .setAggregateId(str)
                .setInitialSequence(j)
                .setMaxSequence(j2)
                .build();
        return doGetAggregateStream(request);
    }

    /**
     * Sends a snapshot event to the event store.
     *
     * @param event the snapshot to append
     * @return a future completing with the server's confirmation; the observer is seeded with a fallback
     *         confirmation whose {@code success} flag is {@code false}
     */
    @Override
    public CompletableFuture<Confirmation> appendSnapshot(Event event) {
        Confirmation fallback = Confirmation.newBuilder().setSuccess(false).build();
        FutureStreamObserver<Confirmation> result = new FutureStreamObserver<>(fallback);
        eventStore.appendSnapshot(event, result);
        return result;
    }

    /**
     * Requests the snapshots of one aggregate.
     *
     * @param str the aggregate identifier
     * @param j   value for the {@code initialSequence} field of the request
     * @param j2  value for the {@code maxSequence} field of the request
     * @param i   value for the {@code maxResults} field of the request
     * @return the stream receiving the snapshot events
     */
    @Override
    public AggregateEventStream loadSnapshots(String str, long j, long j2, int i) {
        GetAggregateSnapshotsRequest request = GetAggregateSnapshotsRequest.newBuilder()
                .setInitialSequence(j)
                .setMaxResults(i)
                .setMaxSequence(j2)
                .setAggregateId(str)
                .build();
        BufferedAggregateEventStream snapshots = new BufferedAggregateEventStream();
        eventStore.listAggregateSnapshots(request, snapshots);
        return snapshots;
    }

    /**
     * Requests the last token of the event store.
     *
     * @return a future completing with the token value of the response; the observer is seeded with a fallback
     *         token of {@code -1}
     */
    @Override
    public CompletableFuture<Long> getLastToken() {
        FutureStreamObserver<TrackingToken> result = new FutureStreamObserver<>(NO_TOKEN_AVAILABLE);
        eventStore.getLastToken(GetLastTokenRequest.newBuilder().build(), result);
        return result.thenApply(TrackingToken::getToken);
    }

    /**
     * Requests the first token of the event store.
     *
     * @return a future completing with {@code max(token, 0) - 1}, where {@code token} is the value in the
     *         response; a negative response value therefore yields {@code -1}
     */
    @Override
    public CompletableFuture<Long> getFirstToken() {
        FutureStreamObserver<TrackingToken> result = new FutureStreamObserver<>(NO_TOKEN_AVAILABLE);
        eventStore.getFirstToken(GetFirstTokenRequest.newBuilder().build(), result);
        return result.thenApply(trackingToken -> Math.max(trackingToken.getToken(), 0L) - 1);
    }

    /**
     * Requests the token for a given instant.
     *
     * @param j value for the {@code instant} field of the request
     * @return a future completing with {@code max(token, 0) - 1}, where {@code token} is the value in the
     *         response; a negative response value therefore yields {@code -1}
     */
    @Override
    public CompletableFuture<Long> getTokenAt(long j) {
        FutureStreamObserver<TrackingToken> result = new FutureStreamObserver<>(NO_TOKEN_AVAILABLE);
        eventStore.getTokenAt(GetTokenAtRequest.newBuilder().setInstant(j).build(), result);
        return result.thenApply(trackingToken -> Math.max(trackingToken.getToken(), 0L) - 1);
    }

    /**
     * Runs an ad-hoc query with the {@code querySnapshots} flag set to {@code false}.
     *
     * @param str  the query text
     * @param z    value for the {@code liveEvents} flag of the request
     * @param str2 the context name; may be {@code null}
     * @return a stream of result entries
     */
    @Override
    public ResultStream<EventQueryResultEntry> queryEvents(String str, boolean z, String str2) {
        final boolean querySnapshots = false;
        return doQueryEvent(str, z, querySnapshots, str2);
    }

    /**
     * Runs an ad-hoc query with the {@code querySnapshots} flag set to {@code true}.
     *
     * @param str  the query text
     * @param z    value for the {@code liveEvents} flag of the request
     * @param str2 the context name; may be {@code null}
     * @return a stream of result entries
     */
    @Override
    public ResultStream<EventQueryResultEntry> querySnapshotEvents(String str, boolean z, String str2) {
        final boolean querySnapshots = true;
        return doQueryEvent(str, z, querySnapshots, str2);
    }

    /**
     * Opens a query call on the event store and exposes its row responses as result entries.
     *
     * @param str  the query text
     * @param z    value for the {@code liveEvents} flag of the request
     * @param z2   value for the {@code querySnapshots} flag of the request
     * @param str2 the context name; may be {@code null}
     * @return a stream mapping each buffered response to an {@link EventQueryResultEntryAdapter}
     */
    private ResultStream<EventQueryResultEntry> doQueryEvent(String str, boolean z, boolean z2, String str2) {
        EventQueryResponseStream responseStream = new EventQueryResponseStream(str, z, z2, str2);
        eventStore.queryEvents(responseStream);
        responseStream.enableFlowControl();
        // Each entry reads the column names lazily, so it sees the names stored at the time columns() is called.
        return new MappedResultStream<QueryEventsResponse, EventQueryResultEntry>(
                responseStream,
                response -> new EventQueryResultEntryAdapter(response, responseStream.columnNames::get));
    }

    /**
     * Issues the given aggregate-events request and returns the stream that receives the responses.
     *
     * @param getAggregateEventsRequest the request to send to the event store
     * @return the stream receiving the aggregate's events
     */
    private AggregateEventStream doGetAggregateStream(GetAggregateEventsRequest getAggregateEventsRequest) {
        BufferedAggregateEventStream aggregateEvents = new BufferedAggregateEventStream();
        eventStore.listAggregateEvents(getAggregateEventsRequest, aggregateEvents);
        return aggregateEvents;
    }
}
