SpringBoot+RocketMQ集群(dledger)部署完整学习笔记

news2024/11/23 22:08:13

文章目录

  • 前言
  • 一、单台集群部署
  • 二、多台集群部署
    • 1.修改配置
    • 2.dashboard修改
  • 三、整合springboot
    • 1.引入pom和修改yml
    • 2.编写消费者
    • 3.编写生产者
    • 4.测试效果
  • 总结


前言


RocketMQ集群方式有好几种
官网地址 https://rocketmq.apache.org/zh/docs/4.x/deployment/01deploy

  • 2m-2s-async:2主2从异步刷盘(吞吐量较大,但是消息可能丢失
  • 2m-2s-sync:2主2从同步刷盘(吞吐量会下降,但是消息更安全)
  • 2m-noslave :2主无从(单点故障),然后还可以直接配置broker.conf,进行单点环境配置
  • dledger:用来实现主从切换的。集群中的节点会基于Raft协议随机选举出一个leader,
    其他的就都是follower。通常正式环境都会采用这种方式来搭建集群。

dledger搭建参考文档 https://rocketmq.apache.org/zh/docs/4.x/bestPractice/02dledger

MQ安装部署请看这篇:https://blog.csdn.net/HBliucheng/article/details/135357998

搭建过程中踩过的坑也也会记录下来

一、单台集群部署

## 启动
nohup sh bin/dledger/fast-try.sh start
## 关闭
nohup sh bin/dledger/fast-try.sh stop

先启动 fast-try.sh start
启动时发现权限不足
nohup: 无法运行命令"bin/mqbroker": 权限不够
查看启动脚本

cat bin/dledger/fast-try.sh

在这里插入图片描述
那我们就修改下nohup 后面加上sh
修改后如下

function startNameserver() {
    export JAVA_OPT_EXT=" -Xms512m -Xmx512m  "
    nohup sh bin/mqnamesrv &
}

function startBroker() {
    export JAVA_OPT_EXT=" -Xms1g -Xmx1g  "
    conf_name=$1
    nohup sh bin/mqbroker -c $conf_name &
}

再次启动发现可以了
在这里插入图片描述
执行命令 查看集群情况 BID =0的是主节点

sh bin/mqadmin clusterList -n 127.0.0.1:9876

在这里插入图片描述
再看看dashboarb
启动之前请先开放6个端口 如果还有端口访问不了的请自行开放出来

firewall-cmd --zone=public --add-port=30909/tcp --permanent
firewall-cmd --zone=public --add-port=30911/tcp --permanent
firewall-cmd --zone=public --add-port=30919/tcp --permanent
firewall-cmd --zone=public --add-port=30921/tcp --permanent
firewall-cmd --zone=public --add-port=30929/tcp --permanent
firewall-cmd --zone=public --add-port=30931/tcp --permanent

### 如果不想一次次开放下面命令也可以
firewall-cmd --zone=public --add-port=30900-30930/tcp --permanent
## 重启防火墙
systemctl reload firewalld
## 查看开放的端口
firewall-cmd --list-ports
## 其它命令
### 关闭端口
firewall-cmd --zone=public --remove-port=30909/tcp --permanent

在这里插入图片描述

启动生产者和消费者再看 master消费一个
在这里插入图片描述
停止master

 lsof  -i:30911
 ## 找到pid杀死 我的是118276
 kill  118276

在这里插入图片描述
在这里插入图片描述
我们再启动 被杀死的broker

nohup sh  bin/mqbroker -c conf/dledger/broker-n0.conf &

在这里插入图片描述
在这里插入图片描述
发现30911作为slave回来了

二、多台集群部署

先准备三台机器
192.168.141.101
192.168.141.102
192.168.141.103

1.修改配置

192.168.141.101修改如下
profile不修改也可以

vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=192.168.141.101:9876
source /etc/profile

修改 broker.conf 后面我们启动哪个就修改哪个 我是把 broker-n0.conf复制一份到broker.conf,也可以直接修改broker-n0.conf,启动时启动自己配置的conf文件就可以



cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf 
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n0
sendMessageThreadPoolNums=16


192.168.141.102修改如下
profile不修改也可以

vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=92.168.141.102:9876
source /etc/profile

cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf 
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n1
sendMessageThreadPoolNums=16

192.168.141.103修改如下
profile不修改也可以

vim /etc/profile
## 加入192.168.141.102 192.168.141.103 同理102 103也改成这样
export NAMESRV_ADDR=192.168.141.103:9876
cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0/conf/dledger
cp broker-n0.conf broker.conf
vim broker.conf 
## 修改的配置如下改动地方namesrvAddr dLegerPeers dLegerSelfId
brokerClusterName = RaftCluster
brokerName=RaftNode00
listenPort=30911
namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
storePathRootDir=/tmp/rmqstore/node00
storePathCommitLog=/tmp/rmqstore/node00/commitlog
enableDLegerCommitLog=true
dLegerGroup=RaftNode00
dLegerPeers=n0-192.168.141.101:40911;n1-192.168.141.102:40911;n2-192.168.141.103:40911
## must be unique
dLegerSelfId=n2
sendMessageThreadPoolNums=16

开放端口
每台机器都要开放

firewall-cmd --zone=public --add-port=30911/tcp --permanent
firewall-cmd --zone=public --add-port=40911/tcp --permanent
systemctl reload firewalld

如果还有端口没开放,请自行开放

启动
每台机器都要启动

cd /bsoft/mdt/rocketmq/rocketmq-4.8.0/rocketmq-4.8.0
nohup sh bin/mqnamesrv &
nohup sh  bin/mqbroker -c conf/dledger/broker.conf &

查看日志
在这里插入图片描述
发现未创建文件夹,创建文件夹

 mkdir -p  /tmp/rmqstore/node00/commitlog
## 关掉再启动
sh bin/mqshutdown broker
## 启动broker
nohup sh  bin/mqbroker -c conf/dledger/broker.conf &

查看集群情况

sh bin/mqadmin clusterList -n 127.0.0.1:9876

在这里插入图片描述

踩坑 这个值不要随便写,这里从0开始递增 ,不然选举会有问题
在这里插入图片描述

2.dashboard修改

修改配置

## 根据自己的服务器地址修改,注意中间是分号不是逗号
rocketmq.config.namesrvAddr=192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876

启动访问
在这里插入图片描述
关闭master再查看集群情况,然后再重启,和前面的单机集群一样的,大家可自行测试

三、整合springboot

1.引入pom和修改yml

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>

rocketmq:
# 集群中间以分号隔开
  name-server: 192.168.141.101:9876;192.168.141.102:9876;192.168.141.103:9876
  producer:
    group: my_group_test

2.编写消费者

package com.study.config.rocketmq;

import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;

/**
 * @author: 
 * @time: 2024/1/5 10:00
 */
@Component
@RocketMQMessageListener(consumerGroup = "my_group_test",
        topic = "topic_test",
        selectorType = SelectorType.TAG,
        selectorExpression = "tagA")
@Slf4j
public class MQMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody(), CharsetUtil.UTF_8);
        log.info("msgId={} msg={}",msgId,msg);
    }
}

