Springboot整合阿里云ONS RocketMq(4.0 http)

news2024/11/24 17:46:34

1. 引入依赖

<!--阿里云ons,方便的接入到云服务-->
<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.8.4.Final</version>
</dependency>

2. 配置

配置注意事项:

  1. nameSrvAddr我这里是用的4.0版本的支持http,5.0不支持http
    image.png
  2. 一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
  3. 订阅关系参考官方文档: 订阅关系一致
  4. 此处我配置了多个GroupId,Tag,Topic(order,market,vehicle)如果不需要配置一个即可,对应基本配置类需要增减对应属性
aliyun:
  rocketmq:
    accessKey: LTAI5txxxxxxx
    secretKey: Afq06tBxrdBxxxxxxxx
    nameSrvAddr: http://MQ_INST_xxxxxxxxxx_BYkZuJCq.cn-beijing.mq.aliyuncs.com:80
    orderGroupId: GID_xxxxxx_test
    orderTag: 'order'
    orderTopic: vehicle-order-test
    marketGroupId: GID_xxxxxx2_test
    marketTag: 'market'
    marketTopic: vehicle-market-test
    vehicleGroupId: GID_xxxxxx3_test
    vehicleTag: 'vehicle'
    vehicleTopic: vehicle-order-test

3. 配置类

3.1 基本配置类

package com.vehicle.manager.core.config;

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * Rocket MQ 配置类
 * @author zr 2024/3/1
 */
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq")
@Data
public class RocketMqConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String marketGroupId;
    private String marketTopic;
    private String marketTag;
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;
    private String vehicleTopic;
    private String vehicleGroupId;
    private String vehicleTag;

    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        return properties;
    }
}

3.2 生产者配置

package com.vehicle.manager.core.config;

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zr 2024/3/1
 */
@Configuration
public class ProducerConfig {
    @Autowired
    private RocketMqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqConfig.getMqPropertie());
        return producer;
    }
}

3.3 消费者配置

package com.vehicle.manager.core.config;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.vehicle.manager.core.listener.VehicleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * RocketMq消费者
 * @author zr 2024/3/1
 */
@Configuration
public class VehicleConsumerConfig {
    @Autowired
    private RocketMqConfig mqConfig;

    @Autowired
    private VehicleListener vehicleListener;


    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildVehicleBuyerConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        //配置文件
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getVehicleGroupId());
        //将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        //订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getVehicleTopic());
        subscription.setExpression(mqConfig.getVehicleTag());
        subscriptionTable.put(subscription, vehicleListener);
        //订阅多个topic如上面设置

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

4. 生产者工具类

  • MessageRecord为记录消息发送的对象,可以自行根据字段进行设计调整
  • 参数说明:
    • topic – 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-“和下划线”_"构成.
    • tag – 消息标签, 请使用合法标识符, 尽量简短且见名知意
    • key – 业务主键
    • body – 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
package com.vehicle.manager.core.util;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.vehicle.manager.core.config.RocketMqConfig;
import com.vehicle.manager.core.mapper.MessageRecordMapper;
import com.vehicle.manager.core.model.entity.MessageRecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;

/**
 * RocketMessageProducer rocketMQ消息生产者
 * @author zr 2024/3/1
 */
@Component
@Slf4j
public class RocketMessageProducer {
    private static ProducerBean producer;
    private static RocketMqConfig mqConfig;

    private  static MessageRecordMapper messageRecordMapper;

    @Autowired
    private  MessageRecordMapper messageRecordMapperInstance;


    @PostConstruct
    public void init() {
        RocketMessageProducer.messageRecordMapper = messageRecordMapperInstance;
    }

    public RocketMessageProducer(ProducerBean producer, RocketMqConfig mqConfig) {
        this.producer = producer;
        this.mqConfig = mqConfig;
    }

