AI大模型编写多线程并发框架(六十四):监听器优化·下

news2025/1/11 10:00:30

系列文章目录


文章目录

  • 系列文章目录
  • 前言
    • 一、项目背景
    • 二、第十一轮对话-修正运行时数据
    • 三、修正任务计数器
    • 四、第十二轮对话-生成单元测试
    • 五、验证通过
    • 七、参考文章


前言

在这个充满技术创新的时代,AI大模型正成为开发者们的新宠。它们可以帮助我们完成从简单的问答到复杂的编程任务,所以AI编程将会是未来的主流方向,利用AI大模型的能力,本系列文章将介绍从零到一用AI大模型编写一个多线程并发框架。

一、项目背景

经过上两篇文章和AI的对话,我们基本捣鼓出来了多线程并发框架的雏形,并且接入了监听器,但还是存在一些问题,限于篇幅,本文继续优化监听器。

本多线程框架使用示例如下:源码地址
1、引入依赖。

<dependency>
    <groupId>io.github.vipjoey</groupId>
    <artifactId>mmc-juc</artifactId>
    <version>1.0</version>
</dependency>

2、使用示例。


// 创建一个MmcTaskExecutor实例,用于执行单次长耗时任务
// 下面是创建一个计算从1加到100的任务,总共100个任务,采用fork分治算法,阈值为10,总共任务为100 / 10 * 2 = 20个大任务,执行速率约为10/s
MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = MmcTaskExecutor.<Integer, Integer>builder()
        .taskSource(IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList())) // 设置任务源
        .taskProcessor(x -> x.stream().reduce(0, Integer::sum)) // 设置任务处理方法
        .taskMerger(Integer::sum) // 设置结果处理方法(可选)
        .threshold(10) // 设置任务处理阈值(可选)
        .taskName("mmcTaskExample") // 设置任务名称
        .rateLimiter(10, 20)  // 设置速率限制,容量为10,每秒产生令牌为20,休眠时间为10ms
        .forkJoinPoolConcurrency(4) // 设置ForkJoinPool的并发度为4
        .build();

// 同步执行并打印结果        
System.out.println("result: " + mmcTaskExecutor.execute());

// 任务执行过程监控
[mmcTaskExample] Tasks submitted. Total tasks: 100
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 14, remaining tasks: 86
[mmcTaskExample] Task started. Completed tasks: 20, remaining tasks: 80
[mmcTaskExample] Task started. Completed tasks: 26, remaining tasks: 74
[mmcTaskExample] Task started. Completed tasks: 33, remaining tasks: 67
[mmcTaskExample] Task started. Completed tasks: 40, remaining tasks: 60
[mmcTaskExample] Task started. Completed tasks: 46, remaining tasks: 54
[mmcTaskExample] Task started. Completed tasks: 52, remaining tasks: 48
[mmcTaskExample] Task started. Completed tasks: 58, remaining tasks: 42
[mmcTaskExample] Task started. Completed tasks: 64, remaining tasks: 36
[mmcTaskExample] Task started. Completed tasks: 70, remaining tasks: 30
[mmcTaskExample] Task started. Completed tasks: 76, remaining tasks: 24
[mmcTaskExample] Task started. Completed tasks: 82, remaining tasks: 18
[mmcTaskExample] Task started. Completed tasks: 88, remaining tasks: 12
[mmcTaskExample] Task started. Completed tasks: 94, remaining tasks: 6
[mmcTaskExample] Task started. Completed tasks: 100, remaining tasks: 0
[mmcTaskExample] Tasks completed. Elapsed time: 1107 ms 
result: 5050

二、第十一轮对话-修正运行时数据

