RabbitMQ 安装 示例 全面了解有这一篇就够了

news2024/11/18 14:44:20

目录

一、基础知识

1. 什么是RabbitMQ

2.什么是消息和队列

3.什么是消息队列

4.什么地方使用RabbitMQ

5.RabbitMQ组成概念

二、如何落地RabbitMQ

1.RabbitMQ环境安装

2.创建系统业务

三、Exchange交换机及实例分析

1.Fanout Exchange (扇形交换机)

2.Direct Exchange (直连交换机)

3.Topic Exchange (主题交换机)

4.Header Exchange(头部交换机)

四、RabbitMQ消息确认

1.消息生产端

2.消息存储端

3.消息消费端


一、基础知识

1. 什么是RabbitMQ

RabbitMQ是2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,简称MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法,由Erlang(专门针对于大数据高并发的语言)语言开发,可复用的企业消息系统,是当前最主流的消息中间件之一,具有可靠性、灵活的路由、消息集群简单、队列高可用、多种协议的支持、管理界面、跟踪机制以及插件机制。

2.什么是消息和队列

1.消息 就是数据,增删改查的数据。例如在员工管理系统中增删改查的数据

2.队列 指的是一端进数据一端出数据,例如C#中(Queue数据结构)

3.什么是消息队列

1.消息队列指:一端进消息,一端出消息

2.RabbitMQ就是实现了消息队列概念的一个组件,以面向对象的思想去理解,消息队列就是类,而RabbitMQ就是实例,当然不仅仅只有RabbitMQ,例如ActiveMQ,RocketMQ,Kafka,包括Redis也可以实现消息队列。

4.什么地方使用RabbitMQ

1.在常见的单体架构中,主要流程是用户UI操作发起Http请求>服务器处理>然后由服务器直接和数据库交互,最后同步反馈用户结果

2.在微服务架构中,例如下图中的员工管理系统,UI与微服务通信,主要是通过Http或者gRPC同步通信

问题分析

在上述2种情况下,我们发现在UI请求时都是同步操作 ,第2种架构虽然将整体服务按业务拆分成不同的微服务并且对应各自的数据库,但是在用户与微服务通信时,存在的问题依然没有解决,例如数据库的承载能力只能处理10w个请求,如果遇到高并发情况下,UI发起50w请求,那数据库是远远承载不了的,从而导致如下问题。

1.高并发请求导致系统性能下降响应慢,同时数据库承载风险加大

2.扩展性不强UI操作的交互对业务的依赖较大,导致用户体验下降

3.瞬时流量涌入巨大的话,服务器可能直接挂了

解决方案

  • 为了解决性能瓶颈问题。我们需要将同步通信换成异步通信方式。因此就使用消息队列,用户在UI中操作直接写入RabbitMQ然后直接返回,剩下的业务操作由消息队列和各自的微服务来完成

RabbitMQ的优势

  1. 异步处理,响应快,增加了数据库(服务器的承载能力)

  2. 削峰,可以把流量的高峰分解到不同的时间段来处理

  3. 解耦(扩展性就更强),让UI和业务独立演化

  4. 高可用,处理器如果发生故障了,对其他的处理器没有影响

RabbitMQ的不足

  1. 增加了系统复杂性,不方便调试和开发,在使用RabbitMQ以前前端直接和服务交互,现在加了一层

  2. 即时性降低了,在某一程度上提升了用户操作体验,也降低了用户体验,但是避免不了,取长补短

  3. 更加依赖消息队列了

5.RabbitMQ组成概念

1.ConnectionFactory 为Connection的制造工厂。

2.Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。

3.Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

4.Exchange(交换机) 我们通常认为生产者将消息投递到Queue中,实际上实际的情况是,生产者将消息发送到Exchange,由Exchange将消息路由到一个或多个Queue中(或者丢弃),而在RabbitMQ中的Exchange一共有4种策略,分别为:fanout(扇形)、direct(直连)、topic(主题)、headers(头部)


