/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.common.TopicAndPartition;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.flink.streaming.connectors.kafka.internals.Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.OffsetHandler;
import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;

/**
 * {@link OffsetHandler} that persists consumer offsets in ZooKeeper and uses the
 * persisted values to position a {@link Fetcher} on start-up.
 *
 * <p>Offsets are written to and read from the node
 * {@code ZKGroupTopicDirs(groupId, topic).consumerOffsetDir() + "/" + partition},
 * encoded as decimal strings.
 */
public class ZookeeperOffsetHandler implements OffsetHandler {

    private static final Logger LOG = LoggerFactory.getLogger(ZookeeperOffsetHandler.class);

    /** Sentinel returned by {@link #getOffsetFromZooKeeper} when the offset node holds no data. */
    private static final long OFFSET_NOT_SET = -915623761776L;

    private final ZkClient zkClient;
    private final String groupId;

    /**
     * Creates the handler and opens a ZooKeeper client connection.
     *
     * @param props consumer configuration; must contain {@code group.id} and
     *              {@code zookeeper.connect}. {@code zookeeper.session.timeout.ms} and
     *              {@code zookeeper.connection.timeout.ms} are optional and default to 6000.
     * @throws IllegalArgumentException if a required property is missing
     * @throws NumberFormatException if a timeout property is not a valid integer
     */
    public ZookeeperOffsetHandler(Properties props) {
        String group = props.getProperty("group.id");
        if (group == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set");
        }
        this.groupId = group;

        String zkConnect = props.getProperty("zookeeper.connect");
        if (zkConnect == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set");
        }

        int sessionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "6000"));
        int connectionTimeoutMs = Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "6000"));

        this.zkClient = new ZkClient(zkConnect, sessionTimeoutMs, connectionTimeoutMs, new ZooKeeperStringSerializer());
    }

    /**
     * Writes the given offsets to ZooKeeper, one node per partition.
     *
     * <p>Entries whose offset is {@code null} or negative are skipped.
     *
     * @param offsetsToCommit offsets to store, keyed by partition
     */
    @Override
    public void commit(Map<TopicPartition, Long> offsetsToCommit) {
        for (Map.Entry<TopicPartition, Long> entry : offsetsToCommit.entrySet()) {
            Long offset = entry.getValue();
            // Guard against null before unboxing; a null entry must not abort the
            // commit of the remaining partitions with a NullPointerException.
            if (offset == null || offset < 0L) {
                continue;
            }
            TopicPartition tp = entry.getKey();
            setOffsetInZooKeeper(this.zkClient, this.groupId, tp.topic(), tp.partition(), offset);
        }
    }

    /**
     * Seeks the fetcher for every partition that has an offset stored in ZooKeeper.
     * Partitions without a stored offset are left untouched.
     *
     * @param partitions partitions to look up
     * @param fetcher    fetcher to reposition
     */
    @Override
    public void seekFetcherToInitialOffsets(List<TopicPartition> partitions, Fetcher fetcher) {
        for (TopicPartition tp : partitions) {
            long offset = getOffsetFromZooKeeper(this.zkClient, this.groupId, tp.topic(), tp.partition());
            if (offset == OFFSET_NOT_SET) {
                continue;
            }
            LOG.info("Offset for partition {} was set to {} in ZooKeeper. Seeking fetcher to that position.", tp.partition(), offset);
            // Seek one past the stored value. NOTE(review): presumably the stored value is the
            // last processed offset and seek() expects the next offset to read - confirm.
            fetcher.seek(tp, offset + 1L);
        }
    }

    /**
     * Closes the underlying ZooKeeper client.
     *
     * @throws IOException declared by the interface contract
     */
    @Override
    public void close() throws IOException {
        this.zkClient.close();
    }

    /**
     * Stores {@code offset} as a decimal string in the offset node of the given
     * group/topic/partition.
     *
     * @param zkClient  ZooKeeper client to write with
     * @param groupId   consumer group id
     * @param topic     topic name
     * @param partition partition number
     * @param offset    offset value to store
     */
    public static void setOffsetInZooKeeper(ZkClient zkClient, String groupId, String topic, int partition, long offset) {
        ZkUtils.updatePersistentPath(zkClient, offsetPath(groupId, topic, partition), Long.toString(offset));
    }

    /**
     * Reads the offset stored in the offset node of the given group/topic/partition.
     *
     * @param zkClient  ZooKeeper client to read with
     * @param groupId   consumer group id
     * @param topic     topic name
     * @param partition partition number
     * @return the stored offset, or {@code OFFSET_NOT_SET} if the node holds no data
     * @throws NumberFormatException if the stored data is not a valid decimal long
     */
    public static long getOffsetFromZooKeeper(ZkClient zkClient, String groupId, String topic, int partition) {
        Tuple2<?, ?> data = ZkUtils.readDataMaybeNull(zkClient, offsetPath(groupId, topic, partition));
        Option<?> storedValue = (Option<?>) data._1();
        if (storedValue.isEmpty()) {
            return OFFSET_NOT_SET;
        }
        return Long.parseLong((String) storedValue.get());
    }

    /** Builds the ZooKeeper node path under which the offset of one partition is kept. */
    private static String offsetPath(String groupId, String topic, int partition) {
        ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(groupId, topic);
        return topicDirs.consumerOffsetDir() + "/" + partition;
    }
}

