package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.collector.ListenableCollector;
import org.apache.flink.table.runtime.generated.GeneratedCollectorWrapper;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.LookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest.class */
public class LookupJoinHarnessTest {
    private final TypeSerializer<RowData> inSerializer = new RowDataSerializer(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()});
    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()});

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest$CalculateOnTemporalTable.class */
    public static final class CalculateOnTemporalTable implements FlatMapFunction<RowData, RowData> {
        private static final long serialVersionUID = -1860345072157431136L;

        public void flatMap(RowData rowData, Collector<RowData> collector) throws Exception {
            if (rowData.getString(1).getSizeInBytes() >= 6) {
                collector.collect(rowData);
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((RowData) obj, (Collector<RowData>) collector);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest$FilterOnTable.class */
    public enum FilterOnTable {
        WITH_FILTER,
        WITHOUT_FILTER
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest$JoinType.class */
    public enum JoinType {
        INNER_JOIN,
        LEFT_JOIN
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest$TestingFetcherCollector.class */
    public static final class TestingFetcherCollector extends ListenableCollector<RowData> {
        private static final long serialVersionUID = -312754413938303160L;

        public void collect(RowData rowData) {
            RowData rowData2 = (RowData) getInput();
            getCollectListener().ifPresent(collectListener -> {
                collectListener.onCollect(rowData);
            });
            outputResult(new JoinedRowData(rowData2, rowData));
        }
    }

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/LookupJoinHarnessTest$TestingFetcherFunction.class */
    public static final class TestingFetcherFunction implements FlatMapFunction<RowData, RowData> {
        private static final long serialVersionUID = 4018474964018227081L;
        private static final Map<Integer, List<GenericRowData>> data = new HashMap();

        public void flatMap(RowData rowData, Collector<RowData> collector) throws Exception {
            List<GenericRowData> list = data.get(Integer.valueOf(rowData.getInt(0)));
            if (list != null) {
                Iterator<GenericRowData> it = list.iterator();
                while (it.hasNext()) {
                    collector.collect(it.next());
                }
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((RowData) obj, (Collector<RowData>) collector);
        }

        static {
            data.put(1, Collections.singletonList(GenericRowData.of(new Object[]{1, StringData.fromString("Julian")})));
            data.put(3, Arrays.asList(GenericRowData.of(new Object[]{3, StringData.fromString("Jark")}), GenericRowData.of(new Object[]{3, StringData.fromString("Jackson")})));
            data.put(4, Collections.singletonList(GenericRowData.of(new Object[]{4, StringData.fromString("Fabian")})));
        }
    }

    @Test
    public void testTemporalInnerJoin() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createHarness = createHarness(JoinType.INNER_JOIN, FilterOnTable.WITHOUT_FILTER);
        createHarness.open();
        createHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        createHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        createHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        createHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        createHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        arrayList.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        arrayList.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        arrayList.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createHarness.getOutput());
        createHarness.close();
    }

    @Test
    public void testTemporalInnerJoinWithFilter() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createHarness = createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER);
        createHarness.open();
        createHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        createHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        createHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        createHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        createHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        arrayList.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        arrayList.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createHarness.getOutput());
        createHarness.close();
    }

    @Test
    public void testTemporalLeftJoin() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createHarness = createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITHOUT_FILTER);
        createHarness.open();
        createHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        createHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        createHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        createHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        createHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        arrayList.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        arrayList.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        arrayList.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        arrayList.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        arrayList.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createHarness.getOutput());
        createHarness.close();
    }

    @Test
    public void testTemporalLeftJoinWithFilter() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> createHarness = createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER);
        createHarness.open();
        createHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
        createHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
        createHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
        createHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
        createHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        arrayList.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        arrayList.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        arrayList.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        arrayList.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createHarness.getOutput());
        createHarness.close();
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(JoinType joinType, FilterOnTable filterOnTable) throws Exception {
        boolean z = joinType == JoinType.LEFT_JOIN;
        return new OneInputStreamOperatorTestHarness<>(new ProcessOperator(filterOnTable == FilterOnTable.WITHOUT_FILTER ? new LookupJoinRunner(new GeneratedFunctionWrapper(new TestingFetcherFunction()), new GeneratedCollectorWrapper(new TestingFetcherCollector()), z, 2) : new LookupJoinWithCalcRunner(new GeneratedFunctionWrapper(new TestingFetcherFunction()), new GeneratedFunctionWrapper(new CalculateOnTemporalTable()), new GeneratedCollectorWrapper(new TestingFetcherCollector()), z, 2)), this.inSerializer);
    }
}
