/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.AutoValue_StateFetchingIterators_CachingStateIterable_Block;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.WeightedList;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
import org.apache.beam.sdk.util.Weighted;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.AbstractIterator;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.checkerframework.dataflow.qual.Pure;

public class StateFetchingIterators {
    /** Not instantiable: this class only exposes static factory methods and nested types. */
    private StateFetchingIterators() {
    }

    /**
     * Creates a {@link CachingStateIterable} over the values addressed by
     * {@code stateRequestForFirstChunk}, decoded with {@code valueCoder}.
     *
     * @param cache cache handed to the iterable; it is used as a cache from
     *     {@link IterableCacheKey} to the iterable's decoded blocks
     * @param beamFnStateClient client used to issue the state requests
     * @param stateRequestForFirstChunk request identifying the state to read; its continuation
     *     token is where reading starts
     * @param valueCoder coder used to decode the individual values
     * @return a new caching iterable
     */
    public static <T> @UnknownKeyFor @NonNull @Initialized CachingStateIterable<T> readAllAndDecodeStartingFrom(
            @UnknownKeyFor @NonNull @Initialized Cache<?, ?> cache,
            @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,
            BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateRequest stateRequestForFirstChunk,
            @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
        // The caller passes an untyped cache. The iterable only stores Blocks<T> under
        // IterableCacheKey.INSTANCE in it, so the unchecked narrowing is confined to this line.
        @SuppressWarnings("unchecked")
        Cache<IterableCacheKey, CachingStateIterable.Blocks<T>> typedCache =
                (Cache<IterableCacheKey, CachingStateIterable.Blocks<T>>) cache;
        return new CachingStateIterable<T>(typedCache, beamFnStateClient, stateRequestForFirstChunk, valueCoder);
    }

    /**
     * Creates an iterable over the values addressed by {@code stateRequestForFirstChunk} that
     * does not consult any cache: each iterator it creates fetches through
     * {@code beamFnStateClient} and decodes with {@code valueCoder}.
     *
     * @param beamFnStateClient client used to issue the state requests
     * @param stateRequestForFirstChunk request identifying the state to read
     * @param valueCoder coder used to decode the individual values
     * @return a new uncached iterable
     */
    public static <T> @UnknownKeyFor @NonNull @Initialized UncachedStateIterable<T> readAllAndDecodeStartingFrom(
            @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,
            BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateRequest stateRequestForFirstChunk,
            @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
        return new UncachedStateIterable<T>(beamFnStateClient, stateRequestForFirstChunk, valueCoder);
    }

