RocketMQ5-03RocketMQ-Dashboard和Java客户端访问示例

news2024/11/13 12:43:06

接上篇02快速部署RocketMQ5.x(手动和容器部署)
已经完成 RocketMQ5.0 环境的部署,就需要对这个环境进行测试,查看集群、写入消息、读取消息等

本篇教你如何使用和查看部署的服务:

  • Docker部署 Dashboard
    • 获取镜像并下载
    • 部署服务
  • 客户端连接
    • pom文件
    • 生产者代码
    • 消费者代码
    • 接口测试
    • 问题: broker资源不足无法提供服务

Docker部署 Dashboard

以上通过可执行文件部署或者容器部署的形式,都需要有一个可以查看的集群的地方,对于官方自己配备的有 rocketmq-dashboard, 可以使用docker快速部署,便于测试

获取镜像并下载

docker search rocketmq-dashboard & docker pull apacherocketmq/rocketmq-dashboard

部署服务

docker run -d --name rmqdashboard -e "JAVA_OPTS=-Xmx256M -Xms256M -Xmn128M -Drocketmq.namesrv.addr=192.168.2.92:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8088:8080 apacherocketmq/rocketmq-dashboard

这边将端口映射到了8088,所以访问 localhost:8088,就可以查看到集群,如果有数据正在写入与读取,就能够大概看到数据量
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

客户端连接

手动创建 topic: sh bin/mqadmin updatetopic -n 192.168.2.92:9876 -t dataTopic2 -c DefaultCluster

pom文件

 <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>3.0.2</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.5</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>17</source>
                    <target>17</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.learning.springbootrmq5.SpringbootRmq5Application</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

生产者代码

 @GetMapping("/sendSync")
    public String sendSync() throws ClientException, IOException {
        String endpoint = "192.168.2.92:8081";
        String topic = "dataTopic2";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
        ClientConfiguration configuration = builder.enableSsl(true).build();
        Producer producer = provider.newProducerBuilder()
                                    .setTopics(topic)
                                    .setClientConfiguration(configuration)
                                    .build();
        Message message = provider.newMessageBuilder()
                                  .setTopic(topic)
                                  .setKeys("messageKey")
                                  .setTag("messageTag")
                                  .setBody("messageBodySync".getBytes())
                                  .build();
        try {
            SendReceipt sendReceipt = producer.send(message);
            log.info("Send sync message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (ClientException e) {
            log.error("Failed to send message", e);
        }
        producer.close();
        return "success";
    }

    @GetMapping("/sendAsync")
    public String sendAsync() throws ClientException, InterruptedException, IOException {
        String endpoint = "192.168.2.92:8081";
        String topic = "dataTopic2";
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().enableSsl(true).setEndpoints(endpoint);
        ClientConfiguration configuration = builder.build();
        Producer producer = provider.newProducerBuilder()
                                    .setTopics(topic)
                                    .setClientConfiguration(configuration)
                                    .build();
        Message message = provider.newMessageBuilder()
                                  .setTopic(topic)
                                  .setKeys("messageKey")
                                  .setTag("messageTag")
                                  .setBody("messageBodyASync".getBytes())
                                  .build();
        producer.sendAsync(message);
        log.info("Send async message successfully, messageId");
        return "success";
    }

消费者代码

@Slf4j
@Component
public class MessageConsumerRunner implements CommandLineRunner {
    @Override
    public void run(final String... args) throws Exception {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        String endpoints = "192.168.2.92:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                                                                     .setEndpoints(endpoints)
                                                                     .build();
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "YourConsumerGroup";
        String topic = "dataTopic2";
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
                                            .setClientConfiguration(clientConfiguration)
                                            .setConsumerGroup(consumerGroup)
                                            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                                            .setMessageListener(messageView -> {
                                                log.info("Consume message successfully, messageId={}", messageView.getMessageId());
                                                return ConsumeResult.SUCCESS;
                                            })
                                            .build();
        Thread.sleep(Long.MAX_VALUE);
    }
}

接口测试

请求接口 /msg/sendAsync
在这里插入图片描述

能够正常收发

问题: broker资源不足无法提供服务

可能出现的客户端报错为:

org.apache.rocketmq.client.java.exception.InternalErrorException: [request-id=e3f9dxxxx1aa872, response-code=50001] org.apache.rocketmq.proxy.common.ProxyException: service not available now. It may be caused by one of the following reasons: the broker's disk is full [CL:  0.96 CQ:  0.96 INDEX: -1.00], messages are put to the slave, message store has been shut down, etc.

java.util.concurrent.RejectedExecutionException: Task org.apache.rocketmq.shaded.io.grpc.internal.DelayedStream$4@72ba34c2 rejected from java.util.concurrent.ThreadPoolExecutor@7deb0119[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 13]
	at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2065) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:833) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1365) ~[na:na]

