package org.mule.runtime.core.internal.routing.forkjoin;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.Pair;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.message.GroupCorrelation;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
import org.mule.runtime.core.internal.routing.ForkJoinStrategyFactory;
import org.mule.runtime.core.internal.routing.result.CompositeRoutingException;
import org.mule.runtime.core.privileged.exception.EventProcessingException;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.routing.RoutingResult;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/internal/routing/forkjoin/AbstractForkJoinStrategyFactory.class */
public abstract class AbstractForkJoinStrategyFactory implements ForkJoinStrategyFactory {
    /** Short description set on the error that is created when a route times out. */
    public static final String TIMEOUT_EXCEPTION_DESCRIPTION = "Route Timeout";
    /** Leading text of the detailed timeout description; the quoted route sequence number follows it. */
    public static final String TIMEOUT_EXCEPTION_DETAILED_DESCRIPTION_PREFIX = "Timeout while processing route/part:";
    /** When {@code true}, variables added or changed by the routes are copied into the result event builder. */
    private final boolean mergeVariables;

    /**
     * Creates a factory whose strategies merge route variables into the result event
     * (delegates to the boolean constructor with {@code true}).
     */
    public AbstractForkJoinStrategyFactory() {
        this(true);
    }

    /**
     * Creates a factory with explicit control over variable merging.
     *
     * @param z {@code true} to merge variables set by the routes into the result event builder,
     *          {@code false} to leave the builder's variables untouched
     */
    public AbstractForkJoinStrategyFactory(boolean z) {
        this.mergeVariables = z;
    }

    /**
     * Builds the fork/join strategy.
     *
     * <p>The returned strategy tags every routing pair with a sequence number, processes the routes
     * through {@code flatMapSequential}, and collects all route results. If any route result carries
     * an error other than the one already present on the incoming event, the strategy fails with a
     * {@link CompositeRoutingException}. Otherwise it merges route variables (when enabled) and
     * delegates to {@link #createResultEvent(CoreEvent, CoreEvent.Builder)}.
     *
     * @param processingStrategy strategy applied to each route when {@code i > 1} and to the timeout fallback
     * @param i                  concurrency passed to {@code flatMapSequential}
     * @param z                  when {@code true}, route failures and timeouts are captured as route
     *                           results instead of terminating the stream immediately
     * @param j                  route timeout in milliseconds; {@code Long.MAX_VALUE} is mapped to
     *                           {@code Duration.ofNanos(Long.MAX_VALUE)}
     * @param scheduler          scheduler wrapped for use by the timeout operator
     * @param errorType          error type set on the error created for a timed-out route
     * @param z2                 when {@code true}, the composite exception is built with
     *                           {@code RoutingResult.routingResultWithException}; otherwise only the
     *                           errors are passed to {@code RoutingResult}
     * @return the fork/join strategy
     */
    @Override // org.mule.runtime.core.internal.routing.ForkJoinStrategyFactory
    public ForkJoinStrategy createForkJoinStrategy(ProcessingStrategy processingStrategy, int i, boolean z, long j, Scheduler scheduler, ErrorType errorType, boolean z2) {
        // Long.MAX_VALUE is not interpreted as milliseconds; it becomes the largest nanosecond duration.
        Duration timeout = j == Long.MAX_VALUE ? Duration.ofNanos(Long.MAX_VALUE) : Duration.ofMillis(j);
        reactor.core.scheduler.Scheduler timeoutScheduler = Schedulers.fromExecutorService(scheduler);
        return (original, routingPairs) -> {
            AtomicInteger sequence = new AtomicInteger();
            CoreEvent.Builder resultBuilder = CoreEvent.builder(original);
            return Flux.from(routingPairs)
                    .map(addSequence(sequence))
                    .flatMapSequential(processRoutePair(processingStrategy, i, z, timeout, timeoutScheduler, errorType), i)
                    // Accumulate every route result together with a flag telling whether any route failed.
                    .reduce(new Pair<List<Pair<CoreEvent, EventProcessingException>>, Boolean>(new ArrayList<>(), false),
                            (accumulated, routeResult) -> {
                                accumulated.getFirst().add(routeResult);
                                // An error that was already on the incoming event does not count as a route failure.
                                boolean routeFailed = routeResult.getFirst().getError()
                                        .map(error -> !isOriginalError(error, original.getError()))
                                        .orElse(false);
                                return new Pair<>(accumulated.getFirst(), accumulated.getSecond() || routeFailed);
                            })
                    .doOnNext(accumulated -> {
                        if (accumulated.getSecond()) {
                            List<Pair<CoreEvent, EventProcessingException>> cleaned = accumulated.getFirst().stream()
                                    .map(routeResult -> removeOriginalError(routeResult, original.getError()))
                                    .collect(Collectors.toList());
                            throw Exceptions.propagate(createCompositeRoutingException(cleaned, z2));
                        }
                    })
                    .map(accumulated -> accumulated.getFirst().stream()
                            .map(Pair::getFirst)
                            .collect(Collectors.toList()))
                    .doOnNext(mergeVariables(original, resultBuilder))
                    .map(createResultEvent(original, resultBuilder));
        };
    }