    /**
     * Fetches the raw chunks of a state get request one response at a time, following the
     * continuation token carried by each response.
     *
     * <p>{@link #next()} blocks until the response for the current continuation token is
     * available. A {@code null} continuation token marks the end of the iteration.
     */
    @VisibleForTesting
    static class LazyBlockingStateFetchingIterator
    implements PrefetchableIterator<ByteString> {
        private final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient;
        private final BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateRequest stateRequestForFirstChunk;
        // Token to send with the next request; null once the last chunk has been returned.
        private @UnknownKeyFor @Nullable @Initialized ByteString continuationToken;
        // Outstanding (or completed) response for continuationToken; null when none was requested.
        private @UnknownKeyFor @Nullable @Initialized CompletableFuture<BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateResponse> prefetchedResponse;

        /**
         * @param beamFnStateClient client used to issue the get requests
         * @param stateRequestForFirstChunk template request; its get continuation token is the
         *     starting position
         */
        LazyBlockingStateFetchingIterator(@UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,
                BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateRequest stateRequestForFirstChunk) {
            this.beamFnStateClient = beamFnStateClient;
            this.stateRequestForFirstChunk = stateRequestForFirstChunk;
            this.continuationToken = stateRequestForFirstChunk.getGet().getContinuationToken();
        }

        /** Returns the token of the next chunk to fetch, or {@code null} when iteration is done. */
        @javax.annotation.Nullable
        public @UnknownKeyFor @Nullable @Initialized ByteString getContinuationToken() {
            return this.continuationToken;
        }

        /**
         * Repositions this iterator at {@code continuationToken}. A no-op when the iterator is
         * already positioned there; otherwise any previously requested response is dropped.
         */
        public void seekToContinuationToken(@javax.annotation.Nullable @UnknownKeyFor @Nullable @Initialized ByteString continuationToken) {
            if (!Objects.equals(this.continuationToken, continuationToken)) {
                this.continuationToken = continuationToken;
                // A response requested for the previous token no longer applies.
                this.prefetchedResponse = null;
            }
        }

        /**
         * Returns {@code true} when a requested response has completed, or when nothing was
         * requested and there is nothing left to fetch.
         */
        public @UnknownKeyFor @NonNull @Initialized boolean isReady() {
            CompletableFuture<BeamFnApi.StateResponse> pending = this.prefetchedResponse;
            return pending != null ? pending.isDone() : this.continuationToken == null;
        }

        /** Issues the request for the current token unless one is already outstanding. */
        public void prefetch() {
            if (this.continuationToken == null || this.prefetchedResponse != null) {
                return;
            }
            this.prefetchedResponse = this.loadPrefetchedResponse(this.continuationToken);
        }

        /**
         * Sends a copy of the first-chunk request whose get part carries only
         * {@code continuationToken}.
         *
         * @return the future produced by the state client for that request
         */
        public @UnknownKeyFor @NonNull @Initialized CompletableFuture<BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateResponse> loadPrefetchedResponse(@UnknownKeyFor @NonNull @Initialized ByteString continuationToken) {
            BeamFnApi.StateGetRequest.Builder getRequest =
                    BeamFnApi.StateGetRequest.newBuilder().setContinuationToken(continuationToken);
            return this.beamFnStateClient.handle(this.stateRequestForFirstChunk.toBuilder().setGet(getRequest));
        }

        @Pure
        public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
            return this.continuationToken != null;
        }

