RabbitMQ 几种模式

news2025/2/25 2:15:48

一、Hello World 模式

        在这一部分中,我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。模型如下所示:

        在下图中,“ P” 是我们的生产者,“ C” 是我们的消费者。中间的框是一个队列 RabbitMQ 代表使用者保留的消息缓冲区。

1.1 生产者

1.1.1 添加依赖

  <!--指定 jdk 编译版本 -->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依赖客户端 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一个依赖 -->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

1.1.2 编写生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

// 生产者,用于推送消息
public class Producer {
    
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        // 创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.229.146");
        factory.setUsername("admin");
        factory.setPassword("123");
        
        // channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列
         * 1.队列名称
         * 2.队列里面的消息是否持久化 默认消息存储在内存中
         * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
         * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
         * 5.其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message="hello world";
        /**
         * 发送一个消息
         * 1.发送到那个交换机
         * 2.路由的 key 是哪个
         * 3.其他的参数信息
         * 4.发送消息的消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");
    }
}

1.1.3 推送数据

        运行代码,然后在我们的 rabbitmq 管理界面上可以看到,此时我们的 hello 队列里面,有一个消息等待被消费,如下所示:

1.2 消费者

1.2.1 编写消费者代码

import com.rabbitmq.client.*;

// 接收消息的消费者
public class Consumer {
    // 队列的名称
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.229.146");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待接收消息....");

        // 接收到消息时的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            String resultMessage= new String(message.getBody());
            System.out.println(resultMessage);
        };
        // 取消消费时的回调函数,比如在消费的时候队列被删除掉了
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println("消息消费被中断");
        };
        /**
         * 消费者消费消息
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         * 4.消费者取消消费的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

1.2.2 接收数据

        启动代码,可以看到,我们成功的消费了 rabbitmq 里面的消息。

二、Work Queues 模式

        工作队列(又称任务队列)的主要思想是假设生产者发送大量的消息,会把消息放入队列之中,后面有多个工作线程去接收处理消息。

2.1 轮询分发消息

        在这个案例中我们会启动三个工作线程,一个消息发送线程,我们来看看他们三个工作线程是如何工作的。轮询模式就是你处理一个,我处理一个,按顺序一个一个的进行处理。

2.1.1 抽取工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
   // 得到一个连接的 channel
   public static Channel getChannel() throws Exception{
      // 创建一个连接工厂
      ConnectionFactory factory = new ConnectionFactory();
      factory.setHost("192.168.229.147");
      factory.setUsername("admin");
      factory.setPassword("123");
      Connection connection = factory.newConnection();
      Channel channel = connection.createChannel();
      return channel;
   }
}

2.1.2 启动三个工作线程

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker01 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker02 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker03 {
    private static final String QUEUE_NAME="hello";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C3 消费者启动等待消费......");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

2.1.3 启动一个发送线程

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

public class Task01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 10; i++) {
            String message = "I am message " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送消息完成:" + message);
        }
    }
}

2.1.4 结果展示

        通过程序执行发现生产者总共发送 10 个消息,消费者二和消费者分别分得 3 个消息,消费者一分得 4 个消息,并且是按照有序的一个接收一次消息。

2.2 消息应答

2.2.1 概念

        消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并且只完成了一部分任务,这时它突然挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们就会丢失正在处理的消息。后续发送给该消费者的消息也都会丢失。

        为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

        消息应答分为两种,一种是自动应答,一种是手动应答

2.2.2 自动应答

        消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,假设采用自动应答的模式,如果消息在被消费者接收到之前,出现了连接关闭或者 channel 关闭的情况,那么消息就丢失了。

        假设消费者接收的消息很多,且没有对传递的消息数量进行限制,这样就有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

        下面方法里面的第二个参数如果设置为 true 则为自动应答。

/**
 * 消费者消费消息
 * 1.消费哪个队列
 * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
 * 3.消费者未成功消费的回调
 * 4.消费者取消消费的回调
*/
 channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);

2.2.3 手动应答

        手动应答有三种模式,第一种是 Channel.basicAck 肯定确认 ,如下所示,RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。

   public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收到消息:"+receivedMessage);
            /**
             * 1、消息标记 tag
             * 2、false 表示只应答接收到的传递消息;true 表示应答所有传递过来的消息
             * */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费......");
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }

        第二种是 Channel.basicNack 否定确认,第三种是 Channel.basicReject  也是用于否定确认,Channel.basicReject 比 Channel.basicNack 少一个参数,表示不处理该消息了直接拒绝,可以将其丢弃了。

