JMS规范及落地产品

news2024/12/23 3:46:35

目录

一、JMS是什么

二、MQ中间件的其他落地产品

三、JMS的组成结构和特点

四、JMS的可靠性 (重要)

(一)PERSISTENT:持久性

(二)事务

(三)Acknowledge:签收

五、JMS点对点总结

六、JMS发布订阅总结


一、JMS是什么

Java平台上的专业技术规范

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

那么什么是Java消息服务?

Java消息服务指的是两个应用程序之间进行异步通信的API,它为标准协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,它们之间不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。

 二、MQ中间件的其他落地产品

(一)种类

(二)对比 

 三、JMS的组成结构和特点

(一)JMS provider

实现JMS接口和规范的消息中间件,也就是我们的MQ服务器

(二)JMS producer

消息生产者,创建和发送JMS消息的客户端应用

(三)JMS consumer

消息消费者,接收和处理JMS消息的客户端应用

(四)JMS message

(1)消息头

  1. JMSDestination消息目的地
  2. JMSDeliveryMode消息持久化模式
  3. JMSExpiration消息过期时间
  4. JMSPriority消息的优先级
  5. JMSMessageID消息的唯一标识符。
  6. package  com.at.activemq.topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class JmsProduce_topic {
    
        public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
        public static final String TOPIC_NAME = "topic01";
    
        public static void main(String[] args) throws  Exception{
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(TOPIC_NAME);
            MessageProducer messageProducer = session.createProducer(topic);
    
            for (int i = 1; i < 4 ; i++) {
                TextMessage textMessage = session.createTextMessage("topic_name--" + i);
                // 这里可以指定每个消息的目的地
                textMessage.setJMSDestination(topic);
                /*
                持久模式和非持久模式。
                一条持久性的消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,它会在服务器恢复之后再次传递。
                一条非持久的消息:最多会传递一次,这意味着服务器出现故障,该消息将会永远丢失。
                 */
                textMessage.setJMSDeliveryMode(0);
                /*
                可以设置消息在一定时间后过期,默认是永不过期。
                消息过期时间,等于Destination的send方法中的timeToLive值加上发送时刻的GMT时间值。
                如果timeToLive值等于0,则JMSExpiration被设为0,表示该消息永不过期。
                如果发送后,在消息过期时间之后还没有被发送到目的地,则该消息被清除。
                 */
                textMessage.setJMSExpiration(1000);
                /*  消息优先级,从0-9十个级别,0-4是普通消息5-9是加急消息。
                JMS不要求MQ严格按照这十个优先级发送消息但必须保证加急消息要先于普通消息到达。默认是4级。
                 */
                textMessage.setJMSPriority(10);
                // 唯一标识每个消息的标识。MQ会给我们默认生成一个,我们也可以自己指定。
                textMessage.setJMSMessageID("ABCD");
                // 上面有些属性在send方法里也能设置
                messageProducer.send(textMessage);
            }
            messageProducer.close();
            session.close();
            connection.close();
            System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
        }
    }

(2)消息体

        TextMessage ——普通字符串消息,包含一个string
        MapMessage ——Map类型的消息,key为string类型,而值为Java的基本类型
        BytesMessage ——二进制数组消息,包含一个byte[]
        StreamMessage ——Java数据流消息,用标准流操作来顺序的填充和读取。
        ObjectMessage ——对象消息,包含一个可序列化的Java对象
 

尝试TextMessage和MapMessage

生产者

//6.通过MessageProducer生成3条消息到MQ队列中
        for(int i = 1 ; i <= 3; i ++ ) {
            TextMessage textMessage = session.createTextMessage("MessageListener--" + i);
            producer.send(textMessage);

            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("k1","v"+i);
            producer.send(mapMessage);
        }

消费者

