/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.util;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.Random;
import junit.framework.TestCase;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.distributions.UniformIntegerDistribution;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.runtime.operators.shipping.RecordOutputEmitter;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.types.DeserializationException;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.KeyFieldOutOfBoundsException;
import org.apache.flink.types.NullKeyFieldException;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.types.Value;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests the channel selection of {@link RecordOutputEmitter} for the
 * {@code PARTITION_HASH}, {@code FORWARD}, {@code BROADCAST} and {@code PARTITION_RANGE}
 * ship strategies, as well as its error reporting for missing, null and wrongly typed key fields.
 *
 * <p>The class extends {@link TestCase}, so all checks use the assertion methods inherited from
 * it; this keeps the failure type consistent across all test methods.
 */
public class RecordOutputEmitterTest extends TestCase {
    /** Fixed seed for the random generator in {@link #testPartitionRange()} so runs are reproducible. */
    private static final long SEED = 485213591485399L;

    /**
     * Hash-partitions records with consecutive integer keys, then with the same kind of keys
     * rendered as strings, and checks that every channel receives at least one record and that
     * each record is emitted exactly once in total.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testPartitionHash() {
        final int numChans = 100;

        // Integer keys.
        RecordComparator intComp = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        RecordOutputEmitter oe1 = new RecordOutputEmitter(ShipStrategyType.PARTITION_HASH, intComp);
        int numRecs = 50000;
        int[] hit = new int[numChans];
        for (int i = 0; i < numRecs; i++) {
            countHits(oe1, new Record(new IntValue(i)), hit);
        }
        assertEveryChannelHit(hit);
        assertEquals(numRecs, sum(hit));

        // String keys.
        RecordComparator stringComp = new RecordComparator(new int[]{0}, new Class[]{StringValue.class});
        RecordOutputEmitter oe2 = new RecordOutputEmitter(ShipStrategyType.PARTITION_HASH, stringComp);
        numRecs = 10000;
        hit = new int[numChans];
        for (int i = 0; i < numRecs; i++) {
            countHits(oe2, new Record(new StringValue(i + "")), hit);
        }
        assertEveryChannelHit(hit);
        assertEquals(numRecs, sum(hit));
    }

    /**
     * Emits records with the {@code FORWARD} strategy and checks that each channel receives
     * either {@code numRecords / numChannels} records or one fewer, and that each record is
     * emitted exactly once in total.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testForward() {
        final int numChannels = 100;

        // Integer keys.
        RecordComparator intComp = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        RecordOutputEmitter oe1 = new RecordOutputEmitter(ShipStrategyType.FORWARD, intComp);
        int numRecords = 50000;
        int[] hit = new int[numChannels];
        for (int i = 0; i < numRecords; i++) {
            countHits(oe1, new Record(new IntValue(i)), hit);
        }
        assertEvenSpread(hit, numRecords);
        assertEquals(numRecords, sum(hit));

        // String keys.
        RecordComparator stringComp = new RecordComparator(new int[]{0}, new Class[]{StringValue.class});
        RecordOutputEmitter oe2 = new RecordOutputEmitter(ShipStrategyType.FORWARD, stringComp);
        numRecords = 10000;
        hit = new int[numChannels];
        for (int i = 0; i < numRecords; i++) {
            countHits(oe2, new Record(new StringValue(i + "")), hit);
        }
        assertEvenSpread(hit, numRecords);
        assertEquals(numRecords, sum(hit));
    }

    /**
     * Emits records with the {@code BROADCAST} strategy and checks that every channel receives
     * every record.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testBroadcast() {
        final int numChannels = 100;

        // Integer keys.
        RecordComparator intComp = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        RecordOutputEmitter oe1 = new RecordOutputEmitter(ShipStrategyType.BROADCAST, intComp);
        int numRecords = 50000;
        int[] hit = new int[numChannels];
        for (int i = 0; i < numRecords; i++) {
            countHits(oe1, new Record(new IntValue(i)), hit);
        }
        assertEveryChannelHitExactly(hit, numRecords);

        // String keys.
        RecordComparator stringComp = new RecordComparator(new int[]{0}, new Class[]{StringValue.class});
        RecordOutputEmitter oe2 = new RecordOutputEmitter(ShipStrategyType.BROADCAST, stringComp);
        numRecords = 5000;
        hit = new int[numChannels];
        for (int i = 0; i < numRecords; i++) {
            countHits(oe2, new Record(new StringValue(i + "")), hit);
        }
        assertEveryChannelHitExactly(hit, numRecords);
    }

    /**
     * Hash-partitions four-field records on a composite key (fields 0, 1 and 3; field 2 is left
     * unset) and checks that every channel is hit and each record is emitted exactly once.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testMultiKeys() {
        RecordComparator multiComp = new RecordComparator(
                new int[]{0, 1, 3},
                new Class[]{IntValue.class, StringValue.class, DoubleValue.class});
        RecordOutputEmitter oe1 = new RecordOutputEmitter(ShipStrategyType.PARTITION_HASH, multiComp);
        final int numChannels = 100;
        final int numRecords = 5000;
        int[] hit = new int[numChannels];
        for (int i = 0; i < numRecords; i++) {
            Record rec = new Record(4);
            rec.setField(0, new IntValue(i));
            rec.setField(1, new StringValue("AB" + i + "CD" + i));
            rec.setField(3, new DoubleValue((double) i * 3.141));
            countHits(oe1, rec, hit);
        }
        assertEveryChannelHit(hit);
        assertEquals(numRecords, sum(hit));
    }

    /**
     * Uses a comparator keyed on field 1 with a record that only has field 0 set, and expects a
     * {@link KeyFieldOutOfBoundsException} that reports field number 1.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testMissingKey() {
        RecordComparator intComp = new RecordComparator(new int[]{1}, new Class[]{IntValue.class});
        RecordOutputEmitter oe1 = new RecordOutputEmitter(ShipStrategyType.PARTITION_HASH, intComp);
        Record rec = new Record(0);
        rec.setField(0, new IntValue(1));
        try {
            oe1.selectChannels(rec, 100);
        } catch (KeyFieldOutOfBoundsException re) {
            assertEquals(1, re.getFieldNumber());
            return;
        }
        fail("Expected a KeyFieldOutOfBoundsException.");
    }

    /**
     * Uses a comparator keyed on field 0 with a two-field record in which only field 1 is set,
     * and expects a {@link NullKeyFieldException} that reports field number 0.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testNullKey() {
        RecordComparator intComp = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        RecordOutputEmitter oe1 = new RecordOutputEmitter(ShipStrategyType.PARTITION_HASH, intComp);
        Record rec = new Record(2);
        rec.setField(1, new IntValue(1));
        try {
            oe1.selectChannels(rec, 100);
        } catch (NullKeyFieldException re) {
            assertEquals(0, re.getFieldNumber());
            return;
        }
        fail("Expected a NullKeyFieldException.");
    }

    /**
     * Writes a record holding an {@link IntValue} in field 0 through an in-memory pipe, reads it
     * back into a fresh record, and expects channel selection with a comparator declared for
     * {@link DoubleValue} to raise a {@link DeserializationException}.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testWrongKeyClass() {
        RecordComparator doubleComp = new RecordComparator(new int[]{0}, new Class[]{DoubleValue.class});
        RecordOutputEmitter oe1 = new RecordOutputEmitter(ShipStrategyType.PARTITION_HASH, doubleComp);
        PipedInputStream pipedInput = new PipedInputStream(1024 * 1024);
        DataInputStream in = new DataInputStream(pipedInput);
        Record rec = null;
        try {
            DataOutputStream out = new DataOutputStream(new PipedOutputStream(pipedInput));
            rec = new Record(1);
            rec.setField(0, new IntValue());
            rec.write(new OutputViewDataOutputStreamWrapper(out));
            // Round-trip through the serialized form; presumably this forces the key field to
            // be deserialized lazily with the comparator's key class -- confirm against Record.
            rec = new Record();
            rec.read(new InputViewDataInputStreamWrapper(in));
        } catch (IOException e) {
            fail("Test erroneous");
        }
        try {
            oe1.selectChannels(rec, 100);
        } catch (DeserializationException re) {
            return;
        }
        // The expected exception here is the one caught above, not a NullKeyFieldException.
        fail("Expected a DeserializationException.");
    }

    /**
     * Range-partitions uniformly distributed random integers over a
     * {@link UniformIntegerDistribution} and checks that exactly one channel is selected per
     * record and that it is the bucket the value falls into.
     */
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void testPartitionRange() {
        final Random rnd = new Random(SEED);
        final int distrMin = 0;
        final int distrMax = 1000000;
        final int distrRange = distrMax - distrMin + 1;
        final int numBuckets = 137;
        final double bucketWidth = (double) distrRange / numBuckets;
        final int numElements = 10000000;

        UniformIntegerDistribution distri = new UniformIntegerDistribution(distrMin, distrMax);
        RecordComparator intComp = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
        RecordOutputEmitter oe = new RecordOutputEmitter(ShipStrategyType.PARTITION_RANGE, intComp, distri);

        // The key object and the record are reused across iterations.
        IntValue integer = new IntValue();
        Record rec = new Record();
        for (int i = 0; i < numElements; i++) {
            int nextValue = rnd.nextInt(distrRange) + distrMin;
            integer.setValue(nextValue);
            rec.setField(0, integer);

            int[] channels = oe.selectChannels(rec, numBuckets);
            if (channels.length != 1) {
                fail("Resulting channels array has more than one channel.");
            }

            int bucket = channels[0];
            int shouldBeBucket = (int) ((nextValue - distrMin) / bucketWidth);
            if (shouldBeBucket == bucket) {
                continue;
            }

            // A differing bucket is tolerated when the value still lies within the
            // (truncated) boundaries computed for the bucket that was actually selected.
            int lowerBoundaryForSelectedBucket = distrMin + (int) (bucket * bucketWidth);
            int upperBoundaryForSelectedBucket = distrMin + (int) ((bucket + 1) * bucketWidth);
            if (nextValue > lowerBoundaryForSelectedBucket && nextValue <= upperBoundaryForSelectedBucket) {
                continue;
            }
            fail("Wrong bucket selected");
        }
    }

