package org.glassfish.jersey.microprofile.restclient;

import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.sse.InboundSseEvent;
import org.glassfish.jersey.client.ChunkParser;
import org.glassfish.jersey.client.ChunkedInput;
import org.glassfish.jersey.internal.PropertiesDelegate;
import org.glassfish.jersey.media.sse.InboundEvent;
import org.glassfish.jersey.message.MessageBodyWorkers;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

/* loaded from: input_file:org/glassfish/jersey/microprofile/restclient/SseEventPublisher.class */
/**
 * Reactive Streams {@link Publisher} that reads {@link InboundEvent}s from a chunked input stream
 * and hands them to a subscriber through an {@code SseEventSubscription}.
 *
 * <p>The stream is chunked with a multi-delimiter parser built from {@code "\n\n"} and
 * {@code "\r\n\r\n"}. Reading happens on the executor supplied at construction time, not on the
 * thread that calls {@link #subscribe(Subscriber)}.
 */
public class SseEventPublisher extends ChunkedInput<InboundEvent> implements Publisher<InboundEvent> {
    /** Chunk parser shared by all publishers; built from the two delimiter strings below. */
    private static final ChunkParser SSE_EVENT_PARSER = ChunkedInput.createMultiParser("\n\n", "\r\n\r\n");
    private static final Logger LOG = Logger.getLogger(SseEventPublisher.class.getName());

    /** Runs the event-reading task started by {@link #subscribe(Subscriber)}. */
    private final Executor executor;
    /** Declared generic type; its first type argument selects what is emitted to the subscriber. */
    private final Type genericType;
    /** Passed unchanged to every {@code SseEventSubscription} created by this publisher. */
    private final int bufferSize;

    /**
     * Creates a publisher over the given input stream.
     *
     * @param inputStream        stream the events are read from (forwarded to {@link ChunkedInput})
     * @param type               generic type whose first type argument decides whether raw events
     *                           or their decoded data are emitted
     * @param annotationArr      annotations forwarded to {@link ChunkedInput}
     * @param mediaType          media type forwarded to {@link ChunkedInput}
     * @param multivaluedMap     headers forwarded to {@link ChunkedInput}
     * @param messageBodyWorkers message body workers forwarded to {@link ChunkedInput}
     * @param propertiesDelegate properties delegate forwarded to {@link ChunkedInput}
     * @param executorService    executor on which events are read and emitted
     * @param i                  buffer size handed to each subscription
     */
    public SseEventPublisher(InputStream inputStream, Type type, Annotation[] annotationArr, MediaType mediaType, MultivaluedMap<String, String> multivaluedMap, MessageBodyWorkers messageBodyWorkers, PropertiesDelegate propertiesDelegate, ExecutorService executorService, int i) {
        super(InboundEvent.class, inputStream, annotationArr, mediaType, multivaluedMap, messageBodyWorkers, propertiesDelegate);
        super.setParser(SSE_EVENT_PARSER);
        this.executor = executorService;
        this.genericType = type;
        this.bufferSize = i;
    }

    /**
     * Registers the subscriber and schedules the task that reads events and emits them.
     *
     * <p>If the executor refuses the task, the rejection is logged together with its cause and
     * reported through the subscription's {@code onError}, so the subscriber is not left waiting
     * for events that will never be read.
     *
     * @param subscriber receiver of the events; must not be {@code null}
     * @throws NullPointerException if {@code subscriber} is {@code null}
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super InboundEvent> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException("The subscriber is `null`");
        }
        SseEventSubscription subscription = new SseEventSubscription(subscriber, this.bufferSize);
        subscriber.onSubscribe(subscription);
        try {
            this.executor.execute(() -> pumpEvents(subscription));
        } catch (RejectedExecutionException e) {
            // Keep the throwable in the log record; the previous parameterised form dropped it.
            LOG.log(Level.WARNING, "Executor " + this.executor + " rejected emit event task", e);
            // No reader task will ever run, so report the failure instead of staying silent.
            // NOTE(review): assumes SseEventSubscription.onError forwards to the subscriber - confirm.
            subscription.onError(e);
        }
    }

    /**
     * Reads events until {@code read()} returns {@code null}, emitting each one, then signals
     * completion. Any {@link Throwable} raised while reading, emitting or completing is passed to
     * the subscription's {@code onError}.
     *
     * @param subscription subscription that receives the events and the terminal signal
     */
    private void pumpEvents(SseEventSubscription subscription) {
        if (!(this.genericType instanceof ParameterizedType)) {
            // NOTE(review): for a non-parameterized type nothing is read and no signal is sent;
            // kept as in the original - confirm callers always pass a parameterized type.
            return;
        }
        Type payloadType = ((ParameterizedType) this.genericType).getActualTypeArguments()[0];
        try {
            // InboundSseEvent requested: emit the event object itself; otherwise emit its data
            // decoded as the requested class.
            boolean deliverRawEvents = payloadType.equals(InboundSseEvent.class);
            InboundEvent event;
            while ((event = read()) != null) {
                subscription.emit(deliverRawEvents ? event : event.readData((Class) payloadType));
            }
            subscription.onCompletion();
        } catch (Throwable th) {
            subscription.onError(th);
        }
    }
}
