activeMq将mqtt发布订阅转成消息队列

news2025/1/11 17:50:57

1、activemq.xml置文件新增如下内容

2、mqttx测试发送:

主题(配置的模糊匹配,为了并发):VirtualTopic/device/sendData/12312

3、mqtt接收的结果

4、程序处理

package com

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;

import org.apache.activemq.ActiveMQMessageAudit;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.Message;
import java.nio.charset.Charset;
import java.util.Date;

@Component
public class MessageHandler {

    @JmsListener(destination = "deviceQueue.receiveDate", containerFactory = "queueListener", concurrency = "1-3")
    public void deviceMessage(ActiveMQBytesMessage message) {
        System.out.println(StrUtil.str(message.getContent().getData(), Charset.defaultCharset()));
        System.out.println("###################" + message + "###################");
    }
}

 控制台打印

 全量的:activemq.xml

<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<!-- START SNIPPET: example -->
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic=">" >
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>


        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:

            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:

            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>


          <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
          -->
          <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:

            http://activemq.apache.org/configuring-transports.html
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>
		
		<!-- 消息订阅 -->
		<destinationInterceptors>
			<virtualDestinationInterceptor>
				<virtualDestinations>
					<compositeTopic name="VirtualTopic.device.sendData.*">
						<forwardTo>
							<queue physicalName="deviceQueue.receiveDate" />
						</forwardTo>
					</compositeTopic>
				</virtualDestinations>
			</virtualDestinationInterceptor>
		</destinationInterceptors>
		
		<plugins>
			<simpleAuthenticationPlugin>
				<users>
					<authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>
					<!--<authenticationUser username="user" password="password" groups="users"/>
					<authenticationUser username="guest" password="password" groups="guests"/> -->
				</users>
			</simpleAuthenticationPlugin>
		</plugins>
    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file

        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
<!-- END SNIPPET: example -->

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1464651.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Vue】本地使用 axios 调用第三方接口并处理跨域

前端处理跨域 一. 开发准备 开发工具&#xff1a;VScode框架&#xff1a;Vue2项目结构&#xff1a;vue脚手架生成的标准项目&#xff08;以下仅显示主要部分&#xff09; 本地已搭建好的端口&#xff1a;8080要请求的第三方接口&#xff1a;http://1.11.1.111:端口号/xxx-api…

免费文档比对方案 基于Draftable的文档比对功能实现 避免key30天到期问题

一、需求说明 目前在通用文档系统中存在【文档比对】的功能&#xff0c;这里最好是支持word->word、pdf->pdf、word->pdf等形式的通用型比较。 二、首先说明下pdf pdf文件比较特殊&#xff0c;它实际上文字的坐标映射&#xff0c;没有常规文本的段落、句子等含义。所以…

C语言翻译环境:预编译+编译+汇编+链接详解

目录 翻译环境和运行环境 翻译环境 预处理&#xff08;预编译&#xff09; 编译 词法分析 语法分析 语义分析 汇编 链接 运行环境 ⭐翻译环境和运行环境 在ANSI C的任何⼀种实现中&#xff0c;存在两个不同的环境。 第1种是翻译环境&#xff0c;在这个环境中源代码被…

使用Streamlit构建纯LLM Chatbot WebUI傻瓜教程

文章目录 使用Streamlit构建纯LLM Chatbot WebUI傻瓜教程开发环境hello Streatelit显示DataFrame数据显示地图WebUI左右布局设置st.sidebar左侧布局st.columns右侧布局 大语言模型LLM Chatbot WebUI设置Chatbot页面布局showdataframe()显示dataframeshowLineChart()显示折线图s…

解决IntelliJ IDEA 2023版本创建Spring项目时Java只能选择17或21的问题

问题描述&#xff1a; 当使用IntelliJ IDEA2023版本中Spring Initializr新建Spring项目时&#xff0c;即使JDK配置项为1.8&#xff0c;Java配置项仍然只能选17或21. 在JDK为1.8版本情况下&#xff0c;Java选择17或21&#xff0c;点击NEXT按钮&#xff0c;则会弹窗提示SDK不支持…

AI绘画与修图:重塑数字艺术的新纪元

文章目录 一、AI绘画与修图的原理二、AI绘画的应用三、AI修图的优势四、面临的挑战五、未来发展趋势《AI绘画与修图实战&#xff1a;PhotoshopFirefly从入门到精通 轻松玩转AI绘画与修图实战》亮点内容简介作者简介 随着人工智能技术的飞速发展&#xff0c;AI绘画与修图已经成为…

关于设备连接有人云的使用及modbus rtu协议,服务器端TCP调试设置

有人云调试 调试过程问题1. 关于modbus rtu协议,实质上有三种modbus基本原理modbus 格式2. 关于modbus crc16通信校验3. 关于在ubuntu阿里云服务器端,监听网络数据之调试mNetAssist之前的一个项目,再拿出来回顾下。 调试过程 先 要在有人云,用手机号注册一个服务账号,官网显…

“职”想有你!庭田科技2024招聘开始啦!