    private boolean isOriginalError(Error error, Optional<Error> optional) {
        return ((Boolean) optional.map(error2 -> {
            return Boolean.valueOf(error2.equals(error));
        }).orElse(false)).booleanValue();
    }

    private Pair<CoreEvent, EventProcessingException> removeOriginalError(Pair<CoreEvent, EventProcessingException> pair, Optional<Error> optional) {
        CoreEvent coreEvent = (CoreEvent) pair.getFirst();
        EventProcessingException eventProcessingException = (EventProcessingException) pair.getSecond();
        return (Pair) coreEvent.getError().map(error -> {
            return isOriginalError(error, optional) ? new Pair(CoreEvent.builder(coreEvent).error((Error) null).build(), eventProcessingException) : new Pair(coreEvent, eventProcessingException);
        }).orElse(pair);
    }

    /**
     * Supplies the function that turns the list of route result events into the single event
     * emitted by the strategy.
     *
     * <p>{@code createForkJoinStrategy} calls this after all routes have completed without a
     * route error and after variables have been merged into {@code builder}.
     *
     * @param coreEvent the event that was handed to the strategy
     * @param builder   builder created from {@code coreEvent}; it has already received the merged
     *                  route variables when merging is enabled
     * @return function mapping the ordered route result events to the result event
     */
    protected abstract Function<List<CoreEvent>, CoreEvent> createResultEvent(CoreEvent coreEvent, CoreEvent.Builder builder);

    /**
     * Creates a function that stamps each routing pair's event with a group correlation whose
     * sequence is the next value of {@code counter}.
     *
     * @param counter counter read and incremented once per routing pair
     * @return function producing a new routing pair with the sequenced event and the same route
     */
    private Function<ForkJoinStrategy.RoutingPair, ForkJoinStrategy.RoutingPair> addSequence(AtomicInteger counter) {
        return routingPair -> {
            CoreEvent.Builder eventBuilder = CoreEvent.builder(routingPair.getEvent());
            GroupCorrelation correlation = GroupCorrelation.of(counter.getAndIncrement());
            CoreEvent sequencedEvent = eventBuilder.groupCorrelation(Optional.of(correlation)).build();
            return ForkJoinStrategy.RoutingPair.of(sequencedEvent, routingPair.getRoute());
        };
    }

