package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.api.Context;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.util.Slice;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Queues;
import com.google.common.primitives.Longs;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.managed.Bucket;
import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.apex.malhar.lib.wal.FileSystemWAL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.class */
public class IncrementalCheckpointManager extends FSWindowDataManager implements ManagedStateComponent {
    private static final String WAL_RELATIVE_PATH = "managed_state";
    private transient ExecutorService writerService;
    private volatile transient boolean transfer;
    protected transient ManagedStateContext managedStateContext;
    private transient int waitMillis;
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManager.class);
    private final transient Map<Long, Map<Long, Map<Slice, Bucket.BucketedValue>>> savedWindows = new ConcurrentSkipListMap();
    private final transient LinkedBlockingQueue<Long> windowsToTransfer = Queues.newLinkedBlockingQueue();
    private final transient AtomicReference<Throwable> throwable = new AtomicReference<>();
    private final transient AtomicLong latestExpiredTimeBucket = new AtomicLong(-1);
    private volatile long lastTransferredWindow = -1;
    private transient long largestWindowAddedToTransferQueue = -1;

    public IncrementalCheckpointManager() {
        setStatePath(WAL_RELATIVE_PATH);
        setRelyOnCheckpoints(true);
    }

    @Override // org.apache.apex.malhar.lib.wal.FSWindowDataManager
    public void setup(Context.OperatorContext operatorContext) {
        throw new UnsupportedOperationException("not supported");
    }

    @Override // org.apache.apex.malhar.lib.state.managed.ManagedStateComponent
    public void setup(@NotNull final ManagedStateContext managedStateContext) {
        this.managedStateContext = (ManagedStateContext) Preconditions.checkNotNull(managedStateContext, "managed state context");
        this.waitMillis = ((Integer) managedStateContext.getOperatorContext().getValue(Context.OperatorContext.SPIN_MILLIS)).intValue();
        super.setup(managedStateContext.getOperatorContext());
        this.writerService = Executors.newSingleThreadExecutor(new NameableThreadFactory("managed-state-writer"));
        this.transfer = true;
        this.writerService.submit(new Runnable() { // from class: org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager.1
            @Override // java.lang.Runnable
            public void run() {
                while (IncrementalCheckpointManager.this.transfer) {
                    IncrementalCheckpointManager.this.transferWindowFiles();
                    if (IncrementalCheckpointManager.this.latestExpiredTimeBucket.get() > -1) {
                        try {
                            managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(IncrementalCheckpointManager.this.latestExpiredTimeBucket.getAndSet(-1L));
                        } catch (IOException e) {
                            IncrementalCheckpointManager.this.throwable.set(e);
                            IncrementalCheckpointManager.LOG.debug("delete files", e);
                            Throwables.propagate(e);
                        }
                    }
                }
            }
        });
    }

    protected void transferWindowFiles() {
        try {
            Long poll = this.windowsToTransfer.poll();
            if (poll != null) {
                try {
                    LOG.debug("transfer window {}", poll);
                    for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : this.savedWindows.remove(poll).entrySet()) {
                        this.managedStateContext.getBucketsFileSystem().writeBucketData(poll.longValue(), entry.getKey().longValue(), entry.getValue());
                    }
                    committed(poll.longValue());
                } catch (Throwable th) {
                    this.throwable.set(th);
                    LOG.debug("transfer window {}", poll, th);
                    Throwables.propagate(th);
                }
                this.lastTransferredWindow = poll.longValue();
            } else {
                Thread.sleep(this.waitMillis);
            }
        } catch (InterruptedException e) {
            LOG.debug("interrupted", e);
        }
    }

    @Override // org.apache.apex.malhar.lib.wal.FSWindowDataManager, org.apache.apex.malhar.lib.wal.WindowDataManager
    public void save(Object obj, long j) throws IOException {
        throw new UnsupportedOperationException("doesn't support saving any object");
    }

    public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> map, long j, boolean z) throws IOException {
        Throwable th = this.throwable.get();
        if (th != null) {
            LOG.error("Error while transferring");
            Throwables.propagate(th);
        }
        this.savedWindows.put(Long.valueOf(j), map);
        if (z) {
            return;
        }
        super.save(map, j);
    }

    public Map<Long, Object> retrieveAllWindows() throws IOException {
        HashMap hashMap = new HashMap();
        FileSystemWAL.FileSystemWALReader reader = getWal().getReader();
        reader.seek2(getWal().getWalStartPointer());
        Slice readNext = readNext(reader);
        while (true) {
            Slice slice = readNext;
            if (reader.getCurrentPointer().compareTo(getWal().getWalEndPointerAfterRecovery()) >= 0 || slice == null) {
                break;
            }
            long fromByteArray = Longs.fromByteArray(slice.toByteArray());
            hashMap.put(Long.valueOf(fromByteArray), fromSlice(readNext(reader)));
            readNext = readNext(reader);
        }
        reader.seek2(getWal().getWalStartPointer());
        return hashMap;
    }

    @Override // org.apache.apex.malhar.lib.wal.FSWindowDataManager, org.apache.apex.malhar.lib.wal.WindowDataManager
    public void committed(long j) throws IOException {
        LOG.debug("data manager committed {}", Long.valueOf(j));
        for (Long l : this.savedWindows.keySet()) {
            if (l.longValue() > this.largestWindowAddedToTransferQueue) {
                if (l.longValue() > j) {
                    return;
                }
                LOG.debug("to transfer {}", l);
                this.largestWindowAddedToTransferQueue = l.longValue();
                this.windowsToTransfer.add(l);
            }
        }
    }

    @Override // org.apache.apex.malhar.lib.wal.FSWindowDataManager, org.apache.apex.malhar.lib.state.managed.ManagedStateComponent
    public void teardown() {
        super.teardown();
        this.transfer = false;
        this.writerService.shutdownNow();
    }

    public void setLatestExpiredTimeBucket(long j) {
        this.latestExpiredTimeBucket.set(j);
    }

    public long getLastTransferredWindow() {
        return this.lastTransferredWindow;
    }
}
