一、项目展示
下图(模拟的数据可视化大屏)中数据是动态显示的
二、项目简介
描述:使用Client模拟了硬件设备,比如可燃气体浓度检测器。Client通过Socket与Server建立连接,Server保存数据到txt文件,并使用WebSocket将数据推送到数据可视化大屏
工作:通过多线程+NIO优化了Server性能——单个Selector线程以非阻塞方式处理所有连接的accept和read事件,解析后的数据放入阻塞队列,由4个文件写入线程批量写入txt文件
原理图:
三、代码实现
Server
NioSocketServerService.java
package com.example.server;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Spring service that runs the device-facing NIO socket server.
 *
 * <p>On startup it launches a fixed number of {@link FileWriterTask} workers that drain
 * the shared write queue, plus one selector thread that accepts device connections on
 * {@link #PORT} and forwards readable channels to {@link SocketChannelHandler}.
 */
@Service
public class NioSocketServerService {
    private static final int PORT = 8081;
    /** Maximum time, in milliseconds, a single {@code select} call may block. */
    private static final int TIMEOUT = 5000;
    /** Number of background threads draining {@link #writeQueue}. */
    private static final int WRITER_THREADS = 4;
    private static final BlockingQueue<String> writeQueue = new LinkedBlockingQueue<>();

    /**
     * Starts the file-writer workers and the selector thread. Invoked once after the
     * bean has been constructed.
     */
    @PostConstruct
    public void startServer() {
        for (int i = 0; i < WRITER_THREADS; i++) {
            new Thread(new FileWriterTask(writeQueue)).start();
        }
        new Thread(this::runSelectorLoop).start();
    }

    /**
     * Opens the listening channel and dispatches ready keys until a fatal I/O error
     * occurs on the selector itself. The selector and the listening channel are closed
     * when the loop ends.
     */
    private void runSelectorLoop() {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.bind(new InetSocketAddress(PORT));
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("Server is listening on port " + PORT);
            while (true) {
                if (selector.select(TIMEOUT) == 0) {
                    continue;
                }
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    dispatch(key, selector);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ready key. Failures are contained to the affected connection so that
     * a single misbehaving client cannot stop the selector loop.
     */
    private void dispatch(SelectionKey key, Selector selector) {
        if (!key.isValid()) {
            // The key was cancelled after selection; querying its ready set would throw.
            return;
        }
        if (key.isAcceptable()) {
            try {
                handleAccept(key, selector);
            } catch (IOException e) {
                // Only the new connection failed; the listening channel must stay open.
                System.err.println("Failed to accept client connection: " + e.getMessage());
            }
        } else if (key.isReadable()) {
            try {
                handleRead(key);
            } catch (IOException e) {
                closeQuietly(key);
            }
        }
    }

    /** Cancels the key and closes its channel without letting a close failure propagate. */
    private void closeQuietly(SelectionKey key) {
        key.cancel(); // Stop the selector from monitoring this channel.
        try {
            key.channel().close(); // Release the underlying socket.
        } catch (IOException e) {
            System.err.println("Failed to close channel: " + e.getMessage());
        }
    }

    /**
     * Accepts a pending connection, switches it to non-blocking mode, registers it for
     * reads and allocates its receive buffer.
     *
     * @throws IOException if accepting or configuring the new connection fails; the
     *                     partially set-up connection is closed before rethrowing
     */
    private void handleAccept(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept returns null when no connection is actually pending.
            return;
        }
        try {
            socketChannel.configureBlocking(false);
            socketChannel.register(selector, SelectionKey.OP_READ);
            Object remoteAddress = socketChannel.getRemoteAddress();
            // Allocate the buffer last so a failure above leaves nothing behind in the map.
            SocketChannelHandler.addBuffer(socketChannel);
            System.out.println("New client connected: " + remoteAddress);
        } catch (IOException e) {
            try {
                socketChannel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Delegates a readable channel to {@link SocketChannelHandler}.
     *
     * @throws IOException if the handler fails while reading or closing the channel
     */
    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        SocketChannelHandler.readFromChannel(socketChannel, writeQueue);
    }
}
SocketChannelHandler.java
package com.example.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Static helpers that read device messages from client channels.
 *
 * <p>Each message has the form {@code <deviceId> : <value>}. A parsed message is queued
 * for the file writers as {@code data/<deviceId>.txt : <timestamp> : <value>} and pushed
 * to WebSocket subscribers as {@code <deviceId> : <timestamp> : <value>}.
 */
public class SocketChannelHandler {
    private static final String DIRECTORY = "data/";
    private static final int BUFFER_SIZE = 2048;
    private static final String FIELD_SEPARATOR = " : ";
    /** Matches any single line terminator so several messages in one read can be split. */
    private static final java.util.regex.Pattern LINE_BREAK =
            java.util.regex.Pattern.compile("\\r\\n|\\r|\\n");
    private static final Map<SocketChannel, ByteBuffer> bufferMap = new ConcurrentHashMap<>();
    private static final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /**
     * Allocates the receive buffer for a newly accepted channel.
     *
     * @param socketChannel the channel that will later be passed to {@link #readFromChannel}
     */
    public static void addBuffer(SocketChannel socketChannel) {
        bufferMap.put(socketChannel, ByteBuffer.allocateDirect(BUFFER_SIZE));
    }

    /**
     * Performs one read on the channel and publishes every message found in the bytes
     * received. The channel is closed and its buffer released on end-of-stream or on a
     * read error.
     *
     * <p>NOTE(review): bytes are not carried over between reads, so a message split
     * across two reads is still not reassembled.
     *
     * @param socketChannel the readable client channel
     * @param writeQueue    queue receiving the lines to be persisted
     * @throws IOException if closing the channel fails
     */
    public static void readFromChannel(SocketChannel socketChannel, BlockingQueue<String> writeQueue) throws IOException {
        // Fall back to a fresh buffer if the channel was never registered via addBuffer.
        ByteBuffer buffer = bufferMap.computeIfAbsent(
                socketChannel, channel -> ByteBuffer.allocateDirect(BUFFER_SIZE));
        buffer.clear();
        int bytesRead;
        try {
            bytesRead = socketChannel.read(buffer);
        } catch (IOException e) {
            System.err.println("Error reading from socket: " + e.getMessage());
            release(socketChannel);
            return;
        }
        if (bytesRead == -1) {
            // -1 means the client closed the connection.
            release(socketChannel);
            return;
        }
        if (bytesRead == 0) {
            return;
        }
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        String chunk = new String(data, java.nio.charset.StandardCharsets.UTF_8);
        // A single read may contain several newline-terminated messages.
        for (String line : LINE_BREAK.split(chunk)) {
            publish(line, writeQueue);
        }
    }

    /** Closes the channel and always drops its buffer, even if closing fails. */
    private static void release(SocketChannel socketChannel) throws IOException {
        try {
            socketChannel.close();
        } finally {
            bufferMap.remove(socketChannel);
        }
    }

    /** Parses one message line and forwards it to the write queue and the WebSocket. */
    private static void publish(String line, BlockingQueue<String> writeQueue) {
        String[] dataParts = line.split(FIELD_SEPARATOR, 2);
        if (dataParts.length != 2) {
            return;
        }
        String deviceId = dataParts[0].trim();
        String deviceData = dataParts[1].trim();
        if (!isSafeDeviceId(deviceId)) {
            // The id becomes part of a file name; refuse anything that could leave DIRECTORY.
            System.err.println("Ignoring message with invalid device id");
            return;
        }
        String currentTime = LocalDateTime.now().format(dateTimeFormatter);
        String dataToWrite = DIRECTORY + deviceId + ".txt" + FIELD_SEPARATOR + currentTime + FIELD_SEPARATOR + deviceData;
        writeQueue.add(dataToWrite);
        WebSocketServer.sendMessage(deviceId + FIELD_SEPARATOR + currentTime + FIELD_SEPARATOR + deviceData);
    }

    /** Returns true when the id is non-empty and contains no path separator or NUL. */
    private static boolean isSafeDeviceId(String deviceId) {
        return !deviceId.isEmpty()
                && deviceId.indexOf('/') < 0
                && deviceId.indexOf('\\') < 0
                && deviceId.indexOf('\0') < 0;
    }
}
FileWriterTask.java
package com.example.server;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
/**
 * Worker that drains queued records and appends each one to its target file.
 *
 * <p>A record has the form {@code <filePath> : <timestamp> : <payload>}. The whole
 * record, followed by the platform line separator, is appended to {@code <filePath>}.
 * The worker runs until its thread is interrupted.
 */
public class FileWriterTask implements Runnable {
    /** Maximum number of records taken from the queue before writing. */
    private static final int BATCH_SIZE = 10;
    /** How long to wait for the next record before writing what has been collected. */
    private static final long POLL_TIMEOUT_MS = 100;
    private static final String FIELD_SEPARATOR = " : ";
    private static final int FIELD_COUNT = 3;

    /**
     * BlockingQueue is a thread-safe queue interface from java.util.concurrent.
     * Its put and take operations block: put waits while the queue is full and take
     * waits while the queue is empty.
     * Common implementations: ArrayBlockingQueue, LinkedBlockingQueue,
     * PriorityBlockingQueue, SynchronousQueue.
     */
    private final BlockingQueue<String> writeQueue;

    /**
     * @param writeQueue queue supplying the records to persist
     */
    public FileWriterTask(BlockingQueue<String> writeQueue) {
        this.writeQueue = writeQueue;
    }

    /**
     * Repeatedly collects up to {@link #BATCH_SIZE} records (or fewer if the queue stays
     * empty for {@link #POLL_TIMEOUT_MS}) and writes them. Records already collected
     * when the thread is interrupted are still written before the worker exits.
     */
    @Override
    public void run() {
        boolean interrupted = false;
        while (!interrupted) {
            List<String> batch = new ArrayList<>(BATCH_SIZE);
            try {
                while (batch.size() < BATCH_SIZE) {
                    String record = writeQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    if (record == null) {
                        break;
                    }
                    batch.add(record);
                }
            } catch (InterruptedException e) {
                // poll cleared the interrupt flag; flush the batch, then stop.
                interrupted = true;
            }
            for (String record : batch) {
                try {
                    appendRecord(record);
                } catch (IOException | java.nio.file.InvalidPathException e) {
                    // Keep going: one bad record must not discard the rest of the batch.
                    System.err.println("Failed to persist record: " + e.getMessage());
                    e.printStackTrace();
                }
            }
        }
        Thread.currentThread().interrupt();
    }

    /**
     * Appends one record to the file named by its first field, creating the file and
     * any missing parent directories. Records without three fields are skipped.
     *
     * @throws IOException if the directory or file cannot be created or written
     */
    private void appendRecord(String record) throws IOException {
        // Limit the split so a payload containing the separator stays in one field.
        String[] fields = record.split(FIELD_SEPARATOR, FIELD_COUNT);
        if (fields.length != FIELD_COUNT) {
            return;
        }
        java.nio.file.Path file = Paths.get(fields[0].trim());
        java.nio.file.Path parent = file.getParent();
        if (parent != null) {
            java.nio.file.Files.createDirectories(parent);
        }
        byte[] line = (record + System.lineSeparator())
                .getBytes(java.nio.charset.StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.wrap(line);
        try (FileChannel fileChannel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            // A single write call is not guaranteed to consume the whole buffer.
            while (buffer.hasRemaining()) {
                fileChannel.write(buffer);
            }
        }
    }
}
Client
MultiThreadedSocketClient.java
package com.example.client;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Entry point of the device simulator: starts one {@link DeviceClient} per simulated
 * device, each on its own pool thread.
 */
public class MultiThreadedSocketClient {
    /**
     * Submits devices named {@code Device1} through {@code Device1000}, all targeting
     * {@code localhost:8081}, then stops the pool from accepting further tasks.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        final String serverHost = "localhost";
        final int serverPort = 8081;
        final int deviceCount = 1000;

        // One thread per device so every simulated device can run concurrently.
        ExecutorService pool = Executors.newFixedThreadPool(deviceCount);
        int deviceNumber = 1;
        while (deviceNumber <= deviceCount) {
            DeviceClient device = new DeviceClient(serverHost, serverPort, "Device" + deviceNumber);
            pool.submit(device);
            deviceNumber++;
        }
        pool.shutdown();
    }
}
DeviceClient.java
package com.example.client;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
 * Simulated hardware device that connects to the server and sends one reading per
 * second in the form {@code <deviceId> : <value>}.
 *
 * <p>Connection attempts are retried up to {@link #MAX_RETRIES} times with a random
 * delay. A connection that drops after being established is detected and retried too.
 */
class DeviceClient implements Runnable {
    private final String hostname;
    private final int port;
    private final String deviceId;
    private final Random random = new Random();
    private static final int MAX_RETRIES = 15;
    private static final int RETRY_DELAY_MS = 1000;

    /**
     * @param hostname server host to connect to
     * @param port     server port to connect to
     * @param deviceId identifier sent as the first field of every message
     */
    public DeviceClient(String hostname, int port, String deviceId) {
        this.hostname = hostname;
        this.port = port;
        this.deviceId = deviceId;
    }

    /**
     * Connects and streams readings until the thread is interrupted, the host cannot be
     * resolved, or {@link #MAX_RETRIES} consecutive attempts have failed.
     */
    @Override
    public void run() {
        int attempt = 0;
        boolean connected = false;
        while (attempt < MAX_RETRIES && !connected) {
            try {
                // Random start-up delay so the devices do not all connect at once.
                Thread.sleep(random.nextInt(15000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            try (Socket socket = new Socket(hostname, port);
                 PrintWriter out = new PrintWriter(
                         new java.io.OutputStreamWriter(socket.getOutputStream(),
                                 java.nio.charset.StandardCharsets.UTF_8), true)) {
                connected = true;
                attempt = 0; // A successful connection resets the retry budget.
                while (true) {
                    String data = deviceId + " : " + random.nextInt(50000);
                    out.println(data);
                    // PrintWriter swallows write failures; checkError is the only signal.
                    if (out.checkError()) {
                        throw new IOException("Connection to " + hostname + ":" + port + " lost");
                    }
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } catch (UnknownHostException e) {
                System.err.println("Unknown host: " + hostname);
                break;
            } catch (IOException e) {
                connected = false; // Covers both a refused and a dropped connection.
                attempt++;
                int randomDelay = random.nextInt(10000);
                System.err.println(deviceId + "\tAttempt " + attempt + " - Connection refused. Retrying in " + (RETRY_DELAY_MS + randomDelay) + "ms...");
                try {
                    Thread.sleep(RETRY_DELAY_MS + randomDelay);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        if (!connected) {
            System.err.println("Failed to connect after " + MAX_RETRIES + " attempts.");
        }
    }
}