/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.zmq;

import com.datatorrent.api.Context;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.zmq.AbstractBaseZeroMQInputOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.ZMQ;

/**
 * Base class for operators that publish data over ZeroMQ.
 *
 * <p>{@link #setup} creates a ZeroMQ context and binds two sockets: a publisher
 * socket on {@code url} and a synchronization socket on {@code syncUrl}.
 * {@link #startSyncJob()} performs a receive/reply handshake on the
 * synchronization socket once per expected subscriber, and {@link #teardown()}
 * releases both sockets and the context.
 *
 * <p>The ZeroMQ handles are {@code transient}; they are created in
 * {@link #setup} rather than carried along with the operator's state.
 */
public abstract class AbstractBaseZeroMQOutputOperator
extends BaseOperator {
    private static final Logger logger = LoggerFactory.getLogger(AbstractBaseZeroMQOutputOperator.class);

    /** Socket type passed to {@code ZMQ.Context.socket} for the publisher (ZeroMQ PUB). */
    private static final int SOCKET_TYPE_PUB = 1;
    /** Socket type passed to {@code ZMQ.Context.socket} for the sync service (ZeroMQ REP). */
    private static final int SOCKET_TYPE_REP = 4;
    /** Number of I/O threads requested when creating the ZeroMQ context. */
    private static final int IO_THREADS = 1;

    protected transient ZMQ.Context context;
    protected transient ZMQ.Socket publisher;
    protected transient ZMQ.Socket syncservice;
    /** Number of subscriber handshakes {@link #startSyncJob()} waits for; defaults to 1. */
    protected int SUBSCRIBERS_EXPECTED = 1;
    private String url;
    private String syncUrl;
    /** Set to {@code true} once {@link #startSyncJob()} has completed all handshakes. */
    protected boolean syncStarted = false;

    /**
     * Sets the endpoint the publisher socket binds to in {@link #setup}.
     *
     * @param url endpoint for the publisher socket
     */
    public void setUrl(String url) {
        this.url = url;
    }

    /**
     * Sets the endpoint the synchronization socket binds to in {@link #setup}.
     *
     * @param syncUrl endpoint for the synchronization socket
     */
    public void setSyncUrl(String syncUrl) {
        this.syncUrl = syncUrl;
    }

    /**
     * Sets how many subscriber handshakes {@link #startSyncJob()} performs.
     *
     * @param expected number of subscribers to wait for
     */
    public void setSUBSCRIBERS_EXPECTED(int expected) {
        this.SUBSCRIBERS_EXPECTED = expected;
    }

    /**
     * Returns how many subscriber handshakes {@link #startSyncJob()} performs.
     *
     * @return the configured number of expected subscribers
     */
    public int getSUBSCRIBERS_EXPECTED() {
        return this.SUBSCRIBERS_EXPECTED;
    }

    /**
     * Creates the ZeroMQ context and binds the publisher and synchronization
     * sockets to the configured endpoints.
     *
     * @param ctx operator context (not used by this implementation)
     */
    public void setup(Context.OperatorContext ctx) {
        logger.debug("O/P setup");
        this.context = ZMQ.context(IO_THREADS);
        this.publisher = this.context.socket(SOCKET_TYPE_PUB);
        this.publisher.bind(this.url);
        this.syncservice = this.context.socket(SOCKET_TYPE_REP);
        this.syncservice.bind(this.syncUrl);
    }

    /**
     * Performs one receive/reply exchange on the synchronization socket for each
     * expected subscriber, replying with an empty message, then marks
     * synchronization as started.
     *
     * <p>If {@code SUBSCRIBERS_EXPECTED} is zero or negative no exchange takes
     * place and {@code syncStarted} is set immediately.
     */
    public void startSyncJob() {
        for (int subscribers = 0; subscribers < this.SUBSCRIBERS_EXPECTED; ++subscribers) {
            // Wait for a subscriber's request (flags = 0), then acknowledge it.
            this.syncservice.recv(0);
            this.syncservice.send("".getBytes(), 0);
        }
        this.syncStarted = true;
    }

    /**
     * Closes both sockets and terminates the ZeroMQ context.
     *
     * <p>Every socket created from the context must be closed before the context
     * is terminated, otherwise termination blocks; the synchronization socket is
     * therefore closed here as well as the publisher. Null checks make this safe
     * to call when {@link #setup} did not run or failed partway through.
     */
    public void teardown() {
        if (this.publisher != null) {
            this.publisher.close();
        }
        if (this.syncservice != null) {
            this.syncservice.close();
        }
        if (this.context != null) {
            this.context.term();
        }
    }
}

