消息队列相关面试题

news2025/1/11 10:04:23

巩固基础,砥砺前行 。
只有不断重复,才能做到超越自己。
能坚持把简单的事情做到极致,也是不容易的。

消息队列有哪些作用

1.解耦:使用消息队列来作为两个系统直接的通讯方式,两个系统不需要相互依赖了
2.异步:系统A给消费队列发送完消息之后,就可以继续做其他事情了
3.流量削峰:如果使用消息队列的方式来调用某个系统,那么消息将在队列中排队,有消费者自己控制消费速度

死信队列是什么?延时队列是什么?

1.死信队列也是一个消息队列,它是用来存放那些没有成功消费的消息的,通常可以用来作为消息重试
2.延时队列就是用来存放需要在指定时间被处理的元素的队列,通常可以用来处理一些具有过期性操作的业务,比如十分钟内未支付则取消订单

ActiveMQ

消息队列(第一节)

1.消息队列产品有好多种,kafka、rabbitMQ 、rocketMQ 、activeMQ 等

在学习这些产品时,都需要从以下几个方面来着手
1)常用的API 如何发送接收消息
2)如何实现MQ高可用
3)MQ的集群和容错机制
4)MQ的持久化
5)MQ如何延迟和定时发送消息,如何保证消息有序
6)MQ的签收机制
7)这些MQ如何和Spring、SpringBoot 整合
8)这些消息队列有什么不同,使用场景有那些差异?
9)他们是用哪些语音开发的?
kafka(java、scale)、rabbitMQ(erlang)、rocketMQ(java)、activeMQ(java)

2.电商业务中的秒杀模块的操作:

读取订单、库存检查、库存冻结、余额查询、余额冻结、订单生成、余额扣减、库存扣减、生成流水、余额解冻、库从解冻

3. activeMQ 的两个端口 61616 后台端口,8161 web页面端口

4.查看后台程序是否存活

ps -ef|grep activemq | grep -v activemq
netstart -anp|grep 61616
lsof -i:61616

5.linux关闭防火墙命令

1)查看防火墙状态:
service iptables status
systemctl status firewalld
2)暂时关闭防火墙
systemctl stop firewalld
service iptables stop
3)永久关闭防火墙
systemctl disable firewalld
service iptables off
4)重启防火墙
systemctl enable firewalld
service iptables restart
5)查看版本
forewalld -cmd -version

6.消息队列工作流程

1)创建连接工厂
2)连接工厂创建连接,得到连接
3)连接创建session
4)session创建消息生产者或者消息消费者
5)消息生产者组装消息,并发送 

activeMQ 使用场景(第二节)

问题引入

1.在什么情况下使用消息中间件?
2.为什么要使用消息中间件?

解耦 系统之间接口耦合太高
异步 同步操作太费时间,例如 注册发送邮件 XXX
消峰 双十一 春运等高并发场景  

activeMQ 官网地址

activeMQ Java简单实现(第三节)

两种通讯方式

点对点(队列) ;
订阅发布(主题)

pom.xml

<properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
  	
  	<dependencies>
        <!--activemq所需要的jar包-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
 
 
        <!-- 下面是通用jar包-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
 
        <!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
            <scope>provided</scope>
        </dependency>
 
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
 
    </dependencies>

队列生产者

package com.ttzz.activemq;


import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 创建生产者
 * @Description: 
 * @author: tangzhong
 * @date: 2021年3月15日 下午6:31:41
 */
public class ActiveMQProduceByQueue {
	
	public static String url = "tcp://localhost:61616";
	public static String queueName = "myQueue";
	
	public static void main(String[] args) throws JMSException {
		//1.获取工厂
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
		//2. 创建连接
		Connection connection = activeMQConnectionFactory.createConnection();
		connection.start();
		//3.创建会话
		// 第一个参数 是否开启开启事务
		// 第二个参数 是签收模式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//4. 创建目的地 Queue 
		Queue queue =session.createQueue(queueName);
		//5. 创建生产者
		MessageProducer messageProducer = session.createProducer(queue);
		//6. 发送消息
		for (int i = 0; i < 4; i++) {
			TextMessage textMessage = session.createTextMessage("queue……"+ i );
			messageProducer.send(textMessage);
		}
		//关闭资源
		messageProducer.close();
		session.close();
		connection.close();
		System.out.println("OOKK");
	}
}