二、如何落地RabbitMQ

1.RabbitMQ环境安装

1.下载RabbitMQ

2.运行环境erlang

3.安装完成之后,加载RabbitMQ管理插件

rabbitmq-plugins enable rabbitmq_management

4.安装成功访问RabbitMQ管理后台http://localhost:15672

2.创建系统业务

1.分别创建考勤服务,请假服务,计算薪酬服务,邮件服务,短信服务消费者角色

2.创建员工管理网站用于模拟前端调用,主要充当生产者角色

3.在员工管理网站和每一个模拟微服务中通过nuget引入RabbitMQ.Client

4.在员工管理网站中创建模拟添加考勤的控制器并加入生产者代码

 //创建连接
 using (var connection = factory.CreateConnection())
 {
     //创建通道
     var channel = connection.CreateModel();
     //定义队列
     channel.QueueDeclare("CreateAttendance", false, false, false, null);

     string json = JsonConvert.SerializeObject(attendanceDto);

     //创建内容对象
     var properties = channel.CreateBasicProperties();
     //发送消息
     channel.BasicPublish(exchange: "",routingKey: "CreateAttendance",basicProperties: properties,body: Encoding.UTF8.GetBytes(json));
 }

5.在考勤微服务中创建接口,并在接口中加入消费者代码

var connection = factory.CreateConnection();
var channel = connection.CreateModel();   
//创建消费者事件
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
    // 1、逻辑代码,添加到数据库
    var message = Encoding.UTF8.GetString(body.ToArray());
    object json = JsonConvert.DeserializeObject(message);
    Console.WriteLine(" [x] 创建考勤信息 {0}", message);
};
//设置消费者属性
//p1.监听队列p2.消息确认ACK p3.消费者实例赋值
channel.BasicConsume(queue: "CreateAttendance",autoAck: false,consumer:consumer);


三、Exchange交换机及实例分析

1.Fanout Exchange (扇形交换机)

fanout类型的Exchange路由规则非常简单,工作方式类似于多播一对多,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

1.生产者一个Exchange对应多个Queue,或者不声明Queue

2.消费者定义Exchange,如果生产者定义了Queue,那必须将exchange和queue绑定,如果没有定义队列,那消费者自己声明一个随机Queue用于接收消费消息

业务实例

当我们有员工需要请假,在员工管理系统提交请假,但是由于公司规定普通员工请假,需要发送短信到他的主管领导,针对此业务场景我们需要调用请假服务的同时去发送短信,这时需要两个消费者(请假服务,短信服务)来消费同一条消息,其实本质就是往RabbitMQ写入一个能被多个消费者接收的消息,所以可以使用 扇形交换机,一个生产者,多个消费者.

生产者模拟使用调用控制器来实现

[HttpPost]
public IEnumerable<bool> CreateLeave(CreateLeaveDto createLeaveDto)
{
    var factory = new ConnectionFactory()
    {
        HostName = "192.168.0.106",
        Port = 5672,
        Password = "guest",
        UserName = "guest",
        VirtualHost = "/"
    };
    using (var connection = factory.CreateConnection())
    {
        var channel = connection.CreateModel();
        //定义交换机
        channel.ExchangeDeclare(exchange: "Leave_fanout", type: "fanout");
        string productJson = JsonConvert.SerializeObject(createLeaveDto);
        var body = Encoding.UTF8.GetBytes(productJson);
        var properties = channel.CreateBasicProperties();
        //设置消息持久化
        properties.Persistent = true;

        channel.BasicPublish(exchange: "Leave_fanout", routingKey: "",  basicProperties: properties,body: body);
    }

}

消费者实现IHostedService 接口创建一个监听主机

public class RabbitmqHostService : IHostedService
{
	  public Task StartAsync(CancellationToken cancellationToken)
      {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                Password = "guest",
                UserName = "guest",
                VirtualHost = "/"
            };
           	var connection = factory.CreateConnection();
            var channel = connection.CreateModel();

