package com.hazelcast.jet.kafka.connect.impl;

import com.hazelcast.function.FunctionEx;
import com.hazelcast.internal.metrics.DynamicMetricsProvider;
import com.hazelcast.internal.metrics.MetricDescriptor;
import com.hazelcast.internal.metrics.MetricsCollectionContext;
import com.hazelcast.internal.metrics.impl.ProviderHelper;
import com.hazelcast.internal.util.Timer;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.EventTimeMapper;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.kafka.connect.impl.processorsupplier.ReadKafkaConnectProcessorSupplier;
import com.hazelcast.jet.retry.RetryStrategy;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.kafka.connect.source.SourceRecord;

/* loaded from: input_file:com/hazelcast/jet/kafka/connect/impl/ReadKafkaConnectP.class */
public class ReadKafkaConnectP<T> extends AbstractProcessor implements DynamicMetricsProvider {
    private transient SourceConnectorWrapper sourceConnectorWrapper;
    private final EventTimeMapper<T> eventTimeMapper;
    private final FunctionEx<SourceRecord, T> projectionFn;
    private Properties propertiesFromUser;
    private boolean snapshotInProgress;
    private Traverser<Map.Entry<BroadcastKey<String>, ? extends Serializable>> snapshotTraverser;
    private boolean snapshotsEnabled;
    private int globalProcessorIndex;
    private int localProcessorIndex;
    private int processorOrder;
    private RetryStrategy retryStrategy;
    private ILogger logger = Logger.getLogger(ReadKafkaConnectP.class);
    private Traverser<?> traverser = Traversers.empty();
    private final LocalKafkaConnectStatsImpl localKafkaConnectStats = new LocalKafkaConnectStatsImpl();
    private final AtomicInteger counter = new AtomicInteger();
    private volatile boolean active = true;

    public ReadKafkaConnectP(@Nonnull EventTimePolicy<? super T> eventTimePolicy, @Nonnull FunctionEx<SourceRecord, T> functionEx) {
        Objects.requireNonNull(eventTimePolicy, "eventTimePolicy is required");
        Objects.requireNonNull(functionEx, "projectionFn is required");
        this.eventTimeMapper = new EventTimeMapper<>(eventTimePolicy);
        this.projectionFn = functionEx;
        this.eventTimeMapper.addPartitions(1);
    }

    protected void init(@Nonnull Processor.Context context) {
        this.logger = getLogger();
        this.globalProcessorIndex = context.globalProcessorIndex();
        this.localProcessorIndex = context.localProcessorIndex();
        this.logger.info("Entering ReadKafkaConnectP init processorOrder=" + this.processorOrder + " localProcessorIndex= " + this.localProcessorIndex + ", globalProcessorIndex=" + this.globalProcessorIndex + ", snapshotsEnabled=" + this.snapshotsEnabled);
        if (this.sourceConnectorWrapper == null) {
            this.sourceConnectorWrapper = new SourceConnectorWrapper(this.propertiesFromUser, this.processorOrder, context, this.retryStrategy);
            this.sourceConnectorWrapper.setActiveStatusSetter((v1) -> {
                setActive(v1);
            });
        }
        this.snapshotsEnabled = context.snapshottingEnabled();
        Util.getNodeEngine(context.hazelcastInstance()).getMetricsRegistry().registerDynamicMetricsProvider(this);
    }

    public boolean isCooperative() {
        return false;
    }

    public boolean complete() {
        if (!this.active) {
            emitFromTraverser(this.eventTimeMapper.flatMapIdle());
            return false;
        }
        if (!configurationReceived() || this.snapshotInProgress || !emitFromTraverser(this.traverser) || this.sourceConnectorWrapper.waitNeeded()) {
            return false;
        }
        long nanos = Timer.nanos();
        List<SourceRecord> poll = this.sourceConnectorWrapper.poll();
        this.logger.fine("Total polled record size " + this.counter.addAndGet(poll.size()));
        this.localKafkaConnectStats.addSourceRecordPollDuration(Duration.ofNanos(Timer.nanosElapsed(nanos)));
        this.localKafkaConnectStats.incrementSourceRecordPoll(poll.size());
        this.traverser = poll.isEmpty() ? this.eventTimeMapper.flatMapIdle() : Traversers.traverseIterable(poll).flatMap(sourceRecord -> {
            long longValue = sourceRecord.timestamp() == null ? Long.MIN_VALUE : sourceRecord.timestamp().longValue();
            Object apply = this.projectionFn.apply(sourceRecord);
            this.sourceConnectorWrapper.commitRecord(sourceRecord);
            return this.eventTimeMapper.flatMapEvent(apply, 0, longValue);
        });
        emitFromTraverser(this.traverser);
        return false;
    }

    boolean configurationReceived() {
        return this.sourceConnectorWrapper.hasTaskConfiguration();
    }

