系列五、Java操作RocketMQ简单消息之同步消息

news2025/1/31 8:26:35

一、概述

        同步消息的特征是消息发出后会有一个返回值,即RocketMQ服务器收到消息后的一个确认,这种方式非常安全,但是性能上却没有那么高,而且在集群模式下,也是要等到所有的从机都复制了消息以后才会返回,适用于重要的消息传递,例如:短信通知

二、案例代码

2.1、pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.star</groupId>
  <artifactId>rocketmq-example</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>rocketmq-example</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>

    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.5.0</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.13.2</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>2.0.25</version>
    </dependency>
    <dependency>
      <groupId>cn.hutool</groupId>
      <artifactId>hutool-all</artifactId>
      <version>5.8.16</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-collections4</artifactId>
      <version>4.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.12.0</version>
    </dependency>

    <!-- 普通maven项目中使用Sl4j注解 -->
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.22</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.7.32</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.10</version>
    </dependency>

  </dependencies>

  <!-- 锁定Java编译的版本 -->
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.8.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

2.2、RocketMQConstant

package org.star.constants;

/**
 * @Author: 一叶浮萍归大海
 * @Date: 2023/7/27 16:42
 * @Description:
 */
public class RocketMQConstant {

    /**
     * 配置RocketMQ NameSrv的地址
     */
    public static final String NAME_SERVER_ADDR = "192.168.173.219:9876";

}

2.3、SimpleConsumer

package org.star.simple.consumer;

import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.star.constants.RocketMQConstant;

import java.util.List;

/**
 * @Author: 一叶浮萍归大海
 * @Date: 2023/8/25 10:20
 * @Description: 简单消息消费者
 */
@Slf4j
public class SimpleConsumer {

