项目实战 — 消息队列(8){网络通信设计②}

news2024/11/26 14:58:40

目录

 一、客户端设计

🍅 1、设计三个核心类

 🍅 2、完善Connection类

        🎄 读取请求和响应、创建channel

         🎄 添加扫描线程

        🎄 处理不同的响应

        🎄 关闭连接

🍅 3、完善Channel类

🎄 编写createChannel()

🎄 编写waitResult()和putRetuens()方法

🎄 编写其他核心API

        🎊 交换机

        🎊 队列

         🎊 绑定

         🎊发布消息

        🎊 订阅消息

        🎊 确认消息

二、客户端测试

🍅 1、准备工作和收尾工作

 🍅 2、测试connection

🍅 3、测试channnel的创建

🍅 4、测试交换机

 🍅 5、测试队列

🍅 6、测试绑定


 一、客户端设计

🍅 1、设计三个核心类

三个核心类:

(1)ConnectoonFactory连接工厂:这个类,持有服务器的地址,主要功能是创建出连接Connectiond对象

(2)Connection:表示一个TCP连接,持有Socket对象,写入请求/读取响应,管理多个Channel对象

(3)Channel:表示一个逻辑上的连接。当前设定的交互模型,一个TCP连接是可以进行复用的,一个客户端可以有多个模块,每个模块都可以和brokerServer之间建立“逻辑上的连接”(channel),但是这几个模块的channel之间是互相不影响的。同时还需要提供一系列的方法,与服务器提供的核心API进行对应。

 先创建这三个核心的类

在包mqclient中创建这三个类。

@Data
public class ConnectionFactory {
//    BrokerServer的ip地址
    private String host;
//    BrokerServer端口号
    private int port;

    public Connection newConnection(){
        Connection connection = new Connection(host,port);
        return connection;
    }
}
@Data
public class Connection {
    private Socket socket = null;

//    需要管理多个channel,使用哈希表把若干个channel组织起来
    private ConcurrentHashMap<String ,Channel> channelMap = new ConcurrentHashMap<>();

    private InputStream inputStream;
    private OutputStream outputStream;
    private DataInputStream dataInputStream;
    private DataOutputStream dataOutputStream;
    private ExecutorService callbackPool = null;

    public Connection(String host,int port) throws IOException {
        socket = new Socket(host,port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);
    }
    
//    使用该方法分别处理,当前响应是一个针对控制请求的响应,还是服务器推送的响应
    private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
    //TODO
    }

        //    发送请求
    public void writeRequest(Request request) throws IOException {
//        TODO
    }

    
        //    读取响应
    public Response readResponse() throws IOException {
//        TODO
    }

//    通过这个方法,再connection中创建出一个channel

    public Channel createChannel(){
    // TODO
        return channel;
    }
//    关闭connection
   public void close() {
        //TODO
    }
}
@Data
public class Channel {
    private String channelId;
//    当前channel属于哪个连接
    private Connection connection;
//    用来存储后续客户端收到的服务器的响应
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
//    如果当前的Channel订阅了某个队列,此处就需要记录对应的回调是什么
//    当该队列的消息返回回来的时候,就调用回调
//    约定一个channel值能有一个回调
    private Consumer consumer = null;

    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }


//    这个方法主要和服务器进行交互,
//    目的是为了告知服务器,此处客户端创建了新的channel
    public boolean createChannel() {
//        TODO
        return true;
    }


    //    使用该方法阻塞等待服务器的响应
    private BasicReturns waitResult(String rid) {
        return null;
    }

//    关闭channel,给服务器发送一个type == 0x2的请求
    public boolean close() throws IOException {
        //TODO
        return null;
    }


//创建核心的API方法
}


 🍅 2、完善Connection类

这里完成发送请求、读取响应、创建channel、处理响应、关闭连接

        🎄 读取请求和响应、创建channel

//    发送请求
   public void writeRequest(Request request) throws IOException {
        dataOutputStream.writeInt(request.getType());
        dataOutputStream.writeInt(request.getLength());
        dataOutputStream.write(request.getPayload());
        dataOutputStream.flush();
        System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());
    }

    //    读取响应
    public Response readResponse() throws IOException {
        Response response = new Response();
        response.setType(dataInputStream.readInt());
        response.setLength(dataInputStream.readInt());
        byte[] payload = new byte[response.getLength()];
        int n = dataInputStream.read(payload);
        if (n != response.getLength()) {
            throw new IOException("读取的响应数据不完整!");
        }
        response.setPayload(payload);
        System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());
        return response;
    }


