springboot 使用RocketMQ客户端生产消费消息DEMO

news2025/1/21 0:51:24

创建springboot项目省略

项目依赖

注意:当前客户端版本是 5.1.3 ,安装的rocketmq服务的版本要与其对应

	<properties>
        <java.version>11</java.version>
        <rocketmq-client-java-version>5.1.3</rocketmq-client-java-version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq-client-java-version}</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

创建 JmsConfig


public class JmsConfig {
	//roketmq 服务地址
    public static String nameServerAddr = "192.168.2.109:9876";
	//主题
    public static String TOPIC = "test_topic";
}

创建生产者 Producer

package com.example.springbootrocketmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;

@Component
public class PayProducer {
	//生产组
    private String producerGroup = "test_group";

    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);

        //多个NameServer地址 多个地址 ; 号隔开
        producer.setNamesrvAddr(JmsConfig.nameServerAddr);
        start();
    }

    /**
     * 开始
     */
    public void start(){
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    public DefaultMQProducer getProducer(){
        return this.producer;
    }

    /**
     * 一般关闭上下文是关闭
     */
    @PreDestroy
    public void shutdown(){
        System.out.println("关闭....");
        this.producer.shutdown();
    }
}

创建消费者 Consumer


import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;

@Component
public class Consumer {

    private DefaultMQPushConsumer consumer;

    private String consumerGroup = "test_consumer_group";

    public PayConsumer() throws Exception{
        consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(JmsConfig.nameServerAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe(JmsConfig.TOPIC, "*");
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            try {
                Message msg = msgs.get(0);
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
                String topic = msg.getTopic();
                String body = new String(msg.getBody(), "utf-8");
                String tags = msg.getTags();
                String keys = msg.getKeys();
                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });

        consumer.start();
    }
}

配置 TestController


import com.example.springbootrocketmq.jms.JmsConfig;
import com.example.springbootrocketmq.jms.Producer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class TestController{

    @Autowired
    private Producer producer;

    @RequestMapping("/api/v1/test_cb")
    public Object callback(String text) throws Exception {

        Message message = new Message(JmsConfig.TOPIC,"taga",("hello rocketmq = "+ text).getBytes());
        SendResult sendResult = producer.getProducer().send(message);
        log.info(sendResult.toString());
        return null;
    }
}

测试结果

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1095373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EasyX趣味化编程note6,图片操作及文字

大家好这里是Dark FlameMaster 如果说前边所学的内容会给我们带来一定的乐趣&#xff0c;那么今天这篇可以说是最好玩的了&#xff0c;我们可以进一步改进写出小程序的好玩度&#xff0c;你甚至可以把身边的人或事写进一个小程序&#xff0c;制作一个小游戏&#xff0c;恶搞身边…

工业级Netty网关,京东是如何架构的?

说在前面 在40岁老架构师 尼恩的读者交流群(50)中&#xff0c;很多小伙伴拿到一线互联网企业如阿里、网易、有赞、希音、百度、滴滴的面试资格。 最近&#xff0c;尼恩指导一个小伙伴简历&#xff0c;写了一个《高并发网关项目》&#xff0c;此项目帮这个小伙拿到 字节/阿里/…

掌握Golang匿名函数

一个全面的指南&#xff0c;以理解和使用Golang中的匿名函数 Golang以其简单和高效而闻名&#xff0c;赋予开发人员各种编程范式。其中一项增强代码模块化和灵活性的功能就是匿名函数。在这篇正式的博客文章中&#xff0c;我们将踏上探索Golang匿名函数深度的旅程。通过真实世…

机器学习之Sigmoid函数

文章目录 Sigmoid函数是一种常用的数学函数&#xff0c;通常用于将实数映射到一个特定的区间。它的形状类似于"S"形状曲线&#xff0c;因此得名。Sigmoid函数在机器学习、神经网络和统计学中经常被使用&#xff0c;主要用于二元分类和处理概率值。 Sigmoid函数的一般…

【蓝桥】契合匹配

一、题目 1、题目描述 小蓝有很多齿轮&#xff0c;每个齿轮的凸起和凹陷分别用一个字符表示&#xff0c;一个字符串表示一个齿轮。 如果这两个齿轮的对应位分别是同一个字母的大小写&#xff0c;我们称这个两个齿轮是契合的。 例如&#xff1a;AbCDeFgh 和 aBcdEfGH 就是契合…

基于html+js编写的生命游戏

前言 本文将介绍一个基于htmljs的生命游戏&#xff0c;该项目只有一个html代码&#xff0c;无任何其他以来&#xff0c;UI方面采用了vueelement-plus进行渲染&#xff0c;游戏的界面基于canvas进行渲染&#xff0c;先来看一下成果。 我不知道游戏规则有没有写错&#xff0c;感…

Vue-3.2自定义创建项目

