/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.stream.reactive;

import java.io.Closeable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

/**
 * Result adapter that forwards the elements of a {@link Publisher} to a
 * {@link MessageChannel}.
 *
 * <p>Each emitted element is sent to the channel as a {@link Message}: elements
 * that already are messages are sent unchanged, anything else is wrapped as the
 * payload of a newly built message.
 */
public class PublisherToMessageChannelResultAdapter
        implements StreamListenerResultAdapter<Publisher<?>, MessageChannel> {
    private final Log log = LogFactory.getLog(PublisherToMessageChannelResultAdapter.class);

    /**
     * Tells whether this adapter handles the given result/target combination.
     *
     * @param resultType    the type of the value produced by the listener
     * @param bindingTarget the type of the target the result is bound to
     * @return {@code true} when {@code resultType} is a {@link Publisher} type and
     *         {@code bindingTarget} is a {@link MessageChannel} type
     */
    public boolean supports(Class<?> resultType, Class<?> bindingTarget) {
        return Publisher.class.isAssignableFrom(resultType)
                && MessageChannel.class.isAssignableFrom(bindingTarget);
    }

    /**
     * Subscribes to the given publisher and sends every emitted element to the
     * binding target.
     *
     * @param streamListenerResult the publisher whose elements are forwarded
     * @param bindingTarget        the channel receiving the resulting messages
     * @return a {@link Closeable} that disposes the underlying subscription when closed
     */
    public Closeable adapt(Publisher<?> streamListenerResult, MessageChannel bindingTarget) {
        Disposable disposable = Flux.from(streamListenerResult)
                // Log the failure first; retry() then re-subscribes to the source so
                // an upstream error does not permanently end the forwarding.
                .doOnError(e -> this.log.error("Error while processing result", e))
                .retry()
                .subscribe(result -> bindingTarget.send(toMessage(result)));
        return disposable::dispose;
    }

    /** Returns {@code result} itself if it is a message, otherwise wraps it as a message payload. */
    private static Message<?> toMessage(Object result) {
        if (result instanceof Message) {
            return (Message<?>) result;
        }
        return MessageBuilder.withPayload(result).build();
    }
}

