MySQL与ES数据同步之异步调用

news2024/11/24 5:09:11

文章目录

  • 简述
  • SpringBoot项目
    • 引入依赖
    • 配置文件
    • 项目结构
    • 实体类
    • 配置类
      • RabbitMQ交换机队列声明,绑定配置类
      • 回调接口配置类
    • Mapper接口
      • UserMapper接口
      • UserEsMapper
    • Controller类
    • Service接口
    • Service实现类
    • 监听类/消费者

简述

上一篇是同步调用,我们在中间加上MQ就可以实现异步调用,这种方式性能高,不易出现数据丢失问题,多源写入之间相互隔离,便于扩展更多的数据源写入。
同时也会带来一些问题,首先还是代码侵入强,其次系统复杂度会增加,因为引入了消息中间件
可能出现延时问题:MQ是异步消费模型,可能会造成延时。
这种方案也不是很推荐,简单了解学习一下就好。

下面通过SpringBoot项目演示一下,首先本地要有MQ,我这里使用RabbitMQ。若本地没有,可移步:Windows版Docker安装RabbitMQ
Linux的Docker也类似

对RabbitMQ还不是很了解的,可以打开我的主页查看RabbitMQ系列教程

这里只做最简单的MQ可靠性配置

SpringBoot项目

引入依赖

全部依赖如下

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.5.3.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置文件

注意修改Mysql,ES,rabbitmq地址及信息

# 端口号8080
server:
  port: 8080

# 数据库名:mysql,用户名root,密码123456
spring:
  datasource:
    username: root
    password: 123456
    url: jdbc:mysql://mysql地址:3306/mysql?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
    driver-class-name: com.mysql.cj.jdbc.Driver
  elasticsearch:
    rest:
      uris: ES地址:9200
  rabbitmq:
    host: rabbitmq地址
    port: 5672
    username: admin
    password: admin
    #确认消息已发送到交换机
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true

# mybatis-plus配置
mybatis-plus:
  # xml文件位置
  mapper-locations: classpath:mapper/*.xml

项目结构

在这里插入图片描述

实体类

/**
 * mysql(user)与ES(user-demo)实体类
 */
@Data
@TableName(value = "user_t")
@Document(indexName = "user-demo")
public class User {
    @Id
    private String id;
    private String userName;
    private String address;
}

配置类

RabbitMQ交换机队列声明,绑定配置类

/**
 * RabbitMQ交换机队列声明,绑定配置类
 */
@Configuration
public class Config {

    //交换机名称
    public static final String X_EXCHANGE = "X";

    //队列名称
    public static final String QUEUE_INSERT = "A";
    public static final String QUEUE_DELETE = "B";
    public static final String QUEUE_UPDATE = "C";


    //声明交换机xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    //声明队列A
    @Bean("queueA")
    public Queue queueInsert() {
        return QueueBuilder.durable(QUEUE_INSERT).build();
    }

    //声明队列B
    @Bean("queueB")
    public Queue queueDelete() {
        return QueueBuilder.durable(QUEUE_DELETE).build();
    }

    //声明队列C
    @Bean("queueC")
    public Queue queueUpdate() {
        return QueueBuilder.durable(QUEUE_UPDATE).build();
    }

    //绑定交换机与队列
    //A与X通过XA线路绑定
    @Bean
    public Binding queueInsertBindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //B与X通过XB线路绑定
    @Bean
    public Binding queueDeleteBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    //C与X通过XC线路绑定
    @Bean
    public Binding queueUpdateBindingX(@Qualifier("queueC") Queue queueC,
                                       @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}

回调接口配置类

/**
 * 回调接口
 */
@Slf4j
@Component
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        //内部接口注入类中
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);

    }

    /**
     * 交换机确定回调方法
     * 1.发消息 交换机接收到消息 回调
     * 1.1 correlationData 保存回调消息的ID及相关信息
     * 1.2 交换机收到消息 ack=true
     * 1.3 cause null
     * 2.发消息 交换机接受失败 回调
     * 2.1 correlationData 保存回调消息的ID及相关信息
     * 2.2 交换机收到消息 ack=false
     * 2.3 cause 失败原因
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("交换机回报消息:收到id为:{}的消息", id);
        } else {
            log.info("交换机回报消息:未经收到id为:{}的消息,原因为:{}", id, cause);
        }

    }

    /**
     * 队列失败回报
     * @param message 消息
     * @param i 返回码
     * @param s 返回信息
     * @param s1 交换机
     * @param s2 路由
     */
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        log.error("队列回报消息:消息被交换机:{}退回,路由key:{},退回原因:{}", s1,s2,s);
    }
}

