/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.CloseableConnectorContext;
import org.apache.kafka.connect.runtime.ConnectMetrics;
import org.apache.kafka.connect.runtime.ConnectMetricsRegistry;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.isolation.Plugins;
import org.apache.kafka.connect.sink.SinkConnectorContext;
import org.apache.kafka.connect.source.SourceConnectorContext;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.LoggingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WorkerConnector
implements Runnable {
    /** Logger used for all messages emitted by this class and its nested classes. */
    private static final Logger log = LoggerFactory.getLogger(WorkerConnector.class);
    /** Prefix that {@link #run()} puts in front of the connector name to build the thread name. */
    private static final String THREAD_NAME_PREFIX = "connector-thread-";
    /** Name of the connector; used for logging, thread naming, metrics tagging and listener callbacks. */
    private final String connName;
    /** String-valued configuration (taken from {@code ConnectorConfig.originalsStrings()}) handed to {@code Connector.start}. */
    private final Map<String, String> config;
    /** Listener that receives lifecycle notifications; the constructor points this at {@link #metrics}, which wraps the caller's listener. */
    private final ConnectorStatus.Listener statusListener;
    /** Class loader swapped in through {@code Plugins.compareAndSwapLoaders} for the duration of {@link #run()}. */
    private final ClassLoader loader;
    /** Context that task-reconfiguration requests and raised errors are delegated to; closed on shutdown and on cancel. */
    private final CloseableConnectorContext ctx;
    /** The connector instance whose lifecycle is driven by this worker. */
    private final Connector connector;
    /** Metrics group for this connector; also acts as the status listener (see {@link #statusListener}). */
    private final ConnectorMetricsGroup metrics;
    /** Target state requested through {@code transitionTo} that the run loop has not picked up yet, or {@code null}. */
    private final AtomicReference<TargetState> pendingTargetStateChange;
    /** Callback belonging to {@link #pendingTargetStateChange}, or {@code null}. */
    private final AtomicReference<Callback<TargetState>> pendingStateChangeCallback;
    /** Counted down when {@link #run()} exits; awaited by {@code awaitShutdown}. */
    private final CountDownLatch shutdownLatch;
    /** Set by {@code shutdown()}; makes the run loop exit and perform the shutdown sequence. */
    private volatile boolean stopping;
    /** Set by {@code cancel()}; while true the metrics group stops forwarding startup/shutdown/pause/resume/failure notifications. */
    private volatile boolean cancelled;
    /** Current lifecycle state of the connector. Not volatile. */
    private State state;
    /** Offset reader exposed to source connectors through their connector context. */
    private final OffsetStorageReader offsetStorageReader;

    /**
     * Creates the worker-side wrapper that drives the lifecycle of a single connector instance.
     *
     * @param connName            name of the connector
     * @param connector           connector instance to manage
     * @param connectorConfig     configuration whose string-valued originals are later passed to {@code Connector.start}
     * @param ctx                 context that reconfiguration requests and raised errors are delegated to
     * @param metrics             metrics facility used to create this connector's metrics group
     * @param statusListener      listener to notify about lifecycle changes; it is wrapped by the metrics group
     * @param offsetStorageReader offset reader exposed to source connectors
     * @param loader              class loader to swap in while the connector runs
     */
    public WorkerConnector(String connName, Connector connector, ConnectorConfig connectorConfig, CloseableConnectorContext ctx, ConnectMetrics metrics, ConnectorStatus.Listener statusListener, OffsetStorageReader offsetStorageReader, ClassLoader loader) {
        this.connName = connName;
        this.config = connectorConfig.originalsStrings();
        this.loader = loader;
        this.ctx = ctx;
        this.connector = connector;
        this.state = State.INIT;
        // connName and connector must already be assigned here: the metrics group reads both
        // from the enclosing instance while it is being constructed.
        this.metrics = new ConnectorMetricsGroup(metrics, AbstractStatus.State.UNASSIGNED, statusListener);
        // All lifecycle notifications go through the metrics group, which forwards them to the caller's listener.
        this.statusListener = this.metrics;
        this.offsetStorageReader = offsetStorageReader;
        // Diamond operator instead of raw types so the assignments are checked by the compiler.
        this.pendingTargetStateChange = new AtomicReference<>();
        this.pendingStateChangeCallback = new AtomicReference<>();
        this.shutdownLatch = new CountDownLatch(1);
        this.stopping = false;
        this.cancelled = false;
    }

    /**
     * @return the class loader that was supplied to the constructor
     */
    public ClassLoader loader() {
        return loader;
    }

    /**
     * Thread entry point: runs the connector's state-change loop via {@link #doRun()}.
     *
     * <p>While the loop runs, the thread carries a connector-specific logging context, the
     * connector's class loader (swapped in through {@code Plugins.compareAndSwapLoaders}) and a
     * name made of {@code THREAD_NAME_PREFIX} plus the connector name. The previous thread name
     * and the value returned by the first loader swap are put back afterwards, and the shutdown
     * latch is counted down no matter how the method exits.
     */
    @Override
    public void run() {
        // Drop whatever logging context may still be attached to this thread.
        LoggingContext.clear();
        try (LoggingContext ignored = LoggingContext.forConnector(connName)) {
            final Thread worker = Thread.currentThread();
            final ClassLoader previousLoader = Plugins.compareAndSwapLoaders(loader);
            final String previousName = worker.getName();
            try {
                worker.setName(THREAD_NAME_PREFIX + connName);
                doRun();
            } finally {
                worker.setName(previousName);
                Plugins.compareAndSwapLoaders(previousLoader);
            }
        } finally {
            // Release anyone blocked in awaitShutdown(), even if doRun() ended with an exception.
            shutdownLatch.countDown();
        }
    }

    /**
     * Initializes the connector and then processes requested target-state changes until
     * {@link #shutdown()} is called, finishing with {@link #doShutdown()}.
     *
     * <p>Each iteration atomically takes the pending target state and its callback (both are
     * written together by {@code transitionTo} under this object's monitor), applies the change,
     * and then waits on the monitor until another change or a shutdown request arrives.
     *
     * <p>The shutdown sequence runs in a {@code finally} block so that the connector context and
     * the metrics group are closed even if an unexpected exception escapes the loop (for example
     * from a state-change callback). A state change that was already taken from the pending slot
     * when the shutdown request arrived is not applied; its callback is completed with an error
     * instead of being silently dropped.
     */
    void doRun() {
        try {
            this.initialize();
            while (!this.stopping) {
                TargetState newTargetState;
                Callback<TargetState> stateChangeCallback;
                synchronized (this) {
                    newTargetState = this.pendingTargetStateChange.getAndSet(null);
                    stateChangeCallback = this.pendingStateChangeCallback.getAndSet(null);
                }
                if (newTargetState != null) {
                    if (!this.stopping) {
                        this.doTransitionTo(newTargetState, stateChangeCallback);
                    } else if (stateChangeCallback != null) {
                        // The request was dequeued, so doShutdown() can no longer see it; report it here.
                        stateChangeCallback.onCompletion(new ConnectException("Could not begin changing connector state to " + newTargetState.name() + " as the connector has been scheduled for shutdown"), null);
                    }
                }
                synchronized (this) {
                    // Only sleep when nothing arrived while the previous change was being applied.
                    if (this.pendingTargetStateChange.get() == null && !this.stopping) {
                        try {
                            this.wait();
                        } catch (InterruptedException ignored) {
                            // Deliberately not re-interrupting: the loop re-checks the pending state and
                            // the stopping flag at the top, and a set interrupt flag would make every
                            // following wait() return immediately and spin.
                        }
                    }
                }
            }
        } finally {
            this.doShutdown();
        }
    }

    /**
     * Hands the connector its context object.
     *
     * <p>A connector that is neither a source nor a sink is rejected. Sink connectors get their
     * configuration validated via {@code SinkConnectorConfig.validate} and receive a sink context;
     * all others receive a source context carrying the offset storage reader. Anything thrown
     * along the way is logged and reported through {@code onFailure}; nothing propagates to the
     * caller.
     */
    void initialize() {
        try {
            final boolean recognisedType = isSourceConnector() || isSinkConnector();
            if (!recognisedType) {
                throw new ConnectException("Connector implementations must be a subclass of either SourceConnector or SinkConnector");
            }
            log.debug("{} Initializing connector {}", this, connName);
            if (!isSinkConnector()) {
                connector.initialize(new WorkerSourceConnectorContext(offsetStorageReader));
                return;
            }
            SinkConnectorConfig.validate(config);
            connector.initialize(new WorkerSinkConnectorContext());
        } catch (Throwable failure) {
            log.error("{} Error initializing connector", this, failure);
            onFailure(failure);
        }
    }

    /**
     * Starts the connector if it is not running yet.
     *
     * @return {@code true} if the connector was started by this call (previous state INIT or
     *         STOPPED), {@code false} if it was already STARTED
     * @throws Throwable whatever {@code Connector.start} threw, or an
     *         {@link IllegalArgumentException} for any other state; in both cases the failure
     *         has already been logged and reported through {@code onFailure}
     */
    private boolean doStart() throws Throwable {
        try {
            switch (state) {
                case INIT:
                case STOPPED:
                    connector.start(config);
                    state = State.STARTED;
                    return true;
                case STARTED:
                    return false;
                default:
                    throw new IllegalArgumentException("Cannot start connector in state " + state);
            }
        } catch (Throwable failure) {
            log.error("{} Error while starting connector", this, failure);
            onFailure(failure);
            throw failure;
        }
    }

    /**
     * Reports a failure to the status listener and then marks the connector as FAILED.
     *
     * @param cause the error that made the connector fail
     */
    private void onFailure(Throwable cause) {
        statusListener.onFailure(connName, cause);
        state = State.FAILED;
    }

    /**
     * Starts the connector and, if this call actually started it, notifies the listener of a resume.
     *
     * @throws Throwable if starting the connector failed
     */
    private void resume() throws Throwable {
        final boolean startedNow = doStart();
        if (!startedNow) {
            return;
        }
        statusListener.onResume(connName);
    }

    /**
     * Starts the connector and, if this call actually started it, notifies the listener of a startup.
     *
     * @throws Throwable if starting the connector failed
     */
    private void start() throws Throwable {
        final boolean startedNow = doStart();
        if (!startedNow) {
            return;
        }
        statusListener.onStartup(connName);
    }

    /**
     * @return {@code true} if the connector's internal state is STARTED
     */
    public boolean isRunning() {
        return State.STARTED == state;
    }

    /**
     * Moves the connector to the STOPPED state on behalf of a pause request.
     *
     * <p>An already STOPPED connector is left alone. A STARTED connector is stopped first; a
     * connector still in INIT goes straight to STOPPED. In both of these cases the listener is
     * told about the pause. Any other state is rejected with an
     * {@link IllegalArgumentException}, which - like every other error raised here - is caught,
     * logged and reported as a failure rather than propagated.
     */
    private void pause() {
        try {
            switch (state) {
                case STOPPED:
                    // Nothing to do; no notification is sent.
                    return;
                case STARTED:
                    connector.stop();
                    // intentional fall-through: a stopped connector is handled like one never started
                case INIT:
                    statusListener.onPause(connName);
                    state = State.STOPPED;
                    break;
                default:
                    throw new IllegalArgumentException("Cannot pause connector in state " + state);
            }
        } catch (Throwable failure) {
            log.error("{} Error while shutting down connector", this, failure);
            // Notifies the listener and then sets the state to FAILED.
            onFailure(failure);
        }
    }

    /**
     * Requests an asynchronous shutdown: raises the stopping flag and wakes the run loop, which
     * then leaves its loop and performs the actual shutdown on its own thread.
     */
    public synchronized void shutdown() {
        log.info("Scheduled shutdown for {}", this);
        stopping = true;
        // Wake the run loop in case it is parked in wait().
        notify();
    }

    /**
     * Performs the shutdown sequence.
     *
     * <p>A state change that is still pending is discarded and its callback, if any, is completed
     * with an error. The connector is stopped if it is currently STARTED, the state becomes
     * STOPPED and the listener is told about the shutdown. Any error raised during these steps is
     * logged, the state becomes FAILED and the listener is told about the failure. The connector
     * context and the metrics group are closed in every case.
     */
    void doShutdown() {
        try {
            final TargetState preEmptedState;
            final Callback<TargetState> stateChangeCallback;
            // transitionTo() writes the callback and the target state together while holding this
            // monitor. Reading them under the same monitor guarantees a matching pair; without it
            // a concurrent transitionTo() could yield a callback paired with a null state.
            synchronized (this) {
                preEmptedState = this.pendingTargetStateChange.getAndSet(null);
                stateChangeCallback = this.pendingStateChangeCallback.getAndSet(null);
            }
            if (stateChangeCallback != null) {
                stateChangeCallback.onCompletion(new ConnectException("Could not begin changing connector state to " + preEmptedState.name() + " as the connector has been scheduled for shutdown"), null);
            }
            if (this.state == State.STARTED) {
                this.connector.stop();
            }
            this.state = State.STOPPED;
            this.statusListener.onShutdown(this.connName);
            log.info("Completed shutdown for {}", this);
        } catch (Throwable t) {
            log.error("{} Error while shutting down connector", this, t);
            this.state = State.FAILED;
            this.statusListener.onFailure(this.connName, t);
        } finally {
            this.ctx.close();
            this.metrics.close();
        }
    }

    /**
     * Abandons this connector instance: reports a shutdown to the status listener, closes the
     * connector context and sets the cancelled flag.
     *
     * <p>The connector itself is not stopped here and the run loop is not signalled.
     */
    public synchronized void cancel() {
        // Must happen before the flag below is set: once cancelled is true the metrics group
        // no longer forwards shutdown notifications to the wrapped listener.
        this.statusListener.onShutdown(this.connName);
        this.ctx.close();
        // From here on startup/shutdown/pause/resume/failure notifications are suppressed.
        this.cancelled = true;
    }

    /**
     * Waits for {@link #run()} to finish.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     * @return {@code true} if {@code run()} exited within the timeout; {@code false} if the
     *         timeout elapsed or the waiting thread was interrupted (in which case the thread's
     *         interrupt status is set again so the caller can still observe it)
     */
    public boolean awaitShutdown(long timeoutMs) {
        try {
            return this.shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // await() cleared the interrupt status when it threw; restore it instead of losing it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Requests an asynchronous change of the connector's target state.
     *
     * <p>The request replaces any request that the run loop has not picked up yet; the callback
     * of such a displaced request is completed with an error, outside of the monitor.
     *
     * @param targetState         the state the connector should move to
     * @param stateChangeCallback callback to complete once the request has been handled
     */
    public void transitionTo(TargetState targetState, Callback<TargetState> stateChangeCallback) {
        final Callback<TargetState> displacedCallback;
        final TargetState displacedState;
        synchronized (this) {
            displacedCallback = pendingStateChangeCallback.getAndSet(stateChangeCallback);
            displacedState = pendingTargetStateChange.getAndSet(targetState);
            // Wake the run loop so it picks up the new request.
            notify();
        }
        if (displacedCallback == null) {
            return;
        }
        final String reason = "Could not begin changing connector state to " + displacedState.name() + " before another request to change state was made; the new request (which is to change the state to " + targetState.name() + ") has pre-empted this one";
        displacedCallback.onCompletion(new ConnectException(reason), null);
    }

    /**
     * Applies a target-state change and reports the outcome through the callback.
     *
     * <p>A connector in the FAILED state is not touched; the callback is completed with an error.
     * Otherwise the change is attempted and the callback receives either the target state or an
     * error wrapping whatever was thrown. Because the success notification happens inside the
     * {@code try} block, an exception thrown by the callback itself is also routed to the error
     * notification.
     *
     * @param targetState         the state to move to
     * @param stateChangeCallback callback to complete with the outcome
     */
    void doTransitionTo(TargetState targetState, Callback<TargetState> stateChangeCallback) {
        if (state == State.FAILED) {
            final String refusal = this + " Cannot transition connector to " + targetState + " since it has failed";
            stateChangeCallback.onCompletion(new ConnectException(refusal), null);
            return;
        }
        try {
            doTransitionTo(targetState);
            stateChangeCallback.onCompletion(null, targetState);
        } catch (Throwable failure) {
            final String description = "Failed to transition connector " + connName + " to state " + targetState;
            stateChangeCallback.onCompletion(new ConnectException(description, failure), null);
        }
    }

    /**
     * Dispatches a target state to the matching lifecycle operation: PAUSED pauses the connector,
     * STARTED starts it when it is still in INIT and resumes it otherwise.
     *
     * @param targetState the state to move to
     * @throws IllegalArgumentException if the target state is neither PAUSED nor STARTED
     * @throws Throwable                if starting or resuming the connector failed
     */
    private void doTransitionTo(TargetState targetState) throws Throwable {
        log.debug("{} Transition connector to {}", this, targetState);
        if (targetState == TargetState.PAUSED) {
            pause();
            return;
        }
        if (targetState != TargetState.STARTED) {
            throw new IllegalArgumentException("Unhandled target state " + targetState);
        }
        if (state == State.INIT) {
            start();
        } else {
            resume();
        }
    }

    /**
     * @return the result of {@code ConnectUtils.isSinkConnector} for the managed connector
     */
    public boolean isSinkConnector() {
        return ConnectUtils.isSinkConnector(connector);
    }

    /**
     * @return the result of {@code ConnectUtils.isSourceConnector} for the managed connector
     */
    public boolean isSourceConnector() {
        return ConnectUtils.isSourceConnector(connector);
    }

    /**
     * Describes the kind of connector being managed.
     *
     * @return {@code "sink"} for a sink connector, {@code "source"} for a source connector and
     *         {@code "unknown"} when it is neither; the sink check is made first
     */
    protected String connectorType() {
        return isSinkConnector()
                ? "sink"
                : (isSourceConnector() ? "source" : "unknown");
    }

    /**
     * @return the connector instance managed by this worker
     */
    public Connector connector() {
        return connector;
    }

    /**
     * @return the metrics group created for this connector
     */
    ConnectorMetricsGroup metrics() {
        return metrics;
    }

    /**
     * @return a short description of the form {@code WorkerConnector{id=<connector name>}}
     */
    @Override
    public String toString() {
        return "WorkerConnector{id=" + this.connName + '}';
    }

    /**
     * Connector context given to source connectors; adds access to the offset storage reader on
     * top of the behaviour inherited from {@code WorkerConnectorContext}.
     */
    private class WorkerSourceConnectorContext extends WorkerConnectorContext implements SourceConnectorContext {
        /** Reader returned by {@link #offsetStorageReader()}. */
        private final OffsetStorageReader offsetStorageReader;

        /**
         * @param reader the offset reader to expose to the connector
         */
        WorkerSourceConnectorContext(OffsetStorageReader reader) {
            this.offsetStorageReader = reader;
        }

        /**
         * @return the reader passed to the constructor
         */
        @Override
        public OffsetStorageReader offsetStorageReader() {
            return offsetStorageReader;
        }
    }

    /**
     * Connector context given to sink connectors; adds nothing beyond the behaviour inherited
     * from {@code WorkerConnectorContext}.
     */
    private class WorkerSinkConnectorContext extends WorkerConnectorContext implements SinkConnectorContext {
        private WorkerSinkConnectorContext() {
            // no state of its own
        }
    }

    private abstract class WorkerConnectorContext
    implements ConnectorContext {
        private WorkerConnectorContext() {
        }

        @Override
        public void requestTaskReconfiguration() {
            WorkerConnector.this.ctx.requestTaskReconfiguration();
        }

        @Override
        public void raiseError(Exception e) {
            log.error("{} Connector raised an error", (Object)WorkerConnector.this, (Object)e);
            WorkerConnector.this.onFailure(e);
            WorkerConnector.this.ctx.raiseError(e);
        }
    }

    /**
     * Status listener that records the connector's most recent status for metrics reporting and
     * forwards notifications to a wrapped listener.
     *
     * <p>Startup, shutdown, pause, resume and failure notifications are forwarded only while the
     * enclosing connector has not been cancelled; deletion and restart notifications are always
     * forwarded. The recorded status is updated in every case.
     */
    class ConnectorMetricsGroup implements ConnectorStatus.Listener, AutoCloseable {
        /** Most recently reported status; read by the status metric. */
        private volatile AbstractStatus.State state;
        /** Metric group tagged with the connector name. */
        private final ConnectMetrics.MetricGroup metricGroup;
        /** Listener that notifications are forwarded to. */
        private final ConnectorStatus.Listener delegate;

        /**
         * Creates the metric group for the enclosing connector and registers its metrics:
         * connector type, class name and version as fixed values, and the status as a value that
         * is computed whenever the metric is read.
         *
         * @param connectMetrics metrics facility to create the group in
         * @param initialState   status to report until the first notification arrives
         * @param delegate       listener to forward notifications to
         * @throws NullPointerException if any argument, or the enclosing connector, is {@code null}
         */
        public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State initialState, ConnectorStatus.Listener delegate) {
            Objects.requireNonNull(connectMetrics);
            Objects.requireNonNull(WorkerConnector.this.connector);
            Objects.requireNonNull(initialState);
            Objects.requireNonNull(delegate);
            this.delegate = delegate;
            this.state = initialState;
            ConnectMetricsRegistry registry = connectMetrics.registry();
            this.metricGroup = connectMetrics.group(registry.connectorGroupName(), registry.connectorTagName(), WorkerConnector.this.connName);
            // Closed before any metric is added - presumably to clear metrics left behind by an
            // earlier group with the same name and tags; confirm against ConnectMetrics.MetricGroup.
            this.metricGroup.close();
            this.metricGroup.addImmutableValueMetric(registry.connectorType, WorkerConnector.this.connectorType());
            this.metricGroup.addImmutableValueMetric(registry.connectorClass, WorkerConnector.this.connector.getClass().getName());
            this.metricGroup.addImmutableValueMetric(registry.connectorVersion, WorkerConnector.this.connector.version());
            // Locale.ROOT keeps the reported value independent of the JVM's default locale; with a
            // Turkish default locale, for instance, "RUNNING" would otherwise lower-case to "runnıng".
            this.metricGroup.addValueMetric(registry.connectorStatus, now -> this.state.toString().toLowerCase(Locale.ROOT));
        }

        /** Closes the underlying metric group. */
        @Override
        public void close() {
            this.metricGroup.close();
        }

        /** Records RUNNING and forwards the notification unless the connector was cancelled. */
        @Override
        public void onStartup(String connector) {
            this.state = AbstractStatus.State.RUNNING;
            this.forwardUnlessCancelled(() -> this.delegate.onStartup(connector));
        }

        /** Records UNASSIGNED and forwards the notification unless the connector was cancelled. */
        @Override
        public void onShutdown(String connector) {
            this.state = AbstractStatus.State.UNASSIGNED;
            this.forwardUnlessCancelled(() -> this.delegate.onShutdown(connector));
        }

        /** Records PAUSED and forwards the notification unless the connector was cancelled. */
        @Override
        public void onPause(String connector) {
            this.state = AbstractStatus.State.PAUSED;
            this.forwardUnlessCancelled(() -> this.delegate.onPause(connector));
        }

        /** Records RUNNING and forwards the notification unless the connector was cancelled. */
        @Override
        public void onResume(String connector) {
            this.state = AbstractStatus.State.RUNNING;
            this.forwardUnlessCancelled(() -> this.delegate.onResume(connector));
        }

        /** Records FAILED and forwards the notification unless the connector was cancelled. */
        @Override
        public void onFailure(String connector, Throwable cause) {
            this.state = AbstractStatus.State.FAILED;
            this.forwardUnlessCancelled(() -> this.delegate.onFailure(connector, cause));
        }

        /** Records DESTROYED and always forwards the notification. */
        @Override
        public void onDeletion(String connector) {
            this.state = AbstractStatus.State.DESTROYED;
            this.delegate.onDeletion(connector);
        }

        /** Records RESTARTING and always forwards the notification. */
        @Override
        public void onRestart(String connector) {
            this.state = AbstractStatus.State.RESTARTING;
            this.delegate.onRestart(connector);
        }

        /** @return {@code true} if the most recently recorded status is UNASSIGNED */
        boolean isUnassigned() {
            return this.state == AbstractStatus.State.UNASSIGNED;
        }

        /** @return {@code true} if the most recently recorded status is RUNNING */
        boolean isRunning() {
            return this.state == AbstractStatus.State.RUNNING;
        }

        /** @return {@code true} if the most recently recorded status is PAUSED */
        boolean isPaused() {
            return this.state == AbstractStatus.State.PAUSED;
        }

        /** @return {@code true} if the most recently recorded status is FAILED */
        boolean isFailed() {
            return this.state == AbstractStatus.State.FAILED;
        }

        /** @return the metric group created by the constructor */
        protected ConnectMetrics.MetricGroup metricGroup() {
            return this.metricGroup;
        }

        /**
         * Runs the given notification while holding this group's monitor, unless the enclosing
         * connector has been cancelled.
         *
         * <p>NOTE(review): the monitor used here is this group's, whereas
         * {@code WorkerConnector.cancel()} is synchronized on the enclosing WorkerConnector, so
         * the check below is not atomic with respect to {@code cancel()} setting the flag -
         * confirm whether that is intended.
         */
        private void forwardUnlessCancelled(Runnable notification) {
            synchronized (this) {
                if (!WorkerConnector.this.cancelled) {
                    notification.run();
                }
            }
        }
    }

    private static enum State {
        INIT,
        STOPPED,
        STARTED,
        FAILED;

    }
}

