/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.ml.feature.sqltransformer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.ml.api.Stage;
import org.apache.flink.ml.api.Transformer;
import org.apache.flink.ml.common.datastream.EndOfStreamWindows;
import org.apache.flink.ml.feature.sqltransformer.SQLTransformerParams;
import org.apache.flink.ml.param.Param;
import org.apache.flink.ml.param.WithParams;
import org.apache.flink.ml.util.ParamUtils;
import org.apache.flink.ml.util.ReadWriteUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

/**
 * A {@link Transformer} that runs the SQL statement returned by {@code getStatement()} against
 * its single input table.
 *
 * <p>Every occurrence of {@value #TABLE_IDENTIFIER} in the statement is replaced with the string
 * form of the input table before the statement is handed to the table environment.
 *
 * <p>If the query result cannot be consumed as an insert-only stream, its changelog is collapsed
 * into a plain list of rows (see {@link ChangeLogStreamToDataStreamFunction}) and converted back
 * into a table, so that the table returned by {@link #transform(Table...)} is built from
 * {@link RowKind#INSERT} rows only.
 */
public class SQLTransformer
        implements Transformer<SQLTransformer>, SQLTransformerParams<SQLTransformer> {
    /** Placeholder inside the SQL statement that stands for the input table. */
    static final String TABLE_IDENTIFIER = "__THIS__";

    /**
     * Regular expression matched against the message of the {@link TableException} raised by
     * {@code toDataStream} in order to recognise a result table that is not insert-only.
     */
    private static final String INSERT_ONLY_EXCEPTION_PATTERN = "^.* doesn't support consuming .* changes which is produced by node .*$";

    private final Map<Param<?>, Object> paramMap = new HashMap<>();

    /** Creates a transformer whose parameters are initialised with their default values. */
    public SQLTransformer() {
        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
    }

    /**
     * Applies the configured SQL statement to the given table.
     *
     * @param inputs exactly one input table
     * @return a single-element array holding the result table
     * @throws IllegalArgumentException if the number of input tables is not one
     */
    public Table[] transform(Table... inputs) {
        Preconditions.checkArgument(
                inputs.length == 1,
                "SQLTransformer expects exactly one input table, but got %s.",
                inputs.length);

        Table input = inputs[0];
        StreamTableEnvironment tEnv =
                (StreamTableEnvironment) ((TableImpl) input).getTableEnvironment();
        String statement = getStatement().replace(TABLE_IDENTIFIER, input.toString());
        Table outputTable = tEnv.sqlQuery(statement);

        if (!isInsertOnlyTable(tEnv, outputTable)) {
            outputTable = collapseChangelog(tEnv, outputTable);
        }
        return new Table[] {outputTable};
    }

    /**
     * Persists the metadata of this stage under the given path.
     *
     * @param path destination path
     * @throws IOException propagated from {@link ReadWriteUtils#saveMetadata}
     */
    public void save(String path) throws IOException {
        ReadWriteUtils.saveMetadata(this, path);
    }

    /**
     * Restores a transformer from the given path.
     *
     * @param tEnv table environment; not used by this implementation
     * @param path path the stage was saved to
     * @return the restored transformer
     * @throws IOException propagated from {@link ReadWriteUtils#loadStageParam}
     */
    public static SQLTransformer load(StreamTableEnvironment tEnv, String path) throws IOException {
        return ReadWriteUtils.loadStageParam(path);
    }

    /** Returns the live (not copied) parameter map backing this stage. */
    public Map<Param<?>, Object> getParamMap() {
        return paramMap;
    }

    /**
     * Turns a table that produces updates or deletions into a table built from the rows that
     * remain after its whole changelog has been applied.
     */
    private Table collapseChangelog(StreamTableEnvironment tEnv, Table table) {
        Schema schema = Schema.newBuilder().fromResolvedSchema(table.getResolvedSchema()).build();
        DataStream<Row> changelog = tEnv.toChangelogStream(table, schema);
        TypeInformation<Row> rowType = changelog.getType();

        // All changelog entries are gathered in one EndOfStreamWindows window, folded into the
        // list of surviving rows, and then emitted again one row at a time.
        DataStream<Row> survivingRows =
                changelog
                        .windowAll(EndOfStreamWindows.get())
                        .aggregate(
                                new ChangeLogStreamToDataStreamFunction(),
                                Types.LIST(rowType),
                                Types.LIST(rowType))
                        .flatMap(new FlattenListFunction<Row>(), rowType);

        return tEnv.fromDataStream(survivingRows, schema);
    }

    /**
     * Probes whether the table can be converted with {@code toDataStream}.
     *
     * @return {@code true} if the conversion succeeds, {@code false} if it fails with a
     *     {@link TableException} whose message matches {@link #INSERT_ONLY_EXCEPTION_PATTERN}
     * @throws TableException if the conversion fails for any other reason; exceptions of other
     *     types propagate unchanged as well
     */
    private boolean isInsertOnlyTable(StreamTableEnvironment tEnv, Table table) {
        try {
            tEnv.toDataStream(table);
            return true;
        } catch (TableException e) {
            String message = e.getMessage();
            if (message != null && message.matches(INSERT_ONLY_EXCEPTION_PATTERN)) {
                return false;
            }
            throw e;
        }
    }

    /**
     * Emits every element of an incoming list as an individual record.
     *
     * @param <T> element type
     */
    private static class FlattenListFunction<T> implements FlatMapFunction<List<T>, T> {

        public void flatMap(List<T> values, Collector<T> out) throws Exception {
            for (T value : values) {
                out.collect(value);
            }
        }
    }

    /**
     * Folds changelog rows into the list of rows that remain once all changes are applied. Every
     * row kept in the accumulator carries {@link RowKind#INSERT}.
     */
    private static class ChangeLogStreamToDataStreamFunction
            implements AggregateFunction<Row, List<Row>, List<Row>> {

        public List<Row> createAccumulator() {
            return new ArrayList<>();
        }

        /**
         * Applies one changelog entry to the accumulator. Note that the kind of {@code value} is
         * overwritten in place for every kind other than {@link RowKind#INSERT}.
         *
         * @throws UnsupportedOperationException if the row kind is not handled
         */
        public List<Row> add(Row value, List<Row> accumulator) {
            switch (value.getKind()) {
                case INSERT:
                    accumulator.add(value);
                    break;
                case UPDATE_AFTER:
                    value.setKind(RowKind.INSERT);
                    accumulator.add(value);
                    break;
                case UPDATE_BEFORE:
                case DELETE:
                    // Stored rows are all INSERT, so the retraction is relabelled before the
                    // lookup; presumably Row equality takes the kind into account (confirm
                    // against Row#equals). List#remove drops only the first equal row.
                    value.setKind(RowKind.INSERT);
                    accumulator.remove(value);
                    break;
                default:
                    throw new UnsupportedOperationException();
            }
            return accumulator;
        }

        public List<Row> getResult(List<Row> accumulator) {
            return accumulator;
        }

        /** Appends the rows of {@code b} to {@code a} and returns {@code a}. */
        public List<Row> merge(List<Row> a, List<Row> b) {
            a.addAll(b);
            return a;
        }
    }
}

