package org.apache.cassandra.repair;

import java.net.InetAddress;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.SyncComplete;
import org.apache.cassandra.repair.messages.SyncRequest;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamState;
import org.apache.cassandra.utils.FBUtilities;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/cassandra/repair/StreamingRepairTask.class */
public class StreamingRepairTask implements Runnable, StreamEventHandler {
    private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class);
    public final RepairJobDesc desc;
    public final SyncRequest request;

    public StreamingRepairTask(RepairJobDesc repairJobDesc, SyncRequest syncRequest) {
        this.desc = repairJobDesc;
        this.request = syncRequest;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.request.src.equals(FBUtilities.getBroadcastAddress())) {
            initiateStreaming();
        } else {
            forwardToSource();
        }
    }

    private void initiateStreaming() {
        long j = 0;
        InetAddress inetAddress = this.request.dst;
        InetAddress preferredIP = SystemKeyspace.getPreferredIP(inetAddress);
        if (this.desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(this.desc.parentSessionId) != null) {
            j = ActiveRepairService.instance.getParentRepairSession(this.desc.parentSessionId).repairedAt;
        }
        logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", this.desc.sessionId, Integer.valueOf(this.request.ranges.size()), this.request.dst));
        new StreamPlan("Repair", j, 1).flushBeforeTransfer(true).requestRanges(inetAddress, preferredIP, this.desc.keyspace, this.request.ranges, this.desc.columnFamily).transferRanges(inetAddress, preferredIP, this.desc.keyspace, this.request.ranges, this.desc.columnFamily).execute().addEventListener(this);
    }

    private void forwardToSource() {
        logger.info(String.format("[repair #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", this.desc.sessionId, Integer.valueOf(this.request.ranges.size()), this.request.src, this.request.dst));
        MessagingService.instance().sendOneWay(this.request.createMessage(), this.request.src);
    }

    @Override // org.apache.cassandra.streaming.StreamEventHandler
    public void handleStreamEvent(StreamEvent streamEvent) {
    }

    @Override // org.apache.cassandra.$internal.com.google.common.util.concurrent.FutureCallback
    public void onSuccess(StreamState streamState) {
        logger.info(String.format("[repair #%s] streaming task succeed, returning response to %s", this.desc.sessionId, this.request.initiator));
        MessagingService.instance().sendOneWay(new SyncComplete(this.desc, this.request.src, this.request.dst, true).createMessage(), this.request.initiator);
    }

    @Override // org.apache.cassandra.$internal.com.google.common.util.concurrent.FutureCallback
    public void onFailure(Throwable th) {
        MessagingService.instance().sendOneWay(new SyncComplete(this.desc, this.request.src, this.request.dst, false).createMessage(), this.request.initiator);
    }
}
