RocketMQ使用

news2025/1/22 19:40:04

说明:本文介绍RocketMQ的消费模式&消息类型,RocketMQ的安装参考及简单使用,参考:http://t.csdn.cn/BKFPj

消费模式

RocketMQ与RabbitMQ最大的区别在于,RocketMQ是根据消息的Topic锁定消费者的,Topic属性设置为相同的消费者,可以看做是一个消费者集群。消息模式分为以下三种:

(1)一对一

最简单的一种方式,消息的Topic只被一个消费者消费,如下:

(生产者)

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Test
    public void simpleTest(){
        rocketMQTemplate.syncSend("simple","hello rocketmq!");
    }

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("s = " + s);
    }
}

执行结果

在这里插入图片描述

(2)一对多

当存在多个Topic相同的消费者时,这些消费者共同消费消息,如下:

(开启两个消费者,Topic相同)

在这里插入图片描述

(生产者)

    @Test
    public void oneToMany(){
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.syncSend("simple","one to many" + i);
        }
    }

执行结果,可以看到负载均衡策略是随机;

在这里插入图片描述

在这里插入图片描述

(3)多对多

参考一对多方式,发送多个Topic的消息,让多种Topic的消费者接收消息;

消息类型

根据消息的类型和对消息的处理,可以分为以下几种:

(1)同步消息

同步消息,消息发送到MQ,MQ保存成功后才会返回结果,在API中是以"sync"(synchronous,同步)开头的一些方法,可以看到这些方法都有返回值,可以通过返回结果判断是否发送成功;
在这里插入图片描述

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("接收到同步消息 = " + s);
    }
}

(生产者,可以通过返回结果判断发送是否成功)

	@Autowired
	private RocketMQTemplate rocketMQTemplate;
	
	@Test
	public void simpleTest1(){
	    SendResult sendResult = rocketMQTemplate.syncSend("simple", "这是一个异步消息");
	    System.out.println("sendResult.getSendStatus() = " + sendResult.getSendStatus());
	}

在这里插入图片描述

(2)异步消息

异步消息,消息发送给MQ后代码就会立即向下执行,在API中是以“asyn”(asynchronous,异步),可以手动设置发送消息成功与否执行的方法;

(生产者)

    @Test
    public void simpleTest2() throws InterruptedException {
        rocketMQTemplate.asyncSend("simple", "这是一个异步消息", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("成功信息" + sendResult.toString());
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("异常信息" + throwable.getMessage());
            }
        });
        TimeUnit.SECONDS.sleep(2);
    }

(发送消息成功,执行成功的方法)

在这里插入图片描述

需要注意,这里是指发送消息成功与否,与消费者是否成功消费无关;

(3)单向消息

单向消息,是指只管发送消息,不关系MQ是否成功接收,没有返回值;

    @Test
    public void simpleTest3() {
        rocketMQTemplate.sendOneWay("simple", "这是一个单向消息");
    }

(4)延迟消息

延迟消息,指给消息设置一个延迟级别,达到指定时间后,消费者才能收到这个消息,延迟级别如下:

# 延迟级别,从1开始
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

(生产者)

    @Test
    public void simpleTest4() {
        // 设置超时为1秒,延迟等级为3,即10秒
        rocketMQTemplate.syncSend("simple", MessageBuilder.withPayload("这是一个延迟消息").build(),1000,3);
    }

(消费者,10秒后才收到消息)

在这里插入图片描述

延迟消息相较于RabbitMQ,使用起来更方便,但是只能设置时间等级,不能设置准确时间,非常难受;

(5)批量消息

RocketMQ可以发送一个集合,如下:

(消费者)

    @Test
    public void simpleTest5(){

        ArrayList<Message> list = new ArrayList<>();
        list.add(MessageBuilder.withPayload("aaa").build());
        list.add(MessageBuilder.withPayload("bbb").build());
        list.add(MessageBuilder.withPayload("ccc").build());

        rocketMQTemplate.syncSend("simple", list, 3000);
    }

(执行结果)

在这里插入图片描述

(6)消息过滤

