RabbitMQ的应用

news2024/11/24 8:47:13

七种工作模式介绍

1.Simple(简单模式)

P:生产者,也就是要发送信息的程序

C:消费者,消息的接收者

Queue:消息队列。图中黄色背景部分,类似一个邮箱,可以缓存发送信息;生产者向其中投递信息,消费者从其中取出消息

特点:一个生产者P,一个消费者C,消息只能被消费一次,就被称为点对点模式

2.Work Queue(工作队列)

一个生产者P,多个消费者C1,C2.在多个消息的情况下,Work Queue会将消息分配给不同的消费者,每个消费者都会接收到不同的消息.

特点:消息不会重复,分配给不同的消费者.

适用场景:集群环境种做异步处理

比如12306的短信通知服务,订票成功后,订单消息会发送到RabbitMQ,短信服务从RabbitMQ获取订单信息,并发送不同的通知信息给消费者.

生产者代码

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1.建立连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置参数
        factory.setHost(Constants.HOST);//ip
        factory.setPort(Constants.PORT);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟机名称
        factory.setUsername(Constants.USER_NAME);//用户名
        factory.setPassword(Constants.PASSWORD);//密码
        //创建连接Connection
        Connection connection = factory.newConnection();
        //2.创建channel通道
        Channel channel = connection.createChannel();
        //3.声明队列
        channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
        //4.发送消息
        for(int i=0;i<10;i++){
            String msg = "hello,work queue~"+i;
            channel.basicPublish("",Constants.WORK_QUEUE,null,msg.getBytes());
        }
        //5.资源释放
        channel.close();
        connection.close();
    }
}

消费者代码,不过是两个消费者

package rabbitmq.work;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列   使用内置的交换机
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        //6. 资源释放
//        channel.close();
//        connection.close();
    }
}

启动两个消费者跟生产者可以观察到

此时队列里就有两个消费者。

3.Publish/Subscribe(发布/订阅)

 在这个模式种出现了一个Exchange的角色

作⽤: ⽣产者将消息发送到Exchange, 由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产
者将消息投递到队列中, 实际上这个在RabbitMQ中不会发⽣. )
RabbitMQ交换机有四种类型: fanout,direct, topic, headers, 不同类型有着不同的路由策略. AMQP协议⾥还有另外两种类型, System和⾃定义, 此处不再描述.
1. Fanout:⼴播,将消息交给所有绑定到交换机的队列( Publish/Subscribe模式)
2. Direct:定向,把消息交给符合指定routing key的队列( Routing模式 )
3. Topic:通配符,把消息交给符合routing pattern(路由模式)的队列( Topics模式 )
4. headers类型的交换器不依赖于路由键的匹配规则来路由消息, ⽽是根据发送的消息内容中的
headers属性进⾏匹配. headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.
Exchange(交换机)只负责转发消息, 不具备存储消息的能⼒, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失

生产者代码

package rabbitmq.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3.声明交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);
        //4.声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
        //5.绑定交换机和队列
        channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
        channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
        //6.发布消息
        String msg = "hello fanout....";
        channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
        System.out.println("消息发送成功");
        //7.释放资源
        channel.close();
        connection.close();
    }
}

消费者代码(有两个消费者)

package rabbitmq.fanout;

import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3.声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        //4.消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1,true,consumer);
    }
}

 运行生产者代码可以看到

两个队列分别有一条信息,同时Exchange跟多个队列绑定关系

此时运行消费者

两个消费者都收到信息

4.路由模式

队列和交换机的绑定, 不能是任意的绑定了, ⽽是要指定⼀个BindingKey(RoutingKey的⼀种)
消息的发送⽅在向Exchange发送消息时, 也需要指定消息的RoutingKey
Exchange也不再把消息交给每⼀个绑定的key, ⽽是根据消息的RoutingKey进⾏判断, 只有队列绑定时的BindingKey和发送消息的RoutingKey 完全⼀致, 才会接收到消息
生产者代码
package rabbitmq.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        //4. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
        //5. 绑定交换机和队列
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
        //6. 发送消息
        String msg = "hello direct, my routingkey is a....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());

        String msg_b = "hello direct, my routingkey is b....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());

        String msg_c = "hello direct, my routingkey is c....";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());
        System.out.println("消息发送成功");

        //7. 释放资源
        channel.close();
        connection.close();
    }
}

消费者代码

package rabbitmq.direct;
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);

    }
}

