/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.hash;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.MutableHashTable;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.types.StringPair;
import org.apache.flink.runtime.operators.testutils.types.StringPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator;
import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MutableHashTablePerformanceBenchmark {
    private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
    private MemoryManager memManager;
    private IOManager ioManager;
    private TypeSerializer<StringPair> pairBuildSideAccesssor;
    private TypeSerializer<StringPair> pairProbeSideAccesssor;
    private TypeComparator<StringPair> pairBuildSideComparator;
    private TypeComparator<StringPair> pairProbeSideComparator;
    private TypePairComparator<StringPair, StringPair> pairComparator;
    private static final String COMMENT = "this comments should contains a 96 byte data, 100 plus another integer value and seperator char.";

    @Before
    public void setup() {
        this.pairBuildSideAccesssor = new StringPairSerializer();
        this.pairProbeSideAccesssor = new StringPairSerializer();
        this.pairBuildSideComparator = new StringPairComparator();
        this.pairProbeSideComparator = new StringPairComparator();
        this.pairComparator = new StringPairPairComparator();
        this.memManager = new MemoryManager(0x4000000L, 1);
        this.ioManager = new IOManagerAsync();
    }

    @After
    public void tearDown() {
        this.ioManager.shutdown();
        if (!this.ioManager.isProperlyShutDown()) {
            Assert.fail((String)"I/O manager was not property shut down.");
        }
        if (!this.memManager.verifyEmpty()) {
            Assert.fail((String)"Not all memory was properly released to the memory manager --> Memory Leak.");
        }
    }

    @Test
    public void compareMutableHashTablePerformance1() throws IOException {
        int buildSize = 1000000;
        int buildStep = 10;
        int buildScope = buildStep * buildSize;
        int probeSize = 5000000;
        int probeStep = 1;
        int probeScope = buildSize;
        int expectedResult = 500000;
        long withBloomFilterCost = this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
        long withoutBloomFilterCost = this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
        System.out.println("HybridHashJoin2:");
        System.out.println("Build input size: " + 100 * buildSize);
        System.out.println("Probe input size: " + 100 * probeSize);
        System.out.println("Available memory: " + this.memManager.getMemorySize());
        System.out.println("Probe record be filtered before spill: " + (1.0 - (double)probeScope / (double)buildScope) * 100.0 + "% percent.");
        System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
    }

    @Test
    public void compareMutableHashTablePerformance2() throws IOException {
        int buildSize = 1000000;
        int buildStep = 5;
        int buildScope = buildStep * buildSize;
        int probeSize = 5000000;
        int probeStep = 1;
        int probeScope = buildSize;
        int expectedResult = 1000000;
        long withBloomFilterCost = this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
        long withoutBloomFilterCost = this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
        System.out.println("HybridHashJoin3:");
        System.out.println("Build input size: " + 100 * buildSize);
        System.out.println("Probe input size: " + 100 * probeSize);
        System.out.println("Available memory: " + this.memManager.getMemorySize());
        System.out.println("Probe record be filtered before spill: " + (1.0 - (double)probeScope / (double)buildScope) * 100.0 + "% percent.");
        System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
    }

    @Test
    public void compareMutableHashTablePerformance3() throws IOException {
        int buildSize = 1000000;
        int buildStep = 2;
        int buildScope = buildStep * buildSize;
        int probeSize = 5000000;
        int probeStep = 1;
        int probeScope = buildSize;
        int expectedResult = 2500000;
        long withoutBloomFilterCost = this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
        long withBloomFilterCost = this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
        System.out.println("HybridHashJoin4:");
        System.out.println("Build input size: " + 100 * buildSize);
        System.out.println("Probe input size: " + 100 * probeSize);
        System.out.println("Available memory: " + this.memManager.getMemorySize());
        System.out.println("Probe record be filtered before spill: " + (1.0 - (double)probeScope / (double)buildScope) * 100.0 + "% percent.");
        System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
    }

    @Test
    public void compareMutableHashTablePerformance4() throws IOException {
        int buildSize = 1000000;
        int buildStep = 1;
        int buildScope = buildStep * buildSize;
        int probeSize = 5000000;
        int probeStep = 1;
        int probeScope = buildSize;
        int expectedResult = probeSize / buildStep;
        long withBloomFilterCost = this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true);
        long withoutBloomFilterCost = this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false);
        System.out.println("HybridHashJoin5:");
        System.out.println("Build input size: " + 100 * buildSize);
        System.out.println("Probe input size: " + 100 * probeSize);
        System.out.println("Available memory: " + this.memManager.getMemorySize());
        System.out.println("Probe record be filtered before spill: " + (1.0 - (double)probeScope / (double)buildScope) * 100.0 + "% percent.");
        System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost));
    }

    private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize, int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException {
        List memSegments;
        InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope);
        InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope);
        try {
            memSegments = this.memManager.allocatePages((Object)MEM_OWNER, (int)(this.memManager.getMemorySize() / (long)this.memManager.getPageSize()));
        }
        catch (MemoryAllocationException maex) {
            Assert.fail((String)"Memory for the Join could not be provided.");
            return -1L;
        }
        long start = System.currentTimeMillis();
        MutableHashTable join = new MutableHashTable(this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, memSegments, this.ioManager, enableBloomFilter);
        join.open((MutableObjectIterator)buildIterator, (MutableObjectIterator)probeIterator);
        StringPair recordReuse = new StringPair();
        int numRecordsInJoinResult = 0;
        while (join.nextRecord()) {
            MutableHashTable.HashBucketIterator buildSide = join.getBuildSideIterator();
            while (buildSide.next((Object)recordReuse) != null) {
                ++numRecordsInJoinResult;
            }
        }
        Assert.assertEquals((String)"Wrong number of records in join result.", (long)expectedResultSize, (long)numRecordsInJoinResult);
        join.close();
        long cost = System.currentTimeMillis() - start;
        this.memManager.release((Collection)join.getFreedMemory());
        return cost;
    }

    static class InputIterator
    implements MutableObjectIterator<StringPair> {
        private int numLeft;
        private int distance;
        private int scope;

        public InputIterator(int size, int distance, int scope) {
            this.numLeft = size;
            this.distance = distance;
            this.scope = scope;
        }

        public StringPair next(StringPair reuse) throws IOException {
            if (this.numLeft > 0) {
                --this.numLeft;
                int currentKey = this.numLeft * this.distance % this.scope;
                reuse.setKey(Integer.toString(currentKey));
                reuse.setValue(MutableHashTablePerformanceBenchmark.COMMENT);
                return reuse;
            }
            return null;
        }

        public StringPair next() throws IOException {
            return this.next(new StringPair());
        }
    }
}

