package org.streamingpool.ext.tensorics.streamfactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.reactivex.Flowable;
import io.reactivex.functions.Function;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.reactivestreams.Publisher;
import org.streamingpool.core.service.DiscoveryService;
import org.streamingpool.core.service.StreamFactory;
import org.streamingpool.core.service.StreamId;
import org.streamingpool.core.service.streamid.OverlapBufferStreamId;
import org.streamingpool.ext.tensorics.evaluation.BufferedEvaluation;
import org.streamingpool.ext.tensorics.evaluation.ContinuousEvaluation;
import org.streamingpool.ext.tensorics.evaluation.EvaluationStrategy;
import org.streamingpool.ext.tensorics.evaluation.TriggeredEvaluation;
import org.streamingpool.ext.tensorics.exception.NoBufferedStreamSpecifiedException;
import org.streamingpool.ext.tensorics.expression.UnresolvedStreamIdBasedExpression;
import org.streamingpool.ext.tensorics.streamid.DetailedExpressionStreamId;
import org.tensorics.core.resolve.domain.DetailedExpressionResult;
import org.tensorics.core.resolve.engine.ResolvedContextDidNotGrowException;
import org.tensorics.core.resolve.engine.ResolvingEngine;
import org.tensorics.core.resolve.options.HandleWithFirstCapableAncestorStrategy;
import org.tensorics.core.resolve.options.ResolvingOption;
import org.tensorics.core.tree.domain.Contexts;
import org.tensorics.core.tree.domain.EditableResolvingContext;
import org.tensorics.core.tree.domain.Expression;
import org.tensorics.core.tree.domain.ResolvingContext;
import org.tensorics.core.tree.walking.Trees;

/* loaded from: input_file:org/streamingpool/ext/tensorics/streamfactory/DetailedTensoricsExpressionStreamFactory.class */
/**
 * {@link StreamFactory} that creates streams for {@link DetailedExpressionStreamId}s.
 * <p>
 * For every {@link UnresolvedStreamIdBasedExpression} node found in the expression of the id, the corresponding
 * stream is discovered. Each time the trigger stream (chosen from the {@link EvaluationStrategy} of the id) emits,
 * the latest values of these streams are collected into a resolving context, the initial context of the id is
 * added, and the expression is resolved with the {@link ResolvingEngine} passed to the constructor.
 */
public class DetailedTensoricsExpressionStreamFactory implements StreamFactory {
    /** Resolving option passed to every detailed resolution performed by this factory. */
    private static final HandleWithFirstCapableAncestorStrategy EXCEPTION_HANDLING_STRATEGY = new HandleWithFirstCapableAncestorStrategy();

    /** Combiner for the continuous trigger: the combined values are ignored, only the emission itself matters. */
    private static final Function<Object[], Boolean> TRIGGER_CONTEXT_COMBINER = latestValues -> true;

    /**
     * Builds a resolving context out of the combined items. Only {@link ExpToValue} items contribute an entry; any
     * other item in the array (e.g. the item emitted by the trigger stream itself) is skipped.
     */
    private static final Function<Object[], ResolvingContext> CONTEXT_COMBINER = latestValues -> {
        EditableResolvingContext context = Contexts.newResolvingContext();
        for (Object latestValue : latestValues) {
            if (latestValue instanceof ExpToValue) {
                ExpToValue pair = (ExpToValue) latestValue;
                context.put(pair.node, pair.value);
            }
        }
        return context;
    };

    /** Engine used both for resolving the stream ids of the expression nodes and for the final resolution. */
    private final ResolvingEngine engine;

    /**
     * Immutable pair of an expression node and a value emitted by the stream that backs this node.
     */
    public static final class ExpToValue {
        private final Expression<Object> node;
        private final Object value;

        /**
         * @param expression the expression node the value belongs to
         * @param obj the value emitted for this node
         */
        public ExpToValue(Expression<Object> expression, Object obj) {
            this.node = expression;
            this.value = obj;
        }
    }

    /**
     * @param resolvingEngine the engine used to resolve expressions; must not be {@code null}
     * @throws NullPointerException if {@code resolvingEngine} is {@code null}
     */
    public DetailedTensoricsExpressionStreamFactory(ResolvingEngine resolvingEngine) {
        // Fail fast here instead of with a late NullPointerException inside a stream callback.
        this.engine = java.util.Objects.requireNonNull(resolvingEngine, "resolvingEngine must not be null");
    }

