package com.datatorrent.lib.math;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
import com.datatorrent.lib.util.KeyValPair;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.mutable.MutableDouble;
import org.apache.commons.lang.mutable.MutableLong;

/* loaded from: input_file:com/datatorrent/lib/math/AverageKeyVal.class */
public class AverageKeyVal<K> extends BaseNumberKeyValueOperator<K, Number> {
    protected HashMap<K, MutableDouble> sums = new HashMap<>();
    protected HashMap<K, MutableLong> counts = new HashMap<>();
    public final transient DefaultInputPort<KeyValPair<K, ? extends Number>> data = new DefaultInputPort<KeyValPair<K, ? extends Number>>() { // from class: com.datatorrent.lib.math.AverageKeyVal.1
        public void process(KeyValPair<K, ? extends Number> keyValPair) {
            K key = keyValPair.getKey();
            if (AverageKeyVal.this.doprocessKey(key)) {
                MutableDouble mutableDouble = AverageKeyVal.this.sums.get(key);
                if (mutableDouble == null) {
                    mutableDouble = new MutableDouble(keyValPair.getValue().doubleValue());
                } else {
                    mutableDouble.add(keyValPair.getValue().doubleValue());
                }
                AverageKeyVal.this.sums.put(AverageKeyVal.this.cloneKey(key), mutableDouble);
                MutableLong mutableLong = AverageKeyVal.this.counts.get(key);
                if (mutableLong == null) {
                    mutableLong = new MutableLong(0L);
                    AverageKeyVal.this.counts.put(AverageKeyVal.this.cloneKey(key), mutableLong);
                }
                mutableLong.increment();
            }
        }
    };

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleAverage = new DefaultOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<KeyValPair<K, Integer>> intAverage = new DefaultOutputPort<>();

    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<KeyValPair<K, Long>> longAverage = new DefaultOutputPort<>();

    public void endWindow() {
        for (Map.Entry<K, MutableDouble> entry : this.sums.entrySet()) {
            K key = entry.getKey();
            double doubleValue = entry.getValue().doubleValue();
            if (this.doubleAverage.isConnected()) {
                this.doubleAverage.emit(new KeyValPair(key, Double.valueOf(doubleValue / this.counts.get(key).doubleValue())));
            }
            if (this.intAverage.isConnected()) {
                this.intAverage.emit(new KeyValPair(key, Integer.valueOf((int) doubleValue)));
            }
            if (this.longAverage.isConnected()) {
                this.longAverage.emit(new KeyValPair(key, Long.valueOf((long) doubleValue)));
            }
        }
        this.sums.clear();
        this.counts.clear();
    }
}
