1. 分布式锁的定义
分布式锁是一种在分布式系统中用来协调多个进程或线程对共享资源进行访问的机制。它确保在分布式环境下,多个节点(如不同的服务器或进程)不会同时访问同一个共享资源,从而避免数据不一致、资源竞争等问题。
2. 分布式锁的工作原理
分布式锁的工作原理与单机锁类似,但它需要考虑多个节点之间的协调。在获取锁时,进程必须确保锁的唯一性,即在任何时刻,只有一个进程能够成功获取锁,并且锁的状态能够在不同节点之间保持一致。通常,分布式锁的实现需要满足以下条件:
条件 | 描述 |
---|---|
互斥性 | 在同一时间,只有一个进程能获得锁,确保没有其他进程能够同时访问该资源。 |
死锁避免 | 应防止由于某些原因(如进程崩溃或网络问题)导致的死锁情况。这通常通过设置锁的过期时间来实现,确保锁在持有者意外失联时能够自动释放。 |
容错性 | 即使某些节点发生故障或网络分区,锁机制依然能够正确运行,或在适当的时间内恢复正常。这意味着锁的状态在多个节点之间要保持一致,且系统具备一定的自愈能力。 |
3. 基于数据库
基于数据库的分布式锁是利用数据库的特性来实现的一种简单而常见的分布式锁机制。通过对数据库记录的插入、更新或删除操作,确保在同一时间只有一个进程能够持有锁,从而实现对共享资源的互斥访问。
3.1 使用表记录实现分布式锁
原理:使用表记录实现分布式锁的核心思想是通过数据库表的一行记录来充当锁的标识。每次请求想要获取锁时,会尝试在表中插入一条特定的记录。如果插入成功,则表示获得了锁;如果插入失败(如违反了唯一性约束),则表示锁已被其他请求持有。
步骤:
- 创建一张专门用于锁定的表,表中包含一个锁名(或资源名)字段和一个锁定状态字段。
- 当一个进程需要获取锁时,尝试向该表中插入一条记录。如果插入成功,则表示获取锁成功。
- 如果记录已存在(利用主键冲突,唯一约束实现可能更加灵活,可以应用在表的任何字段上),表示锁已被其他进程持有,获取锁失败。
- 任务完成后,持锁进程删除该记录以释放锁。
除了锁表,我们还需要一个商品库存表模拟秒杀。
create database lock_;
use lock_;
CREATE TABLE product
(
product_id INT PRIMARY KEY,
product_name VARCHAR(255),
stock INT -- 库存量
);
-- 初始化库存为5
INSERT INTO product (product_id, product_name, stock)
VALUES (1, '大白菜', 5);
-- 分布式锁表
CREATE TABLE distributed_lock (
lock_name VARCHAR(255) PRIMARY KEY, -- 锁的名称
lock_owner VARCHAR(255), -- 锁的持有者标识
lock_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP -- 锁定时间
);
3.1.1 项目结构
3.1.2 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>lock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.3.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.1.3 application.yml
spring:
application:
name: lock
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/lock_?useSSL=false&serverTimezone=UTC
username: root
password: 123456
mybatis:
configuration:
map-underscore-to-camel-case: true
server:
port: 8001
3.1.4 LockApplication.java
package org.example;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan("org.example.mapper")
public class LockApplication {
public static void main(String[] args) {
SpringApplication.run(LockApplication.class, args);
}
}
3.1.5 Product.java
package org.example.model;
import lombok.Data;
@Data
public class Product {
private Integer productId;
private String productName;
private Integer stock;
}
3.1.6 DistributedLock.java
package org.example.model;
import java.util.Date;
import lombok.Data;
@Data
public class DistributedLock {
private String lockName;
private String lockOwner;
private Date lockTime;
}
3.1.7 ProductMapper.java
package org.example.mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
@Repository
public interface ProductMapper {
@Select("SELECT stock FROM product WHERE product_id = #{productId}")
int queryStock(Integer productId);
@Update("UPDATE product SET stock = stock - 1 WHERE product_id = #{productId}")
int updateStock(Integer productId);
}
3.1.8 DistributedLockMapper.java
package org.example.mapper;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.springframework.stereotype.Repository;
@Repository
public interface DistributedLockMapper {
@Insert("INSERT INTO distributed_lock (lock_name, lock_owner) VALUES (#{lockName}, #{lockOwner})")
int insert(String lockName, String lockOwner);
@Delete("DELETE FROM distributed_lock WHERE lock_name = #{lockName} AND lock_owner = #{lockOwner}")
int delete(String lockName, String lockOwner);
}
3.1.9 DistributedLockService.java
package org.example.service;
import org.example.mapper.DistributedLockMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class DistributedLockService {
private final DistributedLockMapper distributedLockMapper;
@Autowired
public DistributedLockService(DistributedLockMapper distributedLockMapper) {
this.distributedLockMapper = distributedLockMapper;
}
// 尝试获取锁
public boolean tryLock(String lockName, String lockOwner) {
try {
return distributedLockMapper.insert(lockName, lockOwner) > 0;
} catch (Exception e) {
return false;
}
}
// 释放锁
public boolean unlock(String lockName, String lockOwner) {
try {
return distributedLockMapper.delete(lockName, lockOwner) > 0;
} catch (Exception e) {
return false;
}
}
}
3.1.10 ProductService.java
package org.example.service;
import org.example.mapper.ProductMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
private final ProductMapper productMapper;
@Autowired
public ProductService(ProductMapper productMapper) {
this.productMapper = productMapper;
}
// 扣减库存
public boolean reduceStock(int productId) {
// 查询库存
int stock = productMapper.queryStock(productId);
if (stock > 0) {
// 扣减库存
productMapper.updateStock(productId);
System.out.println("库存扣减成功,剩余库存: " + (stock - 1));
return true;
}
System.out.println("库存不足,无法扣减。");
return false;
}
}
3.1.11 DistributedLockController.java
package org.example.controller;
import org.example.service.DistributedLockService;
import org.example.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController
public class DistributedLockController {
@Autowired
private DistributedLockService distributedLockService;
@Autowired
private ProductService productService;
@GetMapping("/order")
public void pay() {
UUID uuid = UUID.randomUUID();
String lockName = "product_1";
String lockOwner = "user_" + uuid;
boolean lockAcquired = false;
int retryCount = 0;
int maxRetries = 10; // 重试次数
while (!lockAcquired && retryCount < maxRetries) {
lockAcquired = distributedLockService.tryLock(lockName, lockOwner);
if (lockAcquired) {
System.out.println(lockOwner + " 成功获取锁,尝试扣减库存...");
try {
productService.reduceStock(1);
} finally {
if (distributedLockService.unlock(lockName, lockOwner)) {
System.out.println(lockOwner + " 成功释放锁。");
} else {
System.out.println(lockOwner + " 释放锁失败。");
}
}
} else {
retryCount++;
System.out.println(lockOwner + " 获取锁失败,重试中... (" + retryCount + "/" + maxRetries + ")");
}
}
if (!lockAcquired) {
System.out.println(lockOwner + " 最终未能获取锁,放弃。");
}
}
}
3.1.12 测试验证
前提准备:修改目录3.1.3的服务端口,先启动8001和8002两个不同的端口代表两台不同的服务器,等待两台服务器启动完成后打开Apache JMeter工具,创建一个线程组100个用户,10s内启动所有用户,同时创建两个不同的HTTP端口请求。
说明:Copy然后另一个HTTP Request改端口为8002即可。
运行结果8001:
user_8001 成功获取锁,尝试扣减库存…
库存扣减成功,剩余库存: 4
user_8001 成功释放锁。
user_8001 成功获取锁,尝试扣减库存…
库存扣减成功,剩余库存: 2
user_8001 成功释放锁。
user_8001 成功获取锁,尝试扣减库存…
库存扣减成功,剩余库存: 0
user_8001 成功释放锁。user_8001 成功获取锁,尝试扣减库存…
库存不足,无法扣减。
user_8001 成功释放锁。…
运行结果8002:
user_8002 成功获取锁,尝试扣减库存…
库存扣减成功,剩余库存: 3
user_8002 成功释放锁。
user_8002 成功获取锁,尝试扣减库存…
库存扣减成功,剩余库存: 1
user_8002 成功释放锁。
user_8002 成功获取锁,尝试扣减库存…
库存不足,无法扣减。
user_8002 成功释放锁。…
说明:在高并发情况下,锁的争夺会导致部分用户未能成功获取锁,造成库存无法完全消耗的情况。这种设计虽然确保了数据的一致性,但在极端并发场景下,可能会导致一些请求被拒绝.所以引入了重试机制进行优化。
3.2 使用乐观锁 (Optimistic Lock) 实现分布式锁
**原理:**乐观锁基于“乐观”的假设,认为并发冲突的可能性较小,因此在更新数据时不直接加锁,而是通过版本号或时间戳等机制来检测数据是否被其他事务修改过。如果在提交时检测到冲突(版本号变化),则放弃本次操作,并要求重试。
-
步骤:
- 创建一张锁表,包含锁名、锁定状态以及一个版本号或时间戳字段。
- 获取锁时,通过更新操作将锁的状态改变,并检查版本号或时间戳,确保操作是原子性的。
- 如果版本号不匹配,表示锁已被其他进程持有,获取锁失败。
- 任务完成后,进程更新锁的状态并修改版本号或时间戳。
CREATE TABLE product
(
product_id INT PRIMARY KEY,
stock INT,
version INT default 0 -- 乐观锁的版本号
);
-- 初始化库存为5
INSERT INTO product (product_id, stock)
VALUES (1, 5);
3.2.1 项目结构
3.2.2 pom.xml
同:目录3.1.2完全一致。
3.2.3 application.yml
同:目录3.1.3完全一致。
3.2.4 LockApplication.java
同:目录3.1.4完全一致。
3.2.5 Product.java
package org.example.model;
import lombok.Data;
@Data
public class Product {
private Integer productId;
private Integer stock;
private Integer version;
}
3.2.6 ProductStock.java
package org.example.model;
import lombok.Data;
@Data
public class ProductStock {
private Integer stock;
private Integer version;
}
3.2.7 ProductMapper.java
package org.example.mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.example.model.ProductStock;
import org.springframework.stereotype.Repository;
@Repository
public interface ProductMapper {
// 获取当前库存和版本号
@Select("SELECT stock,version FROM product WHERE product_id = #{productId}")
ProductStock queryStock(Integer productId);
// 更新库存并更新版本号
@Update("UPDATE product SET stock = stock - 1 , version = version + 1 WHERE product_id = #{productId} AND version =#{version} AND stock > 0")
int updateStock(Integer productId,Integer version);
}
3.2.8 ProductService.java
package org.example.service;
import org.example.mapper.ProductMapper;
import org.example.model.ProductStock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
private final ProductMapper productMapper;
@Autowired
public ProductService(ProductMapper productMapper) {
this.productMapper = productMapper;
}
@Value("${server.port}")
private String port;
// 扣减库存
public boolean reduceStock(int productId) {
// 获取当前库存和版本号
ProductStock stock = productMapper.queryStock(productId);
// 检查库存是否足够
if (stock.getStock()<=0) {
System.out.println("库存不足,扣减失败");
return false;
}
// 尝试更新库存并更新版本号
if (productMapper.updateStock(productId,stock.getVersion())==0) {
// 如果更新失败,说明版本号不一致,乐观锁冲突
System.out.println(port+"-乐观锁冲突,扣减库存失败,可能其他线程已经修改了数据");
return false;
}
System.out.println(port+"-库存扣减成功,剩余库存: " + (stock.getStock()-1));
return true;
}
}
3.2.9 ProductController.java
package org.example.controller;
import org.example.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProductController {
@Autowired
private ProductService productService;
@GetMapping("/order")
public String order() {
final int MAX_RETRIES = 3; // 设置最大重试次数
for (int i = 0; i < MAX_RETRIES; i++) {
boolean success = productService.reduceStock(1);
if (success) {
return "库存扣减成功";
} else {
System.out.println("第 " + (i + 1) + " 次扣减库存失败,尝试重试...");
}
}
return "扣减库存失败,超出最大重试次数";
}
}
3.2.10 测试验证
前提准备:同目录3.1.12完全一致。
运行结果8001:
8001-库存扣减成功,剩余库存: 4
8001-库存扣减成功,剩余库存: 2
8001-库存扣减成功,剩余库存: 0
库存不足,扣减失败
第 1 次扣减库存失败,尝试重试…
库存不足,扣减失败
第 2 次扣减库存失败,尝试重试…
库存不足,扣减失败
第 3 次扣减库存失败,尝试重试…
库存不足,扣减失败…
运行结果8002:
8002-库存扣减成功,剩余库存: 3
8002-库存扣减成功,剩余库存: 1
库存不足,扣减失败
第 1 次扣减库存失败,尝试重试…
库存不足,扣减失败
第 2 次扣减库存失败,尝试重试…
库存不足,扣减失败
第 3 次扣减库存失败,尝试重试…
库存不足,扣减失败…
3.3 使用悲观锁(Pessimistic Lock)实现分布式锁
原理:
悲观锁(Pessimistic Lock)是一种基于“悲观”假设的锁机制,即假设每次对数据的操作都会发生并发冲突,因此在操作数据之前必须先对其进行加锁,防止其他事务或线程对数据进行并发操作。这样可以确保在加锁期间,只有获得锁的进程或线程能够访问数据,从而避免数据的不一致性。
开始事务: 当方法用@Transactional
注解标记时,Spring会在方法开始时开启一个事务。
获取悲观锁:在事务内执行查询时,如果使用了悲观锁(如SELECT ... FOR UPDATE
),数据库会锁定相关数据行。
事务提交:当方法执行完毕且没有发生异常时,事务提交,锁会被释放,其他等待的事务可以继续执行。
事务回滚:如果在事务期间发生了异常,事务回滚,锁也会被释放,确保数据的原子性和一致性。
CREATE TABLE product
(
product_id INT PRIMARY KEY,
stock INT
);
-- 初始化库存为5
INSERT INTO product (product_id, stock)
VALUES (1, 5);
3.3.1 项目结构
同:目录3.2.1项目完全一致。以下说明不同的类。
3.3.2 Product.java
package org.example.model;
import lombok.Data;
@Data
public class Product {
private Integer productId;
private Integer stock;
}
3.3.3 ProductMapper.java
package org.example.mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.example.model.Product;
import org.springframework.stereotype.Repository;
@Repository
public interface ProductMapper {
// 使用悲观锁获取当前库存
@Select("SELECT stock FROM product WHERE product_id = #{productId} FOR UPDATE")
Product queryStockWithLock(Integer productId);
// 更新库存
@Update("UPDATE product SET stock = stock - 1 WHERE product_id = #{productId} AND stock > 0")
int updateStock(Integer productId);
}
3.3.4 ProductService.java
package org.example.service;
import org.example.mapper.ProductMapper;
import org.example.model.Product;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
public class ProductService {
private final ProductMapper productMapper;
@Autowired
public ProductService(ProductMapper productMapper) {
this.productMapper = productMapper;
}
@Value("${server.port}")
private String port;
// 使用悲观锁扣减库存
@Transactional
public boolean reduceStock(int productId) {
System.out.println(port+"-开始尝试获取悲观锁,锁定库存记录...");
// 使用悲观锁获取当前库存
Product stock = productMapper.queryStockWithLock(productId);
// 悲观锁已生效,此时其他事务无法修改当前行
System.out.println(port+"-已成功获取悲观锁,库存记录已锁定。当前库存:" + stock.getStock());
// 检查库存是否足够
if (stock.getStock() <= 0) {
System.out.println("库存不足,扣减失败");
return false;
}
// 更新库存
int result = productMapper.updateStock(productId);
if (result == 0) {
System.out.println("库存扣减失败");
return false;
}
System.out.println("库存扣减成功,剩余库存: " + (stock.getStock() - 1));
return true;
}
}
3.3.5 测试验证
前提准备:同目录3.1.12完全一致。
运行结果8001:
8001-开始尝试获取悲观锁,锁定库存记录…
8001-已成功获取悲观锁,库存记录已锁定。当前库存:5
库存扣减成功,剩余库存: 4
8001-开始尝试获取悲观锁,锁定库存记录…
8001-已成功获取悲观锁,库存记录已锁定。当前库存:3
库存扣减成功,剩余库存: 2
8001-开始尝试获取悲观锁,锁定库存记录…
8001-已成功获取悲观锁,库存记录已锁定。当前库存:1
库存扣减成功,剩余库存: 0
8001-开始尝试获取悲观锁,锁定库存记录……
运行结果8002:
8002-开始尝试获取悲观锁,锁定库存记录…
8002-已成功获取悲观锁,库存记录已锁定。当前库存:4
库存扣减成功,剩余库存: 3
8002-开始尝试获取悲观锁,锁定库存记录…
8002-已成功获取悲观锁,库存记录已锁定。当前库存:2
库存扣减成功,剩余库存: 1
8002-开始尝试获取悲观锁,锁定库存记录…
8002-已成功获取悲观锁,库存记录已锁定。当前库存:0
库存不足,扣减失败
第 1 次扣减库存失败,尝试重试……
4.基于 Redis
4.1 安装Redis
4.1.1 拉取 Redis 官方镜像
docker pull redis:latest
4.1.2 启动 Redis 容器并设置用户名和密码
docker run -d --name redis \
-p 6379:6379 \
redis:latest \
--requirepass "123456" #设置密码
4.1.3 验证 Redis 服务
进入redis容器
docker exec -it redis /bin/bash
通过 Redis CLI 客户端连接到 Redis 服务器:
redis-cli -h 127.0.0.1 -p 6379
在 Redis 命令行界面中手动输入密码进行验证
AUTH 123456 #验证密码
成功连接后,可以通过运行简单的 Redis 命令来验证连接是否成功:
127.0.0.1:6379> ping
如果返回 PONG
,表示连接成功。
4.2 Redis 分布式锁的实现思路
Redis 分布式锁的核心在于:
- 获取锁:使用
SETNX
命令尝试设置锁,如果设置成功则获取锁。 - 设置过期时间:使用 Redis 的
EXPIRE
或SET
命令设置锁的过期时间,避免死锁。 - 释放锁:业务完成后释放锁,确保只有持有锁的线程可以释放它。
CREATE TABLE product
(
product_id INT PRIMARY KEY,
stock INT
);
-- 初始化库存为5
INSERT INTO product (product_id, stock)
VALUES (1, 5);
4.2.1 项目结构
4.2.2 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>lock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.3.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
4.2.3 application.yml
spring:
application:
name: lock
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/lock_?useSSL=false&serverTimezone=UTC
username: root
password: 123456
data:
redis:
host: 192.168.186.77
port: 6379
password: 123456
mybatis:
configuration:
map-underscore-to-camel-case: true
server:
port: 8001
4.2.4 LockApplication.java
package org.example;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan("org.example.mapper")
public class LockApplication {
public static void main(String[] args) {
SpringApplication.run(LockApplication.class, args);
}
}
4.2.5 RedisConfig.java
package org.example.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(@Autowired RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
// 使用StringRedisSerializer来序列化和反序列化redis的key
template.setKeySerializer(new StringRedisSerializer());
// 使用GenericJackson2JsonRedisSerializer来序列化和反序列化redis的value
template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
// 同样设置HashKey和HashValue序列化方式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(new GenericJackson2JsonRedisSerializer());
template.afterPropertiesSet();
return template;
}
}
4.2.6 Product.java
package org.example.model;
import lombok.Data;
@Data
public class Product {
private Integer productId;
private Integer stock;
}
4.2.7 ProductMapper.java
package org.example.mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
@Repository
public interface ProductMapper {
@Select("SELECT stock FROM product WHERE product_id = #{productId} FOR UPDATE")
int getStockByProductId(int productId);
@Update("UPDATE product SET stock =stock-1 WHERE product_id = #{productId}")
void updateProductStock(int productId);
}
4.2.8 ProductService.java
package org.example.service;
import org.example.mapper.ProductMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
@Autowired
private ProductMapper productMapper;
public boolean reduceStock(int productId) {
// 获取当前库存
int stock = productMapper.getStockByProductId(productId);
if (stock > 0) {
// 扣减库存
productMapper.updateProductStock(productId);
System.out.println( "库存扣减成功,剩余库存:" + (stock-1));
return true;
} else {
return false;
}
}
}
4.2.9 RedisDistributedLockService.java
package org.example.service;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@Service
public class RedisDistributedLockService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductService productService;
private static final String LOCK_KEY_PREFIX = "distributed_lock_";
@Value("${server.port}")
private String port;
public boolean acquireLock(String lockKey, String clientId, long expireTime) {
Boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId);
if (Boolean.TRUE.equals(success)) {
redisTemplate.expire(lockKey, expireTime, TimeUnit.SECONDS);
return true;
}
return false;
}
public void releaseLock(String lockKey, String clientId) {
String currentValue = (String) redisTemplate.opsForValue().get(lockKey);
if (clientId.equals(currentValue)) {
redisTemplate.delete(lockKey);
}
}
public void execute() {
String lockKey = LOCK_KEY_PREFIX+1;
String clientId = UUID.randomUUID().toString();
long expireTime = 10;
try {
boolean lockAcquired = acquireLock(lockKey, clientId, expireTime);
if (lockAcquired) {
System.out.println(port + " - 成功获取锁,执行任务。");
// 扣减库存
boolean success = productService.reduceStock(1); // 假设每次扣减1个库存
if(!success) {
System.out.println("库存不足。");
}
} else {
System.out.println(port + " - 未能获取锁,任务已被其他节点处理。");
}
} finally {
releaseLock(lockKey, clientId);
}
}
}
4.2.10 RedisDistributedLockController.java
package org.example.controller;
import org.example.service.RedisDistributedLockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class RedisDistributedLockController {
@Autowired
private RedisDistributedLockService redisDistributedLockService;
@GetMapping("/order")
public String executeTask() {
redisDistributedLockService.execute();
return "任务请求已提交";
}
}
4.2.11 测试验证
前提准备:同目录3.1.12完全一致。
运行结果8001:
8001 - 成功获取锁,执行任务。
库存扣减成功,剩余库存:4
8001 - 成功获取锁,执行任务。
库存扣减成功,剩余库存:2
8001 - 成功获取锁,执行任务。
库存扣减成功,剩余库存:0
8001 - 成功获取锁,执行任务。
库存不足。…
运行结果8002:
8002 - 成功获取锁,执行任务。
库存扣减成功,剩余库存:3
8002 - 成功获取锁,执行任务。
库存扣减成功,剩余库存:1
8002 - 成功获取锁,执行任务。
库存不足。…
4.3 Redis+悲观锁实现秒杀(适合单一节点)
4.3.1 实现原理
1. Redis
- 目的:利用 Redis 的高性能来快速处理库存的扣减操作,减少对数据库的直接访问,从而提升系统的并发处理能力。
- 过程:在应用启动时,将数据库中的库存数据加载到 Redis 中。秒杀时,所有的库存操作首先在 Redis 中进行,这样可以显著减少数据库的压力。
2. Redis 原子操作 (decrement)
- 目的:确保在并发情况下,多个线程对同一个库存的扣减操作不会发生冲突,从而防止超卖。
- 过程:每次秒杀请求到达时,直接通过 Redis 的
decrement
操作原子性地减少库存。如果库存不足(stock < 0
),则直接返回失败。
3. 数据库的悲观锁
- 目的:进一步确保数据库中的库存数据与 Redis 中的数据一致,防止并发情况下的库存不一致问题。
- 过程:如果 Redis 中的库存扣减成功,则使用悲观锁(
SELECT ... FOR UPDATE
)在数据库中锁定库存行,进行库存更新。悲观锁确保在锁定的库存更新完成之前,其他事务无法修改该库存数据。
4.回滚机制
- 目的:确保在任何异常情况下,Redis 和数据库的库存数据一致。
- 过程:如果在数据库操作中发现库存不足,或者在执行过程中发生异常,会回滚 Redis 中的库存操作(即通过
increment
恢复 Redis 库存),并抛出异常或返回操作失败。
CREATE TABLE product
(
product_id INT PRIMARY KEY,
stock INT
);
-- 初始化库存为5
INSERT INTO product (product_id, stock)
VALUES (1, 5);
CREATE TABLE orders
(
order_id INT PRIMARY KEY AUTO_INCREMENT,
product_id INT,
time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
4.3.2 项目结构
4.3.3 pom.xml
同:目录4.2.2完全一致。
4.3.4 application.yml
同:目录4.2.3完全一致。
4.3.5 LockApplication.java
同:目录4.2.4完全一致。
4.3.7 RedisConfig.java
同:目录4.2.5完全一致。
4.3.6 Product.java
同:目录4.2.6完全一致。
4.3.7 OrderMapper.java
package org.example.mapper;
import org.apache.ibatis.annotations.Insert;
import org.springframework.stereotype.Repository;
@Repository
public interface OrderMapper {
@Insert("INSERT INTO orders (product_id) VALUES (#{productId})")
void insertOrder(int productId);
}
4.3.8 ProductMapper.java
package org.example.mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.example.model.Product;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface ProductMapper {
@Select("SELECT * FROM product")
List<Product> getAllProducts();
@Select("SELECT * FROM product WHERE product_id = #{productId} FOR UPDATE")
Product getStockByProductId(int productId);
@Update("UPDATE product SET stock =stock-1 WHERE product_id = #{productId}")
void updateProductStock(int productId);
}
4.3.9 ProductService.java
package org.example.service;
import jakarta.annotation.PostConstruct;
import org.example.mapper.OrderMapper;
import org.example.mapper.ProductMapper;
import org.example.model.Product;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
@Service
public class ProductService {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ProductMapper productMapper;
@Autowired
private OrderMapper orderMapper;
private static final String PRODUCT_STOCK_KEY = "product_stock_";
// 初始化时从数据库加载库存到 Redis
@PostConstruct
public void loadProductStockToRedis() {
List<Product> products = productMapper.getAllProducts();
for (Product product : products) {
redisTemplate.opsForValue().set(PRODUCT_STOCK_KEY + product.getProductId(), product.getStock());
System.out.println("已加载商品ID:" + product.getProductId() + " 的库存到 Redis,库存为:" + product.getStock());
}
}
@Transactional
public boolean reduceStock(int productId) {
String stockKey = PRODUCT_STOCK_KEY + productId;
// 1. 从 Redis 中扣减库存,确保原子操作
Long stock = redisTemplate.opsForValue().decrement(stockKey);
if (stock == null || stock < 0) {
// 如果库存不足或扣减失败,回滚 Redis 库存并返回失败
redisTemplate.opsForValue().increment(stockKey);
System.out.println("秒杀失败!商品ID:" + productId + ",库存不足。");
return false;
}
// 2. 使用数据库的悲观锁检查并扣减库存
try {
Product product = productMapper.getStockByProductId(productId);
if (product.getStock() >= 1) {
// 更新数据库库存
productMapper.updateProductStock(productId);
// 创建订单
orderMapper.insertOrder(productId);
System.out.println("秒杀成功!商品ID:" + productId + ",剩余库存:" + stock);
return true;
} else {
// 如果数据库库存不足,回滚 Redis 库存并返回失败
redisTemplate.opsForValue().increment(stockKey);
System.out.println("秒杀失败!商品ID:" + productId + ",数据库库存不足。");
return false;
}
} catch (Exception e) {
// 发生异常时回滚 Redis 库存并抛出异常
redisTemplate.opsForValue().increment(stockKey);
throw e;
}
}
}
4.3.10 ProductController.java
package org.example.controller;
import org.example.service.ProductService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ProductController {
@Autowired
private ProductService productService;
@GetMapping("/order")
public String order() {
boolean success = productService.reduceStock(1);
return success ? "秒杀成功" : "秒杀失败";
}
}
4.3.12 测试验证
前提准备:修改商品的stock(余额)为10,进行秒杀模拟。
运行结果:
已加载商品ID:1 的库存到 Redis,库存为:10
秒杀成功!商品ID:1,剩余库存:3
秒杀成功!商品ID:1,剩余库存:8
秒杀成功!商品ID:1,剩余库存:0
秒杀成功!商品ID:1,剩余库存:1
秒杀成功!商品ID:1,剩余库存:5
秒杀成功!商品ID:1,剩余库存:6
秒杀成功!商品ID:1,剩余库存:7
秒杀失败!商品ID:1,库存不足。
秒杀成功!商品ID:1,剩余库存:9
秒杀失败!商品ID:1,库存不足。
秒杀成功!商品ID:1,剩余库存:2
秒杀失败!商品ID:1,库存不足。
秒杀成功!商品ID:1,剩余库存:4
秒杀失败!商品ID:1,库存不足。
说明:订单是并发下的,同时抢购,同时产品表没有出现超卖。
5. 基于 Zookeeper
5.1 安装 Zookeeper
5.1.1 拉取 Zookeeper Docker 镜像
docker pull zookeeper
5.1.2 运行 Zookeeper 容器
docker run -d --name zookeeper -p 2181:2181 zookeeper
5.1.3 验证是否启动成功
docker exec -it zookeeper /bin/bash
5.1.4 连接到Zookeeper 客户端
zkCli.sh -server localhost:2181
5.2 分布式锁
Zookeeper 是一个分布式协调服务,它为分布式系统提供了一种强一致性的机制。Zookeeper 集群中的所有节点(通常为奇数个)通过一致性算法(如 ZAB 协议)来保证数据的一致性和可靠性。
临时节点(Ephemeral Node):临时节点是 Zookeeper 的一种特殊节点,它在客户端会话有效期间存在,一旦客户端断开连接(例如崩溃或超时),临时节点将自动删除。临时节点的特性确保了锁在客户端失效后能够被自动释放,从而避免了死锁的发生。
顺序节点(Sequential Node):顺序节点是在创建节点时,Zookeeper 会在节点名称后附加一个全局递增的序号。每次请求创建顺序节点时,Zookeeper 会生成一个具有唯一序号的节点。通过顺序节点,可以为多个客户端竞争锁的请求排序,实现公平锁。
步骤:
- 创建锁节点:客户端尝试在 Zookeeper 的某个路径下创建一个带有唯一序号的临时顺序节点(如
/locks/lock-000000001
)。 - 判断锁的持有者:客户端获取当前所有节点的列表,判断自己创建的节点是否是序号最小的节点。如果是,则认为自己获取到了锁;如果不是,则监听比自己序号小的节点(即前一个节点)的删除事件。
- 等待锁释放:如果当前节点不是序号最小的节点,客户端会进入等待状态,直到它监听的前一个节点被删除。当前一个节点被删除时,客户端重新检查自己是否是序号最小的节点,如果是,则获取锁。
- 释放锁:当客户端完成对共享资源的操作后,删除它创建的临时顺序节点,从而释放锁。
- 通知下一个客户端:锁的释放会触发下一个序号节点的监听事件,该客户端随即尝试获取锁并执行相应操作。
5.3 简单案例
CREATE TABLE product
(
product_id INT PRIMARY KEY,
stock INT
);
-- 初始化库存为5
INSERT INTO product (product_id, stock)
VALUES (1, 5);
5.3.1 项目结构
5.3.2 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>lock</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.3.0</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.3.0</version>
</dependency>
<!-- Zookeeper client -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.7.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
说明:选择合适的版本zookeeper依赖。
5.3.3 application.yml
spring:
application:
name: lock
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/lock_?useSSL=false&serverTimezone=UTC
username: root
password: 123456
mybatis:
configuration:
map-underscore-to-camel-case: true
zookeeper:
connect-string: 192.168.186.77:2181
session-timeout-ms: 5000
connection-timeout-ms: 3000
retry:
base-sleep-time-ms: 1000
max-retries: 3
server:
port: 8001
5.3.4 LockApplication.java
package org.example;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@MapperScan("org.example.mapper")
public class LockApplication {
public static void main(String[] args) {
SpringApplication.run(LockApplication.class, args);
}
}
5.3.5 ZookeeperConfig.java
package org.example.config;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ZookeeperConfig {
@Value("${zookeeper.connect-string}")
private String connectString;
@Value("${zookeeper.session-timeout-ms}")
private int sessionTimeoutMs;
@Value("${zookeeper.connection-timeout-ms}")
private int connectionTimeoutMs;
@Value("${zookeeper.retry.base-sleep-time-ms}")
private int baseSleepTimeMs;
@Value("${zookeeper.retry.max-retries}")
private int maxRetries;
@Bean
public CuratorFramework curatorFramework() {
CuratorFramework client = CuratorFrameworkFactory.newClient(
connectString,
sessionTimeoutMs,
connectionTimeoutMs,
new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries)
);
client.start();
return client;
}
}
5.3.6 Product.java
package org.example.model;
import lombok.Data;
@Data
public class Product {
private Integer productId;
private Integer stock;
}
5.3.7 ProductMapper.java
package org.example.mapper;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import org.springframework.stereotype.Repository;
@Repository
public interface ProductMapper {
@Select("SELECT stock FROM product WHERE product_id = #{productId} FOR UPDATE")
int getStockByProductId(int productId);
@Update("UPDATE product SET stock =stock-1 WHERE product_id = #{productId}")
void updateProductStock(int productId);
}
5.3.8 ProductService.java
package org.example.service;
import org.example.mapper.ProductMapper;
import org.example.model.Product;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ProductService {
@Autowired
private ProductMapper productMapper;
public void reduceStock(int productId) {
// 获取当前库存
int stock = productMapper.getStockByProductId(productId);
if (stock > 0) {
// 扣减库存
productMapper.updateProductStock(productId);
System.out.println("库存扣减成功,剩余库存:" + (stock-1));
} else {
System.out.println("库存不足。");
}
}
}
5.3.9 ZookeeperLockService.java
package org.example.service;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class ZookeeperLockService {
private final CuratorFramework client;
@Autowired
public ZookeeperLockService(CuratorFramework client) {
this.client = client;
}
/**
* 获取分布式锁
*
* @param lockPath 锁路径
* @param timeout 获取锁的超时时间
* @param timeUnit 时间单位
* @return 返回锁实例,如果成功获取锁,否则返回null
*/
public InterProcessMutex acquireLock(String lockPath, long timeout, TimeUnit timeUnit) {
InterProcessMutex lock = new InterProcessMutex(client, lockPath);
try {
if (lock.acquire(timeout, timeUnit)) {
return lock; // 返回成功获取锁的实例
}
} catch (Exception e) {
e.printStackTrace();
}
return null; // 如果未能获取锁,返回null
}
/**
* 释放分布式锁
*
* @param lock 锁实例
*/
public void releaseLock(InterProcessMutex lock) {
if (lock != null) {
try {
lock.release();
System.out.println("锁释放成功!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
说明:InterProcessMutex
是 Apache Curator 库中提供的一种实现分布式锁的工具类,它基于 Zookeeper 来实现锁的互斥性。InterProcessMutex
提供了一种跨进程、跨节点的锁机制,确保在分布式环境中,同一时间只有一个客户端能够获得锁,其他客户端需要等待该锁被释放后才能继续操作。
5.3.10 LockController.java
package org.example.controller;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.example.service.ProductService;
import org.example.service.ZookeeperLockService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@RestController
public class LockController {
@Autowired
private ZookeeperLockService zookeeperLockService;
@Autowired
private ProductService productService;
@Value("${server.port}")
private String port;
@GetMapping("/order")
public void executeTaskWithLock() {
int productId = 1;
String lockPath = "/zk-lock/" + productId;
InterProcessMutex lock = zookeeperLockService.acquireLock(lockPath, 5, TimeUnit.SECONDS);
if (lock != null) {
try {
System.out.println(port+"-成功获取锁");
productService.reduceStock(1);
} finally {
// 释放锁
zookeeperLockService.releaseLock(lock);
System.out.println(port+"-成功释放锁");
}
} else {
System.out.println(port+"-未能获取锁,任务已被其他节点处理");
}
}
}
5.3.11 测试验证
前提准备:同目录3.1.12完全一致。
运行结果8001:
8001-成功获取锁
库存扣减成功,剩余库存:4
锁释放成功!
8001-成功释放锁
8001-成功获取锁
库存扣减成功,剩余库存:2
锁释放成功!
8001-成功释放锁
8001-成功获取锁
库存扣减成功,剩余库存:0
锁释放成功!
8001-成功释放锁…
运行结果8002:
8002-成功获取锁
库存扣减成功,剩余库存:3
锁释放成功!
8002-成功释放锁
8002-成功获取锁
库存扣减成功,剩余库存:1
锁释放成功!
8002-成功释放锁
8002-成功获取锁
库存不足。…
6. 总结
简单的模拟包括基于数据库的乐观锁、悲观锁、以及利用数据库的唯一性约束的分布式锁,以及基于Redis、Zookeeper的分布式锁,仅供学习参考。