    /**
     * Creates the stream of detailed resolution results if the given id is a {@link DetailedExpressionStreamId}.
     *
     * @param streamId the id of the stream to create
     * @param discoveryService the service used to look up the streams the expression depends on
     * @return the created stream, or an empty optional if the id is not a {@link DetailedExpressionStreamId}
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <T> Optional<Publisher<T>> create(StreamId<T> streamId, DiscoveryService discoveryService) {
        if (!(streamId instanceof DetailedExpressionStreamId)) {
            return Optional.empty();
        }
        // The type arguments of the id cannot be recovered from StreamId<T>, hence the raw cast and the
        // unchecked conversion of the resulting stream.
        Publisher<T> resultStream = resolvedStream((DetailedExpressionStreamId) streamId, discoveryService);
        return Optional.of(resultStream);
    }

    /**
     * Assembles the result stream for the given id: discovers one stream per stream-based node of the expression,
     * derives the trigger from the evaluation strategy and resolves the expression on each trigger emission.
     *
     * @param detailedExpressionStreamId the id carrying expression, initial context and evaluation strategy
     * @param discoveryService the service used to discover the underlying streams
     * @return a stream emitting one detailed resolution result per trigger emission
     */
    private <T, E extends Expression<T>> Flowable<DetailedExpressionResult<T, E>> resolvedStream(DetailedExpressionStreamId<T, E> detailedExpressionStreamId, DiscoveryService discoveryService) {
        E expression = detailedExpressionStreamId.expression();
        ResolvingContext initialCtx = detailedExpressionStreamId.initialCtx();

        // One stream per resolved stream id; every emitted value is tagged with the expression node it belongs to.
        Map<StreamId<?>, Flowable<ExpToValue>> valueStreams = new HashMap<>();
        for (Map.Entry<Expression<Object>, StreamId<Object>> entry : streamIdsFrom(detailedExpressionStreamId).entrySet()) {
            Expression<Object> node = entry.getKey();
            StreamId<Object> sourceId = entry.getValue();
            Flowable<ExpToValue> taggedValues = Flowable.fromPublisher(discoveryService.discover(sourceId))
                    .map(value -> new ExpToValue(node, value));
            valueStreams.put(sourceId, taggedValues);
        }

        Flowable<?> trigger = triggerObservable(valueStreams, detailedExpressionStreamId.evaluationStrategy(), discoveryService);
        Publisher<?>[] sources = valueStreams.values().toArray(new Publisher<?>[0]);
        Flowable<ResolvingContext> streamContexts = trigger.withLatestFrom(sources, CONTEXT_COMBINER);

        return streamContexts.map(streamContext -> {
            EditableResolvingContext fullContext = Contexts.newResolvingContext();
            // Insertion order is kept on purpose: streamed values first, then the initial context.
            fullContext.putAllNew(streamContext);
            fullContext.putAllNew(initialCtx);
            return this.engine.resolveDetailed(expression, fullContext, new ResolvingOption[]{EXCEPTION_HANDLING_STRATEGY});
        });
    }

    /**
     * Finds all {@link UnresolvedStreamIdBasedExpression} nodes in the expression of the given id and resolves the
     * stream id of each of them against the initial context of the id.
     *
     * @param detailedExpressionStreamId the id providing the expression and the initial context
     * @return an immutable map from each stream-based expression node to its resolved stream id
     * @throws RuntimeException if the engine reports that the context did not grow while resolving a stream id
     */
    @VisibleForTesting
    @SuppressWarnings({"unchecked", "rawtypes"})
    <T extends Expression<?>> Map<Expression<Object>, StreamId<Object>> streamIdsFrom(DetailedExpressionStreamId<?, T> detailedExpressionStreamId) {
        T expression = detailedExpressionStreamId.expression();
        ResolvingContext initialCtx = detailedExpressionStreamId.initialCtx();
        Set<UnresolvedStreamIdBasedExpression> streamBasedNodes = Trees.findNodesOfClass(expression, UnresolvedStreamIdBasedExpression.class);

        // The nodes are only available as raw types here, so the builder stays raw as well; the unchecked
        // conversion to the declared return type happens once, at the return statement.
        ImmutableMap.Builder nodeToStreamId = ImmutableMap.builder();
        for (UnresolvedStreamIdBasedExpression streamBasedNode : streamBasedNodes) {
            try {
                StreamId resolvedId = (StreamId) this.engine.resolve(streamBasedNode.streamIdExpression(), initialCtx, new ResolvingOption[0]);
                nodeToStreamId.put(streamBasedNode, resolvedId);
            } catch (ResolvedContextDidNotGrowException e) {
                throw new RuntimeException(String.format("Context did not grow while resolving the StreamId of expression. This is most probably because the initial context (%s) did not contain the value of the current UnresolvedStreamIdBasedExpression (%s).", initialCtx, streamBasedNode), e);
            }
        }
        return nodeToStreamId.build();
    }

    /**
     * Selects the stream whose emissions trigger a resolution, depending on the evaluation strategy:
     * <ul>
     * <li>{@link ContinuousEvaluation}: the combination of the latest values of all given streams</li>
     * <li>{@link BufferedEvaluation}: the zip of all streams whose id is an {@link OverlapBufferStreamId}</li>
     * <li>{@link TriggeredEvaluation}: the stream discovered for the triggering stream id of the strategy</li>
     * </ul>
     *
     * @param map the streams of the expression nodes, keyed by their stream id
     * @param evaluationStrategy the strategy that determines the trigger
     * @param discoveryService the service used to discover the triggering stream of a triggered evaluation
     * @return the stream to be used as trigger
     * @throws NoBufferedStreamSpecifiedException if the strategy is buffered but no stream has a buffer stream id
     * @throws IllegalArgumentException if the strategy is none of the supported types
     */
    private static Flowable<?> triggerObservable(Map<StreamId<?>, ? extends Flowable<?>> map, EvaluationStrategy evaluationStrategy, DiscoveryService discoveryService) {
        if (evaluationStrategy instanceof ContinuousEvaluation) {
            return Flowable.combineLatest(map.values(), TRIGGER_CONTEXT_COMBINER);
        }
        if (evaluationStrategy instanceof BufferedEvaluation) {
            List<Flowable<?>> bufferStreams = map.entrySet().stream()
                    .filter(entry -> entry.getKey() instanceof OverlapBufferStreamId)
                    .<Flowable<?>>map(Map.Entry::getValue)
                    .collect(Collectors.toList());
            if (bufferStreams.isEmpty()) {
                throw new NoBufferedStreamSpecifiedException();
            }
            // The zipped array is wrapped as a single element; the emitted item is not inspected by this class.
            return Flowable.zip(bufferStreams, zipped -> ImmutableSet.of(zipped));
        }
        if (evaluationStrategy instanceof TriggeredEvaluation) {
            return Flowable.fromPublisher(discoveryService.discover(((TriggeredEvaluation) evaluationStrategy).triggeringStreamId()));
        }
        throw new IllegalArgumentException("Unknown evaluationStrategy '" + evaluationStrategy + "'. Cannot create trigger Observable.");
    }
}