2.2.4 消息自动重新入队

        如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

2.2.5 消息手动应答编码

        消息生产者代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

// 记得先创建下这个队列,执行下就可以了
public class Task01 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();)
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 2; i++) {
            String message = "I am message " + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("发送消息完成:" + message);
        }
    }
}

        两个消费者的代码如下所示,系统默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker01 {
    private static final String QUEUE_NAME="ack_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c1 等待接收消息时间较短");
        DeliverCallback deliverCallback=(consumerTag, delivery)->{
            String receivedMessage = new String(delivery.getBody());
            try {
                Thread.sleep(1000l);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /**
             * 1、消息标记 tag
             * 2、false 表示只应答接收到的传递消息;true 表示应答所有传递过来的消息
             * */
            System.out.println("接收到消息:"+receivedMessage);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback=(consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class Worker02 {
    private static final String QUEUE_NAME = "ack_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c2 等待接收消息时间较长");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody());
            try {
                Thread.sleep(30000l);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /**
             * 1、消息标记 tag
             * 2、false 表示只应答接收到的传递消息;true 表示应答所有传递过来的消息
             * */
            System.out.println("接收到消息:" + receivedMessage);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
        };
        // 手动应答
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
    }
}

2.2.6 结果展示

        启动消费者和生产者,消费者 1 由于阻塞时间短,很快的处理完消息,而消费者 2 则会阻塞一会,等到 30s 之后才会开始处理消息,如下所示:

        等过了一会之后,消息成功被消费者 2 消费,如下所示:

        如果消费者 2 等待的过程中,手动将其关闭的话,会发生什么?可以发现消息被消费者 1 消费了,并没有丢失消息。

2.3 RabbitMQ 持久化

2.3.1 概念

        上一节我们学习了消息应答可以保证处理的任务不会丢失,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失呢?默认情况下 RabbitMQ 退出或由于某种原因崩溃时,队列和消息就都丢了。除非配置相关的参数。确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

2.3.2 队列持久化

        之前我们创建的队列都是非持久化的,rabbitmq 如果重启的话,该队列就会被删除掉,如果要队列实现持久化,需要在声明队列的时候把 durable 参数设置为持久化,如下所示:

// 设置消息队列持久化
boolean durable=true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

        删除完毕之后,再次启动生产者,可以看到 D 表示的就是这个队列是持久化的。

        此时,即使重启 rabbitmq 队列也依然存在。 

2.3.3 消息持久化

        将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,后面会详细说明。

        要想让消息实现持久化需要在消息生产者修改代码,添加如下这个属性。

# 第三个参数为 null 表示不持久化
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

# 第三个参数设置为 MessageProperties.PERSISTENT_TEXT_PLAIN 表示消息要持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

2.3.4 不公平分发

        在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮询分发,但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候如果我们还是采用轮询分发的话,就会导致处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。

        为了避免这种情况,我们可以使用不公平分发,在消费者代码里面设置参数 channel.basicQos(1) 即可。如下:

# 默认等于0,表示轮询,如果等于1的话就表示不公平分发
int prefetchCount = 1;
channel.basicQos(prefetchCount);

        分别启动消费者和生产者,打开 rabbitmq 的管理界面如下:

        意思就是如果这个任务我还没有处理完或者我还没有应答你,你先别分配给我,我目前只能处理一个任务,然后 rabbitmq 就会把该任务分配给没有那么忙的那个空闲消费者,当然如果所有的消费者都没有完成手上任务,队列还在不停的添加新任务,队列有可能就会遇到队列被撑满的情况,这个时候就只能添加新的 worker 或者改变其他存储任务的策略。 

2.3.5 预取值

        rabbitmq 本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。

        这个时候就可以通过使用 basic.qos 方法设置 “预取计数” 值来完成的。该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,类似于 channel 的最大容量。

        例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACKRabbitMQ 将会感知这个情况到并再发送一条消息。消息应答和 QoS 预取值对用户吞吐量有重大影响。

        通常,增加预取值将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器),应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程。

        不同的负载预取值也不同, 100 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

        说白了就是根据你的消费者的性能优劣,动态的去设置 channel ,性能好的,处理速度快的设置大些,性能不好的设置小些,通过 channel.basicQos 参数进行设置,默认等于 0 表示轮询,等于 1 表示不公平分发,设置成其他值则表示预约值,即消息缓冲区的大小。

