巩固基础,砥砺前行 。
只有不断重复,才能做到超越自己。
能坚持把简单的事情做到极致,也是不容易的。
消息队列有哪些作用
1.解耦:使用消息队列来作为两个系统直接的通讯方式,两个系统不需要相互依赖了
2.异步:系统A给消费队列发送完消息之后,就可以继续做其他事情了
3.流量削峰:如果使用消息队列的方式来调用某个系统,那么消息将在队列中排队,有消费者自己控制消费速度
死信队列是什么?延时队列是什么?
1.死信队列也是一个消息队列,它是用来存放那些没有成功消费的消息的,通常可以用来作为消息重试
2.延时队列就是用来存放需要在指定时间被处理的元素的队列,通常可以用来处理一些具有过期性操作的业务,比如十分钟内未支付则取消订单
ActiveMQ
消息队列(第一节)
1.消息队列产品有好多种,kafka、rabbitMQ 、rocketMQ 、activeMQ 等
在学习这些产品时,都需要从以下几个方面来着手
1)常用的API 如何发送接收消息
2)如何实现MQ高可用
3)MQ的集群和容错机制
4)MQ的持久化
5)MQ如何延迟和定时发送消息,如何保证消息有序
6)MQ的签收机制
7)这些MQ如何和Spring、SpringBoot 整合
8)这些消息队列有什么不同,使用场景有那些差异?
9)他们是用哪些语音开发的?
kafka(java、scale)、rabbitMQ(erlang)、rocketMQ(java)、activeMQ(java)
2.电商业务中的秒杀模块的操作:
读取订单、库存检查、库存冻结、余额查询、余额冻结、订单生成、余额扣减、库存扣减、生成流水、余额解冻、库从解冻
3. activeMQ 的两个端口 61616 后台端口,8161 web页面端口
4.查看后台程序是否存活
ps -ef|grep activemq | grep -v activemq
netstart -anp|grep 61616
lsof -i:61616
5.linux关闭防火墙命令
1)查看防火墙状态:
service iptables status
systemctl status firewalld
2)暂时关闭防火墙
systemctl stop firewalld
service iptables stop
3)永久关闭防火墙
systemctl disable firewalld
service iptables off
4)重启防火墙
systemctl enable firewalld
service iptables restart
5)查看版本
forewalld -cmd -version
6.消息队列工作流程
1)创建连接工厂
2)连接工厂创建连接,得到连接
3)连接创建session
4)session创建消息生产者或者消息消费者
5)消息生产者组装消息,并发送
activeMQ 使用场景(第二节)
问题引入
1.在什么情况下使用消息中间件?
2.为什么要使用消息中间件?
解耦 系统之间接口耦合太高
异步 同步操作太费时间,例如 注册发送邮件 XXX
消峰 双十一 春运等高并发场景
activeMQ 官网地址
activeMQ Java简单实现(第三节)
两种通讯方式
点对点(队列) ;
订阅发布(主题)
pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!--activemq所需要的jar包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<!-- 下面是通用jar包-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/ch.qos.logback/logback-classic -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.18</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/junit/junit -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
队列生产者
package com.ttzz.activemq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 创建生产者
* @Description:
* @author: tangzhong
* @date: 2021年3月15日 下午6:31:41
*/
public class ActiveMQProduceByQueue {
public static String url = "tcp://localhost:61616";
public static String queueName = "myQueue";
public static void main(String[] args) throws JMSException {
//1.获取工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
//2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话
// 第一个参数 是否开启开启事务
// 第二个参数 是签收模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建目的地 Queue
Queue queue =session.createQueue(queueName);
//5. 创建生产者
MessageProducer messageProducer = session.createProducer(queue);
//6. 发送消息
for (int i = 0; i < 4; i++) {
TextMessage textMessage = session.createTextMessage("queue……"+ i );
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("OOKK");
}
}
队列消费者
有两种接收方式:
同步阻塞
异步非阻塞
package com.ttzz.activemq;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 创建消费者
* @Description:
* @author: tangzhong
* @date: 2021年3月15日 下午6:32:57
*/
public class ActiveMQConsumerByQueue {
public static String url = "tcp://localhost:61616";
public static String queueName = "myQueue";
public static void main(String[] args) throws JMSException, IOException {
//1.获取工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
//2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话
// 第一个参数 是否开启开启事务
// 第二个参数 是签收模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建目的地 Queue
Queue queue =session.createQueue(queueName);
//5. 创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
//使用同步阻塞的方式
// while(true) {
// TextMessage textMessage = (TextMessage) messageConsumer.receive();
// if(textMessage!=null) {
// System.out.println("****消费者接收到消息:"+textMessage.getText());
// } else {
// break;
// }
// System.out.println(textMessage.getText());
// }
//使用异步非阻塞的方式 监听器
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
TextMessage textMessage = (TextMessage) arg0;
if(textMessage!=null) {
try {
System.out.println("****消费者接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
System.in.read(); //保证控制台不关
//关闭资源
messageConsumer.close();
session.close();
connection.close();
System.out.println("OOKK2");
}
}
主题生产者
package com.ttzz.activemq;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQProduceByTopic {
public static String url = "tcp://localhost:61616";
public static String topicName = "myTopic";
public static void main(String[] args) throws JMSException {
//1.获取工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
//2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话
// 第一个参数 是否开启开启事务
// 第二个参数 是签收模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建目的地 Topic
Topic queue =session.createTopic(topicName);
//5. 创建生产者
MessageProducer messageProducer = session.createProducer(queue);
//6. 发送消息
for (int i = 0; i < 4; i++) {
TextMessage textMessage = session.createTextMessage("myTopic……"+ i );
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("OOKK");
}
}
主题消费者
package com.ttzz.activemq;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQConsumerByTopic {
public static String url = "tcp://localhost:61616";
public static String topicName = "myTopic";
public static void main(String[] args) throws JMSException, IOException {
//1.获取工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
//2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3.创建会话
// 第一个参数 是否开启开启事务
// 第二个参数 是签收模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建目的地Topic
Topic queue =session.createTopic(topicName);
//5. 创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
//使用同步阻塞的方式
// while(true) {
// TextMessage textMessage = (TextMessage) messageConsumer.receive();
// if(textMessage!=null) {
// System.out.println("****消费者接收到消息:"+textMessage.getText());
// } else {
// break;
// }
// System.out.println(textMessage.getText());
// }
//使用异步非阻塞的方式 监听器
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
TextMessage textMessage = (TextMessage) arg0;
if(textMessage!=null) {
try {
System.out.println("****消费者接收到消息:"+textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
try {
System.out.println(textMessage.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
System.in.read(); //保证控制台不关
//关闭资源
messageConsumer.close();
session.close();
connection.close();
System.out.println("OOKK2");
}
}
消费者的三种情况
/**
* 1. 先生成,只启动一个消费者,第1个消费者能消费吗 ? 能
* 2. 先生成,先启动一个消费者,再启动一个消费者,第2个消费者能消费吗 ? no
* 3. 先启动两个消费者,然后再启动生成着,第二个消费者可以消费吗 ?Y 采用轮询的方式进行消费
*/
topic 简介
前提
1.先启动消费者 然后再启动生成者,只有订阅了,才能接收到订阅的消息
生成者
package com.ttzz.activemq;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQProduceByTopic {
public static String url = "tcp://localhost:61616";
public static String topicName = "myTopic";
public static void main(String[] args) throws JMSException {
//1.获取工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
//2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//3.创建会话
// 第一个参数 是否开启开启事务
// 第二个参数 是签收模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建目的地 Topic
Topic topic =session.createTopic(topicName);
//5. 创建生产者
MessageProducer messageProducer = session.createProducer(topic);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
//6. 发送消息
for (int i = 0; i < 4; i++) {
TextMessage textMessage = session.createTextMessage("myTopic……"+ i );
textMessage.setStringProperty("自定义消息的key", "自定义消息的value");
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("OOKK");
}
}
消费者
package com.ttzz.activemq;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQConsumerByTopic {
public static String url = "tcp://localhost:61616";
public static String topicName = "myTopic";
public static void main(String[] args) throws JMSException, IOException {
// 1.获取工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
// 2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("消费者2");
System.out.println("消费者2");
// 3.创建会话
// 第一个参数 是否开启开启事务
// 第二个参数 是签收模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4. 创建目的地Topic
Topic topic = session.createTopic(topicName);
// 5. 创建消费者
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark..."); // 创建持久化的订阅
connection.start();
Message message = topicSubscriber.receive();
while (message != null) {
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
System.out.println("OOKK2");
}
}
先启动两个消费者,然后再启动生产者
消费者控制台
消费者1
消费者2
MQ界面
JMS 规范以及消息特性
JMS规范是什么
它是JavaEE体系中的一项Message Service
常用消息中间件比较
JMS组成和特点
JMS provider
实现jms接口的消息中间件
JMS Producer 、JMS Constomer
JMS Message 消息头
1)jms destination 消息目的地 队列或者主题
2)jms deviverymode 持久化方式
3)jms expiration 消息过期时间
4)jms 优先级 1到4是普通消息 5-9是加急消息
5)消息id 唯一识别每个消息的标识,是有MQ自己生成
消息头之destination
当然,也可以通过消息进行设置
四种重载:目的地,消息,优先级,存活时间,是否持久化
消息的目的地:队列 和 主题
持久性
消息的过期时间 默认是永不过期的
消息体
发送的消息类型有哪些:
StringMessage MapMessage ByteMessage StringMessage ObjectMessage 五中类型
要求:发送的消息体和接受的消息体要求类型一致。
要求:发送的消息体和接受的消息体要求类型一致。
自定义的消息属性
自定义的消息属性能有什么还用呢 ?
去重、识别、重点标注等
TextMessage textMessage = session.createTextMessage("myTopic……"+ i );
messageProducer.send(textMessage);
textMessage.setStringProperty("自定义消息的key", "自定义消息的value");
如何保证消息的可靠性???
消息的可靠性可以从以下四个方面来回答:
1)消息的持久性
2)消息的事务特性
3)消息的签收机制
4)消息持久化
(队列)消息的持久性
验证1:
设置消息为非持久性,然后生产消息,(服务器不关闭),再去消费消息
消息被正常消费
验证2:
设置消息为非持久性,然后生产消息,(服务器关闭),再去消费消息
生成出的消息
服务器关闭之后,再去消费消息
消息丢失,不能被消费。
刚刚生成的消息被丢失了
验证3:设置消息为持久性,然后生成消息。这个是刚刚生成的消息。
关闭消息服务器,再次重新启动消息,消息依旧存在
去消费消息。消息成功被消费
(topic)消息的持久性
对于topic消息,持久性没有多大意义,因为在topic模式中,需要先启动消费者,然后再启动生产者,如果设置了消息持久性,但是,还没有启动动消费者,则这些消息就会被丢失,不能被消费者消费
设置持久化的topic生成者
package com.ttzz.activemq;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQProduceByTopic {
public static String url = "tcp://localhost:61616";
public static String topicName = "myTopic";
public static void main(String[] args) throws JMSException {
//1.获取工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
//2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
//3.创建会话
// 第一个参数 是否开启开启事务
// 第二个参数 是签收模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4. 创建目的地 Topic
Topic topic =session.createTopic(topicName);
//5. 创建生产者
MessageProducer messageProducer = session.createProducer(topic);
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
connection.start();
//6. 发送消息
for (int i = 0; i < 4; i++) {
TextMessage textMessage = session.createTextMessage("myTopic……"+ i );
textMessage.setStringProperty("自定义消息的key", "自定义消息的value");
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("OOKK");
}
}
topic 消费者
package com.ttzz.activemq;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
public class ActiveMQConsumerByTopic {
public static String url = "tcp://localhost:61616";
public static String topicName = "myTopic";
public static void main(String[] args) throws JMSException, IOException {
// 1.获取工厂
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(url);
// 2. 创建连接
Connection connection = activeMQConnectionFactory.createConnection();
connection.setClientID("消费者1");
System.out.println("topic消费者1");
// 3.创建会话
// 第一个参数 是否开启开启事务
// 第二个参数 是签收模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4. 创建目的地Topic
Topic topic = session.createTopic(topicName);
// 5. 创建消费者
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark..."); // 创建持久化的订阅
connection.start();
Message message = topicSubscriber.receive();
while (message != null) {
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
message = topicSubscriber.receive();
}
session.close();
connection.close();
System.out.println("OOKK2");
}
}
验证1:启动一个消费者
Active Durable Topic Subscribers :处于激活状态的持久化的topic消费者
Offline Durable Topic Subscribers:处于离线状态的持久化的topic消费者
启动持久化的topic生成者:
topic消费者消费消息
消息服务器
验证2:将消费者1 关闭,启动消费者2
消费者1处于离线状态,启动生成者消费
消费者2能够正常消费消息
再次启动消费者1,消费者1也能正常消费消息
消息的事务特性
事务主要是针对生产者,签收主要针对消费者
//3.创建会话
// 第一个参数 是否开启开启事务
// 第二个参数 是签收模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
验证1:
事务设置为 false,执行发送消息,消息自动到服务器
因为设置了消息的持久性,关闭服务器,再次重启启动,该消息依旧存在
验证2:
事务设置为 true,执行发送消息,
看看服务器有没有收到消息。服务器中没有收到刚刚发送的消息。因为没有做消息的提交操作
提交事务
消息入队
事务对于多个消息同时发送,能够保证原子性
session.rollback();
为了验证的需要,需要重启的时候,删除持久化的消息
操作:在配置文件activemq.xml的broker字段添加deleteAllMessagesOnStartup=“true”
可以看到持久化的消息被删除,
生成者开启事务,将消息发送到消息服务器
看到消息
消费者消费掉消息
再次启动一个消费者,发现消息已经被消费,说明消息不能被重新消费
验证2:设置消费者开启事务,但是,没有提交事务【消息被重复消费】。第一次,消费者1正常消费消息,
但是在服务器看到:消息没有被消费
再次启动另一个消费者,发现消息可以被多次消费
一个有趣的现象
生成者以事务的方式将生成者发送到服务器
消费者开启事务进行消费,但是,没有提交事务。保证控制到不灭。再次启动2号消费者,发现不能重复消费???
如果(去掉: System.in.read();)设置消费者4秒之后,没有消息,自动关闭。则启动2号消费者,可以重复消费。
原因呢???哈哈哈哈
消息的签收
非事务的签收有三种
1)自动签收 Session.AUTO_ACKNOWLEDGE
2)手动签收 Session.CLIENT_ACKNOWLEDGE
3)允许重复消息 Session.DUPS_OK_ACKNOWLEDGE ???这个我没有验证通过
对生产者而言,如果开启了事务,则签收机制可以随便选择,事务的优先级高于签收机制
验证1:生产者未开启事务,采用自动签收的方式将消息发送到服务器
消费者采用手动签收,发现消息可以重复消费。
消息发送到服务器,运行消费者程序,消息没有被消费掉
开启消息签收机制后,消息不能重复消费
事务模式下的签收
生产者开启事务,签收模式为自动签收,将消息发送到服务器
消费者开启事务,采用手动签收模式,但是消息没有使用ack机制。
消息仍然被消费掉
验证:
生产者开启事务,签收模式为自动签收,将消息发送到服务器;
消费者开启事务,采用手动签收模式,消息使用ack机制。但是没有commit。消息能被重复消费
结论:
在事务性会话中,当一个事物被成功提交则消息被自动签收。如果事物回滚,则消息会被再次传送。
非事物性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement)
activeMQ两种模式比较
1)工作模式上来说,主题采用订阅发布模式,如果没有订阅者消息就会被丢弃;如果有多个订阅者,则
就会被多个订阅者接收;队列采用一对一的模式,如果当前消息没有消费者,则该
消息也不会丢弃,如果有多个消费者,那么该消息只能被一个消费者消费,同时要求
消费者发送ack确认信息
2)从有无状态上来看,主题是无状态的,队列会默认在服务器上以文件的形式保存,
也可以配置DB存储
3)从消息传递的完整性来看,主题如果没有订阅者,则消息会丢弃,而队列不会
4) 处理效率,主题会随着订阅者的增多效率减低,而队列不会
activemq传输协议
http://activemq.apache.org/configuring-version-5-transports.html
activemq 默认采用tcp协议
在网络传输之前,序列化数据为字节流 open wire
tcp协议的优点
可靠性高稳定性强;效率高,采用字节流的方式传递;高效性、可用性支持所有平台
nio协议:
auto+nio
// public static String url = "tcp://localhost:61616";
// public static String url = "nio://localhost:61618";
public static String url = "auto+nio://localhost:5671";