package com.hazelcast.jet.kafka.connect.impl;

import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

/* loaded from: input_file:com/hazelcast/jet/kafka/connect/impl/TaskRunner.class */
public class TaskRunner {
    private final ILogger logger;
    private final String name;
    private final ReentrantLock taskLifecycleLock = new ReentrantLock();
    private final State state;
    private final SourceTaskFactory sourceTaskFactory;
    private volatile boolean running;
    private volatile boolean reconfigurationNeeded;
    private SourceTask task;
    private volatile Map<String, String> taskConfigReference;

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:com/hazelcast/jet/kafka/connect/impl/TaskRunner$SourceTaskFactory.class */
    public interface SourceTaskFactory {
        SourceTask create();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public TaskRunner(String str, State state, SourceTaskFactory sourceTaskFactory) {
        this.name = str;
        this.state = state;
        this.sourceTaskFactory = sourceTaskFactory;
        this.logger = Logger.getLogger(getClass().getName() + " " + str);
    }

    public List<SourceRecord> poll() {
        restartTaskIfNeeded();
        return this.running ? doPoll() : Collections.emptyList();
    }

    public void stop() {
        try {
            this.taskLifecycleLock.lock();
            if (this.running) {
                this.logger.info("Stopping task '" + this.name + "'");
                this.task.stop();
                this.logger.info("Task '" + this.name + "' stopped");
            }
        } finally {
            this.running = false;
            this.taskLifecycleLock.unlock();
        }
    }

    private List<SourceRecord> doPoll() {
        try {
            List<SourceRecord> poll = this.task.poll();
            return poll == null ? Collections.emptyList() : poll;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw ExceptionUtil.rethrow(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void restartTaskIfNeeded() {
        if (this.reconfigurationNeeded) {
            this.reconfigurationNeeded = false;
            try {
                stop();
            } catch (Exception e) {
                this.logger.warning("Stopping task '" + this.name + "' failed but proceeding with re-start", e);
            }
        }
        start();
    }

    public void updateTaskConfig(Map<String, String> map) {
        try {
            this.taskLifecycleLock.lock();
            if (!Objects.equals(this.taskConfigReference, map)) {
                this.logger.info("Updating task '" + this.name + "' configuration");
                this.taskConfigReference = map;
                this.reconfigurationNeeded = true;
            }
        } finally {
            this.taskLifecycleLock.unlock();
        }
    }

    private void start() {
        try {
            this.taskLifecycleLock.lock();
            if (!this.running) {
                Map<String, String> map = this.taskConfigReference;
                if (map != null) {
                    SourceTask create = this.sourceTaskFactory.create();
                    this.logger.info("Initializing task '" + this.name + "'");
                    create.initialize(new JetSourceTaskContext(map, this.state));
                    this.logger.info("Starting task '" + this.name + "'");
                    create.start(map);
                    this.task = create;
                    this.running = true;
                } else {
                    this.logger.info("No task config for task '" + this.name + "'");
                }
            }
        } finally {
            this.taskLifecycleLock.unlock();
        }
    }

    public void commit() {
        if (this.running) {
            try {
                this.task.commit();
            } catch (InterruptedException e) {
                this.logger.warning("Interrupted while committing");
                Thread.currentThread().interrupt();
            }
        }
    }

    public void commitRecord(SourceRecord sourceRecord) {
        this.state.commitRecord(sourceRecord);
        try {
            if (this.running) {
                this.task.commitRecord(sourceRecord, (RecordMetadata) null);
            }
        } catch (InterruptedException e) {
            this.logger.warning("Interrupted while committing record");
            Thread.currentThread().interrupt();
        }
    }

    public State copyState() {
        State state = new State();
        state.load(this.state);
        return state;
    }

    public void restoreState(State state) {
        this.state.load(state);
    }

    public String getName() {
        return this.name;
    }

    public String toString() {
        return "TaskRunner{name='" + this.name + "'}";
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void forceRestart() {
        this.reconfigurationNeeded = true;
    }
}
