package org.apache.flink.table.runtime.operators.dynamicfiltering;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.IntStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArrayComparator;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/dynamicfiltering/DynamicFilteringDataCollectorOperator.class */
/**
 * Stream operator that collects the distinct values of the dynamic filtering fields seen on its
 * input and reports them to the operator coordinator once the input is finished.
 *
 * <p>For every incoming row the configured fields are projected into a {@link GenericRowData},
 * serialized, and stored in a de-duplicating buffer. The operator keeps track of the total number
 * of serialized bytes it has buffered; as soon as that number exceeds a positive {@code threshold}
 * the buffer is dropped and all further input is ignored. A non-positive threshold disables the
 * size limit.
 *
 * <p>On {@link #finish()} a {@link DynamicFilteringEvent} wrapped in a {@link SourceEventWrapper}
 * is sent through the {@link OperatorEventGateway}. It carries either the collected rows or, when
 * the threshold was exceeded, an empty data set.
 */
public class DynamicFilteringDataCollectorOperator extends AbstractStreamOperator<Object> implements OneInputStreamOperator<RowData, Object> {
    private static final Logger LOG = LoggerFactory.getLogger(DynamicFilteringDataCollectorOperator.class);

    /** Row type describing the projected filtering fields, in projection order. */
    private final RowType dynamicFilteringFieldType;

    /** Positions of the filtering fields inside the input rows. */
    private final List<Integer> dynamicFilteringFieldIndices;

    /** Maximum number of serialized bytes to buffer; values {@code <= 0} mean "no limit". */
    private final long threshold;

    /** Channel used to deliver the collected data to the coordinator. */
    private final OperatorEventGateway operatorEventGateway;

    private transient TypeInformation<RowData> typeInfo;
    private transient TypeSerializer<RowData> serializer;

    /**
     * Serialized distinct rows. A sorted set with an explicit comparator is used because
     * {@code byte[]} has identity-based {@code equals}/{@code hashCode}, so a hash set could not
     * detect duplicates by content.
     */
    private transient Set<byte[]> buffer;

    /** Sum of the serialized sizes of all rows that were added to {@link #buffer}. */
    private transient long currentSize;

    /** One getter per filtering field, reading that field from an input row. */
    private transient RowData.FieldGetter[] fieldGetters;

    /**
     * Creates the operator.
     *
     * @param dynamicFilteringFieldType row type of the projected filtering fields; must not be
     *     null
     * @param dynamicFilteringFieldIndices positions of the filtering fields in the input rows;
     *     must not be null
     * @param threshold byte limit for the buffered data; a non-positive value disables the limit
     * @param operatorEventGateway gateway used to send the result to the coordinator; must not be
     *     null
     */
    public DynamicFilteringDataCollectorOperator(
            RowType dynamicFilteringFieldType,
            List<Integer> dynamicFilteringFieldIndices,
            long threshold,
            OperatorEventGateway operatorEventGateway) {
        this.dynamicFilteringFieldType = Preconditions.checkNotNull(dynamicFilteringFieldType);
        this.dynamicFilteringFieldIndices = Preconditions.checkNotNull(dynamicFilteringFieldIndices);
        this.threshold = threshold;
        this.operatorEventGateway = Preconditions.checkNotNull(operatorEventGateway);
    }

    /**
     * Initializes the serializer, the de-duplicating buffer, the size counter and the field
     * getters.
     *
     * @throws Exception if the superclass initialization fails
     */
    public void open() throws Exception {
        super.open();
        this.typeInfo = InternalTypeInfo.of(dynamicFilteringFieldType);
        this.serializer = typeInfo.createSerializer(new ExecutionConfig());
        this.buffer = new TreeSet<>(new BytePrimitiveArrayComparator(true)::compare);
        this.currentSize = 0L;
        // The i-th getter reads input position indices[i] using the i-th type of the projected row.
        this.fieldGetters =
                IntStream.range(0, dynamicFilteringFieldIndices.size())
                        .mapToObj(
                                i ->
                                        RowData.createFieldGetter(
                                                dynamicFilteringFieldType.getTypeAt(i),
                                                dynamicFilteringFieldIndices.get(i)))
                        .toArray(RowData.FieldGetter[]::new);
    }

    /**
     * Projects the filtering fields out of the incoming row and buffers their serialized form if
     * it has not been seen before. Does nothing once the size threshold has been exceeded.
     *
     * @param streamRecord record wrapping the input row
     * @throws Exception if serializing the projected row fails
     */
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        if (exceedThreshold()) {
            return;
        }

        GenericRowData projectedRow = projectFilteringFields(streamRecord.getValue());

        try (ByteArrayOutputStream serializedRow = new ByteArrayOutputStream()) {
            serializer.serialize(projectedRow, new DataOutputViewStreamWrapper(serializedRow));
            boolean duplicate = !buffer.add(serializedRow.toByteArray());
            if (duplicate) {
                return;
            }
            currentSize += serializedRow.size();
        }

        if (exceedThreshold()) {
            // Drop what was collected but keep currentSize as is, so that exceedThreshold() stays
            // true and every later record is skipped.
            buffer.clear();
            LOG.warn("Collected data size exceeds the threshold, {} > {}, dynamic filtering is disabled.", currentSize, threshold);
        }
    }

    /** Copies the configured filtering fields of {@code input} into a new row. */
    private GenericRowData projectFilteringFields(RowData input) {
        int fieldCount = dynamicFilteringFieldIndices.size();
        GenericRowData projectedRow = new GenericRowData(fieldCount);
        for (int i = 0; i < fieldCount; i++) {
            projectedRow.setField(i, fieldGetters[i].getFieldOrNull(input));
        }
        return projectedRow;
    }

    /** Returns whether a size limit is configured and the buffered byte count is above it. */
    private boolean exceedThreshold() {
        return threshold > 0 && currentSize > threshold;
    }

    /**
     * Logs a summary of the collection and sends the result to the coordinator.
     *
     * @throws Exception declared for the operator lifecycle contract
     */
    public void finish() throws Exception {
        if (exceedThreshold()) {
            LOG.info("Finish collecting. {} bytes are collected which exceeds the threshold {}. Sending empty data.", currentSize, threshold);
        } else {
            LOG.info("Finish collecting. {} bytes in {} rows are collected. Sending the data.", currentSize, buffer.size());
        }
        sendEvent();
    }

    /**
     * Builds the {@link DynamicFilteringData} and sends it, wrapped in a
     * {@link DynamicFilteringEvent} and a {@link SourceEventWrapper}, to the coordinator.
     */
    private void sendEvent() {
        final DynamicFilteringData dynamicFilteringData;
        if (exceedThreshold()) {
            // NOTE(review): the trailing boolean presumably marks whether filtering is usable;
            // confirm against DynamicFilteringData.
            dynamicFilteringData =
                    new DynamicFilteringData(
                            typeInfo, dynamicFilteringFieldType, Collections.emptyList(), false);
        } else {
            dynamicFilteringData =
                    new DynamicFilteringData(
                            typeInfo, dynamicFilteringFieldType, new ArrayList<>(buffer), true);
        }
        operatorEventGateway.sendEventToCoordinator(
                new SourceEventWrapper(new DynamicFilteringEvent(dynamicFilteringData)));
    }

    /**
     * Closes the operator and releases the buffered rows.
     *
     * @throws Exception if closing the superclass fails
     */
    public void close() throws Exception {
        super.close();
        // buffer is only assigned in open(), so it may still be null here.
        if (buffer != null) {
            buffer.clear();
        }
    }
}