            // 1、定义交换机
            channel.ExchangeDeclare(exchange: "Leave_fanout", type: ExchangeType.Fanout);
            //定义随机队列
            var queueName = channel.QueueDeclare().QueueName;	   
            //队列和交换机绑定
            channel.QueueBind(queueName,"Leave_fanout",routingKey: "");

           var consumer = new EventingBasicConsumer(channel);
           consumer.Received += (model, ea) =>
           {
               Console.WriteLine($"model:{model}");
               var body = ea.Body;
               // 1、业务逻辑
               var message = Encoding.UTF8.GetString(body.ToArray());
               Console.WriteLine(" [x] 创建请假 {0}", message);

               // 1、自动确认机制缺陷,消息是否正常添加到数据库当中,所以需要使用手工确认
               channel.BasicAck(ea.DeliveryTag, true);
          };
          
          // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
          // 每一次一个消费者只成功消费一个)
          channel.BasicQos(0, 1, false);
           // 消息确认(防止消息消费失败)
          channel.BasicConsume(queue: queueName ,autoAck: false,consumer: consumer);
      }
      
      public Task StopAsync(CancellationToken cancellationToken)
      {
         // 1、关闭rabbitmq的连接
         throw new NotImplementedException();
      }
}


2.Direct Exchange (直连交换机)

直接交换器,工作方式类似于单播一对一,Exchange会将消息发送完全匹配ROUTING_KEY的Queue,缺陷是无法实现多生产者对一个消费者

1.生产者一个Exchange对应一个routingKey绑定,也可以声明队列并绑定,然后向指定的队列发送消息。

2.消费者需要定义Exchange和routingKey,如果生产者声明并绑定了队列,那消费者必须绑定生产者指定的Queue来接收消息,如果没有指定Queue,那消费者需要自己声明一个随机Queue然后绑定用于接收消息

当我们员工管理系统需要计算薪资并将结果以发送短信的方式告诉员工,这个时候我们就不太适合用“扇形交换机”了,因为换做是你,你也不想你的工资全公司都知道吧?这个时候就需要定制了一对一的场景了,那就在生产消息时使用直连交换机根据routingKey发送指定的消费者.

生产者模拟使用调用控制器来实现

public IEnumerable<bool> SendCalculateSalary(CalculateSalaryDto calculateSalaryDto)
{
 var factory = new ConnectionFactory()
 {
     HostName = "192.168.0.106",
     Port = 5672,
     Password = "admin",
     UserName = "admin",
     VirtualHost = "/"
 };

using (var connection = factory.CreateConnection())
 {
     var channel = connection.CreateModel();
     //2、定义交换机
     channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: "direct");

     string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);
     var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);

     //3、发送消息
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true; // 设置消息持久化
     //p1 指定交换机
     //p2 routingKey 
     channel.BasicPublish(exchange: "CalculateSalary_direct",routingKey: "product-sms",basicProperties: properties,body: body);
 }
}

消费者实现IHostedService 接口创建一个监听主机

public class RabbitmqHostService : IHostedService
{
      public Task StartAsync(CancellationToken cancellationToken)
      {
         var factory = new ConnectionFactory()
         {
             HostName = "localhost",
             Port = 5672,
             Password = "guest",
             UserName = "guest",
             VirtualHost = "/"
         };
           
	var connection = factory.CreateConnection();
	var channel = connection.CreateModel();

	// 1、定义交换机
	channel.ExchangeDeclare(exchange: "CalculateSalary_direct", type: ExchangeType.Direct);

	// 2、定义随机队列
	var queueName = channel.QueueDeclare().QueueName;

	// 3、队列要和交换机绑定起来
	channel.QueueBind(queueName,"CalculateSalary_direct",routingKey: "product-sms");

	var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            Console.WriteLine($"model:{model}");
            var body = ea.Body;
            // 1、业务逻辑
            var message = Encoding.UTF8.GetString(body.ToArray());
            Console.WriteLine(" [x] 发送短信 {0}", message);
            // 1、消息是否正常添加到数据库当中,所以需要使用手工确认
            channel.BasicAck(ea.DeliveryTag, true);
        };
            // 3、消费消息
            channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
            // autoAck设为false 不进行自动确认                     
            channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);
      }
      
      public Task StopAsync(CancellationToken cancellationToken)
      {
         // 1、关闭rabbitmq的连接
         throw new NotImplementedException();
      }
}