//通过监听方式读取消息
        /*
         异步非阻塞方式(监听器onMessage())
        订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器
        当消息到达之后,系统自动调用监听器MessageListener的onMessage(Message message)方法。*/
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                if(null != message && message instanceof MapMessage) {
                    MapMessage mapMessage = (MapMessage) message;
                    try {
                        System.out.println("监听到消息:"+mapMessage.getString("k1"));
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

(3)消息属性

如果需要除消息头字段之外的值,那么可以使用消息属性。他是识别/去重/重点标注等操作,非常有用的方法。

他们是以属性名和属性值对的形式制定的。可以将属性是为消息头得扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。消息的属性就像可以分配给一条消息的附加消息头一样。它们允许开发者添加有关消息的不透明附加信息。它们还用于暴露消息选择器在消息过滤时使用的数据

 尝试StringProperty、

生产者

 消费者

 注:setStringProperty对应getStringProperty

四、JMS的可靠性

(一)PERSISTENT:持久性

(1)参数说明

①持久化(服务器宕机后,消息依旧存在,activemq默认选择)

producer.setDeliveryMode(DeliveryMode.PERSISTENT);

②非持久化(服务器宕机后,消息依旧存在)

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

注:测试的时候人为的关闭虚拟机中的activemq

(2)持久化的topic

topic默认就是非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。

topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。

生产者

public static final String ACTIVEMQ_URL = "tcp://192.179.123.10:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();


        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();
        for(int i = 1 ; i <= 3; i ++ ) {
            TextMessage textMessage = session.createTextMessage("TextMessage--" + i);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();

        System.out.println("topicMessage消息发布到MQ完成……");
    }

消费者

public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
// 设置客户端ID。向MQ服务器注册自己的名称
        connection.setClientID("marrry");
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(TOPIC_NAME);
// 创建一个topic订阅者对象。一参是topic,二参是订阅者名称
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
         // 之后再开启连接
        connection.start();
        Message message = topicSubscriber.receive();
         while (null != message){
             TextMessage textMessage = (TextMessage)message;
             System.out.println(" 收到的持久化 topic :"+textMessage.getText());
             message = topicSubscriber.receive();
         }
        session.close();
        connection.close();
    }

订阅者z3接收消息后下线(offline)

 z3下线期间再发送3条topic

 z3上线后收到消息

注意:

  1. 一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。
  2. 然后再运行生产者发送消息。
  3. 之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来。

(二)事务

(1)生产者

①true

创建会话时选择开启事务,则消息会被放入队列中,只有当执行了session.commit后才会发送到activemq

Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
session.commit();

②false

创建会话时选择关闭事务,则当消息生产者调用send方法后,消息会立即发送到activemq

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

③好处

当业务复杂时,如果在程序执行过程中出错可以选择rollback,保证了消息的可靠性和程序的事务性

        try {
            //程序正常 session.commit()
        } catch (){
            //异常回滚 session.rollback()
        }finally {
            if(null != session)
                session.close();
        }

(2)消费者

①true

如果没有加上session.commit(), 那么消息会被重复消费

②false

无需commit,不会被重复消费

(三)Acknowledge:签收

(1)事务

Session.SESSION_TRANSACTED    与上面事务选择true搭配

(2)非事务

①自动签收

Session.AUTO_ACKNOWLEDGE

②手动签收

Session.CLIENT_ACKNOWLEDGE 需要客户端调用acknowledge()方法,不然会重复消费
//3.创建会话session
//3.1 两个参数 ①事务 ②签收
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
//4.创建目的地(queue or topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5.创建消费者
MessageConsumer consumer = session.createConsumer(queue);
while(true) {
    //2000ms timeout时间 ActiveMQ有五种消息类型,这里发送时是TextMessage,接收时强转一下
    TextMessage textMessage = (TextMessage) consumer.receive(2000);
    if(null != textMessage) {
        System.out.println("接收到Tx消息:" + textMessage.getText());
        textMessage.acknowledge();
    } else {
        break;
    }
}

③允许重复消息(了解即可)

Session.DUPS_OK_ACKNOWLEDGE

生产者开启事务后,执行commit方法,这批消息才真正的被提交。不执行commit方法,这批消息不会提交。执行rollback方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。

消费者开启事务后,执行commit方法,这批消息才算真正的被消费。不执行commit方法,这些消息不会标记已消费,下次还会被消费。执行rollback方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。

消费者不开启事务,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)