Mapper接口

UserMapper接口

/**
 * mysql user实体Mapper接口
 */
public interface UserMapper extends BaseMapper<User> {
}

UserEsMapper

/**
 * ES user-demo实体Mapper接口
 */
@Repository
public interface UserEsMapper extends ElasticsearchRepository<User,String> {

}

Controller类

此处Controller充当生产者,接到请求,先执行mysql操作,然后将消息按情况通过交换机转发到不同的队列,相应的消费者收到消息后对ES进行处理

/**
 * 异步调用方式实现mysql与ES数据同步Controller/消息生产者
 */
@Slf4j
@RestController
@RequestMapping(value = "/asyn")
public class DataController {

    @Resource
    private IDataService dataService;
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 同步更新mysql和ES的user信息
     * @param user user实体
     */
    @GetMapping("/update")
    public void updateData(User user){
        dataService.updateMysqlData(user);
        String key = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(key);
        rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);
        log.info("Producer消息:已发送消息:{}到队列A中等待ES更新处理,消息ID:{}",user.getId(),key);
    }

    /**
     * 查询user表信息
     * @return user信息集合
     */
    @GetMapping("/findData")
    public List<User> findAllData(){
        return dataService.findAllData();
    }

    /**
     * 同步根据id删除mysql和ES中user对应的数据信息
     * @param id 需要删除的信息id
     */
    @GetMapping("/delete")
    public void deleteDataById(String id){
        dataService.deleteDataById(id);
        String key = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(key);
        rabbitTemplate.convertAndSend("X","XB",id,correlationData);
        log.info("Producer消息:已发送消息:{}到队列B中等待ES删除处理,消息ID:{}",id,key);
    }

    /**
     * 同步新增mysql和ES的user数据
     * @param user user实体
     */
    @GetMapping("addData")
    public void addData(User user){
        dataService.addData(user);
        String key = UUID.randomUUID().toString();
        CorrelationData correlationData = new CorrelationData(key);
        rabbitTemplate.convertAndSend("X","XA",user.getId(),correlationData);
        log.info("Producer消息:已发送消息:{} 到队列A中等待ES新增处理,消息ID:{}",user.getId(),key);
    }

    /**
     * 同步删除mysql和ES中所有user信息
     */
    @GetMapping("deleteAll")
    public void deleteAllData(){
        dataService.deleteAllData();
        dataService.esDeleteAllData();
    }

    /**
     * 查询ES中所有user信息
     */
    @GetMapping("findEs")
    public Iterable<User> findEs(){
        return dataService.findEs();
    }
}

Service接口

/**
 * 异步调用方式实现mysql与ES数据同步Service
 */
public interface IDataService extends IService<User> {

    /**
     * 根据id更新mysql数据
     * @param user 需要更新数据的user对象
     */
    void updateMysqlData(User user);

    /**
     * 查询所有数据
     * @return user对象集合
     */
    List<User> findAllData();

    /**
     * mysql根据id删除信息
     * @param id 需要删除信息的id
     */
    void deleteDataById(String id);

    /**
     * mysql新增数据
     * @param user 需要新增数据的对象
     */
    void addData(User user);

    /**
     * ES根据ID删除数据
     * @param id 需要删除信息的id
     */
    void esDeleteDataById(String id);

