消息队列-RabbitMQ(二)

news2024/11/30 10:52:52

接上文《消息队列-RabbitMQ(一)》

在这里插入图片描述

@Configuration
public class RabbitMqConfig {
    // 消息的消费方json数据的反序列化
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(new Jackson2JsonMessageConverter());
        return factory;
    }

    // 定义使用json的方式转换数据
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate amqpTemplate = new RabbitTemplate();
        amqpTemplate.setConnectionFactory(connectionFactory);
        amqpTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return amqpTemplate;
    }

}
@Service
public class TestConsumer {

    @RabbitListener(queuesToDeclare = {@Queue("simpleQueue")})
    public void simpleModel(User user) {
        System.out.println("接收消息message=" + user);
    }

    // value=@Queue 创建临时队列
    // exchange创建交换机
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))
    })
    public void receiveMessage1(User user) {
        System.out.println(String.format("消费者 【one】: %s", user));
    }

    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,
                    exchange = @Exchange(value = "fanout-ex", type = ExchangeTypes.FANOUT))
    })
    public void receiveMessage2(User user) {
        System.out.println(String.format("消费者 【two】: %s", user));
    }

}
@RestController
@RequestMapping("/rabbitMqReq")
public class RabbitMqController {
    @Autowired
    private TestProducer testProducer;

    @PostMapping("/simple")
    public void simple(@RequestBody User user) {
        testProducer.simpleMessageSend();
    }

    @PostMapping("/fanoutMessageSend")
    public void fanoutMessageSend(@RequestBody User user) {
        testProducer.fanoutMessageSend();
    }
}
@Data
public class User {
    private String userId; //用户编号
    private String userName; //用户姓名
}

@Service
public class TestProducer {
    @Resource
    private RabbitTemplate rabbitTemplate;

    public void simpleMessageSend() {
        System.out.println("simpleMessageSend");
        User user = new User();
        user.setUserId("1001");
        user.setUserName("张三");
        rabbitTemplate.convertAndSend("simpleQueue", user);
    }

