RocketMQ5.0.0部署与实例

news2024/10/6 8:27:27

一、Idea调试

1.相关配置文件

在E:\rocketmq创建conf、logs、store三个文件夹。从RocketMQ distribution部署目录中将broker.conf、logback_namesrv.xml、logback_broker.xml文件复制到conf目录。如下图所示。

其中logback_namesrv.xml、logback_broker.xml分别是NameServer、Broker的日志配置文件,修改打印日志文件路径即可。broker.conf文件是Broker启动时的加载配置文件,如下代码所示。

注意:NameServer启动端口默认是9876,Broker启动端口默认10911。

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.156.245

# 存储路径
storePathRootDir=E:\\rocketmq\\store
#commitLog 存储路径
storePathCommitLog=E:\\rocketmq\\store\\commitlog
# 消费队列存储路径
storePathConsumeQueue=E:\\rocketmq\\store\\consumequeue
# 消息索引|存储路径
storePathindex=E:\\rocketmq\\store\\index
#checkpoint 文件存储路径
storeCheckpoint=E:\\rocketmq\\store\\checkpoint
#abort 文件存储路径
abortFile=E:\\rocketmq\\store\\abort

2.启动NameServer

org.apache.rocketmq.namesrv.NamesrvStartup启动类配置环境变量ROCKETMQ_HOME,值是配置主目录“E:\rocketmq”,如下图所示。

出现“The Name Server boot success. serializeType=JSON”时,则NameServer启动成功。

3.启动Broker

org.apache.rocketmq.broker.BrokerStartup启动类配置环境变量ROCKETMQ_HOME,值是配置主目录“E:\rocketmq”;配置启动参数:-c E:\rocketmq\conf\broker.conf。如下图所示。

出现“The broker[broker-a, 192.168.156.245:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876”时,则Broker启动成功。

二、Linux部署

1.执行Maven命令

mvn -Prelease-all -DskipTests clean install -U

查看打包的可部署文件路径:.\distribution\target\rocketmq-5.0.0\rocketmq-5.0.0,如下图所示。

2.复制rocketmq-5.0.0

3.启动NameServer

nohup sh bin/mqnamesrv &

可能出现问题,如下图所示。原因是Windows系统下打包,换行符出现问题。解决:notepad++编辑器在Windows环境下将文本转换为Unix格式,步骤为:用Notepad++打开脚本 >> 编辑 >> 档案格式转换 >> 选择转换为UNIX格式。

4.启动Broker

nohup sh bin/mqbroker -n 192.168.1.55:9876 -c /home/rocketmq-5.0.0/conf/broker.conf &

启动命令中,-n是指定NameServer地址,-c 是broker的配置文件。

5.关闭NameServer、Broker命令

关闭NameServer:sh bin/mqshutdown namesrv

关闭Broker:sh bin/mqshutdown broker

三、事务消息实例

1.事务消息监听类

实现org.apache.rocketmq.client.producer.TransactionListener,该类有两个方法:

  • executeLocalTransaction():保存本地事务中间表,用于Broker回查事务状态

  • checkLocalTransaction():Broker定时回查事务状态,根据事务状态提交或回滚事务消息

package com.common.instance.demo.config.rocketmq;

import com.common.instance.demo.entity.TMessageTransaction;
import com.common.instance.demo.service.TMessageTransactionService;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;
import java.util.List;

/**
 * @description 订单事务消息监听器实现类
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:44
 **/
@Component
public class OrderTransactionListenerImpl implements TransactionListener {

    @Resource
    private TMessageTransactionService tMessageTransactionService;

    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        // 组装事务
        TMessageTransaction tMessageTransaction = packageTMessageTransaction(message);

        // 保存事务中间表
        tMessageTransactionService.insert(tMessageTransaction);

        // 推荐返回UNKNOW状态,待事务状态回查
        return LocalTransactionState.UNKNOW;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 获取用户属性tabId
        String tabId = messageExt.getUserProperty("tabId");

        // 查询事务消息
        List<TMessageTransaction> tMessageTransactions = tMessageTransactionService.queryByTabId(tabId);

        if (!tMessageTransactions.isEmpty() && tMessageTransactions.size() <= 6) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        LogUtil.error("orderTransaction rollBack, tabId: " + tabId);
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }

    // 组装事务
    private TMessageTransaction packageTMessageTransaction(Message message) {
        TMessageTransaction tMessageTransaction = new TMessageTransaction();

        // 获取用户属性tabId
        String tabId = message.getUserProperty("tabId");
        // 事务ID
        String transactionId = message.getTransactionId();
        tMessageTransaction.setTabId(tabId);
        tMessageTransaction.setTransactionId(transactionId);
        tMessageTransaction.setCreateBy("auto");
        tMessageTransaction.setCreateTime(new Date());

        return tMessageTransaction;
    }

}

