package org.apache.apex.malhar.lib.state.spillable;

import com.datatorrent.api.Context;
import com.datatorrent.netlet.util.Slice;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.BucketedState;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.apex.malhar.lib.utils.serde.Serde;
import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A {@link Spillable.SpillableByteMap} whose entries are buffered in a
 * {@link WindowBoundedMapCache} and written to a {@link SpillableStateStore}
 * when {@link #endWindow()} is called.
 *
 * <p>Every entry lives in a single store bucket. The key used in the store is
 * the map's {@code identifier} bytes concatenated with the serialized map key.
 * A removed key is written to the store as an empty slice, and {@link #get(Object)}
 * treats an empty slice as "no mapping".
 *
 * <p>{@link #containsValue(Object)}, {@link #clear()}, {@link #keySet()},
 * {@link #values()} and {@link #entrySet()} are not supported and throw
 * {@link UnsupportedOperationException}.
 *
 * @param <K> type of the map keys
 * @param <V> type of the map values
 */
@DefaultSerializer(FieldSerializer.class)
@InterfaceStability.Evolving
public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent, Serializable {

    @NotNull
    private SpillableStateStore store;

    @NotNull
    private byte[] identifier;
    private long bucket;

    @NotNull
    private Serde<K, Slice> serdeKey;

    @NotNull
    private Serde<V, Slice> serdeValue;

    /** Holds the puts and removals made since the last {@link #endWindow()}. */
    private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();

    /** Scratch offset handed to the value serde; reset to 0 before every deserialization. */
    private transient MutableInt tempOffset = new MutableInt();

    /** Number of mappings, maintained by {@link #put(Object, Object)} and {@link #remove(Object)}. */
    private int size = 0;

    /** No-argument constructor; leaves store, identifier and serdes unset. */
    private SpillableByteMapImpl() {
    }

    /**
     * Creates a map backed by the given store.
     *
     * @param store      store that entries are read from and flushed to
     * @param identifier bytes prepended to every serialized key
     * @param bucket     store bucket used for every entry of this map
     * @param serdeKey   serializer for keys
     * @param serdeValue serializer/deserializer for values
     * @throws NullPointerException if {@code store}, {@code identifier},
     *                              {@code serdeKey} or {@code serdeValue} is null
     */
    public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue) {
        this.store = Preconditions.checkNotNull(store, "store");
        this.identifier = Preconditions.checkNotNull(identifier, "identifier");
        this.bucket = bucket;
        this.serdeKey = Preconditions.checkNotNull(serdeKey, "serdeKey");
        this.serdeValue = Preconditions.checkNotNull(serdeValue, "serdeValue");
    }

    /**
     * @return the store backing this map
     */
    public SpillableStateStore getStore() {
        return store;
    }

    /**
     * @return the number of mappings as tracked by this instance
     */
    @Override
    public int size() {
        return size;
    }

    /**
     * @return {@code true} if the tracked size is zero
     */
    @Override
    public boolean isEmpty() {
        return size == 0;
    }

    /**
     * Checks for a mapping by performing a full {@link #get(Object)} lookup.
     *
     * @param key key to look up
     * @return {@code true} if {@link #get(Object)} returns a non-null value
     */
    @Override
    public boolean containsKey(Object key) {
        return get(key) != null;
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public boolean containsValue(Object value) {
        throw new UnsupportedOperationException();
    }

    /**
     * Looks a key up, first among the keys removed in the cache, then among the
     * cached values, and finally in the store.
     *
     * @param o key to look up
     * @return the mapped value, or {@code null} if the key was removed in the
     *         cache, or the store returns nothing, {@link BucketedState#EXPIRED},
     *         or an empty slice
     */
    @Override
    public V get(Object o) {
        // Map.get takes Object, but the cache and key serde are typed on K.
        // The cast is unchecked because K is erased at runtime.
        @SuppressWarnings("unchecked")
        K key = (K) o;

        if (cache.getRemovedKeys().contains(key)) {
            return null;
        }

        V cached = cache.get(key);
        if (cached != null) {
            return cached;
        }

        Slice stored = store.getSync(bucket, toStoreKey(key));
        // EXPIRED is compared by identity; an empty slice is what endWindow()
        // writes for a removed key.
        if (stored == null || stored == BucketedState.EXPIRED || stored.length == 0) {
            return null;
        }

        tempOffset.setValue(0);
        return serdeValue.deserialize(stored, tempOffset);
    }

    /**
     * Records a mapping in the cache. The tracked size grows by one when
     * {@link #get(Object)} finds no current value for the key.
     *
     * <p>NOTE(review): a {@code null} value cannot be told apart from a missing
     * key by {@link #get(Object)}, so the size accounting presumably assumes
     * non-null values — confirm against callers.
     *
     * @param key   key to map
     * @param value value to associate with the key
     * @return the value previously visible for the key, or {@code null}
     */
    @Override
    public V put(K key, V value) {
        V previous = get(key);
        if (previous == null) {
            size++;
        }
        cache.put(key, value);
        return previous;
    }

    /**
     * Records a removal in the cache. The tracked size shrinks by one when
     * {@link #get(Object)} finds a current value for the key.
     *
     * @param o key to remove
     * @return the value previously visible for the key, or {@code null}
     */
    @Override
    public V remove(Object o) {
        // Same unchecked narrowing as in get(Object).
        @SuppressWarnings("unchecked")
        K key = (K) o;

        V previous = get(key);
        if (previous != null) {
            size--;
        }
        cache.remove(key);
        return previous;
    }

    /**
     * Calls {@link #put(Object, Object)} for every entry of the given map.
     *
     * @param map entries to copy into this map
     */
    @Override
    public void putAll(Map<? extends K, ? extends V> map) {
        for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
            put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void clear() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Set<K> keySet() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Collection<V> values() {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Set<Map.Entry<K, V>> entrySet() {
        throw new UnsupportedOperationException();
    }

    /** No-op. */
    public void setup(Context.OperatorContext context) {
    }

    /** No-op. */
    @Override
    public void beginWindow(long windowId) {
    }

    /**
     * Flushes the cache to the store: each changed key is written with its
     * serialized cached value, each removed key is written with an empty slice,
     * and then the cache's own {@code endWindow()} is invoked.
     */
    @Override
    public void endWindow() {
        for (K key : cache.getChangedKeys()) {
            store.put(bucket, toStoreKey(key), serdeValue.serialize(cache.get(key)));
        }

        for (K key : cache.getRemovedKeys()) {
            // An empty slice marks the key as removed; get(Object) reads it as absent.
            store.put(bucket, toStoreKey(key), new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
        }

        cache.endWindow();
    }

    /** No-op. */
    public void teardown() {
    }

    /** Builds the store key for a map key: the identifier bytes followed by the serialized key. */
    private Slice toStoreKey(K key) {
        return SliceUtils.concatenate(identifier, serdeKey.serialize(key));
    }
}
