/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.server.simulation;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RaftRpcMessage;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.Timestamp;

class SimulatedRequestReply<REQUEST extends RaftRpcMessage, REPLY extends RaftRpcMessage> {
    static final String SIMULATE_LATENCY_KEY = SimulatedRequestReply.class.getName() + ".simulateLatencyMs";
    static final int SIMULATE_LATENCY_DEFAULT = RaftServerConfigKeys.Rpc.TIMEOUT_MIN_DEFAULT.toIntExact(TimeUnit.MILLISECONDS);
    static final long TIMEOUT = 3000L;
    private final Map<String, EventQueue<REQUEST, REPLY>> queues = new ConcurrentHashMap<String, EventQueue<REQUEST, REPLY>>();
    private final int simulateLatencyMs;

    SimulatedRequestReply(int simulateLatencyMs) {
        this.simulateLatencyMs = simulateLatencyMs;
    }

    EventQueue<REQUEST, REPLY> getQueue(String qid) {
        return this.queues.get(qid);
    }

    public REPLY sendRequest(REQUEST request) throws IOException {
        String qid = request.getReplierId();
        EventQueue<REQUEST, REPLY> q = this.queues.get(qid);
        if (q == null) {
            throw new IOException("The peer " + qid + " is not alive.");
        }
        try {
            RaftTestUtil.block(q.blockSendRequestTo::get);
            return (REPLY)((RaftRpcMessage)q.request(request));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw IOUtils.toInterruptedIOException((String)"", (InterruptedException)e);
        }
    }

    public REQUEST takeRequest(String qid) throws IOException {
        RaftRpcMessage request;
        EventQueue<REQUEST, REPLY> q = this.queues.get(qid);
        if (q == null) {
            throw new IOException("The RPC of " + qid + " has already shutdown.");
        }
        try {
            block4: {
                EventQueue<REQUEST, REPLY> reqQ;
                do {
                    request = (RaftRpcMessage)q.takeRequest();
                    Preconditions.assertTrue((boolean)qid.equals(request.getReplierId()));
                    reqQ = this.queues.get(request.getRequestorId());
                    if (reqQ == null) break block4;
                } while (reqQ.blockTakeRequestFrom.get());
                RaftTestUtil.delay(reqQ.delayTakeRequestFrom::get);
            }
            RaftTestUtil.delay(q.delayTakeRequestTo::get);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw IOUtils.toInterruptedIOException((String)"", (InterruptedException)e);
        }
        return (REQUEST)request;
    }

    public void sendReply(REQUEST request, REPLY reply, IOException ioe) throws IOException {
        if (reply != null) {
            Preconditions.assertTrue((boolean)request.getRequestorId().equals(reply.getRequestorId()));
            Preconditions.assertTrue((boolean)request.getReplierId().equals(reply.getReplierId()));
        }
        this.simulateLatency();
        String qid = request.getReplierId();
        EventQueue<REQUEST, REPLY> q = this.queues.get(qid);
        if (q != null) {
            q.reply(request, reply, ioe);
        }
    }

    public void shutdown(String id) {
        this.queues.remove(id);
    }

    public void clear() {
        this.queues.clear();
    }

    public void addPeer(RaftPeerId newPeer) {
        this.queues.put(newPeer.toString(), new EventQueue());
    }

    private void simulateLatency() throws IOException {
        if (this.simulateLatencyMs > 0) {
            int waitExpetation = this.simulateLatencyMs / 10;
            int waitHalfRange = waitExpetation / 3;
            int randomSleepMs = ThreadLocalRandom.current().nextInt(2 * waitHalfRange) + waitExpetation - waitHalfRange;
            try {
                Thread.sleep(randomSleepMs);
            }
            catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw IOUtils.toInterruptedIOException((String)"", (InterruptedException)ie);
            }
        }
    }

    static class EventQueue<REQUEST, REPLY> {
        private final BlockingQueue<REQUEST> requestQueue = new LinkedBlockingQueue<REQUEST>();
        private final Map<REQUEST, ReplyOrException<REPLY>> replyMap = new ConcurrentHashMap<REQUEST, ReplyOrException<REPLY>>();
        final AtomicBoolean blockTakeRequestFrom = new AtomicBoolean();
        final AtomicBoolean blockSendRequestTo = new AtomicBoolean();
        final AtomicInteger delayTakeRequestTo = new AtomicInteger();
        final AtomicInteger delayTakeRequestFrom = new AtomicInteger();

        EventQueue() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        REPLY request(REQUEST request) throws InterruptedException, IOException {
            this.requestQueue.put(request);
            EventQueue eventQueue = this;
            synchronized (eventQueue) {
                Timestamp startTime = Timestamp.currentTime();
                while (startTime.elapsedTimeMs() < 3000L && !this.replyMap.containsKey(request)) {
                    this.wait(3000L);
                }
            }
            if (!this.replyMap.containsKey(request)) {
                throw new IOException("Timeout while waiting for reply of request " + request);
            }
            ReplyOrException<REPLY> re = this.replyMap.remove(request);
            if (((ReplyOrException)re).ioe != null) {
                throw ((ReplyOrException)re).ioe;
            }
            return (REPLY)((ReplyOrException)re).reply;
        }

        REQUEST takeRequest() throws InterruptedException {
            return this.requestQueue.take();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        void reply(REQUEST request, REPLY reply, IOException ioe) throws IOException {
            this.replyMap.put(request, new ReplyOrException<REPLY>(reply, ioe));
            EventQueue eventQueue = this;
            synchronized (eventQueue) {
                this.notifyAll();
            }
        }
    }

    private static class ReplyOrException<REPLY> {
        private final REPLY reply;
        private final IOException ioe;

        ReplyOrException(REPLY reply, IOException ioe) {
            Preconditions.assertTrue((boolean)(reply == null ^ ioe == null));
            this.reply = reply;
            this.ioe = ioe;
        }
    }
}