    /**
     * 生产车辆服务普通消息
     * @param tag
     * @param key
     * @param body
     */
    public  static void producerVehicleMsg(String tag, String key, String body) {
        Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
        long time = System.currentTimeMillis();
        try {
            SendResult sendResult = producer.send(msg);
            assert sendResult != null;
            log.info(time
                    + " Send mq message success.Topic is:" + msg.getTopic()
                    + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()+" body is:"+new String(msg.getBody())
                    + " msgId is:" + sendResult.getMessageId());

            MessageRecord messageRecord = new MessageRecord();
            messageRecord.setPlatformType("mq");
            messageRecord.setMessageType("order");
            messageRecord.setMqMessageTopic(msg.getTopic());
            messageRecord.setMqMessageTag(msg.getTag());
            messageRecord.setMqMessageKey(msg.getKey());
            messageRecord.setMqMessageId(sendResult.getMessageId());
            messageRecord.setCreatedTime(LocalDateTime.now());
            messageRecord.setMessageContent(new String(msg.getBody()));
            messageRecordMapper.insert(messageRecord);
        } catch (ONSClientException e) {
            e.printStackTrace();
            log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
        }
    }

    /**
     * 生产车辆服务延时普通消息
     * @param tag  order:订单服务   vehicle:主要用于本服务的超时回应
     * @param key
     * @param body
     * @param delay 延迟秒
     */
    public  static void producerVehicleDelayMsg(String tag, String key, String body,Integer delay) {
        Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
        long time = System.currentTimeMillis();
        msg.setStartDeliverTime(time+ delay*1000);
        try {
            SendResult sendResult = producer.send(msg);
            assert sendResult != null;
            log.info(time
                    + " 发送消息成功.Topic is:" + msg.getTopic()
                    + " Tag 为:" + msg.getTag() + " Key 为:" + msg.getKey()+" body 为:"+new String(msg.getBody())
                    + " msgId 为:" + sendResult.getMessageId());
        } catch (ONSClientException e) {
            e.printStackTrace();
            log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
        }
    }
}

5. 消费者监听

package com.vehicle.manager.core.listener;

import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.vehicle.manager.core.model.dto.req.VehicleMQMessageDTO;
import com.vehicle.manager.core.service.HlCarService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


/**
 * @author zr 2024/3/1
 */
@Component
@Slf4j
public class VehicleListener implements MessageListener {
    @Autowired
    private HlCarService hlCarService;


    @Override
    public Action consume(Message message, ConsumeContext context) {

        log.info("VehicleReceive 消息: " + message);

        try {
            byte[] body = message.getBody();
            String s = new String(body);
            log.info(s);
            // VehicleMQMessageDTO需要自行根据业务封装
            VehicleMQMessageDTO vehicleMQMessageDTO = JSON.parseObject(s, VehicleMQMessageDTO.class);
            log.info(vehicleMQMessageDTO.toString());

            // 以下做你的业务处理
            // .........
            
            return Action.CommitMessage;//进行消息的确认
        } catch (Exception e) {
            log.info(e.getMessage());
            //消费失败
            return Action.ReconsumeLater;
        }
    }
}

6. 测试

6.1 发送消息

package com.vehicle.manager.core;

import com.alibaba.fastjson.JSON;
import com.vehicle.manager.api.StartApplication;
import com.vehicle.manager.core.util.RocketMessageProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;