队列消费者

有两种接收方式:
同步阻塞
异步非阻塞
package com.ttzz.activemq;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 创建消费者
 * @Description: 
 * @author: tangzhong
 * @date: 2021年3月15日 下午6:32:57
 */
public class ActiveMQConsumerByQueue {
	public static String url = "tcp://localhost:61616";
	public static String queueName = "myQueue";
	public static void main(String[] args) throws JMSException, IOException {
		//1.获取工厂
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
		//2. 创建连接
		Connection connection = activeMQConnectionFactory.createConnection();
		connection.start();
		//3.创建会话
		// 第一个参数 是否开启开启事务
		// 第二个参数 是签收模式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//4. 创建目的地 Queue 
		Queue queue =session.createQueue(queueName);
		//5. 创建消费者
		MessageConsumer messageConsumer = session.createConsumer(queue);
		//使用同步阻塞的方式
//		while(true) {
//			TextMessage textMessage = (TextMessage) messageConsumer.receive();
//			if(textMessage!=null) {
//				System.out.println("****消费者接收到消息:"+textMessage.getText());
//			} else {
//				break;
//			}
//			System.out.println(textMessage.getText());
//		}
		//使用异步非阻塞的方式  监听器 
		messageConsumer.setMessageListener(new MessageListener() {

			public void onMessage(Message arg0) {
				TextMessage textMessage = (TextMessage) arg0;
				if(textMessage!=null) {
					try {
						System.out.println("****消费者接收到消息:"+textMessage.getText());
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				} 
				try {
					System.out.println(textMessage.getText());
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				
			}
			
		});
		 System.in.read();  //保证控制台不关
		//关闭资源
		messageConsumer.close();
		session.close();
		connection.close();
		System.out.println("OOKK2");
	}
}

主题生产者

package com.ttzz.activemq;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQProduceByTopic {
	public static String url = "tcp://localhost:61616";
	public static String topicName = "myTopic";
	
	public static void main(String[] args) throws JMSException {
		//1.获取工厂
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
		//2. 创建连接
		Connection connection = activeMQConnectionFactory.createConnection();
		connection.start();
		//3.创建会话
		// 第一个参数 是否开启开启事务
		// 第二个参数 是签收模式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//4. 创建目的地 Topic 
		Topic queue =session.createTopic(topicName);
		//5. 创建生产者
		MessageProducer messageProducer = session.createProducer(queue);
		//6. 发送消息
		for (int i = 0; i < 4; i++) {
			TextMessage textMessage = session.createTextMessage("myTopic……"+ i );
			messageProducer.send(textMessage);
		}
		//关闭资源
		messageProducer.close();
		session.close();
		connection.close();
		System.out.println("OOKK");
	}
}

主题消费者

package com.ttzz.activemq;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumerByTopic {
	public static String url = "tcp://localhost:61616";
	public static String topicName = "myTopic";
	public static void main(String[] args) throws JMSException, IOException {
		//1.获取工厂
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
		//2. 创建连接
		Connection connection = activeMQConnectionFactory.createConnection();
		connection.start();
		//3.创建会话
		// 第一个参数 是否开启开启事务
		// 第二个参数 是签收模式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//4. 创建目的地Topic
		Topic queue =session.createTopic(topicName);
		//5. 创建消费者
		MessageConsumer messageConsumer = session.createConsumer(queue);
		//使用同步阻塞的方式
//		while(true) {
//			TextMessage textMessage = (TextMessage) messageConsumer.receive();
//			if(textMessage!=null) {
//				System.out.println("****消费者接收到消息:"+textMessage.getText());
//			} else {
//				break;
//			}
//			System.out.println(textMessage.getText());
//		}
		//使用异步非阻塞的方式  监听器 
		messageConsumer.setMessageListener(new MessageListener() {

			public void onMessage(Message arg0) {
				TextMessage textMessage = (TextMessage) arg0;
				if(textMessage!=null) {
					try {
						System.out.println("****消费者接收到消息:"+textMessage.getText());
					} catch (JMSException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				} 
				try {
					System.out.println(textMessage.getText());
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				
			}
			
		});
		 System.in.read();  //保证控制台不关
		//关闭资源
		messageConsumer.close();
		session.close();
		connection.close();
		System.out.println("OOKK2");
	}
}

消费者的三种情况

	/**
	 * 1. 先生成,只启动一个消费者,第1个消费者能消费吗 ?  能
	 * 2. 先生成,先启动一个消费者,再启动一个消费者,第2个消费者能消费吗 ?  no
	 * 3. 先启动两个消费者,然后再启动生成着,第二个消费者可以消费吗 ?Y 采用轮询的方式进行消费
	 */

topic 简介

前提

1.先启动消费者 然后再启动生成者,只有订阅了,才能接收到订阅的消息

生成者

package com.ttzz.activemq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQProduceByTopic {
	public static String url = "tcp://localhost:61616";
	public static String topicName = "myTopic";
	
	public static void main(String[] args) throws JMSException {
		//1.获取工厂
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
		//2. 创建连接
		Connection connection = activeMQConnectionFactory.createConnection();
		
		//3.创建会话
		// 第一个参数 是否开启开启事务
		// 第二个参数 是签收模式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//4. 创建目的地 Topic 
		Topic topic =session.createTopic(topicName);
		//5. 创建生产者
		MessageProducer messageProducer = session.createProducer(topic);
		
		messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
		connection.start();
		//6. 发送消息
		for (int i = 0; i < 4; i++) {
			TextMessage textMessage = session.createTextMessage("myTopic……"+ i );
			textMessage.setStringProperty("自定义消息的key", "自定义消息的value");
			messageProducer.send(textMessage);
		}
		//关闭资源
		messageProducer.close();
		session.close();
		connection.close();
		System.out.println("OOKK");
	}
}

消费者

package com.ttzz.activemq;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumerByTopic {
	public static String url = "tcp://localhost:61616";
	public static String topicName = "myTopic";

	public static void main(String[] args) throws JMSException, IOException {
		// 1.获取工厂
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
		// 2. 创建连接
		Connection connection = activeMQConnectionFactory.createConnection();
		
		connection.setClientID("消费者2");
		System.out.println("消费者2");
		
		// 3.创建会话
		// 第一个参数 是否开启开启事务
		// 第二个参数 是签收模式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 4. 创建目的地Topic
		Topic topic = session.createTopic(topicName);
		// 5. 创建消费者
		TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark..."); // 创建持久化的订阅

		connection.start();

		Message message = topicSubscriber.receive();
		while (message != null) {
			TextMessage textMessage = (TextMessage) message;
			System.out.println(textMessage.getText());
			message = topicSubscriber.receive();
		}

		session.close();
		connection.close();
		System.out.println("OOKK2");
	}
}

先启动两个消费者,然后再启动生产者

在这里插入图片描述
消费者控制台
消费者1
在这里插入图片描述
消费者2
在这里插入图片描述
MQ界面
在这里插入图片描述

JMS 规范以及消息特性

JMS规范是什么

它是JavaEE体系中的一项Message Service

常用消息中间件比较

JMS组成和特点

JMS provider
实现jms接口的消息中间件
JMS Producer 、JMS Constomer

JMS Message 消息头
1)jms destination 消息目的地 队列或者主题
2)jms deviverymode 持久化方式
3)jms expiration 消息过期时间 
4)jms 优先级 1到4是普通消息  5-9是加急消息
5)消息id  唯一识别每个消息的标识,是有MQ自己生成
消息头之destination

