先认识下ZeroMQ
参考:ZeroMQ详解 - 南哥的天下 - 博客园 (cnblogs.com)
ZeroMQ(简称ZMQ)是一个基于消息队列的多线程网络库,其对套接字类型、连接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。
ZMQ不是单独的服务,而是一个嵌入式库,工作在应用层和传输层之间(按照TCP/IP划分),它封装了网络通信、消息队列、线程调度等功能,向上层提供简洁的API,应用程序通过加载库文件,调用API函数来实现高性能网络通信。
更多参考:一文带你入门了解“零之禅“消息队列ZeroMQ-CSDN博客
为什么要使用ZeroMQ
当今的许多应用程序都包含了跨越某种网络的组件,无论这种网络是局域网还是互联网。因此,许多应用程序开发者最终都会处理某种类型的消息传递。一些开发人员使用消息队列产品,但大多数时候,他们使用TCP或UDP自己做。这些协议并不难用,但是,从A 发送几个字节到B和以任何一种可靠的方式处理消息,这两者之间有很大的区别。
让我们来看看当开始使用原始的TCP连接部件的时候,我们要面对的典型问题。任何可复用的消息层都需要解决如下所有这些问题或其中的大部分问题:
- 我们如何处理I/O呢?是让我们的应用程序阻塞,还是在后台处理I/O呢?这是一个关键的设计决策。阻塞式I/O 创建的架构不能很好地扩展,但后台I/O 也是非常难以正确做到的。
- 我们如何处理动态组件(例如,暂时撤除的块)呢?我们需要正式将组件划分为“客户端”和“服务器”,并强制该服务器不能撤除吗?那么,如果我们想将服务器连接到服务器时该怎么办呢?我们需要每隔几秒钟就尝试重新连接吗?
- 我们如何表示在线路上的消息呢?我们应该怎样将数据组织为帧,才能使得它很容易写入和读取,避免缓冲区溢出,既对小型消息高效,也足以处理非常大的戴着聚会礼帽的跳舞猫的视频呢?
- 我们如何处理不能立即传递的消息呢?特别是当我们在等待一个组件的联机回应时如何处理呢?我们需要丢弃消息,把它们放入一个数据库,或者把它们放到一个内存队列吗?
- 我们在哪里存储消息队列呢?如果组件从队列中读取很慢,导致我们的队列堆积,这会发生什么情况?我们的策略是什么呢?
- 我们如何处理丢失的消息呢?我们应该等待新的数据,要求重发,还是应该建立某种可靠性层,确保信息不会丢失呢?如果该层本身崩溃了该怎么办呢?
- 如果我们需要使用一个不同的网络传输,比如说,用多播来取代TCP 单播,或IPv6,该怎么办呢?我们需要重写应用程序吗?还是将传输抽象到某个层中呢?
- 我们如何路由消息呢?我们可以发送同样的消息到多个接收者吗?我们可以发送应答给原来的请求者吗?
- 我们如何编写出另一种语言的API 呢?我们应该重新实现一个线路级协议,还是重新包装一个库?如果是前者,我们怎么能保证协议栈的高效稳定呢?如果是后者,我们又怎么能保证互操作性呢?
- 我们应该如何表示数据,以便它可以在不同的架构之间读取呢?我们应该对数据类型强制执行特定的编码吗?究竟到什么程度,才是消息传递系统的工作,而不是更高一层的工作呢?
- 我们应该如何处理网络错误呢?是等待并重试,默默地忽略它们,还是终止它们呢?
ZeroMQ解决传统网络编程的问题:
- 调用的socket接口较多。
- TCP是一对一的连接。
- 编程需要关注很多socket细节问题。
- 不支持跨平台编程。
- 需要自行处理分包、组包问题。
- 流式传输时需处理粘包、半包问题。
- 需自行处理网络异常,比如连接异常中断、重连等。
- 服务端和客户端启动有先后。
- 自行处理IO模型。
- 自行实现消息的缓存。
- 自行实现对消息的加密。
构建可复用的消息传递系统是十分困难的
但是,我们如何才能做一个可复用的消息传递层呢?为什么有那么多的项目都需要这项技术,人们都还在通过在他们的代码中驱动TCP套接字费力地做它,并重复解决一长串清单中的问题呢?(见下图)。
实证明,构建可复用的消息传递系统是非常困难的,这就是为什么很少有自由和开放源码(FOSS)项目尝试做这项工作的原因,它也是商业通信产品复杂、昂贵、灵活性差,而且脆弱的原因。2006 年,iMatix 设计了高级消息队列协议,或AMQP,开始给FOSS 开发者提供也许是首个可复用的消息传递系统方法。AMQP 工作得比许多其他设计更好,但仍然相对复杂、昂贵,而且脆弱。学会如何使用它需要几个星期,而要用它来建立在事情变得很麻烦时也不会崩溃的稳定架构则需要几个月。
大多数消息传递项目(如AMQP)都在尝试以可重用的方式解决上面这个冗长的清单上的问题,它们通过发明一种负责寻址、路由和排队的新概念——代理,来做到这一点。这将导致一个客户端/服务器协议或一些未在文档中记录的协议之上的一组API,它允许应用程序与这个代理交流。在降低大型网络的复杂性方面,代理是一个很好的东西。但把以代理为基础的消息传递添加到像ZooKeeper 这样的产品会使情况变得更糟,而不是更好。这将意味着增加一台额外的大电脑和一个新的单点故障。代理迅速成为一个瓶颈和一个要管理的新风险。如果软件支持的话,我们可以添加第二个、第三个和第四个代理,并提出一些故障切换方案。人们这么做了。然而,它产生了更多的变动部件,变得更复杂,有更多的东西会被破坏。
此外,以代理为中心的设置都需要自己的运营团队。你真的需要日夜注意代理,并在它们开始“行为不端”时,用棍子敲打它们。你需要电脑,你需要备份的电脑,你需要人来管理那些电脑。只有那些由多个团队在数年内建成的,带有许多变动部件的大型应用程序,才值得这样做
因此,中小型应用程序开发人员陷入了困境。他们要么避免网络编程,制作不可扩展的单一应用程序,要么跳进网络编程,制作脆弱、复杂、很难维护的应用程序。他们还可以把赌注压在消息传递产品上,并最终获得依赖于昂贵且易破坏的技术的可扩展的应用程序。目前还没有非常好的选择,这也许可以解释为什么消息传递主要还停留在上个世纪,并激起强烈的情绪——对用户是消极的,而对那些销售技术支持和许可的厂商则是欢乐的、喜悦。
ZeroMQ的优点
我们需要的是做消息传递工作的东西,但需要它以下面这种简单和廉价的方式完成工作:
- 它可以在任何应用程序中以接近零的消耗开展工作。
- 它应该是不需要任何其他依赖就可以链接的库。
- 无须额外的变动部件,所以没有额外的风险。
- 它应该能运行在任何操作系统上,并能用任何编程语言开展工作。
而这就是ZeroMQ :一个高效的可嵌入库,它解决了大部分应用程序需要解决的问题,变得在网络上有良好的可伸缩性,而没有多少成本。
具体做法:
它在后台线程异步处理I/O,这些线程使用无锁数据结构与应用程序进行通信,所以并发ZeroMQ应用程序不需要锁、信号量、或者其他等待状态。
组件可以动态地来去自如,而ZeroMQ会自动重新连接,这意味着你可以以任何顺序启动组件,你可以创建“面向服务的架构”(SOA),其中的服务可以在任何时间加入和离开网络。
它根据需要自动对消息排队。为此,它会智能地在对消息排队之前,将消息尽可能地推进到接收者。它有一个处理过满队列(称为“高水位标志”)的方法。当队列满时,ZeroMQ会自动阻止发件人,或丢弃消息,这取决于你正在做的是哪种消息传递(即所谓的“模式”)。
它可以让你的应用程序通过任意传输协议来互相交流,这些协议可以是:TCP、多播、进程内、进程间。你不需要更改代码以使用不同的传输工具。
它使用依赖于消息传递模式的不同策略,安全地处理速度慢/阻塞的读取者。
它可以让你采用多种模式,如请求-应答和发布-订阅来将消息路由。这些模式是指你如何创建拓扑结构和网络结构。
它可以让你创建代理(proxy)来排队、转发,或通过一个调用来捕获消息。代理可以降低网络互联的复杂性。
它使用在线路上的简单组帧原封不动地传递整个消息。如果你写了一个10KB 的消息,那么你将收到一个10KB 的消息。
它不对消息强加任何格式。它们是零字节到千兆字节的二进制大对象。当你想表示你的数据时,可以选择其上的其他一些产品,如谷歌的协议缓冲区、XDR 等。
它能智能地处理网络错误。有时候它会重试,有时它会告诉你某个操作失败。
它可以减少你的能源消耗。少花CPU多办事意味着使用电脑更少的能源,你可以让你的旧电脑使用更长的时间。
实际上,ZeroMQ做的比这更多。它对你如何开发网络功能的应用程序有颠覆性的影响:
从表面上看,这是一个在其上做zmq_msg_recv()和zmq_msg_send()的套接字风格的API。
但该消息处理循环迅速成为中心循环,而你的应用程序很快就会分解成一组消息处理任务。它是优雅和自然的。
而且,它可扩展:每个任务对应一个节点,节点通过任意传输方式互相交谈。在一个进程中的两个节点(节点是一个线程),在一台电脑中的两个节点(节点是一个进程),或一个网络上的两台电脑(节点是一台电脑),所有的处理方式都是相同的,不需要更改应用程序代码。
消息模型
ZeroMQ将消息通信分成4种模型,分别是一对一结对模型(Exclusive-Pair)、请求回应模型(Request-Reply)、发布订阅模型(Publish-Subscribe)、推拉模型(Push-Pull)。这4种模型总结出了通用的网络通信模型,在实际中可以根据应用需要,组合其中的2种或多种模型来形成自己的解决方案。
一对一结对模型
最简单的1:1消息通信模型,可以认为是一个TCP Connection,但是TCP Server只能接受一个连接。数据可以双向流动,这点不同于后面的请求回应模型。
请求回应模型
由请求端发起请求,然后等待回应端应答。一个请求必须对应一个回应,从请求端的角度来看是发-收配对,从回应端的角度是收-发对。跟一对一结对模型的区别在于请求端可以是1~N个。该模型主要用于远程调用及任务分配等。Echo服务就是这种经典模型的应用。
发布订阅模型
发布端单向分发数据,且不关心是否把全部信息发送给订阅端。如果发布端开始发布信息时,订阅端尚未连接上来,则这些信息会被直接丢弃。订阅端未连接导致信息丢失的问题,可以通过与请求回应模型组合来解决。订阅端只负责接收,而不能反馈,且在订阅端消费速度慢于发布端的情况下,会在订阅端堆积数据。该模型主要用于数据分发。天气预报、微博明星粉丝可以应用这种经典模型。
推拉模型
Server端作为Push端,而Client端作为Pull端,如果有多个Client端同时连接到Server端,则Server端会在内部做一个负载均衡,采用平均分配的算法,将所有消息均衡发布到Client端上。与发布订阅模型相比,推拉模型在没有消费者的情况下,发布的消息不会被消耗掉;在消费者能力不够的情况下,能够提供多消费者并行消费解决方案。该模型主要用于多任务并行。
通信协议
提供进程内、进程间、机器间、广播等四种通信协议。通信协议配置简单,用类似于URL形式的字符串指定即可,格式分别为inproc://、ipc://、tcp://、pgm://。ZeroMQ会自动根据指定的字符串解析出协议、地址、端口号等信息。
优点
简单
1、仅仅提供24个API接口,风格类似于BSD Socket。
2、处理了网络异常,包括连接异常中断、重连等。
3、改变TCP基于字节流收发数据的方式,处理了粘包、半包等问题,以msg为单位收发数据,结合Protocol Buffers,可以对应用层彻底屏蔽网络通信层。
4、对大数据通过SENDMORE/RECVMORE提供分包收发机制。
5、通过线程间数据流动来保证同一时刻任何数据都只会被一个线程持有,以此实现多线程的“去锁化”。
6、通过高水位HWM来控制流量,用交换SWAP来转储内存数据,弥补HWM丢失数据的缺陷。
7、服务器端和客户端的启动没有先后顺序。
灵活
1、支持多种通信协议,可以灵活地适应多种通信环境,包括进程内、进程间、机器间、广播。
2、支持多种消息模型,消息模型之间可以相互组合,形成特定的解决方案。
跨平台
1、支持Linux、Windows、OS X等。
等等。
再来看看nanomsg
nanomsg也是一个基于
Socket的通讯库
,它是 zeromq 作者重新用 C 语言重新实现的, 是对 zeromq 的经验教训的各种提炼和反思。可以认为,nanomsg 则是流行的 ZMQ的优化版,但在某些方面有所不同。zeromq vs nanomsg
zeromq是作者用cpp早期创作,作者承认存在一些明显问题;nanomsg则是用C写的,更为成熟。
当前版本nanomsg支持以下协议:
配对模式:简单的一对一的通信;
总线模式:简单的多对多的通信;
请求/回复模式:支持组建大规模的集群服务来处理用户请求;
扇入模式:支持从多个源聚合请求消息;
扇出模式:支持分配到多个节点以支持负载均衡;
调查模式:允许在一个单一的请求里检查多个应用的状态;
可扩展协议是在网络通信协议之上实现的,当前版本nanomsg支持以下几种传输机制:
INPROC:单进程内通信;
IPC:单机内多进程的通信;
TCP:通过tcp协议的网络通信;
nanomsg用c实现,不依赖系统特性,所以支持多个操作系统。
nanomsg 的所有操作都是基于不同类型的 Socket,而 Socket 的类型决定了 nanomsg 使用了哪种通信模式和传输机制。
不过,nanomsg不是这篇文章的主角。
我们再来看看NNG
nanomsg 是一个由 Garrett D'Amore 创建的开源项目,最初于 2012 年发布。然而,nanomsg 项目的开发已经在一段时间内停滞。因此,几位 nanomsg 的贡献者创建了 NNG 项目,旨在取代 nanomsg 并继续发展。目前,NNG 项目更加活跃,并且在持续地进行开发和维护。
NNG(Nanomsg Next Generation)是 nanomsg 的后继项目,是一个轻量级的、可扩展的消息传递库,旨在提供简单而可靠的消息传递机制,用于构建分布式系统中的通信。NNG 支持点对点通信、发布-订阅模式、请求-回复等模式。
官网:NNG - nanomsg-NG
可点击download下载源码。
注意,不管是ZMQ还是nanomsg还是NNG,其实都是一种基于socket的框架,但是,使用socket并不是最终目的,最终的目的还是正确有效地处理各种消息数据。就好比,我们要发快递包给别人,最终目的是正确地将快递包发送到目标,而快递公司采用什么样的方式来送快递我们作为应用层并不用太关心,所基于的socket就是通信方式,好比快递公司用飞机来送快递包。所以,这些框架既可以叫网络框架,但更实质的是一种消息库一种消息框架。
注意:消息传递并不等同于消息队列,消息队列只是消息传递的一种途径。
NNG主要特点
模块化设计:NNG 的模块化设计使得它更加灵活和可扩展,可以根据需要选择性地启用或禁用不同的功能模块。
可靠性:NNG 实现了可靠的消息传递机制,保证了消息的完整性和可靠性。
性能优异:NNG 在设计时考虑了性能因素,表现出色,适用于对性能有较高要求的应用场景。
可扩展性:NNG 提供了灵活的配置选项,使得可以根据需要进行扩展和定制,以满足不同应用程序的需求。
线程安全:NNG 的设计考虑了线程安全性,可以在多线程环境下安全地使用。
跨平台支持:NNG 可以在多种操作系统上运行,包括 Linux、Windows、macOS 等,具有良好的跨平台性。
语言无关性:NNG 不仅提供了 C 语言的 API,还支持多种其他语言的绑定,如 Python、Java、C# 等,使得开发者可以在不同的编程语言中使用它。
开源免费:NNG 是一个开源项目,采用了开放的许可证,可以免费获取源代码并在自己的项目中使用。
同时如 NNG 描述所言 “light-weight brokerless messaging”,NNG 中的通信各方是不需要第三方程序介入的,这与 MQTT/Redis 通信需要服务器不同。这样很适合作为通信库来使用而没有其他依赖。
通讯协议
- PAIR 一对一双向通信(点对点通信模式)。
- PIPELINE(PUSH/PULL) 单向通信,类似与生产者消费者模型的消息队列(消息队列模式)。
- PUB/SUB 单向广播(发布订阅模式)。
- REQ/REP 请求-应答模式,类似与 RPC 模式(RPC模式)。
- BUS 网状连接通信,每个加入节点都可以发送/接受广播消息。
- SURVEY 用于多节点表决或者服务发现。
传输模式
- inproc 进程内线程间传输(线程间通信)
- ipc 主机内进程间传输(同主机上多进程间通信)
- tcp 网络内主机间传输(多主机通信)
目录和内容
NNG 协议基本上囊括了常见的通信需求,一些特殊的需求,也可以通过组合协议来实现。这样一来,如果在程序中使用 NNG,不管是多进程,还是多线程,通过设计,可以进一步增强模块化,同时不乏灵活性。如果环境变化,程序不管是由多进程改成多线程,还是由多线程改成多主机,都很容易实现。
常见模块/进程/线程间通信,可以依据具体需求来使用 PIPELINE(消息队列) 还是 REQ/REP(过程调用),而不是锁+全局变量,每个模块单元只需要做单一相关的具体事务,无需知晓全局状态。
1.8.0版源码地址:GitHub - nanomsg/nng at v1.8.0
目录结构如下
src路径内容
网上资料很少。
只有自行实践总结了。
参考手册:
NNG Reference Manual
nng.nanomsg.org/man/v1.8.0/index.html
实际使用时,先研究下demo和tests这些,然后遇到api就查询即可。
看下demo
async,异步io(即aio);
http_client,http请求;
pubsub_forwarder,发布订阅;
reqrep:rpc;
更多待补充。
nng源码阅读参考:
nng源码阅读_nng:poll:epoll-CSDN博客
貌似NNG也提供了一些任务创建和线程同步的方式,和posix、systemv等是并列的关系,我们可以使用其中任意一种风格。我们实际使用时,可以根据需要来选用。比如,我们可以使用posix风格来创建线程,然后用systemv风格的消息队列进行通信,我们也可以使用posix风格创建线程,然后使用NNG风格的互斥锁和条件变量。
补充参考:POSIX 和 SYSTEM V-云社区-华为云 (huaweicloud.com)
NNG在core目录下有taskq.c和thread.c,这里面就有些多线程相关的内容,比如:
thread & mutex & cv
互斥锁,提供
nni_mtx_init
,nni_mtx_fini
,nni_mtx_lock
,nni_mtx_unlock
接口.条件变量,提供
nni_cv_init
,nni_cv_fini
,nni_cv_wake
,nni_cv_wake1
,nni_cv_until
接口.int nni_thr_init(nni_thr *thr, nni_thr_func fn, void *arg);
nni_thr_run
,nni_thr_fini
,nni_thr_wait
,nni_thr_is_self
线程有init, start, stop, done4个状态, 执行
nni_thr_init
后, 进入init状态. 执行nni_thr_run
进入start状态. 执行nni_thr_wait
后, 首先设置为stop状态, 然后等待线程变成done状态.nni_thr并不直接执行
nni_thr_func
函数, 而是执行nni_thr_wrap
函数, 该函数内部再调用nni_thr_init
提供的回调函数
nni_thr_wrap
- 如果没有start并且stop, 那么等待一个线程关联的条件变量.
- 如果
nni_thr_run
设置了start, 那么执行nni_thr_init
提供的回调函数.- 回调函数执行完成设置为done状态, 然后wake线程的条件变量.
taskq为任务队列, 内部包含一个链表, 链表中为需要执行的task, 每个taskq里面会有多个线程来处理链表里面的task
struct nni_taskq { nni_list tq_tasks; // 任务列表, 双向链表实现 nni_mtx tq_mtx; // 锁 nni_cv tq_sched_cv; // 条件变量 nni_cv tq_wait_cv; // 条件变量 nni_taskq_thr *tq_threads; // 线程数组 int tq_nthreads; // 线程数 bool tq_run; // true表示线程开始执行. }; struct nni_task { nni_list_node task_node; // tq_tasks的节点 void * task_arg; // 任务的参数 nni_cb task_cb; // 该任务的回调函数 nni_taskq * task_tq; // 任务所属taskq nni_thr * task_thr; // 执行任务的线程. non-NULL if the task is running unsigned task_busy; bool task_prep; bool task_reap; // task执行完后是否需要销毁. nni_mtx task_mtx; // task的锁. nni_cv task_cv; };
更多待补充。
当一个应用程序中集成了很多的框架的时候,就必然会出现很多重复的实现,我们可以根据实际情况和各自的优缺点使用合适的接口。