三、发布确认

3.1 发布确认概念

        我们在前面的几个章节说过,为了防止数据丢失,我们可以将队列和消息都进行持久化的操作。但是,这还是不够好的,如果生产者再向 rabbitmq 发送消息,而 rabbitmq 还没来得及存储到磁盘的时候崩了,消息就丢失了,这个时候该怎么办?此时就引入了发布确认的概念,即生产者向 rabbitmq 发送消息,而 rabbitmq 给生产者个反馈,无论是否可以成功的接收到消息,都给一个反馈,这样就比较合理了。

3.2 发布确认策略

3.2.1 开启发布确认

   发布确认默认是没有开启的,如果要开启需要调用方法 confirmSelect,每当你要想使用发布确认,都需要在 channel 上调用该方法。

// 只需要在创建完 channel 信道之后创建即可
channel.confirmSelect();

3.2.2 单个确认发布

        这是一种最简单的同步确认发布的方式,只有前面发送的消息被确认发布了,后续的消息才能继续发布。

        这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。当然对于某些应用程序来说这可能已经足够了。

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

public class Task01 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName,false,false,false,null);
        // 开启发布确认
        channel.confirmSelect();
        long begin = System.currentTimeMillis();
        for(int i=0;i<1000;i++){
            String message = i +"";
            channel.basicPublish("",queueName,null,message.getBytes());
            // 单个消息马上进行发布确认,可以通过返回值进行判断是否通知成功
            boolean flag = channel.waitForConfirms();
        }
        long end = System.currentTimeMillis();
        System.out.println("发布1000个单独确认消息,耗时:"+(end-begin)+"ms");
    }
}

3.2.3 批量确认发布

        上面那种方式非常慢,与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

public class Task02 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 批量确认消息大小
        int batchSize = 100;
        // 未确认消息个数
        int outstandingMessageCount = 0;
        long begin = System.currentTimeMillis();
        for (int i = 0; i < 1000; i++) {
            String message = i + "";
            channel.basicPublish("", queueName, null, message.getBytes());
            outstandingMessageCount++;
            if (outstandingMessageCount == batchSize) {
                channel.waitForConfirms();
                outstandingMessageCount = 0;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("发布1000个批量确认消息,耗时:" + (end - begin) + "ms");
    }
}

3.2.4 异步确认发布

        异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,下面就让我们来详细讲解异步确认是怎么实现的。

        如下图,首先消息生产者发送消息到队列中(信道),数据在信道中的存储类型类似于 map key 存储消息序号,value 存储具体的消息内容。在信道里面会根据 key 为消息排列顺序,这样的好处是消息推送的成功与否完全可以根据 key 来识别出来。

       broker rabbitmq 的消息实体,当它接收到 1 号消息的时候,它就会进行一次确认收到的回调函数,告诉消息生产者消息我收到了,如果没有收到 1 号消息,它就会调用未收到消息的回调函数通知生产者,消息我没有收到。

        所以作为消息生产者,我只管一直发送消息即可,将来会由 broker 告诉我哪些消息接收到了,哪些消息没有接收到。只需要将没接收到的消息重新发送即可。收到的不做任何处理。且 broker 的通知类型为异步,速度会很快。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;

public class Task03 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        // 开始的时间
        long begin = System.currentTimeMillis();

        // 消息确认成功的回调函数
        ConfirmCallback ackCallback = (deliveryTag,multiple)->{
            System.out.println("确认的消息:"+deliveryTag);
        };
        // 消息确认失败的回调函数
        // 第一个参数:消息的标记。第二个参数:是否为批量确认
        ConfirmCallback nackCallback = (deliveryTag,multiple)->{
            System.out.println("未确认的消息:"+deliveryTag);
        };
        // 添加一个消息的监听器,用于监听哪些消息成功了,哪些消息失败了
        // 异步通知
        // 第一个参数:监听哪些消息成功了。第二个参数:监听哪些消息失败了
        channel.addConfirmListener(ackCallback,nackCallback);
        // 批量发送消息
        for(int i=0;i<1000;i++){
            String message = i+"";
            channel.basicPublish("",queueName,null,message.getBytes());
        }
        // 结束的时间
        long end = System.currentTimeMillis();
        System.out.println("发布1000个异步确认消息,耗时:" + (end - begin) + "ms");
    }
}