    /**
     * ES新增/根据ID修改数据
     * @param user 需要新增/根据ID修改数据的对象
     */
    void esAddData (User user);

    /**
     * mysql删除user表所有数据
     */
    void deleteAllData();

    /**
     * es删除index=user-demo中所有数据
     */
    void esDeleteAllData();

    /**
     * 查询ES中所有数据信息
     */
    Iterable<User> findEs();
}

Service实现类

/**
 * 异步调用方式实现mysql与ES数据同步Service实现类
 */
@Service
public class DataServiceImpl extends ServiceImpl<UserMapper, User> implements IDataService {

    @Resource
    private UserMapper userMapper;
    @Resource
    private UserEsMapper userEsMapper;

    /**
     * 根据id更新mysql数据
     * @param user 需要更新数据的user对象
     */
    @Override
    public void updateMysqlData(User user) {
        userMapper.updateById(user);
    }

    /**
     * 查询所有数据
     * @return user对象集合
     */
    @Override
    public List<User> findAllData() {
        return userMapper.selectList(null);
    }

    /**
     * mysql根据id删除信息
     * @param id 需要删除信息的id
     */
    @Override
    public void deleteDataById(String id) {
        userMapper.deleteById(id);
    }

    /**
     * mysql新增数据
     * @param user 需要新增数据的对象
     */
    @Override
    public void addData(User user) {
        userMapper.insert(user);
    }

    /**
     * ES根据ID删除数据
     * @param id 需要删除信息的id
     */
    @Override
    public void esDeleteDataById(String id) {
        userEsMapper.deleteById(id);
    }

    /**
     * ES新增/根据ID修改数据
     * @param user 需要新增/根据ID修改数据的对象
     */
    @Override
    public void esAddData(User user) {
        userEsMapper.save(user);
    }

    /**
     * mysql删除user表所有数据
     */
    @Override
    public void deleteAllData() {
        userMapper.delete(null);
    }

    /**
     * es删除index=user-demo中所有数据
     */
    @Override
    public void esDeleteAllData() {
        userEsMapper.deleteAll();
    }

    /**
     * 查询ES中user所有信息
     * @return 查询user信息集合
     */
    @Override
    public Iterable<User> findEs() {
        return userEsMapper.findAll();
    }

}

监听类/消费者

/**
 * 异步调用方式实现mysql与ES数据同步消息消费者
 */
@Slf4j
@Component
public class Consumer {
    @Resource
    private IDataService dataService;
    @Resource
    private UserMapper userMapper;

    //接收消息
    @RabbitListener(queues="A")
    public void addData(Message message){
        log.info("Consumer消息:当前时间:{},收到A队列的消息:{},进行ES新增操作",new Date().toString(),new String(message.getBody()));
        QueryWrapper<User> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("id",new String(message.getBody()));
        User user = userMapper.selectOne(queryWrapper);
        dataService.esAddData(user);
        log.info("ES新增/更新数据为:{}",user);
    }

    @RabbitListener(queues = "B")
    public void delete(Message message){
        log.info("Consumer消息:当前时间:{},收到B队列的消息:{},进行ES删除操作",new Date().toString(),new String(message.getBody()));
        dataService.esDeleteDataById(new String(message.getBody()));
    }

    @RabbitListener(queues="C")
    public void update(Message message){

    }
}

操作完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1005883.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【海思SS626 | 开发环境】VMware17安装Ubuntu 18.04.6

目录 一、下载 Ubuntu 18.04.6 LTS二、VMware17创建虚拟机三、安装Ubuntu18.04LTS四、安装其他软件五、总结 一、下载 Ubuntu 18.04.6 LTS 问题&#xff1a;为什么要下载 Ubuntu18.04.6 LTS 而不是使用最新的&#xff0c;或者其他Linux发行版&#xff1f; 答&#xff1a;在ss6…

Python 图形化界面基础篇:使用框架( Frame )组织界面