@RocketMQMessageListener 注解参数如下:

  • topic: 消费者订阅的主题,即消费者将从这个主题中接收消息。
  • consumerGroup: 消费者组,多个消费者可以组成一个消费者组,共同从一个主题中接收消息。
  • consumeMode: 消费模式,指定消费者是以并发的方式接收消息还是以有序的方式接收消息。并发模式下,多个消费者可以同时接收消息;有序模式下,每个消费者按照消息的顺序依次接收消息。
  • messageModel: 消息模式,指定消息是以集群模式还是广播模式发送。集群模式下,消息将被发送到同一个主题的其中一个消费者;广播模式下,消息将被发送到主题的所有消费者。
  • selectorType: 过滤消息的方式,可以使用标签(Tag)或SQL92表达式(SQL92)来过滤消息。
  • selectorExpression: 过滤消息的表达式,可以使用标签(Tag)或SQL92表达式(SQL92)来指定过滤条件。
  • maxReconsumeTimes: 消息消费失败后,可被重复投递的最大次数。超过最大重试次数后,消息将被放入死信队列。
  • delayLevelWhenNextConsume: 并发模式的消息重试策略,指定消息消费失败后的重试延迟级别。设置为-1时,表示无需重试,直接将消息放入死信队列。

3.编写生产者

package com.study.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 * @author: 
 * @time: 2024/1/5 10:17
 */
@RestController
@RequestMapping("/mq")
@Slf4j
public class RocketMQProducerController {
    @Resource
    RocketMQTemplate rocketMQTemplate;

    @PostMapping("/sendMessage")
    @ResponseBody
    public void sendMessage(String msg){
      rocketMQTemplate.asyncSend("topic_test", "hello mq", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("msgId={}",sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });

    }
}

同步会有一点小问题,第一次启动不会消费,直接写成异步

4.测试效果

发现没有主题
在这里插入图片描述
追踪源码发现主题和过滤消息的表达式按照冒号分割
topic取第一位,过滤表达式取第二位
在这里插入图片描述

修改再试下

  rocketMQTemplate.asyncSend("topic_test:tagA", "hello mq", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("msgId={}",sendResult.getMsgId());
            }

            @Override
            public void onException(Throwable e) {
                e.printStackTrace();
            }
        });

发现可以了
在这里插入图片描述
前面写了个java客户端的消费者,改下消费组发现也可以消费

java客户端代码

package com.bsoft;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

