SpringCloudAlibaba:6.2RocketMQ的普通消息的使用

news2024/11/15 15:59:28

简介

普通消息也叫并发消息,是发送效率最高,使用最多的一种

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>java_sc_alibaba</artifactId>
        <groupId>jkw.life</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>test-rocketmq8009</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- rocketmq-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>5.1.0</version>
        </dependency>
        <!-- SpringMVC-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- test-->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

创建topic

mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic

测试类

package send_message;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
@Slf4j
public class Test_01 {
    /**
     * 消息生成者【普通消息】
     * 创建topic:
     * mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t testtopic
     */
    @Test
    public void producer() throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        //1.初始化生产者【生产者组名】
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
        //2.rocketmq地址
        producer.setNamesrvAddr("192.168.66.101:9876");
        //3.启动生产者
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4.初始化消息对象【topic主题/标记过滤/消息体】
            Message message = new Message("testtopic", "Tags", (i + "_syncProducer").getBytes(StandardCharsets.UTF_8));
            //5.生产者发送消息
            SendResult send = producer.send(message);
            System.out.println(i + "消息发送成功:" + send);
        }
        //6.关闭生产者
        producer.shutdown();
    }
    /**
     * 消费者
     */
    @Test
    public void consumer() throws MQClientException, InterruptedException {
        //1.初始化消费者【消费者组名】
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SyncProducer");
        //2.rocketmq地址
        consumer.setNamesrvAddr("192.168.66.101:9876");
        //3.订阅主题
        consumer.subscribe("testtopic", "*");
        //4.监听消息
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("消费成功" + msg);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//消费成功
            }
        });
        //5.启动消费者
        consumer.start();
        //6.永远运行下去
        Thread.sleep(Long.MAX_VALUE);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1688716.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1701java药品进销存管理系统Myeclipse开发sqlserver数据库web结构java编程计算机网页项目

一、源码特点 java web药品进销存管理系统是一套完善的java web信息管理系统&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境 为 TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为s…

解决vue3 vite打包报Root file specified for compilation问题

解决方法&#xff1a; 修改package.json打包命令 把 "build": "vue-tsc --noEmit && vite build" 修改为 "build": "vite build" 就可以了 另外关于allowJs这个问题&#xff0c;在tsconfig.json文件中配置"allowJs&qu…

重学java 39.多线程 — 线程安全

逐渐成为一个情绪稳定且安静成长的人 ——24.5.24 线程安全 什么时候发生&#xff1f; 当多个线程访问同一个资源时&#xff0c;导致了数据有问题&#xff0c;出现并发问题&#xff0c;数据不能及时更新&#xff0c;导致数据发生错误&#xff0c;出现线程安全问题 多线程安全问…

【不太正常的题】LeetCode.232:用栈的函数接口实现队列

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;初阶数据结构刷题 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 &#x1f697; 1.问题描述&#xff1a; 题目中说了只能使用两个栈实现队列&#xff0c;并且只能使用…

【Crypto】password

文章目录 password解题感悟 password 试试flag{zs19900315} 提交成功 解题感悟 这题有点大病

软考--软件设计师--试题六--工厂方法模式(Factory Method)

工厂方法模式(Factory Method) 1、意图 定义一个用于创建对象的接口&#xff0c;让子类决定实例化哪儿一个类&#xff0c;factory method使一个类的实例化延迟到其子类。 2、结构 3、适用性 a、当一个类不知道它所必须创建的对象的类的时候。 b、当一个类希望由它的子类来指定…

UPPAAL使用方法

UPPAAL使用方法 由于刚开始学习时间自动机及其使用方法&#xff0c;对UPPAAL使用不太熟悉&#xff0c;网上能找到的教程很少&#xff0c;摸索了很久终于成功实现一个小例子&#xff0c;所以记录一下详细教程。 这里用到的例子参考【UPPAAL学习笔记】1&#xff1a;基本使用示例…

CyberScheduler调度引擎

CyberScheduler 架构设计 1. 多租户架构&#xff0c;支持 SaaS 化部署和私有化部署 2. 多源异构数据&#xff08;多种集群、数据库&#xff09;、多计算引擎、多类型任务的统一编排调度 3. 灵活资源管理能力&#xff0c;支持不同类型任务的资源管理和资源隔离&#xff0c;优…

docker 挂载运行镜像

文章目录 前言docker 挂载运行镜像1. 作用2. 命令3. 测试 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易啊^ _ ^。   而且听说点赞的人每天的运气都不会太差&#xff0c;实在白嫖的话&#xff0c;那欢…

