系列文章目录

文章目录
- 系列文章目录
- 前言
- 一、项目背景
- 二、第七轮对话-补充异步执行代码
- 三、第八轮对话-增加累计完成等字眼
- 四、第九轮对话-线程安全优化
- 五、第十轮对话-增加运行时数据
- 七、参考文章
前言
在这个充满技术创新的时代,AI大模型正成为开发者们的新宠。它们可以帮助我们完成从简单的问答到复杂的编程任务,所以AI编程将会是未来的主流方向,利用AI大模型的能力,本文将介绍从零到一用AI大模型编写一个多线程并发框架。
一、项目背景
经过上两篇文章和AI的对话,我们基本捣鼓出来了多线程并发框架的雏形,接下来是比较重要的监控模块,本文将会通过比较长的篇幅来叙述。
本多线程框架使用示例如下:源码地址
1、引入依赖。
<dependency>
<groupId>io.github.vipjoey</groupId>
<artifactId>mmc-juc</artifactId>
<version>1.0</version>
</dependency>
2、使用示例。
// 创建一个MmcTaskExecutor实例,用于执行单次长耗时任务
// 下面是创建一个计算从1加到100的任务,总共100个任务,采用fork分治算法,阈值为10,总共任务为100 / 10 * 2 = 20个大任务,执行速率约为10/s
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
.taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList())) // 设置任务源
.taskProcessor(x -> x.stream().reduce(0, Integer::sum)) // 设置任务处理方法
.taskMerger(Integer::sum) // 设置结果处理方法(可选)
.threshold(10) // 设置任务处理阈值(可选)
.taskName("mmcTaskExample") // 设置任务名称
.rateLimiter(10, 20) // 设置速率限制,容量为10,每秒产生令牌为20,休眠时间为10ms
.forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4
.build();
// 同步执行并打印结果
System.out.println("result: " + mmcTaskExecutor.execute());
// 任务执行过程监控
[mmcTaskExample] Tasks submitted. Total tasks: 100
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 20, remaining tasks: 80
[mmcTaskExample] Task started. Completed tasks: 26, remaining tasks: 74
[mmcTaskExample] Task started. Completed tasks: 33, remaining tasks: 67
[mmcTaskExample] Task started. Completed tasks: 40, remaining tasks: 60
[mmcTaskExample] Task started. Completed tasks: 46, remaining tasks: 54
[mmcTaskExample] Task started. Completed tasks: 52, remaining tasks: 48
[mmcTaskExample] Task started. Completed tasks: 58, remaining tasks: 42
[mmcTaskExample] Task started. Completed tasks: 64, remaining tasks: 36
[mmcTaskExample] Task started. Completed tasks: 70, remaining tasks: 30
[mmcTaskExample] Task started. Completed tasks: 76, remaining tasks: 24
[mmcTaskExample] Task started. Completed tasks: 82, remaining tasks: 18
[mmcTaskExample] Task started. Completed tasks: 88, remaining tasks: 12
[mmcTaskExample] Task started. Completed tasks: 94, remaining tasks: 6
[mmcTaskExample] Task started. Completed tasks: 100, remaining tasks: 0
[mmcTaskExample] Tasks completed. Elapsed time: 1107 ms
result: 5050
二、第七轮对话-补充异步执行代码
1、参考限流器的对话,我们知道AI是不会主动帮忙抽象公共接口的,所以这里我们干脆要求AI给我们定义好监听器接口,方便后续扩展。
2、下面是AI给的回复。
- 【为了实现监听器功能,我们首先需要创建一个监听器接口MmcTaskListener,然后在MmcTaskExecutor类中使用它。我们还将提供一个默认实现DefaultMmcTaskListener,并允许用户通过Builder类替换监听器。首先,我们创建一个MmcTaskListener接口。】
public interface MmcTaskListener {
void onTaskStarted(int totalTasks, int remainingTasks);
void onTaskCompleted(int totalTasks, int remainingTasks, long elapsedTime);
}
- 【接下来,我们创建一个默认的监听器实现DefaultMmcTaskListener。】
public class DefaultMmcTaskListener implements MmcTaskListener {
@Override
public void onTaskStarted(int totalTasks, int remainingTasks) {
System.out.println("Task started. Total tasks: " + totalTasks + ", remaining tasks: " + remainingTasks);
}
@Override
public void onTaskCompleted(int totalTasks, int remainingTasks, long elapsedTime) {
System.out.println("Task completed. Total tasks: " + totalTasks + ", remaining tasks: " + remainingTasks + ", elapsed time: " + elapsedTime + " ms");
}
}
- 【然后,我们修改MmcTaskExecutor类以使用MmcTaskListener接口。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class MmcTaskExecutor<T, R> {
// ...省略其他成员变量
private MmcTaskListener taskListener;
private MmcTaskExecutor(Builder<T, R> builder) {
// ...省略其他初始化代码
this.taskListener = builder.taskListener;
}
// ...省略其他方法
public static class Builder<T, R> {
// ...省略其他成员变量
private MmcTaskListener taskListener = new DefaultMmcTaskListener(); // 默认使用DefaultMmcTaskListener
// ...省略其他方法
public Builder<T, R> taskListener(MmcTaskListener taskListener) {
this.taskListener = taskListener;
return this;
}
// ...省略其他方法
}
}
- 【最后,我们需要修改MmcTask类以调用监听器的方法。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
public class MmcTask<T, R> extends RecursiveTask<R> {
// ...省略其他成员变量
private MmcTaskListener taskListener;
public MmcTask(List<T> taskSource, MmcTaskProcessor<T, R> taskProcessor, MmcTaskMerger<R> taskMerger, int threshold, int start, int end, RateLimiter rateLimiter, MmcTaskListener taskListener) {
// ...省略其他初始化代码
this.taskListener = taskListener;
}
@Override
protected R compute() {
// ...省略其他代码
// 在处理任务之前调用onTaskStarted方法
taskListener.onTaskStarted(taskSource.size(), end - start);
// ...省略其他代码
// 在任务完成后调用onTaskCompleted方法
long elapsedTime = System.currentTimeMillis() - startTime;
taskListener.onTaskCompleted(taskSource.size(), end - start - 1, elapsedTime);
// ...省略其他代码
}
}
本轮AI的回复有些欠缺,它并没有GET到我的诉求,可能跟我的提问方式有关。实际上我需要监控的是TaskSource集合被执行完成的进度,AI只是监控了compute方法,也就是监控每个阈值下的小任务的执行,这明显是错的。
三、第八轮对话-增加累计完成等字眼
1、由于上一轮对话,可能是提问方式不对,我重新增加【首次】、【累计】这些字眼,希望AI能理解我的意思。
2、下面是AI大模型给的回复。
- 【为了满足您的要求,我们需要修改MmcTaskListener接口以添加新的事件方法,然后修改DefaultMmcTaskListener以实现这些方法。接下来,我们将在MmcTaskExecutor类和MmcTask类中调用这些方法。首先,我们修改MmcTaskListener接口以添加新的事件方法。】
public interface MmcTaskListener {
void onTasksSubmitted(int totalTasks, long startTime);
void onTaskStarted(int completedTasks, int remainingTasks);
void onTasksCompleted(long elapsedTime, long endTime);
}
- 【接下来,我们修改DefaultMmcTaskListener以实现新的事件方法。】
public class DefaultMmcTaskListener implements MmcTaskListener {
@Override
public void onTasksSubmitted(int totalTasks, long startTime) {
System.out.println("Tasks submitted. Total tasks: " + totalTasks + ", start time: " + startTime);
}
@Override
public void onTaskStarted(int completedTasks, int remainingTasks) {
System.out.println("Task started. Completed tasks: " + completedTasks + ", remaining tasks: " + remainingTasks);
}
@Override
public void onTasksCompleted(long elapsedTime, long endTime) {
System.out.println("Tasks completed. Elapsed time: " + elapsedTime + " ms, end time: " + endTime);
}
}
- 【然后,我们需要修改MmcTaskExecutor类以调用onTasksSubmitted和onTasksCompleted方法。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
public class MmcTaskExecutor<T, R> {
// ...省略其他成员变量和方法
private AtomicInteger completedTasks = new AtomicInteger(); // 添加一个原子整数以跟踪已完成的任务数量
// 同步执行并返回结果
public R execute() {
MmcTask<T, R> mmcTask = new MmcTask<>(taskSource, taskProcessor, taskMerger, threshold, 0, taskSource.size(), rateLimiter, taskListener, completedTasks);
// 调用onTasksSubmitted方法
taskListener.onTasksSubmitted(taskSource.size(), System.currentTimeMillis());
R result = forkJoinPool.invoke(mmcTask);
// 调用onTasksCompleted方法
taskListener.onTasksCompleted(mmcTask.getElapsedTime(), System.currentTimeMillis());
return result;
}
// ...省略其他方法
}
- 【最后,我们需要修改MmcTask类以调用onTaskStarted方法。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
public class MmcTask<T, R> extends RecursiveTask<R> {
// ...省略其他成员变量和方法
private AtomicInteger completedTasks;
public MmcTask(List<T> taskSource, MmcTaskProcessor<T, R> taskProcessor, MmcTaskMerger<R> taskMerger, int threshold, int start, int end, RateLimiter rateLimiter, MmcTaskListener taskListener, AtomicInteger completedTasks) {
// ...省略其他初始化代码
this.completedTasks = completedTasks;
}
@Override
protected R compute() {
// 在处理任务之前调用onTaskStarted方法
taskListener.onTaskStarted(completedTasks.get(), end - start);
// ...省略其他代码
// 在任务完成后更新已完成任务的计数
completedTasks.incrementAndGet();
// ...省略其他代码
}
}
从本次AI大模型的回复来看,存在以下问题:
- 它似乎不理解任务拆分前后计数的区别,实际上后续我尝试了N种问法,它都不理解,它理解的任务累计完成个数就是
completedTasks.incrementAndGet();
而不是我想要的taskRuntime.getCompletedTasks().addAndGet(end - start);
- 它不会主动发现线程安全问题,这个跟我们提问的要求有关,毕竟是生成式AI,所以它不会考虑任务提交时需要重置AtomicInteger。
- 同理,它的剩余任务个数的计算方式也是错误的,它的答案是
end - start
,而不是我想要的taskSource.size() - 累计完成任务个数
。
不过,这些都是小问题,不用太纠结,我们继续提诉求去优化。
四、第九轮对话-线程安全优化
1、从AI大模型给出的代码来看,是没有考虑线程安全问题的,我们需要补充一下诉求:
2、下面是AI的回复。
- 【为了满足您的要求,我们将对MmcTaskExecutor类进行修改以确保线程安全,并在每次提交任务时重置计数器。我们将使用原子整数AtomicInteger来实现线程安全的计数器。首先,我们将修改MmcTaskExecutor类以添加一个原子整数completedTasks以跟踪已完成的任务数量。我们还将在每次执行任务时重置completedTasks。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
public class MmcTaskExecutor<T, R> {
// ...省略其他成员变量和方法
private AtomicInteger completedTasks = new AtomicInteger(); // 添加一个原子整数以跟踪已完成的任务数量
// 同步执行并返回结果
public R execute() {
completedTasks.set(0); // 重置已完成任务的计数器
MmcTask<T, R> mmcTask = new MmcTask<>(taskSource, taskProcessor, taskMerger, threshold, 0, taskSource.size(), rateLimiter, taskListener, completedTasks);
// 调用onTasksSubmitted方法
taskListener.onTasksSubmitted(taskSource.size(), System.currentTimeMillis());
R result = forkJoinPool.invoke(mmcTask);
// 调用onTasksCompleted方法
taskListener.onTasksCompleted(mmcTask.getElapsedTime(), System.currentTimeMillis());
return result;
}
// ...省略其他方法
}
- 【接下来,我们将修改MmcTask类以确保线程安全并调用监听器的方法。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
public class MmcTask<T, R> extends RecursiveTask<R> {
// ...省略其他成员变量和方法
private AtomicInteger completedTasks;
public MmcTask(List<T> taskSource, MmcTaskProcessor<T, R> taskProcessor, MmcTaskMerger<R> taskMerger, int threshold, int start, int end, RateLimiter rateLimiter, MmcTaskListener taskListener, AtomicInteger completedTasks) {
// ...省略其他初始化代码
this.completedTasks = completedTasks;
}
@Override
protected R compute() {
// 在处理任务之前调用onTaskStarted方法
taskListener.onTaskStarted(completedTasks.get(), end - start);
// ...省略其他代码
// 在任务完成后更新已完成任务的计数
int completed = completedTasks.incrementAndGet();
// 调用onTaskStarted方法,以便在每个小任务完成时更新已完成任务的计数
taskListener.onTaskStarted(completed, end - start - completed);
// ...省略其他代码
}
}
这次AI大模型的回复也是不尽人意,如果多个MmcTask任务同时执行的话,计数器是不准确的,我们继续耐心优化一下我们的提示词。
五、第十轮对话-增加运行时数据
1、由于我们设计的初衷,任务可以多次提交的。所以每个任务运行时应该产生运行时数据,我们监控的也是运行时数据,运行时数据需要一个唯一标识,也就是任务名称来与之关联。另外,由于MmcTask的构造函数过长,我在其它对话中让它优化过了,这里就不详细讲述。
2、以下是AI的回复。
- 【为了满足您的要求,我们可以创建一个新的类TaskRuntime,它将包含任务名称、任务源总数、已完成任务数和剩余任务数等信息。然后,我们将修改MmcTask和MmcTaskListener以使用TaskRuntime。首先,我们创建一个新的类TaskRuntime。】
public class TaskRuntime {
private String taskName;
private int totalTasks;
private AtomicInteger completedTasks;
private AtomicInteger remainingTasks;
public TaskRuntime(String taskName, int totalTasks) {
this.taskName = taskName;
this.totalTasks = totalTasks;
this.completedTasks = new AtomicInteger();
this.remainingTasks = new AtomicInteger(totalTasks);
}
// ...省略getter和setter方法
}
- 【接下来,我们修改MmcTaskListener接口以使用TaskRuntime。】
public interface MmcTaskListener {
void onTasksSubmitted(TaskRuntime taskRuntime, long startTime);
void onTaskStarted(TaskRuntime taskRuntime);
void onTasksCompleted(TaskRuntime taskRuntime, long elapsedTime, long endTime);
}
- 【然后,我们修改MmcTask以使用TaskRuntime。】
import java.util.List;
import java.util.concurrent.RecursiveTask;
public class MmcTask<T, R> extends RecursiveTask<R> {
// ...省略其他成员变量和方法
private TaskRuntime taskRuntime;
public MmcTask(Builder<T, R> builder) {
// ...省略其他初始化代码
this.taskRuntime = new TaskRuntime(builder.taskName, builder.taskSource.size());
}
@Override
protected R compute() {
// 在处理任务之前调用onTaskStarted方法
taskListener.onTaskStarted(taskRuntime);
// ...省略其他代码
// 在任务完成后更新已完成任务的计数
taskRuntime.getCompletedTasks().incrementAndGet();
taskRuntime.getRemainingTasks().decrementAndGet();
// ...省略其他代码
}
}
- 【最后,我们修改MmcTaskExecutor类以使用TaskRuntime。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;
public class MmcTaskExecutor<T, R> {
// ...省略其他成员变量和方法
// 提交新任务
public R submitTask(MmcTask<T, R> mmcTask) {
// 调用onTasksSubmitted方法
taskListener.onTasksSubmitted(mmcTask.getTaskRuntime(), System.currentTimeMillis());
R result = forkJoinPool.invoke(mmcTask);
// 调用onTasksCompleted方法
taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), mmcTask.getElapsedTime(), System.currentTimeMillis());
return result;
}
// 同步执行并返回结果
public R execute() {
MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>()
// ...省略其他方法调用
.build();
return submitTask(mmcTask);
}
}
这轮AI的回答还是有问题,例如下面:
public MmcTask(Builder<T, R> builder) {
// ...省略其他初始化代码
this.taskRuntime = new TaskRuntime(builder.taskName, builder.taskSource.size());
}
它在MmcTask来初始化运行时数据,这明显是不对的,继续让它修改。下一篇,继续优化监听器 - -
七、参考文章
- 《AI大模型编写多线程并发框架(六十一):从零开始搭建框架》
- 《AI大模型编写多线程并发框架(六十二):限流和并发度优化》
- 《AI大模型编写多线程并发框架(六十三):监听器优化·上》
- 《AI大模型编写多线程并发框架(六十四):监听器优化·下》
- 《AI大模型编写多线程并发框架(六十五):发布和应用》
加我加群(备注csdn)一起交流学习!更多干货下载、项目源码和大厂内推等着你
![]() | ![]() |