问:消费者和生产者需要同时操作事务才行吗?   

答:消费者和生产者的事务,完全没有关联,各自是各自的事务。

五、JMS点对点总结

 

六、JMS发布订阅总结

(一)非持久化订阅

 

(二) 持久化订阅

(三)用哪个

当所有的消息都必须被接收,则使用持久化订阅。当丢失消息能够被容忍,则使用非持久化订阅 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/111808.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#,图像二值化(01)——二值化算法综述与二十三种算法目录

图像二值化&#xff0c;就是把彩色&#xff08;先转为灰色图&#xff09;最终转为黑白两色图片的计算过程。 看似极其简单&#xff0c;但人们研究了几十年&#xff0c;却始终未达到至臻境界的问题。 本文简要介绍了 图像二值化的算法原理、分类及二十三种算法的目录&#xff…

freeswitch的gateway配置方案

概述 freeswitch是一款简单好用的VOIP开源软交换平台。 在voip的网络模型中&#xff0c;网关是我们经常会遇到的概念。 在freeswitch中&#xff0c;如何配置gateway&#xff0c;如何使用好gateway的模型和功能。 本节简单介绍fs中gateway相关的配置方案。 环境 centos&am…

csp-202209

202209题目一&#xff1a;如此编码【100分】题目二&#xff1a;何以包邮&#xff1f;【100分】题目三&#xff1a;防疫大数据【100分】题目一&#xff1a;如此编码【100分】 比较简单的题&#xff0c;根据题意计算一遍就行 一定要关注csp题目中的提示&#xff0c;这个是很有用…

达梦数据IPO过会:拟募资24亿 光谷“扫地僧”冯裕才将敲钟

雷递网 雷建平 12月23日武汉达梦数据库股份有限公司&#xff08;简称&#xff1a;“达梦数据”&#xff09;日前IPO过会&#xff0c;准备在科创板上市。达梦数据计划募资23.51亿元。其中&#xff0c;3.52亿元用于集群数据库管理系统升级项目&#xff0c;3.43亿元用于高性能分布…

[翻译+笔记]变分自编码器:从AutoEncoder到Beta-VAE

与GAN的那篇笔记相同, 做一下笔记. 并不是全文翻译, 只翻译一部分. 原文地址: from AutoEncoder to Beta-VAE 0. 前言 自编码器是用来重构高维数据的&#xff0c;它利用一个有bottleneck层的神经网络。bottleneck层获取压缩的潜在编码&#xff0c;这样将嵌入向量以低维表示可…

Activity生命周期

Activity生命周期 1.Activity状态 1.基本状态 运行&#xff0c;active。位于最前台&#xff0c;可以和用户交互的激活状态。暂停&#xff0c;pause&#xff0c;被透明或者Dialog覆盖&#xff0c;此时可见失去焦点但是不允许交互。停止&#xff0c;stop&#xff0c;被Active覆盖…

spring提前加载,懒加载,bean的作用域和注入注解讲解

前言 sping知识随笔笔记&#xff1b;spring提前加载&#xff0c;懒加载&#xff0c;bean的作用域和注入注解讲解 这里写目录标题前言1 depends-on2 bean的作用域3 lazy-init 懒加载4 Autowrite和Resource的区别和使用1 depends-on depends-on 是提前加载&#xff0c;比如在实…

关于node.js版本切换nvm的命令和安装

首先是安装,第一步,搜索下方链接地址下载Releases coreybutler/nvm-windows GitHub 安装应用下载好后直接安装就可以了,或者下载一个压缩包,在下载安装之前建议先将之前下载的node版本给删除,否则会报错。 上面的操作都结束后,那么,下面就需要通过管理员的权限去查…

外汇天眼:利空美元!2023年美国经济将如履薄冰?各大银行预测整体不乐观!

高盛表示&#xff0c;美国经济可能避免衰退。摩根士丹利预计&#xff0c;美国经济在2023年只是避开了衰退&#xff0c;但着陆并不那么软。瑞士信贷认为&#xff0c;美国明年可以避免经济下滑。摩根大通警告称&#xff0c;明年很有可能出现经济衰退。美国银行预测2023年第一季度…

