Java编码实现Activemq通讯

news2024/11/28 18:51:46

目录

一、IDEA建Maven工程

二、pom.xml

三、JMS编码总体架构

四、粗说目的地Destination——队列和主题

五、点对点消息传递域中,目的地被称为队列(queue)

六、在发布订阅消息传递域中,目的地被称为主题(topic)

七、topic和queue对比


一、IDEA建Maven工程

二、pom.xml

<dependencies>
        <!--  activemq  所需要的jar 包-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>
        <!--  activemq 和 spring 整合的基础包 -->
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
        <!--基础通用配置-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.30</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.11</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.16</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

三、JMS(Java平台上的专业技术规范)编码总体架构

 (1)回顾JDBC

 (2)JMS(Java平台上的专业技术规范)开发基本步骤

 

四、粗说目的地Destination——队列和主题

Destination是目的地。下面拿jvm和mq,做个对比。目的地,我们可以理解为是数据存储的地方。

Destination分为两种:队列和主题。

(1) 队列(queue——1对1)

 

(2)主题(topic——1对N)

 五、点对点消息传递域中,目的地被称为队列(queue)

(1)消息生产者

    public static final String ACTIVEMQ_URL = "tcp://虚拟机ip:61616";
    public static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        //1.创建链接工厂,按照给定的url地址,采用默认的用户和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过链接工厂,获得Connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //3.1 两个参数 ①事务 ②签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(queue or topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建生产者
        MessageProducer producer = session.createProducer(queue);
        //6.通过MessageProducer生成3条消息到MQ队列中
        for(int i = 1 ; i <= 3; i ++ ) {
            TextMessage textMessage = session.createTextMessage("msg--" + i);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();

        System.out.println("消息发布到MQ完成……");
    }

(2)控制台说明

回到ActiveMQ前端控制台可以看到如下界面,有三条消息

 

① Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。

② Number Of Consumers:消费者数量,消费者端的消费者数量。

③ Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。

④ Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量。

总结:

当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。

当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。

当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

(3)消息消费者

① 方式一

    public static final String ACTIVEMQ_URL = "tcp://虚拟机ip:61616";
    public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
        //1.创建链接工厂,按照给定的url地址,采用默认的用户和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过链接工厂,获得Connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //3.1 两个参数 ①事务 ②签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(queue or topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while(true) {
            //2000ms timeout时间
            //ActiveMQ有五种消息类型,这里发送时是TextMessage,接收时强转一下
            /*同步阻赛方式(receive())
              订阅者或接收者调Messaoeconsumerreceive()方法来接收消息,receive 方法在能够接收到消息之前(或超时之前)将一直阻塞*/
            TextMessage textMessage = (TextMessage) consumer.receive(2000);
            if(null != textMessage) {
                System.out.println("接收到消息:" + textMessage.getText());
            } else {
                break;
            }
        }
        consumer.close();
        session.close();
        connection.close();
    }

 注意:MessageConsumer的receive方法如果选择的是无参类型的,即不设置超时时间(Timeout),那么上图的Number Of Consumers数量为1,因为消息消费完后receive方法会一直等待,程序一直运行,Consumer也就一直在线

② 方式二

public static void main(String[] args) throws JMSException, IOException {
        //1.创建链接工厂,按照给定的url地址,采用默认的用户和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过链接工厂,获得Connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //3.1 两个参数 ①事务 ②签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(queue or topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        

        //通过监听方式读取消息
        /*
        异步非阻塞方式(监听器onMessage())
            订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener     
            listener)注册一个消息监听器当消息到达之后,系统自动调用监听器MessageListener的 
            onMessage(Message message)方法。
        */
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //连接AvtiveMQ需要等待时间
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }

 

 ③ 消费者情况分析

  1.  一个消费者
  2. 两个消费者
    1. 先生成消息,先启动1号,在启动2号  结果:1号消费完所有消息,2号等待
    2. 先启动1,2号,再生成消息 结果:1,2号均分了消息,颇有些负载均衡的意味

④ 小总结

一、连接的基本步骤

 二、两种消费方式

① 同步阻塞方式(receive)

订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。

② 异步非阻塞方式(监听器onMessage())

订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。

三、队列特点

 

六、在发布订阅消息传递域中,目的地被称为主题(topic)

(1)topic介绍

在发布订阅消息传递域中,目的地被称为主题(topic)

发布/订阅消息传递域的特点如下:

① 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;

② 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。

③ 生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者

默认情况下如上所述,但是JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅。

