动态扩缩容下的全局流水号设计

news2025/1/11 14:59:34

关于全局流水号,业内用的比较多的就是雪花算法,一直没理解在动态扩缩容下其中的workId和

datacenterId如何设置,查到了几个方法:reidis中取,待后期实践下。

先简单的介绍一下雪花算法,雪花算法生成的Id由:1bit 不用 + 41bit时间戳+10bit工作机器id+12bit序列号,如下图:

不用:1bit,因为最高位是符号位,0表示正,1表示负,所以这里固定为0
时间戳:41bit,服务上线的时间毫秒级的时间戳(为当前时间-服务第一次上线时间),这里为(2^41-1)/1000/60/60/24/365 = 49.7年
工作机器id:10bit,表示工作机器id,用于处理分布式部署id不重复问题,可支持2^10 = 1024个节点
序列号:12bit,用于离散同一机器同一毫秒级别生成多条Id时,可允许同一毫秒生成2^12 = 4096个Id,则一秒就可生成4096*1000 = 400w个Id


说明:上面总体是64位,具体位数可自行配置,如想运行更久,需要增加时间戳位数;如想支持更多节点,可增加工作机器id位数;如想支持更高并发,增加序列号位数

公司使用的 k8s 容器化部署服务应用,所以需要支持动态增加节点,并且每次部署的机器不一定一样时,就会有问题。参考了 雪花算法snowflake生成Id重复问题 其中的思想:

在redis中存储一个当前workerId的最大值
每次生成workerId时,从redis中获取到当前workerId最大值,并+1作为当前workerId,并存入redis
如果workerId为1023,自增为1024,则重置0,作为当前workerId,并存入redis
然后优化成以下逻辑:

定义一个 redis 作为缓存 key,然后服务每次初始化的时候都 incr 这个 key。
上面得到的 incr 的结果然后与 1024 取模。取模可以优化为:result & 0x000003FF
所以最后的代码为下面:

首先我们先定义雪花算法生成分布式 ID 类:

SnowflakeIdWorker.java

public class SnowflakeIdWorker {
    /** 开始时间截 (建议用服务第一次上线的时间,到毫秒级的时间戳) */
    private final long twepoch = 687888001020L;

    /** 机器id所占的位数 */
    private final long workerIdBits = 10L;

    /** 支持的最大机器id,结果是1023 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数) */
    private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

    /** 序列在id中占的位数 */
    private final long sequenceBits = 12L;

    /** 机器ID向左移12位 */
    private final long workerIdShift = sequenceBits;

    /** 时间截向左移22位(10+12) */
    private final long timestampLeftShift = sequenceBits + workerIdBits;

    /** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
     * <<为左移,每左移动1位,则扩大1倍
     * */
    private final long sequenceMask = -1L ^ (-1L << sequenceBits);

    /** 工作机器ID(0~1024) */
    private long workerId;

    /** 毫秒内序列(0~4095) */
    private long sequence = 0L;

    /** 上次生成ID的时间截 */
    private long lastTimestamp = -1L;

    //==============================Constructors=====================================
    /**
     * 构造函数
     * @param workerId 工作ID (0~1023)
     */
    public SnowflakeIdWorker(long workerId) {
        if (workerId > maxWorkerId || workerId < 0) {
            throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId));
        }
        this.workerId = workerId;
    }

    // ==============================Methods==========================================
    /**
     * 获得下一个ID (该方法是线程安全的)
     * @return SnowflakeId
     */
    public synchronized long nextId() {
        long timestamp = timeGen();
        //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
        if (timestamp < lastTimestamp) {
            throw new RuntimeException(
                    String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
        }

        //如果是同一时间生成的,则进行毫秒内序列
        if (lastTimestamp == timestamp) {
            //如果毫秒相同,则从0递增生成序列号
            sequence = (sequence + 1) & sequenceMask;
            //毫秒内序列溢出
            if (sequence == 0) {
                //阻塞到下一个毫秒,获得新的时间戳
                timestamp = tilNextMillis(lastTimestamp);
            }
        }
        //时间戳改变,毫秒内序列重置
        else {
            sequence = 0L;
        }

        //上次生成ID的时间截
        lastTimestamp = timestamp;

        //移位并通过或运算拼到一起组成64位的ID
        return ((timestamp - twepoch) << timestampLeftShift) //
                | (workerId << workerIdShift) //
                | sequence;
    }

    /**
     * 阻塞到下一个毫秒,直到获得新的时间戳
     * @param lastTimestamp 上次生成ID的时间截
     * @return 当前时间戳
     */
    protected long tilNextMillis(long lastTimestamp) {
        long timestamp = timeGen();
        while (timestamp <= lastTimestamp) {
            timestamp = timeGen();
        }
        return timestamp;
    }

    /**
     * 返回以毫秒为单位的当前时间,从1970-01-01 08:00:00算起
     * @return 当前时间(毫秒)
     */
    protected long timeGen() {
        return System.currentTimeMillis();
    }

    public static void main(String[] args) {
        SnowflakeIdWorker snowflakeIdWorker = new SnowflakeIdWorker(1);
        Set<Long> params = new HashSet<>();
        for (int i = 0; i < 3000_0000; i++) {
            params.add(snowflakeIdWorker.nextId());
        }
        System.out.println(params.size());
    }

}


 

