SpringBoot整合Canal实现MySQL与ES数据同步

news2025/1/11 18:38:06

文章目录

  • SpringBoot项目
    • 引入Canal依赖
    • 配置文件
    • 项目结构
    • 设置监听类
    • 其余类、接口内容
    • 启动类
    • 实体类
    • Controller类
    • Mapper接口
    • Serice接口
  • 运行
  • 测试

开始之前请确认docker中已运行mysql与canal容器,并完成了监听binlog配置
未完成可移步: Docker部署Canal监听MySQL的binlog

SpringBoot项目

本次在SpringBoot整合Easy-ES实现对ES的基础操作项目基础上进行操作
此部分操作请移步:SpringBoot整合Easy-ES实现对ES操作

引入Canal依赖

        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>

配置文件

新增以下内容
注意修改server,换成自己的canal地址,端口号

canal:
  server: canal地址:11111
  destination: example

项目结构

在这里插入图片描述

设置监听类

CanalTable注解是监听的表名,实现EntryHandler接口
重写监听到mysql增删改操作时,这里的进行自定义操作,方法也都是通过Easy-ES实现

@CanalTable("document")
@Component
public class DocumentHandler implements EntryHandler<Document> {
    
    @Resource
    private IDocumentService documentService;
    
    /**
     * mysql中数据有新增时自动执行
     * @param document 新增的数据
     */
    @Override
    public void insert(Document document) {
        try {
            documentService.addData(document);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * mysql中数据有修改时自动执行
     * @param before 修改前的数据
     * @param after 修改后的数据
     */
    @Override
    public void update(Document before, Document after) {
        documentService.updateData(after);
    }

    /**
     * mysql中数据有删除时自动执行
     * @param document 要删除的数据
     */
    @Override
    public void delete(Document document) {
        documentService.deleteData(document);
    }
}

其余类、接口内容

启动类

添加扫描ESMapper的注解,指定路径

@EsMapperScan("com.mine.easyEs.mapper")

在这里插入图片描述

实体类

@Data
public class Document {
    @Id
    /**
     * es中的唯一id
     */
    private String id;
    /**
     * 文档标题
     */
    private String title;
    /**
     * 文档内容
     */
    private String content;
    /**
     * 创建时间
     */
    private Date createTime;
}

Controller类

包括对索引操作和对数据进行操作的接口

@RestController
@RequestMapping("/ee")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class DocumentController {

    private final IDocumentService documentService;

    /**
     * 创建索引
     * @return 结果信息
     * @throws Exception
     */
    @GetMapping("/createIndex")
    public String createIndex() throws Exception {
        return documentService.createIndex();
    }

    /**
     * 删除索引
     * @return 结果信息
     */
    @GetMapping("/deleteIndex")
    public String deleteIndex(){
        return documentService.deleteIndex();
    }

    /**
     * 查询ES所有数据
     * @return 查询Document结果对象集合
     */
    @GetMapping("/findAll")
    public List<Document> findAll(){
        return documentService.findAllData();
    }

    /**
     * ES新增数据
     * @param document 新增数据对象
     * @return 结果信息
     * @throws Exception
     */
    @GetMapping("/add")
    public String addData(Document document) throws Exception {
        return documentService.addData(document);
    }

    /**
     * 修改ES数据
     * @param document 修改数据对象
     */
    @GetMapping("/update")
    public String updateData(Document document){
        return documentService.updateData(document);
    }

    /**
     * 根据id删除ES数据
     * @param id 需要删除的数据的id
     * @return
     */
    @GetMapping("/delete")
    public String deleteData(String id){
        return documentService.deleteDataById(id);
    }

    /**
     * 分词匹配查询content字段
     * @param value 查询内容
     * @return
     */
    @GetMapping("/match")
    public List<Document> findMatch(String value){
        return documentService.findMatch(value);
    }

}

Mapper接口

继承BaseMapper,整体操作都与MybatisPlus类似

public interface DocumentMapper extends BaseEsMapper<Document> {
}

Serice接口

public interface IDocumentService {

    /**
     * 查询ES所有数据
     * @return 查询Document结果对象集合
     */
    List<Document> findAllData();

    /**
     * 创建索引
     * @return 结果信息
     * @throws Exception
     */
    String createIndex() throws Exception;

    /**
     * 删除索引
     * @return 结果信息
     */
    String deleteIndex();

    /**
     * ES新增数据
     * @param document 新增数据实体类
     * @return 结果信息
     * @throws Exception
     */
    String addData(Document document) throws Exception;

    /**
     * 根据id删除ES数据
     * @param id 需要删除的数据的id
     * @return
     */
    String deleteDataById(String id);

    String deleteData(Document document);

    /**
     * 修改ES数据
     * @param document 修改数据对象
     */
    String updateData(Document document);

    /**
     * 分词匹配查询content字段
     * @param value 查询内容
     * @return
     */
    List<Document> findMatch(String value);
}

Service实现类

@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class DocumentServiceImpl implements IDocumentService {

    private final DocumentMapper documentMapper;

    /**
     * 查询ES所有数据
     * @return 查询Document结果对象集合
     */
    @Override
    public List<Document> findAllData() {
        LambdaEsQueryWrapper<Document> wrapper = new LambdaEsQueryWrapper<>();
        wrapper.matchAllQuery();
        return documentMapper.selectList(wrapper);
    }

    /**
     * 创建索引
     * @return 结果信息
     * @throws Exception
     */
    @Override
    public String createIndex() throws Exception {
        StringBuilder msg = new StringBuilder();
        String indexName = Document.class.getSimpleName().toLowerCase();
        boolean existsIndex = documentMapper.existsIndex(indexName);
        if (existsIndex){
            throw new Exception("Document实体对应索引已存在,删除索引接口:deleteIndex");
        }
        boolean success = documentMapper.createIndex();
        if (success){
            msg.append("Document索引创建成功");
        }else {
            msg.append("索引创建失败");
        }
        return msg.toString();
    }

    /**
     * 删除索引
     * @return 结果信息
     */
    @Override
    public String deleteIndex() {
        StringBuilder msg = new StringBuilder();
        String indexName = Document.class.getSimpleName().toLowerCase();
        if (documentMapper.deleteIndex(indexName)){
            msg.append("删除成功");
        }else {
            msg.append("删除失败");
        }
        return msg.toString();
    }

    /**
     * ES新增数据
     * @param document 新增数据实体类
     * @return 结果信息
     * @throws Exception
     */
    @Override
    public String addData(Document document) throws Exception {
        if (StringUtils.isEmpty(document.getTitle()) || StringUtils.isEmpty(document.getContent())) {
            throw new Exception("请补全title及content数据");
        }
        document.setCreateTime(new Date());
        documentMapper.insert(document);
        return "Added successfully!";
    }

    /**
     * 根据id删除ES数据
     * @param id 需要删除的数据的id
     * @return
     */
    @Override
    public String deleteDataById(String id) {
        documentMapper.deleteById(id);
        return "Success";
    }
    
 	@Override
    public String deleteData(Document document) {
        documentMapper.deleteById(document.getId());
        return "Success";
    }
    /**
     * 修改ES数据
     * @param document 修改数据对象
     */
    @Override
    public String updateData(Document document) {
        documentMapper.updateById(document);
        return "Success";
    }


    /**
     * 分词匹配查询content字段
     * @param value 查询内容
     * @return
     */
    @Override
    public List<Document> findMatch(String value) {
        LambdaEsQueryWrapper<Document> wrapper = new LambdaEsQueryWrapper<>();
        wrapper.match(Document::getContent,value);
        wrapper.orderByDesc(Document::getCreateTime);
        List<Document> documents = documentMapper.selectList(wrapper);
        return documents;
    }
}

运行

可以看到,正在监听,只不过目前我们没有对数据库进行操作。
在这里插入图片描述

测试

我们在数据库新增一条数据
在这里插入图片描述
此时插入的这条数据被监听到了
在这里插入图片描述

通过测试方法查看ES中是否插入了这条数据

@Test
    public void testSelect() {
        // 测试查询
        String title = "3";
        Document document = EsWrappers.lambdaChainQuery(documentMapper)
                .eq(Document::getTitle, title)
                .one();
        System.out.println(document);

        Assertions.assertEquals(title,document.getTitle());
    }

在这里插入图片描述
查到了在mysql新插入的这条数据
数据同步成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1005117.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

领域驱动设计:事件风暴构建领域模型

文章目录 事件风暴需要准备些什么&#xff1f;如何用事件风暴构建领域模型&#xff1f; 事件风暴是一项团队活动&#xff0c;领域专家与项目团队通过头脑风暴的形式&#xff0c;罗列出领域中所有的领域事件&#xff0c;整合之后形成最终的领域事件集合&#xff0c;然后对每一个…

共享股东模式:一种新型的商业模式,让你轻松创业

你是否想过拥有自己的事业&#xff0c;但又觉得创业风险太大&#xff0c;资金太少&#xff0c;人脉太弱&#xff1f;你是否想过利用自己的消费能力&#xff0c;获得更多的收益&#xff0c;而不仅仅是消费者&#xff1f;你是否想过成为一个有影响力的人&#xff0c;为社会创造价…

【STM32】影子寄存器

不可操作但是真正起作用的寄存器是影子寄存器 定时器框图中&#xff0c;有些寄存器下有个阴影 这些阴影的表示这些寄存器存在影子寄存器。 图中也有对这些影子的说明&#xff0c;在U事件时传送预装载寄存器至实际寄存器。 有阴影的寄存器(AutoReloadRegister)&#xff0c;表…

【OpenCL基础 · 二 】OpenCL架构

文章目录 前言一、OpenCL平台模型二、OpenCL执行模型1.上下文2.命令队列3.内核的执行——NDRange 三、OpenCL存储器模型1.存储器区域2.存储器对象3.主机与设备的数据交互 总结 前言 通过【OpenCL基础 一】因源&#xff0c;我们了解了OpenCL的起源和应用场景。在异构并行平台上…

HTTP文件服务

在工作中&#xff0c;往往会需要将文件同时共享给很多台电脑。 本篇介绍HHDESK的HTTP文件服务功能&#xff0c;通过浏览器&#xff0c;将本地资源共享给任意主机。 1 共享文件 首页——资源管理——服务端——“”&#xff0c;在弹出框中选择HTTP文件服务。 填写各项内容。…

计算一个四边形差值结构的稳定性

( A, B )---3*30*2---( 1, 0 )( 0, 1 ) 让网络的输入只有3个节点&#xff0c;AB训练集各由5张二值化的图片组成&#xff0c;让A中有4个1&#xff0c;B中全是0&#xff0c;统计迭代次数并排序。 其中有6组数据 差值结构 迭代次数 构造平均列 L E - - - 34838.43 1 - …

Swing程序设计(3)JDialog窗体

文章目录 前言一、JDialog窗体的介绍二、JDialog窗体的使用 1.JDialog的常用构造方法2.实例展示及分析总结 前言 JDialog窗体是窗体中的另一种类型的窗体&#xff0c;指对话框窗体。与JFrame窗体类似&#xff0c;绝大部分对于JFrame窗体使用的方法&#xff0c;对于JDialog窗体也…

多元共进|整合开发者社区资源,共建繁荣生态

谷歌致力于构建多元社区 促进行业内更多交流和联系 一起来了解 2023 Google 开发者大会上 谷歌如何以点及面 将资源辐射至开发者、数字人才和初创企业 持续赋能开发者社区生态 谷歌全球开发者社区计划的目的是与开发者同在&#xff0c;实现双向对话、互动和参与&#xff0…

Houdini19 命令行启动环境配置

在自动化流程中&#xff0c;通常都是从外部命令行启动 Houdini&#xff0c;而不是在软件里进行烘培和输出。完全体是通过类似 Jenkins 等自动化工具来启动 Houdini 自动生成流程。 我使用的 Houdini 版本为 19.5.640&#xff0c;对应的 Python 版本为 3.9 。 1、配置开发环境 …

Gitlab仓库部署

Gitlab仓库部署 一、Gitlab的概述1、gitlab介绍2、gitlab主要功能3、gitlab和github的区别 二、部署环境1、安装依赖环境2、安装Postfix邮箱3、Gitlab优势4、Gitlab工作流程 三、Gitlab部署过程1、Yum安装Gitlab2、配置gitlab站点URL3、启动并访问Gitlab 四、Gitlab具体操作1、…

挖到宝了!这个中文版SiteGPT竟然有那么多好处

如今数字时代蓬勃发展&#xff0c;信息非常丰富&#xff0c;但个性化互动和量身定制的反馈却相对匮乏。个性化AI工具的出现可以说是打破了窘境。随着人工智能&#xff08;AI&#xff09;技术的快速发展&#xff0c;定制AI问答机器人成为了越来越多企业和组织的热门选择。这些智…

Zabbix监控部署项目

为什么选择Zabbix Zabbix 是一个基于 WEB 界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案。zabbix 能监视各种网络参数,保证服务器系统的安全运营;并提供灵活的通知机制以让系统管理员快速定位/解决存在的各种问题。 面试常问 你用过哪些监控软件 zabbix …

linux的应用线程同步与驱动同步机制

同步机制 在 Linux 应用程序和内核中的驱动程序中&#xff0c;有一些常见的同步机制用于实现线程或进程之间的同步和数据访问保护。下面是它们的一些主要机制&#xff1a; Linux 应用程序中的同步机制&#xff1a; 互斥锁&#xff08;Mutex&#xff09;&#xff1a;用于保护共…

233062C++QTday5

实现一个图形类&#xff08;Shape&#xff09;&#xff0c;包含受保护成员属性&#xff1a;周长、面积&#xff0c; 公共成员函数&#xff1a;特殊成员函数书写 定义一个圆形类&#xff08;Circle&#xff09;&#xff0c;继承自图形类&#xff0c;包含私有属性&#xff1a;半…

【ArcGIS Pro二次开发】(67):处理面要素空洞

这个一个简单的小功能。 有些面要素可能会存在空洞&#xff0c;这个工具的目的就是获取面要素的空洞&#xff0c;或者去除空洞获取要素的边界。 这个功能其实在之前做拓扑功能的时候就已经有了&#xff0c;这次只是单独把它提取出来。因为有时候会单独用到这个功能。 一、要实…

圆形旋转特效原理及pygame实现

具体效果&#xff1a; 视频教程链接&#xff1a; https://www.bilibili.com/video/BV1ou411F7a2/ 介绍 本文介绍了如何实现一个围绕鼠标旋转的文字效果如何实现&#xff0c;有什么用途&#xff0c;以及pygame的代码实现。 实现代码&#xff1a; import pygame import math…

初识Java 8-1 接口和抽象类

目录 抽象类和抽象方法 接口定义 默认方法 多重继承 接口中的静态方法 作为接口的Instrument 本笔记参考自&#xff1a; 《On Java 中文版》 接口和抽象类提供了一种更加结构化的方式分离接口和实现。 抽象类和抽象方法 抽象类&#xff0c;其介于普通类和接口之间。在构…

华为云云耀云服务器L实例评测|华为云云耀云服务器L实例使用教学+宝塔建站 — 运行Python脚本(保姆级)

目录 文章目录 目录前言一、创建云耀云服务器L实例1、打开购买页面2、找到系统镜像3、进入系统控制台4、重置服务器密码 二、安装宝塔面板1.打开在线安装工具2.复制公网IP3.完成在线安装4.安装完成&#xff08;记住账密信息&#xff09;五.开放安全组 三、使用服务器总结 前言 …

Linux系统编程(一):文件 I/O

参考引用 UNIX 环境高级编程 (第3版)黑马程序员-Linux 系统编程 1. UNIX 基础知识 1.1 UNIX 体系结构&#xff08;下图所示&#xff09; 从严格意义上说&#xff0c;可将操作系统定义为一种软件&#xff0c;它控制计算机硬件资源&#xff0c;提供程序运行环境&#xff0c;通常…

4.linux的RPM和YUM

一、RPM 1.rpm包的管理 1.1介绍 Linux互联网下载包&#xff0c;类似于windows的setup.exe 1.2rpm简单查询已安装的rpm rpm -qa | grep xxx 当前linux有没有安装火狐 rpm -qa | grep fox 1.3rpm包的格式 一个 rpm 包名&#xff1a;firefox-45.0.1-1.el6.centos.x86_64.…