1、上文提到,它在MmcTask来初始化运行时数据,这明显是不对的。TaskRuntime 应该从MmcTaskExecutor中首次提交任务时初始化并透传。
在这里插入图片描述
2、以下是AI的回复。

  • 【抱歉,我理解了您的问题。我们可以将TaskRuntime的初始化从MmcTask构造函数中移除,并将其传递给MmcTask。这样,在任务fork时,TaskRuntime将保持不变。首先,我们需要修改MmcTask类以接收TaskRuntime作为参数。】
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class MmcTask<T, R> extends RecursiveTask<R> {
    // ...省略其他成员变量和方法
    private TaskRuntime taskRuntime;

    public MmcTask(Builder<T, R> builder) {
        // ...省略其他初始化代码
        this.taskRuntime = builder.taskRuntime;
    }

    @Override
    protected R compute() {
        if (end - start <= threshold) {
            // 在处理任务之前调用onTaskStarted方法
            taskListener.onTaskStarted(taskRuntime);

            // ...省略其他代码

            // 在任务完成后更新已完成任务的计数
            taskRuntime.getCompletedTasks().incrementAndGet();
            taskRuntime.getRemainingTasks().decrementAndGet();
        } else {
            // ...省略其他代码

            MmcTask<T, R> leftTask = new Builder<T, R>()
                    .taskSource(taskSource)
                    .taskProcessor(taskProcessor)
                    .taskMerger(taskMerger)
                    .threshold(threshold)
                    .start(start)
                    .end(mid)
                    .rateLimiter(rateLimiter)
                    .taskListener(taskListener)
                    .taskRuntime(taskRuntime)
                    .build();

            MmcTask<T, R> rightTask = new Builder<T, R>()
                    .taskSource(taskSource)
                    .taskProcessor(taskProcessor)
                    .taskMerger(taskMerger)
                    .threshold(threshold)
                    .start(mid)
                    .end(end)
                    .rateLimiter(rateLimiter)
                    .taskListener(taskListener)
                    .taskRuntime(taskRuntime)
                    .build();

            // ...省略其他代码
        }
    }

    public static class Builder<T, R> {
        // ...省略其他成员变量
        private TaskRuntime taskRuntime;

        // ...省略其他方法

        public Builder<T, R> taskRuntime(TaskRuntime taskRuntime) {
            this.taskRuntime = taskRuntime;
            return this;
        }

        public MmcTask<T, R> build() {
            return new MmcTask<>(this);
        }
    }
}
  • 【接下来,我们需要修改MmcTaskExecutor类以在创建MmcTask时传递TaskRuntime。】
import java.util.List;
import java.util.concurrent.ForkJoinPool;

public class MmcTaskExecutor<T, R> {
    // ...省略其他成员变量和方法

    // 提交新任务
    public R submitTask(MmcTask<T, R> mmcTask) {
        // 调用onTasksSubmitted方法
        taskListener.onTasksSubmitted(mmcTask.getTaskRuntime(), System.currentTimeMillis());

        R result = forkJoinPool.invoke(mmcTask);

        // 调用onTasksCompleted方法
        taskListener.onTasksCompleted(mmcTask.getTaskRuntime(), mmcTask.getElapsedTime(), System.currentTimeMillis());

        return result;
    }

    // 同步执行并返回结果
    public R execute() {
        TaskRuntime taskRuntime = new TaskRuntime(taskName, taskSource.size());
        MmcTask<T, R> mmcTask = new MmcTask.Builder<T, R>()
                // ...省略其他方法调用
                .taskRuntime(taskRuntime)
                .build();

        return submitTask(mmcTask);
    }
}

三、修正任务计数器

AI这次修改基本上符合我的诉求,现在就剩下计数器问题了。上文我提到过,AI理解的累计完成任务个数和我理解的任务个数不一样,可能跟我的提示词表达方式有关(有根更好表达方式的同学可以评论区回复留言),这里我就不纠结了,直接给他修正一下。
1、监听器的默认实现方式。

package com.mmc.juc;

public class DefaultMmcTaskListener implements MmcTaskListener {


    @Override
    public void onTasksSubmitted(TaskRuntime taskRuntime) {
        System.out.println(
                "[" + taskRuntime.getTaskName() + "] Tasks submitted. Total tasks: " + taskRuntime.getTotalTasks());

    }

    @Override
    public void onTaskStarted(TaskRuntime taskRuntime) {
        System.out.println(
                "[" + taskRuntime.getTaskName() + "] Task started. Completed tasks: " + taskRuntime.getCompletedTasks()
                        .get() + ", remaining tasks: " + taskRuntime.getRemainingTasks().get());

    }

    @Override
    public void onTasksCompleted(TaskRuntime taskRuntime, long elapsedTime) {
        System.out.println(
                "[" + taskRuntime.getTaskName() + "] Tasks completed. Elapsed time: " + elapsedTime + " ms ");

    }
}

2、修正compute方法,修正计数器计数方式。

package com.mmc.juc;

import java.util.List;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Data;
import lombok.Getter;
import lombok.experimental.Accessors;

@Getter
public class MmcTask<T, R> extends RecursiveTask<R> {

    // 省略其他代码

