RocketMQ 消息发送、消息类别

news2025/1/14 18:41:27

一、消息发送

1.1 单生产者单消费者消息发送(OneToOne)

1、新建maven项目recketmqtest

2、导入RocketMQ客户端坐标

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

3、生产者

package com.liming.base;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * @author 黎明
 * @version 1.0
 * 生产者
 * @date 2023/5/21 9:08
 */
public class Producer {
    public static void main(String[] args) throws Exception {
        /**
         1. 谁来发?
         2. 发给谁?
         3. 怎么发?
         4. 发什么?
         5. 发的结果是什么?
         6. 打扫战场
         **/

        // 1、创建一个发送消息的对象Producer
        DefaultMQProducer producer = new DefaultMQProducer("group1");
        // 2、设定发送的命名服务器地址
        producer.setNamesrvAddr("localhost:9876");
        // 3.1 启动发送的服务
        producer.start();
        // 4、创建要发送的消息对象
        Message message = new Message("topic1", "tag1","hello recketmq".getBytes());
        // 3.2 发送消息
        SendResult sendResult = producer.send(message);
        System.out.println("返回结果:" + sendResult);
        // 5、关闭连接
        producer.shutdown();
    }
}

4、消费者

package com.liming.base;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @author 黎明
 * @version 1.0
 * 消费者
 * @date 2023/5/21 9:17
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("localhost:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意
        consumer.subscribe("topic1", "*");
        //3.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //遍历消息
                for (MessageExt msg : list) {
                    System.out.println("收到的消息:" + msg);
                    byte[] body = msg.getBody();
                    System.out.println(new String(body));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();
        System.out.println("接收消息服务已经开启!");
        //5 不要关闭消费者!
    }
}

1.2 单生产者多消费者消息发送(OneToMany)

生产者

//1.创建一个发送消息的对象Producer
 DefaultMQProducer producer = new DefaultMQProducer("group1");
 //2.设定发送的命名服务器地址
 producer.setNamesrvAddr("localhost:9876");
 //3.1启动发送的服务
 producer.start();
 for (int i = 0; i < 10; i++) {
     //4.创建要发送的消息对象,指定topic,指定内容body
     Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes();
     //3.2发送消息
     SendResult result = producer.send(msg);
     System.out.println("返回结果:" + result);
 }
 //5.关闭连接
 producer.shutdown();

消费者(负载均衡模式:默认模式)

 //1.创建一个接收消息的对象Consumer
 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
 //2.设定接收的命名服务器地址
 consumer.setNamesrvAddr("localhost:9876");
 //3.设置接收消息对应的topic,对应的sub标签为任意
 consumer.subscribe("topic1","*");
 //设置当前消费者的消费模式(默认模式:负载均衡)
 consumer.setMessageModel(MessageModel.CLUSTERING);
 //3.开启监听,用于接收消息
 consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         //遍历消息
         for (MessageExt msg : list) {
             System.out.println("收到消息:"+msg);
             System.out.println("消息是:"+new String(msg.getBody()));
         }
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });
 //4.启动接收消息的服务
 consumer.start();
 System.out.println("接受消息服务已经开启!");

 //5 不要关闭消费者!

消费者(广播模式)

