分布式锁实现
- 一 为什么要使用分布式锁
- 二 分布式锁应该具备哪些条件
- 三 分布式锁的三种实现方式
- 四 基于数据库的实现方式
- 五 基于Redis的实现方式
一 为什么要使用分布式锁
我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,可以使用我们学到的锁进行处理,并且可以完美的运行,毫无Bug!
注意这是单机应用,后来业务发展,需要做集群,一个应用需要部署到几台机器上然后做负载均衡,大致如下图:
上图可以看到,变量A存在三个服务器内存中(这个变量A主要体现是在一个类中的一个成员变量,是一个有状态的对象),如果不加任何控制的话,变量A同时都会在分配一块内存,三个请求发过来同时对这个变量操作,显然结果是不对的!即使不是同时发过来,三个请求分别操作三个不同内存区域的数据,变量A之间不存在共享,也不具有可见性,处理的结果也是不对的!
如果我们业务中确实存在这个场景的话,我们就需要一种方法解决这个问题!
为了保证一个方法或属性在高并发情况下的同一时间只能被同一个线程执行,在传统单体应用单机部署的情况下,可以使用并发处理相关的功能进行互斥控制。但是,随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的应用并不能提供分布式锁的能力。为了解决这个问题就需要一种跨机器的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
二 分布式锁应该具备哪些条件
在分析分布式锁的三种实现方式之前,先了解一下分布式锁应该具备哪些条件:
- 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
- 高可用的获取锁与释放锁;
- 高性能的获取锁与释放锁;
- 具备可重入特性;
- 具备锁失效机制,防止死锁;
- 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。
三 分布式锁的三种实现方式
目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。
在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。
# 三种实现方式
基于数据库实现分布式锁;
基于缓存(Redis等)实现分布式锁;
基于Zookeeper实现分布式锁;
四 基于数据库的实现方式
基于数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。
(1)创建一个表:
DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
`desc` varchar(255) NOT NULL COMMENT '备注信息',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';
(2)想要执行某个方法,就使用这个方法名向表中插入数据:
INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');
因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。
(3)成功插入则获取锁,执行完成后删除对应的行数据释放锁:
delete from method_lock where method_name ='methodName';
注意:这只是使用基于数据库的一种方法,使用数据库实现分布式锁还有很多其他的玩法!
使用基于数据库的这种实现方式很简单,但是对于分布式锁应该具备的条件来说,它有一些问题需要解决及优化:
- 因为是基于数据库实现的,数据库的可用性和性能将直接影响分布式锁的可用性及性能,所以,数据库需要双机部署、数据同步、主备切换;
- 不具备可重入的特性,因为同一个线程在释放锁之前,行数据一直存在,无法再次成功插入数据,所以,需要在表中新增一列,用于记录当前获取到锁的机器和线程信息,在再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁;
- 没有锁失效机制,因为有可能出现成功插入数据后,服务器宕机了,对应的数据没有被删除,当服务恢复后一直获取不到锁,所以,需要在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;
- 不具备阻塞锁特性,获取不到锁直接返回失败,所以需要优化获取逻辑,循环多次去获取。
- 在实施的过程中会遇到各种不同的问题,为了解决这些问题,实现方式将会越来越复杂;依赖数据库需要一定的资源开销,性能问题需要考虑。
五 基于Redis的实现方式
选用Redis实现分布式锁原因:
- Redis有很高的性能;
- Redis命令对此支持较好,实现起来比较方便
使用命令介绍:
- SETNX:
SETNX key val
:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。 - expire:
expire key timeout
:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。 - delete:
delete key
:删除key
在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。
实现思想:
- 获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。
- 获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。
- 释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
python实现redis分布式锁
pip3 install redlock-py
github地址:https://github.com/SPSCommerce/redlock-py
源码安装:
python3 setup.py install
import redis
import uuid
import time
from threading import Thread, get_ident
# 连接redis
redis_client = redis.Redis(host="localhost",
port=6379,
# password=password,
db=10)
# 获取一个锁
# lock_name:锁定名称
# acquire_time: 客户端等待获取锁的时间
# time_out: 锁的超时时间
def acquire_lock(lock_name, acquire_time=10, time_out=10):
"""获取一个分布式锁"""
identifier = str(uuid.uuid4())
end = time.time() + acquire_time
lock = "string:lock:" + lock_name
while time.time() < end:
if redis_client.setnx(lock, identifier):
# 给锁设置超时时间, 防止进程崩溃导致其他进程无法获取锁
redis_client.expire(lock, time_out)
return identifier
elif not redis_client.ttl(lock):
redis_client.expire(lock, time_out)
time.sleep(0.001)
return False
# 释放一个锁
def release_lock(lock_name, identifier):
"""通用的锁释放函数"""
lock = "string:lock:" + lock_name
pip = redis_client.pipeline(True)
while True:
try:
pip.watch(lock)
lock_value = redis_client.get(lock)
if not lock_value:
return True
if lock_value.decode() == identifier:
pip.multi()
pip.delete(lock)
pip.execute()
return True
pip.unwatch()
break
except redis.excetions.WacthcError:
pass
return False
def seckill():
identifier = acquire_lock('resource')
print(get_ident(), "获得了锁")
release_lock('resource', identifier)
print(get_ident(), "释放了锁")
if __name__ == '__main__':
for i in range(50):
t = Thread(target=seckill)
t.start()