ActiveMQ高级特性和大厂面试常考重点

news2025/1/18 11:59:28

目录

一、引入消息队列之后该如何保证其高可用性

二、异步投递Async Sends

三、延迟投递和定时投递

四、ActiveMQ消费重试机制

五、死信队列

六、如何保证消息不被重复消费呢?幂等性问题你谈谈


一、引入消息队列之后该如何保证其高可用性

 ActiveMQ集群模式_zoeil的博客-CSDN博客
二、异步投递Async Sends

(一)官网

http://activemq.apache.org/async-sends

(二)解释

ActiveMQ支持同步、异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。ActiveMQ默认使用异步发送的模式;除非明确指定使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送的。


如果你没有使用事务且发送的是持久化的消息,每一次发送都是同步发送的且会阻寒producer直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。

很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息


异步发送
它可以最大化produer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能;不过这也带来了额外的问题,
就是需要消耗较多的Client端内存同时也会导致broker端性能消耗增加;此外它不能有效的确保消息的发送成功。在useAsyncSend=true的情况下客户端需要突忍消息丢失的可能。

(三)开启方式

①Activemq_URL带参数

public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616?jms.userAsyncSend=true";

②ActiveMQConnectionFactory的setUseAsyncSend

activeMQConnectionFactory.setUseAsyncSend(true);

(四)面试追问:异步发送如何确保发送成功 

异步发送丢失消息的场景是: 生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息。
由于消息不阻塞,生产者会认为所有send的消息均被成功发送至MQ。
如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失。
所以,正确的异步发送方法是需要接收回调的
同步发送和异步发送的区别就在此,
同步发送等send不阻塞了就表示一定发送成功了,异步发送需要接收回执并由客户端再判断一次是否发送成功。

回调测试

生产者

public class JMSProduce {

    public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
    public static final String QUEUE_NAME = "Async";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        activeMQConnectionFactory.setUseAsyncSend(true);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
        activeMQMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

        for(int i = 1 ; i <= 3; i ++ ) {
            TextMessage textMessage = session.createTextMessage("AsyncMessage--" + i);
            textMessage.setJMSMessageID(UUID.randomUUID().toString()+"order001");
            String messageID = textMessage.getJMSMessageID();
            activeMQMessageProducer.send(textMessage, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    System.out.println("成功发送消息Id:"+messageID);
                }

                @Override
                public void onException(JMSException e) {
                    System.out.println("失败发送消息Id:"+messageID);
                }
            });
        }
        activeMQMessageProducer.close();
        session.close();
        connection.close();

        System.out.println("消息发布到MQ完成……");
    }
}

 点击Browse查看队列具体信息

(五)总结

  • 异步发送可以让生产者发的更快。
  • 如果异步投递不需要保证消息是否发送成功,发送者的效率会有所提高。如果异步投递还需要保证消息是否成功发送,并采用了回调的方式,发送者的效率提高不多,这种就有些鸡肋。

三、延迟投递和定时投递

(一)官网

ActiveMQ

  

(二)案例

1.配置文件修改并重启服务

添加 schedulerSupport=true

2.java代码里面封装的辅助消息类型:ScheduleMessage

生产者

public class JMSProduce {

    public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
    public static final String QUEUE_NAME = "schedule";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        activeMQConnectionFactory.setUseAsyncSend(true);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer) session.createProducer(queue);
        activeMQMessageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        long delay =  10*1000;
        long period = 5*1000;
        int repeat = 3 ;

        for(int i = 1 ; i <= 3; i ++ ) {
            TextMessage textMessage = session.createTextMessage("scheduleMessage--" + i);
            // 延迟的时间
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
            // 重复投递的时间间隔
            textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
            // 重复投递的次数
            textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
            // 此处的意思:该条消息,等待10秒,之后每5秒发送一次,重复发送3次。
            activeMQMessageProducer.send(textMessage);
        }
        activeMQMessageProducer.close();
        session.close();
        connection.close();

        System.out.println("消息发布到MQ完成……");
    }
}

消费者

public class JmsConsumer {
    public static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
    public static final String QUEUE_NAME = "schedule";

    public static void main(String[] args) throws JMSException, IOException {
        //1.创建链接工厂,按照给定的url地址,采用默认的用户和密码
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //2.通过链接工厂,获得Connection并启动
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        //3.创建会话session
        //3.1 两个参数 ①事务 ②签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //4.创建目的地(queue or topic)
        Queue queue = session.createQueue(QUEUE_NAME);
        //5.创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if(null != message && message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("监听到队列消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //连接AvtiveMQ需要等待时间
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }
}

 
四、ActiveMQ消费重试机制

(一)官网

http://activemq.apache.org/redelivery-policy

(二)解释

消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。

那些情况会引起消息重发

  1. Client用了transactions且再session中调用了rollback
  2. Client用了transactions且再调用commit之前关闭或者没有commit
  3. Client再CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

(三)请说说消息重发时间间隔和重发次数

间隔:1

次数:6

(四)有毒消息Poison ACK

