【开源项目】消息队列XXL-MQ源码解析

news2025/1/10 23:41:05

消息队列XXL-MQ源码解析

项目介绍

XXL-MQ是一款轻量级分布式消息队列,拥有 “水平扩展、高可用、海量数据堆积、单机TPS过10万、毫秒级投递” 等特性, 支持 “并发消息、串行消息、广播消息、延迟消息、事务消费、失败重试、超时控制” 等消息特性。现已开放源代码,开箱即用。

源码地址

https://gitee.com/xuxueli0323/xxl-mq
在这里插入图片描述

源码解析

Admin启动

  1. 服务端启动。XxlMqBrokerImpl实现了InitializingBean,重写了afterPropertiesSet方法。
    @Override
    public void afterPropertiesSet() throws Exception {
        // init server
        initServer();

        // init thread
        initThead();
    }

  1. 初始化服务器,XxlMqBrokerImpl#initServer。添加providerFactory.addService(IXxlMqBroker.class.getName(), null, this);。将IXxlMqBroker的类名作为key,将this对象作为value存到serviceMap中。
    public void initServer() throws Exception {

        // address, static registry
        ip = (ip!=null&&ip.trim().length()>0)?ip:IpUtil.getIp();
        String address = IpUtil.getIpPort(ip, port);

        XxlCommonRegistryData xxlCommonRegistryData = new XxlCommonRegistryData();
        xxlCommonRegistryData.setKey(IXxlMqBroker.class.getName());
        xxlCommonRegistryData.setValue(address);
        XxlCommonRegistryServiceImpl.staticRegistryData = xxlCommonRegistryData;


        // init server
        providerFactory = new XxlRpcProviderFactory();
        providerFactory.initConfig(NetEnum.NETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, null, null, null);

        // add server
        providerFactory.addService(IXxlMqBroker.class.getName(), null, this);

        // start server
        providerFactory.start();
    }
	public void addService(String iface, String version, Object serviceBean){
		String serviceKey = makeServiceKey(iface, version);
		serviceData.put(serviceKey, serviceBean);

		logger.info(">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}", serviceKey, serviceBean.getClass());
	}
  1. 因为网络类型是NetEnum.NETTY,所以会获取到NettyServerserver = netType.serverClass.newInstance();。执行NettyServer#start。该服务器使用NettyServerHandler作为消息的处理器。当有消息过来,调用xxlRpcProviderFactory.invokeService(xxlRpcRequest),后面细讲。
    @Override
    public void channelRead0(final ChannelHandlerContext ctx, final XxlRpcRequest xxlRpcRequest) throws Exception {

        try {
            // do invoke
            serverHandlerPool.execute(new Runnable() {
                @Override
                public void run() {
                    // invoke + response
                    XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);

                    ctx.writeAndFlush(xxlRpcResponse);
                }
            });
        } catch (Exception e) {
            // catch error
            XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
            xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
            xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e));

            ctx.writeAndFlush(xxlRpcResponse);
        }
    }
  1. 服务器启动,还会初始化线程,XxlMqBrokerImpl#initThead,主要是将newMessageQueue中的消息存储到数据库中。
try {
                            XxlMqMessage message = newMessageQueue.take();
                            if (message != null) {
                                // load
                                List<XxlMqMessage> messageList = new ArrayList<>();
                                messageList.add(message);

                                List<XxlMqMessage> otherMessageList = new ArrayList<>();
                                int drainToNum = newMessageQueue.drainTo(otherMessageList, 100);
                                if (drainToNum > 0) {
                                    messageList.addAll(otherMessageList);
                                }

                                // save
                                xxlMqMessageDao.save(messageList);
                            }
                        } catch (Exception e) {
                            if (!executorStoped) {
                                logger.error(e.getMessage(), e);
                            }
                        }