    @Override
    protected R compute() {

        // 在处理任务之前尝试消耗令牌
        while (!rateLimiter.tryConsume()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        if (end - start <= threshold) {

            R result = taskProcessor.process(taskSource.subList(start, end));

            // 在任务完成后更新已完成任务的计数
            taskRuntime.getCompletedTasks().addAndGet(end - start);
            taskRuntime.getRemainingTasks().set(taskRuntime.getTotalTasks() - taskRuntime.getCompletedTasks().get());

            // 调用onTaskStarted方法,以便在每个小任务完成时更新已完成任务的计数
            taskListener.onTaskStarted(taskRuntime);

            return result;
        }

        int middle = (start + end) / 2;
        MmcTask<T, R> leftTask = new MmcTask.Builder<T, R>()
                .taskSource(taskSource)
                .taskProcessor(taskProcessor)
                .taskMerger(taskMerger)
                .threshold(threshold)
                .start(start)
                .end(middle)
                .rateLimiter(rateLimiter)
                .taskListener(taskListener)
                .taskName(taskName)
                .taskRuntime(taskRuntime)
                .build();

        MmcTask<T, R> rightTask = new MmcTask.Builder<T, R>()
                .taskSource(taskSource)
                .taskProcessor(taskProcessor)
                .taskMerger(taskMerger)
                .threshold(threshold)
                .start(middle)
                .end(end)
                .rateLimiter(rateLimiter)
                .taskListener(taskListener)
                .taskName(taskName)
                .taskRuntime(taskRuntime)
                .build();

        leftTask.fork();
        R rightResult = rightTask.compute();
        R leftResult = leftTask.join();

        if (taskMerger != null) {
            return taskMerger.merge(leftResult, rightResult);
        } else {
            return null;
        }
    }
}

四、第十二轮对话-生成单元测试

1、我们要求AI生成单元测试,验证整个框架计算逻辑是否准确。
在这里插入图片描述
2、以下是AI的回复。

  • 【为了验证整个框架的计算逻辑、监听器、限流和执行方式的准确性,我们可以创建一个单元测试。在这个测试中,我们将使用1加到100的任务作为示例。以下是一个单元测试的示例:】
package com.mmc.juc;

import static org.junit.Assert.assertEquals;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;

public class MmcTaskExecutorTest {

    @Test
    public void testMmcTaskExecutor() {

        // 创建一个任务源,包含从1到100的整数
        List<Integer> taskSource = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());

        // 创建一个任务处理器,将每个整数相加
        MmcTaskProcessor<Integer, Integer> taskProcessor = (integer) -> integer.stream().mapToInt(Integer::intValue).sum();

        // 创建一个任务合并器,将所有整数的和合并
        MmcTaskMerger<Integer> taskMerger = Integer::sum;

        // 创建一个任务监听器,打印任务执行过程中的相关信息
        MmcTaskListener taskListener = new DefaultMmcTaskListener();

        // 创建一个MmcTaskExecutor实例
        MmcTaskExecutor<Integer, Integer> mmcTaskExecutor = new MmcTaskExecutor.Builder<Integer, Integer>()
                .taskSource(taskSource)
                .taskProcessor(taskProcessor)
                .taskMerger(taskMerger)
                .taskListener(taskListener)
                .rateLimiter(10, 10) // 每秒处理10个任务
                .taskName("SumTask")
                .build();

        // 执行任务并获取结果
        Integer result = mmcTaskExecutor.execute();

        // 验证结果是否正确(1到100的和等于5050)
        assertEquals(5050, result.intValue());
    }
}

五、验证通过

1、我们运行一下单元测试,看看效果。

[SumTask] Tasks submitted. Total tasks: 100
[SumTask] Task started. Completed tasks: 7, remaining tasks: 87
[SumTask] Task started. Completed tasks: 13, remaining tasks: 87
[SumTask] Task started. Completed tasks: 19, remaining tasks: 81
[SumTask] Task started. Completed tasks: 26, remaining tasks: 74
[SumTask] Task started. Completed tasks: 32, remaining tasks: 68
[SumTask] Task started. Completed tasks: 38, remaining tasks: 62
[SumTask] Task started. Completed tasks: 44, remaining tasks: 56
[SumTask] Task started. Completed tasks: 50, remaining tasks: 50
[SumTask] Task started. Completed tasks: 56, remaining tasks: 44
[SumTask] Task started. Completed tasks: 62, remaining tasks: 38
[SumTask] Task started. Completed tasks: 68, remaining tasks: 32
[SumTask] Task started. Completed tasks: 74, remaining tasks: 26
[SumTask] Task started. Completed tasks: 80, remaining tasks: 20
[SumTask] Task started. Completed tasks: 87, remaining tasks: 13
[SumTask] Task started. Completed tasks: 93, remaining tasks: 7
[SumTask] Task started. Completed tasks: 100, remaining tasks: 0
[SumTask] Tasks completed. Elapsed time: 2161 ms 