    public boolean saveToSnapshot() {
        this.logger.info("Saving to snapshot for globalProcessorIndex=" + this.globalProcessorIndex + " localProcessorIndex= " + this.localProcessorIndex);
        if (!this.snapshotsEnabled) {
            return true;
        }
        if (!emitFromTraverser(this.traverser)) {
            return false;
        }
        this.snapshotInProgress = true;
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseItems(new Map.Entry[]{com.hazelcast.jet.Util.entry(snapshotKey(), this.sourceConnectorWrapper.copyState()), com.hazelcast.jet.Util.entry(snapshotKeyWm(), Long.valueOf(this.eventTimeMapper.getWatermark(0)))}).onFirstNull(() -> {
                this.snapshotTraverser = null;
                this.logger.finest("Finished saving snapshot");
            });
        }
        return emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    private BroadcastKey<String> snapshotKey() {
        return BroadcastKey.broadcastKey("snapshot-" + this.processorOrder);
    }

    private BroadcastKey<String> snapshotKeyWm() {
        return BroadcastKey.broadcastKey("snapshotWm-" + this.processorOrder);
    }

    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        this.logger.info("Restoring from snapshot with key " + obj + " value " + obj2);
        if (snapshotKey().equals(obj)) {
            this.sourceConnectorWrapper.restoreState((State) obj2);
        }
        if (snapshotKeyWm().equals(obj)) {
            this.eventTimeMapper.restoreWatermark(0, ((Long) obj2).longValue());
        }
    }

    public boolean snapshotCommitFinish(boolean z) {
        if (z) {
            try {
                this.sourceConnectorWrapper.commit();
            } finally {
                this.snapshotInProgress = false;
            }
        }
        return true;
    }

    public void close() {
        if (this.sourceConnectorWrapper != null) {
            this.sourceConnectorWrapper.close();
        }
        this.logger.info("Closed ReadKafkaConnectP");
    }

    public Map<String, LocalKafkaConnectStats> getStats() {
        HashMap hashMap = new HashMap();
        if (this.sourceConnectorWrapper != null && this.sourceConnectorWrapper.hasTaskRunner()) {
            hashMap.put(this.sourceConnectorWrapper.getTaskRunnerName(), this.localKafkaConnectStats);
        }
        return hashMap;
    }

    public void provideDynamicMetrics(MetricDescriptor metricDescriptor, MetricsCollectionContext metricsCollectionContext) {
        if (this.sourceConnectorWrapper != null && this.sourceConnectorWrapper.hasTaskRunner()) {
            metricDescriptor.copy().withTag("task.runner", this.sourceConnectorWrapper.getTaskRunnerName());
        }
        ProviderHelper.provide(metricDescriptor, metricsCollectionContext, "kafka.connect", getStats());
    }

    public void setPropertiesFromUser(Properties properties) {
        this.propertiesFromUser = properties;
    }

    public void setSourceConnectorWrapper(SourceConnectorWrapper sourceConnectorWrapper) {
        this.sourceConnectorWrapper = sourceConnectorWrapper;
    }

    public void setProcessorOrder(int i) {
        this.processorOrder = i;
    }

    public void setActive(boolean z) {
        this.active = z;
    }

    private void setRetryStrategy(@Nullable RetryStrategy retryStrategy) {
        this.retryStrategy = retryStrategy;
    }

    EventTimeMapper<T> eventTimeMapper() {
        return this.eventTimeMapper;
    }

    public static <T> ReadKafkaConnectProcessorSupplier processorSupplier(@Nonnull final Properties properties, @Nonnull final EventTimePolicy<? super T> eventTimePolicy, @Nonnull final FunctionEx<SourceRecord, T> functionEx, @Nullable final RetryStrategy retryStrategy) {
        return new ReadKafkaConnectProcessorSupplier() { // from class: com.hazelcast.jet.kafka.connect.impl.ReadKafkaConnectP.1
            private static final long serialVersionUID = 1;

            @Override // com.hazelcast.jet.kafka.connect.impl.processorsupplier.ReadKafkaConnectProcessorSupplier
            @Nonnull
            public Collection<ReadKafkaConnectP<?>> get(int i) {
                IntStream range = IntStream.range(0, i);
                EventTimePolicy eventTimePolicy2 = eventTimePolicy;
                FunctionEx functionEx2 = functionEx;
                Properties properties2 = properties;
                RetryStrategy retryStrategy2 = retryStrategy;
                return (Collection) range.mapToObj(i2 -> {
                    ReadKafkaConnectP readKafkaConnectP = new ReadKafkaConnectP(eventTimePolicy2, functionEx2);
                    readKafkaConnectP.setPropertiesFromUser(properties2);
                    readKafkaConnectP.setRetryStrategy(retryStrategy2);
                    return readKafkaConnectP;
                }).collect(Collectors.toList());
            }
        };
    }
}