一个消息被redelivedred超过默认的最大重发次数(默认6次,可修改)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)。 

(五)属性说明

(六)代码验证

模拟上面可能发生消费重试的第二种情况

生产者代码同上

消费者

开启事务,却没有commit。重启消费者,前6次都能收到消息,到第7次,不会再收到消息。代码:

public class JmsPoisonConsumer {
    private static final String ACTIVEMQ_URL = "tcp://193.179.123.10:61616";
    private static final String ACTIVEMQ_QUEUE_NAME = "dead01";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        //session.commit();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        });
        //关闭资源
        System.in.read();
//        session.commit();  本应该commit,这里为演示重发机制,故意不提交事务
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

(七)修改默认重试次数

消费者。除橙色代码外,其他代码都和之前一样。修改重试次数为3。更多的设置请参考官网文档

ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 修改默认参数,设置消息消费重试3
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3);

        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

(八)整合spring

五、死信队列

(一)官网

ActiveMQ

死信队列:异常消息规避处理的集合,主要处理失败的消息。

(二)生产环境

(三)死信队列的配置(一般采用默认)

 1.sharedDeadLetterStrategy

不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。

 2.individualDeadLetterStrategy

可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。

 3.自动删除过期消息

过期消息是值生产者指定的过期时间,超过这个时间的消息。

 > 类似于mysql的*,指对所有的队列都起效

4.存放非持久消息到死信队列中

 

六、如何保证消息不被重复消费呢?幂等性问题你谈谈

 幂等性如何解决,根据messageid去查这个消息是否被消费了。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/116902.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习】KNN 算法介绍

文章目录一、KNN 简介二、KNN 核心思想实例分析&#xff1a;K 值的影响三、KNN 的关键1. 距离计算1. 闵可夫斯基距离2. 曼哈顿距离3. 欧氏距离4. 切比雪夫距离5. 余弦距离总结2. K值选择四、KNN 的改进&#xff1a;KDTree五、KNN 回归算法参考链接一、KNN 简介 KNN 算法&#…

想在微信上使用chatGPT?小程序?公众号?企业微信,最终还是选择了企业微信版本的chatgpt

chatgpt的接口现在都可以正常用了&#xff0c;但是怎么把这个功能放在手机上随用随开呢&#xff1f;微信个人聊天版本小程序版本公众号版本企业微信版本逻辑实现方式微信个人聊天版本 网上很多微信机器人版本的&#xff0c;但是原理是网页版微信&#xff0c;很多账号都不能登陆…

golang指针

指针 区别于C/C中的指针&#xff0c;Go语言中的指针不能进行偏移和运算&#xff0c;是安全指针。 要搞明白Go语言中的指针需要先知道3个概念&#xff1a;指针地址、指针类型和指针取值。 1.1. Go语言中的指针 Go语言中的函数传参都是值拷贝&#xff0c;当我们想要修改某个变…

Linux中如何理解线程?线程ID到底是什么?

朋友们好&#xff0c;这里简要介绍了进程和线程的区别以及对LINUX中线程ID的理解&#xff0c;本人目前理解尚浅&#xff0c;若文中有表述不当的地方还望理解并指正&#xff0c;谢谢大家&#xff01; 文章目录一&#xff1a;进程和线程二&#xff1a;线程ID和进程地址空间布局一…

5 项目部署

5.1 Linux-项目部署 5.1.1 环境 5.1.1.1 开发环境&#xff08;dev&#xff09; 外部用户无法访问&#xff0c;开发人员使用&#xff0c;版本变动很大 平时大家大多是在Windows或者Mac操作系统下去编写代码进行开发. 在开发环境中安装大量的软件&#xff0c;这样会导致环境的稳…

2022 年度盘点 | 更成熟的 AI,更破圈的技术狂欢

By 超神经内容一览&#xff1a;2022 年 AI 领域发展不断提速&#xff0c;新技术成果纷纷落地&#xff0c;模型迭代加速升级。本文总结了 2022 年 AI 领域各大公司的技术成就。关键词&#xff1a;年终盘点 大厂 技术创新2022 年在此起彼伏的咳嗽声中接近尾声&#xff0c;这一…

onCreate、onSaveInstanceState、onRestoreInstance一个参数和两个参数

Android Studio移动应用开发——onCreate、onSaveInstanceState、onRestoreInstance一个参数和两个参数_dear_jing的博客-程序员宅基地 - 程序员宅基地 在做Android生命周期实验过程中&#xff0c;把 Log.i(TAG, "(1) onCreate()") 写到了含有两个参数的函数 onSave…

HTML5 元素拖放

文章目录HTML5 元素拖放概述触发事件实现元素拖放功能dataTransfer元素拖动效果垃圾箱效果HTML5 元素拖放 概述 在HTML5中&#xff0c;我们只需要给元素添加一个draggable属性&#xff0c;然后设置该属性值为true&#xff0c;就能实现元素的拖放。 拖放&#xff0c;指的是“…

【Python】Numpy分布函数总结

