【经验篇】Java使用ZMQ断线重连问题

news2024/11/22 11:22:30

简介

ZeroMQ是一个高性能的异步消息传递库,旨在用于分布式或者并发应用程序。它提供了一个消息队列,但与面向消息的中间件不同,ZeroMQ 系统可以在没有专用消息代理的情况下运行。

ZeroMQ 支持各种传输(TCP、进程内、进程间、多播<一个数据发送给不同子网中的一组接收者>、WebSocket 等)上的常见消息传递模式(发布/订阅、请求/回复、客户端/服务器等),使进程间消息传递变得简单作为线程间消息传递。这使得代码清晰、模块化且非常易于扩展。

ZeroMQ的0的意思:
0表示无代理、零延迟、零成本和零管理。
通俗的来说,零是指渗透到项目中的极简主义文化。我们通过消除复杂性1而不是暴露新功能来增加功能。

使用说明

引入Maven项目

<dependency>
  <groupId>org.zeromq</groupId>
  <artifactId>jeromq</artifactId>
  <version>0.5.2</version>
</dependency>

使用示例

订阅发布模式为例:

public class ClientA {
    public static void main(String[] args) {
        ZContext context = new ZContext(3);
        ZMQ.Socket socket = context.createSocket(SocketType.SUB);
        socket.connect("tcp://localhost:5555");
        // 设置,指订阅前缀为A 的消息,设置为“” 表示全部接受
        socket.subscribe("A".getBytes());
        while (!Thread.currentThread().isInterrupted()) {
            byte[] recv = socket.recv();
            System.out.println(new String(recv));
        }
    }
}

问题解决

问题说明

背景说明

使用Zmq pub/sub模式,多个sub订阅一个pub的数据。
pub会10分钟推送一批数据。
pub端和sub端部署在同一个业务网段。
pub端每天启停,sub端是24小时运行。

现象说明

在测试中发现,当pub长时间没有发送数据或者pub端关闭一定时间后,sub端之后就再也接收不到数据了。而且这种现象也不是100%,测试了几天,有个80%的样子吧。

在pub端,netstat查看时,连接已经没有了,而在sub端连接仍然存在。

问题分析

问题排查

以前使用C++时,只要连接断开,Zmq的connect就会自动重连,所以不需要关心连接的问题。

根据文档和测试,也确实是这样。但是请注意,这里说的连接断开是正常的断开,有4次挥手的断开,也就是说通信双方都知道连接断开了。

但有时,并非如此。在复杂的网络环境中,通信双方大概率会经过NAT等网络设备,它们会悄无声息地关闭连接。并且,在长连接长时间无数据时,通信双方根本无法知晓。

问题验证

很简单,为了保持网络连接,增加心跳即可。应用层的心跳也简单,但根据ZMQ文档,最好使用TCP的keepalive。

If we use a TCP connection that stays silent for a long while, it will, in some networks, just die. Sending something (technically, a “keep-alive” more than a heartbeat), will keep the network alive.

参数说明:
查看zmq_setsockopt API文档:

参数名参数说明
ZMQ_TCP_KEEPALIVE设置SO_KEEPALIVE属性,是否开启keepalive特性。默认为-1,使用操作系统默认值,即不开启。
ZMQ_TCP_KEEPALIVE_CNT设置TCP_KEEPCNT属性,如果保活包没有收到响应,连接重试的次数。在达到这个次数仍然无响应的,标记该连接不可用。Windows好象默认是10。
ZMQ_TCP_KEEPALIVE_IDLE设置TCP_KEEPALIVE属性,如果连接在该段时间内持续空闲,将发送第一个保活包。Windows默认为2小时。
ZMQ_TCP_KEEPALIVE_INTVL设置TCP_KEEPINTVL属性,如果发送的保活包没有应答,则间隔该时长继续发送保活包,直到连接标识连接断开。Windows默认为1s。

程序修改:

// 创建上下文环境
ZContext context = new ZContext(1);
// 以订阅模式创建套接字
ZMQ.Socket socket = context.createSocket(SocketType.SUB);
// 开启TCP保活机制,防止网络连接因长时间无数据而中断
socket.setTCPKeepAlive(1);
// 网络连接空闲2min, 即发送保活包
socket.setTCPKeepAliveIdle(120L);

这样问题就解决了吗?并没有完全解决。原因可查看参考资料2&3

TCP keepalive属性就是要保持TCP连接的活动性。对于一个已经建立的tcp连接。如果在keepalive_time时间内双方没有任何的数据包传输,则开启keepalive功能的一端将发送 keepalive数据包,若没有收到应答,则每隔keepalive_intvl时间再发送该数据包,发送keepalive_probes次。一直没有收到应答,则发送rst包关闭连接。若收到应答,则将计时器清零。
如果tcp连接的另一端突然掉线,或者重启断电,这个时候我们并不知道网络已经关闭。而此时,如果有发送数据失败,tcp会自动进行重传。重传包的优先级高于keepalive,那就意味着,我们的keepalive总是不能发送出去。 而此时,我们也并不知道该连接已经出错而中断。在较长时间的重传失败之后,我们才会知道。
但如果对端又启动了,接收端还在发重传包,此时则不会重连,导致数据丢失。

PS:C++/Python这样写是没问题的。

c/c++:

// 开启TCP保活机制,防止网络连接因长时间无数据而被中断
int tcp_keep_alive = 1;
zmq_setsockopt(fd, ZMQ_TCP_KEEPALIVE, &tcp_keep_alive, sizeof(tcp_keep_alive));

// 网络连接空闲2min即发送保活包
int tcp_keep_idle = 120;
zmq_setsockopt(fd, ZMQ_TCP_KEEPALIVE_IDLE, &tcp_keep_idle, sizeof(tcp_keep_idle));

python:

# self.client is my socket here
self.client.setsockopt(zmq.TCP_KEEPALIVE, 1)
self.client.setsockopt(zmq.TCP_KEEPALIVE_IDLE, 120)
self.client.setsockopt(zmq.TCP_KEEPALIVE_INTVL, 1) # 随意

源码分析

1. ZMQ.java
① setTCPKeepAliveIdle
在这里插入图片描述
② setSocketOpt
在这里插入图片描述
2. SocketBase.java
① setSocketOpt
在这里插入图片描述
可以看到如果为-1,则使用操作系统默认值。
在Linux 中,可以以下3个参数来调整 tcp-keepalive:

net.ipv4.tcp_keepalive_time = 30
net.ipv4.tcp_keepalive_probes = 2
net.ipv4.tcp_keepalive_intvl = 5

为了防止开启调整该参数对其他系统或服务造成不可预知的问题,操作系统参数不作调整。

解决方案

伪心跳方式:

// 设置发送ZMTP心跳的时间间隔, 单位:ms
socket.setHeartbeatIvl(5 * 60 * 1000);
// 设置ZMTP心跳的超时时间, 单位:ms
socket.setHeartbeatTimeout(60 * 1000);
// 设置ZMTP心跳的TTL值, 单位:ms
socket.setHeartbeatTtl(10 * 60 * 1000);

这样的话,一直在发送心跳包,一旦发送端启动成功,便可以快速感知,触发重连,数据可以继续接收。

参考资料

JeroMQ:ZeroMQ的纯Java实现
UNIX网络编程——socket的keep-alive
Linux系统停的设置TCP心跳机制Keepalive为什么总是无效果
ZeroMQ(java)中连接建立与重连机制
Zmq pub/sub无故连接中断解决之 —— TCP keepalive简介
Linux 中的 TCP keepalive

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/28646.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

初步认识端口服务查询--netstat

