文章目录
- 一、需求概述
- 二、设计选择
- 三、代码结构
- 四、代码放送
- 五、本地调试
- 1. 服务端日志
- 2. 客户端日志
- 3. 断线重连日志
- 六、服务器部署运行
- 1. 源码下载
- 2. 打包镜像
- 3. 运行容器
一、需求概述
最近开发某数据采集系统,系统整体的数据流程图如下:
同时,数据中心又需要下发命令到某客户端执行,客户端执行完成后将结果通知到数据中心。
二、设计选择
考虑功能点:
- 客户端多个,一段时间内数量可控相对固定。
- 客户端主动连接服务端,支持断线重连。
- 客户端与服务端支持双向通信。
选择TCP协议作为客户端与数据中心之间的交互协议比较合适,数据中心服务器作为tcp-server开放端口供tcp-client连接。
三、代码结构
四、代码放送
https://gitcode.com/00fly/tcp-show
或者使用下面的备份文件恢复成原始的项目代码
如何恢复,请移步查阅:神奇代码恢复工具
//goto docker\docker-compose.yml
version: '3.7'
services:
tcp-server:
image: registry.cn-shanghai.aliyuncs.com/00fly/tcp-show-server:0.0.1
container_name: tcp-server
deploy:
resources:
limits:
cpus: '1.0'
memory: 64M
reservations:
cpus: '0.05'
memory: 64M
ports:
- 8000:8000
restart: on-failure
logging:
driver: json-file
options:
max-size: '5m'
max-file: '1'
tcp-client:
image: registry.cn-shanghai.aliyuncs.com/00fly/tcp-show-client:0.0.1
container_name: tcp-client
depends_on:
- tcp-server
deploy:
resources:
limits:
cpus: '1.0'
memory: 64M
reservations:
cpus: '0.05'
memory: 64M
restart: on-failure
environment:
#- TCP_SERVER=192.168.15.202
- TCP_SERVER=tcp-server
logging:
driver: json-file
options:
max-size: '5m'
max-file: '1'
//goto docker\restart-server.sh
#!/bin/bash
docker-compose down tcp-server
sleep 10
docker-compose up -d tcp-server
docker logs -f tcp-server
//goto docker\restart.sh
#!/bin/bash
docker-compose down && docker-compose up -d
sleep 2
docker logs -f network-server
//goto docker\stop.sh
#!/bin/bash
docker-compose down
//goto Dockerfile
#基础镜像
#FROM openjdk:8-jre-alpine
FROM adoptopenjdk/openjdk8-openj9:alpine-slim
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
echo 'Asia/Shanghai' >/etc/timezone
#拷贝发布包
COPY target/*.jar /app.jar
#启动脚本
ENTRYPOINT ["java", "-Djava.security.egd=file:/dev/./urandom", "-Xshareclasses", "-Xquickstart", "-jar", "/app.jar"]
//goto pom-client.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fly</groupId>
<artifactId>tcp-show</artifactId>
<version>0.0.1</version>
<name>tcp-show</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<docker.hub>registry.cn-shanghai.aliyuncs.com</docker.hub>
<java.version>1.8</java.version>
<skipTests>true</skipTests>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-client-${project.version}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- 方式一:带dependencies运行包 -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.5.0</version>
<configuration>
<!-- 是否添加assemblyId -->
<appendAssemblyId>false</appendAssemblyId>
<archive>
<manifest>
<mainClass>com.fly.protocol.tcp.run.StartClient</mainClass>
</manifest>
</archive>
<descriptorRefs>
<!--将所有外部依赖JAR都加入生成的JAR包 -->
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution><!-- 配置执行器 -->
<id>make-assembly</id>
<phase>package</phase><!-- 绑定到package阶段 -->
<goals>
<goal>single</goal><!-- 只运行一次 -->
</goals>
</execution>
</executions>
</plugin>
<!-- 添加docker-maven插件 -->
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.40.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>build</goal>
<!--<goal>push</goal>-->
<!--<goal>remove</goal>-->
</goals>
</execution>
</executions>
<configuration>
<!-- 连接到带docker环境的linux服务器编译image -->
<!-- <dockerHost>http://192.168.182.10:2375</dockerHost> -->
<!-- Docker 推送镜像仓库地址 -->
<pushRegistry>${docker.hub}</pushRegistry>
<images>
<image>
<name>
${docker.hub}/00fly/${project.artifactId}-client:${project.version}</name>
<build>
<dockerFileDir>${project.basedir}</dockerFileDir>
</build>
</image>
</images>
</configuration>
</plugin>
</plugins>
</build>
</project>
//goto pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.fly</groupId>
<artifactId>tcp-show</artifactId>
<version>0.0.1</version>
<name>tcp-show</name>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<docker.hub>registry.cn-shanghai.aliyuncs.com</docker.hub>
<java.version>1.8</java.version>
<skipTests>true</skipTests>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.10</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}-server-${project.version}</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.10.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<!-- 方式一:带dependencies运行包 -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.5.0</version>
<configuration>
<!-- 是否添加assemblyId -->
<appendAssemblyId>false</appendAssemblyId>
<archive>
<manifest>
<mainClass>com.fly.protocol.tcp.run.StartServer</mainClass>
</manifest>
</archive>
<descriptorRefs>
<!--将所有外部依赖JAR都加入生成的JAR包 -->
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution><!-- 配置执行器 -->
<id>make-assembly</id>
<phase>package</phase><!-- 绑定到package阶段 -->
<goals>
<goal>single</goal><!-- 只运行一次 -->
</goals>
</execution>
</executions>
</plugin>
<!-- 添加docker-maven插件 -->
<plugin>
<groupId>io.fabric8</groupId>
<artifactId>docker-maven-plugin</artifactId>
<version>0.40.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>build</goal>
<!--<goal>push</goal>-->
<!--<goal>remove</goal>-->
</goals>
</execution>
</executions>
<configuration>
<!-- 连接到带docker环境的linux服务器编译image -->
<!-- <dockerHost>http://192.168.182.10:2375</dockerHost> -->
<!-- Docker 推送镜像仓库地址 -->
<pushRegistry>${docker.hub}</pushRegistry>
<images>
<image>
<name>
${docker.hub}/00fly/${project.artifactId}-server:${project.version}</name>
<build>
<dockerFileDir>${project.basedir}</dockerFileDir>
</build>
</image>
</images>
</configuration>
</plugin>
</plugins>
</build>
</project>
//goto src\main\java\com\fly\protocol\tcp\bio\TcpClient.java
package com.fly.protocol.tcp.bio;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TcpClient implements Runnable
{
private String ip;
private int port;
private Socket socket;
private DataOutputStream dataOutputStream;
private String clientName;
private boolean isClientCoreRun = false;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
private ExecutorService executor = Executors.newFixedThreadPool(2);
public TcpClient(String clientName)
{
super();
this.clientName = clientName;
}
/**
*
* @param ip 服务端IP
* @param port 服务端PORT
* @return
*/
public boolean connectServer(String ip, int port)
{
try
{
this.ip = ip;
this.port = port;
socket = new Socket(InetAddress.getByName(ip), port);
log.info("****** TcpClient will connect to Server {}:{}", ip, port);
scheduler.scheduleAtFixedRate(this::checkConnection, 0, 10, TimeUnit.SECONDS);
isClientCoreRun = true;
dataOutputStream = new DataOutputStream(socket.getOutputStream());
dataOutputStream.writeUTF(clientName);
dataOutputStream.flush();
}
catch (IOException e)
{
log.error(e.getMessage());
isClientCoreRun = false;
}
return isClientCoreRun;
}
/**
* 检查TCP连接
*/
private void checkConnection()
{
if (socket == null || socket.isClosed())
{
log.error("Connection lost, attempting to reconnect");
reconnect();
}
}
private void reconnect()
{
try
{
socket = new Socket(InetAddress.getByName(ip), port);
log.info("****** TcpClient will connect to Server {}:{}", ip, port);
isClientCoreRun = true;
executor.execute(new ReceiveMsg());
dataOutputStream = new DataOutputStream(socket.getOutputStream());
dataOutputStream.writeUTF(clientName);
dataOutputStream.flush();
}
catch (IOException e)
{
log.error(e.getMessage());
isClientCoreRun = false;
}
}
/**
* 发送报文
*/
public void sendMsg(String msg)
{
try
{
dataOutputStream.writeUTF(msg);
dataOutputStream.flush();
}
catch (IOException e)
{
log.error(e.getMessage());
closeClientConnect();
}
}
/**
* 断开客户端与服务端的连接
*/
public void closeClientConnect()
{
if (dataOutputStream != null)
{
try
{
dataOutputStream.close();
isClientCoreRun = false;
if (socket != null)
{
socket.close();
}
}
catch (IOException e)
{
log.error(e.getMessage());
}
}
}
@Override
public void run()
{
executor.execute(new ReceiveMsg());
// 发送数据
scheduler.scheduleAtFixedRate(() -> {
sendMsg(RandomStringUtils.randomAlphanumeric(10));
}, RandomUtils.nextInt(1, 10), 10, TimeUnit.SECONDS);
}
class ReceiveMsg implements Runnable
{
private DataInputStream dataInputStream;
public ReceiveMsg()
{
try
{
// 数据输入流
dataInputStream = new DataInputStream(socket.getInputStream());
}
catch (IOException e)
{
log.error(e.getMessage());
}
}
@Override
public void run()
{
try
{
// server停止后, 会影响接受消息线程工作
while (isClientCoreRun)
{
String msg = dataInputStream.readUTF();
log.info("{} get msg: {}", clientName, msg);
}
}
catch (IOException e)
{
log.error(e.getMessage());
// 防止重连失败
closeClientConnect();
}
}
}
}
//goto src\main\java\com\fly\protocol\tcp\bio\TcpServer.java
package com.fly.protocol.tcp.bio;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TcpServer implements Runnable
{
private ServerSocket serverSocket;
private boolean isServerCoreRun = false;
private Map<String, NewClient> allClient = new HashMap<>();
public boolean startServer(String ip, int port)
{
try
{
serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(ip, port));
isServerCoreRun = true;
}
catch (IOException e)
{
log.error(e.getMessage());
isServerCoreRun = false;
}
return isServerCoreRun;
}
/**
* 关闭服务
*
* #1 断开与所有客户端的连接,并将客户端容器中的所有已连接的客户端清空。 #2 关闭服务器套接字
*/
public void closeServer()
{
try
{
isServerCoreRun = false;
for (Map.Entry<String, NewClient> all : this.allClient.entrySet())
{
all.getValue().isNewClientRun = false;
all.getValue().socket.close();
}
allClient.clear();
serverSocket.close();
}
catch (IOException e)
{
log.error(e.getMessage());
}
}
/**
* 向客户端发送报文
*/
public void sendMsg(String clientName, String msg)
{
if (allClient.containsKey(clientName))
{
allClient.get(clientName).sendMsg(msg);
}
}
@Override
public void run()
{
try
{
log.info("TcpServer will start");
while (isServerCoreRun)
{
// 阻塞式等待客户端连接
Socket socket = serverSocket.accept();
String clientName = new DataInputStream(socket.getInputStream()).readUTF();
String clientIP = socket.getInetAddress().getHostAddress();
int clientPort = socket.getPort();
String clientConnectDateTime = DateFormatUtils.format(System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss");
NewClient newClient = new NewClient(socket, clientName, clientIP, clientPort, clientConnectDateTime);
allClient.put(clientName, newClient);
log.info("**** add new client ===> {}", allClient.keySet());
new Thread(newClient).start();
}
}
catch (IOException e)
{
log.error(e.getMessage());
}
}
class NewClient implements Runnable
{
// 客户端套接字
private Socket socket;
// 数据输入流
private DataInputStream dataInputStream;
// 数据输出流
private DataOutputStream dataOutputStream;
// 客户端运行(收、发报文)状态
private boolean isNewClientRun = true;
// 客户端的名称
private String clientName;
// 客户端的IP地址
private String clientIP;
public NewClient()
{
}
// 构造方法初始化成员属性
public NewClient(Socket socket, String clientName, String clientIP, int clientPort, String clientConnectDateTime)
{
this.socket = socket;
this.clientName = clientName;
this.clientIP = clientIP;
try
{
// 创建客户端数据输入、输出流
dataInputStream = new DataInputStream(socket.getInputStream());
dataOutputStream = new DataOutputStream(socket.getOutputStream());
}
catch (IOException e)
{
log.error(e.getMessage());
closeCurrentClient();
}
}
@Override
public void run()
{
try
{
// 客户端在运行才能收发报文
while (this.isNewClientRun)
{
// 获取到客户端发送的报文
String msg = dataInputStream.readUTF();
if (StringUtils.isNotBlank(msg))
{
log.info("clientName: {}, clientIP: {}, send msg ===> {}", clientName, clientIP, msg);
}
// 向客户端传送数据
int index = 0;
for (String key : allClient.keySet())
{
index++;
if (StringUtils.equals(key, clientName))
{
allClient.get(key).sendMsg("from server: " + msg + StringUtils.repeat("-----", index));
}
}
}
}
catch (IOException e)
{
log.error(e.getMessage());
closeCurrentClient();
}
}
/**
* 断开当前客户端的连接释放资源
*/
public void closeCurrentClient()
{
try
{
// 结束客户端的运行状态
isNewClientRun = false;
// 断开数据输出出流
if (dataOutputStream != null)
{
dataOutputStream.close();
}
// 断开数据输入出流
if (dataInputStream != null)
{
dataInputStream.close();
}
// 断开客户端套解析
if (socket != null)
{
socket.close();
}
// 将该客户端从客户端容器中删除
allClient.remove(clientName);
log.info("**** remove client ===> {}", allClient.keySet());
}
catch (IOException e)
{
log.error(e.getMessage());
}
}
/**
* 发送报文
*/
public void sendMsg(String msg)
{
try
{
// 发送报文
dataOutputStream.writeUTF(msg);
// 清空报文缓存
dataOutputStream.flush();
}
catch (IOException e)
{
log.error(e.getMessage());
closeCurrentClient();
}
}
}
}
//goto src\main\java\com\fly\protocol\tcp\run\StartClient.java
package com.fly.protocol.tcp.run;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import com.fly.protocol.tcp.bio.TcpClient;
public class StartClient
{
public static void main(String[] args)
{
// docker环境下优先使用docker-compose中environment值
String serverIp = StringUtils.defaultIfBlank(System.getenv().get("TCP_SERVER"), "127.0.0.1");
IntStream.rangeClosed(1, 3).forEach(i -> {
TcpClient client = new TcpClient("CLIENT_" + i);
if (client.connectServer(serverIp, 8000))
{
new Thread(client).start();
}
});
}
}
//goto src\main\java\com\fly\protocol\tcp\run\StartServer.java
package com.fly.protocol.tcp.run;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import com.fly.protocol.tcp.bio.TcpServer;
public class StartServer
{
public static void main(String[] args)
{
TcpServer server = new TcpServer();
if (server.startServer("0.0.0.0", 8000))
{
Executors.newScheduledThreadPool(2).scheduleAtFixedRate(() -> {
int index = RandomUtils.nextInt(1, 4);
server.sendMsg("CLIENT_" + index, "random: " + RandomStringUtils.randomAlphanumeric(10));
}, 10, 60, TimeUnit.SECONDS);
new Thread(server).start();
}
}
}
//goto src\main\resources\log4j2.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration status="off" monitorInterval="0">
<appenders>
<console name="Console" target="system_out">
<patternLayout
pattern="%d{yyyy-MM-dd HH:mm:ss} [%t] %-5level %logger{36} - %msg%n" />
</console>
</appenders>
<loggers>
<root level="INFO">
<appender-ref ref="Console" />
</root>
</loggers>
</configuration>
五、本地调试
先后启动StartServer、StartClient
1. 服务端日志
2. 客户端日志
3. 断线重连日志
六、服务器部署运行
1. 源码下载
在安装好jdk、maven、docker环境的服务器下载源码
git clone https://gitcode.com/00fly/tcp-show.git
2. 打包镜像
#server打包
mvn clean package
#client打包
mvn clean package -f pom-client.xml
3. 运行容器
上传docker文件目录到服务器,执行
sh restart.sh
sh restart-server.sh
docker logs -f tcp-server
docker logs -f tcp-client
有任何问题和建议,都可以向我提问讨论,大家一起进步,谢谢!
-over-