RocketMQ5.0--部署与实例

news2024/9/20 22:37:49

RocketMQ5.0–部署与实例

一、Idea调试

1.相关配置文件

在E:\rocketmq创建conf、logs、store三个文件夹。从RocketMQ distribution部署目录中将broker.conf、logback_namesrv.xml、logback_broker.xml文件复制到conf目录。如下图所示。
在这里插入图片描述
其中logback_namesrv.xml、logback_broker.xml分别是NameServer、Broker的日志配置文件,修改打印日志文件路径即可。broker.conf文件是Broker启动时的加载配置文件,如下代码所示。

注意:NameServer启动端口默认是9876,Broker启动端口默认10911。

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
 
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
 
namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.156.245
 
# 存储路径
storePathRootDir=E:\\rocketmq\\store
#commitLog 存储路径
storePathCommitLog=E:\\rocketmq\\store\\commitlog
# 消费队列存储路径
storePathConsumeQueue=E:\\rocketmq\\store\\consumequeue
# 消息索引|存储路径
storePathindex=E:\\rocketmq\\store\\index
#checkpoint 文件存储路径
storeCheckpoint=E:\\rocketmq\\store\\checkpoint
#abort 文件存储路径
abortFile=E:\\rocketmq\\store\\abort

2.启动NameServer

org.apache.rocketmq.namesrv.NamesrvStartup启动类配置环境变量ROCKETMQ_HOME,值是配置主目录“E:\rocketmq”,如下图所示。
在这里插入图片描述
出现“The Name Server boot success. serializeType=JSON”时,则NameServer启动成功。
在这里插入图片描述
3.启动Broker

org.apache.rocketmq.broker.BrokerStartup启动类配置环境变量ROCKETMQ_HOME,值是配置主目录“E:\rocketmq”;配置启动参数:-c E:\rocketmq\conf\broker.conf。如下图所示。
在这里插入图片描述
出现“The broker[broker-a, 192.168.156.245:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876”时,则Broker启动成功。
在这里插入图片描述
二、Linux部署

1.执行Maven命令

mvn -Prelease-all -DskipTests clean install -U

查看打包的可部署文件路径:.\distribution\target\rocketmq-5.0.0\rocketmq-5.0.0,如下图所示。
在这里插入图片描述
2.复制rocketmq-5.0.0
在这里插入图片描述
3.启动NameServer

nohup sh bin/mqnamesrv &

可能出现问题,如下图所示。原因是Windows系统下打包,换行符出现问题。解决:notepad++编辑器在Windows环境下将文本转换为Unix格式,步骤为:用Notepad++打开脚本 >> 编辑 >> 档案格式转换 >> 选择转换为UNIX格式。
在这里插入图片描述
4.启动Broker

nohup sh bin/mqbroker -n 192.168.1.55:9876 -c /home/rocketmq-5.0.0/conf/broker.conf &

启动命令中,-n是指定NameServer地址,-c 是broker的配置文件。
在这里插入图片描述
5.关闭NameServer、Broker命令

关闭NameServer:sh bin/mqshutdown namesrv

关闭Broker:sh bin/mqshutdown broker

三、事务消息实例

1.事务消息监听类

实现org.apache.rocketmq.client.producer.TransactionListener,该类有两个方法:

  • executeLocalTransaction():保存本地事务中间表,用于Broker回查事务状态
  • checkLocalTransaction():Broker定时回查事务状态,根据事务状态提交或回滚事务消息
package com.common.instance.demo.config.rocketmq;
 
import com.common.instance.demo.entity.TMessageTransaction;
import com.common.instance.demo.service.TMessageTransactionService;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
 
/**
 * @description 订单事务消息监听器实现类
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:44
 **/
@Component
public class OrderTransactionListenerImpl implements TransactionListener {
 
    @Resource
    private TMessageTransactionService tMessageTransactionService;
 
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        // 组装事务
        TMessageTransaction tMessageTransaction = packageTMessageTransaction(message);
 
        // 保存事务中间表
        tMessageTransactionService.insert(tMessageTransaction);
 
        // 推荐返回UNKNOW状态,待事务状态回查
        return LocalTransactionState.UNKNOW;
    }
 
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 获取用户属性tabId
        String tabId = messageExt.getUserProperty("tabId");
 
        // 查询事务消息
        List<TMessageTransaction> tMessageTransactions = tMessageTransactionService.queryByTabId(tabId);
 
        if (!tMessageTransactions.isEmpty() && tMessageTransactions.size() <= 6) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
 
        LogUtil.error("orderTransaction rollBack, tabId: " + tabId);
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
 
    // 组装事务
    private TMessageTransaction packageTMessageTransaction(Message message) {
        TMessageTransaction tMessageTransaction = new TMessageTransaction();
 
        // 获取用户属性tabId
        String tabId = message.getUserProperty("tabId");
        // 事务ID
        String transactionId = message.getTransactionId();
        tMessageTransaction.setTabId(tabId);
        tMessageTransaction.setTransactionId(transactionId);
        tMessageTransaction.setCreateBy("auto");
        tMessageTransaction.setCreateTime(new Date());
 
        return tMessageTransaction;
    }
 
}

