SpringBoot整合Canal+RabbitMQ监听数据变更(对rabbit进行模块封装)

news2025/1/11 23:00:27

SpringBoot+Canal(监听MySQL的binlog)+RabbitMQ(处理保存变更记录)

在SpringBoot中采用一种与业务代码解耦合的方式,来实现数据的变更记录,记录的内容是新数据,如果是更新操作还得有旧数据内容。
使用Canal来监听MySQL的binlog变化可以实现这个需求,可是在监听到变化后需要马上保存变更记录,除非再做一些逻辑处理,于是又结合了RabbitMQ来处理保存变更记录的操作。

  • 启动MySQL环境,并开启binlog
  • 启动Canal环境,为其创建一个MySQL账号,然后以Slave的形式连接MySQL
  • Canal服务模式设为TCP,用Java编写客户端代码,监听MySQL的binlog修改
  • Canal服务模式设为RabbitMQ,启动RabbitMQ环境,配置Canal和RabbitMQ的连接,用消息队列去接收binlog修改事件

预先在model实体中准备

短信实体

@Data
@ApiModel(description = "短信实体")
public class MsmVo{
	
	@ApiModelProperty(value="phone")
	private String phone;

	@ApiModelProperty(value = "短信模板code")
	private  String templateCode;

	@ApiModelProperty(value="短信模板参数")
	private Map<String,Object> param;
}

排班实体

@Data
@ApiModel(description = "OrderMqVo")
public class OrderMqVo{
	
	@ApiModelProperty(value="可预约数")
	private Integer reserverdNumber

	@ApiModelProperty(value = "剩余预约数")
	private Integer availableNumber;

	@ApiModelProperty(value = "排班id")
	private String scheduleId;

	@ApiModelProperty(value = "短信实体")
	private MsmVo msmVo;
}

一、安装RabbitMQ

docker pull rabbitmq:nanagemnet
docker run -d -p 5672:5672 -p 12672:15672 --name rabbitmq rabbitmq:nanagement

访问:http://IP:15672
在这里插入图片描述

二、rabbit-util模块封装

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
</dependency>

创建一个RabbitService用来发送消息

@Service
public class RabbitService{
	
	@Autowired
	private RabbitTemplate rabbitTemplate;

	//发送消息
	public boolean sendMessage(String exchange,String routingKey,Object message){
		rabbitTemplate.convertAndSend(exchange,routingKey,message);
		return true;
	}
}

创建mq消息转化器

@Configuration
public class MQConfig{
	
	@Bean
	public MessageConverter messageConverter(){
		return new Jackson2JsonMessageConverter();
	}
}

添加常量配置类

public class MqConst{
	
	//预约下单
	public static final String EXCHANGE_DIRECT_ORDER = "exchange.direct.order";
	public static final String ROUTING_ORDER = "order";
	//队列
	public static final String QUEUE_ORDER = "queue.order";

	//短信
	public static final String EXCHANGE_DIRECT_MSM = "exchange.direct.msm";
	public static final String ROUTING_MSM_ITEM = "msm.item";
	pulib static final String Queue_MSM_item = "queue.msm.item";
}

三、短信模块service-sms

将二中的模块依赖引入

<dependency>
	<groupId>com.michael</groupId>
	<artifactId>rabbit_util</artifactId>
	<version>xxx</version>
</dependency>

配置文件application.properties

spring.rabbitmq.host=192.168.44.168
spring.rabbitmq.port=5672
spring.rabbit.uername=guest
spring.rabbitmq.password=guest

Service发送消息

public interface MsmService{
	//发送手机验证码
	boolean send(String phone,String code);
	
	//MQ使用发送短信的接口
	boolean send(MsmVo msmVo);
}
@Service
public class MsmServiceImpl implements MsmService{