3.Topic Exchange (主题交换机)

Exchange绑定队列需要制定Key; Key 可以有自己的规则;Key可以有占位符;或者# ,匹配一个单词、#匹配多个单词,在Direct基础上加上模糊匹配;多生产者一个消费者,可以多对对,也可以多对1, 真实项目当中,使用主题交换机。可以满足所有场景

1.生产者定义Exchange,然后不同的routingKey绑定

2.消费者定义Exchange,如果生产者定义了Queue,那必须将exchange和queue以及routingKey绑定,如果没有定义队列,那消费者自己声明一个随机Queue用于接收消费消息,

3.消费者routingKey的模糊匹配,生产者发送消息时routingKey定义以sms.开头, * 号只能匹配的routingKey为一级,例如(sms.A)或(sms.B)的发送的消息,# 能够匹配的routingKey为一级及多级以上 ,例如 (sms.A)或者(sms.A.QWE.IOP)

在月底的时候我们需要把员工存在异常考勤信息,薪资结算信息,请假信息分别以邮件的形式发送给我们的员工查阅,我们知道这是一个典型的多个生产者,一个消费者场景,异常考勤信息,薪资结算信息,请假信息分别需要生产消息发送到RabbitMQ,然后供我们员工消费

分别模拟3个生产者:异常考勤信息,薪资结算信息,请假信息

var factory = new ConnectionFactory()
 {
     HostName = "192.168.0.106",
     Port = 5672,
     Password = "admin",
     UserName = "admin",
     VirtualHost = "/"
 };
 //计算薪资生产者
public IEnumerable<bool> SendCalculateSalary(CalculateSalaryDto calculateSalaryDto)
{
using (var connection = factory.CreateConnection())
 {
     var channel = connection.CreateModel();
     //2、定义topic交换机
     channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");

     string calculateSalaryDtoJson = JsonConvert.SerializeObject(calculateSalaryDto);
     var body = Encoding.UTF8.GetBytes(calculateSalaryDtoJson);

     //3、发送消息
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true; // 设置消息持久化
     //p1 指定交换机
     //p2 routingKey 
     channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateSalary",basicProperties: properties,body: body);
 }
}

//考勤生产者
public IEnumerable<bool> SendCalculateAttendance(CalculateAttendanceDto calculateAttendance)
{
using (var connection = factory.CreateConnection())
 {
     var channel = connection.CreateModel();
     //2、定义topic交换机
     channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");

     string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);
     var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);

     //3、发送消息
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true; // 设置消息持久化
     //p1 指定交换机
     //p2 routingKey 
     channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateAttendance",basicProperties: properties,body: body);
 }
}

