RabbitMQ深度探索:简单实现 MQ

news2025/2/5 9:46:39

基于多线程队列实现 MQ :

  1. 实现类:
    public class ThreadMQ {
        private static LinkedBlockingDeque<JSONObject> broker = new LinkedBlockingDeque<JSONObject>();
    
        public static void main(String[] args) {
            //创建生产者线程
            Thread producer = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
                            Thread.sleep(1000);
                            JSONObject data = new JSONObject();
                            data.put("phone","11111111");
                            broker.offer(data);
                        }catch (Exception e){
    
                        }
                    }
                }
            },"生产者");
    
            producer.start();
            Thread consumer = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
                            JSONObject data = broker.poll();
                            if(data != null){
                                System.out.println(Thread.currentThread().getName() + data.toJSONString());
                            }
                        }catch (Exception e){
    
                        }
                    }
                }
            },"消费者");
            consumer.start();
    
        }
    }

基于 netty 实现 MQ:

  1. 执行过程:
    1. 消费者 netty 客户端与 nettyServer 端 MQ 服务器保持长连接,MQ 服务器端保存消费者连接
    2. 生产者 netty 客户端发送请求给 nettyServer 端 MQ 服务器,MQ 服务器端再将消息内容发送给消费者
  2. 执行流程:
    1. 导入 Maven 依赖:
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.62</version>
      </dependency>
      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.0.23.Final</version>
      </dependency>
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.62</version>
      </dependency>
      <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.11</version>
      </dependency>
      <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>3.6.5</version>
      </dependency>
    2. 服务端:
      package com.qcby.springboot.MQ;
      
      import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import org.apache.commons.lang3.StringUtils;
      
      import java.io.UnsupportedEncodingException;
      import java.util.ArrayList;
      import java.util.concurrent.LinkedBlockingDeque;
      
      /**
       * @ClassName BoyatopMQServer2021
       * @Author
       * @Version V1.0
       **/
      public class BoyatopNettyMQServer {
          public void bind(int port) throws Exception {
              /**
               * Netty 抽象出两组线程池BossGroup和WorkerGroup
               * BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。
               */
              EventLoopGroup bossGroup = new NioEventLoopGroup();
              EventLoopGroup workerGroup = new NioEventLoopGroup();
              ServerBootstrap bootstrap = new ServerBootstrap();
              try {
                  bootstrap.group(bossGroup, workerGroup)
                          // 设定NioServerSocketChannel 为服务器端
                          .channel(NioServerSocketChannel.class)
                          //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
                          //用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                          .option(ChannelOption.SO_BACKLOG, 100)
                          // 服务器端监听数据回调Handler
                          .childHandler(new BoyatopNettyMQServer.ChildChannelHandler());
                  //绑定端口, 同步等待成功;
                  ChannelFuture future = bootstrap.bind(port).sync();
                  System.out.println("当前服务器端启动成功...");
                  //等待服务端监听端口关闭
                  future.channel().closeFuture().sync();
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //优雅关闭 线程组
                  bossGroup.shutdownGracefully();
                  workerGroup.shutdownGracefully();
              }
          }
      
          private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                  // 设置异步回调监听
                  ch.pipeline().addLast(new BoyatopNettyMQServer.MayiktServerHandler());
              }
          }
      
          public static void main(String[] args) throws Exception {
              int port = 9008;
              new BoyatopNettyMQServer().bind(port);
          }
      
          private static final String type_consumer = "consumer";
      
          private static final String type_producer = "producer";
          private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();
          private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();
      
          // 生产者投递消息的:topicName
          public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {
      
              /**
               * 服务器接收客户端请求
               *
               * @param ctx
               * @param data
               * @throws Exception
               */
              @Override
              protected void channelRead0(ChannelHandlerContext ctx, Object data)
                      throws Exception {
                  //ByteBuf buf=(ByteBuf)data;
                  //byte[] req = new byte[buf.readableBytes()];
                  //buf.readBytes(req);
                  //String body = new String(req, "UTF-8");
                  //System.out.println("body:"+body);
                  JSONObject clientMsg = getData(data);
                  String type = clientMsg.getString("type");
                  switch (type) {
                      case type_producer:
                          producer(clientMsg);
                          break;
                      case type_consumer:
                          consumer(ctx);
                          break;
                  }
              }
      
              private void consumer(ChannelHandlerContext ctx) {
                  // 保存消费者连接
                  ctxs.add(ctx);
                  // 主动拉取mq服务器端缓存中没有被消费的消息
                  String data = msgs.poll();
                  if (StringUtils.isEmpty(data)) {
                      return;
                  }
                  // 将该消息发送给消费者
                  byte[] req = data.getBytes();
                  ByteBuf firstMSG = Unpooled.buffer(req.length);
                  firstMSG.writeBytes(req);
                  ctx.writeAndFlush(firstMSG);
              }
      
              private void producer(JSONObject clientMsg) {
                  // 缓存生产者投递 消息
                  String msg = clientMsg.getString("msg");
                  msgs.offer(msg); //保证消息不丢失还可以缓存硬盘
      
                  //需要将该消息推送消费者
                  ctxs.forEach((ctx) -> {
                      // 将该消息发送给消费者
                      String data = msgs.poll();
                      if (data == null) {
                          return;
                      }
                      byte[] req = data.getBytes();
                      ByteBuf firstMSG = Unpooled.buffer(req.length);
                      firstMSG.writeBytes(req);
                      ctx.writeAndFlush(firstMSG);
                  });
              }
      
              private JSONObject getData(Object data) throws UnsupportedEncodingException {
                  ByteBuf buf = (ByteBuf) data;
                  byte[] req = new byte[buf.readableBytes()];
                  buf.readBytes(req);
                  String body = new String(req, "UTF-8");
                  return JSONObject.parseObject(body);
              }
      
      
              @Override
              public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                  ctx.flush();
              }
      
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                      throws Exception {
      
                  ctx.close();
              }
          }
      }
    3. 生产端:
      package com.qcby.springboot.MQ;
      
      import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      
      /**
       * @ClassName BoyatopNettyMQProducer
       * @Author
       * @Version V1.0
       **/
      public class BoyatopNettyMQProducer {
          public void connect(int port, String host) throws Exception {
              //配置客户端NIO 线程组
              EventLoopGroup group = new NioEventLoopGroup();
              Bootstrap client = new Bootstrap();
              try {
                  client.group(group)
                          // 设置为Netty客户端
                          .channel(NioSocketChannel.class)
                          /**
                           * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                           * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                           * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                           */
                          .option(ChannelOption.TCP_NODELAY, true)
                          .handler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  ch.pipeline().addLast(new BoyatopNettyMQProducer.NettyClientHandler());
                                  1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());
                              }
                          });
      
                  //绑定端口, 异步连接操作
                  ChannelFuture future = client.connect(host, port).sync();
                  //等待客户端连接端口关闭
                  future.channel().closeFuture().sync();
              } finally {
                  //优雅关闭 线程组
                  group.shutdownGracefully();
              }
          }
      
          public static void main(String[] args) {
              int port = 9008;
              BoyatopNettyMQProducer client = new BoyatopNettyMQProducer();
              try {
                  client.connect(port, "127.0.0.1");
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
          public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      
      
              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  JSONObject data = new JSONObject();
                  data.put("type", "producer");
                  JSONObject msg = new JSONObject();
                  msg.put("userId", "123456");
                  msg.put("age", "23");
                  data.put("msg", msg);
                  // 生产发送数据
                  byte[] req = data.toJSONString().getBytes();
                  ByteBuf firstMSG = Unpooled.buffer(req.length);
                  firstMSG.writeBytes(req);
                  ctx.writeAndFlush(firstMSG);
              }
      
              /**
               * 客户端读取到服务器端数据
               *
               * @param ctx
               * @param msg
               * @throws Exception
               */
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  ByteBuf buf = (ByteBuf) msg;
                  byte[] req = new byte[buf.readableBytes()];
                  buf.readBytes(req);
                  String body = new String(req, "UTF-8");
                  System.out.println("客户端接收到服务器端请求:" + body);
              }
      
              // tcp属于双向传输
      
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  ctx.close();
              }
          }
      }
    4. 客户端:
      package com.qcby.springboot.MQ;
      
      import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      
      /**
       * @ClassName BoyatopNettyMQProducer
       * @Author
       * @Version V1.0
       **/
      public class NettyMQConsumer {
          public void connect(int port, String host) throws Exception {
              //配置客户端NIO 线程组
              EventLoopGroup group = new NioEventLoopGroup();
              Bootstrap client = new Bootstrap();
              try {
                  client.group(group)
                          // 设置为Netty客户端
                          .channel(NioSocketChannel.class)
                          /**
                           * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                           * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                           * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                           */
                          .option(ChannelOption.TCP_NODELAY, true)
                          .handler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  ch.pipeline().addLast(new NettyMQConsumer.NettyClientHandler());
                                  1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());
                              }
                          });
      
                  //绑定端口, 异步连接操作
                  ChannelFuture future = client.connect(host, port).sync();
                  //等待客户端连接端口关闭
                  future.channel().closeFuture().sync();
              } finally {
                  //优雅关闭 线程组
                  group.shutdownGracefully();
              }
          }
      
          public static void main(String[] args) {
              int port = 9008;
              NettyMQConsumer client = new NettyMQConsumer();
              try {
                  client.connect(port, "127.0.0.1");
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
          public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      
      
              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  JSONObject data = new JSONObject();
                  data.put("type", "consumer");
                  // 生产发送数据
                  byte[] req = data.toJSONString().getBytes();
                  ByteBuf firstMSG = Unpooled.buffer(req.length);
                  firstMSG.writeBytes(req);
                  ctx.writeAndFlush(firstMSG);
              }
      
              /**
               * 客户端读取到服务器端数据
               *
               * @param ctx
               * @param msg
               * @throws Exception
               */
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  ByteBuf buf = (ByteBuf) msg;
                  byte[] req = new byte[buf.readableBytes()];
                  buf.readBytes(req);
                  String body = new String(req, "UTF-8");
                  System.out.println("客户端接收到服务器端请求:" + body);
              }
      
              // tcp属于双向传输
      
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  ctx.close();
              }
          }
      }
  3. 持久化机制:
    1. 如果 MQ 接收到生产者投递信息,如果消费者不存在的情况下,消息是否会丢失?
    2. 答:不会丢失,消息确认机制必须要消费者消费成功之后,在通知给 MQ 服务器端,删除该消息
  4. MQ 服务器将该消息推送给消费者:
    1. 消费者已经和 MQ 服务器保持长连接
    2. 消费者在第一次启动的时候会主动拉取信息
  5. MQ 如何实现高并发思想:
    1. MQ 消费者根据自身能力情况,拉取 MQ 服务器端消费消息
    2. 默认的情况下取出一条消息
  6. 缺点:
    1. 存在延迟问题
  7. 需要考虑 MQ 消费者提高速率的问题:
    1. 如何提高消费者速率:消费者实现集群、消费者批量获取消息即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2292239.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