运行代码可以看到队列2有三条消息,队列1有一条消息

 5.Topics(通配符模式)

在topic类型的交换机在匹配规则上, 有些要求:
1. RoutingKey 是⼀系列由点( . )分隔的单词, ⽐如 " stock.usd.nyse ", " nyse.vmw ",
" quick.orange.rabbit "
2. BindingKey 和RoutingKey⼀样, 也是点( . )分割的字符串.
3. Binding Key中可以存在两种特殊字符串, ⽤于模糊匹配
* 表⽰⼀个单词
# 表⽰多个单词(0-N个

6.RPC通信

RPC(Remote Procedure Call), 即远程过程调⽤. 它是⼀种通过⽹络从远程计算机上请求服务, ⽽不需要
了解底层⽹络的技术. 类似于Http远程调⽤.
RabbitMQ实现RPC通信的过程, ⼤概是通过两个队列实现⼀个可回调的过程.
⼤概流程如下:
1. 客⼾端发送消息到⼀个指定的队列, 并在消息属性中设置replyTo字段, 这个字段指定了⼀个回调队列, 服务端处理后, 会把响应结果发送到这个队列.
2. 服务端接收到请求后, 处理请求并发送响应消息到replyTo指定的回调队列
3. 客⼾端在回调队列上等待响应消息. ⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应.
编写客户端代码
1. 声明两个队列, 包含回调队列replyQueueName, 声明本次请求的唯⼀标志corrId
2. 将replyQueueName和corrId配置到要发送的消息队列中
3. 使⽤阻塞队列来阻塞当前进程, 监听回调队列中的消息, 把请求放到阻塞队列中
4. 阻塞队列有消息后, 主线程被唤醒,打印返回内容
package rabbitmq.rpc;
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        //3. 发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

        //4. 接收响应
        //使用阻塞队列, 来存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:"+ result);

    }
}

服务端代码

package rabbitmq.rpc;
import com.rabbitmq.client.*;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 接收请求
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body,"UTF-8");
                System.out.println("接收到请求:"+ request);
                String response = "针对request:"+ request +", 响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
    }
}

7.Publisher Confirms(发布确认)

作为消息中间件, 都会⾯临消息丢失的问题.
消息丢失⼤概分为三种情况:
1. ⽣产者问题. 因为应⽤程序故障, ⽹络抖动等各种原因, ⽣产者没有成功向broker发送消息.
2. 消息中间件⾃⾝问题. ⽣产者成功发送给了Broker, 但是Broker没有把消息保存好, 导致消息丢失.
3. 消费者问题. Broker 发送消息到消费者, 消费者在消费消息时, 因为没有处理好, 导致broker将消费失败的消息从队列中删除
针对问题1, 可以采⽤发布确认(Publisher Confirms)机制实现.
发布确认 属于RabbitMQ的七⼤⼯作模式之⼀.
⽣产者将信道设置成confirm(确认)模式, ⼀旦信道进⼊confirm模式, 所有在该信道上⾯发布的消息都会被指派⼀个唯⼀的ID(从1开始), ⼀旦消息被投递到所有匹配的队列之后, RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID), 这就使得⽣产者知道消息已经正确到达⽬的队列了, 如果消息和队列是可持久化的, 那么确认消息会在将消息写⼊磁盘之后发出. broker回传给⽣产者的确认消息中 deliveryTag 包含了确认消息的序号, 此外 broker 也可以设置channel.basicAck⽅法中的multiple参数, 表⽰到这个序号之前的所有消息都已经得到了处理.
发布确认有三种策略
package rabbitmq.publisher.confirms;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;