大学宿舍四位舍友皆为软测,3年后的现状~

笔者最近收到测试员好友小H的分享&#xff0c;临年关&#xff0c;他参加了一场大学舍友毕业3年后的聚会&#xff0c;感慨良多。 从2019年至今&#xff0c;这已经是毕业的第3个年头了。小H的寝室大多来自五湖四海&#xff0c;毕业后&#xff0c;能够相聚的时间也少之又少&#…

Android -- 每日一问:如何设计一个照片上传 app ?

经典回答 把自己放在一个面试官的角度&#xff0c;自己先实现一次这个 App &#xff0c;然后自己总结一下你在这次实现中需要哪些能力、需要注意哪些事项。最后&#xff0c;再回过头来看&#xff0c;如果你是面试官&#xff0c;你希望面试者怎么回答才算是符合你的标准的&…

el-table 列的动态显示与隐藏

目录 业务场景 官方链接 实现效果图 使用框架 代码展示 template代码 ①、为什么要给el-table绑定【:key"reload"】&#xff1f; ②、为什么给每个绑定【key"Math.random()"】呢&#xff1f; ③、为什么列改变之后要添加【reload Math.random();…

【HarmonyOS】调测助手安装失败10内部错误

关于鸿蒙开发通过应用调测助手向watch gt 3 手表安装hap时报错。 问题背景&#xff1a; 鸿蒙开发&#xff0c;使用新建工程的helloworld 没有其他修改&#xff0c;生成hap包。然后通过应用调测助手向watch gt 3 手表安装hap时提示 安装失败:10.内部错误。 Sdk&#xff1a; a…

Shiro之授权

授权 1、角色认证 在controller层创建接口 使用shiro中的注解RequiresRoles指定能访问的角色名称 /*** 登录认证角色*/ RequiresRoles("admin") GetMapping("/userLoginRoles") ResponseBody public String userLoginRoles(){System.out.println("…

54 线程最外层异常的处理

前言 之前在 kafka 消费者客户端的一个 case 中曾经看到了这样的了一个情况 我没有配置 "group.id", 然后 kafka 客户端抛出了 InvalidGroupIdException 然后 输出的日志信息 除了类型, 其他 什么都没有, 主要是 么有堆栈信息 这里 来大致看一下 这个问题, 以及…

WooCommerce Product Feed指南 – Google Shopping和Facebook[2022]

在过去十年中&#xff0c;在线购物一直在增加。全球超过 85% 的人更喜欢网上购物而不是光顾实体店。 许多 WooCommerce 商店都做得非常好&#xff0c;销售额是大约几年前的三倍。 您是否知道您也可以立即轻松地将商店销售额翻三倍&#xff1f; 秘诀是什么&#xff1f; 好吧&…

【网络安全】浅识 SQL 注入

前言 SQL 注入&#xff08;SQL Injection&#xff09;是发生在 Web 程序中数据库层的安全漏洞&#xff0c;是网站存在最多也是最简单的漏洞。主要原因是程序对用户输入数据的合法性没有判断和处理&#xff0c;导致攻击者可以在 Web 应用程序中事先定义好的 SQL 语句中添加额外…

AcrGIS Pro一键出图

简介 日常工作中我们经常遇到批量出图的场景,比如对某个县下的各个乡镇分别按照其行政区范围出图、对某个流域/河流按照一定方向纵横的网格排布顺序出图等等要求,ArcGIS Pro对于上述需求提供了一个良好的解决方案——地图系列! 那么应该如何创建一个地图系列呢?ArcGIS Pro…

我不是浮躁,只是迷茫,北大毕业转行学编程

北大毕业的我选择去学习编程了&#xff01;&#xff01;&#xff01; 没有希望的地方&#xff0c;就没有奋斗。于千万人之中遇见它&#xff0c;于千万年之中&#xff0c;时间的无涯的荒野里&#xff0c;没有早一步&#xff0c;也没有晚一步&#xff0c;刚巧赶上了&#xff0c;那…