React+AI 技术栈(2025 版)

文章目录 核心&#xff1a;React TypeScript元框架&#xff1a;Next.js样式设计&#xff1a;Tailwind CSSshadcn/ui客户端状态管理&#xff1a;Zustand服务器状态管理&#xff1a;TanStack Query动画效果&#xff1a;Motion测试工具表格处理&#xff1a;TanStack Table表单处理…

计算机从何而来?计算技术将向何处发展?

计算机的前生&#xff1a;机械计算工具的演进 算盘是计算机的起点&#xff0c;它其实是一台“机械式半自动化运算器”。打算盘的“口诀”其实就是它的编程语言&#xff0c;算盘珠就是它的存储器。 第二阶段是可以做四则运算的加法器、乘法器。1642年&#xff0c;法国数学家帕斯…

Docker使用指南(二)——容器相关操作详解(实战案例教学,创建/使用/停止/删除)

目录 1.容器操作相关命令​编辑 案例一&#xff1a; 案例二&#xff1a; 容器常用命令总结&#xff1a; 1.查看容器状态&#xff1a; 2.删除容器&#xff1a; 3.进入容器&#xff1a; 二、Docker基本操作——容器篇 1.容器操作相关命令 下面我们用两个案例来具体实操一…

从通讯工具到 AI 助理,AI手机如何发展?