//    通过这个方法,再connection中创建出一个channel
//    此处的createChannel()方法在后面channel类中编写以后,会抛异常,这里大家写完channel之后回过来手动抛一下
    public Channel createChannel() {    //throws IOException
        String channelId = "C-" + UUID.randomUUID();
        Channel channel = new Channel(channelId,this);
//        把channel对象放到Connection管理的channel的哈希表中
        channelMap.put(channelId,channel);
//        同时也需要把“创建channel”的这个消息也告诉服务器
        boolean ok = channel.createChannel();
        if (!ok){
//            创建channel失败
//            删除hash表中的键值对
            channelMap.remove(channelId);
            return null;
        }
        return channel;
    }

         🎄 添加扫描线程

在构造方法中,添加一个扫描线程,使用该线程,不停的从socket中读取响应,再将这个响应交给对应的channnel。

   public Connection(String host,int port) throws IOException {
        socket = new Socket(host,port);
        inputStream = socket.getInputStream();
        outputStream = socket.getOutputStream();
        dataInputStream = new DataInputStream(inputStream);
        dataOutputStream = new DataOutputStream(outputStream);

        callbackPool = Executors.newFixedThreadPool(4);

        //        创建一个扫描线程
//        该线程负责不停的从socket中读取响应数据,然后把这个响应数据再交给对应的channel负责处理
        Thread t = new Thread(() -> {
            try {
                while (!socket.isClosed()) {
                    Response response = readResponse();
                    dispatchResponse(response);
                }
            } catch (SocketException e) {
                // 连接正常断开的. 此时这个异常直接忽略.
                System.out.println("[Connection] 连接正常断开!");
            } catch (IOException | ClassNotFoundException | MqException e) {
                System.out.println("[Connection] 连接异常断开!");
                e.printStackTrace();
            }
        });
        t.start();
    }

        🎄 处理不同的响应

 使用该方法分别处理两种不同的响应,当前响应是一个针对控制请求的响应,还是服务器推送的响应。

private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
        if (response.getType() == 0xc) {
//            服务器推送来的消息数据
        SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());

//            根据channelId找到对用的channel对象
             Channel channel = channelMap.get(subScribeReturns.getChannelId());
            if (channel == null) {
                throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId =" + channel.getChannelId());
            }
            callbackPool.submit(() -> {
                try {
                    channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
                            subScribeReturns.getBody());
                } catch (MqException | IOException e) {
                    e.printStackTrace();
                }
            });
        } else {
//            当前响应是针对控制请求的响应
            BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
//            把这个结果放到对应的channel的hash表中
            Channel channel = channelMap.get(basicReturns.getChannelId());
            if (channel == null) {
                throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId =" + channel.getChannelId());
            }
            channel.putReturns(basicReturns);
        }
    }

        🎄 关闭连接

public void close() {
        //        关闭connection,释放持有的资源
        try {
            callbackPool.shutdownNow();
            channelMap.clear();
            inputStream.close();
            outputStream.close();
            socket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

🍅 3、完善Channel类

🎄 编写createChannel()

//    这个方法主要和服务器进行交互,
//    目的是为了告知服务器,此处客户端创建了新的channel

//    这个方法主要和服务器进行交互,
//    目的是为了告知服务器,此处客户端创建了新的channel
    public boolean createChannel() throws IOException {
//        对于创建channel来说,payload就是一个basicArguments
        BasicArguments basicArguments=  new BasicArguments();
        basicArguments.setChannelId(channelId);
//        rid表示这次请求的id
        basicArguments.setRid(generateRid());
        byte[] payload = BinaryTool.toBytes(basicArguments);

        Request request = new Request();
        request.setType(0x1);
        request.setLength(payload.length);
        request.setPayload(payload);

//       发送构造出的请求
        connection.writeRequest(request);
//        等待服务器的响应
        BasicReturns basicReturns = waitResult(basicArguments.getRid());
        return basicReturns.isOk();
    }


    private String generateRid(){
        return "R-" + UUID.randomUUID().toString();
    }

🎄 编写waitResult()和putRetuens()方法

putRetuents()是为了将返回的响应放到对用的哈希表中

    public void putReturns(BasicReturns basicReturns) {
        basicReturnsMap.put(basicReturns.getRid(),basicReturns);
        synchronized (this){
//            当前不知道有多少个线程在等待上面的响应
//            把多有的等待的线程都唤醒
            notifyAll();
        }
    }

waitResult()方法的作用是为了阻塞等待服务器的响应。以下举例说明:  

  如下,假如有3个channel,按照123的顺序发送了请求,所以应该是请求1先等待响应1,然后再是2和3。

但是,现在有一个情况,服务器这边是多线程并发处理请求,服务器处理每个请求的时间不一样,返回响应的顺序也就不一样。

如下图,channel1等待的是响应1,但是先返回的响应却是2和3。响应1还没来,请求1就一直等。请求1,没等到,后面的2和3也就拿不到响应

所以,为了解决这个问题,就创建了basicReturnsMap,将socket中收到的所有响应数据放到这个在前面创建的basicReturnsMap哈希表中。客户端的的请求,就可以不断的从这个哈希表中寻找是否存在和自己匹配的响应。

如果存在,就把相应取走;不存在,就继续等待。

这个waitResult()方法就是,当请求对应的响应不再哈希表中时,就阻塞等待。

//    使用该方法阻塞等待服务器的响应
    private BasicReturns waitResult(String rid) {
        BasicReturns basicReturns = null;
        while ((basicReturns = basicReturnsMap.get(rid)) == null){
//            如果查询结果为null,说明响应没来
//            此时就需要阻塞等待
//            此处加锁是为了保证wait/notify的是同一个对象
            synchronized (this){
                try {
                    wait();
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }
//        读取成功之后,把这个响应从哈希表中删除掉
        basicReturnsMap.remove(rid);
        return basicReturns;
    }


🎄 编写其他核心API

        🎊 交换机

//    创建交换机
    public  boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable) throws IOException {
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setRid(generateRid());
        exchangeDeclareArguments.setChannelId(channelId);
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);

        Request request = new Request();
        request.setType(0x3);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
        return basicReturns.isOk();
    }

//    删除交换机
    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x4);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

        🎊 队列

 public boolean queueDeclare(String queueName,boolean durable) throws IOException {
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setRid(generateRid());
        queueDeclareArguments.setChannelId(channelId);
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setDurable(durable);
        byte[] payload = BinaryTool.toBytes(queueDeclareArguments);

        Request request = new Request();
        request.setType(0x5);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
        return basicReturns.isOk();
    }

    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
        queueDeleteArguments.setRid(generateRid());
        queueDeleteArguments.setChannelId(channelId);
        queueDeleteArguments.setQueueName(queueName);
        byte[] payload = BinaryTool.toBytes(queueDeleteArguments);

        Request request = new Request();
        request.setType(0x6);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());
        return basicReturns.isOk();
    }

         🎊 绑定

//    创建绑定
    public  boolean queueBind(String  queueName,String exchangeName,String bindingKey) throws IOException {
        QueueBindArguments queueBindArguments = new QueueBindArguments();
        queueBindArguments.setRid(generateRid());
        queueBindArguments.setChannelId(channelId);
        queueBindArguments.setExchangeName(exchangeName);
        queueBindArguments.setQueueName(queueName);
        queueBindArguments.setBindingKey(bindingKey);
        byte[] payload = BinaryTool.toBytes(queueBindArguments);

        Request request = new Request();
        request.setType(0x7);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueBindArguments.getRid());
        return basicReturns.isOk();
    }

//    删除绑定
    public boolean queueUnbind(String queueName,String exchangeName) throws IOException {
        QueueUnbindArguments queueUnbindArguments = new QueueUnbindArguments();
        queueUnbindArguments.setRid(generateRid());
        queueUnbindArguments.setChannelId(channelId);
        queueUnbindArguments.setQueueName(queueName);
        queueUnbindArguments.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toBytes(queueUnbindArguments);

        Request request = new Request();
        request.setType(0x8);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(queueUnbindArguments.getRid());
        return basicReturns.isOk();
    }

         🎊发布消息

//  发布消息
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
        BasicPublishArguments arguments = new BasicPublishArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setExchangeName(exchangeName);
        arguments.setRoutingKey(routingKey);
        arguments.setBasicProperties(basicProperties);
        arguments.setBody(body);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0x9);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

        🎊 订阅消息

//    订阅消息
    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
//        先设置回调.
        if (this.consumer != null) {
            throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");
        }
        this.consumer = consumer;

        BasicConsumeArguments arguments = new BasicConsumeArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
//        此处 consumerTag 使用 channelId 来表示
        arguments.setConsumerTag(channelId);   
        arguments.setQueueName(queueName);
        arguments.setAutoAck(autoAck);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0xa);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

        🎊 确认消息