以上大体就是描述资源不足无法进行接入、服务不可达等,通常就是因为环境的资源不足,可能是内存、可能是硬盘

.../broker/logs/rocketmqlogs/store.log 中可以看出端倪,是磁盘存储不够了

2024-01-08 13:34:24 ERROR StoreScheduledThread1 - physic disk maybe full soon 0.95, so mark disk full, storePathPhysic=/home/rocketmq/store/commitlog

可以通过清除以下数据暂时缓解 .../broker/store/commitlog,可以发现没怎么用也有好多G。不过确实需要使用的话尽早考虑扩容啊

扩大存储增加可用磁盘空间,就能够正常使用连接了

如果这篇文章对你有用的话,帮忙留个关注吧~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1381787.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ZooKeeper初探:分布式世界的守护者

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 ZooKeeper初探&#xff1a;分布式世界的守护者 前言Zookeeper的概述分布式系统中的角色和作用&#xff1a; Zookeeper的数据模型Znode的概念和层次结构&#xff1a;Znode的类型和应用场景&#xff1a;…

anoconda 安装报错

表现形式&#xff1a;Output folder: D:\anoconda\Lib Extract: _nsis.py Extract: _system_path.py Output folder: D:\anoconda........................ 解决办法&#xff1a; 网址&#xff1a;Index of /anaconda/archive/ | 清华大学开源软件镜像站 | Tsinghua Open Sour…

【LabVIEW FPGA入门】使用数字IO卡实现计数器输入功能

方法1&#xff1a; 1.首先需要用一个数字IO的输入FPGA端口&#xff0c;并将其拖入程序框图中&#xff0c;同时创建一个循环。 2.如果想要在循环中实现累加功能&#xff0c;就可以使用移位寄存器。 数字输入的当前值和历史值进行比较&#xff0c;用于一个判断大于&#xff0c;来…

【论文解读】SiamMAE:用于从视频中学习视觉对应关系的 MAE 简单扩展

来源&#xff1a;投稿 作者&#xff1a;橡皮 编辑&#xff1a;学姐 论文链接&#xff1a;https://siam-mae-video.github.io/resources/paper.pdf 项目主页&#xff1a;https://siam-mae-video.github.io/ 1.背景 时间是视觉学习背景下的一个特殊维度&#xff0c;它提供了一…

年终关账四大财务处理技巧|柯桥会计做账,财税知识

2023年即将落下帷幕&#xff0c;无数公司最忙碌就是“年终关账“这件事了。 “年终关账”不仅是企业内部结算一年经营结果的事&#xff0c;还与企业所得税汇算清缴息息相关&#xff0c;甚至还可能关乎企业税负高低与企业是否依法纳税&#xff0c;千万不可小觑。 同时&#xff0…

MySQL 管理端口

错误 客户出现 MySQL连接数 超过 最大连接数的现象 ERROR 1040 (HY000): Too many connections 出现该现象&#xff0c;一般的解决方法&#xff1a; 1.修改配置文件中的最大连接数&#xff0c;之后重启数据库 2.如果配置文件中没有设置 连接超时时间的参数。8小时后&#…

平衡小车——编码器

学习目标 了解编码器的构成理解编码器采样原理掌握编码器获取转速信息学习内容 编码器组成 左侧的减速齿轮中间的电机部分右侧的电路板减速齿轮 将电机转速通过齿轮按照一定比例进行降速。 电路板 电路板中,包含了一个圆形磁体,还有两个霍尔传感器。 电机转动时,圆形的磁…

Spring之AOP源码(一)

文章目录 一、动态代理1. 概念2. Cglib动态代理的使用3. JDK动态代理的使用 二、SpringAOP1. 简介2. Spring AOP使用 一、动态代理 1. 概念 动态代理&#xff08;Dynamic Proxy&#xff09;是一种在运行时动态生成代理对象的技术。它是一种设计模式&#xff0c;用于在不修改原…

安卓手机变iOS!