文章目录总表均匀分布和三角分布幂分布与正态分布相关的分布与Gamma相关的分布极值分布总表 np.random中提供了一系列的分布函数&#xff0c;用以生成符合某种分布的随机数。下表中&#xff0c;如未作特殊说明&#xff0c;均有一个size参数&#xff0c;用以描述生成数组的尺寸…

【综合笔试题】难度 1.5/5,常规二叉树爆搜题

题目描述 这是 LeetCode 上的 95. 不同的二叉搜索树 II &#xff0c;难度为 中等。 Tag : 「树」、「二叉搜索树」、「BST」、「DFS」、「递归」、「爆搜」 给你一个整数 n&#xff0c;请你生成并返回所有由 n 个节点组成且节点值从 1 到 n 互不相同的不同 二叉搜索树 。可以…

2022出圈的ML研究:爆火的Stable Diffusion、通才智能体Gato,LeCun转推

这些机器学习领域的研究你都读过吗&#xff1f; 2022 年即将步入尾声。在这一年里&#xff0c;机器学习领域涌现出了大量有价值的论文&#xff0c;对机器学习社区产生了深远的影响。 今日&#xff0c;ML & NLP 研究者、Meta AI 技术产品营销经理、DAIR.AI 创始人 Elvis S.…

CSRF漏洞渗透与攻防(一)

目录 前言 什么是CSRF漏洞 CSRF实现流程 CSRF漏洞危害 XSS漏洞危害 CSRF与XSS区别 CSRF分类 GET型&#xff1a; POST型&#xff1a; CSRF漏洞案列模拟 CSRF常用Payload&#xff1a; CSRF漏洞挖掘 检测工具 CSRF漏洞防御 防御思路 我们该如何去防御CSRF漏洞…

LeetCode动态规划—打家劫舍从平板板到转圈圈(198、213)

打家劫舍平板板打家劫舍转圈圈打家劫舍&#xff08;进阶版&#xff09;平板板打家劫舍 转化子问题&#xff1a; 按顺序偷n间房子&#xff0c;就是考虑偷前n-1间房子还是偷前n-2间房子再偷第n间房子。 列出公式&#xff1a; res[n] max{ res[n-1] , 数组中最后一个数据res[n-…

企业信息化之源代码防泄密场景分析

场景描述 随着企业信息化发展迅速&#xff0c;越来越多的无形资产面临着被泄露&#xff0c;被盗取的&#xff0c;或员工无意导致的数据泄密风险。尤其是有源码开发的企业&#xff0c;源代码的安全更是重中之重&#xff0c;一旦泄密&#xff0c;有可能给企业带来不可估量的损失…

全程数字化的企业电子招标采购管理系统源码

全程数字化的采购管理 智能化平台化电子化内外协同 明理满足采购业务全程数字化&#xff0c; 实现供应商管理、采购需求、全网寻源、全网比价、电子招 投标、合同订单执行的全过程管理。 传统采购模式面临的挑战 如何以最合适的价格,找到最优的供应商,购买到最好的产品和服务?…

程序员必备网站,建议收藏!

俗话说的好&#xff0c;一个程序员&#xff0c;20%靠知识储备&#xff0c;80%靠网络搜索。 打开代码&#xff0c;打开Google&#xff0c;开始工作。 那么常用的写码软件&#xff0c;你知道几个呢&#xff1f; 下面我们来一起看一下常用的写码软件吧~ 建议收藏本文&#xff…

【算法】面试题 - 链表

链表相关面试题141. 环形链表问题&#xff1a;快慢指针为什么一定会相遇142. 环形链表 II问题&#xff1a;如何确认入口160. 相交链表237. 删除链表中的节点19. 删除链表的倒数第 N 个结点21. 合并两个有序链表23. 合并K个升序链表&#xff08;两种解法&#xff09;扩展&#x…

国产手机扬眉吐气,终于打击了苹果的嚣张气焰

苹果在9月份、10月份都取得了快速增长&#xff0c;而国产手机品牌持续下滑&#xff0c;但是11月份终于让国产手机捡回了主动权&#xff0c;11月份的数据显示有国产手机品牌的出货量大幅增长&#xff0c;而苹果的出货量却大幅下滑&#xff0c;国产手机成功反击了苹果。分析机构给…

Vue + SpreadJS 实现高性能数据展示与分析

Vue SpreadJS 实现高性能数据展示与分析 在前端开发领域&#xff0c;表格一直都是一个高频使用的组件&#xff0c;尤其是在中后台和数据分析场景下。但当一屏展示数据超过1000条数据记录时&#xff0c;会出现浏览器卡顿等问题&#xff0c;严重影响客户体验。为解决这些性能问…

Seay代码审计系统审计实战

今天继续给大家介绍渗透测试相关知识&#xff0c;本文主要内容是Seay代码审计系统审计实战。 免责声明&#xff1a; 本文所介绍的内容仅做学习交流使用&#xff0c;严禁利用文中技术进行非法行为&#xff0c;否则造成一切严重后果自负&#xff01; 再次强调&#xff1a;严禁对未…