//    确认消息
     public boolean basicAck(String queueName, String messageId) throws IOException {
        BasicAckArguments arguments = new BasicAckArguments();
        arguments.setRid(generateRid());
        arguments.setChannelId(channelId);
        arguments.setQueueName(queueName);
        arguments.setMessageId(messageId);
        byte[] payload = BinaryTool.toBytes(arguments);

        Request request = new Request();
        request.setType(0xb);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(arguments.getRid());
        return basicReturns.isOk();
    }

二、客户端测试

🍅 1、准备工作和收尾工作

private BrokerServer brokerServer = null;
    private ConnectionFactory factory = null;
    private Thread t = null;

    @BeforeEach
    public void setUp() throws IOException {
        // 1. 先启动服务器
        TigerMqApplication.context = SpringApplication.run(TigerMqApplication.class);
        brokerServer = new BrokerServer(9090);
        t = new Thread(() -> {
            // 这个 start 方法会进入一个死循环. 使用一个新的线程来运行 start 即可!
            try {
                brokerServer.start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        t.start();

        // 2. 配置 ConnectionFactory
        factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
    }

    @AfterEach
    public void tearDown() throws IOException {
        // 停止服务器
        brokerServer.stop();
        // t.join();
        TigerMqApplication.context.close();

        // 删除必要的文件
        File file = new File("./data");
        FileUtils.deleteDirectory(file);

        factory = null;
    }
}

 🍅 2、测试connection

@Test
    public void testConnection() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
    }
打印日志:
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[BrokerServer] 服务器停止运行!

🍅 3、测试channnel的创建

 @Test
    public void testChannel() throws IOException{
        Connection connection = connectionFactory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);
    }

🍅 4、测试交换机

@Test
    public void testExchange() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
        Assertions.assertTrue(ok);

        ok = channel.exchangeDelete("testExchange");
        Assertions.assertTrue(ok);

        channel.close();
        connection.close();
    }
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[connection]发送请求!type = 1,length = 188
[Request] rid=R-ff8a7be0-8138-4334-b496-647b472349fa, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=1, length=188
[BrokerServer] 创建 channel 完成! channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f
[Response] rid=R-ff8a7be0-8138-4334-b496-647b472349fa, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=1, length=192
[Connection]收到响应!type = 1,length = 192
[connection]发送请求!type = 3,length = 412
[Request] rid=R-9c7ddbe5-e0c1-42b2-a7b3-e456c5c8e457, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=3, length=412
[MemoryDataCenter]新交换机添加成功!exchangeName = defaulttestExchange
[VirtualHost] 交换机创建完成!exchangeName = defaulttestExchange
[Response] rid=R-9c7ddbe5-e0c1-42b2-a7b3-e456c5c8e457, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=3, length=192
[Connection]收到响应!type = 3,length = 192
[connection]发送请求!type = 4,length = 288
[Request] rid=R-76865d42-f11f-4a57-9e99-d66f5bb6d228, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=4, length=288
[MemoryDataCenter]交换机删除成功! exchangeName = defaulttestExchange
[VirtualHost] 交换机删除成功!exchangeName = defaulttestExchange
[Response] rid=R-76865d42-f11f-4a57-9e99-d66f5bb6d228, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=4, length=192
[Connection]收到响应!type = 4,length = 192
[connection]发送请求!type = 2,length = 188
[Request] rid=R-49132d69-cc39-4030-9293-c9c4d3c3f0d1, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=2, length=188
[BrokerServer] 销毁 channel 完成! channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f
[Response] rid=R-49132d69-cc39-4030-9293-c9c4d3c3f0d1, channelId=C-4cc5b5a4-6b73-435e-b7cf-8d9e2a2ec72f, type=2, length=192
[Connection]收到响应!type = 2,length = 192
[Connection] 连接正常断开!
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:54675
[BrokerServer]清理session完成~ 被清理的channeId = []
[BrokerServer] 服务器停止运行!
2023-08-13 17:10:28.619  INFO 38940 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 17:10:28.704  INFO 38940 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 17:10:28.727  INFO 38940 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

Process finished with exit code 0

 🍅 5、测试队列

