package com.datatorrent.lib.io.block;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.counters.BasicCounters;
import com.datatorrent.lib.io.block.BlockMetadata;
import com.datatorrent.lib.io.block.ReaderContext;
import com.datatorrent.lib.util.KryoCloneUtils;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.fs.PositionedReadable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@StatsListener.DataQueueSize
/* loaded from: input_file:com/datatorrent/lib/io/block/AbstractBlockReader.class */
public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM extends InputStream & PositionedReadable> extends BaseOperator implements Partitioner<AbstractBlockReader<R, B, STREAM>>, StatsListener, Operator.IdleTimeHandler {
    protected int operatorId;
    protected transient long windowId;

    @NotNull
    protected ReaderContext<STREAM> readerContext;
    protected transient STREAM stream;
    protected transient int blocksPerWindow;
    protected transient Context.OperatorContext context;
    protected transient long sleepTimeMillis;
    protected Set<Integer> partitionKeys;
    protected int partitionMask;
    private transient long nextMillis;
    protected transient B lastProcessedBlock;
    protected transient boolean consecutiveBlock;

    @AutoMetric
    private long bytesRead;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractBlockReader.class);
    public final transient DefaultOutputPort<B> blocksMetadataOutput = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<ReaderRecord<R>> messages = new DefaultOutputPort<>();
    public final transient DefaultInputPort<B> blocksMetadataInput = (DefaultInputPort<B>) new DefaultInputPort<B>() { // from class: com.datatorrent.lib.io.block.AbstractBlockReader.1
        public void process(B b) {
            AbstractBlockReader.this.processBlockMetadata(b);
        }
    };
    protected int maxReaders = 16;
    protected int minReaders = 1;
    protected long intervalMillis = 120000;
    protected final transient StatsListener.Response response = new StatsListener.Response();
    protected final transient Map<Integer, Integer> backlogPerOperator = Maps.newHashMap();
    protected transient int partitionCount = 1;
    protected final BasicCounters<MutableLong> counters = new BasicCounters<>(MutableLong.class);
    private boolean collectStats = true;
    protected transient long lastBlockOpenTime = -1;

    /* loaded from: input_file:com/datatorrent/lib/io/block/AbstractBlockReader$ReaderCounterKeys.class */
    public enum ReaderCounterKeys {
        RECORDS,
        BLOCKS,
        BYTES,
        TIME
    }

    /* loaded from: input_file:com/datatorrent/lib/io/block/AbstractBlockReader$ReaderRecord.class */
    public static class ReaderRecord<R> {
        private final long blockId;
        private final R record;

        private ReaderRecord() {
            this.blockId = -1L;
            this.record = null;
        }

        public ReaderRecord(long j, R r) {
            this.blockId = j;
            this.record = r;
        }

        public long getBlockId() {
            return this.blockId;
        }

        public R getRecord() {
            return this.record;
        }
    }

    @Override // 
    public void setup(Context.OperatorContext operatorContext) {
        this.operatorId = operatorContext.getId();
        LOG.debug("{}: partition keys {} mask {}", new Object[]{Integer.valueOf(this.operatorId), this.partitionKeys, Integer.valueOf(this.partitionMask)});
        this.context = operatorContext;
        this.counters.setCounter(ReaderCounterKeys.BLOCKS, new MutableLong());
        this.counters.setCounter(ReaderCounterKeys.RECORDS, new MutableLong());
        this.counters.setCounter(ReaderCounterKeys.BYTES, new MutableLong());
        this.counters.setCounter(ReaderCounterKeys.TIME, new MutableLong());
        this.sleepTimeMillis = ((Integer) operatorContext.getValue(Context.OperatorContext.SPIN_MILLIS)).intValue();
    }

    public void beginWindow(long j) {
        this.windowId = j;
        this.blocksPerWindow = 0;
        this.bytesRead = 0L;
    }

    public void handleIdleTime() {
        if (this.lastProcessedBlock == null || System.currentTimeMillis() - this.lastBlockOpenTime <= this.intervalMillis) {
            try {
                Thread.sleep(this.sleepTimeMillis);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } else {
            try {
                teardownStream(this.lastProcessedBlock);
                this.lastProcessedBlock = null;
            } catch (IOException e2) {
                throw new RuntimeException(e2);
            }
        }
    }

    public void endWindow() {
        this.counters.getCounter(ReaderCounterKeys.BLOCKS).add(this.blocksPerWindow);
        this.context.setCounters(this.counters);
    }

    protected void processBlockMetadata(B b) {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            if (b.getPreviousBlockId() == -1 || this.lastProcessedBlock == null || b.getPreviousBlockId() != this.lastProcessedBlock.getBlockId()) {
                teardownStream(this.lastProcessedBlock);
                this.consecutiveBlock = false;
                this.lastBlockOpenTime = System.currentTimeMillis();
                this.stream = setupStream(b);
            } else {
                this.consecutiveBlock = true;
            }
            readBlock(b);
            this.lastProcessedBlock = b;
            this.counters.getCounter(ReaderCounterKeys.TIME).add(System.currentTimeMillis() - currentTimeMillis);
            if (this.blocksMetadataOutput.isConnected()) {
                this.blocksMetadataOutput.emit(b);
            }
            this.blocksPerWindow++;
        } catch (IOException e) {
            try {
                if (this.lastProcessedBlock != null) {
                    teardownStream(this.lastProcessedBlock);
                    this.lastProcessedBlock = null;
                }
                throw new RuntimeException(e);
            } catch (IOException e2) {
                throw new RuntimeException("closing last", e);
            }
        }
    }

    protected void readBlock(BlockMetadata blockMetadata) throws IOException {
        this.readerContext.initialize(this.stream, blockMetadata, this.consecutiveBlock);
        while (true) {
            ReaderContext.Entity next = this.readerContext.next();
            if (next == null) {
                return;
            }
            this.counters.getCounter(ReaderCounterKeys.BYTES).add(next.getUsedBytes());
            this.bytesRead += next.getUsedBytes();
            R convertToRecord = convertToRecord(next.getRecord());
            if (convertToRecord != null) {
                this.counters.getCounter(ReaderCounterKeys.RECORDS).increment();
                this.messages.emit(new ReaderRecord(blockMetadata.getBlockId(), convertToRecord));
            }
        }
    }

    public Collection<Partitioner.Partition<AbstractBlockReader<R, B, STREAM>>> definePartitions(Collection<Partitioner.Partition<AbstractBlockReader<R, B, STREAM>>> collection, Partitioner.PartitioningContext partitioningContext) {
        if (collection.iterator().next().getStats() == null) {
            return collection;
        }
        ArrayList<Partitioner.Partition> newArrayList = Lists.newArrayList();
        Iterator<Partitioner.Partition<AbstractBlockReader<R, B, STREAM>>> it = collection.iterator();
        while (it.hasNext()) {
            newArrayList.add(new DefaultPartition(it.next().getPartitionedInstance()));
        }
        collection.clear();
        int size = this.partitionCount - newArrayList.size();
        ArrayList newArrayList2 = Lists.newArrayList();
        if (size >= 0) {
            KryoCloneUtils createCloneUtils = KryoCloneUtils.createCloneUtils(this);
            while (true) {
                int i = size;
                size--;
                if (i <= 0) {
                    break;
                }
                newArrayList.add(new DefaultPartition(createCloneUtils.getClone()));
            }
        } else {
            Iterator it2 = newArrayList.iterator();
            while (true) {
                int i2 = size;
                size++;
                if (i2 >= 0) {
                    break;
                }
                Partitioner.Partition partition = (Partitioner.Partition) it2.next();
                newArrayList2.add(((AbstractBlockReader) partition.getPartitionedInstance()).counters);
                LOG.debug("partition removed {}", Integer.valueOf(((AbstractBlockReader) partition.getPartitionedInstance()).operatorId));
                it2.remove();
            }
        }
        DefaultPartition.assignPartitionKeys(Collections.unmodifiableCollection(newArrayList), this.blocksMetadataInput);
        int i3 = ((Partitioner.PartitionKeys) ((Partitioner.Partition) newArrayList.iterator().next()).getPartitionKeys().get(this.blocksMetadataInput)).mask;
        for (Partitioner.Partition partition2 : newArrayList) {
            AbstractBlockReader abstractBlockReader = (AbstractBlockReader) partition2.getPartitionedInstance();
            abstractBlockReader.partitionKeys = ((Partitioner.PartitionKeys) partition2.getPartitionKeys().get(this.blocksMetadataInput)).partitions;
            abstractBlockReader.partitionMask = i3;
            LOG.debug("partitions {},{}", abstractBlockReader.partitionKeys, Integer.valueOf(abstractBlockReader.partitionMask));
        }
        AbstractBlockReader abstractBlockReader2 = (AbstractBlockReader) ((Partitioner.Partition) newArrayList.iterator().next()).getPartitionedInstance();
        Iterator it3 = newArrayList2.iterator();
        while (it3.hasNext()) {
            addCounters(abstractBlockReader2.counters, (BasicCounters) it3.next());
        }
        return newArrayList;
    }

    protected void addCounters(BasicCounters<MutableLong> basicCounters, BasicCounters<MutableLong> basicCounters2) {
        for (ReaderCounterKeys readerCounterKeys : ReaderCounterKeys.values()) {
            MutableLong counter = basicCounters.getCounter(readerCounterKeys);
            if (counter == null) {
                counter = new MutableLong();
                basicCounters.setCounter(readerCounterKeys, counter);
            }
            MutableLong counter2 = basicCounters2.getCounter(readerCounterKeys);
            if (counter2 != null) {
                counter.add(counter2.longValue());
            }
        }
    }

    public void partitioned(Map<Integer, Partitioner.Partition<AbstractBlockReader<R, B, STREAM>>> map) {
    }

    public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
        int adjustedCount;
        this.response.repartitionRequired = false;
        if (!this.collectStats) {
            return this.response;
        }
        List lastWindowedStats = batchedOperatorStats.getLastWindowedStats();
        if (lastWindowedStats != null && lastWindowedStats.size() > 0) {
            Stats.OperatorStats operatorStats = (Stats.OperatorStats) lastWindowedStats.get(lastWindowedStats.size() - 1);
            if (operatorStats.inputPorts.size() > 0) {
                this.backlogPerOperator.put(Integer.valueOf(batchedOperatorStats.getOperatorId()), Integer.valueOf(((Stats.OperatorStats.PortStats) operatorStats.inputPorts.get(0)).queueSize));
            }
        }
        if (System.currentTimeMillis() < this.nextMillis) {
            return this.response;
        }
        this.nextMillis = System.currentTimeMillis() + this.intervalMillis;
        LOG.debug("Proposed NextMillis = {}", Long.valueOf(this.nextMillis));
        long j = 0;
        while (this.backlogPerOperator.entrySet().iterator().hasNext()) {
            j += r0.next().getValue().intValue();
        }
        LOG.debug("backlog {} partitionCount {}", Long.valueOf(j), Integer.valueOf(this.partitionCount));
        this.backlogPerOperator.clear();
        if (j == this.partitionCount) {
            return this.response;
        }
        if (j > this.maxReaders) {
            LOG.debug("large backlog {}", Long.valueOf(j));
            adjustedCount = this.maxReaders;
        } else if (j < this.minReaders) {
            LOG.debug("small backlog {}", Long.valueOf(j));
            adjustedCount = this.minReaders;
        } else {
            adjustedCount = getAdjustedCount(j);
            LOG.debug("moderate backlog {}", Long.valueOf(j));
        }
        LOG.debug("backlog {} newPartitionCount {} partitionCount {}", new Object[]{Long.valueOf(j), Integer.valueOf(adjustedCount), Integer.valueOf(this.partitionCount)});
        if (adjustedCount == this.partitionCount) {
            return this.response;
        }
        this.partitionCount = adjustedCount;
        this.response.repartitionRequired = true;
        LOG.debug("partition required", Long.valueOf(j), Integer.valueOf(this.partitionCount));
        return this.response;
    }

    protected int getAdjustedCount(long j) {
        int i;
        int i2 = 1;
        while (true) {
            i = i2;
            if (i >= j) {
                break;
            }
            i2 = i << 1;
        }
        if (i > j) {
            i >>>= 1;
        }
        LOG.debug("adjust {} => {}", Long.valueOf(j), Integer.valueOf(i));
        return i;
    }

    protected abstract STREAM setupStream(B b) throws IOException;

    protected void teardownStream(B b) throws IOException {
        if (this.stream != null) {
            this.stream.close();
            this.stream = null;
        }
    }

    protected abstract R convertToRecord(byte[] bArr);

    public void setMaxReaders(int i) {
        this.maxReaders = i;
    }

    public void setMinReaders(int i) {
        this.minReaders = i;
    }

    public int getMaxReaders() {
        return this.maxReaders;
    }

    public int getMinReaders() {
        return this.minReaders;
    }

    public void setCollectStats(boolean z) {
        this.collectStats = z;
    }

    public boolean isCollectStats() {
        return this.collectStats;
    }

    public void setIntervalMillis(long j) {
        this.intervalMillis = j;
    }

    public long getIntervalMillis() {
        return this.intervalMillis;
    }

    public void setReaderContext(ReaderContext<STREAM> readerContext) {
        this.readerContext = readerContext;
    }

    public ReaderContext<STREAM> getReaderContext() {
        return this.readerContext;
    }

    public String toString() {
        return "Reader{nextMillis=" + this.nextMillis + ", intervalMillis=" + this.intervalMillis + '}';
    }
}