public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 200;
    static Connection createConnection() throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        return connectionFactory.newConnection();
    }

    public static void main(String[] args) throws Exception {
        //单独确认
        publishingMessagesIndividually();
        //批量确认
        publishingMessagesInBatches();
        //异步确认
        handlingPublisherConfirmsAsynchronously();
    }

    private static void handlingPublisherConfirmsAsynchronously() throws Exception{
        try (Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
            //4. 监听confirm
            //集合中存储的是未确认的消息ID
            long start = System.currentTimeMillis();
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                        if(multiple){
                            confirmSeqNo.headSet(deliveryTag+1).clear();
                        }else{
                            confirmSeqNo.remove(deliveryTag);
                        }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                    //业务需要根据实际场景进行处理, 比如重发, 此处代码省略
                }
            });
            //5. 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()){
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }
        //批量确认
    private static void publishingMessagesInBatches() throws Exception{
        try(Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
            //4. 发送消息, 并进行确认
            long start = System.currentTimeMillis();
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount==batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if (outstandingMessageCount>0){
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);

        }
    }

    //单独确认
    private static void publishingMessagesIndividually() throws Exception{
        try(Connection connection = createConnection()){
            //1.开启信道
            Channel channel = connection.createChannel();
            //2.设置信道为confirm模式
            channel.confirmSelect();
            //3.声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1,true,false,false,null);
            //4.发送消息吗,并等待确认
            long start =System.currentTimeMillis();
            for(int i=0;i< MESSAGE_COUNT;i++){
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1,null,msg.getBytes());
                //等待确认
                channel.waitForConfirmsOrDie(5000);
            }
            long end =System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2237068.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

K8S网络插件故障处理

1网络插件故障 1此故障问题处理方法 查询ip是否正常是否是主节点IP地址如果不是需要更改 更改方式 1 修改calico.yaml文件的相应参数 # Cluster type to identify the deployment type - name: IP_AUTODETECTION_METHOD #增加内容value: "interfaceens*" 或者 value…

【论文速看】DL最新进展20241109-图像超分、物理信息神经网络、扩散模型

目录 【图像超分】【物理信息神经网络】【扩散模型】 【图像超分】 [2024 红外图像超分] Infrared Image Super-Resolution via Lightweight Information Split Network 论文链接&#xff1a;https://arxiv.org/pdf/2405.10561v2 代码链接&#xff1a;无 单图像超分辨率&…

Python学习从0到1 day26 第三阶段 Spark ① 数据输入

要学会 剥落旧痂 然后 循此新生 —— 24.11.8 一、Spark是什么 定义&#xff1a; Apache Spark 是用于大规模数据处理的统一分析引擎 简单来说&#xff0c;Spark是一款分布式的计算框架&#xff0c;用于调度成百上千的服务器集群&#xff0c;计算TB、PB乃至EB级别的海量数据…

[Python学习日记-63] 继承与派生

[Python学习日记-63] 继承与派生 简介 继承 派生 简介 上一篇文章我们学习了类如何使用&#xff0c;以及相关特性&#xff0c;也做了相关的练习&#xff0c;在练习当中发现类与类之间有时也会存在重复代码&#xff0c;其实在类中我们还有一个继承和派生的概念没有说&#xf…

基于 Encoder-only 架构的大语言模型

基于 Encoder-only 架构的大语言模型 Encoder-only 架构 Encoder-only 架构凭借着其独特的双向编码模型在自然语言处理任务中表现出色&#xff0c;尤其是在各类需要深入理解输入文本的任务中。 核心特点&#xff1a;双向编码模型&#xff0c;能够捕捉全面的上下文信息。 En…

Python学习------第四天

Python的判断语句 一、布尔类型和比较运算符 二、 if语句的基本格式 if语句注意空格缩进&#xff01;&#xff01;&#xff01; if else python判断语句的嵌套用法&#xff1a;

uniapp实现H5和微信小程序获取当前位置(腾讯地图)

之前的一个老项目&#xff0c;使用 uniapp 的 uni.getLocation 发现H5端定位不准确&#xff0c;比如余杭区会定位到临平区&#xff0c;根据官方文档初步判断是项目的uniapp的版本太低。 我选择的方式不是区更新uniapp的版本&#xff0c;是直接使用高德地图的api获取定位。 1.首…

测试网空投进行中 — 全面了解 DePIN 赛道潜力项目 ICN Protocol 及其不可错过的早期红利

随着云计算技术的飞速发展&#xff0c;越来越多的企业和个人对云服务的需求变得多样化且复杂化。然而&#xff0c;传统的中心化云服务平台&#xff08;如AWS、微软Azure等&#xff09;往往存在着高成本、数据隐私保护不足以及灵活性差等问题。 为了解决这些挑战&#xff0c;Imp…

IntelliJ IDEA 使用心得与常用快捷键

刚开始学习写Java的时候&#xff0c;用的eclipse&#xff0c;正式工作后&#xff0c;主要用的myeclipse&#xff0c;去年初在前辈的推荐下&#xff0c;在2折的时候买了正版的 IntelliJ IDEA 和 Pycharm&#xff0c;12.0版终生使用&#xff0c;一年更新。 使用前早就久闻其名&am…

【rust】rust基础代码案例

文章目录 代码篇HelloWorld斐波那契数列计算表达式&#xff08;加减乘除&#xff09;web接口 优化篇target/目录占用一个g&#xff0c;仅仅一个actix的helloWorld demo升级rust版本&#xff0c; 通过rustupcargo换源windows下放弃吧&#xff0c;需要额外安装1g的toolchain并且要…

施工企业为什么要用工程项目管理软件?工程项目管理软件的用处是什么?

施工企业一定会遇到哪些问题&#xff1f;工人怠工、材料浪费、数据造假、工期拖延、质量问题、安全隐患等。这些问题正在悄然侵蚀建施工业的经济效益。每一个环节的失控都可能导致巨大的经济损失&#xff0c;还可能损害企业的声誉。面对日益复杂的工程管理环境&#xff0c;如何…

【C++】详解RAII思想与智能指针

&#x1f308; 个人主页&#xff1a;谁在夜里看海. &#x1f525; 个人专栏&#xff1a;《C系列》《Linux系列》 ⛰️ 丢掉幻想&#xff0c;准备斗争 目录 引言 内存泄漏 内存泄漏的危害 内存泄漏的处理 一、RAII思想 二、智能指针 1.auto_ptr 实现原理 模拟实现 弊端…

所谓的情商高,其实就是会说话!

所谓的情商高&#xff0c;其实就是会说话&#xff01; 1.当遇到不知道的事情时&#xff0c;不要直截了当地说“不知道”。而应委婉地表达为“我想听听你的看法”。 如此既能避免尴尬&#xff0c;又能展现出对对方见解的尊重和期待。 2.不要简单地说“我迟到了”&#xff0c;…

ALB搭建

ALB: 多级分发、消除单点故障提升应用系统的可用性&#xff08;健康检查&#xff09;。 海量微服务间的高效API通信。 自带DDoS防护&#xff0c;集成Web应用防火墙 配置&#xff1a; 1.创建ECS实例 2.搭建应用 此处安装的LNMP 3.创建应用型负载均衡ALB实例 需要创建服务关联角…

【spark面试】spark的shuffle过程

概述 所有的shuffle的过程本质上就是一个task将内存中的数据写入磁盘&#xff0c;然后另一个task将磁盘中的数据读入内存的过程。 对于mapreduce来说&#xff0c;我们将内存中的数据写入磁盘成为maptask&#xff0c;将磁盘中的数据读入内存称为reducetask。 而对于spark来说&…

Android 实现一个系统级的悬浮秒表

前言 由于项目需要将手机录屏和时间日志对应起来&#xff0c;一般的手机录屏只能看到分钟&#xff0c;但是APP的日志输出通常都是秒级别的&#xff0c;于是决定自己手撸一个悬浮秒表&#xff08;有拖拽效果&#xff09;。 效果如下 具体实现 大致的实现思路&#xff1a; 创…

【科普小白】LLM大语言模型的基本原理

一、要了解LLM大模型的基本原理就要先来了解一下自然语言处理&#xff08;NLP&#xff09;。 NLP 是 AI 的一个子领域&#xff0c;专注于使计算机能够处理、解释和生成人类语言&#xff0c;主要任务包括&#xff1a;文本分类、自动翻译、问题回答、生成文本等。到底是NLP促生了…

Go语言开发商城管理后台-GoFly框架商城插件已发布 需要Go开发商城的朋友可以来看看哦!

温馨提示&#xff1a;我们分享的文章是给需要的人&#xff0c;不需要的人请绕过&#xff0c;文明浏览&#xff0c;误恶语伤人&#xff01; 前言 虽然现在做商城的需求不多&#xff0c;但有很多项目中带有商城功能&#xff0c;如社区医院系统有上服务套餐、理疗产品需求、宠物…

ts 如何配置引入 json 文件

ts 如何配置引入 json 文件 参考文档&#xff1a; https://maxgadget.dev/article/how-to-import-a-json-file-in-typescript-a-comprehensive-guide 项目中有一个 .json 的文件是配置文件&#xff0c;如何引入到 ts 项目中 配置 tsconfig.json 文件&#xff0c;添加这两个 {…

如何让ffmpeg运行时从当前目录加载库,而不是从/lib64

程序在linux下运行时&#xff0c;一般从 /lib64 目录下加载依赖的库文件&#xff0c;如xxx.so. 有时候&#xff0c;系统里没有这些库&#xff0c;也不想从系统目录下加载&#xff0c;怎么办呢&#xff1f; 看下面的调整过程。 使用的源代码是 ffmpeg-6.1.tar.xz 解压后&…