@Test
    public void testQueue() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);

        ok = channel.queueDelete("testQueue");
        Assertions.assertTrue(ok);

        channel.close();
        connection.close();
    }
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[connection]发送请求!type = 1,length = 188
[Request] rid=R-86546d10-c911-4cbc-be4c-ac2c9d04ff38, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=1, length=188
[BrokerServer] 创建 channel 完成! channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40
[Response] rid=R-86546d10-c911-4cbc-be4c-ac2c9d04ff38, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=1, length=192
[Connection]收到响应!type = 1,length = 192
[connection]发送请求!type = 5,length = 349
[Request] rid=R-2b3df748-be8a-4aab-9dc7-8c37abf93b91, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=5, length=349
[MemoryDataCenter]队列删除成功!queueName = defaulttestQueue
[VirtualHost]队列创建成功!queueName = defaulttestQueue
[Response] rid=R-2b3df748-be8a-4aab-9dc7-8c37abf93b91, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=5, length=192
[Connection]收到响应!type = 5,length = 192
[connection]发送请求!type = 6,length = 279
[Request] rid=R-057a79a8-20db-408f-82db-eb9e9145a48b, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=6, length=279
[MemoryDataCenter]删除队列成功!queueName = defaulttestQueue
[VirtualHost]删除队列成功!queueName = defaulttestQueue
[Response] rid=R-057a79a8-20db-408f-82db-eb9e9145a48b, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=6, length=192
[Connection]收到响应!type = 6,length = 192
[connection]发送请求!type = 2,length = 188
[Request] rid=R-73a48380-78e9-448f-9566-4cdc7da17bda, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=2, length=188
[BrokerServer] 销毁 channel 完成! channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40
[Response] rid=R-73a48380-78e9-448f-9566-4cdc7da17bda, channelId=C-91c1e861-6a7b-49de-a102-27c5437e0a40, type=2, length=192
[Connection]收到响应!type = 2,length = 192
[Connection] 连接正常断开!
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:54945
[BrokerServer]清理session完成~ 被清理的channeId = []
[BrokerServer] 服务器停止运行!
2023-08-13 17:16:54.304  INFO 72124 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 17:16:54.333  INFO 72124 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 17:16:54.363  INFO 72124 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

Process finished with exit code 0

🍅 6、测试绑定

 @Test
    public void testBinding() throws IOException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
        Assertions.assertTrue(ok);
        ok = channel.queueDeclare("testQueue", true);
        Assertions.assertTrue(ok);

        ok = channel.queueBind("testQueue", "testExchange", "testBindingKey");
        Assertions.assertTrue(ok);

        ok = channel.queueUnbind("testQueue", "testExchange");
        Assertions.assertTrue(ok);

        channel.close();
        connection.close();
    }
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[connection]发送请求!type = 1,length = 188
[Request] rid=R-071477e3-7115-42e7-9370-7995fa36daab, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=1, length=188
[BrokerServer] 创建 channel 完成! channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52
[Response] rid=R-071477e3-7115-42e7-9370-7995fa36daab, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=1, length=192
[Connection]收到响应!type = 1,length = 192
[connection]发送请求!type = 3,length = 412
[Request] rid=R-cc632d4d-f06f-4b70-88dd-63dd94c666f2, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=3, length=412
[MemoryDataCenter]新交换机添加成功!exchangeName = defaulttestExchange
[VirtualHost] 交换机创建完成!exchangeName = defaulttestExchange
[Response] rid=R-cc632d4d-f06f-4b70-88dd-63dd94c666f2, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=3, length=192
[Connection]收到响应!type = 3,length = 192
[connection]发送请求!type = 5,length = 349
[Request] rid=R-478d13e5-b999-4ac4-96cf-e0c8df829152, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=5, length=349
[MemoryDataCenter]队列删除成功!queueName = defaulttestQueue
[VirtualHost]队列创建成功!queueName = defaulttestQueue
[Response] rid=R-478d13e5-b999-4ac4-96cf-e0c8df829152, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=5, length=192
[Connection]收到响应!type = 5,length = 192
[connection]发送请求!type = 7,length = 347
[Request] rid=R-77076a0e-faa7-4ecb-867b-48c7e33d720d, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=7, length=347
[MemoryDataCenter]新绑定添加成功!exchangeName = defaulttestQueue,queueName = defaulttestQueue
[VirtualHost]绑定创建成功! exchangeName = defaulttestExchangequeueName = defaulttestQueue
[Response] rid=R-77076a0e-faa7-4ecb-867b-48c7e33d720d, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=7, length=192
[Connection]收到响应!type = 7,length = 192
[connection]发送请求!type = 8,length = 314
[Request] rid=R-f5eeb7c0-0e4c-4339-8e3b-057354e27380, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=8, length=314
[MemoryDataCenter]绑定删除成功!exchangeName = defaulttestQueue,queueName = defaulttestQueue
[VirtualHost]删除绑定成功
[Response] rid=R-f5eeb7c0-0e4c-4339-8e3b-057354e27380, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=8, length=192
[Connection]收到响应!type = 8,length = 192
[Request] rid=R-c3335cd6-d02e-440e-9aea-5275bf445412, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=2, length=188
[BrokerServer] 销毁 channel 完成! channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52
[Response] rid=R-c3335cd6-d02e-440e-9aea-5275bf445412, channelId=C-fa94e733-1642-425a-9da2-a1174b52ab52, type=2, length=192
[Connection]收到响应!type = 2,length = 192
[connection]发送请求!type = 2,length = 188
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:55256
[BrokerServer]清理session完成~ 被清理的channeId = []
[Connection] 连接正常断开!
[BrokerServer] 服务器停止运行!
2023-08-13 17:22:34.611  INFO 74604 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 17:22:34.646  INFO 74604 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 17:22:34.691  INFO 74604 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

