利用MQ实现mysql与elasticsearch数据同步

news2024/11/28 1:39:16

流程

1.声明exchange、queue、RoutingKey
2. 在hotel-admin中进行增删改(SQL),完成消息发送
3. 在hotel-demo中完成消息监听,并更新elasticsearch数据
4. 测试同步

在这里插入图片描述

1.引入依赖

<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

我这里的mq是挂在了docker上,虚拟机地址是192.168.116.128。到时候这个根据自己的项目改就行

 spring: 
  rabbitmq:
    host: 192.168.116.128 # 主机名
    port: 5672 # 端口
    virtual-host: / # 虚拟主机
    username: itcast # 用户名
    password: 123321 # 密码

2.声明交换机、队列和绑定关系

package cn.itcast.hotel.constants;

public class MqConstants {

    /**
     * 交换机
     */
    public final static String HOTEL_EXCHANGE = "hotel.topic";
    /**
     * 监听新增和修改的队列
     */
    public final static String HOTEL_INSERT_QUEUE = "hotel.insert.queue";
    /**
     * 监听删除的队列
     */
    public final static String HOTEL_DELETE_QUEUE = "hotel.delete.queue";
    /**
     * 新增或修改的RoutingKey
     */
    public final static String HOTEL_INSERT_KEY = "hotel.insert";
    /**
     * 删除的RoutingKey
     */
    public final static String HOTEL_DELETE_KEY = "hotel.delete";
}

在hotel-demo中,定义配置类,声明队列、交换机:

package cn.itcast.hotel.config;


import cn.itcast.hotel.constants.MqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MqConfig {
    // 定义交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(MqConstants.HOTEL_EXCHANGE,true,false);
    }
    // 定义队列
    @Bean
    public Queue insertQueue()
    {
        return new Queue(MqConstants.HOTEL_INSERT_QUEUE,true);
    }
    @Bean
    public Queue deleteQueue()
    {
        return new Queue(MqConstants.HOTEL_DELETE_QUEUE,true);
    }

    // 定义绑定关系
    @Bean
    public Binding insertQueueBinding()
    {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(MqConstants.HOTEL_INSERT_KEY);
    }
    @Bean
    public Binding deleteQueueBinding()
    {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(MqConstants.HOTEL_DELETE_KEY);
    }
}

3.发送MQ消息

在hotel-admin中的增、删、改业务中分别发送MQ消息,具体怎么添加根据:

    // 新增
    @PostMapping
    public void saveHotel(){
        //数据库新增操作
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());
    }

    // 更新
    @PutMapping()
    public void updateById(){
            //数据库修改操作
            rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_INSERT_KEY,hotel.getId());

    }
    // 删除
    @DeleteMapping("/{id}")
    public void deleteById() {
		// 数据库删除操作
        rabbitTemplate.convertAndSend(MqConstants.HOTEL_EXCHANGE,MqConstants.HOTEL_DELETE_KEY,id);

    }

4.监听MQ消息

接收MQ消息

hotel-demo接收到MQ消息要做的事情包括:

  • 新增消息:根据传递的hotel的id查询hotel信息,然后新增一条数据到索引库
  • 删除消息:根据传递的hotel的id删除索引库中的一条数据

1)首先在hotel-demo的cn.itcast.hotel.service包下的IHotelService中新增新增、删除业务

void deleteById(Long id);

void insertById(Long id);

2)给hotel-demo中的cn.itcast.hotel.service.impl包下的HotelService中实现业务:

