package com.datatorrent.lib.join;

import com.datatorrent.lib.join.TimeEvent;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import javax.validation.constraints.Min;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Unstable
/* loaded from: input_file:com/datatorrent/lib/join/TimeBasedStore.class */
public class TimeBasedStore<T extends TimeEvent> {
    private static final Logger logger = LoggerFactory.getLogger(TimeBasedStore.class);

    @Min(1)
    protected int noOfBuckets;
    protected Bucket<T>[] buckets;

    @Min(1)
    protected long expiryTimeInMillis;

    @Min(1)
    protected long spanTimeInMillis;
    protected int bucketSpanInMillis;
    protected long startOfBucketsInMillis;
    protected long endOBucketsInMillis;
    private transient Timer bucketSlidingTimer;
    protected transient Map<Long, Bucket> dirtyBuckets = new HashMap();
    private boolean isOuter = false;
    private List<T> unmatchedEvents = new ArrayList();
    private Map<Object, Set<Long>> key2Buckets = new ConcurrentHashMap();
    private final transient Lock lock = new Lock();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/lib/join/TimeBasedStore$Lock.class */
    public static class Lock {
        private Lock() {
        }
    }

    private void recomputeNumBuckets() {
        long timeInMillis = Calendar.getInstance().getTimeInMillis();
        this.startOfBucketsInMillis = timeInMillis - this.spanTimeInMillis;
        this.expiryTimeInMillis = this.startOfBucketsInMillis;
        this.endOBucketsInMillis = timeInMillis;
        this.noOfBuckets = (int) Math.ceil((timeInMillis - this.startOfBucketsInMillis) / (this.bucketSpanInMillis * 1.0d));
        this.buckets = (Bucket[]) Array.newInstance((Class<?>) Bucket.class, this.noOfBuckets);
    }

    public void setup() {
        setBucketSpanInMillis((int) (this.spanTimeInMillis > ((long) this.bucketSpanInMillis) ? this.bucketSpanInMillis : this.spanTimeInMillis));
        if (this.buckets == null) {
            recomputeNumBuckets();
        }
        startService();
    }

    public List<TimeEvent> getValidTuples(T t) {
        List<T> list;
        Object eventKey = t.getEventKey();
        Set<Long> set = this.key2Buckets.get(eventKey);
        if (set == null) {
            return null;
        }
        ArrayList arrayList = new ArrayList();
        for (Long l : set) {
            Bucket<T> bucket = this.buckets[(int) (l.longValue() % this.noOfBuckets)];
            if (bucket != null && bucket.bucketKey == l.longValue() && (list = bucket.get(eventKey)) != null) {
                arrayList.addAll(list);
            }
        }
        return arrayList;
    }

    public boolean put(T t) {
        long bucketKeyFor = getBucketKeyFor(t);
        if (bucketKeyFor < 0) {
            return false;
        }
        newEvent(bucketKeyFor, t);
        return true;
    }

    public long getBucketKeyFor(T t) {
        long time = t.getTime();
        if (time < this.expiryTimeInMillis) {
            return -1L;
        }
        long j = (time - this.startOfBucketsInMillis) / this.bucketSpanInMillis;
        synchronized (this.lock) {
            if (time > this.endOBucketsInMillis) {
                long j2 = (((time - this.endOBucketsInMillis) / this.bucketSpanInMillis) + 1) * this.bucketSpanInMillis;
                this.expiryTimeInMillis += j2;
                this.endOBucketsInMillis += j2;
            }
        }
        return j;
    }

    public void newEvent(long j, T t) {
        int i = (int) (j % this.noOfBuckets);
        Bucket<T> bucket = this.buckets[i];
        if (bucket == null || bucket.bucketKey != j) {
            if (bucket != null) {
                this.dirtyBuckets.put(Long.valueOf(bucket.bucketKey), bucket);
            }
            bucket = createBucket(j);
            this.buckets[i] = bucket;
        }
        Object eventKey = t.getEventKey();
        Set<Long> set = this.key2Buckets.get(eventKey);
        if (set == null) {
            HashSet hashSet = new HashSet();
            hashSet.add(Long.valueOf(j));
            this.key2Buckets.put(eventKey, hashSet);
        } else {
            set.add(Long.valueOf(j));
        }
        bucket.addNewEvent(eventKey, t);
    }

