什么是雪花算法?
雪花算法是由 Twitter 开源的分布式 ID 生成算法,用于生成 64 位的长整型唯一 ID。其结构如下:
- 1 位符号位:始终为 0
- 41 位时间戳:精确到毫秒
- 10 位工作机器 ID:包含 5 位数据中心 ID 和 5 位机器 ID
- 12 位序列号:同一毫秒内的自增序号
Golang 实现
以下是一个完整的 Golang 实现:
package snowflake
import (
"sync"
"time"
"errors"
)
// Snowflake 结构体
type Snowflake struct {
mutex sync.Mutex // 互斥锁
lastTimestamp int64 // 上次的时间戳
workerId int64 // 工作节点ID
datacenterId int64 // 数据中心ID
sequence int64 // 序列号
}
const (
workerBits = uint(5) // 工作节点ID位数
datacenterBits = uint(5) // 数据中心ID位数
sequenceBits = uint(12) // 序列号位数
maxWorkerId = -1 ^ (-1 << workerBits) // 最大工作节点ID
maxDatacenterId = -1 ^ (-1 << datacenterBits) // 最大数据中心ID
maxSequence = -1 ^ (-1 << sequenceBits) // 最大序列号
timeShift = workerBits + datacenterBits + sequenceBits // 时间戳左移位数
datacenterShift = workerBits + sequenceBits // 数据中心ID左移位数
workerShift = sequenceBits // 工作节点ID左移位数
epoch = int64(1672531200000) // 起始时间戳 (2023-01-01 00:00:00 +0800)
)
// 创建新的雪花算法实例
func NewSnowflake(datacenterId, workerId int64) (*Snowflake, error) {
if datacenterId > maxDatacenterId || datacenterId < 0 {
return nil, errors.New("datacenter ID超出范围")
}
if workerId > maxWorkerId || workerId < 0 {
return nil, errors.New("worker ID超出范围")
}
return &Snowflake{
lastTimestamp: -1,
workerId: workerId,
datacenterId: datacenterId,
sequence: 0,
}, nil
}
// 生成下一个ID
func (s *Snowflake) NextId() (int64, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
timestamp := time.Now().UnixMilli()
// 时钟回拨检查
if timestamp < s.lastTimestamp {
return 0, errors.New("时钟回拨,拒绝生成ID")
}
// 同一毫秒内
if timestamp == s.lastTimestamp {
s.sequence = (s.sequence + 1) & maxSequence
// 同一毫秒内序列号用完
if s.sequence == 0 {
// 等待下一毫秒
for timestamp <= s.lastTimestamp {
timestamp = time.Now().UnixMilli()
}
}
} else {
// 不同毫秒,序列号重置
s.sequence = 0
}
s.lastTimestamp = timestamp
// 组合ID
id := ((timestamp - epoch) << timeShift) |
(s.datacenterId << datacenterShift) |
(s.workerId << workerShift) |
s.sequence
return id, nil
}
package main
import (
"fmt"
"log"
"your/path/snowflake"
)
func main() {
// 创建雪花算法实例(数据中心ID=1, 工作节点ID=1)
sf, err := snowflake.NewSnowflake(1, 1)
if err != nil {
log.Fatal(err)
}
// 生成ID
for i := 0; i < 5; i++ {
id, err := sf.NextId()
if err != nil {
log.Fatal(err)
}
fmt.Printf("ID %d: %d\n", i+1, id)
}
}
主要特点
1. 线程安全 :使用互斥锁确保并发安全
2. 高性能 :使用位运算进行计算
3. 唯一性 :通过时间戳+机器ID+序列号保证唯一
4. 有序性 :生成的ID整体趋势递增
5. 可配置 :支持自定义数据中心ID和工作节点ID
使用场景
1. 分布式系统中的全局唯一ID生成
2. 数据库分表分库的主键生成
3. 消息队列的消息ID
4. 订单号生成系统
5. 分布式日志追踪ID
注意事项
1. 确保服务器时间的准确性
2. 合理分配数据中心ID和工作节点ID
3. 注意时钟回拨问题的处理
4. 根据业务需求调整各部分位数分配
这个实现是线程安全的,并且适合在生产环境中使用。如果需要处理时钟回拨问题,可以根据具体需求添加相应的处理机制。
理时钟回拨问题处理
1. 等待方案
最基础的处理方式,当检测到时钟回拨时等待一段时间。
type Snowflake struct {
mutex sync.Mutex
lastTimestamp int64
workerId int64
datacenterId int64
sequence int64
maxWaitTime int64 // 最大等待时间(毫秒)
}
func (s *Snowflake) NextId() (int64, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
timestamp := time.Now().UnixMilli()
// 处理时钟回拨
if timestamp < s.lastTimestamp {
offset := s.lastTimestamp - timestamp
if offset <= s.maxWaitTime {
// 等待时钟追上
time.Sleep(time.Duration(offset) * time.Millisecond)
timestamp = time.Now().UnixMilli()
} else {
return 0, errors.New("时钟回拨超过最大等待时间")
}
}
// ... 后续逻辑保持不变
}
2. 备份时钟方案
使用备份时钟来处理回拨问题。
type BackupSnowflake struct {
mutex sync.Mutex
lastTimestamp int64
workerId int64
datacenterId int64
sequence int64
backupDelta int64 // 备份时钟递增步长
}
func (s *BackupSnowflake) currentTimeMillis() int64 {
timestamp := time.Now().UnixMilli()
if timestamp < s.lastTimestamp {
// 发生时钟回拨,使用备份时钟
timestamp = s.lastTimestamp + s.backupDelta
}
return timestamp
}
func (s *BackupSnowflake) NextId() (int64, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
timestamp := s.currentTimeMillis()
if timestamp == s.lastTimestamp {
s.sequence = (s.sequence + 1) & maxSequence
if s.sequence == 0 {
timestamp = s.waitNextMillis(timestamp)
}
} else {
s.sequence = 0
}
s.lastTimestamp = timestamp
// ... 生成ID的逻辑
}
3. 号段预分配方案
预先分配一段序列号,降低对时间戳的依赖。
type SegmentSnowflake struct {
mutex sync.Mutex
lastTimestamp int64
workerId int64
datacenterId int64
sequence int64
currentSegment int64
segmentBits uint
segmentSize int64
}
func NewSegmentSnowflake(datacenterId, workerId int64) *SegmentSnowflake {
segmentBits := uint(12)
return &SegmentSnowflake{
datacenterId: datacenterId,
workerId: workerId,
segmentBits: segmentBits,
segmentSize: 1 << segmentBits,
currentSegment: 0,
}
}
func (s *SegmentSnowflake) NextId() (int64, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
timestamp := time.Now().UnixMilli()
if timestamp < s.lastTimestamp {
// 发生时钟回拨,使用当前号段内的序列号
if s.sequence < s.segmentSize-1 {
s.sequence++
return s.generateId(s.lastTimestamp, s.sequence), nil
}
// 当前号段用尽,切换到新的号段
s.currentSegment++
s.sequence = 0
return s.generateId(s.lastTimestamp, s.currentSegment, s.sequence), nil
}
if timestamp != s.lastTimestamp {
s.currentSegment = 0
s.sequence = 0
}
s.lastTimestamp = timestamp
return s.generateId(timestamp, s.currentSegment, s.sequence), nil
}
func (s *SegmentSnowflake) generateId(timestamp, segment, seq int64) int64 {
return ((timestamp - epoch) << timeShift) |
(s.datacenterId << datacenterShift) |
(s.workerId << workerShift) |
(segment << s.segmentBits) |
seq
}
使用建议:
1. 对于短时间回拨(几毫秒到几秒),使用等待方案
2. 对于中等时间回拨,使用备份时钟方案
3. 对于长时间回拨或高并发场景,使用号段预分配方案
4. 在实际生产环境中,建议结合多种方案,并加入监控和告警机制
选择哪种方案需要根据具体的业务场景来决定:
- 如果业务对延迟敏感,避免使用等待方案
- 如果并发量大,考虑使用号段预分配方案
- 如果对时间戳精度要求高,可以使用备份时钟方案