go zero 使用MapReduce并发
一、MapReduce 介绍
MapReduce 是一种用于并行计算的编程模型,特别适合在大规模数据处理场景中简化逻辑代码。
官方文档:
https://go-zero.dev/docs/components/mr
1. MapReduce 的核心概念
在 MapReduce 中,主要有以下三个核心步骤:
a. Generate (生成数据):
- 数据的初始输入阶段。可以是一个简单的循环,也可以是从数据库、文件或其他来源加载数据。
b. Mapper (映射): - 将输入数据映射为中间结果。通常用来过滤、转换、查询或处理数据。
c. Reducer (归约): - 对映射后的数据进行汇总处理,生成最终的结果。
在 go zero 中,mr.MapReduce
的具体代码如下:
func MapReduce[T, U, V any](generate GenerateFunc[T], mapper MapperFunc[T, U], reducer ReducerFunc[U, V],
opts ...Option) (V, error) {
panicChan := &onceChan{channel: make(chan any)}
source := buildSource(generate, panicChan)
return mapReduceWithPanicChan(source, panicChan, mapper, reducer, opts...)
}
2. 为什么需要 MapReduce
在实际的业务场景中我们常常需要从不同的 rpc 服务中获取相应属性来组装成复杂对象。
比如要查询商品详情:
- 商品服务-查询商品属性
- 库存服务-查询库存属性
- 价格服务-查询价格属性
- 营销服务-查询营销属性
如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,所以我们要优化性能一般会将串行改并行。
简单的场景下使用 WaitGroup
也能够满足需求,但是如果我们需要对 rpc 调用返回的数据进行校验、数据加工转换、数据汇总呢?继续使用 WaitGroup
就有点力不从心了.。
二、项目构建
接下来我们使用一个文章列表功能简单的演示下
1. article数据表
这是存储文章信息的表,包含标题、内容、作者、评论数等字段。
CREATE TABLE `article` (
`id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`title` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '标题' COLLATE 'utf8mb4_bin',
`content` TEXT NOT NULL COMMENT '内容' COLLATE 'utf8_unicode_ci',
`cover` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '封面' COLLATE 'utf8mb4_bin',
`description` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '描述' COLLATE 'utf8mb4_bin',
PRIMARY KEY (`id`)
);
2.article.api
在实际开发中,应该传入作者ID、游标、页码、排序方法等信息,这里为了方便演示就不传入参数了,API 定义如下:
syntax = "v1"
type (
ArticleInfo {
ArticleId int64 `json:"article_id"`
Title string `json:"title"`
Content string `json:"content"`
Description string `json:"description"`
Cover string `json:"cover"`
}
ArticleListResponse {
Articles []ArticleInfo `json:"articles"`
}
)
@server (
prefix: /v1/article
)
service article-api {
@handler Articlelisthandler
post /list returns (ArticleListResponse)
}
三、使用 MapReduce
拉取库
go get github.com/zeromicro/go-zero/core/mr
1.实现文章列表
我们使用 MapReduce 来并行处理文章数据,
func (l *ArticlelistLogic) Articlelist() (resp *types.ArticleListResponse, err error) {
// Step 1: Generate 数据
//这里为了方便我使用了简单for循环产生文档ID
generateFunc := func(source chan<- int) {
for id := 1; id < 50; id++ { // 模拟文章 ID 数据
source <- id
}
}
articleModel := l.svcCtx.ArticleModel
// Step 2: Mapper 映射处理
mapperFunc := func(id int, writer mr.Writer[*types.ArticleInfo], cancel func(error)) {
//使用产生id,查询文章详情
one, err := articleModel.FindOne(l.ctx, uint64(id)) // 查找单篇文章
if err != nil {
return // 跳过错误
}
//FindOne返回的是 *model.Article类型,Mapper映射的类型为*types.ArticleInfo
//所以需要转换一下
articleInfo := &types.ArticleInfo{
ArticleId: int64(one.Id),
Title: one.Title,
Content: one.Content,
Description: one.Description,
Cover: one.Cover,
}
writer.Write(articleInfo) // 写入中间结果
}
// Step 3: Reducer 汇总处理
reduceFunc := func(pipe <-chan *types.ArticleInfo, writer mr.Writer[[]types.ArticleInfo], cancel func(error)) {
var articleList []types.ArticleInfo
for article := range pipe {
articleList = append(articleList, *article)
}
writer.Write(articleList) // 写入最终结果
}
// 调用 MapReduce
//mr.WithWorkers(5) 允许调用者自定义并发工作线程数。
//如果不传入mr.WithWorkers ,默认Workers为16个
reduce, err := mr.MapReduce(generateFunc, mapperFunc, reduceFunc, mr.WithWorkers(5))
if err != nil {
return nil, err // 处理错误
}
// 返回结果
return &types.ArticleListResponse{
Articles: reduce,
}, nil
}
2. 详细讲解
Step 1: Generate 数据
generateFunc
的作用是提供初始数据。在本例中,我们通过一个循环生成了文章的 ID:
generateFunc := func(source chan<- int) {
for id := 1; id < 50; id++ {
source <- id
}
}
Step 2: Mapper 映射处理
mapperFunc
用于处理每一个文章 ID,并将其转换为 ArticleInfo
。
- 使用
articleModel.FindOne
从数据库中获取文章数据。 - 如果获取失败,跳过该 ID。
- 将结果通过
writer.Write
写入到下一步。
mapperFunc := func(id int, writer mr.Writer[*types.ArticleInfo], cancel func(error)) {
one, err := articleModel.FindOne(l.ctx, uint64(id))
if err != nil {
return
}
articleInfo := &types.ArticleInfo{
ArticleId: int64(one.Id),
Title: one.Title,
Content: one.Content,
Description: one.Description,
Cover: one.Cover,
}
writer.Write(articleInfo)
}
Step 3: Reducer 汇总处理
reduceFunc
将 mapperFunc
的结果汇总为最终的 []types.ArticleInfo
。
- 遍历管道中的每个
*types.ArticleInfo
。 - 将解引用后的
ArticleInfo
添加到结果列表。
reduceFunc := func(pipe <-chan *types.ArticleInfo, writer mr.Writer[[]types.ArticleInfo], cancel func(error)) {
var articleList []types.ArticleInfo
for article := range pipe {
articleList = append(articleList, *article)
}
writer.Write(articleList)
}
3. 测试运行
向 /v1/article/list
发送 POST 请求:
curl -X POST http://localhost:8888/v1/article/list
运行结果如下:
{
"articles": [
{
"article_id": 1,
"title": "标题1",
"content": "这是内容1",
"description": "描述1",
"cover": "封面1.jpg"
},
...
]
}
4.效率对比
普通循环
为了更直观的对比效率,我们使用普通循环再次实现下文章列表:
func (l *ArticlelistLogic) Articlelist() (resp *types.ArticleListResponse, err error) {
// todo: add your logic here and delete this line
time1 := time.Now()
var articleList []types.ArticleInfo
articleModel := l.svcCtx.ArticleModel
for id := 1; id < 50; id++ {
article, _ := articleModel.FindOne(l.ctx, uint64(id))
articleInfo := types.ArticleInfo{
ArticleId: int64(article.Id),
Title: article.Title,
Content: article.Content,
Description: article.Description,
Cover: article.Cover,
}
articleList = append(articleList, articleInfo)
}
time2 := time.Now()
logx.Info("执行时间为:", time2.Sub(time1))
return &types.ArticleListResponse{
Articles: articleList,
}, nil
}
效率对比
这个执行时间可能每次都不一样,但是进过多次对比, 使用mapreduce 效率是高于普通方法的
使用串行调用时间:
使用MapReduce消耗时间: