package org.mule.runtime.core.internal.routing.forkjoin;

import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.message.ErrorBuilder;
import org.mule.runtime.core.api.message.GroupCorrelation;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.routing.ForkJoinStrategy;
import org.mule.runtime.core.api.routing.ForkJoinStrategyFactory;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.internal.routing.CompositeRoutingException;
import org.mule.runtime.core.internal.routing.RoutingResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/internal/routing/forkjoin/AbstractForkJoinStrategyFactory.class */
public abstract class AbstractForkJoinStrategyFactory implements ForkJoinStrategyFactory {
    public static final String TIMEOUT_EXCEPTION_DESCRIPTION = "Route Timeout";
    public static final String TIMEOUT_EXCEPTION_DETAILED_DESCRIPTION_PREFIX = "Timeout while processing route/part:";

    @Override // org.mule.runtime.core.api.routing.ForkJoinStrategyFactory
    public ForkJoinStrategy createForkJoinStrategy(ProcessingStrategy processingStrategy, int i, boolean z, long j, Scheduler scheduler, ErrorType errorType) {
        reactor.core.scheduler.Scheduler fromExecutorService = Schedulers.fromExecutorService(scheduler);
        return (internalEvent, publisher) -> {
            AtomicInteger atomicInteger = new AtomicInteger();
            InternalEvent.Builder builder = InternalEvent.builder(internalEvent);
            return Flux.from(publisher).map(addSequence(atomicInteger)).flatMap(processRoutePair(processingStrategy, i, z, j, fromExecutorService, errorType), i).collectList().doOnNext(Exceptions.checkedConsumer(list -> {
                if (list.stream().anyMatch(internalEvent -> {
                    return internalEvent.getError().isPresent();
                })) {
                    throw createCompositeRoutingException(list);
                }
            })).doOnNext(copyVars(builder)).map(createResultEvent(internalEvent, builder));
        };
    }

    protected abstract Function<List<InternalEvent>, InternalEvent> createResultEvent(InternalEvent internalEvent, InternalEvent.Builder builder);

    private Function<ForkJoinStrategy.RoutingPair, ForkJoinStrategy.RoutingPair> addSequence(AtomicInteger atomicInteger) {
        return routingPair -> {
            return ForkJoinStrategy.RoutingPair.of(InternalEvent.builder(routingPair.getEvent()).groupCorrelation(Optional.of(GroupCorrelation.of(atomicInteger.getAndIncrement()))).build(), routingPair.getRoute());
        };
    }

    private Function<ForkJoinStrategy.RoutingPair, Publisher<? extends InternalEvent>> processRoutePair(ProcessingStrategy processingStrategy, int i, boolean z, long j, reactor.core.scheduler.Scheduler scheduler, ErrorType errorType) {
        return routingPair -> {
            return Flux.from(MessageProcessors.processWithChildContext(routingPair.getEvent(), applyProcessingStrategy(processingStrategy, publisher -> {
                return Flux.from(publisher).transform(routingPair.getRoute()).timeout(Duration.ofMillis(j), Mono.defer(() -> {
                    return z ? Mono.just(createTimeoutErrorEvent(errorType, routingPair)) : Mono.error(new TimeoutException("Timeout while processing route/part: '" + routingPair.getEvent().getGroupCorrelation().get().getSequence() + "'"));
                }), scheduler);
            }, i), Optional.empty())).onErrorResume(MessagingException.class, messagingException -> {
                return z ? Mono.just(messagingException.getEvent()) : Mono.error(messagingException);
            });
        };
    }

    private ReactiveProcessor applyProcessingStrategy(ProcessingStrategy processingStrategy, ReactiveProcessor reactiveProcessor, int i) {
        return i > 1 ? processingStrategy.onPipeline(reactiveProcessor) : reactiveProcessor;
    }

    private InternalEvent createTimeoutErrorEvent(ErrorType errorType, ForkJoinStrategy.RoutingPair routingPair) {
        return InternalEvent.builder(routingPair.getEvent()).message(Message.of(null)).error(ErrorBuilder.builder().errorType(errorType).exception(new TimeoutException()).description(TIMEOUT_EXCEPTION_DESCRIPTION).detailedDescription("Timeout while processing route/part: '" + routingPair.getEvent().getGroupCorrelation().get().getSequence() + "'").build()).build();
    }

    private CompositeRoutingException createCompositeRoutingException(List<InternalEvent> list) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        for (InternalEvent internalEvent : list) {
            String num = Integer.toString(internalEvent.getGroupCorrelation().get().getSequence());
            if (internalEvent.getError().isPresent()) {
                linkedHashMap2.put(num, internalEvent.getError().get());
            } else {
                linkedHashMap.put(num, internalEvent.getMessage());
            }
        }
        return new CompositeRoutingException(new RoutingResult(linkedHashMap, linkedHashMap2));
    }

    private Consumer<List<InternalEvent>> copyVars(InternalEvent.Builder builder) {
        return list -> {
            list.stream().forEach(internalEvent -> {
                internalEvent.getVariables().entrySet().stream().forEach(entry -> {
                    builder.addVariable((String) entry.getKey(), entry.getValue());
                });
            });
        };
    }
}
