package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.dimensions.DimensionsDescriptor;
import com.datatorrent.lib.util.KeyValPair;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:com/datatorrent/lib/io/AbstractKeyValueStoreOutputOperator.class */
public abstract class AbstractKeyValueStoreOutputOperator<K, V> extends BaseOperator {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractKeyValueStoreOutputOperator.class);
    protected long currentWindowId;
    private transient int operatorId;
    private transient String appId;
    protected transient long committedWindowId = 0;
    protected Map<K, Object> dataMap = new HashMap();
    protected int continueOnError = 0;

    @InputPortFieldAnnotation(optional = true)
    public final transient DefaultInputPort<Map<K, V>> input = new DefaultInputPort<Map<K, V>>() { // from class: com.datatorrent.lib.io.AbstractKeyValueStoreOutputOperator.1
        public void process(Map<K, V> map) {
            if (AbstractKeyValueStoreOutputOperator.this.committedWindowId < AbstractKeyValueStoreOutputOperator.this.currentWindowId) {
                AbstractKeyValueStoreOutputOperator.this.process(map);
            }
        }
    };

    @InputPortFieldAnnotation(optional = true)
    public final transient DefaultInputPort<KeyValPair<K, V>> inputInd = new DefaultInputPort<KeyValPair<K, V>>() { // from class: com.datatorrent.lib.io.AbstractKeyValueStoreOutputOperator.2
        public void process(KeyValPair<K, V> keyValPair) {
            if (AbstractKeyValueStoreOutputOperator.this.committedWindowId < AbstractKeyValueStoreOutputOperator.this.currentWindowId) {
                AbstractKeyValueStoreOutputOperator.this.process(keyValPair.getKey(), keyValPair.getValue());
            }
        }
    };

    public void setContinueOnError(int i) {
        this.continueOnError = i;
    }

    public abstract String get(String str);

    public abstract void put(String str, String str2);

    public abstract void store(Map<K, Object> map);

    public abstract void startTransaction();

    public abstract void commitTransaction();

    public abstract void rollbackTransaction();

    public void process(Map<K, V> map) {
        this.dataMap.putAll(map);
    }

    public void process(K k, V v) {
        this.dataMap.put(k, v);
    }

    public void setup(Context.OperatorContext operatorContext) {
        this.operatorId = operatorContext.getId();
        this.appId = (String) operatorContext.getValue(Context.DAGContext.APPLICATION_ID);
        String str = get(getEndWindowKey());
        if (str != null) {
            this.committedWindowId = Long.valueOf(str).longValue();
        }
    }

    public void beginWindow(long j) {
        this.currentWindowId = j;
        this.dataMap.clear();
    }

    public void endWindow() {
        try {
            if (this.committedWindowId < this.currentWindowId) {
                startTransaction();
                store(this.dataMap);
                put(getEndWindowKey(), String.valueOf(this.currentWindowId));
                commitTransaction();
                this.committedWindowId = this.currentWindowId;
            } else {
                LOG.info("Discarding data for window id {} because committed window is {}", Long.valueOf(this.currentWindowId), Long.valueOf(this.committedWindowId));
            }
        } catch (RuntimeException e) {
            logException("Error saving data", e);
            try {
                rollbackTransaction();
            } catch (RuntimeException e2) {
                logException("Error rolling back", e2);
            }
            if (this.continueOnError == 0) {
                throw e;
            }
        }
    }

    private String getEndWindowKey() {
        return "_ew:" + this.appId + DimensionsDescriptor.DELIMETER_SEPERATOR + this.operatorId;
    }

    private void logException(String str, Exception exc) {
        if (this.continueOnError != 0) {
            LOG.warn(str, exc);
        } else {
            LOG.error(str, exc);
        }
    }
}