(2)发布主题生产者

    public static final String ACTIVEMQ_URL = "tcp://虚拟机ip:61616";
    public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
        //1.创建链接工厂,按照给定的url地址,采用默认的用户和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过链接工厂,获得Connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //3.1 两个参数 ①事务 ②签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(queue or topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5.创建生产者
        MessageProducer producer = session.createProducer(topic);
        //6.通过MessageProducer生成3条消息到MQ队列中
        for(int i = 1 ; i <= 3; i ++ ) {
            TextMessage textMessage = session.createTextMessage("topicMessage--" + i);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();

        System.out.println("topicMessage消息发布到MQ完成……");
    }

(3)订阅主题消费者

    public static final String ACTIVEMQ_URL = "tcp://虚拟机ip:61616";
    public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException {
        //1.创建链接工厂,按照给定的url地址,采用默认的用户和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过链接工厂,获得Connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //3.1 两个参数 ①事务 ②签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(queue or topic)
        Topic topic = session.createTopic(TOPIC_NAME);
        //5.创建消费者
        MessageConsumer consumer = session.createConsumer(topic);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //连接AvtiveMQ需要等待时间
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }

(4)先启动消费者再启动生产者

① 启动消费者

 ② 启动生产者

 ③ 消费消息

 

④ 控制台

​​​​​​​

 七、topic和queue对比

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/85437.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue axios实战,请求天气预报接口

效果预览 创建流程 1. 创建项目目录 mkdir test22. 进入目录 cd test23. 引入vue 引入vue&#xff0c; 一路敲回车就行了 npm init vuelatest4. 启动项目 创建成功&#xff0c;启动项目 cd vue-project npm install npm run dev执行run 后的访问结果 > vue-project…

简单得令人发指,说不会一定是在骗领导:配准法在地图上叠加边缘透明旅游图演示

目录 1 前言 2 基本设置 3 配准设置 4 切图 5 更简单的切图方法 1 前言 居然还有人在抱怨&#xff0c;说在地图上叠加手绘图好麻烦啊好复杂&#xff0c;都干了一星期了又要推倒重来简直不想活了&#xff0c;眼睛都累哗啦了。。。嗯&#xff0c;我相信同学你没说假话&#…

网络工程师备考5章

5.1 考点分析 5.2 WLAN基础 注&#xff1a;例如上面图中蜂窝的A&#xff0c;周围全是其他的蜂窝&#xff0c;实现了相同频率的区域隔离&#xff0c;简单了解一下它发展的几个阶段。 注&#xff1a;简单了解即可。 WLAN、802.11Wifi 这三个有什么区别&#xff1f; WLAN是无线局…

链式二叉树(C语言实现)

文章目录&#xff1a;二叉树链式结构实现1.链式二叉树的结构2.遍历二叉树2.1前序遍历2.2中序遍历2.3后序遍历2.4层序遍历3.功能接口3.1二叉树节点个数3.2叶子节点个数3.3树的深度3.4第k层节点个数&#xff08;k>1&#xff09;3.5查找目标节点3.6判断是否为完全二叉树3.7构建…

AI-多模态-2022:TCL【triple contrastive learning】【三重对比学习的视觉-语言预训练模型】

论文&#xff1a;https://arxiv.org/pdf/2202.10401.pdf 代码&#xff1a;https://github.com/uta-smile/TCL 写在前面&#xff1a; CPC[1]这篇论文中&#xff0c;作者对互信息的公式进行了分析&#xff0c;得到互信息下界的相反数为InfoNCE loss&#xff0c;即最小化InfoNC…

IPv4地址和子网掩码

目录 1.ip的定义 A.官方定义 B.IP的表现形式 C.IP地址分类 2.网络地址和主机地址 3.子网掩码 4.应用场景 1.ip的定义 A.官方定义 IP地址是一种在Internet上的给主机编址的方式&#xff0c;也称为网际协议地址。IP地址是IP协议提供的一种统一的地址格式&#xff0c;它为…

three.js之组对象

文章目录简介例子查看组对象组对象相关方法addremove层级模型节点命名、查找、遍历模型命名例子遍历查找本地坐标与世界坐标例子本地坐标世界坐标缩放系数专栏目录请点击 简介 层级模型就是一个树的结构&#xff0c;他有一个组的概念&#xff0c;对于组我们可以进行旋转、平移…

操作系统中的进程

目录 什么是进程/任务&#xff08;Process/Task&#xff09; PCB的具体信息 1.pid 进程的身份标识 2.内存指针 3.文件描述符表 4.进程状态 5.进程优先级 6.进程上下文 7.进程 记账信息 虚拟地址空间 我的GitHub&#xff1a;Powerveil GitHub 我的Gitee&#xff1a;P…