3.2.5 处理异步未确认消息

        最好的解决的解决方案就是把未确认的消息放到一个基于内存且能被发布线程访问的队列,比如说用 ConcurrentLinkedQueue, 这个队列可以在 confirm callbacks 与发布线程之间进行消息的传递。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.utils.RabbitMqUtils;

import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class Task03 {
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个队列
        String queueName = UUID.randomUUID().toString();
        channel.queueDeclare(queueName, false, false, false, null);
        // 开启发布确认
        channel.confirmSelect();
        /** 创建一个线程安全有序的哈希表,适用于高并发的情况
         * 1、将序号与消息进行关联
         * 2、可以根据需要批量的删除消息条目
         * 3、支持高并发
         */
        ConcurrentSkipListMap<Long,String>  outstandingConfirms =new ConcurrentSkipListMap<>();
        // 开始的时间
        long begin = System.currentTimeMillis();

        // 消息确认成功的回调函数
        ConfirmCallback ackCallback = (deliveryTag,multiple)->{
            if(multiple){
                // 第二步:删除掉已经确认的消息,剩下的就是未确认的消息
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag);
                confirmed.clear();
            }else{
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("确认的消息:"+deliveryTag);
        };
        // 消息确认失败的回调函数
        // 第一个参数:消息的标记。第二个参数:是否为批量确认
        ConfirmCallback nackCallback = (deliveryTag,multiple)->{
            // 第三步:打印未确认的消息有哪些
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("未确认的消息是:"+message+":::未确认消息的标记为:"+deliveryTag);
        };
        // 添加一个消息的监听器,用于监听哪些消息成功了,哪些消息失败了
        // 异步通知
        // 第一个参数:监听哪些消息成功了。第二个参数:监听哪些消息失败了
        channel.addConfirmListener(ackCallback,nackCallback);
        // 批量发送消息
        for(int i=0;i<1000;i++){
            String message = i+"";
            // 第一步:记录下所有要发送的消息
            outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
            channel.basicPublish("",queueName,null,message.getBytes());  
        }
        // 结束的时间
        long end = System.currentTimeMillis();
        System.out.println("发布1000个异步确认消息,耗时:" + (end - begin) + "ms");
    }
}

3.2.6 三种发布确认速度对比

        单独发布消息:同步等待确认,简单,但吞吐量非常有限。

        批量发布消息:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是那条消息出现了问题。

        异步发布处理:最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来稍微难些。

四、Exchanges 交换机

        我们在前面讲解了 Hello World 模式 和 Work Queues 模式,这两种都属于简单的队列模式,即生产者将消息直接推送到 rabbitmq 的队列里面,消费者直接从队列里面竞争消费。

        我们接下来要讲解的模式为”发布/订阅“,即生产者生产完消息,只有它的订阅者才可以获取消息,想要使用这种模式就需要先认识一下交换机。

4.1 Exchanges

4.1.1 Exchanges 概念

        RabbitMQ 消息传递模型的核心思想是:生产者生产的消息不会直接发送到队列。而实际上生产者甚至都不知道这些消息传递传递到了哪些队列中。

        相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面接收来自生产者的消息,另一方面将它们推入队列。交换机必须清楚的知道如何处理收到的消息。是应该把这些消息放到特定队列、还是说把他们放到许多队列中、还是说应该丢弃它们。这就需要由交换机的类型来决定。

4.1.2 Exchanges 类型

        总共包含以下几种类型:直连交换机(direct)主题交换机(topic)头交换机(headers)扇出交换机(fanout)。

4.1.3 无名 Exchanges

        在学习本章节之前我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的原因是因为我们使用的是默认交换机,是通过空字符串 (“”) 进行标识。

# 第一个参数是交换机的名称。空字符串表示默认或无名称交换机
channel.basicPublish("",queueName,null,message.getBytes());

        这个 AMQP default 就是默认的交换机,他的类型是直连交换机。 

4.2 临时队列

        我们在前两章的学习中创建了 hello 队列和 ack_queue 队列,这些都是指定名称的队列,队列的名称很重要,因为我们需要根据队列的名称来指定消费者去哪里消费。

        在实际的生产环境中,我们一般都会创建一个随机名称的临时队列,它的特点是:当消费者断开连接之后,该队列会被自动删除,创建临时队列的代码如下:

String queueName = channel.queueDeclare().getQueue();

        创建之后的样子如下: 

