package org.apache.flink.table.runtime.operators.sink;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest.class */
public class SinkUpsertMaterializerTest {
    private final StateTtlConfig ttlConfig = StateConfigUtil.createTtlConfig(1000);
    private final LogicalType[] types = {new IntType(), new VarCharType()};
    private final RowDataSerializer serializer = new RowDataSerializer(this.types);
    private final RowDataKeySelector keySelector = HandwrittenSelectorUtil.getRowDataSelector(new int[0], this.types);
    private final GeneratedRecordEqualiser equaliser = new GeneratedRecordEqualiser("", "", new Object[0]) { // from class: org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializerTest.1
        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public RecordEqualiser m59newInstance(ClassLoader classLoader) {
            return new TestRecordEqualiser();
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/table/runtime/operators/sink/SinkUpsertMaterializerTest$TestRecordEqualiser.class */
    public static class TestRecordEqualiser implements RecordEqualiser {
        private TestRecordEqualiser() {
        }

        public boolean equals(RowData rowData, RowData rowData2) {
            return rowData.getRowKind() == rowData2.getRowKind() && rowData.getInt(0) == rowData2.getInt(0) && rowData.getString(1).equals(rowData2.getString(1));
        }
    }

    @Test
    public void test() throws Exception {
        KeyedOneInputStreamOperatorTestHarness keyedOneInputStreamOperatorTestHarness = new KeyedOneInputStreamOperatorTestHarness(new SinkUpsertMaterializer(this.ttlConfig, this.serializer, this.equaliser), this.keySelector, this.keySelector.getProducedType());
        keyedOneInputStreamOperatorTestHarness.open();
        keyedOneInputStreamOperatorTestHarness.setStateTtlProcessingTime(1L);
        keyedOneInputStreamOperatorTestHarness.processElement(StreamRecordUtils.insertRecord(1, "a1"));
        shouldEmit(keyedOneInputStreamOperatorTestHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 1, "a1"));
        keyedOneInputStreamOperatorTestHarness.processElement(StreamRecordUtils.insertRecord(1, "a2"));
        shouldEmit(keyedOneInputStreamOperatorTestHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1, "a2"));
        keyedOneInputStreamOperatorTestHarness.processElement(StreamRecordUtils.insertRecord(1, "a3"));
        shouldEmit(keyedOneInputStreamOperatorTestHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1, "a3"));
        keyedOneInputStreamOperatorTestHarness.processElement(StreamRecordUtils.deleteRecord(1, "a2"));
        shouldEmitNothing(keyedOneInputStreamOperatorTestHarness);
        keyedOneInputStreamOperatorTestHarness.processElement(StreamRecordUtils.deleteRecord(1, "a3"));
        shouldEmit(keyedOneInputStreamOperatorTestHarness, StreamRecordUtils.rowOfKind(RowKind.UPDATE_AFTER, 1, "a1"));
        keyedOneInputStreamOperatorTestHarness.processElement(StreamRecordUtils.deleteRecord(1, "a1"));
        shouldEmit(keyedOneInputStreamOperatorTestHarness, StreamRecordUtils.rowOfKind(RowKind.DELETE, 1, "a1"));
        keyedOneInputStreamOperatorTestHarness.processElement(StreamRecordUtils.insertRecord(1, "a4"));
        shouldEmit(keyedOneInputStreamOperatorTestHarness, StreamRecordUtils.rowOfKind(RowKind.INSERT, 1, "a4"));
        keyedOneInputStreamOperatorTestHarness.setStateTtlProcessingTime(1002L);
        keyedOneInputStreamOperatorTestHarness.processElement(StreamRecordUtils.deleteRecord(1, "a4"));
        shouldEmitNothing(keyedOneInputStreamOperatorTestHarness);
        keyedOneInputStreamOperatorTestHarness.close();
    }

    private void shouldEmitNothing(OneInputStreamOperatorTestHarness<RowData, RowData> oneInputStreamOperatorTestHarness) {
        Assertions.assertThat(getEmittedRows(oneInputStreamOperatorTestHarness)).isEmpty();
    }

    private void shouldEmit(OneInputStreamOperatorTestHarness<RowData, RowData> oneInputStreamOperatorTestHarness, RowData rowData) {
        Assertions.assertThat(getEmittedRows(oneInputStreamOperatorTestHarness)).containsExactly(new RowData[]{rowData});
    }

    private static List<RowData> getEmittedRows(OneInputStreamOperatorTestHarness<RowData, RowData> oneInputStreamOperatorTestHarness) {
        ArrayList arrayList = new ArrayList();
        while (true) {
            Object poll = oneInputStreamOperatorTestHarness.getOutput().poll();
            if (poll == null) {
                return arrayList;
            }
            RowData rowData = (RowData) ((StreamRecord) poll).getValue();
            GenericRowData of = GenericRowData.of(new Object[]{Integer.valueOf(rowData.getInt(0)), rowData.getString(1)});
            of.setRowKind(rowData.getRowKind());
            arrayList.add(of);
        }
    }
}
