package org.apache.flink.table.runtime.operators.source;

import javax.annotation.Nullable;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.Test;

/**
 * Tests for {@link InputConversionOperator}.
 *
 * <p>Every test builds the operator around a converter for a single-field {@code ROW<f INT>} type
 * and calls the operator directly, without a test harness. The operator is therefore never wired
 * to an output in these tests.
 */
public class InputConversionOperatorTest {

    /** Error message expected when a record cannot be converted to the internal format. */
    private static final String CONVERSION_ERROR =
            "Error during input conversion from external DataStream API to internal Table API data structures";

    /** Rows that do not fit the declared type or are not inserts must be rejected. */
    @Test
    public void testInvalidRecords() {
        final InputConversionOperator<Row> operator = createOperator(false);

        // A row with zero fields does not match the single-field row type.
        Assertions.assertThatThrownBy(
                        () -> operator.processElement(new StreamRecord<>(Row.ofKind(RowKind.INSERT))))
                .satisfies(FlinkAssertions.anyCauseMatches(FlinkRuntimeException.class, CONVERSION_ERROR));

        // The row shape is fine, but the operator was created with its last flag set to true and
        // is expected to reject any row kind other than INSERT.
        Assertions.assertThatThrownBy(
                        () -> operator.processElement(new StreamRecord<>(Row.ofKind(RowKind.DELETE, 12))))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                FlinkRuntimeException.class, "Conversion expects insert-only records"));
    }

    /** A stream record without a timestamp must be rejected when the second flag is enabled. */
    @Test
    public void testInvalidEventTime() {
        final InputConversionOperator<Row> operator = createOperator(true);

        // The StreamRecord is created without a timestamp.
        Assertions.assertThatThrownBy(
                        () -> operator.processElement(new StreamRecord<>(Row.ofKind(RowKind.INSERT, 12))))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                FlinkRuntimeException.class,
                                "Could not find timestamp in DataStream API record."));
    }

    /**
     * An ordinary watermark must be processed without throwing.
     *
     * <p>NOTE(review): the operator has no output here, so forwarding the watermark would
     * presumably fail; passing this test implies the watermark is dropped. Confirm against
     * {@code InputConversionOperator#processWatermark}.
     */
    @Test
    public void testWatermarkSuppression() throws Exception {
        createOperator(false).processWatermark(new Watermark(1000L));
    }

    /**
     * {@link Watermark#MAX_WATERMARK} is expected to trigger a {@link NullPointerException}.
     *
     * <p>NOTE(review): presumably the max watermark is always forwarded and the NPE comes from the
     * output that was never initialized in this test. Confirm against
     * {@code InputConversionOperator#processWatermark}.
     */
    @Test(expected = NullPointerException.class)
    public void testReceiveMaxWatermark() throws Exception {
        createOperator(false).processWatermark(Watermark.MAX_WATERMARK);
    }

    /**
     * Creates the operator under test for a {@code ROW<f INT>} input type.
     *
     * <p>The boolean constructor arguments are passed positionally as {@code (false,
     * secondFlag, false, true)}.
     *
     * @param secondFlag value for the second boolean constructor argument; presumably the
     *     "produce rowtime metadata" switch, since enabling it makes the operator demand a record
     *     timestamp (TODO confirm against the constructor's parameter names)
     * @return a new operator that has not been set up or opened
     */
    private static InputConversionOperator<Row> createOperator(boolean secondFlag) {
        final DataType rowType = DataTypes.ROW(DataTypes.FIELD("f", DataTypes.INT()));
        return new InputConversionOperator<>(createConverter(rowType), false, secondFlag, false, true);
    }

    /**
     * Wraps the runtime converter for {@code dataType} in the connector-facing
     * {@link DynamicTableSource.DataStructureConverter} interface.
     *
     * @param dataType type whose external objects should be converted
     * @return a converter delegating to {@code toInternalOrNull}; its {@code open} is a no-op
     */
    private static DynamicTableSource.DataStructureConverter createConverter(DataType dataType) {
        final DataStructureConverter<Object, Object> delegate =
                DataStructureConverters.getConverter(dataType);
        return new DynamicTableSource.DataStructureConverter() {
            @Override
            @Nullable
            public Object toInternal(@Nullable Object externalStructure) {
                return delegate.toInternalOrNull(externalStructure);
            }

            @Override
            public void open(RuntimeConverter.Context context) {
                // nothing to do
            }
        };
    }
}
