/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.common.operators.base;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.api.common.operators.BinaryOperatorInformation;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.builder.Tuple2Builder;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests running a {@link CoGroupOperatorBase} directly on in-memory collections through
 * {@code executeOnCollections}, once with object reuse disabled and once with it enabled.
 */
class CoGroupOperatorCollectionTest
implements Serializable {

    /**
     * Co-groups two small inputs on their first tuple field and checks that
     * <ul>
     *   <li>the UDF was closed after each execution,</li>
     *   <li>both object-reuse modes produce the same per-key sums, and</li>
     *   <li>two empty inputs produce an empty result.</li>
     * </ul>
     *
     * @throws Exception if executing the operator fails; the exception is propagated unchanged so
     *     that the test report keeps the original type, message and stack trace
     */
    @Test
    void testExecuteOnCollection() throws Exception {
        List<Tuple2<String, Integer>> input1 = Arrays.asList(
                new Tuple2Builder<String, Integer>()
                        .add("foo", 1)
                        .add("foobar", 1)
                        .add("foo", 1)
                        .add("bar", 1)
                        .add("foo", 1)
                        .add("foo", 1)
                        .build());
        List<Tuple2<String, Integer>> input2 = Arrays.asList(
                new Tuple2Builder<String, Integer>()
                        .add("foo", 1)
                        .add("foo", 1)
                        .add("bar", 1)
                        .add("foo", 1)
                        .add("barfoo", 1)
                        .add("foo", 1)
                        .build());

        ExecutionConfig executionConfig = new ExecutionConfig();
        // These values are re-checked inside SumCoGroup.open(...).
        TaskInfo taskInfo = new TaskInfoImpl("Test UDF", 4, 0, 4, 0);
        RuntimeContext ctx = new RuntimeUDFContext(
                taskInfo,
                null,
                executionConfig,
                new HashMap<>(), // cpTasks: left empty for this test
                new HashMap<>(), // accumulators: left empty for this test
                UnregisteredMetricsGroup.createOperatorMetricGroup());

        SumCoGroup udf1 = new SumCoGroup();
        SumCoGroup udf2 = new SumCoGroup();

        executionConfig.disableObjectReuse();
        List<Tuple2<String, Integer>> resultSafe =
                getCoGroupOperator(udf1).executeOnCollections(input1, input2, ctx, executionConfig);
        executionConfig.enableObjectReuse();
        List<Tuple2<String, Integer>> resultRegular =
                getCoGroupOperator(udf2).executeOnCollections(input1, input2, ctx, executionConfig);

        Assertions.assertThat(udf1.isClosed).isTrue();
        Assertions.assertThat(udf2.isClosed).isTrue();

        // "foo" occurs four times in each input (4 + 4 = 8), "bar" once in each (1 + 1 = 2),
        // "foobar" only in input1 and "barfoo" only in input2.
        HashSet<Tuple2<String, Integer>> expected = new HashSet<>(Arrays.asList(
                new Tuple2Builder<String, Integer>()
                        .add("foo", 8)
                        .add("bar", 2)
                        .add("foobar", 1)
                        .add("barfoo", 1)
                        .build()));

        Assertions.assertThat(new HashSet<>(resultSafe)).containsExactlyInAnyOrderElementsOf(expected);
        Assertions.assertThat(new HashSet<>(resultRegular)).containsExactlyInAnyOrderElementsOf(expected);

        // Two empty inputs must yield an empty result in both modes.
        executionConfig.disableObjectReuse();
        List<Tuple2<String, Integer>> resultSafe2 =
                getCoGroupOperator(new SumCoGroup())
                        .executeOnCollections(
                                Collections.emptyList(), Collections.emptyList(), ctx, executionConfig);
        executionConfig.enableObjectReuse();
        List<Tuple2<String, Integer>> resultRegular2 =
                getCoGroupOperator(new SumCoGroup())
                        .executeOnCollections(
                                Collections.emptyList(), Collections.emptyList(), ctx, executionConfig);

        Assertions.assertThat(resultSafe2).isEmpty();
        Assertions.assertThat(resultRegular2).isEmpty();
    }

    /**
     * Builds a co-group operator around {@code udf} that uses field 0 of both inputs as the key
     * and {@code Tuple2<String, Integer>} as the type of both inputs and of the output.
     *
     * @param udf the user function the operator should run
     * @return the operator, named {@code "coGroup on Collections"}
     */
    private CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>, CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>> getCoGroupOperator(RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> udf) {
        TypeInformation<Tuple2<String, Integer>> tuple2Info =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
        return new CoGroupOperatorBase<>(
                udf,
                new BinaryOperatorInformation<>(tuple2Info, tuple2Info, tuple2Info),
                new int[] {0},
                new int[] {0},
                "coGroup on Collections");
    }

    /**
     * Co-group function that emits one tuple per invocation: the key taken from the first record
     * seen (first input before second) and the sum of field {@code f1} over both inputs. It also
     * records whether {@code open} and {@code close} were called so the test can verify them.
     */
    private static class SumCoGroup
    extends RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private boolean isOpened = false;
        private boolean isClosed = false;

        /** Marks the function as opened and verifies the task info exposed by the runtime context. */
        @Override
        public void open(OpenContext openContext) throws Exception {
            this.isOpened = true;
            RuntimeContext ctx = getRuntimeContext();
            Assertions.assertThat(ctx.getTaskInfo().getTaskName()).isEqualTo("Test UDF");
            Assertions.assertThat(ctx.getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(4);
            Assertions.assertThat(ctx.getTaskInfo().getIndexOfThisSubtask()).isZero();
        }

        /**
         * Sums field {@code f1} of every record in {@code first} and {@code second} and emits the
         * total together with the key of the first record encountered.
         *
         * @param first records from the first input
         * @param second records from the second input
         * @param out collector receiving the single {@code (key, sum)} result
         */
        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> first, Iterable<Tuple2<String, Integer>> second, Collector<Tuple2<String, Integer>> out) throws Exception {
            // The function must be between open() and close() whenever it is invoked.
            Assertions.assertThat(this.isOpened).isTrue();
            Assertions.assertThat(this.isClosed).isFalse();

            String f0 = null;
            int sumF1 = 0;
            for (Tuple2<String, Integer> input : first) {
                if (f0 == null) {
                    f0 = input.f0;
                }
                sumF1 += input.f1;
            }
            for (Tuple2<String, Integer> input : second) {
                if (f0 == null) {
                    f0 = input.f0;
                }
                sumF1 += input.f1;
            }
            out.collect(Tuple2.of(f0, sumF1));
        }

        /** Marks the function as closed. */
        @Override
        public void close() throws Exception {
            this.isClosed = true;
        }
    }
}

