完整的TCP服务端解析代码
1.maven依赖(不需要的依赖请自行删除,这里懒得逐个删了)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.13.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.site</groupId>
<artifactId>TcpServer</artifactId>
<version>2.0</version>
<name>TcpServer</name>
<description>Tcp系统服务端</description>
<!-- Version properties referenced by the ${...} placeholders in the dependency declarations of this POM. -->
<properties>
<java.version>1.8</java.version>
<protobuf.version>3.3.0</protobuf.version>
<hutool.version>5.7.16</hutool.version>
<!-- Raised from 1.2.78: fastjson releases up to 1.2.80 are affected by the published autoType deserialization vulnerability; 1.2.83 is the fixed 1.x release. -->
<fastjson.version>1.2.83</fastjson.version>
<plumelog.version>3.3</plumelog.version>
<druid.version>1.2.8</druid.version>
<redisson.version>3.7.3</redisson.version>
</properties>
<!-- Project dependencies. Entries without a <version> element do not pin a version here and rely on inherited dependency management. -->
<dependencies>
<!-- Spring Boot web starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Hutool utility library -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<!-- Alibaba fastjson JSON parser -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- Redis cache access -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Object pool (commons-pool2) -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- Lombok annotations (the Java sources use @Slf4j) -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- RabbitMQ (AMQP) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Plumelog logback appender -->
<dependency>
<groupId>com.plumelog</groupId>
<artifactId>plumelog-logback</artifactId>
<version>${plumelog.version}</version>
</dependency>
<!-- Protocol Buffers runtime -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- Alibaba Druid database connection pool -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.32</version>
</dependency>
<!-- WebSocket support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Netty, used by the TCP server classes -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- Common utility classes -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- Spring Boot configuration metadata processor (optional) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- Spring Boot AOP starter (original note: interceptors) -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- Redisson Redis client -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency>
<!-- Spring Data Elasticsearch -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<!-- javax.activation API -->
<dependency>
<groupId>javax.activation</groupId>
<artifactId>javax.activation-api</artifactId>
<version>1.2.0</version>
</dependency>
<!-- Jackson databind; the explicit version below is intentionally left commented out -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<!-- <version>2.12.6</version>-->
</dependency>
<!-- MyBatis-Flex Spring Boot starter -->
<dependency>
<groupId>com.mybatis-flex</groupId>
<artifactId>mybatis-flex-spring-boot-starter</artifactId>
<version>1.9.3</version>
</dependency>
<!-- MyBatis-Flex annotation processor. NOTE(review): presumably only needed at compile time; confirm whether a provided scope is appropriate. -->
<dependency>
<groupId>com.mybatis-flex</groupId>
<artifactId>mybatis-flex-processor</artifactId>
<version>1.9.3</version>
</dependency>
</dependencies>
<!-- Build configuration: repackages the application with the Spring Boot plugin and filters resources into target/classes. -->
<build>
<plugins>
<!-- Spring Boot plugin: runs the repackage goal. -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork> <!-- without this setting, devtools does not take effect; NOTE(review): spring-boot-devtools is not declared among this POM's dependencies -->
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- Resource filtering: copies src/main/resources/ to target/classes during the validate phase. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<executions>
<execution>
<id>default-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
<outputDirectory>target/classes</outputDirectory>
<!-- Turn off the plugin's default delimiters and register only the ${...} form declared below. -->
<useDefaultDelimiters>false</useDefaultDelimiters>
<delimiters>
<delimiter>${*}</delimiter>
</delimiters>
<resources>
<resource>
<directory>src/main/resources/</directory>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!-- The built artifact is named after the artifactId, without the version suffix. -->
<finalName>${project.artifactId}</finalName>
</build>
<!-- Build profiles. Each one only sets the profileActive property; presumably it is substituted into filtered resources (e.g. the active Spring profile). TODO confirm against src/main/resources. -->
<profiles>
<!-- Development profile, active by default. -->
<profile>
<id>dev</id>
<properties>
<profileActive>dev</profileActive>
</properties>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
</profile>
<!-- Production profile; has no activation rule, so it must be selected explicitly. -->
<profile>
<id>pro</id>
<properties>
<profileActive>pro</profileActive>
</properties>
</profile>
</profiles>
</project>
2.CMD2实时数据接收
package com.site.tcp.command;
import com.site.tcp.server.TcpReceiveStructure;
import com.site.tcp.utils.*;
import lombok.extern.slf4j.Slf4j;
/**
 * Handler for command {@code 2}: real-time information report.
 *
 * <p>The data unit starts with a 6-byte acquisition timestamp, followed by a sequence of
 * information bodies. Every body begins with a 1-byte type marker:
 * <ul>
 * <li>0x01 whole-vehicle data (protocol section 7.2.3.1)</li>
 * <li>0x02 drive-motor data (7.2.3.2; not transmitted while parked and charging)</li>
 * <li>0x03 fuel-cell data (7.2.3.3)</li>
 * <li>0x04 engine data (7.2.3.4; not transmitted while parked and charging)</li>
 * <li>0x05 vehicle position data (7.2.3.5)</li>
 * <li>0x06 extreme-value data (7.2.3.6)</li>
 * <li>0x07 alarm data (7.2.3.7)</li>
 * <li>0x08~0x09 reserved for terminal data</li>
 * <li>0x0A~0x2F platform-exchange custom data</li>
 * </ul>
 * Types other than 0x01~0x07 are not parsed; encountering one ends processing of the packet.
 *
 * <p>Created 2021-02-23.
 *
 * @author 程涛
 */