2. 事务消息生产者

package com.common.instance.demo.config.rocketmq;

import com.alibaba.fastjson.JSON;
import com.common.instance.demo.entity.WcPendantTab;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

/**
 * @description 订单事务消息生产者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:54
 **/
@Component
public class OrderTransactionProducer {

    @Resource
    private OrderProducerProperties orderProducerProperties;

    @Resource
    private OrderTransactionListenerImpl orderTransactionListener;

    private TransactionMQProducer orderTransactionMQProducer;

    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order transactionProducer");
            orderTransactionMQProducer = new TransactionMQProducer(orderProducerProperties.getProducerGroup());
            orderTransactionMQProducer.setNamesrvAddr(orderProducerProperties.getNameSrcAddr());
            orderTransactionMQProducer.setSendMsgTimeout(orderProducerProperties.getSendMsgTimeout());
            // 注册事务监听器
            orderTransactionMQProducer.setTransactionListener(orderTransactionListener);
            orderTransactionMQProducer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderTransactionProducer.start()", "start rocketmq failed!", e);
        }
    }

    public void sendTransactionMessage(WcPendantTab data) {
        sendTransactionMessage(data, orderProducerProperties.getTopic(), orderProducerProperties.getTag(), null);
    }

    public void sendTransactionMessage(WcPendantTab data, String topic, String tags, String keys) {
        try {
            // 消息内容
            byte[] msgBody = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
            // 消息对象
            Message message = new Message(topic, tags, keys, msgBody);
            message.putUserProperty("tabId", data.getTabId());

            // 发送事务消息
            orderTransactionMQProducer.sendMessageInTransaction(message, null);
        } catch (Exception e) {
            LogUtil.error("OrderTransactionProducer.sendMessage()","send order rocketmq error", e);
        }
    }

    @PreDestroy
    public void stop() {
        if (orderTransactionMQProducer != null) {
            orderTransactionMQProducer.shutdown();
        }
    }

}

3. 事务消息消费者

package com.common.instance.demo.config.rocketmq;

import com.log.util.LogUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**
 * @description 订单消费者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 14:29
 **/
@Component
public class OrderConsumer implements MessageListenerConcurrently {

    @Resource
    private OrderConsumerProperties orderConsumerProperties;

