package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.codegen.OperatorCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecLegacyTableSourceScan;
import org.apache.flink.table.planner.plan.utils.ScanUtil;
import org.apache.flink.table.planner.sources.TableSourceUtil;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.wmassigners.PeriodicWatermarkAssignerWrapper;
import org.apache.flink.table.runtime.operators.wmassigners.PunctuatedWatermarkAssignerWrapper;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner;
import org.apache.flink.table.sources.wmstrategies.PunctuatedWatermarkAssigner;
import org.apache.flink.table.types.logical.RowType;

/* loaded from: input_file:flink-table-planner.jar:org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecLegacyTableSourceScan.class */
/**
 * Stream-mode exec node that scans a legacy {@link TableSource}.
 *
 * <p>On top of what {@link CommonExecLegacyTableSourceScan} provides, this class supplies the
 * streaming-specific pieces: converting the source output to internal rows when required,
 * attaching timestamp/watermark assigners for a declared rowtime attribute, and creating the
 * input transformation from an {@link InputFormat}.
 */
public class StreamExecLegacyTableSourceScan extends CommonExecLegacyTableSourceScan implements StreamExecNode<RowData> {

    /**
     * Creates the node with a freshly allocated node id, a context derived from this class, and
     * a persisted configuration built from {@code readableConfig}.
     *
     * @param readableConfig configuration from which the persisted node configuration is derived
     * @param tableSource the legacy table source to scan
     * @param list forwarded unchanged to the superclass constructor
     * @param rowType forwarded unchanged to the superclass constructor
     * @param str forwarded unchanged to the superclass constructor
     */
    public StreamExecLegacyTableSourceScan(ReadableConfig readableConfig, TableSource<?> tableSource, List<String> list, RowType rowType, String str) {
        super(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecLegacyTableSourceScan.class), ExecNodeContext.newPersistedConfig(StreamExecLegacyTableSourceScan.class, readableConfig), tableSource, list, rowType, str);
    }

    /**
     * Wraps the source transformation with an internal-row conversion when the computed index
     * mapping requires one, then attaches a timestamp/watermark assigner if the table source
     * declares a rowtime attribute with a periodic or punctuated strategy.
     *
     * @param streamExecutionEnvironment environment used to build the intermediate data stream
     * @param execNodeConfig configuration used for code generation and for formatting the
     *     transformation name and description
     * @param transformation transformation produced by the table source
     * @param rexNode optional expression handed to {@code ScanUtil.convertToInternalRow};
     *     may be {@code null}
     * @return the resulting transformation; this is the unchanged input when neither a
     *     conversion nor a watermark assigner is needed
     */
    @Override
    @SuppressWarnings("unchecked")
    protected Transformation<RowData> createConversionTransformationIfNeeded(StreamExecutionEnvironment streamExecutionEnvironment, ExecNodeConfig execNodeConfig, Transformation<?> transformation, @Nullable RexNode rexNode) {
        final RowType outputType = getOutputType();
        final int[] fieldIndexes = computeIndexMapping(true);

        final Transformation<RowData> rowTransformation;
        if (needInternalConversion(fieldIndexes)) {
            final String extractElement;
            final String resetElement;
            if (ScanUtil.hasTimeAttributeField(fieldIndexes)) {
                // Generated code stores the current element on the context before the
                // conversion and clears it again afterwards.
                final String elementTerm = OperatorCodeGenerator.ELEMENT();
                extractElement = String.format("ctx.%s = %s;", elementTerm, elementTerm);
                resetElement = String.format("ctx.%s = null;", elementTerm);
            } else {
                extractElement = "";
                resetElement = "";
            }
            final CodeGeneratorContext codeGenContext =
                    new CodeGeneratorContext(execNodeConfig).setOperatorBaseClass(TableStreamOperator.class);
            // Unchecked: the element type of the source transformation is not known statically.
            rowTransformation = ScanUtil.convertToInternalRow(
                    codeGenContext,
                    (Transformation<Object>) transformation,
                    fieldIndexes,
                    TableSourceUtil.fixPrecisionForProducedDataType(this.tableSource, outputType),
                    outputType,
                    this.qualifiedName,
                    (detailName, simplifiedName) -> createFormattedTransformationName(detailName, simplifiedName, execNodeConfig),
                    description -> createFormattedTransformationDescription(description, execNodeConfig),
                    JavaScalaConversionUtil.toScala(Optional.ofNullable(rexNode)),
                    extractElement,
                    resetElement);
        } else {
            // Unchecked: without a conversion the source output is used as-is as row data.
            rowTransformation = (Transformation<RowData>) transformation;
        }

        final DataStream<RowData> sourceStream = new DataStream<>(streamExecutionEnvironment, rowTransformation);
        final DataStream<RowData> withWatermarks =
                JavaScalaConversionUtil.toJava(TableSourceUtil.getRowtimeAttributeDescriptor(this.tableSource, outputType))
                        .<DataStream<RowData>>map(rowtimeDescriptor -> {
                            final int rowtimeFieldIndex = outputType.getFieldNames().indexOf(rowtimeDescriptor.getAttributeName());
                            // Deliberately not narrowed to one assigner type: the strategy may be
                            // periodic, punctuated, or neither, and is dispatched on below.
                            final Object strategy = rowtimeDescriptor.getWatermarkStrategy();
                            if (strategy instanceof PeriodicWatermarkAssigner) {
                                return sourceStream.assignTimestampsAndWatermarks(
                                        new PeriodicWatermarkAssignerWrapper((PeriodicWatermarkAssigner) strategy, rowtimeFieldIndex));
                            }
                            if (strategy instanceof PunctuatedWatermarkAssigner) {
                                return sourceStream.assignTimestampsAndWatermarks(
                                        new PunctuatedWatermarkAssignerWrapper(
                                                (PunctuatedWatermarkAssigner) strategy, rowtimeFieldIndex, this.tableSource.getProducedDataType()));
                            }
                            // Any other strategy: no assigner is attached here.
                            return sourceStream;
                        })
                        // No rowtime attribute declared: nothing to assign.
                        .orElse(sourceStream);
        return withWatermarks.getTransformation();
    }

    /**
     * Creates the input transformation for the given format and names it after the table
     * source's {@code explainSource()} string.
     *
     * @param streamExecutionEnvironment environment on which the input is created
     * @param inputFormat input format to read from
     * @param typeInformation type information of the produced records
     * @param <IN> type of the produced records
     * @return the transformation backing the created input stream
     */
    @Override
    protected <IN> Transformation<IN> createInput(StreamExecutionEnvironment streamExecutionEnvironment, InputFormat<IN, ? extends InputSplit> inputFormat, TypeInformation<IN> typeInformation) {
        return streamExecutionEnvironment
                .createInput(inputFormat, typeInformation)
                .name(this.tableSource.explainSource())
                .getTransformation();
    }
}
