0.思考 我们以前为什么要学习java直接的框架代码,而不是用springboot整合的框架,在学习完MQ后,我的答案是,可以直接写成更灵活的MQ代码(其他框架也是,SSM我们为什么要学,在于灵活度更高,以后可能会遇到SSM的代码我就可以看得懂),springboot整合虽然完成了大多数功能,但是我要其他普通java代码(非springboot)兼容也是可以用原生的(万一有这种老项目呢,总不可能把老项目改为springboot项目吧…),还有一个就是学习了原生的可以看得懂和写出springboot的MQ配置文件了
1.什么是mq(message queue)
1.消息队列 FIFO队列
2.存放的内容是message
3.是跨进程的通信机制,发送信息不需要依赖其他服务
2.应用
1.流量削峰,订单系统无法直接访问1w流量,放到mq可以慢慢处理
不至于系统奔溃,但是访问速度会变慢需要排队
2.应用解耦,订单系统直接调用支付系统/其他系统
支付系统可能会故障导致订单系统故障
用mq在中间可以发信息让执行任务
3.异步处理,不用一直等待服务了,处理完任务通知即可
3.mq的分类
1.activemq 高可用信息可靠高,但是apache社区维护少了 ,ms级
2.kafka(大公司使用) 大数据的杀手锏 ms级消息有序的,大数据实时计算日志采集.消费一次
缺点单机超过54个队列/分区 load彪高 社区更新慢,load越多信息响应更高,消费失败不支持重试
3.RocketMQ(金融互联网)阿里开源,单机吞吐量10w级,支持10亿信息堆积,不会因为信息堆积导致性能低下
缺点: 支持语言少,java c++
4.rabbitMQ(中小公司) 企业最流行的消息中间件 enlang语言高并发特性,多语言, pyjava ruby,.net…ajax社区活跃
缺点: 企业版要收费,学习成本高
- MQ的4大概念(一个队列对应一个消费者)
`
生产者 --->交换机--绑定--队列-->消费者
--队列-->消费者
5.名词
1.channel信道(提高利用率,不会只占用整个tcp连接,有多个信道发送信息,)
2.virtual host 多租户(vh 有多个交换机exchange)
3.blinding exchange和queue的绑定关系
6.安装mq
#开机启动服务,上传elang,rabbitmq文件
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
chkconfig rabbitmq-server on
#web界面,先关机后安装插件启动,访问15672,要关防火墙
rabbitmq-plugins enable rabbitmq_management
#启动服务
/sbin/service rabbitmq-server start
#看状态
/sbin/service rabbitmq-server status
#停服务
/sbin/service rabbitmq-server stop
#添加账号赋权限才能登录admin密码123
rabbitmqctl add_user admin 123
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
#查看用户
rabbitmqctl list_users
#重置命令
rabbitmqctl stop_app
rabbitmqctl reset
#重启
rabbitmqctl start_app
7.写java项目 生产者 消费者
1.导入maven依赖
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
2. 生产者 通过chanel操作交换机的默认的队列
ConnectionFactory
Connection
Channel
//队列名
//队列消息是否持久化到磁盘,默认在内存中
//队列是否只供一个消费者消费(不共享)
//是否自动删除,开新队列
//其他参数
channel.("name",false,false,false,null)
//1.哪个交换机,队列名称,其他参数.消息体(二进制)
String msg="hello";
channel.basicPublish("","qname",null,msg.getBytes("UTF-8"));
//到mq的Queues界面看消息
public class Producer {
public static String QUEUE_NAME="hello1";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.10.104");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String msg="hello3";
//发送消息
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));
System.out.println("发送信息成功");
}
}
3.消费者队列类 设置ip账号和密码
ConnectionFactory
Connection=factory.createConnection();
Channel
DeliverCallBack =(xxx)->{
sout(new String(message.getBody(),"UTF-8"));
};
CancelCallBack =xxx->{};
//消费, 项目队列名,是否自动应答(就是要手动处理还是被动处理)
//失败的回调,消费者取消消费的回调(都要lambda表达式)
channel.basicConsume();
public class Consumer {
public static String QUEUE_NAME="hello1";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.10.104");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
DeliverCallback deliverCallback=(consumerTag,delivery)->{
System.out.println(new String(delivery.getBody()));
System.out.println(consumerTag);
};
CancelCallback cancelCallback=(var1)->{
System.out.println("应答失败");
};
Channel channel = connection.createChannel();
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
System.out.println("接收信息成功");
}
}
//但是发现一个队列只能发一次,多了接收不到,什么问题???
//原来开启了手动应答,这一句代码第二个参数现在是自动应答
channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);
8.工作队列 (上面是普通队列)避免立即执行资源密集型任务
生产者大量发消息到队列,消费者需要有多个工作线程处理任务(就不会处理单线程慢慢的处理)
但是要注意消息不能重复消费,导致重复的任务
1.mq默认使用轮询(你一个我一个消息,不会重复)分发给工作线程
2.封装连接信道工具类
3.worker1.class工作线程1 工作线程2
开发工具的work01–>edit config–>allow parallel run
//启动2个work01的窗口 …psf
//写生产者
9.消息应答(防止消息没有被消费者处理完中途丢失消息)处理完成通知mq删除队列的消息
1.自动应答(少使用 用于机器非常可靠能快速及时处理任务) 2.手动应答(多使用) 1.批量应答(不建议) 会把信道的信息全部确认(虽然可以减少网络压力,但是数据可能丢失) 2.不批量 (只应答当前发送过来的消息,发过去一次应答一次) 如 队列发送了 1 ,2 ,3消息,会等待3的处理结果并确认
10.消息应答重新入队(消息发送但是tcp在应答前断开连接[消息发送到线程,队列已经没有数据了,需要恢复数据],没有ack确认消息,需要重新入队发给其他工作线程处理)
如动图mq1 消息最后丢失
1.消息自动重新入队
//消息的标记, 是否批量应答
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
2.代码如下
//提供者
public class AckMsg {
public static final String QUEUE_NAME="ack";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQRabbitUtil.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//队列名
//队列消息是否持久化到磁盘,默认在内存中
//队列是否只供一个消费者消费(不共享)
//是否自动删除,开新队列
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String next = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,next.getBytes("UTF-8"));
}
}
}
//消费者
public class Consumer1 {
public static final String QUEUE_NAME="ack";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQRabbitUtil.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->{
SleepUtils.sleep(1);
System.out.println("消息处理快接收:"+new String(delivery.getBody(),"UTF-8"));
System.out.println(consumerTag);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback=(var1)->{
System.out.println("应答失败");
};
//消费, 项目队列名,是否自动应答(就是要手动处理还是被动处理)
//失败的回调,消费者取消消费的回调(都要lambda表达式)
boolean IsAck=false;
channel.basicConsume(QUEUE_NAME, IsAck, deliverCallback, cancelCallback);
System.out.println("worker2正在等待接收消息");
}
}
//消费者2
public class Consumer2 {
public static final String QUEUE_NAME="ack";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = MQRabbitUtil.getChannel();
DeliverCallback deliverCallback=(consumerTag, delivery)->{
SleepUtils.sleep(30);
System.out.println("消息处理慢接收:"+new String(delivery.getBody(),"UTF-8"));
System.out.println(consumerTag);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback=(var1)->{
System.out.println("应答失败");
};
//消费, 项目队列名,是否自动应答(就是要手动处理还是被动处理)
//失败的回调,消费者取消消费的回调(都要lambda表达式)
boolean IsAck=false;
channel.basicConsume(QUEUE_NAME, IsAck, deliverCallback, cancelCallback);
System.out.println("worker2正在等待接收消息");
}
}
10.队列持久化(消息在mq保存,而不发出去就消失)
//需要将原来不持久化的队列删除,不然报错
//界面的feature会变为D代表持久化
//生产者写
boolean Duration=true;
channel.queueDeclare(QUEUE_NAME,Duration,false,false,null);
11.消息持久化(也会丢失)
//准备写入磁盘的时候,没有存储完,消息在缓存中
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
//是否自动应答 false,不然的话会学习达到消费者那里消费
12.不公平分配(能者多劳,轮询是公平的)常用
1.消费者设置
//预取值 直接设置下面的,指定每个消费者得到几条
int prefetchCount =1;//系统会根据处理时间动态分配
//积压的数据才能看到效果(就是同一时间能接受多少条数据)
channel.basicQos(prefetchCount);//默认是0公平分发,1代表不公平
13.怎么确保MQ消息不丢失
1.队列持久化
2.队列的消息持久化
3.发布确认
14.发布确认原理(之前的是消费者确认,这里在生产者确认!!!,确认消费者(mq)保存在磁盘上才能保证队列绝对不丢失)
1.单个确认(同步确认,发一条确认一条,速度慢)
Channel channel = MQRabbitUtil.getChannel();
channel.confirmSelect();//代表要确认磁盘中mq已经存储了数据,先写
boolean flag=channel.waitForConfirms();
if(flag){
System.out.println("消息已经写入磁盘的确认");
}
----------全部代码------------
public class AckMsg {
public static final String QUEUE_NAME="ack4";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Channel channel = MQRabbitUtil.getChannel();
channel.confirmSelect();//代表要确认磁盘中mq已经存储了数据,先写
boolean Duration=true;
//队列名
//队列消息是否持久化到磁盘,默认在内存中
//队列是否只供一个消费者消费(不共享)
//是否自动删除,开新队列
//其他参数
channel.queueDeclare(QUEUE_NAME,Duration,false,false,null);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String next = scanner.next();
//交换机
//队列名
//设置消息持久化
//二进制
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
boolean flag=channel.waitForConfirms();
if(flag){
System.out.println("消息已经写入磁盘的确认");
}
}
}
}
-----consumer2-----
public class Consumer2 {
public static final String QUEUE_NAME="ack4";
public static void main(String[] args) throws Exception {
Channel channel = MQRabbitUtil.getChannel();
channel.basicQos(5);
DeliverCallback deliverCallback=(consumerTag, delivery)->{
SleepUtils.sleep(30);
System.out.println("消息处理慢接收:"+new String(delivery.getBody(),"UTF-8"));
System.out.println(consumerTag);
//确认的标志
//是否批量应答
//这里是我手动应答的代码 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback=(var1)->{
System.out.println("应答失败");
};
//消费队列名
// 是否自动应答(就是要手动处理还是被动处理)
//失败的回调,消费者取消消费的回调(都要lambda表达式)
boolean IsAck=false;//手动应答
channel.basicConsume(QUEUE_NAME, IsAck, deliverCallback, cancelCallback);
System.out.println("worker2正在等待接收消息");
}
}
2.批量确认(性能高,但是一群消息发送,出故障时不知道那个消息出故障)(不使用他)
//批量发数据 确认1次
//代码也是一样的
int batchSize=100; //100条确认一次
for(int i=0;i<MESSAGE_COUNT;i++){
if(i%batchSize==0){
channel.waitForConfirms();//确认
}
}
3.异步批量确认(性能高,信息可靠但是代码难写)(比前面两个都快) 使用一个有序的map key记录编号和消息,信息到后面才异步确定
//!!!消费者没有确认代码,提供者必须要有 下面
channel.confirmSelect();
//消费者有个broker通知map是否收到信息
nackCallback #未确认回调
ackCallback #确认回调
//提供者代码,写监听器 监听成功接口还是失败接口,不用waitForConfirms
//不看他也可以,因为并发访问时线程不安全
ConfirmCallback ackCallback=(tag,multiple)->{
if(multiple){//如果批量确认,直接批量删除标签
//成功一个,在全部列表删除该标签的值
ConcurrentNavigableMap<Long,String> confirmed=out.headMap(deliverTag);
confirm.clear();//清除确认的
}else{
out.remove(tag);//直接删除
}
sout("确认的消息"+tag)
}
ConfirmCallback nackCallback=(tag,multiple)->{
out.get(tag);//得到为确认的消息
sout("确认的消息"+tag)
}
channel.addConfirmListener(ackCallback,nackCallback);//异步通知
//怎么处理未确认消息,因为发消息和确认消息是两个队列,所以使用ConcurrentLinkedQueue对确认和发布线程进行消息传递
//全部发送的消息(发送消息的时候) - 记录成功的消息(确认成功的接口)=未确认的消息
//老师选择了另外一个来记录 1.序号和信息关联2.轻松批量删除3.支持高并发(多线程)
ConcurrentSkipListMap<Long,String> out=new ConcurrentSkipListMap<>();
for(int i=0;i<MESSAGE_COUNT;i++){
String msg="msg"+i;
channel.basicPublish("",queueName,null,msg.getBytes());
out.put(channel.getNextPublishSeqNo(),msg);
}