    private DefaultMQPushConsumer orderMQConsumer;

    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order consumer");
            orderMQConsumer = new DefaultMQPushConsumer(orderConsumerProperties.getConsumerGroup());
            orderMQConsumer.setNamesrvAddr(orderConsumerProperties.getNameSrcAddr());
            orderMQConsumer.subscribe(orderConsumerProperties.getTopic(), orderConsumerProperties.getTag() == null ? "*":orderConsumerProperties.getTag());
            orderMQConsumer.setConsumeFromWhere(ConsumeFromWhere.valueOf(orderConsumerProperties.getConsumeFromWhere()));
            orderMQConsumer.registerMessageListener(this); // 注册监听器
            orderMQConsumer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderProducer.start()", "start rocketmq failed!", e);
        }
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        int index = 0;
        try {
            for (; index < msgs.size(); index++) {
                // 完整消息
                MessageExt msg = msgs.get(index);
                // 消息内容
                String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);

                LogUtil.info("消费组消息内容:" + messageBody);
            }
        } catch (Exception e) {
            LogUtil.error("OrderConsumer.consumeMessage()", "consume order rocketmq error", e);
        } finally {
            if (index < msgs.size()) {
                // 消费应答
                context.setAckIndex(index + 1);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    @PreDestroy
    public void stop() {
        if (orderMQConsumer != null) {
            orderMQConsumer.shutdown();
        }
    }

}

四、参考资料

https://www.cnblogs.com/qdhxhz/p/11094624.html

https://blog.csdn.net/xiqingchun/article/details/44571887

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/146967.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

纯C语言实现动态爱心(详解,初学者也能看懂)

文章目录✍动态爱心实现&#x1f496;一段小故事&#xff1a;爱心函数的由来&#x1f388; 创建动态爱心的准备&#xff08;非小白可以跳过&#xff09;1.爱心字符2.对easyx库里面的基础函数的认识①initgraph函数②settextcolor、settextstyle、setbkmode、outtextxy四种函数③…

PostgresSQL数据库的使用

PostgresSQL数据库的使用 下载安装 数据类型 使用指导 数据库操作 连接控制台 psql -h <实例连接地址> -U <用户名> -p <端口号>参数描述实例连接地址RDS PostgreSQL实例的连接地址&#xff0c;本机可用localhost或者127.0.0.1用户名创建的RDS Postgre…

ES语法扩展

剩余参数 剩余参数本质 // 剩余参数的本质const add(x,y,...args)>{console.log(x,y,args);}add();add(1);add(1,2);add(1,2,3,4,5); 剩余参数的注意事项 箭头函数的参数部分即使只有一个剩余参数&#xff0c;也不能省略圆括号使用剩余参数替代arguments获取实际参数剩余…

4.Isaac Jetson Nano 入门

Isaac Jetson Nano 入门 本节介绍如何在 Jetson Nano 设备上运行 Isaac SDK 示例应用程序。 有关如何开始使用 Nano 的一般说明&#xff0c;请参阅 Jetson Nano 开发工具包入门。 文章目录Isaac Jetson Nano 入门获取 IP 地址在 Jetson Nano 上运行示例应用程序PingOpenCV 边缘…

Pytorch CIFAR10图像分类 EfficientNet v1篇

Pytorch CIFAR10图像分类 EfficientNet v1篇 文章目录Pytorch CIFAR10图像分类 EfficientNet v1篇4. 定义网络&#xff08;EfficientNet&#xff09;EfficientNet介绍EfficientNet性能比较EfficientNet的baselineEfficientNet模型混合缩放方法其他版本的EfficientNet(B1-B7)判断…

错题 3jxn (8253复杂)

A 指示型指令 C 比如 ,跟C语言的return 一样&#xff0c;可以由多条&#xff0c;但是返回的位置都是一个地方 JN NEXT RET NEXT: RET A 可以重复 EQU不可以重复 C 中断向量&#xff1a;中断服务程序的入口地址 向量中断&#xff1a;识别中断你的方法 接口 编程题&#xff…

Redis关键知识点总结

Reference: http://redis.cn用处缓存数据库分布式锁&#xff08;Redission的redlock&#xff0c;自定义的lock等&#xff09;过滤器&#xff08;布隆过滤器/增强的带计数的布隆过滤器/布谷鸟过滤器等&#xff09;大规模的计算辅助&#xff08;bitmap&#xff09;消息订阅/监听 …

PyQt5入门学习(一)【小白入门系列】

PyQt5入门学习 介绍&#xff1a;PyQt5是Python较好的图形库&#xff0c;与C的Qt不同的是PyQt5封装得较为简单&#xff0c;上手也更加的方便。下面话不多说&#xff0c;开始学习PyQt5吧&#xff01; 安装过程 安装方法有两种&#xff0c;一种是下载PyQt5最新源码进行编译安装…

初识Kafka

1.1 定义 Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue&#xff09;&#xff0c;主要应用于大数据实时处理领域。 发布/订阅: 消息的发布者不会将消息直接发送给特定的订阅者&#xff0c;而是将发布的消息分为不同的类别&#xff0c;订阅者只…

[数据结构基础]二叉树——堆的概念、结构、接口函数及经典的Topk问题和堆排序问题

目录 一. 堆的概念及结构 1.1 堆的概念 1.2 堆的结构及在内存中的存储 二. 堆的主要接口函数 2.1 堆初始化函数HeapInit 2.2 堆销毁函数HeapDestroy 2.3 向堆中插入数据函数HeapPush&#xff08;以小堆为例&#xff09; 2.4 删除堆根节点数据函数HeapPop&#xff08;小…

C++ 夺冠!成为 TIOBE 2022 年度编程语言

目录&#xff1a;C夺冠—成为TIOBE2022年度编程语言一、前言二、C 摘得桂冠三、Top 10 编程语言 TIOBE 指数走势&#xff08;2002-2023&#xff09;四、历史排名&#xff08;1987-2023&#xff09;五、编程语言“名人榜”&#xff08;2003-2022&#xff09;六、说明一、前言 2…

vitepress(三):自动生成目录

上一节我们将自己的网站发布到了git pages上&#xff0c;但是现在我们需要每次更新一篇文章就重写一次目录&#xff0c;操作上十分的繁琐和不方便&#xff0c;所以我们需要一个方法去自动生成我们的侧边栏结构&#xff0c;方便我们每次只需要更新我们的项目即可。这里我们要知道…

【每日一题】【LeetCode】【第六天】【Python实现】加一

加一的解决之路 题目描述 测试案例&#xff08;部分&#xff09; 第一次 1这个很好理解&#xff0c;唯一的难点就是个位1导致的进位的问题&#xff0c;可能会只会导致十位1&#xff0c;也有像8999这样产生多次进位的情况。 为了解决进位问题&#xff0c;自己想到了第三天学…

mysql三表查询15个例子带你搞懂它

mysql三表查询30个经典案例创建三个表a、b、c表a中的数据表b中的数据表c中的数据1.查询出学习成绩70分以上的学生姓名与成绩与学科&#xff1b;2.查询姓名以mi结尾的学生姓名及其任课老师姓名&#xff1b;3.选修课名为math的学生学号与姓名;4.选修课号为C4的学生学号&#xff1…

QEMU调试Linux内核环境搭建

一个最小可运行Linux操作系统需要内核镜像bzImage和rootfs&#xff0c;本文整理了其制作、安装过程&#xff0c;调试命令&#xff0c;以及如何添加共享磁盘。编译内核源码从 The Linux Kernel Archives 网站下载内核源码&#xff0c;本文下载的版本为4.14.191&#xff0c;4.14.…

危险程度(并查集)

有 nn 种化学物质&#xff0c;编号 1∼n1∼n。 其中&#xff0c;有 mm 对物质之间会发生反应。 现在&#xff0c;要将这些化学物质逐个倒入同一个试管之中&#xff0c;具体倒入顺序不限。 我们需要计算一下试管的危险值。 已知&#xff0c;空试管的危险值为 11&#xff0c;…

【UE4 第一人称射击游戏】21-添加动态扩散准心

素材资料地址&#xff1a;链接&#xff1a;https://pan.baidu.com/s/1epyD62jpOZg-o4NjWEjiyg密码&#xff1a;jlhr上一篇&#xff1a;【UE4 第一人称射击游戏】20-添加瞄准十字线本篇效果&#xff1a;步骤&#xff1a;将资源移至FPS项目文件夹内移入后发现多了一个名为“WBCro…

【web安全】——报错注入

作者名&#xff1a;Demo不是emo主页面链接&#xff1a; 主页传送门创作初心&#xff1a; 舞台再大&#xff0c;你不上台&#xff0c;永远是观众&#xff0c;没人会关心你努不努力&#xff0c;摔的痛不痛&#xff0c;他们只会看你最后站在什么位置&#xff0c;然后羡慕或鄙夷座右…

如何查看sqlite数据库的 .db文件中的表的内容数据

在使用 qt 的sqlite 数据的时候,对于创建的数据库的 .db 文件的内容的查看我们可以按照下面的步骤安装工具进行查看 下载所需的sqlite 查看工具 下载:链接&#xff1a;https://pan.baidu.com/s/1KSl9w61zaEyemhR1Ir04_A 提取码&#xff1a;6666 只需要解压即可,其中安装包内…

MINISForum HX90 主机风扇调教

今年秋天买了个1个HX90 5900H的mini主机。准系统版本&#xff0c;2899元。 但是买回来之后&#xff0c;发现它的风扇声音实在是大&#xff0c;稍微一加载点东西&#xff0c;就 开始呜呜的响&#xff0c;简直让人心烦 意乱。 去了官网查看。好多人的解决办法看了没看明白&…