xxljob分片广播+多线程实现高效定时同步elasticsearch索引库

news2025/1/10 22:00:19

需求:为了利用elasticsearch实现高效搜索,需要将mysql中的数据查出来,再定时同步到es里,同时在同步过程中通过分片广播+多线程提高同步数据的效率。

1. 添加映射

  • 使用kibana添加映射
PUT  /app_info_article
{
    "mappings":{
        "properties":{
            "id":{
                "type":"long"
            },
            "publishTime":{
                "type":"date"
            },
            "layout":{
                "type":"integer"
            },
            "images":{
                "type":"keyword",
                "index": false
            },
            "staticUrl":{
                "type":"keyword",
                "index": false
            },
            "authorId": {
                "type": "long"
            },
            "authorName": {
                "type": "keyword"
            },
            "title":{
                "type":"text",
                "analyzer":"ik_max_word",
                "copy_to": "all"
            },
            "content":{
                "type":"text",
                "analyzer":"ik_max_word",
                "copy_to": "all"
            },
            "all":{
              "type": "text",
              "analyzer": "ik_max_word"
            }
        }
    }
}
  • 使用http请求
    PUT请求添加映射:http://192.168.200.130:9200/app_info_article
    GET请求查询映射:http://192.168.200.130:9200/app_info_article
    DELETE请求,删除索引及映射:http://192.168.200.130:9200/app_info_article
    GET请求,查询所有文档:http://192.168.200.130:9200/app_info_article/_search
    在这里插入图片描述

2. springboot测试

引入依赖

<!--elasticsearch-->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>7.12.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>7.12.1</version>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>7.12.1</version>
			<exclusions>
                <exclusion>
                    <groupId>com.fasterxml.jackson.dataformat</groupId>
                    <artifactId>jackson-dataformat-smile</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>com.fasterxml.jackson.dataformat</groupId>
                    <artifactId>jackson-dataformat-yaml</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

elasticsearch配置

#自定义elasticsearch连接配置
elasticsearch:
  host: 192.168.200.131
  port: 9200

elasticsearch配置类

@Getter
@Setter
@Configuration
@ConfigurationProperties(prefix = "elasticsearch")
public class ElasticSearchConfig {
    private String host;
    private int port;

    @Bean
    public RestHighLevelClient client(){
        return new RestHighLevelClient(RestClient.builder(
                new HttpHost(
                        host,
                        port,
                        "http"
                )
        ));
    }
}

实体类

@Data
public class SearchArticleVo {

    // 文章id
    private Long id;
    // 文章标题
    private String title;
    // 文章发布时间
    private Date publishTime;
    // 文章布局
    private Integer layout;
    // 封面
    private String images;
    // 作者id
    private Long authorId;
    // 作者名词
    private String authorName;
    //静态url
    private String staticUrl;
    //文章内容
    private String content;

}

测试。测试成功后别忘了删除app_info_article的所有文档

@SpringBootTest
@RunWith(SpringRunner.class)
public class ApArticleTest {

    @Autowired
    private ApArticleMapper apArticleMapper;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * 注意:数据量的导入,如果数据量过大,需要分页导入
     *  1)查询数据库数据
     *  2)将数据写入到ES中即可
     *     创建BulkRequest
     *            ================================
     *            ||A:创建XxxRequest
     *            ||B:向XxxRequest封装DSL语句数据
     *            ||                             X C:使用RestHighLevelClient执行远程请求
     *            ================================
     *            将XxxRequest添加到BulkRequest
     *       使用RestHighLevelClient将BulkRequest添加到索引库
     * @throws Exception
     */
    @Test
    public void init() throws Exception {
        //1)查询数据库数据
        List<SearchArticleVo> searchArticleVos = apArticleMapper.loadArticleList();

        //2)创建BulkRequest - 刷新策略
        BulkRequest bulkRequest = new BulkRequest()
                //刷新策略-立即刷新
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        for (SearchArticleVo searchArticleVo : searchArticleVos) {
            //A:创建XxxRequest
            IndexRequest indexRequest = new IndexRequest("app_info_article")
                    //B:向XxxRequest封装DSL语句数据
                    .id(searchArticleVo.getId().toString())
                    .source(JSON.toJSONString(searchArticleVo), XContentType.JSON);

            //3)将XxxRequest添加到BulkRequest
            bulkRequest.add(indexRequest);
        }

        //4)使用RestHighLevelClient将BulkRequest添加到索引库
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    }

}