2. 事务消息生产者

package com.common.instance.demo.config.rocketmq;
 
import com.alibaba.fastjson.JSON;
import com.common.instance.demo.entity.WcPendantTab;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
 
/**
 * @description 订单事务消息生产者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:54
 **/
@Component
public class OrderTransactionProducer {
 
    @Resource
    private OrderProducerProperties orderProducerProperties;
 
    @Resource
    private OrderTransactionListenerImpl orderTransactionListener;
 
    private TransactionMQProducer orderTransactionMQProducer;
 
    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order transactionProducer");
            orderTransactionMQProducer = new TransactionMQProducer(orderProducerProperties.getProducerGroup());
            orderTransactionMQProducer.setNamesrvAddr(orderProducerProperties.getNameSrcAddr());
            orderTransactionMQProducer.setSendMsgTimeout(orderProducerProperties.getSendMsgTimeout());
            // 注册事务监听器
            orderTransactionMQProducer.setTransactionListener(orderTransactionListener);
            orderTransactionMQProducer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderTransactionProducer.start()", "start rocketmq failed!", e);
        }
    }
 
    public void sendTransactionMessage(WcPendantTab data) {
        sendTransactionMessage(data, orderProducerProperties.getTopic(), orderProducerProperties.getTag(), null);
    }
 
    public void sendTransactionMessage(WcPendantTab data, String topic, String tags, String keys) {
        try {
            // 消息内容
            byte[] msgBody = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
            // 消息对象
            Message message = new Message(topic, tags, keys, msgBody);
            message.putUserProperty("tabId", data.getTabId());
 
            // 发送事务消息
            orderTransactionMQProducer.sendMessageInTransaction(message, null);
        } catch (Exception e) {
            LogUtil.error("OrderTransactionProducer.sendMessage()","send order rocketmq error", e);
        }
    }
 
    @PreDestroy
    public void stop() {
        if (orderTransactionMQProducer != null) {
            orderTransactionMQProducer.shutdown();
        }
    }
 
}

3. 事务消息消费者

package com.common.instance.demo.config.rocketmq;
 
import com.log.util.LogUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
 
/**
 * @description 订单消费者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 14:29
 **/
@Component
public class OrderConsumer implements MessageListenerConcurrently {
 
    @Resource
    private OrderConsumerProperties orderConsumerProperties;
 
