/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.library;

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.GatherFunction;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.ScatterFunction;
import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.types.NullValue;

public class LabelPropagation<K, VV extends Comparable<VV>, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, VV>>> {
    private final int maxIterations;

    public LabelPropagation(int maxIterations) {
        this.maxIterations = maxIterations;
    }

    @Override
    public DataSet<Vertex<K, VV>> run(Graph<K, VV, EV> input) {
        TypeInformation valueType = ((TupleTypeInfo)input.getVertices().getType()).getTypeAt(1);
        return input.mapEdges(new GraphUtils.MapTo(NullValue.getInstance())).runScatterGatherIteration(new SendNewLabelToNeighbors(valueType), new UpdateVertexLabel(), this.maxIterations).getVertices();
    }

    public static final class UpdateVertexLabel<K, VV extends Comparable<VV>>
    extends GatherFunction<K, VV, VV> {
        @Override
        public void updateVertex(Vertex<K, VV> vertex, MessageIterator<VV> inMessages) {
            HashMap<Comparable, Long> labelsWithFrequencies = new HashMap<Comparable, Long>();
            long maxFrequency = 1L;
            Comparable mostFrequentLabel = (Comparable)vertex.getValue();
            for (Comparable comparable : inMessages) {
                if (labelsWithFrequencies.containsKey(comparable)) {
                    long currentFreq = (Long)labelsWithFrequencies.get(comparable);
                    labelsWithFrequencies.put(comparable, currentFreq + 1L);
                    continue;
                }
                labelsWithFrequencies.put(comparable, 1L);
            }
            for (Map.Entry entry : labelsWithFrequencies.entrySet()) {
                if ((Long)entry.getValue() == maxFrequency) {
                    if (((Comparable)entry.getKey()).compareTo(mostFrequentLabel) <= 0) continue;
                    mostFrequentLabel = (Comparable)entry.getKey();
                    continue;
                }
                if ((Long)entry.getValue() <= maxFrequency) continue;
                maxFrequency = (Long)entry.getValue();
                mostFrequentLabel = (Comparable)entry.getKey();
            }
            this.setNewVertexValue(mostFrequentLabel);
        }
    }

    public static final class SendNewLabelToNeighbors<K, VV extends Comparable<VV>>
    extends ScatterFunction<K, VV, VV, NullValue>
    implements ResultTypeQueryable<VV> {
        private final TypeInformation<VV> typeInformation;

        public SendNewLabelToNeighbors(TypeInformation<VV> typeInformation) {
            this.typeInformation = typeInformation;
        }

        @Override
        public void sendMessages(Vertex<K, VV> vertex) {
            this.sendMessageToAllNeighbors(vertex.getValue());
        }

        public TypeInformation<VV> getProducedType() {
            return this.typeInformation;
        }
    }
}