Process finished with exit code 0

🍅 7、测试消息的相关操作

 @Test
    public void testMessage() throws IOException, MqException, InterruptedException {
        Connection connection = factory.newConnection();
        Assertions.assertNotNull(connection);
        Channel channel = connection.createChannel();
        Assertions.assertNotNull(channel);

        boolean ok = channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true);
        Assertions.assertTrue(ok);
        ok = channel.queueDeclare("testQueue", true, false);
        Assertions.assertTrue(ok);

        byte[] requestBody = "hello".getBytes();
        ok = channel.basicPublish("testExchange", "testQueue", null, requestBody);
        Assertions.assertTrue(ok);

        ok = channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                Assertions.assertArrayEquals(requestBody, body);
                System.out.println("[消费数据] 结束!");
            }
        });
        Assertions.assertTrue(ok);

        Thread.sleep(500);

        channel.close();
        connection.close();
    }
[DataBaseManger]创建表完成
[DataBaseManger]创建初始数据已经完成
[DataBaseManger]数据库初始化完成
[BrokerServer] 启动!
[Connection] 发送请求! type=1, length=188
[Request] rid=R-143ae2ea-f258-4874-bff4-be3f719d44ed, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=1, length=188
[BrokerServer] 创建 channel 完成! channelId=C-0977501d-4608-4428-ae5c-db2738c02068
[Response] rid=R-143ae2ea-f258-4874-bff4-be3f719d44ed, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=1, length=192
[Connection] 收到响应! type=1, length=192
[Connection] 发送请求! type=3, length=512
[Request] rid=R-f4acb000-ae7f-44b5-829d-aef69d8a7394, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=3, length=512
[MemoryDataCenter]新交换机添加成功!exchangeName = defaulttestExchange
[VirtualHost] 交换机创建完成!exchangeName = defaulttestExchange
[Response] rid=R-f4acb000-ae7f-44b5-829d-aef69d8a7394, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=3, length=192
[Connection] 收到响应! type=3, length=192
[Connection] 发送请求! type=5, length=349
[Request] rid=R-311dd85c-c95b-436e-8d4d-85a3c38b445e, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=5, length=349
[MemoryDataCenter]队列删除成功!queueName = defaulttestQueue
[VirtualHost]队列创建成功!queueName = defaulttestQueue
[Response] rid=R-311dd85c-c95b-436e-8d4d-85a3c38b445e, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=5, length=192
[Connection] 收到响应! type=5, length=192
[Connection] 发送请求! type=9, length=429
[Request] rid=R-b1088753-1ee7-43ff-ae1f-85679ad4e48d, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=9, length=429
[MemoryDataCenter]新消息添加成功!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[MemoryDataCenter]消息被投递到到队列中! messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[Response] rid=R-b1088753-1ee7-43ff-ae1f-85679ad4e48d, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=9, length=192
[Connection] 收到响应! type=9, length=192
[Connection] 发送请求! type=10, length=315
[Request] rid=R-bf16b934-229a-4b03-8d8c-50416fec0aa6, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=10, length=315
[MemoryDataCenter]消息从队列中取出!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[VirtualHost]basicConsume成功! queueName = defaulttestQueue
[Response] rid=R-bf16b934-229a-4b03-8d8c-50416fec0aa6, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=10, length=192
[Connection] 收到响应! type=10, length=192
[MemoryDataCenter]消息进入待确认队列!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[Connection] 收到响应! type=12, length=520
[MemoryDataCenter]消息从待确认队列删除!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[MemoryDataCenter]消息被移除!messageId = M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b
[ConsumerManager]消费被成功消费!queueName = defaulttestQueue
[消费数据] 开始!
consumerTag=C-0977501d-4608-4428-ae5c-db2738c02068
basicProperties=BasicProperties(messageId=M-b688741f-808b-4cfa-9d7a-1a4f51d28c0b, routingKey=testQueue, deliverMode=1)
[消费数据] 结束!
[Connection] 发送请求! type=2, length=188
[Request] rid=R-024d2c5b-dfd0-4944-983b-2cfab56350d4, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=2, length=188
[BrokerServer] 销毁 channel 完成! channelId=C-0977501d-4608-4428-ae5c-db2738c02068
[Response] rid=R-024d2c5b-dfd0-4944-983b-2cfab56350d4, channelId=C-0977501d-4608-4428-ae5c-db2738c02068, type=2, length=192
[Connection] 收到响应! type=2, length=192
[Connection] 连接正常断开!
[BrokerServer] connection 关闭! 客户端的地址: /127.0.0.1:58925
[BrokerServer]清理session完成~ 被清理的channeId = []
[BrokerServer] 服务器停止运行!
2023-08-13 18:19:24.906  INFO 75700 --- [           main] o.apache.catalina.core.StandardService   : Stopping service [Tomcat]
2023-08-13 18:19:24.929  INFO 75700 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown initiated...
2023-08-13 18:19:24.935  INFO 75700 --- [           main] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Shutdown completed.

