/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.stram.stream;

import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.client.Publisher;
import com.datatorrent.bufferserver.packet.BeginWindowTuple;
import com.datatorrent.bufferserver.packet.DataTuple;
import com.datatorrent.bufferserver.packet.EndStreamTuple;
import com.datatorrent.bufferserver.packet.EndWindowTuple;
import com.datatorrent.bufferserver.packet.PayloadTuple;
import com.datatorrent.bufferserver.packet.ResetWindowTuple;
import com.datatorrent.bufferserver.packet.WindowIdTuple;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.codec.StatefulStreamCodec;
import com.datatorrent.stram.engine.ByteCounterStream;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.tuple.Tuple;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publisher that serializes objects handed to {@link #put(Object)} and writes them
 * to a buffer server connection.
 *
 * <p>{@link Tuple} instances are serialized according to their type; every other
 * object is encoded with the codec selected in {@link #setup(StreamContext)}.
 * The class keeps a running count of objects passed to {@code put} and of the
 * bytes of the final serialized tuple written per call.
 */
public class BufferServerPublisher
extends Publisher
implements ByteCounterStream {
    private static final Logger logger = LoggerFactory.getLogger(BufferServerPublisher.class);

    /**
     * Value written into the first byte of a serialized checkpoint tuple.
     * NOTE(review): presumably the buffer server's CHECKPOINT message type - confirm.
     */
    private static final byte CHECKPOINT_MESSAGE_TYPE = 10;

    /**
     * Leading type byte used for the tuple that carries stateful codec state.
     * NOTE(review): presumably the buffer server's CODEC_STATE message type - confirm.
     */
    private static final byte CODEC_STATE_MESSAGE_TYPE = 11;

    /** Pause between attempts while {@code write} keeps returning {@code false}. */
    private static final long WRITE_RETRY_SLEEP_MILLIS = 5L;

    private final AtomicLong publishedByteCount = new AtomicLong(0L);
    /** Codec used when the configured codec is not a {@link StatefulStreamCodec}. */
    private StreamCodec<Object> serde;
    /** Codec used when the configured (or default) codec is stateful; takes precedence in {@code put}. */
    private StatefulStreamCodec<Object> statefulSerde;
    private EventLoop eventloop;
    private int count;

    /**
     * Creates a publisher; both arguments are passed straight to the superclass.
     *
     * @param sourceId      identifier forwarded to {@link Publisher}
     * @param queueCapacity capacity forwarded to {@link Publisher}
     */
    public BufferServerPublisher(String sourceId, int queueCapacity) {
        super(sourceId, queueCapacity);
    }

    /**
     * Serializes {@code payload} and writes it, retrying until the write is accepted.
     *
     * <p>When a stateful codec is in use and it produces new state for this payload,
     * the state is written as a separate tuple before the payload tuple. Only the
     * length of the final tuple is added to the published byte count; the state
     * tuple is not counted.
     *
     * @param payload a {@link Tuple} or an arbitrary data object
     * @throws UnsupportedOperationException if {@code payload} is a {@link Tuple} of an unhandled type
     * @throws RuntimeException wrapping {@link InterruptedException} if the thread is interrupted
     *         while waiting to retry; the interrupt flag is restored before throwing
     */
    public void put(Object payload) {
        ++this.count;
        byte[] array;
        if (payload instanceof Tuple) {
            array = this.serializeControlTuple((Tuple)payload);
        } else if (this.statefulSerde == null) {
            array = PayloadTuple.getSerializedTuple((int)this.serde.getPartition(payload), (Slice)this.serde.toByteArray(payload));
        } else {
            StatefulStreamCodec.DataStatePair dsp = this.statefulSerde.toDataStatePair(payload);
            if (dsp.state != null) {
                // The state tuple must go out before the payload tuple is built and written.
                this.writeBlocking(DataTuple.getSerializedTuple(CODEC_STATE_MESSAGE_TYPE, (Slice)dsp.state));
            }
            array = PayloadTuple.getSerializedTuple((int)this.statefulSerde.getPartition(payload), (Slice)dsp.data);
        }
        this.writeBlocking(array);
        this.publishedByteCount.addAndGet(array.length);
    }

    /**
     * Converts a {@link Tuple} into its serialized form based on its type.
     * A checkpoint additionally resets the stateful codec, if one is in use.
     */
    private byte[] serializeControlTuple(Tuple t) {
        switch (t.getType()) {
            case CHECKPOINT: {
                if (this.statefulSerde != null) {
                    this.statefulSerde.resetState();
                }
                // Serialized as a window-id tuple whose first byte is then overwritten.
                byte[] checkpoint = WindowIdTuple.getSerializedTuple((int)t.getWindowId());
                checkpoint[0] = CHECKPOINT_MESSAGE_TYPE;
                return checkpoint;
            }
            case BEGIN_WINDOW: {
                return BeginWindowTuple.getSerializedTuple((int)t.getWindowId());
            }
            case END_WINDOW: {
                return EndWindowTuple.getSerializedTuple((int)t.getWindowId());
            }
            case END_STREAM: {
                return EndStreamTuple.getSerializedTuple((int)t.getWindowId());
            }
            case RESET_WINDOW: {
                com.datatorrent.stram.tuple.ResetWindowTuple rwt = (com.datatorrent.stram.tuple.ResetWindowTuple)t;
                return ResetWindowTuple.getSerializedTuple((int)rwt.getBaseSeconds(), (int)rwt.getIntervalMillis());
            }
            default: {
                throw new UnsupportedOperationException("this data type is not handled in the stream");
            }
        }
    }

    /**
     * Calls {@code write} repeatedly, sleeping between attempts, until it returns {@code true}.
     * On interruption the thread's interrupt flag is restored so callers further up
     * the stack can still observe it, then the exception is rethrown unchecked.
     */
    private void writeBlocking(byte[] array) {
        try {
            while (!this.write(array)) {
                Thread.sleep(WRITE_RETRY_SLEEP_MILLIS);
            }
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        }
    }

    /**
     * Sets the token from the context, connects this client through the context's
     * event loop and activates the superclass at the context's finished window id.
     *
     * @param context supplies the token, event loop, buffer server address and finished window id
     */
    public void activate(StreamContext context) {
        this.setToken((byte[])context.get(StreamContext.BUFFER_SERVER_TOKEN));
        InetSocketAddress address = context.getBufferServerAddress();
        this.eventloop = (EventLoop)context.get(StreamContext.EVENT_LOOP);
        // An unresolved address is rebuilt from host name and port so the
        // InetSocketAddress constructor gets another chance to resolve it.
        InetSocketAddress target = address.isUnresolved() ? new InetSocketAddress(address.getHostName(), address.getPort()) : address;
        this.eventloop.connect(target, (Listener.ClientListener)this);
        logger.debug("Registering publisher: {} {} windowId={} server={}", new Object[]{context.getSourceId(), context.getId(), Codec.getStringWindowId((long)context.getFinishedWindowId()), context.getBufferServerAddress()});
        super.activate(null, context.getFinishedWindowId());
    }

    /**
     * Clears the token and disconnects this client from the event loop that was
     * stored by {@link #activate(StreamContext)}.
     */
    public void deactivate() {
        this.setToken(null);
        this.eventloop.disconnect((Listener.ClientListener)this);
    }

    /**
     * Always fails: this publisher does not accept incoming messages.
     *
     * @throws RuntimeException unconditionally
     */
    public void onMessage(byte[] buffer, int offset, int size) {
        throw new RuntimeException("OutputStream is not supposed to receive anything!");
    }

    /**
     * Selects the codec used by {@link #put(Object)}.
     *
     * <p>If the context has no codec, a new instance of the default stateful codec is
     * used. If the configured codec is a {@link StatefulStreamCodec}, a new instance
     * of it is used. Otherwise the configured codec is used directly.
     *
     * @param context supplies the configured codec, if any
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setup(StreamContext context) {
        StreamCodec codec = (StreamCodec)context.get(StreamContext.CODEC);
        if (codec == null) {
            this.statefulSerde = ((StatefulStreamCodec)StreamContext.CODEC.defaultValue).newInstance();
        } else if (codec instanceof StatefulStreamCodec) {
            this.statefulSerde = ((StatefulStreamCodec)codec).newInstance();
        } else {
            this.serde = codec;
        }
    }

    /** No resources to release. */
    public void teardown() {
    }

    /**
     * Returns the number of bytes accumulated by {@link #put(Object)}.
     *
     * @param reset when {@code true}, atomically resets the counter to zero
     * @return the counter value before any reset
     */
    @Override
    public long getByteCount(boolean reset) {
        if (reset) {
            return this.publishedByteCount.getAndSet(0L);
        }
        return this.publishedByteCount.get();
    }

    /**
     * Returns the number of objects passed to {@link #put(Object)}.
     *
     * @param reset when {@code true}, resets the counter to zero after reading it
     * @return the counter value before any reset
     */
    public int getCount(boolean reset) {
        int current = this.count;
        if (reset) {
            this.count = 0;
        }
        return current;
    }
}