Python 图形化界面基础篇&#xff1a;使用框架&#xff08; Frame &#xff09;组织界面 引言什么是 Tkinter 框架&#xff08; Frame &#xff09;&#xff1f;步骤1&#xff1a;导入 Tkinter 模块步骤2&#xff1a;创建 Tkinter 窗口步骤3&#xff1a;创建框架&#xff08; F…

如何做到安全上网

随着信息化的发展&#xff0c;企业日常办公越来越依赖互联网&#xff0c;而访问互联网过程中&#xff0c;会遇到各种各样不容忽视的风险&#xff0c;例如员工主动故意的数据泄漏&#xff0c;后台应用程序偷偷向外部发信息&#xff0c;木马间谍软件的外联&#xff0c;以及各种挖…

聚观早报 | 荣耀V Purse定档;哪吒S迎来最新OTA升级

【聚观365】9月13日消息 荣耀V Purse定档 哪吒S迎来最新OTA升级 宝马将向其英国工厂投资7.5亿美元 英伟达称霸AI芯片领域致初创公司融资难 甲骨文第一财季收入约125亿美元增长9% 荣耀V Purse定档 不久前&#xff0c;荣耀官方推出了全新的荣耀Magic V2内折叠屏旗舰&#x…

【Linux从入门到精通】信号(信号保存 信号的处理)

本篇文章接着信号&#xff08;初识信号 & 信号的产生&#xff09;进行讲解。学完信号的产生后&#xff0c;我们也了解了信号的一些结论。同时还留下了很多疑问&#xff1a; 上篇文章所说的所有信号产生&#xff0c;最终都要有OS来进行执行&#xff0c;为什么呢&#xff1f;…

在Android studio 创建Flutter项目运行出现问题总结

