package com.hazelcast.jet.elastic.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.processor.Processors;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.RestHighLevelClient;

/* loaded from: input_file:com/hazelcast/jet/elastic/impl/ElasticSourcePMetaSupplier.class */
public class ElasticSourcePMetaSupplier<T> implements ProcessorMetaSupplier {
    private static final long serialVersionUID = 1;
    private static final int DEFAULT_LOCAL_PARALLELISM = 2;

    @Nonnull
    private final ElasticSourceConfiguration<T> configuration;
    private transient Map<Address, List<Shard>> assignedShards;
    private transient Address ownerAddress;

    public ElasticSourcePMetaSupplier(@Nonnull ElasticSourceConfiguration<T> elasticSourceConfiguration) {
        this.configuration = elasticSourceConfiguration;
    }

    public int preferredLocalParallelism() {
        if (this.configuration.isCoLocatedReadingEnabled() || this.configuration.isSlicingEnabled()) {
            return DEFAULT_LOCAL_PARALLELISM;
        }
        return 1;
    }

    public void init(@Nonnull ProcessorMetaSupplier.Context context) throws Exception {
        ElasticCatClient elasticCatClient = new ElasticCatClient(((RestHighLevelClient) this.configuration.clientFn().get()).getLowLevelClient(), this.configuration.retries());
        try {
            List<Shard> shards = elasticCatClient.shards(((SearchRequest) this.configuration.searchRequestFn().get()).indices());
            if (this.configuration.isCoLocatedReadingEnabled()) {
                this.assignedShards = assignShards(shards, context.partitionAssignment().keySet());
            } else {
                this.ownerAddress = ProcessorMetaSupplier.getOwnerAddress(context, Long.valueOf(context.jobId()));
                this.assignedShards = Collections.emptyMap();
            }
            elasticCatClient.close();
        } catch (Throwable th) {
            try {
                elasticCatClient.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    static Map<Address, List<Shard>> assignShards(Collection<Shard> collection, Collection<Address> collection2) {
        Map map = (Map) collection2.stream().map((v0) -> {
            return v0.getHost();
        }).distinct().collect(Collectors.toMap(Function.identity(), str -> {
            return new ArrayList();
        }));
        ((Map) collection.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.indexShard();
        }, Collectors.mapping((v0) -> {
            return v0.getIp();
        }, Collectors.toList())))).forEach((str2, list) -> {
            Stream stream = list.stream();
            Objects.requireNonNull(map);
            ((List) stream.map((v1) -> {
                return r1.get(v1);
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).min(Comparator.comparingInt((v0) -> {
                return v0.size();
            })).orElseThrow(() -> {
                return new IllegalStateException("Selected members do not contain shard '" + str2 + "'");
            })).add(str2);
        });
        Map map2 = (Map) collection2.stream().collect(Collectors.groupingBy((v0) -> {
            return v0.getHost();
        }, Collectors.toList()));
        Map map3 = (Map) collection.stream().collect(Collectors.toMap(shard -> {
            return shard.indexShard() + "@" + shard.getIp();
        }, Function.identity()));
        return (Map) map.entrySet().stream().flatMap(entry -> {
            List list2 = (List) map2.get(entry.getKey());
            List<T> list3 = ((List) entry.getValue()).stream().map(str3 -> {
                return (Shard) map3.get(str3 + "@" + ((String) entry.getKey()));
            }).toList();
            int ceil = (int) Math.ceil(list3.size() / list2.size());
            return IntStream.range(0, list2.size()).mapToObj(i -> {
                return Map.entry((Address) list2.get(i), List.copyOf(list3.subList(i * ceil, Math.min((i + 1) * ceil, list3.size()))));
            });
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    @Nonnull
    public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> list) {
        return (this.configuration.isSlicingEnabled() || this.configuration.isCoLocatedReadingEnabled()) ? address -> {
            return new ElasticSourcePSupplier(this.configuration, this.assignedShards.getOrDefault(address, Collections.emptyList()));
        } : address2 -> {
            return address2.equals(this.ownerAddress) ? new ElasticSourcePSupplier(this.configuration, Collections.emptyList()) : i -> {
                return Collections.nCopies(i, (Processor) Processors.noopP().get());
            };
        };
    }

    public boolean closeIsCooperative() {
        return true;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1650556416:
                if (implMethodName.equals("lambda$get$16e233c9$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/core/ProcessorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(I)Ljava/util/Collection;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/elastic/impl/ElasticSourcePMetaSupplier") && serializedLambda.getImplMethodSignature().equals("(I)Ljava/util/Collection;")) {
                    return i -> {
                        return Collections.nCopies(i, (Processor) Processors.noopP().get());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
