/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query;

import java.net.InetAddress;
import java.util.ArrayList;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.junit.Assert;
import org.junit.Test;

public class KvStateLocationTest {
    @Test
    public void testRegisterAndLookup() throws Exception {
        int keyGroup;
        KeyGroupRange keyGroupRange;
        int rangeIdx;
        JobID jobId = new JobID();
        JobVertexID jobVertexId = new JobVertexID();
        int numKeyGroups = 123;
        int numRanges = 10;
        int fract = numKeyGroups / numRanges;
        int remain = numKeyGroups % numRanges;
        ArrayList<KeyGroupRange> keyGroupRanges = new ArrayList<KeyGroupRange>(numRanges);
        int start = 0;
        for (int i = 0; i < numRanges; ++i) {
            int end = start + fract - 1;
            if (remain > 0) {
                --remain;
                ++end;
            }
            KeyGroupRange range = new KeyGroupRange(start, end);
            keyGroupRanges.add(range);
            start = end + 1;
        }
        String registrationName = "asdasdasdasd";
        KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numKeyGroups, registrationName);
        KvStateID[] kvStateIds = new KvStateID[numRanges];
        KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numRanges];
        InetAddress host = InetAddress.getLocalHost();
        int registeredCount = 0;
        for (rangeIdx = 0; rangeIdx < numRanges; ++rangeIdx) {
            kvStateIds[rangeIdx] = new KvStateID();
            serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx);
            keyGroupRange = (KeyGroupRange)keyGroupRanges.get(rangeIdx);
            location.registerKvState(keyGroupRange, kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
            Assert.assertEquals((long)(registeredCount += keyGroupRange.getNumberOfKeyGroups()), (long)location.getNumRegisteredKeyGroups());
        }
        for (rangeIdx = 0; rangeIdx < numRanges; ++rangeIdx) {
            keyGroupRange = (KeyGroupRange)keyGroupRanges.get(rangeIdx);
            for (keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
                Assert.assertEquals((Object)kvStateIds[rangeIdx], (Object)location.getKvStateID(keyGroup));
                Assert.assertEquals((Object)serverAddresses[rangeIdx], (Object)location.getKvStateServerAddress(keyGroup));
            }
        }
        for (rangeIdx = 0; rangeIdx < numRanges; ++rangeIdx) {
            kvStateIds[rangeIdx] = new KvStateID();
            serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx);
            location.registerKvState((KeyGroupRange)keyGroupRanges.get(rangeIdx), kvStateIds[rangeIdx], serverAddresses[rangeIdx]);
            Assert.assertEquals((long)registeredCount, (long)location.getNumRegisteredKeyGroups());
        }
        for (rangeIdx = 0; rangeIdx < numRanges; ++rangeIdx) {
            keyGroupRange = (KeyGroupRange)keyGroupRanges.get(rangeIdx);
            for (keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
                Assert.assertEquals((Object)kvStateIds[rangeIdx], (Object)location.getKvStateID(keyGroup));
                Assert.assertEquals((Object)serverAddresses[rangeIdx], (Object)location.getKvStateServerAddress(keyGroup));
            }
        }
        for (rangeIdx = 0; rangeIdx < numRanges; ++rangeIdx) {
            keyGroupRange = (KeyGroupRange)keyGroupRanges.get(rangeIdx);
            location.unregisterKvState(keyGroupRange);
            Assert.assertEquals((long)(registeredCount -= keyGroupRange.getNumberOfKeyGroups()), (long)location.getNumRegisteredKeyGroups());
        }
        for (rangeIdx = 0; rangeIdx < numRanges; ++rangeIdx) {
            keyGroupRange = (KeyGroupRange)keyGroupRanges.get(rangeIdx);
            for (keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) {
                Assert.assertEquals(null, (Object)location.getKvStateID(keyGroup));
                Assert.assertEquals(null, (Object)location.getKvStateServerAddress(keyGroup));
            }
        }
        Assert.assertEquals((long)0L, (long)location.getNumRegisteredKeyGroups());
    }
}

