package org.apache.apex.malhar.lib.join;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
import com.google.common.collect.Maps;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateImpl;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeStateMultiValue;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.hadoop.classification.InterfaceStability;
import org.joda.time.Duration;

/**
 * Inner-join operator base that keeps the tuples of each input stream in its own
 * {@link ManagedTimeStateImpl}, wrapped by a {@link ManagedTimeStateMultiValue}.
 *
 * <p>When a tuple is stored successfully, the opposite stream's store is queried asynchronously
 * for the same key. If the lookup is already complete the join is emitted immediately; otherwise
 * the tuple is parked in {@code waitingEvents} and resolved later, either from
 * {@link #handleIdleTime()} (one completed lookup per call) or from {@link #endWindow()}
 * (all pending lookups, blocking on each).
 *
 * @param <K> join key type
 * @param <T> tuple type
 */
@InterfaceStability.Evolving
public abstract class AbstractManagedStateInnerJoinOperator<K, T> extends AbstractInnerJoinOperator<K, T> implements Operator.CheckpointNotificationListener, Operator.IdleTimeHandler {
    /** Directory name, below the application path, under which both stores keep their files. */
    public static final String stateDir = "managedState";
    /** Sub-directory name and checkpoint-path suffix used for the stream 1 store. */
    public static final String stream1State = "stream1Data";
    /** Sub-directory name and checkpoint-path suffix used for the stream 2 store. */
    public static final String stream2State = "stream2Data";

    /**
     * Tuples whose lookup in the opposite store had not completed when they were processed,
     * in arrival order. Transient: the map is always drained in {@link #endWindow()}.
     */
    private transient Map<JoinEvent<K, T>, Future<List>> waitingEvents = Maps.newLinkedHashMap();
    /** Number of buckets configured on each store. */
    private int noOfBuckets = 1;
    /** Bucket span in milliseconds; when {@code null} the stores keep their own default span. */
    private Long bucketSpanTime;
    protected ManagedTimeStateImpl stream1Store;
    protected ManagedTimeStateImpl stream2Store;

    /**
     * A tuple waiting for the asynchronous lookup of its key in the opposite stream's store.
     *
     * <p>The class deliberately does not override {@code equals}/{@code hashCode}: it is used as a
     * map key, and identity semantics keep two pending events for equal tuples distinct.
     *
     * @param <K> join key type
     * @param <T> tuple type
     */
    public static class JoinEvent<K, T> {
        public K key;
        public T value;
        public boolean isStream1Data;

        /**
         * @param key           join key extracted from the tuple
         * @param value         the tuple itself
         * @param isStream1Data {@code true} if the tuple arrived on stream 1, {@code false} for stream 2
         */
        public JoinEvent(K key, T value, boolean isStream1Data) {
            this.key = key;
            this.value = value;
            this.isStream1Data = isStream1Data;
        }
    }

    /**
     * Creates and configures the two managed-state stores and wraps each one in a
     * {@link ManagedTimeStateMultiValue} that becomes the corresponding stream data holder.
     */
    @Override // org.apache.apex.malhar.lib.join.AbstractInnerJoinOperator
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void createStores() {
        this.stream1Store = newConfiguredStore();
        this.stream2Store = newConfiguredStore();
        // The second constructor argument is the negation of the "key is primary" flag.
        this.stream1Data = new ManagedTimeStateMultiValue(this.stream1Store, !isLeftKeyPrimary());
        this.stream2Data = new ManagedTimeStateMultiValue(this.stream2Store, !isRightKeyPrimary());
    }

    /** Builds one store with the configured bucket count, optional bucket span and expiry time. */
    private ManagedTimeStateImpl newConfiguredStore() {
        ManagedTimeStateImpl store = new ManagedTimeStateImpl();
        store.setNumBuckets(this.noOfBuckets);
        if (this.bucketSpanTime != null) {
            store.getTimeBucketAssigner().setBucketSpan(Duration.millis(this.bucketSpanTime.longValue()));
        }
        // NOTE(review): assumes getExpiryTime() is non-null; a null value fails here with an NPE.
        store.getTimeBucketAssigner().setExpireBefore(Duration.millis(getExpiryTime().longValue()));
        return store;
    }

