1.rabbitMQ介绍

news2025/1/11 7:06:46

0.思考 我们以前为什么要学习java直接的框架代码,而不是用springboot整合的框架,在学习完MQ后,我的答案是,可以直接写成更灵活的MQ代码(其他框架也是,SSM我们为什么要学,在于灵活度更高,以后可能会遇到SSM的代码我就可以看得懂),springboot整合虽然完成了大多数功能,但是我要其他普通java代码(非springboot)兼容也是可以用原生的(万一有这种老项目呢,总不可能把老项目改为springboot项目吧…),还有一个就是学习了原生的可以看得懂和写出springboot的MQ配置文件了

1.什么是mq(message queue)

1.消息队列 FIFO队列
2.存放的内容是message
3.是跨进程的通信机制,发送信息不需要依赖其他服务

2.应用

1.流量削峰,订单系统无法直接访问1w流量,放到mq可以慢慢处理
不至于系统奔溃,但是访问速度会变慢需要排队
2.应用解耦,订单系统直接调用支付系统/其他系统
支付系统可能会故障导致订单系统故障
用mq在中间可以发信息让执行任务
3.异步处理,不用一直等待服务了,处理完任务通知即可

3.mq的分类

1.activemq 高可用信息可靠高,但是apache社区维护少了 ,ms级
2.kafka(大公司使用) 大数据的杀手锏 ms级消息有序的,大数据实时计算日志采集.消费一次
缺点单机超过54个队列/分区 load彪高 社区更新慢,load越多信息响应更高,消费失败不支持重试
3.RocketMQ(金融互联网)阿里开源,单机吞吐量10w级,支持10亿信息堆积,不会因为信息堆积导致性能低下
缺点: 支持语言少,java c++
4.rabbitMQ(中小公司) 企业最流行的消息中间件 enlang语言高并发特性,多语言, pyjava ruby,.net…ajax社区活跃
缺点: 企业版要收费,学习成本高

  1. MQ的4大概念(一个队列对应一个消费者)