基于VueCli自定义创建项目架子 选择第三个 空格选中&#xff0c;再空格取消 选择vue2 其实就是mode模式&#xff0c;之后再去修改就可以&#xff0c;history和hash 选择less 无分号规范&#xff08;标准化&#xff09;&#xff0c;目前最流行的 将配置文件放在单独的文件中 是否…

Linux环境配置安装Redis

Windows版本因官网不在提供与支持&#xff0c;以下基于linux环境安装 前提&#xff1a; 1.一台linux服务器 2.服务器已安装gcc 安装 1、官网下载 https://redis.io/download/ 对应压缩包 2、上传压缩包至服务器并解压缩 tar -zxvf redis-stable.tar.gz3、cd 至该目录下 4、…

双周总结#002 - 红树林

红树林公园&#xff0c;一棵单独生长在海岸边的树&#xff0c;下面一根根树立的幼苗&#xff0c;是从它的根茎上生长出来的。傍晚落潮后&#xff0c;会有一只只小螃蟹在这里浪荡。当然&#xff0c;也会有海鸟在这里进食。 文档 深入了解 Commonjs 和 Es Module1 Web 开发中&am…

两道关于顺序表的经典算法

文章目录 力扣&#xff1a;[移除元素](https://leetcode.cn/problems/remove-element/)[力扣&#xff1a;88. 合并两个有序数组](https://leetcode.cn/problems/merge-sorted-array/) 力扣&#xff1a;移除元素 题目 给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移…

JAVA实战项目 超市商品管理系统

师傅开发的实战项目&#xff0c;感觉不错&#xff0c;拿出来分享分享。 目录 一、摘要1.1 简介1.2 项目录屏 二、研究内容三、系统设计3.1 用例图3.2 时序图3.3 类图3.4 E-R图 四、系统实现4.1 登录4.2 注册4.3 主页4.4 超市区域管理4.5 超市货架管理4.6 商品类型管理4.7 超市商…

JDBC操作BLOB类型字段

JDBC中Statement接口本身不能直接操作BLOB数据类型 操作BLOB数据类型需要使用PreparedStatement或者CallableStatement(存储过程) 这里演示通过PreparedStatement操作数据库BLOB字段 设置最大传入字节 一般是4M 可以通过以下命令修改 set global max_allowed_packet1024*1…

C语言,洛谷题,赦免战俘

先上答案&#xff0c;再对答案进行解释&#xff1a; #include <stdio.h> int arr[1025][1025] { 0 }; void fun(int bian,int x ,int y) {if (bian 2)//进入if再出去if之后&#xff0c;结束递归&#xff0c;因为递归在else里面{arr[x][y] 0;}else{int i 0;int j 0;…

【Linux】:Linux中Shell命令及其运行原理/权限的理解

Shell命令以及运行原理 Linux严格意义上说的是一个操作系统&#xff0c;我们称之为“核心&#xff08;kernel&#xff09;“ &#xff0c;但我们一般用户&#xff0c;不能直接使用kernel 而是通过kernel的“外壳”程序&#xff0c;也就是所谓的shell&#xff0c;来与kernel沟通…

SpringCloud之Gateway整合Sentinel服务降级和限流

1.下载Sentinel.jar可以图形界面配置限流和降级规则 地址:可能需要翻墙 下载jar文件 2.引入maven依赖 <!-- spring cloud gateway整合sentinel的依赖--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-s…

从基础到卷积神经网络(第14天)

1. PyTorch 神经网络基础 1.1 模型构造 1. 块和层 首先&#xff0c;回顾一下多层感知机 import torch from torch import nn from torch.nn import functional as Fnet nn.Sequential(nn.Linear(20, 256), nn.ReLU(), nn.Linear(256, 10))X torch.rand(2, 20) # 生成随机…

苍穹外卖(七) Spring Task 完成订单状态定时处理

Spring Task 完成订单状态定时处理, 如处理支付超时订单 Spring Task介绍 Spring Task 是Spring框架提供的任务调度工具&#xff0c;可以按照约定的时间自动执行某个代码逻辑。 应用场景: 信用卡每月还款提醒 火车票售票系统处理未支付订单 入职纪念日为用户发送通知 点外…

C++:多态讲解

多态 1.多态的概念2.多态的定义和实现2.1多态构成条件2.2虚函数2.3虚函数的重写(覆盖)2.4 C11 override 和 final2.5重载、重写(覆盖)、隐藏(重定义)的对比 3.抽象类4.多态的原理5.单继承和多继承关系的虚函数表5.1单继承5.2多继承5.3菱形继承和多态 1.多态的概念 多态的概念&…

【Vue面试题二十三】、你了解vue的diff算法吗?说说看

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;你了解vue的diff算法吗&…

MFC-对话框

目录 1、模态和非模态对话框&#xff1a; &#xff08;1&#xff09;、对话框的创建 &#xff08;2&#xff09;、更改默认的对话框名称 &#xff08;3&#xff09;、创建模态对话框 1&#xff09;、创建按钮跳转的界面 2&#xff09;、在跳转的窗口添加类 3&#xff0…