很完美!整个框架运行正常!下一篇,我们将发布这个小巧玲珑的框架,让全世界都看到他,作为第一个版本的里程碑!

七、参考文章

  • 《AI大模型编写多线程并发框架(六十一):从零开始搭建框架》
  • 《AI大模型编写多线程并发框架(六十二):限流和并发度优化》
  • 《AI大模型编写多线程并发框架(六十三):监听器优化·上》
  • 《AI大模型编写多线程并发框架(六十四):监听器优化·下》
  • 《AI大模型编写多线程并发框架(六十五):发布和应用》

加我加群(备注csdn)一起交流学习!更多干货下载、项目源码和大厂内推等着你

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2097803.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

面向智能体编程(Agent-Oriented Programming, AOP)

大家已经熟知面向对象编程、面向接口编程&#xff0c;AI大模型研发背景下&#xff0c;又产生了一个新概念&#xff1a;&#xff08;Agent-Oriented Programming, AOP&#xff09; 它是一种特殊的编程范式&#xff0c;它专注于开发能够模拟人类智能行为的智能体。智能体是能够在…

水经微图Web版功能简介

我们在《水经微图Web版341项功能清单》一文中&#xff0c;为你罗列了水经微图&#xff08;下称“微图”&#xff09;的详细功能清单。 现在基于该清单&#xff0c;再为你分享微图最主要的功能&#xff0c;从而让你对该平台有一个基本的了解。 微图Web版功能简介 微图Web版&a…

Vue3 官方推荐状态管理库Pinia

介绍 Pinia 是 Vue 官方团队推荐代替Vuex的一款轻量级状态管理库&#xff0c;允许跨组件/页面共享状态。 Pinia 旨在提供一种更简洁、更直观的方式来处理应用程序的状态。 Pinia 充分利用了 Vue 3 的 Composition API。 官网&#xff1a; Pinia符合直觉的 Vue.js 状态管理库 P…

我如何选择自己的AI细分方向和第一个入门项目

思维导图 下图展示了我的思考和分析过程 目录 思维导图大纲1. **确定兴趣和技能方向**2. **寻找合适的开源项目**3. **评估项目的活跃度**4. **开始贡献**5. **展示你的贡献**6. **推荐开源项目**总结 选择一个细分领域1. **了解各个领域的基本概念**2. **考虑你的兴趣和背景**…

备战秋招60天算法挑战,Day29

题目链接&#xff1a; https://leetcode.cn/problems/decode-ways/ 视频题解&#xff1a; https://www.bilibili.com/video/BV181YKeGE3E/ LeetCode 91. 解码方法 题目描述 一条包含字母 A-Z 的消息通过以下映射进行了 编码 &#xff1a; A -> "1" B -> &qu…

大力出奇迹背景下的Scaling Law能否带领我们走向AGI

Scaling Law&#xff08;尺度定律&#xff09; 在人工智能领域&#xff0c;尤其是在大模型的发展中扮演着至关重要的角色。它描述了模型性能如何随着模型规模&#xff08;如参数数量&#xff09;、数据量和计算资源的增加而提升。这一定律对于理解大模型的能力扩展和优化训练策…

CSS3文本属性详解

4.2 文本属性 想缩进段落&#xff0c;幂指数&#xff0c;标题字符增加间距&#xff0c;要用到文本属性。 最有用的CSS文本属性&#xff1a; text-indent:文本缩进letter-spacing:字符间距word-spacing:单词间距text-decoration:文本装饰&#xff0c;下划线text-align:文字对…

2024年小鹏MONA M03 P7 G3 G3i P5 G9 P7i G6 X9维修手册和电路图

汽修帮手资料库提供各大厂家车型维修手册、电路图、新车特征、车身钣金维修数据、全车拆装、扭力、发动机大修、发动机正时、保养、电路图、针脚定义、模块传感器、保险丝盒图解对照表位置等&#xff0c;并长期保持高频率资料更新&#xff01; 覆盖车型&#xff1a; 2024年小…

