/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.bundle;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.runtime.operators.bundle.MapBundleFunction;
import org.apache.flink.table.runtime.operators.bundle.MapBundleOperator;
import org.apache.flink.table.runtime.operators.bundle.trigger.BundleTrigger;
import org.apache.flink.table.runtime.operators.bundle.trigger.CountBundleTrigger;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class MapBundleOperatorTest {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSimple() throws Exception {
        TestMapBundleFunction func = new TestMapBundleFunction();
        CountBundleTrigger trigger = new CountBundleTrigger(3L);
        KeySelector & Serializable keySelector = (KeySelector & Serializable)value -> (String)value.f0;
        OneInputStreamOperatorTestHarness op = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new MapBundleOperator((MapBundleFunction)func, (BundleTrigger)trigger, (KeySelector)keySelector));
        op.open();
        Object object = op.getCheckpointLock();
        synchronized (object) {
            StreamRecord input = new StreamRecord(null);
            input.replace((Object)new Tuple2((Object)"k1", (Object)"v1"));
            op.processElement(input);
            input.replace((Object)new Tuple2((Object)"k1", (Object)"v2"));
            op.processElement(input);
            Assertions.assertThat((int)func.getFinishCount()).isEqualTo(0);
            input.replace((Object)new Tuple2((Object)"k2", (Object)"v3"));
            op.processElement(input);
            Assertions.assertThat((int)func.getFinishCount()).isEqualTo(1);
            Assertions.assertThat(Arrays.asList("k1=v1,v2", "k2=v3")).isEqualTo(func.getOutputs());
            input.replace((Object)new Tuple2((Object)"k3", (Object)"v4"));
            op.processElement(input);
            input.replace((Object)new Tuple2((Object)"k4", (Object)"v5"));
            op.processElement(input);
            Assertions.assertThat((int)func.getFinishCount()).isEqualTo(1);
            op.close();
            Assertions.assertThat((int)func.getFinishCount()).isEqualTo(2);
            Assertions.assertThat(Arrays.asList("k3=v4", "k4=v5")).isEqualTo(func.getOutputs());
        }
    }

    private static class TestMapBundleFunction
    extends MapBundleFunction<String, String, Tuple2<String, String>, String> {
        private int finishCount = 0;
        private List<String> outputs = new ArrayList<String>();

        private TestMapBundleFunction() {
        }

        public String addInput(@Nullable String value, Tuple2<String, String> input) throws Exception {
            if (value == null) {
                return (String)input.f1;
            }
            return value + "," + (String)input.f1;
        }

        public void finishBundle(Map<String, String> buffer, Collector<String> out) throws Exception {
            ++this.finishCount;
            this.outputs.clear();
            for (Map.Entry<String, String> entry : buffer.entrySet()) {
                this.outputs.add(entry.getKey() + "=" + entry.getValue());
            }
        }

        int getFinishCount() {
            return this.finishCount;
        }

        List<String> getOutputs() {
            return this.outputs;
        }
    }
}

