RocketMQ学习记录

news2024/11/23 4:01:54

服务器操作系统版本:Ubuntu 24.04

Java版本:21

Spring Boot版本:3.3.5

如果打算用GUI,虚拟机安装Ubuntu 24.04,见虚拟机安装Ubuntu 24.04及其常用软件(2024.7)_ubuntu24.04-CSDN博客icon-default.png?t=O83Ahttps://blog.csdn.net/weixin_42173947/article/details/140335522如果打算用纯命令行,见

虚拟机安装Ubuntu 24.04服务器版(命令行版)-CSDN博客icon-default.png?t=O83Ahttps://blog.csdn.net/weixin_42173947/article/details/143747375

1 Ubuntu上部署RocketMQ

这里准备两台服务器,做集群使用,一台IP是192.168.100.200,一台是192.168.100.201

1.1 安装jdk8

首先需要部署JDK8+,这里我使用了JDK8

sudo apt-get install -y openjdk-8-jdk;

1.2 下载RocketMQ

这里使用5.2.0版本

mkdir -p /home/user/softwares;
cd /home/user/softwares;
wget https://dist.apache.org/repos/dist/release/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip;

1.3 解压,改文件夹名

unzip rocketmq-all-5.2.0-bin-release.zip;
mv rocketmq-all-5.2.0-bin-release rocketmq;

1.4 修改参数,减少内存消耗量

cd /home/user/softwares/rocketmq;
vim bin/runserver.sh;

-Xms4g 改为 -Xms256m,-Xmx4g 改为 -Xmx256m,-Xmn2g 改为 -Xmn128m

vim bin/runbroker.sh;

-Xmn4g 改为 -Xmn256m,-Xms8g 改为 -Xms256m, -Xmx8g 改为 -Xmx256m

1.5 启动NameServer

cd /home/user/softwares/rocketmq;
nohup sh bin/mqnamesrv &

验证namesrv是否启动成功

tail -f ~/logs/rocketmqlogs/namesrv.log;
jps -l;

1.6 启动Broker+Proxy

1.6.1 单点版

只有一个节点,最简单,最不稳定,一般用于测试

nohup sh bin/mqbroker -n <服务器IP>:9876 --enable-proxy &

1.6.2 全Master版

nohup sh bin/mqbroker -n <NameServer服务器IP1>:9876;<NameServer服务器IP2>:9876 -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-a.properties --enable-proxy &
nohup sh bin/mqbroker -n <NameServer服务器IP1>:9876;<NameServer服务器IP2>:9876 -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-b.properties --enable-proxy &

我这里IP是192.168.100.200,192.168.100.201,注意两台服务器要执行不一样的命令

nohup sh bin/mqbroker -n '192.168.100.200:9876;192.168.100.201:9876' -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-a.properties --enable-proxy &
nohup sh bin/mqbroker -n '192.168.100.100:9876;192.168.100.201:9876' -c /usr/local/softwares/rocketmq/conf/2m-noslave/broker-b.properties --enable-proxy &

验证broker是否启动成功

tail -f ~/logs/rocketmqlogs/proxy.log;
jps -l;

1.7 停止broker,namesrv

先停broker,后停namesrv

cd /home/user/softwares/rocketmq;
sh bin/mqshutdown broker;
sh bin/mqshutdown namesrv;

1.8 测试生产者,消费者

重新开启namesrv和broker,然后执行下面操作

cd /usr/local/softwares/rocketmq;
export NAMESRV_ADDR=localhost:9876;
# 生产者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer;
# 消费者
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer;

1.9 防火墙放开RocketMQ端口

sudo ufw allow 8081/tcp;
sudo ufw allow 9876/tcp;
sudo ufw allow 10911/tcp;
sudo ufw allow 10909/tcp;
sudo ufw reload;

2 关于Topic的操作

2.1 创建Topic

cd /usr/local/softwares/rocketmq;

执行如下命令,创建一个叫TestTopic的Topic