//请假信息生产者
public IEnumerable<bool> SendCalculateLeave(CalculateLeaveDto calculateLeave)
{
using (var connection = factory.CreateConnection())
 {
     var channel = connection.CreateModel();
     //2、定义topic交换机
     channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");

     string calculateLeaveJson = JsonConvert.SerializeObject(calculateLeave);
     var body = Encoding.UTF8.GetBytes(calculateLeaveJson);

     //3、发送消息
     var properties = channel.CreateBasicProperties();
     properties.Persistent = true; // 设置消息持久化
     //p1 指定交换机
     //p2 routingKey 
     channel.BasicPublish(exchange: "sms_topic",routingKey: "sms.CalculateAttendance",basicProperties: properties,body: body);
 }
}
public class RabbitmqHostService : IHostedService
{
	  public Task StartAsync(CancellationToken cancellationToken)
      {
            var factory = new ConnectionFactory()
            {
                HostName = "localhost",
                Port = 5672,
                Password = "guest",
                UserName = "guest",
                VirtualHost = "/"
            };
           
	              var connection = factory.CreateConnection();
                      var channel = connection.CreateModel();

			// 1、定义交换机
			channel.ExchangeDeclare(exchange: "sms_topic", type: ExchangeType.Topic);

			// 2、定义随机队列
			var queueName = channel.QueueDeclare().QueueName;

			// 3、队列要和交换机绑定起来
			// * 号的缺陷:只能匹配一级
            // # 能够匹配一级及多级以上 
			channel.QueueBind(queueName,"sms_topic",routingKey: "sms.#");

			var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                Console.WriteLine($"model:{model}");
                var body = ea.Body;
                // 1、业务逻辑
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine(" [x] 发送短信 {0}", message);
                // 1、消息是否正常添加到数据库当中,所以需要使用手工确认
                channel.BasicAck(ea.DeliveryTag, true);
            };
            // 3、消费消息
            channel.BasicQos(0, 1, false); // Qos(防止多个消费者,能力不一致,导致的系统质量问题。
            // autoAck设为false 不进行自动确认                     
            channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);
      }
      
      public Task StopAsync(CancellationToken cancellationToken)
      {
         // 1、关闭rabbitmq的连接
         throw new NotImplementedException();
      }
}

4.Header Exchange(头部交换机)

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对以及x-match参数,x-match参数是字符串类型,可以设置为any或者all。如果设置为any,意思就是只要匹配到了headers表中的任何一对键值即可,all则代表需要全部匹配。

1.不需要依赖Key

2.更多的时候,像这种Key Value 的键值,可能会存储在数据库中,那么我们就可以定义一个动态规则来拼装这个Key value ,从而达到消息灵活转发到不同的队列中去


四、RabbitMQ消息确认

我们根据上面的业务和代码简单实现了由生产者到消费者的一个业务流程,我们可以总结出知道,整个消息的收发过程包含有三个角色,生产者(员工管理网站)、RabbitMQ(Broker)、消费者(微服务),在理想状态下,按照这样实现,整个流程以及系统的稳定性,可能不会发生太大的问题,但是真正在实际应用中我们要去思考可能存在的问题,主要从三个大的方面去分析,然后发散。

1.生产端

2.存储端

3.消费端

1.消息生产端

我们在给RabbitMQ发送消息时,如何去保证消息一定到达呢,我们可以使用RabbitMQ提供了2种生产端的消息确认机制

模式描述实现方式
Confirm模式应答模式,生产者发送一条消息之后,Rabbitmq服务器做了个响应,表示消息确认收到异步模式,在应答之前,可以继续发送消息,单条消息、批量消息
Tx事务模式基于AMQP协议;可以把channel 设置成一个带事务的通道道,分为三步:1.开启事务,提交事务,回滚事务同步模式,在事务提交之前不能继续发送消息,事务模式效率差一些
  • 1.Confirm 实现