@Slf4j
public class CMD2 {

    /**
     * Walks the information bodies of a real-time report and hands each one to its parser.
     *
     * <p>Fixed-length bodies (types 1, 5 and 6) are copied out and the read position is advanced
     * here. For variable-length bodies (types 2, 3, 4 and 7) the value returned by the helper is
     * used as the next read position.
     *
     * @param cmdReceiveStructure decoded packet; its {@code VIN} and {@code date} (data unit)
     *                            fields are read
     */
    public static void cmd2(TcpReceiveStructure cmdReceiveStructure) {
        // Vehicle identification number
        String VIN = cmdReceiveStructure.VIN;
        byte[] date = cmdReceiveStructure.date;

        // Acquisition time: first 6 bytes of the data unit
        byte[] timeByte = new byte[6];
        System.arraycopy(date, 0, timeByte, 0, 6);
        String time = JacTools.hexToTime(timeByte);

        int index = 6;
        while (index < date.length) {
            int type = JacTools.subBytesToInt(date, index, 1);
            switch (type) {
                case 1:
                    // Whole-vehicle data: 1 type byte + 20 payload bytes
                    byte[] carinfoByte = new byte[20];
                    System.arraycopy(date, index + 1, carinfoByte, 0, 20);
                    AnalyzeVehicleData.getCarInfo(carinfoByte, VIN, time, 2);
                    index = index + 21;
                    break;
                case 2:
                    // Drive-motor data
                    index = DriveMotorUtils.getDriveMotor(date, VIN, time, index);
                    break;
                case 3:
                    // Fuel-cell data
                    index = FuelCellUtils.getFuelCell(date, VIN, time, index);
                    break;
                case 4:
                    // Engine data
                    index = EngineDataUtils.getEngineData(date, VIN, time, index);
                    break;
                case 5:
                    // Position data: 1 type byte + 9 payload bytes
                    byte[] positioningInfoByte = new byte[9];
                    System.arraycopy(date, index + 1, positioningInfoByte, 0, 9);
                    AnalyzePositioningInfo.getPositioningInfo(positioningInfoByte, VIN, time);
                    index = index + 10;
                    break;
                case 6:
                    // Extreme-value data: 1 type byte + 14 payload bytes
                    byte[] ExtremeInfoByte = new byte[14];
                    System.arraycopy(date, index + 1, ExtremeInfoByte, 0, 14);
                    ExtremeInfoUtils.getExtremeInfo(ExtremeInfoByte, VIN, time);
                    index = index + 15;
                    break;
                case 7:
                    // Alarm data. The break was missing, so control fell through to default and
                    // the index returned by the helper was overwritten, ending parsing early.
                    index = AlarmInfoUtils.getAlarmInfo(date, VIN, time, index, 2);
                    break;
                default:
                    // Unsupported type: its length is unknown, so stop parsing this packet.
                    index = date.length;
                    break;
            }
        }
    }
}
3.CMD3延迟数据补发
package com.site.tcp.command;
import com.site.tcp.server.TcpReceiveStructure;
import com.site.tcp.utils.*;
import lombok.extern.slf4j.Slf4j;
/**
 * Handler for command {@code 3}: re-sent (supplementary) information report.
 *
 * <p>The payload layout is the same one parsed for command 2: a 6-byte acquisition timestamp
 * followed by a sequence of information bodies, each introduced by a 1-byte type marker:
 * <ul>
 * <li>0x01 whole-vehicle data (protocol section 7.2.3.1)</li>
 * <li>0x02 drive-motor data (7.2.3.2; not transmitted while parked and charging)</li>
 * <li>0x03 fuel-cell data (7.2.3.3)</li>
 * <li>0x04 engine data (7.2.3.4; not transmitted while parked and charging)</li>
 * <li>0x05 vehicle position data (7.2.3.5)</li>
 * <li>0x06 extreme-value data (7.2.3.6)</li>
 * <li>0x07 alarm data (7.2.3.7)</li>
 * <li>0x08~0x09 reserved for terminal data</li>
 * <li>0x0A~0x2F platform-exchange custom data</li>
 * </ul>
 * Types other than 0x01~0x07 are not parsed; encountering one ends processing of the packet.
 * The command number {@code 3} is passed to the whole-vehicle and alarm parsers.
 *
 * <p>Created 2021-02-23.
 *
 * @author 程涛
 */
