package org.apache.flink.table.runtime.operators.python.aggregate;

import java.time.ZoneId;
import java.util.ArrayList;
import org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.UpdatableRowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.functions.python.PythonAggregateFunctionInfo;
import org.apache.flink.table.runtime.dataview.DataViewSpec;
import org.apache.flink.table.runtime.groupwindow.NamedWindowProperty;
import org.apache.flink.table.runtime.groupwindow.ProctimeAttribute;
import org.apache.flink.table.runtime.groupwindow.RowtimeAttribute;
import org.apache.flink.table.runtime.groupwindow.WindowEnd;
import org.apache.flink.table.runtime.groupwindow.WindowProperty;
import org.apache.flink.table.runtime.groupwindow.WindowStart;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.table.runtime.operators.window.assigners.WindowAssigner;
import org.apache.flink.table.runtime.typeutils.serializers.python.RowDataSerializer;
import org.apache.flink.table.runtime.util.TimeWindowUtil;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TinyIntType;

@Internal
/* loaded from: input_file:org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.class */
public class PythonStreamGroupWindowAggregateOperator<K, W extends Window> extends AbstractPythonStreamAggregateOperator implements Triggerable<K, W> {
    private static final long serialVersionUID = 1;

    @VisibleForTesting
    static final String STREAM_GROUP_WINDOW_AGGREGATE_URN = "flink:transform:stream_group_window_aggregate:v1";

    @VisibleForTesting
    static final byte REGISTER_EVENT_TIMER = 0;

    @VisibleForTesting
    static final byte REGISTER_PROCESSING_TIMER = 1;

    @VisibleForTesting
    static final byte DELETE_EVENT_TIMER = 2;

    @VisibleForTesting
    static final byte DELETE_PROCESSING_TIMER = 3;
    private final boolean countStarInserted;

    @VisibleForTesting
    final int inputTimeFieldIndex;

    @VisibleForTesting
    final long allowedLateness;

    @VisibleForTesting
    final ZoneId shiftTimeZone;
    private FlinkFnApi.GroupWindow.WindowProperty[] namedProperties;

    @VisibleForTesting
    final WindowAssigner<W> windowAssigner;
    private final FlinkFnApi.GroupWindow.WindowType windowType;
    private final boolean isRowTime;
    private final boolean isTimeWindow;
    private final long size;
    private final long slide;
    private final long gap;

    @VisibleForTesting
    transient TypeSerializer<W> windowSerializer;
    private transient InternalTimerService<W> internalTimerService;
    private transient UpdatableRowData reuseTimerData;
    private transient int timerDataLength;
    private transient int keyLength;
    private transient UpdatableRowData reuseRowData;
    private transient UpdatableRowData reuseTimerRowData;
    private transient RowDataSerializer keySerializer;

    protected PythonStreamGroupWindowAggregateOperator(Configuration configuration, RowType rowType, RowType rowType2, PythonAggregateFunctionInfo[] pythonAggregateFunctionInfoArr, DataViewSpec[][] dataViewSpecArr, int[] iArr, int i, boolean z, boolean z2, int i2, WindowAssigner<W> windowAssigner, FlinkFnApi.GroupWindow.WindowType windowType, boolean z3, boolean z4, long j, long j2, long j3, long j4, NamedWindowProperty[] namedWindowPropertyArr, ZoneId zoneId) {
        super(configuration, rowType, rowType2, pythonAggregateFunctionInfoArr, dataViewSpecArr, iArr, i, z);
        this.countStarInserted = z2;
        this.inputTimeFieldIndex = i2;
        this.windowAssigner = windowAssigner;
        this.windowType = windowType;
        this.isRowTime = z3;
        this.isTimeWindow = z4;
        this.size = j;
        this.slide = j2;
        this.gap = j3;
        this.allowedLateness = j4;
        this.shiftTimeZone = zoneId;
        this.namedProperties = new FlinkFnApi.GroupWindow.WindowProperty[namedWindowPropertyArr.length];
        for (int i3 = 0; i3 < namedWindowPropertyArr.length; i3++) {
            WindowProperty property = namedWindowPropertyArr[i3].getProperty();
            if (property instanceof WindowStart) {
                this.namedProperties[i3] = FlinkFnApi.GroupWindow.WindowProperty.WINDOW_START;
            } else if (property instanceof WindowEnd) {
                this.namedProperties[i3] = FlinkFnApi.GroupWindow.WindowProperty.WINDOW_END;
            } else if (property instanceof RowtimeAttribute) {
                this.namedProperties[i3] = FlinkFnApi.GroupWindow.WindowProperty.ROW_TIME_ATTRIBUTE;
            } else {
                if (!(property instanceof ProctimeAttribute)) {
                    throw new RuntimeException("Unexpected property " + property);
                }
                this.namedProperties[i3] = FlinkFnApi.GroupWindow.WindowProperty.PROC_TIME_ATTRIBUTE;
            }
        }
    }