        /**
         * Blocks for the response of the current token and returns its data. When the response
         * carries a non-empty continuation token the request for the following chunk is issued
         * right away; an empty token ends the iteration.
         *
         * @throws NoSuchElementException if there is no further chunk
         * @throws IllegalStateException if the wait is interrupted or the request failed with a
         *     checked exception (unchecked causes are rethrown as-is)
         */
        public @UnknownKeyFor @NonNull @Initialized ByteString next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            this.prefetch();
            BeamFnApi.StateResponse stateResponse = this.awaitPrefetchedResponse();
            this.prefetchedResponse = null;
            ByteString tokenFromResponse = stateResponse.getGet().getContinuationToken();
            if (ByteString.EMPTY.equals(tokenFromResponse)) {
                this.continuationToken = null;
            } else {
                this.continuationToken = tokenFromResponse;
                this.prefetch();
            }
            return stateResponse.getGet().getData();
        }

        /** Waits for the outstanding response, translating failures into unchecked exceptions. */
        private BeamFnApi.StateResponse awaitPrefetchedResponse() {
            try {
                return this.prefetchedResponse.get();
            }
            catch (InterruptedException e) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause == null) {
                    throw new IllegalStateException(e);
                }
                Throwables.throwIfUnchecked(cause);
                throw new IllegalStateException(cause);
            }
        }
    }

    static class CachingStateIterable<@UnknownKeyFor T>
    extends PrefetchableIterables.Default<T> {
        /** Holds the decoded blocks of this iterable under {@link IterableCacheKey#INSTANCE}. */
        private final @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized IterableCacheKey, @UnknownKeyFor @NonNull @Initialized Blocks<T>> cache;
        /** Client used by the iterators to fetch blocks that are not cached. */
        private final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient;
        /** Request describing the state to read; also supplies the starting continuation token. */
        private final BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateRequest stateRequestForFirstChunk;
        /** Decodes fetched values and yields the structural values compared during removal. */
        private final @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder;

        /**
         * @param cache cache in which the decoded blocks are stored
         * @param beamFnStateClient client used to fetch blocks that are not cached
         * @param stateRequestForFirstChunk request identifying the state to read
         * @param valueCoder coder for the individual values
         */
        public CachingStateIterable(
                @UnknownKeyFor @NonNull @Initialized Cache<@UnknownKeyFor @NonNull @Initialized IterableCacheKey, @UnknownKeyFor @NonNull @Initialized Blocks<T>> cache,
                @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,
                BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateRequest stateRequestForFirstChunk,
                @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
            this.cache = cache;
            this.beamFnStateClient = beamFnStateClient;
            this.stateRequestForFirstChunk = stateRequestForFirstChunk;
            this.valueCoder = valueCoder;
        }

        /**
         * Removes from the cached copy of this iterable every value whose
         * {@link Coder#structuralValue} is contained in {@code toRemoveStructuralValues}.
         *
         * <p>Nothing happens when the set is empty or nothing is cached. When the cache only
         * holds a prefix of the iterable (the last cached block still has a next token) the entry
         * is evicted rather than rewritten: the rewritten entry is a single block without a next
         * token, which readers treat as the complete iterable, so rewriting a prefix would hide
         * the values that were never loaded.
         *
         * @param toRemoveStructuralValues structural values of the elements to drop
         */
        public void remove(@UnknownKeyFor @NonNull @Initialized Set<@UnknownKeyFor @NonNull @Initialized Object> toRemoveStructuralValues) {
            if (toRemoveStructuralValues.isEmpty()) {
                return;
            }
            Blocks<T> existing = this.cache.peek(IterableCacheKey.INSTANCE);
            if (existing == null) {
                return;
            }
            List<Block<T>> blocks = existing.getBlocks();
            if (blocks.get(blocks.size() - 1).getNextToken() != null) {
                // Only a prefix is cached; evict it and stop instead of caching a truncated list.
                this.cache.remove(IterableCacheKey.INSTANCE);
                return;
            }
            int totalSize = 0;
            for (Block<T> block : blocks) {
                totalSize += block.getValues().size();
            }
            // Merge all blocks into a single one that replaces the cached entry as a whole.
            WeightedList<T> allValues = WeightedList.of(new ArrayList<T>(totalSize), 0L);
            for (Block<T> block : blocks) {
                boolean valueRemovedFromBlock = false;
                List<T> blockValuesToKeep = new ArrayList<T>();
                for (T value : block.getValues()) {
                    if (toRemoveStructuralValues.contains(this.valueCoder.structuralValue(value))) {
                        valueRemovedFromBlock = true;
                    } else {
                        blockValuesToKeep.add(value);
                    }
                }
                if (valueRemovedFromBlock) {
                    // Weigh what is actually kept, not the block as it was before the removal.
                    allValues.addAll(blockValuesToKeep, Caches.weigh(blockValuesToKeep));
                } else {
                    // Untouched block: reuse its already computed weight.
                    allValues.addAll(block.getValues(), block.getWeight());
                }
            }
            this.cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<T>(Block.mutatedBlock(allValues)));
        }

        /**
         * Replaces the cached contents of this iterable with {@code values}. An empty list is
         * cached as {@link EmptyBlocks}, anything else as a single mutated block.
         */
        public void clearAndAppend(@UnknownKeyFor @NonNull @Initialized List<T> values) {
            Blocks<T> replacement = values.isEmpty()
                    ? new EmptyBlocks<T>()
                    : new MutatedBlocks<T>(Block.mutatedBlock(values));
            this.cache.put(IterableCacheKey.INSTANCE, replacement);
        }

        /**
         * Replaces the cached contents of this iterable with the pre-weighed {@code values}. An
         * empty list is cached as {@link EmptyBlocks}, anything else as a single mutated block.
         */
        public void clearAndAppend(@UnknownKeyFor @NonNull @Initialized WeightedList<T> values) {
            Blocks<T> replacement = values.isEmpty()
                    ? new EmptyBlocks<T>()
                    : new MutatedBlocks<T>(Block.mutatedBlock(values));
            this.cache.put(IterableCacheKey.INSTANCE, replacement);
        }

        /** Returns a fresh iterator that reads through the cache of this iterable. */
        public @UnknownKeyFor @NonNull @Initialized PrefetchableIterator<T> createIterator() {
            PrefetchableIterator<T> cachingIterator = new CachingStateIterator();
            return cachingIterator;
        }

        /** Appends {@code values} to the cached copy; their weight is computed on demand. */
        public void append(@UnknownKeyFor @NonNull @Initialized List<T> values) {
            // A negative weight tells appendHelper to weigh the values itself.
            long weightNotYetKnown = -1L;
            this.appendHelper(values, weightNotYetKnown);
        }

        /** Appends the pre-weighed {@code values} to the cached copy, reusing their weight. */
        public void append(@UnknownKeyFor @NonNull @Initialized WeightedList<T> values) {
            List<T> backingValues = values.getBacking();
            long knownWeight = values.getWeight();
            this.appendHelper(backingValues, knownWeight);
        }

        /**
         * Appends {@code newValues} to the cached copy of this iterable.
         *
         * <p>Nothing happens when there is nothing to append or nothing is cached. When the cache
         * only holds a prefix of the iterable (the last cached block still has a next token) the
         * entry is evicted rather than rewritten: the rewritten entry is a single block without a
         * next token, which readers treat as the complete iterable, so appending to a prefix
         * would hide the values that were never loaded.
         *
         * @param newValues values to append
         * @param newWeight weight of {@code newValues}, or a negative number to have it computed
         */
        private void appendHelper(@UnknownKeyFor @NonNull @Initialized List<T> newValues, @UnknownKeyFor @NonNull @Initialized long newWeight) {
            if (newValues.isEmpty()) {
                return;
            }
            Blocks<T> existing = this.cache.peek(IterableCacheKey.INSTANCE);
            if (existing == null) {
                return;
            }
            List<Block<T>> blocks = existing.getBlocks();
            if (blocks.get(blocks.size() - 1).getNextToken() != null) {
                // Only a prefix is cached; evict it and stop instead of caching a truncated list.
                this.cache.remove(IterableCacheKey.INSTANCE);
                return;
            }
            int totalSize = newValues.size();
            for (Block<T> block : blocks) {
                totalSize += block.getValues().size();
            }
            // Merge all blocks into a single one that replaces the cached entry as a whole.
            WeightedList<T> allValues = WeightedList.of(new ArrayList<T>(totalSize), 0L);
            for (Block<T> block : blocks) {
                allValues.addAll(block.getValues(), block.getWeight());
            }
            if (newWeight < 0L) {
                // A single value is weighed directly rather than through its containing list.
                newWeight = newValues.size() == 1 ? Caches.weigh(newValues.get(0)) : Caches.weigh(newValues);
            }
            allValues.addAll(newValues, newWeight);
            this.cache.put(IterableCacheKey.INSTANCE, new MutatedBlocks<T>(Block.mutatedBlock(allValues)));
        }

        class CachingStateIterator
        implements PrefetchableIterator<T> {
            /** Fetches raw chunks for blocks that are not available from the cache. */
            private final @UnknownKeyFor @NonNull @Initialized LazyBlockingStateFetchingIterator underlyingStateFetchingIterator;
            /** Decodes the chunks produced by the fetching iterator into values. */
            private final DataStreams.@UnknownKeyFor @NonNull @Initialized DataStreamDecoder<T> dataStreamDecoder;
            /** Block whose values are currently being handed out. */
            private @UnknownKeyFor @NonNull @Initialized Block<T> currentBlock;
            /** Index into the current block of the next value to return. */
            private @UnknownKeyFor @NonNull @Initialized int currentCachedBlockValueIndex;

            public CachingStateIterator() {
                this.underlyingStateFetchingIterator = new LazyBlockingStateFetchingIterator(CachingStateIterable.this.beamFnStateClient, CachingStateIterable.this.stateRequestForFirstChunk);
                this.dataStreamDecoder = new DataStreams.DataStreamDecoder(CachingStateIterable.this.valueCoder, (PrefetchableIterator)this.underlyingStateFetchingIterator);
                this.currentBlock = Block.fromValues(WeightedList.of(Collections.emptyList(), (long)0L), CachingStateIterable.this.stateRequestForFirstChunk.getGet().getContinuationToken());
                this.currentCachedBlockValueIndex = 0;
            }

            /**
             * Reports whether the next element (or the end of the iterable) can be determined
             * without fetching. As a side effect this advances {@code currentBlock} through
             * blocks that are already present in the cache.
             *
             * @return {@code true} if the current block still has unconsumed values or has no
             *     next token; {@code false} if the block that follows is not in the cache
             */
            public @UnknownKeyFor @NonNull @Initialized boolean isReady() {
                while (true) {
                    if (this.currentBlock.getValues().size() > this.currentCachedBlockValueIndex) {
                        return true;
                    }
                    ByteString nextToken = this.currentBlock.getNextToken();
                    if (nextToken == null) {
                        // No next token: the end of the iterable is already known.
                        return true;
                    }
                    Blocks<T> existing = CachingStateIterable.this.cache.peek(IterableCacheKey.INSTANCE);
                    if (existing == null) {
                        return false;
                    }
                    List<Block<T>> blocks = existing.getBlocks();
                    if (ByteString.EMPTY.equals(nextToken)) {
                        // An empty token addresses the first block of the iterable.
                        this.currentBlock = blocks.get(0);
                        this.currentCachedBlockValueIndex = 0;
                        continue;
                    }
                    // Locate the cached block carrying our next token; its successor is the
                    // block to continue with.
                    int currentBlockIndex = 0;
                    while (currentBlockIndex < blocks.size()
                            && !nextToken.equals(blocks.get(currentBlockIndex).getNextToken())) {
                        ++currentBlockIndex;
                    }
                    if (currentBlockIndex + 1 >= blocks.size()) {
                        return false;
                    }
                    this.currentBlock = blocks.get(currentBlockIndex + 1);
                    this.currentCachedBlockValueIndex = 0;
                }
            }

            /**
             * Starts fetching the block after the current one when it cannot be served from the
             * current block or the cache.
             */
            public void prefetch() {
                if (this.isReady()) {
                    return;
                }
                ByteString tokenToFetch = this.currentBlock.getNextToken();
                this.underlyingStateFetchingIterator.seekToContinuationToken(tokenToFetch);
                this.underlyingStateFetchingIterator.prefetch();
            }

            /**
             * Advances to the next block that has values, taking it from the cache when present
             * and loading it through the state client otherwise.
             *
             * <p>A freshly loaded first block is stored in the cache as a new
             * {@link BlocksPrefix}; a later loaded block is appended to the cached prefix only if
             * the prefix still ends with the token that was used to load it.
             *
             * <p>NOTE(review): the method is annotated {@code @Pure} although it advances
             * {@code currentBlock} and may write to the cache.
             *
             * @return {@code true} if another value is available, {@code false} at the end
             * @throws IllegalStateException if a non-first block has to be resolved while the
             *     cache holds something other than a {@link BlocksPrefix}
             */
            @Pure
            public @UnknownKeyFor @NonNull @Initialized boolean hasNext() {
                while (this.currentBlock.getValues().size() <= this.currentCachedBlockValueIndex) {
                    ByteString nextToken = this.currentBlock.getNextToken();
                    if (nextToken == null) {
                        return false;
                    }
                    // Swap in an empty placeholder before resolving the next block; presumably
                    // this lets the exhausted block be reclaimed during a load - TODO confirm.
                    this.currentBlock = Block.fromValues(WeightedList.of(Collections.<T>emptyList(), 0L), ByteString.EMPTY);
                    Blocks<T> existing = CachingStateIterable.this.cache.peek(IterableCacheKey.INSTANCE);
                    boolean isFirstBlock = ByteString.EMPTY.equals(nextToken);
                    if (existing == null) {
                        this.currentBlock = this.loadNextBlock(nextToken);
                        if (isFirstBlock) {
                            // Only the first block can start a new cached prefix.
                            CachingStateIterable.this.cache.put(IterableCacheKey.INSTANCE, new BlocksPrefix<T>(Collections.singletonList(this.currentBlock)));
                        }
                    } else if (isFirstBlock) {
                        this.currentBlock = existing.getBlocks().get(0);
                    } else {
                        Preconditions.checkState(existing instanceof BlocksPrefix, "Unexpected blocks type %s, expected a %s.", existing.getClass(), BlocksPrefix.class);
                        List<Block<T>> blocks = existing.getBlocks();
                        // Locate the cached block carrying our token; its successor comes next.
                        int currentBlockIndex = 0;
                        while (currentBlockIndex < blocks.size()
                                && !nextToken.equals(blocks.get(currentBlockIndex).getNextToken())) {
                            ++currentBlockIndex;
                        }
                        if (currentBlockIndex + 1 < blocks.size()) {
                            this.currentBlock = blocks.get(currentBlockIndex + 1);
                        } else {
                            // Drop our references to the cached list before the blocking load.
                            existing = null;
                            blocks = null;
                            this.currentBlock = this.loadNextBlock(nextToken);
                            // Re-read the cache: it may have changed while loading.
                            existing = CachingStateIterable.this.cache.peek(IterableCacheKey.INSTANCE);
                            if (existing != null && !existing.getBlocks().isEmpty()
                                    && nextToken.equals(existing.getBlocks().get(existing.getBlocks().size() - 1).getNextToken())) {
                                List<Block<T>> cachedBlocks = existing.getBlocks();
                                List<Block<T>> newBlocks = new ArrayList<Block<T>>(cachedBlocks.size() + 1);
                                newBlocks.addAll(cachedBlocks);
                                newBlocks.add(this.currentBlock);
                                CachingStateIterable.this.cache.put(IterableCacheKey.INSTANCE, new BlocksPrefix<T>(newBlocks));
                            }
                        }
                    }
                    this.currentCachedBlockValueIndex = 0;
                }
                return true;
            }

            /**
             * Fetches and decodes the block addressed by {@code continuationToken}.
             *
             * @return a block holding the decoded values and the token reported by the fetching
             *     iterator afterwards, with an empty token mapped to {@code null}
             */
            @VisibleForTesting
            @UnknownKeyFor @NonNull @Initialized Block<T> loadNextBlock(@UnknownKeyFor @NonNull @Initialized ByteString continuationToken) {
                this.underlyingStateFetchingIterator.seekToContinuationToken(continuationToken);
                WeightedList<T> decodedValues = this.dataStreamDecoder.decodeFromChunkBoundaryToChunkBoundary();
                ByteString followingToken = this.underlyingStateFetchingIterator.getContinuationToken();
                ByteString normalizedToken = ByteString.EMPTY.equals(followingToken) ? null : followingToken;
                return Block.fromValues(decodedValues, normalizedToken);
            }

            /**
             * Returns the next value and moves the position within the current block forward.
             *
             * @throws NoSuchElementException if the iterable is exhausted
             */
            public T next() {
                if (this.hasNext()) {
                    int valueIndex = this.currentCachedBlockValueIndex++;
                    return this.currentBlock.getValues().get(valueIndex);
                }
                throw new NoSuchElementException();
            }
        }

        @AutoValue
        static abstract class Block<@UnknownKeyFor T>
        implements Weighted {
            private static final @UnknownKeyFor @NonNull @Initialized Block<@UnknownKeyFor @Nullable @Initialized Void> EMPTY = Block.fromValues(WeightedList.of(Collections.emptyList(), (long)0L), null);

            Block() {
            }

            public static <T> @UnknownKeyFor @NonNull @Initialized Block<T> emptyBlock() {
                return EMPTY;
            }

            public static <T> @UnknownKeyFor @NonNull @Initialized Block<T> mutatedBlock(@UnknownKeyFor @NonNull @Initialized List<T> values) {
                return Block.fromValues(values, null);
            }

            public static <T> @UnknownKeyFor @NonNull @Initialized Block<T> mutatedBlock(@UnknownKeyFor @NonNull @Initialized WeightedList<T> values) {
                return Block.fromValues(values, null);
            }

            public static <T> @UnknownKeyFor @NonNull @Initialized Block<T> fromValues(@UnknownKeyFor @NonNull @Initialized List<T> values, @javax.annotation.Nullable @UnknownKeyFor @Nullable @Initialized ByteString nextToken) {
                return Block.fromValues(WeightedList.of(values, (long)Caches.weigh(values)), nextToken);
            }

            public static <T> @UnknownKeyFor @NonNull @Initialized Block<T> fromValues(@UnknownKeyFor @NonNull @Initialized WeightedList<T> values, @javax.annotation.Nullable @UnknownKeyFor @Nullable @Initialized ByteString nextToken) {
                long weight = values.getWeight() + 24L;
                if (nextToken != null) {
                    if (nextToken.isEmpty()) {
                        nextToken = ByteString.EMPTY;
                    } else {
                        weight += Caches.weigh(nextToken);
                    }
                }
                return new AutoValue_StateFetchingIterators_CachingStateIterable_Block(values.getBacking(), nextToken, weight);
            }

            abstract @UnknownKeyFor @NonNull @Initialized List<T> getValues();

            @javax.annotation.Nullable
            abstract @UnknownKeyFor @Nullable @Initialized ByteString getNextToken();

            public abstract @UnknownKeyFor @NonNull @Initialized long getWeight();
        }

        /**
         * A list of consecutive blocks starting with the first block of the iterable. It can be
         * shrunk by the cache to its first half.
         */
        static class BlocksPrefix<@UnknownKeyFor T>
        extends Blocks<T>
        implements Cache.Shrinkable<BlocksPrefix<T>> {
            private final @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> blocks;

            /**
             * Returns 8 bytes plus 8 bytes per block plus the weight of every block, saturating
             * at {@link Long#MAX_VALUE} instead of overflowing.
             */
            public @UnknownKeyFor @NonNull @Initialized long getWeight() {
                long total = 8L + (long)this.blocks.size() * 8L;
                try {
                    for (Block<T> block : this.blocks) {
                        total = Math.addExact(total, block.getWeight());
                    }
                }
                catch (ArithmeticException overflow) {
                    return Long.MAX_VALUE;
                }
                return total;
            }

            BlocksPrefix(@UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> blocks) {
                this.blocks = blocks;
            }

            /**
             * Returns a new prefix holding the first half (rounded down) of the blocks.
             *
             * @return the shorter prefix, or {@code null} when that half would be empty
             */
            @Override
            public @UnknownKeyFor @Nullable @Initialized BlocksPrefix<T> shrink() {
                List<Block<T>> current = this.getBlocks();
                int blocksToKeep = current.size() / 2;
                if (blocksToKeep == 0) {
                    return null;
                }
                // Copy the sub-list so the new prefix does not keep the dropped blocks reachable.
                return new BlocksPrefix<T>(new ArrayList<Block<T>>(current.subList(0, blocksToKeep)));
            }

            @Override
            public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> getBlocks() {
                return this.blocks;
            }
        }

        /** {@link Blocks} backed by exactly one block that holds all of the values. */
        static class MutatedBlocks<@UnknownKeyFor T>
        extends Blocks<T> {
            private final @UnknownKeyFor @NonNull @Initialized Block<T> wholeBlock;

            MutatedBlocks(@UnknownKeyFor @NonNull @Initialized Block<T> wholeBlock) {
                this.wholeBlock = wholeBlock;
            }

            /** Returns a one-element list containing the wrapped block. */
            @Override
            public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> getBlocks() {
                Block<T> onlyBlock = this.wholeBlock;
                return Collections.singletonList(onlyBlock);
            }

            /** Returns the weight of the wrapped block plus 8. */
            public @UnknownKeyFor @NonNull @Initialized long getWeight() {
                long blockWeight = this.wholeBlock.getWeight();
                return blockWeight + 8L;
            }
        }

        /** {@link Blocks} for an iterable that is known to contain no values. */
        static class EmptyBlocks<@UnknownKeyFor T>
        extends Blocks<T> {
            EmptyBlocks() {
            }

            /** Returns a one-element list containing the shared empty block. */
            @Override
            public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> getBlocks() {
                Block<T> noValues = Block.emptyBlock();
                return Collections.singletonList(noValues);
            }

            /** Returns a constant weight of 8. */
            public @UnknownKeyFor @NonNull @Initialized long getWeight() {
                return 8L;
            }
        }

        /**
         * Value type stored in the cache for an iterable: a weighted holder of the blocks that
         * are currently cached.
         */
        static abstract class Blocks<@UnknownKeyFor T>
        implements Weighted {
            Blocks() {
            }

            /** Returns the cached blocks in iteration order. */
            public abstract @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized Block<T>> getBlocks();
        }
    }

    /** Singleton key under which an iterable's blocks are stored in its cache. */
    @VisibleForTesting
    static class IterableCacheKey
    implements Weighted {
        /** The only instance of this key. */
        static final @UnknownKeyFor @NonNull @Initialized IterableCacheKey INSTANCE = new IterableCacheKey();

        private IterableCacheKey() {
        }

        /** The key reports a weight of zero. */
        public @UnknownKeyFor @NonNull @Initialized long getWeight() {
            return 0L;
        }
    }

    /**
     * Iterable whose iterators always fetch the chunks through the state client and decode the
     * values on the fly; nothing is cached.
     */
    private static class UncachedStateIterable<@UnknownKeyFor T>
    extends PrefetchableIterables.Default<T> {
        private final @UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient;
        private final BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateRequest stateRequestForFirstChunk;
        private final @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder;

        /**
         * @param beamFnStateClient client used to issue the state requests
         * @param stateRequestForFirstChunk request identifying the state to read
         * @param valueCoder coder used to decode the individual values
         */
        public UncachedStateIterable(@UnknownKeyFor @NonNull @Initialized BeamFnStateClient beamFnStateClient,
                BeamFnApi.@UnknownKeyFor @NonNull @Initialized StateRequest stateRequestForFirstChunk,
                @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
            this.beamFnStateClient = beamFnStateClient;
            this.stateRequestForFirstChunk = stateRequestForFirstChunk;
            this.valueCoder = valueCoder;
        }

        /** Returns a decoding iterator over a new chunk-fetching iterator. */
        public @UnknownKeyFor @NonNull @Initialized PrefetchableIterator<T> createIterator() {
            LazyBlockingStateFetchingIterator chunks =
                    new LazyBlockingStateFetchingIterator(this.beamFnStateClient, this.stateRequestForFirstChunk);
            return new DecodingIterator<T>(chunks, this.valueCoder);
        }

        /** Decodes values one by one from the chunks supplied by a chunk iterator. */
        private static class DecodingIterator<@UnknownKeyFor T>
        extends AbstractIterator<T>
        implements PrefetchableIterator<T> {
            private final @UnknownKeyFor @NonNull @Initialized PrefetchableIterator<@UnknownKeyFor @NonNull @Initialized ByteString> chunkIterator;
            private final @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder;
            // Stream over the chunk being decoded; starts out as an empty stream.
            private @UnknownKeyFor @NonNull @Initialized InputStream currentChunk;

            public DecodingIterator(@UnknownKeyFor @NonNull @Initialized PrefetchableIterator<@UnknownKeyFor @NonNull @Initialized ByteString> chunkIterator, @UnknownKeyFor @NonNull @Initialized Coder<T> valueCoder) {
                this.chunkIterator = chunkIterator;
                this.valueCoder = valueCoder;
                this.currentChunk = ByteString.EMPTY.newInput();
            }

            /**
             * Skips over exhausted chunks, then decodes the next value from the current chunk.
             * Signals the end of data once the chunk iterator has no further chunk.
             *
             * @throws IllegalStateException if reading or decoding raises an {@link IOException}
             */
            protected T computeNext() {
                try {
                    while (this.currentChunk.available() == 0) {
                        if (!this.chunkIterator.hasNext()) {
                            return this.endOfData();
                        }
                        this.currentChunk = this.chunkIterator.next().newInput();
                    }
                    return this.valueCoder.decode(this.currentChunk);
                }
                catch (IOException exn) {
                    throw new IllegalStateException(exn);
                }
            }

            /**
             * Returns {@code true} if the current chunk still has bytes or the chunk iterator
             * reports that it is ready.
             */
            public @UnknownKeyFor @NonNull @Initialized boolean isReady() {
                try {
                    if (this.currentChunk.available() > 0) {
                        return true;
                    }
                    return this.chunkIterator.isReady();
                }
                catch (IOException exn) {
                    throw new IllegalStateException(exn);
                }
            }

            /** Asks the chunk iterator to prefetch when this iterator is not ready. */
            public void prefetch() {
                if (this.isReady()) {
                    return;
                }
                this.chunkIterator.prefetch();
            }
        }
    }
}

