548、RocketMQ详细入门教程系列 -【消息队列之 RocketMQ (二)】 2023.02.28

news2025/1/15 22:03:49

目录

    • 一、Java 访问 RocketMQ 实例
      • 1.1 引入依赖
      • 1.2 消息生产者
      • 1.3 消息消费者
      • 1.4 启动 Name Server
      • 1.5 启动 Broker
      • 1.6 运行 Consumer
      • 1.7 运行 Producer
    • 二、参考链接

一、Java 访问 RocketMQ 实例

RocketMQ 目前支持 Java、C++、Go 三种语言访问,按惯例以 Java 语言为例看下如何用 RocketMQ 来收发消息的。

1.1 引入依赖

<dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.2.0</version>
  </dependency>

添加 RocketMQ 客户端访问支持,具体版本和安装的 RocketMQ 版本一致即可。

1.2 消息生产者

package org.study.mq.rocketMQ.java;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {

    public static void main(String[] args) throws Exception {
        //创建一个消息生产者,并设置一个消息生产者组
        DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");

        //指定 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");

        //初始化 Producer,整个应用生命周期内只需要初始化一次
        producer.start();

        for (int i = 0; i < 100; i++) {
            //创建一条消息对象,指定其主题、标签和消息内容
            Message msg = new Message(
                    "topic_example_java" /* 消息主题名 */,
                    "TagA" /* 消息标签 */,
                    ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息内容 */
            );

            //发送消息并返回结果
            SendResult sendResult = producer.send(msg);

            System.out.printf("%s%n", sendResult);
        }

        // 一旦生产者实例不再被使用则将其关闭,包括清理资源,关闭网络连接等
        producer.shutdown();
    }
}

示例中用 DefaultMQProducer 类来创建一个消息生产者,通常一个应用创建一个 DefaultMQProducer 对象,所以一般由应用来维护生产者对象,可以其设置为全局对象或者单例。该类构造函数入参 producerGroup 是消息生产者组的名字,无论生产者还是消费者都必须给出 GroupName ,并保证该名字的唯一性,ProducerGroup 发送普通的消息时作用不大,后面介绍分布式事务消息时会用到。
接下来指定 NameServer 地址和调用 start 方法初始化,在整个应用生命周期内只需要调用一次 start 方法。
初始化完成后,调用 send 方法发送消息,示例中只是简单的构造了100条同样的消息发送,其实一个 Producer 对象可以发送多个主题多个标签的消息,消息对象的标签可以为空。send 方法是同步调用,只要不抛异常就标识成功。
最后应用退出时调用 shutdown 方法清理资源、关闭网络连接,从服务器上注销自己,通常建议应用在 JBOSS、Tomcat 等容器的退出钩子里调用 shutdown 方法。

1.3 消息消费者

package org.study.mq.rocketMQ.java;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;

public class Consumer {

