package org.springframework.core.io.buffer;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/* loaded from: input_file:lib/spring-core-6.2.0.jar:org/springframework/core/io/buffer/OutputStreamPublisher.class */
final class OutputStreamPublisher<T> implements Publisher<T> {
    private static final int DEFAULT_CHUNK_SIZE = 1024;
    private final OutputStreamHandler outputStreamHandler;
    private final ByteMapper<T> byteMapper;
    private final Executor executor;
    private final int chunkSize;

    /* loaded from: input_file:lib/spring-core-6.2.0.jar:org/springframework/core/io/buffer/OutputStreamPublisher$ByteMapper.class */
    public interface ByteMapper<T> {
        T map(int i);

        T map(byte[] bArr, int i, int i2);
    }

    @FunctionalInterface
    /* loaded from: input_file:lib/spring-core-6.2.0.jar:org/springframework/core/io/buffer/OutputStreamPublisher$OutputStreamHandler.class */
    public interface OutputStreamHandler {
        void handle(OutputStream outputStream) throws Exception;
    }

    /* loaded from: input_file:lib/spring-core-6.2.0.jar:org/springframework/core/io/buffer/OutputStreamPublisher$OutputStreamSubscription.class */
    private static final class OutputStreamSubscription<T> extends OutputStream implements Subscription {
        private static final Object READY = new Object();
        private final Subscriber<? super T> actual;
        private final OutputStreamHandler outputStreamHandler;
        private final ByteMapper<T> byteMapper;
        private final int chunkSize;
        private final AtomicLong requested = new AtomicLong();
        private final AtomicReference<Object> parkedThread = new AtomicReference<>();

        @Nullable
        private volatile Throwable error;
        private long produced;

        OutputStreamSubscription(Subscriber<? super T> subscriber, OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper, int i) {
            this.actual = subscriber;
            this.outputStreamHandler = outputStreamHandler;
            this.byteMapper = byteMapper;
            this.chunkSize = i;
        }

        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            checkDemandAndAwaitIfNeeded();
            this.actual.onNext(this.byteMapper.map(i));
            this.produced++;
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr) throws IOException {
            write(bArr, 0, bArr.length);
        }