1077: 平衡二叉树的判定

解法&#xff1a; 平衡二叉树是一种特殊的二叉树&#xff0c;它满足以下两个条件&#xff1a; 左子树和右子树的高度差不超过1&#xff08;即&#xff0c;左右子树高度差的绝对值不超过1&#xff09;。左子树和右子树都是平衡二叉树。 后序遍历过程中每次判断左右子树高度差…

收入极高的副业兼职,单价179元的养生爆款卖出3000份

做养生赛道切忌不要只靠接广告变现&#xff0c;换个思路以产品为核心&#xff0c;走低粉高变现才是变现效率最高的。 周周近财&#xff1a;让网络小白少花冤枉钱&#xff0c;赚取第一桶金 今天&#xff0c;我们要分析的这位养生博主&#xff0c;仅凭一款售价为179元的温通膏&a…

STM32 MAP文件结合固件文件分析

文章目录 加载域的结束地址并不是固件的结束地址&#xff1f;ROM中执行域的描述RAM中执行域的描述问题分析 中断向量表在固件中的存储位置代码段在固件中的位置只读数据Regin$$Table RW Data段其中的内部机理 总结 MAP 文件分析可以参考之前的文章 程序代码在未运行时在存储器…

漫谈企业信息化安全 - 勒索软件攻击

一、引言 首先&#xff0c;网络攻击是一个非常广泛的话题&#xff0c;网络攻击从一般分类上包含了恶意软件攻击、钓鱼攻击、拒绝服务攻击&#xff08;DoS/DDoS&#xff09;、中间人攻击、SQL注入、跨站脚本、0-Day攻击、供应链攻击、密码攻击等等&#xff0c;勒索软件攻击只是…

EfficientSAM分割对象后求其中图像中的高

1 分割对象 EfficientSAM https://github.com/yformer/EfficientSAM 2 计算在图像中最高点即y值最小点 import os import cv2def read_images(folder_path):image_files [f for f in os.listdir(folder_path) iff.endswith(".jpg") or f.endswith(".png&quo…

OSPF状态机及网络接口类型

、OSPF 状态机 Down一旦接收到hello 包进人下一个状态机 Init 初始化接收到的hello 包中&#xff0c;若存在本地的 RID&#xff0c;进入下一状态 2way 双向通讯--邻居关系建立的标志 条件匹配:点到点网络直接进入下一个状态机 MA 网络将进行 DR/BDR 选举(40S) 非 DR…

安卓数据存储(键值对、数据库、存储卡、应用组件Application、共享数据)

键值对 此小节介绍Android的键值对存储方式的使用方法&#xff0c;其中包括&#xff1a;如何将数据保存到共享参数&#xff0c;如何从共享参数读取数据&#xff0c;如何使用共享参数实现登陆页面的记住密码功能&#xff0c;如何使用Jetpack集成的数据仓库。 共享参数的用法 …

Linux笔记之命令行JSON处理器jq

Linux笔记之命令行JSON处理器jq code review! 文章目录 Linux笔记之命令行JSON处理器jq1.安装2.jq 基本用法3.例程3.1. 示例JSON文件3.2. 读取特定字段3.3. 管道过滤器&#xff08;Pipe Filters&#xff09;3.4. 映射过滤器&#xff08;Map Filters&#xff09;3.5. 条件过滤…

python使用jsonpath来查找key并赋值

目录 一、引言 二、JsonPath简介 三、Python中的JsonPath库 四、使用JsonPath查找JSON Key 五、使用JsonPath赋值JSON Key 六、高级用法 七、结论 一、引言 在数据驱动的现代应用中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;已成为一种广泛使…

使用echarts配置中国地图

使用echarts配置中国地图 首先要下载地图的geoJSON数据&#xff0c;有两个方式下载&#xff0c;一种是去echarts的github资源文件里面&#xff0c;一种是去阿里云的datav网站。 1.1 echarts文档下载中国地图json数据 1.2 阿里云datav 新建项目&#xff0c;新建index.html,下…

HeyGen AI是什么?怎样使用HeyGen AI?

在数字时代&#xff0c;视频内容为王。无论是在社交媒体还是网站上&#xff0c;视频都以其独特的方式吸引着人们的眼球。然而&#xff0c;制作出专业水准的视频往往需要大量的时间和技术知识。HeyGen AI正是为了解决这一难题而诞生的。 HeyGen AI简介 HeyGen AI是一个创新的视…