文章目录
- 什么是线程安全
- 什么场景下会发生
- 如何保证线程安全
- 单机环境
- 1.无状态设计
- 2.使用final关键字(不可变)
- 3.使用synchronized关键字
- 4.使用volatile关键字
- 5.使用java.util.concurrent.atomic包中的原子包装类
- 6.使用java.util.concurrent.locks包中的锁
- 7.使用线程安全集合类
- 8.使用ThreadLocal
- 集群环境
- 1.分布式锁
- 2.数据分片,分割和隔离数据
- 3.串行化避免并发
- 4.分布式原子操作
- 5.原子操作CAS + Retry/Failfast (通用解决方案-令牌限制保护)
- 总结
在多线程编程中,线程安全是一个关键的概念,它涉及到多个线程访问和修改共享资源时的正确性和一致性。无论是在单机环境下还是在分布式集群环境下,保证线程安全都是开发者需要重视的问题。
什么是线程安全
线程安全是程序设计中的术语,指某个函数、函数库在多线程环境中被调用时,能够正确地处理多个线程之间的公用变量,使程序功能正确完成。
以上来自维基百科
举个例子来说明线程安全的重要性。假设有一个电影院播放一场电影,座位总数为10个。如果没有线程安全的保护措施,当多个人同时抢购电影票时,可能会出现剩余座位数大于0的情况,导致售出过多的票,不符合预期。
示例代码:
public class MovieTicket {
private int availableTickets;
public MovieTicket(int totalTickets) {
this.availableTickets = totalTickets;
}
public void sellTickets(int numTickets, String user) {
if (numTickets > availableTickets) {
System.out.println("抱歉," + user + ",剩余票数不足!");
return;
}
// 模拟售票过程
// 例如查库 写库 调用远程服务等
try {
Thread.sleep(100); // 假设售票过程需要一定的时间
} catch (InterruptedException e) {
e.printStackTrace();
}
availableTickets -= numTickets;
System.out.println(user + "购买了" + numTickets + "张票,剩余票数:" + availableTickets);
}
public int getTicketsAvailable() {
return availableTickets;
}
}
MovieTicket
类表示电影院的票务系统sellTicket()
方法用于售票,每次调用会将剩余的票数减少,然后输出售票信息。getTicketsAvailable()
方法用于获取剩余票数
接下来,让我们模拟10个用户同时购买电影票:
public class Test {
public static void main(String[] args) {
MovieTicket ticketCounter = new MovieTicket(10);
for (int i = 0; i < 10; i++) {
int finalI = i;
Thread thread1 = new Thread(() -> ticketCounter.sellTickets(1, "User"+ finalI));
thread1.start();
}
}
}
执行结果如下:
User7购买了1张票,剩余票数:3
User1购买了1张票,剩余票数:3
User8购买了1张票,剩余票数:3
User2购买了1张票,剩余票数:3
User9购买了1张票,剩余票数:3
User6购买了1张票,剩余票数:3
User4购买了1张票,剩余票数:3
User5购买了1张票,剩余票数:3
User0购买了1张票,剩余票数:3
User3购买了1张票,剩余票数:3
期望结果如下:
User0购买了1张票,剩余票数:9
User8购买了1张票,剩余票数:8
User7购买了1张票,剩余票数:7
User9购买了1张票,剩余票数:6
User6购买了1张票,剩余票数:5
User3购买了1张票,剩余票数:4
User5购买了1张票,剩余票数:3
User4购买了1张票,剩余票数:2
User2购买了1张票,剩余票数:1
User1购买了1张票,剩余票数:0
什么场景下会发生
当多个线程同时读写共享资源时,可能会出现数据竞争的情况,导致数据被污染或产生不确定的结果。
共享资源可以是计数器变量、数组、数据库中的记录或任何其他内容。
常见的操作为:
- Check-then-act operation(初始化)
- Read-modify-write operation(递增计数器)
如何保证线程安全
单机环境
1.无状态设计
设计无状态的类,即没有全局变量或共享状态的类。通过避免对共享资源的竞争,可以减少线程之间的冲突和竞争条件,从而保证线程安全性。下面是一个示例:
示例代码:
public class ThreadSafeCalculator {
// 没有任何全局变量或共享状态
public int add(int a, int b) {
return a + b;
}
public int subtract(int a, int b) {
return a - b;
}
// 其他无状态的计算方法...
}
通过将类设计为无状态的,每个线程都可以创建自己的实例或共享同一个实例,并独立地调用方法进行计算。由于没有共享的状态,线程之间不会发生竞争或冲突,从而保证了线程安全性。
需要注意的是,无状态设计并不适用于所有场景。某些情况下,可能确实需要共享状态或全局变量来实现特定的功能。在这种情况下,需要采取适当的同步机制(如使用锁)来确保多线程访问共享资源的安全性。
2.使用final关键字(不可变)
final变量在java中也是线程安全的,因为一旦分配了一个对象的某些引用,它就不能指向另一个对象的引用。
代码示例:
public class ThreadSafeCounter {
private final int limit = 100;
private final Object lock = new Object();
public void increment() {
}
public int getLimit() {
return limit;
}
}
需要注意的是,final
关键字仅保证变量引用不会被修改,但并不保证引用对象内部状态的不可变性。如果引用对象本身是可变的,并且多个线程对其进行修改,仍然需要额外的同步机制来保证线程安全。
3.使用synchronized关键字
使用synchronized关键字修饰共享资源的访问方法或代码块,确保同一时间只有一个线程可以访问共享资源。
synchronized关键字可以防止多个线程同时修改共享资源,保证数据的一致性和正确性。当一个线程获取到锁时,其他线程将被阻塞,直到锁被释放。
示例代码:
public synchronized void sellTickets(int numTickets, String user) {
// 线程安全的代码块
// ...
}
4.使用volatile关键字
volatile关键字用于修饰共享变量,保证每次访问该变量时都从主内存中读取最新的值,而不是使用线程的本地缓存。它可以确保多个线程之间的可见性,但并不能解决原子性和有序性的问题。因此,volatile适用于一些简单的变量状态标记或开关。
示例代码:
private volatile boolean flag = false;
public void setFlag(boolean value) {
flag = value;
}
public boolean getFlag() {
return flag;
}
5.使用java.util.concurrent.atomic包中的原子包装类
Java提供了一系列的Atomic类,如AtomicInteger、AtomicLong等,它们提供了原子性操作,可以在单个操作中完成读取和更新操作,从而保证线程安全。Atomic类使用了底层的CAS(Compare and Swap)操作,确保原子性和可见性。
示例代码:
private AtomicInteger availableTickets = new AtomicInteger(10);
public void sellTickets(int numTickets, String user) {
int remainingTickets = availableTickets.getAndAdd(-numTickets);
// 线程安全的代码块
// ...
}
6.使用java.util.concurrent.locks包中的锁
ReentrantLock是Java提供的可重入锁,它提供了更多的灵活性和扩展性。通过显式地获取锁和释放锁,可以确保只有一个线程可以访问共享资源。与synchronized关键字相比,ReentrantLock提供了更多的高级功能,如可中断的锁、公平锁等。
示例代码:
private ReentrantLock lock = new ReentrantLock();
public void sellTickets(int numTickets, String user) {
lock.lock();
try {
// 线程安全的代码块
// ...
} finally {
lock.unlock();
}
}
7.使用线程安全集合类
Java提供了许多线程安全的数据结构,如ConcurrentHashMap、CopyOnWriteArrayList等。这些数据结构内部实现了线程安全的访问和修改机制,可以直接在多线程环境中使用,无需额外的同步措施。
示例代码:
private Map<String, Integer> map = new ConcurrentHashMap<>();
public void updateMap(String key, int value) {
map.put(key, value);
}
8.使用ThreadLocal
ThreadLocal 是 Java 提供的一种线程封闭的机制,它可以为每个线程提供独立的变量副本(空间换时间)。通过将共享变量存储在 ThreadLocal 中,可以避免多个线程之间的数据共享和竞争,从而保证线程安全。
示例代码:
private ThreadLocal<Integer> threadLocalCount = ThreadLocal.withInitial(() -> 0);
public void incrementCount() {
int count = threadLocalCount.get();
threadLocalCount.set(count + 1);
}
集群环境
在集群环境下,保障线程安全需要考虑更多的因素和挑战。由于集群涉及多个服务器和多个进程/线程同时运行,线程安全性的维护变得更加复杂。以下是一些在集群环境中保障线程安全的常见方案
1.分布式锁
- 使用分布式锁来协调多个节点之间对共享资源的访问。
- 常见的分布式锁实现包括基于数据库的锁、基于缓存的锁(如Redis锁)和基于ZooKeeper的锁等。
- 在访问共享资源之前,节点需要获取分布式锁,确保只有一个节点可以执行临界区代码。
示例伪代码:
// 加锁
if (acquireLock(key)) {
try {
// 执行操作
} finally {
// 释放锁
releaseLock(key);
}
}
参考
- 从零开发短视频电商 分布式锁-基于数据库实现
- 从零开发短视频电商 分布式锁-基于Redis实现
2.数据分片,分割和隔离数据
- 将共享数据划分为多个片段,并将每个片段分配给不同的节点进行处理。
- 每个节点只负责自己所分配的数据片段,避免多个节点同时访问相同的数据。
- 可以根据数据的特点和负载情况来选择合适的数据分片策略,如基于哈希、一致性Hash、范围等方式。
示例伪代码:
// 获取数据分片的节点
Node node = getShardNode(key);
// 在指定节点上执行操作
result = node.processData(key, data);
3.串行化避免并发
- 使用消息队列作为数据交换的中间件,将共享资源的操作转换为异步消息的形式。
- 每个节点从消息队列中接收消息并处理,确保只有一个节点处理每条消息。
- 消息队列可以提供可靠的消息传递机制,并通过消息消费的顺序保证数据的一致性。
- 通过某些策略和业务设计来避免并发。
// 发送消息到消息队列
queue.send(key,message);
// 在节点上异步消费消息
queue.consume(key,message -> {
// 处理消息
});
4.分布式原子操作
Redis 提供了一些原子命令,可以在集群环境下实现一些常见的分布式原子操作。以下是一些常用的 Redis 原子命令和示例:
1.SETNX(Set if Not eXists)
如果指定的键不存在,则设置键的值为给定的值,该操作是原子的。
// 设置键名为 "key" 的值为 "value",仅当该键不存在时
jedis.setnx("key", "value");
2.原子计数器
Redis 的 INCR 和 DECR 命令可以对存储在 Redis 中的整数值进行原子操作。
// 自增计数器
Long incrementedValue = jedis.incr("counter_key");
// 自减计数器
Long decrementedValue = jedis.decr("counter_key");
3.事务原子性操作组合
Redis 提供了 MULTI/EXEC/WATCH 命令组合,可以实现多个操作的原子性执行。
// 监视键
jedis.watch("key");
// 开启事务
Transaction transaction = jedis.multi();
// 执行多个操作
transaction.set("key1", "value1");
transaction.set("key2", "value2");
// 提交事务
List<Object> results = transaction.exec();
4.lua脚本
Redis 提供了 Lua 脚本支持,可以使用 Lua 脚本实现更复杂的原子操作。通过将多个 Redis 命令组合到一个 Lua 脚本中,在执行脚本时,Redis 会将整个脚本作为一个原子操作进行执行,确保在执行期间不会被其他命令中断。
可以使用 Redis 的 Lua 脚本来保证线程安全和避免超卖的问题。
-- Lua 脚本代码
local key = KEYS[1] -- 键名
local quantity = ARGV[1] -- 购买数量
local remaining = tonumber(redis.call('GET', key)) -- 获取当前剩余票数
if remaining and remaining >= tonumber(quantity) then
redis.call('DECRBY', key, quantity) -- 减少票数
return 1 -- 返回成功标志
else
return 0 -- 返回失败标志
end
在这个 Lua 脚本中,我们首先获取指定键的当前剩余票数,然后根据购买数量进行判断。如果剩余票数足够,则使用 Redis 的 DECRBY
命令原子地减少票数,并返回成功标志。否则,直接返回失败标志。
在 Java 中,我们可以使用 Jedis 或者 Lettuce 等 Redis 客户端来执行 Lua 脚本。以下是一个使用 Jedis 执行 Lua 脚本的示例代码:
Jedis jedis = new Jedis("localhost", 6379);
String script = "local key = KEYS[1]\n" +
"local quantity = ARGV[1]\n" +
"local remaining = tonumber(redis.call('GET', key))\n" +
"if remaining and remaining >= tonumber(quantity) then\n" +
" redis.call('DECRBY', key, quantity)\n" +
" return 1\n" +
"else\n" +
" return 0\n" +
"end";
String key = "ticket";
String quantity = "2";
// 执行 Lua 脚本
Long result = (Long) jedis.eval(script, Collections.singletonList(key),Collections.singletonList(quantity));
if (result == 1) {
// 购票成功
System.out.println("购票成功");
} else {
// 购票失败
System.out.println("购票失败");
}
通过执行这个 Lua 脚本,我们可以在分布式环境中保证线程安全,并避免电影票超卖的问题。当多个线程或节点同时执行脚本时,Redis 会保证 Lua 脚本的原子性,从而确保了购票操作的正确性和一致性。
数据库中有多种原子操作,以下是开发中常见的几个示例:
原子计数(Atomic Counter):对数据库中的计数器进行原子操作,通常是增加或减少计数器的值。
示例:对文章表中的浏览次数计数器进行递增操作。
UPDATE articles SET view_count = view_count + 1 WHERE id = 456;
5.原子操作CAS + Retry/Failfast (通用解决方案-令牌限制保护)
在集群环境下保障线程安全,结合原子操作和令牌保护是一种有效的方案。该方案通过使用原子操作和令牌机制来确保多个线程或节点之间的协调和互斥。以下是该方案的详细解释和示例伪代码:
- 原子操作:使用数据库或分布式存储系统提供的原子操作来保证数据的一致性。这些原子操作包括原子增加、原子更新、原子删除等,可以根据具体的业务需求选择合适的原子操作。
- 令牌保护机制:在执行一组不安全的操作之前,引入一个令牌拿取操作。令牌的数量与集群中的资源或操作能力相关联。在每个线程或节点执行不安全操作之前,需要先从令牌池中获取一个令牌。获取令牌的过程需要保证线程安全,可以使用原子操作来实现。
- Retry/Failfast:如果线程或节点无法获取令牌,即无法进入关键操作阶段,可以选择重试或者放弃操作。重试机制可以让线程等待并再次尝试获取令牌,直到成功为止。Failfast机制则立即放弃操作,避免浪费资源。
下面是一个示例伪代码,演示了原子操作和令牌保护的集群线程安全方案:
int maxRetries = 3;
int retryInterval = 100; // milliseconds
int currentRetry = 0;
boolean success = false;
while (!success && currentRetry < maxRetries) {
// 尝试获取令牌
if (threadSafeAcquireToken()) {
try {
// 执行一组不安全的操作
executeUnsafeOperations();
success = true;
} finally {
// 释放令牌
releaseToken();
}
} else {
// 没有获取到令牌,选择重试或者放弃操作
currentRetry++;
handleRetryOrFail();
// 拿不到令牌,等待一段时间后重试
Thread.sleep(retryInterval);
}
}
还以上面卖电影票为例子,这个令牌可以是通用的令牌,也可以是业务上的,例如这里的令牌限制,其实就是每场电影不超过10个人即10个令牌的限制。
创建 Lua 脚本 acquire_token.lua
,用于获取令牌:
local key = KEYS[1] -- 令牌池键名
local tokenCount = tonumber(ARGV[1]) -- 需要获取的令牌数量
local currentCount = tonumber(redis.call('GET', key)) -- 获取当前令牌数量
if currentCount and currentCount >= tokenCount then
redis.call('DECRBY', key, tokenCount) -- 减少令牌数量
return 1 -- 获取令牌成功
else
return 0 -- 获取令牌失败
end
创建 Lua 脚本 release_token.lua
,用于释放令牌
local key = KEYS[1] -- 令牌池键名
local tokenCount = tonumber(ARGV[1]) -- 需要释放的令牌数量
redis.call('INCRBY', key, tokenCount) -- 增加令牌数量
在 Java 中使用 Jedis 执行 Lua 脚本:
int maxRetries = 3; // 最大重试次数
int retryDelayMillis = 100; // 重试延迟时间
int retryCount = 0;
boolean acquiredToken = false;
// 获取令牌
while (!acquiredToken && retryCount < maxRetries) {
Long acquireResult = (Long) jedis.eval(acquireScript, Collections.singletonList(电影id), Collections.singletonList(String.valueOf(tokenCount)));
if (acquireResult == 1) {
acquiredToken = true;
} else {
retryCount++;
try {
Thread.sleep(retryDelayMillis);
} catch (InterruptedException e) {
}
}
}
// 处理业务
if (acquiredToken) {
try {
// 执行线程安全的操作 重点 重点 重点,这里是一大堆操作需要保证线程安全的
// 远程调用
// 写库
// ...
} finally { // 释放令牌
jedis.eval(releaseScript, Collections.singletonList(电影id), Collections.singletonList(String.valueOf(tokenCount)));
}
} else {
// 重试次数超过阈值,执行其他处理逻辑或抛出异常
// ...
throw
}
总结
通过上面的整理分析,我们搞个基于Redis的通用的令牌限制保护策略的伪代码。
public class RedisTokenProtection {
private final Jedis jedis;
private final String tokenPoolKey;
private final int maxRetries;
private final long retryInterval;
/**
* 构造函数
*
* @param jedisSupplier 提供 Jedis 实例的供应商
* @param tokenPoolKey 令牌池的键名
* @param maxRetries 最大重试次数
* @param retryInterval 重试间隔时间(毫秒)
*/
public RedisTokenProtection(Supplier<Jedis> jedisSupplier, String tokenPoolKey, int maxRetries, long retryInterval) {
this.jedis = jedisSupplier.get();
this.tokenPoolKey = tokenPoolKey;
this.maxRetries = maxRetries;
this.retryInterval = retryInterval;
}
/**
* 执行带有令牌保护的业务逻辑
*
* @param limitTokenCount 限制令牌数
* @param requestTokenKey 请求令牌的键名
* @param requestTokenCount 请求令牌的数量
* @param totalTimeout 总的执行超时时间(毫秒)
* @param supplier 提供业务逻辑的供应商
* @param <T> 返回值的类型
* @return 业务逻辑的返回值
* @throws TokenAcquisitionException 令牌获取异常
*/
public <T> T executeWithTokenProtection(int limitTokenCount, String requestTokenKey, int requestTokenCount, long totalTimeout, Supplier<T> supplier) throws TokenAcquisitionException {
long startTime = System.currentTimeMillis();
try {
// 尝试获取令牌
boolean acquiredToken = acquireToken(limitTokenCount, requestTokenKey, requestTokenCount);
if (acquiredToken) {
// 成功获取令牌后执行业务逻辑
return supplier.get();
}
throw new TokenAcquisitionException("Failed to acquire tokens.");
} catch (TokenAcquisitionException ex) {
throw ex;
} catch (Exception ex) {
long elapsedTime = System.currentTimeMillis() - startTime;
int retries = 0;
while (retries < maxRetries && elapsedTime < totalTimeout) {
try {
// 等待重试间隔
Thread.sleep(retryInterval);
boolean acquiredToken = acquireToken(limitTokenCount, requestTokenKey, requestTokenCount);
if (acquiredToken) {
// 成功获取令牌后执行业务逻辑
return supplier.get();
}
retries++;
elapsedTime = System.currentTimeMillis() - startTime;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
throw new TokenAcquisitionException("Failed to acquire tokens after retrying " + maxRetries + " times.");
} finally {
releaseToken(requestTokenKey, requestTokenCount);
}
}
// 获取令牌的逻辑,实现方法根据具体需求自行编写
// 必须是原子的
private boolean acquireToken(int limitTokenCount, String requestTokenKey, int requestTokenCount) {
String acquireTokenScript =
"local availableTokens = tonumber(redis.call('get', KEYS[1])) or 0\n" +
"if availableTokens >= tonumber(ARGV[1]) then\n" +
" redis.call('decrby', KEYS[1], ARGV[1])\n" +
" return true\n" +
"else\n" +
" return false\n" +
"end";
Object result = jedis.eval(acquireTokenScript, Collections.singletonList(requestTokenKey),
Collections.singletonList(String.valueOf(requestTokenCount)));
return (Boolean) result;
}
// 释放令牌的逻辑,实现方法根据具体需求自行编写
// 必须是原子的
private void releaseToken(String requestTokenKey, int requestTokenCount) {
String releaseTokenScript =
"redis.call('incrby', KEYS[1], ARGV[1])";
jedis.eval(releaseTokenScript, Collections.singletonList(requestTokenKey),
Collections.singletonList(String.valueOf(requestTokenCount)));
}
public class TokenAcquisitionException extends Exception {
public TokenAcquisitionException(String message) {
super(message);
}
}
}
调用示例:
public class Main {
public static void main(String[] args) {
// 创建 Jedis 实例的供应商
Supplier<Jedis> jedisSupplier = () -> {
// 这里创建和配置 Jedis 实例,例如连接到 Redis 服务器
return new Jedis("localhost");
};
// 创建 RedisTokenProtection 实例
RedisTokenProtection tokenProtection = new RedisTokenProtection(jedisSupplier, "token_pool:", 3, 1000);
try {
// 执行带有令牌保护的业务逻辑
String movieId = "亮剑";
boolean result = tokenProtection.executeWithTokenProtection(10, movieId, 1, 10000, () -> {
// 这里编写需要保护的线程不安全的业务逻辑
System.out.println("执行业务逻辑...");
// 假设这里有一段需要保护的代码
// ...
// 返回业务逻辑执行的结果
return true;
});
if (result) {
System.out.println("业务逻辑执行成功!");
} else {
System.out.println("业务逻辑执行失败!");
}
} catch (RedisTokenProtection.TokenAcquisitionException ex) {
System.out.println("获取令牌失败:" + ex.getMessage());
}
}
}
当使用 Spring AOP 和自定义注解的结合,可以更方便地实现令牌保护的功能。下面是一个示例代码,展示了如何使用 Spring AOP 和自定义注解来实现令牌保护:
首先,定义一个自定义注解 TokenProtected
,用于标注需要进行令牌保护的方法:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface TokenProtected {
int limitTokenCount() default 1;
String requestTokenKey();
int requestTokenCount() default 1;
long totalTimeout() default 0;
}
然后,创建一个切面类 TokenProtectionAspect
,使用 Spring AOP 实现令牌保护的逻辑
@Aspect
@Component
public class TokenProtectionAspect {
private final RedisTokenProtection tokenProtection;
@Autowired
public TokenProtectionAspect(RedisTokenProtection tokenProtection) {
this.tokenProtection = tokenProtection;
}
@Pointcut("@annotation(com.example.TokenProtected)")
public void tokenProtectedMethod() {
}
@Around("tokenProtectedMethod() && @annotation(tokenProtected)")
public Object protectWithToken(ProceedingJoinPoint joinPoint, TokenProtected tokenProtected) throws Throwable {
int limitTokenCount = tokenProtected.limitTokenCount();
String requestTokenKey = tokenProtected.requestTokenKey();
int requestTokenCount = tokenProtected.requestTokenCount();
long totalTimeout = tokenProtected.totalTimeout();
Supplier<Object> supplier = () -> {
try {
return joinPoint.proceed();
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
};
return tokenProtection.executeWithTokenProtection(limitTokenCount, requestTokenKey, requestTokenCount, totalTimeout, supplier);
}
}
在这个切面类中,我们定义了一个切点 tokenProtectedMethod()
,用于匹配被 TokenProtected
注解标注的方法。在 protectWithToken
方法中,我们获取了 TokenProtected
注解的参数,并创建了一个 RedisTokenProtection
实例来执行令牌保护的逻辑。
最后,使用时只需在需要进行令牌保护的方法上添加 @TokenProtected
注解,配置相应的参数:
@Service
public class MyService {
@TokenProtected(limitTokenCount = 100, requestTokenKey = "myTokenKey", requestTokenCount = 1, totalTimeout = 5000)
public void protectedMethod() {
// 令牌保护的业务逻辑
}
}
在上述示例中,protectedMethod
方法被标记为需要进行令牌保护的方法,并提供了相关的令牌参数。
通过以上步骤,你可以使用 Spring AOP 和自定义注解来实现一个方便易用的令牌保护机制。切面类会拦截带有 @TokenProtected
注解的方法,并在执行前后进行令牌的获取和释放。