/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.chaining;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Chained driver that folds every record it receives into a single running value using a
 * user-supplied {@link ReduceFunction} and forwards that value to the output collector when the
 * driver is closed.
 *
 * <p>If no record was ever collected, nothing is emitted on {@link #close()}.
 *
 * @param <IT> type of the records consumed by {@link #collect(Object)} and of the reduced value
 *     that is emitted
 */
public class ChainedAllReduceDriver<IT> extends ChainedDriver<IT, IT> {
    private static final Logger LOG = LoggerFactory.getLogger(ChainedAllReduceDriver.class);

    /** User reduce function, instantiated in {@link #setup(AbstractInvokable)}. */
    private ReduceFunction<IT> reducer;

    /** Serializer of input 0, used to copy records and reduce results. */
    private TypeSerializer<IT> serializer;

    /** Running reduce result; {@code null} until the first record has been collected. */
    private IT base;

    /**
     * Instantiates the user {@link ReduceFunction}, hands it the UDF runtime context and looks up
     * the serializer of input 0.
     *
     * @param parent the invokable this driver is chained to; not used by this driver
     */
    @Override
    public void setup(AbstractInvokable parent) {
        // Only the raw ReduceFunction class token is available here, so the element type of the
        // instantiated function cannot be verified by the compiler.
        @SuppressWarnings("unchecked")
        final ReduceFunction<IT> red =
                BatchTask.instantiateUserCode(this.config, this.userCodeClassLoader, ReduceFunction.class);
        this.reducer = red;
        FunctionUtils.setFunctionRuntimeContext(red, this.getUdfRuntimeContext());

        final TypeSerializerFactory<IT> serializerFactory =
                this.config.getInputSerializer(0, this.userCodeClassLoader);
        this.serializer = serializerFactory.getSerializer();

        LOG.debug(
                "ChainedAllReduceDriver object reuse: {}.",
                this.objectReuseEnabled ? "ENABLED" : "DISABLED");
    }

    /**
     * Opens the user function with the stub parameters taken from the task configuration.
     *
     * @throws Exception if {@code BatchTask.openUserCode} fails
     */
    @Override
    public void openTask() throws Exception {
        final Configuration stubConfig = this.config.getStubParameters();
        BatchTask.openUserCode(this.reducer, stubConfig);
    }

    /**
     * Closes the user function.
     *
     * @throws Exception if {@code BatchTask.closeUserCode} fails
     */
    @Override
    public void closeTask() throws Exception {
        BatchTask.closeUserCode(this.reducer);
    }

    /**
     * Closes the user function on cancellation. Any {@link Throwable} raised while doing so is
     * deliberately discarded.
     */
    @Override
    public void cancelTask() {
        try {
            FunctionUtils.closeFunction(this.reducer);
        } catch (Throwable ignored) {
            // Best effort only: a failure while closing the function during cancellation is
            // intentionally not propagated.
        }
    }

    /**
     * Returns the user reduce function.
     *
     * @return the reduce function created in {@link #setup(AbstractInvokable)}, or {@code null}
     *     if setup has not run yet
     */
    @Override
    public Function getStub() {
        return this.reducer;
    }

    /**
     * Returns the name of this chained task.
     *
     * @return the task name
     */
    @Override
    public String getTaskName() {
        return this.taskName;
    }

    /**
     * Folds {@code record} into the running reduce result.
     *
     * <p>The first record is copied with the serializer and becomes the initial value. Every
     * later record is combined with the current value through the reduce function; when object
     * reuse is disabled the function's result is copied before it is stored.
     *
     * @param record the next input record
     * @throws ExceptionInChainedStubException wrapping any exception thrown while copying or
     *     reducing
     */
    @Override
    public void collect(IT record) {
        this.numRecordsIn.inc();
        try {
            if (this.base == null) {
                // NOTE(review): the copy presumably guards against the caller reusing the
                // record instance - confirm against the upstream contract.
                this.base = this.serializer.copy(record);
                return;
            }
            final IT reduced = this.reducer.reduce(this.base, record);
            this.base = this.objectReuseEnabled ? reduced : this.serializer.copy(reduced);
        } catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
    }

    /**
     * Emits the reduced value, if any record was collected, and then closes the output collector.
     *
     * <p>The running value is reset to {@code null} after it has been emitted. If emitting fails,
     * the output collector is not closed by this method.
     *
     * @throws ExceptionInChainedStubException wrapping any exception thrown while emitting the
     *     result
     */
    @Override
    public void close() {
        try {
            if (this.base != null) {
                this.outputCollector.collect(this.base);
                this.base = null;
            }
        } catch (Exception e) {
            throw new ExceptionInChainedStubException(this.taskName, e);
        }
        this.outputCollector.close();
    }
}