    public static <K, W extends Window> PythonStreamGroupWindowAggregateOperator<K, W> createTumblingGroupWindowAggregateOperator(Configuration configuration, RowType rowType, RowType rowType2, PythonAggregateFunctionInfo[] pythonAggregateFunctionInfoArr, DataViewSpec[][] dataViewSpecArr, int[] iArr, int i, boolean z, boolean z2, int i2, WindowAssigner<W> windowAssigner, boolean z3, boolean z4, long j, long j2, NamedWindowProperty[] namedWindowPropertyArr, ZoneId zoneId) {
        return new PythonStreamGroupWindowAggregateOperator<>(configuration, rowType, rowType2, pythonAggregateFunctionInfoArr, dataViewSpecArr, iArr, i, z, z2, i2, windowAssigner, FlinkFnApi.GroupWindow.WindowType.TUMBLING_GROUP_WINDOW, z3, z4, j, 0L, 0L, j2, namedWindowPropertyArr, zoneId);
    }

    public static <K, W extends Window> PythonStreamGroupWindowAggregateOperator<K, W> createSlidingGroupWindowAggregateOperator(Configuration configuration, RowType rowType, RowType rowType2, PythonAggregateFunctionInfo[] pythonAggregateFunctionInfoArr, DataViewSpec[][] dataViewSpecArr, int[] iArr, int i, boolean z, boolean z2, int i2, WindowAssigner<W> windowAssigner, boolean z3, boolean z4, long j, long j2, long j3, NamedWindowProperty[] namedWindowPropertyArr, ZoneId zoneId) {
        return new PythonStreamGroupWindowAggregateOperator<>(configuration, rowType, rowType2, pythonAggregateFunctionInfoArr, dataViewSpecArr, iArr, i, z, z2, i2, windowAssigner, FlinkFnApi.GroupWindow.WindowType.SLIDING_GROUP_WINDOW, z3, z4, j, j2, 0L, j3, namedWindowPropertyArr, zoneId);
    }

