/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.flink.writer.internal.sink;

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
import com.microsoft.azure.flink.writer.internal.sink.KustoSinkCommon;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link SinkWriter} that buffers incoming records in memory and hands them to
 * {@link KustoSinkCommon#ingest} in bulk.
 *
 * <p>A bulk write is triggered from {@link #write} when the configured batch size or batch
 * interval is reached, from {@link #flush} (when flushing on checkpoint is enabled or the input
 * has ended), from {@link #close}, and - unless the delivery guarantee is
 * {@link DeliveryGuarantee#AT_LEAST_ONCE} - from a single-threaded timer that fires every
 * {@code batchIntervalMs} milliseconds.
 *
 * <p>Thread-safety: the record buffer is shared between the thread calling the
 * {@code SinkWriter} methods and the timer thread, so every access to it is guarded by this
 * instance's monitor. A failure on the timer thread is stored and rethrown by the next
 * {@link #write} or {@link #flush} call.
 *
 * @param <IN> type of the records accepted by this writer
 */
@Internal
public class KustoSinkWriter<IN>
implements SinkWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(KustoSinkWriter.class);
    private final KustoSinkCommon<IN> kustoSinkCommon;
    private final KustoWriteOptions writeOptions;
    private final MailboxExecutor mailboxExecutor;
    private final boolean flushOnCheckpoint;
    /** Pending records; guarded by {@code this}. */
    private final List<IN> bulkRequests = new ArrayList<IN>();
    /** Appends into {@link #bulkRequests}; guarded by {@code this}. */
    private final Collector<IN> collector;
    private final Counter numRecordsOut;
    private boolean checkpointInProgress = false;
    private transient volatile boolean closed = false;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    /** First failure seen by the timer thread, surfaced by {@link #checkFlushException()}. */
    private transient volatile Exception flushException;

    /**
     * Creates the writer and, when applicable, starts the periodic flush timer.
     *
     * @param connectionOptions connection settings, must not be {@code null}
     * @param writeOptions batching / delivery settings, must not be {@code null}
     * @param serializer serializer passed through to {@link KustoSinkCommon}
     * @param typeInformation type information passed through to {@link KustoSinkCommon}
     * @param flushOnCheckpoint whether {@link #flush} drains the buffer on every checkpoint
     * @param initContext Flink init context supplying the mailbox executor and metric group
     * @throws URISyntaxException declared for callers; presumably raised while building the
     *     Kusto client inside {@link KustoSinkCommon} - confirm against that class
     */
    public KustoSinkWriter(KustoConnectionOptions connectionOptions, KustoWriteOptions writeOptions, @NotNull TypeSerializer<IN> serializer, @NotNull TypeInformation<IN> typeInformation, boolean flushOnCheckpoint, Sink.InitContext initContext) throws URISyntaxException {
        this.writeOptions = Preconditions.checkNotNull(writeOptions);
        this.flushOnCheckpoint = flushOnCheckpoint;
        Preconditions.checkNotNull(initContext);
        this.mailboxExecutor = Preconditions.checkNotNull(initContext.getMailboxExecutor());
        SinkWriterMetricGroup metricGroup = Preconditions.checkNotNull(initContext.metricGroup());
        this.numRecordsOut = metricGroup.getNumRecordsSendCounter();
        this.collector = new ListCollector<>(this.bulkRequests);
        LOG.info("Initializing the class from KustoSinkWriter");
        this.kustoSinkCommon = new KustoSinkCommon<IN>(Preconditions.checkNotNull(connectionOptions), this.writeOptions, metricGroup, serializer, typeInformation, KustoSinkWriter.class.getSimpleName());
        metricGroup.setCurrentSendTimeGauge(() -> this.kustoSinkCommon.ackTime - this.kustoSinkCommon.lastSendTime);

        // With AT_LEAST_ONCE no timer is started; flushing is then driven only by
        // write(), flush() and close().
        boolean flushOnlyOnCheckpoint = writeOptions.getDeliveryGuarantee() == DeliveryGuarantee.AT_LEAST_ONCE;
        long batchIntervalMs = writeOptions.getBatchIntervalMs();
        if (!flushOnlyOnCheckpoint && batchIntervalMs > 0L) {
            this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("kusto-writer"));
            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(this::flushIfIntervalElapsed, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Timer-thread task: flushes the buffer once the batch interval has elapsed. Exceptions are
     * recorded rather than thrown, because an exception escaping a scheduled task would silently
     * cancel all further runs without informing the writer thread.
     */
    private synchronized void flushIfIntervalElapsed() {
        if (this.closed || !this.isOverMaxBatchIntervalLimit()) {
            return;
        }
        try {
            this.doBulkWrite();
        }
        catch (Exception e) {
            this.flushException = e;
        }
    }

    /**
     * Buffers {@code element} and triggers a bulk write if a size or interval limit is reached.
     *
     * @param element record to buffer
     * @param context sink context (unused)
     * @throws IOException if the triggered bulk write fails
     * @throws InterruptedException if interrupted while yielding to the mailbox
     * @throws RuntimeException if an earlier timer-driven flush failed
     */
    @Override
    public void write(IN element, SinkWriter.Context context) throws IOException, InterruptedException {
        this.checkFlushException();
        // Do not accept new records while a checkpoint flush is running. Yield outside the
        // monitor so the timer thread is never blocked behind mailbox processing.
        while (this.checkpointInProgress) {
            this.mailboxExecutor.yield();
        }
        this.numRecordsOut.inc();
        // Same monitor as the timer task: the buffer is a plain ArrayList.
        synchronized (this) {
            this.collector.collect(element);
            if (this.isOverMaxBatchSizeLimit() || this.isOverMaxBatchIntervalLimit()) {
                this.doBulkWrite();
            }
        }
    }

    /**
     * Drains the buffer when flushing on checkpoint is enabled or the input has ended; otherwise
     * only surfaces a pending asynchronous failure.
     *
     * <p>The bulk write is repeated until the buffer is empty, i.e. until
     * {@link KustoSinkCommon#ingest} reports success or throws.
     *
     * @param endOfInput {@code true} when no more records will arrive
     * @throws IOException if a bulk write fails
     * @throws RuntimeException if an earlier timer-driven flush failed
     */
    @Override
    public void flush(boolean endOfInput) throws IOException {
        this.checkFlushException();
        this.checkpointInProgress = true;
        try {
            synchronized (this) {
                while (!this.bulkRequests.isEmpty() && (this.flushOnCheckpoint || endOfInput)) {
                    this.doBulkWrite();
                }
            }
        }
        finally {
            // Always reset, otherwise a failed flush would leave write() yielding forever.
            this.checkpointInProgress = false;
        }
    }

    /**
     * Stops the timer, attempts one final bulk write of any buffered records and closes the
     * underlying {@link KustoSinkCommon}. Calling it again after a completed close is a no-op.
     *
     * @throws IOException if the final bulk write fails (the underlying client is still closed)
     * @throws Exception if closing the underlying client fails
     */
    @Override
    public synchronized void close() throws Exception {
        if (this.closed) {
            return;
        }
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
        try {
            if (!this.bulkRequests.isEmpty()) {
                this.doBulkWrite();
            }
        }
        catch (Exception e) {
            LOG.error("Writing records to Kusto failed when closing KustoWriter", (Throwable)e);
            throw new IOException("Writing records to Kusto failed.", e);
        }
        finally {
            this.kustoSinkCommon.close();
            this.closed = true;
        }
    }

    /**
     * Sends all buffered records in one ingest call and clears the buffer only if the ingest
     * reports success; on a {@code false} result the records stay buffered for the next attempt.
     *
     * @throws IOException if the ingest fails
     */
    synchronized void doBulkWrite() throws IOException {
        if (this.bulkRequests.isEmpty()) {
            LOG.debug("No records to write to DB {} & table {} ", this.writeOptions.getDatabase(), this.writeOptions.getTable());
            return;
        }
        LOG.info("Ingesting to DB {} & table {} record count {}", this.writeOptions.getDatabase(), this.writeOptions.getTable(), this.bulkRequests.size());
        if (this.kustoSinkCommon.ingest(this.bulkRequests)) {
            this.bulkRequests.clear();
        }
    }

    /**
     * @return {@code true} when a batch size is configured (not {@code -1}) and the buffer has
     *     reached it. Callers must hold this instance's monitor.
     */
    private boolean isOverMaxBatchSizeLimit() {
        long bulkActions = this.writeOptions.getBatchSize();
        int buffered = this.bulkRequests.size();
        boolean overLimit = bulkActions != -1L && (long)buffered >= bulkActions;
        if (overLimit) {
            LOG.debug("OverMaxBatchSizeLimit triggered at time {} with batch size {}.", Instant.now(Clock.systemUTC()), buffered);
        }
        return overLimit;
    }

    /**
     * @return {@code true} when a batch interval is configured (not {@code -1}), the last send
     *     time is non-negative, and at least the interval has elapsed since that send.
     */
    private boolean isOverMaxBatchIntervalLimit() {
        long bulkFlushInterval = this.writeOptions.getBatchIntervalMs();
        // Snapshot once so the comparison and the log line agree on the same values.
        long lastSendTime = this.kustoSinkCommon.lastSendTime;
        Instant now = Instant.now(Clock.systemUTC());
        long lastSentInterval = now.toEpochMilli() - lastSendTime;
        boolean overLimit = lastSendTime >= 0L && bulkFlushInterval != -1L && lastSentInterval >= bulkFlushInterval;
        if (overLimit) {
            LOG.trace("OverMaxBatchIntervalLimit triggered at {}. LastSentTime {}.The last sent interval is {} and bulkFlushInterval {}.", now, Instant.ofEpochMilli(lastSendTime), lastSentInterval, bulkFlushInterval);
        }
        return overLimit;
    }

    /**
     * Rethrows, wrapped, a failure recorded by the timer thread.
     *
     * @throws RuntimeException if a timer-driven flush has failed
     */
    private void checkFlushException() {
        Exception failure = this.flushException;
        if (failure != null) {
            throw new RuntimeException("Writing records to Kusto failed.", failure);
        }
    }
}