28年蛰伏,易特驰打响「软件定义汽车」硬战

今年3月&#xff0c;特斯拉给车主推送了新的软件更新版本2022.8.2&#xff0c;更新内容包括&#xff1a;车辆温度预设改进、空调页面显示除雾/除霜提醒、预计充电时间更精准估算、正在进行的通话更新等。 这并不是特斯拉第一次通过升级OTA&#xff0c;增加新功能、完善现有功能…

防火墙的前世今生

防火墙的前世今生 1、第一代防火墙&#xff1a;包过滤防火墙&#xff0c;实现简单的访问控制&#xff0c;也就是我们经常在交换机路由器用到ACL技术 当我们192.168.1.1需要访问192.168.2.1的WEB服务的时候&#xff0c;先要去精确控制能匹配源目地址&#xff0c;端口号&#xf…

限制 SLS告警通知时段的几种常见方法

前言 在对系统进行监控告警的过程中&#xff0c;有时候并非在任何时候都要接收告警通知&#xff0c;例如以下场景&#xff1a; 计划内变更触发的已知告警可以无需通知非工作时间不接收不严重的告警夜里不接收电话告警等等 本文会介绍几种常见的限制告警通知时段的方法&#x…

IDEA技巧汇总:这30个强大的功能,总有一个你能用上!

目录查看代码历史版本调整idea的虚拟内存&#xff1a;idea设置成eclipse的快捷键设置提示词忽略大小写关闭代码检查设置文档注释模板显示方法分隔符设置多行tabtab过多会自动关闭快速匹配方法的大括号位置代码结尾补全模糊搜索方法预览某个类的代码查看方法在哪里被调用代码模板…

一个简单的网页制作期末作业,学生个人html静态网页制作成品代码

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

12月13日:跟着猫叔写代码,fastadmin中Api相关只是

Api权限管理 api常用返回信息实例 api获取所有方法&#xff0c;都在common目录下cotroller文件夹中Api.php中 /*** 需要登录的接口**/public function test2(){//$this->success(返回成功, [action > test2]);//判断当前用户是否登录//$this->success(返回成功, $t…

1. Python_Django项目之大型电商项目介绍

1.开发项目目的 联系已掌握的知识点发现新的知识点掌握开发技巧掌握项目结构增加项目经验 2.所用技术 语言&#xff1a;Python3&#xff08;Django4&#xff09;数据库&#xff1a;MySQLweb服务器&#xff1a;Nginxuwsgi开发环境&#xff1a;VScode、linux 3.功能介绍 商品…

动态规划——背包问题(3)

文章目录求解最佳方案数例题思路代码混合背包问题例题思路代码有依赖的背包问题例题思路代码考察思维的一些背包题目机器分配金明的预算方案货币系统能量石总结求解最佳方案数 例题 有 N 件物品和一个容量是 V 的背包。每件物品只能使用一次。 第 i 件物品的体积是 vi&#…

springboot前后端交互(小白教学)

在上次前后端交互&#xff0c;我们使用的是最基本的HTMLServlet的组合&#xff0c;比较基础&#xff0c;今天我们来讲一讲HtmlSpringboot框架&#xff0c;前后端交互实现更为简便&#xff0c;大大降低了我们开发人员在代码上面所花费的时间&#xff0c;那今天让我们一探究竟吧。…

1998-2014年工企污染数据库

1998-2014年工企污染匹配数据库 1、时间区间为&#xff1a;1998-2014年 2、部分指标&#xff1a; 工业总产值(现价)&#xff08;万元&#xff09;、工业用水总量&#xff08;吨&#xff09;、煤炭消费总量&#xff08;吨&#xff09;、其中:新鲜水量&#xff08;吨&#xff…

是谁实现了 Pod 的多副本管理?

目录一、前言二、案例分析三、案例总结一、前言 在 K8s 中 Pod 是由 Controller 来管理的&#xff0c;Controller 定义了 Pod 的部署 spec&#xff0c;如 Pod 的副本数、运行的 Node 等。不同的业务场景 Controller 是不同的。K8s 提供了多种 Controller&#xff0c;如常见的 …

POCV/SOCV 、LVF

1.POCV与OCV、AOCV 为了模拟片上PVT的差异带来的影响&#xff0c;最早提出了OCV&#xff08;On Chip Variation&#xff09;给每个cell都设置一个固定的derate值&#xff0c;来覆盖最悲观的情况&#xff0c;但是随着工艺发展&#xff0c;设计规模增大&#xff0c;OCV过度的悲观…