系列文章目录
文章目录
- 系列文章目录
- 前言
- 一、项目背景
- 二、第十三轮对话-优化传参
- 三、第十四轮对话-释放资源
- 四、完善所有单元测试
- 五、验证通过
- 六、发布
- 七、参考文章
前言
在这个充满技术创新的时代,AI大模型正成为开发者们的新宠。它们可以帮助我们完成从简单的问答到复杂的编程任务,所以AI编程将会是未来的主流方向,利用AI大模型的能力,本系列文章将介绍从零到一用AI大模型编写一个多线程并发框架。
一、项目背景
经过前面系列文章,我们基本完成这个多线程并发框架的搭建,目前只剩下一些小细节需要稍微处理一下,就基本可以发布上线。
本多线程框架使用示例如下:源码地址
1、引入依赖。
<dependency>
<groupId>io.github.vipjoey</groupId>
<artifactId>mmc-juc</artifactId>
<version>1.1</version>
</dependency>
2、使用示例。
// 创建一个MmcTaskExecutor实例,用于执行单次长耗时任务
// 下面是创建一个计算从1加到100的任务,总共100个任务,采用fork分治算法,阈值为10,总共任务为100 / 10 * 2 = 20个大任务,执行速率约为10/s
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
.taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList())) // 设置任务源
.taskProcessor(x -> x.stream().reduce(0, Integer::sum)) // 设置任务处理方法
.taskMerger(Integer::sum) // 设置结果处理方法(可选)
.threshold(10) // 设置任务处理阈值(可选)
.taskName("mmcTaskExample") // 设置任务名称
.rateLimiter(10, 20) // 设置速率限制,容量为10,每秒产生令牌为20,休眠时间为10ms
.forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4
.build();
// 同步执行并打印结果
System.out.println("result: " + mmcTaskExecutor.execute());
// 任务执行过程监控
[mmcTaskExample] Tasks submitted. Total tasks: 100
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 20, remaining tasks: 80
[mmcTaskExample] Task started. Completed tasks: 26, remaining tasks: 74
[mmcTaskExample] Task started. Completed tasks: 33, remaining tasks: 67
[mmcTaskExample] Task started. Completed tasks: 40, remaining tasks: 60
[mmcTaskExample] Task started. Completed tasks: 46, remaining tasks: 54
[mmcTaskExample] Task started. Completed tasks: 52, remaining tasks: 48
[mmcTaskExample] Task started. Completed tasks: 58, remaining tasks: 42
[mmcTaskExample] Task started. Completed tasks: 64, remaining tasks: 36
[mmcTaskExample] Task started. Completed tasks: 70, remaining tasks: 30
[mmcTaskExample] Task started. Completed tasks: 76, remaining tasks: 24
[mmcTaskExample] Task started. Completed tasks: 82, remaining tasks: 18
[mmcTaskExample] Task started. Completed tasks: 88, remaining tasks: 12
[mmcTaskExample] Task started. Completed tasks: 94, remaining tasks: 6
[mmcTaskExample] Task started. Completed tasks: 100, remaining tasks: 0
[mmcTaskExample] Tasks completed. Elapsed time: 1107 ms
result: 5050
二、第十三轮对话-优化传参
1、上文提到,可以多次往MmcTaskExecutor提交任务,但明显每次构建MmcTask显得比较笨重,我们需要优化一下。
2、以下是AI的回复。
- 【为了满足您的要求,我们将修改MmcTaskExecutor类的submitTask方法,以在提交任务时检查MmcTask的构造参数。如果MmcTask自身具有相应的参数,则使用它们,否则将MmcTaskExecutor的参数赋值给MmcTask。如果两者都为空,则抛出异常。在计算出最终的taskSource后,我们将实例化TaskRuntime。】
// 检查并设置MmcTask的参数
private void checkAndSet(MmcTask<T, R> mmcTask) {
// 检查MmcTask的构造参数,如果为空,则使用MmcTaskExecutor的参数
List<T> taskSource = mmcTask.getTaskSource() != null ? mmcTask.getTaskSource() : this.taskSource;
MmcTaskProcessor<T, R> taskProcessor = mmcTask.getTaskProcessor() != null ? mmcTask.getTaskProcessor() : this.taskProcessor;
MmcTaskMerger<R> taskMerger = mmcTask.getTaskMerger() != null ? mmcTask.getTaskMerger() : this.taskMerger;
RateLimiter rateLimiter = mmcTask.getRateLimiter() != null ? mmcTask.getRateLimiter() : this.rateLimiter;
MmcTaskListener taskListener = mmcTask.getTaskListener() != null ? mmcTask.getTaskListener() : this.taskListener;
String taskName = mmcTask.getTaskName() != null ? mmcTask.getTaskName() : this.taskName;
int threshold = mmcTask.getThreshold() > 0 ? mmcTask.getThreshold() : this.threshold;
// 如果参数为空,则抛出异常
Objects.requireNonNull(taskSource, "TaskSource cannot be null.");
Objects.requireNonNull(taskProcessor, "TaskProcessor cannot be null.");
Objects.requireNonNull(taskMerger, "TaskMerger cannot be null.");
Objects.requireNonNull(rateLimiter, "RateLimiter cannot be null.");
Objects.requireNonNull(taskListener, "TaskListener cannot be null.");
Objects.requireNonNull(taskName, "TaskName cannot be null.");
// 将最终参数赋值给MmcTask
TaskRuntime taskRuntime = new TaskRuntime(mmcTask.getTaskName(), mmcTask.getTaskSource().size());
mmcTask.setTaskSource(taskSource);
mmcTask.setTaskProcessor(taskProcessor);
mmcTask.setTaskMerger(taskMerger);
mmcTask.setRateLimiter(rateLimiter);
mmcTask.setTaskListener(taskListener);
mmcTask.setTaskName(taskName);
mmcTask.setStart(0);
mmcTask.setEnd(taskSource.size());
mmcTask.setTaskRuntime(taskRuntime);
mmcTask.setThreshold(threshold);
}
- 【接下来,我们需要修改MmcTaskExecutor类去初始化MmcTask。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class MmcTaskExecutor<T, R> {
// ...省略其他成员变量和方法
// 同步执行并返回结果
@Override
public R execute() {
MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.threshold(threshold)
.rateLimiter(rateLimiter)
.taskListener(taskListener)
.taskName(taskName)
.build();
return execute(mmcTask);
}
// 同步执行并返回结果
@Override
public R execute(MmcTask<T, R> mmcTask) {
// 检查MmcTask的构造参数,如果为空,则使用MmcTaskExecutor的参数
checkAndSet(mmcTask);
long startTime = System.currentTimeMillis();
mmcTask.setTaskRuntime(mmcTask.getTaskRuntime());
// 调用onTasksSubmitted方法
taskListener.onTasksSubmitted(mmcTask.getTaskRuntime());
R result = forkJoinPool.invoke(mmcTask);
// 调用onTasksCompleted方法
long elapsedTime = System.currentTimeMillis() - startTime;
taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), elapsedTime);
return result;
}
// 异步执行
@Override
public void commit() {
commit((result -> {
}));
}
// 异步执行并获取结果
@Override
public void commit(MmcTaskCallback<R> callback) {
MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.threshold(threshold)
.rateLimiter(rateLimiter)
.taskListener(taskListener)
.taskName(taskName)
.build();
commit(mmcTask, callback);
}
// 异步执行并获取结果
@Override
public void commit(MmcTask<T, R> mmcTask, MmcTaskCallback<R> callback) {
// 检查MmcTask的构造参数,如果为空,则使用MmcTaskExecutor的参数
checkAndSet(mmcTask);
long startTime = System.currentTimeMillis();
// 调用onTasksSubmitted方法
taskListener.onTasksSubmitted(mmcTask.getTaskRuntime());
forkJoinPool.submit(() -> {
R result = mmcTask.invoke();
// 调用onTasksCompleted方法
long elapsedTime = System.currentTimeMillis() - startTime;
taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), elapsedTime);
if (callback != null) {
callback.onComplete(result);
}
});
}
}
三、第十四轮对话-释放资源
1、增加关闭方法,释放所有资源。
2、以下是AI的回复。
- 【为了满足您的要求,我们将为MmcTaskExecutor类添加一个名为shutdown的方法。该方法将关闭ForkJoinPool,以便不再接收新任务,并释放所有资源。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class MmcTaskExecutor<T, R> {
private ForkJoinPool forkJoinPool;
// ...省略其他成员变量和方法
// 关闭任务容器,不再接收任务,释放所有资源
public void shutdown() {
forkJoinPool.shutdown();
}
}
- 【现在,您可以通过调用MmcTaskExecutor的shutdown方法来关闭任务容器。这将导致不再接收新任务,并释放所有资源。注意,在调用shutdown方法后,您将无法再提交新任务。如果您需要再次提交任务,请重新创建一个新的MmcTaskExecutor实例。】
四、完善所有单元测试
1、完善所有异步和同步提交任务的单元测试。
package com.mmc.juc;
import static org.junit.Assert.assertEquals;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;
public class MmcTaskExecutorTest {
@Test
public void testMmcTaskExecutor() {
// 创建一个任务源,包含从1到100的整数
List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
// 创建一个任务处理器,将每个整数相加
MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue)
.sum();
// 创建一个任务合并器,将所有整数的和合并
MmcTaskMerger<Integer> taskMerger = Integer::sum;
// 创建一个任务监听器,打印任务执行过程中的相关信息
MmcTaskListener taskListener = new DefaultMmcTaskListener();
// 创建一个MmcTaskExecutor实例
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.taskListener(taskListener)
.rateLimiter(10, 10) // 每秒处理10个任务
.taskName("testMmcTaskExecutor")
.build();
// 执行任务并获取结果
Integer result = mmcTaskExecutor.execute();
// 验证结果是否正确(1到100的和等于5050)
assertEquals(5050, result.intValue());
}
@Test
public void testMmcTaskExecutorWithTask() {
// 创建一个任务源,包含从1到100的整数
List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
// 创建一个MmcTaskExecutor实例
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
.taskProcessor(x -> x.stream().reduce(0, Integer::sum))
.taskMerger(Integer::sum)
.rateLimiter(new TokenBucket(10, 20))
.taskListener(new DefaultMmcTaskListener())
.build();
Integer r = mmcTaskExecutor.execute(MmcTask.<Integer, Integer>builder()
.taskSource(taskSource)
.taskName("testMmcTaskExecutorWithTask")
.build()
);
System.out.println("result: " + r);
}
@Test
public void testCommit() throws InterruptedException {
// 创建一个任务源,包含从1到100的整数
List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
// 创建一个任务处理器,将每个整数相加
MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue)
.sum();
// 创建一个任务合并器,将所有整数的和合并
MmcTaskMerger<Integer> taskMerger = Integer::sum;
// 创建一个任务监听器,打印任务执行过程中的相关信息
MmcTaskListener taskListener = new DefaultMmcTaskListener();
// 创建一个MmcTaskExecutor实例
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.taskListener(taskListener)
.rateLimiter(10, 10) // 每秒处理10个任务
.taskName("testCommit")
.build();
mmcTaskExecutor.commit();
TimeUnit.SECONDS.sleep(3); // 等待异步任务完成
}
@Test
public void testCommitWithCallback() throws InterruptedException {
// 创建一个任务源,包含从1到100的整数
List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
// 创建一个任务处理器,将每个整数相加
MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue)
.sum();
// 创建一个任务合并器,将所有整数的和合并
MmcTaskMerger<Integer> taskMerger = Integer::sum;
// 创建一个任务监听器,打印任务执行过程中的相关信息
MmcTaskListener taskListener = new DefaultMmcTaskListener();
// 创建一个MmcTaskExecutor实例
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
.taskSource(taskSource)
.taskProcessor(taskProcessor)
.taskMerger(taskMerger)
.taskListener(taskListener)
.rateLimiter(10, 10) // 每秒处理10个任务
.taskName("testCommitWithCallback")
.build();
mmcTaskExecutor.commit(result -> {
assertEquals(5050, result.intValue());
});
}
@Test
public void testCommitWithCustomMmcTaskAndCallback() throws InterruptedException {
// 创建一个任务源,包含从1到100的整数
List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
// 创建一个MmcTaskExecutor实例
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
.taskProcessor(x -> x.stream().reduce(0, Integer::sum))
.taskMerger(Integer::sum)
.taskListener(new DefaultMmcTaskListener())
.build();
mmcTaskExecutor.commit(MmcTask.<Integer, Integer>builder()
.taskName("testCommitWithCustomMmcTaskAndCallback")
.rateLimiter(new TokenBucket(20, 20))
.taskSource(taskSource)
.build()
, result -> {
assertEquals(5050, result.intValue());
System.out.println("result: " + result);
});
}
}
五、验证通过
1、我们运行一下单元测试,看看效果。
很完美!整个框架运行正常!
六、发布
参考搭建大型分布式服务(四十三)SpringBoot 无代码侵入实现多Kafka数据源发布到Maven中央仓库:让世界看到你的作品! 这篇文章,将这个多线程并发框架发布到中央仓库。
<dependency>
<groupId>io.github.vipjoey</groupId>
<artifactId>mmc-juc</artifactId>
<version>1.1</version>
</dependency>
可能,有小伙伴有疑问,这个多线程框架不会只能用来计算1到100这些简单的任务吧?当然不是,下一篇,我们将整合到kafka框架,让它的执行速度飞起来!
《搭建大型分布式服务(四十四)SpringBoot 无代码侵入实现多Kafka数据源:单分区提升至十万级消费速度!》
七、参考文章
- 《AI大模型编写多线程并发框架(六十一):从零开始搭建框架》
- 《AI大模型编写多线程并发框架(六十二):限流和并发度优化》
- 《AI大模型编写多线程并发框架(六十三):监听器优化·上》
- 《AI大模型编写多线程并发框架(六十四):监听器优化·下》
- 《AI大模型编写多线程并发框架(六十五):发布和应用》
加我加群(备注csdn)一起交流学习!更多干货下载、项目源码和大厂内推等着你