    /** Selects the channels for {@code rec} among {@code hit.length} channels and increments the counter of each one. */
    private static void countHits(RecordOutputEmitter emitter, Record rec, int[] hit) {
        for (int channel : emitter.selectChannels(rec, hit.length)) {
            hit[channel]++;
        }
    }

    /** Returns the total number of hits over all channels. */
    private static int sum(int[] hit) {
        int total = 0;
        for (int count : hit) {
            total += count;
        }
        return total;
    }

    /** Asserts that every channel was selected at least once. */
    private static void assertEveryChannelHit(int[] hit) {
        for (int i = 0; i < hit.length; i++) {
            assertTrue("Channel " + i + " received no records", hit[i] > 0);
        }
    }

    /** Asserts that every channel was selected exactly {@code expected} times. */
    private static void assertEveryChannelHitExactly(int[] hit, int expected) {
        for (int i = 0; i < hit.length; i++) {
            assertEquals("Hit count of channel " + i, expected, hit[i]);
        }
    }

    /** Asserts that every channel received {@code numRecords / hit.length} records or one fewer. */
    private static void assertEvenSpread(int[] hit, int numRecords) {
        final int perChannel = numRecords / hit.length;
        for (int i = 0; i < hit.length; i++) {
            assertTrue(
                    "Channel " + i + " received " + hit[i] + " records, expected " + perChannel
                            + " or " + (perChannel - 1),
                    hit[i] == perChannel || hit[i] == perChannel - 1);
        }
    }
}

