package org.apache.hadoop.hive.llap.io.decode;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.Callable;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonQueueMetrics;
import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer;
import org.apache.hive.common.util.FixedSizedObjectPool;

/* loaded from: input_file:org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.class */
/**
 * Base class for a pipeline stage that receives encoded column batches from an upstream
 * producer, decodes them via {@link #decodeBatch}, and forwards the resulting
 * {@link ColumnVectorBatch} objects to a downstream consumer.
 *
 * <p>Incoming batches are tracked in {@code pendingData} (guarded by its own monitor) while
 * they are being claimed for decoding, so that {@link #stop()} and {@link #setError(Throwable)}
 * can discard them and hand them back to the upstream feedback channel.
 *
 * @param <BatchKey> key type identifying an encoded batch
 * @param <BatchType> concrete encoded batch type
 */
public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedColumnBatch<BatchKey>> implements Consumer<BatchType>, ReadPipeline {
    /** Set (under the {@code pendingData} lock) once {@link #stop()} has been called. */
    private volatile boolean isStopped = false;
    /** Batches that have arrived but were not yet claimed for decoding; guarded by its own monitor. */
    private final HashMap<BatchKey, BatchType> pendingData = new HashMap<>();
    /** Channel used to hand encoded batches back to the producer; assigned in {@link #init}. */
    private ConsumerFeedback<BatchType> upstreamFeedback;
    private final Consumer<ColumnVectorBatch> downstreamConsumer;
    private Callable<Void> readCallable;
    private final LlapDaemonQueueMetrics queueMetrics;
    private static final int CVB_POOL_SIZE = 128;
    /** Pool of reusable decoded batches; {@link #returnData(ColumnVectorBatch)} offers batches back to it. */
    protected final FixedSizedObjectPool<ColumnVectorBatch> cvbPool;

    /**
     * @param consumer downstream consumer that receives decoded batches, completion and errors
     * @param i value passed to the {@link ColumnVectorBatch} constructor for pooled batches
     *          (presumably the column count -- confirm against {@code ColumnVectorBatch})
     * @param llapDaemonQueueMetrics metrics sink for queue size and processing time
     */
    public EncodedDataConsumer(Consumer<ColumnVectorBatch> consumer, final int i, LlapDaemonQueueMetrics llapDaemonQueueMetrics) {
        this.downstreamConsumer = consumer;
        this.queueMetrics = llapDaemonQueueMetrics;
        this.cvbPool = new FixedSizedObjectPool<>(CVB_POOL_SIZE, new Pool.PoolObjectHelper<ColumnVectorBatch>() {
            @Override
            public ColumnVectorBatch create() {
                return new ColumnVectorBatch(i);
            }

            public void resetBeforeOffer(ColumnVectorBatch columnVectorBatch) {
                // Nothing to reset before a batch goes back into the pool.
            }
        });
    }

    /**
     * Wires this consumer to its upstream producer.
     *
     * @param consumerFeedback feedback channel used to return batches and to stop/pause the producer
     * @param callable the callable exposed through {@link #getReadCallable()}
     */
    public void init(ConsumerFeedback<BatchType> consumerFeedback, Callable<Void> callable) {
        this.upstreamFeedback = consumerFeedback;
        this.readCallable = callable;
    }

    @Override // org.apache.hadoop.hive.llap.io.decode.ReadPipeline
    public Callable<Void> getReadCallable() {
        return this.readCallable;
    }

    /**
     * Accepts one encoded batch, decodes it and returns it upstream.
     *
     * <p>If this consumer has already been stopped the batch is returned upstream without being
     * decoded. If the batch is discarded concurrently (by {@link #stop()} or
     * {@link #setError(Throwable)}) between registration and claiming, this method returns
     * without touching it, because the discard path returns it upstream itself.
     *
     * @param batchtype the encoded batch to decode
     * @throws UnsupportedOperationException if a different batch with the same key is already pending
     */
    public void consumeData(BatchType batchtype) {
        final boolean stoppedOnEntry;
        BatchKey key = null;
        BatchType target = null;
        int expectedVersion = 0;
        synchronized (this.pendingData) {
            stoppedOnEntry = this.isStopped;
            if (!stoppedOnEntry) {
                key = batchtype.getBatchKey();
                target = this.pendingData.get(key);
                if (target == null) {
                    target = batchtype;
                    this.pendingData.put(key, batchtype);
                }
                // Versions of pending batches are only bumped under this same lock (see
                // discardPendingData), so the value read here is a consistent snapshot.
                expectedVersion = target.version;
            }
            this.queueMetrics.setQueueSize(this.pendingData.size());
        }
        if (stoppedOnEntry) {
            returnSourceData(batchtype);
            return;
        }

        final BatchType claimed;
        final boolean versionChanged;
        synchronized (target) {
            if (target != batchtype) {
                throw new UnsupportedOperationException("Merging is not supported");
            }
            synchronized (this.pendingData) {
                claimed = this.isStopped ? null : this.pendingData.remove(key);
                // Only compare versions when something was actually claimed; the previous code
                // dereferenced the removed entry unconditionally and failed with a
                // NullPointerException after a concurrent stop()/setError().
                versionChanged = claimed != null && expectedVersion != claimed.version;
            }
        }
        if (claimed == null) {
            // The batch was taken out of the map by discardPendingData, which already bumped its
            // version and returns it upstream; returning it here as well would hand it back twice.
            return;
        }
        if (versionChanged && claimed != batchtype) {
            returnSourceData(batchtype);
            return;
        }
        final long startMillis = System.currentTimeMillis();
        decodeBatch(claimed, this.downstreamConsumer);
        this.queueMetrics.addProcessingTime(System.currentTimeMillis() - startMillis);
        returnSourceData(claimed);
    }

    /** Bumps the batch version and hands the batch back to the upstream producer. */
    private void returnSourceData(BatchType batchtype) {
        batchtype.version++;
        this.upstreamFeedback.returnData(batchtype);
    }

    /**
     * Decodes one encoded batch and pushes the result(s) to {@code consumer}.
     *
     * @param batchtype the encoded batch claimed by {@link #consumeData}
     * @param consumer the downstream consumer passed to the constructor
     */
    protected abstract void decodeBatch(BatchType batchtype, Consumer<ColumnVectorBatch> consumer);

    /**
     * Signals end of input to the downstream consumer.
     *
     * @throws AssertionError if batches are still pending when this is called
     */
    public void setDone() {
        synchronized (this.pendingData) {
            if (!this.pendingData.isEmpty()) {
                throw new AssertionError("Not all data has been sent downstream: " + this.pendingData.size());
            }
        }
        this.downstreamConsumer.setDone();
    }

    /** Propagates the error downstream, then discards any pending batches (without stopping). */
    public void setError(Throwable th) {
        this.downstreamConsumer.setError(th);
        discardPendingData(false);
    }

    @Override // org.apache.hadoop.hive.llap.ConsumerFeedback
    public void returnData(ColumnVectorBatch columnVectorBatch) {
        this.cvbPool.offer(columnVectorBatch);
    }

    /**
     * Removes every pending batch from the map, bumps its version, and returns it upstream.
     *
     * @param z when {@code true}, also marks this consumer as stopped (under the map lock)
     */
    private void discardPendingData(boolean z) {
        final ArrayList<BatchType> discarded;
        synchronized (this.pendingData) {
            if (z) {
                this.isStopped = true;
            }
            // Size the list under the lock; the map must not be read without holding it.
            discarded = new ArrayList<>(this.pendingData.size());
            for (BatchType pending : this.pendingData.values()) {
                pending.version++;
                discarded.add(pending);
            }
            this.pendingData.clear();
        }
        // Call upstream outside the lock so the feedback implementation never runs under it.
        for (BatchType pending : discarded) {
            this.upstreamFeedback.returnData(pending);
        }
    }

    @Override // org.apache.hadoop.hive.llap.ConsumerFeedback
    public void stop() {
        this.upstreamFeedback.stop();
        discardPendingData(true);
    }

    @Override // org.apache.hadoop.hive.llap.ConsumerFeedback
    public void pause() {
        this.upstreamFeedback.pause();
    }

    @Override // org.apache.hadoop.hive.llap.ConsumerFeedback
    public void unpause() {
        this.upstreamFeedback.unpause();
    }
}
