一.发短信
发短信的场景有很多,比如手机号+验证码登录注册,电影票买完之后会发送取票码,发货之后会有物流信息,支付之后银行发的付款信息,电力系统的电费预警信息等等
在这些业务场景中,有一个特征,那就是主业务可以和短信业务割裂,比如手机号+验证码登陆,当我们点击获取验证码的时候,会连接短信业务平台发送短信,但是发短信这个业务受到短信平台的影响,可能会存在一定时间的延时,但是我们不一定非要等短信平台返回之后,再给用户返回,我们可以先返回获取验证码成功的提升样式,将发短信的业务放入到另外一个线程中执行,用户晚一会收到短信对整体的业务流程也不会受到影响,反而提升了用户体验
代码演示:
1.在springboot项目中导入依赖:
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
2.编写自定义线程池配置
package com.jeesite.modules.asysutils.juc.pool;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean("asyncServiceExecutor")
public ThreadPoolTaskExecutor asyncServiceExecutor() {//TaskExecutor不带返回值 ThreadPoolTaskExecutor带返回值
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
// 设置核心线程数
// 指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
executor.setCorePoolSize(10);
// 设置最大线程数
// 指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
executor.setMaxPoolSize(10);
// 设置队列容量
// new LinkedBlockingQueue<Runnable>();
executor.setQueueCapacity(32);
// 设置线程活跃时间(秒)
// 当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
executor.setKeepAliveSeconds(300);
// 设置默认线程名称
executor.setThreadNamePrefix("async-thread-");
// 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
//new ThreadPoolExecutor.AbortPolicy());//银行满了,还有人进来,不处理这个人,抛出异常
//new ThreadPoolExecutor.CallerRunsPolicy());//哪来的去哪里!
//new ThreadPoolExecutor.DiscardPolicy());//银行满了,还有人进来,不处理这个人,不抛出异常
//new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常!
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
// executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
3.线程池要执行的任务
package com.jeesite.modules.asysutils.juc.pool.sms;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
public class ThreadService {
// @Autowired
// private RedisTemplate<String,String> redisTemplate;
@Async("asyncServiceExecutor")
public void sendSMS(String phone , int code) {
// boolean isSend = smsService.sendSms(phone,code);
// if (isSend){ redisTemplate.opsForValue().set("LOGIN_"+phone,String.valueOf(code), Duration.ofMinutes(time));
// }
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + "发送短信成功:" + code);
}
}
4.服务层调用
package com.jeesite.modules.asysutils.juc.pool.sms;
import jline.internal.Log;
import org.apache.commons.lang3.RandomUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class SmsService {
@Autowired
private ThreadService threadService;
public String sendSms(String phone) {
/**
* 1. 调用短信平台 发送短信 如果发送成功,将验证码存入redis,redis要有过期时间
* 2. 发送成功,返回成功
*/
//短信验证码 要生成
int code = RandomUtils.nextInt(100000, 999999);
Log.info("短信验证码: ",code);
//放入线程池执行,不影响当前的业务,立马返回
threadService.sendSMS(phone,code);
return "success";
}
}
5.启动服务,控制层调用
package com.jeesite.modules.asysutils.juc.pool.sms;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping(value = "t/send")
public class SmsController{
@Autowired
private SmsService smsService;
@RequestMapping(value = "msg")
@ResponseBody
public String sendSms(String phone) {
return smsService.sendSms(phone);
}
}
二.推送
比如有一个业务场景:
有一个审核业务,当收到数据之后,需要将这些数据发送给第三方的监管系统进行审核,数据量有百万之多,一条数据按照一秒计算,那摩需要经过百万秒,200多个小时才能处理完
解决:
考虑引入多线程进行并发操作,降低数据推送时间,提供数据推送的实时性
要注意的问题:
防止重复推送
可以考虑将数据切分成不同的数据段,每一个线程负责一个
失败处理
推送失败后,进行失败推送的数据记录,用额外的程序处理失败数据(补偿措施)
代码演示:
1.同样这里使用自定义线程池
package com.jeesite.modules.asysutils.juc.pool;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean("asyncServiceExecutor")
public ThreadPoolTaskExecutor asyncServiceExecutor() {//TaskExecutor不带返回值 ThreadPoolTaskExecutor带返回值
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
// 设置核心线程数
// 指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去
executor.setCorePoolSize(10);
// 设置最大线程数
// 指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量
executor.setMaxPoolSize(10);
// 设置队列容量
// new LinkedBlockingQueue<Runnable>();
executor.setQueueCapacity(32);
// 设置线程活跃时间(秒)
// 当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁
executor.setKeepAliveSeconds(300);
// 设置默认线程名称
executor.setThreadNamePrefix("async-thread-");
// 设置拒绝策略rejection-policy:当pool已经达到max size的时候,如何处理新任务 CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
//new ThreadPoolExecutor.AbortPolicy());//银行满了,还有人进来,不处理这个人,抛出异常
//new ThreadPoolExecutor.CallerRunsPolicy());//哪来的去哪里!
//new ThreadPoolExecutor.DiscardPolicy());//银行满了,还有人进来,不处理这个人,不抛出异常
//new ThreadPoolExecutor.DiscardOldestPolicy());//队列满了,尝试去和最早的竞争,也不会抛出异常!
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
// executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
}
2.测试类
- 传统方式,不调用线程池
- 调用线程池,无返回值(这里可以发现,虽然此方法可以达到异步执行的目的,但是我们并不知道线程执行的结果,有没有执行成功,因为这种方式有时候在企业中并不是最佳使用方式,下面介绍带有返回值的多线程)
- 调用线程池,有返回值(使用多线程一次性执行多个不同任务并且获取任务执行结果)
package com.jeesite.modules.asysutils.juc.pool.push;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.TimeUnit;
@SpringBootTest
@RunWith(SpringRunner.class)
public class TestPushService {
@Autowired
private PushService pushService;
@Test
public void testOldPush(){
//传统方式
pushService.oldPush();
}
@Test
public void testNewPush(){
//线程无返回值
pushService.pushNew();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void testNewPushCall(){
//线程有返回值
pushService.pushNewCall();
try {
TimeUnit.HOURS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
3.推送消息
package com.jeesite.modules.asysutils.juc.pool.push;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@Service
public class PushService {
@Autowired
private ThreadPushService threadService;
@Autowired
private ThreadPoolTaskExecutor asyncServiceExecutor;
public void oldPush(){
int dataNum = 10000;
int[] array = new int[dataNum];
for (int i = 0;i<dataNum;i++){
array[i] = i;
}
long start = System.currentTimeMillis();
//推送的数据数量
for (int i = 0 ; i < array.length;i++){
//推送到第三方审核平台
pushSend(array[i]);
}
long end = System.currentTimeMillis();
System.out.println("需要时间:"+(end - start) + "ms");
}
public void pushNew(){
int dataNum = 10000;
int[] array = new int[dataNum];
for (int i = 0;i<dataNum;i++){
array[i] = i;
}
long start = System.currentTimeMillis();
//推送的数据数量
//假设线程池数量为10,也就是说 每个线程处理 1000条数据 共需要10次循环
for (int i = 0 ; i < 10;i++){
int s = i * 1000;
int e = i * 1000 + 1000 - 1;
//推送到第三方审核平台
//这个是假设 有10000条数据,那么每次推送处理1000条数据
threadService.push(array,s,e);
}
long end = System.currentTimeMillis();
System.out.println("需要时间:"+(end - start) + "ms");
}
public void pushNewCall(){
int dataNum = 10000;
int[] array = new int[dataNum];
for (int i = 0;i<dataNum;i++){
array[i] = i;
}
long start = System.currentTimeMillis();
//推送的数据数量
//假设线程池数量为10,也就是说 每个线程处理 1000条数据 共需要10次循环
List<Future> futureList = new ArrayList<>();
for (int i = 0 ; i < 10;i++){
int s = i * 1000;
int e = i * 1000 + 1000 - 1;
//推送到第三方审核平台
//这个是假设 有10000条数据,那么每次推送处理1000条数据
//无法使用配置的线程池,没有返回值,使用这种方式重写
Future<Integer> submit = asyncServiceExecutor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return threadService.push1(array, s, e);
}
});
//不能在这 直接得到返回值,因为会阻塞
// System.out.println("本轮线程执行数量:" +submit.get());
futureList.add(submit);
}
for (Future future : futureList) {
try {
System.out.println("本轮线程执行数量:" +future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
System.out.println("需要时间:"+(end - start) + "ms");
}
private void pushSend(int data) {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
4.使用线程池来执行该任务
package com.jeesite.modules.asysutils.juc.pool.push;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;
@Service
public class ThreadPushService {
@Async("asyncServiceExecutor")
public void push(int[] array, int start, int end){
long s = System.currentTimeMillis();
for (int i = start;i<=end;i++){
pushSend(array[i]);
//推送失败 可以记录日志
}
long e = System.currentTimeMillis();
System.out.println((e-s)+"ms");
}
public int push1(int[] array, int start, int end){
int count = 0;
long s = System.currentTimeMillis();
for (int i = start;i<=end;i++){
count++;
pushSend(array[i]);
//推送失败 可以记录日志
}
long e = System.currentTimeMillis();
System.out.println((e-s)+"ms");
return count;
}
public void pushSend(int dataNum){
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
5、启动测试,可以看到三种方式的效果对比!!!
-
传统方式需要时长:
-
线程无返回值:
-
线程有返回值: