package io.debezium.connector.vitess.connection;

import binlogdata.Binlogdata;
import io.debezium.connector.vitess.Vgtid;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.connector.vitess.connection.VtctldConnection;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.stub.AbstractStub;
import io.grpc.stub.StreamObserver;
import io.vitess.client.Proto;
import io.vitess.client.grpc.StaticAuthCredentials;
import io.vitess.proto.Topodata;
import io.vitess.proto.Vtgate;
import io.vitess.proto.grpc.VitessGrpc;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/vitess/connection/VitessReplicationConnection.class */
public class VitessReplicationConnection implements ReplicationConnection {
    private static final Logger LOGGER = LoggerFactory.getLogger(VitessReplicationConnection.class);
    private final MessageDecoder messageDecoder;
    private final VitessConnectorConfig config;
    private final AtomicReference<ManagedChannel> managedChannel = new AtomicReference<>();

    public VitessReplicationConnection(VitessConnectorConfig vitessConnectorConfig, VitessDatabaseSchema vitessDatabaseSchema) {
        this.messageDecoder = new VStreamOutputMessageDecoder(vitessDatabaseSchema);
        this.config = vitessConnectorConfig;
    }

    public void execute(String str) {
        ManagedChannel newChannel = newChannel(this.config.getVtgateHost(), this.config.getVtgatePort());
        this.managedChannel.compareAndSet(null, newChannel);
        newBlockingStub(newChannel).execute(Vtgate.ExecuteRequest.newBuilder().setQuery(Proto.bindQuery(str, Collections.emptyMap())).build());
    }