@Slf4j
public class CMD3 {

    /**
     * Walks the information bodies of a re-sent report and hands each one to its parser.
     *
     * <p>Fixed-length bodies (types 1, 5 and 6) are copied out and the read position is advanced
     * here. For variable-length bodies (types 2, 3, 4 and 7) the value returned by the helper is
     * used as the next read position.
     *
     * @param cmdReceiveStructure decoded packet; its {@code VIN} and {@code date} (data unit)
     *                            fields are read
     */
    public static void cmd3(TcpReceiveStructure cmdReceiveStructure) {
        // Vehicle identification number
        String VIN = cmdReceiveStructure.VIN;
        byte[] date = cmdReceiveStructure.date;

        // Acquisition time: first 6 bytes of the data unit
        byte[] timeByte = new byte[6];
        System.arraycopy(date, 0, timeByte, 0, 6);
        String time = JacTools.hexToTime(timeByte);

        int index = 6;
        while (index < date.length) {
            int type = JacTools.subBytesToInt(date, index, 1);
            switch (type) {
                case 1:
                    // Whole-vehicle data: 1 type byte + 20 payload bytes
                    byte[] carinfoByte = new byte[20];
                    System.arraycopy(date, index + 1, carinfoByte, 0, 20);
                    AnalyzeVehicleData.getCarInfo(carinfoByte, VIN, time, 3);
                    index = index + 21;
                    break;
                case 2:
                    // Drive-motor data
                    index = DriveMotorUtils.getDriveMotor(date, VIN, time, index);
                    break;
                case 3:
                    // Fuel-cell data
                    index = FuelCellUtils.getFuelCell(date, VIN, time, index);
                    break;
                case 4:
                    // Engine data
                    index = EngineDataUtils.getEngineData(date, VIN, time, index);
                    break;
                case 5:
                    // Position data: 1 type byte + 9 payload bytes
                    byte[] positioningInfoByte = new byte[9];
                    System.arraycopy(date, index + 1, positioningInfoByte, 0, 9);
                    AnalyzePositioningInfo.getPositioningInfo(positioningInfoByte, VIN, time);
                    index = index + 10;
                    break;
                case 6:
                    // Extreme-value data: 1 type byte + 14 payload bytes
                    byte[] ExtremeInfoByte = new byte[14];
                    System.arraycopy(date, index + 1, ExtremeInfoByte, 0, 14);
                    ExtremeInfoUtils.getExtremeInfo(ExtremeInfoByte, VIN, time);
                    index = index + 15;
                    break;
                case 7:
                    // Alarm data. The break was missing, so control fell through to default and
                    // the index returned by the helper was overwritten, ending parsing early.
                    index = AlarmInfoUtils.getAlarmInfo(date, VIN, time, index, 3);
                    break;
                default:
                    // Unsupported type: its length is unknown, so stop parsing this packet.
                    index = date.length;
                    break;
            }
        }
    }
}
4.handler模块
package com.site.tcp.handler;
import com.site.tcp.server.TcpReceiveStructure;
import com.site.tcp.utils.JacTools;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
 * Decodes one inbound frame into a {@link TcpReceiveStructure}.
 *
 * <p>Byte layout read by this decoder:
 * <ul>
 * <li>bytes 0-1: start marker {@code 0x23 0x23}</li>
 * <li>byte 2: command identifier</li>
 * <li>bytes 4-20: 17-byte VIN</li>
 * <li>bytes 22-23: length of the data unit</li>
 * <li>bytes 24..24+length-1: data unit</li>
 * <li>byte 24+length: BCC checksum computed over bytes 2..24+length-1</li>
 * </ul>
 *
 * <p>The earlier implementation kept a "leftover bytes" buffer that was never written to, so its
 * merge branches could not execute; that dead state has been removed. Frames that are too short,
 * do not start with the marker, or fail the checksum are dropped.
 */