    public static void main(String[] args) throws Exception {
        //创建一个消息消费者,并设置一个消息消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
        //指定 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        //设置 Consumer 第一次启动时从队列头部开始消费还是队列尾部开始消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //订阅指定 Topic 下的所有消息
        consumer.subscribe("topic_example_java", "*");

        //注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                //默认 list 里只有一条消息,可以通过设置参数来批量接收消息
                if (list != null) {
                    for (MessageExt ext : list) {
                        try {
                            System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 消费者对象在使用之前必须要调用 start 初始化
        consumer.start();
        System.out.println("消息消费者已启动");
    }
}

示例中用 DefaultMQPushConsumer 类来创建一个消息消费者,通生产者一样一个应用一般创建一个 DefaultMQPushConsumer 对象,该对象一般由应用来维护,可以其设置为全局对象或者单例。该类构造函数入参 consumerGroup 是消息消费者组的名字,需要保证该名字的唯一性。
接下来指定 NameServer 地址和设置消费者应用程序第一次启动时从队列头部开始消费还是队列尾部开始消费。
接着调用 subscribe 方法给消费者对象订阅指定主题下的消息,该方法第一个参数是主题名,第二个擦书是标签名,示例表示订阅了主题名 topic_example_java 下所有标签的消息。
最主要的是注册消息监听器才能消费消息,示例中用的是 Consumer Push 的方式,即设置监听器回调的方式消费消息,默认监听回调方法中 List 里只有一条消息,可以通过设置参数来批量接收消息。
最后调用 start 方法初始化,在整个应用生命周期内只需要调用一次 start 方法。

1.4 启动 Name Server

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

RocketMQ 核心的四大组件中 Name Server 和 Broker 都是由 RocketMQ 安装包提供的,所以要启动这两个应用才能提供消息服务。首先启动 Name Server,先确保你的机器中已经安装了与 RocketMQ 相匹配的 JDK ,并设置了环境变量 JAVA_HOME ,然后在 RocketMQ 的安装目录下执行 bin 目录下的 mqnamesrv ,默认会将该命令的执行情况输出到当前目录的 nohup.out 文件,最后跟踪日志文件查看 Name Server 的实际运行情况。

1.5 启动 Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

同样也要确保你的机器中已经安装了与 RocketMQ 相匹配的 JDK ,并设置了环境变量 JAVA_HOME ,然后在 RocketMQ 的安装目录下执行 bin 目录下的 mqbroker ,默认会将该命令的执行情况输出到当前目录的 nohup.out 文件,最后跟踪日志文件查看 Broker 的实际运行情况。

1.6 运行 Consumer

先运行 Consumer 类,这样当生产者发送消息的时候能在消费者后端看到消息记录。配置没问题的话会看到在控制台打印出消息消费者已启动

1.7 运行 Producer

最后运行 Producer 类,在 Consumer 的控制台能看到接收的消息
在这里插入图片描述

二、参考链接

[01] 消息队列之 RocketMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/379183.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SQL数据库权限管理-10个数据库角色

为便于管理数据库中的权限&#xff0c;SQL 数据库提供了服务器角色、数据库角色、用户等来划分不同用户拥有的权限差异。今天给大家介绍数据库角色对应的权限。 数据库级角色 存在两种类型的数据库级角色&#xff1a; 数据库中预定义的“固定数据库角色”可以创建的“用户定…

CSO面对面丨中核华辉刘博:应对大型央国企数字化转型道路上必须攻克的安全难题

“极致”&#xff0c;一直是大型央国企网络安全工作建设追求的目标。随着我国数字化转型全面走深向实&#xff0c;网络安全风险、数据管理、层出不穷的网络攻击&#xff0c;为各领域大型央国企数字化转型带来了更多的挑战。如何充分发挥优势、携手各方构筑网络安全屏障、提升安…

Codeforces Round #854 by cybercats (Div. 1 + Div. 2)

A. Recent Actions给出n个格子&#xff0c;从上到下是1~n&#xff0c;其他的n1~。。。不在格子内。给出m个操作&#xff0c;若该操作的数字不在格子内&#xff0c;那就将它拿到格子的第一个位置&#xff0c;同时格子第n个位置的数被挤下去&#xff1b;若操作的数字在格子内&…

Java简单的生成/解析二维码(zxing qrcode)

Hi I’m Shendi Java简单的生成/解析二维码&#xff08;zxing qrcode&#xff09; 在之前使用 qrcode.js 方式生成二维码&#xff0c;但在不同设备上难免会有一些兼容问题&#xff0c;于是改为后端&#xff08;Java&#xff09;生成二维码图片 这里使用 Google 的 zxing包 Jar…

基于STM32的DHT11温湿度控制系统仿真设计

基于STM32的DHT11温湿度控制系统仿真设计(仿真程序报告讲解&#xff09;演示视频1.主要功能2.仿真3. 程序4. 设计报告1主控制器选择5.设计内容 百度云网盘下载链接仿真图proteus 8.9程序编译器&#xff1a;keil 5 编程语言&#xff1a;C语言 设计编号&#xff1a;C0076 演示…

Android Qcom Display学习(十三)

该系列文章总目录链接与各部分简介&#xff1a; Android Qcom Display学习(零) 在上一篇中dump GraphicBuffer中&#xff0c;知道了护眼模式中调用setColorTransform应用于每一层Layer&#xff0c;于是想往上了解一些&#xff0c;color是针对屏幕的&#xff0c;不是对单个Layer…

[YOLO] yolo博客笔记汇总(自用

pip下载速度太慢&#xff0c;国内镜像&#xff1a; 国内镜像解决pip下载太慢https://blog.csdn.net/weixin_51995286/article/details/113972534​​​​​​​ YOLO v2和V3 关于设置生成anchorbox&#xff0c;Boundingbox边框回归的过程详细解读 YOLO v2和V3 关于设置生成an…

Airbnb系列三《Managing Diversity in Airbnb Search》 搜索多样性

abstract 搜索系统中一个长期的问题是结果多样性。从产品角度讲&#xff0c;给用户多种多样的选择&#xff0c;有助于提升用户体验及业务指标。 多样性需求和模型的目标是相矛盾的&#xff0c;因为传统ctr模型是 point wise&#xff0c;只看单个相关性不管相邻之间item差异。 …

Jvisualvm监控Tomcat以及相关参数优化

Tomcat阻塞模式 阻塞模式&#xff08;BIO&#xff09; 客户端和服务器创建一个连接&#xff0c;它就会创建一个线程来处理这个连接&#xff0c;以为这客户端创建了几个连接&#xff0c;服务端就需要创建几个线程来处理你&#xff0c;导致线程会产生很多&#xff0c;有很多线程…

数学小课堂:无穷小(平均速度和瞬间速度的关系)

文章目录 引言I 速度1.1 平均速度1.2 瞬间速度(某一时刻特定的速度)1.3 解释飞箭是静止的悖论II 导数2.1 概念2.2 导数的现实影响2.3 微积分的意义III 无穷小3.1 贝克莱挑战牛顿(无穷小悖论)3.2 无穷小的定义引言 柯西和魏尔斯特拉斯给出的无穷小的定义: 它不是零;它的绝对…

vue2+element封装rules, 支持json多层级

一、封装介绍 封装前景&#xff1a;表单内容多、表单类型重复且校验项较多 下面就参考element的例子写个实例 element地址&#xff1a;https://element.eleme.cn/2.15/#/zh-CN/component/form 实现效果如下: 今天给大家写三种表单校验实现方式 普通表单实现、正常定义rules…

【svg】引入svg(非图标)

这里写目录标题直接插入页面—— 有各层svg内容并可赋值属性css 背景图 ——不可更改各层svg属性创建标签&#xff08;动态添加&#xff09;——可改属性但是还不如直接插入不常用&#xff08;没弄明白&#xff09;目的&#xff1a;如果直接以图片的方式引用svg 不能改变内层sv…

【C++基础入门】初识C++、数据类型

一&#xff1a;C简介 1.1 介绍 C&#xff08;c plus plus&#xff09;是一种计算机高级程序设计语言&#xff0c;由C语言扩展升级而产生 [17] &#xff0c;最早于1979年由本贾尼斯特劳斯特卢普在AT&T贝尔工作室研发。C既可以进行C语言的过程化程序设计&#xff0c;又可以…

深度学习 <实战Kaggle比赛:预测房价> 代码分析 跟李沐学AI

4.10. 实战Kaggle比赛&#xff1a;预测房价 — 动手学深度学习 2.0.0 documentation 若有错误请指出 一.数据处理部分 1.下载部分 没啥好说的 import hashlib import os import tarfile import zipfile import requests#save DATA_HUB dict() DATA_URL http://d2l-data.…

2-8 SpringCloud快速开发入门: Eureka 注册中心高可用集群搭建

Eureka 注册中心高可用集群搭建 Eureka 注册中心高可用集群就是各个注册中心相互注册 Eureka Server的高可用实际上就是将自己作为服务向其他服务注册中心注册自己&#xff0c;这样就会形成一组互相注册的服务注册中心&#xff0c;进而实现服务清单的互相同步&#xff0c;往注…

【博学谷学习记录】超强总结,用心分享丨人工智能 机器学习 逻辑回归模型遗漏知识点总结

目录激活函数逻辑回归的优缺点总结LR可以进行多分类吗&#xff1f;激活函数 h(w)表示输入的线性方程 逻辑回归的优缺点总结 优点&#xff1a; 形式简单&#xff0c;模型的可解释性非常好。从特征的权重可以看到不同的特征对最后结果的影响&#xff0c;某个特征的权重值比较高…

Docker搭建redis-cluster集群

以下是搭建redis-cluster集群&#xff0c;该集群是redis3.0引进了的&#xff0c;该集群比redis-sentinel哨兵架构有以下优点 可以配置多主多从&#xff0c;在redis设置内存可以更大&#xff0c;而哨兵只能配置一主多从&#xff0c;且单个主节点内存不宜设置过大&#xff0c;否…

操作系统内核与安全分析课程笔记【0】环境搭建

本学期选择了游伟和黄建军老师的操作系统内核分析与安全&#xff0c;目前已经试听了第一节课。这门的授课老师建了一个网页用于收录本次课程的幻灯片材料&#xff0c;录屏材料&#xff0c;以及软件安装包等一系列课程用得到的材料。对于学生而言&#xff0c;这是一门既硬核能够…

【机器学习】机器学习实验三:集成算法1(详细代码展示)

文章目录一、实验介绍1.1 简单介绍1.2 Breast Cancer 数据实验1.3 Boston 数据实验二、项目地址三、算法结果展示一、实验介绍 1.1 简单介绍 AdaBoost 和 Random Forest 算法的原理 1.2 Breast Cancer 数据实验 对 Breast Cancer 数据进行探索性数据分析&#xff1b; 数据预…

VMware虚拟机安装ubuntu系统在虚拟机中全屏以及主机和虚拟机之间文件的复制

一、从Wndows复制文字到VMware&#xff08;Linux&#xff09; List item 今天需要用到了Linux操作系统。在VMware装上linux之后经常面临这样一个问题&#xff0c;那就是很多指令很长&#xff0c;逐字去敲显然费时费力。 按照惯例我也查了几种方法&#xff0c;然而要么就是需要…