项目背景
正文
一、项目架构
二、项目模块
三、业务流程
四、代码详解
五、测试
六、源码
后记
项目背景
最近公司某物联网项目需要使用socket长连接进行消息通讯。本猿为了解决这个问题,经过长时间的研究、调试和测试,最终找到了一个可行的方案。这中间遇到了很多的问题和困难,包括代码的BUG、技术难题等等。然而,本猿并没有放弃,他借助百度度娘等网站,不断学习和探索,最终克服了所有的困难,成功地完成了该项目。为了帮助其他开发者更好地理解和学习这个项目,本猿将其提炼成了一个demo项目,并分享给了其他同学。这个demo项目尽可能地摒弃了其中丑陋的业务部分,让其他开发者可以更加专注于技术的研究和探索。希望我们可以一起学习进步,共同推动物联网技术的发展。
正文
一、项目架构
本项目使用了netty、redis以及springboot2.2.0
二、项目模块
本项目目录结构如下图:
netty-tcp-core
是公共模块,主要是工具类。netty-tcp-server
是netty服务端,服务端仅作测试使用,实际项目中我们只使用了客户端。netty-tcp-client
是客户端,也是本文的重点。
三、业务流程
我们实际项目中使用RocketMQ作为消息队列,本项目由于是demo项目于是改为了BlockingQueue
。数据流为:
生产者->消息队列->消费者(客户端)->tcp通道->服务端->tcp通道->客户端。
当消费者接收到某设备发送的消息后,将判断缓存中是否存在该设备与服务端的连接,如果存在并且通道活跃则使用该通道发送消息,如果不存在则创建通道并在通道激活后立即发送消息,当客户端收到来自服务端的消息时进行响应的业务处理。
四、代码详解
1.消息队列
由于本demo项目移除了消息中间件,于是需要自己创建一个本地队列模拟真实使用场景
使用一个类保存队列的静态实例以便在任何类中都可以快速引用。接下来我们需要启动一个线程去监听队列中的消息,一但消息投递到队列中,我们就取出消息然后异步多线程处理该消息。
使用take方法会使该线程一直阻塞直到队列收到消息后进入下一次循环。
2.执行类
process方法来自于MessageProcessor
类,该类为单例,但是会有多线程同时执行。
其中imei是我们设备的唯一标识,我们可以用imei作为缓存的key来确认是否已创建过连接。由于我们消息的并发量可能会很大,所以存在当某设备的连接正在创建的过程中,另一个线程收到该设备消息也开始创建连接的情况,所以我们使用synchronized 代码块以及redis分布式锁来避免此情况的发生。当一条消息获得锁后,在锁释放前,后续消息将会被重新放回消息队列并延迟消费。
获取锁的线程会根据imei判断缓存是否存在连接,如果存在直接发送消息,如果不存在则进入创建客户端的方法。
当netty客户端实例创建后使用线程池执行初始化,由于是异步执行,我们此时立刻发送消息很可能客户端还没有完成连接,因此必须加锁等待。进入synchronized
代码块,使用wait方法等待客户端激活后解锁,参数5000为自动解锁的毫秒数,意思是如果客户端出现异常情况迟迟未能连接成功并激活通道、解锁,则最多5000毫秒后该锁自动解开。
这参数在实际使用时可以视情况调整,在并发量很大的情况下,5秒的阻塞可能会导致线程池耗尽,或内存溢出。待客户端创建成功并激活后则立即发送消息。
3.客户端
netty客户端为多实例,每个实例绑定一个线程,持续阻塞到客户端关闭为止,每个客户端中可以保存自己的业务数据,以便在后续与服务端交互时处理业务使用。客户端执行连接时,给了2次重试的机会,如果3次都没连接成功则放弃。后续可以选择将该消息重新入列消费。我们实际项目中,此处还应该预先给服务端发送一条登录消息,待服务端确认后才能执行后续通讯,这需要视实际情况进行调整。
另一个需要注意的点是EventLoopGroup
是从构造函数传入的,而不是在客户端中创建的,因为当客户端数量非常多时,每个客户端都创建自己的线程组会极大的消耗服务器资源,因此我们在实际使用中是按业务去创建统一的线程组给该业务下的所有客户端共同使用的,线程组的大小需要根据业务需求灵活配置。
在init方法中,我们给客户端加上了一个handler来处理与服务端的交互,下面来看一下具体实现。
DemoClientHandler
也是多实例bean,每个实例持有自己的NettyClient引用,以便在后续处理具体业务。在channelActive方法中,我们可以看到执行了客户端实例的notify方法,此处就是在客户端创建成功并且通道激活后解除wait锁的地方。channelRead方法就是我们处理服务端发送过来的消息的方法,我们的具体业务应该在该方法执行,当然不建议长时间阻塞客户端的工作线程,可以考虑异步处理。
最后我们看一下客户端缓存类。
由于netty的通道无法序列化,因此不能存入redis,只能缓存在本地内存中,其本质就是一个ConcurrentHashMap。
五、测试
测试接口代码如上,调用testOne,日志如下:
可以看到第一条消息触发了客户端创建流程,创建后发送了消息,而5秒后的第二条消息直接通过已有通道发送了。
测试接口代码如上,调用testTwo,日志如下:
发送shutdown可以主动断开已有连接。
测试接口代码如上,调用testThree,日志如下:
六、源码
https://gitee.com/jaster/netty-tcp-demo
后记
本demo项目仅作为学习交流使用,如果您需要将其应用到生产环境中,我们建议您先进行一些必要的改进。例如,您可以考虑添加更多的功能以满足实际需求,或者进行更加深入的测试以确保其稳定性和安全性。如果您在使用过程中遇到任何问题或有任何建议,请随时留言与我们进行交流。我们非常乐意听取您的意见并为您提供帮助。