package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.api.Context;
import com.datatorrent.lib.fileaccess.FileAccess;
import com.datatorrent.netlet.util.Slice;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Futures;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.BucketedState;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.class */
/**
 * Managed state in which the time bucket derived from an event time doubles as the bucket id.
 *
 * <p>Every {@code put}/{@code get} first maps the supplied time to a time bucket through the
 * {@code timeBucketAssigner} and then addresses the bucket using that same value for both the
 * bucket id and the time bucket. All file operations are routed through
 * {@link TimeUnifiedBucketsFileSystem}, which stores everything under the operator id rather
 * than under the bucket id it is handed.
 *
 * <p>Buckets that were purged or displaced are not torn down immediately; they are parked in
 * {@code bucketsForTeardown} and released in {@link #endWindow()}.
 */
public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implements BucketedState {
    private static final Logger LOG = LoggerFactory.getLogger(ManagedTimeUnifiedStateImpl.class);

    /**
     * Time buckets reported through {@link #purgeTimeBucketsLessThanEqualTo(long)} that still
     * have to be evicted from {@code buckets}; drained in {@link #endWindow()}.
     * NOTE(review): a blocking queue suggests the purge callback may run on another thread -
     * confirm against the time bucket assigner.
     */
    private final transient LinkedBlockingQueue<Long> purgedTimeBuckets = Queues.newLinkedBlockingQueue();

    /** Buckets already detached from {@code buckets} that are waiting to be torn down. */
    private final transient Set<Bucket> bucketsForTeardown = Sets.newHashSet();

    /**
     * Buckets file system that ignores the bucket id passed to each operation and uses the
     * operator id as the key for every {@link FileAccess} call instead.
     */
    private static class TimeUnifiedBucketsFileSystem extends BucketsFileSystem {
        private TimeUnifiedBucketsFileSystem() {
        }

        /**
         * Returns the id of the owning operator, which is the key used for every file
         * operation performed by this file system.
         */
        private long operatorId() {
            return this.managedStateContext.getOperatorContext().getId();
        }

        /**
         * Opens a writer for the named file under the operator id.
         *
         * @param j   ignored; the operator id is used instead
         * @param str file name
         */
        @Override
        protected FileAccess.FileWriter getWriter(long j, String str) throws IOException {
            return this.managedStateContext.getFileAccess().getWriter(operatorId(), str);
        }

        /**
         * Opens a reader for the named file under the operator id.
         *
         * @param j   ignored; the operator id is used instead
         * @param str file name
         */
        @Override
        public FileAccess.FileReader getReader(long j, String str) throws IOException {
            return this.managedStateContext.getFileAccess().getReader(operatorId(), str);
        }

        /**
         * Renames a file under the operator id.
         *
         * @param j    ignored; the operator id is used instead
         * @param str  current file name
         * @param str2 new file name
         */
        @Override
        protected void rename(long j, String str, String str2) throws IOException {
            this.managedStateContext.getFileAccess().rename(operatorId(), str, str2);
        }

        /**
         * Opens an output stream for the named file under the operator id.
         *
         * @param j   ignored; the operator id is used instead
         * @param str file name
         */
        @Override
        protected DataOutputStream getOutputStream(long j, String str) throws IOException {
            return this.managedStateContext.getFileAccess().mo69getOutputStream(operatorId(), str);
        }

        /**
         * Opens an input stream for the named file under the operator id.
         *
         * @param j   ignored; the operator id is used instead
         * @param str file name
         */
        @Override
        protected DataInputStream getInputStream(long j, String str) throws IOException {
            return this.managedStateContext.getFileAccess().mo68getInputStream(operatorId(), str);
        }

        /**
         * Checks whether the named file exists under the operator id.
         *
         * @param j   ignored; the operator id is used instead
         * @param str file name
         */
        @Override
        protected boolean exists(long j, String str) throws IOException {
            return this.managedStateContext.getFileAccess().exists(operatorId(), str);
        }

        /**
         * Lists the files stored under the operator id.
         *
         * @param j ignored; the operator id is used instead
         */
        @Override
        protected RemoteIterator<LocatedFileStatus> listFiles(long j) throws IOException {
            return this.managedStateContext.getFileAccess().listFiles(operatorId());
        }

        /**
         * Deletes the named file under the operator id.
         *
         * @param j   ignored; the operator id is used instead
         * @param str file name
         */
        @Override
        protected void delete(long j, String str) throws IOException {
            this.managedStateContext.getFileAccess().delete(operatorId(), str);
        }

        /**
         * Deletes everything stored under the operator id.
         *
         * @param j ignored; the operator id is used instead
         */
        @Override
        protected void deleteBucket(long j) throws IOException {
            this.managedStateContext.getFileAccess().deleteBucket(operatorId());
        }

        /**
         * Records the operator id in {@code bucketNamesOnFS} if it is not there yet.
         *
         * @param j ignored; the operator id is recorded instead
         */
        @Override
        protected void addBucketName(long j) {
            Long operatorKey = Long.valueOf(operatorId());
            if (!this.bucketNamesOnFS.contains(operatorKey)) {
                this.bucketNamesOnFS.add(operatorKey);
            }
        }
    }

    /** Creates the state and installs the operator-id based buckets file system. */
    public ManagedTimeUnifiedStateImpl() {
        this.bucketsFileSystem = new TimeUnifiedBucketsFileSystem();
    }

    /** Returns the number of buckets reported by the time bucket assigner. */
    @Override
    public long getNumBuckets() {
        return this.timeBucketAssigner.getNumBuckets();
    }

    /**
     * Stores a key/value pair in the bucket that corresponds to the given time.
     *
     * @param j      time that is mapped to a time bucket by the time bucket assigner
     * @param slice  key
     * @param slice2 value
     */
    @Override
    public void put(long j, @NotNull Slice slice, @NotNull Slice slice2) {
        // The time bucket is used both as the bucket id and as the time bucket.
        // NOTE(review): an expired time (-1) is passed on unchecked; presumably putInBucket
        // handles it - confirm in AbstractManagedStateImpl.
        long timeBucket = this.timeBucketAssigner.getTimeBucket(j);
        putInBucket(timeBucket, timeBucket, slice, slice2);
    }

    /**
     * Looks up a key synchronously in the bucket that corresponds to the given time.
     *
     * @param j     time that is mapped to a time bucket by the time bucket assigner
     * @param slice key
     * @return {@link BucketedState#EXPIRED} when the assigner returns -1 for the time,
     *         otherwise whatever {@code getValueFromBucketSync} returns
     */
    @Override
    public Slice getSync(long j, @NotNull Slice slice) {
        long timeBucket = this.timeBucketAssigner.getTimeBucket(j);
        if (timeBucket == -1) {
            return BucketedState.EXPIRED;
        }
        return getValueFromBucketSync(timeBucket, timeBucket, slice);
    }

    /**
     * Looks up a key asynchronously in the bucket that corresponds to the given time.
     *
     * @param j     time that is mapped to a time bucket by the time bucket assigner
     * @param slice key
     * @return an already completed future holding {@link BucketedState#EXPIRED} when the
     *         assigner returns -1 for the time, otherwise the future returned by
     *         {@code getValueFromBucketAsync}
     */
    @Override
    public Future<Slice> getAsync(long j, @NotNull Slice slice) {
        long timeBucket = this.timeBucketAssigner.getTimeBucket(j);
        if (timeBucket == -1) {
            return Futures.immediateFuture(BucketedState.EXPIRED);
        }
        return getValueFromBucketAsync(timeBucket, timeBucket, slice);
    }

    /**
     * Runs the superclass end-window processing, then evicts purged buckets and tears down
     * the buckets that are no longer referenced by {@code tasksPerBucketId}.
     */
    @Override
    public void endWindow() {
        super.endWindow();

        // Detach every purged time bucket that is still the current occupant of its slot.
        Long purgedTimeBucket;
        while ((purgedTimeBucket = this.purgedTimeBuckets.poll()) != null) {
            long purgedId = purgedTimeBucket.longValue();
            Long slot = Long.valueOf(getBucketIdx(purgedId));
            // Single lookup; a slot that is empty or already holds a different bucket id
            // is left untouched.
            Bucket occupant = this.buckets.get(slot);
            if (occupant != null && occupant.getBucketId() == purgedId) {
                this.bucketsForTeardown.add(occupant);
                this.buckets.remove(slot);
            }
        }

        // Tear down only buckets without an entry in tasksPerBucketId; the rest stay queued
        // for a later window. NOTE(review): presumably those entries are in-flight reads.
        for (Iterator<Bucket> pending = this.bucketsForTeardown.iterator(); pending.hasNext(); ) {
            Bucket bucket = pending.next();
            if (!this.tasksPerBucketId.containsKey(Long.valueOf(bucket.getBucketId()))) {
                bucket.teardown();
                pending.remove();
            }
        }
    }

    /**
     * Replaces the bucket occupying a slot with a fresh bucket for a newer bucket id. The
     * displaced bucket is queued for teardown instead of being torn down right away.
     *
     * @param j  index of the slot in {@code buckets}
     * @param j2 id of the new bucket; must be greater than the id of the current occupant
     * @throws IllegalArgumentException if {@code j2} is not greater than the current bucket id
     */
    @Override
    protected void handleBucketConflict(long j, long j2) {
        Long slot = Long.valueOf(j);
        Bucket displaced = this.buckets.get(slot);
        Preconditions.checkArgument(displaced.getBucketId() < j2, "new time bucket should have a value greater than the old time bucket");
        this.bucketsForTeardown.add(displaced);

        Bucket replacement = newBucket(j2);
        this.buckets.put(slot, replacement);
        replacement.setup(this);
    }

    /**
     * Remembers the purged time bucket so that {@link #endWindow()} can evict it from
     * {@code buckets}, then delegates to the superclass.
     *
     * @param j time bucket passed on to the superclass purge
     */
    @Override
    public void purgeTimeBucketsLessThanEqualTo(long j) {
        this.purgedTimeBuckets.add(Long.valueOf(j));
        super.purgeTimeBucketsLessThanEqualTo(j);
    }

    /**
     * Installs an {@link UnboundedTimeBucketAssigner} when no assigner has been configured,
     * then runs the superclass setup.
     *
     * @param operatorContext context passed on to the superclass setup
     */
    @Override
    public void setup(Context.OperatorContext operatorContext) {
        if (this.timeBucketAssigner == null) {
            setTimeBucketAssigner(new UnboundedTimeBucketAssigner());
        }
        super.setup(operatorContext);
    }
}
