package com.datatorrent.lib.join;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Base operator that joins the tuples arriving on two input ports.
 *
 * <p>Every incoming tuple is wrapped in a {@link TimeEvent} and put into the store of the side it
 * arrived on. When the store accepts it, the tuple is joined with the tuples that the opposite
 * store returns as valid for it, and the joined tuples are emitted as one list on
 * {@link #outputPort}. For the outer join strategies, the unmatched tuples returned by the
 * corresponding store are additionally emitted at the end of each window.
 *
 * <p>The operator is configured through three strings:
 * <ul>
 *   <li>include fields: {@code "left1,left2;right1,right2"} (sides separated by {@code ;},
 *       fields by {@code ,});</li>
 *   <li>key fields: {@code "leftKey,rightKey"};</li>
 *   <li>time fields (optional): {@code "leftTime,rightTime"}. When absent, the current system time
 *       is used as the event time.</li>
 * </ul>
 *
 * @param <T> type of the tuples received on the input ports and contained in the emitted lists
 */
@InterfaceStability.Unstable
public abstract class AbstractJoinOperator<T> extends BaseOperator implements Operator.CheckpointListener {

    /** Number of joined tuples emitted in the last window divided by the window length in seconds. */
    @AutoMetric
    private long tuplesJoinedPerSec;

    /** Length of one application window in seconds, computed in {@link #setup}. */
    private double windowTimeSec;

    /** Number of joined tuples emitted since the start of the current window. */
    protected int tuplesCount;

    /** {@code true} while the tuple being processed arrived on {@link #input1}. */
    protected boolean isLeft;

    @NotNull
    protected StoreContext leftStore;

    @NotNull
    protected StoreContext rightStore;
    private String includeFieldStr;
    private String keyFieldStr;
    private String timeFieldStr;

    /** Port on which each batch of joined tuples is emitted as a single list. */
    public final transient DefaultOutputPort<List<T>> outputPort = new DefaultOutputPort<>();
    protected JoinStrategy strategy = JoinStrategy.INNER_JOIN;

    /** Input port of the left side of the join. */
    @InputPortFieldAnnotation
    public transient DefaultInputPort<T> input1 = new DefaultInputPort<T>() {
        public void process(T t) {
            AbstractJoinOperator.this.isLeft = true;
            AbstractJoinOperator.this.processTuple(t);
        }
    };

    /** Input port of the right side of the join. */
    @InputPortFieldAnnotation
    public transient DefaultInputPort<T> input2 = new DefaultInputPort<T>() {
        public void process(T t) {
            AbstractJoinOperator.this.isLeft = false;
            AbstractJoinOperator.this.processTuple(t);
        }
    };

    /** Supported kinds of join. */
    public enum JoinStrategy {
        INNER_JOIN,
        LEFT_OUTER_JOIN,
        RIGHT_OUTER_JOIN,
        OUTER_JOIN
    }

    /** Bundles the store of one side of the join with the field configuration of that side. */
    public static class StoreContext {
        private transient String timeFields;
        private transient String[] includeFields;
        private transient String keys;
        private JoinStore store;

        public StoreContext(JoinStore joinStore) {
            this.store = joinStore;
        }

        public String getTimeFields() {
            return this.timeFields;
        }

        public void setTimeFields(String str) {
            this.timeFields = str;
        }

        public String[] getIncludeFields() {
            return this.includeFields;
        }

        public void setIncludeFields(String[] strArr) {
            this.includeFields = strArr;
        }

        public String getKeys() {
            return this.keys;
        }

        public void setKeys(String str) {
            this.keys = str;
        }

        public JoinStore getStore() {
            return this.store;
        }

        public void setStore(JoinStore joinStore) {
            this.store = joinStore;
        }
    }

    /**
     * Tells each store whether it takes part in an outer join, sets up both stores, distributes the
     * configured field names to the two sides and computes the application window length.
     *
     * @param operatorContext context the window attributes are read from
     * @throws IllegalArgumentException if the include or key field string is missing, or if any
     *     configured field string does not contain an entry for both sides
     */
    public void setup(Context.OperatorContext operatorContext) {
        this.leftStore.getStore().isOuterJoin(emitsUnmatchedLeft());
        this.rightStore.getStore().isOuterJoin(emitsUnmatchedRight());
        this.leftStore.getStore().setup(operatorContext);
        this.rightStore.getStore().setup(operatorContext);
        populateFields();
        int appWindowCount = (Integer) operatorContext.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT);
        int streamingWindowMillis = (Integer) operatorContext.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
        // Widen before multiplying so that a large window configuration cannot overflow int.
        this.windowTimeSec = ((long) appWindowCount * streamingWindowMillis) / 1000.0d;
    }

    /**
     * Stores the tuple on the side indicated by {@link #isLeft} and, if the store accepted it,
     * joins it with the tuples of the opposite side.
     *
     * @param t tuple received on one of the input ports
     */
    public void processTuple(T t) {
        JoinStore ownStore = this.isLeft ? this.leftStore.getStore() : this.rightStore.getStore();
        TimeEvent event = createEvent(t);
        if (ownStore.put(event)) {
            join(event, this.isLeft);
        }
    }

    /** Distributes include, key and (when configured) time fields to the two store contexts. */
    private void populateFields() {
        populateIncludeFields();
        populateKeyFields();
        if (this.timeFieldStr != null) {
            populateTimeFields();
        }
    }

    /** Splits the include field string into the field lists of the left and the right side. */
    private void populateIncludeFields() {
        String[] sides = splitSides(this.includeFieldStr, ";", "includeFields");
        assert sides.length == 2 : "includeFields must contain exactly one ';'";
        this.leftStore.setIncludeFields(sides[0].split(","));
        this.rightStore.setIncludeFields(sides[1].split(","));
    }

    /**
     * Splits a two-sided configuration string and verifies that both sides are present.
     *
     * @param spec configured string, e.g. {@code "leftKey,rightKey"}
     * @param separator regular expression separating the left entry from the right entry
     * @param propertyName name used in the error message
     * @return the parts of {@code spec}; index 0 is the left entry, index 1 the right entry
     * @throws IllegalArgumentException if {@code spec} is {@code null} or has fewer than two parts
     */
    private static String[] splitSides(String spec, String separator, String propertyName) {
        if (spec == null) {
            throw new IllegalArgumentException(propertyName + " must be set before the operator is set up");
        }
        String[] sides = spec.split(separator);
        if (sides.length < 2) {
            throw new IllegalArgumentException(propertyName + " must contain an entry for the left and the right"
                    + " side separated by '" + separator + "', but was \"" + spec + "\"");
        }
        return sides;
    }

    /** Returns whether unmatched tuples of the left store are emitted at the end of a window. */
    private boolean emitsUnmatchedLeft() {
        return this.strategy == JoinStrategy.LEFT_OUTER_JOIN || this.strategy == JoinStrategy.OUTER_JOIN;
    }

    /** Returns whether unmatched tuples of the right store are emitted at the end of a window. */
    private boolean emitsUnmatchedRight() {
        return this.strategy == JoinStrategy.RIGHT_OUTER_JOIN || this.strategy == JoinStrategy.OUTER_JOIN;
    }

    /**
     * Joins {@code timeEvent} with the tuples of one store and emits the result.
     *
     * <p>When {@code timeEvent} is not {@code null}, it is joined with the tuples the opposite store
     * returns as valid for it. When it is {@code null}, the unmatched tuples of the selected store
     * are emitted with nothing copied from the other side.
     *
     * @param timeEvent event to join, or {@code null} to emit unmatched tuples
     * @param z {@code true} to read from the right store, {@code false} to read from the left store;
     *     for a non-null event this is the side the event arrived on
     */
    private void join(TimeEvent timeEvent, boolean z) {
        JoinStore sourceStore = z ? this.rightStore.getStore() : this.leftStore.getStore();
        // Only List behaviour is needed here, so do not insist on a concrete ArrayList.
        List<?> candidates;
        if (timeEvent != null) {
            candidates = (List<?>) sourceStore.getValidTuples(timeEvent);
        } else {
            candidates = (List<?>) sourceStore.getUnMatchedTuples();
        }
        if (candidates == null) {
            return;
        }

        Object eventValue = timeEvent != null ? timeEvent.getValue() : null;
        List<T> joined = new ArrayList<>(candidates.size());
        for (Object candidate : candidates) {
            TimeEvent stored = (TimeEvent) candidate;
            T output = createOutputTuple();
            copyValue(output, eventValue, z);
            copyValue(output, stored.getValue(), !z);
            joined.add(output);
            stored.setMatch(true);
        }
        if (timeEvent != null) {
            timeEvent.setMatch(true);
        }
        if (!joined.isEmpty()) {
            this.outputPort.emit(joined);
            this.tuplesCount += joined.size();
        }
    }

    /**
     * Emits the unmatched tuples required by the outer join strategies, ends the window on both
     * stores and updates the joined-tuples-per-second metric.
     */
    public void endWindow() {
        if (emitsUnmatchedLeft()) {
            join(null, false);
        }
        if (emitsUnmatchedRight()) {
            join(null, true);
        }
        this.leftStore.getStore().endWindow();
        this.rightStore.getStore().endWindow();
        // A non-positive window length would turn the rate into Infinity/NaN; report 0 instead.
        this.tuplesJoinedPerSec = this.windowTimeSec > 0.0d ? (long) (this.tuplesCount / this.windowTimeSec) : 0L;
    }

    /**
     * Resets the per-window counters.
     *
     * @param j id of the window that begins
     */
    public void beginWindow(long j) {
        super.beginWindow(j);
        this.tuplesJoinedPerSec = 0L;
        this.tuplesCount = 0;
    }

    /**
     * Forwards the checkpoint notification to both stores.
     *
     * @param j id of the checkpointed window
     */
    public void checkpointed(long j) {
        this.leftStore.getStore().checkpointed(j);
        this.rightStore.getStore().checkpointed(j);
    }

    /**
     * Forwards the commit notification to both stores.
     *
     * @param j id of the committed window
     */
    public void committed(long j) {
        this.leftStore.getStore().committed(j);
        this.rightStore.getStore().committed(j);
    }

    /**
     * Wraps a tuple in a {@link TimeEvent} using the key and time field of the side indicated by
     * {@link #isLeft}. When no time field is configured for that side, the current system time is
     * used as the event time.
     *
     * @param obj tuple to wrap
     * @return event holding the key, the time and the tuple
     */
    protected TimeEvent createEvent(Object obj) {
        StoreContext side = this.isLeft ? this.leftStore : this.rightStore;
        String timeField = side.getTimeFields();
        Object key = getKeyValue(side.getKeys(), obj);
        long time;
        if (timeField != null) {
            // Accept any numeric time value rather than only java.lang.Long.
            time = ((Number) getTime(timeField, obj)).longValue();
        } else {
            time = System.currentTimeMillis();
        }
        return new TimeEventImpl(key, time, obj);
    }

    /** Assigns the first key field to the left side and the second one to the right side. */
    private void populateKeyFields() {
        String[] keys = splitSides(this.keyFieldStr, ",", "keyFields");
        this.leftStore.setKeys(keys[0]);
        this.rightStore.setKeys(keys[1]);
    }

    public JoinStrategy getStrategy() {
        return this.strategy;
    }

    public void setStrategy(JoinStrategy joinStrategy) {
        this.strategy = joinStrategy;
    }

    public void setLeftStore(@NotNull JoinStore joinStore) {
        this.leftStore = new StoreContext(joinStore);
    }

    public void setRightStore(@NotNull JoinStore joinStore) {
        this.rightStore = new StoreContext(joinStore);
    }

    public void setKeyFields(String str) {
        this.keyFieldStr = str;
    }

    public void setTimeFieldStr(String str) {
        this.timeFieldStr = str;
    }

    public void setIncludeFields(String str) {
        this.includeFieldStr = str;
    }

    public StoreContext getLeftStore() {
        return this.leftStore;
    }

    public StoreContext getRightStore() {
        return this.rightStore;
    }

    public String getIncludeFieldStr() {
        return this.includeFieldStr;
    }

    public String getKeyFieldStr() {
        return this.keyFieldStr;
    }

    public String getTimeFieldStr() {
        return this.timeFieldStr;
    }

    /** Assigns the first time field to the left side and the second one to the right side. */
    private void populateTimeFields() {
        String[] timeFields = splitSides(this.timeFieldStr, ",", "timeFieldStr");
        this.leftStore.setTimeFields(timeFields[0]);
        this.rightStore.setTimeFields(timeFields[1]);
    }

    /**
     * Sets the join strategy from its case-insensitive name.
     *
     * @param str name of a {@link JoinStrategy} constant in any letter case
     * @throws IllegalArgumentException if {@code str} does not name a {@link JoinStrategy}
     */
    public void setStrategy(String str) {
        // Locale.ROOT keeps the conversion independent of the default locale (e.g. Turkish dotted I).
        this.strategy = JoinStrategy.valueOf(str.toUpperCase(Locale.ROOT));
    }

    /**
     * Creates an empty output tuple that {@link #copyValue} is subsequently applied to.
     *
     * @return new output tuple
     */
    protected abstract T createOutputTuple();

    /**
     * Copies the included fields of one side into the output tuple.
     *
     * @param t output tuple to fill
     * @param obj value to copy from; {@code null} when the side has no matching tuple
     * @param z {@code true} if {@code obj} belongs to the left side, {@code false} for the right side
     */
    protected abstract void copyValue(T t, Object obj, boolean z);

    /**
     * Extracts the join key from a tuple.
     *
     * @param str name of the key field
     * @param obj tuple to read from
     * @return key of the tuple
     */
    protected abstract Object getKeyValue(String str, Object obj);

    /**
     * Extracts the event time from a tuple.
     *
     * @param str name of the time field
     * @param obj tuple to read from
     * @return time of the tuple; must be a {@link Number}
     */
    protected abstract Object getTime(String str, Object obj);
}