客户端启动

  1. XxlMqSpringClientFactory实现了ApplicationContextAware,重写方法setApplicationContext。找到带有@MqConsumer注解的Bean,存储到serviceMap;另外,初始化xxlMqClientFactory
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

        // load consumer from spring
        List<IMqConsumer> consumerList = new ArrayList<>();

        Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(MqConsumer.class);
        if (serviceMap!=null && serviceMap.size()>0) {
            for (Object serviceBean : serviceMap.values()) {
                if (serviceBean instanceof IMqConsumer) {
                    consumerList.add((IMqConsumer) serviceBean);
                }
            }
        }

        // init
        xxlMqClientFactory = new XxlMqClientFactory();

        xxlMqClientFactory.setAdminAddress(adminAddress);
        xxlMqClientFactory.setAccessToken(accessToken);
        xxlMqClientFactory.setConsumerList(consumerList);

        xxlMqClientFactory.init();
    }
  1. XxlMqSpringClientFactory初始化。
    public void init() {
        // pre : valid consumer
        validConsumer();

        // start BrokerService
        startBrokerService();

        // start consumer
        startConsumer();
    }
  1. XxlMqClientFactory#validConsumer,校验消费者是否合法。如果是合法的,会封装成ConsumerThread,加入consumerRespository
    private void validConsumer(){
        // valid
        if (consumerList==null || consumerList.size()==0) {
            logger.warn(">>>>>>>>>>> xxl-mq, MqConsumer not found.");
            return;
        }

        // make ConsumerThread
        for (IMqConsumer consumer : consumerList) {

            // valid annotation
            MqConsumer annotation = consumer.getClass().getAnnotation(MqConsumer.class);
            if (annotation == null) {
                throw new RuntimeException("xxl-mq, MqConsumer("+ consumer.getClass() +"), annotation is not exists.");
            }

            // valid group
            if (annotation.group()==null || annotation.group().trim().length()==0) {
                // empty group means consume broadcase message, will replace by uuid
                try {
                    // annotation memberValues
                    InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
                    Field mValField = invocationHandler.getClass().getDeclaredField("memberValues");
                    mValField.setAccessible(true);
                    Map memberValues = (Map) mValField.get(invocationHandler);

                    // set data for "group"
                    String randomGroup = UUID.randomUUID().toString().replaceAll("-", "");
                    memberValues.put("group", randomGroup);
                } catch (Exception e) {
                    throw new RuntimeException("xxl-mq, MqConsumer("+ consumer.getClass() +"), group empty and genereta error.");
                }

            }
            if (annotation.group()==null || annotation.group().trim().length()==0) {
                throw new RuntimeException("xxl-mq, MqConsumer("+ consumer.getClass() +"),group is empty.");
            }

            // valid topic
            if (annotation.topic()==null || annotation.topic().trim().length()==0) {
                throw new RuntimeException("xxl-mq, MqConsumer("+ consumer.getClass() +"), topic is empty.");
            }

            // consumer map
            consumerRespository.add(new ConsumerThread(consumer));
        }
    }
  1. XxlMqClientFactory#startBrokerServiceXxlRpcInvokerFactory#start,设置了xxlMqBroker,开启了多个线程。
    public void startBrokerService() {
        // init XxlRpcInvokerFactory
        xxlRpcInvokerFactory = new XxlRpcInvokerFactory(XxlRegistryServiceRegistry.class, new HashMap<String, String>(){{
            put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, adminAddress);
            put(XxlRegistryServiceRegistry.ACCESS_TOKEN, accessToken);
        }});
        try {
            xxlRpcInvokerFactory.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        // init ConsumerRegistryHelper
        XxlRegistryServiceRegistry commonServiceRegistry = (XxlRegistryServiceRegistry) xxlRpcInvokerFactory.getServiceRegistry();
        consumerRegistryHelper = new ConsumerRegistryHelper(commonServiceRegistry);

        // init IXxlMqBroker
        xxlMqBroker = (IXxlMqBroker) new XxlRpcReferenceBean(
                NetEnum.NETTY,
                Serializer.SerializeEnum.HESSIAN.getSerializer(),
                CallType.SYNC,
                LoadBalance.ROUND,
                IXxlMqBroker.class,
                null,
                10000,
                null,
                null,
                null,
                xxlRpcInvokerFactory).getObject();
        // ...添加线程处理
    }
  1. XxlRpcInvokerFactory#start,主要初始化了XxlRegistryClient。服务注册调用的是"/api/registry",另外开启发现进程,调用接口/api/monitor,如果获取到true,会调用"/api/discovery"接口,会将主题的数据存储到discoveryData

  2. xxlMqBroker = (IXxlMqBroker) new XxlRpcReferenceBean(...).getObject();。构造方法中初始化了NettyClient,当需要服务发现的时候,用netty进行通信。用了动态代理。

  3. XxlMqClientFactory#startConsumer,注册消费者,开启线程。

    private void startConsumer() {

        // valid
        if (consumerRespository ==null || consumerRespository.size()==0) {
            return;
        }

        // registry consumer
        getConsumerRegistryHelper().registerConsumer(consumerRespository);

        // execute thread
        for (ConsumerThread item: consumerRespository) {
            clientFactoryThreadPool.execute(item);
            logger.info(">>>>>>>>>>> xxl-mq, consumer init success, , topic:{}, group:{}", item.getMqConsumer().topic(), item.getMqConsumer().group());
        }

    }