    /**
     * Creates the function that executes one routing pair and reports its outcome.
     *
     * <p>The route runs in a child context, guarded by a timeout. A successful event is paired with
     * a {@code null} exception after its {@code "error.context"} internal parameter is removed.
     * A {@link MessagingException} is handed to {@link #getPublisher(boolean, EventProcessingException)}.
     *
     * @param processingStrategy strategy used for the route (when {@code maxConcurrency > 1}) and the timeout fallback
     * @param maxConcurrency     concurrency setting; decides whether the processing strategy wraps the route
     * @param delayErrors        whether failures are emitted as results instead of errors
     * @param timeout            maximum time allowed for the route
     * @param timeoutScheduler   scheduler used by the timeout operator
     * @param timeoutErrorType   error type for the timeout error event
     * @return function mapping a routing pair to a publisher of its result pair
     */
    private Function<ForkJoinStrategy.RoutingPair, Publisher<Pair<CoreEvent, EventProcessingException>>> processRoutePair(ProcessingStrategy processingStrategy, int maxConcurrency, boolean delayErrors, Duration timeout, reactor.core.scheduler.Scheduler timeoutScheduler, ErrorType timeoutErrorType) {
        return routingPair -> {
            CoreEvent routeEvent = routingPair.getEvent();
            ReactiveProcessor route = events -> Flux.from(events).transform(routingPair.getRoute());
            ReactiveProcessor scheduledRoute = applyProcessingStrategy(processingStrategy, route, maxConcurrency);
            Flux<CoreEvent> routeExecution =
                    Flux.from(MessageProcessors.processWithChildContextDontComplete(routeEvent, scheduledRoute, Optional.empty()));
            return routeExecution
                    .timeout(timeout, onTimeout(processingStrategy, delayErrors, timeoutErrorType, routingPair), timeoutScheduler)
                    .map(result -> new Pair<CoreEvent, EventProcessingException>(
                            CoreEvent.builder(result).removeInternalParameter("error.context").build(), null))
                    .onErrorResume(MessagingException.class, failure -> getPublisher(delayErrors, failure));
        };
    }

    /**
     * Converts a route failure into the publisher emitted for that route.
     *
     * @param delayErrors when {@code true}, the failure is emitted as a regular result pair holding
     *                    the exception's event and the exception; when {@code false}, it is signalled as an error
     * @param failure     exception raised while processing the route
     * @return a publisher emitting the failure pair, or one that errors with {@code failure}
     */
    private Publisher<Pair<CoreEvent, EventProcessingException>> getPublisher(boolean delayErrors, EventProcessingException failure) {
        if (delayErrors) {
            return Mono.just(new Pair<>(failure.getEvent(), failure));
        }
        return Mono.error(failure);
    }

    /**
     * Builds the fallback used when a route exceeds its timeout.
     *
     * <p>The outcome is decided lazily on subscription: a timeout error event when
     * {@code delayErrors} is set, otherwise a {@link TimeoutException}. The fallback is passed
     * through the processing strategy's pipeline hook with a pass-through processor.
     *
     * @param processingStrategy strategy whose {@code onPipeline} wraps the fallback
     * @param delayErrors        whether the timeout is reported as an event instead of an error
     * @param timeoutErrorType   error type for the timeout error event
     * @param routingPair        the routing pair that timed out
     * @return the fallback publisher
     */
    private Mono<CoreEvent> onTimeout(ProcessingStrategy processingStrategy, boolean delayErrors, ErrorType timeoutErrorType, ForkJoinStrategy.RoutingPair routingPair) {
        Mono<CoreEvent> fallback = Mono.defer(() -> {
            if (delayErrors) {
                return Mono.just(createTimeoutErrorEvent(timeoutErrorType, routingPair));
            }
            return Mono.error(new TimeoutException(buildDetailedDescription(routingPair)));
        });
        return fallback.transform(processingStrategy.onPipeline(events -> events));
    }

