前言
要手动编码,和k8s做shell交互,我们需要弄清以下三个问题:
1、Java如何与k8s做shell交互
2、前端界面如何与Java后台交互
3、多个用户并发访问如何实现
问题1:
- k8s提供了各种语言的Kubernetes API客户端,对于Java语言来说,采用fabric8的KubernetesClient即可实现在容器内执行shell命令。
- 需要pod的容器的基础镜像本身支持bash、sh等终端
问题2:
- 为了交互的实时性,我们与前端的交互采用长连接
问题3:
- 为各个用户分配独立的窗口线程
综上,我绘制了以下架构图:
一、后台代码
我们假设在 crm-publiccloud-5fcdb4749b-rlr8s 这个pod中,有一个容器 crm,其自身的基础镜像支持bash。
1、通过Netty建立websocket-server:
import io.fabric8.kubernetes.client.KubernetesClient;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
/**
 * Spring-managed Netty server that accepts WebSocket connections on the
 * {@code /ws} path and installs a fresh {@link WebSocketHandler} on every
 * accepted channel.
 *
 * <p>The server is started from {@link #startNettyServer()} after dependency
 * injection and shut down from {@link #stop()} before the bean is destroyed.
 */
@Slf4j
@Component
public class NettyWebSocketServer {

    @Autowired
    private KubernetesClient k8sClient;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;

    /** Listening port; defaults to 6698 when {@code netty.websocket.port} is not set. */
    @Value("${netty.websocket.port:6698}")
    private int port;

