package org.apache.flink.table.planner.plan.nodes.exec.stream;

import java.util.List;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.MultipleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.IntervalJoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.planner.plan.utils.JoinUtil;
import org.apache.flink.table.planner.plan.utils.KeySelectorUtil;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.KeyedCoProcessOperatorWithWatermarkDelay;
import org.apache.flink.table.runtime.operators.join.OuterJoinPaddingUtil;
import org.apache.flink.table.runtime.operators.join.interval.FilterAllFlatMapFunction;
import org.apache.flink.table.runtime.operators.join.interval.IntervalJoinFunction;
import org.apache.flink.table.runtime.operators.join.interval.PaddingLeftMapFunction;
import org.apache.flink.table.runtime.operators.join.interval.PaddingRightMapFunction;
import org.apache.flink.table.runtime.operators.join.interval.ProcTimeIntervalJoin;
import org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stream exec node that translates a time interval join between two inputs into
 * {@link Transformation}s.
 *
 * <p>Depending on the {@link IntervalJoinSpec.WindowBounds} the node produces either
 *
 * <ul>
 *   <li>a keyed two-input transformation running a {@link RowTimeIntervalJoin} or a
 *       {@link ProcTimeIntervalJoin}, or
 *   <li>when the relative window size is negative, a union of per-input filter / padding
 *       transformations (see {@link #createNegativeWindowSizeJoin}).
 * </ul>
 *
 * <p>Only {@code INNER}, {@code LEFT}, {@code RIGHT} and {@code FULL} join types are accepted;
 * every other {@link FlinkJoinType} results in a {@link TableException}.
 */
@ExecNodeMetadata(
        name = "stream-exec-interval-join",
        version = 1,
        producedTransformations = {
            StreamExecIntervalJoin.FILTER_LEFT_TRANSFORMATION,
            StreamExecIntervalJoin.FILTER_RIGHT_TRANSFORMATION,
            StreamExecIntervalJoin.PAD_LEFT_TRANSFORMATION,
            StreamExecIntervalJoin.PAD_RIGHT_TRANSFORMATION,
            StreamExecIntervalJoin.INTERVAL_JOIN_TRANSFORMATION
        },
        minPlanVersion = FlinkVersion.v1_15,
        minStateVersion = FlinkVersion.v1_15)
public class StreamExecIntervalJoin extends ExecNodeBase<RowData>
        implements StreamExecNode<RowData>, MultipleTransformationTranslator<RowData> {

    private static final Logger LOGGER = LoggerFactory.getLogger(StreamExecIntervalJoin.class);

    /** Operator names used to derive the uid of each produced transformation. */
    public static final String FILTER_LEFT_TRANSFORMATION = "filter-left";

    public static final String FILTER_RIGHT_TRANSFORMATION = "filter-right";
    public static final String PAD_LEFT_TRANSFORMATION = "pad-left";
    public static final String PAD_RIGHT_TRANSFORMATION = "pad-right";
    public static final String INTERVAL_JOIN_TRANSFORMATION = "interval-join";

    /** JSON property name under which the {@link IntervalJoinSpec} is (de)serialized. */
    public static final String FIELD_NAME_INTERVAL_JOIN_SPEC = "intervalJoinSpec";

    @JsonProperty(FIELD_NAME_INTERVAL_JOIN_SPEC)
    private final IntervalJoinSpec intervalJoinSpec;

    /**
     * Creates a new node with a freshly allocated node id, context and persisted configuration.
     *
     * @param tableConfig configuration from which the persisted node configuration is derived
     * @param intervalJoinSpec join specification including the window bounds; must not be null
     * @param leftInputProperty input property of the first (left) input
     * @param rightInputProperty input property of the second (right) input
     * @param outputType row type produced by this node
     * @param description human readable description of this node
     */
    public StreamExecIntervalJoin(
            ReadableConfig tableConfig,
            IntervalJoinSpec intervalJoinSpec,
            InputProperty leftInputProperty,
            InputProperty rightInputProperty,
            RowType outputType,
            String description) {
        this(
                ExecNodeContext.newNodeId(),
                ExecNodeContext.newContext(StreamExecIntervalJoin.class),
                ExecNodeContext.newPersistedConfig(StreamExecIntervalJoin.class, tableConfig),
                intervalJoinSpec,
                Lists.newArrayList(leftInputProperty, rightInputProperty),
                outputType,
                description);
    }

    /**
     * Creator used for JSON deserialization (and by the public constructor above).
     *
     * @param id node id
     * @param context node context
     * @param persistedConfig persisted node configuration
     * @param intervalJoinSpec join specification including the window bounds; must not be null
     * @param inputProperties input properties; exactly two entries are required
     * @param outputType row type produced by this node
     * @param description human readable description of this node
     */
    @JsonCreator
    public StreamExecIntervalJoin(
            @JsonProperty("id") int id,
            @JsonProperty("type") ExecNodeContext context,
            @JsonProperty("configuration") ReadableConfig persistedConfig,
            @JsonProperty(FIELD_NAME_INTERVAL_JOIN_SPEC) IntervalJoinSpec intervalJoinSpec,
            @JsonProperty("inputProperties") List<InputProperty> inputProperties,
            @JsonProperty("outputType") RowType outputType,
            @JsonProperty("description") String description) {
        super(id, context, persistedConfig, inputProperties, outputType, description);
        Preconditions.checkArgument(inputProperties.size() == 2);
        this.intervalJoinSpec = Preconditions.checkNotNull(intervalJoinSpec);
    }

    /**
     * Translates both inputs and builds the interval join on top of them.
     *
     * @param planner planner used to translate the two input edges
     * @param config configuration of this node
     * @return the join transformation, or a union transformation when the relative window size
     *     is negative
     * @throws TableException if the join type is not INNER, LEFT, RIGHT or FULL
     */
    @Override
    @SuppressWarnings("unchecked")
    protected Transformation<RowData> translateToPlanInternal(
            PlannerBase planner, ExecNodeConfig config) {
        final ExecEdge leftInputEdge = getInputEdges().get(0);
        final ExecEdge rightInputEdge = getInputEdges().get(1);

        final RowType leftRowType = (RowType) leftInputEdge.getOutputType();
        final RowType rightRowType = (RowType) rightInputEdge.getOutputType();

        // The inputs are translated before the join type is validated, so an unsupported join
        // type is only reported after both upstream translations have run.
        final Transformation<RowData> leftInputTransform =
                (Transformation<RowData>) leftInputEdge.translateToPlan(planner);
        final Transformation<RowData> rightInputTransform =
                (Transformation<RowData>) rightInputEdge.translateToPlan(planner);

        final InternalTypeInfo<RowData> returnTypeInfo = InternalTypeInfo.of(getOutputType());
        final JoinSpec joinSpec = intervalJoinSpec.getJoinSpec();
        final IntervalJoinSpec.WindowBounds windowBounds = intervalJoinSpec.getWindowBounds();
        final FlinkJoinType joinType = joinSpec.getJoinType();

        switch (joinType) {
            case INNER:
            case LEFT:
            case RIGHT:
            case FULL:
                break;
            default:
                throw new TableException(
                        "Interval Join: "
                                + joinType
                                + " Join between stream and stream is not supported yet.\n"
                                + "please re-check interval join statement according to description above.");
        }

        final long relativeWindowSize =
                windowBounds.getLeftUpperBound() - windowBounds.getLeftLowerBound();
        if (relativeWindowSize < 0) {
            LOGGER.warn(
                    "The relative time interval size {} is negative, please check the join conditions.",
                    relativeWindowSize);
            return createNegativeWindowSizeJoin(
                    joinSpec,
                    leftInputTransform,
                    rightInputTransform,
                    leftRowType.getFieldCount(),
                    rightRowType.getFieldCount(),
                    returnTypeInfo,
                    config);
        }

        // The up-casts pin the static argument types used to select the
        // generateConditionFunction overload.
        final IntervalJoinFunction joinFunction =
                new IntervalJoinFunction(
                        JoinUtil.generateConditionFunction(
                                (ReadableConfig) config,
                                joinSpec,
                                (LogicalType) leftRowType,
                                (LogicalType) rightRowType),
                        returnTypeInfo,
                        joinSpec.getFilterNulls());

        final TwoInputTransformation<RowData, RowData, RowData> joinTransform;
        if (windowBounds.isEventTime()) {
            joinTransform =
                    createRowTimeJoin(
                            leftInputTransform,
                            rightInputTransform,
                            returnTypeInfo,
                            joinFunction,
                            joinSpec,
                            windowBounds,
                            config);
        } else {
            joinTransform =
                    createProcTimeJoin(
                            leftInputTransform,
                            rightInputTransform,
                            returnTypeInfo,
                            joinFunction,
                            joinSpec,
                            windowBounds,
                            config);
        }

        if (inputsContainSingleton()) {
            joinTransform.setParallelism(1);
            joinTransform.setMaxParallelism(1);
        }

        // Key both inputs by their respective join keys; the state key type is taken from the
        // left selector.
        final RowDataKeySelector leftKeySelector =
                KeySelectorUtil.getRowDataSelector(
                        joinSpec.getLeftKeys(), InternalTypeInfo.of(leftRowType));
        final RowDataKeySelector rightKeySelector =
                KeySelectorUtil.getRowDataSelector(
                        joinSpec.getRightKeys(), InternalTypeInfo.of(rightRowType));
        joinTransform.setStateKeySelectors(leftKeySelector, rightKeySelector);
        joinTransform.setStateKeyType(leftKeySelector.getProducedType());
        return joinTransform;
    }

    /**
     * Builds the plan used when the relative window size is negative: each input is routed
     * through either a {@link FilterAllFlatMapFunction} or a padding map function and the two
     * results are unioned.
     *
     * <p>Which pair is unioned depends on the join type:
     *
     * <ul>
     *   <li>INNER: filter-left + filter-right
     *   <li>LEFT: pad-left + filter-right
     *   <li>RIGHT: filter-left + pad-right
     *   <li>FULL: pad-left + pad-right
     * </ul>
     *
     * <p>All four transformations are always created, even though only two end up in the union.
     *
     * @param joinSpec join specification providing the join type
     * @param transformation left input transformation
     * @param transformation2 right input transformation
     * @param i field count of the left input row
     * @param i2 field count of the right input row
     * @param internalTypeInfo type information of the join output
     * @param readableConfig configuration used for uid generation and name/description formatting
     * @return union of the two transformations selected by the join type
     * @throws TableException if the join type is none of INNER, LEFT, RIGHT, FULL
     */
    private Transformation<RowData> createNegativeWindowSizeJoin(
            JoinSpec joinSpec,
            Transformation<RowData> transformation,
            Transformation<RowData> transformation2,
            int i,
            int i2,
            InternalTypeInfo<RowData> internalTypeInfo,
            ReadableConfig readableConfig) {
        // NOTE(review): uids are assigned only when the legacy-uid option is true; confirm this
        // is consistent with how createTransformationMeta treats the same option.
        final boolean assignUid =
                readableConfig.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_TRANSFORMATION_UIDS);

        final FilterAllFlatMapFunction allFilter = new FilterAllFlatMapFunction(internalTypeInfo);
        final OuterJoinPaddingUtil paddingUtil = new OuterJoinPaddingUtil(i, i2);
        final PaddingLeftMapFunction leftPadder =
                new PaddingLeftMapFunction(paddingUtil, internalTypeInfo);
        final PaddingRightMapFunction rightPadder =
                new PaddingRightMapFunction(paddingUtil, internalTypeInfo);

        final int leftParallelism = transformation.getParallelism();
        final int rightParallelism = transformation2.getParallelism();

        final OneInputTransformation<RowData, RowData> filterAllLeft =
                new OneInputTransformation<>(
                        transformation,
                        "FilterLeft",
                        new StreamFlatMap<>(allFilter),
                        internalTypeInfo,
                        leftParallelism);
        applyMetadata(
                filterAllLeft,
                FILTER_LEFT_TRANSFORMATION,
                "filter all left input transformation",
                "FilterLeft",
                assignUid,
                readableConfig);

        final OneInputTransformation<RowData, RowData> filterAllRight =
                new OneInputTransformation<>(
                        transformation2,
                        "FilterRight",
                        new StreamFlatMap<>(allFilter),
                        internalTypeInfo,
                        rightParallelism);
        applyMetadata(
                filterAllRight,
                FILTER_RIGHT_TRANSFORMATION,
                "filter all right input transformation",
                "FilterRight",
                assignUid,
                readableConfig);

        final OneInputTransformation<RowData, RowData> padLeft =
                new OneInputTransformation<>(
                        transformation,
                        "PadLeft",
                        new StreamMap<>(leftPadder),
                        internalTypeInfo,
                        leftParallelism);
        applyMetadata(
                padLeft,
                PAD_LEFT_TRANSFORMATION,
                "pad left input transformation",
                "PadLeft",
                assignUid,
                readableConfig);

        final OneInputTransformation<RowData, RowData> padRight =
                new OneInputTransformation<>(
                        transformation2,
                        "PadRight",
                        new StreamMap<>(rightPadder),
                        internalTypeInfo,
                        rightParallelism);
        applyMetadata(
                padRight,
                PAD_RIGHT_TRANSFORMATION,
                "pad right input transformation",
                "PadRight",
                assignUid,
                readableConfig);

        switch (joinSpec.getJoinType()) {
            case INNER:
                return new UnionTransformation<>(Lists.newArrayList(filterAllLeft, filterAllRight));
            case LEFT:
                return new UnionTransformation<>(Lists.newArrayList(padLeft, filterAllRight));
            case RIGHT:
                return new UnionTransformation<>(Lists.newArrayList(filterAllLeft, padRight));
            case FULL:
                return new UnionTransformation<>(Lists.newArrayList(padLeft, padRight));
            default:
                throw new TableException("should not reach here");
        }
    }

    /**
     * Sets the optional uid, the formatted description and the formatted name on one of the
     * filter / padding transformations, in that order (the name is derived from the description
     * that was just set).
     */
    private void applyMetadata(
            OneInputTransformation<RowData, RowData> target,
            String operatorName,
            String description,
            String simplifiedName,
            boolean assignUid,
            ReadableConfig config) {
        if (assignUid) {
            target.setUid(createTransformationUid(operatorName));
        }
        target.setDescription(createFormattedTransformationDescription(description, config));
        target.setName(
                createFormattedTransformationName(target.getDescription(), simplifiedName, config));
    }

    /**
     * Creates the keyed two-input transformation for a processing-time interval join, using the
     * parallelism of the left input.
     */
    private TwoInputTransformation<RowData, RowData, RowData> createProcTimeJoin(
            Transformation<RowData> transformation,
            Transformation<RowData> transformation2,
            InternalTypeInfo<RowData> internalTypeInfo,
            IntervalJoinFunction intervalJoinFunction,
            JoinSpec joinSpec,
            IntervalJoinSpec.WindowBounds windowBounds,
            ReadableConfig readableConfig) {
        final ProcTimeIntervalJoin procJoinFunc =
                new ProcTimeIntervalJoin(
                        joinSpec.getJoinType(),
                        windowBounds.getLeftLowerBound(),
                        windowBounds.getLeftUpperBound(),
                        transformation.getOutputType(),
                        transformation2.getOutputType(),
                        intervalJoinFunction);
        return ExecNodeUtil.createTwoInputTransformation(
                transformation,
                transformation2,
                createTransformationMeta(INTERVAL_JOIN_TRANSFORMATION, readableConfig),
                new KeyedCoProcessOperator<>(procJoinFunc),
                internalTypeInfo,
                transformation.getParallelism());
    }

    /**
     * Creates the keyed two-input transformation for an event-time interval join, using the
     * parallelism of the left input. The operator is given the join function's maximum output
     * delay as its watermark delay.
     */
    private TwoInputTransformation<RowData, RowData, RowData> createRowTimeJoin(
            Transformation<RowData> transformation,
            Transformation<RowData> transformation2,
            InternalTypeInfo<RowData> internalTypeInfo,
            IntervalJoinFunction intervalJoinFunction,
            JoinSpec joinSpec,
            IntervalJoinSpec.WindowBounds windowBounds,
            ReadableConfig readableConfig) {
        final RowTimeIntervalJoin rowJoinFunc =
                new RowTimeIntervalJoin(
                        joinSpec.getJoinType(),
                        windowBounds.getLeftLowerBound(),
                        windowBounds.getLeftUpperBound(),
                        0L,
                        transformation.getOutputType(),
                        transformation2.getOutputType(),
                        intervalJoinFunction,
                        windowBounds.getLeftTimeIdx(),
                        windowBounds.getRightTimeIdx());
        return ExecNodeUtil.createTwoInputTransformation(
                transformation,
                transformation2,
                createTransformationMeta(INTERVAL_JOIN_TRANSFORMATION, readableConfig),
                new KeyedCoProcessOperatorWithWatermarkDelay<>(
                        rowJoinFunc, rowJoinFunc.getMaxOutputDelay()),
                internalTypeInfo,
                transformation.getParallelism());
    }
}