    /**
     * Wraps {@code processor} with the processing strategy's pipeline hook only when more than one
     * route may run concurrently.
     *
     * @param processingStrategy strategy providing {@code onPipeline}
     * @param processor          processor for the route
     * @param maxConcurrency     concurrency setting; values of 1 or less leave the processor unwrapped
     * @return the wrapped processor, or {@code processor} itself
     */
    private ReactiveProcessor applyProcessingStrategy(ProcessingStrategy processingStrategy, ReactiveProcessor processor, int maxConcurrency) {
        if (maxConcurrency > 1) {
            return processingStrategy.onPipeline(processor);
        }
        return processor;
    }

    /**
     * Creates the event that represents a timed-out route.
     *
     * <p>The event is copied from the routing pair's event, its message is replaced by one built
     * from a {@code null} value, and it carries an error with the given type, a
     * {@link TimeoutException}, the generic timeout description and the route-specific detailed
     * description.
     *
     * @param timeoutErrorType error type to set on the error
     * @param routingPair      the routing pair that timed out
     * @return the timeout error event
     */
    private CoreEvent createTimeoutErrorEvent(ErrorType timeoutErrorType, ForkJoinStrategy.RoutingPair routingPair) {
        String detail = buildDetailedDescription(routingPair);
        CoreEvent.Builder eventBuilder = CoreEvent.builder(routingPair.getEvent()).message(Message.of((Object) null));
        Error timeoutError = ErrorBuilder.builder()
                .errorType(timeoutErrorType)
                .exception(new TimeoutException(detail))
                .description(TIMEOUT_EXCEPTION_DESCRIPTION)
                .detailedDescription(detail)
                .build();
        return eventBuilder.error(timeoutError).build();
    }

    /**
     * Builds the detailed timeout description for a route: the shared prefix constant followed by
     * the route's sequence number in single quotes.
     *
     * @param routingPair routing pair whose event carries the group correlation
     * @return the detailed description text
     * @throws java.util.NoSuchElementException if the event has no group correlation
     */
    private String buildDetailedDescription(ForkJoinStrategy.RoutingPair routingPair) {
        // Reuse the public prefix constant so the message and the constant cannot drift apart.
        int sequence = routingPair.getEvent().getGroupCorrelation().get().getSequence();
        return TIMEOUT_EXCEPTION_DETAILED_DESCRIPTION_PREFIX + " '" + sequence + "'";
    }

    /**
     * Builds the composite exception that summarises all route outcomes.
     *
     * <p>Results are keyed by the route's sequence number (as a string). Events without an error
     * contribute their message to the successful results; events with an error contribute the
     * error together with the captured exception, which may be {@code null}.
     *
     * @param results  route results in route order
     * @param detailed when {@code true}, the routing result keeps the exception next to each error;
     *                 otherwise only the errors are passed on
     * @return the composite routing exception
     * @throws java.util.NoSuchElementException if a result event has no group correlation
     */
    private CompositeRoutingException createCompositeRoutingException(List<Pair<CoreEvent, EventProcessingException>> results, boolean detailed) {
        Map<String, Message> successfulResults = new LinkedHashMap<>();
        Map<String, Pair<Error, EventProcessingException>> failedResults = new LinkedHashMap<>();
        for (Pair<CoreEvent, EventProcessingException> result : results) {
            CoreEvent event = result.getFirst();
            String routeKey = Integer.toString(event.getGroupCorrelation().get().getSequence());
            Optional<Error> error = event.getError();
            if (error.isPresent()) {
                failedResults.put(routeKey, new Pair<>(error.get(), result.getSecond()));
            } else {
                successfulResults.put(routeKey, event.getMessage());
            }
        }
        if (detailed) {
            return new CompositeRoutingException(RoutingResult.routingResultWithException(successfulResults, failedResults));
        }
        // Copy into a LinkedHashMap so the errors keep route order; the two-argument
        // Collectors.toMap used previously makes no ordering guarantee.
        Map<String, Error> errors = new LinkedHashMap<>();
        failedResults.forEach((key, failure) -> errors.put(key, failure.getFirst()));
        return new CompositeRoutingException(new RoutingResult(successfulResults, errors));
    }

