package io.awspring.cloud.sqs.listener.sink.adapter;

import io.awspring.cloud.sqs.MessageHeaderUtils;
import io.awspring.cloud.sqs.listener.MessageProcessingContext;
import io.awspring.cloud.sqs.listener.sink.MessageSink;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:io/awspring/cloud/sqs/listener/sink/adapter/MessageGroupingSinkAdapter.class */
/**
 * Sink adapter that partitions incoming messages into groups and forwards each
 * group to the delegate sink as a separate {@code emit} call.
 *
 * <p>The group of a message is the {@code String} key returned by the grouping
 * function supplied at construction time.
 *
 * @param <T> the payload type of the messages handled by this adapter
 */
public class MessageGroupingSinkAdapter<T> extends AbstractDelegatingMessageListeningSinkAdapter<T> {
    private static final Logger logger = LoggerFactory.getLogger(MessageGroupingSinkAdapter.class);

    /** Maps a message to the key of the group it belongs to. */
    private final Function<Message<T>, String> groupingFunction;

    /**
     * Creates an adapter that groups messages before handing them to the delegate.
     *
     * @param messageSink the delegate sink, passed to the superclass constructor
     * @param function the function computing the group key of a message; must not be {@code null}
     * @throws IllegalArgumentException if {@code function} is {@code null}
     */
    public MessageGroupingSinkAdapter(MessageSink<T> messageSink, Function<Message<T>, String> function) {
        super(messageSink);
        Assert.notNull(function, "groupingFunction cannot be null.");
        this.groupingFunction = function;
    }

    /**
     * Groups the given messages by the configured grouping function and emits each
     * group to the delegate sink.
     *
     * <p>{@link Collectors#groupingBy} rejects {@code null} keys, so the grouping
     * function must return a non-null key for every message. No ordering between
     * groups is guaranteed, because the map type produced by {@code groupingBy} is
     * unspecified.
     *
     * @param collection the messages to group and emit
     * @param messageProcessingContext the context forwarded unchanged to every delegate call
     * @return a future that completes once the futures returned by all delegate
     *         {@code emit} calls have completed
     */
    @Override
    public CompletableFuture<Void> emit(Collection<Message<T>> collection, MessageProcessingContext<T> messageProcessingContext) {
        logger.trace("Emitting messages {}", MessageHeaderUtils.getId(collection));
        // Keep the pipeline fully generic: no raw Map / raw array casts, so the
        // compiler verifies that each group is a collection of Message<T>.
        CompletableFuture<?>[] groupFutures = collection.stream()
                .collect(Collectors.groupingBy(this.groupingFunction))
                .values().stream()
                .map(group -> getDelegate().emit(group, messageProcessingContext))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(groupFutures);
    }
}
