package org.apache.apex.malhar.lib.dedup;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.netlet.util.Slice;
import java.util.concurrent.Future;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.managed.ManagedTimeUnifiedStateImpl;
import org.apache.apex.malhar.lib.state.managed.TimeBucketAssigner;
import org.apache.hadoop.classification.InterfaceStability;
import org.joda.time.Duration;
import org.joda.time.Instant;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/apex/malhar/lib/dedup/TimeBasedDedupOperator.class */
public class TimeBasedDedupOperator extends AbstractDeduper<Object> implements Operator.ActivationListener<Context> {

    @NotNull
    private String keyExpression;

    @NotNull
    private String timeExpression;

    @NotNull
    private long bucketSpan;

    @NotNull
    private long expireBefore;
    private transient Class<?> pojoClass;
    private transient PojoUtils.Getter<Object, Long> timeGetter;
    private transient PojoUtils.Getter<Object, Object> keyGetter;
    private transient StreamCodec<Object> streamCodec;
    private long referenceInstant = new Instant().getMillis() / 1000;

    @InputPortFieldAnnotation(schemaRequired = true)
    public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() { // from class: org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator.1
        public void setup(Context.PortContext portContext) {
            TimeBasedDedupOperator.this.pojoClass = (Class) portContext.getAttributes().get(Context.PortContext.TUPLE_CLASS);
            TimeBasedDedupOperator.this.streamCodec = TimeBasedDedupOperator.this.getDeduperStreamCodec();
        }

        public void process(Object obj) {
            TimeBasedDedupOperator.this.processTuple(obj);
        }

        public StreamCodec<Object> getStreamCodec() {
            return TimeBasedDedupOperator.this.streamCodec;
        }
    };

    public TimeBasedDedupOperator() {
        this.managedState = new ManagedTimeUnifiedStateImpl();
    }

    @Override // org.apache.apex.malhar.lib.dedup.AbstractDeduper
    protected long getTime(Object obj) {
        return this.timeGetter.get(obj).longValue();
    }

    @Override // org.apache.apex.malhar.lib.dedup.AbstractDeduper
    protected Slice getKey(Object obj) {
        return this.streamCodec.toByteArray(this.keyGetter.get(obj));
    }

    protected StreamCodec<Object> getDeduperStreamCodec() {
        return new DeduperStreamCodec(this.keyExpression);
    }

    @Override // org.apache.apex.malhar.lib.dedup.AbstractDeduper
    public void setup(Context.OperatorContext operatorContext) {
        TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
        timeBucketAssigner.setBucketSpan(Duration.standardSeconds(this.bucketSpan));
        timeBucketAssigner.setExpireBefore(Duration.standardSeconds(this.expireBefore));
        timeBucketAssigner.setReferenceInstant(new Instant(this.referenceInstant * 1000));
        this.managedState.setTimeBucketAssigner(timeBucketAssigner);
        super.setup(operatorContext);
    }

    public void activate(Context context) {
        if (this.timeExpression != null) {
            this.timeGetter = PojoUtils.createGetter(this.pojoClass, this.timeExpression, Long.class);
        } else {
            this.timeGetter = null;
        }
        this.keyGetter = PojoUtils.createGetter(this.pojoClass, this.keyExpression, Object.class);
    }

    public void deactivate() {
    }

    @Override // org.apache.apex.malhar.lib.dedup.AbstractDeduper
    protected Future<Slice> getAsyncManagedState(Object obj) {
        return ((ManagedTimeUnifiedStateImpl) this.managedState).getAsync(getTime(obj), getKey(obj));
    }

    @Override // org.apache.apex.malhar.lib.dedup.AbstractDeduper
    protected void putManagedState(Object obj) {
        ((ManagedTimeUnifiedStateImpl) this.managedState).put(getTime(obj), getKey(obj), new Slice(new byte[0]));
    }

    public String getKeyExpression() {
        return this.keyExpression;
    }

    public void setKeyExpression(String str) {
        this.keyExpression = str;
    }

    public String getTimeExpression() {
        return this.timeExpression;
    }

    public void setTimeExpression(String str) {
        this.timeExpression = str;
    }

    public long getBucketSpan() {
        return this.bucketSpan;
    }

    public void setBucketSpan(long j) {
        this.bucketSpan = j;
    }

    public long getExpireBefore() {
        return this.expireBefore;
    }

    public void setExpireBefore(long j) {
        this.expireBefore = j;
    }

    public long getReferenceInstant() {
        return this.referenceInstant;
    }

    public void setReferenceInstant(long j) {
        this.referenceInstant = j;
    }
}
