/*
 * Decompiled with CFR 0.152.
 */
package org.tikv.common.importer;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.common.AbstractGRPCClient;
import org.tikv.common.PDClient;
import org.tikv.common.TiConfiguration;
import org.tikv.common.exception.GrpcException;
import org.tikv.common.exception.RegionException;
import org.tikv.common.operation.NoopHandler;
import org.tikv.common.region.RegionManager;
import org.tikv.common.region.TiStore;
import org.tikv.common.util.ChannelFactory;
import org.tikv.common.util.ConcreteBackOffer;
import org.tikv.kvproto.ImportSSTGrpc;
import org.tikv.kvproto.ImportSstpb;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.shade.io.grpc.ManagedChannel;
import org.tikv.shade.io.grpc.stub.StreamObserver;

/**
 * gRPC client for the {@code ImportSST} service reached through a single channel.
 *
 * <p>The client doubles as the response observer of its own streaming write call: the most
 * recent value delivered to {@link #onNext(Object)} and the most recent error delivered to
 * {@link #onError(Throwable)} are stored and exposed through synchronized accessors.
 *
 * <p>The stored response and error are guarded by this object's monitor. The request observer
 * created by {@link #startWrite()} is read and written without synchronization.
 *
 * @param <RequestClass> type of the messages pushed into the write stream
 * @param <ResponseClass> type of the message received from the write stream
 */