//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("localhost:9876");
//3.设置接收消息对应的topic,对应的sub标签为任意
consumer.subscribe("topic1","*");
//设置当前消费者的消费模式(默认模式:负载均衡)
//consumer.setMessageModel(MessageModel.CLUSTERING);
//设置当前消费者的消费模式(广播模式)
consumer.setMessageModel(MessageModel.BROADCASTING);
//3.开启监听,用于接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        //遍历消息
        for (MessageExt msg : list) {
            System.out.println("收到消息:"+msg);
            System.out.println("消息是:"+new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
//4.启动接收消息的服务
consumer.start();
System.out.println("接受消息服务已经开启!");

//5 不要关闭消费者!

1.3 多生产者多消费者消息发送(ManyToMany)

多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

二、消息类别

2.1 同步消息

特征:即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
在这里插入图片描述

代码实现(生产者中):

SendResult result = producer.send(msg);

2.2 异步消息

特征:即时性较弱,但需要有回执的消息,例如订单中的某些信息
在这里插入图片描述

代码实现(生产者中):

//1.创建一个发送消息的对象Producer
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.设定发送的命名服务器地址
producer.setNamesrvAddr("localhost:9876");
//3.1启动发送的服务
producer.start();
for (int i = 0; i < 10; i++) {
    //4.创建要发送的消息对象,指定topic,指定内容body
    Message msg = new Message("topic1", ("hello rocketmq"+i).getBytes("UTF-8"));
    //3.2 同步消息
    //SendResult result = producer.send(msg);
    //System.out.println("返回结果:" + result);

    //异步消息
    producer.send(msg, new SendCallback() {
        //表示成功返回结果
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println(sendResult);
        }
        //表示发送消息失败
        @Override
        public void onException(Throwable throwable) {
            System.out.println(throwable);
        }
    });
    
    System.out.println("消息"+i+"发完了,做业务逻辑去了!");
}
//休眠10秒
TimeUnit.SECONDS.sleep(10);
//5.关闭连接
producer.shutdown();

2.3 单向消息

特征:不需要有回执的消息,例如日志类消息
在这里插入图片描述

代码实现(生产者中):

producer.sendOneway(msg);

2.4 延时消息

消息发送时并不直接发送到消息服务器,而是根据设定的等待时间到达,起到延时到达的缓冲作用

Message msg = new Message("topic3",("延时消息:hello rocketmq "+i).getBytes("UTF-8"));
//设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
msg.setDelayTimeLevel(3);
SendResult result = producer.send(msg);
System.out.println("返回结果:"+result);

目前支持的消息时间:

private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

2.5 批量消息

批量发送消息能显著提高传递小消息的性能.

发送批量消息:

List<Message> msgList = new ArrayList<Message>();
Message msg1 = new Message("topic1", ("hello rocketmq1").getBytes("UTF-8"));
Message msg2 = new Message("topic1", ("hello rocketmq2").getBytes("UTF-8"));
Message msg3 = new Message("topic1", ("hello rocketmq3").getBytes("UTF-8"));

msgList.add(msg1);
msgList.add(msg2);
msgList.add(msg3);


SendResult result = producer.send(msgList);

注意限制:

  • 这些批量消息应该有相同的topic

  • 相同的waitStoreMsgOK

  • 不能是延时消息

  • 消息内容总长度不超过4M

三、消息过滤

3.1 分类过滤

按照tag过滤信息

生产者:

Message msg = new Message("topic6","tag2",("消息过滤按照tag:hello rocketmq 2").getBytes("UTF-8"));

消费者:

//接收消息的时候,除了制定topic,还可以指定接收的tag,*代表任意tag
consumer.subscribe("topic6","tag1 || tag2");

3.2 语法过滤(属性过滤/语法过滤/SQL过滤)

基本语法:

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:‘abc’,必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUE 或 FALSE

生产者:

//为消息添加属性
msg.putUserProperty("vip","1");
msg.putUserProperty("age","20");

消费者:

//使用消息选择器来过滤对应的属性,语法格式为类SQL语法
consumer.subscribe("topic7", MessageSelector.bySql("age >= 18"));
consumer.subscribe("topic6", MessageSelector.bySql("name = 'litiedan'"));

注意:SQL过滤需要依赖服务器的功能支持,在broker.conf配置文件中添加对应的功能项,并开启对应功能

enablePropertyFilter=true

重启broker

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

或者直接cmd中输入(D:\software\rocketmq-all-4.8.0-bin-release\bin)

mqadmin.cmd updateBrokerConfig -blocalhost:10911 -kenablePropertyFilter -vtrue

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/551392.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

chatgpt赋能Python-pythonbug

Python Bug: 了解并避免Python编程中的错误 在编程中遇到错误是件非常常见的事情&#xff0c;Python编程也不例外。在Python中&#xff0c;被称为“bug”的错误主要分为两种&#xff0c;编译错误和运行时错误。本文将向您介绍如何识别、调试和避免Python编程中的错误&#xff…

Windi CSS 原子css 下一代工类 CSS 框架

最近由于项目原因接触到了windi Css 发现这个东西真是绝绝子啊,不用在代码里写一行style,完全以类的形式去写样式,它里面包含了几乎所有的css样式&#xff0c;可以让我们不需要再去繁琐的写css样式&#xff0c;原来几行的css现在只需要短短的几个字符。他的许多新特性给我们带来…

《算法竞赛进阶指南》(持续更新ing)

算法竞赛进阶指南 位运算 AcWing 89. a^b #include<iostream> using namespace std;int main(void) {long long a,b,p;cin>>a>>b>>p;long long ans1%p;while(b){if(b&1)//判断b当前二进制位是否为1{ansans*a%p;}aa*a%p;//每跨越一个二进制位&…

用WaveNet预测(Adapted Google WaveNet-Time Series Forecasting)

目录 剧情简介: 数据来源 加载数据 分割数据和可视化 时间序列的多元波网模型:实现(多步预测) 创建模型 创建数据集 数据准备 1- Training dataset preparation 2- Validation dataset preparation Train the Model with TPU: 使用经过训练的适应Google WaveNet预测…

【多线程】| 基本知识汇总

目录 &#x1f981; 掌握基本概念1. 什么是线程&#xff1f;2. 什么是主线程以及子线程&#xff1f;3. 什么是串行&#xff1f;什么是并行&#xff1f; 什么是并发? &#x1f981; 线程的创建1. 通过继承Thread类实现多线程2. 通过Runnable接口实现多线程 &#x1f981; 线程执…

AI故事:智慧学校的人脸识别奇幻之旅

人脸识别 在一个名为智慧学校的小镇上&#xff0c;生物老师Rita和她的丈夫朝哥&#xff0c;一个富有创造力的艺术家&#xff0c;过着幸福美满的生活。他们的家庭与学校紧密相连&#xff0c;成为了一座小小的教育乐园。 智慧学校里有一群充满朝气的学生&#xff0c;其中小枣是…

自定义属性,v-bind computed的使用

0.0 自定义组件的使用 【掌握】 先自定义自己的组件 引入组件 import 组件名 from 路径/文件名 注册组件 <script> export default {components:{ // 组件注册组件名:组件名&#xff0c;组件名1},data(){ // 数据return {}},methods:{ // 方法} ​ } ​ </script&…

buu [AFCTF2018]MyOwnCBC 1

题目描述&#xff1a; 三份文件 #!/usr/bin/python2.7 # -*- coding: utf-8 -*-from Crypto.Cipher import AES from Crypto.Random import random from Crypto.Util.number import long_to_bytesdef MyOwnCBC(key, plain):if len(key)!32:return "error!"cipher_t…

lwIP更新记03:IPv6

从 lwIP-2.0.0 开始&#xff0c;lwIP 终于有可用的 IPv6 协议栈了&#xff01;IPv6 支持 双栈&#xff08;IPv4 和 IPv6 同时使用&#xff09; 或 IPv4/IPv6 二选一 模式。 lwIP-1.4.1 版本也有 IPv6&#xff0c;但那是实验性质的&#xff08;见…\lwip-1.4.1\src\core\ipv6目…

linux专题:嵌入式linux系统启动流程基础分析

目录 第一&#xff1a;linux内核源码基本简介 第二&#xff1a;uboot启动分析 第三&#xff1a;内核源码分析 第一&#xff1a;linux内核源码基本简介 下载 Linux 内核网址&#xff1a; https://www.kernel.org/ 最新 Linux 内核是 5.15 版本。现在常用 Linux 内核源码为4…

八大排序-直接插入排序、希尔排序、直接选择排序、冒泡排序、堆排序、快速排序、归并排序、基数排序

目录 前言 直接插入排序&#xff08;Insertion Sort&#xff09; 一、概念及其介绍 二、过程图示 三、代码 四、复杂度 希尔排序&#xff08;Shell Sort&#xff09; 一、概念 二、实现思路 三、图示过程 四、代码 4.1代码 4.2运行结果 4.3解释 五、复杂度 堆排…

路径规划算法:基于蝙蝠算法的路径规划算法- 附代码

路径规划算法&#xff1a;基于蝙蝠的路径规划算法- 附代码 文章目录 路径规划算法&#xff1a;基于蝙蝠的路径规划算法- 附代码1.算法原理1.1 环境设定1.2 约束条件1.3 适应度函数 2.算法结果3.MATLAB代码4.参考文献 摘要&#xff1a;本文主要介绍利用智能优化算法蝙蝠算法来进…

Swift 如何闪电般异步读取大文件?

功能需求 Apple 系统中&#xff08;iOS、MacOS、WatchOS等等&#xff09;读取文件是一个平常的不能再平常的需求&#xff0c;不过当文件很大时&#xff0c;同步读取文件会导致 UI 的挂起&#xff0c;这是不能让用户接受的。 所以&#xff0c;要想读取文件内容的同时保持界面操…

KMP算法及其改进图文详解

文章目录 KMP算法详解什么是KMP算法KMP算法的应用场景KMP算法和暴力求解的比较字符串的前缀、后缀和最长相等前后缀KMP算法实现字符串匹配的具体过程&#xff08;图解&#xff09;从串与主串的下标变化j回退的位置(从串的下标变化)主串的下标变化 Next数组如何运用代码逻辑计算…

[CTF/网络安全] 攻防世界 xff_referer 解题详析

[CTF/网络安全] 攻防世界 xff_referer 解题详析 XFF及refererXFF格式referer格式姿势总结 题目描述&#xff1a;X老师告诉小宁其实xff和referer是可以伪造的。 XFF及referer X-Forwarded-For&#xff08;简称 XFF&#xff09;是一个 HTTP 请求头部字段&#xff0c;它用于表示 …

深入理解计算机系统第七章知识点总结

文章目录 详解ELF文件-> main.o前十六个字节的含义推测elf的大小查看节头部表推断每个section在elf中的具体位置查看.text的内容查看.data的内容关于.bss查看.rodata的内容关于其他的节表示的信息 详解符号表符号编译器如何解析多重定义的全局符号静态库与静态链接构造和使用…

seata的部署和集成

seata的部署和集成 一、部署Seata的tc-server 1.下载 首先我们要下载seata-server包&#xff0c;地址在http://seata.io/zh-cn/blog/download.html 2.解压 在非中文目录解压缩这个zip包&#xff0c;其目录结构如下&#xff1a; 3.修改配置 修改conf目录下的registry.conf文…

开源大模型资料总结

基本只关注开源大模型资料&#xff0c;非开源就不关注了&#xff0c;意义也不大。 基座大模型&#xff1a; LLaMA&#xff1a;7/13/33/65B&#xff0c;1.4T token LLaMA及其子孙模型概述 - 知乎 GLM&#xff1a;6/130B&#xff0c; ChatGLM基座&#xff1a;GLM&#xff08…

【网络】- TCP/IP四层(五层)协议 - 网际层(网络层) - 网际协议IP

目录 一、概述 二、初步了解网际协议 IP  &#x1f449;2.1 与数据链路层的区别  &#x1f449;2.2 网际协议 IP 概览  &#x1f449;2.3 分层的意义 三、IP协议基础知识  &#x1f449;3.1 IP地址属于网络层地址  &#x1f449;3.2 路由控制  &#x1f449;3.3 IP分包与…

solr快速上手:核心概念及solr-admin界面介绍(二)

0. 引言 上一节&#xff0c;我们简单介绍了solr并演示了单节点solr的安装流程&#xff0c;本章&#xff0c;我们继续讲解solr的核心概念 solr快速上手&#xff1a;solr简介及安装&#xff08;一&#xff09; 1. 核心概念 核心&#xff08;索引/表&#xff09; 在es中有索引…