背景
队列还是非常重要的中间件,可以帮助我们:提高处理效率、完成更复杂的处理流程
最初,我觉得只要掌握一种消息队列就够了,现在想想挺好笑的。
过去的探索
因为我用python,而rabbitmq比较贴合快速和复杂的数据处理,然后就选了这款。一开始我觉得不靠谱,服务老是断了,后来发现,是因为ubuntu做服务器如果用wifi的话天然容易这样。当时不明白为什么,所以弃用了rabbitmq一段时间。
后来再次使用时,功能倒是大体ok了,在早期版本好像直接支持延时消息啥的,当时都把插件装上搞好了。后来又觉得rabbitmq的并发太小了,在大规模etl的时候比较吃不消。所以又弃用了。
再后来,我想搞一些简单,使用一点的队列,于是搞了redis stream。在某些方面的确也还挺好用,速度和吞吐都还比较大。但是是吃内存的,而且在后来也出过一次问题:在维持十几个队列进行大量流转时,我发现有些队列失效了,要删除才能释放内存。这样的隐患看起来更大,所以又多少被打入冷宫。
兜兜转转又回到了kafka。这个是最早我比较排斥的,因为看起来比价麻烦,而且更偏向java。但搭起来的确很好用,吞吐大,也不需要内存,主要依赖硬盘。但是kafka搭起来稍微麻烦一点,组要zookeeper,而且对网络和机器资源的要求很高。有一次,我不确定是不是kafka的问题,把我一台机器的网络整个带崩了。当然大部分时候也是没问题的。
后来也陆续看了一些队列,但类型也就是上面三种了,差异不大。还有个zmq,感觉是更底层的队列转发,没去管了。
一些对应的结论:
- 1 rabbitmq 可以在稍微小的场景用,功能可以比较全面,方便重试等
- 2 redis 把持久化去掉,不需要了,这样万一stream出问题重启服务就行
- 3 kafka 适合保存大量的对话日志,有7天滚动删除,需要用资源稍微高一点的机器运行
新的思考
最近碰到一些问题,又触发了我关于这方面的思考。
问题1:一个同事抱怨请求的微服务失败率过高,任务失败后他的重试比较难搞?
问题2:机器人会有很多零散的数据需要向量化,而现有的向量化微服务是处理批量的,这样导致了能力无法输出?
问题3:大模型不断出新的模型,以及现有的接口价格还是稍贵(虽然已经是业内最低),如何能确保替换?
对于问题1,后来我发现还是数据连接失活的问题,已经解决掉了。但如果是大模型接口不稳定导致的问题,应该如何解决呢? – RabbitMQ
由于调用大模型处理的需求一般都是比较昂贵且缓慢的,这意味着天然的并发就不会太高。RabbitMQ即使在消息体很大的情况下,应该也能做到2000左右的并发(这个后续我可以压一下),那样在并发处理上就够了。
然后利用rabbitmq本身丰富的机制,比如死信队列这种来完成重试。
这样可以应对接口的不稳定调用情况,减少我们自己进行失败的检查和调度。
对于问题2,应该就是做一个微批次服务了。服务端用队列接收请求,只有到一定批次时或者到指定轮询时间(1s),服务才会处理队列的数据,此时就可以发挥服务的批量处理效率了。这对于矩阵处理类的服务特别有效,使用redis stream这样简单的队列来完成这种服务正好。 – Redis Stream。
对于问题3,那么就是一个广播的过程。用kafka比较合适,一方面可以支持很大的吞吐,然后对于不同的消费者,这时应该是不同的模型都可以重复消费。每一个input,只存一次,在kafka,然后可以被重复消费,消费的结果进行实时比对。胜利的模型上台,失败的模型退位。
还有一个比较让我本能抗拒的问题,但其实应该是可以的,后面我也要尝试。
【实时队列服务】服务只是一个消息入口,并不直接处理,而是发到kafka。然后由多个worker盯着kafka进行消费。
这种间接服务是有点不靠谱的,抛开入口服务不谈,这里有kafka队列和worker两个不稳定因素。但如果可行的话,这样反而是比较好的:
- 1 数据存在历史(7天缓存),必要的时候可以追溯和回放
- 2 数据存到kafka,可以有更高的弹性处理能力,对那些延时要求不高的,比如允许timeout 30秒的任务来说肯定是可以的
worker处理完之后进行返回 ,可以采用webhook, websocket或者sse的方式将结果实时的返给请求。
【复杂ETL流转】将数据的处理抽象为在若干个kafka之间进行流转。
这样最大的好处是可以让不同人/流程之间的交互变的简单,可能会稍微费点硬盘,但应该是值得的。