雪花算法
雪花算法(Snowflake)是分布式系统中生成全局唯一、有序递增ID 的经典方案,核心是通过 64 位 Long 型数据按位拆分字段,兼顾唯一性、有序性和高效性。
1. 核心结构(64 位 Long 型)
雪花算法的 ID 结构按位划分,不同实现可能微调字段长度,主流标准如下:
-
第 1 位(符号位):固定为 0,确保 ID 为正数(Long 型符号位 0 表示正,1 表示负)。
-
第 2-42 位(时间戳位):共 41 位,存储
相对于固定起始时间的毫秒数
。
-
41 位可表示 2^41-1 毫秒,约 69 年,满足大部分系统生命周期需求。
-
第 43-52 位(机器 ID 位):共 10 位,用于区分不同节点(服务器、进程)。
-
10 位可支持 2^10=1024 个节点,适配中小型分布式集群。
-
第 53-64 位(序列号位):共 12 位,记录同一毫秒内的生成顺序。
-
12 位可支持每个节点每毫秒生成 2^12=4096 个 ID,应对高并发场景。
2. 工作原理
- 时间戳递增:ID 中的时间戳部分随系统时间毫秒级递增,保证 ID 整体有序。
- 机器 ID 唯一:提前为每个节点分配唯一的机器 ID(如通过配置文件、注册中心分配),避免跨节点 ID 冲突。
- 序列号循环:同一毫秒内多次生成 ID 时,序列号从 0 开始递增;跨毫秒时,序列号重置为 0。
3. 核心优势
- 全局唯一:通过 “时间戳 + 机器 ID + 序列号” 三重维度确保无重复 ID。
- 有序递增:时间戳在前,同一节点生成的 ID 按时间顺序递增,便于数据库索引优化。
- 无中心依赖:无需集群协调(如数据库、Redis),本地生成 ID,性能极高(单节点每秒可生成百万级 ID)。
- 占用空间小:64 位 Long 型存储,便于传输和存储。
4. 潜在局限与解决方案
- 时钟回拨问题:若节点系统时间回调(如同步 NTP 时间),可能生成重复 ID。
- 解决方案:检测到时钟回拨时,暂停 ID 生成直至时间追平,或使用上次生成的最大时间戳。
- 机器 ID 分配:需提前规划机器 ID,避免重复分配。
- 解决方案:通过配置中心统一分配,或结合节点 IP、端口计算生成。
- 生命周期限制:41 位时间戳仅支持约 69 年,需提前规划起始时间或后期扩展字段。
5. 代码实现
import time
import threading
class Snowflake:
"""
雪花算法生成器:生成64位全局唯一ID
结构:1位符号位(0) + 41位时间戳 + 10位机器ID + 12位序列号
"""
def __init__(self, machine_id: int, start_time: float = 1577808000000):
"""
初始化雪花算法生成器
:param machine_id: 机器ID(0-1023,10位最大支持1024台机器)
:param start_time: 起始时间戳(毫秒),默认2020-01-01 00:00:00的毫秒级时间戳
"""
# 校验机器ID合法性(10位最大为2^10-1=1023)
if machine_id < 0 or machine_id > 1023:
raise ValueError("机器ID必须在0-1023之间")
self.machine_id = machine_id # 机器ID
self.start_time = start_time # 起始时间戳(毫秒)
# 位偏移量定义
self.timestamp_bits = 41 # 时间戳位数
self.machine_bits = 10 # 机器ID位数
self.sequence_bits = 12 # 序列号位数
# 最大取值计算(位运算:n位最大为2^n -1)
self.max_sequence = (1 << self.sequence_bits) - 1 # 4095
self.max_timestamp = (1 << self.timestamp_bits) - 1 # 2^41-1
# 偏移量(用于拼接各部分)
self.machine_shift = self.sequence_bits # 机器ID需要左移12位
self.timestamp_shift = self.machine_bits + self.sequence_bits # 时间戳需要左移22位
# 状态变量(记录上一次生成ID的信息)
self.last_timestamp = 0 # 上一次时间戳
self.sequence = 0 # 当前毫秒内的序列号
# 线程锁(保证多线程环境下序列号安全)
self.lock = threading.Lock()
def _get_current_timestamp(self) -> int:
"""获取当前时间戳(毫秒)"""
return int(time.time() * 1000)
def generate_id(self) -> int:
"""生成一个雪花ID"""
with self.lock: # 加锁保证线程安全
current_timestamp = self._get_current_timestamp()
# 1. 处理时钟回拨(当前时间 < 上一次时间)
if current_timestamp < self.last_timestamp:
# 计算回拨毫秒数,这里选择等待时间追平(也可抛出异常)
sleep_ms = self.last_timestamp - current_timestamp
time.sleep(sleep_ms / 1000) # 转换为秒
current_timestamp = self._get_current_timestamp() # 重新获取时间
# 若仍回拨,说明时间异常,抛出错误
if current_timestamp < self.last_timestamp:
raise RuntimeError(f"时钟回拨异常,回拨{self.last_timestamp - current_timestamp}毫秒")
# 2. 处理同一毫秒内的序列号
if current_timestamp == self.last_timestamp:
self.sequence += 1
# 序列号超出最大值(4095),等待到下一个毫秒
if self.sequence > self.max_sequence:
# 循环等待至下一毫秒
while current_timestamp <= self.last_timestamp:
current_timestamp = self._get_current_timestamp()
self.sequence = 0 # 重置序列号
else:
# 跨毫秒,重置序列号
self.sequence = 0
# 3. 更新上一次时间戳
self.last_timestamp = current_timestamp
# 4. 计算相对时间戳(当前时间 - 起始时间)
relative_timestamp = current_timestamp - self.start_time
if relative_timestamp > self.max_timestamp:
raise RuntimeError("雪花算法已过期(超过41位时间戳上限)")
# 5. 拼接ID:时间戳 << 22 | 机器ID << 12 | 序列号
snowflake_id = (
(relative_timestamp << self.timestamp_shift)
| (self.machine_id << self.machine_shift)
| self.sequence
)
return snowflake_id
if __name__ == "__main__":
# 初始化生成器(机器ID=1,使用默认起始时间)
snowflake = Snowflake(machine_id=1)
# 生成10个ID并打印
for _ in range(10):
sid = snowflake.generate_id()
print(f"雪花ID: {sid}(长度:{len(str(sid))}位)")
# 多线程测试(验证线程安全)
def generate_in_thread():
for _ in range(5):
print(f"线程{threading.current_thread().name}生成ID: {snowflake.generate_id()}")
threads = [threading.Thread(target=generate_in_thread, name=f"thread-{i}") for i in range(3)]
for t in threads:
t.start()
for t in threads:
t.join()
发表评论