发消息

  1. 发送消息。会调用XxlMqClientFactory#addMessages
		String topic = "topic_1";
		String data = "时间戳:" + System.currentTimeMillis();
		XxlMqProducer.produce(new XxlMqMessage(topic, data));
  1. XxlMqClientFactory#addMessages,没有指定则是异步,添加到newMessageQueue
    public static void addMessages(XxlMqMessage mqMessage, boolean async){
        if (async) {
            // async queue, mult send
            newMessageQueue.add(mqMessage);
        } else {
            // sync rpc, one send
            xxlMqBroker.addMessages(Arrays.asList(mqMessage));
        }

    }
  1. 客户端消费队列,还是调用xxlMqBroker.addMessages(messageList);xxlMqBroker是代理对象,会封装成XxlRpcRequest发送给服务端。
try {
                            XxlMqMessage message = newMessageQueue.take();
                            if (message != null) {
                                // load
                                List<XxlMqMessage> messageList = new ArrayList<>();
                                messageList.add(message);

                                List<XxlMqMessage> otherMessageList = new ArrayList<>();
                                int drainToNum = newMessageQueue.drainTo(otherMessageList, 100);
                                if (drainToNum > 0) {
                                    messageList.addAll(otherMessageList);
                                }

                                // save
                                xxlMqBroker.addMessages(messageList);
                            }
                        } catch (Exception e) {
                            if (!XxlMqClientFactory.clientFactoryPoolStoped) {
                                logger.error(e.getMessage(), e);
                            }
                        }
  1. 服务端xxlRpcProviderFactory.invokeService(xxlRpcRequest),根据xxlRpcRequest的方法名和类名,获取到对应的bean,反射调用方法。即执行XxlMqBrokerImpl#addMessages
    @Override
    public int addMessages(List<XxlMqMessage> messages) {
        newMessageQueue.addAll(messages);
        return messages.size();
    }
  1. 服务端消费队列,存储到数据库中。
                        try {
                            XxlMqMessage message = newMessageQueue.take();
                            if (message != null) {
                                // load
                                List<XxlMqMessage> messageList = new ArrayList<>();
                                messageList.add(message);

                                List<XxlMqMessage> otherMessageList = new ArrayList<>();
                                int drainToNum = newMessageQueue.drainTo(otherMessageList, 100);
                                if (drainToNum > 0) {
                                    messageList.addAll(otherMessageList);
                                }

                                // save
                                xxlMqMessageDao.save(messageList);
                            }
                        } catch (Exception e) {
                            if (!executorStoped) {
                                logger.error(e.getMessage(), e);
                            }
                        }