@Slf4j
public class RequestDecoder extends ByteToMessageDecoder {

    /** Number of bytes before the data unit (marker, command, VIN, length field, ...). */
    private static final int HEADER_LENGTH = 24;

    /** Smallest possible frame: header plus the single checksum byte. */
    private static final int MIN_FRAME_LENGTH = HEADER_LENGTH + 1;

    /** Value of each of the two start-marker bytes. */
    private static final int START_MARKER = 0x23;

    /**
     * Consumes all readable bytes and, if they form a valid packet, emits a
     * {@link TcpReceiveStructure}.
     *
     * @param channelHandlerContext context of this handler (unused)
     * @param byteBuf               inbound bytes; always read completely
     * @param list                  receives the decoded packet, if any
     * @throws Exception propagated from the helper methods
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list)
            throws Exception {
        byte[] packet = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(packet);

        // Guard the fixed-offset reads below against truncated input.
        if (packet.length < MIN_FRAME_LENGTH) {
            log.warn("Dropping frame shorter than {} bytes: {}", MIN_FRAME_LENGTH, packet.length);
            return;
        }
        // Short-circuit evaluation: the second byte is only examined when the first matches.
        if (JacTools.subBytesToInt(packet, 0, 1) != START_MARKER
                || JacTools.subBytesToInt(packet, 1, 1) != START_MARKER) {
            return;
        }

        // Vehicle VIN
        String VIN = JacTools.subBytesToString(packet, 4, 17);
        // Command identifier
        int cmd = JacTools.subBytesToInt(packet, 2, 1);
        // Declared length of the data unit
        int length = JacTools.subBytesToInt(packet, 22, 2);
        if (length < 0 || packet.length < MIN_FRAME_LENGTH + length) {
            log.warn("Dropping frame: declared data length {} exceeds frame size {}", length, packet.length);
            return;
        }

        // Data unit
        byte[] data = new byte[length];
        System.arraycopy(packet, HEADER_LENGTH, data, 0, length);

        if (checksum(packet, length)) {
            list.add(new TcpReceiveStructure(VIN, cmd, data));
            log.info("车辆数据上行》" + JacTools.bytesToHexString(packet));
        } else {
            log.warn("Dropping frame with BCC checksum mismatch: {}", JacTools.bytesToHexString(packet));
        }
    }

    /**
     * Verifies the trailing BCC checksum of a packet.
     *
     * @param WorkPackBuf complete packet, at least {@code 25 + length} bytes long
     * @param length      declared length of the data unit
     * @return {@code true} when the upper-cased hex form of the checksum byte equals the BCC
     *         computed over bytes 2..24+length-1
     */
    private static Boolean checksum(byte[] WorkPackBuf, int length) {
        // Checksum byte sits directly after the data unit.
        byte[] checksumByte = new byte[1];
        System.arraycopy(WorkPackBuf, length + HEADER_LENGTH, checksumByte, 0, 1);
        String checksumStr = JacTools.bytesToHexString(checksumByte);

        // Bytes covered by the checksum: everything after the start marker up to the data unit end.
        byte[] covered = new byte[length + 22];
        System.arraycopy(WorkPackBuf, 2, covered, 0, length + 22);
        String bcc = JacTools.getBCC(covered);
        return checksumStr.toUpperCase().equals(bcc);
    }
}
package com.site.tcp.handler;
import com.site.tcp.utils.JacTools;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import lombok.extern.slf4j.Slf4j;
/**
 * Outbound handler that turns a hex-string message into raw bytes and writes them to the channel.
 *
 * <p>Created 2020-12-10.
 *
 * @author 程涛
 */
