文章目录
- 1. 准备工作
- 1.1 索引库
- 1.2 建表
- 1.3 实体类
- 1.3.1 Item.java
- 1.3.2 ItemDocument.java
- 1.4 编写配置文件
- 1.5 编写 Mapper 类和 Service 类
- 2. 没有使用多线程的情况
- 2.1 编码
- 2.2 测试结果
- 3. 使用多线程(配合线程池)的情况
- 3.1 自定义类,实现 Runnable 接口
- 3.2 编码(结合线程池)
- 3.3 测试
- 4. 对比及分析
1. 准备工作
测试环境:
- JDK 17.0.7
- SpringBoot 3.0.2
- MySQL 8.0.34
- ElasticSearch 7.17.18
本次测试主要用到 MyBatis-Plus、PageHelper 和 fastjson2
MyBatis-Plus 的 Maven 依赖
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.7</version>
</dependency>
PageHelper 的 Maven 依赖
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
</exclusion>
</exclusions>
</dependency>
fastjson2 的 Maven 依赖
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.50</version>
</dependency>
1.1 索引库
创建一个名为 shopping_mall 的索引库
{
"mappings": {
"properties": {
"id": {
"type": "keyword"
},
"name": {
"type": "text",
"analyzer": "ik_smart"
},
"price": {
"type": "integer"
},
"image": {
"type": "keyword",
"index": false
},
"category": {
"type": "keyword"
},
"brand": {
"type": "keyword"
},
"sold": {
"type": "integer"
},
"commentCount": {
"type": "integer",
"index": false
},
"isAD": {
"type": "boolean"
},
"updateTime": {
"type": "date"
}
}
}
}
1.2 建表
表名为 item,表结构如下(由于表中有 88476 条数据,无法在这里展示,需要具体的数据可以私聊我获取 SQL 文件)
/*
Navicat Premium Data Transfer
Source Server : localhost
Source Server Type : MySQL
Source Server Version : 80034 (8.0.34)
Source Host : localhost:3306
Source Schema : blog
Target Server Type : MySQL
Target Server Version : 80034 (8.0.34)
File Encoding : 65001
Date: 25/08/2024 01:59:24
*/
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for item
-- ----------------------------
DROP TABLE IF EXISTS `item`;
CREATE TABLE `item` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '商品id',
`name` varchar(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NOT NULL COMMENT 'SKU名称',
`price` int NOT NULL DEFAULT 0 COMMENT '价格(分)',
`stock` int UNSIGNED NOT NULL COMMENT '库存数量',
`image` varchar(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '商品图片',
`category` varchar(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '类目名称',
`brand` varchar(100) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '品牌名称',
`spec` varchar(200) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci NULL DEFAULT NULL COMMENT '规格',
`sold` int NULL DEFAULT 0 COMMENT '销量',
`comment_count` int NULL DEFAULT 0 COMMENT '评论数',
`isAD` tinyint(1) NULL DEFAULT 0 COMMENT '是否是推广广告,true/false',
`status` int NULL DEFAULT 2 COMMENT '商品状态 1-正常,2-下架,3-删除',
`create_time` datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
`creater` bigint NULL DEFAULT NULL COMMENT '创建人',
`updater` bigint NULL DEFAULT NULL COMMENT '修改人',
PRIMARY KEY (`id`) USING BTREE,
INDEX `status`(`status` ASC) USING BTREE,
INDEX `updated`(`update_time` ASC) USING BTREE,
INDEX `category`(`category` ASC) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 100002672305 CHARACTER SET = utf8mb3 COLLATE = utf8mb3_general_ci COMMENT = '商品表' ROW_FORMAT = COMPACT;
SET FOREIGN_KEY_CHECKS = 1;
1.3 实体类
1.3.1 Item.java
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serial;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
 * Entity mapped to the {@code item} table via MyBatis-Plus.
 *
 * <p>A plain mutable bean: each field is exposed through a getter/setter pair and
 * {@link #toString()} renders every field for debugging output.
 */
@TableName("item")
public class Item implements Serializable {

    @Serial
    private static final long serialVersionUID = 1L;

    /** Product id (primary key, generated by the database). */
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    /** SKU name. */
    private String name;

    /** Price, expressed in fen (1/100 yuan). */
    private Integer price;

    /** Stock quantity. */
    private Integer stock;

    /** Product image. */
    private String image;

    /** Category name. */
    private String category;

    /** Brand name. */
    private String brand;

    /** Specification. */
    private String spec;

    /** Number of units sold. */
    private Integer sold;

    /** Number of comments. */
    private Integer commentCount;

    /** Whether the product is a promoted advertisement (true/false). */
    @TableField("isAD")
    private Boolean isAD;

    /** Product status: 1 = normal, 2 = off the shelf, 3 = deleted. */
    private Integer status;

    /** Creation time. */
    private LocalDateTime createTime;

    /** Last update time. */
    private LocalDateTime updateTime;

    /** Id of the user who created the record. */
    private Long creater;

    /** Id of the user who last modified the record. */
    private Long updater;

    public Long getId() {
        return this.id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getPrice() {
        return this.price;
    }

    public void setPrice(Integer price) {
        this.price = price;
    }

    public Integer getStock() {
        return this.stock;
    }

    public void setStock(Integer stock) {
        this.stock = stock;
    }

    public String getImage() {
        return this.image;
    }

    public void setImage(String image) {
        this.image = image;
    }

    public String getCategory() {
        return this.category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    public String getBrand() {
        return this.brand;
    }

    public void setBrand(String brand) {
        this.brand = brand;
    }

    public String getSpec() {
        return this.spec;
    }

    public void setSpec(String spec) {
        this.spec = spec;
    }

    public Integer getSold() {
        return this.sold;
    }

    public void setSold(Integer sold) {
        this.sold = sold;
    }

    public Integer getCommentCount() {
        return this.commentCount;
    }

    public void setCommentCount(Integer commentCount) {
        this.commentCount = commentCount;
    }

    public Boolean getIsAD() {
        return this.isAD;
    }

    public void setIsAD(Boolean AD) {
        this.isAD = AD;
    }

    public Integer getStatus() {
        return this.status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public LocalDateTime getCreateTime() {
        return this.createTime;
    }

    public void setCreateTime(LocalDateTime createTime) {
        this.createTime = createTime;
    }

    public LocalDateTime getUpdateTime() {
        return this.updateTime;
    }

    public void setUpdateTime(LocalDateTime updateTime) {
        this.updateTime = updateTime;
    }

    public Long getCreater() {
        return this.creater;
    }

    public void setCreater(Long creater) {
        this.creater = creater;
    }

    public Long getUpdater() {
        return this.updater;
    }

    public void setUpdater(Long updater) {
        this.updater = updater;
    }

    /**
     * Renders all fields as {@code Item{field=value, ...}}; string-typed fields are
     * wrapped in single quotes.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Item{");
        text.append("id=").append(id);
        text.append(", name='").append(name).append('\'');
        text.append(", price=").append(price);
        text.append(", stock=").append(stock);
        text.append(", image='").append(image).append('\'');
        text.append(", category='").append(category).append('\'');
        text.append(", brand='").append(brand).append('\'');
        text.append(", spec='").append(spec).append('\'');
        text.append(", sold=").append(sold);
        text.append(", commentCount=").append(commentCount);
        text.append(", isAD=").append(isAD);
        text.append(", status=").append(status);
        text.append(", createTime=").append(createTime);
        text.append(", updateTime=").append(updateTime);
        text.append(", creater=").append(creater);
        text.append(", updater=").append(updater);
        text.append('}');
        return text.toString();
    }
}
1.3.2 ItemDocument.java
import java.time.LocalDateTime;
/**
 * Document entity for the search index.
 *
 * <p>A plain mutable bean holding the subset of product fields that is copied into
 * each index document; {@link #toString()} renders every field for debugging.
 */
public class ItemDocument {

    /** Product id. */
    private Long id;

    /** Product name. */
    private String name;

    /** Price. */
    private Integer price;

    /** Stock quantity. */
    private Integer stock;

    /** Product image. */
    private String image;

    /** Category name. */
    private String category;

    /** Brand name. */
    private String brand;

    /** Number of units sold. */
    private Integer sold;

    /** Number of comments. */
    private Integer commentCount;

    /** Whether the product is a promoted advertisement. */
    private Boolean isAD;

    /** Last update time. */
    private LocalDateTime updateTime;

    public Long getId() {
        return this.id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getPrice() {
        return this.price;
    }

    public void setPrice(Integer price) {
        this.price = price;
    }

    public Integer getStock() {
        return this.stock;
    }

    public void setStock(Integer stock) {
        this.stock = stock;
    }

    public String getImage() {
        return this.image;
    }

    public void setImage(String image) {
        this.image = image;
    }

    public String getCategory() {
        return this.category;
    }

    public void setCategory(String category) {
        this.category = category;
    }

    public String getBrand() {
        return this.brand;
    }

    public void setBrand(String brand) {
        this.brand = brand;
    }

    public Integer getSold() {
        return this.sold;
    }

    public void setSold(Integer sold) {
        this.sold = sold;
    }

    public Integer getCommentCount() {
        return this.commentCount;
    }

    public void setCommentCount(Integer commentCount) {
        this.commentCount = commentCount;
    }

    public Boolean getIsAD() {
        return this.isAD;
    }

    public void setIsAD(Boolean AD) {
        this.isAD = AD;
    }

    public LocalDateTime getUpdateTime() {
        return this.updateTime;
    }

    public void setUpdateTime(LocalDateTime updateTime) {
        this.updateTime = updateTime;
    }

    /**
     * Renders all fields as {@code ItemDocument{field=value, ...}}; the id and the
     * string-typed fields are wrapped in single quotes.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ItemDocument{");
        text.append("id='").append(id).append('\'');
        text.append(", name='").append(name).append('\'');
        text.append(", price=").append(price);
        text.append(", stock=").append(stock);
        text.append(", image='").append(image).append('\'');
        text.append(", category='").append(category).append('\'');
        text.append(", brand='").append(brand).append('\'');
        text.append(", sold=").append(sold);
        text.append(", commentCount=").append(commentCount);
        text.append(", isAD=").append(isAD);
        text.append(", updateTime=").append(updateTime);
        text.append('}');
        return text.toString();
    }
}
1.4 编写配置文件
编写配置文件前,先导入 MySQL 连接驱动的 Maven 依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
application.yaml
spring:
datasource:
url: jdbc:mysql://localhost:3306/blog?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
driver-class-name: com.mysql.cj.jdbc.Driver
username: root
password: 123456
编写完配置文件后,在项目的启动类上添加 @MapperScan 注解,指定 Mapper 所在的包
@MapperScan("cn.edu.scau.mapper")
1.5 编写 Mapper 类和 Service 类
ItemMapper.java
import cn.edu.scau.pojo.Item;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
 * MyBatis-Plus mapper for {@link Item}.
 *
 * <p>Declares no methods of its own; everything available on it is inherited from
 * {@link BaseMapper}.
 */
public interface ItemMapper extends BaseMapper<Item> {
}
ItemService.java
import cn.edu.scau.pojo.Item;
import com.baomidou.mybatisplus.extension.service.IService;
/**
 * Service contract for {@link Item}.
 *
 * <p>Declares no methods of its own; everything available on it is inherited from
 * {@link IService}.
 */
public interface ItemService extends IService<Item> {
}
ItemServiceImpl.java
import cn.edu.scau.mapper.ItemMapper;
import cn.edu.scau.pojo.Item;
import cn.edu.scau.service.ItemService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
/**
 * Spring-managed implementation of {@link ItemService}.
 *
 * <p>Adds no logic of its own; it binds {@link ItemMapper} and {@link Item} to the
 * MyBatis-Plus {@link ServiceImpl} base class.
 */
@Service
public class ItemServiceImpl extends ServiceImpl<ItemMapper, Item> implements ItemService {
}
完成上述工作后,编写一个测试类,检查 ItemServiceImpl 类能否正常工作
import cn.edu.scau.service.ItemService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
 * Smoke test that checks the injected {@link ItemService} can load a row.
 */
@SpringBootTest
public class ItemServiceTests {

    @Autowired
    private ItemService itemService;

    /** Loads the item with id 317578 and prints it to standard output. */
    @Test
    public void test() {
        var loadedItem = itemService.getById(317578L);
        System.out.println(loadedItem);
    }
}
2. 没有使用多线程的情况
我们先来测试一下没有使用多线程的情况
2.1 编码
import cn.edu.scau.pojo.Item;
import cn.edu.scau.pojo.ItemDocument;
import cn.edu.scau.service.ItemService;
import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.github.pagehelper.PageHelper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.xcontent.XContentType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
/**
 * Single-threaded baseline: pages through all items with {@code status = 1} and
 * indexes each page into Elasticsearch with one bulk request.
 */
@SpringBootTest
public class BulkInsertDocumentTests {

    /** Name of the index the documents are written to. */
    private static final String INDEX_NAME = "shopping_mall";

    /** Number of rows read from the database (and sent per bulk request) at a time. */
    private static final int PAGE_SIZE = 500;

    private RestHighLevelClient restHighLevelClient;

    @Autowired
    private ItemService itemService;

    /**
     * Imports every item whose status is 1, one page per bulk request, until a page
     * comes back empty.
     *
     * @throws Exception if the bulk request cannot be sent, or if Elasticsearch
     *                   reports a failure for any document in a bulk request
     */
    @Test
    public void testBulkInsertDocument() throws Exception {
        // Built once and reused for every page. The explicit ordering by primary key
        // makes LIMIT-based paging deterministic, so pages cannot overlap or skip rows.
        QueryWrapper<Item> queryWrapper = new QueryWrapper<>();
        queryWrapper.lambda().eq(Item::getStatus, 1).orderByAsc(Item::getId);

        // NOTE(review): termination relies on an out-of-range page being empty; this
        // presumably requires PageHelper's "reasonable" option to be off — confirm.
        for (int pageNumber = 1; ; pageNumber++) {
            // 1. Load one page of rows
            PageHelper.startPage(pageNumber, PAGE_SIZE);
            List<Item> itemList = itemService.list(queryWrapper);
            if (itemList == null || itemList.isEmpty()) {
                return;
            }

            // 2. Build the bulk request, one index operation per row
            BulkRequest bulkRequest = new BulkRequest();
            for (Item item : itemList) {
                ItemDocument itemDocument = new ItemDocument();
                BeanUtils.copyProperties(item, itemDocument);
                bulkRequest.add(new IndexRequest(INDEX_NAME)
                        .id(item.getId().toString())
                        .source(JSON.toJSONString(itemDocument), XContentType.JSON));
            }

            // 3. Send it. A bulk call can succeed at the HTTP level while individual
            //    documents are rejected, so the response (a BulkResponse) is checked.
            var bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                throw new IllegalStateException(
                        "Bulk indexing failed on page " + pageNumber + ": " + bulkResponse.buildFailureMessage());
            }
        }
    }

    /** Creates the Elasticsearch client used by the test. */
    @BeforeEach
    public void setUp() {
        restHighLevelClient = new RestHighLevelClient(RestClient.builder(
                new HttpHost("127.0.0.1", 9200, "http")
        ));
    }

    /**
     * Closes the Elasticsearch client.
     *
     * @throws Exception if closing the client fails
     */
    @AfterEach
    public void tearDown() throws Exception {
        // Guard against a failed setUp so the original failure is not masked by an NPE.
        if (restHighLevelClient != null) {
            restHighLevelClient.close();
        }
    }
}
2.2 测试结果
GET /shopping_mall/_count
共有 88475 条数据
- 第一次导入耗时 36 秒 954 毫秒
- 第二次导入耗时 38 秒 454 毫秒
- 第三次导入耗时 38 秒 910 毫秒
- 第四次导入耗时 40 秒 671 毫秒
- 第五次导入耗时 38 秒 958 毫秒
- 第六次导入耗时 38 秒 470 毫秒
3. 使用多线程(配合线程池)的情况
3.1 自定义类,实现 Runnable 接口
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
 * Task that sends one bulk request to Elasticsearch and then counts down a latch.
 *
 * <p>The latch is counted down whether the request succeeds or fails, so a thread
 * waiting on it is never left blocked by a failed request.
 */
public class InsertDocumentThread implements Runnable {

    private final RestHighLevelClient restHighLevelClient;

    private final BulkRequest bulkRequest;

    private final CountDownLatch countDownLatch;

    /**
     * @param restHighLevelClient client used to send the request
     * @param bulkRequest         the bulk request to send
     * @param countDownLatch      latch counted down once when this task finishes
     */
    public InsertDocumentThread(RestHighLevelClient restHighLevelClient, BulkRequest bulkRequest, CountDownLatch countDownLatch) {
        this.restHighLevelClient = restHighLevelClient;
        this.bulkRequest = bulkRequest;
        this.countDownLatch = countDownLatch;
    }

    /**
     * Sends the bulk request.
     *
     * @throws RuntimeException      if sending the request fails with an {@link IOException}
     * @throws IllegalStateException if Elasticsearch reports a failure for any document
     */
    @Override
    public void run() {
        try {
            // The call returns a BulkResponse; individual documents can be rejected
            // even when the request itself succeeds, so the response is checked.
            var bulkResponse = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
            if (bulkResponse.hasFailures()) {
                throw new IllegalStateException("Bulk indexing failed: " + bulkResponse.buildFailureMessage());
            }
        } catch (IOException e) {
            throw new RuntimeException("Failed to send bulk request", e);
        } finally {
            // Always release the latch; otherwise one failed request would make
            // whoever awaits the latch wait forever.
            countDownLatch.countDown();
        }
    }
}
3.2 编码(结合线程池)
/**
 * Multi-threaded import: the calling thread pages through all items with
 * {@code status = 1} and hands each page's bulk request to a thread pool, then waits
 * for all requests to finish and prints the elapsed time.
 *
 * <p>The number of pages is fixed from a count taken before paging starts; rows
 * inserted while the import is running are not picked up.
 */
@Test
public void testBulkInsertDocumentWithMultipleThread() {
    int availableProcessors = Runtime.getRuntime().availableProcessors();
    // CallerRunsPolicy instead of DiscardPolicy: when the pool is saturated the
    // submitting thread runs the task itself, so no bulk request is silently dropped
    // (a dropped task would lose data and never count down the latch).
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            2 * availableProcessors,
            4 * availableProcessors,
            100,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            Thread::new,
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    int pageSize = 500;

    // Size the latch from the SAME filter the paging query uses; counting all rows
    // would leave the latch above zero forever whenever filtered-out rows add pages.
    QueryWrapper<Item> countWrapper = new QueryWrapper<>();
    countWrapper.lambda().eq(Item::getStatus, 1);
    long matchingItems = itemService.count(countWrapper);
    int pageCount = (int) ((matchingItems + pageSize - 1) / pageSize);
    CountDownLatch countDownLatch = new CountDownLatch(pageCount);

    // Ordering by primary key makes LIMIT-based paging deterministic.
    QueryWrapper<Item> pageWrapper = new QueryWrapper<>();
    pageWrapper.lambda().eq(Item::getStatus, 1).orderByAsc(Item::getId);

    long start = System.currentTimeMillis();
    try {
        for (int pageNumber = 1; pageNumber <= pageCount; pageNumber++) {
            // 1. Load one page of rows
            PageHelper.startPage(pageNumber, pageSize);
            List<Item> itemList = itemService.list(pageWrapper);
            if (itemList == null || itemList.isEmpty()) {
                // Rows disappeared after the count was taken: nothing to send for
                // this page, but the latch slot reserved for it must be released.
                countDownLatch.countDown();
                continue;
            }

            // 2. Build the bulk request, one index operation per row
            BulkRequest bulkRequest = new BulkRequest();
            for (Item item : itemList) {
                ItemDocument itemDocument = new ItemDocument();
                BeanUtils.copyProperties(item, itemDocument);
                bulkRequest.add(new IndexRequest("shopping_mall")
                        .id(item.getId().toString())
                        .source(JSON.toJSONString(itemDocument), XContentType.JSON));
            }

            // 3. Hand the request to the pool. execute() rather than submit(): an
            //    exception thrown by the task is then reported by the worker thread
            //    instead of being captured in a Future nobody reads.
            threadPoolExecutor.execute(new InsertDocumentThread(restHighLevelClient, bulkRequest, countDownLatch));
        }

        // 4. Wait until every page has been processed
        countDownLatch.await();
    } catch (InterruptedException e) {
        // Preserve the interrupt status for the caller instead of swallowing it.
        Thread.currentThread().interrupt();
    } finally {
        // Let the worker threads terminate once the queued tasks are done.
        threadPoolExecutor.shutdown();
    }

    long end = System.currentTimeMillis();
    System.out.println("耗时:" + (end - start) / 1000 + "秒 " + (end - start) % 1000 + " 毫秒");
}
3.3 测试
DELETE /shopping_mall
我们先删除 shopping_mall 索引库,并按照 1.1 中的映射重新创建(否则 ElasticSearch 会在写入时按动态映射自动创建索引库,字段类型可能与预期不一致),然后再次进行批量导入操作
GET /shopping_mall/_count
共导入 88475 条数据
- 第一次导入耗时 30 秒 657 毫秒
- 第二次导入耗时 35 秒 200 毫秒
- 第三次导入耗时 32 秒 265 毫秒
- 第四次导入耗时 34 秒 11 毫秒
- 第五次导入耗时 30 秒 778 毫秒
- 第六次导入耗时 32 秒 861 毫秒
4. 对比及分析
通过对比可以发现,使用多线程从 MySQL 批量导入数据到 ElasticSearch,虽然速度提升了一点,但是不多,可能是因为以下原因:
- 服务器的 CPU 核心数:我在做测试时,数据库用的是本地的,但 ElasticSearch 用的是云服务器,云服务器的 CPU 配置是 2 核,这也可能是导致使用多线程批量导入数据速度提升不明显的原因
- I/O 密集型操作:Elasticsearch 的索引操作通常是 I/O 密集型的,这意味着瓶颈可能在于网络延迟和 Elasticsearch 服务器的响应时间,而不是客户端 CPU 的处理能力;在这种情况下,一旦网络或服务器端的吞吐量达到上限,继续增加客户端线程数也不会显著提高导入速度
- 网络带宽限制:网络带宽可能是瓶颈(我使用的云服务器的带宽是 6M),特别是在批量插入大量数据时