接着定义一个 ID 生成的接口以及实现类。

public interface IdManager {

    String getId();

}

 下面是实现类

@Slf4j
@Service("idManager")
public class IdManagerImpl implements IdManager {

    @Resource(name = "stringRedisTemplate")
    private StringRedisTemplate stringRedisTemplate;

    private SnowflakeIdWorker snowflakeIdWorker;

    @PostConstruct
    public void init() {
        String cacheKey = KeyUtils.getKey("order", "snowflake", "workerId", "incr");

        Long increment = stringRedisTemplate.opsForValue().increment(cacheKey);

        long workerId = increment & 0x000003FF;
        log.info("IdManagerImpl.init snowflake worker id is {}", workerId);

        snowflakeIdWorker = new SnowflakeIdWorker(workerId);
    }

    @Override
    public String getId() {
        long nextId = snowflakeIdWorker.nextId();

        return Long.toString(nextId);
    }
}

在服务每次上线的时候就会把之前的 incr 值加 1。然后与 1024 取模,最后 workerId 就会一直在 [0 ~ 1023] 范围内进行动态取值。


原文链接:https://blog.csdn.net/u012410733/article/details/121882691

还有的做法是依赖配置中心的数据,因为无论是扩缩容至少都要注册到注册中心上,那拿到注册中心上的ip和端口号来动态生成workId和datacenterId


import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.cloud.nacos.NacosServiceManager;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.AbstractEventListener;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.DecimalFormat;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

/**
 * SnowflakeId + Nacos
 */
@Component
public class SnowflakeIdGenerator {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private NacosServiceManager nacosServiceManager;

    @Autowired
    private NacosDiscoveryProperties nacosDiscoveryProperties;

    private static SnowflakeIdWorker snowflakeIdWorker;

    private static int nodeId;

    @PostConstruct
    public void run() throws Exception {
        init();
    }

    /**
     * 获取雪花Id
     *
     * @return
     */
    public static long nextId() {
        return snowflakeIdWorker.nextId();
    }

    /**
     * 获取当前节点Id
     *
     * @return
     */
    public static int nodeId() {
        return nodeId;
    }

    /**
     * 获取当前服务所有节点 + 增加服务监听
     *
     * @throws NacosException
     */
    private void init() throws NacosException {
        NamingService namingService = nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
        namingService.subscribe(nacosDiscoveryProperties.getService(), new AbstractEventListener() {
            @Override
            public void onEvent(Event event) {
                if (-1 == nacosDiscoveryProperties.getPort()) {
                    return;
                }
                nodeId = calcNodeId(((NamingEvent) event).getInstances());
                if (nodeId > 1024) {
                    throw new IllegalArgumentException("Worker & Datacenter Id calc results exceed 1024");
                }
                long workerId = nodeId % 31;
                long datacenterId = (long) Math.floor((float) nodeId / 31);
                logger.info("nodeId:" + nodeId + " workerId:" + workerId + " datacenterId:" + datacenterId);
                snowflakeIdWorker = new SnowflakeIdWorker(workerId, datacenterId);
            }
        });
    }

    /**
     * 用ip+port计算服务列表的索引
     *
     * @param instanceList
     * @return
     */
    private int calcNodeId(List<Instance> instanceList) {
        List<Long> ipPosrList = instanceList.stream()
                .map(x -> dealIpPort(x.getIp(), x.getPort()))
                .sorted(Comparator.naturalOrder())
                .collect(Collectors.toList());
        return ipPosrList.indexOf(dealIpPort(nacosDiscoveryProperties.getIp(), nacosDiscoveryProperties.getPort()));
    }

    /**
     * ip补0 + 端口号
     *
     * @param ip
     * @param port
     * @return
     */
    private static Long dealIpPort(String ip, int port) {
        String[] ips = ip.split("\\.");
        StringBuilder sbr = new StringBuilder();
        for (int i = 0; i < ips.length; i++) {
            sbr.append(new DecimalFormat("000").format(Integer.parseInt(ips[i])));
        }
        return Long.parseLong(sbr.toString() + port);
    }

}

