/*
 * Decompiled with CFR 0.152.
 */
package com.microsoft.azure.kusto;

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;
import com.microsoft.azure.flink.writer.internal.committer.KustoCommitter;
import com.microsoft.azure.flink.writer.internal.sink.KustoGenericWriteAheadSink;
import com.microsoft.azure.flink.writer.internal.sink.KustoSink;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.util.Preconditions;
import org.jetbrains.annotations.Contract;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class KustoWriteSink {
    protected static final Logger LOG = LoggerFactory.getLogger(KustoWriteSink.class);
    protected KustoConnectionOptions connectionOptions;
    protected KustoWriteOptions writeOptions;

    private KustoWriteSink() {
    }

    @Contract(value=" -> new")
    @NotNull
    public static KustoWriteSink builder() {
        return new KustoWriteSink();
    }

    public KustoWriteSink setConnectionOptions(KustoConnectionOptions connectionOptions) {
        if (connectionOptions == null) {
            throw new IllegalArgumentException("Connection options cannot be null. Please use KustoConnectionOptions.Builder() to create one. ");
        }
        this.connectionOptions = connectionOptions;
        return this;
    }

    public KustoWriteSink setWriteOptions(KustoWriteOptions writeOptions) {
        if (writeOptions == null) {
            throw new IllegalArgumentException("Connection options cannot be null. Please use KustoConnectionOptions.Builder() to create one.");
        }
        this.writeOptions = writeOptions;
        return this;
    }

    protected void sanityCheck() {
        Preconditions.checkNotNull((Object)this.connectionOptions, (String)"Kusto connection options must be supplied.");
        Preconditions.checkArgument((StringUtils.isNotEmpty((CharSequence)this.connectionOptions.getAppId()) || StringUtils.isNotEmpty((CharSequence)this.connectionOptions.getManagedIdentityAppId()) ? 1 : 0) != 0, (Object)"Either AppId or ManagedIdentityAppId must be supplied.");
        Preconditions.checkNotNull((Object)this.writeOptions, (String)"Kusto write options must be supplied.");
        Preconditions.checkNotNull((Object)this.writeOptions.getDatabase(), (String)"Kusto write options should have database name specified");
        Preconditions.checkNotNull((Object)this.writeOptions.getTable(), (String)"Kusto write options should have table specified");
    }

    public <IN> void build(@NotNull DataStream<IN> dataStream) throws Exception {
        this.build(dataStream, 1);
    }

    public <IN> void build(@NotNull DataStream<IN> dataStream, int parallelism) throws Exception {
        this.sinkDataStream(dataStream, parallelism);
    }

    private <IN> DataStreamSink<IN> sinkDataStream(@NotNull DataStream<IN> dataStream, int parallelism) {
        boolean isSupportedType;
        TypeInformation typeInfo = dataStream.getType();
        TypeSerializer serializer = typeInfo.createSerializer(dataStream.getExecutionEnvironment().getConfig());
        boolean bl = isSupportedType = typeInfo instanceof TupleTypeInfo || typeInfo instanceof RowTypeInfo || typeInfo instanceof CaseClassTypeInfo || typeInfo instanceof PojoTypeInfo;
        if (!isSupportedType) {
            throw new IllegalArgumentException("No support for the type of the given DataStream: " + dataStream.getType());
        }
        this.sanityCheck();
        LOG.info("Building KustoSink with WriteOptions: {} and ConnectionOptions {}", (Object)this.writeOptions.toString(), (Object)this.connectionOptions.toString());
        return dataStream.sinkTo(new KustoSink(this.connectionOptions, this.writeOptions, serializer, typeInfo)).setParallelism(parallelism);
    }

    public <IN> void build(@NotNull DataStream<IN> dataStream, int parallelism, String name, String uid) throws Exception {
        DataStreamSink<IN> sinkStream = this.sinkDataStream(dataStream, parallelism);
        sinkStream.name(name).uid(uid);
    }

    public <IN> void buildWriteAheadSink(@NotNull DataStream<IN> dataStream) throws Exception {
        this.buildWriteAheadSink(dataStream, 1);
    }

    public <IN> void buildWriteAheadSink(@NotNull DataStream<IN> dataStream, int parallelism) throws Exception {
        boolean isSupportedType;
        TypeInformation typeInfo = dataStream.getType();
        TypeSerializer serializer = typeInfo.createSerializer(dataStream.getExecutionEnvironment().getConfig());
        boolean bl = isSupportedType = typeInfo instanceof TupleTypeInfo || typeInfo instanceof RowTypeInfo || typeInfo instanceof CaseClassTypeInfo || typeInfo instanceof PojoTypeInfo;
        if (!isSupportedType) {
            throw new IllegalArgumentException("No support for the type of the given DataStream: " + dataStream.getType());
        }
        this.sanityCheck();
        LOG.info("Building GenericWriteAheadSink with WriteOptions: {} and ConnectionOptions {}", (Object)this.writeOptions.toString(), (Object)this.connectionOptions.toString());
        KustoGenericWriteAheadSink genericSink = new KustoGenericWriteAheadSink(this.connectionOptions, this.writeOptions, new KustoCommitter(this.connectionOptions, this.writeOptions), serializer, typeInfo, UUID.randomUUID().toString());
        String operatorName = String.format("KustoGenericWriteAheadSink-%s-%s", this.writeOptions.getDatabase(), this.writeOptions.getTable());
        dataStream.transform(operatorName, typeInfo, genericSink).setParallelism(parallelism);
    }
}

