简介
ZeroMQ是一个高性能的异步消息传递库,旨在用于分布式或者并发应用程序。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ 系统可以在没有专用消息代理的情况下运行。
ZeroMQ 支持各种传输(TCP、进程内、进程间、多播<一个数据发送给不同子网中的一组接收者>、WebSocket 等)上的常见消息传递模式(发布/订阅、请求/回复、客户端/服务器等),使进程间消息传递变得简单作为线程间消息传递。这使得代码清晰、模块化且非常易于扩展。
ZeroMQ的0的意思:
0表示无代理、零延迟、零成本和零管理。
通俗的来说,零是指渗透到项目中的极简主义文化。我们通过消除复杂性1而不是暴露新功能来增加功能。
使用说明
引入Maven项目
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.5.2</version>
</dependency>
使用示例
以订阅发布模式为例:
public class ClientA {
public static void main(String[] args) {
ZContext context = new ZContext(3);
ZMQ.Socket socket = context.createSocket(SocketType.SUB);
socket.connect("tcp://localhost:5555");
// 设置,指订阅前缀为A 的消息,设置为“” 表示全部接受
socket.subscribe("A".getBytes());
while (!Thread.currentThread().isInterrupted()) {
byte[] recv = socket.recv();
System.out.println(new String(recv));
}
}
}
问题解决
问题说明
背景说明
使用Zmq pub/sub模式,多个sub订阅一个pub的数据。
pub会10分钟推送一批数据。
pub端和sub端部署在同一个业务网段。
pub端每天启停,sub端是24小时运行。
现象说明
在测试中发现,当pub长时间没有发送数据或者pub端关闭一定时间后,sub端之后就再也接收不到数据了。而且这种现象也不是100%,测试了几天,有个80%的样子吧。
在pub端,netstat查看时,连接已经没有了,而在sub端连接仍然存在。
问题分析
问题排查
以前使用C++时,只要连接断开,Zmq的connect就会自动重连,所以不需要关心连接的问题。
根据文档和测试,也确实是这样。但是请注意,这里说的连接断开是正常的断开,有4次挥手的断开,也就是说通信双方都知道连接断开了。
但有时,并非如此。在复杂的网络环境中,通信双方大概率会经过NAT等网络设备,它们会悄无声息地关闭连接。并且,在长连接长时间无数据时,通信双方根本无法知晓。
问题验证
很简单,为了保持网络连接,增加心跳即可。应用层的心跳也简单,但根据ZMQ文档,最好使用TCP的keepalive。
If we use a TCP connection that stays silent for a long while, it will, in some networks, just die. Sending something (technically, a “keep-alive” more than a heartbeat), will keep the network alive.
参数说明:
查看zmq_setsockopt API文档:
参数名 | 参数说明 |
---|---|
ZMQ_TCP_KEEPALIVE | 设置SO_KEEPALIVE属性,是否开启keepalive特性。默认为-1,使用操作系统默认值,即不开启。 |
ZMQ_TCP_KEEPALIVE_CNT | 设置TCP_KEEPCNT属性,如果保活包没有收到响应,连接重试的次数。在达到这个次数仍然无响应的,标记该连接不可用。Windows好象默认是10。 |
ZMQ_TCP_KEEPALIVE_IDLE | 设置TCP_KEEPALIVE属性,如果连接在该段时间内持续空闲,将发送第一个保活包。Windows默认为2小时。 |
ZMQ_TCP_KEEPALIVE_INTVL | 设置TCP_KEEPINTVL属性,如果发送的保活包没有应答,则间隔该时长继续发送保活包,直到连接标识连接断开。Windows默认为1s。 |
程序修改:
// 创建上下文环境
ZContext context = new ZContext(1);
// 以订阅模式创建套接字
ZMQ.Socket socket = context.createSocket(SocketType.SUB);
// 开启TCP保活机制,防止网络连接因长时间无数据而中断
socket.setTCPKeepAlive(1);
// 网络连接空闲2min, 即发送保活包
socket.setTCPKeepAliveIdle(120L);
这样问题就解决了吗?并没有完全解决。原因可查看参考资料2&3
TCP keepalive属性就是要保持TCP连接的活动性。对于一个已经建立的tcp连接。如果在keepalive_time时间内双方没有任何的数据包传输,则开启keepalive功能的一端将发送 keepalive数据包,若没有收到应答,则每隔keepalive_intvl时间再发送该数据包,发送keepalive_probes次。一直没有收到应答,则发送rst包关闭连接。若收到应答,则将计时器清零。
如果tcp连接的另一端突然掉线,或者重启断电,这个时候我们并不知道网络已经关闭。而此时,如果有发送数据失败,tcp会自动进行重传。重传包的优先级高于keepalive,那就意味着,我们的keepalive总是不能发送出去。 而此时,我们也并不知道该连接已经出错而中断。在较长时间的重传失败之后,我们才会知道。
但如果对端又启动了,接收端还在发重传包,此时则不会重连,导致数据丢失。
PS:C++/Python这样写是没问题的。
c/c++:
// 开启TCP保活机制,防止网络连接因长时间无数据而被中断
int tcp_keep_alive = 1;
zmq_setsockopt(fd, ZMQ_TCP_KEEPALIVE, &tcp_keep_alive, sizeof(tcp_keep_alive));
// 网络连接空闲2min即发送保活包
int tcp_keep_idle = 120;
zmq_setsockopt(fd, ZMQ_TCP_KEEPALIVE_IDLE, &tcp_keep_idle, sizeof(tcp_keep_idle));
python:
# self.client is my socket here
self.client.setsockopt(zmq.TCP_KEEPALIVE, 1)
self.client.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 120)
self.client.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 1) # 随意
源码分析
1. ZMQ.java
① setTCPKeepAliveIdle
② setSocketOpt
2. SocketBase.java
① setSocketOpt
可以看到如果为-1,则使用操作系统默认值。
在Linux 中,可以以下3个参数来调整 tcp-keepalive:
net.ipv4.tcp_keepalive_time = 30
net.ipv4.tcp_keepalive_probes = 2
net.ipv4.tcp_keepalive_intvl = 5
为了防止开启调整该参数对其他系统或服务造成不可预知的问题,操作系统参数不作调整。
解决方案
伪心跳方式:
// 设置发送ZMTP心跳的时间间隔, 单位:ms
socket.setHeartbeatIvl(5 * 60 * 1000);
// 设置ZMTP心跳的超时时间, 单位:ms
socket.setHeartbeatTimeout(60 * 1000);
// 设置ZMTP心跳的TTL值, 单位:ms
socket.setHeartbeatTtl(10 * 60 * 1000);
这样的话,一直在发送心跳包,一旦发送端启动成功,便可以快速感知,触发重连,数据可以继续接收。
参考资料
JeroMQ:ZeroMQ的纯Java实现
UNIX网络编程——socket的keep-alive
Linux系统停的设置TCP心跳机制Keepalive为什么总是无效果
ZeroMQ(java)中连接建立与重连机制
Zmq pub/sub无故连接中断解决之 —— TCP keepalive简介
Linux 中的 TCP keepalive