3. 核心代码

3.1 xxljob配置

xxl:
  job:
    accessToken: default_token
    admin:
      addresses: http://127.0.0.1:8080/xxl-job-admin
    executor:
      address: ''
      appname: hmtt
      ip: ''
      logpath: /data/applogs/xxl-job/jobhandler
      logretentiondays: 30
      port: 9998
@Configuration
public class XxlJobConfig {
    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;


    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }
}

3.2 elasticsearch配置

和前面一样

3.3 任务编写

@Component
public class SyncIndexTask {

    @Autowired
    private IArticleClient articleClient;

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    //线程池
    public static ExecutorService pool = Executors.newFixedThreadPool(10);

    /***
     * 同步索引任务
     *  1)当数量大于100条的时候,才做分片导入,否则只让第1个导入即可
     *      A:查询所有数据量 ->searchTotal total>100 [判断当前分片不是第1个分片]
     *      第N个分片执行数据处理范围-要计算   确定当前分片处理的数据范围  limit #{index},#{size}
     *                                                                [index-范围]
     *
     *      B:执行分页查询-需要根据index判断是否超过界限,如果没有超过界限,则并开启多线程,分页查询,将当前分页数据批量导入到ES
     *
     *      C:在xxl-job中配置作业-策略:分片策略
     *
     */
    @XxlJob("syncIndex")
    public void syncIndex()  {
        //1、获取任务传入的参数   {"minSize":100,"size":10}
        String jobParam = XxlJobHelper.getJobParam();
        Map<String,Integer> jobData = JSON.parseObject(jobParam,Map.class);
        int minSize = jobData.get("minSize"); //分片处理的最小总数据条数
        int size =  jobData.get("size"); //分页查询的每页条数   小分页

        //2、查询需要处理的总数据量  total=IArticleClient.searchTotal()
        Long total = articleClient.searchTotal();

        //3、判断当前分片是否属于第1片,不属于,则需要判断总数量是否大于指定的数据量[minSize],大于,则执行任务处理,小于或等于,则直接结束任务
        int cn = XxlJobHelper.getShardIndex(); //当前节点的下标
        if(total<=minSize && cn!=0){
            //结束
            return;
        }
        //4、执行任务   [index-范围]   大的分片分页处理
        //4.1:节点个数
        int n = XxlJobHelper.getShardTotal();
        //4.2:当前节点处理的数据量
        int count = (int) (total % n==0? total/n :  (total/n)+1);
        //4.3:确定当前节点处理的数据范围
        //从下标为index的数据开始处理  limit #{index},#{count}
        int indexStart = cn*count;
        int indexEnd = cn*count+count-1; //最大的范围的最后一个数据的下标
        //5.小的分页查询和批量处理
        int index =indexStart; //第1页的index

        System.out.println("分片个数是【"+n+"】,当前分片下标【"+cn+"】,处理的数据下标范围【"+indexStart+"-"+indexEnd+"】");
        do {
            //=============================================小分页================================
            //5.1:分页查询
            //5.2:将数据导入ES
            push(index,size,indexEnd);

            //5.3:是否要查询下一页 index+size
            index = index+size;
        }while (index<=indexEnd);
    }