    // fanout模型
    public void fanoutMessageSend() {
        for (int i = 0; i < 5; i++) {
            User user = new User();
            user.setUserId("1001");
            user.setUserName("张三");
            rabbitTemplate.convertAndSend("fanout-ex", "", user);
        }
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
@SpringBootApplication(exclude = { DataSourceAutoConfiguration.class })
public class App
{
    public static void main(String[] args)
    {
        SpringApplication.run(App.class, args);
        System.out.println("(♥◠‿◠)ノ゙  启动成功   ლ(´ڡ`ლ)゙ ");
    }
}
# 开发环境配置
server:
  # 服务器的HTTP端口,默认为8080
  port: 8899
  servlet:
    # 应用的访问路径
    context-path: /
  tomcat:
    # tomcat的URI编码
    uri-encoding: UTF-8
    # tomcat最大线程数,默认为200
    max-threads: 800
    # Tomcat启动初始化的线程数,默认值25
    min-spare-threads: 30
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>RabbitMQDemo</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>RabbitMQDemo</name>
  <!-- FIXME change it to the project's website -->
  <url>http://www.example.com</url>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.1.RELEASE</version>
    <relativePath />
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <fastjson.version>1.2.70</fastjson.version>
  </properties>

  <dependencies>
    <!-- SpringBoot 核心包 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- AMQP客户端 -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- 阿里JSON解析器 -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.10</version>
    </dependency>

  </dependencies>

  <repositories>
    <!--阿里云镜像仓库-->
    <repository>
      <id>public</id>
      <name>aliyun nexus</name>
      <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
    </repository>
    <!--oracle驱动没有发布到中央仓库,只能从此仓库下载-->
    <repository>
      <id>jeecg</id>
      <name>jeecg Repository</name>
      <url>http://maven.jeewx.com/nexus/content/repositories/jeecg</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

</project>

什么是事务?
数据库事务。原子性、一致性、隔离性、持久性。

本地事务?
整个服务操作只能涉及一个单一的数据库资源。

分布式事务?
分布式事务指事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是为了保证不同数据库的数据一致性。

分布式事务应用架构?
微服务架构的分布式应用环境下,越来越多的应用要求对多个数据库资源,多个服务的访问都能纳入到同一个事务当中,分布式事务应运而生。

分布式事务有几种解决方案: 两阶段提交/XA MQ
第一阶段(prepare):即所有的参与者RM准备执行事务并锁住需要的资源。参与者ready时,向TM报告已准备就绪。
第二阶段 (commit/rollback):当事务管理者™确认所有参与者(RM)都ready后,向所有参与者发送commit命令。

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

RabbitMQ与springboot整合
1、添加依赖 spring-boot-starter-amqp
2、配置RabbitConfig,包含RabbitListenerContainerFactory、RabbitTemplate

简单模型
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void simpleMessageSend() {
    rabbitTemplate.convertAndSend("simpleQueue", new User(1, "张"));
}

}

@RabbitListener(queuesToDeclare = {@Queue(“simpleQueue”)})
public void simpleModel(User user) {
log.info(“message: {}”, user);
}

发布订阅模型
// fanout模型
@Test
public void fanoutMessageSend() {
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend(“fanout-ex”, “”, new User(i, “张三”));
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

// value=@Queue 创建临时队列
// exchange创建交换机
@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
exchange = @Exchange(value = “fanout-ex”, type = ExchangeTypes.FANOUT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

直连模式(direct)
@Test
public void directMessageSend() {
//rabbitTemplate.convertAndSend(“direct-ex”, “success”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“direct-ex”, “error”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”, “success”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“error”},
exchange = @Exchange(value = “direct-ex”, type = ExchangeTypes.DIRECT))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

topic模型
@Test
public void topicMessageSend() {
//rabbitTemplate.convertAndSend(“topic-ex”, “company”, new User(2, “张三”));
rabbitTemplate.convertAndSend(“topic-ex”, “company.java”, new User(2, “张三”));
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage1(User user) {
System.out.println(String.format(“消费者 【one】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.java.#”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage2(User user) {
System.out.println(String.format(“消费者 【two】: %s”, user));
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue,
key = {“company.html.*”},
exchange = @Exchange(value = “topic-ex”, type = ExchangeTypes.TOPIC))
})
public void receiveMessage3(User user) {
System.out.println(String.format(“消费者 【three】: %s”, user));
}

消息的手动确认
spring:
rabbitmq:
listener:
simple:
# 提交方式为手动
acknowledge-mode: MANUAL

factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

面试:a. 如何保证消息不丢失?

​ b. 如何保证消息的不重复消费?

​ c. 如何使用mq来是实现分布式事务?

​ d. 在工作中mq用在哪里?支付回调。

RabbitMQ如何做到消息不丢失?

1.持久化

发送消息时设置delivery_mode属性为2,使消息被持久化保存到磁盘,即使RabbitMQ服务器宕机也能保证消息不丢失。同时,创建队列时设置durable属性为True,以确保队列也被持久化保存。

2.确认机制

消费者通过basic.ack命令向RabbitMQ服务器确认已经消费了消息。如果消费者处理消息时发生错误或宕机,RabbitMQ会重新将消息发送给其他消费者。RabbitMQ在接收到消费者确认消息前会将消息保存在内存中,在确认后才会删除消息。

3.发布者确认

RabbitMQ支持发布者确认机制,即发布者在将消息发送到队列后,等待RabbitMQ服务器的确认消息。成功保存到队列的消息会返回确认消息给发布者,如果无法保存则返回Nack(Negative Acknowledgement)消息。通过发布者确认机制,可以确保消息成功发送到RabbitMQ服务器。

4.备份队列

RabbitMQ支持备份队列(Alternate Exchange)机制,即在消息发送到队列之前,先将消息发送到备份队列。如果主队列无法接收消息,RabbitMQ会将消息发送到备份队列中。备份队列通常是一个交换机,在创建队列时可以通过x-dead-letter-exchange属性指定备份队列。

三、注意事项 在使用RabbitMQ消息确认机制时,需注意以下几点:

1、确认模式的选择: 根据具体业务需求选择合适的确认模式。如果对消息传递的可靠性要求较高,建议使用手动确认模式或批量确认模式。

2、设置消息持久化: 在发布消息时,通过设置消息的持久化属性,可以确保即使RabbitMQ服务器意外关闭或重启,消息仍然能够被保存下来。

3、处理未确认消息: 如果消费者在处理消息时发生异常,导致无法发送确认信号给RabbitMQ,那么这些消息将保持在队列中,并可能被重新投递。需要设定适当的重试策略和错误处理机制来处理这些未确认的消息。

4、监听确认超时: 在使用批量确认模式时,如果长时间没有收到确认信号,需要设置合理的超时时间,并采取相应措施来处理超时的情况。

RabbitMQ消息确认机制详解:确保交付成功

消息确认机制是通过发布者(Producer)和消费者(Consumer)之间的交互来实现的。
当发布者发送消息到RabbitMQ后,会等待确认结果。如果消息成功被消费者接收并处理,消费者会发送一个确认信号给RabbitMQ,告知消息已经处理完成。而RabbitMQ则会根据接收到的确认信号,判断消息是否成功交付。
消息确认机制一般有两种模式:简单模式和批量模式。在简单模式中,每发送一条消息后就等待确认;而在批量模式中,可以一次发送多条消息,然后等待批量确认。无论是简单模式还是批量模式,都可以提高消息传递的可靠性。

https://blog.csdn.net/weixin_52278591/article/details/128841155 RabbitMq 消息确认机制详解

https://blog.csdn.net/u011942456/article/details/128198956 总结RabbitMq消息丢失和消息重复消费问题

https://www.zhihu.com/question/54756562?utm_id=0 rabbitmq是什么,主要用于哪些方面?

https://zhuanlan.zhihu.com/p/583520436?utm_id=0 用了8年MQ!聊聊消息队列的技术选型,哪个最香!

https://blog.csdn.net/qq_43410878/article/details/123656765 RabbitMQ使用详解

https://zhuanlan.zhihu.com/p/590948541 绝对详细的 RabbitMQ入门,看完本系列就够了

MQ的优势和劣势

https://cloud.tencent.com/developer/article/2113514 面试百问:使用MQ的优势、劣势以及问题

rabbitmq basicAck

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1054859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

redis解压+windows安装+无法启动:1067

Redis下载安装图文教程&#xff08;Windows版_超详细&#xff09; 标题若遇到安装后无法启动&#xff1a;1067 排查方法如下&#xff1a; 1.查询是否有服务占用端口 查看6379的端口也没有被占用&#xff08;netstat -ano | findstr :6379&#xff09; 若有&#xff0c;kill掉…

盛最多水的容器 接雨水【基础算法精讲 02】

盛雨水最多的容器 链接 : 11 盛最多水的容器 思路 : 双指针 &#xff1a; 1.对于两条确定的边界&#xff0c;l和r,取中间的线m与r组成容器&#xff0c;如果m的高度>l的高度&#xff0c;那么整个容器的长度会减小&#xff0c;如果低于l的高度&#xff0c;那么不仅高度可…

54、数组--模拟

LCR 146. 螺旋遍历二维数组 给定一个二维数组 array&#xff0c;请返回「螺旋遍历」该数组的结果。 螺旋遍历&#xff1a;从左上角开始&#xff0c;按照 向右、向下、向左、向上 的顺序 依次 提取元素&#xff0c;然后再进入内部一层重复相同的步骤&#xff0c;直到提取完所有…

SpringBoot整合数据库连接

JDBC 1、数据库驱动 JDBC&#xff08;Java DataBase Connectivity&#xff09;&#xff0c;即Java数据库连接。简而言之&#xff0c;就是通过Java语言来操作数据库。 JDBC是sun公司提供一套用于数据库操作的接口. java程序员只需要面向这套接口编程即可。不同的数据库厂商&…

C++八股

1、简述一下C中的多态 在面向对象中&#xff0c;多态是指通过基类的指针或引用&#xff0c;在运行时动态调用实际绑定对象函数的行为&#xff0c;与之相对应的编译时绑定函数称为静态绑定。 静态多态 静态多态是编译器在编译期间完成的&#xff0c;编译器会根据实参类型来选择…

国庆10.1

用select实现服务器并发 ser #include <myhead.h> #define ERR_MSG(msg) do{\fprintf(stderr, "__%d__", __LINE__);\perror(msg);\ }while(0)#define PORT 8888 //端口号&#xff0c;范围1024~49151 #define IP "192.168.1.205" //本机…

ARMv7-A 那些事 - 5.CP15协处理器

By: Ailson Jack Date: 2023.10.01 个人博客&#xff1a;http://www.only2fire.com/ 本文在我博客的地址是&#xff1a;http://www.only2fire.com/archives/157.html&#xff0c;排版更好&#xff0c;便于学习&#xff0c;也可以去我博客逛逛&#xff0c;兴许有你想要的内容呢。…

14:STM32-----看门狗

目录 一:看门狗 1:WDG 2:独立看门狗 (IWDG) A:IWDG框图 B:IWDG_KR键寄存器 C:IWDG超时时间 3:窗口看门狗 (WWDG) A:WWDG框图 B:WWDG工作特性 C:WWDG超时时间 4:独立看门狗和窗口看门狗的区别 5:数据手册 二:案例 A:独立看门狗 1:连接图 2:步骤 3:函数介绍 3:代…

网络爬虫——urllib(2)

前言&#x1f36d; ❤️❤️❤️网络爬虫专栏更新中&#xff0c;各位大佬觉得写得不错&#xff0c;支持一下&#xff0c;感谢了&#xff01;❤️❤️❤️ Python网络爬虫_热爱编程的林兮的博客-CSDN博客 前篇讲解了urllib的基本使用、一个类型六个方法与下载相关内容&#xff0…

《深入浅出OCR》第二章:OCR技术发展与分类

✨专栏介绍: 经过几个月的精心筹备,本作者推出全新系列《深入浅出OCR》专栏,对标最全OCR教程,具体章节如导图所示,将分别从OCR技术发展、方向、概念、算法、论文、数据集等各种角度展开详细介绍。 👨‍💻面向对象: 本篇前言知识主要介绍深度学习知识,全面总结知知识…

Python3数据科学包系列(一):数据分析实战

Python3中类的高级语法及实战 Python3(基础|高级)语法实战(|多线程|多进程|线程池|进程池技术)|多线程安全问题解决方案 Python3数据科学包系列(一):数据分析实战 Python3数据科学包系列(二):数据分析实战 认识下数据科学中数据处理基础包: (1)NumPy 俗话说: 要学会跑需先…

C++核心编程--多态篇

4.7、多态 4.7.1、多态的基本概念 多态是C面向对象三大特征之一 多态分为两类 静态多态&#xff1a;函数重载和运算符重载属于静态多态&#xff0c;复用函数名动态多态&#xff1a;派生类和虚函数实现运行时多态 静态多态和动态多态区别&#xff1a; 静态多态的函数地址早…

Eclipse 主网即将上线迎空投预期,Zepoch 节点或成受益者?

目前&#xff0c;Zepoch 节点空投页面中&#xff0c;模块化 Layer2 Rollup 项目 Eclipse 出现在其空投列表中。 配合近期 Eclipse 宣布了其将由 SVM 提供支持的 Layer2 主网架构&#xff0c;并将在今年年底上线主网的消息后&#xff0c;不免引发两点猜测&#xff1a;一个是 Ecl…

springcloud:四、nacos介绍+启动+服务分级存储模型/集群+NacosRule负载均衡

nacos介绍 nacos是阿里巴巴提供的SpringCloud的一个组件&#xff0c;算是eureka的替代品。 nacos启动 安装过程这里不再赘述&#xff0c;相关安装或启动的问题可以见我的另一篇博客&#xff1a; http://t.csdn.cn/tcQ76 单价模式启动命令&#xff1a;进入bin目录&#xff0…

某房产网站登录RSA加密分析

文章目录 1. 写在前面2. 抓包分析3. 扣加密代码4. 还原加密 1. 写在前面 今天是国庆节&#xff0c;首先祝福看到这篇文章的每一个人节日快乐&#xff01;假期会老的这些天一直在忙事情跟日常带娃&#xff0c;抽不出一点时间来写东西。夜深了、娃也睡了。最近湖南开始降温了&…

SimpleCG动画示例--汉诺塔动画演示

前言 SimpleCG的使用方法在前面已经介绍了许多&#xff0c;有兴趣的同学如果有去动手&#xff0c;制作一些简单动画应该没多大问题的。所以这次我们来演示一下简单动画。我们刚学习C语言的递归函数时&#xff0c;有一个经典例子相信很多同学都写过&#xff0c;那就是汉诺塔。那…

【算法优选】双指针专题——壹

文章目录 &#x1f60e;前言&#x1f334;[移动零](https://leetcode.cn/problems/move-zeroes/)&#x1f6a9;题⽬描述&#xff1a;&#x1f6a9;算法思路&#x1f6a9;算法流程&#x1f6a9;代码实现 &#x1f340;[复写零](https://leetcode.cn/problems/duplicate-zeros/)&…

深入浅出线程池

一、线程 1、什么是线程 线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中&#xff0c;是进程中的实际 运作单位。一条线程指的是进程中一个单一顺序的控制流&#xff0c;一个进程中可以并发多个线程&#xff0c;每条线 程并行执行不同的任务。 2、如…

2023-10-01 LeetCode每日一题(买卖股票的最佳时机)

2023-10-01每日一题 一、题目编号 121. 买卖股票的最佳时机二、题目链接 点击跳转到题目位置 三、题目描述 给定一个数组 prices &#xff0c;它的第 i 个元素 prices[i] 表示一支给定股票第 i 天的价格。 你只能选择 某一天 买入这只股票&#xff0c;并选择在 未来的某一…

Python Appium 安卓自动化测试 基本使用 - Phone Spider

Python Appium 安卓自动化测试 基本使用 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 例如&#xff1a;第一章 Python 机器学习入门之pandas的使用 文章目录 Python Appium 安卓自动化测试 基本使用前言一、环境安装1.1 Python Pip…