转载连接&#xff1a;netstat详解 目录1、语法与参数概括2、输出释义2.1 以netstat -atnlp为例&#xff0c;解释输出结果中各列的含义2.2、以netstat -rn为例&#xff0c;解释输出结果中各列的含义3、实用命令4、项目中通过netstat查询某端口是否被占用netstat命令是一个监控TC…

Spring5源码3-BeanDefinition

1. BeanDefinition BeanDefinition在spring中贯穿始终&#xff0c;spring要根据BeanDefinition对象来实例化bean&#xff0c;只有把解析的标签&#xff0c;扫描的注解类封装成BeanDefinition对象&#xff0c;spring才能实例化bean beanDefinition主要实现类: ChildBeanDefini…

ADB安装及使用详解

一、ADB简介 1、什么是adb ADB 全称为 Android Debug Bridge&#xff0c;起到调试桥的作用&#xff0c;是一个客户端-服务器端程序。其中客户端是用来操作的电脑&#xff0c;服务端是 Android 设备。 ADB 也是 Android SDK 中的一个工具&#xff0c;可以直接操作管理 Androi…

K8s高可用集群搭建

K8s高可用集群搭建1 方案简介2 集群搭建2.1 安装要求2.2 准备环境2.3 master节点部署keepalived2.4 master节点部署haproxy2.5 所有节点安装docker/kubeadm/kubelet2.6 部署k8smaster012.7 安装集群网络2.8 k8smaster02加入节点2.9 k8snode01加入集群3 测试集群1 方案简介 用到…

Session-based Recommendation with Graph Neural Networks论文阅读笔记

1. Abstract &#xff08;1&#xff09;基于会话的推荐问题旨在基于匿名会话来预测用户的行为。 The problem of session-based recommendation aims to predict user actions based on anonymous sessions. &#xff08;2&#xff09; 以前的方法存在的不足&#xff1a;不足以…

day3-day4【代码随想录】长度最小的子数组

文章目录前言一、长度最小的子数组1、暴力求解&#xff1a;2、滑动窗口求解&#xff1a;二、最小覆盖子串&#xff08;乐扣76&#xff09;难难难难难三、水果成篮&#xff08;乐扣904&#xff09;四、最长重复子数组&#xff08;乐扣718&#xff09;前言 实现滑动窗口&#xf…

Android抓包工具——Fiddler

前言 &#x1f525;在平时和其他大佬交流时&#xff0c;总会出现这么些话&#xff0c;“抓个包看看就知道哪出问题了”&#xff0c;“抓流量啊&#xff0c;payload都在里面”&#xff0c;“这数据流怎么这么奇怪”。 &#x1f449;这里出现的名词&#xff0c;其实都是差不多的…

矩阵分析:特征值分解

矩阵分析&#xff1a;特征值分解前置知识空间变换伸缩旋转对称矩阵对称矩阵对角化正交矩阵向量的基基变换不同基下的向量变换逆矩阵不同基下的空间变换内积的几何意义特征值、特征向量特征值分解代码前置知识 空间变换 伸缩 一个矩阵其实就是一个线性变换&#xff0c;因为一个…

SpringCloud微服务(六)——Gateway路由网关

Gateway路由网关 Spring Cloud Spring Cloud Gateway统一访问接口的路由管理方式 作用 整合各个微服务功能&#xff0c;形成一套系统微服务网关实现日志统一纪录实现用户的操作跟踪统一用户权限认证路由转发、跨域设置、负载均衡、服务限流反向代理 微服务网关的概述 不同…

H2DCFDA | ROS 荧光探针检测法

H2DCFDA 工作液的配制1、储存液的配制&#xff1a;用 DMSO 配制 10 mM 的 H2DCFDA (2,000)&#xff0c;如用 1.03 mL DMSO 溶解 5 mg H2DCFDA。注&#xff1a;H2DCFDA 储存液建议分装后-20℃ 避光冻存&#xff0c;一个月。-80 半年。2、工作液的配制&#xff1a;用预热好的无血…