`
   生产者 --->交换机--绑定--队列-->消费者
                        --队列-->消费者 

5.名词

1.channel信道(提高利用率,不会只占用整个tcp连接,有多个信道发送信息,)
2.virtual host 多租户(vh 有多个交换机exchange)
3.blinding exchange和queue的绑定关系

6.安装mq

    #开机启动服务,上传elang,rabbitmq文件
   rpm -ivh erlang-21.3-1.el7.x86_64.rpm
   yum install socat -y
   rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm
   chkconfig rabbitmq-server on
     #web界面,先关机后安装插件启动,访问15672,要关防火墙
   rabbitmq-plugins enable rabbitmq_management
   #启动服务
   /sbin/service rabbitmq-server start
    #看状态
   /sbin/service rabbitmq-server status
   #停服务
   /sbin/service rabbitmq-server stop
 
   #添加账号赋权限才能登录admin密码123
   rabbitmqctl add_user admin 123
   rabbitmqctl set_user_tags admin administrator
   rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
   #查看用户
   rabbitmqctl list_users
   #重置命令
    rabbitmqctl stop_app
    rabbitmqctl reset
   #重启
   rabbitmqctl start_app

7.写java项目 生产者 消费者

    1.导入maven依赖
           <!--rabbitmq 依赖客户端-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.9.0</version>
            </dependency>
            <!--操作文件流的一个依赖-->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.6</version>
            </dependency>
    2. 生产者 通过chanel操作交换机的默认的队列
       ConnectionFactory
       Connection
       Channel
         //队列名
         //队列消息是否持久化到磁盘,默认在内存中
        //队列是否只供一个消费者消费(不共享)
        //是否自动删除,开新队列
       //其他参数
       channel.("name",false,false,false,null)
       //1.哪个交换机,队列名称,其他参数.消息体(二进制)
       String msg="hello";  
     channel.basicPublish("","qname",null,msg.getBytes("UTF-8"));
     //到mq的Queues界面看消息
   public class Producer {
    public static String QUEUE_NAME="hello1";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.10.104");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String msg="hello3";
		//发送消息        
		channel.basicPublish("",QUEUE_NAME,null,msg.getBytes("UTF-8"));

        System.out.println("发送信息成功");


    }
}    


    3.消费者队列类 设置ip账号和密码
       ConnectionFactory
      Connection=factory.createConnection();
       Channel
       DeliverCallBack =(xxx)->{
                  sout(new String(message.getBody(),"UTF-8"));
         };
       CancelCallBack  =xxx->{};
       //消费, 项目队列名,是否自动应答(就是要手动处理还是被动处理)
       //失败的回调,消费者取消消费的回调(都要lambda表达式)
        channel.basicConsume();
                             
   public class Consumer {
    public static String QUEUE_NAME="hello1";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.10.104");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        DeliverCallback deliverCallback=(consumerTag,delivery)->{
            System.out.println(new String(delivery.getBody()));
            System.out.println(consumerTag);

        };
        CancelCallback cancelCallback=(var1)->{

            System.out.println("应答失败");
        };


        Channel channel = connection.createChannel();
       channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

        System.out.println("接收信息成功");
    }
}
    //但是发现一个队列只能发一次,多了接收不到,什么问题???
    //原来开启了手动应答,这一句代码第二个参数现在是自动应答
    channel.basicConsume(QUEUE_NAME,false,deliverCallback,cancelCallback);

8.工作队列 (上面是普通队列)避免立即执行资源密集型任务
生产者大量发消息到队列,消费者需要有多个工作线程处理任务(就不会处理单线程慢慢的处理)
但是要注意消息不能重复消费,导致重复的任务

1.mq默认使用轮询(你一个我一个消息,不会重复)分发给工作线程
2.封装连接信道工具类
3.worker1.class工作线程1 工作线程2
开发工具的work01–>edit config–>allow parallel run
//启动2个work01的窗口 …psf
//写生产者

9.消息应答(防止消息没有被消费者处理完中途丢失消息)处理完成通知mq删除队列的消息

         1.自动应答(少使用 用于机器非常可靠能快速及时处理任务)
         2.手动应答(多使用)
             1.批量应答(不建议) 会把信道的信息全部确认(虽然可以减少网络压力,但是数据可能丢失)
             2.不批量 (只应答当前发送过来的消息,发过去一次应答一次)
                   如 队列发送了 1 ,2 ,3消息,会等待3的处理结果并确认

10.消息应答重新入队(消息发送但是tcp在应答前断开连接[消息发送到线程,队列已经没有数据了,需要恢复数据],没有ack确认消息,需要重新入队发给其他工作线程处理)
如动图mq1 消息最后丢失
请添加图片描述

1.消息自动重新入队

      //消息的标记, 是否批量应答
   channel.basicAck(message.getEnvelope().getDeliveryTag(),false);  

2.代码如下
//提供者

  public class AckMsg {
    public static final String QUEUE_NAME="ack";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = MQRabbitUtil.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //队列名
        //队列消息是否持久化到磁盘,默认在内存中
        //队列是否只供一个消费者消费(不共享)
        //是否自动删除,开新队列
        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String next = scanner.next();
            channel.basicPublish("",QUEUE_NAME,null,next.getBytes("UTF-8"));
        }
    }
}
       //消费者
     public class Consumer1 {
public static final String QUEUE_NAME="ack";
public static void main(String[] args) throws IOException, TimeoutException {
    Channel channel = MQRabbitUtil.getChannel();
    DeliverCallback deliverCallback=(consumerTag, delivery)->{
        SleepUtils.sleep(1);
        System.out.println("消息处理快接收:"+new String(delivery.getBody(),"UTF-8"));
        System.out.println(consumerTag);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);


    };
    CancelCallback cancelCallback=(var1)->{

        System.out.println("应答失败");
    };
        //消费, 项目队列名,是否自动应答(就是要手动处理还是被动处理)
        //失败的回调,消费者取消消费的回调(都要lambda表达式)
        boolean IsAck=false;
        channel.basicConsume(QUEUE_NAME, IsAck, deliverCallback, cancelCallback);

        System.out.println("worker2正在等待接收消息");
    }

}
        //消费者2
       public class Consumer2 {
    public static final String QUEUE_NAME="ack";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = MQRabbitUtil.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            SleepUtils.sleep(30);
            System.out.println("消息处理慢接收:"+new String(delivery.getBody(),"UTF-8"));
            System.out.println(consumerTag);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);


        };
        CancelCallback cancelCallback=(var1)->{

            System.out.println("应答失败");
        };
        //消费, 项目队列名,是否自动应答(就是要手动处理还是被动处理)
        //失败的回调,消费者取消消费的回调(都要lambda表达式)
        boolean IsAck=false;
        channel.basicConsume(QUEUE_NAME, IsAck, deliverCallback, cancelCallback);

        System.out.println("worker2正在等待接收消息");
    }

}

10.队列持久化(消息在mq保存,而不发出去就消失)
//需要将原来不持久化的队列删除,不然报错
//界面的feature会变为D代表持久化
//生产者写

   boolean Duration=true;
        channel.queueDeclare(QUEUE_NAME,Duration,false,false,null);

11.消息持久化(也会丢失)

//准备写入磁盘的时候,没有存储完,消息在缓存中
channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes("UTF-8"));
      //是否自动应答 false,不然的话会学习达到消费者那里消费

12.不公平分配(能者多劳,轮询是公平的)常用

   1.消费者设置 
      //预取值 直接设置下面的,指定每个消费者得到几条
      int prefetchCount =1;//系统会根据处理时间动态分配
      //积压的数据才能看到效果(就是同一时间能接受多少条数据)
      channel.basicQos(prefetchCount);//默认是0公平分发,1代表不公平

13.怎么确保MQ消息不丢失

1.队列持久化
2.队列的消息持久化
3.发布确认

14.发布确认原理(之前的是消费者确认,这里在生产者确认!!!,确认消费者(mq)保存在磁盘上才能保证队列绝对不丢失)
1.单个确认(同步确认,发一条确认一条,速度慢)

     Channel channel = MQRabbitUtil.getChannel();
    channel.confirmSelect();//代表要确认磁盘中mq已经存储了数据,先写

   boolean flag=channel.waitForConfirms();
    if(flag){
            System.out.println("消息已经写入磁盘的确认");

        }
       ----------全部代码------------
  public class AckMsg {
    public static final String QUEUE_NAME="ack4";
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        Channel channel = MQRabbitUtil.getChannel();
        channel.confirmSelect();//代表要确认磁盘中mq已经存储了数据,先写

        boolean Duration=true;
        //队列名
        //队列消息是否持久化到磁盘,默认在内存中
        //队列是否只供一个消费者消费(不共享)
        //是否自动删除,开新队列
        //其他参数
        channel.queueDeclare(QUEUE_NAME,Duration,false,false,null);

        Scanner scanner = new Scanner(System.in);

        while (scanner.hasNext()){
            String next = scanner.next();
            //交换机
            //队列名
            //设置消息持久化
            //二进制
            channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,next.getBytes());
            boolean flag=channel.waitForConfirms();
            if(flag){
                System.out.println("消息已经写入磁盘的确认");

            }

        }
    }
}

-----consumer2-----

public class Consumer2 {
    public static final String QUEUE_NAME="ack4";
    public static void main(String[] args) throws Exception {
        Channel channel = MQRabbitUtil.getChannel();
        channel.basicQos(5);
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            SleepUtils.sleep(30);
            System.out.println("消息处理慢接收:"+new String(delivery.getBody(),"UTF-8"));
            System.out.println(consumerTag);
            //确认的标志
            //是否批量应答
			//这里是我手动应答的代码            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);



        };
        CancelCallback cancelCallback=(var1)->{

            System.out.println("应答失败");
        };
        //消费队列名
        // 是否自动应答(就是要手动处理还是被动处理)
        //失败的回调,消费者取消消费的回调(都要lambda表达式)
        boolean IsAck=false;//手动应答
        channel.basicConsume(QUEUE_NAME, IsAck, deliverCallback, cancelCallback);

        System.out.println("worker2正在等待接收消息");
    }

}
2.批量确认(性能高,但是一群消息发送,出故障时不知道那个消息出故障)(不使用他)
 //批量发数据 确认1次
  //代码也是一样的
   int batchSize=100; //100条确认一次
  for(int i=0;i<MESSAGE_COUNT;i++){
          if(i%batchSize==0){
channel.waitForConfirms();//确认
        }

          
  }
3.异步批量确认(性能高,信息可靠但是代码难写)(比前面两个都快) 使用一个有序的map key记录编号和消息,信息到后面才异步确定
      //!!!消费者没有确认代码,提供者必须要有 下面
      
       channel.confirmSelect();                         
    //消费者有个broker通知map是否收到信息
               nackCallback #未确认回调
               ackCallback   #确认回调
    //提供者代码,写监听器 监听成功接口还是失败接口,不用waitForConfirms
//不看他也可以,因为并发访问时线程不安全
ConfirmCallback ackCallback=(tag,multiple)->{
                     if(multiple){//如果批量确认,直接批量删除标签
                              //成功一个,在全部列表删除该标签的值
                              ConcurrentNavigableMap<Long,String> confirmed=out.headMap(deliverTag);
                              confirm.clear();//清除确认的
                    }else{
						 out.remove(tag);//直接删除
                     }
                         sout("确认的消息"+tag)
            }
           ConfirmCallback nackCallback=(tag,multiple)->{ 
                        out.get(tag);//得到为确认的消息
                         sout("确认的消息"+tag)
            }
       channel.addConfirmListener(ackCallback,nackCallback);//异步通知
    //怎么处理未确认消息,因为发消息和确认消息是两个队列,所以使用ConcurrentLinkedQueue对确认和发布线程进行消息传递
   //全部发送的消息(发送消息的时候) - 记录成功的消息(确认成功的接口)=未确认的消息
    //老师选择了另外一个来记录 1.序号和信息关联2.轻松批量删除3.支持高并发(多线程)
         ConcurrentSkipListMap<Long,String> out=new ConcurrentSkipListMap<>();
  for(int i=0;i<MESSAGE_COUNT;i++){
       String msg="msg"+i; 
       channel.basicPublish("",queueName,null,msg.getBytes());
       out.put(channel.getNextPublishSeqNo(),msg);
          
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/475812.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot+MyBatis搭建迷你微信小程序

课程链接&#xff1a;https://www.imooc.com/learn/945 标签组件 view view类似于HTML中的div标签是最基础的UI组件 https://blog.csdn.net/wushibo750/article/details/113802928 https://developers.weixin.qq.com/miniprogram/dev/component/view.html block flex ht…

突破1300件!腾讯数据安全专利授权量最新成绩出炉

今天是世界知识产权日&#xff0c;跟大家汇报一下我们在数据安全专利工作上的进展。 截至2023年4月&#xff0c;腾讯共获得数据安全相关专利授权量超过1300件、申请公开量超过1800件&#xff0c;涵盖数据加解密、数据库访问、数据分级分类、数据备份、数据防泄漏、机密计算等多…

音视频八股文(9)-- flv的h264六层结构和aac六层结构

flv介绍 FLV(Flash Video)是Adobe公司推出的⼀种流媒体格式&#xff0c;由于其封装后的⾳视频⽂件体积⼩、封装简单等特点&#xff0c;⾮常适合于互联⽹上使⽤。⽬前主流的视频⽹站基本都⽀持FLV。采⽤FLV格式封装的⽂件后缀为.flv。 FLV封装格式是由⼀个⽂件头(file header)…

概述篇——01 计算机网络概述

一、什么是计算机网络 计算机网络主要由一些通用的、可编程的硬件互连而成&#xff0c;通过这些硬件&#xff0c;可以传送不同类型的数据&#xff0c;并且可以支持广泛和日益增长的应用&#xff1b; 计算机网络不只是软件概念&#xff0c;还包含硬件设备&#xff1b;计算机网…

【Mysql】基础篇:DML(data manipulation language)语句:增、删、改数据库数据总结

博主简介&#xff1a;努力学习的大一在校计算机专业学生&#xff0c;热爱学习和创作。目前在学习和分享&#xff1a;数据结构、Go&#xff0c;Java等相关知识。博主主页&#xff1a; 是瑶瑶子啦所属专栏: Mysql从入门到精通近期目标&#xff1a;写好专栏的每一篇文章 目录 一、…

微搭低代码实现下拉框动态填充值

有个粉丝问我&#xff0c;微搭的下拉框如何自动填充值 想问一下&#xff0c;下拉控件需要绑定数据源里面的列表&#xff0c;这个需要怎么做&#xff0c;自己研究了蛮久也没弄出来&#xff0c;需要参考您哪一篇教程&#xff1f; 一般你字段设置为枚举类型就可以&#xff0c;如果…

数据结构入门(二)——单链表(增,删,查,改)

1.单链表的概念 概念&#xff1a;链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;但链表在逻辑上是连续的&#xff0c;顺序的&#xff0c;而数据元素的逻辑顺序是通过链表中的指针连接次序实现的。 1.2链表的结构 我们给int重新定义一下新类型叫做SLDataType…

开源Stylegan人脸生成预训练模型

最近在研究Stylegan对抗式图像生成网络&#xff0c;使用了网络的一些预训练模型生成相应的图像&#xff0c;感觉非常有趣。下面开源一些我找到了预训练模型和代码&#xff0c;供大家一起玩。 Stylegan2官方给出的是TensorFlow版本的&#xff0c;费了半天劲找出了pytorch版本 这…

【五一创作】【Midjourney】Midjourney 连续性人物创作 ① ( 通过垫图方式生成类似图像 )

文章目录 一、Midjourney 生成图像二、通过垫图方式生成类似图像 一、Midjourney 生成图像 Midjourney 可以生成高质量的图像 , 但是 生成过程有很大的随机性 , 输入同样的提示词指令 , 其输出结果也存在很大的不同 ; 如果要 生成稳定的人物角色 , 场景 , 描述连贯的内容 , 这…

RMAN-03009、ORA-19566数据文件坏块报错处理方法

在备份数据库的时候&#xff0c;出现RMAN-03009、ORA-19566报错&#xff1a; RMAN-03009: backup 命令 (c3 通道上, 在 04/29/2023 10:58:11 上) 失败 ORA-19566: 超出损坏块限制 0 (文件 E:\APP\ADMINISTRATOR\ORADATA\JHSEMR\JHEMR2.DBF) 继续执行其他作业步骤, 将不重新运行…

React--》一些不常见的hook函数讲解

目录 Hook函数 useImperativeHandle useLayoutEffect和useInsertionEffect与useEffect区别 useDebugValue useDeferredValue useTransition Hook函数 关于React中的钩子函数&#xff0c;在我之间的文章中讲解完我们已经非常熟悉了&#xff0c;钩子函数的功能非常强大而它…

编译安卓系统源码时异常处理

编译安卓系统源码时异常处理 提示语法错误&#xff0c;如下所示&#xff1a; FAILED: out/target/product/generic/system-qemu.img /bin/bash -c "(export SGDISKout/host/linux-x86/bin/sgdisk SIMG2IMGout/host/linux-x86/bin/simg2img; device/generic/goldfis…

新安装的ubuntu,遇到的问题记录

镜像版本&#xff1a; https://mirror.nju.edu.cn/ubuntu-releases/22.04/ubuntu-22.04.1-live-server-amd64.iso 安装后无法切换 root 用户&#xff1a; 问题截图&#xff1a; null 解决办法&#xff1a; 解决ubuntu操作系统默认没有创建root账户&#xff1a; 1、sudo passwd …

云原生CAx软件:多租户的认证

云原生CAx软件是在设计时便将云平台作为部署、运行环境的CAx软件。通常&#xff0c;为了降低成本、方便管理&#xff0c;云原生CAx系统需要能为多个租户提供服务&#xff0c;即多租户(Multi-tenancy)&#xff0c;而实现这种多租户系统&#xff0c;关键是要处理好身份认证、权限…

PhotoShop如何使用图层之实例演示?

文章目录 0.引言1.创建简单的立体书效果图2.给人像制作逼真的影子3.用调整图层除去图像中的灰色4.制作有质感的口红颜色5.给黑白图像上色6.制作粉笔文字效果 0.引言 因科研等多场景需要进行绘图处理&#xff0c;笔者对PS进行了学习&#xff0c;本文通过《Photoshop2021入门教程…

Packet Tracer - 配置 IPv6 静态路由和默认路由

Packet Tracer - 配置 IPv6 静态路由和默认路由 IPv6 地址分配表 设备 接口 IPv6 地址/前缀 默认网关 R1 G0/0 2001:DB8:1:1::1/64 不适用 S0/0/0 2001:DB8:1:A001::1/64 不适用 R2 G0/0 2001:DB8:1:2::1/64 不适用 S0/0/0 2001:DB8:1:A001::2/64 不适用 S0…

如何将图片恢复水平位置?图片旋转矫正方法大全,ddddocr作者基于RotNet的旋转验证码深度学习识别模型Rotate-Captcha-Crack

基于边缘检测的图像旋转校正模型&#xff1a; 该模型首先使用边缘检测算法对图像进行边缘检测&#xff0c;然后找到边缘上的直线&#xff0c;并计算直线的角度。最后通过旋转图像来校正图像的角度。 import cv2 import numpy as np# 加载图像 img cv2.imread(skewed_image.j…

【chatgpt】学习开源项目chatgpt-web,搭建自己的chatgpt服务,功能非常丰富有打字效果

目录 前言1&#xff0c;开源的chatgpt项目2&#xff0c;项目可以直接使用docker-compose跑起来3&#xff0c;关于打字模式SSE&#xff0c; octet-stream &#xff08;打字特效&#xff09;4&#xff0c;关于内容存储5&#xff0c;总结 前言 本文的原文连接是: https://blog.csd…

自动控制原理笔记-根轨迹法

目录 一&#xff0c;根轨迹的基本概念 1.根轨迹的基本概念 2.根轨迹方程 3.根轨迹方程的应用 二&#xff0c;根轨迹的绘制规则 【规则一】根轨迹有n条分支&#xff1a; 【规则二】根轨迹对称于实轴&#xff1a; 【规则三】根轨迹的起点和终点&#xff1a; 【规则四】…

BUUCTF-Web-[极客大挑战 2019]Upload

打开后可以看到是一个可以进行文件上传的页面&#xff0c;如下图所示 查看页面源代码&#xff0c;如下图所示&#xff0c;可以看到有js代码&#xff0c;说明存在前端验证的可能性 上传一个php文件&#xff0c;此处上传shell.php后页面如下图所示&#xff0c;显示不是图片 用bur…