/**
 * @author: liucheng
 * @time: 2023/12/29 15:39
 */
public class MQConsumer {
    private final static String nameServer = "192.168.141.101:9876";

    private final static String consumerGroup = "my_group_test02";

    private final static String topic = "topic_test";
    public static void main(String[] args) throws MQClientException, IOException, InterruptedException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 设置NameServer的地址
        consumer.setNamesrvAddr(nameServer);
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(topic, "tagA");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                msgs.forEach((msg)->{
                    byte[] body = msg.getBody();
                    String s = new String(body, Charset.defaultCharset());
                    System.out.println("msg=================> " +s);

                });
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started......");
//        Thread.sleep(5000);
//        consumer.shutdown();
        System.in.read();
    }
}

在这里插入图片描述
到此集群搭建完成,大家搭建过程中有遇到问题可以交流

总结

整个搭建过程不难就是有点繁琐,需要配置多台服务器

  • 其中配置brocker.conf时dLegerSelfId值这块要注意 ,dLegerSelfId是节点 id, 必须属于 dLegerPeers 中的一个;同 Group 内各个节点要唯一。这个值从0开始递增
    在这里插入图片描述

  • 同一台服务器上启动时先启动 namesrv 再启动 broker

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1361255.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CISP-DSG和CDGA该如何选择?

同样是数据治理&#xff0c;CDGA证书和CISP-DSG证书&#xff0c;它们之间有什么区别和各自的优势呢❓ 1️⃣CISP-DSG CISP-DSG证书聚焦于信息an全领域&#xff0c;特别guan注数据an全治理。 国际知名zi询机构Gartner用“风暴之眼”比喻“数据an全治理”&#xff0c;&#x1f44…

kubernetes(k8s)集群常用指令

基础控制指令 # 查看对应资源: 状态 $ kubectl get <SOURCE_NAME> -n <NAMESPACE> -o wide 查看默认命名空间的pod [rootk8s-master ~]# kubectl get pod NAME READY STATUS RESTARTS AGE nginx 1/1 Running 0 3h53m查看所有pod [roo…

【C++】STL 算法 ③ ( 函数对象中存储状态 | 函数对象作为参数传递时值传递问题 | for_each 算法的 函数对象 参数是值传递 )

文章目录 一、函数对象中存储状态1、函数对象中存储状态简介2、示例分析 二、函数对象作为参数传递时值传递问题1、for_each 算法的 函数对象 参数是值传递2、代码示例 - for_each 函数的 函数对象 参数在外部不保留状态3、代码示例 - for_each 函数的 函数对象 返回值 一、函数…

【开源项目】WPF 扩展组件 -- Com.Gitusme.Net.Extensiones.Wpf

一、项目简介 Com.Gitusme.Net.Extensiones.Wpf 是一款 Wpf 扩展组件。基于.Net Core 3.1 开发&#xff0c;当前最新 1.0.1 版本。包含 核心扩展库&#xff08;Com.Gitusme.Net.Extensiones.Core&#xff09;、视频渲染&#xff08;Com.Gitusme.Media.Video&#xff09;、串口…

苹果Mac图像修图软件Photomator和Pixelmator Pro 有什么区别?

同为一个团队设计的Mac修图软件Photomator和Pixelmator Pro有哪些区别呢&#xff1f;有哪些不一样的功能&#xff1f; Photomator和Pixelmator Pro区别如下&#xff1a; 1、用途不同 Photomator 和 Pixelmator Pro 是两个功能强大的应用程序&#xff0c;具有两个不同的用途。…

从Eumetsat批量下载哨兵数据等各种数据

从Eumetsat批量下载哨兵数据等各种数据 那些最好的程序员不是为了得到更高的薪水或者得到公众的仰慕而编程&#xff0c;他们只是觉得这是一件有趣的事情&#xff01; 批量下载Sentinel数据脚本2023 从Eumetsat批量下载哨兵数据等各种数据&#x1f33f;前言&#x1f340;脚本构成…

原生微信小程序如何动态修改svg图片颜色及尺寸、宽高(封装svgIcon组件)解决ios不显示问题

最终效果 前言 动态设置Svg图片颜色就是修改Svg源码的path中的fill属性&#xff0c; 通过wx.getFileSystemManager().readFile读取.xlsx文件 ios不显示需要把encoding设置 binary 把文件转成base64 封装svg-icon组件 1、在项目的components下新建svg-icon文件夹&#xff0c;新…

no usable temporary directory found in %s“ % dirlist 问题解决

提示其实就是没有可用空间&#xff0c;那我们就找到占用空间大且无用的数据文件删除掉 du -sh * 删除掉/tmp目录下的文件。 重启 问题解决

cpufreq子系统

