package com.datatorrent.lib.io;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.experimental.AppData;
import com.datatorrent.lib.appdata.StoreUtils;
import com.datatorrent.lib.appdata.query.WindowBoundedService;
import java.net.URI;
import java.net.URISyntaxException;
import javax.validation.constraints.Min;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/lib/io/PubSubWebSocketAppDataQuery.class */
public class PubSubWebSocketAppDataQuery extends PubSubWebSocketInputOperator<String> implements AppData.ConnectionInfoProvider, AppData.EmbeddableQueryInfoProvider<String> {
    private static final Logger logger = LoggerFactory.getLogger(PubSubWebSocketAppDataQuery.class);
    private static final long serialVersionUID = 201506121124L;
    public static final long DEFAULT_EXECUTE_INTERVAL_MILLIS = 10;
    private boolean useEmitThread;

    @Min(0)
    private long executeIntervalMillis = 10;
    private transient WindowBoundedService windowBoundedService;

    public PubSubWebSocketAppDataQuery() {
        this.skipNull = true;
    }

    @Override // com.datatorrent.lib.io.WebSocketInputOperator
    public void setup(Context.OperatorContext operatorContext) {
        setUri(uriHelper(operatorContext, getUri()));
        logger.debug("Setting up:\nuri:{}\ntopic:{}", getUri(), getTopic());
        super.setup(operatorContext);
        if (this.useEmitThread) {
            this.windowBoundedService = new WindowBoundedService(this.executeIntervalMillis, new StoreUtils.BufferingOutputPortFlusher(this.outputPort));
            this.windowBoundedService.setup(operatorContext);
        }
    }

    public void beginWindow(long j) {
        super.beginWindow(j);
        if (this.windowBoundedService != null) {
            this.windowBoundedService.beginWindow(j);
        }
    }

    public void endWindow() {
        if (this.windowBoundedService != null) {
            this.windowBoundedService.endWindow();
        }
        super.endWindow();
    }

    @Override // com.datatorrent.lib.io.WebSocketInputOperator
    public void teardown() {
        if (this.windowBoundedService != null) {
            this.windowBoundedService.teardown();
        }
        super.teardown();
    }

    public static URI uriHelper(Context.OperatorContext operatorContext, URI uri) {
        if (uri == null) {
            if (operatorContext.getValue(DAG.GATEWAY_CONNECT_ADDRESS) == null) {
                throw new IllegalArgumentException("The uri property is not set and the dt.attr.GATEWAY_CONNECT_ADDRESS is not defined");
            }
            try {
                uri = new URI("ws://" + ((String) operatorContext.getValue(DAG.GATEWAY_CONNECT_ADDRESS)) + "/pubsub");
            } catch (URISyntaxException e) {
                throw new RuntimeException(e);
            }
        }
        return uri;
    }

    @Override // com.datatorrent.lib.io.WebSocketInputOperator
    public URI getUri() {
        return super.getUri();
    }

    @Override // com.datatorrent.lib.io.WebSocketInputOperator
    public void setUri(URI uri) {
        super.setUri(uri);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.lib.io.PubSubWebSocketInputOperator, com.datatorrent.lib.io.WebSocketInputOperator
    public String convertMessage(String str) {
        try {
            return new JSONObject(str).getString("data");
        } catch (JSONException e) {
            return null;
        }
    }

    public String getAppDataURL() {
        return "pubsub";
    }

    public DefaultOutputPort<String> getOutputPort() {
        return this.outputPort;
    }

    public void enableEmbeddedMode() {
        this.useEmitThread = true;
    }

    public long getExecuteIntervalMillis() {
        return this.executeIntervalMillis;
    }

    public void setExecuteIntervalMillis(long j) {
        this.executeIntervalMillis = j;
    }
}
