Requirement: to support efficient search with Elasticsearch, the article data stored in MySQL has to be queried out and synchronized into ES on a schedule. To speed up the sync, the job uses xxl-job shard broadcasting plus a thread pool on each node.
1. Add the mapping
- Add the mapping with Kibana
PUT /app_info_article
{
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "publishTime": {
        "type": "date"
      },
      "layout": {
        "type": "integer"
      },
      "images": {
        "type": "keyword",
        "index": false
      },
      "staticUrl": {
        "type": "keyword",
        "index": false
      },
      "authorId": {
        "type": "long"
      },
      "authorName": {
        "type": "keyword"
      },
      "title": {
        "type": "text",
        "analyzer": "ik_max_word",
        "copy_to": "all"
      },
      "content": {
        "type": "text",
        "analyzer": "ik_max_word",
        "copy_to": "all"
      },
      "all": {
        "type": "text",
        "analyzer": "ik_max_word"
      }
    }
  }
}
- Or with plain HTTP requests
PUT to create the index and mapping: http://192.168.200.130:9200/app_info_article
GET to view the mapping: http://192.168.200.130:9200/app_info_article
DELETE to remove the index and its mapping: http://192.168.200.130:9200/app_info_article
GET to list all documents: http://192.168.200.130:9200/app_info_article/_search
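- Or create the index from Java code. A minimal sketch using the high-level REST client (the same RestHighLevelClient bean configured in section 2 below); mappingJson stands for the {"mappings":{...}} body shown above passed in as a String:
import java.io.IOException;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

// Hedged sketch: creates app_info_article with the mapping shown above.
public void createIndex(RestHighLevelClient client, String mappingJson) throws IOException {
    CreateIndexRequest request = new CreateIndexRequest("app_info_article");
    request.source(mappingJson, XContentType.JSON);   // whole request body, incl. "mappings"
    client.indices().create(request, RequestOptions.DEFAULT);
}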
2. Spring Boot test
Add the dependencies
<!--elasticsearch-->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.12.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.12.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.12.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-smile</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-yaml</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Elasticsearch connection settings (application.yml)
# custom elasticsearch connection settings
elasticsearch:
  host: 192.168.200.131
  port: 9200
Elasticsearch configuration class
@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {

    private String host;
    private int port;

    @Bean
    public RestHighLevelClient client() {
        return new RestHighLevelClient(RestClient.builder(
                new HttpHost(host, port, "http")
        ));
    }
}
Entity class
@Data
public class SearchArticleVo {

    // article id
    private Long id;
    // article title
    private String title;
    // publish time
    private Date publishTime;
    // article layout
    private Integer layout;
    // cover images
    private String images;
    // author id
    private Long authorId;
    // author name
    private String authorName;
    // static page url
    private String staticUrl;
    // article content
    private String content;
}
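The test below reads the articles through apArticleMapper.loadArticleList(), which is not shown in this post. A rough sketch of what that mapper might look like; the table and column names (and any joins, e.g. to a separate content table) are assumptions and depend on your schema:
import java.util.List;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;

@Mapper
public interface ApArticleMapper {

    // Hypothetical query: adjust table and column names to your actual article schema.
    @Select("SELECT id, title, publish_time, layout, images, author_id, " +
            "author_name, static_url, content FROM ap_article")
    List<SearchArticleVo> loadArticleList();
}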
Test class. Once the test passes, remember to delete all documents from app_info_article again (a snippet for that follows the test class).
@SpringBootTest
@RunWith(SpringRunner.class)
public class ApArticleTest {

    @Autowired
    private ApArticleMapper apArticleMapper;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Note: if the data set is large, import it page by page.
     * 1) query the data from the database
     * 2) write the data into ES:
     *    create a BulkRequest
     *    ================================
     *    || A: create an XxxRequest
     *    || B: put the DSL/document data into the XxxRequest
     *    || C: execute the remote request with RestHighLevelClient
     *    ================================
     *    add each XxxRequest to the BulkRequest
     *    send the BulkRequest to the index with RestHighLevelClient
     * @throws Exception
     */
    @Test
    public void init() throws Exception {
        //1) query the database
        List<SearchArticleVo> searchArticleVos = apArticleMapper.loadArticleList();
        //2) create the BulkRequest and set the refresh policy - refresh immediately
        BulkRequest bulkRequest = new BulkRequest()
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        for (SearchArticleVo searchArticleVo : searchArticleVos) {
            //A: create the IndexRequest
            IndexRequest indexRequest = new IndexRequest("app_info_article")
                    //B: put the document data into the request
                    .id(searchArticleVo.getId().toString())
                    .source(JSON.toJSONString(searchArticleVo), XContentType.JSON);
            //3) add the IndexRequest to the BulkRequest
            bulkRequest.add(indexRequest);
        }
        //4) send the BulkRequest to the index with RestHighLevelClient
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    }
}
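To empty the index again after the test, either send the Kibana DELETE request from section 1 and re-create the mapping, or delete only the documents. A hedged sketch of the latter, as another method inside the test class above (uses org.elasticsearch.index.reindex.DeleteByQueryRequest and QueryBuilders):
@Test
public void clear() throws Exception {
    // removes every document but keeps the index and its mapping
    DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest("app_info_article")
            .setQuery(QueryBuilders.matchAllQuery());
    restHighLevelClient.deleteByQuery(deleteRequest, RequestOptions.DEFAULT);
}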
3. Core code
3.1 xxl-job configuration
xxl:
  job:
    accessToken: default_token
    admin:
      addresses: http://127.0.0.1:8080/xxl-job-admin
    executor:
      address: ''
      appname: hmtt
      ip: ''
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
      port: 9998
@Configuration
public class XxlJobConfig {

    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    @Value("${xxl.job.executor.appname}")
    private String appname;
    @Value("${xxl.job.executor.address}")
    private String address;
    @Value("${xxl.job.executor.ip}")
    private String ip;
    @Value("${xxl.job.executor.port}")
    private int port;
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}
3.2 Elasticsearch configuration
Same as in section 2.
3.3 Writing the sync task
@Component
public class SyncIndexTask {

    @Autowired
    private IArticleClient articleClient;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    // thread pool that runs the per-page import tasks
    public static ExecutorService pool = Executors.newFixedThreadPool(10);
    /***
     * Index synchronization task.
     * Only spread the import across shards when the total exceeds minSize (e.g. 100 rows);
     * otherwise shard 0 imports everything on its own.
     * A: query the total row count -> searchTotal; when every shard takes part, each shard
     *    works out its own row range: limit #{index}, #{count}
     *
     * B: inside that range, run paged queries (checking that index never passes the upper
     *    bound) and bulk-import each page into ES on a separate thread
     *
     * C: configure the job in xxl-job-admin with the sharding broadcast routing strategy
     */
    @XxlJob("syncIndex")
    public void syncIndex() {
        //1. read the job parameter, e.g. {"minSize":100,"size":10}
        String jobParam = XxlJobHelper.getJobParam();
        Map<String, Integer> jobData = JSON.parseObject(jobParam, Map.class);
        int minSize = jobData.get("minSize"); // below this total only shard 0 imports
        int size = jobData.get("size");       // page size of the inner paged queries
        //2. query the total number of rows to process: total = IArticleClient.searchTotal()
        Long total = articleClient.searchTotal();
        //3. if this is not shard 0 and the total does not exceed minSize, this shard
        //   has nothing to do - small data sets are handled by shard 0 alone
        int cn = XxlJobHelper.getShardIndex(); // index of the current shard (node)
        if (total <= minSize && cn != 0) {
            // nothing to do for this shard
            return;
        }
        //4. work out the row range this shard is responsible for
        //4.1 number of shards (nodes)
        int n = XxlJobHelper.getShardTotal();
        //4.2 rows handled per shard (rounded up)
        int count = (int) (total % n == 0 ? total / n : (total / n) + 1);
        //4.3 row range of this shard: limit #{index}, #{count}
        int indexStart = cn * count;
        int indexEnd = cn * count + count - 1; // index of the last row in the range
        //5. paged queries and bulk import inside that range
        int index = indexStart; // offset of the first page
        System.out.println("shard total [" + n + "], current shard [" + cn
                + "], row range [" + indexStart + "-" + indexEnd + "]");
        do {
            //============================== inner paging ==============================
            //5.1 query one page  5.2 import it into ES (runs on the thread pool)
            push(index, size, indexEnd);
            //5.3 move on to the next page
            index = index + size;
        } while (index <= indexEnd);
    }
    /**
     * Bulk-import one page of data.
     * @param index    offset of the page
     * @param size     requested page size
     * @param indexEnd index of the last row this shard may process (used to clamp the last page)
     */
    public void push(int index, int size, int indexEnd) {
        pool.execute(() -> {
            // clamp the last page so it does not run past indexEnd
            int pageSize = index + size > indexEnd ? indexEnd - index + 1 : size;
            System.out.println("this thread handles the page [index=" + index + ", size=" + pageSize + "]");
            //1) query one page of articles from the article service
            List<SearchArticleVo> searchArticleVos = articleClient.searchPage(index, pageSize);
            //2) create the BulkRequest and set the refresh policy - refresh immediately
            BulkRequest bulkRequest = new BulkRequest()
                    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            for (SearchArticleVo searchArticleVo : searchArticleVos) {
                //A: create the IndexRequest
                IndexRequest indexRequest = new IndexRequest("app_info_article")
                        //B: put the document data into the request
                        .id(searchArticleVo.getId().toString())
                        .source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);
                //3) add the IndexRequest to the BulkRequest
                bulkRequest.add(indexRequest);
            }
            //4) send the BulkRequest to the index with RestHighLevelClient
            if (searchArticleVos != null && searchArticleVos.size() > 0) {
                try {
                    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
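The task depends on IArticleClient.searchTotal() and IArticleClient.searchPage(index, size), which are provided by the article service and not shown here. A rough sketch of what that Feign client could look like; the service name and request paths are assumptions and must match your article service:
import java.util.List;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

// Hypothetical Feign client for the article microservice.
@FeignClient("leadnews-article")
public interface IArticleClient {

    // total number of articles to synchronize
    @GetMapping("/api/v1/article/search-total")
    Long searchTotal();

    // one page of articles, equivalent to LIMIT #{index}, #{size} on the article side
    @GetMapping("/api/v1/article/search-page/{index}/{size}")
    List<SearchArticleVo> searchPage(@PathVariable("index") Integer index,
                                     @PathVariable("size") Integer size);
}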
3.4 Create the job in xxl-job-admin
- Add an executor whose AppName matches the appname in XxlJobConfig (hmtt here). This step can be skipped if a suitable executor already exists; an executor is essentially just a grouping of nodes.
- Add the job (JobHandler: syncIndex, route strategy: sharding broadcast).
4. Simulate a cluster and run the project
- Start three SearchApplication instances with xxl.job.executor.port set to 9998, 9997 and 9996 respectively.
- Trigger the job once from xxl-job-admin.
- Check the result: all three instances execute their share successfully.
- Check in Kibana: 18 documents are present, i.e. all of the MySQL rows were imported (a small Java check follows below).
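- The count can also be checked from code; a hedged sketch, run as another test method with the same RestHighLevelClient bean (uses org.elasticsearch.client.core.CountRequest / CountResponse):
@Test
public void countDocs() throws Exception {
    // counts the documents in app_info_article; with all 18 MySQL rows imported it prints 18
    CountRequest countRequest = new CountRequest("app_info_article");
    CountResponse countResponse = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
    System.out.println("documents in app_info_article: " + countResponse.getCount());
}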
5. Summary
- xxl-job sharding broadcast: suppose there are 1000 rows in total and the SearchApplication service runs on 3 nodes. The nodes then have to synchronize 334, 334 and 332 rows respectively (see the worked sketch at the end of this section).
- Paged queries: node 0 is responsible for 334 rows, so with a page size of 20 it performs 17 paged queries; after each query the page is pushed to ES through restHighLevelClient.
do {
    //5.1 paged query  5.2 import into ES
    push(index, size, indexEnd); // does the paged query + the ES import
    //5.3 move on to the next page
    index = index + size;
} while (index <= indexEnd);
- Multithreading: in the loop above, push() does both the paged query and the ES import. Run synchronously this is slow, and the next page cannot start until the previous push() has returned. Instead, every call to push() submits the work to a thread pool, so each page is queried and imported independently of the others and the pages never block one another.
public static ExecutorService pool = Executors.newFixedThreadPool(10);
......
public void push(int index, int size, int indexEnd) {
    pool.execute(() -> {
        // paged query + import into ES
    });
}
- The Elasticsearch APIs involved (BulkRequest, IndexRequest, bulk): see my other post, 项目中使用Elasticsearch的API相关介绍.
//2) create the BulkRequest and set the refresh policy - refresh immediately
BulkRequest bulkRequest = new BulkRequest()
        .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (SearchArticleVo searchArticleVo : searchArticleVos) {
    //A: create the IndexRequest
    IndexRequest indexRequest = new IndexRequest("app_info_article")
            //B: put the document data into the request
            .id(searchArticleVo.getId().toString())
            .source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);
    //3) add the IndexRequest to the BulkRequest
    bulkRequest.add(indexRequest);
}
//4) send the BulkRequest to the index with RestHighLevelClient
if (searchArticleVos != null && searchArticleVos.size() > 0) {
    try {
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    } catch (IOException e) {
        e.printStackTrace();
    }
}
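- Worked example of the shard arithmetic: a small standalone sketch (not part of the project) that prints, for 1000 rows, 3 nodes and a page size of 20, the row range and the number of paged queries of each shard.
// Standalone demo of the range calculation used by syncIndex().
public class ShardRangeDemo {
    public static void main(String[] args) {
        long total = 1000; // rows in MySQL
        int n = 3;         // number of executor nodes (shard total)
        int size = 20;     // page size of the inner paged queries
        int count = (int) (total % n == 0 ? total / n : total / n + 1); // rows per shard, rounded up
        for (int cn = 0; cn < n; cn++) { // cn = shard index
            int indexStart = cn * count;
            // the task code lets indexEnd overshoot and relies on MySQL LIMIT to return
            // fewer rows; here we clamp to the last real row to show the effective range
            int indexEnd = (int) Math.min((long) cn * count + count - 1, total - 1);
            int rows = indexEnd - indexStart + 1;
            int pages = (rows + size - 1) / size;
            System.out.println("shard " + cn + ": rows " + indexStart + "-" + indexEnd
                    + " (" + rows + " rows, " + pages + " paged queries)");
        }
    }
}
// prints: shard 0: rows 0-333 (334 rows, 17 paged queries)
//         shard 1: rows 334-667 (334 rows, 17 paged queries)
//         shard 2: rows 668-999 (332 rows, 17 paged queries)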