代码在https://gitee.com/JiaXiaohei/snowflake-nacos

还有一种方法是这样获取的

@Configuration
public class SnowFlakeIdConfig {

    @Bean
    public SnowFlakeIdUtil propertyConfigurer() {
        return new SnowFlakeIdUtil(getWorkId(), getDataCenterId(), 10);
    }


    /**
     * workId使用IP生成
     * @return workId
     */
    private static Long getWorkId() {
        try {
            String hostAddress = Inet4Address.getLocalHost().getHostAddress();
            int[] ints = StringUtils.toCodePoints(hostAddress);
            int sums = 0;
            for (int b : ints) {
                sums = sums + b;
            }
            return (long) (sums % 32);
        }
        catch (UnknownHostException e) {
            // 失败就随机
            return RandomUtils.nextLong(0, 31);
        }
    }


    /**
     * dataCenterId使用hostName生成
     * @return dataCenterId
     */
    private static Long getDataCenterId() {
        try {
            String hostName = SystemUtils.getHostName();
            int[] ints = StringUtils.toCodePoints(hostName);
            int sums = 0;
            for (int i: ints) {
                sums = sums + i;
            }
            return (long) (sums % 32);
        }
        catch (Exception e) {
            // 失败就随机
            return RandomUtils.nextLong(0, 31);
        }
    }

}

有参考:雪花算法(snowflake)容器化部署支持动态增加节点_k8s雪花id重复-CSDN博客

用Nacos分配Snowflake的Worker ID_nacos workid-CSDN博客 雪花算法的原理和实现Java-CSDN博客

Leaf——美团点评分布式ID生成系统 - 美团技术团队 (meituan.com)