4.3 bindings 绑定

        binding exchange queue 之间的桥梁,它告诉 exchange 和哪个队列进行了绑定。比如说下面这张图告诉我们的就是 X Q1 Q2 进行了绑定。

4.4 Fanout exchange 扇出交换机

4.4.1 简介

        路由广播的形式,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。

        路由键就是 RoutingKey,即扇出交换机不需要配置 RoutingKey,只需要将交换机和队列绑定即可。

4.4.2 Fanout 实战

        我们根据下图来编写代码实现下:EmitLog 作为生产者,将消息发送到名字为 logs 的 fanout 交换机上,而交换机绑定了两个队列,队列里面的消息又分别被 ReceiveLogs01 和 ReceiveLogs02 接收。

        编写两个消费者的代码,如下所示:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogs01 {
    // 设置交换机的名字
    public static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个交换机,类型为 fanout
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        // 声明一个临时队列,队列名称随机,好处是当消费者与队列断开连接的时候,队列就自动删除了
        String queueName = channel.queueDeclare().getQueue();
        // 交换机和队列进行绑定,第三个参数 routingKey,有没有值都是被无视的
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("消费者1等待接收消息.....");
        // 接收消息时的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("控制台打印接收到的消息"+message);
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogs02 {
    // 设置交换机的名字
    public static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个交换机,类型为 fanout
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        // 声明一个临时队列,队列名称随机,好处是当消费者与队列断开连接的时候,队列就自动删除了
        String queueName = channel.queueDeclare().getQueue();
        // 交换机和队列进行绑定,第三个参数 routingKey,有没有值都是被无视的
        channel.queueBind(queueName,EXCHANGE_NAME,"");
        System.out.println("消费者2等待接收消息.....");
        // 接收消息时的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("控制台打印接收到的消息"+message);
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }
}

        编写生产者的代码,如下所示,作为生产者,只需要知道将消息发送给哪个交换机即可。

// 负责发送消息给交换机
public class EmitLog {
    // 设置交换机的名字
    public static final String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        for(int i=0;i<10;i++) {
            String message = i + "";
            channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes());
            System.out.println("生产者发送的消息为:"+message);
        }
    }
}

        分别启动两个消费者和生产者,结果如下所示,可以发现两个消费者都分别获取到了生产者生产的消息。

4.5 Direct exchange 直连交换机

4.5.1 简介

        RabbitMQ 默认的交换机模式,也是最简单的模式。初始化时队列绑定到一个直连交换机上,同时赋予一个路由键 BindingKey。当发送者发送消息的时候它会携带着路由值 Key。当 Key 和消息队列的 BindingKey 一致的时候,消息将会被发送到该消息队列中。

4.5.2 多重绑定

        在下面这张图中,我们可以看到 X 交换机绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange,队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green。

        在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1。绑定键为 black green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

4.5.3 Direct 实战

        我们根据下图来编写代码实现下:作为生产者,将消息发送到名字为 direct 交换机上,交换机 X 根据 RoutingKeyerror 绑定 disk 队列、交换机 X 根据 RoutingKey info warning 绑定 console 队列, disk 队列里面的消息被 C2 接收,console 队列里面的消息被 C1 接收。

        编写两个消费者的代码,如下所示:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogs01 {
    // 设置交换机的名字
    public static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个交换机,类型为直连
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 声明一个名字为 console 的队列
        channel.queueDeclare("console",false,false,false,null);
        // 交换机和队列进行绑定,routingKey 为 info
        channel.queueBind("console",EXCHANGE_NAME,"info");
        // 针对同一个队列,再写一行即可实现多重绑定,routingKey 为 warning
        channel.queueBind("console",EXCHANGE_NAME,"warning");

        System.out.println("消费者1等待接收消息.....");
        // 接收消息时的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("控制台打印接收到的消息"+message);
        };
        channel.basicConsume("console",true,deliverCallback,consumerTag ->{});
    }
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogs02 {
    // 设置交换机的名字
    public static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个交换机,类型为直连
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        // 声明一个名字为 disk 的队列
        channel.queueDeclare("disk",false,false,false,null);
        // 交换机和队列进行绑定,routingKey 为 error
        channel.queueBind("disk",EXCHANGE_NAME,"error");

        System.out.println("消费者2等待接收消息.....");
        // 接收消息时的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("控制台打印接收到的消息"+message);
        };
        channel.basicConsume("disk",true,deliverCallback,consumerTag ->{});
    }
}

        编写生产者的代码,如下所示,作为生产者,只需要知道将消息发送给哪个交换机即可。

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