    /**
     * 数据批量导入
     * @param index
     * @param size
     * @param indexEnd
     * @throws IOException
     */
    public void push(int index,int size,int indexEnd)  {

        pool.execute(()->{
            System.out.println("当前线程处理的分页数据是【index="+index+",size="+(index+size>indexEnd? indexEnd-index+1 : size)+"】");
            //1)查询数据库数据
            List<SearchArticleVo> searchArticleVos = articleClient.searchPage(index, index+size>indexEnd? indexEnd-index+1 : size);  //size可能越界
            //2)创建BulkRequest - 刷新策略
            BulkRequest bulkRequest = new BulkRequest()
                    //刷新策略-立即刷新
                    .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
            for (SearchArticleVo searchArticleVo : searchArticleVos) {
                //A:创建XxxRequest
                IndexRequest indexRequest = new IndexRequest("app_info_article")
                        //B:向XxxRequest封装DSL语句数据
                        .id(searchArticleVo.getId().toString())
                        .source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);

                //3)将XxxRequest添加到BulkRequest
                bulkRequest.add(indexRequest);
            }

            //4)使用RestHighLevelClient将BulkRequest添加到索引库
            if(searchArticleVos!=null && searchArticleVos.size()>0){
                try {
                    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

3.4 xxl-admin新增任务

  • 新增执行器 这里的appName要和XxlJobConfig 中的appname保持一致。也可以不用新增执行器,执行器就相当于一个分组的作用。
    在这里插入图片描述
  • 新增任务
    在这里插入图片描述

4. 模拟集群,运行项目

  • 运行3个SearchApplication项目,设置xxl.job.executor.port分别为9998,9997,9996
    在这里插入图片描述
  • 执行一次任务
    在这里插入图片描述
  • 查看结果,3个application都成功了
    在这里插入图片描述
  • kibana查看是否有数据,发现有18条,mysql的数据全部被导入了

在这里插入图片描述

5. 总结

  • xxljob分片广播:假如一共有1000条数据,有3个节点上运行着SearchApplication服务。那么每个节点需要同步的数据总条数为334,334,332条。
    在这里插入图片描述

  • 分页查询:节点0的任务总条数为334条,那么需要做分页(假设分页size为20)查询的次数为17次,每查1次后,将查到的数据通过restHighLevelClient发送到es中。

do {
    //5.1:分页查询
    //5.2:将数据导入ES
    push(index,size,indexEnd);  //分页查询+导入es的操作

    //5.3:是否要查询下一页 index+size
    index = index+size;
}while (index<=indexEnd);
  • 多线程:上述代码,push方法包括了分页查询+导入es的操作,是低效的。并且push方法不结束的话,下一页的操作不会开始。这里可以用多线程,每次push的时候都开启一个新线程,这样每一页的操作都是独立的,可以同时查询,同时导入到es,不会互相影响。这里使用了线程池。
public static ExecutorService pool = Executors.newFixedThreadPool(10);
......
public void push(int index,int size,int indexEnd)  {
		pool.execute(()->{
            //分页查询+导入到es
        });
     }
  • elasticsearch的相关api:参考我另一篇博客 项目中使用Elasticsearch的API相关介绍
//2)创建BulkRequest - 刷新策略
 BulkRequest bulkRequest = new BulkRequest()
         //刷新策略-立即刷新
         .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
 for (SearchArticleVo searchArticleVo : searchArticleVos) {
     //A:创建XxxRequest
     IndexRequest indexRequest = new IndexRequest("app_info_article")
             //B:向XxxRequest封装DSL语句数据
             .id(searchArticleVo.getId().toString())
             .source(com.alibaba.fastjson.JSON.toJSONString(searchArticleVo), XContentType.JSON);

     //3)将XxxRequest添加到BulkRequest
     bulkRequest.add(indexRequest);
 }

 //4)使用RestHighLevelClient将BulkRequest添加到索引库
 if(searchArticleVos!=null && searchArticleVos.size()>0){
     try {
         restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
     } catch (IOException e) {
         e.printStackTrace();
     }
 }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1679749.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

维护表空间中的数据文件

目录 向表空间中添加数据文件 从表空间中删除数据文件 删除users表空间中的users02.dbf数据文件 对数据文件的自动扩展设置 Oracle从入门到总裁:​​​​​​https://blog.csdn.net/weixin_67859959/article/details/135209645 维护表空间中的数据文件主要包括向表空间中添…

服务网格 SolarMesh v1.13 重磅发布

SolarMesh是行云创新推出的流量治理平台&#xff0c;它基于Istio&#xff0c;为部署在K8s集群上的应用提供全面的流量治理能力。 在之前的版本中&#xff0c;SolarMesh提供的能力有&#xff1a;流量视图&#xff0c;流量控制策略批量配置&#xff0c;API级别的流量数据采集和展…

char x[]---char*---string---sizeof

字符串数组 #include <iostream>int main(){char c_str[]"abcd";char c_str1[]{a,b,c,d};std::cout<<sizeof(c_str)<<std::endl;std::cout<<sizeof(c_str1)<<std::endl;return 0; } char*存储的字符串个数 char*字符串所占字节大小 c…

2024年最新最全面的软件测试面试题(四)

1、在项目中如何保证软件质量? 项目质量不仅仅是某个人或某个团队来保障的&#xff0c;而是整个团队一起努力的结果&#xff0c;因此&#xff0c;在公司级别需要 有一个规范的项目流程。 产品&#xff0c;保证迭代过程中的产品逻辑&#xff0c;对于可能的兼容&#xff0c;升…

基于Pytorch深度学习神经网络MNIST手写数字识别系统源码(带界面和手写画板)

第一步&#xff1a;准备数据 mnist开源数据集 第二步&#xff1a;搭建模型 我们这里搭建了一个LeNet5网络 参考代码如下&#xff1a; import torch from torch import nnclass Reshape(nn.Module):def forward(self, x):return x.view(-1, 1, 28, 28)class LeNet5(nn.Modul…

【Qt】Qt开源项目

1、Flameshot 截图工具 1.1 简介 Flameshot是一款功能强大但易于使用的屏幕截图软件,中文名称火焰截图。 Flameshot 简单易用并有一个CLI版本,所以可以从命令行来进行截图。 Flameshot 是一个Linux发行版中完全免费且开源的截图工具 1.2 源码 github:https://github.com…

C++学习一(主要对cin的理解)

#include<iostream> int main() {int sum 0, value 0;//读取数据直到遇到文件尾&#xff0c;计算所有读入的值的和while (std::cin >> value){ //等价于sumsumvaluesum value;}std::cout << "Sum is :" << sum << std::endl;sum …

Vue3实战笔记(22)—路由Vue-Router 实战指南(路由传参)

文章目录 前言一、路由router-link二、路由传参1.query方式2.params方式3.props传参 总结 前言 vue-router 是 Vue.js 官方路由管理器。它和 Vue.js 核心深度集成&#xff0c;让用 Vue.js 构建单页应用变得易如反掌。 前面提到过简单的使用路由&#xff0c;直到上文使用404界面…

计网面试干货---带你梳理常考的面试题

顾得泉&#xff1a;个人主页 个人专栏&#xff1a;《Linux操作系统》 《C从入门到精通》 《LeedCode刷题》 键盘敲烂&#xff0c;年薪百万&#xff01; 一、HTTP和HTTPS的区别 1.安全性&#xff1a;HTTPS通过SSL/TLS协议对数据进行加密处理&#xff0c;有效防止数据在传输过…

【考研数学】准备开强化,更「张宇」还是「武忠祥」?

数一125学长前来回答&#xff0c;选择哪位老师的课程&#xff0c;这通常取决于你的个人偏好和学习风格&#xff01; 张宇老师和武忠祥老师都是非常有经验的数学老师&#xff0c;他们的教学方法各有特点。 张宇老师的教学风格通常被认为是通俗易懂&#xff0c;善于将复杂的概念…

数据结构------二叉树经典习题1

博主主页: 码农派大星. 关注博主带你了解更多数据结构知识 1判断相同的树 OJ链接 这道题相对简单,运用我们常规的递归写法就能轻松写出 所以我们解题思路应该这样想: 1.如果p为空&#xff0c;q为空&#xff0c;那么就是两颗空树肯定相等 2.如果一个树为空另一棵树不为空那么…

计算机Java项目|Springboot高校心理教育辅导设计与实现

作者主页&#xff1a;编程指南针 作者简介&#xff1a;Java领域优质创作者、CSDN博客专家 、CSDN内容合伙人、掘金特邀作者、阿里云博客专家、51CTO特邀作者、多年架构师设计经验、腾讯课堂常驻讲师 主要内容&#xff1a;Java项目、Python项目、前端项目、人工智能与大数据、简…

论文阅读:基于改进 YOLOv5算法的密集动态目标检测方法

目录 概要 Motivation 整体架构流程 技术细节 小结 论文地址&#xff1a;基于改进YOLOv5算法的密集动态目标检测方法 - 中国知网 (cnki.net) 概要 目的&#xff1a;提出一种基于 YOLOv5改进的检测算法&#xff0c;解决密集动态目标检测精度低及易漏检的问题。 方法&…

利用远程控制软件FinalShell远程连接虚拟机上的Linux系统(Windows)

一. VMware Workstation 安装CentOS Linux操作系统 传送门&#xff1a;VMware Workstation 安装CentOS Linux操作系统 1.右键打开终端 2.输入ifconfig 找到ens33对应 inet的id&#xff0c;这个就是虚拟机的ip地址图中所示为&#xff1a;192.168.5.128 3.打开finalshell 如…

「AIGC」Python实现tokens算法

本文主要介绍通过python实现tokens统计,避免重复调用openai等官方api,开源节流。 一、设计思路 初始化tokenizer使用tokenizer将文本转换为tokens计算token的数量二、业务场景 2.1 首次加载依赖 2.2 执行业务逻辑 三、核心代码 from transformers import AutoTokenizer imp…

【RSGIS数据资源】2001-2021 年亚洲季风区主要国家作物种植制度数据集

文章目录 1. 数据集概况2. 数据格式3. 文件名命名规则4. 数据生产服务单位5. 元数据6. 数据引用与参考文献引用 1. 数据集概况 2001-2021 年亚洲季风区主要国家作物种植制度数据集&#xff08;ACIA500&#xff09;是结合MODIS 影像和现有的土地利用等多源数据&#xff0c;基于…

数据结构——直接插入排序

基本思想 再插入第i个元素时&#xff0c;前面i-1个已经排好序。 排序过程 初始状态&#xff08;假设第一个元素为有序&#xff0c;其余均为无序元素&#xff09; 问题一&#xff1a;如何构建初始的有序序列&#xff1f; 办法 将第一个记录看成是初始有序表&#xff0c;然后…

万字长文破解 AI 图片生成算法-Stable diffusion (第一篇)

想象一下&#xff1a;你闭上眼睛&#xff0c;脑海中构思一个场景&#xff0c;用简短的语言描述出来&#xff0c;然后“啪”的一声&#xff0c;一张栩栩如生的图片就出现在你眼前。这不再是科幻小说里才有的情节&#xff0c;而是Stable Diffusion——一种前沿的AI图片生成算法—…

有多少小于当前数字的数字

链接&#xff1a;https://leetcode.cn/problems/how-many-numbers-are-smaller-than-the-current-number/description/ 思路&#xff1a; 最简单的思路来说&#xff0c;就是双重for循环进行遍历&#xff0c;来判断个数&#xff0c; 优化思路&#xff0c;其中一个思路就是递推 …

首次曝光!我喂了半年主食冻干,喵状态真滴顶~

科学养猫理念的推广&#xff0c;使得主食冻干喂养越来越受到养猫者的欢迎。主食冻干不仅符合猫咪的自然饮食习惯&#xff0c;还能提供丰富的营养&#xff0c;有助于保持猫咪的口腔和消化系统健康。我家喂了半年主食冻干&#xff0c;猫咪的状态是真的不一样了&#xff01; 然而…