package io.cdap.plugin.salesforce.plugin.source.streaming;

import com.google.common.net.HttpHeaders;
import io.cdap.plugin.salesforce.authenticator.Authenticator;
import io.cdap.plugin.salesforce.authenticator.AuthenticatorCredentials;
import io.cdap.plugin.salesforce.plugin.OAuthInfo;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.ClientTransport;
import org.cometd.client.transport.LongPollingTransport;
import org.cometd.common.JSONContext;
import org.cometd.common.JacksonJSONContextClient;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/cdap/plugin/salesforce/plugin/source/streaming/SalesforcePushTopicListener.class */
public class SalesforcePushTopicListener {
    private static final String DEFAULT_PUSH_ENDPOINT = "/cometd/53.0";
    private static final int HANDSHAKE_CHECK_INTERVAL_MS = 1000;
    private final BlockingQueue<String> messagesQueue = new LinkedBlockingQueue();
    private final AuthenticatorCredentials credentials;
    private final String topic;
    private JSONContext.Client jsonContext;
    private static final Logger LOG = LoggerFactory.getLogger(SalesforcePushTopicListener.class);
    private static final long CONNECTION_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(110);
    private static final long HANDSHAKE_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(110);

    public SalesforcePushTopicListener(AuthenticatorCredentials authenticatorCredentials, String str) {
        this.credentials = authenticatorCredentials;
        this.topic = str;
    }

    public void start() {
        try {
            BayeuxClient client = getClient(this.credentials);
            waitForHandshake(client, HANDSHAKE_TIMEOUT_MS, 1000L);
            LOG.debug("Client handshake done");
            client.getChannel("/topic/" + this.topic).subscribe((clientSessionChannel, message) -> {
                this.messagesQueue.add(this.jsonContext.getGenerator().generate(message.getDataAsMap()));
            });
        } catch (Exception e) {
            throw new RuntimeException("Could not start client", e);
        }
    }

    public String getMessage(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.messagesQueue.poll(j, timeUnit);
    }

    private BayeuxClient getClient(AuthenticatorCredentials authenticatorCredentials) throws Exception {
        final OAuthInfo oAuthInfo = Authenticator.getOAuthInfo(authenticatorCredentials);
        HttpClient httpClient = new HttpClient(new SslContextFactory());
        httpClient.setConnectTimeout(CONNECTION_TIMEOUT_MS);
        httpClient.start();
        this.jsonContext = new JacksonJSONContextClient();
        HashMap hashMap = new HashMap();
        hashMap.put(ClientTransport.JSON_CONTEXT_OPTION, this.jsonContext);
        BayeuxClient bayeuxClient = new BayeuxClient(oAuthInfo.getInstanceURL() + DEFAULT_PUSH_ENDPOINT, new LongPollingTransport(hashMap, httpClient) { // from class: io.cdap.plugin.salesforce.plugin.source.streaming.SalesforcePushTopicListener.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.cometd.client.transport.LongPollingTransport
            public void customize(Request request) {
                super.customize(request);
                request.header(HttpHeaders.AUTHORIZATION, "OAuth " + oAuthInfo.getAccessToken());
            }
        }, new ClientTransport[0]);
        bayeuxClient.handshake();
        return bayeuxClient;
    }

    private void waitForHandshake(BayeuxClient bayeuxClient, long j, long j2) {
        try {
            Awaitility.await().atMost(j, TimeUnit.MILLISECONDS).pollInterval(j2, TimeUnit.MILLISECONDS).until(() -> {
                return Boolean.valueOf(bayeuxClient.isHandshook());
            });
        } catch (ConditionTimeoutException e) {
            throw new IllegalStateException("Client could not handshake with Salesforce server", e);
        }
    }
}
