Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] SOFARegistry client load balancing #272

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.alipay.sofa.registry.client.consistenthash;

import java.nio.charset.Charset;

/**
* @author liqiuliang
* @create 2022-10-5
*/
public class CRCHashStrategy implements HashStrategy {

private static final int LOOKUP_TABLE[] = {0x0000, 0x1021, 0x2042, 0x3063,
0x4084, 0x50A5, 0x60C6, 0x70E7, 0x8108, 0x9129, 0xA14A, 0xB16B,
0xC18C, 0xD1AD, 0xE1CE, 0xF1EF, 0x1231, 0x0210, 0x3273, 0x2252,
0x52B5, 0x4294, 0x72F7, 0x62D6, 0x9339, 0x8318, 0xB37B, 0xA35A,
0xD3BD, 0xC39C, 0xF3FF, 0xE3DE, 0x2462, 0x3443, 0x0420, 0x1401,
0x64E6, 0x74C7, 0x44A4, 0x5485, 0xA56A, 0xB54B, 0x8528, 0x9509,
0xE5EE, 0xF5CF, 0xC5AC, 0xD58D, 0x3653, 0x2672, 0x1611, 0x0630,
0x76D7, 0x66F6, 0x5695, 0x46B4, 0xB75B, 0xA77A, 0x9719, 0x8738,
0xF7DF, 0xE7FE, 0xD79D, 0xC7BC, 0x48C4, 0x58E5, 0x6886, 0x78A7,
0x0840, 0x1861, 0x2802, 0x3823, 0xC9CC, 0xD9ED, 0xE98E, 0xF9AF,
0x8948, 0x9969, 0xA90A, 0xB92B, 0x5AF5, 0x4AD4, 0x7AB7, 0x6A96,
0x1A71, 0x0A50, 0x3A33, 0x2A12, 0xDBFD, 0xCBDC, 0xFBBF, 0xEB9E,
0x9B79, 0x8B58, 0xBB3B, 0xAB1A, 0x6CA6, 0x7C87, 0x4CE4, 0x5CC5,
0x2C22, 0x3C03, 0x0C60, 0x1C41, 0xEDAE, 0xFD8F, 0xCDEC, 0xDDCD,
0xAD2A, 0xBD0B, 0x8D68, 0x9D49, 0x7E97, 0x6EB6, 0x5ED5, 0x4EF4,
0x3E13, 0x2E32, 0x1E51, 0x0E70, 0xFF9F, 0xEFBE, 0xDFDD, 0xCFFC,
0xBF1B, 0xAF3A, 0x9F59, 0x8F78, 0x9188, 0x81A9, 0xB1CA, 0xA1EB,
0xD10C, 0xC12D, 0xF14E, 0xE16F, 0x1080, 0x00A1, 0x30C2, 0x20E3,
0x5004, 0x4025, 0x7046, 0x6067, 0x83B9, 0x9398, 0xA3FB, 0xB3DA,
0xC33D, 0xD31C, 0xE37F, 0xF35E, 0x02B1, 0x1290, 0x22F3, 0x32D2,
0x4235, 0x5214, 0x6277, 0x7256, 0xB5EA, 0xA5CB, 0x95A8, 0x8589,
0xF56E, 0xE54F, 0xD52C, 0xC50D, 0x34E2, 0x24C3, 0x14A0, 0x0481,
0x7466, 0x6447, 0x5424, 0x4405, 0xA7DB, 0xB7FA, 0x8799, 0x97B8,
0xE75F, 0xF77E, 0xC71D, 0xD73C, 0x26D3, 0x36F2, 0x0691, 0x16B0,
0x6657, 0x7676, 0x4615, 0x5634, 0xD94C, 0xC96D, 0xF90E, 0xE92F,
0x99C8, 0x89E9, 0xB98A, 0xA9AB, 0x5844, 0x4865, 0x7806, 0x6827,
0x18C0, 0x08E1, 0x3882, 0x28A3, 0xCB7D, 0xDB5C, 0xEB3F, 0xFB1E,
0x8BF9, 0x9BD8, 0xABBB, 0xBB9A, 0x4A75, 0x5A54, 0x6A37, 0x7A16,
0x0AF1, 0x1AD0, 0x2AB3, 0x3A92, 0xFD2E, 0xED0F, 0xDD6C, 0xCD4D,
0xBDAA, 0xAD8B, 0x9DE8, 0x8DC9, 0x7C26, 0x6C07, 0x5C64, 0x4C45,
0x3CA2, 0x2C83, 0x1CE0, 0x0CC1, 0xEF1F, 0xFF3E, 0xCF5D, 0xDF7C,
0xAF9B, 0xBFBA, 0x8FD9, 0x9FF8, 0x6E17, 0x7E36, 0x4E55, 0x5E74,
0x2E93, 0x3EB2, 0x0ED1, 0x1EF0,};

/**
* Create a CRC16 checksum from the bytes. implementation is from
* mp911de/lettuce, modified with some more optimizations
*
* @param bytes
* @return CRC16 as integer value
*/
public static int getCRC16(byte[] bytes) {
int crc = 0x0000;

for (byte b : bytes) {
crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >>> 8) ^ (b & 0xFF)) & 0xFF]);
}
return crc & 0xFFFF;
}