    private DefaultMQPushConsumer orderMQConsumer;
 
    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order consumer");
            orderMQConsumer = new DefaultMQPushConsumer(orderConsumerProperties.getConsumerGroup());
            orderMQConsumer.setNamesrvAddr(orderConsumerProperties.getNameSrcAddr());
            orderMQConsumer.subscribe(orderConsumerProperties.getTopic(), orderConsumerProperties.getTag() == null ? "*":orderConsumerProperties.getTag());
            orderMQConsumer.setConsumeFromWhere(ConsumeFromWhere.valueOf(orderConsumerProperties.getConsumeFromWhere()));
            orderMQConsumer.registerMessageListener(this); // 注册监听器
            orderMQConsumer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderProducer.start()", "start rocketmq failed!", e);
        }
    }
 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        int index = 0;
        try {
            for (; index < msgs.size(); index++) {
                // 完整消息
                MessageExt msg = msgs.get(index);
                // 消息内容
                String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);
 
                LogUtil.info("消费组消息内容:" + messageBody);
            }
        } catch (Exception e) {
            LogUtil.error("OrderConsumer.consumeMessage()", "consume order rocketmq error", e);
        } finally {
            if (index < msgs.size()) {
                // 消费应答
                context.setAckIndex(index + 1);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
 
    @PreDestroy
    public void stop() {
        if (orderMQConsumer != null) {
            orderMQConsumer.shutdown();
        }
    }
 
}

四、参考资料
https://www.cnblogs.com/qdhxhz/p/11094624.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/729624.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2.2.cuda驱动API-初始化和检查的理解,CUDA错误检查习惯

目录 前言1. cuInit-驱动初始化2. 返回值检查总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程&#xff0c;之前有看过一遍&#xff0c;但是没有做笔记&#xff0c;很多东西也忘了。这次重新撸一遍&#xff0c;顺便记记笔记 本次课程学习精简 CUDA 教程-Driver API 案例…

氢燃料电池汽车储氢技术及其发展现状

摘要&#xff1a; 氢能的发展可有效地解决经济发展和生态环境间日益增长的矛盾。氢燃料汽车将处于氢能产业体系中核心地位&#xff0c;加快对氢燃料电池车的技术研发&#xff0c;大范围提高氢能源利用率&#xff0c;对于全世界形成以低碳排放为特征的工业体系具有重要意义。在…

【数据库】忘记mysql本地密码

目录 说明 操作步骤操作失败解决1.在以上操作步骤的第四步&#xff0c;输入mysql&#xff0c;报错第一种报错解决办法如下 第二种报错解决办法如下 2.从上面操作第二步后重新操作步骤如下报错解决办法如下 参考链接 说明 太久没使用本地mysql数据库&#xff0c;忘记了密码。 …

禅意工作-诗意生活

“禅意工作&#xff0c;诗意生活”能做到这两点&#xff0c;非常非常非常难。 AI的解释&#xff1a; “禅意工作&#xff0c;诗意生活”是一种追求内心平和与幸福的生活方式&#xff0c;它将工作与生活相结合&#xff0c;达到一种和谐的状态。以下是一些关于如何实现“禅意工…

GitHub快速上手--GitHub高效操作教程

一、前言 如果你正在看我的这篇文章&#xff0c;说明你已经对GitHub有了一些基础的了解&#xff0c;下面我们将详细叙述每一步的操作&#xff0c;以保证你能够快速上手GitHub&#xff0c;完成对代码的管理。 二、创建仓库 登录GitHub账号&#xff0c;点击页面右上角的加号&am…

flutter聊天界面-自定义表情键盘实现

flutter聊天界面-自定义表情键盘实现 flutter 是 Google推出并开源的移动应用开发框架&#xff0c;主打跨平台、高保真、高性能。开发者可以通过 Dart语言开发 App&#xff0c;一套代码同时运行在 iOS 和 Android平台。 flutter开发基础腾讯IM的聊天应用&#xff0c;使用的是t…

PADS Layout中显示与布线标签页参数设置

1.“显示”标签页如图1 所示&#xff1a; 图1 显示标签页 显示标签页是用于去设置网络名以及管脚编号的字体大小的设置&#xff0c;建议是可以采取默认设置的&#xff0c;如果自己设计有另外要求&#xff0c;也是可以去进行设置。 2.“布线”标签也有三个子标签&#xff0c;首先…

基于matlab使用两个图像估计校准相机的姿势(附源码)

一、前言 运动结构 &#xff08;SfM&#xff09; 是从一组 3-D 图像估计场景的 2-D 结构的过程。此示例演示如何从两个图像估计校准相机的姿势&#xff0c;将场景的三维结构重建为未知比例因子&#xff0c;然后通过检测已知大小的对象来恢复实际比例因子。 此示例演示如何从使…

2.标识符、关键字、保留字

1、标识符 标识符&#xff1a;就是指开发人员为变量、属性、函数、参数取的名字 注意&#xff1a;标识符不能是关键字或保留字 JavaScript标识符 在JavaScript中&#xff0c;标识符&#xff08;Identifier&#xff09;是用于标识变量、函数、对象、属性或其他编程元素的名称。…

如何实现CesiumJS的视效升级?

CesiumJS作为一款强大的地理可视化引擎&#xff0c;为我们提供了丰富的地球数据可视化和交互展示的能力。然而&#xff0c;随着用户需求的不断增加和技术的不断进步&#xff0c;如何进一步提升CesiumJS的视觉效果成为了一个重要的问题。 首先&#xff0c;为了实现CesiumJS视觉…

Docker(二)之容器技术所涉及Linux内核关键技术

容器技术所涉及Linux内核关键技术 一、容器技术前世今生 1.1 1979年 — chroot 容器技术的概念可以追溯到1979年的UNIX chroot。它是一套“UNIX操作系统”系统&#xff0c;旨在将其root目录及其它子目录变更至文件系统内的新位置&#xff0c;且只接受特定进程的访问。这项功…

国内几款常用热门音频功放芯片-低功耗、高保真

音频功放芯片&#xff0c;又称为音频功率放大器芯片&#xff0c;是指一种将音频信号转换成线性的输出功率的集成电路芯片&#xff0c;在音频功放领域中一类是传统意义上的模拟功放&#xff1b;另一类是数字功放&#xff0c;它们都可以实现模拟信号到数字信号的转换。 随着智能…

[Java基础] StringBuffer 和 StringBuilder 类应用及源码分析

系列文章目录 [Java基础] StringBuffer 和 StringBuilder 类应用及源码分析 [Java基础] 数组应用及源码分析 [Java基础] String&#xff0c;分析内存地址&#xff0c;源码 文章目录 系列文章目录前言1、特性1.1、操作StringBuffer不会生成新的对象1.2、对比操作String会生成新…

el-tab-pane 和el-tooltip及el-tree 组合使用

<el-tabs v-model"groupId" tab-click"handleClick"><el-tab-pane label"全部" name"0"></el-tab-pane><el-tab-pane v-for"items in editableTabs" :key"items.group_id" :name"item…

Rust环境配置

专栏简介&#xff1a;本专栏作为Rust语言的入门级的文章&#xff0c;目的是为了分享关于Rust语言的编程技巧和知识。对于Rust语言&#xff0c;虽然历史没有C、和python历史悠远&#xff0c;但是它的优点可以说是非常的多&#xff0c;既继承了C运行速度&#xff0c;还拥有了Java…

动态规划之 70爬楼梯(第2道)

题目&#xff1a; 假设你正在爬楼梯。需要 n 阶你才能到达楼顶。 每次你可以爬 1 或 2 个台阶。你有多少种不同的方法可以爬到楼顶呢&#xff1f; 示例&#xff1a; 解法&#xff1a; class Solution { public:int climbStairs(int n) {vector<int> dp(n1);//n1个数…

IDEA+SpringBoot+mybatis+SSM+layui+Mysql学生学籍管理系统

IDEASpringBootmybatisSSMlayuiMysql学生学籍管理系统 一、系统介绍1.环境配置 二、系统展示1. 管理员登录2.专业管理3.班级管理4.学生管理5.老师管理6.公告管理7.课程管理8.开课管理9.用户管理 三、部分代码UserDao.javaUserController.javaUser.java 四、其他获取源码 一、系…

WEB中表单案例

一、题目&#xff1a;书写如下图的web前端 二、解题代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"…

Webstorm+Nodejs+webpack+vue-cli+Git搭建vue环境

此笔记归纳整理webstorm搭建vue项目&#xff0c;仅作记录使用。 一、安装Webstorm1、双击运行安装包2、设置安装路径3、按需选择4、安装5、运行6、激活 二、安装node.js1、双击运行安装包2、设置安装路径3、验证安装4、修改全局模块下载路径5、更换npm源6、全局安装基于淘宝源…

C++模拟实现string类

目录 前言&#xff1a;什么是string类&#xff1f;string类的模拟实现 一、四个默认成员函数1.1 构造函数1.2 拷贝构造函数1.3 赋值重载函数1.4 析构函数 二、迭代器三、c_str()函数四、size和capacity函数五、reserve函数六、resize函数七、push_back函数七、append函数八、fi…