public class ImporterStoreClient<RequestClass, ResponseClass>
extends AbstractGRPCClient<ImportSSTGrpc.ImportSSTBlockingStub, ImportSSTGrpc.ImportSSTFutureStub>
implements StreamObserver<ResponseClass> {
    private static final Logger logger = LoggerFactory.getLogger(ImporterStoreClient.class);

    /**
     * Argument handed to {@link ConcreteBackOffer#newCustomBackOff} for {@link #switchMode}.
     * NOTE(review): presumably a maximum back-off in milliseconds; confirm against ConcreteBackOffer.
     */
    private static final int SWITCH_MODE_BACKOFF = 1000;

    private final ImportSSTGrpc.ImportSSTStub stub;
    /** Request side of the streaming write call; {@code null} until {@link #startWrite()} runs. */
    private StreamObserver<RequestClass> streamObserverRequest;
    /** Last response delivered by the server, or {@code null} if none has arrived. */
    private ResponseClass writeResponse;
    /** Last error delivered for the write stream, or {@code null} if none was reported. */
    private Throwable writeError;

    /**
     * Creates a client on top of already constructed stubs.
     *
     * @param conf client configuration, forwarded to the superclass
     * @param channelFactory channel factory, forwarded to the superclass
     * @param blockingStub blocking stub, forwarded to the superclass
     * @param asyncStub future stub, forwarded to the superclass
     * @param stub streaming stub used for the write calls
     */
    protected ImporterStoreClient(TiConfiguration conf, ChannelFactory channelFactory, ImportSSTGrpc.ImportSSTBlockingStub blockingStub, ImportSSTGrpc.ImportSSTFutureStub asyncStub, ImportSSTGrpc.ImportSSTStub stub) {
        super(conf, channelFactory, blockingStub, asyncStub);
        this.stub = stub;
    }

    /**
     * @return {@code true} once a non-null response has been stored by {@link #onNext(Object)}
     */
    public synchronized boolean isWriteResponseReceived() {
        return this.writeResponse != null;
    }

    /**
     * @return the last stored write response, or {@code null} if none has been received
     */
    public synchronized ResponseClass getWriteResponse() {
        return this.writeResponse;
    }

    private synchronized void setWriteResponse(ResponseClass writeResponse) {
        this.writeResponse = writeResponse;
    }

    /**
     * @return {@code true} once a non-null error has been stored by {@link #onError(Throwable)}
     */
    public synchronized boolean hasWriteResponseError() {
        return this.writeError != null;
    }

    /**
     * @return the last stored write error, or {@code null} if none has been reported
     */
    public synchronized Throwable getWriteError() {
        return this.writeError;
    }

    private synchronized void setWriteError(Throwable t) {
        this.writeError = t;
    }

    /** Stores the response so it can be fetched with {@link #getWriteResponse()}. */
    @Override
    public void onNext(ResponseClass response) {
        this.setWriteResponse(response);
    }

    /** Stores the error so it can be fetched with {@link #getWriteError()}, then logs it. */
    @Override
    public void onError(Throwable t) {
        this.setWriteError(t);
        logger.error("Error during write!", t);
    }

    /** Nothing to do on stream completion; the response was already captured in {@link #onNext}. */
    @Override
    public void onCompleted() {
    }

    /**
     * Opens the streaming write call and keeps its request observer for
     * {@link #writeBatch(Object)} and {@link #finishWrite()}.
     *
     * <p>{@code rawWrite} is used when {@code conf.isRawKVMode()} is true, {@code write}
     * otherwise. This class's type parameters are not bound to the concrete protobuf message
     * types, so the observers are bridged with unchecked casts; it is the caller's
     * responsibility to instantiate the client with the request/response types that match the
     * configured mode.
     */
    @SuppressWarnings("unchecked")
    public void startWrite() {
        if (this.conf.isRawKVMode()) {
            this.streamObserverRequest =
                (StreamObserver<RequestClass>) this.getStub().rawWrite((StreamObserver<ImportSstpb.RawWriteResponse>) this);
        } else {
            this.streamObserverRequest =
                (StreamObserver<RequestClass>) this.getStub().write((StreamObserver<ImportSstpb.WriteResponse>) this);
        }
    }

    /**
     * Pushes one request message into the write stream.
     *
     * @param request message to send
     * @throws NullPointerException if {@link #startWrite()} has not been called
     */
    public void writeBatch(RequestClass request) {
        this.requireStartedStream().onNext(request);
    }

    /**
     * Signals the end of the request side of the write stream.
     *
     * @throws NullPointerException if {@link #startWrite()} has not been called
     */
    public void finishWrite() {
        this.requireStartedStream().onCompleted();
    }

    /** Returns the request observer, failing with a descriptive message if the stream was never started. */
    private StreamObserver<RequestClass> requireStartedStream() {
        return Objects.requireNonNull(this.streamObserverRequest, "write stream is not started; call startWrite() first");
    }

    /**
     * Ingests every SST described by a previous write response through a blocking
     * {@code multiIngest} call.
     *
     * @param ctx request context placed into the {@code MultiIngestRequest}
     * @param writeResponse an {@code ImportSstpb.RawWriteResponse} or
     *     {@code ImportSstpb.WriteResponse} whose SST metas are ingested
     * @throws IllegalArgumentException if {@code writeResponse} is neither of the two supported
     *     types (this includes {@code null})
     * @throws RegionException if the ingest response carries an error
     */
    public void multiIngest(Kvrpcpb.Context ctx, Object writeResponse) throws RegionException {
        List<ImportSstpb.SSTMeta> metasList;
        if (writeResponse instanceof ImportSstpb.RawWriteResponse) {
            metasList = ((ImportSstpb.RawWriteResponse)writeResponse).getMetasList();
        } else if (writeResponse instanceof ImportSstpb.WriteResponse) {
            metasList = ((ImportSstpb.WriteResponse)writeResponse).getMetasList();
        } else {
            throw new IllegalArgumentException("Wrong response type: " + writeResponse);
        }
        ImportSstpb.MultiIngestRequest request = ImportSstpb.MultiIngestRequest.newBuilder().setContext(ctx).addAllSsts(metasList).build();
        ImportSstpb.IngestResponse response = this.getBlockingStub().multiIngest(request);
        if (response.hasError()) {
            throw new RegionException(response.getError());
        }
    }

    /**
     * Sends a {@code SwitchModeRequest} for the given mode through {@code callWithRetry},
     * using a {@link NoopHandler} as the error handler.
     *
     * @param mode mode placed into the request
     */
    public void switchMode(ImportSstpb.SwitchMode mode) {
        Supplier<ImportSstpb.SwitchModeRequest> request = () -> ImportSstpb.SwitchModeRequest.newBuilder().setMode(mode).build();
        NoopHandler noopHandler = new NoopHandler();
        this.callWithRetry(ConcreteBackOffer.newCustomBackOff(SWITCH_MODE_BACKOFF), ImportSSTGrpc.getSwitchModeMethod(), request, noopHandler);
    }

    /**
     * @return the blocking stub with a deadline of {@code conf.getIngestTimeout()} milliseconds
     *     from now
     */
    @Override
    protected ImportSSTGrpc.ImportSSTBlockingStub getBlockingStub() {
        return (ImportSSTGrpc.ImportSSTBlockingStub)((ImportSSTGrpc.ImportSSTBlockingStub)this.blockingStub).withDeadlineAfter(this.conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * @return the future stub with a deadline of {@code conf.getIngestTimeout()} milliseconds
     *     from now
     */
    @Override
    protected ImportSSTGrpc.ImportSSTFutureStub getAsyncStub() {
        return (ImportSSTGrpc.ImportSSTFutureStub)((ImportSSTGrpc.ImportSSTFutureStub)this.asyncStub).withDeadlineAfter(this.conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
    }

    /**
     * @return the streaming stub with a deadline of {@code conf.getIngestTimeout()} milliseconds
     *     from now
     */
    protected ImportSSTGrpc.ImportSSTStub getStub() {
        return (ImportSSTGrpc.ImportSSTStub)this.stub.withDeadlineAfter(this.conf.getIngestTimeout(), TimeUnit.MILLISECONDS);
    }

    /** No-op: this class releases nothing on close. */
    @Override
    public void close() throws Exception {
    }

    /**
     * Builds {@link ImporterStoreClient} instances for individual stores.
     *
     * @param <RequestClass> request message type of the clients produced
     * @param <ResponseClass> response message type of the clients produced
     */
    public static class ImporterStoreClientBuilder<RequestClass, ResponseClass> {
        private final TiConfiguration conf;
        private final ChannelFactory channelFactory;
        private final RegionManager regionManager;
        private final PDClient pdClient;

        /**
         * @param conf client configuration passed on to every built client
         * @param channelFactory factory used to obtain the channel for a store address
         * @param regionManager region manager; stored but not used by {@link #build(TiStore)}
         * @param pdClient PD client whose host mapping is used when obtaining the channel
         * @throws NullPointerException if any argument is {@code null}
         */
        public ImporterStoreClientBuilder(TiConfiguration conf, ChannelFactory channelFactory, RegionManager regionManager, PDClient pdClient) {
            Objects.requireNonNull(conf, "conf is null");
            Objects.requireNonNull(channelFactory, "channelFactory is null");
            Objects.requireNonNull(regionManager, "regionManager is null");
            // build() dereferences pdClient, so reject null here like the other collaborators.
            Objects.requireNonNull(pdClient, "pdClient is null");
            this.conf = conf;
            this.channelFactory = channelFactory;
            this.regionManager = regionManager;
            this.pdClient = pdClient;
        }

        /**
         * Creates a client whose blocking, future and streaming stubs all share the channel
         * obtained for the store's address.
         *
         * <p>The declared return type is left raw so that existing callers compile unchanged.
         *
         * @param store store to connect to
         * @return a new client bound to the store's address
         * @throws NullPointerException if {@code store} is {@code null}
         * @throws GrpcException declared by the signature; not thrown directly by this method
         */
        public synchronized ImporterStoreClient build(TiStore store) throws GrpcException {
            Objects.requireNonNull(store, "store is null");
            String addressStr = store.getStore().getAddress();
            // Parameterized logging skips the formatting work when debug is disabled.
            logger.debug("Create region store client on address {}", addressStr);
            ManagedChannel channel = this.channelFactory.getChannel(addressStr, this.pdClient.getHostMapping());
            ImportSSTGrpc.ImportSSTBlockingStub blockingStub = ImportSSTGrpc.newBlockingStub(channel);
            ImportSSTGrpc.ImportSSTFutureStub asyncStub = ImportSSTGrpc.newFutureStub(channel);
            ImportSSTGrpc.ImportSSTStub stub = ImportSSTGrpc.newStub(channel);
            return new ImporterStoreClient<RequestClass, ResponseClass>(this.conf, this.channelFactory, blockingStub, asyncStub, stub);
        }
    }
}