nohup sh bin/mqadmin updatetopic -n <服务器IP>:9876 -t TestTopic -b <服务器IP>:10911 &
nohup sh bin/mqadmin updatetopic -n <服务器IP>:9876 -t TestTopic -b <服务器IP>:10911 &

注意修改IP

nohup sh bin/mqadmin updatetopic -n '192.168.100.200:9876;192.168.100.201:9876' -t TestTopic -b 192.168.100.200:10911 &
nohup sh bin/mqadmin updatetopic -n '192.168.100.200:9876;192.168.100.201:9876' -t TestTopic -b 192.168.100.201:10911 &

2.2 删除Topic

cd /usr/local/softwares/rocketmq;

执行如下命令

nohup sh bin/mqadmin deleteTopic -n <服务器IP>:9876 -t TestTopic -b <服务器IP>:10911 &

注意修改IP

nohup sh bin/mqadmin deleteTopic -n 192.168.100.200:9876 -t TestTopic -b 192.168.100.200:10911 &
nohup sh bin/mqadmin deleteTopic -n 192.168.100.201:9876 -t TestTopic -b 192.168.100.201:10911 &

2.3 查看Topic

cd /usr/local/softwares/rocketmq;

执行如下命令

sh bin/mqadmin topicList -n <服务器IP>:9876

注意修改IP

sh bin/mqadmin topicList -c -n 192.168.100.200:9876;
sh bin/mqadmin topicList -c -n 192.168.100.201:9876;

3 Java连接RocketMQ环境搭建

3.1 文件树结构

3.2 父节点pom.xml 

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.sliverbullet</groupId>
    <artifactId>jdk21-maven-test</artifactId>
    <packaging>pom</packaging>

    <version>1.0</version>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>21</java.version>
        <spring-boot.version>3.3.5</spring-boot.version>
    </properties>

    <modules>
        <module>springboot3-test</module>
        <module>rocketmq-test</module>
    </modules>

    <repositories>
        <repository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>public</id>
            <name>aliyun nexus</name>
            <url>https://maven.aliyun.com/repository/public</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>

3.3 子节点xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.sliverbullet</groupId>
        <artifactId>jdk21-maven-test</artifactId>
        <version>1.0</version>
    </parent>

    <artifactId>rocketmq-test</artifactId>

    <properties>
        <maven.compiler.source>21</maven.compiler.source>
        <maven.compiler.target>21</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>21</java.version>
        <spring-boot.version>3.3.5</spring-boot.version>
        <fastjson2-version>2.0.53</fastjson2-version>
        <lombok-version>1.18.34</lombok-version>
        <rocketmq-spring-boot-starter-version>2.3.0</rocketmq-spring-boot-starter-version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>${spring-boot.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>${fastjson2-version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-spring-boot-starter-version}</version>
        </dependency>
    </dependencies>

</project>

依赖环境搭建完成

4 RocketMQ基本配置

4.1 文件树结构

4.2 application.yml的配置

server:
  port: 8002
spring:
  application:
    name: rocketmq-test
  profiles:
    active: dev
machine-no: 1

4.3 application-dev.yml的配置

test,prod,自行配置

spring:

logging:
  file:
    path: D:/log/SpringBoot3-Test
    name: ${logging.file.path}/test.log
rocketmq:
  name-server: 192.168.100.200:9876;192.168.100.201:9876
  producer:
    group: boot-product
    send-message-timeout: 10000

4.4 RocketMQConsumer类

package com.sliverbullet.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

@Component
@Slf4j
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "my-consumer-test-topic", consumeTimeout = 1000L)
public class RocketMQConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
        LocalDateTime localDateTime = LocalDateTime.now();
        String formattedDate = dateTimeFormatter.format(localDateTime);
        log.info("RocketMQ消费者接收时间:{}" ,formattedDate);
        log.info("RocketMQ消费者接收内容:{}" ,message);
    }
}

4.5 IRocketMQService接口

package com.sliverbullet.service;

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;

import java.util.List;
import java.util.Map;

