/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordination;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Inject;
import com.google.inject.Provider;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import org.apache.curator.utils.ZKPaths;
import org.apache.druid.common.utils.UUIDUtils;
import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordination.ChangeRequestsSnapshot;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.DataSegmentChangeRequest;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.SegmentChangeRequestDrop;
import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.timeline.DataSegment;

public class BatchDataSegmentAnnouncer
implements DataSegmentAnnouncer {
    private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class);
    private final BatchDataSegmentAnnouncerConfig config;
    @Nullable
    private final Announcer announcer;
    private final ObjectMapper jsonMapper;
    private final String liveSegmentLocation;
    private final DruidServerMetadata server;
    private final Object lock = new Object();
    private final AtomicLong counter = new AtomicLong(0L);
    private final Set<SegmentZNode> availableZNodes = new ConcurrentSkipListSet<SegmentZNode>();
    private final ConcurrentMap<DataSegment, SegmentZNode> segmentLookup = new ConcurrentHashMap<DataSegment, SegmentZNode>();
    private final Function<DataSegment, DataSegment> segmentTransformer;
    private final ChangeRequestHistory<DataSegmentChangeRequest> changes = new ChangeRequestHistory();
    @Nullable
    private final SegmentZNode dummyZnode;
    private final boolean isSkipSegmentAnnouncementOnZk;

    @Inject
    public BatchDataSegmentAnnouncer(DruidServerMetadata server, BatchDataSegmentAnnouncerConfig config, ZkPathsConfig zkPaths, Provider<Announcer> announcerProvider, ObjectMapper jsonMapper, ZkEnablementConfig zkEnablementConfig) {
        this.config = config;
        this.jsonMapper = jsonMapper;
        this.server = server;
        this.liveSegmentLocation = ZKPaths.makePath((String)zkPaths.getLiveSegmentsPath(), (String)server.getName());
        this.segmentTransformer = input -> {
            DataSegment rv = input;
            if (config.isSkipDimensionsAndMetrics()) {
                rv = rv.withDimensions(null).withMetrics(null);
            }
            if (config.isSkipLoadSpec()) {
                rv = rv.withLoadSpec(null);
            }
            return rv;
        };
        boolean bl = this.isSkipSegmentAnnouncementOnZk = !zkEnablementConfig.isEnabled() || config.isSkipSegmentAnnouncementOnZk();
        if (this.isSkipSegmentAnnouncementOnZk) {
            this.dummyZnode = new SegmentZNode("PLACE_HOLDER_ONLY");
            this.announcer = null;
        } else {
            this.dummyZnode = null;
            this.announcer = (Announcer)announcerProvider.get();
        }
    }

    @VisibleForTesting
    public BatchDataSegmentAnnouncer(DruidServerMetadata server, BatchDataSegmentAnnouncerConfig config, ZkPathsConfig zkPaths, Announcer announcer, ObjectMapper jsonMapper) {
        this(server, config, zkPaths, (Provider<Announcer>)((Provider)() -> announcer), jsonMapper, ZkEnablementConfig.ENABLED);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void announceSegment(DataSegment segment) throws IOException {
        if (this.segmentLookup.containsKey(segment)) {
            log.info("Skipping announcement of segment [%s]. Announcement exists already.", new Object[]{segment.getId()});
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            if (this.segmentLookup.containsKey(segment)) {
                log.info("Skipping announcement of segment [%s]. Announcement exists already.", new Object[]{segment.getId()});
                return;
            }
            DataSegment toAnnounce = (DataSegment)this.segmentTransformer.apply((Object)segment);
            this.changes.addChangeRequest(new SegmentChangeRequestLoad(toAnnounce));
            if (this.isSkipSegmentAnnouncementOnZk) {
                this.segmentLookup.put(segment, this.dummyZnode);
                return;
            }
            int newBytesLen = this.jsonMapper.writeValueAsBytes((Object)toAnnounce).length;
            if ((long)newBytesLen > this.config.getMaxBytesPerNode()) {
                throw new ISE("byte size %,d exceeds %,d", new Object[]{newBytesLen, this.config.getMaxBytesPerNode()});
            }
            boolean done = false;
            if (!this.availableZNodes.isEmpty()) {
                Iterator<SegmentZNode> iter = this.availableZNodes.iterator();
                while (iter.hasNext() && !done) {
                    SegmentZNode availableZNode = iter.next();
                    if ((long)(availableZNode.getBytes().length + newBytesLen) < this.config.getMaxBytesPerNode()) {
                        availableZNode.addSegment(toAnnounce);
                        log.info("Announcing segment[%s] at existing path[%s]", new Object[]{toAnnounce.getId(), availableZNode.getPath()});
                        this.announcer.update(availableZNode.getPath(), availableZNode.getBytes());
                        this.segmentLookup.put(toAnnounce, availableZNode);
                        if (availableZNode.getCount() >= this.config.getSegmentsPerNode()) {
                            this.availableZNodes.remove(availableZNode);
                        }
                        done = true;
                        continue;
                    }
                    this.availableZNodes.remove(availableZNode);
                }
            }
            if (!done) {
                assert (this.availableZNodes.isEmpty());
                SegmentZNode availableZNode = new SegmentZNode(this.makeServedSegmentPath());
                availableZNode.addSegment(toAnnounce);
                log.info("Announcing segment[%s] at new path[%s]", new Object[]{toAnnounce.getId(), availableZNode.getPath()});
                this.announcer.announce(availableZNode.getPath(), availableZNode.getBytes());
                this.segmentLookup.put(toAnnounce, availableZNode);
                this.availableZNodes.add(availableZNode);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void unannounceSegment(DataSegment segment) {
        Object object = this.lock;
        synchronized (object) {
            SegmentZNode segmentZNode = (SegmentZNode)this.segmentLookup.remove(segment);
            if (segmentZNode == null) {
                log.warn("No path to unannounce segment[%s]", new Object[]{segment.getId()});
                return;
            }
            this.changes.addChangeRequest(new SegmentChangeRequestDrop(segment));
            if (this.isSkipSegmentAnnouncementOnZk) {
                return;
            }
            segmentZNode.removeSegment(segment);
            log.info("Unannouncing segment[%s] at path[%s]", new Object[]{segment.getId(), segmentZNode.getPath()});
            if (segmentZNode.getCount() == 0) {
                this.availableZNodes.remove(segmentZNode);
                this.announcer.unannounce(segmentZNode.getPath());
            } else {
                this.announcer.update(segmentZNode.getPath(), segmentZNode.getBytes());
                this.availableZNodes.add(segmentZNode);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void announceSegments(Iterable<DataSegment> segments) throws IOException {
        SegmentZNode segmentZNode = new SegmentZNode(this.makeServedSegmentPath());
        HashSet<Object> batch = new HashSet<DataSegment>();
        ArrayList<SegmentChangeRequestLoad> changesBatch = new ArrayList<SegmentChangeRequestLoad>();
        int byteSize = 0;
        int count = 0;
        Object object = this.lock;
        synchronized (object) {
            for (DataSegment ds : segments) {
                if (this.segmentLookup.containsKey(ds)) {
                    log.info("Skipping announcement of segment [%s]. Announcement exists already.", new Object[]{ds.getId()});
                    return;
                }
                DataSegment segment = (DataSegment)this.segmentTransformer.apply((Object)ds);
                changesBatch.add(new SegmentChangeRequestLoad(segment));
                if (this.isSkipSegmentAnnouncementOnZk) {
                    this.segmentLookup.put(segment, this.dummyZnode);
                    continue;
                }
                int newBytesLen = this.jsonMapper.writeValueAsBytes((Object)segment).length;
                if ((long)newBytesLen > this.config.getMaxBytesPerNode()) {
                    throw new ISE("byte size %,d exceeds %,d", new Object[]{newBytesLen, this.config.getMaxBytesPerNode()});
                }
                if (count >= this.config.getSegmentsPerNode() || (long)(byteSize + newBytesLen) > this.config.getMaxBytesPerNode()) {
                    segmentZNode.addSegments(batch);
                    this.announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
                    segmentZNode = new SegmentZNode(this.makeServedSegmentPath());
                    batch = new HashSet();
                    count = 0;
                    byteSize = 0;
                }
                log.info("Announcing segment[%s] at path[%s]", new Object[]{segment.getId(), segmentZNode.getPath()});
                this.segmentLookup.put(segment, segmentZNode);
                batch.add(segment);
                ++count;
                byteSize += newBytesLen;
            }
        }
        this.changes.addChangeRequests(changesBatch);
        if (!this.isSkipSegmentAnnouncementOnZk) {
            segmentZNode.addSegments(batch);
            this.announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes());
        }
    }

    @Override
    public void unannounceSegments(Iterable<DataSegment> segments) {
        for (DataSegment segment : segments) {
            this.unannounceSegment(segment);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ListenableFuture<ChangeRequestsSnapshot<DataSegmentChangeRequest>> getSegmentChangesSince(ChangeRequestHistory.Counter counter) {
        if (counter.getCounter() < 0L) {
            Object object = this.lock;
            synchronized (object) {
                Iterable segments = Iterables.transform(this.segmentLookup.keySet(), (Function)new Function<DataSegment, DataSegmentChangeRequest>(){

                    @Nullable
                    public SegmentChangeRequestLoad apply(DataSegment input) {
                        return new SegmentChangeRequestLoad(input);
                    }
                });
                SettableFuture future = SettableFuture.create();
                future.set(ChangeRequestsSnapshot.success(this.changes.getLastCounter(), Lists.newArrayList((Iterable)segments)));
                return future;
            }
        }
        return this.changes.getRequestsSince(counter);
    }

    private String makeServedSegmentPath() {
        return this.makeServedSegmentPath(UUIDUtils.generateUuid((String[])new String[]{this.server.getHost(), this.server.getType().toString(), this.server.getTier(), DateTimes.nowUtc().toString()}));
    }

    private String makeServedSegmentPath(String zNode) {
        return ZKPaths.makePath((String)this.liveSegmentLocation, (String)StringUtils.format((String)"%s%s", (Object[])new Object[]{zNode, this.counter.getAndIncrement()}));
    }

    private class SegmentZNode
    implements Comparable<SegmentZNode> {
        private final String path;
        private byte[] bytes = new byte[0];
        private int count = 0;

        public SegmentZNode(String path) {
            this.path = path;
        }

        public String getPath() {
            return this.path;
        }

        public int getCount() {
            return this.count;
        }

        public byte[] getBytes() {
            return this.bytes;
        }

        public Set<DataSegment> getSegments() {
            if (this.bytes.length == 0) {
                return new HashSet<DataSegment>();
            }
            try {
                return (Set)BatchDataSegmentAnnouncer.this.jsonMapper.readValue(this.bytes, (TypeReference)new TypeReference<Set<DataSegment>>(){});
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public void addSegment(DataSegment segment) {
            Set<DataSegment> zkSegments = this.getSegments();
            zkSegments.add(segment);
            try {
                this.bytes = BatchDataSegmentAnnouncer.this.jsonMapper.writeValueAsBytes(zkSegments);
            }
            catch (Exception e) {
                zkSegments.remove(segment);
                throw new RuntimeException(e);
            }
            ++this.count;
        }

        public void addSegments(Set<DataSegment> segments) {
            Set<DataSegment> zkSegments = this.getSegments();
            zkSegments.addAll(segments);
            try {
                this.bytes = BatchDataSegmentAnnouncer.this.jsonMapper.writeValueAsBytes(zkSegments);
            }
            catch (Exception e) {
                zkSegments.removeAll(segments);
                throw new RuntimeException(e);
            }
            this.count += segments.size();
        }

        public void removeSegment(DataSegment segment) {
            Set<DataSegment> zkSegments = this.getSegments();
            zkSegments.remove(segment);
            try {
                this.bytes = BatchDataSegmentAnnouncer.this.jsonMapper.writeValueAsBytes(zkSegments);
            }
            catch (Exception e) {
                zkSegments.add(segment);
                throw new RuntimeException(e);
            }
            --this.count;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            SegmentZNode that = (SegmentZNode)o;
            return this.path.equals(that.path);
        }

        public int hashCode() {
            return this.path.hashCode();
        }

        @Override
        public int compareTo(SegmentZNode segmentZNode) {
            return this.path.compareTo(segmentZNode.getPath());
        }
    }
}