        @Override // java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            checkDemandAndAwaitIfNeeded();
            this.actual.onNext(this.byteMapper.map(bArr, i, i2));
            this.produced++;
        }

        private void checkDemandAndAwaitIfNeeded() throws IOException {
            long j = this.requested.get();
            if (isTerminated(j) || isCancelled(j)) {
                throw new IOException("Subscription has been terminated");
            }
            long j2 = this.produced;
            if (j2 == j) {
                if (j2 > 0) {
                    j = tryProduce(j2);
                    this.produced = 0L;
                }
                while (!isTerminated(j) && !isCancelled(j)) {
                    if (j != 0) {
                        return;
                    }
                    await();
                    j = this.requested.get();
                }
                throw new IOException("Subscription has been terminated");
            }
        }

        private void invokeHandler() {
            Throwable th;
            Throwable th2;
            try {
                BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(this, this.chunkSize);
                try {
                    this.outputStreamHandler.handle(bufferedOutputStream);
                    bufferedOutputStream.close();
                    long tryTerminate = tryTerminate();
                    if (isCancelled(tryTerminate)) {
                        return;
                    }
                    if (!isTerminated(tryTerminate) || (th2 = this.error) == null) {
                        this.actual.onComplete();
                    } else {
                        this.actual.onError(th2);
                    }
                } finally {
                }
            } catch (Exception e) {
                long tryTerminate2 = tryTerminate();
                if (isCancelled(tryTerminate2)) {
                    return;
                }
                if (!isTerminated(tryTerminate2) || (th = this.error) == null) {
                    this.actual.onError(e);
                } else {
                    this.actual.onError(th);
                }
            }
        }

        @Override // org.reactivestreams.Subscription
        public void request(long j) {
            if (j > 0) {
                if (addCap(j) == 0) {
                    resume();
                    return;
                }
                return;
            }
            this.error = new IllegalArgumentException("request should be a positive number");
            long tryTerminate = tryTerminate();
            if (isTerminated(tryTerminate) || isCancelled(tryTerminate) || tryTerminate > 0) {
                return;
            }
            resume();
        }

        @Override // org.reactivestreams.Subscription
        public void cancel() {
            long tryCancel = tryCancel();
            if (isCancelled(tryCancel) || tryCancel > 0) {
                return;
            }
            resume();
        }

        private void await() {
            Thread currentThread = Thread.currentThread();
            while (true) {
                Object obj = this.parkedThread.get();
                if (obj == READY) {
                    this.parkedThread.lazySet(null);
                    return;
                } else {
                    if (obj != null && obj != currentThread) {
                        throw new IllegalStateException("Only one (Virtual)Thread can await!");
                    }
                    if (this.parkedThread.compareAndSet(null, currentThread)) {
                        LockSupport.park();
                    }
                }
            }
        }

        private void resume() {
            Object andSet;
            if (this.parkedThread.get() == READY || (andSet = this.parkedThread.getAndSet(READY)) == READY) {
                return;
            }
            LockSupport.unpark((Thread) andSet);
        }

        private long tryCancel() {
            long j;
            do {
                j = this.requested.get();
                if (isCancelled(j)) {
                    return j;
                }
            } while (!this.requested.compareAndSet(j, Long.MIN_VALUE));
            return j;
        }

        private long tryTerminate() {
            long j;
            do {
                j = this.requested.get();
                if (isCancelled(j) || isTerminated(j)) {
                    return j;
                }
            } while (!this.requested.compareAndSet(j, -1L));
            return j;
        }

        private long tryProduce(long j) {
            long j2;
            long j3;
            do {
                j2 = this.requested.get();
                if (isTerminated(j2) || isCancelled(j2)) {
                    return j2;
                }
                if (j2 == Long.MAX_VALUE) {
                    return Long.MAX_VALUE;
                }
                j3 = j2 - j;
                if (j3 < 0) {
                    j3 = 0;
                }
            } while (!this.requested.compareAndSet(j2, j3));
            return j3;
        }

        private long addCap(long j) {
            long j2;
            do {
                j2 = this.requested.get();
                if (isTerminated(j2) || isCancelled(j2) || j2 == Long.MAX_VALUE) {
                    return j2;
                }
            } while (!this.requested.compareAndSet(j2, addCap(j2, j)));
            return j2;
        }

        private static boolean isTerminated(long j) {
            return j == -1;
        }

        private static boolean isCancelled(long j) {
            return j == Long.MIN_VALUE;
        }

        private static long addCap(long j, long j2) {
            long j3 = j + j2;
            if (j3 < 0) {
                return Long.MAX_VALUE;
            }
            return j3;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OutputStreamPublisher(OutputStreamHandler outputStreamHandler, ByteMapper<T> byteMapper, Executor executor, @Nullable Integer num) {
        Assert.notNull(outputStreamHandler, "OutputStreamHandler must not be null");
        Assert.notNull(byteMapper, "ByteMapper must not be null");
        Assert.notNull(executor, "Executor must not be null");
        Assert.isTrue(num == null || num.intValue() > 0, "ChunkSize must be larger than 0");
        this.outputStreamHandler = outputStreamHandler;
        this.byteMapper = byteMapper;
        this.executor = executor;
        this.chunkSize = num != null ? num.intValue() : 1024;
    }

    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber, "Subscriber must not be null");
        OutputStreamSubscription outputStreamSubscription = new OutputStreamSubscription(subscriber, this.outputStreamHandler, this.byteMapper, this.chunkSize);
        subscriber.onSubscribe(outputStreamSubscription);
        Executor executor = this.executor;
        Objects.requireNonNull(outputStreamSubscription);
        executor.execute(outputStreamSubscription::invokeHandler);
    }
}
