/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.function.stream.config;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.springframework.cloud.function.context.FunctionCatalog;
import org.springframework.cloud.function.context.message.MessageUtils;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/**
 * Message producer that subscribes to the reactive streams exposed by
 * {@link Supplier} beans in a {@link FunctionCatalog} and forwards every emitted
 * item to the output channel (channel name {@code "output"}).
 *
 * <p>Each running supplier is tracked by name together with the
 * {@link Disposable} of its subscription so that it can be stopped individually
 * via {@link #stop(String)}.
 *
 * <p>All access to the bookkeeping collections happens while holding the monitor
 * of {@link #disposables}; neither collection is safe for unsynchronized access.
 *
 * @param <T> not referenced inside this class; kept so existing parameterized
 *            usages keep compiling
 */
public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
    private final FunctionCatalog functionCatalog;
    /** Names of the suppliers that currently have a subscription. Guarded by {@code disposables}. */
    private final Set<String> suppliers = new HashSet<>();
    /** Subscription handle per supplier name. Also serves as the lock object. */
    private final Map<String, Disposable> disposables = new HashMap<>();
    private final String defaultRoute;

    /**
     * Creates a producer bound to the output channel named {@code "output"}.
     *
     * @param registry     catalog used to look up suppliers by name
     * @param defaultRoute name of the single supplier to start; when it has no
     *                     text, every supplier known to the catalog is started
     */
    public SupplierInvokingMessageProducer(FunctionCatalog registry, String defaultRoute) {
        this.functionCatalog = registry;
        this.defaultRoute = defaultRoute;
        this.setOutputChannelName("output");
    }

    /**
     * Starts the supplier named by {@code defaultRoute} if one was given,
     * otherwise starts every supplier name reported by the catalog.
     * Presumably invoked by the endpoint lifecycle of the superclass.
     */
    protected void doStart() {
        if (StringUtils.hasText(this.defaultRoute)) {
            this.start(this.defaultRoute);
            return;
        }
        for (String name : this.functionCatalog.getNames(Supplier.class)) {
            this.start(name);
        }
    }

    /**
     * Stops every supplier that is currently running.
     * Presumably invoked by the endpoint lifecycle of the superclass.
     */
    protected void doStop() {
        // Take the snapshot under the lock: 'suppliers' is a plain HashSet that is
        // only ever mutated while holding the 'disposables' monitor, and stop()
        // removes entries, so iterating the live set is not an option either.
        Set<String> running;
        synchronized (this.disposables) {
            running = new HashSet<>(this.suppliers);
        }
        for (String name : running) {
            this.stop(name);
        }
    }

    /**
     * Disposes the subscription of the named supplier and forgets it.
     * Does nothing when no subscription is registered under that name.
     *
     * @param name supplier name as used in {@link #start(String)}
     */
    public void stop(String name) {
        // The lookup is done inside the lock; reading a HashMap while another
        // thread may be mutating it is not safe.
        synchronized (this.disposables) {
            Disposable subscription = this.disposables.get(name);
            if (subscription == null && !this.disposables.containsKey(name)) {
                return;
            }
            try {
                // A null mapping is never stored by start(); dereferencing here keeps
                // the original failure mode should that ever change.
                subscription.dispose();
            }
            finally {
                // Always drop the bookkeeping, even if dispose() throws.
                this.disposables.remove(name);
                this.suppliers.remove(name);
            }
        }
    }

    /**
     * Looks up the named supplier and subscribes to the publisher it returns,
     * forwarding each emitted item through {@link #send(String, Object)}.
     * Does nothing when the name is already registered or when the catalog has
     * no supplier with that name.
     *
     * <p>NOTE(review): the subscription has no error or completion handler, so an
     * entry appears to stay registered after its stream terminates until
     * {@link #stop(String)} is called - confirm whether that is intended.
     *
     * @param name name of the supplier in the catalog
     * @throws ClassCastException if the supplier's result is not a {@link Publisher}
     */
    public void start(String name) {
        synchronized (this.disposables) {
            if (this.disposables.containsKey(name)) {
                return;
            }
            Supplier<?> supplier = (Supplier<?>) this.functionCatalog.lookup(Supplier.class, name);
            if (supplier == null) {
                return;
            }
            this.suppliers.add(name);
            Publisher<?> source = (Publisher<?>) supplier.get();
            Disposable subscription = Flux.from(source)
                    .subscribeOn(Schedulers.elastic())
                    .subscribe(item -> this.send(name, item));
            this.disposables.put(name, subscription);
        }
    }

    /**
     * Turns one emitted item into a message and sends it to the output channel.
     * The supplier is resolved from the catalog again for every item and handed
     * to {@code MessageUtils.unpack} together with the payload; the header
     * {@code stream_routekey} is set to the supplier name unless already present.
     *
     * @param name    name of the supplier that emitted the item
     * @param payload emitted item
     */
    private void send(String name, Object payload) {
        Supplier<?> supplier = (Supplier<?>) this.functionCatalog.lookup(Supplier.class, name);
        Message<?> unpacked = MessageUtils.unpack(supplier, payload);
        Message<?> routed = MessageBuilder.fromMessage(unpacked)
                .setHeaderIfAbsent("stream_routekey", name)
                .build();
        this.getOutputChannel().send(routed);
    }
}