using (var connection = factory.CreateConnection())
{
    var channel = connection.CreateModel();
    //2、定义topic交换机
    channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");

    string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);
    var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);

    //3、发送消息
    var properties = channel.CreateBasicProperties();
    properties.Persistent = true; // 设置消息持久化

    try
    {
        //开启消息确认模式
        channel.ConfirmSelect();
        channel.BasicPublish(exchange: "sms_topic", 
        routingKey: "sms.CalculateAttendance", basicProperties: properties, body: body);
        //如果一条消息或多消息都确认发送
	    if (channel.WaitForConfirms()) 
        {
           Console.WriteLine($"【{message}】发送到Broke成功!");
        }
        else
        { 
           //可以记录个日志,重试一下;
        }
        //如果所有消息发送成功 就正常执行;如果有消息发送失败;就抛出异常;
        channel.WaitForConfirmsOrDie();
    }
    catch (Exception ex)
    {	
    	 Console.WriteLine($"【{message}】发送到Broker失败!");
    }
}
  • 2.Tx事务 实现
  using (var connection = factory.CreateConnection())
  {
      var channel = connection.CreateModel();
      //2、定义topic交换机
      channel.ExchangeDeclare(exchange: "sms_topic", type: "topic");
  
      string calculateAttendanceDtoJson = JsonConvert.SerializeObject(calculateAttendance);
      var body = Encoding.UTF8.GetBytes(calculateAttendanceDtoJson);
  
      //3、发送消息
      var properties = channel.CreateBasicProperties();
      properties.Persistent = true; // 设置消息持久化
  
      try
      {
          //开启事务机制,AMQP协议支持
          channel.TxSelect(); //事务是协议支持的
          channel.BasicPublish(exchange: "sms_topic", 
          routingKey: "sms.CalculateAttendance", basicProperties: properties, body: body);
          //提交事务 只有事务提交了才会真正写入队列
          channel.TxCommit();
      }
      catch (Exception ex)
      {	
      	//事务回滚
    		 channel.TxRollback(); 
      }
  }
2.消息存储端

我们生产端给RabbitMQ发送消息成功后,如果RabbitMQ宕机了,会导致RabbitMQ中消息丢失,如何解决消息丢失问题,针对RabbitMQ消息丢失,我们可以在生产者中使用

1.持久化消息

2.集群

3.消息消费端
  • 1.消费者宕机,导致消息丢失

  • 2.执行业务逻辑失败,但是消息已经被消费

​ 当生产者写入消息到RabbitMQ后,消费服务接收消息期间,服务器宕机,导致消息丢失了,这个时候我们就应该使用RabbitMQ的消费端消息确认机制

模式描述特点
自动确认 autoAck自动确认,是消费消息的时候,只要收到消息,就直接回执给RabbitMQ,已经收到一切正常; 直接总览所有了,如果有1w条消息,只是消费成功了一条消息,RabbitMQ也会认为你是全部成功了,会将所有消息从队列中移除;这样会导致消息的丢失处理很快
手动确认消费者消费一条,回执给RabbitMQ一条消息,RabbitMQ 只删除当前这一条消息,相当于是一条消费了,删除一条消息;性能稍微低一些

1.自动确认

// 消息自动确认机制
channel.BasicConsume(queue: "CreateAttendance",autoAck: true, consumer: consumer);

2.手动确认

消费者收到消息。消费者发送确认消息给rabbitmq期间。执行业务逻辑失败了,但是消息已经确认被消费了,我们应该在我们的消费者接收消息回调执行业务逻辑后面,执行使用手动确认消息机制,保证消息不被丢失

var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "sms_topic", type: ExchangeType.Topic);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queueName,"sms_topic",routingKey: "sms.#");

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
      var message = Encoding.UTF8.GetString(ea.Body.ToArray());
      //执行业务逻辑
      
      //手工确认告诉borker可以删除消息了
      channel.BasicAck(ea.DeliveryTag, true);

      //否定:告诉Broker,这个消息我没有正常消费;  requeue: true:重新写入到队列里去; false:你还是删除掉;
      //channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
};

// autoAck设为false 不进行自动确认                     
channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);
  • 3.由于服务器性能不一致导致消息堆积

    生产者发送高并发消息,消费者来不及处理,导致消息堆积,如何解决消息堆积问题?可以使用消费服务集群,将压力分散到不同的服务实例能解决这个问题,但是又产生了一个新的集群缺陷问题,假设集群服务器的强弱不一致,比较弱的服务器处理消息慢,就会导致大部分消息堆积在这台性能较差的服务器,那又该如何解决呢?
    我们可以采用RabbitMQ的QOS功能,俗称限流,他的意思就是消费者一次可以拉取指定数量的消息,在这些消息未处理完毕之前,不会再向队列拉取消息。

