package org.apache.apex.malhar.lib.function;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader;
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.apex.malhar.lib.function.Function;
import org.apache.apex.malhar.lib.utils.ByteArrayClassLoader;
import org.apache.apex.malhar.lib.utils.KryoJavaSerializer;
import org.apache.apex.malhar.lib.utils.TupleUtil;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.file.tfile.DTFile;

@InterfaceStability.Evolving
/* loaded from: input_file:org/apache/apex/malhar/lib/function/FunctionOperator.class */
public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator {
    /**
     * Packed form of an anonymous function class as written by {@code functionClassData}:
     * an int length, the class name bytes, an int length, then the rewritten class bytes.
     * {@code readFunction} decodes it to re-create the function. Stays {@code null} for
     * non-anonymous functions.
     */
    private byte[] annonymousFunctionClass;

    /**
     * Function returned by {@code getFunction()} while {@code stateful} is {@code false}.
     * The field is transient; {@code readFunction} instantiates it from
     * {@code annonymousFunctionClass} when neither function slot holds a value.
     */
    protected transient FUNCTION statelessF;

    /**
     * Holder for the function returned by {@code getFunction()} while {@code stateful} is
     * {@code true}. Bound to {@link KryoJavaSerializer} through the annotation below.
     */
    @FieldSerializer.Bind(KryoJavaSerializer.class)
    protected final MutableObject<FUNCTION> statefulF;

    /** Selects which slot {@code getFunction()} reads: {@code statefulF} when true, {@code statelessF} otherwise. */
    protected boolean stateful;

    /** Whether the function passed to the constructor was an anonymous class instance. */
    protected boolean isAnnonymous;

    /** Optional output port carrying plain (unwrapped) values. */
    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<OUT> output;

    /** Optional output port carrying values wrapped in a {@link Tuple}. */
    @OutputPortFieldAnnotation(optional = true)
    public final transient DefaultOutputPort<Tuple<OUT>> tupleOutput;

    /**
     * Operator that forwards only the inputs accepted by a {@link Function.FilterFunction}.
     *
     * <p>Plain values arriving on {@link #input} are re-emitted unchanged on {@code output};
     * tuples arriving on {@link #tupleInput} are tested on their value and re-emitted
     * unchanged on {@code tupleOutput}. Rejected inputs are dropped.
     *
     * <p>{@code setup} is inherited from {@link FunctionOperator}. No explicit
     * {@code setup(Context)} overload is declared: its erasure collides with the generic
     * {@code setup} inherited through {@link Operator}, and the compiler synthesizes whatever
     * bridge method is required.
     *
     * @param <IN> type of the values being filtered
     */
    public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>> {

        /** Input port for plain values; accepted values go to {@code output}. */
        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() {
            @Override
            public void process(IN in) {
                if (getFunction().f(in)) {
                    output.emit(in);
                }
            }
        };

        /** Input port for wrapped values; accepted tuples go to {@code tupleOutput} as received. */
        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() {
            @Override
            public void process(Tuple<IN> tuple) {
                if (getFunction().f(tuple.getValue())) {
                    tupleOutput.emit(tuple);
                }
            }
        };

        /** Creates the operator without a function; one must be supplied before tuples are processed. */
        public FilterFunctionOperator() {
        }

        /**
         * Creates the operator around the given predicate.
         *
         * @param filterFunction predicate deciding which inputs are forwarded
         */
        public FilterFunctionOperator(Function.FilterFunction<IN> filterFunction) {
            super(filterFunction);
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/function/FunctionOperator$FlatMapFunctionOperator.class */
    public static class FlatMapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FlatMapFunction<IN, OUT>> {

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<IN> input;

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<Tuple<IN>> tupleInput;

        public FlatMapFunctionOperator() {
            this.input = new DefaultInputPort<IN>() { // from class: org.apache.apex.malhar.lib.function.FunctionOperator.FlatMapFunctionOperator.1
                public void process(IN in) {
                    Iterator it = ((Iterable) FlatMapFunctionOperator.this.getFunction().f(in)).iterator();
                    while (it.hasNext()) {
                        FlatMapFunctionOperator.this.output.emit(it.next());
                    }
                }
            };
            this.tupleInput = new DefaultInputPort<Tuple<IN>>() { // from class: org.apache.apex.malhar.lib.function.FunctionOperator.FlatMapFunctionOperator.2
                public void process(Tuple<IN> tuple) {
                    Function.FlatMapFunction<IN, OUT> function = FlatMapFunctionOperator.this.getFunction();
                    if (tuple instanceof Tuple.PlainTuple) {
                        Iterator it = ((Iterable) function.f(tuple.getValue())).iterator();
                        while (it.hasNext()) {
                            FlatMapFunctionOperator.this.tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple) tuple, it.next()));
                        }
                        return;
                    }
                    Iterator it2 = ((Iterable) function.f(tuple.getValue())).iterator();
                    while (it2.hasNext()) {
                        FlatMapFunctionOperator.this.output.emit(it2.next());
                    }
                }
            };
        }

        public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> flatMapFunction) {
            super(flatMapFunction);
            this.input = new DefaultInputPort<IN>() { // from class: org.apache.apex.malhar.lib.function.FunctionOperator.FlatMapFunctionOperator.1
                public void process(IN in) {
                    Iterator it = ((Iterable) FlatMapFunctionOperator.this.getFunction().f(in)).iterator();
                    while (it.hasNext()) {
                        FlatMapFunctionOperator.this.output.emit(it.next());
                    }
                }
            };
            this.tupleInput = new DefaultInputPort<Tuple<IN>>() { // from class: org.apache.apex.malhar.lib.function.FunctionOperator.FlatMapFunctionOperator.2
                public void process(Tuple<IN> tuple) {
                    Function.FlatMapFunction<IN, OUT> function = FlatMapFunctionOperator.this.getFunction();
                    if (tuple instanceof Tuple.PlainTuple) {
                        Iterator it = ((Iterable) function.f(tuple.getValue())).iterator();
                        while (it.hasNext()) {
                            FlatMapFunctionOperator.this.tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple) tuple, it.next()));
                        }
                        return;
                    }
                    Iterator it2 = ((Iterable) function.f(tuple.getValue())).iterator();
                    while (it2.hasNext()) {
                        FlatMapFunctionOperator.this.output.emit(it2.next());
                    }
                }
            };
        }

        @Override // org.apache.apex.malhar.lib.function.FunctionOperator
        public /* bridge */ /* synthetic */ void setup(Context context) {
            super.setup((Context.OperatorContext) context);
        }
    }

    /* loaded from: input_file:org/apache/apex/malhar/lib/function/FunctionOperator$MapFunctionOperator.class */
    public static class MapFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.MapFunction<IN, OUT>> {

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<IN> input;

        @InputPortFieldAnnotation(optional = true)
        public final transient DefaultInputPort<Tuple<IN>> tupleInput;

        public MapFunctionOperator() {
            this.input = new DefaultInputPort<IN>() { // from class: org.apache.apex.malhar.lib.function.FunctionOperator.MapFunctionOperator.1
                public void process(IN in) {
                    MapFunctionOperator.this.output.emit(MapFunctionOperator.this.getFunction().f(in));
                }
            };
            this.tupleInput = new DefaultInputPort<Tuple<IN>>() { // from class: org.apache.apex.malhar.lib.function.FunctionOperator.MapFunctionOperator.2
                public void process(Tuple<IN> tuple) {
                    Function.MapFunction<IN, OUT> function = MapFunctionOperator.this.getFunction();
                    if (tuple instanceof Tuple.PlainTuple) {
                        TupleUtil.buildOf((Tuple.PlainTuple) tuple, function.f(tuple.getValue()));
                    } else {
                        MapFunctionOperator.this.output.emit(function.f(tuple.getValue()));
                    }
                }
            };
        }

        public MapFunctionOperator(Function.MapFunction<IN, OUT> mapFunction) {
            super(mapFunction);
            this.input = new DefaultInputPort<IN>() { // from class: org.apache.apex.malhar.lib.function.FunctionOperator.MapFunctionOperator.1
                public void process(IN in) {
                    MapFunctionOperator.this.output.emit(MapFunctionOperator.this.getFunction().f(in));
                }
            };
            this.tupleInput = new DefaultInputPort<Tuple<IN>>() { // from class: org.apache.apex.malhar.lib.function.FunctionOperator.MapFunctionOperator.2
                public void process(Tuple<IN> tuple) {
                    Function.MapFunction<IN, OUT> function = MapFunctionOperator.this.getFunction();
                    if (tuple instanceof Tuple.PlainTuple) {
                        TupleUtil.buildOf((Tuple.PlainTuple) tuple, function.f(tuple.getValue()));
                    } else {
                        MapFunctionOperator.this.output.emit(function.f(tuple.getValue()));
                    }
                }
            };
        }

        @Override // org.apache.apex.malhar.lib.function.FunctionOperator
        public /* bridge */ /* synthetic */ void setup(Context context) {
            super.setup((Context.OperatorContext) context);
        }
    }

    /**
     * Creates the operator around {@code function} and decides how the function is held.
     *
     * <ul>
     *   <li>Anonymous class instance: its class bytes are captured with
     *       {@code functionClassData} and a fresh instance is created from them by
     *       {@code readFunction}. If that fails for any reason, the captured bytes are
     *       discarded and the original instance is kept in the stateful holder instead.</li>
     *   <li>Instance of {@code Function.Stateful}: stored in the transient {@code statelessF}
     *       field.</li>
     *   <li>Anything else: stored in the {@code statefulF} holder with {@code stateful} set.</li>
     * </ul>
     *
     * NOTE(review): routing {@code Function.Stateful} instances to the transient stateless slot
     * looks inverted relative to the field names — confirm against the definition of
     * {@code Function.Stateful} before relying on it.
     *
     * @param function the user function; must not be {@code null}
     */
    public FunctionOperator(FUNCTION function) {
        this.output = new DefaultOutputPort<>();
        this.tupleOutput = new DefaultOutputPort<>();
        this.statefulF = new MutableObject<>();
        this.stateful = false;
        this.isAnnonymous = function.getClass().isAnonymousClass();

        if (this.isAnnonymous) {
            this.annonymousFunctionClass = functionClassData(function);
            try {
                readFunction();
            } catch (Exception ignored) {
                // The class could not be rebuilt from its bytes; fall back to holding the
                // instance we were given.
                this.annonymousFunctionClass = null;
                this.statefulF.setValue(function);
                this.stateful = true;
            }
        } else if (function instanceof Function.Stateful) {
            this.statelessF = function;
        } else {
            this.statefulF.setValue(function);
            this.stateful = true;
        }
    }

    /**
     * Captures the class file of {@code function}'s class in a form {@code readFunction} can
     * turn back into an instance.
     *
     * <p>Steps:
     * <ol>
     *   <li>Load the class file through the class's own class loader.</li>
     *   <li>Replace {@code '$'} with {@code '_'} inside every occurrence of the class's
     *       internal (slash-separated) name in the raw bytes.</li>
     *   <li>Pass the bytes through {@code AnnonymousClassModifier}.</li>
     *   <li>Pack the result as: int name length, renamed dotted class name bytes, int class
     *       length, class bytes.</li>
     * </ol>
     *
     * @param function the function whose class is captured
     * @return the packed class name and rewritten class bytes
     * @throws IllegalStateException if the class file cannot be found through the class loader
     * @throws RuntimeException wrapping any {@link IOException} raised while reading or writing
     */
    private byte[] functionClassData(Function function) {
        Class<?> functionClass = function.getClass();
        String className = functionClass.getName();
        String internalName = className.replace('.', '/');
        String resourcePath = internalName + ".class";
        byte[] renamedClassName = className.replace('$', '_').getBytes();
        try {
            byte[] classBytes;
            // IOUtils.toByteArray does not close its argument, so the stream is closed here.
            try (InputStream classStream = functionClass.getClassLoader().getResourceAsStream(resourcePath)) {
                if (classStream == null) {
                    throw new IllegalStateException("Class file not found through the class loader: " + resourcePath);
                }
                classBytes = IOUtils.toByteArray(classStream);
            }
            replaceDollarsInOccurrences(classBytes, internalName.getBytes());

            ClassReader classReader = new ClassReader(new ByteArrayInputStream(classBytes));
            ClassWriter classWriter = new ClassWriter(0);
            // NOTE(review): the first argument is taken from DTFile.DEFAULT_INPUT_FS_BUF_SIZE, a
            // constant that looks unrelated to bytecode rewriting; presumably its value matches
            // the ASM API level the modifier expects — confirm.
            classReader.accept(new AnnonymousClassModifier(DTFile.DEFAULT_INPUT_FS_BUF_SIZE, classWriter), 0);
            byte[] rewrittenClass = classWriter.toByteArray();

            ByteArrayOutputStream buffer =
                    new ByteArrayOutputStream(4 + renamedClassName.length + 4 + rewrittenClass.length);
            try (DataOutputStream out = new DataOutputStream(buffer)) {
                // The length prefix describes exactly the bytes that follow it.
                out.writeInt(renamedClassName.length);
                out.write(renamedClassName);
                out.writeInt(rewrittenClass.length);
                out.write(rewrittenClass);
            }
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Replaces {@code '$'} with {@code '_'} in place inside every non-overlapping occurrence of
     * {@code pattern} within {@code data}, scanning left to right. Every start position is
     * tested, so an occurrence that begins right after a partial match is not skipped.
     */
    private static void replaceDollarsInOccurrences(byte[] data, byte[] pattern) {
        if (pattern.length == 0) {
            return;
        }
        int start = 0;
        while (start <= data.length - pattern.length) {
            if (occursAt(data, start, pattern)) {
                int end = start + pattern.length;
                for (int k = start; k < end; k++) {
                    if (data[k] == '$') {
                        data[k] = '_';
                    }
                }
                start = end;
            } else {
                start++;
            }
        }
    }

    /** Returns whether {@code pattern} appears in {@code data} starting exactly at {@code offset}. */
    private static boolean occursAt(byte[] data, int offset, byte[] pattern) {
        for (int k = 0; k < pattern.length; k++) {
            if (data[offset + k] != pattern[k]) {
                return false;
            }
        }
        return true;
    }

    /**
     * Creates the operator with no function: both function slots are empty, the
     * {@code stateful} and {@code isAnnonymous} flags are {@code false}, and both output
     * ports are freshly created.
     */
    public FunctionOperator() {
        this.output = new DefaultOutputPort<>();
        this.tupleOutput = new DefaultOutputPort<>();
        this.statefulF = new MutableObject<>();
        this.isAnnonymous = false;
        this.stateful = false;
    }

    /**
     * Window-start callback; this operator does nothing here.
     *
     * @param j identifier of the window that is starting (unused)
     */
    @Override
    public void beginWindow(long j) {
    }

    /** Window-end callback; this operator does nothing here. */
    @Override
    public void endWindow() {
    }

    /**
     * Makes sure a function instance is available by delegating to {@code readFunction()},
     * which rebuilds it from the captured class bytes when neither function slot is set.
     *
     * @param operatorContext the operator context (not used by this implementation)
     */
    @Override
    public void setup(Context.OperatorContext operatorContext) {
        readFunction();
    }

    /**
     * Ensures a function instance exists.
     *
     * <p>Does nothing when either {@code statelessF} or the {@code statefulF} holder already
     * has a value. Otherwise decodes {@code annonymousFunctionClass} (int name length, class
     * name, int class length, class bytes), defines that class through a
     * {@link ByteArrayClassLoader} whose parent is the current thread's context class loader,
     * and stores a new instance in {@code statelessF}.
     *
     * @throws IllegalStateException if no function is set and no class data was captured
     * @throws RuntimeException wrapping any failure while decoding, loading or instantiating
     */
    private void readFunction() {
        if (this.statelessF != null || this.statefulF.getValue() != null) {
            return;
        }
        if (this.annonymousFunctionClass == null) {
            throw new IllegalStateException(
                    "No function instance is set and no anonymous function class data is available");
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(this.annonymousFunctionClass))) {
            // readFully fails loudly on truncated data instead of leaving the arrays half filled.
            byte[] nameBytes = new byte[in.readInt()];
            in.readFully(nameBytes);
            String className = new String(nameBytes);
            byte[] classBytes = new byte[in.readInt()];
            in.readFully(classBytes);

            HashMap<String, byte[]> classes = new HashMap<>();
            classes.put(className, classBytes);
            ByteArrayClassLoader loader =
                    new ByteArrayClassLoader(classes, Thread.currentThread().getContextClassLoader());
            @SuppressWarnings("unchecked") // the bytes were captured from a FUNCTION instance
            FUNCTION instance = (FUNCTION) loader.findClass(className).newInstance();
            this.statelessF = instance;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Shutdown callback; this operator holds nothing that needs releasing. */
    @Override
    public void teardown() {
    }

    /**
     * Returns the function to apply, first making sure one exists via {@code readFunction()}.
     *
     * @return the value of the {@code statefulF} holder when {@code stateful} is set,
     *         otherwise {@code statelessF}
     */
    public FUNCTION getFunction() {
        readFunction();
        if (this.stateful) {
            return this.statefulF.getValue();
        }
        return this.statelessF;
    }

    /**
     * Returns the stateless function slot as-is, without triggering any rebuild.
     *
     * @return the current {@code statelessF}, possibly {@code null}
     */
    public FUNCTION getStatelessF() {
        return statelessF;
    }

    /**
     * Replaces the stateless function slot. The {@code stateful} flag is left untouched.
     *
     * @param function the new stateless function
     */
    public void setStatelessF(FUNCTION function) {
        statelessF = function;
    }

    /**
     * Returns the content of the stateful function holder.
     *
     * @return the held function, or {@code null} when the holder is empty
     */
    public FUNCTION getStatefulF() {
        return statefulF.getValue();
    }

    /**
     * Stores {@code function} in the stateful holder. The {@code stateful} flag is left
     * untouched.
     *
     * @param function the new stateful function
     */
    public void setStatefulF(FUNCTION function) {
        statefulF.setValue(function);
    }

    /**
     * Reports which function slot {@code getFunction()} reads.
     *
     * @return {@code true} when the stateful holder is used
     */
    public boolean isStateful() {
        return stateful;
    }

    /**
     * Chooses which function slot {@code getFunction()} reads.
     *
     * @param z {@code true} to use the stateful holder, {@code false} to use {@code statelessF}
     */
    public void setStateful(boolean z) {
        stateful = z;
    }

    /**
     * Returns the anonymous-class flag.
     *
     * @return the current value of {@code isAnnonymous}
     */
    public boolean isAnnonymous() {
        return isAnnonymous;
    }

    /**
     * Overwrites the anonymous-class flag.
     *
     * @param z the new flag value
     */
    public void setIsAnnonymous(boolean z) {
        isAnnonymous = z;
    }
}