// 负责发送消息给交换机
public class DirectLogs {
    // 设置交换机的名字
    public static final String EXCHANGE_NAME="direct_logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        for(int i=0;i<10;i++) {
            String message = i + "";
            // 程序需要运行三遍,将数据分别发送给不同的 Routing Key 的交换机
            channel.basicPublish(EXCHANGE_NAME,"error",null, message.getBytes());
            // channel.basicPublish(EXCHANGE_NAME,"info",null, message.getBytes());
            // channel.basicPublish(EXCHANGE_NAME,"warning",null, message.getBytes());
            System.out.println("生产者发送的消息为:"+message);
        }
    }
}

        分别启动两个消费者和生产者,结果如下所示,可以发现两个消费者都分别获取到了生产者生产的消息。

4.6 Topics 主题交换机

4.6.1 简介

        主题交换机,转发信息主要是依据通配符,队列和交换机的绑定主要是依据一种模式(通配符 + 字符串),而当发送消息的时候,只有指定的 Key 和该模式相匹配的时候,消息才会被发送到该消息队列中。

4.6.2 要求

        topic 交换机的 routing_key 不能随意写,它必须是一个单词列表,以点号分隔开。这些单词可以是任意单词,比如说:"stock.usd.nyse""nyse.vmw" "quick.orange.rabbit" 这种类型的。当然这个单词列表最多不能超过 255 个字节。

        * 可以代替一个单词。

        # 可以代替零个或多个单词。

4.6.3 Topic 实战

        下图是一个队列绑定关系图,我们可以发现 Q1 队列绑定的是:一共有 3 个单词且中间是 orange 的字符串。Q2 队列绑定的是:最后一个单词是 rabbit 3 个单词和第一个单词是 lazy 的多个单词。

        我们分别测试底下这些 routing_key 和我们预想的结果是否一致。 

Routing key绑定关系
quick.orange.rabbit被队列 Q1 和 Q2 接收到
lazy.orange.elephant被队列 Q1 和 Q2 接收到
quick.orange.fox被队列 Q1 接收到
lazy.brown.fox被队列 Q2 接收到
lazy.pink.rabbit虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox不匹配任何绑定会被丢弃
quick.orange.male.rabbit不匹配任何绑定会被丢弃
lazy.orange.male.rabbit被队列 Q2 接收到

        两个消费者的代码如下所示:

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogsTopic01 {
    // 设置交换机的名字
    public static final String EXCHANGE_NAME="topic_logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个交换机,类型为直连
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 声明一个名字为 Q1 的队列
        String queueName = "Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        // 交换机和队列进行绑定,routingKey 为 *.orange.*
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");

        System.out.println("消费者1等待接收消息.....");
        // 接收消息时的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收队列:"+queueName+"绑定的键值为:"+delivery.getEnvelope().getRoutingKey()+"::消息为:"+message);
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.utils.RabbitMqUtils;

public class ReceiveLogsTopic02 {
    // 设置交换机的名字
    public static final String EXCHANGE_NAME="topic_logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        // 声明一个交换机,类型为直连
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 声明一个名字为 Q2 的队列
        String queueName = "Q2";
        channel.queueDeclare(queueName,false,false,false,null);
        // 交换机和队列进行绑定,routingKey 为 *.orange.*
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        // 针对同一个队列,再写一行即可实现多重绑定,routingKey 为 lazy.#
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");

        System.out.println("消费者2等待接收消息.....");
        // 接收消息时的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收队列:"+queueName+"绑定的键值为:"+delivery.getEnvelope().getRoutingKey()+"::消息为:"+message);
        };
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }
}

        编写生产者的代码,如下所示,作为生产者,只需要知道将消息发送给哪个交换机即可。

import com.rabbitmq.client.Channel;
import com.rabbitmq.utils.RabbitMqUtils;

import java.util.HashMap;
import java.util.Map;

