package com.hazelcast.jet.kafka.connect.impl;

import com.hazelcast.client.impl.protocol.util.PropertiesUtil;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import com.hazelcast.jet.impl.util.ReflectionUtils;
import com.hazelcast.jet.kafka.connect.impl.message.TaskConfigMessage;
import com.hazelcast.jet.kafka.connect.impl.message.TaskConfigPublisher;
import com.hazelcast.jet.retry.IntervalFunction;
import com.hazelcast.jet.retry.RetryStrategies;
import com.hazelcast.jet.retry.RetryStrategy;
import com.hazelcast.jet.retry.impl.RetryTracker;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.topic.Message;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

/* loaded from: input_file:com/hazelcast/jet/kafka/connect/impl/SourceConnectorWrapper.class */
public class SourceConnectorWrapper {
    public static final RetryStrategy DEFAULT_RECONNECT_BEHAVIOR = RetryStrategies.custom().maxAttempts(10).intervalFunction(IntervalFunction.exponentialBackoffWithCap(200, 1.5d, 5000)).build();
    private final ILogger logger;
    private SourceConnector sourceConnector;
    private int tasksMax;
    private TaskRunner taskRunner;
    private final ReentrantLock reconfigurationLock;
    private final State state;
    private String name;
    private final boolean isMasterProcessor;
    private final int processorOrder;
    private TaskConfigPublisher taskConfigPublisher;
    private final AtomicBoolean receivedTaskConfiguration;
    private final RetryTracker reconnectTracker;
    private Map<String, String> currentConfig;
    private Consumer<Boolean> activeStatusSetter;
    private transient Exception lastConnectionException;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/kafka/connect/impl/SourceConnectorWrapper$JetConnectorContext.class */
    public class JetConnectorContext implements ConnectorContext {
        private JetConnectorContext() {
        }

        public void requestTaskReconfiguration() {
            SourceConnectorWrapper.this.requestTaskReconfiguration();
        }

        public void raiseError(Exception exc) {
            throw ExceptionUtil.rethrow(exc);
        }
    }

    public SourceConnectorWrapper(Properties properties, int i, Processor.Context context) {
        this(properties, i, context, DEFAULT_RECONNECT_BEHAVIOR);
    }

    public SourceConnectorWrapper(Properties properties, int i, Processor.Context context, RetryStrategy retryStrategy) {
        this.logger = Logger.getLogger(SourceConnectorWrapper.class);
        this.reconfigurationLock = new ReentrantLock();
        this.state = new State();
        this.receivedTaskConfiguration = new AtomicBoolean();
        this.activeStatusSetter = bool -> {
        };
        validatePropertiesFromUser(properties);
        this.processorOrder = i;
        this.isMasterProcessor = i == 0;
        this.reconnectTracker = new RetryTracker(retryStrategy == null ? DEFAULT_RECONNECT_BEHAVIOR : retryStrategy);
        this.currentConfig = PropertiesUtil.toMap(properties);
        createTopic(context.hazelcastInstance(), context.executionId());
        createSourceConnector();
    }

    void validatePropertiesFromUser(Properties properties) {
        Preconditions.checkRequiredProperty(properties, "connector.class");
        this.name = Preconditions.checkRequiredProperty(properties, "name");
        this.tasksMax = Integer.parseInt(Preconditions.checkRequiredProperty(properties, "tasks.max"));
    }