消息过滤,是RocketMQ较与RabbitMQ独有的功能,指对发送的消息进行过滤,指接收限定条件的消息,对消息进行限制接收。有两种方式,如下:

a. 标签过滤

在发送消息时,指定topic的同时,加上一个标签,表示只发给有这个标签的消费者;

(生产者)

    @Test
    public void simpleTest6(){
        rocketMQTemplate.syncSend("simple:tag", "Tag Message");
    }

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple", selectorExpression = "tag1")
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("接收到标签过滤消息 = " + s);
    }
}

(执行结果)

在这里插入图片描述

b. SQL过滤

另一种是SQL过滤的方式,在消费者这边,写SQL语句对消息进行过滤消息;

(生产者,设置name = SQL)

    @Test
    public void simpleTest6(){
        // 标签方式
        rocketMQTemplate.syncSend("simple:tag", "Tag Message");

        // SQL语句方式
        rocketMQTemplate.syncSend("simple",
                MessageBuilder.withPayload("SQL Message")
                        .setHeader("name","SQL")
                        .build());
    }

(消费者,只接受name = SQL的消息)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",
 selectorType = SelectorType.SQL92, selectorExpression = "name = 'SQL'")
public class ConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("接收到SQL语句过滤消息 = " + s);
    }
}

(执行结果)

在这里插入图片描述

(7)对象消息

RocketMQ当然也可以发送对象作为消息,该对象应该要实现Serializable接口,如下:

import java.io.Serializable;

public class User implements Serializable {

    private String username;

    private String password;

    public User() {
    }

    public User(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    @Override
    public String toString() {
        return "User{" +
                "username='" + username + '\'' +
                ", password='" + password + '\'' +
                '}';
    }
}

(生产者)

    @Test
    public void simpleTest7(){
        User user = new User();
        user.setUsername("zhangsan");
        user.setPassword("123456");
        rocketMQTemplate.syncSend("simple", user);
    }

(消费者)

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple")
public class ConsumerListener implements RocketMQListener<User> {
    @Override
    public void onMessage(User user) {
        System.out.println("user = " + user);
    }
}

(执行结果)

在这里插入图片描述

(8)顺序消息

顺序消息,是指消息从发送到被消费,需要始终保持前后顺序。如下,发送15次消息,可以看到消费者那边的消费顺序并不是一直的;

    @Test
    public void simpleTest1() {
        for (int i = 0; i < 15; i++) {
            rocketMQTemplate.syncSend("simple", "这是一个同步消息===>" + i);
        }
    }

在这里插入图片描述

顺序消息,需要保证以下两方面:

  • 所有的消息存入到MQ中的同一个队列中,因为RocketMQ默认有四个队列,消息会被负载均衡存储在这些队列里;

  • 该队列只能被一个线程消费,因为一个队列的消息在消费时会有多个线程同时进行消费;

前者可以通过,XxxOrderly()方法实现消息在队列中的顺序存储,如下:

(生产者:给对象设置一个ID,让它们按照ID顺序存储在MQ中)