    public static <K, W extends Window> PythonStreamGroupWindowAggregateOperator<K, W> createSessionGroupWindowAggregateOperator(Configuration configuration, RowType rowType, RowType rowType2, PythonAggregateFunctionInfo[] pythonAggregateFunctionInfoArr, DataViewSpec[][] dataViewSpecArr, int[] iArr, int i, boolean z, boolean z2, int i2, WindowAssigner<W> windowAssigner, boolean z3, long j, long j2, NamedWindowProperty[] namedWindowPropertyArr, ZoneId zoneId) {
        return new PythonStreamGroupWindowAggregateOperator<>(configuration, rowType, rowType2, pythonAggregateFunctionInfoArr, dataViewSpecArr, iArr, i, z, z2, i2, windowAssigner, FlinkFnApi.GroupWindow.WindowType.SESSION_GROUP_WINDOW, z3, true, 0L, 0L, j, j2, namedWindowPropertyArr, zoneId);
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator, org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.windowSerializer = this.windowAssigner.getWindowSerializer(new ExecutionConfig());
        this.internalTimerService = getInternalTimerService("window-timers", this.windowSerializer, this);
        this.reuseRowData = new UpdatableRowData(GenericRowData.of(new Object[]{(byte) 0, null, null, null, null}), 5);
        this.reuseTimerRowData = new UpdatableRowData(GenericRowData.of(new Object[]{(byte) 1, null, null, null, null}), 5);
        this.reuseTimerData = new UpdatableRowData(GenericRowData.of(new Object[]{0, null, 0}), 3);
        this.reuseTimerRowData.setField(4, this.reuseTimerData);
        this.keyLength = getKeyType().getFieldCount();
        this.keySerializer = getKeySerializer();
        super.open();
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public void processElementInternal(RowData rowData) throws Exception {
        this.reuseRowData.setField(1, rowData);
        this.reuseRowData.setLong(3, this.internalTimerService.currentWatermark());
        this.udfInputTypeSerializer.serialize(this.reuseRowData, this.baosWrapper);
        this.pythonFunctionRunner.process(this.baos.toByteArray());
        this.baos.reset();
    }

    @Override // org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator
    public void emitResult(Tuple3<String, byte[], Integer> tuple3) throws Exception {
        this.bais.setBuffer((byte[]) tuple3.f1, 0, ((Integer) tuple3.f2).intValue());
        RowData rowData = (RowData) this.udfOutputTypeSerializer.deserialize(this.baisWrapper);
        if (rowData.getByte(0) == 0) {
            RowData rowData2 = (GenericRowData) rowData.getRow(1, this.outputType.getFieldCount());
            int fieldCount = this.outputType.getFieldCount();
            for (int length = fieldCount - this.namedProperties.length; length < fieldCount; length++) {
                FlinkFnApi.GroupWindow.WindowProperty windowProperty = this.namedProperties[length - (fieldCount - this.namedProperties.length)];
                if (windowProperty == FlinkFnApi.GroupWindow.WindowProperty.WINDOW_START || windowProperty == FlinkFnApi.GroupWindow.WindowProperty.WINDOW_END) {
                    rowData2.setField(length, TimestampData.fromEpochMillis(rowData2.getLong(length)));
                } else {
                    rowData2.setField(length, TimestampData.fromEpochMillis(getShiftEpochMills(rowData2.getLong(length))));
                }
            }
            this.rowDataWrapper.collect(rowData2);
            return;
        }
        RowData row = rowData.getRow(2, this.timerDataLength);
        byte b = row.getByte(0);
        RowData row2 = row.getRow(1, this.keyLength);
        long j = row.getLong(2);
        byte[] binary = row.getBinary(3);
        this.bais.setBuffer(binary, 0, binary.length);
        Window window = (Window) this.windowSerializer.deserialize(this.baisWrapper);
        BinaryRowData copy = this.keySerializer.toBinaryRow(row2).copy();
        synchronized (getKeyedStateBackend()) {
            setCurrentKey(copy);
            if (b == 0) {
                this.internalTimerService.registerEventTimeTimer(window, TimeWindowUtil.toEpochMillsForTimer(j, this.shiftTimeZone));
            } else if (b == 1) {
                this.internalTimerService.registerProcessingTimeTimer(window, TimeWindowUtil.toEpochMillsForTimer(j, this.shiftTimeZone));
            } else if (b == 2) {
                this.internalTimerService.deleteEventTimeTimer(window, TimeWindowUtil.toEpochMillsForTimer(j, this.shiftTimeZone));
            } else {
                if (b != 3) {
                    throw new RuntimeException(String.format("Unsupported timerOperandType %s.", Byte.valueOf(b)));
                }
                this.internalTimerService.deleteProcessingTimeTimer(window, TimeWindowUtil.toEpochMillsForTimer(j, this.shiftTimeZone));
            }
        }
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public String getFunctionUrn() {
        return STREAM_GROUP_WINDOW_AGGREGATE_URN;
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public RowType createUserDefinedFunctionInputType() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new RowType.RowField("record_type", new TinyIntType()));
        arrayList.add(new RowType.RowField("row_data", this.inputType));
        arrayList.add(new RowType.RowField(ByteBuddyDoFnInvokerFactory.TIMESTAMP_PARAMETER_METHOD, new BigIntType()));
        arrayList.add(new RowType.RowField("watermark", new BigIntType()));
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new RowType.RowField("timer_type", new TinyIntType()));
        arrayList2.add(new RowType.RowField("key", getKeyType()));
        arrayList2.add(new RowType.RowField("encoded_namespace", new BinaryType()));
        arrayList.add(new RowType.RowField("timer", new RowType(arrayList2)));
        return new RowType(arrayList);
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public RowType createUserDefinedFunctionOutputType() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new RowType.RowField("record_type", new TinyIntType()));
        ArrayList arrayList2 = new ArrayList(this.outputType.getFields().subList(0, this.outputType.getFieldCount() - this.namedProperties.length));
        for (int i = 0; i < this.namedProperties.length; i++) {
            arrayList2.add(new RowType.RowField("w" + i, new BigIntType()));
        }
        arrayList.add(new RowType.RowField("row_data", new RowType(arrayList2)));
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(new RowType.RowField("timer_operand_type", new TinyIntType()));
        arrayList3.add(new RowType.RowField("key", getKeyType()));
        arrayList3.add(new RowType.RowField(ByteBuddyDoFnInvokerFactory.TIMESTAMP_PARAMETER_METHOD, new BigIntType()));
        arrayList3.add(new RowType.RowField("encoded_namespace", new BinaryType()));
        this.timerDataLength = arrayList3.size();
        arrayList.add(new RowType.RowField("timer", new RowType(arrayList3)));
        return new RowType(arrayList);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public FlinkFnApi.UserDefinedAggregateFunctions getUserDefinedFunctionsProto() {
        FlinkFnApi.UserDefinedAggregateFunctions.Builder builder = super.getUserDefinedFunctionsProto().toBuilder();
        builder.setCountStarInserted(this.countStarInserted);
        FlinkFnApi.GroupWindow.Builder newBuilder = FlinkFnApi.GroupWindow.newBuilder();
        newBuilder.setWindowType(this.windowType);
        newBuilder.setIsTimeWindow(this.isTimeWindow);
        newBuilder.setIsRowTime(this.isRowTime);
        newBuilder.setTimeFieldIndex(this.inputTimeFieldIndex);
        newBuilder.setWindowSize(this.size);
        newBuilder.setWindowSlide(this.slide);
        newBuilder.setWindowGap(this.gap);
        newBuilder.setAllowedLateness(this.allowedLateness);
        for (FlinkFnApi.GroupWindow.WindowProperty windowProperty : this.namedProperties) {
            newBuilder.addNamedProperties(windowProperty);
        }
        newBuilder.setShiftTimezone(this.shiftTimeZone.getId());
        builder.setGroupWindow(newBuilder);
        return builder.build();
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public TypeSerializer<W> getWindowSerializer() {
        return this.windowSerializer;
    }

    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        emitTriggerTimerData(internalTimer, (byte) 0);
    }

    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        emitTriggerTimerData(internalTimer, (byte) 1);
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public FlinkFnApi.CoderInfoDescriptor createInputCoderInfoDescriptor(RowType rowType) {
        return ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
    }

