package com.datatorrent.contrib.nifi;

import com.datatorrent.api.Context;
import com.datatorrent.api.InputOperator;
import com.datatorrent.netlet.util.DTThrowable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.apex.malhar.lib.wal.WindowDataManager;
import org.apache.nifi.remote.Transaction;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.client.SiteToSiteClient;
import org.apache.nifi.remote.protocol.DataPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/nifi/AbstractNiFiInputOperator.class */
public abstract class AbstractNiFiInputOperator<T> implements InputOperator {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractNiFiInputOperator.class);
    private transient SiteToSiteClient client;
    private final SiteToSiteClient.Builder siteToSiteBuilder;
    private transient int operatorContextId;
    private transient long currentWindowId;
    private transient List<T> currentWindowTuples;
    private transient List<T> recoveredTuples;
    private final WindowDataManager windowDataManager;

    public AbstractNiFiInputOperator(SiteToSiteClient.Builder builder, WindowDataManager windowDataManager) {
        this.siteToSiteBuilder = builder;
        this.windowDataManager = windowDataManager;
    }

    public void setup(Context.OperatorContext operatorContext) {
        this.client = this.siteToSiteBuilder.build();
        this.operatorContextId = operatorContext.getId();
        this.currentWindowTuples = new ArrayList();
        this.recoveredTuples = new ArrayList();
        this.windowDataManager.setup(operatorContext);
    }

    public void beginWindow(long j) {
        this.currentWindowId = j;
        if (this.currentWindowId <= this.windowDataManager.getLargestRecoveryWindow()) {
            try {
                List list = (List) this.windowDataManager.load(this.operatorContextId, j);
                if (list == null) {
                    return;
                }
                this.recoveredTuples.addAll(list);
            } catch (IOException e) {
                DTThrowable.rethrow(e);
            }
        }
    }

    public void emitTuples() {
        if (this.recoveredTuples.size() > 0) {
            emitTuples(this.recoveredTuples);
            this.recoveredTuples.clear();
            return;
        }
        try {
            Transaction createTransaction = this.client.createTransaction(TransferDirection.RECEIVE);
            if (createTransaction == null) {
                LOGGER.warn("A transaction could not be created, returning...");
                return;
            }
            DataPacket receive = createTransaction.receive();
            if (receive == null) {
                createTransaction.confirm();
                createTransaction.complete();
                LOGGER.debug("No data available to pull, returning and will try again...");
                return;
            }
            ArrayList arrayList = new ArrayList();
            do {
                arrayList.add(createTuple(receive));
                receive = createTransaction.receive();
            } while (receive != null);
            createTransaction.confirm();
            this.currentWindowTuples.addAll(arrayList);
            this.windowDataManager.save(this.currentWindowTuples, this.operatorContextId, this.currentWindowId);
            createTransaction.complete();
            emitTuples(arrayList);
        } catch (IOException e) {
            DTThrowable.rethrow(e);
        }
    }

    protected abstract T createTuple(DataPacket dataPacket) throws IOException;

    protected abstract void emitTuples(List<T> list);

    public void endWindow() {
        try {
            this.windowDataManager.save(this.currentWindowTuples, this.operatorContextId, this.currentWindowId);
        } catch (IOException e) {
            DTThrowable.rethrow(e);
        }
        this.currentWindowTuples.clear();
    }

    public void teardown() {
        LOGGER.debug("Tearing down operator...");
        this.windowDataManager.teardown();
        try {
            this.client.close();
        } catch (IOException e) {
            throw new RuntimeException("Error closing SiteToSiteClient", e);
        }
    }
}