    /**
     * Stores the tuple in its own stream's store and, if the store accepted it, joins it against
     * the values held for the same key in the opposite stream's store.
     *
     * <p>If the asynchronous lookup is not yet complete, the tuple is queued in
     * {@code waitingEvents} and joined later.
     *
     * @param tuple         incoming tuple
     * @param isStream1Data {@code true} if the tuple belongs to stream 1, {@code false} for stream 2
     * @throws RuntimeException wrapping the failure if an already-completed lookup cannot be read
     */
    @Override // org.apache.apex.malhar.lib.join.AbstractInnerJoinOperator
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void processTuple(T tuple, boolean isStream1Data) {
        ManagedTimeStateMultiValue ownStore = (ManagedTimeStateMultiValue) (isStream1Data ? this.stream1Data : this.stream2Data);
        K key = extractKey(tuple, isStream1Data);
        if (!ownStore.put(key, tuple, extractTime(tuple, isStream1Data))) {
            // The store did not accept the tuple, so there is nothing to join.
            return;
        }

        ManagedTimeStateMultiValue otherStore = (ManagedTimeStateMultiValue) (isStream1Data ? this.stream2Data : this.stream1Data);
        Future<List> lookup = otherStore.getAsync(key);
        if (!lookup.isDone()) {
            this.waitingEvents.put(new JoinEvent<>(key, tuple, isStream1Data), lookup);
            return;
        }
        try {
            joinStream(tuple, isStream1Data, lookup.get());
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /** Uses idle time to resolve at most one pending event whose lookup has completed. */
    @Override // com.datatorrent.api.Operator.IdleTimeHandler
    public void handleIdleTime() {
        if (!this.waitingEvents.isEmpty()) {
            processWaitEvents(false);
        }
    }

    /**
     * Forwards the before-checkpoint notification to both stores.
     *
     * @param windowId window id passed through to the stores
     */
    @Override // com.datatorrent.api.Operator.CheckpointNotificationListener
    public void beforeCheckpoint(long windowId) {
        this.stream1Store.beforeCheckpoint(windowId);
        this.stream2Store.beforeCheckpoint(windowId);
    }

    /**
     * Forwards the checkpointed notification to both stores.
     *
     * @param windowId window id passed through to the stores
     */
    @Override // com.datatorrent.api.Operator.CheckpointListener
    public void checkpointed(long windowId) {
        this.stream1Store.checkpointed(windowId);
        this.stream2Store.checkpointed(windowId);
    }

    /**
     * Forwards the committed notification to both stores.
     *
     * @param windowId window id passed through to the stores
     */
    @Override // com.datatorrent.api.Operator.CheckpointListener
    public void committed(long windowId) {
        this.stream1Store.committed(windowId);
        this.stream2Store.committed(windowId);
    }

    /**
     * Points each store at its own file base path and checkpoint state path, then sets both up.
     *
     * <p>The base path is {@code <application path>/managedState/<operator id>/<stream name>}.
     *
     * @param context operator context supplying the application path and the operator id
     */
    @Override // org.apache.apex.malhar.lib.join.AbstractInnerJoinOperator
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        String operatorStateRoot = ((String) context.getValue(DAG.APPLICATION_PATH)) + "/" + stateDir + "/" + String.valueOf(context.getId()) + "/";
        ((FileAccessFSImpl) this.stream1Store.getFileAccess()).setBasePath(operatorStateRoot + stream1State);
        ((FileAccessFSImpl) this.stream2Store.getFileAccess()).setBasePath(operatorStateRoot + stream2State);
        // Each store needs its own checkpoint state path. Earlier code set both paths on
        // stream1Store, leaving stream2Store unconfigured and stream 1 on the stream 2 path.
        // NOTE(review): this moves the checkpoint location relative to builds with that defect;
        // confirm the impact on applications resuming from previously written state.
        this.stream1Store.getCheckpointManager().setStatePath("managed_state_" + stream1State);
        this.stream2Store.getCheckpointManager().setStatePath("managed_state_" + stream2State);
        this.stream1Store.setup(context);
        this.stream2Store.setup(context);
    }

    /**
     * Notifies both stores of the new window before delegating to the parent operator.
     *
     * @param windowId id of the window that is starting
     */
    @Override // org.apache.apex.malhar.lib.join.AbstractInnerJoinOperator
    public void beginWindow(long windowId) {
        this.stream1Store.beginWindow(windowId);
        this.stream2Store.beginWindow(windowId);
        super.beginWindow(windowId);
    }

    /**
     * Joins pending events against the result of their lookups and removes them from the queue.
     *
     * @param finalize {@code true} to process every pending event, blocking on lookups that are
     *                 still running; {@code false} to process only the first event whose lookup
     *                 has already completed and then return
     * @throws RuntimeException with message {@code "end window"} if a lookup result cannot be read
     */
    private void processWaitEvents(boolean finalize) {
        Iterator<Map.Entry<JoinEvent<K, T>, Future<List>>> pending = this.waitingEvents.entrySet().iterator();
        while (pending.hasNext()) {
            Map.Entry<JoinEvent<K, T>, Future<List>> entry = pending.next();
            Future<List> lookup = entry.getValue();
            if (!finalize && !lookup.isDone()) {
                continue;
            }
            try {
                JoinEvent<K, T> event = entry.getKey();
                joinStream(event.value, event.isStream1Data, lookup.get());
                pending.remove();
                if (!finalize) {
                    // Idle-time processing handles a single event per invocation.
                    return;
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt status for callers further up the stack.
                Thread.currentThread().interrupt();
                throw new RuntimeException("end window", e);
            } catch (ExecutionException e) {
                throw new RuntimeException("end window", e);
            }
        }
    }

    /**
     * Resolves every pending event (blocking on unfinished lookups), then ends the window on both
     * stores and on the parent operator.
     */
    @Override // org.apache.apex.malhar.lib.join.AbstractInnerJoinOperator
    public void endWindow() {
        processWaitEvents(true);
        this.stream1Store.endWindow();
        this.stream2Store.endWindow();
        super.endWindow();
    }

    /** Tears down both stores before delegating to the parent operator. */
    @Override // org.apache.apex.malhar.lib.join.AbstractInnerJoinOperator
    public void teardown() {
        this.stream1Store.teardown();
        this.stream2Store.teardown();
        super.teardown();
    }

    /**
     * @return the number of buckets applied to each store in {@link #createStores()}
     */
    public int getNoOfBuckets() {
        return this.noOfBuckets;
    }

    /**
     * @param noOfBuckets number of buckets to apply to each store in {@link #createStores()}
     */
    public void setNoOfBuckets(int noOfBuckets) {
        this.noOfBuckets = noOfBuckets;
    }

    /**
     * @return the bucket span in milliseconds, or {@code null} if none was configured
     */
    public Long getBucketSpanTime() {
        return this.bucketSpanTime;
    }

    /**
     * @param bucketSpanTime bucket span in milliseconds; {@code null} leaves the stores' span untouched
     */
    public void setBucketSpanTime(Long bucketSpanTime) {
        this.bucketSpanTime = bucketSpanTime;
    }
}