/**
 * <p>
 * RocektMQ生产者常用发送消息方法
 * 最佳实践:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
 * </p>
 *
 * @author MrWen
 * @since 2022-01-06 17:10
 **/
public interface IRocketMQService {

    /**
     * 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
     * <p>
     * (send消息方法只要不抛异常,就代表发送成功。但不保证100%可靠投递(所有消息都一样,后面不在叙述)。
     * 要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
     * 解析看:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
     * )
     *
     * @param destination 主题名:标签 topicName:tags
     * @param msg         发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessage(String destination, Object msg);

    /**
     * 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
     *
     * @param topicName 主题名 topicName
     * @param tags      标签 tags
     * @param msg       发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessage(String topicName, String tags, Object msg);

    /**
     * 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
     *
     * @param topicName 主题名 topicName
     * @param tags      标签 tags
     * @param key       唯一标识码要设置到keys字段,方便将来定位消息丢失问题
     * @param msg       发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessage(String topicName, String tags, String key, Object msg);

    /**
     * 发送同步消息-SQL92模式
     * 需要配置RocketMQ服务器  vim conf/broker.conf  ##支持sql语句过滤  enablePropertyFilter=true
     * 在console控制台查看集群状态  enablePropertyFilter=true 才正常
     *
     * @param topicName 主题名 topicName
     * @param map       自定义属性
     * @param msg       发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg);


    /**
     * 发送同步消息-SQL92模式
     * 需要配置RocketMQ服务器  vim conf/broker.conf  ##支持sql语句过滤  enablePropertyFilter=true
     * 在console控制台查看集群状态  enablePropertyFilter=true 才正常
     *
     * @param topicName 主题名 topicName
     * @param map       自定义属性
     * @param key       唯一标识码要设置到keys字段,方便将来定位消息丢失问题
     * @param msg       发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg);


    /**
     * 发生异步消息(异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。)
     *
     * @param destination  主题名:标签 topicName:tags
     * @param msg          发送对象
     * @param sendCallback 异步回调函数
     */
    void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback);

    /**
     * 发送单向消息(这种方式主要用在不特别关心发送结果的场景,例如日志发送。)
     *
     * @param destination 主题名:标签 topicName:tags
     * @param msg         发送对象
     */
    void sendOneway(String destination, Object msg);

    /**
     * 发送批量消息(发送超过1MB,做了自动分割,超时时间设置30s(默认3s)),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB
     *
     * @param destination 主题名:标签 topicName:tags
     * @param list        批量消息
     */
    void sendBatchMessage(String destination, List<?> list);


    /**
     * 发送批量消息(发送超过1MB,做了自动分割。),注:默认最大是4MB,为了避免ListSplitter.calcMessageSize计算不精确及大批量数据发送超时才设置1MB
     *
     * @param topicName 主题名 topicName
     * @param tags      标签 tags
     * @param timeout   超时时间,空则默认设为30s
     * @param list      批量消息
     */
    void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list);

    /**
     * 发送延时消息(超时时间,设置30s(默认3s))
     * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18
     *
     * @param destination    主题名:标签 topicName:tags
     * @param msg            发送对象
     * @param delayTimeLevel 延时等级(从1开始)
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel);

    /**
     * 发送延时消息
     * 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     * 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18
     *
     * @param destination    主题名:标签 topicName:tags
     * @param msg            发送对象
     * @param timeout        超时时间(单位毫秒)
     * @param delayTimeLevel 延时等级(从1开始)
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel);


    /**
     * 发送顺序消息(分区有序,多个queue参与,即相对每个queue,消息都是有序的。)
     *
     * @param destination 主题名:标签 topicName:tags
     * @param msg         发送对象
     * @param hashKey     根据其哈希值取模后确定发送到哪一个queue队列
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendInOrder(String destination, Object msg, String hashKey);


    /**
     * 发送事务消息
     * 事务消息使用上的限制
     * 1:事务消息不支持延时消息和批量消息。
     * 2:为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionalMessageCheckListener 类来修改这个行为。
     * 3:事务消息将在 Broker 配置文件中的参数 transactionTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionTimeout 参数。
     * 4:事务性消息可能不止一次被检查或消费。
     * 5:提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
     * 6:事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
     *
     * @param destination 主题名:标签 topicName:tags
     * @param msg         发送对象
     * @param arg         arg
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessageInTransaction(String destination, Object msg, Object arg);
}

4.6 RocketMQServiceImpl接口实现类

package com.sliverbullet.service.impl;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import com.sliverbullet.service.IRocketMQService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.List;
import java.util.Map;

@Slf4j
@Service
public class RocketMQServiceImpl implements IRocketMQService {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public SendResult sendMessage(String topicName, Object msg) {
        MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
        Message<?> message = messageBuilder.build();
        SendResult sendResult = rocketMQTemplate.syncSend(topicName, message);
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("【RocketMQ测试】发送同步消息成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);
        } else {
            log.warn("【RocketMQ测试】发送同步消息不一定成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);
        }
        return sendResult;
    }

    @Override
    public SendResult sendMessage(String topicName, String tags, Object msg) {
        MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
        Message<?> message = messageBuilder.build();
        SendResult sendResult = rocketMQTemplate.syncSend(topicName + ":" + tags, message);
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("【RocketMQ测试】发送同步带Tag消息成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);
        } else {
            log.warn("【RocketMQ测试】发送同步带Tag消息不一定成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);
        }
        return sendResult;
    }

    @Override
    public SendResult sendMessage(String topicName, String tags, String key, Object msg) {
        MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
        if (StringUtils.isNotBlank(key)) {
            messageBuilder.setHeader(MessageConst.PROPERTY_KEYS, key);
        }
        Message<?> message = messageBuilder.build();
        SendResult sendResult = rocketMQTemplate.syncSend(topicName + ":" + tags, message);
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("【RocketMQ测试】发送同步带Tag和Key消息成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);
        } else {
            log.warn("【RocketMQ测试】发送同步带Tag和Key消息不一定成功, topicName: {}, tag:{}, msg: {}, sendResult: {}", topicName, tags, msg, sendResult);
        }
        return sendResult;
    }

    @Override
    public SendResult sendMessageBySql(String topicName, Map<String, Object> map, Object msg) {
        return null;
    }

    @Override
    public SendResult sendMessageBySql(String topicName, Map<String, Object> map, String key, Object msg) {
        return null;
    }

    @Override
    public void sendAsyncMessage(String destination, Object msg, SendCallback sendCallback) {

    }

    @Override
    public void sendOneway(String destination, Object msg) {

    }

    @Override
    public void sendBatchMessage(String destination, List<?> list) {

    }

    @Override
    public void sendBatchMessage(String topicName, String tags, Long timeout, List<?> list) {

    }

    @Override
    public SendResult sendDelayLevel(String destination, Object msg, int delayTimeLevel) {
        Message<?> message = MessageBuilder.withPayload(msg).build();
        SendResult sendResult = rocketMQTemplate.syncSend(destination, message, 10000L, delayTimeLevel);
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("【RocketMQ测试】发送延时消息成功, destination: {}, msg: {}, sendResult: {}", destination, message, sendResult);
        } else {
            log.warn("【RocketMQ测试】发送延时消息不一定成功, destination: {}, msg: {}, sendResult: {}", destination, message, sendResult);
        }
        return sendResult;
    }

    @Override
    public SendResult sendDelayLevel(String destination, Object msg, int timeout, int delayTimeLevel) {
        return null;
    }

    @Override
    public SendResult sendInOrder(String destination, Object msg, String hashKey) {
        return null;
    }

    @Override
    public SendResult sendMessageInTransaction(String destination, Object msg, Object arg) {
        return null;
    }
}

5 正式测试

5.1 基本同步消息测试

传入一个JSON,同步进入消息队列,消息队列同步消费

5.1.1 Controller层

    @Value("${machine-no}")
    private String machineNo;

    @Resource
    private IRocketMQService rocketMQService;

    @RequestMapping("/sync")
    public JSONObject send(@RequestBody JSONObject param) {
        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
        LocalDateTime localDateTime = LocalDateTime.now();
        String formattedDate = dateTimeFormatter.format(localDateTime);
        System.out.println(formattedDate);
        log.info("【SpringBoot3测试】-【RocketMQ测试】:{}", formattedDate);
        param.put("time", formattedDate);
        param.put("name", "Sliver");
        param.put("machine_no", machineNo);
        SendResult sendResult = rocketMQService.sendMessage("TestTopic", param);
        JSONObject returnJSONObject = JSONObject.parseObject(JSONObject.toJSONString(sendResult));
        return returnJSONObject;
    }

5.1.2 Service接口

    /**
     * 发送同步消息(这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。)
     * <p>
     * (send消息方法只要不抛异常,就代表发送成功。但不保证100%可靠投递(所有消息都一样,后面不在叙述)。
     * 要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。
     * 解析看:https://github.com/apache/rocketmq/blob/master/docs/cn/best_practice.md
     * )
     *
     * @param destination 主题名:标签 topicName:tags
     * @param msg         发送对象
     * @return 发送结果,只有为SEND_OK且同步Master服务器或同步刷盘才保证100%投递成功
     */
    SendResult sendMessage(String destination, Object msg);