在这里插入图片描述
当然,也可以通过消息进行设置
在这里插入图片描述

四种重载:目的地,消息,优先级,存活时间,是否持久化
消息的目的地:队列 和 主题

持久性
在这里插入图片描述
消息的过期时间 默认是永不过期的

消息体
发送的消息类型有哪些:
StringMessage MapMessage ByteMessage StringMessage ObjectMessage 五中类型

要求:发送的消息体和接受的消息体要求类型一致。

在这里插入图片描述
要求:发送的消息体和接受的消息体要求类型一致。
在这里插入图片描述

自定义的消息属性
自定义的消息属性能有什么还用呢 ?
去重、识别、重点标注等	

TextMessage textMessage = session.createTextMessage("myTopic……"+ i );
messageProducer.send(textMessage);
textMessage.setStringProperty("自定义消息的key", "自定义消息的value");

如何保证消息的可靠性???

消息的可靠性可以从以下四个方面来回答:
1)消息的持久性
2)消息的事务特性
3)消息的签收机制
4)消息持久化

(队列)消息的持久性

在这里插入图片描述
验证1:
设置消息为非持久性,然后生产消息,(服务器不关闭),再去消费消息
在这里插入图片描述
消息被正常消费
在这里插入图片描述
验证2:
设置消息为非持久性,然后生产消息,(服务器关闭),再去消费消息
生成出的消息
在这里插入图片描述
服务器关闭之后,再去消费消息
在这里插入图片描述
消息丢失,不能被消费。
刚刚生成的消息被丢失了
在这里插入图片描述
验证3:设置消息为持久性,然后生成消息。这个是刚刚生成的消息。
在这里插入图片描述
关闭消息服务器,再次重新启动消息,消息依旧存在
在这里插入图片描述
去消费消息。消息成功被消费
在这里插入图片描述
在这里插入图片描述

