/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.function.stream.config;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Base class for stream listeners that dispatch incoming {@link Message} fluxes to the
 * {@link Function} and {@link Consumer} instances registered in a {@link FunctionCatalog}.
 *
 * <p>The target of a message is chosen by {@link #select(Message)}: first via the
 * {@code stream_routekey} header, then via the configured default route, and finally by
 * matching the message payload against the input type of every registered function or
 * consumer.
 *
 * <p>The per-name processor cache is a plain {@link HashMap} with no synchronization.
 * NOTE(review): confirm that {@link #select(Message)} is never invoked concurrently.
 */
public abstract class AbstractStreamListeningInvoker implements SmartInitializingSingleton {

    /** Name of the message header that, when present, names the function or consumer to invoke. */
    private static final String ROUTE_KEY_HEADER = "stream_routekey";

    /** Sentinel that {@link #convertPayload} substitutes for a {@code null} conversion result. */
    private static final Object UNCONVERTED = new Object();

    /** Processor returned when no endpoint can be resolved; it produces an empty flux. */
    private static final FluxMessageProcessor NOENDPOINT = flux -> Flux.empty();

    private final FunctionInspector functionInspector;
    private final FunctionCatalog functionCatalog;
    private final CompositeMessageConverterFactory converterFactory;
    private final String defaultRoute;
    private final boolean share;

    /** Processors already created by {@link #stash(String)}, keyed by function/consumer name. */
    private final Map<String, FluxMessageProcessor> processors = new HashMap<>();

    /** Assigned in {@link #afterSingletonsInstantiated()}; {@code null} until then. */
    private MessageConverter converter;

    /**
     * @param functionCatalog catalog used to look up functions and consumers by name
     * @param functionInspector used to query input types and message-awareness of functions
     * @param converterFactory source of the {@link MessageConverter} used for payload conversion
     * @param defaultRoute name tried when the route-key header yields no processor; may be {@code null}
     * @param share whether one message flux may be shared between several matching endpoints
     */
    public AbstractStreamListeningInvoker(FunctionCatalog functionCatalog, FunctionInspector functionInspector, CompositeMessageConverterFactory converterFactory, String defaultRoute, boolean share) {
        this.functionCatalog = functionCatalog;
        this.functionInspector = functionInspector;
        this.converterFactory = converterFactory;
        this.defaultRoute = defaultRoute;
        this.share = share;
    }

    /** Obtains the message converter from the converter factory. */
    @Override
    public void afterSingletonsInstantiated() {
        this.converter = this.converterFactory.getMessageConverterForAllRegistered();
    }

    /**
     * Feeds the converted messages of {@code flux} to the consumer registered under {@code name}.
     *
     * <p>The flux is published with a ref-count of two; the consumer is handed one view of it
     * (converted, with unconvertible items filtered out) and the returned {@link Mono} is
     * derived from the same shared flux.
     *
     * @param name name of the consumer in the function catalog
     * @param flux incoming messages
     * @return a {@link Mono} built from the shared flux via {@code then(Mono.empty())}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected Mono<Void> consumer(String name, Flux<Message<?>> flux) {
        Consumer consumer = (Consumer) this.functionCatalog.lookup(Consumer.class, name);
        // Built once for the pipeline rather than once per message.
        Function<Message<?>, Object> inputConverter = this.convertInput(consumer);
        Flux<Message<?>> shared = flux.publish().refCount(2);
        consumer.accept(shared.map(inputConverter).filter(transformed -> transformed != UNCONVERTED));
        return shared.then(Mono.empty());
    }

    /**
     * Applies the function registered under {@code name} to the converted messages of
     * {@code flux} and wraps each result in a {@link Message}.
     *
     * <p>Headers of the incoming messages are combined with the latest function result via
     * {@code withLatestFrom}; the outgoing message copies those headers where absent.
     *
     * @param name name of the function in the function catalog
     * @param flux incoming messages
     * @return the flux of outgoing messages
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected Flux<Message<?>> function(String name, Flux<Message<?>> flux) {
        Function function = (Function) this.functionCatalog.lookup(Function.class, name);
        return flux.publish(values -> {
            // Built once per subscription rather than once per message.
            Function<Message<?>, Object> inputConverter = this.convertInput(function);
            // NOTE(review): unlike consumer(), the UNCONVERTED sentinel is not filtered out
            // here, so it is passed to the function as-is - confirm this is intended.
            Publisher result = (Publisher) function.apply(values.map(inputConverter));
            if (this.functionInspector.isMessage(function)) {
                result = Flux.from((Publisher) result).map(message -> MessageUtils.unpack(function, message));
            }
            Flux<Map<String, Object>> aggregate = this.headers(values);
            return aggregate.withLatestFrom(result, (map, payload) -> this.message((Map<String, Object>) map, payload));
        });
    }

    /** Maps every message of {@code flux} to its headers. */
    private Flux<Map<String, Object>> headers(Flux<Message<?>> flux) {
        return flux.map(message -> message.getHeaders());
    }

    /**
     * Builds the outgoing message for {@code result}.
     *
     * <p>A {@link Message} result is rebuilt from itself; any other result becomes the payload
     * of a new message. In both cases {@code headers} are copied only where absent.
     */
    private Message<?> message(Map<String, Object> headers, Object result) {
        if (result instanceof Message) {
            return MessageBuilder.fromMessage((Message<?>) result).copyHeadersIfAbsent(headers).build();
        }
        return MessageBuilder.withPayload(result).copyHeadersIfAbsent(headers).build();
    }

    /**
     * Creates the converter that turns an incoming message into the argument expected by
     * {@code function}: the converted payload, or - when the inspector reports the function as
     * message-aware - a message created by {@link MessageUtils#create} around that payload.
     *
     * @param function the function or consumer whose input type drives the conversion
     */
    private Function<Message<?>, Object> convertInput(Object function) {
        Class<?> inputType = this.functionInspector.getInputType(function);
        return m -> {
            if (this.functionInspector.isMessage(function)) {
                return MessageUtils.create(function, this.convertPayload(inputType, m), m.getHeaders());
            }
            return this.convertPayload(inputType, m);
        };
    }

    /**
     * Converts the payload of {@code m} to {@code inputType}.
     *
     * @param inputType the required payload type
     * @param m the message whose payload is converted
     * @return the payload itself when it is already assignable to {@code inputType}, otherwise
     *         the result of the message converter; the internal "unconverted" sentinel replaces
     *         a {@code null} result
     */
    protected Object convertPayload(Class<?> inputType, Message<?> m) {
        Object payload = m.getPayload();
        Object result = inputType.isAssignableFrom(payload.getClass()) ? payload : this.converter.fromMessage(m, inputType);
        return result != null ? result : UNCONVERTED;
    }

    /**
     * Sends {@code flux} to every endpoint in {@code names} and merges their outputs.
     *
     * <p>No names yields an empty flux. More than one name requires sharing to be enabled,
     * otherwise an error flux carrying an {@link IllegalStateException} is returned; with
     * sharing enabled the flux is published with a ref-count equal to the number of names.
     */
    private Flux<Message<?>> balance(List<String> names, Flux<Message<?>> flux) {
        if (names.isEmpty()) {
            return Flux.empty();
        }
        boolean multiple = names.size() > 1;
        if (multiple && !this.share) {
            return Flux.error(new IllegalStateException("Multiple matches and share disabled: " + names));
        }
        Flux<Message<?>> source = flux.hide();
        if (multiple) {
            source = source.publish().refCount(names.size());
        }
        Flux<Message<?>> result = Flux.empty();
        for (String name : names) {
            if (this.functionCatalog.lookup(Consumer.class, name) != null) {
                // Consumers produce no output; only their completion is merged in.
                result = result.mergeWith(this.consumer(name, source).thenMany(Flux.<Message<?>>empty()));
            } else {
                result = result.mergeWith(this.function(name, source));
            }
        }
        return result;
    }

    /**
     * Chooses the processor for {@code input}.
     *
     * <p>Resolution order:
     * <ol>
     * <li>the name in the {@code stream_routekey} header, if present;</li>
     * <li>the default route, if configured;</li>
     * <li>the single registered function/consumer, if there is exactly one;</li>
     * <li>otherwise every candidate whose input type accepts the converted payload: a single
     * match is used directly, any other number of matches is handed to the balancing
     * processor.</li>
     * </ol>
     *
     * @param input the message used for routing
     * @return the selected processor, never {@code null}; a processor producing an empty flux
     *         is returned when nothing could be resolved
     */
    protected FluxMessageProcessor select(Message<?> input) {
        FluxMessageProcessor processor = null;
        if (input.getHeaders().containsKey(ROUTE_KEY_HEADER)) {
            String key = (String) input.getHeaders().get(ROUTE_KEY_HEADER);
            processor = this.stash(key);
        }
        if (processor == null && this.defaultRoute != null) {
            processor = this.stash(this.defaultRoute);
        }
        if (processor == null) {
            // Functions first, then consumers, preserving catalog order and dropping duplicates.
            LinkedHashSet<String> names = new LinkedHashSet<>(this.functionCatalog.getNames(Function.class));
            names.addAll(this.functionCatalog.getNames(Consumer.class));
            if (names.size() == 1) {
                processor = this.stash(names.iterator().next());
            } else {
                List<String> matches = new ArrayList<>();
                for (String candidate : names) {
                    Object function = this.functionCatalog.lookup(Function.class, candidate);
                    if (function == null) {
                        function = this.functionCatalog.lookup(Consumer.class, candidate);
                    }
                    if (function == null) {
                        continue;
                    }
                    Class<?> inputType = this.functionInspector.getInputType(function);
                    Object value = this.convertPayload(inputType, input);
                    if (value != null && inputType.isInstance(value)) {
                        matches.add(candidate);
                    }
                }
                if (matches.size() != 1) {
                    return flux -> this.balance(matches, flux);
                }
                processor = this.stash(matches.get(0));
            }
        }
        return processor != null ? processor : NOENDPOINT;
    }

    /**
     * Returns the cached processor for {@code key}, creating it on first use.
     *
     * <p>The catalog is consulted on every call: a function lookup is tried first, then a
     * consumer lookup.
     *
     * @return the processor, or {@code null} when the catalog has neither a function nor a
     *         consumer under {@code key}
     */
    private FluxMessageProcessor stash(String key) {
        if (this.functionCatalog.lookup(Function.class, key) != null) {
            return this.processors.computeIfAbsent(key, name -> flux -> this.function(name, flux));
        }
        if (this.functionCatalog.lookup(Consumer.class, key) != null) {
            return this.processors.computeIfAbsent(key,
                    name -> flux -> this.consumer(name, flux).thenMany(Flux.<Message<?>>empty()));
        }
        return null;
    }

    /** Transforms a flux of incoming messages into a flux of outgoing messages. */
    @FunctionalInterface
    static interface FluxMessageProcessor {
        public Flux<Message<?>> process(Flux<Message<?>> flux);
    }
}