	@Override
	public boolean send(String phone,String code){
		//判断手机号是否为空
		if(StringUtils.isEmpty(phone)){
			return false;
		}

		//整合阿里云相关参数,短信服务
		DefaultProfile profile = DefaultProfile.getProfile(ConstantPropertiesUtils.REGION_Id,
			ConstantPropertiesUtils.ACCESS_KEY_ID,
			ConstantPropertiesUtils.SECRET
		);
		IAcsClient client = new DefaultAcsClient(profile);
		CommonRequest request = new CommonRequest();

		request.setMethod(MethodType.POST);
		request.setDomain("dysmsapi.aliyuncs.com");
		request.setVersion("2018-08-08");
		request.setAction("SendSms");

		//手机号
		request.putQueryParameter("PhoneNumbers",phone);
		//签名名称
		request.putQueryParameter("SignName","我的网站");
		//模板
		request.putQueryParameter("TemplateCode","SMS_180051135");
		//验证码使用json格式{"code":"123456"}
		Map<String,Object> param = new HashMap();
		param.put("code",code);
		request.putQueryParameter("TemplateParam",JSONObject.toJSONString(param));
		
		//调用方法进行短信发送
		try{
			CommonResponse response = client.getCommonResponse(request);
			System.out.println(response.getData());
			return response.getHttpResponse().isSuccess();
		}catch(ServerException e){
			e.printStackTrace();
		}catch(ClientException e){
			e.printStackTrace();
		}
		return false;
	}
	
	@Override
	public boolean send(MsmVo msmVo){
		
		if(!StringUtils.isEmpty(msmVO.getPhone())){
			String code = (String)msmVo.getParam().get("code");
			boolean isSend = this.send(msmVo.getPhone(),code);
			return isSend;
		}
		return false;
	}
}

创建mq监控器

@Component
public class MsmReceiver{
	
	@Autowired
	private MsmService msmService;

	//监听
	@RabbitListener(bindings = @QueueBinding(
		value = @Queue(value = MqConst.QUEUE_MSM_ITEM,durable = "true"),
		exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_MSM),
		key = {MqConst.ROUTING_MSM_ITEM}
	))
	public void send(MsmVo msmVo,Message message,Channel channel){
		msmService.ssend(msmVo);
	}
}

四、业务类

生成订单之后,发送短信并更新数量

①、业务模块中引入依赖

rabbit-util

②、添加配置

spring.rabbitmq.host=192.168.44.165
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

③、service接口以及实现类

@Override
public void update(Schedule schedule){
	schedule.setUpdata(new Date());
	scheduleRepository.save(schedule);
}

④、receiver包中创建MQ监听器

@Component
public class HospitalReceiver{
	
	@Autowired
	private ScheduleService scheduleService;

	@Autowired
	private RabbitService rabbitService;

