package io.fluxcapacitor.javaclient.tracking;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.handling.Handler;
import io.fluxcapacitor.common.handling.HandlerFilter;
import io.fluxcapacitor.common.handling.HandlerInvoker;
import io.fluxcapacitor.common.handling.Invocation;
import io.fluxcapacitor.common.reflection.ReflectionUtils;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.common.ClientUtils;
import io.fluxcapacitor.javaclient.common.exception.FunctionalException;
import io.fluxcapacitor.javaclient.common.exception.TechnicalException;
import io.fluxcapacitor.javaclient.common.serialization.DeserializingMessage;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.publishing.ResultGateway;
import io.fluxcapacitor.javaclient.tracking.client.DefaultTracker;
import io.fluxcapacitor.javaclient.tracking.handling.HandlerFactory;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/DefaultTracking.class */
public class DefaultTracking implements Tracking {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DefaultTracking.class);
    private final MessageType messageType;
    private final ResultGateway resultGateway;
    private final List<ConsumerConfiguration> configurations;
    private final List<? extends BatchInterceptor> generalBatchInterceptors;
    private final Serializer serializer;
    private final HandlerFactory handlerFactory;
    private final Object $lock = new Object[0];
    private final HandlerFilter handlerFilter = ClientUtils::isTrackingHandler;
    private final Set<ConsumerConfiguration> startedConfigurations = new HashSet();
    private final Collection<CompletableFuture<?>> outstandingRequests = new CopyOnWriteArrayList();
    private final AtomicReference<Registration> shutdownFunction = new AtomicReference<>(Registration.noOp());

    @Override // io.fluxcapacitor.javaclient.tracking.Tracking
    public Registration start(FluxCapacitor fluxCapacitor, List<?> list) {
        Registration registration;
        synchronized (this.$lock) {
            registration = (Registration) fluxCapacitor.apply(fluxCapacitor2 -> {
                Map map = (Map) assignHandlersToConsumers(list).entrySet().stream().flatMap(entry -> {
                    List list2 = (List) ((List) entry.getValue()).stream().flatMap(obj -> {
                        return obj instanceof Handler ? Stream.of((Handler) obj) : this.handlerFactory.createHandler(ReflectionUtils.asInstance(obj), ((ConsumerConfiguration) entry.getKey()).getName(), this.handlerFilter).stream();
                    }).collect(Collectors.toList());
                    return list2.isEmpty() ? Stream.empty() : Stream.of(new AbstractMap.SimpleEntry((ConsumerConfiguration) entry.getKey(), list2));
                }).collect(Collectors.toMap((v0) -> {
                    return v0.getKey();
                }, (v0) -> {
                    return v0.getValue();
                }));
                if (!Collections.disjoint(map.keySet(), this.startedConfigurations)) {
                    throw new TrackingException("Failed to start tracking. Consumers for some handlers have already started tracking.");
                }
                this.startedConfigurations.addAll(map.keySet());
                Registration registration2 = (Registration) map.entrySet().stream().map(entry2 -> {
                    return startTracking((ConsumerConfiguration) entry2.getKey(), (List) entry2.getValue(), fluxCapacitor2);
                }).reduce((v0, v1) -> {
                    return v0.merge(v1);
                }).orElse(Registration.noOp());
                this.shutdownFunction.updateAndGet(registration3 -> {
                    return registration3.merge(registration2);
                });
                return registration2;
            });
        }
        return registration;
    }

    private Map<ConsumerConfiguration, List<Object>> assignHandlersToConsumers(List<?> list) {
        ArrayList arrayList = new ArrayList(list);
        Map<ConsumerConfiguration, List<Object>> map = (Map) ((LinkedHashMap) Stream.concat(ConsumerConfiguration.configurations((Collection) list.stream().map((v0) -> {
            return v0.getClass();
        }).collect(Collectors.toList())).filter(consumerConfiguration -> {
            return consumerConfiguration.getMessageType() == this.messageType;
        }), this.configurations.stream()).map(consumerConfiguration2 -> {
            return consumerConfiguration2.toBuilder().batchInterceptors(this.generalBatchInterceptors).build();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getName();
        }, Function.identity(), (consumerConfiguration3, consumerConfiguration4) -> {
            if (consumerConfiguration3.equals(consumerConfiguration4)) {
                return consumerConfiguration3.toBuilder().handlerFilter(consumerConfiguration3.getHandlerFilter().or(consumerConfiguration4.getHandlerFilter())).build();
            }
            throw new IllegalStateException(String.format("Consumer name %s is already in use", consumerConfiguration3.getName()));
        }, LinkedHashMap::new))).values().stream().map(consumerConfiguration5 -> {
            List list2 = (List) arrayList.stream().filter(obj -> {
                return consumerConfiguration5.getHandlerFilter().test(obj);
            }).collect(Collectors.toList());
            if (consumerConfiguration5.exclusive()) {
                arrayList.removeAll(list2);
            }
            return Map.entry(consumerConfiguration5, list2);
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
        arrayList.removeAll((Collection) map.values().stream().flatMap((v0) -> {
            return v0.stream();
        }).distinct().collect(Collectors.toList()));
        arrayList.forEach(obj -> {
            throw new TrackingException(String.format("Failed to find consumer for %s", obj));
        });
        return map;
    }

    protected Registration startTracking(ConsumerConfiguration consumerConfiguration, List<Handler<DeserializingMessage>> list, FluxCapacitor fluxCapacitor) {
        return DefaultTracker.start(createConsumer(consumerConfiguration, list), consumerConfiguration, fluxCapacitor);
    }

    protected java.util.function.Consumer<List<SerializedMessage>> createConsumer(ConsumerConfiguration consumerConfiguration, List<Handler<DeserializingMessage>> list) {
        return list2 -> {
            DeserializingMessage.handleBatch(this.serializer.deserializeMessages(list2.stream(), this.messageType)).forEach(deserializingMessage -> {
                list.forEach(handler -> {
                    tryHandle(deserializingMessage, handler, consumerConfiguration);
                });
            });
        };
    }

    protected void tryHandle(DeserializingMessage deserializingMessage, Handler<DeserializingMessage> handler, ConsumerConfiguration consumerConfiguration) {
        handler.findInvoker(deserializingMessage).ifPresent(handlerInvoker -> {
            try {
                reportResult(handle(deserializingMessage, handlerInvoker, handler, consumerConfiguration), handlerInvoker, deserializingMessage, consumerConfiguration);
            } catch (Throwable th) {
                reportResult(th, handlerInvoker, deserializingMessage, consumerConfiguration);
                if (!(th instanceof BatchProcessingException)) {
                    throw new BatchProcessingException(deserializingMessage.getIndex());
                }
            }
        });
    }

    protected Object handle(DeserializingMessage deserializingMessage, HandlerInvoker handlerInvoker, Handler<DeserializingMessage> handler, ConsumerConfiguration consumerConfiguration) {
        try {
            Objects.requireNonNull(handlerInvoker);
            Object performInvocation = Invocation.performInvocation(handlerInvoker::invoke);
            return performInvocation instanceof CompletableFuture ? ((CompletableFuture) performInvocation).exceptionally(th -> {
                return deserializingMessage.apply(deserializingMessage2 -> {
                    return processError(th, deserializingMessage, handlerInvoker, handler, consumerConfiguration);
                });
            }) : performInvocation;
        } catch (Throwable th2) {
            return processError(th2, deserializingMessage, handlerInvoker, handler, consumerConfiguration);
        }
    }

    protected Object processError(Throwable th, DeserializingMessage deserializingMessage, HandlerInvoker handlerInvoker, Handler<DeserializingMessage> handler, ConsumerConfiguration consumerConfiguration) {
        return consumerConfiguration.getErrorHandler().handleError(th, String.format("Handler %s failed to handle a %s", handler, deserializingMessage), () -> {
            Objects.requireNonNull(handlerInvoker);
            return Invocation.performInvocation(handlerInvoker::invoke);
        });
    }

    protected void reportResult(Object obj, HandlerInvoker handlerInvoker, DeserializingMessage deserializingMessage, ConsumerConfiguration consumerConfiguration) {
        if (obj instanceof CompletableFuture) {
            ((CompletableFuture) obj).whenComplete((obj2, th) -> {
                try {
                    deserializingMessage.run(deserializingMessage2 -> {
                        reportResult(Optional.ofNullable(th).orElse(obj2), handlerInvoker, deserializingMessage, consumerConfiguration);
                    });
                    if (th != null) {
                        close();
                    }
                } catch (Throwable th) {
                    if (th != null) {
                        close();
                    }
                    throw th;
                }
            });
            return;
        }
        if (shouldSendResponse(handlerInvoker, deserializingMessage, consumerConfiguration)) {
            if (obj instanceof Throwable) {
                obj = ObjectUtils.unwrapException((Throwable) obj);
                if (!(obj instanceof FunctionalException)) {
                    obj = new TechnicalException(String.format("Handler %s failed to handle a %s", handlerInvoker.getMethod(), deserializingMessage), (Throwable) obj);
                }
            }
            SerializedMessage serializedObject = deserializingMessage.getSerializedObject();
            this.resultGateway.respond(obj, serializedObject.getSource(), serializedObject.getRequestId().intValue());
        }
    }

    protected boolean shouldSendResponse(HandlerInvoker handlerInvoker, DeserializingMessage deserializingMessage, ConsumerConfiguration consumerConfiguration) {
        return (deserializingMessage.getSerializedObject().getRequestId() == null || consumerConfiguration.passive() || handlerInvoker.isPassive() || deserializingMessage.getMessageType() == MessageType.RESULT || deserializingMessage.getMessageType() == MessageType.WEBRESPONSE) ? false : true;
    }

    @Override // io.fluxcapacitor.javaclient.tracking.Tracking, java.lang.AutoCloseable
    public void close() {
        synchronized (this.$lock) {
            this.shutdownFunction.get().merge(() -> {
                ClientUtils.waitForResults(Duration.ofSeconds(2L), this.outstandingRequests);
            }).cancel();
        }
    }

    @ConstructorProperties({"messageType", "resultGateway", "configurations", "generalBatchInterceptors", "serializer", "handlerFactory"})
    public DefaultTracking(MessageType messageType, ResultGateway resultGateway, List<ConsumerConfiguration> list, List<? extends BatchInterceptor> list2, Serializer serializer, HandlerFactory handlerFactory) {
        this.messageType = messageType;
        this.resultGateway = resultGateway;
        this.configurations = list;
        this.generalBatchInterceptors = list2;
        this.serializer = serializer;
        this.handlerFactory = handlerFactory;
    }
}
