/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.compiler.postpass;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.api.common.operators.DualInputOperator;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory;
import org.apache.flink.compiler.CompilerException;
import org.apache.flink.compiler.CompilerPostPassException;
import org.apache.flink.compiler.plan.BulkIterationPlanNode;
import org.apache.flink.compiler.plan.BulkPartialSolutionPlanNode;
import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.plan.DualInputPlanNode;
import org.apache.flink.compiler.plan.NAryUnionPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.PlanNode;
import org.apache.flink.compiler.plan.SingleInputPlanNode;
import org.apache.flink.compiler.plan.SinkPlanNode;
import org.apache.flink.compiler.plan.SolutionSetPlanNode;
import org.apache.flink.compiler.plan.SourcePlanNode;
import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
import org.apache.flink.compiler.plan.WorksetPlanNode;
import org.apache.flink.compiler.postpass.OptimizerPostPass;
import org.apache.flink.compiler.util.NoOpUnaryUdfOp;
import org.apache.flink.runtime.operators.DriverStrategy;

public class JavaApiPostPass
implements OptimizerPostPass {
    private final Set<PlanNode> alreadyDone = new HashSet<PlanNode>();

    @Override
    public void postPass(OptimizedPlan plan) {
        for (SinkPlanNode sink : plan.getDataSinks()) {
            this.traverse(sink);
        }
    }

    protected void traverse(PlanNode node) {
        if (!this.alreadyDone.add(node)) {
            return;
        }
        if (node instanceof SinkPlanNode) {
            SinkPlanNode sn = (SinkPlanNode)node;
            Channel inchannel = sn.getInput();
            this.traverseChannel(inchannel);
        } else if (node instanceof SourcePlanNode) {
            TypeInformation typeInfo = JavaApiPostPass.getTypeInfoFromSource((SourcePlanNode)node);
            ((SourcePlanNode)node).setSerializer(JavaApiPostPass.createSerializer(typeInfo));
        } else if (node instanceof BulkIterationPlanNode) {
            BulkIterationPlanNode iterationNode = (BulkIterationPlanNode)node;
            if (iterationNode.getRootOfStepFunction() instanceof NAryUnionPlanNode) {
                throw new CompilerException("Optimizer cannot compile an iteration step function where next partial solution is created by a Union node.");
            }
            if (iterationNode.getRootOfTerminationCriterion() != null) {
                SingleInputPlanNode addMapper = (SingleInputPlanNode)iterationNode.getRootOfTerminationCriterion();
                this.traverseChannel(addMapper.getInput());
            }
            BulkIterationBase operator = (BulkIterationBase)iterationNode.getPactContract();
            iterationNode.setSerializerForIterationChannel(JavaApiPostPass.createSerializer(operator.getOperatorInfo().getOutputType()));
            this.traverseChannel(iterationNode.getInput());
            this.traverse(iterationNode.getRootOfStepFunction());
        } else if (node instanceof WorksetIterationPlanNode) {
            WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode)node;
            if (iterationNode.getNextWorkSetPlanNode() instanceof NAryUnionPlanNode) {
                throw new CompilerException("Optimizer cannot compile a workset iteration step function where the next workset is produced by a Union node.");
            }
            if (iterationNode.getSolutionSetDeltaPlanNode() instanceof NAryUnionPlanNode) {
                throw new CompilerException("Optimizer cannot compile a workset iteration step function where the solution set delta is produced by a Union node.");
            }
            DeltaIterationBase operator = (DeltaIterationBase)iterationNode.getPactContract();
            iterationNode.setSolutionSetSerializer(JavaApiPostPass.createSerializer(operator.getOperatorInfo().getFirstInputType()));
            iterationNode.setWorksetSerializer(JavaApiPostPass.createSerializer(operator.getOperatorInfo().getSecondInputType()));
            iterationNode.setSolutionSetComparator(JavaApiPostPass.createComparator(operator.getOperatorInfo().getFirstInputType(), iterationNode.getSolutionSetKeyFields(), JavaApiPostPass.getSortOrders(iterationNode.getSolutionSetKeyFields(), null)));
            this.traverseChannel(iterationNode.getInput1());
            this.traverseChannel(iterationNode.getInput2());
            this.traverse(iterationNode.getSolutionSetDeltaPlanNode());
            this.traverse(iterationNode.getNextWorkSetPlanNode());
        } else if (node instanceof SingleInputPlanNode) {
            SingleInputPlanNode sn = (SingleInputPlanNode)node;
            if (!(sn.getOptimizerNode().getPactContract() instanceof SingleInputOperator)) {
                if (sn.getOptimizerNode().getPactContract() instanceof NoOpUnaryUdfOp) {
                    this.traverseChannel(sn.getInput());
                    return;
                }
                throw new RuntimeException("Wrong operator type found in post pass.");
            }
            SingleInputOperator singleInputOperator = (SingleInputOperator)sn.getOptimizerNode().getPactContract();
            for (int i = 0; i < sn.getDriverStrategy().getNumRequiredComparators(); ++i) {
                sn.setComparator(JavaApiPostPass.createComparator(singleInputOperator.getOperatorInfo().getInputType(), sn.getKeys(i), JavaApiPostPass.getSortOrders(sn.getKeys(i), sn.getSortOrders(i))), i);
            }
            this.traverseChannel(sn.getInput());
            for (Channel channel : sn.getBroadcastInputs()) {
                this.traverseChannel(channel);
            }
        } else if (node instanceof DualInputPlanNode) {
            DualInputPlanNode dn = (DualInputPlanNode)node;
            if (!(dn.getOptimizerNode().getPactContract() instanceof DualInputOperator)) {
                throw new RuntimeException("Wrong operator type found in post pass.");
            }
            DualInputOperator dualInputOperator = (DualInputOperator)dn.getOptimizerNode().getPactContract();
            if (dn.getDriverStrategy().getNumRequiredComparators() > 0) {
                dn.setComparator1(JavaApiPostPass.createComparator(dualInputOperator.getOperatorInfo().getFirstInputType(), dn.getKeysForInput1(), JavaApiPostPass.getSortOrders(dn.getKeysForInput1(), dn.getSortOrders())));
                dn.setComparator2(JavaApiPostPass.createComparator(dualInputOperator.getOperatorInfo().getSecondInputType(), dn.getKeysForInput2(), JavaApiPostPass.getSortOrders(dn.getKeysForInput2(), dn.getSortOrders())));
                dn.setPairComparator(JavaApiPostPass.createPairComparator(dualInputOperator.getOperatorInfo().getFirstInputType(), dualInputOperator.getOperatorInfo().getSecondInputType()));
            }
            this.traverseChannel(dn.getInput1());
            this.traverseChannel(dn.getInput2());
            for (Channel channel : dn.getBroadcastInputs()) {
                this.traverseChannel(channel);
            }
        } else if (!(node instanceof BulkPartialSolutionPlanNode || node instanceof SolutionSetPlanNode || node instanceof WorksetPlanNode)) {
            if (node instanceof NAryUnionPlanNode) {
                for (Channel channel : node.getInputs()) {
                    this.traverseChannel(channel);
                }
            } else {
                throw new CompilerPostPassException("Unknown node type encountered: " + node.getClass().getName());
            }
        }
    }

    private void traverseChannel(Channel channel) {
        PlanNode source = channel.getSource();
        Operator<?> javaOp = source.getPactContract();
        TypeInformation type = javaOp.getOperatorInfo().getOutputType();
        if (javaOp instanceof GroupReduceOperatorBase && (source.getDriverStrategy() == DriverStrategy.SORTED_GROUP_COMBINE || source.getDriverStrategy() == DriverStrategy.ALL_GROUP_COMBINE)) {
            GroupReduceOperatorBase groupNode = (GroupReduceOperatorBase)javaOp;
            type = groupNode.getInput().getOperatorInfo().getOutputType();
        } else if (javaOp instanceof PlanUnwrappingReduceGroupOperator && source.getDriverStrategy().equals((Object)DriverStrategy.SORTED_GROUP_COMBINE)) {
            PlanUnwrappingReduceGroupOperator groupNode = (PlanUnwrappingReduceGroupOperator)javaOp;
            type = groupNode.getInput().getOperatorInfo().getOutputType();
        }
        channel.setSerializer(JavaApiPostPass.createSerializer(type));
        if (channel.getShipStrategy().requiresComparator()) {
            channel.setShipStrategyComparator(JavaApiPostPass.createComparator(type, channel.getShipStrategyKeys(), JavaApiPostPass.getSortOrders(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder())));
        }
        if (channel.getLocalStrategy().requiresComparator()) {
            channel.setLocalStrategyComparator(JavaApiPostPass.createComparator(type, channel.getLocalStrategyKeys(), JavaApiPostPass.getSortOrders(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder())));
        }
        this.traverse(channel.getSource());
    }

    private static <T> TypeInformation<T> getTypeInfoFromSource(SourcePlanNode node) {
        Operator<?> op = node.getOptimizerNode().getPactContract();
        if (op instanceof GenericDataSourceBase) {
            return ((GenericDataSourceBase)op).getOperatorInfo().getOutputType();
        }
        throw new RuntimeException("Wrong operator type found in post pass.");
    }

    private static <T> TypeSerializerFactory<?> createSerializer(TypeInformation<T> typeInfo) {
        TypeSerializer serializer = typeInfo.createSerializer();
        if (serializer.isStateful()) {
            return new RuntimeStatefulSerializerFactory(serializer, typeInfo.getTypeClass());
        }
        return new RuntimeStatelessSerializerFactory(serializer, typeInfo.getTypeClass());
    }

    private static <T> TypeComparatorFactory<?> createComparator(TypeInformation<T> typeInfo, FieldList keys, boolean[] sortOrder) {
        if (!(typeInfo instanceof CompositeType)) {
            if (typeInfo instanceof AtomicType) {
                throw new UnsupportedOperationException("Grouping on atomic types is currently not implemented. " + typeInfo);
            }
            throw new RuntimeException("Unrecognized type: " + typeInfo);
        }
        TypeComparator comparator = ((CompositeType)typeInfo).createComparator(keys.toArray(), sortOrder, 0);
        return new RuntimeComparatorFactory(comparator);
    }

    private static <T1 extends Tuple, T2 extends Tuple> TypePairComparatorFactory<T1, T2> createPairComparator(TypeInformation<?> typeInfo1, TypeInformation<?> typeInfo2) {
        if (!typeInfo1.isTupleType() && !(typeInfo1 instanceof PojoTypeInfo) && (typeInfo2.isTupleType() || typeInfo2 instanceof PojoTypeInfo)) {
            throw new RuntimeException("The runtime currently supports only keyed binary operations (such as joins) on tuples and POJO types.");
        }
        return new RuntimePairComparatorFactory();
    }

    private static final boolean[] getSortOrders(FieldList keys, boolean[] orders) {
        if (orders == null) {
            orders = new boolean[keys.size()];
            Arrays.fill(orders, true);
        }
        return orders;
    }
}