    /**
     * Creates the event loop groups, builds the channel pipeline and binds the
     * configured port. Any failure is logged and followed by {@link #stop()};
     * it is not rethrown.
     */
    @PostConstruct
    public void startNettyServer() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    // Parameterized instead of the raw type so initChannel is type-checked.
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            // WebSocket is built on top of HTTP, so the HTTP codec is required too.
                            pipeline.addLast(new HttpServerCodec());
                            // Handler that writes data in chunks.
                            pipeline.addLast(new ChunkedWriteHandler());
                            // Aggregates the fragmented HTTP request parts into one message;
                            // the argument is the maximum aggregated length in bytes.
                            pipeline.addLast(new HttpObjectAggregator(64 * 1024));
                            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                            pipeline.addLast(new WebSocketHandler(k8sClient));
                        }
                    });
            serverChannel = bootstrap.bind(port).sync().channel();
            log.info("Netty Ws started on port {}", port);
        } catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Keep the interrupt status visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            log.error("startNettyServer error : {}", ex.getMessage(), ex);
            stop();
        }
    }

    /**
     * Closes the server channel and shuts both event loop groups down.
     * Every field is null-checked because this method is also the
     * {@code @PreDestroy} hook and may run without a successful start.
     */
    @PreDestroy
    public void stop() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
        log.info("Netty Ws stopped");
    }
}
2、WebSocketHandler
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
public static final Map<String, ShellTerminal> SHELL_WINDOWS = new ConcurrentHashMap<>();
private KubernetesClient client;
public WebSocketHandler(KubernetesClient k8sClient) {
client = k8sClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
// 处理WebSocket消息
Channel channel = ctx.channel();
String id = channel.id().asLongText();
SocketAddress remoteAddress = channel.remoteAddress();
String inputText = msg.text();
log.info("[{}] Receive ws msg:{}, {}", id, inputText, remoteAddress);
ShellTerminal shellTerminal = SHELL_WINDOWS.get(id);
if (shellTerminal != null) {
shellTerminal.input(inputText);
} else {
log.error("[{}] there is no shell terminal:{}", id, remoteAddress);
}
}
private ExecWatch getOpsExecWatch() {
String podName = "crm-publiccloud-5fcdb4749b-rlr8s";
String namespace = "default";
String containerName = "crm";
String command = "bash";
return client.pods()
.inNamespace(namespace)
.withName(podName)
.inContainer(containerName)
.redirectingInput()
.redirectingOutput()
.redirectingError()
.withTTY()
.exec(command);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
String id = channel.id().asLongText();
SocketAddress remoteAddress = channel.remoteAddress();
log.info("[{}] Ws client connected {}", id, remoteAddress);
try {
ExecWatch opsExecWatch = getOpsExecWatch();
ShellTerminal shellTerminal = new ShellTerminal(id, opsExecWatch, ctx);
SHELL_WINDOWS.put(id, shellTerminal);
} catch (Exception e) {
log.error("[{}] init ops exec error:{}", e.getMessage(), e);
ctx.close();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
String id = channel.id().asLongText();
SocketAddress remoteAddress = channel.remoteAddress();
ShellTerminal shellTerminal = SHELL_WINDOWS.get(id);
if (shellTerminal != null) {
shellTerminal.closeExecWatch();
SHELL_WINDOWS.remove(id);
ctx.close();
}
log.info("[{}] Ws client disconnected: {}", id, remoteAddress);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
Channel channel = ctx.channel();
String id = channel.id().asLongText();
SocketAddress remoteAddress = channel.remoteAddress();
ShellTerminal shellTerminal = SHELL_WINDOWS.get(id);
if (shellTerminal != null) {
shellTerminal.closeExecWatch();
SHELL_WINDOWS.remove(id);
}
ctx.close();
log.error("[{}] Ws exceptionCaught {}:{}", id, remoteAddress, e.getMessage(), e);
}
}
3、ShellTerminal类
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.io.*;
import java.nio.charset.StandardCharsets;
@Data
@Slf4j
public class ShellTerminal {
private static final String EXIT_CODE = "exit";
private static final String CLOSED = "closed";
private String id;
private String remoteAddress;
private ExecWatch execWatch;
private ChannelHandlerContext ctx;
public ShellTerminal(String id, ExecWatch opsExecWatch, ChannelHandlerContext ctx) {
this.id = id;
this.execWatch = opsExecWatch;
this.ctx = ctx;
initListenThread(id, execWatch, ctx);
}
public void input(String inputText) throws IOException {
if (EXIT_CODE.equals(inputText)) {
closeExecWatch();
this.ctx.close();
return;
}
inputText += "\n";
OutputStream stdin = execWatch.getInput();
if (stdin != null) {
stdin.write(inputText.getBytes(StandardCharsets.UTF_8));
stdin.flush();
}
}
public void closeExecWatch() {
if (execWatch == null) {
log.info("[{}] execWatch is null {}", id, remoteAddress);
return;
}
try {
OutputStream stdin = execWatch.getInput();
InputStream stdout = execWatch.getOutput();
InputStream stderror = execWatch.getError();
if (stdin != null) {
stdin.close();
}
if (stdout != null) {
stdout.close();
}
if (stderror != null) {
stderror.close();
}
} catch (Exception ex) {
log.error("[{}] close execWatch error {}:{}", id, remoteAddress, ex.getMessage(), ex);
}
}
public void initListenThread(String id, ExecWatch execWatch, ChannelHandlerContext ctx) {
// 创建一个线程用于持续读取输出并打印
Thread outputThread = new ShellOutputThread(id, execWatch, ctx);
outputThread.start();
}
public static class ShellOutputThread extends Thread {
private final String id;
private final ChannelHandlerContext ctx;
private final ExecWatch execWatch;
public ShellOutputThread(String id, ExecWatch execWatch, ChannelHandlerContext ctx) {
this.id = id;
this.execWatch = execWatch;
this.ctx = ctx;
}
@Override
public void run() {
try {
InputStream output = execWatch.getOutput();
BufferedReader stdout = new BufferedReader(new InputStreamReader(output));
String line;
while ((line = stdout.readLine()) != null) {
System.out.println(line + "\n");
ctx.channel().writeAndFlush(new TextWebSocketFrame(line + "\n"));
}
} catch (Exception e) {
if ((e instanceof IOException) && CLOSED.equals(e.getMessage())) {
log.error("[" + id + "] shell output stream closed");
} else {
log.error("[" + id + "] shell output read error:{}", e.getMessage(), e);
}
}
}
}
}
二、测试
我们可以通过一些在线WebSocket测试工具,连接 ws://127.0.0.1:6698/ws 来测试:
三、前端
采用xterm.js库来实现
<!DOCTYPE html>
<html>
<head>
<title>交互式终端</title>
<style>
body,
html {
height: 100%;
margin: 0;
padding: 0;
}
#terminal-container {
height: 100vh;
background-color: black;
}
#terminal {
width: 100%;
height: 100%;
}
</style>
</head>
<body>
<div id="terminal-container">
<div id="terminal"></div>
</div>
<script src="https://cdn.jsdelivr.net/npm/xterm@5.2.1/lib/xterm.min.js" referrerpolicy="no-referrer"></script>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/xterm@5.2.1/css/xterm.min.css" />
<script>
// Create the xterm.js terminal and mount it on the #terminal element.
const terminal = new Terminal({ cursorBlink: true });
const terminalElement = document.getElementById('terminal');
terminal.open(terminalElement);

