/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.twitter;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import java.util.concurrent.ArrayBlockingQueue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import twitter4j.HashtagEntity;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;
import twitter4j.StreamListener;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.URLEntity;
import twitter4j.conf.ConfigurationBuilder;

/**
 * Input operator that subscribes to the Twitter sample stream through twitter4j and
 * republishes what it receives on its output ports.
 *
 * <p>The operator registers itself as the {@link StatusListener} of a {@link TwitterStream}.
 * Every status delivered to {@link #onStatus(Status)} is placed on an internal bounded queue;
 * {@link #emitTuples()} drains that queue and emits, for each status, the status itself, its
 * text, its URLs and its hashtags on whichever of the corresponding ports are connected.
 *
 * <p>The four OAuth credentials are mandatory ({@code @NotNull}). {@code feedMultiplier} and
 * {@code feedMultiplierVariance} control how many copies of each received status are queued.
 */
@InterfaceStability.Evolving
public class TwitterSampleInput
implements InputOperator,
Operator.ActivationListener<Context.OperatorContext>,
StatusListener {
    private static final Logger logger = LoggerFactory.getLogger(TwitterSampleInput.class);

    /** Capacity of the queue between the stream listener and {@link #emitTuples()}. */
    private static final int STATUS_QUEUE_CAPACITY = 1024 * 1024;

    /** Placeholder printed by {@link #toString()} instead of a non-null secret. */
    private static final String MASKED = "****";

    /** Emits every queued status object. */
    public final transient DefaultOutputPort<Status> status = new DefaultOutputPort<>();
    /** Emits the text of every queued status. */
    public final transient DefaultOutputPort<String> text = new DefaultOutputPort<>();
    /** Emits one tuple per URL entity of a status (expanded URL when available). */
    public final transient DefaultOutputPort<String> url = new DefaultOutputPort<>();
    /** Emits one tuple per hashtag entity of a status. */
    public final transient DefaultOutputPort<String> hashtag = new DefaultOutputPort<>();
    /** Declared but never assigned or used in this class; always {@code null}. */
    public final transient DefaultOutputPort<?> userMention = null;
    /** Declared but never assigned or used in this class; always {@code null}. */
    public final transient DefaultOutputPort<?> media = null;

    /** Forwarded to twitter4j's {@code ConfigurationBuilder.setDebugEnabled}. */
    private boolean debug;
    /** Thread that ran {@link #setup}; interrupted when streaming fails or is interrupted. */
    private transient Thread operatorThread;
    private transient TwitterStream ts;
    /** Hand-off buffer filled by {@link #onStatus(Status)} and drained by {@link #emitTuples()}. */
    private transient ArrayBlockingQueue<Status> statuses = new ArrayBlockingQueue<>(STATUS_QUEUE_CAPACITY);
    /** Number of statuses queued so far (counts every duplicate produced by the multiplier). */
    protected transient int count;
    /** Number of times each received status is queued when no variance is configured. */
    private int feedMultiplier = 1;
    /** Half-width of the random range applied around {@link #feedMultiplier}; 0 disables it. */
    @Min(value=0L)
    private int feedMultiplierVariance = 0;
    @NotNull
    private String consumerKey;
    @NotNull
    private String consumerSecret;
    @NotNull
    private String accessToken;
    @NotNull
    private String accessTokenSecret;
    /** When true, {@link #onException(Exception)} reopens the stream instead of interrupting the operator. */
    private boolean reConnect;

    /**
     * Remembers the calling thread as the operator thread and creates the Twitter stream
     * (without starting it; see {@link #activate}).
     *
     * @param context operator context; not used by this implementation
     */
    public void setup(Context.OperatorContext context) {
        this.operatorThread = Thread.currentThread();
        if (this.feedMultiplier != 1) {
            logger.info("Load set to be {}% of the entire twitter feed", this.feedMultiplier);
        }
        this.ts = this.createTwitterStream();
    }

    /** Drops the reference to the Twitter stream. */
    public void teardown() {
        this.ts = null;
    }

    /**
     * Queues the received status, duplicated according to the feed multiplier.
     *
     * <p>With a positive variance the number of copies is drawn uniformly from
     * {@code [max(0, feedMultiplier - variance), feedMultiplier + variance]} (both inclusive);
     * otherwise exactly {@code feedMultiplier} copies are queued. The call blocks while the
     * queue is full. If it is interrupted while blocked, the operator thread is interrupted.
     *
     * @param status the status delivered by the stream
     */
    public void onStatus(Status status) {
        int copies = this.feedMultiplier;
        if (this.feedMultiplierVariance > 0) {
            int lowest = Math.max(0, this.feedMultiplier - this.feedMultiplierVariance);
            int highest = this.feedMultiplier + this.feedMultiplierVariance;
            copies = lowest + (int)(Math.random() * (double)(highest - lowest + 1));
        }
        try {
            for (int i = copies; i > 0; i--) {
                this.statuses.put(status);
                ++this.count;
            }
        }
        catch (InterruptedException ex) {
            logger.debug("Streaming interrupted; Passing the inerruption to the operator", ex);
            this.operatorThread.interrupt();
        }
    }

    /** Logs the running status count whenever it is a multiple of 16. */
    public void endWindow() {
        if (this.count % 16 == 0) {
            logger.debug("processed {} statuses", this.count);
        }
    }

    /** Ignored. */
    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
    }

    /** Ignored. */
    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
    }

    /** Ignored. */
    public void onScrubGeo(long userId, long upToStatusId) {
    }

    /** Ignored. */
    public void onStallWarning(StallWarning stallWarning) {
    }

    /**
     * Handles a streaming failure: shuts the current stream down, then either reconnects
     * after a one second pause (when {@code reConnect} is set) or interrupts the operator thread.
     *
     * @param ex the failure reported by the stream
     */
    public void onException(Exception ex) {
        logger.error("Sampling Error", ex);
        logger.debug("reconnect: {}", this.reConnect);
        TwitterStream current = this.ts;
        // teardown() nulls the field, so a late callback must not dereference it blindly.
        if (current != null) {
            current.shutdown();
        }
        if (!this.reConnect) {
            this.operatorThread.interrupt();
            return;
        }
        try {
            Thread.sleep(1000L);
        }
        catch (InterruptedException interrupted) {
            // Keep the interrupt visible to the calling thread instead of silently dropping it;
            // the reconnect below still goes ahead.
            logger.debug("Interrupted while waiting to reconnect", interrupted);
            Thread.currentThread().interrupt();
        }
        this.setUpTwitterConnection();
    }

    /**
     * Builds the twitter4j configuration from the debug flag and the four OAuth credentials.
     * Subclasses may override this to supply additional settings.
     *
     * @return a builder carrying this operator's settings
     */
    protected ConfigurationBuilder setupConfigurationBuilder() {
        ConfigurationBuilder cb = new ConfigurationBuilder();
        cb.setDebugEnabled(this.debug)
            .setOAuthConsumerKey(this.consumerKey)
            .setOAuthConsumerSecret(this.consumerSecret)
            .setOAuthAccessToken(this.accessToken)
            .setOAuthAccessTokenSecret(this.accessTokenSecret);
        return cb;
    }

    /** Creates a new stream from {@link #setupConfigurationBuilder()}; does not start it. */
    private TwitterStream createTwitterStream() {
        return new TwitterStreamFactory(this.setupConfigurationBuilder().build()).getInstance();
    }

    /** Registers this operator as listener on the current stream and starts sampling. */
    private void startSampling() {
        this.ts.addListener((StreamListener)this);
        this.ts.sample();
    }

    /** Replaces the current stream with a freshly created one and starts sampling on it. */
    private void setUpTwitterConnection() {
        this.ts = this.createTwitterStream();
        this.startSampling();
    }

    /** No per-window initialisation is needed. */
    public void beginWindow(long windowId) {
    }

    /**
     * Starts sampling on the stream created in {@link #setup}.
     *
     * @param context operator context; not used by this implementation
     */
    public void activate(Context.OperatorContext context) {
        this.startSampling();
    }

    /** Shuts the Twitter stream down. */
    public void deactivate() {
        this.ts.shutdown();
    }

    public void setFeedMultiplier(int multiplier) {
        this.feedMultiplier = multiplier;
    }

    public int getFeedMultiplier() {
        return this.feedMultiplier;
    }

    public void setFeedMultiplierVariance(int multiplierVariance) {
        this.feedMultiplierVariance = multiplierVariance;
    }

    public int getFeedMultiplierVariance() {
        return this.feedMultiplierVariance;
    }

    /**
     * Drains the statuses that are queued at the time of the call and emits them.
     *
     * <p>For each status, and only on ports that are connected: the status on {@link #status},
     * its text on {@link #text}, each URL entity on {@link #url} (the expanded URL, or the plain
     * URL when no expanded one is present) and each hashtag's text on {@link #hashtag}.
     */
    public void emitTuples() {
        // Snapshot the size so the loop handles only the statuses queued at entry.
        for (int remaining = this.statuses.size(); remaining > 0; remaining--) {
            Status s = this.statuses.poll();
            if (s == null) {
                // Nothing left to drain.
                break;
            }
            if (this.status.isConnected()) {
                this.status.emit(s);
            }
            if (this.text.isConnected()) {
                this.text.emit(s.getText());
            }
            if (this.url.isConnected()) {
                URLEntity[] urlEntities = s.getURLEntities();
                if (urlEntities != null) {
                    for (URLEntity entity : urlEntities) {
                        this.url.emit((entity.getExpandedURL() == null ? entity.getURL() : entity.getExpandedURL()).toString());
                    }
                }
            }
            if (this.hashtag.isConnected()) {
                HashtagEntity[] hashtagEntities = s.getHashtagEntities();
                if (hashtagEntities != null) {
                    for (HashtagEntity tag : hashtagEntities) {
                        this.hashtag.emit(tag.getText());
                    }
                }
            }
        }
    }

    public void setDebug(boolean debug) {
        this.debug = debug;
    }

    public String getConsumerKey() {
        return this.consumerKey;
    }

    public void setConsumerKey(String consumerKey) {
        this.consumerKey = consumerKey;
    }

    public String getConsumerSecret() {
        return this.consumerSecret;
    }

    public void setConsumerSecret(String consumerSecret) {
        this.consumerSecret = consumerSecret;
    }

    public String getAccessToken() {
        return this.accessToken;
    }

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    public String getAccessTokenSecret() {
        return this.accessTokenSecret;
    }

    public void setAccessTokenSecret(String accessTokenSecret) {
        this.accessTokenSecret = accessTokenSecret;
    }

    public boolean isReConnect() {
        return this.reConnect;
    }

    public void setReConnect(boolean reConnect) {
        this.reConnect = reConnect;
    }

    /**
     * Hash over the debug flag, both multiplier settings and the four credentials,
     * i.e. the same fields {@link #equals(Object)} compares.
     */
    @Override
    public int hashCode() {
        int hash = 7;
        hash = 11 * hash + (this.debug ? 1 : 0);
        hash = 11 * hash + this.feedMultiplier;
        hash = 11 * hash + this.feedMultiplierVariance;
        hash = 11 * hash + (this.consumerKey != null ? this.consumerKey.hashCode() : 0);
        hash = 11 * hash + (this.consumerSecret != null ? this.consumerSecret.hashCode() : 0);
        hash = 11 * hash + (this.accessToken != null ? this.accessToken.hashCode() : 0);
        hash = 11 * hash + (this.accessTokenSecret != null ? this.accessTokenSecret.hashCode() : 0);
        return hash;
    }

    /**
     * Two operators are equal when they are of the same class and agree on the debug flag,
     * both multiplier settings and all four credentials. {@code reConnect} is not compared.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || this.getClass() != obj.getClass()) {
            return false;
        }
        TwitterSampleInput other = (TwitterSampleInput)obj;
        return this.debug == other.debug
            && this.feedMultiplier == other.feedMultiplier
            && this.feedMultiplierVariance == other.feedMultiplierVariance
            && sameText(this.consumerKey, other.consumerKey)
            && sameText(this.consumerSecret, other.consumerSecret)
            && sameText(this.accessToken, other.accessToken)
            && sameText(this.accessTokenSecret, other.accessTokenSecret);
    }

    /** Null-safe string equality. */
    private static boolean sameText(String a, String b) {
        return a == null ? b == null : a.equals(b);
    }

    /** Returns {@code "null"} for a missing secret and a fixed mask otherwise. */
    private static String mask(String secret) {
        return secret == null ? "null" : MASKED;
    }

    /**
     * Describes the configuration of this operator. The consumer secret and the access token
     * secret are masked so that they do not end up in logs.
     */
    @Override
    public String toString() {
        return "TwitterSampleInput{debug=" + this.debug
            + ", feedMultiplier=" + this.feedMultiplier
            + ", feedMultiplierVariance=" + this.feedMultiplierVariance
            + ", consumerKey=" + this.consumerKey
            + ", consumerSecret=" + mask(this.consumerSecret)
            + ", accessToken=" + this.accessToken
            + ", accessTokenSecret=" + mask(this.accessTokenSecret) + '}';
    }
}

