package org.apache.flink.table.runtime.operators.join.temporal;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/temporal/TemporalRowTimeJoinOperatorTest.class */
public class TemporalRowTimeJoinOperatorTest extends TemporalTimeJoinOperatorTestBase {
    @Test
    public void testRowTimeTemporalJoin() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(1L));
        arrayList.add(new Watermark(2L));
        arrayList.add(StreamRecordUtils.insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
        arrayList.add(new Watermark(5L));
        arrayList.add(StreamRecordUtils.insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
        arrayList.add(new Watermark(8L));
        arrayList.add(new Watermark(9L));
        arrayList.add(StreamRecordUtils.insertRecord(11L, "k2", "5a12", 10L, "k2", "2a6"));
        arrayList.add(new Watermark(13L));
        testRowTimeTemporalJoin(false, arrayList);
    }

    @Test
    public void testRowTimeLeftTemporalJoin() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Watermark(1L));
        arrayList.add(StreamRecordUtils.insertRecord(1L, "k1", "1a1", null, null, null));
        arrayList.add(new Watermark(2L));
        arrayList.add(StreamRecordUtils.insertRecord(1L, "k1", "1a1", null, null, null));
        arrayList.add(StreamRecordUtils.insertRecord(3L, "k1", "1a3", 2L, "k1", "1a2"));
        arrayList.add(new Watermark(5L));
        arrayList.add(StreamRecordUtils.insertRecord(6L, "k2", "2a3", 4L, "k2", "2a4"));
        arrayList.add(new Watermark(8L));
        arrayList.add(StreamRecordUtils.insertRecord(9L, "k2", "5a11", null, null, null));
        arrayList.add(new Watermark(9L));
        arrayList.add(StreamRecordUtils.insertRecord(11L, "k2", "5a12", 10L, "k2", "2a6"));
        arrayList.add(new Watermark(13L));
        testRowTimeTemporalJoin(true, arrayList);
    }

    private void testRowTimeTemporalJoin(boolean z, List<Object> list) throws Exception {
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(new TemporalRowTimeJoinOperator(this.rowType, this.rowType, this.joinCondition, 0, 0, 0L, 0L, z));
        createTestHarness.open();
        createTestHarness.processWatermark1(new Watermark(1L));
        createTestHarness.processWatermark2(new Watermark(1L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1", "1a1"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(2L, "k1", "1a2"));
        createTestHarness.processWatermark1(new Watermark(2L));
        createTestHarness.processWatermark2(new Watermark(2L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(1L, "k1", "1a1"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(3L, "k1", "1a3"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(4L, "k2", "2a4"));
        createTestHarness.processWatermark1(new Watermark(5L));
        createTestHarness.processWatermark2(new Watermark(5L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(6L, "k2", "2a3"));
        createTestHarness.processElement2(StreamRecordUtils.updateBeforeRecord(7L, "k2", "2a4"));
        createTestHarness.processElement2(StreamRecordUtils.updateAfterRecord(7L, "k2", "2a5"));
        createTestHarness.processWatermark1(new Watermark(8L));
        createTestHarness.processWatermark2(new Watermark(9L));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(9L, "k2", "5a11"));
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(11L, "k2", "5a12"));
        createTestHarness.processElement2(StreamRecordUtils.deleteRecord(9L, "k2", "2a5"));
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(10L, "k2", "2a6"));
        createTestHarness.processWatermark1(new Watermark(13L));
        createTestHarness.processWatermark2(new Watermark(13L));
        this.assertor.assertOutputEquals("output wrong.", list, createTestHarness.getOutput());
        createTestHarness.close();
    }

    @Test
    public void testRowTimeTemporalJoinWithStateRetention() throws Exception {
        TemporalRowTimeJoinOperator temporalRowTimeJoinOperator = new TemporalRowTimeJoinOperator(this.rowType, this.rowType, this.joinCondition, 0, 0, 4L, 6L, true);
        KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness = createTestHarness(temporalRowTimeJoinOperator);
        createTestHarness.open();
        createTestHarness.setProcessingTime(3L);
        createTestHarness.processElement2(StreamRecordUtils.insertRecord(3L, "k1", "0a3"));
        createTestHarness.setProcessingTime(6L);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(6L, "k1", "0a6"));
        createTestHarness.processWatermark1(new Watermark(7L));
        createTestHarness.processWatermark2(new Watermark(7L));
        createTestHarness.processElement2(StreamRecordUtils.updateBeforeRecord(3L, "k1", "0a3"));
        createTestHarness.processElement2(StreamRecordUtils.updateAfterRecord(3L, "k1", "0a5"));
        createTestHarness.setProcessingTime(9L);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(9L, "k1", "7a9"));
        createTestHarness.processWatermark1(new Watermark(13L));
        createTestHarness.processWatermark2(new Watermark(13L));
        createTestHarness.setProcessingTime(15L);
        createTestHarness.processElement1(StreamRecordUtils.insertRecord(15L, "k1", "13a15"));
        createTestHarness.processWatermark1(new Watermark(15L));
        createTestHarness.processWatermark2(new Watermark(16L));
        ArrayList arrayList = new ArrayList();
        arrayList.add(StreamRecordUtils.insertRecord(6L, "k1", "0a6", 3L, "k1", "0a3"));
        arrayList.add(new Watermark(7L));
        arrayList.add(StreamRecordUtils.insertRecord(9L, "k1", "7a9", 3L, "k1", "0a5"));
        arrayList.add(new Watermark(13L));
        arrayList.add(StreamRecordUtils.insertRecord(15L, "k1", "13a15", null, null, null));
        arrayList.add(new Watermark(15L));
        this.assertor.assertOutputEquals("output wrong.", arrayList, createTestHarness.getOutput());
        Assertions.assertThat((Long) temporalRowTimeJoinOperator.getKeyedStateStore().getState(new ValueStateDescriptor(TemporalRowTimeJoinOperator.getNextLeftIndexStateName(), Types.LONG)).value()).isNull();
        Assertions.assertThat((Long) temporalRowTimeJoinOperator.getKeyedStateStore().getState(new ValueStateDescriptor(TemporalRowTimeJoinOperator.getRegisteredTimerStateName(), Types.LONG)).value()).isNull();
        createTestHarness.close();
    }

    private KeyedTwoInputStreamOperatorTestHarness<RowData, RowData, RowData, RowData> createTestHarness(TemporalRowTimeJoinOperator temporalRowTimeJoinOperator) throws Exception {
        return new KeyedTwoInputStreamOperatorTestHarness<>(temporalRowTimeJoinOperator, this.keySelector, this.keySelector, this.keyType);
    }
}