随着AI进军各行各业&#xff0c;全面AI化时代已经到来。手机&#xff0c;作为现代人类的“数字器官”之一&#xff0c;更是首当其冲地融入了这一变革浪潮之中。 2024年年初&#xff0c;OPPO联合IDC发布了《AI手机白皮书》&#xff0c;公布OPPO已迈向AI手机这一全新阶段。到如今…

小程序-基础加强

前言 这一节把基础加强讲完 1. 导入需要用到的小程序项目 2. 初步安装和使用vant组件库 这里还可以扫描二维码 其中步骤四没什么用 右键选择最后一个 在开始之前&#xff0c;我们的项目根目录得有package.json 没有的话&#xff0c;我们就初始化一个 但是我们没有npm这个…

【CSS】谈谈你对BFC的理解

理解 CSS 中的 BFC&#xff08;块格式化上下文&#xff09; 在 CSS 中&#xff0c;BFC&#xff08;Block Formatting Context&#xff09; 是一个非常重要的概念&#xff0c;它决定了元素如何对其子元素进行定位&#xff0c;以及与其他元素的关系。理解 BFC 对于解决常见的布局…

【Uniapp-Vue3】iconfont图标库的使用

先在iconfont图标库中将需要的图标加入购物车 点击右侧购物车的图标 点击添加至项目&#xff0c;可以选中项目进行加入&#xff0c;也可以点击文件加号创建一个新的项目并添加 加入以后会来到如下界面&#xff0c;点击下载至本地 双击打开下载的.zip文件 将.css和.ttf文件进…

Linux find 命令 | grep 命令 | 查找 / 列出文件或目录路径 | 示例

注&#xff1a;本文为 “Linux find 命令 | grep 命令使用” 相关文章合辑。 未整理去重。 如何在 Linux 中查找文件 作者&#xff1a; Lewis Cowles 译者&#xff1a; LCTT geekpi | 2018-04-28 07:09 使用简单的命令在 Linux 下基于类型、内容等快速查找文件。 如果你是 W…

