之前有文章用java实现了设备端和应用订阅端,那么我根据AIOT的协议也可以实现一个demo物联网平台端,这种简易的平台是实现自己搭建物联网平台的基础。
直接用代码
新建Springboot的maven项目,pom.xml文件导入依赖包(用到了swagger来测试发送数据)
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>boot.ctwing.tcp.server</groupId>
<artifactId>boot-example-ctwing-tcp-server-2.0.5</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-example-ctwing-tcp-server-2.0.5</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.9.2</version>
</dependency>
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>swagger-bootstrap-ui</artifactId>
<version>1.9.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 打包成一个可执行jar -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
SwaggerConfig配置(和之前的没多大区别)
package boot.ctwing.tcp.server.config;
import com.google.common.base.Predicates;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* 蚂蚁舞
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket createRestApi(){
return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
.apis(RequestHandlerSelectors.any()).paths(PathSelectors.any())
.paths(Predicates.not(PathSelectors.regex("/error.*")))
.paths(PathSelectors.regex("/.*"))
.build().apiInfo(apiInfo());
}
private ApiInfo apiInfo(){
return new ApiInfoBuilder()
.title("天翼物联网CtWing模拟云端")
.description("模拟AIOT平台Demo")
.version("0.01")
.build();
}
/**
* http://localhost:8177/doc.html 地址和端口根据实际项目查看
*/
}
CtWingConstant
package boot.ctwing.tcp.server.config;
/**
* 蚂蚁舞
*/
public class CtWingConstant {
public static final int port = 8996;
// 登录认证
public static final String tcp_hex_01 = "01";
// 上行数据报文
public static final String tcp_hex_02 = "02";
// 下行数据报文
public static final String tcp_hex_03 = "03";
// 上行心跳
public static final String tcp_hex_04 = "04";
// 登录响应
public static final String tcp_hex_05 = "05";
// 心跳响应
public static final String tcp_hex_06 = "06";
public static long READ_TIME_OUT = 5*60;
public static long WRITE_TIME_OUT = 5*60;
public static long ALL_TIME_OUT = 5*60;
}
TcpServer服务端Netty核心代码
package boot.ctwing.tcp.server.netty;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* 蚂蚁舞
*/
public class TcpServer {
/**
* 启动服务
*/
public void startup(int port) {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap = serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap = serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap = serverBootstrap.option(ChannelOption.SO_BACKLOG, 10240).option(ChannelOption.SO_SNDBUF, 32 * 10240);
serverBootstrap = serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
serverBootstrap = serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576));
serverBootstrap = serverBootstrap.childHandler(new TcpChannelInitializer<SocketChannel>());
System.out.println(port + " netty start success!");
ChannelFuture f = serverBootstrap.bind(port).sync();
f.channel().closeFuture().sync();
} catch (Exception e) {
System.out.println(e.toString());
} finally {
/**
* 退出,释放线程池资源
*/
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
TcpChannelInboundHandlerAdapter
package boot.ctwing.tcp.server.netty;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import boot.ctwing.tcp.server.config.CtWingConstant;
import boot.ctwing.tcp.server.utils.CtWingUtils;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 蚂蚁舞
*/
public class TcpChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{
private final Logger log = LoggerFactory.getLogger(this.getClass());
protected static AttributeKey<String> _channelId = AttributeKey.valueOf("deviceId");
protected void setChannelDeviceId(Channel channel, String deviceId) {
channel.attr(_channelId).set(deviceId);
}
protected String getChannelDeviceId(Channel channel) {
return Optional.ofNullable(channel).map(ch -> ch.attr(_channelId).get()).orElse(null);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
super.channelRegistered(ctx);
log.info("--channelRegistered--"+ctx.channel().id().toString());
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
super.channelUnregistered(ctx);
log.info("--channelUnregistered--"+ctx.channel().id().toString());
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
byte[] req = (byte[]) msg;
if(req.length > 0){
String hex = CtWingUtils.bytesToHexStr(req);
log.info("data--"+hex);
String soh = hex.substring(0,2);
switch (soh) {
case CtWingConstant.tcp_hex_04:
// 终端的心跳上来,平台回复
String back = CtWingConstant.tcp_hex_06;
byte[] data = CtWingUtils.hexStrToBytes(back);
ctx.channel().writeAndFlush(Unpooled.buffer().writeBytes(data));
break;
case CtWingConstant.tcp_hex_02:
// 终端设备发送的上行数据,可返回03协议的数据表示收到消息,也可以不用返回
String dataHex = hex.substring(6);
log.info("hexStr--"+dataHex);
// 如果是字符串 16进制字符串转字符串
log.info("str--"+CtWingUtils.hexStrToStr(dataHex));
break;
case CtWingConstant.tcp_hex_01:
// 0x01 登录认证消息,认证成功响应 05 00 00 (错误按照CtWing的tcp协议返回)
try {
String dataAuth = hex.substring(2);
int len_d = Integer.parseInt(dataAuth.substring(0,4), 16) * 2;
String deviceId = CtWingUtils.hexStrToStr(dataAuth.substring(4,4+len_d));
System.out.println("str--deviceId--"+deviceId);
dataAuth = dataAuth.substring(4+len_d);
int len_s = Integer.parseInt(dataAuth.substring(0, 4), 16) * 2;
String password = CtWingUtils.hexStrToStr(dataAuth.substring(4,len_s+4));
System.out.println("str-password--"+password);
String imei = deviceId.substring(8);
System.out.println("str-imei--"+imei);
TcpChannelMap map = new TcpChannelMap();
map.setChannel(ctx.channel());
map.setDeviceId(imei);
setChannelDeviceId(ctx.channel(),imei);
TcpChannelMapCache.add(imei, map); // demo 暴力解决,直接覆盖
// 直接返回连接成功 纯demo不需要验证
String authSuccess = "050000";
byte[] authSuccessData = CtWingUtils.hexStrToBytes(authSuccess);
ctx.channel().writeAndFlush(Unpooled.buffer().writeBytes(authSuccessData));
} catch (Exception e){
// 有问题直接关闭,暴力解决
ctx.channel().close();
}
default:
break;
}
}
} catch (Exception e) {
System.out.println("channelRead--"+e.toString());
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
log.info("--channelReadComplete--"+ctx.channel().id().toString());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.info("--exceptionCaught--"+ctx.channel().id().toString());
try {
String imei = getChannelDeviceId(ctx.channel());
if(imei != null){
setChannelDeviceId(ctx.channel(), null);
TcpChannelMapCache.remove(imei);
}
ctx.close();
} catch (Exception e) {
System.out.println("exceptionCaught--"+e.toString());
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().read();
log.info("--channelActive--"+ctx.channel().id().toString());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
super.channelInactive(ctx);
log.info("--channelInactive--"+ctx.channel().id().toString());
String imei = getChannelDeviceId(ctx.channel());
if(imei != null){
setChannelDeviceId(ctx.channel(), null);
TcpChannelMapCache.remove(imei);
}
ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
super.userEventTriggered(ctx, evt);
log.info("--userEventTriggered--"+ctx.channel().id().toString());
ctx.close();
}
}
TcpChannelInitializer
package boot.ctwing.tcp.server.netty;
import boot.ctwing.tcp.server.config.CtWingConstant;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.timeout.IdleStateHandler;
import java.util.concurrent.TimeUnit;
/**
* 蚂蚁舞
*/
public class TcpChannelInitializer<SocketChannel> extends ChannelInitializer<Channel>{
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(CtWingConstant.READ_TIME_OUT, CtWingConstant.WRITE_TIME_OUT, CtWingConstant.ALL_TIME_OUT, TimeUnit.SECONDS));
// 二者选择一个就可以
// 使用netty自带的
// ch.pipeline().addLast("decoder", new ByteArrayDecoder());
// ch.pipeline().addLast("encoder", new ByteArrayEncoder());
// 使用自定义的
ch.pipeline().addLast(new TcpMessageCodec());
ch.pipeline().addLast(new TcpChannelInboundHandlerAdapter());
}
}
TcpMessageCodec
package boot.ctwing.tcp.server.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import java.util.List;
/**
* 蚂蚁舞
*/
@ChannelHandler.Sharable
public class TcpMessageCodec extends MessageToMessageCodec<ByteBuf, ByteBuf> {
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
byte[] array = new byte[msg.readableBytes()];
msg.getBytes(0, array);
out.add(Unpooled.wrappedBuffer(array));
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
byte[] array = new byte[msg.readableBytes()];
msg.getBytes(0, array);
out.add(array);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
System.out.println("OutIn异常!"+cause);
}
}
TcpChannelMap用来保活终端连接的对象
package boot.ctwing.tcp.server.netty;
import io.netty.channel.Channel;
/**
* 蚂蚁舞
*/
public class TcpChannelMap {
private String deviceId;
private transient volatile Channel channel;
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
@Override
public String toString() {
return "IotTcpChannel [deviceId=" + deviceId + "]";
}
}
TcpChannelMapCache
package boot.ctwing.tcp.server.netty;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 蚂蚁舞
*/
public class TcpChannelMapCache {
public static volatile Map<String, TcpChannelMap> channelMapCache = new ConcurrentHashMap<String, TcpChannelMap>();
public static void add(String code, TcpChannelMap channel){
channelMapCache.put(code,channel);
}
public static TcpChannelMap get(String code){
return channelMapCache.get(code);
}
public static void remove(String code){
channelMapCache.remove(code);
}
public static void save(String code, TcpChannelMap channel) {
if(channelMapCache.get(code) == null) {
add(code,channel);
}
}
public static Map<String, TcpChannelMap> getMap() {
return channelMapCache;
}
public static int size() {
return channelMapCache.size();
}
public static void setMap(Map<String, TcpChannelMap> channelMap) {
TcpChannelMapCache.channelMapCache = channelMap;
}
public static void list() {
for (Map.Entry<String, TcpChannelMap> entry : channelMapCache.entrySet()) {
System.out.println("key= " + entry.getKey() + " and value= "+ entry.getValue().toString());
}
}
}
TcpServerThread启动线程
package boot.ctwing.tcp.server.netty;
/**
* 蚂蚁舞
*/
public class TcpServerThread extends Thread {
private final int port;
public TcpServerThread(int port){
this.port = port;
}
public void run() {
TcpServer tcpServer = new TcpServer();
tcpServer.startup(port);
}
}
CtWingUtils
package boot.ctwing.tcp.server.utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
/**
* 蚂蚁舞
*/
public class CtWingUtils {
private static final Logger log = LoggerFactory.getLogger(CtWingUtils.class);
private static final char[] HEXES = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
public static String bytesToHexStr(byte[] bytes) {
if (bytes == null || bytes.length == 0) {
return null;
}
StringBuilder hex = new StringBuilder(bytes.length * 2);
for (byte b : bytes) {
hex.append(HEXES[(b >> 4) & 0x0F]);
hex.append(HEXES[b & 0x0F]);
}
return hex.toString().toUpperCase();
}
public static byte[] hexStrToBytes(String hex) {
if (hex == null || hex.length() == 0) {
return null;
}
char[] hexChars = hex.toCharArray();
byte[] bytes = new byte[hexChars.length / 2]; // 如果 hex 中的字符不是偶数个, 则忽略最后一个
for (int i = 0; i < bytes.length; i++) {
bytes[i] = (byte) Integer.parseInt("" + hexChars[i * 2] + hexChars[i * 2 + 1], 16);
}
return bytes;
}
public static String strToHexStr(String str) {
StringBuilder sb = new StringBuilder();
byte[] bs = str.getBytes();
int bit;
for (int i = 0; i < bs.length; i++) {
bit = (bs[i] & 0x0f0) >> 4;
sb.append(HEXES[bit]);
bit = bs[i] & 0x0f;
sb.append(HEXES[bit]);
}
return sb.toString().trim();
}
public static String hexStrToStr(String hexStr) {
//能被16整除,肯定可以被2整除
byte[] array = new byte[hexStr.length() / 2];
try {
for (int i = 0; i < array.length; i++) {
array[i] = (byte) (0xff & Integer.parseInt(hexStr.substring(i * 2, i * 2 + 2), 16));
}
hexStr = new String(array, StandardCharsets.UTF_8);
} catch (Exception e) {
e.printStackTrace();
return "";
}
return hexStr;
}
public static String hexLen4Calc(int fixed, int len) {
StringBuilder x = new StringBuilder(Integer.toHexString(len));
int xC = fixed - x.length();
for (int i = 0; i < xC; i++) {
x.insert(0, "0");
}
return x.toString();
}
}
TcpServerController
package boot.ctwing.tcp.server.controller;
import boot.ctwing.tcp.server.config.CtWingConstant;
import boot.ctwing.tcp.server.netty.TcpChannelMap;
import boot.ctwing.tcp.server.netty.TcpChannelMapCache;
import boot.ctwing.tcp.server.utils.CtWingUtils;
import io.netty.buffer.Unpooled;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 蚂蚁舞
*/
@RestController
public class TcpServerController {
@GetMapping(value = {"", "/"})
public String index() {
return "天翼物联网CtWing终端模拟mock";
}
@GetMapping("/terminalList")
public List<Map<String,String>> terminalList() {
List<Map<String,String>> list = new ArrayList<>();
for (Map.Entry<String, TcpChannelMap> entry : TcpChannelMapCache.channelMapCache.entrySet()) {
Map<String, String> map = new HashMap<String, String>();
map.put("imei", entry.getKey());
map.put("key_id", entry.getValue().getChannel().id().toString());
list.add(map);
}
return list;
}
@GetMapping("/downData")
public String downData(@RequestParam(name="imei", required = true) String imei,
@RequestParam(name="content", required = true) String content) {
TcpChannelMap tcpChannelMap = TcpChannelMapCache.get(imei);
if(tcpChannelMap != null && tcpChannelMap.getChannel().isOpen()){
String upHexStr = CtWingUtils.strToHexStr(content);
String upHexStrLenHex = CtWingUtils.hexLen4Calc(4, upHexStr.length()/2);
String cmd = CtWingConstant.tcp_hex_03+upHexStrLenHex+upHexStr;
System.out.println(cmd);
byte[] bytes = CtWingUtils.hexStrToBytes(cmd);
tcpChannelMap.getChannel().writeAndFlush(Unpooled.buffer().writeBytes(bytes));
return "success";
}
return "fail";
}
}
代码目录结构
├─boot-example-ctwing-tcp-server-2.0.5
│ │ pom.xml
│ │
│ └─src
│ ├─main
│ │ ├─java
│ │ │ └─boot
│ │ │ └─ctwing
│ │ │ └─tcp
│ │ │ └─server
│ │ │ │ BootCtWingTcpServer.java
│ │ │ │
│ │ │ ├─config
│ │ │ │ CtWingConstant.java
│ │ │ │ SwaggerConfig.java
│ │ │ │
│ │ │ ├─controller
│ │ │ │ TcpServerController.java
│ │ │ │
│ │ │ ├─netty
│ │ │ │ TcpChannelInboundHandlerAdapter.java
│ │ │ │ TcpChannelInitializer.java
│ │ │ │ TcpChannelMap.java
│ │ │ │ TcpChannelMapCache.java
│ │ │ │ TcpMessageCodec.java
│ │ │ │ TcpServer.java
│ │ │ │ TcpServerThread.java
│ │ │ │
│ │ │ └─utils
│ │ │ CtWingUtils.java
│ │ │
│ │ └─resources
│ │ application.properties
│ │ logback-spring.xml
│ │
│ └─test
│ └─java
│ └─boot
│ └─ctwing
│ └─tcp
│ └─server
│ BootCtWingTcpServerTest.java
│
我们先启动简易demo的Iot服务端
17:50:48.274 spring-boot-logging [main] INFO s.d.s.w.s.ApiListingReferenceScanner - Scanning for api listing references
17:50:48.451 spring-boot-logging [main] INFO o.a.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8179"]
17:50:48.466 spring-boot-logging [main] INFO o.a.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
17:50:48.486 spring-boot-logging [main] INFO o.s.b.w.e.tomcat.TomcatWebServer - Tomcat started on port(s): 8179 (http) with context path ''
17:50:48.491 spring-boot-logging [main] INFO b.c.tcp.server.BootCtWingTcpServer - Started BootCtWingTcpServer in 6.881 seconds (JVM running for 8.157)
server netty start
Hello World!
8996 netty start success!
在启动之前模拟设备端对本地的模拟连接,启动2个(imei需要更改)
如此可以在服务端的接口中查到2个终端设备连上,并保持着连接
可以看到有2个设备连上了Demo物联网平台
[
{
"key_id": "4f31e0c8",
"imei": "869401041201815"
},
{
"key_id": "99ac6929",
"imei": "359951090161283"
}
]
测试一下用这个demo平台下发
在这个设备的终端收到的消息(日志查看)
18:29:18.688 spring-boot-logging [nioEventLoopGroup-2-1] INFO b.c.t.t.n.TcpChannelInboundHandlerAdapter - data--03001F6D7977E89A82E89A81E8889E2DE89A82E89A81E4B99FE4BC9AE8B7B3E8889E
18:29:18.688 spring-boot-logging [nioEventLoopGroup-2-1] INFO b.c.t.t.n.TcpChannelInboundHandlerAdapter - hexStr--6D7977E89A82E89A81E8889E2DE89A82E89A81E4B99FE4BC9AE8B7B3E8889E
18:29:18.688 spring-boot-logging [nioEventLoopGroup-2-1] INFO b.c.t.t.n.TcpChannelInboundHandlerAdapter - str--myw蚂蚁舞-蚂蚁也会跳舞
channelReadComplete
在测试一下终端上报数据
看云端demo的打印日志
18:30:05.745 spring-boot-logging [nioEventLoopGroup-3-2] INFO b.c.t.s.n.TcpChannelInboundHandlerAdapter - data--02000FE68891E698AFE89A82E89A81E8889E
18:30:05.745 spring-boot-logging [nioEventLoopGroup-3-2] INFO b.c.t.s.n.TcpChannelInboundHandlerAdapter - hexStr--E68891E698AFE89A82E89A81E8889E
18:30:05.745 spring-boot-logging [nioEventLoopGroup-3-2] INFO b.c.t.s.n.TcpChannelInboundHandlerAdapter - str--我是蚂蚁舞
18:30:05.745 spring-boot-logging [nioEventLoopGroup-3-2] INFO b.c.t.s.n.TcpChannelInboundHandlerAdapter - --channelReadComplete--4f31e0c8
这样就是一套最简单的IOT物联网平台,部署到云服务器上可以简单对接终端设备,这也是一套自建物联网平台的一套基础demo代码(用的是天翼物联网平台(AIOT)的TCP透传协议)
具体对接设备,那么在这套协议之上定义一套物联网开发的自定义协议。