    /**
     * Creates the consumer that merges route variables into the result builder.
     *
     * <p>Does nothing when merging is disabled. Otherwise every variable of every route event whose
     * value differs from the incoming event's value for that name is collected. The first occurrence
     * is added as-is (lists are copied); later occurrences of the same name are appended to a list.
     * The collected variables are then added to {@code builder}.
     *
     * @param coreEvent the incoming event, used to detect variables the routes did not change
     * @param builder   builder that receives the merged variables
     * @return consumer of the route result events
     */
    private Consumer<List<CoreEvent>> mergeVariables(CoreEvent coreEvent, CoreEvent.Builder builder) {
        return routeEvents -> {
            if (!this.mergeVariables) {
                return;
            }
            Map<String, TypedValue<?>> routeVariables = new HashMap<>();
            for (CoreEvent routeEvent : routeEvents) {
                routeEvent.getVariables().forEach((name, value) -> {
                    // Skip variables the route left exactly as they were on the incoming event.
                    boolean unchanged = value.equals(coreEvent.getVariables().get(name));
                    if (!unchanged) {
                        if (routeVariables.containsKey(name)) {
                            addExistingVariable(routeVariables, name, value);
                        } else {
                            addNewVariable(routeVariables, name, value);
                        }
                    }
                });
            }
            routeVariables.forEach((name, value) -> builder.addVariable(name, value));
        };
    }

    /**
     * Records a variable that no earlier route has set.
     *
     * <p>A non-list value is stored as-is. A list value is stored as a fresh {@link ArrayList}
     * copy, typed as a {@code List} collection whose item type is taken from the value's data type.
     *
     * @param routeVariables variables collected so far; modified in place
     * @param name           variable name
     * @param value          value set by the route
     */
    private static void addNewVariable(Map<String, TypedValue<?>> routeVariables, String name, TypedValue<?> value) {
        Object rawValue = value.getValue();
        if (!(rawValue instanceof List)) {
            routeVariables.put(name, value);
            return;
        }
        List<Object> copy = new ArrayList<>((List<?>) rawValue);
        // NOTE(review): assumes the data type of a List value exposes an item data type - confirm.
        DataType listType = DataType.builder()
                .collectionType(List.class)
                .itemType(value.getDataType().getItemDataType().getType())
                .build();
        routeVariables.put(name, new TypedValue<>(copy, listType));
    }

    /**
     * Appends a value to a variable that an earlier route has already set.
     *
     * <p>If the stored value is not yet a list, it is first wrapped in a new single-element list
     * typed with the stored value's type as item type. The new value is then appended. The entry
     * keeps its data type when the item type is compatible with the new value's data type;
     * otherwise it is re-typed as a plain {@code List} collection.
     *
     * @param routeVariables variables collected so far; must already contain {@code name}; modified in place
     * @param name           variable name
     * @param value          additional value set by another route
     */
    private static void addExistingVariable(Map<String, TypedValue<?>> routeVariables, String name, TypedValue<?> value) {
        TypedValue<?> current = routeVariables.get(name);
        if (!(current.getValue() instanceof List)) {
            List<Object> wrapped = new ArrayList<>();
            wrapped.add(current.getValue());
            DataType wrappedType = DataType.builder()
                    .collectionType(List.class)
                    .itemType(current.getDataType().getType())
                    .build();
            current = new TypedValue<>(wrapped, wrappedType);
            routeVariables.put(name, current);
        }
        @SuppressWarnings("unchecked")
        List<Object> values = (List<Object>) current.getValue();
        values.add(value.getValue());
        DataType currentType = current.getDataType();
        DataType mergedType;
        if (currentType.getItemDataType().isCompatibleWith(value.getDataType())) {
            mergedType = currentType;
        } else {
            mergedType = DataType.builder().collectionType(List.class).build();
        }
        routeVariables.put(name, new TypedValue<>(values, mergedType));
    }
}