绘制文字(QFont字体)

QPainter绘制文字的话使用的函数为 QPainter::drawText() QPainter::drawText()有多种重载方式。 根据坐标直接绘画文字&#xff1a; void Widget::paintEvent(QPaintEvent *event)//绘图事件 {QPainter painter(this);painter.translate(100,100);//移动坐标painter.drawText(…

E. Sending a Sequence Over the Network(DP)

Problem - 1741E - Codeforces 序列a在网络上的发送情况如下。 序列a被分割成若干段&#xff08;序列的每个元素正好属于一个段&#xff0c;每个段是序列的一组连续元素&#xff09;。 对于每个段&#xff0c;它的长度被写在它的旁边&#xff0c;要么在它的左边&#xff0c;要…

递归展示树状图/树状表格

递归展示树状图一、数据库表设计二、后端java递归代码三、前端展示树状表格四、效果展示一、数据库表设计 这里我们采用自关联的设计&#xff0c;通过id和pid的对应来确认数据的上下级关系 建表语句&#xff0c;我这里把一级菜单的pid设置成了0 /*Navicat Premium Data Transfe…

Spring中Bean的作用域和生命周期

目录 Bean的作用域 singleton prototype request session application websocket 单例作用域和全局作用域的区别 Bean的生命周期 Bean的作用域 Bean的作用域是指Bean在Spring整个框架中的某种行为模式&#xff0c;比如singleton单例作用域&#xff0c;就表示Bean在整…

大数据Spark面试题2023

文章目录Spark核心——RDD概念特点创建方式RDD的分区依赖关系Spark的shuffle介绍Spark的 Partitioner 分区器都有哪些?Spark中的算子都有哪些RDD工作流&#x1f4cc;Spark运行模式(资源调度框架的使用&#xff0c;了解)&#x1f4cc;讲一下Spark 的运行架构一个spark程序的执行…

常用的框架技术-08 ElasticSearch分布式、高扩展、高实时的搜索与数据分析引擎

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录1.ElasticSearch 概述1.1 ElasticSearch介绍1.2 全文搜索引擎1.3 lucene介绍1.4 倒排索引1.5 elasticsearch、solr对比2.ElasticSearch安装2.1 下载软件2.2 windows环…

【web渗透思路】框架敏感信息泄露(特点、目录、配置)

目录 一、挖掘思路 1、方法&#xff1a; 二、框架之信息泄露 1、Webpack 1.1、简述 1.2、.js.map文件泄露 1.3、源码审计 2、Spring boot 1.1、简述 1.2、利用 1.3、框架识别 &#xff08;基本分析方法都是一样&#xff0c;这里就举2个框架关于信息泄露方面的&#x…

Mybatis分页功能

1. 功能分析 如图所示分页功能&#xff0c;包括上一页、下一页、中间显示的当前页前后页码、全部页码以及跳转到XX页。手写的话实现起来很难&#xff0c;Mybatis给我们提供了插件&#xff0c;所提供的方法&#xff0c;直接包含了上述分页的相关数据。 2. 分页插件的使用及其相关…

虚拟环境下把python代码打包成exe(小白教程)

本教程适用于小白&#xff0c;本人也是小白&#xff0c;不妥之处还请包涵。 1、系统环境下安装 virtualenv 可以理解为 直接打开 系统的cmd安装 pip32 install virtualenv我之所以用pip32因为我电脑上装了两个版本的python 一个是32位一个是64位&#xff0c;如果你电脑上只有一…

为什么选择快速应用开发

如今&#xff0c;企业想要持续蓬勃发展&#xff0c;就需要具备快速满足客户期望的能力。无论是十几年历史的重要市场占有者推出新的APP&#xff0c;还是在疫情期间从线下转向线上电商营销&#xff0c;企业都需要主动适应市场。随着为客户提供新的服务方式&#xff0c;员工也需要…