@Slf4j
public class ResponseEncoder extends ChannelOutboundHandlerAdapter {

    /**
     * Encodes a {@link String} of hex digits into a {@link ByteBuf}, writes it and flushes.
     *
     * <p>The caller's {@code promise} is now passed on with the write. Previously the buffer was
     * written without it, so the future returned to the caller of {@code writeAndFlush} was never
     * completed. Messages that are not strings are forwarded unchanged instead of failing with a
     * {@link ClassCastException}.
     *
     * @param ctx     context of this handler
     * @param msg     message to send; strings are upper-cased and hex-decoded
     * @param promise promise to complete when the write finishes
     * @throws Exception propagated from the hex conversion helper
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        if (!(msg instanceof String)) {
            // Not ours to encode: let the rest of the pipeline deal with it.
            ctx.write(msg, promise);
            return;
        }
        String str = ((String) msg).toUpperCase();
        byte[] mess = JacTools.hexStringToByte(str);
        ByteBuf encoded = ctx.alloc().buffer(mess.length);
        encoded.writeBytes(mess);
        // Propagate the promise so the caller's future reflects the real outcome.
        ctx.write(encoded, promise);
        ctx.flush();
        log.info("车辆数据下行》 " + str);
    }
}
package com.site.tcp.handler;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
/**
 * Idle detection: closes a connection from which nothing has been read for a fixed period.
 *
 * <p>Created 2018-10-25 16:21.
 *
 * @author pjmike
 */
@Slf4j
public class ServerIdleStateHandler extends IdleStateHandler {

    /** Read-idle timeout, in seconds. */
    private static final long READ_TIMEOUT_SECONDS = 60L;

    /** Configures read-idle detection only; write-idle and all-idle detection are disabled (0). */
    public ServerIdleStateHandler() {
        super(READ_TIMEOUT_SECONDS, 0L, 0L, TimeUnit.SECONDS);
    }

    /**
     * Closes the channel when an idle event fires, then logs the disconnect.
     *
     * @param ctx context of this handler
     * @param evt the idle event (not inspected)
     */
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.close();
        log.error("车辆数据服务空闲检测超时断开连接");
    }
}
package com.site.tcp.handler;
import com.site.tcp.command.CMD2;
import com.site.tcp.command.CMD3;
import com.site.tcp.server.TcpReceiveStructure;
import com.site.tcp.utils.JacTools;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Component
@ChannelHandler.Sharable
@Slf4j
public class TcpHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
TcpReceiveStructure cmdReceiveStructure = (TcpReceiveStructure) msg;
switch (cmdReceiveStructure.cmd) {
// 实时信息上报
case 2:
CMD2.cmd2(cmdReceiveStructure);
break;
// 信息补发
case 3:
CMD3.cmd3(cmdReceiveStructure);
break;
default:
break;
}
String start = "2323";
String back = JacTools.intToHex(cmdReceiveStructure.cmd, 1) + "01"
+ JacTools.strToHex(cmdReceiveStructure.VIN, 17) + "010006" + JacTools.getHexTime();
String bcc = JacTools.getBCC(JacTools.hexToByteArr(back));
ctx.writeAndFlush(start + back + bcc);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// cause.printStackTrace();
log.error("-----客户端关闭:" + ctx.channel().remoteAddress(), cause);
// Channel channel = ctx.channel();
// if (!channel.isActive()) {
// ctx.close();
// }
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("-----客户端断开" + ctx.channel().remoteAddress());
ctx.close();
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
log.info("-----客户端注册" + ctx.channel().remoteAddress());
}
}
5.server模块
package com.site.tcp.server;
import java.nio.ByteOrder;
import com.site.tcp.handler.RequestDecoder;
import com.site.tcp.handler.ResponseEncoder;
import com.site.tcp.handler.TcpHandler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
 * Builds the handler pipeline for each accepted channel: frame splitter, packet decoder, response
 * encoder and business handler, in that order.
 */
