前言
最近在学习flink, 为了模仿一个持续的无界的数据源, 所以需要一个可以持续发送消息的socket服务端. 先上效果图
效果图
socket服务端可以持续的发送消息, flink端是一个统计单词出现总数的消费端,效果图如下
源代码
flink的消费端就不展示了, 需要引入一些依赖和版本, 此处只展示socket的服务端
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;
/**
* @author <a href="mailto:liang.qin.work@foxmail.com">liang.qin</a>
* @since 2024/7/19 21:50
**/
public class ContinuousMessageServer {
@SuppressWarnings("InfiniteLoopStatement")
public static void main(String[] args) throws IOException {
int port = 9879;
try (ServerSocket serverSocket = new ServerSocket(port)) {
System.out.println("服务器启动,监听端口:" + port);
// 无限循环以接受多个客户端连接
while (true) {
Socket clientSocket = serverSocket.accept();
System.out.println("客户端已连接");
// 为每个客户端启动一个新的线程来处理发送消息
new ClientHandler(clientSocket).start();
}
}
}
// 处理客户端连接的内部类
static class ClientHandler extends Thread {
private final Socket clientSocket;
public ClientHandler(Socket clientSocket) {
this.clientSocket = clientSocket;
}
@Override
public void run() {
try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);
Scanner scanner = new Scanner(System.in)) {
do {
System.out.println();
System.out.println("请输入消息:");
String message = scanner.nextLine();
out.println(message);
System.out.println("成功发送消息:" + message);
// 检查客户端是否已断开连接(可选)
} while (!clientSocket.isClosed() && clientSocket.isConnected() && !clientSocket.isInputShutdown());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (clientSocket != null && !clientSocket.isClosed()) {
clientSocket.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}