Launcher iOS 16 - 安卓手机秒变iOS Launcher iOS 16 是一款iOS启动器&#xff0c;可以将安卓手机桌面变成iOS样子&#xff0c;还有iOS的开机动画和景深效果&#xff01; 下载链接&#xff1a;【Launcher iOS 16】 ​

【CCNet】《CCNet:Criss-Cross Attention for Semantic Segmentation》

ICCV-2019 文章目录 1 Background and Motivation2 Related Work3 Advantages / Contributions4 Method5 Experiments5.1 Datasets and Metrics5.2 Experiments on Cityscapess5.3 Experiments on ADE20K5.4 Experiments on COCO 6 Conclusion&#xff08;own&#xff09; 1 Ba…

也谈人工智能——AI科普入门

文章目录 1. 科普入门人工智能的定义人工智能的类型 - 弱 AI 与强 AI人工智能、深度学习与机器学习人工智能的应用和使用场景语音识别计算机视觉客户服务建议引擎数据分析网络安全 行业应用人工智能发展史![img](https://img-blog.csdnimg.cn/img_convert/66aeaaeac6870f432fc4…

Vue的api接口封装以及使用说明、模块说明

在Api目录下面建立user.js&#xff0c;如果以后有不同的接口请求地址都可以单独创建不同的&#xff0c;目的是方便维护&#xff01; import request from /utils/request 这个代码是引入之前封装好的 request.js 文件&#xff0c;具体可以参考上门一篇文档 Vue的request.js模…

深入剖析开源大模型+Langchain框架,智能问答系统性能下降原因

大模型&#xff08;LLM&#xff09;相关理论研究与工程实践随着 GPT3 的发布&#xff0c;在学术界、工业界大爆发&#xff0c;备受各行各业关注&#xff0c;并涌现出一些赋能行业、促进生产力、生产关系变革的实践。GPT3 [1] 以及斯坦福计算机学院近 100 教授联名论文 [2] 将大…

pymssql 报错误解决办法:20002, severity 9

错误 解决办法 python3.6&#xff0c;安装pymssql低版本&#xff08;pymssql-2.1.5-cp36-cp36m-win32.whl&#xff09;

腾讯云有没有免费云服务器?如何申请?

腾讯云免费服务器申请入口 https://curl.qcloud.com/FJhqoVDP 免费服务器可选轻量应用服务器和云服务器CVM&#xff0c;轻量配置可选2核2G3M、2核8G7M和4核8G12M&#xff0c;CVM云服务器可选2核2G3M和2核4G3M配置&#xff0c;腾讯云百科txybk.com分享2024年最新腾讯云免费服务器…

【前后端的那些事】快速上手富文本+富文本图片上传

文章目录 fullText富文本1. 后端接口1.1 定义常量1.2 定义返回实体类1.3 上传图片接口1.4 下载图片接口 2. 前端代码编写2.1 安装2.2 快速使用 3. 配置富文本图片上传地址3.1 配置图片上传配置 4. 全部代码展示 前言&#xff1a;最近写项目&#xff0c;发现了一些很有意思的功能…

【JaveWeb教程】(23) MySQL数据库开发之事务与索引 详细代码示例讲解(最全面)

目录 2. 事务2.1 介绍2.2 操作2.3 四大特性 3. 索引3.1 介绍3.2 结构3.3 语法 2. 事务 场景&#xff1a;学工部整个部门解散了&#xff0c;该部门及部门下的员工都需要删除了。 操作&#xff1a; -- 删除学工部 delete from dept where id 1; -- 删除成功-- 删除学工部的员工…

定制一套ERP系统大概要多少钱?ERP软件定制报价

定制一套ERP系统大概要多少钱&#xff1f;ERP软件定制报价 每个企业的需求和情况都是独特的&#xff0c;在不清楚题主所在企业的规模、业务流程、所需功能等情况时&#xff0c;确实没办法给出项目预算。 我们公司也定制过管理系统&#xff0c;经验就是&#xff0c;建议在开始…

MySQL数据库设计原则

0.简单的处理逻辑 一.MySQL完整性约束 主键约束 primary key 自增键约束 auto_increment 唯一键约束 unique 非空约束 not null 默认值约束 default 外键约束 foreign key 下面是一个sql语句创建一个表,可以看出来了使用了哪几个约束吗? create table user( id int…

如何将重复方法封装为Aop切面并结合注解使用

首先要导入依赖 <dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId> </dependency> 编写注解 package com.yg.domain.note;import java.lang.annotation.ElementType; import java.lang.annotation.Rete…