// Qos(防止多个消费者,能力不一致,导致的系统质量问题。
// 每一次一个消费者只成功消费一个)
channel.BasicQos(0, 1, false); 
  • 4.如何保证消息不被重复消费(幂等性)

    1.生产时消息重复

    由于生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,实际上MQ
    已经接收到了消息。这时候生产者就会重新发送一遍这条消息。生产者中如果消息未被确认,或确
    认失败,我们可以使用定时任务+(redis/db)来进行消息重试。

    2.消费时消息重复

    消费者消费成功后,再给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息被消费,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
    我们可以让每个消息携带一个全局的唯一ID,即可保证消息的幂等性消费者获取到消息后先根据id去查询redis/db是否存在该消息。如果不存在,则正常消费,消费完毕后写入redis/db。
    如果存在,则证明消息被消费过,直接丢弃。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1066369.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【拿完年终奖后】想要转行网络安全,一定不要错过这个时间段。

网络安全&#xff0c;作为当下互联网行业中较为热门的岗位&#xff0c;薪资可观、人才需求量大&#xff0c;作为转行必考虑。 在这里奉劝所有零基础想转行&#xff08;入门&#xff09; 网络安全的朋友们 在转行之前&#xff0c;一定要对网络安全行业做一个大概了解&#xf…

2023-10-7 看C++八股的一些困惑

1、编译器常量和运行期常量有什么不同 编译期常量和运行期常量是两种不同的常量类型&#xff0c;它们在常量的定义和使用时具有不同的特点。 编译期常量&#xff08;Compile-time Constant&#xff09;&#xff1a; 编译期常量是在编译阶段就能确定其值的常量。编译器在编译代码…

SpringBatch适配不同数据库的两种方法