public static int getCRC16(String key) {
return getCRC16(key.getBytes(Charset.forName("UTF-8")));
}

@Override
public int getHashCode(String origin) {
// optimization with modulo operator with power of 2
// equivalent to getCRC16(key) % 16384
return getCRC16(origin) & (16384 - 1);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.alipay.sofa.registry.client.consistenthash;


import com.alipay.sofa.registry.client.remoting.ServerNode;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
* @author liqiuliang
* @create 2022-10-5
*/
public class ConsistentHashLoadBalancer implements LoadBalancer {

private HashStrategy hashStrategy = new FnvHashStrategy();

private int virtualNodeSize = DEFAULT_VIRTUAL_NODE_SIZE;
private final static String VIRTUAL_NODE_SUFFIX = "&&";
private final static int DEFAULT_VIRTUAL_NODE_SIZE=10;

public ConsistentHashLoadBalancer() {
}

public ConsistentHashLoadBalancer(int virtualNodeSize) {
this.virtualNodeSize = virtualNodeSize;
}

public ConsistentHashLoadBalancer(HashStrategy hashStrategy) {
this.hashStrategy = hashStrategy;
}

public ConsistentHashLoadBalancer(int virtualNodeSize, HashStrategy hashStrategy) {
this.virtualNodeSize = virtualNodeSize;
this.hashStrategy = hashStrategy;
}

@Override
public ServerNode select(List<ServerNode> servers, Invocation invocation) {
int invocationHashCode = hashStrategy.getHashCode(invocation.getHashKey());
TreeMap<Integer, ServerNode> ring = buildConsistentHashRing(servers);
ServerNode server = locate(ring, invocationHashCode);
return server;
}

private ServerNode locate(TreeMap<Integer, ServerNode> ring, int invocationHashCode) {
Map.Entry<Integer, ServerNode> locateEntry = ring.ceilingEntry(invocationHashCode);
if (locateEntry == null) {
locateEntry = ring.firstEntry();
}
return locateEntry.getValue();
}

private TreeMap<Integer, ServerNode> buildConsistentHashRing(List<ServerNode> servers) {
TreeMap<Integer, ServerNode> virtualNodeRing = new TreeMap<>();
for (ServerNode server : servers) {
for (int i = 0; i < virtualNodeSize; i++) {
virtualNodeRing.put(hashStrategy.getHashCode(
server.getUrl() + VIRTUAL_NODE_SUFFIX + i), server);
}
}
return virtualNodeRing;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.alipay.sofa.registry.client.consistenthash;

/**
* @author liqiuliang
* @create 2022-10-5
* FNV1_32_HASH
*/

public class FnvHashStrategy implements HashStrategy {

private static final long FNV_32_INIT = 2166136261L;
private static final int FNV_32_PRIME = 16777619;

@Override
public int getHashCode(String origin) {
final int p = FNV_32_PRIME;
int hash = (int) FNV_32_INIT;
for (int i = 0; i < origin.length(); i++)
hash = (hash ^ origin.charAt(i)) * p;
hash += hash << 13;
hash ^= hash >> 7;
hash += hash << 3;
hash ^= hash >> 17;
hash += hash << 5;
hash = Math.abs(hash);
return hash;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.alipay.sofa.registry.client.consistenthash;

/**
* @author liqiuliang
* @create 2022-10-5
*/
public interface HashStrategy {
int getHashCode(String origin);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.alipay.sofa.registry.client.consistenthash;

/**
* @author liqiuliang
* @create 2022-10-5
*/
public class Invocation {
public Invocation() {
}

public Invocation(String hashKey) {
this.hashKey = hashKey;
}

private String hashKey;

public String getHashKey() {
return hashKey;
}

public void setHashKey(String hashKey) {
this.hashKey = hashKey;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.alipay.sofa.registry.client.consistenthash;

/**
* @author liqiuliang
* @create 2022-10-5
*/
public class JdkHashCodeStrategy implements HashStrategy {

@Override
public int getHashCode(String origin) {
return origin.hashCode();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.alipay.sofa.registry.client.consistenthash;


import com.alipay.sofa.registry.client.remoting.ServerNode;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
* @author liqiuliang
* @create 2022-10-5
*/
public class KetamaConsistentHashLoadBalancer implements LoadBalancer {
private static MessageDigest md5Digest;

// 每一个物理节点的虚拟节点副本个数
private int virtualNodeSize;

private final static String VIRTUAL_NODE_SUFFIX = "-";

static {
try {
md5Digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
}

public KetamaConsistentHashLoadBalancer() {
}

public KetamaConsistentHashLoadBalancer(int virtualNodeSize) {
this.virtualNodeSize = virtualNodeSize;
}

@Override
public ServerNode select(List<ServerNode> servers, Invocation invocation) {
long invocationHashCode = getHashCode(invocation.getHashKey());
TreeMap<Long, ServerNode> ring = buildConsistentHashRing(servers);
ServerNode server = locate(ring, invocationHashCode);
return server;
}

private ServerNode locate(TreeMap<Long, ServerNode> ring, Long invocationHashCode) {
Map.Entry<Long, ServerNode> locateEntry = ring.ceilingEntry(invocationHashCode);
if (locateEntry == null) {
locateEntry = ring.firstEntry();
}
return locateEntry.getValue();
}

private TreeMap<Long, ServerNode> buildConsistentHashRing(List<ServerNode> servers) {
TreeMap<Long, ServerNode> virtualNodeRing = new TreeMap<>();
for (ServerNode server : servers) {
for (int i = 0; i < virtualNodeSize / 4; i++) {
byte[] digest = computeMd5(server.getUrl() + VIRTUAL_NODE_SUFFIX + i);
for (int h = 0; h < 4; h++) {
Long k = ((long) (digest[3 + h * 4] & 0xFF) << 24)
| ((long) (digest[2 + h * 4] & 0xFF) << 16)
| ((long) (digest[1 + h * 4] & 0xFF) << 8)
| (digest[h * 4] & 0xFF);
virtualNodeRing.put(k, server);

}
}
}
return virtualNodeRing;
}

private long getHashCode(String origin) {
byte[] bKey = computeMd5(origin);
long rv = ((long) (bKey[3] & 0xFF) << 24)
| ((long) (bKey[2] & 0xFF) << 16)
| ((long) (bKey[1] & 0xFF) << 8)
| (bKey[0] & 0xFF);
return rv;
}

private static byte[] computeMd5(String k) {
MessageDigest md5;
try {
md5 = (MessageDigest) md5Digest.clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("clone of MD5 not supported", e);
}
md5.update(k.getBytes());
return md5.digest();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.alipay.sofa.registry.client.consistenthash;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
* @author liqiuliang
* @create 2022-10-5
*/
public class KetamaHashStrategy implements HashStrategy {
private static MessageDigest md5Digest;

static {
try {
md5Digest = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("MD5 not supported", e);
}
}

@Override
public int getHashCode(String origin) {
byte[] bKey = computeMd5(origin);
long rv = ((long) (bKey[3] & 0xFF) << 24)
| ((long) (bKey[2] & 0xFF) << 16)
| ((long) (bKey[1] & 0xFF) << 8)
| (bKey[0] & 0xFF);
return (int) (rv & 0xffffffffL);
}

/**
* Get the md5 of the given key.
*/
public static byte[] computeMd5(String k) {
MessageDigest md5;
try {
md5 = (MessageDigest) md5Digest.clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("clone of MD5 not supported", e);
}
md5.update(k.getBytes());
return md5.digest();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.alipay.sofa.registry.client.consistenthash;

import com.alipay.sofa.registry.client.remoting.ServerNode;

import java.util.List;

/**
* @author liqiuliang
* @create 2022-10-5
*/
public interface LoadBalancer {

ServerNode select(List<ServerNode> servers, Invocation invocation);
}
Loading