(topic)消息的持久性

对于topic消息,持久性没有多大意义,因为在topic模式中,需要先启动消费者,然后再启动生产者,如果设置了消息持久性,但是,还没有启动动消费者,则这些消息就会被丢失,不能被消费者消费

设置持久化的topic生成者

package com.ttzz.activemq;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQProduceByTopic {
	public static String url = "tcp://localhost:61616";
	public static String topicName = "myTopic";
	
	public static void main(String[] args) throws JMSException {
		//1.获取工厂
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
		//2. 创建连接
		Connection connection = activeMQConnectionFactory.createConnection();
		
		//3.创建会话
		// 第一个参数 是否开启开启事务
		// 第二个参数 是签收模式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//4. 创建目的地 Topic 
		Topic topic =session.createTopic(topicName);
		//5. 创建生产者
		MessageProducer messageProducer = session.createProducer(topic);
		
		messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
		connection.start();
		//6. 发送消息
		for (int i = 0; i < 4; i++) {
			TextMessage textMessage = session.createTextMessage("myTopic……"+ i );
			textMessage.setStringProperty("自定义消息的key", "自定义消息的value");
			messageProducer.send(textMessage);
		}
		//关闭资源
		messageProducer.close();
		session.close();
		connection.close();
		System.out.println("OOKK");
	}
}

topic 消费者

package com.ttzz.activemq;

import java.io.IOException;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumerByTopic {
	public static String url = "tcp://localhost:61616";
	public static String topicName = "myTopic";

	public static void main(String[] args) throws JMSException, IOException {
		// 1.获取工厂
		ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
		// 2. 创建连接
		Connection connection = activeMQConnectionFactory.createConnection();
		
		connection.setClientID("消费者1");
		System.out.println("topic消费者1");
		
		// 3.创建会话
		// 第一个参数 是否开启开启事务
		// 第二个参数 是签收模式
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		// 4. 创建目的地Topic
		Topic topic = session.createTopic(topicName);
		// 5. 创建消费者
		TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark..."); // 创建持久化的订阅

		connection.start();

		Message message = topicSubscriber.receive();
		while (message != null) {
			TextMessage textMessage = (TextMessage) message;
			System.out.println(textMessage.getText());
			message = topicSubscriber.receive();
		}

		session.close();
		connection.close();
		System.out.println("OOKK2");
	}
}

验证1:启动一个消费者
在这里插入图片描述
在这里插入图片描述
Active Durable Topic Subscribers :处于激活状态的持久化的topic消费者
Offline Durable Topic Subscribers:处于离线状态的持久化的topic消费者
启动持久化的topic生成者:
topic消费者消费消息
在这里插入图片描述
消息服务器
在这里插入图片描述
在这里插入图片描述
验证2:将消费者1 关闭,启动消费者2
在这里插入图片描述
消费者1处于离线状态,启动生成者消费
在这里插入图片描述
消费者2能够正常消费消息
在这里插入图片描述
再次启动消费者1,消费者1也能正常消费消息
在这里插入图片描述
在这里插入图片描述

