/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.flink.writer.internal.sink;

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
import com.microsoft.azure.flink.writer.internal.sink.KustoSinkCommon;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import org.jetbrains.annotations.NotNull;

@PublicEvolving
@Internal
public class KustoGenericWriteAheadSink<IN>
extends GenericWriteAheadSink<IN> {
    private final KustoConnectionOptions connectionOptions;
    private final KustoWriteOptions writeOptions;
    private KustoSinkCommon<IN> kustoSinkCommon;
    private final TypeSerializer<IN> serializer;
    private final TypeInformation<IN> typeInformation;

    public KustoGenericWriteAheadSink(KustoConnectionOptions connectionOptions, KustoWriteOptions writeOptions, CheckpointCommitter committer, TypeSerializer<IN> serializer, TypeInformation<IN> typeInformation, String jobID) throws Exception {
        super(committer, serializer, jobID);
        this.connectionOptions = connectionOptions;
        this.writeOptions = writeOptions;
        this.serializer = serializer;
        this.typeInformation = typeInformation;
    }

    public void open() throws Exception {
        super.open();
        if (!this.getRuntimeContext().isCheckpointingEnabled()) {
            throw new IllegalStateException("The write-ahead log requires checkpointing to be enabled.");
        }
        this.kustoSinkCommon = new KustoSinkCommon<IN>(this.connectionOptions, this.writeOptions, (MetricGroup)super.getRuntimeContext().getMetricGroup(), this.serializer, this.typeInformation, KustoGenericWriteAheadSink.class.getSimpleName());
    }

    protected boolean sendValues(@NotNull Iterable<IN> values, long checkpointId, long timestamp) throws Exception {
        return this.kustoSinkCommon.ingest(values);
    }

    public void close() throws Exception {
        super.close();
        this.kustoSinkCommon.close();
    }
}

