package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.util.LinkedHashMap;
import java.util.List;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.python.PythonFunctionInfo;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.ProjectionCodeGenerator;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.CommonPythonUtil;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.generated.GeneratedProjection;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:flink-table-planner.jar:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.class */
public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData> implements SingleTransformationTranslator<RowData> {
    private static final String PYTHON_TABLE_FUNCTION_OPERATOR_NAME = "org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator";
    private final FlinkJoinType joinType;
    private final RexCall invocation;

    public CommonExecPythonCorrelate(int i, ExecNodeContext execNodeContext, ReadableConfig readableConfig, FlinkJoinType flinkJoinType, RexCall rexCall, List<InputProperty> list, RowType rowType, String str) {
        super(i, execNodeContext, readableConfig, list, rowType, str);
        Preconditions.checkArgument(list.size() == 1);
        this.joinType = flinkJoinType;
        this.invocation = rexCall;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        Transformation<?> translateToPlan = getInputEdges().get(0).translateToPlan(plannerBase);
        Configuration extractPythonConfiguration = CommonPythonUtil.extractPythonConfiguration(plannerBase.getExecEnv(), execNodeConfig);
        OneInputTransformation<RowData, RowData> createPythonOneInputTransformation = createPythonOneInputTransformation(translateToPlan, execNodeConfig, extractPythonConfiguration);
        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(extractPythonConfiguration)) {
            createPythonOneInputTransformation.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        }
        return createPythonOneInputTransformation;
    }

    private OneInputTransformation<RowData, RowData> createPythonOneInputTransformation(Transformation<RowData> transformation, ExecNodeConfig execNodeConfig, Configuration configuration) {
        Tuple2<int[], PythonFunctionInfo> extractPythonTableFunctionInfo = extractPythonTableFunctionInfo();
        int[] iArr = (int[]) extractPythonTableFunctionInfo.f0;
        PythonFunctionInfo pythonFunctionInfo = (PythonFunctionInfo) extractPythonTableFunctionInfo.f1;
        InternalTypeInfo<RowData> internalTypeInfo = (InternalTypeInfo) transformation.getOutputType();
        InternalTypeInfo<RowData> of = InternalTypeInfo.of(getOutputType());
        return ExecNodeUtil.createOneInputTransformation((Transformation) transformation, createTransformationName(execNodeConfig), createTransformationDescription(execNodeConfig), (StreamOperator) getPythonTableFunctionOperator(execNodeConfig, configuration, internalTypeInfo, of, pythonFunctionInfo, iArr), (TypeInformation) of, transformation.getParallelism());
    }

    private Tuple2<int[], PythonFunctionInfo> extractPythonTableFunctionInfo() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        return Tuple2.of(linkedHashMap.keySet().stream().filter(rexNode -> {
            return rexNode instanceof RexInputRef;
        }).map(rexNode2 -> {
            return Integer.valueOf(((RexInputRef) rexNode2).getIndex());
        }).mapToInt(num -> {
            return num.intValue();
        }).toArray(), CommonPythonUtil.createPythonFunctionInfo(this.invocation, linkedHashMap));
    }

    private OneInputStreamOperator<RowData, RowData> getPythonTableFunctionOperator(ExecNodeConfig execNodeConfig, Configuration configuration, InternalTypeInfo<RowData> internalTypeInfo, InternalTypeInfo<RowData> internalTypeInfo2, PythonFunctionInfo pythonFunctionInfo, int[] iArr) {
        Class<?> loadClass = CommonPythonUtil.loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME);
        RowType rowType = internalTypeInfo.toRowType();
        RowType rowType2 = internalTypeInfo2.toRowType();
        RowType project = Projection.of(iArr).project(rowType);
        try {
            return (OneInputStreamOperator) loadClass.getConstructor(Configuration.class, PythonFunctionInfo.class, RowType.class, RowType.class, RowType.class, FlinkJoinType.class, GeneratedProjection.class).newInstance(configuration, pythonFunctionInfo, rowType, project, Projection.range(rowType.getFieldCount(), rowType2.getFieldCount()).project(rowType2), this.joinType, ProjectionCodeGenerator.generateProjection(CodeGeneratorContext.apply(execNodeConfig), "UdtfInputProjection", rowType, project, iArr));
        } catch (Exception e) {
            throw new TableException("Python Table Function Operator constructed failed.", e);
        }
    }
}