消息的事务特性

事务主要是针对生产者,签收主要针对消费者

	//3.创建会话
	// 第一个参数 是否开启开启事务
	// 第二个参数 是签收模式
	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

验证1:
事务设置为 false,执行发送消息,消息自动到服务器
在这里插入图片描述
在这里插入图片描述
因为设置了消息的持久性,关闭服务器,再次重启启动,该消息依旧存在

验证2:
事务设置为 true,执行发送消息,
在这里插入图片描述

看看服务器有没有收到消息。服务器中没有收到刚刚发送的消息。因为没有做消息的提交操作
在这里插入图片描述
提交事务
在这里插入图片描述
消息入队
在这里插入图片描述
事务对于多个消息同时发送,能够保证原子性
session.rollback();

为了验证的需要,需要重启的时候,删除持久化的消息

操作:在配置文件activemq.xml的broker字段添加deleteAllMessagesOnStartup=“true”
在这里插入图片描述
在这里插入图片描述
可以看到持久化的消息被删除,

生成者开启事务,将消息发送到消息服务器
在这里插入图片描述
看到消息
在这里插入图片描述
消费者消费掉消息
在这里插入图片描述
再次启动一个消费者,发现消息已经被消费,说明消息不能被重新消费
在这里插入图片描述
验证2:设置消费者开启事务,但是,没有提交事务【消息被重复消费】。第一次,消费者1正常消费消息,
在这里插入图片描述
但是在服务器看到:消息没有被消费
在这里插入图片描述
再次启动另一个消费者,发现消息可以被多次消费
在这里插入图片描述

一个有趣的现象

生成者以事务的方式将生成者发送到服务器
消费者开启事务进行消费,但是,没有提交事务。保证控制到不灭。再次启动2号消费者,发现不能重复消费???
在这里插入图片描述
在这里插入图片描述
如果(去掉: System.in.read();)设置消费者4秒之后,没有消息,自动关闭。则启动2号消费者,可以重复消费。
原因呢???哈哈哈哈

消息的签收

非事务的签收有三种

1)自动签收 Session.AUTO_ACKNOWLEDGE
2)手动签收 Session.CLIENT_ACKNOWLEDGE
3)允许重复消息 Session.DUPS_OK_ACKNOWLEDGE ???这个我没有验证通过
在这里插入图片描述
对生产者而言,如果开启了事务,则签收机制可以随便选择,事务的优先级高于签收机制
验证1:生产者未开启事务,采用自动签收的方式将消息发送到服务器
消费者采用手动签收,发现消息可以重复消费。
消息发送到服务器,运行消费者程序,消息没有被消费掉
在这里插入图片描述

在这里插入图片描述
开启消息签收机制后,消息不能重复消费
在这里插入图片描述

事务模式下的签收

生产者开启事务,签收模式为自动签收,将消息发送到服务器
在这里插入图片描述

在这里插入图片描述

消费者开启事务,采用手动签收模式,但是消息没有使用ack机制。
消息仍然被消费掉
在这里插入图片描述
在这里插入图片描述在这里插入图片描述

验证:
生产者开启事务,签收模式为自动签收,将消息发送到服务器;
消费者开启事务,采用手动签收模式,消息使用ack机制。但是没有commit。消息能被重复消费在这里插入图片描述
在这里插入图片描述
结论:
在事务性会话中,当一个事物被成功提交则消息被自动签收。如果事物回滚,则消息会被再次传送。
非事物性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement)

activeMQ两种模式比较

1)工作模式上来说,主题采用订阅发布模式,如果没有订阅者消息就会被丢弃;如果有多个订阅者,则
就会被多个订阅者接收;队列采用一对一的模式,如果当前消息没有消费者,则该
消息也不会丢弃,如果有多个消费者,那么该消息只能被一个消费者消费,同时要求
消费者发送ack确认信息
2)从有无状态上来看,主题是无状态的,队列会默认在服务器上以文件的形式保存,
也可以配置DB存储
3)从消息传递的完整性来看,主题如果没有订阅者,则消息会丢弃,而队列不会
4) 处理效率,主题会随着订阅者的增多效率减低,而队列不会