// 负责发送消息给交换机
public class TopicLogs {
    // 设置交换机的名字
    public static final String EXCHANGE_NAME = "topic_logs";
    public static void main(String[] args) throws Exception {
        Map<String,String> map = new HashMap<>();
        map.put("quick.orange.rabbit", "被队列 Q1 和 Q2 接收到");
        map.put("lazy.orange.elephant", "被队列 Q1 和 Q2 接收到");
        map.put("quick.orange.fox", "被队列 Q1 接收到");
        map.put("lazy.brown.fox", "被队列 Q2 接收到");
        map.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列 Q2 接收一次");
        map.put("quick.brown.fox", "不匹配任何绑定会被丢弃");
        map.put("quick.orange.male.rabbit", "不匹配任何绑定会被丢弃");
        map.put("lazy.orange.male.rabbit", "被队列 Q2 接收到");
        Channel channel = RabbitMqUtils.getChannel();
        for(Map.Entry<String,String> entry:map.entrySet()){
            String bindingKey = entry.getKey();
            String message = entry.getValue();
            channel.basicPublish(EXCHANGE_NAME, bindingKey, null, message.getBytes());
            System.out.println("生产者发出消息:"+message);
        }
    }
}

        结果如下所示,是可以和表格里面的数据对上的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1027728.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

灞桥论“健” 共话康养 灞桥康养论坛取得圆满成功

随着我国“老龄化”的加速&#xff0c;养老资源的匮乏已经成为一个十分严峻的社会问题。同时随着生活水平的大幅提高&#xff0c;康养产业应势而生。涵盖了养老、医疗、体育、养生、旅游等多个领域的康养产业&#xff0c;不仅要为老人们实现“老有所乐、身体健康”的理想&#…

【数据结构】二叉树的前序遍历(七)

题目&#xff1a;二叉树的前序遍历 题目详情&#xff1a;给你二叉树的根节点 root &#xff0c;返回它节点值的 前序 遍历&#xff1b; 我们先来看几个示例&#xff1a; 输入&#xff1a;root [ 1&#xff0c;null&#xff0c;2&#xff0c;3 ] 输出&#xff1a;[ 1&#xf…

【送书】从不了解用户画像,到用画像数据赋能业务看这一本书就够了丨《用户画像:平台构建与业务实践》

文章目录 内容了解本书目录参与方式 &#x1f308;hello&#xff01; 各位铁汁们大家好啊&#xff0c;今天给大家推荐的的是机械工业出版社的 《用户画像&#xff1a;平台构建与业务实践》这本书&#xff01;   ⛳️大数据时代&#xff0c;如何有效地挖掘数据价值并通过画像数…

jvm-sandbox-repeater时间mock插件设计与实现

一、背景 jvm-sandbox-repeater实现了基础的录制回放流程编排&#xff0c;并简单的给了几个插件的demo&#xff0c;离实际项目运用其实还需要二次开发很多东西&#xff0c;其中时间mock能力是一个非常基础的能力&#xff0c;业务代码里经常需要用到这块&#xff1b; 二、调研 …

win10安装kafka,监听9092端口,java调用

1、从Kafka的官网下载Kafka安装包&#xff1a;Apache Kafka 我下的是2.8.0 对应pom.xml配置 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version> </dependency&…

linux安装配置zeppein

zeppelin是一个让交互式数据分析变得可行的基于网页的开源框架&#xff0c;具有数据分析、数据可视化等功能 一 解压安装包 这里提供了网盘资源 链接: https://pan.baidu.com/s/16pIoHL6ApGAs063cTOUYUg?pwdffq6 提取码: ffq6 下面的是 zeppein 安装包以及&#xff0c;上面的…

【爬虫实战】用python爬今日头条热榜TOP50榜单!

文章目录 一、爬取目标二、爬取结果三、代码讲解四、技术总结五、演示视频六、附完整源码 一、爬取目标 您好&#xff01;我是马哥python说&#xff0c;一名10年程序猿。 今天分享一期爬虫案例&#xff0c;爬取的目标是&#xff1a;今日头条热榜的榜单数据。 打开今日头条首…

月木学途开发 1.后台用户模块

