/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.ml.recommendation.swing;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.iteration.operator.OperatorStateUtils;
import org.apache.flink.ml.api.AlgoOperator;
import org.apache.flink.ml.api.Stage;
import org.apache.flink.ml.common.datastream.TableUtils;
import org.apache.flink.ml.param.Param;
import org.apache.flink.ml.param.WithParams;
import org.apache.flink.ml.recommendation.swing.SwingParams;
import org.apache.flink.ml.util.ParamUtils;
import org.apache.flink.ml.util.ReadWriteUtils;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableImpl;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

public class Swing
implements AlgoOperator<Swing>,
SwingParams<Swing> {
    private final Map<Param<?>, Object> paramMap = new HashMap();

    /** Creates a Swing operator whose parameter map is pre-populated with the default values. */
    public Swing() {
        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
    }

    /**
     * Builds the job that turns a table of (user, item) records into a table that lists, for each
     * item, its most similar items.
     *
     * <p>The pipeline has three stages: rows are converted to (user, item) pairs, the pairs are
     * keyed by user and aggregated by {@code CollectingUserBehavior}, and the result is keyed by
     * item and scored by {@code ComputingSimilarItems}.
     *
     * @param inputs exactly one table; its user and item columns must both be of type Long
     * @return a single-element array with a table made of the item column (Long) and the output
     *     column (String)
     * @throws IllegalArgumentException if the user or item column is not of type Long, or if
     *     maxUserBehavior is smaller than minUserBehavior
     */
    @Override
    public Table[] transform(Table... inputs) {
        Preconditions.checkArgument(inputs.length == 1);
        final Table input = inputs[0];

        final String userCol = getUserCol();
        final String itemCol = getItemCol();
        final ResolvedSchema schema = input.getResolvedSchema();

        // The item column is only inspected when the user column already passed the check.
        final boolean bothColumnsAreLong =
                Types.LONG.equals(TableUtils.getTypeInfoByName(schema, userCol))
                        && Types.LONG.equals(TableUtils.getTypeInfoByName(schema, itemCol));
        if (!bothColumnsAreLong) {
            throw new IllegalArgumentException("The types of user and item must be Long.");
        }

        final int maxUserBehavior = getMaxUserBehavior();
        final int minUserBehavior = getMinUserBehavior();
        if (maxUserBehavior < minUserBehavior) {
            throw new IllegalArgumentException(
                    String.format(
                            "The maxUserBehavior must be greater than or equal to minUserBehavior. "
                                    + "The current setting: maxUserBehavior=%d, minUserBehavior=%d.",
                            maxUserBehavior, minUserBehavior));
        }

        final StreamTableEnvironment tEnv =
                (StreamTableEnvironment) ((TableImpl) input).getTableEnvironment();

        // Stage 1: extract the (user, item) pair of every row, rejecting null ids.
        SingleOutputStreamOperator<Tuple2<Long, Long>> purchasingBehavior =
                tEnv.toDataStream(input)
                        .map(
                                row -> {
                                    Long userId = row.getFieldAs(userCol);
                                    Long itemId = row.getFieldAs(itemCol);
                                    if (userId == null || itemId == null) {
                                        throw new RuntimeException(
                                                "Data of user and item column must not be null.");
                                    }
                                    return Tuple2.of(userId, itemId);
                                })
                        .returns(Types.TUPLE(Types.LONG, Types.LONG));

        // Stage 2: per user, collect the items and emit (user, item, allItemsOfUser).
        SingleOutputStreamOperator<Tuple3<Long, Long, long[]>> userBehavior =
                purchasingBehavior
                        .keyBy(pair -> pair.f0)
                        .transform(
                                "collectingUserBehavior",
                                Types.TUPLE(
                                        Types.LONG,
                                        Types.LONG,
                                        PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO),
                                new CollectingUserBehavior(minUserBehavior, maxUserBehavior));

        // Stage 3: per item, compute the similarity scores and format the output row.
        RowTypeInfo outputTypeInfo =
                new RowTypeInfo(
                        new TypeInformation[] {Types.LONG, Types.STRING},
                        new String[] {itemCol, getOutputCol()});
        DataStream<Row> output =
                userBehavior
                        .keyBy(triple -> triple.f1)
                        .transform(
                                "computingSimilarItems",
                                outputTypeInfo,
                                new ComputingSimilarItems(
                                        getK(),
                                        getMaxUserNumPerItem(),
                                        maxUserBehavior,
                                        getAlpha1(),
                                        getAlpha2(),
                                        getBeta(),
                                        getSeed()));

        return new Table[] {tEnv.fromDataStream(output)};
    }

    /**
     * Returns the map holding the parameters of this operator.
     *
     * @return the internal (mutable) parameter map, not a copy
     */
    @Override
    public Map<Param<?>, Object> getParamMap() {
        return paramMap;
    }

    /**
     * Saves the metadata of this operator under the given path.
     *
     * @param path the location handed to {@code ReadWriteUtils.saveMetadata}
     * @throws IOException if the metadata cannot be written
     */
    @Override
    public void save(String path) throws IOException {
        ReadWriteUtils.saveMetadata(this, path);
    }

    /**
     * Restores a Swing operator from the parameters stored under the given path.
     *
     * @param tEnv the table environment; not used by this method
     * @param path the location previously written by {@link #save(String)}
     * @return the restored operator
     * @throws IOException if the stored parameters cannot be read
     */
    public static Swing load(StreamTableEnvironment tEnv, String path) throws IOException {
        final Swing restored = ReadWriteUtils.loadStageParam(path);
        return restored;
    }

    private static class ComputingSimilarItems
    extends AbstractStreamOperator<Row>
    implements OneInputStreamOperator<Tuple3<Long, Long, long[]>, Row>,
    BoundedOneInput {
        private final int k;
        private final int maxUserNumPerItem;
        private final int maxUserBehavior;
        private final int alpha1;
        private final int alpha2;
        private final double beta;
        private static final Character commaDelimiter = Character.valueOf(',');
        private static final Character semicolonDelimiter = Character.valueOf(';');
        private final Random random;
        private Map<Long, long[]> userAndPurchasedItems = new HashMap<Long, long[]>();
        private Map<Long, List<Long>> itemAndPurchasers = new HashMap<Long, List<Long>>();
        private ListState<Map<Long, long[]>> userAndPurchasedItemsState;
        private ListState<Map<Long, List<Long>>> itemAndPurchasersState;

        private ComputingSimilarItems(int k, int maxUserNumPerItem, int maxUserBehavior, int alpha1, int alpha2, double beta, long seed) {
            this.k = k;
            this.maxUserNumPerItem = maxUserNumPerItem;
            this.maxUserBehavior = maxUserBehavior;
            this.alpha1 = alpha1;
            this.alpha2 = alpha2;
            this.beta = beta;
            this.random = new Random(seed);
        }

        public void endInput() throws Exception {
            HashMap userWeights = new HashMap(this.userAndPurchasedItems.size());
            this.userAndPurchasedItems.forEach((k, v) -> {
                int count = ((long[])v).length;
                userWeights.put(k, this.calculateWeight(count));
            });
            long[] interaction = new long[this.maxUserBehavior];
            for (long mainItem : this.itemAndPurchasers.keySet()) {
                List<Long> userList = this.itemAndPurchasers.get(mainItem);
                HashMap<Long, Double> id2swing = new HashMap<Long, Double>();
                for (int i = 1; i < userList.size(); ++i) {
                    long u = userList.get(i);
                    for (int j = i + 1; j < userList.size(); ++j) {
                        long v2 = userList.get(j);
                        int interactionSize = ComputingSimilarItems.calculateCommonItems(this.userAndPurchasedItems.get(u), this.userAndPurchasedItems.get(v2), interaction);
                        if (interactionSize == 0) continue;
                        double similarity = (Double)userWeights.get(u) * (Double)userWeights.get(v2) / (double)(this.alpha2 + interactionSize);
                        for (int k2 = 0; k2 < interactionSize; ++k2) {
                            long simItem = interaction[k2];
                            if (simItem == mainItem) continue;
                            double itemSimilarity = id2swing.getOrDefault(simItem, 0.0) + similarity;
                            id2swing.put(simItem, itemSimilarity);
                        }
                    }
                }
                ArrayList itemAndScore = new ArrayList();
                id2swing.forEach((key, value) -> itemAndScore.add(Tuple2.of((Object)key, (Object)value)));
                itemAndScore.sort((o1, o2) -> Double.compare((Double)o2.f1, (Double)o1.f1));
                if (itemAndScore.size() == 0) continue;
                int itemNums = Math.min(this.k, itemAndScore.size());
                String itemList = ((Stream)itemAndScore.stream().sequential()).limit(itemNums).map(tuple2 -> "" + tuple2.f0 + commaDelimiter + tuple2.f1).collect(Collectors.joining("" + semicolonDelimiter));
                this.output.collect((Object)new StreamRecord((Object)Row.of((Object[])new Object[]{mainItem, itemList})));
            }
            this.userAndPurchasedItemsState.clear();
            this.itemAndPurchasersState.clear();
        }

        private double calculateWeight(int size) {
            return 1.0 / Math.pow(this.alpha1 + size, this.beta);
        }

        private static int calculateCommonItems(long[] u, long[] v, long[] interaction) {
            int pointerU = 0;
            int pointerV = 0;
            int interactionSize = 0;
            while (pointerU < u.length && pointerV < v.length) {
                if (u[pointerU] == v[pointerV]) {
                    interaction[interactionSize++] = u[pointerU];
                    ++pointerU;
                    ++pointerV;
                    continue;
                }
                if (u[pointerU] < v[pointerV]) {
                    ++pointerU;
                    continue;
                }
                ++pointerV;
            }
            return interactionSize;
        }

        public void processElement(StreamRecord<Tuple3<Long, Long, long[]>> streamRecord) throws Exception {
            Tuple3 tuple3 = (Tuple3)streamRecord.getValue();
            long user = (Long)tuple3.f0;
            long[] userBehavior = (long[])tuple3.f2;
            long mainItem = (Long)tuple3.f1;
            if (!this.userAndPurchasedItems.containsKey(user)) {
                Arrays.sort(userBehavior);
                this.userAndPurchasedItems.put(user, userBehavior);
            }
            this.itemAndPurchasers.putIfAbsent(mainItem, new ArrayList());
            List<Long> purchasers = this.itemAndPurchasers.get(mainItem);
            if (purchasers.size() == 0) {
                purchasers.add(0L);
            }
            long total = purchasers.get(0);
            if (purchasers.size() <= this.maxUserNumPerItem) {
                purchasers.add(user);
            } else {
                int index = this.random.nextInt((int)total) + 1;
                if (index <= this.maxUserNumPerItem) {
                    purchasers.set(index, user);
                }
            }
            purchasers.set(0, ++total);
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.userAndPurchasedItemsState = context.getOperatorStateStore().getListState(new ListStateDescriptor("userAndPurchasedItemsState", Types.MAP((TypeInformation)Types.LONG, (TypeInformation)PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO)));
            OperatorStateUtils.getUniqueElement(this.userAndPurchasedItemsState, (String)"userAndPurchasedItemsState").ifPresent(stat -> {
                this.userAndPurchasedItems = stat;
            });
            this.itemAndPurchasersState = context.getOperatorStateStore().getListState(new ListStateDescriptor("itemAndPurchasersState", Types.MAP((TypeInformation)Types.LONG, (TypeInformation)Types.LIST((TypeInformation)Types.LONG))));
            OperatorStateUtils.getUniqueElement(this.itemAndPurchasersState, (String)"itemAndPurchasersState").ifPresent(stat -> {
                this.itemAndPurchasers = stat;
            });
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            this.userAndPurchasedItemsState.update(Collections.singletonList(this.userAndPurchasedItems));
            this.itemAndPurchasersState.update(Collections.singletonList(this.itemAndPurchasers));
        }
    }

    private static class CollectingUserBehavior
    extends AbstractStreamOperator<Tuple3<Long, Long, long[]>>
    implements OneInputStreamOperator<Tuple2<Long, Long>, Tuple3<Long, Long, long[]>>,
    BoundedOneInput {
        private final int minUserItemInteraction;
        private final int maxUserItemInteraction;
        private Map<Long, Map<Long, String>> userAndPurchasedItems = new HashMap<Long, Map<Long, String>>();
        private ListState<Map<Long, Map<Long, String>>> userAndPurchasedItemsState;

        private CollectingUserBehavior(int minUserItemInteraction, int maxUserItemInteraction) {
            this.minUserItemInteraction = minUserItemInteraction;
            this.maxUserItemInteraction = maxUserItemInteraction;
        }

        public void endInput() {
            this.userAndPurchasedItems.forEach((user, items) -> {
                if (items.size() >= this.minUserItemInteraction && items.size() <= this.maxUserItemInteraction) {
                    long[] itemsArray = new long[items.size()];
                    int i = 0;
                    for (Long value : items.keySet()) {
                        itemsArray[i++] = value;
                    }
                    items.forEach((item, nullValue) -> this.output.collect((Object)new StreamRecord((Object)new Tuple3(user, item, (Object)itemsArray))));
                }
            });
            this.userAndPurchasedItemsState.clear();
        }

        public void processElement(StreamRecord<Tuple2<Long, Long>> element) {
            Tuple2 userAndItem = (Tuple2)element.getValue();
            long user = (Long)userAndItem.f0;
            long item = (Long)userAndItem.f1;
            Map items = this.userAndPurchasedItems.getOrDefault(user, new LinkedHashMap());
            if (items.size() <= this.maxUserItemInteraction) {
                items.put(item, null);
            }
            this.userAndPurchasedItems.putIfAbsent(user, items);
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.userAndPurchasedItemsState = context.getOperatorStateStore().getListState(new ListStateDescriptor("userAndPurchasedItemsState", Types.MAP((TypeInformation)Types.LONG, (TypeInformation)Types.MAP((TypeInformation)Types.LONG, (TypeInformation)Types.STRING))));
            OperatorStateUtils.getUniqueElement(this.userAndPurchasedItemsState, (String)"userAndPurchasedItemsState").ifPresent(stat -> {
                this.userAndPurchasedItems = stat;
            });
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            this.userAndPurchasedItemsState.update(Collections.singletonList(this.userAndPurchasedItems));
        }
    }
}