    public static void main(String[] args) throws Exception {
        /**
         * 消费消息分两种
         * (1)拉模式:消费者主动去Broker上拉消息
         * (2)推模式:消费者等待Broker把消息推送过来
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SimpleMessageGroup");
        consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
        consumer.subscribe("SimpleTopic", "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                if (CollectionUtils.isNotEmpty(list)) {
                    String body = StrUtil.utf8Str(list.get(0).getBody());
                    log.info("收到消息 body:{}",body);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("consumer started!");
    }

}

2.4、SyncProducer

package org.star.simple.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.star.constants.RocketMQConstant;

import java.nio.charset.StandardCharsets;

/**
 * @Author: 一叶浮萍归大海
 * @Date: 2023/8/25 10:12
 * @Description: 同步发送:等待消息返回后再继续执行下面的操作
 */
@Slf4j
public class SyncProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducerGroup");
        producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
        producer.start();
        for (int i = 0; i < 3; i++) {
            Message message = new Message("SimpleTopic", ("我是第[" + i + "]个简单消息").getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = producer.send(message);
            log.info("第[" + i + "]个简单消息发送成功 sendStatus:{},msgId:{},topic:{}", sendResult.getSendStatus(),sendResult.getMsgId(),sendResult.getMessageQueue().getTopic());
        }
        producer.shutdown();
    }

}

2.5、控制台打印结果

# 生产者端
12:14:32.339 [main] INFO org.star.simple.producer.SyncProducer - 第[0]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9850000,topic:SimpleTopic
12:14:32.343 [main] INFO org.star.simple.producer.SyncProducer - 第[1]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9940001,topic:SimpleTopic
12:14:32.348 [main] INFO org.star.simple.producer.SyncProducer - 第[2]个简单消息发送成功 sendStatus:SEND_OK,msgId:C0A81FB25D9418B4AAC207C6D9970002,topic:SimpleTopic


# 消费者端
12:14:32.337 [ConsumeMessageThread_10] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[0]个简单消息
12:14:32.345 [ConsumeMessageThread_11] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[1]个简单消息
12:14:32.349 [ConsumeMessageThread_12] INFO org.star.simple.consumer.SimpleConsumer - 收到消息 body:我是第[2]个简单消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/965039.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【vue2第九章】组件化开发和根组件以及style上的scoped作用

组件化开发和根组件 什么是组件化开发&#xff1f; 一个页面可以拆分为多个组件&#xff0c;每个组件有自己的样式&#xff0c;结构&#xff0c;行为&#xff0c;组件化开发的好处就是&#xff0c;便于维护&#xff0c;利于重复利用&#xff0c;提升开发的效率。 便于维护&…

输出归一化位置式PID(完整梯形图代码)

SMART PLC单自由度和双自由度位置式PID的完整源代码,请参看下面文章链接: 位置式PID(S7-200SMART 单自由度、双自由度梯形图源代码)_RXXW_Dor的博客-CSDN博客有关位置型PID和增量型PID的更多详细介绍请参看PID专栏的相关文章,链接如下:SMART PLC增量型PID算法和梯形图代码…

已解决‘jupyter‘ 不是内部或外部命令,也不是可运行的程序或批处理文件报错

本文摘要&#xff1a;本文已解决‘jupyter‘ 不是内部或外部命令&#xff0c;也不是可运行的程序或批处理文件的相关报错问题&#xff0c;并系统性地总结提出了几种可用解决方案。同时结合人工智能GPT排除可能得隐患及错误。 &#x1f60e; 作者介绍&#xff1a;我是程序员洲洲…

利用frps搭建本地自签名https服务的透传

nginx的搭建就不介绍了&#xff0c;教程很多&#xff0c;基本上油手就会。 在本例中&#xff0c;frp服务器的域名是 www.yourfrp.com&#xff0c;同时也是反向代理nginx服务器; 本地网站要用的域名&#xff1a; test.abcd.com 请事先将 test.abcd.com 解析到 frp所在服务器…

java.sql.SQLException: com.mysql.cj.jdbc.Driver

这篇文章分享一下Springboot整合Elasticsearch时遇到的一个问题&#xff0c;项目正常启动&#xff0c;但是查询数据库的时候发生了一个异常java.sql.SQLException: com.mysql.cj.jdbc.Driver java.sql.SQLException: com.mysql.cj.jdbc.Driverat com.alibaba.druid.util.JdbcU…

docker-compose 部署 Seata整合nacos,Postgresql 为DB存储

docker-compose 部署 Seata整合nacos,Postgresql 为DB存储 环境 详情环境可参考 https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E 我这里 <spring.cloud.alibaba-version>2021.1</spring.cloud.alibaba-version>所…

【前端入门案例1】HTML + CSS

案例一 <!DOCTYPE html> <html lang"en-US"><head><meta charset"utf-8"><meta name"viewport" content"widthdevice-width"><title>My test page</title> </head><body><…

浅析Linux SCSI子系统:错误恢复

文章目录 概述SCSI错误恢复处理添加错误恢复命令错误恢复线程scsi_eh_ready_devs IO超时处理相关参考 概述 IO路径是一个漫长的过程&#xff0c;从SCSI命令请求下发到请求完成返回&#xff0c;中间的任何一个环节出现问题都会导致IO请求的失败。从SCSI子系统到低层驱动&#x…

002图的基本概念与表示方法

文章目录 一. 图的组成二. 本体图2.1 什么是本体图2.2 怎么设计本体图 三. 图的种类3.1 按连接是否有向分3.2 按本体图分3.3 按连接是否带权重分 四. 节点连接数&#xff08;节点的度&#xff09;4.1 无向图节点的度4.2 有向图节点的度 五. 图的表示方法5.1 邻接矩阵5.2 连接列…

图:有向无环图(DAG)

1.有向无环图的定义 有向无环图:若一个有向图中不存在环&#xff0c;则称为有向无环图。 简称DAG图(Directed Acyclic Graph) 顶点中不可能出现重复的操作数。 2.有向无环图的应用 1.描述算数表达式 用有向无环图描述算术表达式。 解题步骤&#xff1a; 把各个操作数不重…

【研究开源机器臂】(1):参加机器人展览,突然来了兴趣要做个机器臂来研究下,先进行产品调研。研究其他人的方案,进行技术调研。

1&#xff0c;关于 6 轴机器人开源项目 使用步进电机做的开源机器人项目&#xff1a; https://github.com/SkyentificGit/SmallRobotArm 按照这个关键字进行搜索&#xff0c;已经有很多人研究成功了。 3D打印6轴机械臂-自制 2&#xff0c;然后是小点的 3 轴机器人 github项…

2023年03月 C/C++(六级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C编程&#xff08;1~8级&#xff09;全部真题・点这里 第1题&#xff1a;波兰表达式 波兰表达式是一种把运算符前置的算术表达式&#xff0c;例如普通的表达式2 3的波兰表示法为 2 3。波兰表达式的优点是运算符之间不必有优先级关系&#xff0c;也不必用括号改变运算次序&am…

async的用法

有以下几种形式 //从c11到c17有 template< class Function, class... Args > std::future<typename std::result_of<typename std::decay<Function>::type(typename std::decay<Args>::type...)>::type>async( Function&& f, Args&…

el-select 选择一条数据后,把其余数据带过来

1. 案例&#xff1a; ps: 票号是下拉框选择&#xff0c;风险分类、场站名称以及开始时间是选择【票号】后带过来的。 2. 思路: 使用官网上给的方法&#xff0c;选择之后&#xff0c;触发change方法从而给其余字段赋值 3. 代码 <el-form-itemlabel"票号&#xff1a;&…

C++网狐服务器引入开源日志库spdlog

很多人对日志库不以为然&#xff0c;包括网狐这种十几年的公司都不重视&#xff0c;其实日志库记录的东西能在线上出问题时高效解决&#xff0c;特别是别人写的东西&#xff0c;人又走了&#xff0c;出了问题&#xff0c;还可以用日志分析快速解决。要是没有日志记录&#xff0…

嵌入式开发-串口通信介绍

串口通信&#xff08;Serial Communications&#xff09;是一种串行数据传输方式&#xff0c;它将数据一位一位地顺序传输&#xff0c;而不是并行传输。这种方式相比并行传输更为节省空间和成本&#xff0c;因此在现代工业和嵌入式系统中得到广泛应用。 串口通信通常使用RS-23…

代码随想录算法训练营第五十三天 | 1143.最长公共子序列,1035.不相交的线,53. 最大子序和 动态规划

代码随想录算法训练营第五十三天 | 1143.最长公共子序列&#xff0c;1035.不相交的线&#xff0c;53. 最大子序和 动态规划 1143.最长公共子序列1035.不相交的线53. 最大子序和 动态规划 1143.最长公共子序列 题目链接 视频讲解 给定两个字符串 text1 和 text2&#xff0c;返回…

Qt使用Json

包含目录&#xff1a; #include <QJsonObject> #include <QJsonDocument> #include <QByteArray> #include <QFile> #include <QJsonArray>基本结构&#xff1a; 写json QJsonObject studentobj;QJsonArray arrarydata;QJsonObject subdata;…

【Java核心知识】ThreadLocal相关知识

ThreadLocal 什么是ThreadLocal ThreadLoacal类可以为每个线程保存一份独有的变量&#xff0c;该变量对于每个线程都是独占的。实现原理为每个Thread类中包含一个ThreadHashMap&#xff0c;key为变量的name&#xff0c;value为变量的值。 在日常使用中&#xff0c;我们可以通…

javaScipt

javaScipt 一、JavaScript简介二、javaScript基础1、输入输出语法2、变量3、常量4、数据类型4.1、数字型 number4.2、字符串类型 string4.3、布尔类型 boolean4.4、未定义类型 undefined4.5、null 空类型4.6、typeof 检测变量数据类型 5、数据类型转换5.1、隐式转换5.2、显示转…