java 雪花算法 动态生成workId与dataCenterId - 胡子就不刮 - 博客园 (cnblogs.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1436785.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Zookeeper】what is Zookeeper?

官网地址&#xff1a;https://zookeeper.apache.org/https://zookeeper.apache.org/ 以下来自官网的介绍 ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services. A…

AI专题:AI应用落地的商业模式探索

今天分享的是AI 系列深度研究报告&#xff1a;《AI专题&#xff1a;AI应用落地的商业模式探索》。 &#xff08;报告出品方&#xff1a;国金证券&#xff09; 报告共计&#xff1a;27页 AI基座模型提供按量收费服务 以 ChatGPT 为代表的大模型能力涌现,为基座模型厂商带来增…

“小手艺”有“大情怀”, 《青春手艺人》赋能乡村振兴,传承新时代文化

文化传承发展要坚持“守正创新”&#xff0c;以守正创新的正气和锐气&#xff0c;赓续历史文脉、谱写当代华章。中央广播电视总台农业农村节目中心推出的聚焦年轻手艺人故事的微纪录片《青春手艺人》&#xff0c;为守正创新的文化传承增添了新的鲜活的青春故事。节目积极响应二…

MATLAB Fundamentals>>>(2/2) Project - Analyze Vehicle Data

#创作灵感# MATLAB基础知识官方课程学习笔记 MATLAB Fundamentals>Common Data Analysis Techniques>Summary of Common Data Analysis Techniques>(2/2) Project - Analyze Vehicle Data 任务名称&#xff1a;Fuel Economy Analysis 任务1&#xff1a; The variabl…

华为机考入门python3--(10)牛客10-字符个数统计

分类&#xff1a;字符 知识点&#xff1a; 字符的ASCII码 ord(char) 题目来自【牛客】 def count_unique_chars(s): # 创建一个空集合来保存不同的字符 unique_chars set() # 遍历字符串中的每个字符 for char in s: # 将字符转换为 ASCII 码并检查是否在范围内 #…

android retrofit上传List集合数据

由于接口需要&#xff0c;retrofit上传不能用POST,因为FormUrlEncoded注解跟Body不能共存&#xff0c;所以更改成了QueryMap 因为需要传参&#xff0c;所先将图片集合转成了Hashmap集合&#xff0c;再使用Gson 将集合转成Json 字符串 &#xff0c;再转成RequestBody 下面介绍一…

2024年【上海市安全员C3证】考试题库及上海市安全员C3证报名考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2024年【上海市安全员C3证】考试题库及上海市安全员C3证报名考试&#xff0c;包含上海市安全员C3证考试题库答案和解析及上海市安全员C3证报名考试练习。安全生产模拟考试一点通结合国家上海市安全员C3证考试最新大纲…

微信支付服务商,商户快速进件,减少工作量

大家好&#xff0c;我是小悟 服务商拓展特约商户&#xff0c;人工录入大量商户资料&#xff0c;耗时耗力。商户对标准费率不满意&#xff0c;无法说服商户先签约再帮其调整费率。 为了减少服务商工作量&#xff0c;服务商快速进件工具来了&#xff0c;分为移动端和管理端。用好…

MyBatis多数据源以及动态切换实现(基于SpringBoot 2.7.x)

MyBatis多数据源以及动态切换实现可以实现不同功能模块可以对应到不同的数据库&#xff0c;现在就让我们来讲解一下。 目录 一、引入Maven二、配置文件三、实现多数据源四、动态切换数据源 一、引入Maven 注意&#xff1a;博主这边使用的springboot版本是2.7.14的 <!-- htt…

用 Delphi 程序调用 Python 代码画曲线图

用 Python 的库画图 Python 代码如下&#xff1a; import matplotlib.pyplot as pltsquares [1, 4, 9, 16, 25]; plt.plot(squares); plt.grid(True) # 网格线 plt.show(); # 这句话会弹出个窗口出来&#xff0c;里面是上述数据的曲线。 把以上代码&#xff0c;放进 PyS…

Node.js(五)-跨域(了解)

一 、CORS相关 1. 接口的跨域问题 html: server: 访问结果&#xff1a; 刚才编写的 GET 和 POST接口&#xff0c;存在一个很严重的问题&#xff1a;不支持跨域请求。 解决接口跨域问题的方案主要有两种&#xff1a; ① CORS&#xff08;主流的解决方案&#xff0c;推荐使…

基础面试题整理7之Redis

1.redis持久化RDB、AOF RDB(Redis database) 在当前redis目录下生成一个dump.rdb文件&#xff0c;对redis数据进行备份 常用save、bgsave命令进行数据备份&#xff1a; save命令会阻塞其他redis命令&#xff0c;不会消耗额外的内存&#xff0c;与IO线程同步&#xff1b;bgsav…

gem5学习(17):ARM功耗建模——ARM Power Modelling

目录 一、Dynamic Power States 二、Power Usage Types 三、MathExprPowerModels 四、Extending an existing simulation 五、Stat dump frequency 六、Common Problems 官网教程&#xff1a;gem5: ARM Power Modelling 通过使用gem5中已记录的各种统计数据&#xff0c;…

掌握Web服务器之王:Nginx 学习网站全攻略!

介绍&#xff1a;Nginx是一款高性能的Web服务器&#xff0c;同时也是一个反向代理、负载均衡和HTTP缓存服务器。具体介绍如下&#xff1a; 轻量级设计&#xff1a;Nginx的设计理念是轻量级&#xff0c;这意味着它在占用最少的系统资源的同时提供高效的服务。 高并发能力&#x…

ROS笔记二:launch

目录 launch node标签 参数 参数服务器 节点分组 launch launch文件是一种可以可实现多节点启动和参数配置的xml文件,launch文件用于启动和配置ROS节点、参数和其他相关组件。launch文件通常使用XML格式编写&#xff0c;其主要目的是方便地启动ROS节点和设置节点之间的连…

【Unity优化(一)】音频优化

整理资教程&#xff1a;https://learn.u3d.cn/tutorial/unity-optimization-metaverse 1.音频优化 音频一般不会成为性能瓶颈&#xff0c;是为了节省内存和优化包体大小。 1.0 文件格式和压缩格式 原始音频资源尽量采用WAV格式。 移动平台音频尽量采用Vorbis压缩格式&#x…

PyTorch 2.2 中文官方教程(十八)

开始使用完全分片数据并行&#xff08;FSDP&#xff09; 原文&#xff1a;pytorch.org/tutorials/intermediate/FSDP_tutorial.html 译者&#xff1a;飞龙 协议&#xff1a;CC BY-NC-SA 4.0 作者&#xff1a;Hamid Shojanazeri&#xff0c;Yanli Zhao&#xff0c;Shen Li 注意…

微信小程序学习指南:从基础知识到代码展示

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

Windows系统安装Flink及实现MySQL之间数据同步

Apache Flink是一个框架和分布式处理引擎&#xff0c;用于对无界和有界数据流进行有状态计算。Flink的设计目标是在所有常见的集群环境中运行&#xff0c;并以内存执行速度和任意规模来执行计算。它支持高吞吐、低延迟、高性能的流处理&#xff0c;并且是一个面向流处理和批处理…

【Leetcode】292. Nim 游戏

文章目录 题目思路代码结果 题目 题目链接 你和你的朋友&#xff0c;两个人一起玩 Nim 游戏&#xff1a; 桌子上有一堆石头。你们轮流进行自己的回合&#xff0c; 你作为先手 。每一回合&#xff0c;轮到的人拿掉 1 - 3 块石头。拿掉最后一块石头的人就是获胜者。 假设你们每…