public class NettyServerHandlerInitializer extends ChannelInitializer<Channel> {

    /**
     * Installs the handlers on a newly accepted channel.
     *
     * <p>Idle detection ({@code ServerIdleStateHandler}) is currently not installed.
     *
     * @param ch the channel being initialised
     */
    @Override
    protected void initChannel(Channel ch) throws Exception {
        // Frame splitting, big-endian: 2-byte length field at offset 22, +1 length adjustment,
        // nothing stripped, fail-fast disabled.
        LengthFieldBasedFrameDecoder frameSplitter =
                new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE, 22, 2, 1, 0, false);
        ch.pipeline().addFirst("decoder", frameSplitter);

        RequestDecoder requestDecoder = new RequestDecoder();
        ch.pipeline().addLast(requestDecoder);

        ResponseEncoder responseEncoder = new ResponseEncoder();
        ch.pipeline().addLast(responseEncoder);

        TcpHandler businessHandler = new TcpHandler();
        ch.pipeline().addLast(businessHandler);
    }
}
package com.site.tcp.server;
import java.net.InetSocketAddress;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Spring-managed Netty TCP server: binds the configured port after construction and shuts the
 * event loops down before destruction.
 *
 * <p>Created 2018-10-24 15:13.
 *
 * @author pjmike
 */
@Component
public class TcpNettyServer {
    protected final Logger logger = LoggerFactory.getLogger(TcpNettyServer.class);

    /** Boss group: passed as the parent (accepting) group of the bootstrap. */
    private final EventLoopGroup boss = new NioEventLoopGroup();

    /** Worker group: passed as the child group that serves accepted connections. */
    private final EventLoopGroup work = new NioEventLoopGroup();

    /** Listening port, injected from the {@code netty.tcpPort} property. */
    @Value("${netty.tcpPort}")
    private Integer port;

    /**
     * Starts the Netty server and blocks until the bind completes.
     *
     * <p>If configuration or binding fails, both event-loop groups are shut down before the
     * failure propagates, so a failed start does not leave their threads behind.
     *
     * @throws InterruptedException if interrupted while waiting for the bind
     */
    @PostConstruct
    public void start() throws InterruptedException {
        boolean started = false;
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, work)
                    // Channel implementation
                    .channel(NioServerSocketChannel.class)
                    // Listen on the configured port
                    .localAddress(new InetSocketAddress(port))
                    // Accept queue size (the backlog argument of listen)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    // Enable TCP keep-alive on accepted connections
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // Disable Nagle's algorithm so small packets are sent without being coalesced
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childHandler(new NettyServerHandlerInitializer());
            ChannelFuture future = bootstrap.bind().sync();
            if (future.isSuccess()) {
                logger.info("启动车俩数据服务端");
            }
            started = true;
        } finally {
            if (!started) {
                // Release the event-loop threads; otherwise they outlive the failed start.
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
        }
    }

    /**
     * Shuts down the boss group, then the worker group, waiting for each to finish.
     *
     * <p>The worker group is shut down even when waiting for the boss group fails or is
     * interrupted.
     *
     * @throws InterruptedException if interrupted while waiting for a shutdown to complete
     */
    @PreDestroy
    public void destory() throws InterruptedException {
        try {
            boss.shutdownGracefully().sync();
        } finally {
            work.shutdownGracefully().sync();
        }
        logger.info("关闭Netty");
    }
}