5.1.3 Service接口实现类

    @Override
    public SendResult sendMessage(String topicName, Object msg) {
        MessageBuilder<?> messageBuilder = MessageBuilder.withPayload(msg);
        Message<?> message = messageBuilder.build();
        SendResult sendResult = rocketMQTemplate.syncSend(topicName, message);
        if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
            log.info("【RocketMQ测试】发送同步消息成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);
        } else {
            log.warn("【RocketMQ测试】发送同步消息不一定成功, topicName: {}, msg: {}, sendResult: {}", topicName, msg, sendResult);
        }
        return sendResult;
    }

5.1.4 访问测试

测试结果

{
    "traceOn": true,
    "regionId": "DefaultRegion",
    "messageQueue": {
        "queueId": 1,
        "topic": "TestTopic",
        "brokerName": "broker-a"
    },
    "msgId": "0ACC4A893DF836BAF30C6BBBD62D0000",
    "queueOffset": 0,
    "sendStatus": "SEND_OK",
    "offsetMsgId": "C0A864C800002A9F00000000000764BC",
    "transactionId": "0ACC4A893DF836BAF30C6BBBD62D0000"
}

后台日志

2024-11-21T22:04:32.151+08:00  INFO 15864 --- [rocketmq-test] [nio-8002-exec-4] c.s.controller.RocketMQSyncController    : 【SpringBoot3测试】-【RocketMQ测试】:2024-11-21 22:04:32.151695800
2024-11-21T22:04:32.202+08:00  INFO 15864 --- [rocketmq-test] [nio-8002-exec-4] c.s.service.impl.RocketMQServiceImpl     : 【RocketMQ测试】发送同步消息成功, topicName: TestTopic, msg: {"machine_no":"1","name":"Sliver","time":"2024-11-21 22:04:32.151695800"}, sendResult: SendResult [sendStatus=SEND_OK, msgId=0ACC4A893DF836BAF30C6BBBD62D0000, offsetMsgId=C0A864C800002A9F00000000000764BC, messageQueue=MessageQueue [topic=TestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
2024-11-21T22:04:32.205+08:00  INFO 15864 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收时间:2024-11-21 22:04:32.205206100
2024-11-21T22:04:32.206+08:00  INFO 15864 --- [rocketmq-test] [er-test-topic_1] c.s.consumer.RocketMQConsumer            : RocketMQ消费者接收内容:{"machine_no":"1","name":"Sliver","time":"2024-11-21 22:04:32.151695800"}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2245715.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络安全之接入控制

身份鉴别 ​ 定义:验证主题真实身份与其所声称的身份是否符合的过程&#xff0c;主体可以是用户、进程、主机。同时也可实现防重放&#xff0c;防假冒。 ​ 分类:单向鉴别、双向鉴别、三向鉴别。 ​ 主题身份标识信息:密钥、用户名和口令、证书和私钥 Internet接入控制过程 …

Spring 框架七大模块(Java EE 学习笔记03)

​ ​核心容器模块&#xff08;Core Container&#xff09; 核心容器模块在Spring的功能体系中起着支撑性作用&#xff0c;是其他模块的基石。核心容器层主要由Beans模块、Core模块、Contex模块和SpEL模块组成。 &#xff08;1&#xff09;Beans模块。它提供了BeanFactory类&…

IPv6基础知识

IPv6是由IEIF提出的互聯網協議第六版&#xff0c;用來替代IPv4的下一代協議&#xff0c;它的提出不僅解決了網絡地址資源匱乏問題&#xff0c;也解決了多種接入設備接入互聯網的障礙。IPv6的地址長度為128位&#xff0c;可支持340多萬億個地址。如下圖&#xff0c;3ffe:1900:fe…

旷世yolox自定义数据训练和验证和onnx导出推理

目录 1.前言 2.代码 3.环境 4.自定义数据形态 5.配置文件 6.训练 7.验证 8.评估混淆矩阵 9.导出onnx 10.onnx推理 -- 补充&#xff1a;docker环境 1.前言 旷世科技的yolox比较清爽&#xff0c;效果也不错&#xff0c;简单总结主要有三点创新比较高&#xff1a;deco…

Electron开发构建工具electron-vite(alex8088)添加VueDevTools(VitePlugin)

零、介绍 本文章的electron-vite指的是这个项目&#x1f449;electron-vite仓库&#xff0c;electron-vite网站 本文章的VueDevTools指的是VueDevTools的Vite插件版&#x1f449;https://devtools.vuejs.org/guide/vite-plugin 一、有一个用electron-vite创建的项目 略 二、…

软件测试—— Selenium 常用函数(一)

前一篇文章&#xff1a;软件测试 —— 自动化基础-CSDN博客 目录 前言 一、窗口 1.屏幕截图 2.切换窗口 3.窗口设置大小 4.关闭窗口 二、等待 1.等待意义 2.强制等待 3.隐式等待 4.显式等待 总结 前言 在前一篇文章中&#xff0c;我们介绍了自动化的一些基础知识&a…

UE5 腿部IK 解决方案 footplacement

UE5系列文章目录 文章目录 UE5系列文章目录前言一、FootPlacement 是什么&#xff1f;二、具体实现 前言 在Unreal Engine 5 (UE5) 中&#xff0c;腿部IK&#xff08;Inverse Kinematics&#xff0c;逆向运动学&#xff09;是一个重要的动画技术&#xff0c;用于实现角色脚部准…

私有化部署视频平台EasyCVR宇视设备视频平台如何构建视频联网平台及升级视频转码业务?

在当今数字化、网络化的时代背景下&#xff0c;视频监控技术已广泛应用于各行各业&#xff0c;成为保障安全、提升效率的重要工具。然而&#xff0c;面对复杂多变的监控需求和跨区域、网络化的管理挑战&#xff0c;传统的视频监控解决方案往往显得力不从心。 EasyCVR视频融合云…

山东春季高考-C语言-综合应用题

&#xff08;2018年&#xff09;3.按要求编写以下C语言程序&#xff1a; &#xff08;1&#xff09;从键盘上输入三个整数a、b、c&#xff0c;判断能否以这三个数为边构成三角形&#xff0c;若可以则计算机三角形的面积且保留两位小数&#xff1b;若不可以则输出“不能构成三角…

Linux移植IMX6ULL记录 一:编译源码并支持能顺利进入linux

目录 前言 一、不修改文件进行编译 二、修改设备树文件进行编译 前言 我用的开发板是100_ask_imx6ull_pro&#xff0c;其自带的linux内核版本linux-4.9.88&#xff0c;然后从linux官网下载过来的linux-4.9.88版本的arch/arm/configs/defconfig和dts设备树文件并没有对imx6ull…

从Stream的 toList() 和 collect(Collectors.toList()) 方法看Java的不可变流

环境 JDK 21Windows 11 专业版IntelliJ IDEA 2024.1.6 背景 在使用Java的Stream的时候&#xff0c;常常会把流收集为List。 假设有List list1 如下&#xff1a; var list1 List.of("aaa", "bbbbbb", "cccc", "d", "eeeee&qu…

大语言模型---LoRA简介;LoRA的优势;LoRA训练步骤;总结

文章目录 1. 介绍2. LoRA的优势3. LoRA训练步骤&#xff1a;4.总结 1. 介绍 LoRA&#xff08;Low-Rank Adaptation&#xff09;是一种用于高效微调大模型的技术&#xff0c;它通过在已有模型的基础上引入低秩矩阵来减少训练模型时所需的参数量和计算量。具体来说&#xff0c;L…

Debug-031-近期功能实现小结

由于时间原因&#xff0c;没办法对每个小的功能点进行比较细致的总结&#xff0c;这里统一去记录一下最近的实现了的功能&#xff0c;算是存档备份&#xff0c;为今后开发带来便利和参考。 一、ACEeditor ACEeditor使用手册&#xff08;一&#xff09;_ace editor-CSDN博客 AC…

深度学习中的mAP

在深度学习中&#xff0c;mAP是指平均精度均值(mean Average Precision)&#xff0c;它是深度学习中评价模型好坏的一种指标(metric)&#xff0c;特别是在目标检测中。 精确率和召回率的概念&#xff1a; (1).精确率(Precision)&#xff1a;预测阳性结果中实际正确的比例(TP / …

基于SpringBoot+Vue的影院管理系统(含演示视频+运行截图+说明文档)

web启动链接地址&#xff1a; http://localhost:8082&#xff08;管理端&#xff09; http://localhost:8081&#xff08;用户端&#xff09; http://localhost:8082&#xff08;员工端&#xff09; 一、项目介绍 基于框架的系统&#xff0c;系统分为用户、员工和管理员三个…

科研实验室的数字化转型:Spring Boot系统

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理实验室管理系统的相关信息成为必然。开发合…

网络无人值守批量装机-cobbler

网络无人值守批量装机-cobbler 一、cobbler简介 ​ 上一节中的pxe+kickstart已经可以解决网络批量装机的问题了,但是环境配置过于复杂,而且仅针对某一个版本的操作系统进批量安装则无法满足目前复杂环境的部署需求。 ​ 本小节所讲的cobbler则是基于pxe+kickstart技术的二…

基于Java Springboot二手商品网站

一、作品包含 源码数据库全套环境和工具资源部署教程 二、项目技术 前端技术&#xff1a;Html、Css、Js、Vue、Element-ui 数据库&#xff1a;MySQL 后端技术&#xff1a;Java、Spring Boot、MyBatis 三、运行环境 开发工具&#xff1a;IDEA/eclipse 数据库&#xff1a;…

使用chrome 访问虚拟机Apache2 的默认页面,出现了ERR_ADDRESS_UNREACHABLE这个鸟问题

本地环境 主机MacOs Sequoia 15.1虚拟机Parallels Desktop 20 for Mac Pro Edition 版本 20.0.1 (55659)虚拟机-操作系统Ubuntu 22.04 服务器版本 最小安装 开发环境 编辑器编译器调试工具数据库http服务web开发防火墙Vim9Gcc13Gdb14Mysql8Apache2Php8.3Iptables 第一坑 数…

java: spire.pdf.free 9.12.3 create pdf

可以用windows 系统中文字体&#xff0c;也可以从文件夹的字体文件 /*** encoding: utf-8* 版权所有 2024 ©涂聚文有限公司* 许可信息查看&#xff1a;言語成了邀功盡責的功臣&#xff0c;還需要行爲每日來值班嗎* 描述&#xff1a;* # Author : geovindu,Geovin Du 涂…