package org.apache.apex.malhar.lib.utils.serde;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.apex.malhar.lib.state.spillable.WindowListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.class */
public class WindowedBlockStream extends BlockStream implements WindowListener, WindowCompleteListener {
    private static final Logger logger = LoggerFactory.getLogger(WindowedBlockStream.class);
    protected SetMultimap<Long, Integer> windowToBlockIds;
    protected Set<Integer> freeBlockIds;
    protected int maxBlockIndex;
    protected long currentWindowId;
    protected transient ReadWriteLock lock;
    protected BlockReleaseStrategy releaseStrategy;

    public WindowedBlockStream() {
        this.windowToBlockIds = HashMultimap.create();
        this.freeBlockIds = Sets.newHashSet();
        this.maxBlockIndex = 0;
        this.lock = new ReentrantReadWriteLock();
        this.releaseStrategy = new DefaultBlockReleaseStrategy();
    }

    public WindowedBlockStream(int i) {
        super(i);
        this.windowToBlockIds = HashMultimap.create();
        this.freeBlockIds = Sets.newHashSet();
        this.maxBlockIndex = 0;
        this.lock = new ReentrantReadWriteLock();
        this.releaseStrategy = new DefaultBlockReleaseStrategy();
    }

    @Override // org.apache.apex.malhar.lib.state.spillable.WindowListener
    public void beginWindow(long j) {
        this.currentWindowId = j;
        moveToNextWindow();
    }

    protected void moveToNextWindow() {
        Block orCreateCurrentBlock = getOrCreateCurrentBlock();
        if (!orCreateCurrentBlock.isClear()) {
            throw new RuntimeException("Current block not clear, should NOT move to next window. Please call toSlice() to output data first");
        }
        if (orCreateCurrentBlock.size() > 0) {
            moveToNextBlock();
        }
        this.windowToBlockIds.put(Long.valueOf(this.currentWindowId), Integer.valueOf(this.currentBlockIndex));
    }

    @Override // org.apache.apex.malhar.lib.utils.serde.BlockStream
    protected Block moveToNextBlock() {
        this.lock.writeLock().lock();
        try {
            Block block = this.currentBlock;
            if (this.freeBlockIds.isEmpty()) {
                int i = this.maxBlockIndex + 1;
                this.maxBlockIndex = i;
                this.currentBlockIndex = i;
                this.currentBlock = getOrCreateCurrentBlock();
            } else {
                this.currentBlockIndex = this.freeBlockIds.iterator().next().intValue();
                this.freeBlockIds.remove(Integer.valueOf(this.currentBlockIndex));
                this.currentBlock = this.blocks.get(Integer.valueOf(this.currentBlockIndex));
            }
            this.windowToBlockIds.put(Long.valueOf(this.currentWindowId), Integer.valueOf(this.currentBlockIndex));
            this.lock.writeLock().unlock();
            return block;
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.apex.malhar.lib.state.spillable.WindowListener
    public void endWindow() {
        releaseMemory();
    }

    @Override // org.apache.apex.malhar.lib.utils.serde.WindowCompleteListener
    public void completeWindow(long j) {
        this.lock.writeLock().lock();
        try {
            Iterator it = Sets.newHashSet(this.windowToBlockIds.keySet()).iterator();
            while (it.hasNext()) {
                long longValue = ((Long) it.next()).longValue();
                if (longValue <= j) {
                    resetWindow(longValue);
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    protected void resetWindow(long j) {
        this.lock.writeLock().lock();
        try {
            Set removeAll = this.windowToBlockIds.removeAll(Long.valueOf(j));
            int i = 0;
            Iterator it = removeAll.iterator();
            while (it.hasNext()) {
                int intValue = ((Integer) it.next()).intValue();
                i = (int) (i + this.blocks.get(Integer.valueOf(intValue)).size());
                Block block = this.blocks.get(Integer.valueOf(intValue));
                block.reset();
                if (block == this.currentBlock) {
                    moveToNextBlock();
                }
                logger.debug("reset block: {}, currentBlock: {}", Integer.valueOf(intValue), block);
            }
            this.freeBlockIds.addAll(removeAll);
            this.size -= i;
            this.lock.writeLock().unlock();
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.apex.malhar.lib.utils.serde.BlockStream
    public void reset() {
        this.lock.writeLock().lock();
        try {
            super.reset();
            this.freeBlockIds.addAll(this.blocks.keySet());
            this.freeBlockIds.remove(Integer.valueOf(this.currentBlockIndex));
            this.lock.writeLock().unlock();
        } catch (Throwable th) {
            this.lock.writeLock().unlock();
            throw th;
        }
    }

    public long dataSizeUpToWindow(long j) {
        this.lock.readLock().lock();
        try {
            long j2 = 0;
            Iterator it = this.windowToBlockIds.keySet().iterator();
            while (it.hasNext()) {
                j2 += dataSizeOfWindow(((Long) it.next()).longValue());
            }
            return j2;
        } finally {
            this.lock.readLock().unlock();
        }
    }

    protected long dataSizeOfWindow(long j) {
        this.lock.readLock().lock();
        try {
            long j2 = 0;
            Set set = this.windowToBlockIds.get(Long.valueOf(j));
            if (set != null) {
                Iterator it = set.iterator();
                while (it.hasNext()) {
                    j2 += this.blocks.get(Integer.valueOf(((Integer) it.next()).intValue())).size();
                }
            }
            return j2;
        } finally {
            this.lock.readLock().unlock();
        }
    }

    public void releaseMemory() {
        this.releaseStrategy.currentFreeBlocks(this.freeBlockIds.size());
        int min = Math.min(this.releaseStrategy.getNumBlocksToRelease(), this.freeBlockIds.size());
        int i = 0;
        Iterator<Integer> it = this.freeBlockIds.iterator();
        while (i < min) {
            int intValue = it.next().intValue();
            it.remove();
            this.blocks.remove(Integer.valueOf(intValue));
            i++;
        }
        if (i > 0) {
            this.releaseStrategy.releasedBlocks(i);
        }
    }

    public void releaseAllFreeMemory() {
        int i = 0;
        Iterator<Integer> it = this.freeBlockIds.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            it.remove();
            this.blocks.remove(Integer.valueOf(intValue));
            i++;
        }
        if (i > 0) {
            this.releaseStrategy.releasedBlocks(i);
        }
    }
}