    @Test
    public void simpleTest8(){
        ArrayList<User> users = new ArrayList<>();

        User user1 = new User("1","zhangsan","zs");
        User user2 = new User("2","lisi","ls");
        User user3 = new User("3","wangwu","ww");

        users.add(user1);
        users.add(user2);
        users.add(user3);


        for (User user : users) {
            rocketMQTemplate.syncSendOrderly("simple",user,user.getId());
        }
    }

后者,可以通过在消费者这边添加这个配置,保证消息被顺序消费,如下:

(消费者,设置消费模式 consumeMode = ConsumeMode.ORDERLY

@Component
@RocketMQMessageListener(consumerGroup = "groupA", topic = "simple",consumeMode = ConsumeMode.ORDERLY)
public class ConsumerListener implements RocketMQListener<User> {

    @Override
    public void onMessage(User user) {
        System.out.println(user);
    }
}

执行结果,可以看到消息时顺序进行的

在这里插入图片描述

总结

RocketMQ的内容还有很多,可参考 http://t.csdn.cn/QXQNZ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/841163.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

当不在公司时,如何在外远程登录公司内网OA系统?

在外远程登录公司内网OA系统 文章目录 在外远程登录公司内网OA系统前言1. 打开“远程桌面”选项2. 安装cpolar客户端3. 登录cpolar客户端4. 创建隧道5. 生成公网地址6. 远程连接其他电脑 前言 随着信息化办公的快速推进&#xff0c;很多企业已经用上了OA系统&#xff0c;并且我…

ubuntu上安装mosquitto服务

1、mosquitto是什么 Mosquitto 项目最初由 IBM 和 Eurotech 于 2013 年开发&#xff0c;后来于 2016 年捐赠给 Eclipse 基金会。Eclipse Mosquitto 基于 Eclipse 公共许可证(EPL/EDL license)发布&#xff0c;用户可以免费使用。作为全球使用最广的 MQTT 协议实现之一 &#x…

Diffusion扩散模型学习4——Stable Diffusion原理解析-inpaint修复图片为例

Diffusion扩散模型学习4——Stable Diffusion原理解析-inpaint修复图片为例 学习前言源码下载地址原理解析一、先验知识二、什么是inpaint三、Stable Diffusion中的inpaint1、开源的inpaint模型2、基于base模型inpaint 四、inpaint流程1、输入图片到隐空间的编码2、文本编码3、…

东芝低导通电阻N沟道MOSFET 为智能穿戴设备赋能

东芝低导通电阻N沟道MOSFET TPN6R303NC,LQ(S 为智能穿戴设备赋能 MOSFET也就是金属-氧化物半导体场效应晶体管&#xff0c;外形与普通晶体管差不多&#xff0c;但具有不同的控制特性&#xff0c;主要是通过充电和放电来切换或放大信号。 此次推出的用于智能穿戴的30V N沟道MO…

CMake的使用--以ORCA避碰C++库为例

1、安装cmake 链接&#xff1a;Download | CMake 版本需下载Binary distributions这个模块下的 Windows x64 Installer: cmake-3.27.1-windows-x86_64.msi 注意事项 1.1勾选为所有用户添加到PATH路径 Add CMake to the system PATH for all users 1.2安装路径建议直接在c…

Dueling Network

Dueling Network —— Dueling Network Architectures for Deep Reinforcement Learning 论文下载地址 论文介绍 图9. Dueling Network 模型结果示意图 Dueling Network与传统DQN的区别在于神经网络结构的不同&#xff0c;Dueling Netowrk在传统DQN的基础上只进行了微小的改动…

python 合并多个excel文件

使用 openpyxl 思路&#xff1a; 读取n个excel的文件&#xff0c;存储在一个二维数组中&#xff0c;注意需要转置。将二维数组的数据写入excel。 安装软件&#xff1a; pip install openpyxl源代码&#xff1a; import os import openpyxl # 将n个excel文件数据合并到一个…

jupyter lab环境配置

1.jupyterlab 使用虚拟环境 conda install ipykernelpython -m ipykernel install --user --name tf --display-name "tf" #例&#xff1a;环境名称tf2. jupyter lab kernel管理 show kernel list jupyter kernelspec listremove kernel jupyter kernelspec re…

微软研究院展示Project Rumi项目;参数高效微调(PEFT)

&#x1f989; AI新闻 &#x1f680; 微软研究院展示Project Rumi项目&#xff0c;通过多模态方法增强人工智能理解能力 摘要&#xff1a;微软研究院展示了Project Rumi项目&#xff0c;该项目通过结合文本、音频和视频数据&#xff0c;并采用多模态副语言提示的方法&#xf…

VL 模型 Open-Set Domain Adaptation with Visual-Language Foundation Models 论文阅读笔记

Open-Set Domain Adaptation with Visual-Language Foundation Models 论文阅读笔记 一、Abstract 写在前面 又是一周周末&#xff0c;在家的时间感觉过得很快呀。今天没得时间写博客&#xff0c;留下个标题&#xff0c;明天搞完。 论文地址&#xff1a;Open-Set Domain Adapta…

探索人工智能 | 计算机视觉 让计算机打开新灵之窗

前言 计算机视觉是一门研究如何使机器“看”的科学&#xff0c;更进一步的说&#xff0c;就是指用摄影机和电脑代替人眼对目标进行识别、跟踪和测量等机器视觉&#xff0c;并进一步做图形处理&#xff0c;使电脑处理成为更适合人眼观察或传送给仪器检测的图像。 文章目录 前言…

安全基础 --- https详解 + 数组(js)

CIA三属性&#xff1a;完整性&#xff08;Confidentiality&#xff09;、保密性&#xff08;Integrity&#xff09;、可用性&#xff08;Availability&#xff09;&#xff0c;也称信息安全三要素。 https 核心技术&#xff1a;用非对称加密传输对称加密的密钥&#xff0c;然后…

第一篇:一文看懂 Vue.js 3.0 的优化

我们的课程是要解读 Vue.js 框架的源码&#xff0c;所以在进入课程之前我们先来了解一下 Vue.js 框架演进的过程&#xff0c;也就是 Vue.js 3.0 主要做了哪些优化。 Vue.js 从 1.x 到 2.0 版本&#xff0c;最大的升级就是引入了虚拟 DOM 的概念&#xff0c;它为后续做服务端渲…

java+springboot+mysql员工工资管理系统

项目介绍&#xff1a; 使用javaspringbootmysql开发的员工工资管理系统&#xff0c;系统包含超级管理员&#xff0c;系统管理员、员工角色&#xff0c;功能如下&#xff1a; 超级管理员&#xff1a;管理员管理&#xff1b;部门管理&#xff1b;员工管理&#xff1b;奖惩管理&…

电脑技巧:七个非常神奇有趣的网站,值得收藏

目录 1、Airpano 2、AI创作家 3、The Useless Web 4、全球高清实况摄像头 5、MyFreeMP3 6、世界名画拼图 7、纪妖&#xff08;中国古今妖怪集&#xff09; 互联网是一个神奇的世界&#xff0c;存在着许多令人惊叹的网站&#xff0c;这里就给大家分享七个非常神奇有趣的网…

快速排序【Java算法】

文章目录 1. 概念2. 思路3. 代码实现 1. 概念 快速排序是一种比较高效的排序算法&#xff0c;采用 “分而治之” 的思想&#xff0c;通过多次比较和交换来实现排序&#xff0c;在一趟排序中把将要排序的数据分成两个独立的部分&#xff0c;对这两部分进行排序使得其中一部分所有…

接口测试—知识速查(Postman)

文章目录 接口测试1. 概念2. 原理3. 测试流程4. HTTP协议4.1 URL的介绍4.2 HTTP请求4.2.1 请求行4.2.2 请求头4.2.3 请求体4.2.4 完整的HTTP请求示例 4.3 HTTP响应4.3.1 状态行4.3.2 响应头4.3.3 响应体4.3.4 完整的HTTP请求示例 5. RESTful接口规范6. 测试用例的设计思路6.1 单…

基于Vue+wangeditor实现富文本编辑

目录 前言分析实现具体解决的问题有具体代码实现如下效果图总结前言 一个网站需要富文本编辑器功能的原因有很多,以下是一些常见的原因: 方便用户编辑内容:富文本编辑器提供了类似于Office Word的编辑功能,使得那些不太懂HTML的用户也能够方便地编辑网站内容。提高用户体验…

关于前端动态调试解密签名校验的分享

首先我们先来看一下&#xff0c;下面这张图是笔者近期测试遇到的问题&#xff0c;那就是程序每次生成请求都会生成signature的验签&#xff0c;该验签生成方式暂不可知&#xff0c;唯一知道的就是用一次就失效&#xff0c;这对测试的成本造成了很不好的影响&#xff0c;那么我们…

setmap使用

目录 set使用 set的模板参数 构造函数 成员函数 insert iterator ​编辑 find count pair pair 的模板参数 make_pair multiset使用 multiset 的模板参数 set 与 multiset 的区别 count map使用 map 的模板参数 构造函数 insert iterator find ​编辑 cou…