langchain结合searXNG实现基于搜索RAG

目录 一、背景 二、环境说明和安装 1.1 环境说明 2.2 环境安装 2.2.1 searXNG安装 三、代码实现 代码 结果输出 直接请求模型输出 四、参考 一、背景 大语言模型的出现带来了新的技术革新&#xff0c;但是大模型由于训练语料的原因&#xff0c;它的知识和当前实时热点…

白酒酿造设备大揭秘:科技与传统的结合

在白酒的酿造世界里&#xff0c;设备与工艺同样重要。它们共同构建了白酒的不同风味和品质。今天&#xff0c;就让我们一起走进豪迈白酒&#xff08;HOMANLISM&#xff09;的酿造车间&#xff0c;探索那些科技与传统相结合的酿造设备&#xff0c;感受它们如何为白酒的酿造增添魅…

Seata环境搭建

1、Seata下载&#xff1a; 1.下载地址 2.下载的版本 2、Seata参数配置参考&#xff1a; 各种seata参数官网参考 3、Seata安装部署&#xff1a; 3.1.Seata新手部署指南: 3.2.在mysql8.0数据库里面建库建表 a.建数据库&#xff1a; create database seata; use seata;b.建…

开源项目管理工具 Plane 安装和使用教程

说到项目管理工具&#xff0c;很多人脑海中第一个蹦出来的可能就是 Jira 了。没错&#xff0c;Jira 确实很强大&#xff0c;但是...它也有点太强大了&#xff0c;既复杂又昂贵&#xff0c;而且目前也不再提供私有化部署版本了。 再说说飞书&#xff0c;作为国产之光&#xff0…

电路基础 ---- 负反馈放大电路的方框图分析法

1 方框图分析法 方框图如下&#xff1a; 图中 A u o A_{uo} Auo​是一个电压输入的放大器的放大倍数&#xff0c;称为开环放大倍数。 F F F为反馈系数&#xff0c;是一个矢量&#xff0c;是指输出信号 x o x_{o} xo​的多少倍回送到放大器的输入端。 M M M为衰减系数&#x…

[pytorch] --- pytorch基础之损失函数与反向传播

1 损失函数 1.1 Loss Function的作用 每次训练神经网络的时候都会有一个目标&#xff0c;也会有一个输出。目标和输出之间的误差&#xff0c;就是用Loss Function来衡量的。所以Loss误差是越小越好的。此外&#xff0c;我们可以根据误差Loss&#xff0c;指导输出output接近目…

浏览器百科:网页存储篇-Cookie详解(一)

1.引言 在现代网页开发中&#xff0c;数据存储和管理是提升用户体验的重要环节之一。作为网页存储技术的元老&#xff0c;Cookie 自从诞生以来就扮演着不可或缺的角色。Cookie 允许网站在用户浏览器中存储小块数据&#xff0c;从而实现状态保持、用户跟踪以及个性化设置等功能…

数仓基础(六):离线与实时数仓区别和建设思路

文章目录 离线与实时数仓区别和建设思路 一、离线数仓与实时数仓区别 二、实时数仓建设思路 离线与实时数仓区别和建设思路 ​​​​​​​一、离线数仓与实时数仓区别 离线数据与实时数仓区别如下&#xff1a; 对比方面 离线数仓 实时数仓 架构选择 传统大数据架构 …

KRaft模式下的Kafka启动指南:摆脱Zookeeper依赖

一、背景介绍 多年来&#xff0c;人们一直在同时使用Apache ZooKeeper和Apache Kafka。但是自Apache Kafka 3.3发布以来&#xff0c;它就可以在没有ZooKeeper的情况下运行。同时它包含了新的命令kafka-metadata-quorum和kafka-metadata-shell?该如何安装新版kafka&#xff0c…

快手小店多店铺管理神器:甜羊浏览器

随着短视频平台的兴起&#xff0c;快手小店已经成为众多商家的重要销售渠道。然而&#xff0c;对于同时管理多个快手小店的商家来说&#xff0c;如何高效地运营这些店铺成为了一大挑战。特别是在需要同时登录和管理多个店铺账号时&#xff0c;问题尤为突出。那么&#xff0c;如…

【Python报错已解决】“ImportError: cannot import name ‘triu‘ from ‘scipy.linalg‘“?

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 文章目录 引言&#xff1a;一、问题描述1.1 报错示例&#xff1a;以下代码尝试从 scipy.linalg 中导入 triu 函数。1.2 报错分析…