// WebSocket connection to the backend.
const socket = new WebSocket("ws://127.0.0.1:6698/ws");

// Text typed by the user since the last Enter.
let currentInput = '';

socket.onopen = function () {
  terminal.writeln("Connected to the server");
};

// The literal message 'clear' wipes the terminal; anything else is printed as a line.
socket.onmessage = function (event) {
  const payload = event.data;
  if (payload === 'clear') {
    terminal.clear();
    return;
  }
  terminal.writeln(payload);
};

// Static list of commands offered by Tab completion.
const commands = ['ls', 'cd', 'cat', 'tail'];
// Keyboard handling: the line is edited locally in `currentInput` and only
// sent to the server when Enter is pressed.
terminal.onKey((event) => {
  const domEvent = event.domEvent;
  const printable = !domEvent.altKey && !domEvent.altGraphKey && !domEvent.ctrlKey && !domEvent.metaKey;
  if (domEvent.keyCode === 13) {
    // Enter: submit the line, echo it with a prompt, start a new line.
    socket.send(currentInput);
    terminal.writeln('\r\n$ ' + currentInput);
    currentInput = '';
  } else if (domEvent.keyCode === 8) {
    // Backspace: guard on the pending input rather than on the cursor column.
    // The typed text starts at column 0 (no prompt precedes it), so a
    // "column > 2" check made the first two characters impossible to erase.
    if (currentInput.length > 0) {
      terminal.write('\b \b');
      currentInput = currentInput.slice(0, -1);
    }
  } else if (domEvent.keyCode === 9) {
    // Tab: complete from the static command list.
    domEvent.preventDefault();
    autoComplete();
  } else if (domEvent.ctrlKey && domEvent.keyCode === 67) {
    // Ctrl + C: forward the ETX control character.
    socket.send('\x03');
  } else if (domEvent.keyCode === 46) {
    // Delete key (Backspace is already handled by the earlier branch).
    domEvent.preventDefault();
    handleDelete();
  } else if (printable) {
    currentInput += event.key;
    terminal.write(event.key);
  }
});
// Completes the last space-separated token of `currentInput` against the
// static `commands` list.
//  - exactly one match: the missing suffix is written to the terminal and the
//    token is replaced in `currentInput`;
//  - several matches: they are listed and the prompt plus the current input
//    is redrawn;
//  - empty last token or no match: nothing happens.
function autoComplete() {
  const inputTokens = currentInput.split(' ');
  const lastIndex = inputTokens.length - 1;
  const lastToken = inputTokens[lastIndex];
  if (lastToken.length === 0) {
    return;
  }
  const matchedCommands = commands.filter((command) => command.startsWith(lastToken));
  if (matchedCommands.length === 1) {
    const completed = matchedCommands[0];
    terminal.write(completed.slice(lastToken.length));
    // Replace the token in place and re-join. Prepending ' ' unconditionally
    // produced a leading space whenever the input consisted of a single token.
    inputTokens[lastIndex] = completed;
    currentInput = inputTokens.join(' ');
  } else if (matchedCommands.length > 1) {
    terminal.writeln('\r\n');
    terminal.writeln(matchedCommands.join('\t'));
    terminal.writeln('\r\n');
    terminal.write('$ ' + currentInput);
  }
}
// Removes the last character of the pending input, both on screen and in
// `currentInput`. Does nothing when no input is pending.
//
// The earlier version re-joined the tokens with '' (which stripped every
// space from the input) and repositioned the cursor through private
// `terminal._core` buffer internals using a value derived from a character
// code; neither is needed to erase one trailing character.
function handleDelete() {
  if (currentInput.length === 0) {
    return;
  }
  // Step back, overwrite the character with a space, step back again.
  terminal.write('\b \b');
  currentInput = currentInput.slice(0, -1);
}
</script>
</body>
</html>