@Override
public void deleteById(Long id) {
    try {
        // 1.准备Request
        DeleteRequest request = new DeleteRequest("hotel", id.toString());
        // 2.发送请求
        client.delete(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

@Override
public void insertById(Long id) {
    try {
        // 0.根据id查询酒店数据
        Hotel hotel = getById(id);
        // 转换为文档类型
        HotelDoc hotelDoc = new HotelDoc(hotel);

        // 1.准备Request对象
        IndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());
        // 2.准备Json文档
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        // 3.发送请求
        client.index(request, RequestOptions.DEFAULT);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

编写监听类

package cn.itcast.hotel.mq;

import cn.itcast.hotel.constants.MqConstants;
import cn.itcast.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class HotelListener {
    // 专门用于消息监听的类

    @Autowired
    private IHotelService hotelService;

    @RabbitListener(queues = MqConstants.HOTEL_INSERT_QUEUE)
    public void listenHotelInsertOrUpdate(Long id)
    {
        hotelService.insertById(id);
    }

    @RabbitListener(queues = MqConstants.HOTEL_DELETE_QUEUE)
    public void listenHotelDelete(Long id)
    {
        hotelService.deleteById(id);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/976179.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RHCA之路---EX280(10)

RHCA之路—EX280(10) 1. 题目 On master.lab.example.com install the OpenShift Mertics component with the following requirements: Use the storage /exports/metrics for cassandra storage. You can use the files on http://materials.example.com/exam280/storage fo…

js recude求和

let unReadCount resultList.reduce((pre, cur) > {return pre cur.unReadCount}, 0)

电脑Windows关闭系统自动更新

1.在winr运行框中输入services.msc&#xff0c;打开windows服务窗口。 2.在服务窗口中&#xff0c;我们找到Windows update选项&#xff0c;如下图所示&#xff1a; 3.双击windows update服务&#xff0c;我们把启动类型改为禁用&#xff0c;如下图所示&#xff0c;点击应用和…

Windows共享文件夹

Windows共享文件夹 将服务器相应磁盘设置为共享式本机可远程访问服务器共享到本地注意 将服务器相应磁盘设置为共享式 这里E已经是共享式 将D也设置为共享式 \\Desktop-erps210\d本机可远程访问服务器 将本机和服务器的ip设置一个网段&#xff0c;然后网线直连 需要关闭服务器…

AUTOSAR Davinci Idle task 与 Init Task的配置

最近在用Davinci配置Idle task和Init Task的时候遇到了一些问题&#xff0c;配置OS的时候&#xff0c;软件会自动为每个Application生成一个Idle task和Init Task&#xff1a; 先来看Idle Task&#xff0c;自动生成的Idle task的优先级是0xFFFFFFFF&#xff0c;官方给出的解释…

Library ‘libs‘ required for module ‘xxx‘ is missing from the artifact

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 错误概述 项目报错&#xff1a;Library ‘libs’ required for module ‘xxx’ is missing from the artifact&#xff1b;图示如下&#xff1a; 解决方案 点击减号删除当…

Stack-queue

文章目录 stack适配器关于为什么不直接在成员函数里调用push_back... stack 适配器 stack和queue使用了container适配器&#xff0c;因为它觉得我不需要自己实现一个容器来实现而直接用现成的 stack和queue并没有迭代器&#xff0c;因为本来就是后进先出&#xff0c;先进先出&…

无涯教程-JavaScript - DATE函数

描述 DATE函数返回特定日期的序列号。 语法 DATE (year, month, day)争论 Argument描述Required/Optionalyear year参数的值可以包含1-4位数字。 Excel会根据计算机使用的日期系统解释年份参数。 默认情况下,Microsoft Excel for Windows使用1900日期系统。 请参阅下面的注…

9.4 数据库 TCP

#include "widget.h" #include "ui_widget.h"Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);//判断数据库对象是否包含了自己使用的数据库if(!db.contains("Stu.db")){//不存在数据库&#xff0…

对可再生能源和微电网集成研究的新控制技术和保护算法进行基线和测试及静态、时域和频率分析研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

vue2 封装通用表格数据筛选的重置摁钮

放弃冗余代码吧&#xff0c;封装个混入 封装逻辑 // 重置表格筛选参数 export const queryReset {methods: {queryReset(form, method "getData", fn) {if (!this[form]) {form "queryForm";}this.$data[form] this.$options.data()[form];this[form…

2023开学礼《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书对外经济贸易大学图书馆

2023开学礼《乡村振兴战略下传统村落文化旅游设计》许少辉八一新书对外经济贸易大学图书馆

2023最新UI工作室官网个人主页源码/背景音乐/随机壁纸/一言

2023最新UI工作室官网个人主页源码/支持背景音乐/随机壁纸/一言 功能介绍&#xff1a; 载入动画 站点简介 Hitokoto 一言 日期及时间 实时天气 时光进度条 音乐播放器 移动端适配 打开文件&#xff1b;index.html和setting.json修改替换你的相关信息&a…

Windows下安装配置Nginx

nginx安装 官网下载地址 https://nginx.org/en/download.html 推荐使用稳定版本 截止时间2023年9月5日稳定版本为 1.24.0 百度网盘 链接&#xff1a;https://pan.baidu.com/s/1cXm-jN2fMzKdVMRhbG72Fg 提取码&#xff1a;9hcq 下载完成以后,得到nginx压缩包; 双击启动nginx.…

2023年模拟IC就业形式怎么样? 还能不能入了?(附最新薪资行情)

虽然说模拟IC的门槛高&#xff0c;难度大&#xff0c;但相比数字IC的话竞争要小一点。而且薪资也比数字IC要高一些。 想要学好模拟芯片设计&#xff0c;首先要懂电路&#xff0c;懂器件&#xff0c;你要理解你的器件是怎么工作的&#xff0c;它在芯片上是什么样子&#xff0c;…

如何使用代理配置快速定位接口测试脚本问题?

在调试接口用例过程中&#xff0c;如果响应结果和预期结果不一致&#xff0c;则需要检查请求信息。通过代理获取自动化测试中的请求响应信息&#xff0c;对比与正常请求响应的区别&#xff0c;就能够更直观的排查请求错误&#xff0c;相当于编写代码时的 debug 功能。 实战练习…

华为数通安全产品介绍

HiSecEngine USG12000系列防火墙&#xff08;以下简称USG12000系列&#xff09;是华为公司推出的首款T级AI防火墙&#xff0c;在网络边界实时防护已知与未知威胁&#xff0c;通常部署在云计算数据中心&#xff0c;大型企业及园区网出口&#xff0c;为数据中心、企业及园区网络提…

[MySQL]查看数据库大小

查看库大小 例如&#xff1a;查看当前MySQL中数据总量超过2GB的库&#xff1a; select table_schema as 数据库,table_rows as 记录数,data_size as 数据容量(GB),index_size as 索引容量(MB) from (selecttable_schema,sum(table_rows) as table_rows,sum(truncate(data_leng…

1794086 -F110 清算日期与付款时的付款日期不同

确实没有想到设置的地方在这里。 症状 用户在事务 F110 中执行付款&#xff0c;且清算日期字段 (BSEG-AUGDT) 未设置为付款日期。 重现问题 执行事务 F110 并执行自动付款。完成后&#xff0c;检查凭证中的“清算日期”字段。该日期与付款日期不同。 例如&#xff0c;您具…

jQuery 层次选择器

jQuery 层次选择器 &#xff08;0&#xff09;测试前的准备工作 A. 定义测试对象 B. 定义测试对象的 CSS 样式 C. 再定义一些测试用的 button。 &#xff08;1&#xff09;所有后代选择器(A B) 所有后代&#xff0c;包括其直接后代及间接后代。 &#xff08;2&#xff09;直…