监听消息

  1. 核心就是ConsumerThread。判断当前线程是否活跃,ConsumerRegistryHelper.ActiveInfo activeInfo = XxlMqClientFactory.getConsumerRegistryHelper().isActice(this);,用于判断是否有新内容。
  2. ConsumerRegistryHelper#isActice方法中,会执行TreeSet<String> onlineConsumerSet = serviceRegistry.discovery(registryKey);,主要用于获取缓存discoveryDatadiscoveryData获取前,会判断主题。
  3. 拉取消息列表,List<XxlMqMessage> messageList = XxlMqClientFactory.getXxlMqBroker().pullNewMessage(mqConsumer.topic(), mqConsumer.group(), activeInfo.rank, activeInfo.total, 100);。同样,XxlMqClientFactory.getXxlMqBroker()获取到的是代理类,会调用netty发送远程请求,服务端执行XxlMqBrokerImpl#pullNewMessage。服务端是从数据库中获取消息列表。
    @Override
    public List<XxlMqMessage> pullNewMessage(String topic, String group, int consumerRank, int consumerTotal, int pagesize) {
        List<XxlMqMessage> list = xxlMqMessageDao.pullNewMessage(XxlMqMessageStatus.NEW.name(), topic, group, consumerRank, consumerTotal, pagesize);
        return list;
    }
		SELECT <include refid="Base_Column_List" />
		FROM xxl_mq_message AS t
		WHERE 	t.topic = #{topic}
			AND t.group = #{group}
			AND t.status = #{newStatus}
			AND t.effectTime <![CDATA[ < ]]> NOW()
    `

服务发现

  1. 客户端执行XxlRegistryBaseClient#discovery
  2. admin服务端提供发现接口,ApiController#discovery,主要是读取文件, String fileName = registryDataFilePath.concat(File.separator).concat(key).concat(".properties");,本地路径:D:\data\applogs\xxl-mq\registrydata

服务注册

  1. ApiController#registry,主要是往XxlCommonRegistryServiceImpl#registryQueue里面添加信息。
  2. XxlCommonRegistryServiceImpl实现了InitializingBean,重写XxlCommonRegistryServiceImpl#afterPropertiesSet中启动线程处理registryQueue,保存注册信息入库。
                    try {
                        XxlCommonRegistryData xxlCommonRegistryData = registryQueue.take();
                        if (xxlCommonRegistryData !=null) {

                            // refresh or add
                            int ret = xxlCommonRegistryDataDao.refresh(xxlCommonRegistryData);
                            if (ret == 0) {
                                xxlCommonRegistryDataDao.add(xxlCommonRegistryData);
                            }

                            // valid file status
                            XxlCommonRegistry fileXxlCommonRegistry = getFileRegistryData(xxlCommonRegistryData);
                            if (fileXxlCommonRegistry!=null && fileXxlCommonRegistry.getDataList().contains(xxlCommonRegistryData.getValue())) {
                                continue;     // "Repeated limited."
                            }

                            // checkRegistryDataAndSendMessage
                            checkRegistryDataAndSendMessage(xxlCommonRegistryData);
                        }
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }
  1. 处理注册信息,保存到文件中。
                    try {
                        // new message, filter readed
                        List<XxlCommonRegistryMessage> messageList = xxlCommonRegistryMessageDao.findMessage(readedMessageIds);
                        if (messageList!=null && messageList.size()>0) {
                            for (XxlCommonRegistryMessage message: messageList) {
                                readedMessageIds.add(message.getId());

                                // from registry、add、update、deelete,ne need sync from db, only write
                                XxlCommonRegistry xxlCommonRegistry = JacksonUtil.readValue(message.getData(), XxlCommonRegistry.class);

                                // default, sync from db (aready sync before message, only write)

                                // sync file
                                setFileRegistryData(xxlCommonRegistry);
                            }
                        }

                        // clean old message;
                        if (System.currentTimeMillis() % registryBeatTime ==0) {
                            xxlCommonRegistryMessageDao.cleanMessage(10);
                            readedMessageIds.clear();
                        }
                    } catch (Exception e) {
                        if (!executorStoped) {
                            logger.error(e.getMessage(), e);
                        }
                    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/129988.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

专栏目录总览

文章目录摘要1. Backbone2. Neck3. Bottleneck4. Head5.GAP或者avgpool&#xff1a;6.Embedding摘要 梳理了一些长见的名词&#xff0c;方便大家够好的理解论文和Ai方向的文章。 1. Backbone 骨干网络或者说是主干网络&#xff0c;指的是提取特征的网络&#xff0c;其作用就…

Bean 作用域,Bean生命周期,Bean执行原理

Spring 是⽤来读取和存储 Bean&#xff0c;因此在 Spring 中 Bean 是最核⼼的操作资源&#xff0c;所以接下来我们深⼊学习⼀下 Bean 对象. 1.通过⼀个案例来看 Bean 作⽤域的问题 假设现在有⼀个公共的 Bean&#xff0c;提供给 A ⽤户和 B ⽤户使⽤&#xff0c;然⽽在使⽤的…

线程池的原理和使用

ThreadPoolExecutor 为什么用线程池 线程池做的主要工作就是控制运行的线程的数量&#xff0c;处理过程中&#xff0c;将任务放入到队列中&#xff0c;然后线程创建后&#xff0c;启动这些任务&#xff0c;如果线程数量超过了最大数量的线程排队等候&#xff0c;等其它线程执…

7.移动端笔记-less基础

1.css的弊端 CSS需要书写大量的看似没有逻辑的代码&#xff0c;冗余度高不方便维护&#xff0c;不利于复用没有很好的计算能力 2.Less介绍 简单说&#xff1a;Less是CSS预处理语言&#xff0c;扩展了CSS的动态性 CSS的扩展语言&#xff0c;也成为CSS的预处理器。在CSS基础上…

论文精读:Centernet:Objects as Points

论文地址:https://arxiv.org/pdf/1904.07850.pdf 代码地址:https://github. com/xingyizhou/CenterNet. Abstract 基于anchor的目标检测算法通常会列举大量可能存在对象位置的列表&#xff0c;这是浪费的、低效的。作者采用了一种不同的方法。将一个对象建模为单个点——其边…

魔术小游戏

魔术游戏一、问题描述二、基本流程三、具体步骤1.在集合中随机生成起始牌2.菜单栏3.找到包含[选中牌]的一组牌在大集合中的索引4.洗牌5.发牌四、完整代码五、效果展示一、问题描述 这是一个魔术游戏&#xff0c;将15张牌分为三组&#xff0c;每组5张&#xff0c;让玩家从中任选…

填鸭表单|2022年度总结功能发布

自从我们在2020年发布了开源版本以来&#xff0c;我们结识了许多社区伙伴。在和这些社区伙伴的接触中&#xff0c;我们深刻地感受到了“做产品的感觉&#xff0c;令人振奋且充满激情”。 我们认为&#xff0c;专注于做一件事情&#xff0c;持续深耕&#xff0c;时间自然会给出…

C#语言实例源码系列-实现对文件进行加密保护

专栏分享点击跳转>Unity3D特效百例点击跳转>案例项目实战源码点击跳转>游戏脚本-辅助自动化点击跳转>Android控件全解手册 &#x1f449;关于作者 众所周知&#xff0c;人生是一个漫长的流程&#xff0c;不断克服困难&#xff0c;不断反思前进的过程。在这个过程中…

jmeter压测使用实践

环境搭建篇见https://blog.csdn.net/weixin_42498050/article/details/12847945 参考Jmter压测使用实践 jmeter压测实战总结 搭建 Apache Jmeter 分布式压测与监控 Jmeter常用断言 1. 添加线程组 测试计划 &#xff08;右键->添加->Threads&#xff08;Users&#x…

做了这么久的自动化测试现在才知道API 接口测试还能...

接口测试作为最常用的集成测试方法的一部分&#xff0c;通过直接调用被测试的接口来确定系统在功能性、可靠性、安全性和性能方面是否能达到预期&#xff0c;有些情况是功能测试无法覆盖的&#xff0c;所以接口测试是非常必要的。首先需要对接口测试的基本信息做一些了解&#…

Linux如何安装BeyondCompare

博客主页&#xff1a;https://tomcat.blog.csdn.net 博主昵称&#xff1a;农民工老王 主要领域&#xff1a;Java、Linux、K8S 期待大家的关注&#x1f496;点赞&#x1f44d;收藏⭐留言&#x1f4ac; 目录安装yumtar.gz使用示例BeyondCompare是一款广受好评的文本对比工具。本…

文件上传漏洞渗透与攻防(一)

目录 前言 文件上传漏洞原理 Webshell介绍 一句话木马&#xff1a; 小马&#xff1a; 大马&#xff1a; Webshell集合&#xff1a; 网站控制工具 文件上传漏洞危害 文件上传漏洞靶场练习 Pass-01 Pass-02 Pass-03 Pass-04 Pass-06 Pass-07 Pass-08 Pass-09 Pass-10 Pas…

Java并发编程(二)

线程方法 API Thread 类 API&#xff1a; 方法说明public void start()启动一个新线程&#xff0c;Java虚拟机调用此线程的 run 方法public void run()线程启动后调用该方法public void setName(String name)给当前线程取名字public void getName()获取当前线程的名字 线程存…

实战演练 | 使用 Navicat Premium 自动运行数据库复制

与同步&#xff08;使两个数据库的模式和数据同步的一次性过程&#xff09;不同&#xff0c;复制是一个连续&#xff08;自动&#xff09;在两个数据库之间重现数据的过程&#xff08;尽管模式更新也是可能的&#xff09;。复制可以异步完成&#xff0c;因此不需要永久连接两个…

【Lniux】目录的权限,默认权限,粘滞位详细讲解

大家好&#xff0c;今天详细讲解一些关于目录权限的细节 很多细节都是通过问答方式&#xff0c;希望大家可以先自己思考一下答案然后再听我的分析 欢迎指正错误&#xff0c;我们共同成长 目录 1.目录的权限 2.默认权限 3.粘滞位 1.目录的权限 如果我们要进图一个目录只需要…

ArcGIS基础实验操作100例--实验25统一多分辨率栅格数据

本实验专栏来自于汤国安教授《地理信息系统基础实验操作100例》一书 实验平台&#xff1a;ArcGIS 10.6 实验数据&#xff1a;请访问实验1&#xff08;传送门&#xff09; 基础编辑篇--实验25 统一多分辨率栅格数据 目录 一、实验背景 二、实验数据 三、实验步骤 &#xff0…

springboot admin-server的使用

指标监控可视化文档&#xff1a; 用于管理 Spring Boot 应用程序的管理 UI Spring Boot Admin Reference Guide 一、创建项目 就勾选Spring Web项目即可 二、基础设置 (1) 依赖引入 <dependency><groupId>de.codecentric</groupId><artifactId>sp…

Android: Binder: 彻底顿悟Android Binder

Binder机制可谓是Android 知识体系的重中之中&#xff0c;作为偏底层的基础组件&#xff0c;平时我们很少关注它&#xff0c;但是它却无处不在&#xff0c;这也是android面试考察点之一&#xff0c;本篇将从流程上将Binder通信过一遍。 文章目录 1&#xff1a;Binder作用 2&…

STM32F7-Discovery使用ITM作为调试工具

关于代码的调试手段&#xff0c;我在自己的一篇文章(http://bbs.ickey.cn/index.php?appgroup&actopic&id54944链接中的《STM32F030 Nucleo-开发调试的经验USART的重要性.pdf》)中已经详细谈到&#xff0c;为什么在调试中我们通常使用J-Link或ULINK或ST-Link(ST)或Ope…

机器学习——细节补充

1.matplotlib与seaborn的区别 来源&#xff1a;https://geek-docs.com/matplotlib/matplotlib-ask-answer/difference-between-matplotlib-and-seaborn.html 2.%matplotlib inline使图片嵌入notebook&#xff0c;而不需要使用show()方法 3.IPython与python&#xff1a;IPyth…