/**
 * @author zr 2024/3/1
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class MqTest {
    
    @Test
    public void producerMsg() {
        RocketMessageProducer.producerVehicleMsg("vehicle","test", JSON.toJSONString(new String("testBody")));
    }
}

6.2 接收消息

image.png

7. 延时消息

如果需要使用延时消息可以参考RocketMessageProducer中有一个延时消息的方法producerVehicleDelayMsg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1831392.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

牛客周赛 46 F 祥子拆团

原题链接&#xff1a;F-祥子拆团 题目大意&#xff1a;多测&#xff0c;每次给a,b&#xff0c;要将a分解成b个数相乘&#xff0c;问有多少种分的方法。 思路&#xff1a;对a进行质因数分解&#xff0c;对每一个质数计数&#xff0c;然后分到b个篮子里面&#xff0c;允许篮子里…

SFNC —— 图像格式控制(三)

系列文章目录 SFNC —— 标准特征命名约定&#xff08;一&#xff09; SFNC —— 设备控制&#xff08;二&#xff09; SFNC —— 图像格式控制&#xff08;三&#xff09; 文章目录 系列文章目录4、图像格式控制&#xff08;Image Format Control&#xff09;1. 图像格式控制&…

Flink系列之:Generating Watermarks生成水印

Flink系列之&#xff1a;Generating Watermarks生成水印 一、水印策略简介二、使用水印策略三、处理闲置资源四、水印对齐五、编写水印生成器六、编写周期性水印生成器七、编写标点水印生成器八、水印策略和 Kafka 连接器九、Operators如何处理水印十、已弃用的AssignerWithPer…

电信网关配置管理系统 del_file.php 前台RCE漏洞复现

0x01 产品简介 中国电信集团有限公司(英文名称“China Telecom”、简称“中国电信”)成立于2000年9月,是中国特大型国有通信企业、上海世博会全球合作伙伴。电信网关配置管理系统是一个用于管理和配置电信网络中网关设备的软件系统。它可以帮助网络管理员实现对网关设备的远…

护眼台灯哪个牌子最好?学生台灯护眼好还是防近视好

当前&#xff0c;我国正面临日益严峻的青少年近视挑战。从小学到大学&#xff0c;学生的近视率居高不下&#xff0c;其中高度近视的比例更是令人担忧。这不仅直接威胁到孩子们的身体健康和视力发育&#xff0c;从长远来看&#xff0c;还可能对国家的未来发展和国家安全产生不利…

RTSP/Onvif安防监控平台EasyNVR抓包命令tcpdump使用不了,该如何解决?

安防视频监控汇聚EasyNVR智能安防视频监控平台&#xff0c;是基于RTSP/Onvif协议的安防视频平台&#xff0c;可支持将接入的视频流进行全平台、全终端分发&#xff0c;分发的视频流包括RTSP、RTMP、HTTP-FLV、WS-FLV、HLS、WebRTC等格式。平台可提供的视频能力包括&#xff1a;…

gitlab仓库中用git bash生成不是默认路径的ssh秘钥

使用命令 ssh-keygen -t rsa -b 4096 -C "your_emailexample.com" 会在默认路径生成秘钥&#xff0c;&#xff08;C:\Users\用户\.ssh\&#xff09; 想要修改默认路径使用如下命令&#xff1a; ssh-keygen -t rsa -b 4096 -C "your_emailexample.com"…

软链接和硬链接的详解 (Linux系统下)

文章目录 硬链接的引入软链接和硬链接的形成软链接硬链接 软硬链接区别的探究硬链接数结语 硬链接的引入 当我们在命令行中输入ll时会出现很多行信息&#xff0c;详情请看下面的图 ~~~~εεε(&#xffe3;▽&#xffe3;) 我在之前的几篇Linux的文章也讲过哦 (o&#xff9f;v…

2024最新版Node.js下载安装及环境配置教程(非常详细)

一、进入官网地址下载安装包 官网&#xff1a;Node.js — Run JavaScript Everywhere 其他版本下载&#xff1a;Node.js — Download Node.js (nodejs.org) 选择对应你系统的Node.js版本 二、安装程序 &#xff08;1&#xff09;下载完成后&#xff0c;双击安装包&#xf…

小白Linux提权

1.脏牛提权 原因&#xff1a; 内存子系统处理写入复制时&#xff0c;发生内存条件竞争&#xff0c;任务执行顺序异常&#xff0c;可导致应用崩溃&#xff0c;进一步执行其他代码。get_user_page内核函数在处理Copy-on-Write(以下使用COW表示)的过程中&#xff0c;可能产出竞态…

项目方案:社会视频资源整合接入汇聚系统解决方案(六)

目录 一、概述 1.1 应用背景 1.2 总体目标 1.3 设计原则 1.4 设计依据 1.5 术语解释 二、需求分析 2.1 政策分析 2.2 业务分析 2.3 系统需求 三、系统总体设计 3.1设计思路 3.2总体架构 3.3联网技术要求 四、视频整合及汇聚接入 4.1设计概述 4.2社会视频资源分类…

5G消息 x 政务 | 新型数智政务服务平台

5G消息 x 政务 | 新型数智政务服务平台 通过 5G 消息&#xff0c;帮助政府部门及公共事业部门优化服务品质、提高服务效能&#xff0c;打造现代政府的展示窗口、便民利企的服务窗口、营商环境的感知窗口&#xff0c;提供多元、透明、高效的线上政务服务。 5G消息 x 政务 —— 优…

新闻稿标题怎么写吸引人?建议收藏

一个好的标题&#xff0c;不仅能激发读者的好奇心&#xff0c;还能引导他们继续深入了解文章内容。本文伯乐网络传媒将为你揭秘新闻稿标题写作的十大技巧&#xff0c;让你轻松写出吸引人的标题。 1. 激发好奇心 a. 提出疑问&#xff1a;以问句的形式提出问题&#xff0c;让读者…

​1:25万基础电子地图(江西版)

我们在《50幅1:25万基础电子地图&#xff08;四川版&#xff09;》和《1&#xff1a;25基础电子地图&#xff08;云南版&#xff09;》等文中&#xff0c;为你分享过四川和云南的基础电子地图。 现在我们再为你分享江西的1&#xff1a;25万基础电子地图&#xff0c;你可以在文…

申请国外访问学者面签技巧有哪些?

申请国外访问学者面签是一项重要的步骤&#xff0c;关系到能否成功获得访问学者身份。以下是一些实用的面签技巧&#xff0c;帮助您顺利通过面试。 1.充分准备材料 成功的面签始于准备充分的材料。确保您的申请材料齐全&#xff0c;包括&#xff1a; 个人简历&#xff1a;突出…

Inpaint软件下载附加详细安装教程

​Inpaint是一款由Maxim Gapchenko开发的图像处理软件&#xff0c;它可以帮助用户轻松地去除图像中的水印和其他不需要的元素&#xff0c;这个软件的核心技术是基于图像处理算法的&#xff0c;它可以自动识别图片中的像素&#xff0c;并用周围的颜色进行替换&#xff0c;使得图…

新版FMEA培训内容中关于团队协作的部分可以怎么展开?

团队协作&#xff0c;作为新版FMEA的核心要素之一&#xff0c;其重要性不言而喻。在FMEA的分析过程中&#xff0c;团队成员的密切合作与沟通是确保分析全面性和准确性的关键。通过团队协作&#xff0c;不同领域的专家能够共同参与到潜在故障模式的识别、评估与预防中来&#xf…

Python 爬取淘宝指定搜索商品评论 标题 销量 计算sign

只需要替换原来的Cookie和token即可使用&#xff0c;自动计算对应链接地址的sign直接使用即可。需要注意是一个账号爬取过多会有验证码 import json import hashlib import random import timeimport pandas as pd import requestsresults []def fetch_review_list(datas, md…

MMpose安装实例

摘要&#xff1a; 这个大数据训练发展较快&#xff0c;各种版本问题&#xff0c;不太好匹配&#xff0c;仅是安装就会大费周章。本文图文并茂的描述了一种成功的安装方式。仅供参考。 使用的win版本是win11&#xff0c;英伟达显卡是GeForce GTX 1660 SUPER。 1.cuda版本选择 通…

学生用小台灯什么牌子的好?五大强劲护眼台灯牌子分享

在这个数码时代&#xff0c;人们对屏幕的依赖程度越来越高&#xff0c;尤其是孩子们。他们不仅在学校里需要长时间盯着教科书&#xff0c;还会在学习和娱乐中使用各种数码设备。然而&#xff0c;这也使得眼睛健康问题逐渐凸显&#xff0c;尤其是儿童近视的问题。为了保护视力&a…