activemq传输协议

http://activemq.apache.org/configuring-version-5-transports.html

activemq 默认采用tcp协议
在网络传输之前,序列化数据为字节流 open wire
tcp协议的优点
可靠性高稳定性强;效率高,采用字节流的方式传递;高效性、可用性支持所有平台

nio协议:

在这里插入图片描述
auto+nio

在这里插入图片描述

//	public static String url = "tcp://localhost:61616";
//	public static String url = "nio://localhost:61618";
	public static String url = "auto+nio://localhost:5671";

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/867811.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode169. 多数元素

题目 给定一个大小为 n 的数组 nums &#xff0c;返回其中的多数元素。多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 示例 示例 1&#xff1a; 输入&#xff1a;nums [3,2,3] 输出&#x…

SpringCloud实用篇6——elasticsearch搜索功能

目录 1 DSL查询文档1.1 DSL查询分类1.2 全文检索查询1.2.1 使用场景1.2.2 基本语法1.2.3 示例1.2.4 总结 1.3 精准查询1.3.1 term查询1.3.2 range查询1.3.3 总结 1.4.地理坐标查询1.4.1 矩形范围查询1.4.2 附近查询 1.5 复合查询1.5.1 相关性算分1.5.2 算分函数查询1&#xff0…

PolarDB-X 针对跑批场景的思考和实践

背景 金融行业和运营商系统&#xff0c;业务除了在线联机查询外&#xff0c;同时有离线跑批处理&#xff0c;跑批场景比较注重吞吐量&#xff0c;同时基于数据库场景有一定的使用惯性&#xff0c;比如直连MySQL分库分表的存储节点做本地化跑批、以及基于Oracle/DB2等数据库做E…

基于comsol进行共振薄膜声学超材料的模态分析

研究背景&#xff1a; 从声学超材料出现到薄膜型和薄板型声学超材料局域共振隔声机理的广泛研究&#xff0c;其负等效质量和负等效密度特性打破了传统吸隔声材料质量定律的限制&#xff0c;为低频吸隔声提供了新途径。由吸声系数理论模型可知&#xff0c;薄膜型结构的吸声性能…

JavaScript之BOM+window对象+定时器+location,navigator,history对象

一.BOM概述 BOM即浏览器对象模型,它提供了独立于内容而与窗口进行交互的对象 BOM的顶级对象是window 二.window对象的常见事件 1.窗口加载事件window.onload window.onload function(){} 或者 window.addEventListener("onload" , function(){}); window.onlo…

简单易用且高效的跨平台开发工具:Xojo 2023 for Mac

Xojo for Mac是Mac平台上一个跨平台的针对桌面、Web、移动和Raspberry Pi的快速应用程序开发软件。与其他多平台开发工具相比&#xff0c;Xojo for Mac为开发人员提供了显着的生产率提高。 Xojo for Mac具有拖放功能&#xff0c;使您能够快速创建用户界面设计&#xff0c;然后…

构造函数——初始化列表

初始化列表的引入。 #include<iostream> using namespace std;//栈类 typedef int DataType; class Stack { public://默认构造&#xff1a;Stack(size_t capacity ){cout << "Stack()" << endl;_array (DataType*)malloc(sizeof(DataType) * ca…

MSDN镜像和版本的区别

重装系统 前言 装系统本质上就是运行U盘内放好的windows安装包&#xff0c;目前U盘有三种格式化方式&#xff0c;分别是FAT32 exFAT NTFS ,后面两个方式很挑主板&#xff0c;老旧主板胡总和老旧电脑无法识别&#xff0c;选用FAT32种方式进行格式化&#xff0c;几乎所有的设备都…

连锁酒店(民宿)多商户全开源版 (多店模块版),支持创建多个小程序

手边酒店多商户小程序支持创建多个小程序&#xff0c;更合适平台型或者连锁酒店使用。后台支持一键入住&#xff0c;一键退款、退押金、钟点房支持微信支付、模板消息。客服实时收到新的订单信息&#xff0c;可以在手机端处理订单。支持按日期维护房价和房间数量&#xff0c;支…

分布式 - 消息队列Kafka:Kafka消费者和消费者组