cpufreq是linux上负责实现动态调频的关键&#xff0c;这篇笔记总结了linux内核cpufreq子系统的关键实现&#xff08;Linux 3.18.140&#xff09;。 概述 借用一张网络上的图片来看cpufreq子系统的整体结构&#xff1a; 用户态接口&#xff1a;cpufreq通过sysfs向用户态暴露接…

2022年多元统计分析期中试卷

多元正态均值检验 一、去年卖出的一岁牛犊的平均身高为 51 英寸&#xff0c;平均背脂厚度是 0.3 英寸&#xff0c;平均肩高是 56 英寸。已知今年卖出的 76 头一岁牛犊的 3 项平均指标为(50, 0.2, 54)‘&#xff0c;样本协差阵及其逆矩阵为 S [ 3.00 − 0.053 2.97 − 0.053 0…

【Bootstrap5学习 day12】

Bootstrap5 导航 Bootstrap5提供了一种简单快捷的方法来创建基本导航&#xff0c;它提供了非常灵活和优雅的选项卡和Pills等组件。Bootstrap5的所有导航组件&#xff0c;包括选项卡和Pillss&#xff0c;都通过基本的.nav类共享相同的基本标记和样式。 创建基本导航 要创建简单…

eureka注册列表 某服务出现多个服务实例

最近文件导出功能偶发成功&#xff0c;大部分情况都失败&#xff0c;开始以为接口被拦截&#xff0c;gateway服务没有接口调用日志&#xff0c;发现测试环境可以&#xff0c;正式环境功能无法正常使用。 偶然看到注册中心如下 发现file服务有3个实例&#xff0c;调用接口将错误…

Java十种经典排序算法详解与应用

数组的排序 前言 排序概念 排序是将一组数据&#xff0c;依据指定的顺序进行排列的过程。 排序是算法中的一部分&#xff0c;也叫排序算法。算法处理数据&#xff0c;而数据的处理最好是要找到他们的规律&#xff0c;这个规律中有很大一部分就是要进行排序&#xff0c;所以需…

Excel中快速隐藏中间四位手机号或者身份证号等

注意&#xff1a;以下方式必须再新增一列&#xff0c;配合旧的一列用来对比操作&#xff0c;即根据旧的一列的数据源&#xff0c;通过新的一列的操作逻辑来生成新的隐藏数据 1、快捷方式是使用CtrlE 新建一列&#xff1a;手动输入第一个手机号隐藏后的号码&#xff0c;即在N2单…

VS+QT五子棋游戏开发

1、首先安装好VS软件和QT库&#xff0c;将其配置好&#xff0c;具体不在此展开说明。 2、文件结构如下图&#xff1a; 3、绘制棋盘代码&#xff0c;如下&#xff1a; void Qwzq::paintEvent(QPaintEvent* event) {QPainter painter(this);painter.setRenderHint(QPainter::An…

Unity之键盘鼠标的监控

小编最近在玩大表哥2&#xff0c;通过 W、A、S、D 来移动亚瑟&#xff0c;鼠标左键来不吃牛肉 我们都知道玩家通过按键鼠标来控制游戏人物做出相应的行为动作&#xff0c;那在Unity引擎里是怎么知道玩家是如何操作的呢&#xff1f;本篇来介绍Unity是怎样监控键盘和鼠标的。 首先…

智创有术开发公司,星潮宇宙开发。

2024年&#xff0c;星潮宇宙全网首发&#xff0c;一个全新的赛道将被开启&#xff0c;这将对游戏产业产生深远的影响。本文将深入探讨这场首发的对接团队以及他们的突破性举措&#xff0c;以展示其对游戏界的重要意义。 星潮宇宙全网首发概述 星潮宇宙的全网首发意味着将有一…

Cloud模型matlab

学习资料python 多维正态云python 预备知识&#xff1a; 如何获取具有特定均值和方差的正态分布随机数。首先&#xff0c;初始化随机数生成器&#xff0c;以使本示例中的结果具备可重复性。 rng(0,twister);基于均值为 500 且标准差为 5 的正态分布创建包含 1000 个随机值的向…

损失函数篇 | YOLOv8 引入 Shape-IoU 考虑边框形状与尺度的度量

作者导读&#xff1a;Shape-IoU&#xff1a;考虑边框形状与尺度的度量 论文地址&#xff1a;https://arxiv.org/abs/2312.17663 作者视频解读&#xff1a;https://www.bilibili.com 开源代码地址&#xff1a;https://github.com/malagoutou/Shape-IoU/blob/main/shapeiou.py…

Linux第16步_安装NFS服务

NFS&#xff08;Network File System&#xff09;是一种在网络上实现的分布式文件系统&#xff0c;它允许不同的操作系统和设备之间共享文件和资源。 在创建的linux目录下&#xff0c;再创建一个“nfs“文件夹&#xff0c;用来供nfs服务器使用&#xff0c;便于”我们的开发板“…