/*
 * Decompiled with CFR 0.152.
 */
package com.datatorrent.contrib.nifi;

import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.DataPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractNiFiInputOperator<T>
implements InputOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
    private transient SiteToSiteClient client;
    private final SiteToSiteClient.Builder siteToSiteBuilder;
    private transient int operatorContextId;
    private transient long currentWindowId;
    private transient List<T> currentWindowTuples;
    private transient List<T> recoveredTuples;
    private final WindowDataManager windowDataManager;

    public AbstractNiFiInputOperator(SiteToSiteClient.Builder siteToSiteBuilder, WindowDataManager windowDataManager) {
        this.siteToSiteBuilder = siteToSiteBuilder;
        this.windowDataManager = windowDataManager;
    }

    public void setup(Context.OperatorContext context) {
        this.client = this.siteToSiteBuilder.build();
        this.operatorContextId = context.getId();
        this.currentWindowTuples = new ArrayList<T>();
        this.recoveredTuples = new ArrayList<T>();
        this.windowDataManager.setup((Context)context);
    }

    public void beginWindow(long windowId) {
        this.currentWindowId = windowId;
        if (this.currentWindowId <= this.windowDataManager.getLargestCompletedWindow()) {
            try {
                List recoveredData = (List)this.windowDataManager.retrieve(windowId);
                if (recoveredData == null) {
                    return;
                }
                this.recoveredTuples.addAll(recoveredData);
            }
            catch (IOException e) {
                DTThrowable.rethrow((Exception)e);
            }
        }
    }

    public void emitTuples() {
        if (this.recoveredTuples.size() > 0) {
            this.emitTuples(this.recoveredTuples);
            this.recoveredTuples.clear();
            return;
        }
        try {
            Transaction transaction = this.client.createTransaction(TransferDirection.RECEIVE);
            if (transaction == null) {
                LOGGER.warn("A transaction could not be created, returning...");
                return;
            }
            DataPacket dataPacket = transaction.receive();
            if (dataPacket == null) {
                transaction.confirm();
                transaction.complete();
                LOGGER.debug("No data available to pull, returning and will try again...");
                return;
            }
            ArrayList<T> tuples = new ArrayList<T>();
            do {
                tuples.add(this.createTuple(dataPacket));
            } while ((dataPacket = transaction.receive()) != null);
            transaction.confirm();
            this.currentWindowTuples.addAll(tuples);
            this.windowDataManager.save(this.currentWindowTuples, this.currentWindowId);
            transaction.complete();
            this.emitTuples(tuples);
        }
        catch (IOException e) {
            DTThrowable.rethrow((Exception)e);
        }
    }

    protected abstract T createTuple(DataPacket var1) throws IOException;

    protected abstract void emitTuples(List<T> var1);

    public void endWindow() {
        try {
            this.windowDataManager.save(this.currentWindowTuples, this.currentWindowId);
        }
        catch (IOException e) {
            DTThrowable.rethrow((Exception)e);
        }
        this.currentWindowTuples.clear();
    }

    public void teardown() {
        LOGGER.debug("Tearing down operator...");
        this.windowDataManager.teardown();
        try {
            this.client.close();
        }
        catch (IOException e) {
            throw new RuntimeException("Error closing SiteToSiteClient", e);
        }
    }
}