文章目录 1. Kafka 消费者是什么&#xff1f;2. Kafka 消费者组的概念&#xff1f;3. Kafka 消费者和消费者组有什么关系&#xff1f;4. Kafka 多个消费者如何同时消费一个分区&#xff1f; 1. Kafka 消费者是什么&#xff1f; 消费者负责订阅Kafka中的主题&#xff0c;并且从…

最新版本的Anaconda环境配置、Cuda、cuDNN以及pytorch环境一键式配置流程

本教程是最新的深度学习入门环境配置教程&#xff0c;跟着本教程可以帮你解决入门深度学习之前的环境配置问题。同时&#xff0c;本教程拒绝琐碎&#xff0c;大部分以图例形式进行教程。这里我们安装的都是最新版本~ 文章目录 一、Anaconda的安装1.1 下载1.2 安装1.3 环境配置…

STM32CubeMX之freeRTOS互斥量

这是大哥保护小弟的故事 高中低等级的任务 互斥量就是谁要敢插我小弟的队&#xff0c;我就要打他&#xff0c;不能让其他人插我小弟的队 互斥量的使用是默认开启的不用手动开启&#xff01; 最高优先级任务&#xff1a;延时&#xff08;10ms&#xff09;再上厕所 中间&#x…

2023年录屏软件哪个好用,Camtasia Studio2023安装激活教程最新激活密钥

2023年录屏软件哪个好用&#xff0c;电脑Windows10系统自带录屏不是挺香吗&#xff0c;干嘛还需要安装录屏软件&#xff01;系统自带的录屏功能有一定局限限&#xff0c;想要录制其他文件、软件内容根本不好用&#xff1b;与其费时费力研究系统自带&#xff0c;不如选择好用的录…

matlab解微分方程:方向场

在微分方程中&#xff0c;常见的形式是&#xff1a; x ′ f ( x , t ) xf(x,t) x′f(x,t) 方向场的每一个矢量可以形象地刻画一阶微分方程的解。在方向场中的每个点处&#xff0c;都会出现一条其斜率等于通过该点的微分方程解的矢量。给定一个初值&#xff0c;微分方程对应一条…

MyBatis源码解析手写持久层框架

1. 手写持久层框架 1.1 JDBC操作数据库_问题分析 JDBC API 允许应用程序访问任何形式的表格数据&#xff0c;特别是存储在关系数据库中的数据 代码示例&#xff1a; public static void main(String[] args) { Connection connection null;PreparedStatement preparedState…

axios请求

参考&#xff1a;https://www.axios-http.cn/docs/instance

Visual Studio 2022 如何关闭左侧绿色条的点击事件,避免误触?

如图&#xff0c;文本编辑器左侧的绿条&#xff0c;很容易误触&#xff0c;真是神烦&#xff01;点一下就会弹出这个差异框。 我也不知道这个绿色的条叫什么&#xff0c;烦了好久都没有找到怎么关闭它&#xff01; 是叫 git 状态条&#xff1f;git 差异条&#xff1f;git 更改…

三平面映射的技术

大家好&#xff0c;我是阿赵。   之前在做护盾的时候&#xff0c;使用过一种叫做三平面映射的技术&#xff0c;这里来详细的说一下。 一、效果说明 在做场景的时候&#xff0c;很多美工都会遇到一个问题&#xff0c;想把一个通用的材质贴图赋予给一个经过拉伸的模型&#xf…

无涯教程-Perl - my函数

描述 此函数声明LIST中的变量在包围式块内按词法范围。如果指定了多个变量,则所有变量都必须用括号括起来。 语法 以下是此函数的简单语法- my LIST返回值 此函数不返回任何值。 例 以下是显示其基本用法的示例代码- #!/usr/bin/perl -wmy $string "We are the w…

【算法学习】高级班九

这种互为旋变串&#xff1a; 给定两个字符串&#xff0c;判断是否互为旋变串 代码&#xff1a; 打表法&#xff1a; 每一层内的数字不互相依赖&#xff0c;只依赖它下面的层但实际上size会约束L1和L2的值&#xff0c;即L1和L2<N-size 思路&#xff1a;设置一个窗口…