Day 28 卡玛笔记

这是基于代码随想录的每日打卡 77. 组合 给定两个整数 n 和 k&#xff0c;返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 示例 1&#xff1a; 输入&#xff1a;n 4, k 2 输出&#xff1a; [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4], ]示例 2…

1.PPT:天河二号介绍【12】

目录 NO1 NO2.3.4.5 NO6.7.8.9​ NO1 PPT&#xff1a;新建一个空白演示文档→保存到考生文件夹下&#xff1a;天河二号超级计算机.pptx幻灯片必须选择一种设计主题&#xff1a;设计→主题&#xff08;随便选中一种&#xff09;幻灯片的版式&#xff1a;开始→版式&#x…

物联网领域的MQTT协议,优势和应用场景

MQTT&#xff08;Message Queuing Telemetry Transport&#xff09;作为轻量级发布/订阅协议&#xff0c;凭借其低带宽消耗、低功耗与高扩展性&#xff0c;已成为物联网通信的事实标准。其核心优势包括&#xff1a;基于TCP/IP的异步通信机制、支持QoS&#xff08;服务质量&…

电控---中断

中断 1.处理器系统在执行代码的时候&#xff0c;会从存储器依次取出指令和数据&#xff0c;这种能力需要在处理器里保存一个存储器地址&#xff0c;就是所谓的程序计数器&#xff08;Program Counter,PC&#xff09;&#xff0c;也叫程序指针 2.当外部中断&#xff08;Extern …

动态规划DP 背包问题 多重背包问题(朴素版+二进制优化+单调队列)

概览检索 动态规划DP 概览&#xff08;点击链接跳转&#xff09; 动态规划DP 背包问题 概览&#xff08;点击链接跳转&#xff09; 多重背包问题1 原题链接 AcWiing 4. 多重背包问题1 题目描述 有 N种物品和一个容量是 V的背包。 第 i 种物品最多有 si件&#xff0c;每件体…

Golang 并发机制-5:详解syn包同步原语

并发性是现代软件开发的一个基本方面&#xff0c;Go&#xff08;也称为Golang&#xff09;为并发编程提供了一组健壮的工具。Go语言中用于管理并发性的重要包之一是“sync”包。在本文中&#xff0c;我们将概述“sync”包&#xff0c;并深入研究其最重要的同步原语之一&#xf…

排序算法与查找算法

1.十大经典排序算法 我们希望数据以一种有序的形式组织起来&#xff0c;无序的数据我们要尽量将其变得有序 一般说来有10种比较经典的排序算法 简单记忆为Miss D----D小姐 时间复杂度 &#xff1a;红色<绿色<蓝色 空间复杂度&#xff1a;圆越大越占空间 稳定性&…

数据结构课程设计(三)构建决策树

3 决策树 3.1 需求规格说明 【问题描述】 ID3算法是一种贪心算法&#xff0c;用来构造决策树。ID3算法起源于概念学习系统&#xff08;CLS&#xff09;&#xff0c;以信息熵的下降速度为选取测试属性的标准&#xff0c;即在每个节点选取还尚未被用来划分的具有最高信息增益的…

python-leetcode-二叉树的层序遍历

102. 二叉树的层序遍历 - 力扣&#xff08;LeetCode&#xff09; # Definition for a binary tree node. # class TreeNode: # def __init__(self, val0, leftNone, rightNone): # self.val val # self.left left # self.right right from coll…

毕业设计:基于深度学习的高压线周边障碍物自动识别与监测系统

目录 前言 课题背景和意义 实现技术思路 一、算法理论基础 1.1 卷积神经网络 1.2 目标检测算法 1.3 注意力机制 二、 数据集 2.1 数据采集 2.2 数据标注 三、实验及结果分析 3.1 实验环境搭建 3.2 模型训练 3.2 结果分析 最后 前言 &#x1f4c5;大四是整个大学…

【Hadoop】Hadoop的HDFS

这里写目录标题 HDFS概述HDFS产出背景及定义HDFS产生背景HDFS定义 HDFS优缺点HDFS优点HDFS缺点 HDFS组成架构HDFS文件块大小 HDFS的Shell操作常用命令实操准备工作上传下载HDFS直接操作 HDFS的API操作客户端环境准备HDFS的API案例实操HDFS文件上传HDFS文件下载HDFS文件更名和移…

C++ Primer 迭代器

欢迎阅读我的 【CPrimer】专栏 专栏简介&#xff1a;本专栏主要面向C初学者&#xff0c;解释C的一些基本概念和基础语言特性&#xff0c;涉及C标准库的用法&#xff0c;面向对象特性&#xff0c;泛型特性高级用法。通过使用标准库中定义的抽象设施&#xff0c;使你更加适应高级…