概述 权限控制采用springsecurity 数据库设计 用户表 DROP TABLE IF EXISTS admin; CREATE TABLE admin (aid int(32) NOT NULL AUTO_INCREMENT,email varchar(50) DEFAULT NULL,username varchar(50) DEFAULT NULL,password varchar(255) DEFAULT NULL,phoneNum varchar(2…

日主题RiPro主题高端美化/设计素材软件下载站专用子主题/美化包源码(升级版/免拓展)

主题简介 日主题RiPro主题高端美化/设计素材软件下载站专用子主题/美化包源码&#xff0c;这个子主题美化包无授权无暗链&#xff0c;不用再修改原主题代码&#xff0c;在用这个本美化包前要先下载最新ripro主题&#xff0c;本子主题有版本要求&#xff0c;仅支持RiPro8.0以上…

Api接口加密策略

接口安全要求&#xff1a; 1.防伪装攻击&#xff08;案例&#xff1a;在公共网络环境中&#xff0c;第三方 有意或恶意 的调用我们的接口&#xff09; 2.防篡改攻击&#xff08;案例&#xff1a;在公共网络环境中&#xff0c;请求头/查询字符串/内容 在传输过程被修改&#x…

什么是网络安全?网络安全包括哪几个方面?

提及网络安全&#xff0c;很多人都是既熟悉又陌生&#xff0c;所谓的熟悉就是知道网络安全可以保障网络服务不中断。那么到底什么是网络安全?网络安全包括哪几个方面?通过下文为大家介绍一下。 什么是网络安全? 网络安全是指网络系统的硬件、软件及系统中的数据受到保护&a…

无人直播设置必看:手机自动直播有哪些好处?

随着科技的不断发展&#xff0c;手机直播已经成为了人们生活中不可或缺的一部分。手机自动直播作为一种新兴的技术&#xff0c;为我们的生活带来了诸多便利。本文将从以下几个方面阐述手机自动直播的好处。 首先&#xff0c;手机自动直播可以节省时间和精力。传统的直播方式需…

c 语言开发

目录 IOS 开发&#xff1a;c、swift、object-c 第一节 终端生成.c 文件 ​编辑 第二节 常见的数据类型 第三节 运算符、scanf ​编辑 第四节&#xff1a;如何产生随机数 第五节&#xff1a;while 循环 第六节&#xff1a;goto 第七节&#xff1a;函数 ​编辑 第八节…

中移链交易模块介绍

中移链交易模块是中移链区块链系统的核心模块之一。它的主要作用是处理用户发起的交易请求&#xff0c;并将其打包成区块添加到区块链上。交易模块接收来自不同合约执行的指令&#xff0c;比如创建账号、转账、部署和执行智能合约等指令&#xff0c;并确保所有交易都是有效且合…

css实现Chrome标签栏

如图这是一个特殊的带有圆角的导航栏&#xff0c;实现这种效果并不难 这是我实现的效果&#xff1a; 淡一点的就是鼠标悬停的样式 以下是代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><m…

python基于django或flask开发的健身俱乐部网站rix1z

本系统有三个角色&#xff1a;管理员、用户和教练&#xff0c;要求具备以下功能&#xff1a; &#xff08;1&#xff09;用户可以浏览主页了解健身课程、健身器材、会员卡信息、新闻公告等信息&#xff0c;并进行在线留言&#xff1b; &#xff08;2&#xff09;管理员通过后台…

Centos7安装nginx及网页如何放入nginx

Centos7安装nginx及网页如何放入nginx 安装所需的插件 安装gcc gcc是linux下的编译器在此不多做解释&#xff0c;感兴趣的小伙伴可以去查一下相关资料&#xff0c;它可以编译 C,C,Ada,Object C和Java等语言 查看版本 gcc -v如果出现下图就说明是你的Linux有centos 没有的话…

JWT 令牌撤销:中心化控制与分布式Kafka处理

【squids.cn】 全网zui低价RDS&#xff0c;免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等 令牌对于安全数字访问至关重要&#xff0c;但如果您需要撤销它们怎么办&#xff1f;尽管我们尽了最大努力&#xff0c;但有时代币可能会被泄露。这可能是由于编码错误、…

vue+springboot,easyexcel的excel文件下载

文章目录 1.效果展示1.1 前端界面1.2 下载的excel 2.思路介绍3.前端代码展示4.后端代码展示5.核心代码解释 1.效果展示 excel文件单一sheet&#xff0c;多sheet导出 本文主要介绍如何使用easyexcel &#xff0c;配合前端导出Excel文件。同时提供Excel的两种导出形式&#xff1a…

《扩散模型 从原理到实战》Hugging Face (二)

第二章 Hugging Face简介 本章无有效内容 第三章 从零开始搭建扩散模型 有时候&#xff0c;只考虑事情最简单的情况反而更有助于理解其工作原理。本章尝试从零开始搭建廓庵模型&#xff0c;我们将从一个简单的扩散模型讲起&#xff0c;了解其不同部分的工作原理&#xff0c;…