package com.datatorrent.stram.codec;

import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.codec.StatefulStreamCodec;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Registration;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.util.DefaultClassResolver;
import com.esotericsoftware.kryo.util.MapReferenceResolver;
import java.util.ArrayList;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/codec/DefaultStatefulStreamCodec.class */
public class DefaultStatefulStreamCodec<T> extends Kryo implements StatefulStreamCodec<T> {
    private final Output data;
    private final Output state;
    private final Input input;
    final ClassResolver classResolver;
    final ArrayList<ClassIdPair> pairs;
    private static final Logger logger = LoggerFactory.getLogger(DefaultStatefulStreamCodec.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/datatorrent/stram/codec/DefaultStatefulStreamCodec$ClassIdPair.class */
    public static class ClassIdPair {
        final int id;
        final String classname;

        ClassIdPair() {
            this.id = 0;
            this.classname = null;
        }

        ClassIdPair(int i, String str) {
            this.id = i;
            this.classname = str;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/codec/DefaultStatefulStreamCodec$ClassResolver.class */
    public static class ClassResolver extends DefaultClassResolver {
        int firstAvailableRegistrationId;
        int nextAvailableRegistrationId;
        final ArrayList<ClassIdPair> pairs = new ArrayList<>();

        public void unregister(int i) {
            this.classToRegistration.remove(((Registration) this.idToRegistration.remove(i)).getType());
            getRegistration(Integer.TYPE);
        }

        public Registration registerImplicit(Class cls) {
            while (getRegistration(this.nextAvailableRegistrationId) != null) {
                this.nextAvailableRegistrationId++;
            }
            this.pairs.add(new ClassIdPair(this.nextAvailableRegistrationId, cls.getName()));
            Serializer defaultSerializer = this.kryo.getDefaultSerializer(cls);
            int i = this.nextAvailableRegistrationId;
            this.nextAvailableRegistrationId = i + 1;
            return register(new Registration(cls, defaultSerializer, i));
        }

        public void registerExplicit(ClassIdPair classIdPair) throws ClassNotFoundException {
            Class<?> cls = Class.forName(classIdPair.classname, false, Thread.currentThread().getContextClassLoader());
            register(new Registration(cls, this.kryo.getDefaultSerializer(cls), classIdPair.id));
            if (this.nextAvailableRegistrationId <= classIdPair.id) {
                this.nextAvailableRegistrationId = classIdPair.id + 1;
            }
        }

        public void init() {
            this.firstAvailableRegistrationId = this.kryo.getNextRegistrationId();
            this.nextAvailableRegistrationId = this.firstAvailableRegistrationId;
        }

        public void unregisterImplicitlyRegisteredTypes() {
            while (this.nextAvailableRegistrationId > this.firstAvailableRegistrationId) {
                int i = this.nextAvailableRegistrationId - 1;
                this.nextAvailableRegistrationId = i;
                unregister(i);
            }
        }
    }

    public DefaultStatefulStreamCodec() {
        super(new ClassResolver(), new MapReferenceResolver());
        this.data = new Output(4096, Integer.MAX_VALUE);
        this.state = new Output(4096, Integer.MAX_VALUE);
        this.input = new Input();
        register(Class.class);
        register(ClassIdPair.class);
        this.classResolver = getClassResolver();
        this.pairs = this.classResolver.pairs;
        this.classResolver.init();
    }

    @Override // com.datatorrent.stram.codec.StatefulStreamCodec
    public Object fromDataStatePair(StatefulStreamCodec.DataStatePair dataStatePair) {
        if (dataStatePair.state != null) {
            try {
                try {
                    this.input.setBuffer(dataStatePair.state.buffer, dataStatePair.state.offset, dataStatePair.state.length);
                    while (this.input.position() < this.input.limit()) {
                        this.classResolver.registerExplicit((ClassIdPair) readClassAndObject(this.input));
                    }
                    dataStatePair.state = null;
                } catch (Throwable th) {
                    logger.error("Catastrophic Error: Execution halted due to Kryo exception!", th);
                    synchronized (this) {
                        try {
                            wait();
                            dataStatePair.state = null;
                        } catch (InterruptedException e) {
                            throw new RuntimeException("Serialization State Error Halt Interrupted", e);
                        }
                    }
                }
            } catch (Throwable th2) {
                dataStatePair.state = null;
                throw th2;
            }
        }
        this.input.setBuffer(dataStatePair.data.buffer, dataStatePair.data.offset, dataStatePair.data.length);
        try {
            return readClassAndObject(this.input);
        } catch (Throwable th3) {
            logger.error("Catastrophic Error: Execution halted due to Kryo exception!", th3);
            synchronized (this) {
                try {
                    wait();
                    return null;
                } catch (InterruptedException e2) {
                    throw new RuntimeException("Serialization Data Error Halt Interrupted", e2);
                }
            }
        }
    }

    @Override // com.datatorrent.stram.codec.StatefulStreamCodec
    public StatefulStreamCodec.DataStatePair toDataStatePair(T t) {
        StatefulStreamCodec.DataStatePair dataStatePair = new StatefulStreamCodec.DataStatePair();
        this.data.setPosition(0);
        writeClassAndObject(this.data, t);
        if (!this.pairs.isEmpty()) {
            this.state.setPosition(0);
            Iterator<ClassIdPair> it = this.pairs.iterator();
            while (it.hasNext()) {
                writeClassAndObject(this.state, it.next());
            }
            this.pairs.clear();
            byte[] bytes = this.state.toBytes();
            dataStatePair.state = new Slice(bytes, 0, bytes.length);
        }
        byte[] bytes2 = this.data.toBytes();
        dataStatePair.data = new Slice(bytes2, 0, bytes2.length);
        return dataStatePair;
    }

    public int getPartition(T t) {
        return t.hashCode();
    }

    @Override // com.datatorrent.stram.codec.StatefulStreamCodec
    public void resetState() {
        this.classResolver.unregisterImplicitlyRegisteredTypes();
    }

    public Object fromByteArray(Slice slice) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    public Slice toByteArray(T t) {
        throw new UnsupportedOperationException("Not supported yet.");
    }

    @Override // com.datatorrent.stram.codec.StatefulStreamCodec
    public DefaultStatefulStreamCodec<T> newInstance() {
        return new DefaultStatefulStreamCodec<>();
    }
}