    void createSourceConnector() {
        String str = this.currentConfig.get("connector.class");
        if (!this.reconnectTracker.shouldTryAgain() && this.lastConnectionException != null) {
            throw new HazelcastException("Cannot connect using connector " + str, this.lastConnectionException);
        }
        this.logger.info("Initializing connector '" + this.name + "' of class '" + str + "'");
        try {
            this.sourceConnector = newConnectorInstance(str);
            if (this.isMasterProcessor) {
                this.sourceConnector.initialize(new JetConnectorContext());
                this.logger.info("Starting connector '" + this.name + "'. Below are the propertiesFromUser");
                this.sourceConnector.start(this.currentConfig);
                this.logger.info("Connector '" + this.name + "' started");
            } else {
                this.logger.info("Connector '" + this.name + "' created, not starting because it's not a master processor");
            }
            try {
                this.logger.info("Creating task runner '" + this.name + "'");
                createTaskRunner();
                this.logger.info("Task runner '" + this.name + "' created");
            } catch (Exception e) {
                this.reconnectTracker.attemptFailed();
                this.lastConnectionException = e;
            }
        } catch (Exception e2) {
            this.logger.warning("Error while starting connector", e2);
            this.reconnectTracker.attemptFailed();
            if (this.sourceConnector != null) {
                this.sourceConnector.stop();
                this.sourceConnector = null;
            }
            this.lastConnectionException = e2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean waitNeeded() {
        if (!this.reconnectTracker.shouldTryAgain()) {
            throw new HazelcastException("Cannot launch connector and/or task correctly", this.lastConnectionException);
        }
        if (this.reconnectTracker.needsToWait()) {
            return true;
        }
        if (this.sourceConnector == null) {
            createSourceConnector();
        }
        return !restartTaskIfNeeded();
    }

    private boolean restartTaskIfNeeded() {
        if (this.sourceConnector == null) {
            return false;
        }
        try {
            this.taskRunner.restartTaskIfNeeded();
            return true;
        } catch (Exception e) {
            this.logger.warning("Error while restarting task", e);
            this.taskRunner.forceRestart();
            this.reconnectTracker.attemptFailed();
            this.lastConnectionException = e;
            return false;
        }
    }

    TaskRunner createTaskRunner() {
        this.taskRunner = new TaskRunner(this.name + "-task-" + this.processorOrder, this.state, this::createSourceTask);
        requestTaskReconfiguration();
        return this.taskRunner;
    }

    private SourceTask createSourceTask() {
        return (SourceTask) ReflectionUtils.newInstance(Thread.currentThread().getContextClassLoader(), this.sourceConnector.taskClass().asSubclass(SourceTask.class).getName());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setActiveStatusSetter(Consumer<Boolean> consumer) {
        this.activeStatusSetter = consumer;
    }

    public boolean hasTaskConfiguration() {
        return this.receivedTaskConfiguration.get();
    }

    void createTopic(HazelcastInstance hazelcastInstance, long j) {
        if (hazelcastInstance != null) {
            this.taskConfigPublisher = new TaskConfigPublisher(hazelcastInstance);
            this.taskConfigPublisher.createTopic(j);
            this.taskConfigPublisher.addMessageListener(this::processMessage);
        }
    }

    void destroyTopic() {
        this.taskConfigPublisher.removeMessageListeners();
        if (this.isMasterProcessor) {
            this.taskConfigPublisher.destroyTopic();
        }
    }

    protected void publishMessage(TaskConfigMessage taskConfigMessage) {
        if (this.taskConfigPublisher != null) {
            this.logger.info("Publishing TaskConfigTopic");
            this.taskConfigPublisher.publish(taskConfigMessage);
        }
    }

    private void processMessage(Message<TaskConfigMessage> message) {
        this.logger.info("Received TaskConfigTopic message");
        processMessage((TaskConfigMessage) message.getMessageObject());
    }

    protected void processMessage(TaskConfigMessage taskConfigMessage) {
        this.state.setTaskConfigs(taskConfigMessage.getTaskConfigs());
        Map<String, String> taskConfig = this.state.getTaskConfig(this.processorOrder);
        this.activeStatusSetter.accept(Boolean.valueOf(taskConfig != null));
        if (taskConfig != null) {
            this.logger.info("Updating taskRunner with processorOrder = " + this.processorOrder + " with taskConfig=" + maskPasswords(taskConfig));
            this.taskRunner.updateTaskConfig(taskConfig);
            this.currentConfig = taskConfig;
        }
        this.receivedTaskConfiguration.set(true);
    }

    private Map<String, String> maskPasswords(Map<String, String> map) {
        LinkedHashMap linkedHashMap = new LinkedHashMap(map);
        linkedHashMap.replaceAll((str, str2) -> {
            return str.toLowerCase(Locale.ROOT).contains("password") ? "****" : str2;
        });
        return linkedHashMap;
    }

    public List<SourceRecord> poll() {
        try {
            return this.taskRunner.poll();
        } catch (Exception e) {
            this.reconnectTracker.attemptFailed();
            this.lastConnectionException = e;
            this.logger.warning("Exception while polling records" + (this.reconnectTracker.shouldTryAgain() ? ", will reconnect later" : ""), e);
            this.taskRunner.forceRestart();
            return Collections.emptyList();
        }
    }

    public void commitRecord(SourceRecord sourceRecord) {
        try {
            this.taskRunner.commitRecord(sourceRecord);
        } catch (Exception e) {
            this.taskRunner.forceRestart();
            this.reconnectTracker.attemptFailed();
            this.lastConnectionException = e;
            this.logger.warning("Exception while committing records" + (this.reconnectTracker.shouldTryAgain() ? ", will reconnect later" : ""), e);
        }
    }

    public State copyState() {
        return this.taskRunner.copyState();
    }

    public void restoreState(State state) {
        this.taskRunner.restoreState(state);
    }

    public void commit() {
        try {
            this.taskRunner.commit();
        } catch (Exception e) {
            this.taskRunner.forceRestart();
            this.reconnectTracker.attemptFailed();
            this.lastConnectionException = e;
            this.logger.warning("Exception while committing records" + (this.reconnectTracker.shouldTryAgain() ? ", will reconnect later" : ""), e);
        }
    }

    public String getTaskRunnerName() {
        return this.taskRunner.getName();
    }

    private static SourceConnector newConnectorInstance(String str) {
        try {
            return (SourceConnector) ReflectionUtils.newInstance(Thread.currentThread().getContextClassLoader(), str);
        } catch (Exception e) {
            if (e instanceof ClassNotFoundException) {
                throw new HazelcastException("Connector class '" + str + "' not found. Did you add the connector jar to the job?", e);
            }
            throw ExceptionUtil.rethrow(e);
        }
    }

    public void close() {
        this.logger.info("Stopping connector '" + this.name + "'");
        this.taskRunner.stop();
        this.sourceConnector.stop();
        destroyTopic();
        this.logger.info("Connector '" + this.name + "' stopped");
    }

    void requestTaskReconfiguration() {
        if (!this.isMasterProcessor) {
            this.logger.fine("requestTaskReconfiguration is skipped because Source Connector is not master");
            return;
        }
        try {
            this.reconfigurationLock.lock();
            this.logger.info("Updating tasks configuration");
            List<Map<String, String>> taskConfigs = this.sourceConnector.taskConfigs(this.tasksMax);
            for (int i = 0; i < taskConfigs.size(); i++) {
                this.logger.fine("sourceConnector index " + i + " taskConfig=" + taskConfigs.get(i));
            }
            TaskConfigMessage taskConfigMessage = new TaskConfigMessage();
            taskConfigMessage.setTaskConfigs(taskConfigs);
            publishMessage(taskConfigMessage);
            this.logger.info(taskConfigs.size() + " task configs were sent");
            this.reconfigurationLock.unlock();
        } catch (Throwable th) {
            this.reconfigurationLock.unlock();
            throw th;
        }
    }

    public String toString() {
        return "ConnectorWrapper{name='" + this.name + "', tasksMax=" + this.tasksMax + ", isMasterProcessor=" + this.isMasterProcessor + ", processorOrder=" + this.processorOrder + ", receivedTaskConfiguration=" + this.receivedTaskConfiguration + "}";
    }

    public boolean hasTaskRunner() {
        return this.taskRunner != null;
    }
}
