消息队列XXL-MQ源码解析
项目介绍
XXL-MQ是一款轻量级分布式消息队列,拥有 “水平扩展、高可用、海量数据堆积、单机TPS过10万、毫秒级投递” 等特性, 支持 “并发消息、串行消息、广播消息、延迟消息、事务消费、失败重试、超时控制” 等消息特性。现已开放源代码,开箱即用。
源码地址
https://gitee.com/xuxueli0323/xxl-mq
源码解析
Admin启动
- 服务端启动。XxlMqBrokerImpl实现了
InitializingBean
,重写了afterPropertiesSet方法。
@Override
public void afterPropertiesSet() throws Exception {
// init server
initServer();
// init thread
initThead();
}
- 初始化服务器,
XxlMqBrokerImpl#initServer
。添加providerFactory.addService(IXxlMqBroker.class.getName(), null, this);
。将IXxlMqBroker
的类名作为key,将this对象作为value存到serviceMap中。
public void initServer() throws Exception {
// address, static registry
ip = (ip!=null&&ip.trim().length()>0)?ip:IpUtil.getIp();
String address = IpUtil.getIpPort(ip, port);
XxlCommonRegistryData xxlCommonRegistryData = new XxlCommonRegistryData();
xxlCommonRegistryData.setKey(IXxlMqBroker.class.getName());
xxlCommonRegistryData.setValue(address);
XxlCommonRegistryServiceImpl.staticRegistryData = xxlCommonRegistryData;
// init server
providerFactory = new XxlRpcProviderFactory();
providerFactory.initConfig(NetEnum.NETTY, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, null, null, null);
// add server
providerFactory.addService(IXxlMqBroker.class.getName(), null, this);
// start server
providerFactory.start();
}
public void addService(String iface, String version, Object serviceBean){
String serviceKey = makeServiceKey(iface, version);
serviceData.put(serviceKey, serviceBean);
logger.info(">>>>>>>>>>> xxl-rpc, provider factory add service success. serviceKey = {}, serviceBean = {}", serviceKey, serviceBean.getClass());
}
- 因为网络类型是
NetEnum.NETTY
,所以会获取到NettyServer
,server = netType.serverClass.newInstance();
。执行NettyServer#start
。该服务器使用NettyServerHandler
作为消息的处理器。当有消息过来,调用xxlRpcProviderFactory.invokeService(xxlRpcRequest)
,后面细讲。
@Override
public void channelRead0(final ChannelHandlerContext ctx, final XxlRpcRequest xxlRpcRequest) throws Exception {
try {
// do invoke
serverHandlerPool.execute(new Runnable() {
@Override
public void run() {
// invoke + response
XxlRpcResponse xxlRpcResponse = xxlRpcProviderFactory.invokeService(xxlRpcRequest);
ctx.writeAndFlush(xxlRpcResponse);
}
});
} catch (Exception e) {
// catch error
XxlRpcResponse xxlRpcResponse = new XxlRpcResponse();
xxlRpcResponse.setRequestId(xxlRpcRequest.getRequestId());
xxlRpcResponse.setErrorMsg(ThrowableUtil.toString(e));
ctx.writeAndFlush(xxlRpcResponse);
}
}
- 服务器启动,还会初始化线程,
XxlMqBrokerImpl#initThead
,主要是将newMessageQueue
中的消息存储到数据库中。
try {
XxlMqMessage message = newMessageQueue.take();
if (message != null) {
// load
List<XxlMqMessage> messageList = new ArrayList<>();
messageList.add(message);
List<XxlMqMessage> otherMessageList = new ArrayList<>();
int drainToNum = newMessageQueue.drainTo(otherMessageList, 100);
if (drainToNum > 0) {
messageList.addAll(otherMessageList);
}
// save
xxlMqMessageDao.save(messageList);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
客户端启动
XxlMqSpringClientFactory
实现了ApplicationContextAware
,重写方法setApplicationContext
。找到带有@MqConsumer
注解的Bean,存储到serviceMap;另外,初始化xxlMqClientFactory
。
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// load consumer from spring
List<IMqConsumer> consumerList = new ArrayList<>();
Map<String, Object> serviceMap = applicationContext.getBeansWithAnnotation(MqConsumer.class);
if (serviceMap!=null && serviceMap.size()>0) {
for (Object serviceBean : serviceMap.values()) {
if (serviceBean instanceof IMqConsumer) {
consumerList.add((IMqConsumer) serviceBean);
}
}
}
// init
xxlMqClientFactory = new XxlMqClientFactory();
xxlMqClientFactory.setAdminAddress(adminAddress);
xxlMqClientFactory.setAccessToken(accessToken);
xxlMqClientFactory.setConsumerList(consumerList);
xxlMqClientFactory.init();
}
XxlMqSpringClientFactory
初始化。
public void init() {
// pre : valid consumer
validConsumer();
// start BrokerService
startBrokerService();
// start consumer
startConsumer();
}
XxlMqClientFactory#validConsumer
,校验消费者是否合法。如果是合法的,会封装成ConsumerThread
,加入consumerRespository
。
private void validConsumer(){
// valid
if (consumerList==null || consumerList.size()==0) {
logger.warn(">>>>>>>>>>> xxl-mq, MqConsumer not found.");
return;
}
// make ConsumerThread
for (IMqConsumer consumer : consumerList) {
// valid annotation
MqConsumer annotation = consumer.getClass().getAnnotation(MqConsumer.class);
if (annotation == null) {
throw new RuntimeException("xxl-mq, MqConsumer("+ consumer.getClass() +"), annotation is not exists.");
}
// valid group
if (annotation.group()==null || annotation.group().trim().length()==0) {
// empty group means consume broadcase message, will replace by uuid
try {
// annotation memberValues
InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
Field mValField = invocationHandler.getClass().getDeclaredField("memberValues");
mValField.setAccessible(true);
Map memberValues = (Map) mValField.get(invocationHandler);
// set data for "group"
String randomGroup = UUID.randomUUID().toString().replaceAll("-", "");
memberValues.put("group", randomGroup);
} catch (Exception e) {
throw new RuntimeException("xxl-mq, MqConsumer("+ consumer.getClass() +"), group empty and genereta error.");
}
}
if (annotation.group()==null || annotation.group().trim().length()==0) {
throw new RuntimeException("xxl-mq, MqConsumer("+ consumer.getClass() +"),group is empty.");
}
// valid topic
if (annotation.topic()==null || annotation.topic().trim().length()==0) {
throw new RuntimeException("xxl-mq, MqConsumer("+ consumer.getClass() +"), topic is empty.");
}
// consumer map
consumerRespository.add(new ConsumerThread(consumer));
}
}
XxlMqClientFactory#startBrokerService
,XxlRpcInvokerFactory#start
,设置了xxlMqBroker
,开启了多个线程。
public void startBrokerService() {
// init XxlRpcInvokerFactory
xxlRpcInvokerFactory = new XxlRpcInvokerFactory(XxlRegistryServiceRegistry.class, new HashMap<String, String>(){{
put(XxlRegistryServiceRegistry.XXL_REGISTRY_ADDRESS, adminAddress);
put(XxlRegistryServiceRegistry.ACCESS_TOKEN, accessToken);
}});
try {
xxlRpcInvokerFactory.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
// init ConsumerRegistryHelper
XxlRegistryServiceRegistry commonServiceRegistry = (XxlRegistryServiceRegistry) xxlRpcInvokerFactory.getServiceRegistry();
consumerRegistryHelper = new ConsumerRegistryHelper(commonServiceRegistry);
// init IXxlMqBroker
xxlMqBroker = (IXxlMqBroker) new XxlRpcReferenceBean(
NetEnum.NETTY,
Serializer.SerializeEnum.HESSIAN.getSerializer(),
CallType.SYNC,
LoadBalance.ROUND,
IXxlMqBroker.class,
null,
10000,
null,
null,
null,
xxlRpcInvokerFactory).getObject();
// ...添加线程处理
}
-
XxlRpcInvokerFactory#start
,主要初始化了XxlRegistryClient
。服务注册调用的是"/api/registry"
,另外开启发现进程,调用接口/api/monitor
,如果获取到true,会调用"/api/discovery"
接口,会将主题的数据存储到discoveryData
。 -
xxlMqBroker = (IXxlMqBroker) new XxlRpcReferenceBean(...).getObject();
。构造方法中初始化了NettyClient,当需要服务发现的时候,用netty进行通信。用了动态代理。 -
XxlMqClientFactory#startConsumer
,注册消费者,开启线程。
private void startConsumer() {
// valid
if (consumerRespository ==null || consumerRespository.size()==0) {
return;
}
// registry consumer
getConsumerRegistryHelper().registerConsumer(consumerRespository);
// execute thread
for (ConsumerThread item: consumerRespository) {
clientFactoryThreadPool.execute(item);
logger.info(">>>>>>>>>>> xxl-mq, consumer init success, , topic:{}, group:{}", item.getMqConsumer().topic(), item.getMqConsumer().group());
}
}
发消息
- 发送消息。会调用
XxlMqClientFactory#addMessages
。
String topic = "topic_1";
String data = "时间戳:" + System.currentTimeMillis();
XxlMqProducer.produce(new XxlMqMessage(topic, data));
XxlMqClientFactory#addMessages
,没有指定则是异步,添加到newMessageQueue
public static void addMessages(XxlMqMessage mqMessage, boolean async){
if (async) {
// async queue, mult send
newMessageQueue.add(mqMessage);
} else {
// sync rpc, one send
xxlMqBroker.addMessages(Arrays.asList(mqMessage));
}
}
- 客户端消费队列,还是调用
xxlMqBroker.addMessages(messageList);
。xxlMqBroker
是代理对象,会封装成XxlRpcRequest发送给服务端。
try {
XxlMqMessage message = newMessageQueue.take();
if (message != null) {
// load
List<XxlMqMessage> messageList = new ArrayList<>();
messageList.add(message);
List<XxlMqMessage> otherMessageList = new ArrayList<>();
int drainToNum = newMessageQueue.drainTo(otherMessageList, 100);
if (drainToNum > 0) {
messageList.addAll(otherMessageList);
}
// save
xxlMqBroker.addMessages(messageList);
}
} catch (Exception e) {
if (!XxlMqClientFactory.clientFactoryPoolStoped) {
logger.error(e.getMessage(), e);
}
}
- 服务端
xxlRpcProviderFactory.invokeService(xxlRpcRequest)
,根据xxlRpcRequest
的方法名和类名,获取到对应的bean,反射调用方法。即执行XxlMqBrokerImpl#addMessages
。
@Override
public int addMessages(List<XxlMqMessage> messages) {
newMessageQueue.addAll(messages);
return messages.size();
}
- 服务端消费队列,存储到数据库中。
try {
XxlMqMessage message = newMessageQueue.take();
if (message != null) {
// load
List<XxlMqMessage> messageList = new ArrayList<>();
messageList.add(message);
List<XxlMqMessage> otherMessageList = new ArrayList<>();
int drainToNum = newMessageQueue.drainTo(otherMessageList, 100);
if (drainToNum > 0) {
messageList.addAll(otherMessageList);
}
// save
xxlMqMessageDao.save(messageList);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
监听消息
- 核心就是
ConsumerThread
。判断当前线程是否活跃,ConsumerRegistryHelper.ActiveInfo activeInfo = XxlMqClientFactory.getConsumerRegistryHelper().isActice(this);
,用于判断是否有新内容。 - 在
ConsumerRegistryHelper#isActice
方法中,会执行TreeSet<String> onlineConsumerSet = serviceRegistry.discovery(registryKey);
,主要用于获取缓存discoveryData
,discoveryData
获取前,会判断主题。 - 拉取消息列表,
List<XxlMqMessage> messageList = XxlMqClientFactory.getXxlMqBroker().pullNewMessage(mqConsumer.topic(), mqConsumer.group(), activeInfo.rank, activeInfo.total, 100);
。同样,XxlMqClientFactory.getXxlMqBroker()
获取到的是代理类,会调用netty发送远程请求,服务端执行XxlMqBrokerImpl#pullNewMessage
。服务端是从数据库中获取消息列表。
@Override
public List<XxlMqMessage> pullNewMessage(String topic, String group, int consumerRank, int consumerTotal, int pagesize) {
List<XxlMqMessage> list = xxlMqMessageDao.pullNewMessage(XxlMqMessageStatus.NEW.name(), topic, group, consumerRank, consumerTotal, pagesize);
return list;
}
SELECT <include refid="Base_Column_List" />
FROM xxl_mq_message AS t
WHERE t.topic = #{topic}
AND t.group = #{group}
AND t.status = #{newStatus}
AND t.effectTime <![CDATA[ < ]]> NOW()
`
服务发现
- 客户端执行
XxlRegistryBaseClient#discovery
。 - admin服务端提供发现接口,
ApiController#discovery
,主要是读取文件,String fileName = registryDataFilePath.concat(File.separator).concat(key).concat(".properties");
,本地路径:D:\data\applogs\xxl-mq\registrydata
。
服务注册
ApiController#registry
,主要是往XxlCommonRegistryServiceImpl#registryQueue
里面添加信息。XxlCommonRegistryServiceImpl
实现了InitializingBean
,重写XxlCommonRegistryServiceImpl#afterPropertiesSet
中启动线程处理registryQueue
,保存注册信息入库。
try {
XxlCommonRegistryData xxlCommonRegistryData = registryQueue.take();
if (xxlCommonRegistryData !=null) {
// refresh or add
int ret = xxlCommonRegistryDataDao.refresh(xxlCommonRegistryData);
if (ret == 0) {
xxlCommonRegistryDataDao.add(xxlCommonRegistryData);
}
// valid file status
XxlCommonRegistry fileXxlCommonRegistry = getFileRegistryData(xxlCommonRegistryData);
if (fileXxlCommonRegistry!=null && fileXxlCommonRegistry.getDataList().contains(xxlCommonRegistryData.getValue())) {
continue; // "Repeated limited."
}
// checkRegistryDataAndSendMessage
checkRegistryDataAndSendMessage(xxlCommonRegistryData);
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}
- 处理注册信息,保存到文件中。
try {
// new message, filter readed
List<XxlCommonRegistryMessage> messageList = xxlCommonRegistryMessageDao.findMessage(readedMessageIds);
if (messageList!=null && messageList.size()>0) {
for (XxlCommonRegistryMessage message: messageList) {
readedMessageIds.add(message.getId());
// from registry、add、update、deelete,ne need sync from db, only write
XxlCommonRegistry xxlCommonRegistry = JacksonUtil.readValue(message.getData(), XxlCommonRegistry.class);
// default, sync from db (aready sync before message, only write)
// sync file
setFileRegistryData(xxlCommonRegistry);
}
}
// clean old message;
if (System.currentTimeMillis() % registryBeatTime ==0) {
xxlCommonRegistryMessageDao.cleanMessage(10);
readedMessageIds.clear();
}
} catch (Exception e) {
if (!executorStoped) {
logger.error(e.getMessage(), e);
}
}