    @Override // io.debezium.connector.vitess.connection.ReplicationConnection
    public void startStreaming(Vgtid vgtid, final ReplicationMessageProcessor replicationMessageProcessor, final AtomicReference<Throwable> atomicReference) {
        if (vgtid == null) {
            Objects.requireNonNull(vgtid);
        }
        ManagedChannel newChannel = newChannel(this.config.getVtgateHost(), this.config.getVtgatePort());
        this.managedChannel.compareAndSet(null, newChannel);
        newStub(newChannel).vStream(Vtgate.VStreamRequest.newBuilder().setVgtid(vgtid.getRawVgtid()).setTabletType(toTopodataTabletType(VtctldConnection.TabletType.valueOf(this.config.getTabletType()))).build(), new StreamObserver<Vtgate.VStreamResponse>() { // from class: io.debezium.connector.vitess.connection.VitessReplicationConnection.1
            public void onNext(Vtgate.VStreamResponse vStreamResponse) {
                VitessReplicationConnection.LOGGER.debug("Received {} vEvents in the VStreamResponse:", Integer.valueOf(vStreamResponse.getEventsCount()));
                Iterator it = vStreamResponse.getEventsList().iterator();
                while (it.hasNext()) {
                    VitessReplicationConnection.LOGGER.debug("vEvent: {}", (Binlogdata.VEvent) it.next());
                }
                Vgtid vgtid2 = getVgtid(vStreamResponse);
                int numOfRowEvents = getNumOfRowEvents(vStreamResponse);
                int i = 0;
                for (int i2 = 0; i2 < vStreamResponse.getEventsCount(); i2++) {
                    try {
                        if (vStreamResponse.getEvents(i2).getType() == Binlogdata.VEventType.ROW) {
                            i++;
                        }
                        VitessReplicationConnection.this.messageDecoder.processMessage(vStreamResponse.getEvents(i2), replicationMessageProcessor, vgtid2, (vgtid2 == null || numOfRowEvents == 0 || i != numOfRowEvents) ? false : true);
                    } catch (InterruptedException e) {
                        VitessReplicationConnection.LOGGER.error("Message processing is interrupted", e);
                        atomicReference.compareAndSet(null, e);
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }

            public void onError(Throwable th) {
                VitessReplicationConnection.LOGGER.info("VStream streaming onError. Status: " + Status.fromThrowable(th), th);
                atomicReference.compareAndSet(null, th);
            }

            public void onCompleted() {
                VitessReplicationConnection.LOGGER.info("VStream streaming completed.");
            }

            private Vgtid getVgtid(Vtgate.VStreamResponse vStreamResponse) {
                LinkedList linkedList = new LinkedList();
                for (Binlogdata.VEvent vEvent : vStreamResponse.getEventsList()) {
                    if (vEvent.getType() == Binlogdata.VEventType.VGTID) {
                        linkedList.addLast(Vgtid.of(vEvent.getVgtid()));
                    }
                }
                if (linkedList.size() == 0) {
                    VitessReplicationConnection.LOGGER.trace("No vgtid found in response {}...", vStreamResponse.toString().substring(0, Math.min(100, vStreamResponse.toString().length())));
                    VitessReplicationConnection.LOGGER.debug("Full response is {}", vStreamResponse);
                    return null;
                }
                if (linkedList.size() > 1) {
                    VitessReplicationConnection.LOGGER.error("Should only have 1 vgtid per VStreamResponse, but found {}. Use the last vgtid {}.", Integer.valueOf(linkedList.size()), linkedList.getLast());
                }
                return (Vgtid) linkedList.getLast();
            }

            private int getNumOfRowEvents(Vtgate.VStreamResponse vStreamResponse) {
                int i = 0;
                Iterator it = vStreamResponse.getEventsList().iterator();
                while (it.hasNext()) {
                    if (((Binlogdata.VEvent) it.next()).getType() == Binlogdata.VEventType.ROW) {
                        i++;
                    }
                }
                return i;
            }
        });
    }

    private VitessGrpc.VitessStub newStub(ManagedChannel managedChannel) {
        return withCredentials(VitessGrpc.newStub(managedChannel));
    }

    private VitessGrpc.VitessBlockingStub newBlockingStub(ManagedChannel managedChannel) {
        return withCredentials(VitessGrpc.newBlockingStub(managedChannel));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v9, types: [io.grpc.stub.AbstractStub] */
    private <T extends AbstractStub<T>> T withCredentials(T t) {
        if (this.config.getVtgateUsername() != null && this.config.getVtgatePassword() != null) {
            LOGGER.info("Use authenticated vtgate grpc.");
            t = t.withCallCredentials(new StaticAuthCredentials(this.config.getVtgateUsername(), this.config.getVtgatePassword()));
        }
        return t;
    }

    private ManagedChannel newChannel(String str, int i) {
        return ManagedChannelBuilder.forAddress(str, i).usePlaintext().build();
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        LOGGER.info("Closing replication connection");
        this.managedChannel.get().shutdownNow();
        LOGGER.trace("VStream GRPC channel shutdownNow is invoked.");
        if (this.managedChannel.get().awaitTermination(5L, TimeUnit.SECONDS)) {
            LOGGER.info("VStream GRPC channel is shutdown in time.");
        } else {
            LOGGER.warn("VStream GRPC channel is not shutdown in time. Give up waiting.");
        }
    }

    public static Vgtid defaultVgtid(VitessConnectorConfig vitessConnectorConfig) {
        if (vitessConnectorConfig.getShard() == null || vitessConnectorConfig.getShard().isEmpty()) {
            LOGGER.info("Default VGTID is set to the current gtid of all shards from keyspace: {}", vitessConnectorConfig.getKeyspace());
            return Vgtid.of(Binlogdata.VGtid.newBuilder().addShardGtids(Binlogdata.ShardGtid.newBuilder().setKeyspace(vitessConnectorConfig.getKeyspace()).setGtid(Vgtid.CURRENT_GTID).build()).build());
        }
        try {
            VtctldConnection of = VtctldConnection.of(vitessConnectorConfig.getVtctldHost(), vitessConnectorConfig.getVtctldPort(), vitessConnectorConfig.getVtctldUsername(), vitessConnectorConfig.getVtctldPassword());
            try {
                Vgtid latestVgtid = of.latestVgtid(vitessConnectorConfig.getKeyspace(), vitessConnectorConfig.getShard(), VtctldConnection.TabletType.valueOf(vitessConnectorConfig.getTabletType()));
                if (of != null) {
                    of.close();
                }
                return latestVgtid;
            } finally {
            }
        } catch (Exception e) {
            LOGGER.error("Cannot get vgtid from VTCtld", e);
            throw new RuntimeException(e);
        }
    }

    public String connectionString() {
        return String.format("vtgate gRPC connection %s:%s", this.config.getVtgateHost(), Integer.valueOf(this.config.getVtgatePort()));
    }

    public String username() {
        return this.config.getVtgateUsername();
    }

    private static Topodata.TabletType toTopodataTabletType(VtctldConnection.TabletType tabletType) {
        switch (tabletType) {
            case MASTER:
                return Topodata.TabletType.MASTER;
            case REPLICA:
                return Topodata.TabletType.REPLICA;
            case RDONLY:
                return Topodata.TabletType.RDONLY;
            default:
                LOGGER.warn("Unknown tabletType {}", tabletType);
                return null;
        }
    }
}