    public void startService() {
        this.bucketSlidingTimer = new Timer();
        this.endOBucketsInMillis = this.expiryTimeInMillis + (this.noOfBuckets * this.bucketSpanInMillis);
        logger.debug("bucket properties {}, {}", Long.valueOf(this.spanTimeInMillis), Integer.valueOf(this.bucketSpanInMillis));
        logger.debug("bucket time params: start {}, end {}", Long.valueOf(this.startOfBucketsInMillis), Long.valueOf(this.endOBucketsInMillis));
        this.bucketSlidingTimer.scheduleAtFixedRate(new TimerTask() { // from class: com.datatorrent.lib.join.TimeBasedStore.1
            /*  JADX ERROR: Failed to decode insn: 0x001D: MOVE_MULTI, method: com.datatorrent.lib.join.TimeBasedStore.1.run():void
                java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
                	at java.base/java.lang.System.arraycopy(Native Method)
                	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
                	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
                	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
                	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
                	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
                	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
                	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
                	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
                	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
                	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
                	at jadx.core.ProcessClass.process(ProcessClass.java:70)
                	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
                	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
                	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
                	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
                */
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                /*
                    r6 = this;
                    r0 = 0
                    r7 = r0
                    r0 = r6
                    com.datatorrent.lib.join.TimeBasedStore r0 = com.datatorrent.lib.join.TimeBasedStore.this
                    com.datatorrent.lib.join.TimeBasedStore$Lock r0 = com.datatorrent.lib.join.TimeBasedStore.access$100(r0)
                    r1 = r0
                    r9 = r1
                    monitor-enter(r0)
                    r0 = r6
                    com.datatorrent.lib.join.TimeBasedStore r0 = com.datatorrent.lib.join.TimeBasedStore.this
                    r1 = r0
                    long r1 = r1.expiryTimeInMillis
                    r2 = r6
                    com.datatorrent.lib.join.TimeBasedStore r2 = com.datatorrent.lib.join.TimeBasedStore.this
                    int r2 = r2.bucketSpanInMillis
                    long r2 = (long) r2
                    long r1 = r1 + r2
                    // decode failed: arraycopy: source index -1 out of bounds for object array[6]
                    r0.expiryTimeInMillis = r1
                    r7 = r-1
                    r-1 = r6
                    com.datatorrent.lib.join.TimeBasedStore r-1 = com.datatorrent.lib.join.TimeBasedStore.this
                    r0 = r-1
                    long r0 = r0.endOBucketsInMillis
                    r1 = r6
                    com.datatorrent.lib.join.TimeBasedStore r1 = com.datatorrent.lib.join.TimeBasedStore.this
                    int r1 = r1.bucketSpanInMillis
                    long r1 = (long) r1
                    long r0 = r0 + r1
                    r-1.endOBucketsInMillis = r0
                    r-1 = r9
                    monitor-exit(r-1)
                    goto L42
                    r10 = move-exception
                    r0 = r9
                    monitor-exit(r0)
                    r0 = r10
                    throw r0
                    r-1 = r6
                    com.datatorrent.lib.join.TimeBasedStore r-1 = com.datatorrent.lib.join.TimeBasedStore.this
                    r0 = r7
                    r-1.deleteExpiredBuckets(r0)
                    return
                */
                throw new UnsupportedOperationException("Method not decompiled: com.datatorrent.lib.join.TimeBasedStore.AnonymousClass1.run():void");
            }
        }, this.bucketSpanInMillis, this.bucketSpanInMillis);
    }

    void deleteExpiredBuckets(long j) {
        Iterator<Long> it = this.dirtyBuckets.keySet().iterator();
        while (it.hasNext()) {
            Bucket bucket = this.dirtyBuckets.get(Long.valueOf(it.next().longValue()));
            if (this.startOfBucketsInMillis + (bucket.bucketKey * this.bucketSpanInMillis) < j) {
                deleteBucket(bucket);
                it.remove();
            }
        }
    }

    public List<T> getUnmatchedEvents() {
        ArrayList arrayList = new ArrayList(this.unmatchedEvents);
        this.unmatchedEvents.clear();
        return arrayList;
    }

    private void deleteBucket(Bucket bucket) {
        Map<Object, List<T>> events;
        if (bucket == null || (events = bucket.getEvents()) == null) {
            return;
        }
        for (Map.Entry<Object, List<T>> entry : events.entrySet()) {
            if (this.isOuter) {
                for (T t : entry.getValue()) {
                    if (!t.isMatch()) {
                        this.unmatchedEvents.add(t);
                    }
                }
            }
            this.key2Buckets.get(entry.getKey()).remove(Long.valueOf(bucket.bucketKey));
            if (this.key2Buckets.get(entry.getKey()).size() == 0) {
                this.key2Buckets.remove(entry.getKey());
            }
        }
    }

    protected Bucket<T> createBucket(long j) {
        return new Bucket<>(j);
    }

    public void shutdown() {
        this.bucketSlidingTimer.cancel();
    }

    public void isOuterJoin(boolean z) {
        this.isOuter = z;
    }

    public long getSpanTimeInMillis() {
        return this.spanTimeInMillis;
    }

    public void setSpanTimeInMillis(long j) {
        this.spanTimeInMillis = j;
    }

    public int getBucketSpanInMillis() {
        return this.bucketSpanInMillis;
    }

    public void setBucketSpanInMillis(int i) {
        this.bucketSpanInMillis = i;
    }

    static /* synthetic */ Lock access$100(TimeBasedStore timeBasedStore) {
        return timeBasedStore.lock;
    }
}