Process finished with exit code 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/872637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习编译系列

机器学习编译MLC 1. 引言2. 机器学习编译--概述2.1 什么是机器学习编译 1. 引言 陈天奇目前任教于CMU&#xff0c;研究方向为机器学习系统。他是TVM、MXNET、XGBoost的主要作者。2022年夏天&#xff0c;陈天奇在B站开设了《机器学习编译》的课程。   《机器学习编译》课程共分…

2023最新水果编曲软件FL Studio 21.1.0.3267音频工作站电脑参考配置单及系统配置要求

音乐在人们心中的地位日益增高&#xff0c;近几年音乐选秀的节目更是层出不穷&#xff0c;喜爱音乐&#xff0c;创作音乐的朋友们也是越来越多&#xff0c;音乐的类型有很多&#xff0c;好比古典&#xff0c;流行&#xff0c;摇滚等等。对新手友好程度基本上在首位&#xff0c;…

全网最牛,Appium自动化测试框架-关键字驱动+数据驱动实战(一)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、关键字驱动框架…

Stm32-使用TB6612驱动电机及编码器测速

这里写目录标题 起因一、电机及编码器的参数二、硬件三、接线四、驱动电机1、TB6612电机驱动2、定时器的PWM模式驱动电机 五、编码器测速1、定时器的编码器接口模式2、定时器编码器模式测速的原理3、编码器模式的配置4、编码器模式相关代码5、测速方法 六、相关问题以及解答1、…

关于Cesium的常见需求整理之点位和弹窗(点位弹窗)

一、点位上图 ①在Cesium中&#xff0c;每个自定义的地图元素被视为一个entity对象&#xff0c;如果我们要添加点位到地图上&#xff0c;那就必须先创建一个entity对象。 var entity new Cesium.Entity({position: position, });以上代码我们创建了一个entity对象&#xff0…

Autosar通信入门系列06-聊聊CAN通信的线与机制与ACK应答

本文框架 1. 概述2. CAN通信的线与机制3. ACK应答机制理解 1. 概述 本文为Autosar通信入门系列介绍&#xff0c;如您对AutosarMCAL配置&#xff0c;通信&#xff0c;诊断等实战有更高需求&#xff0c;可以参见AutoSar 实战进阶系列专栏&#xff0c;快速链接&#xff1a;AutoSa…

数据库基础(增删改查)

目录 MySQL 背景知识 数据库基础操作 1.创建数据库 2.查看所有数据库 3.选中指定的数据库 4.删除数据库 数据库表操作 MySQL的数据类型 1.创建表 3.查看指定表的结构 4.删除表 增删改 新增操作 修改(Updata) 删除语句 面试题 查询操作 指定列查询 查询的列为表达式…

系统设计:通用思路之4S分析法

1.系统设计 系统设计是一个定义系统架构、功能模块、服务及接口和数据存储等满足特定需求的过程。 与面向对象设计不同的是&#xff0c;面向对象设计通常是对于某个特定功能模块的设计&#xff0c;通常要求设计类图关系、接口关系、实现关系等涉及具体代码层面的设计&#xff…

C语言库函数之 qsort 讲解、使用及模拟实现

