/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.nifi;

import com.datatorrent.api.Context;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.contrib.nifi.NiFiDataPacket;
import com.datatorrent.contrib.nifi.NiFiDataPacketBuilder;
import com.datatorrent.netlet.util.DTThrowable;
import java.io.IOException;
import java.util.List;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractNiFiOutputOperator<T>
extends BaseOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiOutputOperator.class);
    protected final SiteToSiteClient.Builder siteToSiteBuilder;
    protected final NiFiDataPacketBuilder<T> dataPacketBuilder;
    protected final WindowDataManager windowDataManager;
    protected transient SiteToSiteClient client;
    private transient int operatorContextId;
    private transient long currentWindowId;
    private transient long largestRecoveryWindowId;
    protected transient boolean skipProcessingTuple = false;

    public AbstractNiFiOutputOperator(SiteToSiteClient.Builder siteToSiteBuilder, NiFiDataPacketBuilder<T> dataPacketBuilder, WindowDataManager windowDataManager) {
        this.siteToSiteBuilder = siteToSiteBuilder;
        this.dataPacketBuilder = dataPacketBuilder;
        this.windowDataManager = windowDataManager;
    }

    public void setup(Context.OperatorContext context) {
        this.client = this.siteToSiteBuilder.build();
        this.operatorContextId = context.getId();
        this.windowDataManager.setup((Context)context);
    }

    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
        this.largestRecoveryWindowId = this.windowDataManager.getLargestCompletedWindow();
        this.skipProcessingTuple = this.currentWindowId <= this.largestRecoveryWindowId;
    }

    public void endWindow() {
        if (this.currentWindowId <= this.largestRecoveryWindowId) {
            return;
        }
        this.endNewWindow();
        try {
            this.windowDataManager.save((Object)"processedWindow", this.currentWindowId);
        }
        catch (IOException e) {
            DTThrowable.rethrow((Exception)e);
        }
    }

    protected abstract void endNewWindow();

    public void teardown() {
        LOGGER.debug("Tearing down operator...");
        this.windowDataManager.teardown();
        try {
            this.client.close();
        }
        catch (IOException e) {
            DTThrowable.rethrow((Exception)e);
        }
    }

    protected void processTuples(List<T> tuples) {
        if (tuples == null || tuples.size() == 0) {
            return;
        }
        try {
            Transaction transaction = this.client.createTransaction(TransferDirection.SEND);
            if (transaction == null) {
                throw new IllegalStateException("Unable to create a NiFi Transaction to send data");
            }
            for (T tuple : tuples) {
                NiFiDataPacket dp = this.dataPacketBuilder.createNiFiDataPacket(tuple);
                transaction.send(dp.getContent(), dp.getAttributes());
            }
            transaction.confirm();
            transaction.complete();
        }
        catch (IOException ioe) {
            DTThrowable.rethrow((Exception)ioe);
        }
    }
}