	//监听
	@RabbitListener(
		bindings = @QueueBinding(
			value = @Queue(value = MqConst.QUEUE_ORDER,durable = "true"),
			exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_ORDER),
			key = {MqConst.ROUTING_ORDER}
		)
	)
	public void receiver(OrderMqVo orderMqVo,Message message,Channel channle) throws IOException{
		//下单成功,更新数据
		Schedule schedule = scheduleService.getScheduleId(orderMqVo.getScheduleId());
		schedule.setReservedNumber(orderMqVo.getReservedNumber());
		schedule.setAvailableNumber(orderMqVo.getAvailableNumber);
		scheduleService.update(schedule);

		//发送短信
		MsmVo msmVo = orderMqVo.getMsmVo();
		if(null != msmVo){
			rebbitService.sendMessage(MqConst.QUEUE_MSM_ITEM,MqConst.ROUTING_MSM_ITEM,msmVo);
		}
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1184253.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

埃隆·马斯克旗下xAI推出PromptIDE工具,加速提示工程和可解释性研究

&#x1f989; AI新闻 &#x1f680; 埃隆马斯克旗下xAI推出PromptIDE工具&#xff0c;加速提示工程和可解释性研究 摘要&#xff1a;埃隆马斯克旗下人工智能初创公司xAI推出了PromptIDE工具&#xff0c;该工具是一个用于提示工程和可解释性研究的集成开发环境。通过该工具&a…

MATLAB|怎么将散点图替换成图片

目录 效果图 工具箱函数 输入参数: 输出参数 使用教程 案例1 案例3 案例4 案例5 案例6 案例7 案例8 扫一扫关注公众号&#xff1a; 今天教大家怎么把散点图替换成自己喜欢的图片&#xff0c;喜欢此推文的小伙伴们记得点赞关注分享&#xff01; 效果图 看了slandare…

【BUG解决】服务器没报警但是应用接口崩了....

最近遇到一个突发问题&#xff1a;服务器没报警但是应用接口崩了… 为其他业务系统提供一个接口&#xff0c;平时好好的&#xff0c;突然就嚷嚷反馈说访问不了了&#xff0c;吓得我赶紧跳起来&#xff01; 正常情况下在系统崩溃前&#xff0c;我会收到很多系统报警&#xff0…

【Unity】光照烘培-基础参数-基础设置

光照烘培 一级目录二级目录 问题目录烘焙光照在手机不起作用 一级目录 二级目录 Unity 2020.3.25 打开灯光面板 Wingdow -》 Rendering -> Lighting Lighting Settings 灯光设置文件 Realtime Lighting Realtime Global lllumin Realtime Environme Mixed Lighting Ba…

欧科云链:成本与规模之辨——合规科技如何赋能香港Web3生态?

作为国际金融中心&#xff0c;香港近两年来在虚拟资产及Web3领域频频发力。秉持着“稳步创新”的基本逻辑&#xff0c;香港在虚拟资产与Web3领域已建立一定优势&#xff0c;但近期各类风险事件的发生则让业界的关注焦点再次转向“安全”与“合规”。 在香港FinTech Week前夕&a…

【Hugging Face】如何下载模型文件

参考文章&#xff1a; 1、mac安装Homebrew - 知乎 2、 ssh连接 git lfs install git clone githf.co:bert-base-uncased -- 安装Homebrew /bin/zsh -c "$(curl -fsSL https://gitee.com/cunkai/HomebrewCN/raw/master/Homebrew.sh)" -- 配置文件生效 source /Use…

创邻科技再获人行旗下《金融电子化》2023年度大奖

11月3日&#xff0c;由中国人民银行核心金融期刊《金融电子化》与第五届数字金融应用博览会共同主办的第14届金融科技大会在江苏苏州举行。 大会现场揭晓了2023年度“金融科技创新奖”&#xff0c;创邻科技的“Galaxybase全行级图应用平台”项目&#xff0c;从全国各金融机构送…

支持企业微信集成和登录!镭速传输新版本带来多项升级

近日&#xff0c;镭速发布了最新版本V.6.7.8.0&#xff0c;增加了一些新的功能和优化&#xff0c;为用户带来更好的体验。以下是本次更新的主要内容&#xff1a; 01 企业微信集成 企业微信登录功能为企业提供了更加便捷的用户管理和权限控制。在镭速的新版本中&#xff0c;通…

查出来这个表中evaluation_num字段中以2023开头的最大的尾数是几,instr用法

查出来这个表中evaluation_num字段中以2023开头的最大的尾数是几&#xff0c; sql如下&#xff1a; select max(to_number(substr(evaluation_num,instr(evaluation_num,-,1,2)1))) evaluation_num from tbl_lawsuit_index_assess_rec where to_char(create_time,yyyy)2023我…

某相亲网站白合网js逆向解析,★

文章目录 前言网址:参数加密确定加密位置和加密的参数解析加密方法完结撒花前言 可以关注我哟,一起学习,主页有更多练习例子 如果哪个练习我没有写清楚,可以留言我会补充 如果有加密的网站可以留言发给我,一起学习共享学习路程 如侵权,联系我删除 此文仅用于学习交流,请…

队列(定义,基本操作,顺序存储,链式存储)

目录 1.队列的定义1.重要术语2.基本操作 2.队列的顺序存储1.基本操作1.初始化2.判空3.入队&#xff08;循环队列&#xff09;4.出队5.读队头 2.判断队列已满/已空 3.队列的链式存储1.基本操作&#xff08;带头结点&#xff09;1.初始化2.判空3.入队4.出队5.队满条件 1.队列的定…

腾讯广告RACE曝光归因模型

今天我们以腾讯广告RACE曝光归因模型为例&#xff08;以下简称RACE模型&#xff09;&#xff0c;来聊聊行业在衡量广告效果上的努力与成效。 第一类&#xff1a;衡量转化以及转化过程的归因 如同前面所讲&#xff0c;如果只是衡量ROI&#xff0c;对广告投放的效果衡量就只有一…

idea配置插件JRebel and XRebel

激活 点击Help—>JRebel—>Activation 激活地址: http://server.52zhaoyue.cn/b56c9b61-2e80-4e31-82b4-15271a93e8c7 b56c9b61-2e80-4e31-82b4-15271a93e8c7生成地址https://www.guidgen.com/ 激活邮箱: 自由 激活及确认 方案1&#xff1a;激活后直接显示 激活信息…

第二证券:北交所30%的涨跌幅限制?

随着我国股市的不断发展&#xff0c;股市生意的涨跌幅束缚也成为了一个备受注重的论题。在北交所&#xff0c;股票的涨跌幅束缚为30%&#xff0c;这一束缚是否合理呢&#xff1f;本文将从多个角度进行剖析。 首先&#xff0c;涨跌幅束缚对于股市的安稳起着重要的效果。股票价格…

CSS实现透明度效果的两种方法—— opacity 和 rgba()

在实际开发过程中&#xff0c;为了给用户呈现一些效果&#xff0c;我们需要控制元素的透明度。CSS 提供了 opacity 属性和 rgba() 函数给我们控制透明度&#xff0c;接下来通过一个例子来感受一下两种方法的区别。 <style>.transparentBox {display: inline-block;width…

telnet不是内部或外部命令

telnet不是内部或外部命令-telnet测试端口是否开放 在windows系统上开启Telnet服务 win8以上系统&#xff1a;"开始"→"控制器面板"→"程序和功能"→ 左侧"启动或关闭windows功能"→ 在"Windows功能"界面勾选Telnet服务器和…

java RMI 技术介绍和实践

在项目上发现了使用rmi技术&#xff0c;充电一波 RMI 概述 RMI&#xff08;Remote Method Invocation&#xff09;是一种 Java 编程语言中的远程过程调用&#xff08;RPC&#xff09;协议&#xff0c;用于在不同的Java虚拟机&#xff08;JVM&#xff09;之间进行通信和交互。它…

基于非精确线搜索算法三准则实现步长因子的求解

0、前言 朋友请我帮他做一个他们老师留的课堂作业&#xff0c;就自学了一下&#xff0c;我给他做了A准则和G准则的&#xff0c;W准则的留给他自己改了&#xff0c;也没有多难就是换一个判断条件就行了。 一、问题描述 二、要求 三、代码 3.1A准则加回退法 %帮别人做的小作业…

『MySQL快速上手』-⑤-数据类型

文章目录 1.数据类型有哪些2.数值类型2.1 tinyint 类型2.2 bit 类型2.3 小数类型2.3.1 float2.3.2 decimal 3.字符串类型3.1 char3.2 varchar3.2 char 和 varchar 比较 4.日期和时间类型5.enum和set 1.数据类型有哪些 MySQL支持多种数据类型&#xff0c;这些数据类型可用于定义…

第二证券:“闻”A股:注册制走深走实是活跃市场制度保障

不久前举办的中心金融工作会议明确要求“推动股票发行注册制走深走实”&#xff0c;无疑为本钱商场活泼进一步夯实制度保证。 上市公司是本钱商场的基石&#xff0c;其质量是决议本钱商场长期健康展开的重要因素之一。作为牵一发而动全身的重要革新&#xff0c;注册制的实施逐…