引入 我们在学习排序的时候&#xff0c;第一个接触到的应该都是冒泡排序&#xff0c;我们先来复习一下冒泡排序的代码&#xff0c;来作为一个铺垫和引入。 代码如下&#xff1a; #include<stdio.h>void bubble_sort(int *arr, int sz) {int i 0;for (i 0; i < sz…

基于chatgpt动手实现一个ai_translator

动手实现一个ai翻译 前言 最近在极客时间学习《AI 大模型应用开发实战营》&#xff0c;自己一边跟着学一边开发了一个进阶版本的 OpenAI-Translator&#xff0c;在这里简单记录下开发过程和心得体会&#xff0c;供有兴趣的同学参考&#xff1b; ai翻译程序 版本迭代 在学习…

C语言必会题目(2)

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; 今天继续分享C语言必会的题目&#xff0c;上一篇文章主要是一些选择题&#xff0c;而今天我们主要内容为编程题的推荐与讲解 准备好迎接下面的题了吗&#xff1f;开始发车了&#xff01;&#xff01;&#xff01; 输入…

pytest运行时参数说明,pytest详解,pytest.ini详解

一、Pytest简介 1.pytest是一个非常成熟的全功能的Python测试框架&#xff0c;主要有一下几个特点&#xff1a; 简单灵活&#xff0c;容易上手&#xff0c;支持参数化 2.能够支持简单的单元测试和复杂的功能测试&#xff0c;还可以用来做selenium、appium等自动化测试&#xf…

zookeeper案例

目录 案例一&#xff1a;服务器动态上下线 服务端&#xff1a; &#xff08;1&#xff09;先获取zookeeper连接 &#xff08;2&#xff09;注册服务器到zookeeper集群&#xff1a; &#xff08;3&#xff09;业务逻辑&#xff08;睡眠&#xff09;&#xff1a; 服务端代码…

提高生产力 | Apifox 数据结构验证最佳实践

目录 实践场景 定义返回响应 场景数据准备 校验响应数据 总结 在设计接口的过程中&#xff0c;响应数据需要和返回响应规范一一对应。这样能够确保接口的一致性和可靠性&#xff0c;并且方便接口的使用和维护&#xff0c;即使在后续迭代过程中出现问题&#xff0c;开发人员…

zabbix监控安装部署

目录 一、环境 二、配置 1.配置yum源&#xff0c;这里用的清华的 2.过滤一下安装包&#xff0c;查看依赖包 安装依赖包 3.配置数据库 开机自启 创建数据库 创建用户 授权 导入数据到数据库 查看zabbix数据库有没有表和数据 4.修改zabbix配置文件 1.修改zabbix配置…

【Java】常见面试题:多线程

文章目录 1. 谈谈进程和线程之间的区别【高频】2. java中有哪些方式来创建线程&#xff1f;3. run和start的区别【经典面试题】4. Java线程的状态5. 【线程不安全的原因】6. 就以count为例&#xff1a;一个线程加锁、一个线程不加锁&#xff0c;此时能否保证线程的安全呢&#…

client-go实战之十二:选主(leader-election)

欢迎访问我的GitHub 这里分类和汇总了欣宸的全部原创(含配套源码)&#xff1a;https://github.com/zq2599/blog_demos 本篇概览 本文是《client-go实战》系列的第十二篇&#xff0c;又有一个精彩的知识点在本章呈现&#xff1a;选主(leader-election)在解释什么是选主之前&…

Keepalived源码安装

文章目录 Keepalived源码安装安装准备缺少OpenSSL解决方法 Keepalived 源码安装 安装准备 tar zxf keepalived-2.2.8.tar.gz /root/ ll drwxrwxr-x. 10 1000 1000 4096 Aug 9 18:29 keepalived-2.2.8 #进入目录执行以下命令查看帮助 ./configure --help #重要编译参数 -…

QT学习笔记-oracle oci数据库驱动交叉编译并移植到ARM开发板

QT学习笔记-oracle oci数据库驱动交叉编译并移植到RK3568ARM开发板 0、背景1、搭建交叉编译环境2、交叉编译过程3、把数据库驱动部署到目标系统中 0、背景 在上一文《QT学习笔记-QT安装oracle oci驱动》中介绍了在Windows环境下使用QT访问oracle数据库时遇到驱动无法加载问题的…

kingbase:数据库启动状态

1 启停KingbaseES数据库 Linux下通过系统服务&#xff1a; root用户执行&#xff1a; service kingbase8d stop/start/restart ——注册服务的情况下 Linux下通过安装用户&#xff1a; 安装用户执行&#xff1a; sys_ctl stop/start/restart -D data路径 2 查看数据库当…