/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.graph.asm.degree.annotate.undirected;

import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions;
import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
import org.apache.flink.graph.utils.proxy.OptionalBoolean;
import org.apache.flink.types.LongValue;

public class EdgeTargetDegree<K, VV, EV>
extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
    private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false);

    public EdgeTargetDegree<K, VV, EV> setReduceOnSourceId(boolean reduceOnSourceId) {
        this.reduceOnSourceId.set(reduceOnSourceId);
        return this;
    }

    @Override
    protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
        super.mergeConfiguration(other);
        EdgeTargetDegree rhs = (EdgeTargetDegree)other;
        this.reduceOnSourceId.mergeWith(rhs.reduceOnSourceId);
    }

    @Override
    public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> input) throws Exception {
        DataSet vertexDegrees = (DataSet)input.run(new VertexDegree().setReduceOnTargetId(!this.reduceOnSourceId.get()).setParallelism(this.parallelism));
        return ((JoinOperator)input.getEdges().join(vertexDegrees, JoinOperatorBase.JoinHint.REPARTITION_HASH_SECOND).where(new int[]{1}).equalTo(new int[]{0}).with(new DegreeAnnotationFunctions.JoinEdgeWithVertexDegree()).setParallelism(this.parallelism)).name("Edge target degree");
    }
}

