简单的springboot 编写Socket服务接口
1.需求
我们项目中有部分老接口为票据接口,其中实现为java socket
形式进行实现,但是其中大部分信息都是原始公司封装的包进行实现的,想要修改非常费劲,所以此处简单了解了一下socket
,自己简单的 编写了两个测试接口,方便以后如果需要自己添加socket接口,可以快速编写。
2. 简单实现
编写的接口为测试接口,整体结构相对简单,主要就是客户端发起一个请求,请求信息前6位为请求串长度,其余为请求的请求体,发送信息到服务端后,服务端使用线程池异步处理信息,最终返回处理之后的响应信息,客户端则接收响应信息,同样的步骤处理响应信息,前6位为响应信息长度,然后解析响应信息即可,因为为简单案例,所以没有进行数据通信加密。
2.1 客户端实现
客户端代码相对简单,直接写入到controller当中了,具体实现代码如下:
package cn.git.controller;
import cn.git.entity.Product;
import cn.git.socket.SocketUtil;
import com.alibaba.fastjson.JSONObject;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.math.BigDecimal;
import java.net.Socket;
/**
* @description: Socket测试controller
* @program: bank-credit-sy
* @author: lixuchun
* @create: 2023-03-20
*/
@RestController
@RequestMapping("/socket")
public class SocketController {
/**
* 异步发送200个请求,模拟多用户
*/
@GetMapping("/client")
public String client() {
// 异步发送200个请求,模拟多用户
for (int i = 0; i < 200; i++) {
int finalI = i;
new Thread(()-> {
try {
// 创建Socket对象
Socket socket = new Socket("localhost", 7777);
// 设置超时时间
socket.setSoTimeout(60000);
// 测试产品
Product product = new Product();
product.setAmount(new BigDecimal(finalI));
product.setCycle(12);
product.setEndTime("2018-08-08");
product.setName("test");
product.setRate(new BigDecimal(1));
product.setRaised(new BigDecimal(0));
// 拼接请求报文
String message = JSONObject.toJSONString(product);
String reqLengthStr = SocketUtil.leftFixedZero(6, message.length());
// 发送请求报文
PrintStream out = new PrintStream(socket.getOutputStream());
out.println(reqLengthStr.concat(message));
// 获取服务端返回的消息长度信息
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
char[] lengthByte = new char[6];
in.read(lengthByte);
String rspLengthStr = new String(lengthByte);
int responseLength = Integer.parseInt(rspLengthStr);
// 获取服务端返回的消息体信息
char[] responseByte = new char[responseLength];
in.read(responseByte);
String responseBody = new String(responseByte);
// 打印返回结果
System.out.println("返回结果为 : ".concat(responseBody));
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
return "处理成功啦";
}
}
2.2 服务端代码
服务端代码相对复杂一些,主要有socket服务初始化,公共线程池,工具类以及接口处理handle类。具体实现如下:
-
socket初始化类
package cn.git.socket; import cn.git.mapper.ProductMapper; import cn.git.socket.handler.SocketHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.io.IOException; import java.net.ServerSocket; import java.net.Socket; /** * @description: socket接口入口信息 * @program: bank-credit-sy * @author: lixuchun * @create: 2023-03-20 */ @Slf4j @Component public class CustomSocketServer { @Autowired private ProductMapper productMapper; /** * 初始化调用接口 * * 异步启动socket监听服务,端口 7777 */ @PostConstruct public void socketServerInit() throws IOException { new Thread(() -> { try { // 监听7777端口 log.info("开始启动socket服务信息,端口监听 7777"); ServerSocket serverSocket = new ServerSocket(7777); // 循环监听 while (true) { log.info("等待客户端连接..."); Socket clientSocket = serverSocket.accept(); ThreadPoolUtil.THREAD_POOL.execute( // 构建handler SocketHandler.builder().clientSocket(clientSocket).productMapper(productMapper).build() ); log.info("客户端连接成功,当前连接数:{}", ThreadPoolUtil.THREAD_POOL.getActiveCount()); } } catch (Exception e) { e.printStackTrace(); } }).start(); } }
-
通用线程池相关类
自定义线程池工厂实现如下package cn.git.socket; import cn.hutool.core.util.StrUtil; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * 自定义线程池工厂 * @program: bank-credit-sy * @author: lixuchun * @create: 2021-12-25 */ public class OnlineThreadFactory implements ThreadFactory { /** * 自增线程序号 */ private final AtomicInteger threadNumber = new AtomicInteger(1); /** * 线程名称前缀 */ private final String threadNamePrefix; /** * 构造方法 * @param threadNamePrefix 方法前缀 */ public OnlineThreadFactory(String threadNamePrefix) { this.threadNamePrefix = threadNamePrefix.concat(StrUtil.DASHED); } /** * Constructs a new {@code Thread}. Implementations may also initialize * priority, name, daemon status, {@code ThreadGroup}, etc. * @param runnable a runnable to be executed by new thread instance * @return constructed thread, or {@code null} if the request to * create a thread is rejected */ @Override public Thread newThread(Runnable runnable) { // 设置线程池名称 Thread thread = new Thread(runnable , threadNamePrefix.concat(StrUtil.toString(threadNumber.getAndIncrement()))); // 设置守护线程 if (thread.isDaemon()) { thread.setDaemon(false); } // 同意设置程默认优先级 5 if (thread.getPriority() != Thread.NORM_PRIORITY) { thread.setPriority(Thread.NORM_PRIORITY); } return thread; } }
线程池工具类
package cn.git.socket; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @description: 线程池工具类 * @program: bank-credit-sy * @author: lixuchun * @create: 2022-08-16 10:58:07 */ public class ThreadPoolUtil { /** * 线程池线程名称 */ private static final String DICS_THREAD_POOL_PREFIX = "DICS-SOCKET"; /** * 超时时间 单位毫秒 */ private static final int REQ_TIME_OUT = 10 * 1000; /** * 阻塞队列大小 */ private static final int QUEUE_SIZE = 200; /** * 核心线程池数量 */ private static final int CORE_THREAD_NUM = 5; /** * 最大线程池数量 */ private static final int MAX_THREAD_NUM = 20; /** * 线程池构造参数 */ public static ThreadPoolExecutor THREAD_POOL = new ThreadPoolExecutor(CORE_THREAD_NUM, MAX_THREAD_NUM, REQ_TIME_OUT, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE), new OnlineThreadFactory(DICS_THREAD_POOL_PREFIX)); }
-
业务处理handle类
package cn.git.socket.handler; import cn.git.entity.Product; import cn.git.mapper.ProductMapper; import cn.git.socket.SocketUtil; import cn.hutool.core.util.IdUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import lombok.*; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /** * @description: socket请求处理类 * @program: bank-credit-sy * @author: lixuchun * @create: 2023-03-20 */ @Data @Builder @NoArgsConstructor @AllArgsConstructor public class SocketHandler implements Runnable { /** * 订单信息mapper */ private ProductMapper productMapper; /** * 客户端socket */ private Socket clientSocket; /** * When an object implementing interface <code>Runnable</code> is used * to create a thread, starting the thread causes the object's * <code>run</code> method to be called in that separately executing * thread. * <p> * The general contract of the method <code>run</code> is that it may * take any action whatsoever. * * @see Thread#run() */ @SneakyThrows @Override public void run() { // 获取请求数据信息 System.out.println("接收数据开始处理!"); BufferedReader in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream())); PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true); // 读取数据前6位,获取请求数据长度 char[] bodyBytes = new char[6]; in.read(bodyBytes); String dataLengthStr = new String(bodyBytes); // 获取请求数据信息 Integer dataLength = Integer.parseInt(dataLengthStr); System.out.println("请求数据长度:" + dataLength); bodyBytes = new char[dataLength]; in.read(bodyBytes); String requestBodyInfo = new String(bodyBytes); System.out.println("请求数据:" + requestBodyInfo); // 请求数据转换为Person对象 Product product = JSON.parseObject(requestBodyInfo, Product.class); product.setId(IdUtil.simpleUUID()); productMapper.insert(product); // 响应数据 String rspJSONInfo = JSONObject.toJSONString(product); // 响应数据长度标识位 eg: 000667 String prefixLength = SocketUtil.leftFixedZero(6, rspJSONInfo.length()); // 最终响应数据 String finalRspInfo = prefixLength.concat(rspJSONInfo); System.out.println("响应数据:" + finalRspInfo); out.println(finalRspInfo); } }
-
socket工具类
package cn.git.socket; /** * @description: socket工具类 * @program: bank-credit-sy * @author: lixuchun * @create: 2023-03-20 */ public class SocketUtil { /** * 左补0 * eg: length = 6, num = 123, return 000123 * * @param length 长度 * @param num 数字 * @return */ public static String leftFixedZero(int length, int num) { return String.format("%0" + length + "d", num); } }
3.测试
启动服务,观察socket监听端口 7777
是否正常启动监听,观察如下,socket服务端正常启动监听端口
开始模拟多客户端调用,请求 http://localhost:8088/socket/client
接口,循环异步发起 200
socket 请求。
观察后台信息
观察数据库,发现数据已经正确导入了, 成功插入了 200
条数据信息