一、配置JobRepository Configuration EnableBatchProcessing public class TaskArrangeConfig extends DefaultBatchConfigurer {Autowiredprivate DataSource dataSource;Autowiredprivate JobLauncher jobLauncher;Autowiredprivate JobExplorer jobExplorer;Autowiredpriv…

预测性人工智能会彻底改变SIEM行业吗?

网络安全行业具有高度的活力&#xff0c;始终能够将最新和最优秀的技术融入其系统中。这背后有两个主要原因&#xff1a;首先&#xff0c;网络攻击不断演化&#xff0c;因此组织需要拥有先进技术以便检测复杂的攻击&#xff1b;其次&#xff0c;许多组织的网络架构非常复杂。 …

Unity2D创建帧动画片段

文章目录 概述为角色创建动画Animator组件创建动画片段状态转移 其他文章 概述 动画是游戏中一种使对象表现出运动或变换的方式。当涉及到动画时&#xff0c;我们通常就会用到Animator组件。它允许我们在Unity编辑器中创建、管理和控制这些动画&#xff0c;并将其应用于游戏对…

png图片给背景添加阴影

原图 效果图 代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><styl…

从互联网报告中得出5个关于ITSM的结论

IT服务管理即ITSM正在进入云端&#xff0c;并不断发展以支持移动员工&#xff0c;随着IT服务管理(ITSM)进入云端并发展为支持移动员工&#xff0c;它将迎来一个有趣的时代。ManageEngine的市场分析师表示&#xff0c;随着终端用户对ITSM解决方案的期望开始反映消费者应用程序的…

TikTok体育精彩瞬间:全球体育迷的天堂

体育是连接世界的一种语言&#xff0c;它能够跨越文化和国界&#xff0c;将人们汇聚在一起&#xff0c;共同感受比赛的激情和荣誉。 而在现代社交媒体的时代&#xff0c;TikTok已经成为了全球体育迷的天堂&#xff0c;为他们提供了前所未有的方式来分享和体验体育精彩瞬间。 T…

认识接口自动化测试

目录 1. 什么是接口测试 2. 基本流程 3. 需求分析 4. 用例设计 5. 脚本开发 6. 结果分析 7. 完整脚本 1. 什么是接口测试 顾名思义&#xff0c;接口测试是对系统或组件之间的接口进行测试&#xff0c;主要是校验数据的交换&#xff0c;传递和控制管理过程&#xff0c;以…

企业想过等保,其中2FA双因素认证手段必不可少

随着信息技术的飞速发展&#xff0c;网络安全问题日益凸显。等保2.0时代的到来&#xff0c;意味着企业和组织需要更加严格地保护自身的信息安全。而在这个过程中&#xff0c;双因素认证的重要性逐渐得到广泛认可。本文将探讨 2FA 双因素认证的重要性。 在了解 2FA 双因素认证的…

FRM-10102 错误解决办法

去服务器此路径 /u01/test/app/fs2/EBSapps/appl/au/12.0.0/resource 把缺失的文件拿到本地库即可

电压放大器在心电图中的作用是什么

心电图是一种常用的临床检测方法&#xff0c;用于评估心脏的电活动。在进行心电图检测时&#xff0c;为了保证测量结果的准确性和可靠性&#xff0c;需要使用一种特殊的电压放大器&#xff0c;即心电放大器&#xff0c;来增强心电信号并抑制噪音和干扰。 心电图信号具有微弱的幅…

“传统文化宣传片+虚拟人动捕设备”前景如何?

在数字化时代的发展下&#xff0c;动捕设备的加入&#xff0c;让传播传统文化的虚拟人更具生动表现&#xff0c;拉近人们与传统文化的距离&#xff0c;通过虚拟人动作捕捉动画宣传片&#xff0c;引起更多人对传统文化的关注与传承。 *图片源于网络 深圳文博会创意短片《嗨ICIF…

一文生成猫眼电影热榜词云

1.爬取猫眼电影热榜数据 此次爬取的是电影票房的热榜电影名称&#xff0c;具体网站网址为猫眼电影热榜&#xff0c;经过实验观察后发现&#xff0c;此处的数据是通过ajax异步加载的&#xff0c;如果不相信可以使用request对当前网站网址发送请求&#xff0c;会发现无法获取电影…

基于springboot实现职称评审管理系统演示【项目源码+论文说明】分享

基于springboot实现职称评审管理系统项目演示 摘要 不管是从事哪个行业、对于职称是对一个对个人的最高荣誉&#xff0c;有通过科技手段、农业、工业、教育等都有评职称&#xff0c;开发一套职称评审管理系统就很有必要了。职称评审管理系统是以实际运用为开发背景&#xff0c…

移远通信EG916Q-GL Cat 1 bis模组亮相MWC Las Vegas 2023

9月27日&#xff0c;在MWC Las Vegas 2023期间&#xff0c;全球领先的物联网整体解决方案供应商移远通信宣布&#xff0c;面向全球市场推出更具性价比的Cat 1 bis模组EG916Q-GL。该模组基于高通QCX216 LTE物联网调制解调器&#xff0c;非常适用于全球范围内的各种应用场景&…

人机言语交互模型的评估要素

智能客服中的言语交互模型评估要素&#xff0c;主要包括以下几个方面&#xff1a; 有效性&#xff1a;指模型能否准确识别和理解用户的言语意图&#xff0c;以及生成正确和合适的回答。可以通过比较模型生成的回答与人工回答的准确率来评估。流畅性&#xff1a;指模型在回答问…

力扣第572题 另一棵树的子树 c++深度(DFS)注释版

题目 572. 另一棵树的子树 简单 给你两棵二叉树 root 和 subRoot 。检验 root 中是否包含和 subRoot 具有相同结构和节点值的子树。如果存在&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 二叉树 tree 的一棵子树包括 tree 的某个节点和这个节点的所有…

ES6 class类关键字super

super关键字 在 JavaSCript 中&#xff0c;能通过 extends 关键字去继承父类 super 关键字在子类中有以下用法&#xff1a; 当成函数调用 super() 作为 "属性查询" super.prop 和 super[expr] super() super 作为函数调用时&#xff0c;代表父类的构造函数。 ES6 要求…