package org.apache.apex.malhar.lib.state.managed;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.lib.fileaccess.FileAccess;
import com.datatorrent.lib.fileaccess.TFileImpl;
import com.datatorrent.lib.util.comparator.SliceComparator;
import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimaps;
import com.google.common.util.concurrent.Futures;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.managed.Bucket;
import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.class */
public abstract class AbstractManagedStateImpl implements ManagedState, Component<Context.OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext, TimeBucketAssigner.PurgeListener {
    private long maxMemorySize;
    protected int numBuckets;
    protected Bucket[] buckets;

    @NotNull
    protected transient ExecutorService readerService;
    protected transient Context.OperatorContext operatorContext;

    @FieldSerializer.Bind(JavaSerializer.class)
    private Duration durationPreventingFreeingSpace;
    private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateImpl.class);

    @NotNull
    private FileAccess fileAccess = new TFileImpl.DTFileImpl();

    @NotNull
    protected TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();

    @Min(1)
    private int numReaders = 1;

    @NotNull
    protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();

    @NotNull
    protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();

    @NotNull
    protected Comparator<Slice> keyComparator = new SliceComparator();
    protected final transient AtomicReference<Throwable> throwable = new AtomicReference<>();

    @NotNull
    @FieldSerializer.Bind(JavaSerializer.class)
    private Duration checkStateSizeInterval = Duration.millis(((Integer) Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue).intValue() * ((Integer) Context.OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue).intValue());
    private transient StateTracker stateTracker = new StateTracker();
    final transient Object commitLock = new Object();
    protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId = Multimaps.synchronizedListMultimap(ArrayListMultimap.create());

    /* loaded from: input_file:org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl$ValueFetchTask.class */
    static class ValueFetchTask implements Callable<Slice> {
        private final Bucket bucket;
        private final long timeBucketId;
        private final Slice key;
        private final AbstractManagedStateImpl managedState;

        ValueFetchTask(@NotNull Bucket bucket, @NotNull Slice slice, long j, AbstractManagedStateImpl abstractManagedStateImpl) {
            this.bucket = (Bucket) Preconditions.checkNotNull(bucket);
            this.timeBucketId = j;
            this.key = (Slice) Preconditions.checkNotNull(slice);
            this.managedState = (AbstractManagedStateImpl) Preconditions.checkNotNull(abstractManagedStateImpl);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Slice call() throws Exception {
            Slice slice;
            try {
                synchronized (this.bucket) {
                    slice = this.bucket.get(this.key, this.timeBucketId, Bucket.ReadSource.ALL);
                    this.managedState.tasksPerBucketId.remove(Long.valueOf(this.bucket.getBucketId()), this);
                }
                return slice;
            } catch (Throwable th) {
                this.managedState.throwable.set(th);
                throw Throwables.propagate(th);
            }
        }
    }

    @Override // 
    public void setup(Context.OperatorContext operatorContext) {
        this.operatorContext = operatorContext;
        this.fileAccess.init();
        this.timeBucketAssigner.setPurgeListener(this);
        this.timeBucketAssigner.setup(this);
        this.checkpointManager.setup(this);
        this.bucketsFileSystem.setup(this);
        if (this.buckets == null) {
            this.numBuckets = getNumBuckets();
            this.buckets = new Bucket[this.numBuckets];
        }
        for (Bucket bucket : this.buckets) {
            if (bucket != null) {
                bucket.setup(this);
            }
        }
        this.stateTracker.setup(this);
        long longValue = ((Long) operatorContext.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID)).longValue();
        if (longValue != -1) {
            try {
                for (long j : this.checkpointManager.getWindowIds(this.operatorContext.getId())) {
                    if (j <= longValue) {
                        Map<Long, Map<Slice, Bucket.BucketedValue>> map = (Map) this.checkpointManager.load(this.operatorContext.getId(), j);
                        if (map != null && !map.isEmpty()) {
                            for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : map.entrySet()) {
                                this.buckets[prepareBucket(entry.getKey().longValue())].recoveredData(j, entry.getValue());
                            }
                        }
                        this.checkpointManager.save(map, this.operatorContext.getId(), j, true);
                    } else {
                        this.checkpointManager.delete(this.operatorContext.getId(), j);
                    }
                }
            } catch (IOException e) {
                throw new RuntimeException("recovering", e);
            }
        }
        this.readerService = Executors.newFixedThreadPool(this.numReaders, new NameableThreadFactory("managedStateReaders"));
    }

    public abstract int getNumBuckets();

    public void beginWindow(long j) {
        if (this.throwable.get() != null) {
            Throwables.propagate(this.throwable.get());
        }
        this.timeBucketAssigner.beginWindow(j);
    }

    protected int prepareBucket(long j) {
        this.stateTracker.bucketAccessed(j);
        int bucketIdx = getBucketIdx(j);
        Bucket bucket = this.buckets[bucketIdx];
        if (bucket == null) {
            Bucket newBucket = newBucket(j);
            newBucket.setup(this);
            this.buckets[bucketIdx] = newBucket;
        } else if (bucket.getBucketId() != j) {
            handleBucketConflict(bucketIdx, j);
        }
        return bucketIdx;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void putInBucket(long j, long j2, @NotNull Slice slice, @NotNull Slice slice2) {
        Preconditions.checkNotNull(slice, "key");
        Preconditions.checkNotNull(slice2, "value");
        if (j2 != -1) {
            this.buckets[prepareBucket(j)].put(slice, j2, slice2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Slice getValueFromBucketSync(long j, long j2, @NotNull Slice slice) {
        Slice slice2;
        Preconditions.checkNotNull(slice, "key");
        Bucket bucket = this.buckets[prepareBucket(j)];
        synchronized (bucket) {
            slice2 = bucket.get(slice, j2, Bucket.ReadSource.ALL);
        }
        return slice2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Future<Slice> getValueFromBucketAsync(long j, long j2, @NotNull Slice slice) {
        Preconditions.checkNotNull(slice, "key");
        Bucket bucket = this.buckets[prepareBucket(j)];
        synchronized (bucket) {
            Slice slice2 = bucket.get(slice, j2, Bucket.ReadSource.MEMORY);
            if (slice2 != null) {
                return Futures.immediateFuture(slice2);
            }
            ValueFetchTask valueFetchTask = new ValueFetchTask(bucket, slice, j2, this);
            this.tasksPerBucketId.put(Long.valueOf(bucket.getBucketId()), valueFetchTask);
            return this.readerService.submit(valueFetchTask);
        }
    }

    protected void handleBucketConflict(int i, long j) {
        throw new IllegalArgumentException("bucket conflict " + this.buckets[i].getBucketId() + " " + j);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getBucketIdx(long j) {
        return (int) (j % this.numBuckets);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Bucket getBucket(long j) {
        return this.buckets[getBucketIdx(j)];
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Bucket newBucket(long j) {
        return new Bucket.DefaultBucket(j);
    }

    public void endWindow() {
        this.timeBucketAssigner.endWindow();
    }

    public void beforeCheckpoint(long j) {
        HashMap newHashMap = Maps.newHashMap();
        for (Bucket bucket : this.buckets) {
            if (bucket != null) {
                synchronized (bucket) {
                    Map<Slice, Bucket.BucketedValue> checkpoint = bucket.checkpoint(j);
                    if (!checkpoint.isEmpty()) {
                        newHashMap.put(Long.valueOf(bucket.getBucketId()), checkpoint);
                    }
                }
            }
        }
        if (newHashMap.isEmpty()) {
            return;
        }
        try {
            this.checkpointManager.save(newHashMap, this.operatorContext.getId(), j, false);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void checkpointed(long j) {
    }

    public void committed(long j) {
        synchronized (this.commitLock) {
            try {
                for (Bucket bucket : this.buckets) {
                    if (bucket != null) {
                        synchronized (bucket) {
                            bucket.committed(j);
                        }
                    }
                }
                this.checkpointManager.committed(this.operatorContext.getId(), j);
            } catch (IOException | InterruptedException e) {
                throw new RuntimeException("committing " + j, e);
            }
        }
    }

    public void teardown() {
        this.checkpointManager.teardown();
        this.bucketsFileSystem.teardown();
        this.timeBucketAssigner.teardown();
        this.readerService.shutdownNow();
        for (Bucket bucket : this.buckets) {
            if (bucket != null) {
                synchronized (bucket) {
                    bucket.teardown();
                }
            }
        }
        this.stateTracker.teardown();
    }

    @Override // org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner.PurgeListener
    public void purgeTimeBucketsLessThanEqualTo(long j) {
        this.checkpointManager.setLatestExpiredTimeBucket(j);
    }

    @Override // org.apache.apex.malhar.lib.state.managed.ManagedStateContext
    public Context.OperatorContext getOperatorContext() {
        return this.operatorContext;
    }

    @Override // org.apache.apex.malhar.lib.state.managed.ManagedState
    public void setMaxMemorySize(long j) {
        this.maxMemorySize = j;
    }

    public long getMaxMemorySize() {
        return this.maxMemorySize;
    }

    public void setFileAccess(@NotNull FileAccess fileAccess) {
        this.fileAccess = (FileAccess) Preconditions.checkNotNull(fileAccess);
    }

    @Override // org.apache.apex.malhar.lib.state.managed.ManagedStateContext
    public FileAccess getFileAccess() {
        return this.fileAccess;
    }

    public void setTimeBucketAssigner(@NotNull TimeBucketAssigner timeBucketAssigner) {
        this.timeBucketAssigner = (TimeBucketAssigner) Preconditions.checkNotNull(timeBucketAssigner);
    }

    @Override // org.apache.apex.malhar.lib.state.managed.ManagedStateContext
    public TimeBucketAssigner getTimeBucketAssigner() {
        return this.timeBucketAssigner;
    }

    @Override // org.apache.apex.malhar.lib.state.managed.ManagedStateContext
    public Comparator<Slice> getKeyComparator() {
        return this.keyComparator;
    }

    public void setKeyComparator(@NotNull Comparator<Slice> comparator) {
        this.keyComparator = (Comparator) Preconditions.checkNotNull(comparator);
    }

    @Override // org.apache.apex.malhar.lib.state.managed.ManagedStateContext
    public BucketsFileSystem getBucketsFileSystem() {
        return this.bucketsFileSystem;
    }

    public int getNumReaders() {
        return this.numReaders;
    }

    public void setNumReaders(int i) {
        this.numReaders = i;
    }

    public Duration getCheckStateSizeInterval() {
        return this.checkStateSizeInterval;
    }

    public void setCheckStateSizeInterval(@NotNull Duration duration) {
        this.checkStateSizeInterval = (Duration) Preconditions.checkNotNull(duration);
    }

    public Duration getDurationPreventingFreeingSpace() {
        return this.durationPreventingFreeingSpace;
    }

    public void setDurationPreventingFreeingSpace(Duration duration) {
        this.durationPreventingFreeingSpace = duration;
    }

    @VisibleForTesting
    void setStateTracker(@NotNull StateTracker stateTracker) {
        this.stateTracker = (StateTracker) Preconditions.checkNotNull(stateTracker, "state tracker");
    }

    @VisibleForTesting
    void setBucketsFileSystem(@NotNull BucketsFileSystem bucketsFileSystem) {
        this.bucketsFileSystem = (BucketsFileSystem) Preconditions.checkNotNull(bucketsFileSystem, "buckets file system");
    }
}