在Android studio 中配置Flutter出现的问题 A problem occurred configuring root project ‘android’出现这个问题。解决办法 首先找到flutter配置的位置 在D:\xxx\flutter\packages\flutter_tools\gradle位置中的flutter.gradle buildscript { repositories { googl…

相机坐标系 -> 像素坐标系

代码链接&#xff1a;https://github.com/PanJinquan/python-learning-notes/blob/master/modules/utils_3d/camera_tools.py def __cam2pixel(cam_coord, f, c):"""相机坐标系 -> 像素坐标系: (f / dx) * (X / Z) f * (X / Z) / dxcx,ppx260.166; cy,ppy…

分库分表---理论

目录 一、垂直切分 1、垂直分库 2、垂直分表 3、垂直切分优缺点 二、水平切分 1、水平分库 2、水平分表 3、水平切分优缺点 三、数据分片规则 1、Hash取模分表 2、数值Range分表 3、一致性Hash算法 四、分库分表带来的问题 1、分布式事务问题 2、跨节点关联查询…

【FAQ】本地录像视频文件如何推送到视频监控平台EasyCVR进行AI视频智能分析?

安防监控平台EasyCVR支持多协议、多类型设备接入&#xff0c;可以实现多现场的前端摄像头等设备统一集中接入与视频汇聚管理&#xff0c;并能进行视频高清监控、录像、云存储与磁盘阵列存储、检索与回放、级联共享等视频功能。视频汇聚平台既具备传统安防监控、视频监控的视频能…

Vue2电商前台项目——完成Search搜索模块业务

Vue2电商前台项目——完成Search搜索模块业务 Vue基础知识点击此处——Vue.js 文章目录 Vue2电商前台项目——完成Search搜索模块业务一、项目开发的步骤二、各种请求数据并展示数据1、写Search模块的接口2、写Vuex中的search仓库3、组件拿到search仓库的数据&#xff08;1&…

详解HPE MSA 2040存储初始化配置划分卷

哈喽大家好&#xff0c;欢迎来到虚拟化时代君&#xff08;XNHCYL&#xff09;。 “ 大家好&#xff0c;我是虚拟化时代君&#xff0c;一位潜心于互联网的技术宅男。这里每天为你分享各种你感兴趣的技术、教程、软件、资源、福利……&#xff08;每天更新不间断&#xff0c;福…

mock技术在测试中的应用

技术简介 mock技术又叫测试桩、挡板 在软件测试中&#xff0c;对于一些不容易构造、获取的对象&#xff0c;用一个虚拟的对象来代替它&#xff0c;以达到相同的效果&#xff0c;这个虚拟的对象就是mock。 mock技术并不是只有测试领域用&#xff0c;最早是在开发领域应用&…

互联网电视流氓乱收费被市场惩罚,传统品牌合力挤压互联网电视

市调机构洛图科技&#xff08;RUNTO&#xff09;公布的6月份数据显示&#xff0c;传统电视品牌强势反弹&#xff0c;海信、TCL、创维的销量分别为60万台、58万台、57万台&#xff0c;名次分别为第一名、第三名、第四名&#xff0c;而曾连续数年位居国内电视行业第一名的某互联网…

精品基于NET实现的汽配网上商城系统

《[含文档PPT源码等]精品基于NET实现的汽配网上商城系统》该项目含有源码、文档、PPT、配套开发软件、软件安装教程、项目发布教程等 软件开发环境及开发工具&#xff1a; 开发软件&#xff1a;VS 2017 &#xff08;版本2017以上即可&#xff0c;不能低于2017&#xff09; 数…

第十二章总结

一.集合类概述 java.util包中提供了一些集合类&#xff0c;这些集合类又被称为容器。 集合类与数组的不同之处&#xff1a; 数组的长度是固定的&#xff0c;集合的长度是可变的&#xff1a;数组用来存放基本类型的数据&#xff0c;集合用来存放对象的引用。 常…

windows10系统下Python3.11中安装Numpy库教程

Python3.11中安装Numpy库目录 项目场景&#xff1a;问题描述解决方案&#xff1a;①下载Numpy文件②把NumPy文件放到Python安装的Scripts文件夹里。③安装numpy④安装验证 项目场景&#xff1a; numpy是开源的数值计算扩展&#xff0c;用于数据分析、机器学习、科学计算的重要…

(第十一天)初识SpringMVC SSM框架的学习与应用(Spring + Spring MVC + MyBatis)-Java EE企业级应用开发学习记录

SSM框架的学习与应用(Spring Spring MVC MyBatis)-Java EE企业级应用开发学习记录&#xff08;第十一天&#xff09;初识SpringMVC 今天我们要来学习一下SSM框架的最后一个框架SpringMVC 一、初认SpringMVC 基本概念&#xff1a; ​ Spring MVC&#xff08;Model-View-Co…

Qt应用开发(基础篇)——菜单 QMenu

一、前言 QMenu类继承于QWidget&#xff0c;它提供了一个菜单样式的小部件&#xff0c;用于菜单栏、上下文菜单和一些弹出式菜单。 QMenu菜单的选项是可选的&#xff0c;它可以是一个下拉的菜单&#xff0c;也可以是独立的上下文菜单。下拉菜单通常作用于当用户单击相应的项目或…

Unity——模拟AI视觉

人类的视觉系统有以下几个特点&#xff1a; 距离有限。近处看得清&#xff0c;远处看不清容易被遮挡。不能穿过任何不透明的障碍物视野范围大约为90度。实现正前方信息丰富&#xff0c;具有色彩和细节&#xff1b;实现外侧的部分只有轮廓和运动信息注意力有限。当关注某个具体的…

深入浅出学Verilog--数据类型

1、数值类型 在Verilog可以用4种数值来描述其构建的电路的电平逻辑&#xff0c;除了event类型和real类型外&#xff0c;几乎所有的数据类型都可以用这4种数值来表示。 0&#xff1a;代表逻辑0&#xff0c;或者条件“假”1&#xff1a;代表逻辑1&#xff0c;或者条件“真”x或X…