    @Override // org.apache.flink.table.runtime.operators.python.aggregate.AbstractPythonStreamAggregateOperator
    public FlinkFnApi.CoderInfoDescriptor createOutputCoderInfoDescriptor(RowType rowType) {
        return ProtoUtils.createFlattenRowTypeCoderInfoDescriptorProto(rowType, FlinkFnApi.CoderInfoDescriptor.Mode.MULTIPLE, false);
    }

    @VisibleForTesting
    long getShiftEpochMills(long j) {
        return TimeWindowUtil.toEpochMills(j, this.shiftTimeZone);
    }

    private void emitTriggerTimerData(InternalTimer<K, W> internalTimer, byte b) throws Exception {
        this.reuseTimerData.setByte(0, b);
        this.reuseTimerData.setField(1, internalTimer.getKey());
        this.windowSerializer.serialize((Window) internalTimer.getNamespace(), this.baosWrapper);
        this.reuseTimerData.setField(2, this.baos.toByteArray());
        this.baos.reset();
        this.reuseTimerRowData.setLong(2, TimeWindowUtil.toUtcTimestampMills(internalTimer.getTimestamp(), this.shiftTimeZone));
        this.udfInputTypeSerializer.serialize(this.reuseTimerRowData, this.baosWrapper);
        this.pythonFunctionRunner.process(this.baos.toByteArray());
        this.baos.reset();
        this.elementCount++;
        checkInvokeFinishBundleByCount();
        emitResults();
    }
}
