/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RocksDBVersionedStoreTest {
    // Name and metrics scope handed to the RocksDBVersionedStore constructor.
    private static final String STORE_NAME = "myversionedrocks";
    private static final String METRICS_SCOPE = "versionedrocksdb";
    // NOTE(review): the store is constructed with the literals 300000L and 100000L, which
    // equal HISTORY_RETENTION and SEGMENT_INTERVAL; the decompiler presumably inlined them.
    private static final long HISTORY_RETENTION = 300000L;
    // NOTE(review): not referenced in the visible part of this file.
    private static final long GRACE_PERIOD = 300000L;
    private static final long SEGMENT_INTERVAL = 100000L;
    // Equals the time set on the mock context in before() (setTime(10L)).
    private static final long BASE_TIMESTAMP = 10L;
    // Serializer used by the put/delete/get helpers to turn String keys and values into bytes.
    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
    // Presumably used to decode bytes read back from the store; usage is outside this view.
    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
    // Metric name/group; presumably consumed by verifyExpiredRecordSensor (not visible here).
    private static final String DROPPED_RECORDS_METRIC = "dropped-records-total";
    private static final String TASK_LEVEL_GROUP = "stream-task-metrics";
    // Per-test fixtures, (re)created in before().
    private InternalMockProcessorContext context;
    // Holds the "thread-id" and "task-id" tags built in before().
    private Map<String, String> expectedMetricsTags;
    private RocksDBVersionedStore store;

    /**
     * Per-test setup: creates a mock processor context on a temporary directory with String
     * serdes, sets its time to 10, records the "thread-id"/"task-id" metric tags, and
     * creates and initializes a new store (constructed with 300000L and 100000L).
     */
    @Before
    public void before() {
        final StreamsConfig streamsConfig = new StreamsConfig(StreamsTestUtils.getStreamsConfig());
        context = new InternalMockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), streamsConfig);
        context.setTime(10L);

        final String threadId = Thread.currentThread().getName();
        final String taskId = context.taskId().toString();
        expectedMetricsTags = Utils.mkMap(Utils.mkEntry("thread-id", threadId), Utils.mkEntry("task-id", taskId));

        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 300000L, 100000L);
        // The cast selects the StateStoreContext overload of init.
        store.init((StateStoreContext) context, store);
    }

    /**
     * Per-test teardown: closes the store created for the test.
     *
     * The null check is needed because JUnit 4 still runs {@code @After} methods when a
     * {@code @Before} method throws. If before() fails before the store is assigned, an
     * unguarded close() would add an unrelated NullPointerException to the real failure.
     */
    @After
    public void after() {
        if (store != null) {
            store.close();
        }
    }

    /**
     * Writes two values for key "k" in increasing timestamp order (10, then 11); both puts
     * are asserted to return -1L. The verify helpers (defined outside this view) then check
     * the latest value and the timestamped lookups at 10, 11 and 12.
     */
    @Test
    public void shouldPutLatest() {
        // in-order writes
        putToStore("k", "v", 10L, -1L);
        putToStore("k", "v2", 11L, -1L);

        // latest lookup, then lookups as of specific timestamps
        verifyGetValueFromStore("k", "v2", 11L);
        verifyTimestampedGetValueFromStore("k", 10L, "v", 10L, 11L);
        verifyTimestampedGetValueFromStore("k", 11L, "v2", 11L, -1L);
        verifyTimestampedGetValueFromStore("k", 12L, "v2", 11L, -1L);
    }

    /**
     * Writes two null values for key "k" at timestamps 10 and 11 (each put asserted to
     * return -1L) and expects null from the latest lookup and from lookups at 10, 11, 12.
     */
    @Test
    public void shouldPutNullAsLatest() {
        // two consecutive null values
        putToStore("k", null, 10L, -1L);
        putToStore("k", null, 11L, -1L);

        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 10L);
        verifyTimestampedGetNullFromStore("k", 11L);
        verifyTimestampedGetNullFromStore("k", 12L);
    }

    /**
     * Writes a non-null value at timestamp 10, then three values with smaller timestamps
     * (8, 9, 6) out of order. Each out-of-order put is asserted to return the timestamp of
     * the next newer record present at that moment. Lookups at 10, 9, 8 and 7 follow.
     */
    @Test
    public void shouldPutOlderWithNonNullLatest() {
        // newest record first, then out-of-order older records
        putToStore("k", "v", 10L, -1L);
        putToStore("k", "v2", 8L, 10L);
        putToStore("k", "v1", 9L, 10L);
        putToStore("k", "v4", 6L, 8L);

        verifyGetValueFromStore("k", "v", 10L);
        verifyTimestampedGetValueFromStore("k", 10L, "v", 10L, -1L);
        verifyTimestampedGetValueFromStore("k", 9L, "v1", 9L, 10L);
        verifyTimestampedGetValueFromStore("k", 8L, "v2", 8L, 9L);
        verifyTimestampedGetValueFromStore("k", 7L, "v4", 6L, 8L);
    }

    /**
     * Same out-of-order sequence as the non-null variant, but the record at timestamp 10
     * is a null value. Expects null from the latest lookup and from the lookup at 10, and
     * the older non-null values from lookups at 9, 8 and 7.
     */
    @Test
    public void shouldPutOlderWithNullLatest() {
        // null value written first, then out-of-order older records
        putToStore("k", null, 10L, -1L);
        putToStore("k", "v2", 8L, 10L);
        putToStore("k", "v1", 9L, 10L);
        putToStore("k", "v4", 6L, 8L);

        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 10L);
        verifyTimestampedGetValueFromStore("k", 9L, "v1", 9L, 10L);
        verifyTimestampedGetValueFromStore("k", 8L, "v2", 8L, 9L);
        verifyTimestampedGetValueFromStore("k", 7L, "v4", 6L, 8L);
    }

    /**
     * Writes a non-null value at timestamp 10, then a mix of null and non-null values with
     * smaller timestamps, out of order. Lookups at 10 down to 4 are expected to alternate
     * between values and nulls as listed below.
     */
    @Test
    public void shouldPutOlderNullWithNonNullLatest() {
        // non-null record at 10, followed by out-of-order older writes
        putToStore("k", "v", 10L, -1L);
        putToStore("k", null, 8L, 10L);
        putToStore("k", null, 9L, 10L);
        putToStore("k", null, 6L, 8L);
        putToStore("k", "v5", 5L, 6L);
        putToStore("k", "v3", 7L, 8L);
        putToStore("k", null, 4L, 5L);

        verifyGetValueFromStore("k", "v", 10L);
        verifyTimestampedGetValueFromStore("k", 10L, "v", 10L, -1L);
        verifyTimestampedGetNullFromStore("k", 9L);
        verifyTimestampedGetNullFromStore("k", 8L);
        verifyTimestampedGetValueFromStore("k", 7L, "v3", 7L, 8L);
        verifyTimestampedGetNullFromStore("k", 6L);
        verifyTimestampedGetValueFromStore("k", 5L, "v5", 5L, 6L);
        verifyTimestampedGetNullFromStore("k", 4L);
    }

    /**
     * Writes a null value at timestamp 10, then a mix of null and non-null values with
     * smaller timestamps, out of order. Only the lookups at 7 and 5 are expected to return
     * values; all other lookups expect null.
     */
    @Test
    public void shouldPutOlderNullWithNullLatest() {
        // null record at 10, followed by out-of-order older writes
        putToStore("k", null, 10L, -1L);
        putToStore("k", null, 8L, 10L);
        putToStore("k", null, 9L, 10L);
        putToStore("k", null, 6L, 8L);
        putToStore("k", "v3", 7L, 8L);
        putToStore("k", "v5", 5L, 6L);
        putToStore("k", null, 4L, 5L);

        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 10L);
        verifyTimestampedGetNullFromStore("k", 9L);
        verifyTimestampedGetNullFromStore("k", 8L);
        verifyTimestampedGetValueFromStore("k", 7L, "v3", 7L, 8L);
        verifyTimestampedGetNullFromStore("k", 6L);
        verifyTimestampedGetValueFromStore("k", 5L, "v5", 5L, 6L);
        verifyTimestampedGetNullFromStore("k", 4L);
    }

    /**
     * Writes to key "k" at timestamp 10 five times: value, value, null, null, value.
     * After each change the lookups are expected to reflect only the most recent write at
     * that timestamp, and the lookup at 9 is expected to be null throughout.
     */
    @Test
    public void shouldPutRepeatTimestampAsLatest() {
        // value replaced by another value at the same timestamp
        putToStore("k", "to_be_replaced", 10L, -1L);
        putToStore("k", "b", 10L, -1L);
        verifyGetValueFromStore("k", "b", 10L);
        verifyTimestampedGetValueFromStore("k", 10L, "b", 10L, -1L);
        verifyTimestampedGetNullFromStore("k", 9L);

        // value replaced by null
        putToStore("k", null, 10L, -1L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 10L);
        verifyTimestampedGetNullFromStore("k", 9L);

        // null replaced by null
        putToStore("k", null, 10L, -1L);
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 10L);
        verifyTimestampedGetNullFromStore("k", 9L);

        // null replaced by value
        putToStore("k", "b", 10L, -1L);
        verifyGetValueFromStore("k", "b", 10L);
        verifyTimestampedGetValueFromStore("k", 10L, "b", 10L, -1L);
        verifyTimestampedGetNullFromStore("k", 9L);
    }

    /**
     * Writes repeatedly to the same timestamps on both sides of 100000 (the value of
     * SEGMENT_INTERVAL), mixing nulls and values. Values named "to_be_replaced" are all
     * overwritten by a later put at the same timestamp. Only "vp5", "vn5" and "vn6" are
     * expected to be readable at the end.
     */
    @Test
    public void shouldPutRepeatTimestamps() {
        // writes, including overwrites at identical timestamps
        putToStore("k", "to_be_replaced", 100020L, -1L);
        putToStore("k", null, 99990L, 100020L);
        putToStore("k", "to_be_replaced", 99990L, 100020L);
        putToStore("k", null, 99990L, 100020L);
        putToStore("k", "to_be_replaced", 99999L, 100020L);
        putToStore("k", "to_be_replaced", 100001L, 100020L);
        putToStore("k", null, 99999L, 100001L);
        putToStore("k", null, 100001L, 100020L);
        putToStore("k", null, 100010L, 100020L);
        putToStore("k", null, 100005L, 100010L);
        putToStore("k", "vp5", 100005L, 100010L);
        putToStore("k", "to_be_replaced", 99995L, 99999L);
        putToStore("k", "vn5", 99995L, 99999L);
        putToStore("k", null, 100020L, -1L);
        putToStore("k", null, 100020L, -1L);
        putToStore("k", "vn6", 99994L, 99995L);

        // expected final contents
        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "vp5", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", 100000L);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    /**
     * Writes six records in increasing timestamp order, three below and three above 100000
     * (the value of SEGMENT_INTERVAL), alternating nulls and values. Every put is asserted
     * to return -1L. Lookups on both sides of 100000 are then checked.
     */
    @Test
    public void shouldPutIntoMultipleSegments() {
        // in-order writes crossing timestamp 100000
        putToStore("k", null, 99980L, -1L);
        putToStore("k", "vn10", 99990L, -1L);
        putToStore("k", null, 99999L, -1L);
        putToStore("k", null, 100001L, -1L);
        putToStore("k", "vp10", 100010L, -1L);
        putToStore("k", null, 100020L, -1L);

        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetValueFromStore("k", 100015L, "vp10", 100010L, 100020L);
        verifyTimestampedGetNullFromStore("k", 100005L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", 100000L);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn10", 99990L, 99999L);
        verifyTimestampedGetNullFromStore("k", 99985L);
    }

    /**
     * Writes six non-null values around timestamp 100000 (the value of SEGMENT_INTERVAL),
     * newest first and then out of order. Every value is expected to be readable afterwards
     * with the validity interval implied by its neighbours.
     */
    @Test
    public void shouldMoveRecordToOlderSegmentDuringPut() {
        // newest first, then out-of-order older writes on both sides of 100000
        putToStore("k", "vp20", 100020L, -1L);
        putToStore("k", "vp10", 100010L, 100020L);
        putToStore("k", "vn10", 99990L, 100010L);
        putToStore("k", "vn2", 99998L, 100010L);
        putToStore("k", "vn1", 99999L, 100010L);
        putToStore("k", "vp1", 100001L, 100010L);

        verifyGetValueFromStore("k", "vp20", 100020L);
        verifyTimestampedGetValueFromStore("k", 100030L, "vp20", 100020L, -1L);
        verifyTimestampedGetValueFromStore("k", 100015L, "vp10", 100010L, 100020L);
        verifyTimestampedGetValueFromStore("k", 100005L, "vp1", 100001L, 100010L);
        verifyTimestampedGetValueFromStore("k", 100000L, "vn1", 99999L, 100001L);
        verifyTimestampedGetValueFromStore("k", 99999L, "vn1", 99999L, 100001L);
        verifyTimestampedGetValueFromStore("k", 99998L, "vn2", 99998L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn10", 99990L, 99998L);
    }

    /**
     * Out-of-order writes around timestamp 100000 (the value of SEGMENT_INTERVAL) in which
     * most records are null values. Only "vp5", "vn5" and "vn6" are expected to be readable;
     * every other checked timestamp expects null.
     */
    @Test
    public void shouldMoveRecordToOlderSegmentWithNullsDuringPut() {
        // null values first, then the three non-null values
        putToStore("k", null, 100020L, -1L);
        putToStore("k", null, 99999L, 100020L);
        putToStore("k", null, 100001L, 100020L);
        putToStore("k", null, 99990L, 99999L);
        putToStore("k", null, 100010L, 100020L);
        putToStore("k", "vp5", 100005L, 100010L);
        putToStore("k", "vn5", 99995L, 99999L);
        putToStore("k", "vn6", 99994L, 99995L);

        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "vp5", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", 100000L);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    /**
     * Writes a null and a value below timestamp 100000 first, then a newer value above
     * 100000, and finally more out-of-order records between them. The puts at 99999 and
     * 99998 are asserted to return 100010 and 99999 respectively.
     */
    @Test
    public void shouldFallThroughToExistingOlderSegmentAsLatestDuringPut() {
        // older records first
        putToStore("k", null, 99995L, -1L);
        putToStore("k", "vn6", 99994L, 99995L);
        // newer records, then out-of-order writes between the two groups
        putToStore("k", "vp20", 100020L, -1L);
        putToStore("k", null, 100010L, 100020L);
        putToStore("k", null, 99999L, 100010L);
        putToStore("k", "vn2", 99998L, 99999L);

        verifyGetValueFromStore("k", "vp20", 100020L);
        verifyTimestampedGetValueFromStore("k", 100030L, "vp20", 100020L, -1L);
        verifyTimestampedGetNullFromStore("k", 100012L);
        verifyTimestampedGetNullFromStore("k", 100000L);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99998L, "vn2", 99998L, 99999L);
        verifyTimestampedGetNullFromStore("k", 99995L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
    }

    /**
     * Writes a value above timestamp 100000, then out-of-order records below it, including
     * a null value at 99998 that lands between "vn5" (99995) and "vn1" (99999). That null
     * put is asserted to return 99999, and "vn5" is then expected to end at 99998.
     */
    @Test
    public void shouldPutNonLatestTombstoneIntoNewSegmentWithValidTo() {
        putToStore("k", "vp30", 100030L, -1L);
        putToStore("k", null, 99990L, 100030L);
        putToStore("k", "vn5", 99995L, 100030L);
        putToStore("k", "vn1", 99999L, 100030L);
        // null value inserted between two existing older records
        putToStore("k", null, 99998L, 99999L);

        verifyGetValueFromStore("k", "vp30", 100030L);
        verifyTimestampedGetValueFromStore("k", 100010L, "vn1", 99999L, 100030L);
        verifyTimestampedGetValueFromStore("k", 99999L, "vn1", 99999L, 100030L);
        verifyTimestampedGetNullFromStore("k", 99998L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99998L);
        verifyTimestampedGetNullFromStore("k", 99990L);
    }

    /**
     * Writes four values for key "k" and then deletes at four timestamps. Each delete is
     * expected to return the record that was in effect at that timestamp; the second delete
     * at 100010 is expected to return null.
     */
    @Test
    public void shouldDelete() {
        putToStore("k", "vp20", 100020L, -1L);
        putToStore("k", "vp10", 100010L, 100020L);
        putToStore("k", "vn10", 99990L, 100010L);
        putToStore("k", "vn2", 99998L, 100010L);

        // delete at a timestamp between two existing records
        final VersionedRecord<String> deletedBetween = deleteFromStore("k", 99995L);
        MatcherAssert.assertThat(deletedBetween.value(), CoreMatchers.equalTo("vn10"));
        MatcherAssert.assertThat(deletedBetween.timestamp(), CoreMatchers.equalTo(99990L));

        // delete at exactly the timestamp of an existing record
        final VersionedRecord<String> deletedExact = deleteFromStore("k", 100010L);
        MatcherAssert.assertThat(deletedExact.value(), CoreMatchers.equalTo("vp10"));
        MatcherAssert.assertThat(deletedExact.timestamp(), CoreMatchers.equalTo(100010L));

        // repeating the same delete returns nothing
        final VersionedRecord<String> deletedAgain = deleteFromStore("k", 100010L);
        MatcherAssert.assertThat(deletedAgain, CoreMatchers.nullValue());

        // delete at a timestamp newer than every record
        final VersionedRecord<String> deletedLatest = deleteFromStore("k", 100025L);
        MatcherAssert.assertThat(deletedLatest.value(), CoreMatchers.equalTo("vp20"));
        MatcherAssert.assertThat(deletedLatest.timestamp(), CoreMatchers.equalTo(100020L));
    }

    /**
     * Writes a record at timestamp 300010, then one at 10 (asserted to be stored) and one
     * at 9 (asserted to return Long.MIN_VALUE and to be absent afterwards). The
     * expired-record sensor check is called with 1.
     */
    @Test
    public void shouldNotPutExpired() {
        putToStore("k", "v", 300010L, -1L);

        // timestamp 10 is still accepted
        putToStore("k1", "v1", 10L, -1L);
        verifyGetValueFromStore("k1", "v1", 10L);

        // timestamp 9 is rejected
        putToStore("k2", "v2", 9L, Long.MIN_VALUE);
        verifyGetNullFromStore("k2");

        verifyExpiredRecordSensor(1);
    }

    /**
     * Writes two records at timestamp 1 and another key at 300010. A delete at timestamp 10
     * is expected to remove "k1"; a delete at timestamp 9 is expected to return null and
     * leave "k2" in place. The expired-record sensor check is called with 1.
     */
    @Test
    public void shouldNotDeleteExpired() {
        putToStore("k1", "v1", 1L, -1L);
        putToStore("k2", "v2", 1L, -1L);
        putToStore("kother", "vother", 300010L, -1L);

        // delete at timestamp 10 takes effect
        final VersionedRecord<String> deletedK1 = deleteFromStore("k1", 10L);
        MatcherAssert.assertThat(deletedK1.value(), CoreMatchers.equalTo("v1"));
        MatcherAssert.assertThat(deletedK1.timestamp(), CoreMatchers.equalTo(1L));
        verifyGetNullFromStore("k1");

        // delete at timestamp 9 is ignored
        final VersionedRecord<String> deletedK2 = deleteFromStore("k2", 9L);
        MatcherAssert.assertThat(deletedK2, CoreMatchers.nullValue());
        verifyGetValueFromStore("k2", "v2", 1L);

        verifyExpiredRecordSensor(1);
    }

    /**
     * Writes null values for key "ko" at 99990, 199990 and 299990, then checks timestamped
     * lookups for key "k" before it has data, after a value/null pair near 99990, and again
     * after a second value/null pair near 299990.
     */
    @Test
    public void shouldGetFromOlderSegments() {
        // null values for another key at three widely spaced timestamps
        putToStore("ko", null, 99990L, -1L);
        putToStore("ko", null, 199990L, -1L);
        putToStore("ko", null, 299990L, -1L);

        // "k" has no data yet
        verifyTimestampedGetNullFromStore("k", 99980L);

        // value followed by null, both below 100000
        putToStore("k", "v", 99980L, -1L);
        putToStore("k", null, 99990L, -1L);
        verifyTimestampedGetNullFromStore("k", 99970L);
        verifyTimestampedGetValueFromStore("k", 99985L, "v", 99980L, 99990L);
        verifyTimestampedGetNullFromStore("k", 99995L);

        // much newer records must not change the old results
        putToStore("k", "v2", 299980L, -1L);
        putToStore("k", null, 299990L, -1L);
        verifyTimestampedGetNullFromStore("k", 99970L);
        verifyTimestampedGetValueFromStore("k", 99985L, "v", 99980L, 99990L);
        verifyTimestampedGetNullFromStore("k", 99995L);
    }

    /**
     * Checks that the lookup for "k" at 99989 returns "v_old" until another key is written
     * at 399990. After a write at 399989 the value is still expected; after a write at
     * 399990 the same lookup expects null.
     */
    @Test
    public void shouldNotGetExpired() {
        putToStore("k", "v_old", 0L, -1L);
        putToStore("k", "v", 99990L, -1L);
        verifyTimestampedGetValueFromStore("k", 99989L, "v_old", 0L, 99990L);

        // a write at 399989 leaves the old record readable
        putToStore("ko", "vo", 399989L, -1L);
        verifyTimestampedGetValueFromStore("k", 99989L, "v_old", 0L, 99990L);

        // one tick later the old record is no longer returned
        putToStore("ko", "vo2", 399990L, -1L);
        verifyTimestampedGetNullFromStore("k", 99989L);
    }

    /**
     * Writes "k" once at timestamp 1, and "ko" at 1 and again at 300012. The lookup at 10
     * is expected to return the value for "k" (its only record) but null for "ko", whose
     * record at timestamp 1 has been superseded.
     */
    @Test
    public void shouldGetExpiredIfLatestValue() {
        putToStore("k", "v", 1L, -1L);
        putToStore("ko", "vo_old", 1L, -1L);
        putToStore("ko", "vo_new", 300012L, -1L);

        // the single record for "k" is still returned
        verifyTimestampedGetValueFromStore("k", 10L, "v", 1L, -1L);
        // the superseded record for "ko" is not
        verifyTimestampedGetNullFromStore("ko", 10L);
    }

    /**
     * Writes null values and empty-string values around timestamp 100000. Lookups that hit
     * an empty-string record expect "" back, while lookups that hit a null record expect
     * null, so the two must not be confused.
     */
    @Test
    public void shouldDistinguishEmptyAndNull() {
        // null values
        putToStore("k", null, 100020L, -1L);
        putToStore("k", null, 99990L, 100020L);
        putToStore("k", null, 99999L, 100020L);
        putToStore("k", null, 100001L, 100020L);
        putToStore("k", null, 100010L, 100020L);
        // empty-string values
        putToStore("k", "", 100005L, 100010L);
        putToStore("k", "", 99995L, 99999L);
        putToStore("k", "", 99994L, 99995L);

        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", 100000L);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    /**
     * Range-query variant: writes four values and two nulls for "k" below timestamp 100000
     * and checks, for several [from, to] ranges with ResultOrder.ANY, the expected values,
     * timestamps and validTo entries. The expectation lists are ordered newest first.
     */
    @Test
    public void shouldGetRecordVersionsFromOlderSegments() {
        // expectations that recur across several ranges
        final List<String> allValues = Arrays.asList("v4", "v3", "v2", "v1");
        final List<Long> allTimestamps = Arrays.asList(99995L, 99990L, 99975L, 99970L);
        final List<Long> allValidTos = Arrays.asList(-1L, 99995L, 99980L, 99975L);
        final List<String> v4Value = Collections.singletonList("v4");
        final List<Long> v4Timestamp = Collections.singletonList(99995L);
        final List<Long> v4ValidTo = Collections.singletonList(-1L);

        // null values for another key at three widely spaced timestamps
        putToStore("ko", null, 99990L, -1L);
        putToStore("ko", null, 199990L, -1L);
        putToStore("ko", null, 299990L, -1L);

        // "k" has no data yet
        verifyTimestampedGetNullFromStore("k", 99980L, 100000L);

        putToStore("k", "v1", 99970L, -1L);
        putToStore("k", "v2", 99975L, -1L);
        putToStore("k", null, 99980L, -1L);
        putToStore("k", null, 99985L, -1L);
        putToStore("k", "v3", 99990L, -1L);
        putToStore("k", "v4", 99995L, -1L);

        verifyTimestampedGetNullFromStore("k", 99960L, 99965L);
        verifyTimestampedGetValueFromStore("k", 99970L, 99995L, ResultOrder.ANY, allValues, allTimestamps, allValidTos);
        verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, Long.MAX_VALUE, ResultOrder.ANY, allValues, allTimestamps, allValidTos);
        verifyTimestampedGetValueFromStore("k", 99996L, 100000L, ResultOrder.ANY, v4Value, v4Timestamp, v4ValidTo);
        verifyTimestampedGetValueFromStore("k", 99995L, 99995L, ResultOrder.ANY, v4Value, v4Timestamp, v4ValidTo);
        verifyTimestampedGetValueFromStore("k", 99996L, 99996L, ResultOrder.ANY, v4Value, v4Timestamp, v4ValidTo);
        verifyTimestampedGetValueFromStore("k", 99969L, 99979L, ResultOrder.ANY, Arrays.asList("v2", "v1"), Arrays.asList(99975L, 99970L), Arrays.asList(99980L, 99975L));
        verifyTimestampedGetValueFromStore("k", 99976L, 99989L, ResultOrder.ANY, Collections.singletonList("v2"), Collections.singletonList(99975L), Collections.singletonList(99980L));
        verifyTimestampedGetValueFromStore("k", 99989L, 99996L, ResultOrder.ANY, Arrays.asList("v4", "v3"), Arrays.asList(99995L, 99990L), Arrays.asList(-1L, 99995L));
        verifyTimestampedGetValueFromStore("k", 99974L, 99995L, ResultOrder.ANY, allValues, allTimestamps, allValidTos);
        verifyTimestampedGetValueFromStore("k", 99985L, 99990L, ResultOrder.ANY, Collections.singletonList("v3"), Collections.singletonList(99990L), Collections.singletonList(99995L));
        verifyTimestampedGetNullFromStore("k", 99981L, 99984L);

        // much newer records must not change results for the old ranges
        putToStore("k", "v5", 299970L, -1L);
        putToStore("k", null, 299980L, -1L);
        verifyTimestampedGetNullFromStore("k", 99960L, 99965L);
        verifyTimestampedGetValueFromStore("k", 99970L, 99974L, ResultOrder.ANY, Collections.singletonList("v1"), Collections.singletonList(99970L), Collections.singletonList(99975L));
        verifyTimestampedGetNullFromStore("k", 99981L, 99984L);
    }

    /**
     * Writes four values for "k" below timestamp 100000 and queries the range
     * [99970, 99995] with ResultOrder.ASCENDING, expecting oldest-first results.
     */
    @Test
    public void shouldGetRecordVersionsInAscendingOrder() {
        putToStore("k", "v1", 99970L, -1L);
        putToStore("k", "v2", 99975L, -1L);
        putToStore("k", "v3", 99990L, -1L);
        putToStore("k", "v4", 99995L, -1L);

        // expectation lists are ordered oldest first
        verifyTimestampedGetValueFromStore("k", 99970L, 99995L, ResultOrder.ASCENDING, Arrays.asList("v1", "v2", "v3", "v4"), Arrays.asList(99970L, 99975L, 99990L, 99995L), Arrays.asList(99975L, 99990L, 99995L, -1L));
    }

    /**
     * Writes two values just below timestamp 100000 and two just below 200000, then queries
     * the full range with ResultOrder.ASCENDING, expecting all four oldest-first.
     */
    @Test
    public void shouldGetRecordVersionsFromMultipleOldSegmentsInAscendingOrder() {
        // two records below 100000
        putToStore("k", "v1", 99990L, -1L);
        putToStore("k", "v2", 99995L, -1L);
        // two records below 200000
        putToStore("k", "v3", 199990L, -1L);
        putToStore("k", "v4", 199995L, -1L);

        verifyTimestampedGetValueFromStore("k", Long.MIN_VALUE, Long.MAX_VALUE, ResultOrder.ASCENDING, Arrays.asList("v1", "v2", "v3", "v4"), Arrays.asList(99990L, 99995L, 199990L, 199995L), Arrays.asList(99995L, 199990L, 199995L, -1L));
    }

    /**
     * Range-query counterpart of shouldNotGetExpired: the range [0, 99989] for "k" returns
     * "v_old" until another key is written at 399990, after which the range expects null.
     */
    @Test
    public void shouldNotGetExpiredRecordVersions() {
        putToStore("k", "v_old", 0L, -1L);
        putToStore("k", "v", 99990L, -1L);
        verifyTimestampedGetValueFromStore("k", 0L, 99989L, ResultOrder.ANY, Collections.singletonList("v_old"), Collections.singletonList(0L), Collections.singletonList(99990L));

        // a write at 399989 leaves the old record readable
        putToStore("ko", "vo", 399989L, -1L);
        verifyTimestampedGetValueFromStore("k", 0L, 99989L, ResultOrder.ANY, Collections.singletonList("v_old"), Collections.singletonList(0L), Collections.singletonList(99990L));

        // one tick later the old record is no longer returned
        putToStore("ko", "vo2", 399990L, -1L);
        verifyTimestampedGetNullFromStore("k", 0L, 99989L);
    }

    /**
     * Range-query counterpart of shouldGetExpiredIfLatestValue: the range [0, 10] returns
     * the single record for "k" but expects null for "ko", whose record at timestamp 1 has
     * been superseded by a write at 300012.
     */
    @Test
    public void shouldGetExpiredIfLatestVersionValue() {
        putToStore("k", "v", 1L, -1L);
        putToStore("ko", "vo_old", 1L, -1L);
        putToStore("ko", "vo_new", 300012L, -1L);

        // the single record for "k" is still returned
        verifyTimestampedGetValueFromStore("k", 0L, 10L, ResultOrder.ANY, Collections.singletonList("v"), Collections.singletonList(1L), Collections.singletonList(-1L));
        // the superseded record for "ko" is not
        verifyTimestampedGetNullFromStore("ko", 0L, 10L);
    }

    /**
     * Restores six non-null records for "k" in a single batch, using the same
     * keys/values/timestamps as shouldMoveRecordToOlderSegmentDuringPut, and expects the
     * same lookup results as that test.
     */
    @Test
    public void shouldRestore() {
        // wrapped in an ArrayList so the helper still receives a mutable list
        final List<DataRecord> records = new ArrayList<>(Arrays.asList(
            new DataRecord("k", "vp20", 100020L),
            new DataRecord("k", "vp10", 100010L),
            new DataRecord("k", "vn10", 99990L),
            new DataRecord("k", "vn2", 99998L),
            new DataRecord("k", "vn1", 99999L),
            new DataRecord("k", "vp1", 100001L)));

        store.restoreBatch(RocksDBVersionedStoreTest.getChangelogRecords(records));

        verifyGetValueFromStore("k", "vp20", 100020L);
        verifyTimestampedGetValueFromStore("k", 100030L, "vp20", 100020L, -1L);
        verifyTimestampedGetValueFromStore("k", 100015L, "vp10", 100010L, 100020L);
        verifyTimestampedGetValueFromStore("k", 100005L, "vp1", 100001L, 100010L);
        verifyTimestampedGetValueFromStore("k", 100000L, "vn1", 99999L, 100001L);
        verifyTimestampedGetValueFromStore("k", 99999L, "vn1", 99999L, 100001L);
        verifyTimestampedGetValueFromStore("k", 99998L, "vn2", 99998L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn10", 99990L, 99998L);
    }

    /**
     * Restores a single batch for "k" that consists mostly of null values, using the same
     * data as shouldMoveRecordToOlderSegmentWithNullsDuringPut, and expects only "vp5",
     * "vn5" and "vn6" to be readable afterwards.
     */
    @Test
    public void shouldRestoreWithNulls() {
        // wrapped in an ArrayList so the helper still receives a mutable list
        final List<DataRecord> records = new ArrayList<>(Arrays.asList(
            new DataRecord("k", null, 100020L),
            new DataRecord("k", null, 99999L),
            new DataRecord("k", null, 100001L),
            new DataRecord("k", null, 99990L),
            new DataRecord("k", null, 100010L),
            new DataRecord("k", "vp5", 100005L),
            new DataRecord("k", "vn5", 99995L),
            new DataRecord("k", "vn6", 99994L)));

        store.restoreBatch(RocksDBVersionedStoreTest.getChangelogRecords(records));

        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "vp5", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", 100000L);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    /**
     * Restores a single batch for "k" that repeats timestamps and mixes nulls with values,
     * using the same sequence as shouldPutRepeatTimestamps. Records named "to_be_replaced"
     * are all followed by another record at the same timestamp; only "vp5", "vn5" and "vn6"
     * are expected to be readable afterwards.
     */
    @Test
    public void shouldRestoreWithNullsAndRepeatTimestamps() {
        // wrapped in an ArrayList so the helper still receives a mutable list
        final List<DataRecord> records = new ArrayList<>(Arrays.asList(
            new DataRecord("k", "to_be_replaced", 100020L),
            new DataRecord("k", null, 99990L),
            new DataRecord("k", "to_be_replaced", 99990L),
            new DataRecord("k", null, 99990L),
            new DataRecord("k", "to_be_replaced", 99999L),
            new DataRecord("k", "to_be_replaced", 100001L),
            new DataRecord("k", null, 99999L),
            new DataRecord("k", null, 100001L),
            new DataRecord("k", null, 100010L),
            new DataRecord("k", null, 100005L),
            new DataRecord("k", "vp5", 100005L),
            new DataRecord("k", "to_be_replaced", 99995L),
            new DataRecord("k", "vn5", 99995L),
            new DataRecord("k", null, 100020L),
            new DataRecord("k", null, 100020L),
            new DataRecord("k", "vn6", 99994L)));

        store.restoreBatch(RocksDBVersionedStoreTest.getChangelogRecords(records));

        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetNullFromStore("k", 100015L);
        verifyTimestampedGetValueFromStore("k", 100006L, "vp5", 100005L, 100010L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", 100000L);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn5", 99995L, 99999L);
        verifyTimestampedGetValueFromStore("k", 99994L, "vn6", 99994L, 99995L);
        verifyTimestampedGetNullFromStore("k", 99992L);
    }

    /**
     * Restores two consecutive batches for "k", the first with timestamps below 100000 and
     * the second above it, using the same data as shouldPutIntoMultipleSegments, and
     * expects the same lookup results as that test.
     */
    @Test
    public void shouldRestoreMultipleBatches() {
        // both lists are built before either batch is restored, as in the original
        final List<DataRecord> records = new ArrayList<>(Arrays.asList(
            new DataRecord("k", null, 99980L),
            new DataRecord("k", "vn10", 99990L),
            new DataRecord("k", null, 99999L)));
        final List<DataRecord> moreRecords = new ArrayList<>(Arrays.asList(
            new DataRecord("k", null, 100001L),
            new DataRecord("k", "vp10", 100010L),
            new DataRecord("k", null, 100020L)));

        store.restoreBatch(RocksDBVersionedStoreTest.getChangelogRecords(records));
        store.restoreBatch(RocksDBVersionedStoreTest.getChangelogRecords(moreRecords));

        verifyGetNullFromStore("k");
        verifyTimestampedGetNullFromStore("k", 100030L);
        verifyTimestampedGetValueFromStore("k", 100015L, "vp10", 100010L, 100020L);
        verifyTimestampedGetNullFromStore("k", 100005L);
        verifyTimestampedGetNullFromStore("k", 100002L);
        verifyTimestampedGetNullFromStore("k", 100000L);
        verifyTimestampedGetNullFromStore("k", 99999L);
        verifyTimestampedGetValueFromStore("k", 99995L, "vn10", 99990L, 99999L);
        verifyTimestampedGetNullFromStore("k", 99985L);
    }

    /**
     * Restores a batch whose first record has timestamp 300010, followed by records at 10
     * and 9. The record at 10 is expected to be present and the one at 9 absent, and the
     * expired-record sensor check is called with 0.
     */
    @Test
    public void shouldNotRestoreExpired() {
        // wrapped in an ArrayList so the helper still receives a mutable list
        final List<DataRecord> records = new ArrayList<>(Arrays.asList(
            new DataRecord("k", "v", 300010L),
            new DataRecord("k1", "v1", 10L),
            new DataRecord("k2", "v2", 9L)));

        store.restoreBatch(RocksDBVersionedStoreTest.getChangelogRecords(records));

        verifyGetValueFromStore("k", "v", 300010L);
        verifyGetValueFromStore("k1", "v1", 10L);
        verifyGetNullFromStore("k2");
        verifyExpiredRecordSensor(0);
    }

    /**
     * Restores a batch in which a record at timestamp 0 precedes a record at 300010 for a
     * different key. Both records are expected to be readable afterwards.
     */
    @Test
    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
        // the old record comes first in the batch
        final List<DataRecord> records = new ArrayList<>(Arrays.asList(
            new DataRecord("k2", "v2", 0L),
            new DataRecord("k", "v", 300010L)));

        store.restoreBatch(RocksDBVersionedStoreTest.getChangelogRecords(records));

        verifyGetValueFromStore("k2", "v2", 0L);
        verifyGetValueFromStore("k", "v", 300010L);
    }

    /**
     * Replaces the default store with one constructed with 0L as the third argument
     * (HISTORY_RETENTION's position) and exercises put, overwrite, newer put, delete, and a
     * put at timestamp 12 after a delete at 13 that is asserted to return Long.MIN_VALUE.
     */
    @Test
    public void shouldAllowZeroHistoryRetention() {
        // swap in a store constructed with 0L in place of 300000L
        store.close();
        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, 100000L);
        store.init((StateStoreContext) context, store);

        // first write
        putToStore("k", "v", 10L, -1L);
        verifyGetValueFromStore("k", "v", 10L);
        verifyTimestampedGetValueFromStore("k", 10L, "v", 10L, -1L);
        verifyTimestampedGetValueFromStore("k", 11L, "v", 10L, -1L);

        // overwrite at the same timestamp
        putToStore("k", "updated", 10L, -1L);
        verifyGetValueFromStore("k", "updated", 10L);
        verifyTimestampedGetValueFromStore("k", 10L, "updated", 10L, -1L);

        // newer write; the lookup at 11 now expects null
        putToStore("k", "v2", 12L, -1L);
        verifyGetValueFromStore("k", "v2", 12L);
        verifyTimestampedGetValueFromStore("k", 12L, "v2", 12L, -1L);
        verifyTimestampedGetNullFromStore("k", 11L);

        // delete at 13
        deleteFromStore("k", 13L);
        verifyGetNullFromStore("k");

        // a put at 12 is now rejected
        putToStore("k2", "v", 12L, Long.MIN_VALUE);
        verifyGetNullFromStore("k2");
        verifyExpiredRecordSensor(1);
    }

    /**
     * Serializes {@code key} and {@code value}, puts them into the store at {@code timestamp},
     * and asserts that the value returned by {@code store.put} equals {@code expectedValidTo}.
     */
    private void putToStore(String key, String value, long timestamp, long expectedValidTo) {
        final Bytes rawKey = new Bytes(STRING_SERIALIZER.serialize(null, key));
        final byte[] rawValue = STRING_SERIALIZER.serialize(null, value);
        final long returnedValidTo = store.put(rawKey, rawValue, timestamp);
        MatcherAssert.assertThat(returnedValidTo, CoreMatchers.equalTo(expectedValidTo));
    }

    /**
     * Deletes {@code key} from the store at {@code timestamp}.
     *
     * @return whatever {@code store.delete} returned, with its value deserialized to a String;
     *         {@code null} if the store returned {@code null}
     */
    private VersionedRecord<String> deleteFromStore(String key, long timestamp) {
        final Bytes rawKey = new Bytes(STRING_SERIALIZER.serialize(null, key));
        return deserializedRecord(store.delete(rawKey, timestamp));
    }

    /**
     * Fetches the record the store returns for {@code key} when no timestamp is given.
     *
     * @return the record with its value deserialized to a String, or {@code null} if the
     *         store returned {@code null}
     */
    private VersionedRecord<String> getFromStore(String key) {
        final Bytes rawKey = new Bytes(STRING_SERIALIZER.serialize(null, key));
        return deserializedRecord(store.get(rawKey));
    }

    /**
     * Fetches the record the store returns for {@code key} as of {@code asOfTimestamp}.
     *
     * @return the record with its value deserialized to a String, or {@code null} if the
     *         store returned {@code null}
     */
    private VersionedRecord<String> getFromStore(String key, long asOfTimestamp) {
        final Bytes rawKey = new Bytes(STRING_SERIALIZER.serialize(null, key));
        return deserializedRecord(store.get(rawKey, asOfTimestamp));
    }

    /**
     * Runs a range query for {@code key} over {@code [fromTime, toTime]} in the requested
     * {@code order} and collects every returned record into a list, in iteration order.
     *
     * <p>The store's iterator is closed before this method returns, so callers receive a
     * plain list and do not have to manage any resource.
     *
     * @return the records produced by the iterator, with values deserialized to Strings
     */
    private List<VersionedRecord<String>> getFromStore(String key, long fromTime, long toTime, ResultOrder order) {
        final Bytes rawKey = new Bytes(STRING_SERIALIZER.serialize(null, key));
        final List<VersionedRecord<String>> versionedRecordsList = new ArrayList<>();
        // try-with-resources: the iterator is Closeable and was previously never closed.
        try (VersionedRecordIterator<byte[]> resultRecords = store.get(rawKey, fromTime, toTime, order)) {
            while (resultRecords.hasNext()) {
                versionedRecordsList.add(deserializedRecord(resultRecords.next()));
            }
        }
        return versionedRecordsList;
    }

    /**
     * Asserts that the latest record for {@code key} exists, has the given value and
     * timestamp, and has no {@code validTo}.
     */
    private void verifyGetValueFromStore(String key, String expectedValue, long expectedTimestamp) {
        final VersionedRecord<String> latest = getFromStore(key);
        // Fail with a readable assertion instead of a NullPointerException when nothing is found.
        MatcherAssert.assertThat("no record found for key " + key, latest, CoreMatchers.notNullValue());
        MatcherAssert.assertThat(latest.value(), CoreMatchers.equalTo(expectedValue));
        MatcherAssert.assertThat(latest.timestamp(), CoreMatchers.equalTo(expectedTimestamp));
        MatcherAssert.assertThat(latest.validTo().isPresent(), CoreMatchers.equalTo(false));
    }

    /** Asserts that a latest-value lookup for {@code key} yields {@code null}. */
    private void verifyGetNullFromStore(String key) {
        MatcherAssert.assertThat(getFromStore(key), CoreMatchers.nullValue());
    }

    /**
     * Asserts that a lookup for {@code key} as of {@code timestamp} returns a record with the
     * given value, record timestamp and {@code validTo}.
     *
     * @param expectedValidTo the expected {@code validTo}; pass {@code -1L} to assert that the
     *                        record has no {@code validTo} at all
     */
    private void verifyTimestampedGetValueFromStore(String key, long timestamp, String expectedValue, long expectedTimestamp, long expectedValidTo) {
        final VersionedRecord<String> latest = getFromStore(key, timestamp);
        // Fail with a readable assertion instead of a NullPointerException when nothing is found.
        MatcherAssert.assertThat("no record found for key " + key + " as of " + timestamp, latest, CoreMatchers.notNullValue());
        MatcherAssert.assertThat(latest.value(), CoreMatchers.equalTo(expectedValue));
        MatcherAssert.assertThat(latest.timestamp(), CoreMatchers.equalTo(expectedTimestamp));
        if (expectedValidTo == -1L) {
            MatcherAssert.assertThat(latest.validTo().isPresent(), CoreMatchers.equalTo(false));
        } else {
            // Compare the Optional itself so that a missing validTo is reported as an assertion
            // failure rather than a NoSuchElementException from Optional.get().
            final Optional<Long> expected = Optional.of(expectedValidTo);
            MatcherAssert.assertThat(latest.validTo(), CoreMatchers.equalTo(expected));
        }
    }

    /**
     * Runs a range query and asserts that it returns exactly {@code expectedValues.size()}
     * records whose values, timestamps and {@code validTo}s match the three expectation lists
     * position by position.
     *
     * @param expectedValidTos expected {@code validTo} per record; an entry of {@code -1L}
     *                         asserts that the record at that position has no {@code validTo}
     */
    private void verifyTimestampedGetValueFromStore(String key, long fromTime, long toTime, ResultOrder order, List<String> expectedValues, List<Long> expectedTimestamps, List<Long> expectedValidTos) {
        final List<VersionedRecord<String>> actualRecords = getFromStore(key, fromTime, toTime, order);
        MatcherAssert.assertThat(actualRecords.size(), CoreMatchers.equalTo(expectedValues.size()));

        int position = 0;
        while (position < actualRecords.size()) {
            final VersionedRecord<String> actual = actualRecords.get(position);
            MatcherAssert.assertThat(actual.value(), CoreMatchers.equalTo(expectedValues.get(position)));
            MatcherAssert.assertThat(actual.timestamp(), CoreMatchers.equalTo(expectedTimestamps.get(position)));

            final long wantedValidTo = expectedValidTos.get(position);
            if (wantedValidTo != -1L) {
                MatcherAssert.assertThat(actual.validTo().get(), CoreMatchers.equalTo(wantedValidTo));
            } else {
                MatcherAssert.assertThat(actual.validTo().isPresent(), CoreMatchers.equalTo(false));
            }
            position++;
        }
    }

    /** Asserts that a lookup for {@code key} as of {@code timestamp} yields {@code null}. */
    private void verifyTimestampedGetNullFromStore(String key, long timestamp) {
        MatcherAssert.assertThat(getFromStore(key, timestamp), CoreMatchers.nullValue());
    }

    /**
     * Asserts that a range query for {@code key} over {@code [fromTime, toTime]}, issued with
     * {@code ResultOrder.ANY}, returns no records.
     */
    private void verifyTimestampedGetNullFromStore(String key, long fromTime, long toTime) {
        final int matchCount = getFromStore(key, fromTime, toTime, ResultOrder.ANY).size();
        MatcherAssert.assertThat(matchCount, CoreMatchers.equalTo(0));
    }

    /**
     * Asserts that the metric named {@code DROPPED_RECORDS_METRIC} in {@code TASK_LEVEL_GROUP}
     * (looked up with {@code expectedMetricsTags}) currently reports {@code expectedValue},
     * within a tolerance of 0.001.
     */
    private void verifyExpiredRecordSensor(int expectedValue) {
        final MetricName metricName = new MetricName(DROPPED_RECORDS_METRIC, TASK_LEVEL_GROUP, "", this.expectedMetricsTags);
        final Metric metric = (Metric) this.context.metrics().metrics().get(metricName);
        // Report a missing metric as an assertion failure rather than a NullPointerException.
        Assert.assertNotNull("metric " + DROPPED_RECORDS_METRIC + " is not registered", metric);
        final double actualValue = (Double) metric.metricValue();
        // JUnit's signature is assertEquals(expected, actual, delta); keep that order so a
        // failure message labels the two numbers correctly.
        Assert.assertEquals((double) expectedValue, actualValue, 0.001);
    }

    /**
     * Converts a record holding a serialized value into one holding the deserialized String,
     * preserving the timestamp and, when present, the {@code validTo}.
     *
     * @return the converted record, or {@code null} when {@code versionedRecord} is {@code null}
     */
    private static VersionedRecord<String> deserializedRecord(VersionedRecord<byte[]> versionedRecord) {
        if (versionedRecord == null) {
            return null;
        }
        final Optional<Long> validTo = versionedRecord.validTo();
        final String value = STRING_DESERIALIZER.deserialize(null, versionedRecord.value());
        final long timestamp = versionedRecord.timestamp();
        if (validTo.isPresent()) {
            return new VersionedRecord<>(value, timestamp, validTo.get());
        }
        return new VersionedRecord<>(value, timestamp);
    }

    /**
     * Turns test fixtures into consumer records suitable for {@code restoreBatch}.
     *
     * <p>Every record uses an empty topic name, partition 0, offset 0, the fixture's timestamp
     * with {@code TimestampType.CREATE_TIME}, empty headers and no leader epoch. A fixture whose
     * serialized value is {@code null} gets a serialized value size of 0.
     *
     * @return one consumer record per fixture, in the same order as {@code data}
     */
    private static List<ConsumerRecord<byte[], byte[]>> getChangelogRecords(List<DataRecord> data) {
        final List<ConsumerRecord<byte[], byte[]>> changelog = new ArrayList<>();
        for (final DataRecord fixture : data) {
            final byte[] keyBytes = STRING_SERIALIZER.serialize(null, fixture.key);
            final byte[] valueBytes = STRING_SERIALIZER.serialize(null, fixture.value);
            final int valueSize = valueBytes == null ? 0 : valueBytes.length;
            changelog.add(new ConsumerRecord<>(
                "",
                0,
                0L,
                fixture.timestamp,
                TimestampType.CREATE_TIME,
                keyBytes.length,
                valueSize,
                keyBytes,
                valueBytes,
                new RecordHeaders(),
                Optional.empty()));
        }
        return changelog;
    }

    /**
     * Immutable test fixture describing one changelog entry: a key, a value (which callers
     * may leave {@code null}) and a timestamp.
     */
    private static class DataRecord {
        final String key;
        final String value;
        final long timestamp;

        /** Stores the three components exactly as given; no validation is performed. */
        DataRecord(String recordKey, String recordValue, long recordTimestamp) {
            key = recordKey;
            value = recordValue;
            timestamp = recordTimestamp;
        }
    }
}