关于|庭田科技 庭田科技有限公司&#xff08;简称&#xff1a;庭田科技&#xff09;是一家专注于计算机辅助工程(CAE)软件和高科技仪器设备的系统集成商和方案咨询服务供应商&#xff08;下设“上海庭田信息科技有限公司”与“西安庭田信息科技有限公司”&#xff09;。致力于…

基于springboot+vue的智慧社区系统(前后端分离)

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战&#xff0c;欢迎高校老师\讲师\同行交流合作 ​主要内容&#xff1a;毕业设计(Javaweb项目|小程序|Pyt…

css中选择器的优先级

CSS 的优先级是由选择器的特指度&#xff08;Specificity&#xff09;和重要性&#xff08;Importance&#xff09;决定的&#xff0c;以下是优先级规则&#xff1a; 特指度&#xff1a; ID 选择器 (#id): 每个ID选择器计为100。 类选择器 (.class)、属性选择器 ([attr]) 和伪…

手机上wmv怎么转换成视频mp4?3种视频转换方法分享

手机上wmv怎么转换成视频mp4&#xff1f;在手机上将WMV格式的视频转换成MP4格式&#xff0c;可以大大方便我们在不同平台和设备上播放和分享视频内容。WMV格式虽然在一些特定场合下使用广泛&#xff0c;但其兼容性和普及度不如MP4格式。MP4格式作为一种广泛支持的多媒体容器格式…

KafKa3.x基础

来源&#xff1a;B站 目录 定义消息队列传统消息队列的应用场景消息队列的两种模式 Kafka 基础架构Kafka 命令行操作主题命令行操作生产者命令行操作消费者命令行操作 Kafka 生产者生产者消息发送流程发送原理生产者重要参数列表 异步发送 API普通异步发送带回调函数的异步发送…

11.CSS3的媒介(media)查询

CSS3 的媒介(media)查询 经典真题 如何使用媒体查询实现视口宽度大于 320px 小于 640px 时 div 元素宽度变成 30% 媒体查询 媒体查询英文全称 Media Query&#xff0c;顾名思义就是会查询用户所使用的媒体或者媒介。 在现在&#xff0c;网页的浏览终端是越来越多了。用户可…

VG5032VDN 电压控制的晶体振荡器 (VCXO) 输出:LVDS

在今天繁复多变的电子市场中&#xff0c;设计师不断寻求更稳定、更灵活的时钟解决方案&#xff0c;以满足从通信网络到工业控制系统的广泛应用。VG5032VDN VCXO是一款高性能的电压控制晶体振荡器 它结合了高性能、多用途性和紧凑设计&#xff0c;是一款适合广泛应用的晶体振荡…

2.5网安学习第二阶段第五周回顾(个人学习记录使用)

本周重点 ①多进程和多线程 1、进程和线程 2、多线程爆破 ②Redis数据库 1、Redis的使用 2、Redis持久化 3、Redis未授权免密登录 ③嗅探和Python攻击脚本 1、嗅探&#xff08;端口扫描和IP扫描&#xff09; 2、SCAPY的应用 3、Python攻击脚本&#xff08;SYN半连接…

Day20_网络编程(软件结构,网络编程三要素,UDP网络编程,TCP网络编程)

文章目录 Day20 网络编程学习目标1 软件结构2 网络编程三要素2.1 IP地址和域名1、IP地址2、域名3、InetAddress类 2.2 端口号2.3 网络通信协议1、OSI参考模型和TCP/IP参考模型2、UDP协议3、TCP协议 2.4 Socket编程 3 UDP网络编程3.1 DatagramSocket和DatagramPacket1、Datagram…

测试公众号开发,链接访问问题

1.管道未打开&#xff0c;需要打开natapp代理&#xff0c;联系相关后端 2.在确保nginx代理配置路径无问题后&#xff0c;就是前端指向问题&#xff0c;联系相关前端 3.nginx代理配置路径不正确&#xff0c;打开页面是白板&#xff0c;看控制台报错&#xff0c;是否是静态资源…

我们在SqlSugar开发框架中,用到的一些设计模式

我们在《SqlSugar开发框架》中&#xff0c;有时候都会根据一些需要引入一些设计模式&#xff0c;主要的目的是为了解决问题提供便利和代码重用等目的。而不是为用而用&#xff0c;我们的目的是解决问题&#xff0c;并在一定的场景下以水到渠成的方式处理。不过引入任何的设计模…

【Linux取经路】文件系统之缓冲区

文章目录 一、先看现象二、用户缓冲区的引入三、用户缓冲区的刷新策略四、为什么要有用户缓冲区五、现象解释六、结语 一、先看现象 #include <stdio.h> #include <string.h> #include <unistd.h>int main() {const char* fstr "Hello fwrite\n"…

谷歌seo推广的费用是多少?

真正的开销取决于个人或企业的具体需求、目标及资源投入水平&#xff0c;如果你选择自己优化&#xff0c;成本主要体现为时间和努力——你需要花时间学习SEO的最佳实践并应用它们来优化你的网站。这种方式就几乎不需要花钱&#xff0c;但需要大量的学习和实践时间 而选择SEO专家…