/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.ClientGSIContext;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.namenode.ha.ObserverReadProxyProvider;
import org.apache.hadoop.io.retry.FailoverProxyProvider;
import org.apache.hadoop.io.retry.RetryInvocationHandler;
import org.apache.hadoop.shaded.com.google.common.base.Joiner;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HATestUtil {
    /** Logger shared by the static helpers in this class. */
    private static final Logger LOG = LoggerFactory.getLogger(HATestUtil.class);
    /**
     * Format string for the logical HA host name; the {@code %d} placeholder is filled
     * with the cluster's instance id by {@code getLogicalHostname}.
     */
    private static final String LOGICAL_HOSTNAME = "ha-nn-uri-%d";

    /**
     * Rolls the active NameNode's edit log and then polls the standby until its last
     * applied transaction id reaches the id the active had written before the roll.
     *
     * <p>The standby is polled once per second for up to ten seconds. Elapsed time is
     * measured with {@link Time#monotonicNow()} rather than the wall clock, so a system
     * clock adjustment during the wait can neither shorten nor extend the timeout.
     *
     * @param active the NameNode whose last written transaction id is the target
     * @param standby the NameNode that is expected to apply edits up to that id
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     * @throws IOException if a call made on the NameNodes (such as the edit-log roll) fails
     * @throws CouldNotCatchUpException if the standby has not reached the target id when
     *         the timeout expires
     */
    public static void waitForStandbyToCatchUp(NameNode active, NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
        final long timeoutMs = 10000L;
        final long pollIntervalMs = 1000L;

        // Capture the target before rolling so the standby is measured against the
        // edits that existed when this method was called.
        long activeTxId = active.getNamesystem().getFSImage().getEditLog().getLastWrittenTxId();
        active.getRpcServer().rollEditLog();

        long start = Time.monotonicNow();
        while (Time.monotonicNow() - start < timeoutMs) {
            long standbyTxId = standby.getNamesystem().getFSImage().getLastAppliedTxId();
            if (standbyTxId >= activeTxId) {
                return;
            }
            Thread.sleep(pollIntervalMs);
        }
        throw new CouldNotCatchUpException("Standby did not catch up to txid " + activeTxId + " (currently at " + standby.getNamesystem().getFSImage().getLastAppliedTxId() + ")");
    }

    /**
     * Waits until no DataNode in the cluster reports pending asynchronous deletions.
     *
     * <p>The condition is handed to {@code GenericTestUtils.waitFor} with the arguments
     * {@code 1000} and {@code 10000}.
     *
     * @param cluster the mini cluster whose DataNodes are inspected
     * @throws TimeoutException declared by the underlying wait helper
     * @throws InterruptedException declared by the underlying wait helper
     */
    public static void waitForDNDeletions(final MiniDFSCluster cluster) throws TimeoutException, InterruptedException {
        // True only when every DataNode's pending async deletion count is <= 0.
        Supplier<Boolean> noPendingDeletions = () -> cluster.getDataNodes().stream()
                .noneMatch(dn -> cluster.getFsDatasetTestUtils(dn).getPendingAsyncDeletions() > 0L);
        GenericTestUtils.waitFor(noPendingDeletions, 1000L, 10000L);
    }

    /**
     * Waits until the NameNode's block manager reports zero blocks pending deletion.
     *
     * <p>The condition is handed to {@code GenericTestUtils.waitFor} with the arguments
     * {@code 250} and {@code 10000}; each evaluation logs an informational message.
     *
     * @param nn the NameNode whose pending deletion count is checked
     * @throws Exception propagated from the underlying wait helper
     */
    public static void waitForNNToIssueDeletions(final NameNode nn) throws Exception {
        Supplier<Boolean> deletionsIssued = () -> {
            LOG.info("Waiting for NN to issue block deletions to DNs");
            long pending = nn.getNamesystem().getBlockManager().getPendingDeletionBlocksCount();
            return pending == 0L;
        };
        GenericTestUtils.waitFor(deletionsIssued, 250L, 10000L);
    }

    /**
     * Creates a failover-configured file system for nameservice index {@code 0}.
     *
     * @param cluster the mini cluster to connect to
     * @param conf the base configuration
     * @return the file system produced by the three-argument overload
     * @throws IOException propagated from the three-argument overload
     * @throws URISyntaxException propagated from the three-argument overload
     */
    public static DistributedFileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf) throws IOException, URISyntaxException {
        final int defaultNsIndex = 0;
        return configureFailoverFs(cluster, conf, defaultNsIndex);
    }

    /**
     * Creates a file system addressed through the cluster's logical host name.
     *
     * <p>The supplied configuration is copied first; the failover settings are applied
     * to the copy, which is then used to obtain the file system.
     *
     * @param cluster the mini cluster to connect to
     * @param conf the base configuration; only its copy is modified here
     * @param nsIndex index of the nameservice whose NameNodes are configured
     * @return the file system for {@code hdfs://<logical name>}, cast to
     *         {@link DistributedFileSystem}
     * @throws IOException if the file system cannot be obtained
     * @throws URISyntaxException if the logical URI cannot be parsed
     */
    public static DistributedFileSystem configureFailoverFs(MiniDFSCluster cluster, Configuration conf, int nsIndex) throws IOException, URISyntaxException {
        Configuration failoverConf = new Configuration(conf);
        String logicalName = getLogicalHostname(cluster);
        setFailoverConfigurations(cluster, failoverConf, logicalName, nsIndex);
        URI logicalUri = new URI("hdfs://" + logicalName);
        return (DistributedFileSystem) FileSystem.get(logicalUri, failoverConf);
    }

    /**
     * Creates a file system that uses the given observer-read proxy provider class and
     * switches observer reads on or off on the resulting provider instance.
     *
     * <p>The nameservice name is read from {@code dfs.nameservices} in {@code conf}. The
     * passed configuration is modified in place: the proxy provider key for that
     * nameservice and {@code fs.defaultFS} are set on it.
     *
     * @param cluster the mini cluster (not referenced by this method's body)
     * @param conf configuration to update and to create the file system from
     * @param classFPP proxy provider class whose name is written to the configuration
     * @param isObserverReadEnabled value passed to {@code setObserverReadEnabled}
     * @param <P> the proxy provider type
     * @return the created file system
     * @throws IOException if the file system cannot be obtained
     * @throws URISyntaxException if the nameservice URI cannot be parsed
     */
    public static <P extends ObserverReadProxyProvider<?>> DistributedFileSystem configureObserverReadFs(MiniDFSCluster cluster, Configuration conf, Class<P> classFPP, boolean isObserverReadEnabled) throws IOException, URISyntaxException {
        final String nameservice = conf.get("dfs.nameservices");
        final URI nameserviceUri = new URI("hdfs://" + nameservice);
        conf.set("dfs.client.failover.proxy.provider." + nameservice, classFPP.getName());
        conf.set("fs.defaultFS", nameserviceUri.toString());

        final DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(nameserviceUri, conf);

        // The client's NameNode proxy is a dynamic proxy; unwrap it to reach the provider.
        RetryInvocationHandler<?> retryHandler = (RetryInvocationHandler<?>) Proxy.getInvocationHandler(dfs.getClient().getNamenode());
        ObserverReadProxyProvider<?> observerProvider = (ObserverReadProxyProvider<?>) retryHandler.getProxyProvider();
        observerProvider.setObserverReadEnabled(isObserverReadEnabled);
        return dfs;
    }

    /**
     * Reports whether the proxy provider's last used proxy matches any of the given
     * NameNodes.
     *
     * <p>The comparison is between the last proxy's {@code proxyInfo} string and the
     * string form of each candidate NameNode's address.
     *
     * @param dfs file system whose client proxy provider is inspected
     * @param cluster the mini cluster that owns the NameNodes
     * @param nnIndices indices of the candidate NameNodes
     * @return {@code true} if any candidate matches, {@code false} otherwise (including
     *         when no indices are given)
     * @throws IOException declared for callers; not thrown explicitly in this body
     */
    public static boolean isSentToAnyOfNameNodes(DistributedFileSystem dfs, MiniDFSCluster cluster, int ... nnIndices) throws IOException {
        RetryInvocationHandler<?> retryHandler = (RetryInvocationHandler<?>) Proxy.getInvocationHandler(dfs.getClient().getNamenode());
        ObserverReadProxyProvider<?> observerProvider = (ObserverReadProxyProvider<?>) retryHandler.getProxyProvider();
        FailoverProxyProvider.ProxyInfo<?> lastProxy = observerProvider.getLastProxy();

        boolean matched = false;
        // Stop at the first matching NameNode.
        for (int pos = 0; pos < nnIndices.length && !matched; pos++) {
            matched = lastProxy.proxyInfo.equals(cluster.getNameNode(nnIndices[pos]).getNameNodeAddress().toString());
        }
        return matched;
    }

    /**
     * Builds an observer cluster without simulated capacities or rack assignments.
     *
     * @param conf configuration passed to the six-argument overload
     * @param numObservers number of observer NameNodes
     * @param numDataNodes number of DataNodes
     * @param fastTailing whether in-progress edit tailing is enabled
     * @return the cluster built by the six-argument overload
     * @throws IOException propagated from the six-argument overload
     */
    public static MiniQJMHACluster setUpObserverCluster(Configuration conf, int numObservers, int numDataNodes, boolean fastTailing) throws IOException {
        final long[] noSimulatedCapacities = null;
        final String[] noRacks = null;
        return setUpObserverCluster(conf, numObservers, numDataNodes, fastTailing, noSimulatedCapacities, noRacks);
    }

    /**
     * Builds a QJM-based HA cluster with {@code 2 + numObservers} NameNodes, makes
     * NameNode 0 active, and transitions NameNodes {@code 2 .. 2 + numObservers - 1} to
     * observer.
     *
     * <p>The passed configuration is modified in place: the DataNode scan period is set
     * to {@code -1}, in-progress edit tailing is set to {@code fastTailing}, and the
     * tailing period is set to 100 ms when fast tailing is on. Otherwise the log-roll
     * period is set to 300 s and the tailing period to 200 s.
     *
     * @param conf configuration to update and build the cluster from
     * @param numObservers number of observer NameNodes
     * @param numDataNodes number of DataNodes
     * @param fastTailing whether in-progress edit tailing is enabled
     * @param simulatedCapacities value forwarded to the DFS builder; may be {@code null}
     * @param racks value forwarded to the DFS builder; may be {@code null}
     * @return the started cluster
     * @throws IOException if building or transitioning the cluster fails
     */
    public static MiniQJMHACluster setUpObserverCluster(Configuration conf, int numObservers, int numDataNodes, boolean fastTailing, long[] simulatedCapacities, String[] racks) throws IOException {
        final int firstObserverIdx = 2;

        conf.setInt("dfs.datanode.scan.period.hours", -1);
        conf.setBoolean("dfs.ha.tail-edits.in-progress", fastTailing);
        if (!fastTailing) {
            conf.setTimeDuration("dfs.ha.log-roll.period", 300L, TimeUnit.SECONDS);
            conf.setTimeDuration("dfs.ha.tail-edits.period", 200L, TimeUnit.SECONDS);
        } else {
            conf.setTimeDuration("dfs.ha.tail-edits.period", 100L, TimeUnit.MILLISECONDS);
        }

        MiniQJMHACluster.Builder haBuilder = new MiniQJMHACluster.Builder(conf).setNumNameNodes(firstObserverIdx + numObservers);
        MiniDFSCluster.Builder dfsBuilder = haBuilder.getDfsBuilder();
        dfsBuilder.numDataNodes(numDataNodes).simulatedCapacities(simulatedCapacities).racks(racks);

        MiniQJMHACluster haCluster = haBuilder.build();
        MiniDFSCluster dfs = haCluster.getDfsCluster();
        dfs.transitionToActive(0);
        dfs.waitActive(0);

        // Observers occupy the NameNode slots after the first two.
        int transitioned = 0;
        while (transitioned < numObservers) {
            dfs.transitionToObserver(firstObserverIdx + transitioned);
            transitioned++;
        }
        return haCluster;
    }

    /**
     * Writes failover settings for one nameservice of the cluster into {@code conf},
     * using the cluster's logical host name.
     *
     * <p>Each NameNode address is rendered with {@code DFSUtil.createUri("hdfs", addr)}.
     *
     * @param cluster the mini cluster that supplies NameNode addresses and logical name
     * @param conf configuration to update
     * @param nsIndex index of the nameservice whose NameNodes are listed
     * @param classFPP proxy provider class to configure
     * @param <P> the proxy provider type
     */
    public static <P extends FailoverProxyProvider<?>> void setupHAConfiguration(MiniDFSCluster cluster, Configuration conf, int nsIndex, Class<P> classFPP) {
        List<String> nnUris = new ArrayList<>();
        for (MiniDFSCluster.NameNodeInfo info : cluster.getNameNodeInfos(nsIndex)) {
            InetSocketAddress rpcAddress = info.nameNode.getNameNodeAddress();
            nnUris.add(DFSUtil.createUri("hdfs", rpcAddress).toString());
        }
        String logicalName = getLogicalHostname(cluster);
        setFailoverConfigurations(conf, logicalName, nnUris, classFPP);
    }

    /**
     * Writes failover settings into {@code conf} using the cluster's logical host name.
     *
     * @param cluster the mini cluster that supplies the NameNodes and logical name
     * @param conf configuration to update
     */
    public static void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf) {
        String logicalName = getLogicalHostname(cluster);
        setFailoverConfigurations(cluster, conf, logicalName);
    }

    /**
     * Writes failover settings into {@code conf} for nameservice index {@code 0} under
     * the given logical name.
     *
     * @param cluster the mini cluster that supplies the NameNodes
     * @param conf configuration to update
     * @param logicalName logical nameservice name to configure
     */
    public static void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf, String logicalName) {
        final int defaultNsIndex = 0;
        setFailoverConfigurations(cluster, conf, logicalName, defaultNsIndex);
    }

    /**
     * Writes failover settings into {@code conf} for the given nameservice index, using
     * {@link ConfiguredFailoverProxyProvider} as the proxy provider class.
     *
     * @param cluster the mini cluster that supplies the NameNodes
     * @param conf configuration to update
     * @param logicalName logical nameservice name to configure
     * @param nsIndex index of the nameservice whose NameNodes are listed
     */
    public static void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf, String logicalName, int nsIndex) {
        setFailoverConfigurations(
                cluster,
                conf,
                logicalName,
                nsIndex,
                ConfiguredFailoverProxyProvider.class);
    }

    /**
     * Collects the addresses of the NameNodes in one nameservice of the cluster and
     * writes failover settings for them into {@code conf}.
     *
     * @param cluster the mini cluster that supplies the NameNodes
     * @param conf configuration to update
     * @param logicalName logical nameservice name to configure
     * @param nsIndex index of the nameservice whose NameNodes are listed
     * @param classFPP proxy provider class to configure
     * @param <P> the proxy provider type
     */
    public static <P extends FailoverProxyProvider<?>> void setFailoverConfigurations(MiniDFSCluster cluster, Configuration conf, String logicalName, int nsIndex, Class<P> classFPP) {
        MiniDFSCluster.NameNodeInfo[] infos = cluster.getNameNodeInfos(nsIndex);
        List<InetSocketAddress> rpcAddresses = new ArrayList<>(infos.length);
        for (int idx = 0; idx < infos.length; idx++) {
            rpcAddresses.add(infos[idx].nameNode.getNameNodeAddress());
        }
        setFailoverConfigurations(conf, logicalName, rpcAddresses, classFPP);
    }

    /**
     * Writes failover settings for the given NameNode addresses into {@code conf}, using
     * {@link ConfiguredFailoverProxyProvider} as the proxy provider class.
     *
     * @param conf configuration to update
     * @param logicalName logical nameservice name to configure
     * @param nnAddresses NameNode addresses, in the order they should be numbered
     */
    public static void setFailoverConfigurations(Configuration conf, String logicalName, InetSocketAddress ... nnAddresses) {
        List<InetSocketAddress> addressList = Arrays.asList(nnAddresses);
        setFailoverConfigurations(conf, logicalName, addressList, ConfiguredFailoverProxyProvider.class);
    }

    /**
     * Converts each socket address to an {@code hdfs://host:port} string and writes
     * failover settings for those strings into {@code conf}.
     *
     * @param conf configuration to update
     * @param logicalName logical nameservice name to configure
     * @param nnAddresses NameNode addresses, in the order they should be numbered
     * @param classFPP proxy provider class to configure
     * @param <P> the proxy provider type
     */
    public static <P extends FailoverProxyProvider<?>> void setFailoverConfigurations(Configuration conf, String logicalName, List<InetSocketAddress> nnAddresses, Class<P> classFPP) {
        List<String> nnUris = new ArrayList<>();
        for (InetSocketAddress address : nnAddresses) {
            nnUris.add("hdfs://" + address.getHostName() + ":" + address.getPort());
        }
        // A List<String> argument resolves to the Iterable<String> overload.
        setFailoverConfigurations(conf, logicalName, nnUris, classFPP);
    }

    /**
     * Writes the client-side HA settings for one logical nameservice into {@code conf}.
     *
     * <p>Addresses are assigned the ids {@code nn1}, {@code nn2}, ... in iteration order.
     * For each one the {@code dfs.namenode.rpc-address} key, suffixed with the logical
     * name and id, is set to the address. Afterwards {@code dfs.nameservices}, the
     * {@code dfs.ha.namenodes} key for the logical name (comma-joined ids), the proxy
     * provider key for the logical name, and {@code fs.defaultFS} are set.
     *
     * @param conf configuration to update
     * @param logicalName logical nameservice name to configure
     * @param nnAddresses NameNode address strings, in the order they should be numbered
     * @param classFPP proxy provider class whose name is written to the configuration
     * @param <P> the proxy provider type
     */
    public static <P extends FailoverProxyProvider<?>> void setFailoverConfigurations(Configuration conf, String logicalName, Iterable<String> nnAddresses, Class<P> classFPP) {
        List<String> nnIds = new ArrayList<>();
        for (String address : nnAddresses) {
            // Ids are 1-based: the next id is one more than the number already assigned.
            String nnId = "nn" + (nnIds.size() + 1);
            nnIds.add(nnId);
            String rpcAddressKey = DFSUtil.addKeySuffixes("dfs.namenode.rpc-address", logicalName, nnId);
            conf.set(rpcAddressKey, address);
        }

        conf.set("dfs.nameservices", logicalName);
        String haNamenodesKey = DFSUtil.addKeySuffixes("dfs.ha.namenodes", logicalName);
        conf.set(haNamenodesKey, String.join(",", nnIds));
        conf.set("dfs.client.failover.proxy.provider." + logicalName, classFPP.getName());
        conf.set("fs.defaultFS", "hdfs://" + logicalName);
    }

    /**
     * Returns the logical host name for the cluster, formed by inserting the cluster's
     * instance id into the {@code LOGICAL_HOSTNAME} format string.
     *
     * @param cluster the mini cluster whose instance id is used
     * @return the formatted logical host name
     */
    public static String getLogicalHostname(MiniDFSCluster cluster) {
        final int instanceId = cluster.getInstanceId();
        return String.format(LOGICAL_HOSTNAME, instanceId);
    }

    /**
     * Returns the {@code hdfs://} URI built from the cluster's logical host name.
     *
     * @param cluster the mini cluster whose logical host name is used
     * @return the logical URI
     * @throws URISyntaxException if the resulting string is not a valid URI
     */
    public static URI getLogicalUri(MiniDFSCluster cluster) throws URISyntaxException {
        String logicalHost = getLogicalHostname(cluster);
        return new URI("hdfs://" + logicalHost);
    }

    /**
     * Repeatedly asserts that the given NameNode has checkpoints at the given
     * transaction ids, retrying until the assertion passes or the timeout is exceeded.
     *
     * <p>A failed assertion is retried after a 300 ms sleep. Once more than ten seconds
     * have elapsed, the most recent {@link AssertionError} is rethrown to the caller.
     * Elapsed time is measured with {@link Time#monotonicNow()} rather than the wall
     * clock, so a system clock adjustment cannot distort the retry window.
     *
     * @param cluster the mini cluster that owns the NameNode
     * @param nnIdx index of the NameNode to check
     * @param txids transaction ids passed to the checkpoint assertion
     * @throws InterruptedException if the thread is interrupted while sleeping between
     *         attempts
     */
    public static void waitForCheckpoint(MiniDFSCluster cluster, int nnIdx, List<Integer> txids) throws InterruptedException {
        final long timeoutMs = 10000L;
        final long retryIntervalMs = 300L;

        long start = Time.monotonicNow();
        while (true) {
            try {
                FSImageTestUtil.assertNNHasCheckpoints(cluster, nnIdx, txids);
                return;
            } catch (AssertionError err) {
                // The assertion failure is the "not ready yet" signal; surface it only
                // once the retry window has been exhausted.
                if (Time.monotonicNow() - start > timeoutMs) {
                    throw err;
                }
            }
            Thread.sleep(retryIntervalMs);
        }
    }

    /**
     * Replaces the client alignment context's last seen state id and returns the value
     * it held before.
     *
     * <p>The private {@code lastSeenStateId} field of the provider's alignment context
     * is read reflectively. Its accumulator is reset via {@code getThenReset()} and then
     * fed {@code stateId} via {@code accumulate}.
     *
     * @param dfs file system whose client proxy provider is inspected
     * @param stateId state id to accumulate after the reset
     * @return the accumulator value before the reset
     * @throws Exception if the reflective field access fails
     */
    public static long setACStateId(DistributedFileSystem dfs, long stateId) throws Exception {
        RetryInvocationHandler<?> retryHandler = (RetryInvocationHandler<?>) Proxy.getInvocationHandler(dfs.getClient().getNamenode());
        ObserverReadProxyProvider<?> observerProvider = (ObserverReadProxyProvider<?>) retryHandler.getProxyProvider();
        ClientGSIContext alignmentContext = (ClientGSIContext) observerProvider.getAlignmentContext();

        Field stateIdField = alignmentContext.getClass().getDeclaredField("lastSeenStateId");
        stateIdField.setAccessible(true);
        LongAccumulator accumulator = (LongAccumulator) stateIdField.get(alignmentContext);

        long previousStateId = accumulator.getThenReset();
        accumulator.accumulate(stateId);
        return previousStateId;
    }

    /**
     * Thrown by {@code waitForStandbyToCatchUp} when the standby NameNode has not
     * reached the expected transaction id before the timeout.
     */
    public static class CouldNotCatchUpException extends IOException {
        private static final long serialVersionUID = 1L;

        /**
         * Creates the exception with the given detail message.
         *
         * @param message description of the failure
         */
        public CouldNotCatchUpException(String message) {
            super(message);
        }
    }
}

