/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.master;

import java.io.Closeable;
import java.io.IOException;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.Bootstrap;
import org.apache.hadoop.hbase.shaded.io.netty.bootstrap.ChannelFactory;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelException;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelOption;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramChannel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.DatagramPacket;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.InternetProtocolFamily;
import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioDatagramChannel;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;

@InterfaceAudience.Private
public class ClusterStatusPublisher
extends ScheduledChore {
    public static final String STATUS_PUBLISHER_CLASS = "hbase.status.publisher.class";
    public static final Class<? extends Publisher> DEFAULT_STATUS_PUBLISHER_CLASS = MulticastPublisher.class;
    public static final String STATUS_PUBLISH_PERIOD = "hbase.status.publish.period";
    public static final int DEFAULT_STATUS_PUBLISH_PERIOD = 10000;
    private long lastMessageTime = 0L;
    private final HMaster master;
    private final int messagePeriod;
    private final ConcurrentMap<ServerName, Integer> lastSent = new ConcurrentHashMap<ServerName, Integer>();
    private Publisher publisher;
    private boolean connected = false;
    public static final int MAX_SERVER_PER_MESSAGE = 10;
    public static final int NB_SEND = 5;

    public ClusterStatusPublisher(HMaster master, Configuration conf, Class<? extends Publisher> publisherClass) throws IOException {
        super("HBase clusterStatusPublisher for " + master.getName(), master, conf.getInt(STATUS_PUBLISH_PERIOD, 10000));
        this.master = master;
        this.messagePeriod = conf.getInt(STATUS_PUBLISH_PERIOD, 10000);
        try {
            this.publisher = publisherClass.newInstance();
        }
        catch (InstantiationException e) {
            throw new IOException("Can't create publisher " + publisherClass.getName(), e);
        }
        catch (IllegalAccessException e) {
            throw new IOException("Can't create publisher " + publisherClass.getName(), e);
        }
        this.publisher.connect(conf);
        this.connected = true;
    }

    protected ClusterStatusPublisher() {
        this.master = null;
        this.messagePeriod = 0;
    }

    @Override
    protected void chore() {
        if (!this.connected) {
            return;
        }
        List<ServerName> sns = this.generateDeadServersListToSend();
        if (sns.isEmpty()) {
            return;
        }
        long curTime = EnvironmentEdgeManager.currentTime();
        if (this.lastMessageTime > curTime - (long)this.messagePeriod) {
            return;
        }
        this.lastMessageTime = curTime;
        ClusterStatus cs = new ClusterStatus(VersionInfo.getVersion(), this.master.getMasterFileSystem().getClusterId().toString(), null, sns, this.master.getServerName(), null, null, null, null);
        this.publisher.publish(cs);
    }

    @Override
    protected void cleanup() {
        this.connected = false;
        this.publisher.close();
    }

    protected List<ServerName> generateDeadServersListToSend() {
        long since = EnvironmentEdgeManager.currentTime() - (long)(this.messagePeriod * 2);
        for (Pair<ServerName, Long> dead : this.getDeadServers(since)) {
            this.lastSent.putIfAbsent(dead.getFirst(), 0);
        }
        ArrayList entries = new ArrayList();
        entries.addAll(this.lastSent.entrySet());
        Collections.sort(entries, new Comparator<Map.Entry<ServerName, Integer>>(){

            @Override
            public int compare(Map.Entry<ServerName, Integer> o1, Map.Entry<ServerName, Integer> o2) {
                return o1.getValue().compareTo(o2.getValue());
            }
        });
        int max = entries.size() > 10 ? 10 : entries.size();
        ArrayList<ServerName> res = new ArrayList<ServerName>(max);
        for (int i = 0; i < max; ++i) {
            Map.Entry toSend = (Map.Entry)entries.get(i);
            if ((Integer)toSend.getValue() >= 4) {
                this.lastSent.remove(toSend.getKey());
            } else {
                this.lastSent.replace((ServerName)toSend.getKey(), (Integer)toSend.getValue(), (Integer)toSend.getValue() + 1);
            }
            res.add((ServerName)toSend.getKey());
        }
        return res;
    }

    protected List<Pair<ServerName, Long>> getDeadServers(long since) {
        if (this.master.getServerManager() == null) {
            return Collections.emptyList();
        }
        return this.master.getServerManager().getDeadServers().copyDeadServersSince(since);
    }

    @InterfaceAudience.LimitedPrivate(value={"Configuration"})
    public static class MulticastPublisher
    implements Publisher {
        private DatagramChannel channel;
        private final EventLoopGroup group = new NioEventLoopGroup(1, Threads.newDaemonThreadFactory("hbase-master-clusterStatusPublisher"));

        @Override
        public void connect(Configuration conf) throws IOException {
            InternetProtocolFamily family;
            InetAddress localAddress;
            InetAddress ina;
            String mcAddress = conf.get("hbase.status.multicast.address.ip", "226.1.1.3");
            int port = conf.getInt("hbase.status.multicast.address.port", 16100);
            try {
                ina = InetAddress.getByName(mcAddress);
            }
            catch (UnknownHostException e) {
                this.close();
                throw new IOException("Can't connect to " + mcAddress, e);
            }
            InetSocketAddress isa = new InetSocketAddress(mcAddress, port);
            if (ina instanceof Inet6Address) {
                localAddress = Addressing.getIp6Address();
                family = InternetProtocolFamily.IPv6;
            } else {
                localAddress = Addressing.getIp4Address();
                family = InternetProtocolFamily.IPv4;
            }
            NetworkInterface ni = NetworkInterface.getByInetAddress(localAddress);
            Bootstrap b = new Bootstrap();
            ((Bootstrap)((Bootstrap)((Bootstrap)b.group(this.group)).channelFactory(new HBaseDatagramChannelFactory<NioDatagramChannel>(NioDatagramChannel.class, family))).option(ChannelOption.SO_REUSEADDR, true)).handler(new ClusterStatusEncoder(isa));
            try {
                this.channel = (DatagramChannel)b.bind(new InetSocketAddress(0)).sync().channel();
                this.channel.joinGroup(ina, ni, null, this.channel.newPromise()).sync();
                this.channel.connect(isa).sync();
            }
            catch (InterruptedException e) {
                this.close();
                throw ExceptionUtil.asInterrupt(e);
            }
        }

        @Override
        public void publish(ClusterStatus cs) {
            this.channel.writeAndFlush(cs).syncUninterruptibly();
        }

        @Override
        public void close() {
            if (this.channel != null) {
                this.channel.close();
            }
            this.group.shutdownGracefully();
        }

        private static class ClusterStatusEncoder
        extends MessageToMessageEncoder<ClusterStatus> {
            private final InetSocketAddress isa;

            private ClusterStatusEncoder(InetSocketAddress isa) {
                this.isa = isa;
            }

            @Override
            protected void encode(ChannelHandlerContext channelHandlerContext, ClusterStatus clusterStatus, List<Object> objects) {
                ClusterStatusProtos.ClusterStatus csp = clusterStatus.convert();
                objects.add(new DatagramPacket(Unpooled.wrappedBuffer(csp.toByteArray()), this.isa));
            }
        }

        private static final class HBaseDatagramChannelFactory<T extends Channel>
        implements ChannelFactory<T> {
            private final Class<? extends T> clazz;
            private InternetProtocolFamily family;

            HBaseDatagramChannelFactory(Class<? extends T> clazz, InternetProtocolFamily family) {
                this.clazz = clazz;
                this.family = family;
            }

            @Override
            public T newChannel() {
                try {
                    return (T)((Channel)ReflectionUtils.instantiateWithCustomCtor(this.clazz.getName(), new Class[]{InternetProtocolFamily.class}, new Object[]{this.family}));
                }
                catch (Throwable t) {
                    throw new ChannelException("Unable to create Channel from class " + this.clazz, t);
                }
            }

            public String toString() {
                return StringUtil.simpleClassName(this.clazz) + ".class";
            }
        }
    }

    public static interface Publisher
    extends Closeable {
        public void connect(Configuration var1) throws IOException;

        public void publish(ClusterStatus var1);

        @Override
        public void close();
    }
}

