ZooKeeper 实战(六) - 生成分布式ID
文章目录
- ZooKeeper 实战(六) - 生成分布式ID
- 1.何为分布式ID
- 2.分布式ID方案
- 3.创建ZooKeeper节点
- 4.获取序列ID
- 5.处理序列ID
- 6.使用分布式ID
- 7.完整代码
- 8.功能优化
- 8.1.问题思考?
- 1.容量问题
- 2.并发问题
- 3.内存问题
- 8.2.解决并发问题
- 8.3.内存问题
1.何为分布式ID
分布式唯一ID指在分布式系统中用于标识和区分各个实体、资源或事件的唯一标识符。由于分布式系统可能包含多个节点和多个并发操作,需要确保在整个系统中每个实体都具有唯一的标识,避免冲突和重复的情况。
分布式系统唯一ID的设计通常需要满足以下要求:
- 唯一性:每个ID在整个分布式系统中都是唯一的,不会发生冲突。
- 高并发性:ID的生成应该是高并发的,能应对绝大部分(接近于100%)的并发场景,不会成为系统的性能瓶颈。
- 可读性:ID尽可能具有可读性,方便开发人员和用户进行识别和使用。
- 可扩展性:能够适应分布式系统的扩展性需求,支持产生大规模、高并发的ID。
- 可排序性:ID可以按照一定规则进行排序,方便查询和排序操作。主键的排序性也能提升数据库索引的效率。数据库的索引结构通常是基于B树或B+树的,有序的主键可以保持索引结构的有序性,减少数据的分裂和平衡,从而提高索引的维护效率和查询性能。
2.分布式ID方案
使用ZooKeeper实现生成分布式ID可以保证分布式系统中每个节点生成的ID是唯一且递增的。以下是使用ZooKeeper实现生成分布式ID的基本步骤:
- 创建ZooKeeper节点:在ZooKeeper集群中创建一个顺序节点来实现全局递增的功能。节点路径可以选择一个统一的命名规则,例如"/appName/distributeStr"。
- 获取序列ID:每个节点需要在生成ID的时候,向ZooKeeper集群发起一个创建节点的请求。在请求的路径中添加顺序节点的标志,例如"/appName/distributeStr/id_"。ZooKeeper将为每个节点创建一个唯一的有序节点,并返回节点的路径。
- 处理序列ID:获取到ZooKeeper返回的节点路径后,需要解析路径中的序列号,也可以附加上某些信息,作为生成的分布式ID。
- 使用分布式ID:使用解析出的分布式ID进行业务处理。分布式ID可以在多个节点上同时生成,并且保证每个节点生成的ID是唯一且递增的。
3.创建ZooKeeper节点
问题来了,我们要明确创建一个什么样的ZooKeeper节点呢?首先,要知道ID是用来标识一个实体的,而不是作用于整个系统。比如一个系统中,实体有用户、商品、订单等,但是我们并不需要保证用户、商品和订单之间的ID是唯一的,而是要保证同一类实体比如用户这一个类,即任意两个用户A和B的ID不能重复。所以我们可以把系统中用于产生分布式ID的Znode的粒度控制在实体维度,例如/app/user
节点是用户实体的分布式ID节点,/app/product
节点是商品实体的分布式ID节点等。
/**
* 分布式ID节点缓存
*/
private Map<String,String> NodePathMap = new HashMap<>();
/**
* 生成分布式ID节点路径
* @param module 模块名称
* @return
*/
public String getIdNodePath(String module){
if (module == null || module.isBlank()){
throw new NullPointerException("请设置模块名称");
}
if (NodePathMap.get(module) == null){
if (appName == null || appName.isBlank()){
throw new NullPointerException("请设置系统名称");
}
synchronized (IdGenerator.class){
if (NodePathMap.get(module) == null){
NodePathMap.put(module,"/"+appName+"/"+module);
}
}
}
return NodePathMap.get(module);
}
4.获取序列ID
先通过上一节中的临时顺序节点的路径,向ZooKeeper集群发起一个创建节点的请求并返回节点的名称,最后去除节点路径的前缀,获取最后的序列号。
/**
* 分布式ID节点的前缀
*/
public static final String ID_PREFIX = "id_";
/**
* 创建临时节点,并返回Id
* @param module
* @return
*/
public String createNodeId(String module){
String idNodePath = getIdNodePath(module);
try {
String idPrefix = idNodePath + "/" + ID_PREFIX;
String id = client.create() // 创建节点
.creatingParentsIfNeeded() // 如果需要,递归创建节点
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL) // 指定创建节点类型,使用临时顺序节点
.forPath(idPrefix); // 设置节点路径
// 去除nodeId的前缀
return id.replace(idPrefix,"");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
5.处理序列ID
在ZK生成的临时顺序节点ID的前附加时日期时间,以便直观显示此ID的生成日期。
/**
* 日期格式
*/
public static final String DATE_FORMAT = "yyyyMMdd";
/**
* 获取当天的日期
* 格式:yyyyMMdd
* @return
*/
public String datePrefix(){
DateTimeFormatter yyyyMMdd = DateTimeFormatter.ofPattern(DATE_FORMAT);
LocalDateTime now = LocalDateTime.now();
return now.format(yyyyMMdd);
}
/**
* 生成id
* @param module 模块
* @return
*/
public String nextId(String module){
// id前缀以日期开头
return datePrefix()+createNodeId(module);
}
6.使用分布式ID
一切从简,直接把测试方法写在启动类中😂。先启动对应的Zookeeper,然后启动同一个应用的多个实例,并发开始生成ID。
@Slf4j
@SpringBootApplication
public class CuratorDemoApplication implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(CuratorDemoApplication.class, args);
}
@Autowired
private IdGenerator idGenerator;
@Override
public void run(ApplicationArguments args) throws Exception {
// 为了保证两个应用能并发生成id,同时为了方便偷懒,不想写的很复杂
// 这里控制在当前时间的分钟为06时开始往下执行
while (new Date().getMinutes() != 6){
}
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
System.out.println("分布式id生成:"+idGenerator.nextId("test"));
}
}
日志输出如下:
实例1
实例2
7.完整代码
/**
* @Name: IdGenerator
* @Description: 分布式id生成器
* @Author: ahao
* @Date: 2024/8/24 23:42
*/
@Component
public class IdGenerator {
@Value("${spring.application.name}")
private String appName;
@Autowired
private CuratorFramework client;
/**
* 分布式ID节点的前缀
*/
public static final String ID_PREFIX = "id_";
/**
* 日期格式
*/
public static final String DATE_FORMAT = "yyyyMMdd";
/**
* 分布式ID节点缓存
* K -> 模块名称
* V -> 分布式ID节点路径
*/
private static final Map<String, String> NodePathMap = new HashMap<>();
/**
* 生成分布式ID节点路径
*
* @param module 模块名称
* @return
*/
public String getIdNodePath(String module) {
if (module == null || module.isBlank()) {
throw new NullPointerException("请设置模块名称");
}
if (NodePathMap.get(module) == null) {
if (appName == null || appName.isBlank()) {
throw new NullPointerException("请设置系统名称");
}
synchronized (IdGenerator.class) {
if (NodePathMap.get(module) == null) {
NodePathMap.put(module, "/" + appName + "/" + module);
}
}
}
return NodePathMap.get(module);
}
/**
* 创建临时节点,并返回Id
*
* @param module
* @return
*/
public String createNodeId(String module) {
String idNodePath = getIdNodePath(module);
// 创建节点
try {
String idPrefix = idNodePath + "/" + ID_PREFIX;
String id = client.create()
// 如果需要,递归创建节点
.creatingParentsIfNeeded()
// 指定创建节点类型,使用临时顺序节点
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
// 设置节点路径
.forPath(idPrefix);
// 去除nodeId的前缀
return id.replace(idPrefix, "");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 获取当天的日期
* 格式:yyyyMMdd
*
* @return
*/
public String datePrefix() {
DateTimeFormatter yyyyMMdd = DateTimeFormatter.ofPattern(DATE_FORMAT);
LocalDateTime now = LocalDateTime.now();
return now.format(yyyyMMdd);
}
/**
* 生成id
*
* @param module 模块
* @return
*/
public String nextId(String module) {
return datePrefix() + createNodeId(module);
}
}
8.功能优化
8.1.问题思考?
1.容量问题
在ZooKeeper中,临时顺序节点的序号长度默认为10个字符。这个长度是根据ZooKeeper解析生成的顺序节点路径时确定的。顺序节点路径由节点名称前缀和10位的顺序号组成。由此可见,最多可生成100亿的节点序列号,如果实体数据量超过了这个限度怎么办?
2.并发问题
还是一样的,每次生成分布式ID,都需要在Zookeeper创建一个临时顺序节点,而且每次请求都是远程调用,其并发性能远远低于本地调用。
3.内存问题
每次生成分布式ID,都需要在Zookeeper创建一个临时顺序节点,虽然说当客户端断开时,会自动删除这些遗留的节点。但是如果客户端生成id十分频繁,并且长期没有重启或者版本更新,这将会导致大量的”无效“节点存在,浪费了Zookeeper的内存资源,甚至会导致Zookeeper崩溃。
8.2.解决并发问题
如何把远程调用转变为本地调用呢?其实这是不可能的,本身就是依托于Zookeeper去实现分布式ID的分配。所以有没有别的办法呢,想象一下以下场景:假如,储户去银行柜台取钱,每一次取1块钱,然后柜员打开保险柜,从里面拿出一块钱,关闭保险柜,然后给储户一块钱。事实是这样吗?不是,柜员旁都会有部分现金,比如100张一元钱,每次我们取钱,就不需要去保险柜拿,而是直接从旁边的100张中拿一张给我们,等100张都发放完了,再去保险柜拿。由此省去了99次开关保险柜的重复动作,显著提高效率。
所以我们参照上述场景,Zookeeper好比存放现金的保险柜,应用是柜员,线程是储户。每次生成ID时,应用向Zookeeper申请一定份额的“ID”,然后所有的ID的发放完,再去向Zookeeper申请。
又一个问题接踵而至,如何实现请求一次就获取到100个ID呢?单位转换。比如本次生成的临时顺序节点的序号为0000000001,也就是1,但是在应用中,不认为这就是单纯的数字1,而是表示1份ID,这一份有100个ID,也可就是从100-199,即序号000000000100-000000000199。代码实现如下:
/**
* 份额
*/
private static final long share = 100;
/**
* ID的长度 = DATE_FORMAT的长度 + zk顺序节点的序号长度 + 份额的长度
*/
private static final int length = 20;
/**
* 用于存储每个模块,本地份额所用的数量
* K -> 模块
* V -> 已用份额
*/
private static final Map<String, Long> LocalSerialNumber = new HashMap<>();
/**
* 用于存储每个模块所分配的份额序号
* K -> 模块
* V -> 当前是第几份ID
*/
private static final Map<String, Long> RemoteSerialNumber = new HashMap<>();
public synchronized String generateId(String module) {
// 先判断是否已分配本地份额
long local = LocalSerialNumber.get(module) == null ? 0 : LocalSerialNumber.get(module);
long remote = RemoteSerialNumber.get(module) == null ? 0 : RemoteSerialNumber.get(module);
if (remote == 0 // 表示还未分配本地份额
|| local >= share // 表示本地份额已用完
) {
// 向Zookeeper请求分配"一份"ID
String nodeId = createNodeId(module);
remote = Long.valueOf(nodeId);
RemoteSerialNumber.put(module, remote);
// 重置local
local = 0;
}
LocalSerialNumber.put(module, local + 1);
// 当前序号
long sort = remote * share + local;
return padding(sort);
}
/**
* 转成固定长度的字符串
*
* @param sort 序号
* @return
*/
private String padding(long sort) {
String s = String.valueOf(sort);
return "0000000000".substring(0, length - DATE_FORMAT.length() - s.length()) + s;
}
/**
* 生成id
*
* @param module 模块
* @return
*/
public String nextId(String module) {
return datePrefix() + generateId(module);
}
在100次ID的生成中,只有一次远程调用,大大提高了系统的并发性能,同时也解决了容量问题,原本只能生成100亿个ID,经过单位转换(ZK中的序号1代表应用中的100),容量提高了100倍。如果有更高要求,可以提高份额至1000,10000等。
8.3.内存问题
为了模拟创建临时顺序节点的内存资源消耗,博主创建了创建了100w个临时顺序节点。并用visualVM监控Zookeeper的堆内存的占用情况,大约消耗内存460m,结果如下。
很明显,当客户端应用长期运行并且产生大量分布式ID时,Zookeeper需要承担大量的内存消耗。有什么办法能降低这种内存消耗呢?
org.apache.curator.framework.recipes.atomic.DistributedAtomicLong
是Curator框架提供的一种分布式原子计数器的实现。内部使用乐观锁实现,当失败时,再尝试加互斥排他锁。不论是乐观锁还是排他锁,都会按照重试策略进行重试操作。
/**
* 分布式原子计数器
*
* @param module
* @return
*/
public String distributedAtomicLong(String module) {
Assert.notNull(module, "模块名称不能为空");
// 获取当前模块对应的Znode路径
String idNodePath = getIdNodePath(module);
try {
// 创建分布式原子计数器的节点,并返回路径
String nodePath = ZKPaths.makePath(idNodePath, "AtomicLong");
// 重试策略 ExponentialBackoffRetry参数说明:
// baseSleepTimeMs 初始sleep时间
// maxRetries 最大重试次数
// maxSleepMs 最大sleep时间
RetryPolicy retryPolicy = new ExponentialBackoffRetry(50, 20, 200);
// 可升级锁的配置信息
String lockPath = idNodePath + "/AtomicLongLock";
RetryPolicy lockRetryPolicy = new ExponentialBackoffRetry(50, 10, 500);
PromotedToLock promotedToLock = PromotedToLock.builder()
.lockPath(lockPath)
.retryPolicy(lockRetryPolicy)
.timeout(500, TimeUnit.MILLISECONDS)
.build();
// 分布式原子计数器,首先尝试使用乐观锁进行增量操作,如果失败,则采用可选的InterProcessMutex锁进行增量操作。
// 对于乐观锁和悲观锁,重试策略都用于重试增量操作。
DistributedAtomicLong distributedAtomicLong = new DistributedAtomicLong(client, nodePath, retryPolicy, promotedToLock);
// 自增并获取分布式原子长整型
AtomicValue<Long> longAtomicValue = distributedAtomicLong.increment();
int retryTimes = 20;
// !longAtomicValue.succeeded() && i < retryTimes 表示如果获取失败并且重试次数小于规定的20次
for (int i = 0; !longAtomicValue.succeeded() && i < retryTimes; i++) {
// 继续增加
longAtomicValue= distributedAtomicLong.increment();
}
if (longAtomicValue.succeeded()) {
// 获取自增后的值
Long obj = longAtomicValue.postValue();
return String.valueOf(obj);
} else {
throw new RuntimeException("获取分布式ID失败:DistributedAtomicLong获取超时");
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
优化后的内存消耗如下: