Java手动编码实现与k8s交互式shell

news2024/11/25 4:48:53

前言

要手动编码,和k8s做shell交互,我们需要弄清以下两个问题:

1、Java如何与k8s做shell交互
2、前端界面如何与Java后台交互
3、多个用户并发访问如何实现
问题1:
  • k8s官方提供了各种语言的KubernetesAPI,对于Java语言来说,采用KubernetesClient即可实现执行shell命令。
  • 需要pod的容器的基础镜像本身支持bash、sh等终端
问题2:
  • 为了交互的实时性,我们与前端的交互采用长连接
问题3:
  • 为各个用户分配独立的窗口线程

综上,我绘制了以下架构图:
在这里插入图片描述

一、后台代码

我们假设crm-publiccloud-5fcdb4749b-rlr8s这个pod中,有一个容器crm,自身的基础镜像支持bash。

1、通过Netty建立websocket-server:

import io.fabric8.kubernetes.client.KubernetesClient;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Slf4j
@Component
public class NettyWebSocketServer {

    @Autowired
    private KubernetesClient k8sClient;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private Channel serverChannel;

    @Value("${netty.websocket.port:6698}")
    private int port;

    @PostConstruct
    public void startNettyServer() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            ChannelPipeline pipeline = channel.pipeline();
                            //websocket协议本身是基于http协议的,所以也要用http编解码器

                            pipeline.addLast(new HttpServerCodec());
                            //以块的方式来写的处理器
                            pipeline.addLast(new ChunkedWriteHandler());
                            //netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度

                            pipeline.addLast(new HttpObjectAggregator(64 * 1024));
                            pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
                            pipeline.addLast(new WebSocketHandler(k8sClient));
                        }
                    });
            serverChannel = bootstrap.bind(port).sync().channel();
            log.info("Netty Ws started on port {}", port);
        } catch (Exception ex) {
            log.error("startNettyServer error : {}", ex.getMessage(), ex);
            stop();
        }
    }

    @PreDestroy
    public void stop() {
        if (serverChannel != null) {
            serverChannel.close();
        }
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
        log.info("Netty Ws stopped");
    }
}

2、WebSocketHandler

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;

import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static final Map<String, ShellTerminal> SHELL_WINDOWS = new ConcurrentHashMap<>();

    private KubernetesClient client;

    public WebSocketHandler(KubernetesClient k8sClient) {
        client = k8sClient;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 处理WebSocket消息
        Channel channel = ctx.channel();
        String id = channel.id().asLongText();
        SocketAddress remoteAddress = channel.remoteAddress();
        String inputText = msg.text();
        log.info("[{}] Receive ws msg:{}, {}", id, inputText, remoteAddress);
        ShellTerminal shellTerminal = SHELL_WINDOWS.get(id);
        if (shellTerminal != null) {
            shellTerminal.input(inputText);
        } else {
            log.error("[{}] there is no shell terminal:{}", id, remoteAddress);
        }

    }

    private ExecWatch getOpsExecWatch() {
        String podName = "crm-publiccloud-5fcdb4749b-rlr8s";
        String namespace = "default";
        String containerName = "crm";
        String command = "bash";
        return client.pods()
                .inNamespace(namespace)
                .withName(podName)
                .inContainer(containerName)
                .redirectingInput()
                .redirectingOutput()
                .redirectingError()
                .withTTY()
                .exec(command);
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String id = channel.id().asLongText();
        SocketAddress remoteAddress = channel.remoteAddress();
        log.info("[{}] Ws client connected {}", id, remoteAddress);
        try {
            ExecWatch opsExecWatch = getOpsExecWatch();
            ShellTerminal shellTerminal = new ShellTerminal(id, opsExecWatch, ctx);
            SHELL_WINDOWS.put(id, shellTerminal);
        } catch (Exception e) {
            log.error("[{}] init ops exec error:{}", e.getMessage(), e);
            ctx.close();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        String id = channel.id().asLongText();
        SocketAddress remoteAddress = channel.remoteAddress();

        ShellTerminal shellTerminal = SHELL_WINDOWS.get(id);
        if (shellTerminal != null) {
            shellTerminal.closeExecWatch();
            SHELL_WINDOWS.remove(id);
            ctx.close();
        }
        log.info("[{}] Ws client disconnected: {}", id, remoteAddress);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
        Channel channel = ctx.channel();
        String id = channel.id().asLongText();
        SocketAddress remoteAddress = channel.remoteAddress();
        ShellTerminal shellTerminal = SHELL_WINDOWS.get(id);
        if (shellTerminal != null) {
            shellTerminal.closeExecWatch();
            SHELL_WINDOWS.remove(id);
        }
        ctx.close();
        log.error("[{}] Ws exceptionCaught {}:{}", id, remoteAddress, e.getMessage(), e);
    }

}

3、ShellTerminal类

import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.nio.charset.StandardCharsets;


@Data
@Slf4j
public class ShellTerminal {

    private static final String EXIT_CODE = "exit";
    private static final String CLOSED = "closed";

    private String id;
    private String remoteAddress;
    private ExecWatch execWatch;
    private ChannelHandlerContext ctx;

    public ShellTerminal(String id, ExecWatch opsExecWatch, ChannelHandlerContext ctx) {
        this.id = id;
        this.execWatch = opsExecWatch;
        this.ctx = ctx;
        initListenThread(id, execWatch, ctx);
    }

    public void input(String inputText) throws IOException {
        if (EXIT_CODE.equals(inputText)) {
            closeExecWatch();
            this.ctx.close();
            return;
        }
        inputText += "\n";
        OutputStream stdin = execWatch.getInput();
        if (stdin != null) {
            stdin.write(inputText.getBytes(StandardCharsets.UTF_8));
            stdin.flush();
        }
    }


    public void closeExecWatch() {
        if (execWatch == null) {
            log.info("[{}] execWatch is null {}", id, remoteAddress);
            return;
        }
        try {
            OutputStream stdin = execWatch.getInput();
            InputStream stdout = execWatch.getOutput();
            InputStream stderror = execWatch.getError();
            if (stdin != null) {
                stdin.close();
            }
            if (stdout != null) {
                stdout.close();
            }
            if (stderror != null) {
                stderror.close();
            }
        } catch (Exception ex) {
            log.error("[{}] close execWatch error {}:{}", id, remoteAddress, ex.getMessage(), ex);
        }
    }


    public void initListenThread(String id, ExecWatch execWatch, ChannelHandlerContext ctx) {
        // 创建一个线程用于持续读取输出并打印
        Thread outputThread = new ShellOutputThread(id, execWatch, ctx);
        outputThread.start();
    }


    public static class ShellOutputThread extends Thread {

        private final String id;
        private final ChannelHandlerContext ctx;
        private final ExecWatch execWatch;

        public ShellOutputThread(String id, ExecWatch execWatch, ChannelHandlerContext ctx) {
            this.id = id;
            this.execWatch = execWatch;
            this.ctx = ctx;
        }

        @Override
        public void run() {
            try {
                InputStream output = execWatch.getOutput();
                BufferedReader stdout = new BufferedReader(new InputStreamReader(output));
                String line;
                while ((line = stdout.readLine()) != null) {
                    System.out.println(line + "\n");
                    ctx.channel().writeAndFlush(new TextWebSocketFrame(line + "\n"));
                }
            } catch (Exception e) {
                if ((e instanceof IOException) && CLOSED.equals(e.getMessage())) {
                    log.error("[" + id + "] shell output stream closed");
                } else {
                    log.error("[" + id + "] shell output read error:{}", e.getMessage(), e);
                }
            }
        }
    }
}

二、测试

我们可以通过一些在线工具来测试:在这里插入图片描述

三、前端

采用xterm.js库来实现

<!DOCTYPE html>
<html>

<head>
    <title>交互式终端</title>
    <style>
        body,
        html {
            height: 100%;
            margin: 0;
            padding: 0;
        }

        #terminal-container {
            height: 100vh;
            background-color: black;
        }

        #terminal {
            width: 100%;
            height: 100%;
        }
    </style>
</head>

<body>
    <div id="terminal-container">
        <div id="terminal"></div>
    </div>
    <script src="https://cdn.jsdelivr.net/npm/xterm@5.2.1/lib/xterm.min.js" referrerpolicy="no-referrer"></script>
    <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/xterm@5.2.1/css/xterm.min.css" />

    <script>
        const terminal = new Terminal({ cursorBlink: true });
        terminal.open(document.getElementById('terminal'));

        const socket = new WebSocket("ws://127.0.0.1:6698/ws");
        let currentInput = '';

        socket.onopen = () => {
            terminal.writeln("Connected to the server");
        };

        socket.onmessage = (event) => {
            const message = event.data;
            if (message === 'clear') {
                terminal.clear();
            } else {
                terminal.writeln(message);
            }
        };

        const commands = ['ls', 'cd', 'cat', 'tail']; // 静态命令列表

        terminal.onKey((event) => {
            const printable = !event.domEvent.altKey && !event.domEvent.altGraphKey && !event.domEvent.ctrlKey && !event.domEvent.metaKey;

            if (event.domEvent.keyCode === 13) {
                socket.send(currentInput);
                terminal.writeln('\r\n$ ' + currentInput);
                currentInput = '';
            } else if (event.domEvent.keyCode === 8) {
                // Handle backspace
                if (terminal._core.buffer.x > 2) {
                    terminal.write('\b \b');
                    currentInput = currentInput.slice(0, -1);
                }
            } else if (event.domEvent.keyCode === 9) {
                // Handle Tab key
                event.domEvent.preventDefault();
                autoComplete();
            } else if (event.domEvent.ctrlKey && event.domEvent.keyCode === 67) {
                // Handle Ctrl + C
                socket.send('\x03');
            } else if (event.domEvent.keyCode === 8 || event.domEvent.keyCode === 46) {
                // 处理 Backspace 和 Delete 键的按下事件
                event.domEvent.preventDefault();
                handleDelete()
            } else if (printable) {
                currentInput += event.key;
                terminal.write(event.key);
            }
        });

        function autoComplete() {
            const inputTokens = currentInput.split(' ');
            const lastToken = inputTokens[inputTokens.length - 1];

            if (lastToken.length > 0) {
                const matchedCommands = commands.filter((command) => command.startsWith(lastToken));

                if (matchedCommands.length === 1) {
                    const newInput = inputTokens.slice(0, -1).join(' ') + ' ' + matchedCommands[0];
                    terminal.write(matchedCommands[0].slice(lastToken.length));
                    currentInput = newInput;
                } else if (matchedCommands.length > 1) {
                    terminal.writeln('\r\n');
                    terminal.writeln(matchedCommands.join('\t'));
                    terminal.writeln('\r\n');
                    terminal.write('$ ' + currentInput);
                }
            }
        }
        function handleDelete() {
            const buffer = terminal._core.buffer;

            if (buffer.x > 2) {
                const line = buffer.getLine(buffer.y);
                const charData = line.loadCell(buffer.x - 2);

                // 删除终端上的字符
                terminal.write('\b \b');

                // 更新当前输入
                const inputTokens = currentInput.split(' ');
                const lastToken = inputTokens[inputTokens.length - 1];
                inputTokens[inputTokens.length - 1] = lastToken.slice(0, -1);
                currentInput = inputTokens.join('');

                // 更新终端光标位置
                terminal._core.buffer.x = charData[1].charCodeAt(0) - 32;
                terminal._core.buffer.y = buffer.y;
                terminal._core.buffer.lines.get(buffer.y).dirty = true;
            }
        }

    </script>
</body>

</html>

效果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/667048.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用AI修复郭德纲远古相声;小红书爆款文案Prompt模板;用AI经营一家三明治店;AI将实现80%编程 | ShowMeAI日报

&#x1f440;日报&周刊合集 | &#x1f3a1;生产力工具与行业应用大全 | &#x1f9e1; 点赞关注评论拜托啦&#xff01; &#x1f916; B站UP主使用AI修复郭德纲远古相声&#xff0c;10天播放近70万 B站UP主 野老相声-风景-4K修复 使用了AI换脸技术&#xff0c;对郭德纲、…

【ArcGIS Pro二次开发】(41):勘测定界txt文件转数据库(批量)

在规划工作中有时候会收到一种带坐标点的txt文件&#xff1a; 上网查了一下资料&#xff0c;这是根据《勘测定界界址点坐标交换格式》制作的固定格式文件。 其中包含了坐标系、精度、地块编号、地块名称、坐标点等信息。 这个工具的目的就是将TXT格式坐标批量转换为数据库文件…

生物群落(生态)数据统计分析与绘图

R 语言作的开源、自由、免费等特点使其广泛应用于生物群落数据统计分析。生物群落数据多样而复杂&#xff0c;涉及众多统计分析方法。以生物群落数据分析中的最常用的统计方法回归和混合效应模型、多元统计分析技术及结构方程等数量分析方法为主线&#xff0c;通过多个来自经典…

一些WEB测试方法

ladys and 乡亲们&#xff0c;long time no see&#xff0c;发个笔记&#xff1a;&#xff09; 首先&#xff0c;WEB是咋组成的 Web应用程序一般是B/S模式&#xff0c;一个Web应用程序是由完成特定任务的各种Web组件(web components)构成的并通过Web将服务展示给外界&#xff…

最喜爱的编程语言——Python

一、编程语言发展 编程语言&#xff08;programming language&#xff09;可以简单的理解为一种计算机和人都能识别的语言。一种能够让程序员准确地定义计算机所需数据的计算机语言&#xff0c;并精确地定义在不同情况下所应当采取的行动。 编程语言处在不断的发展和变化中&…

上海亚商投顾:沪指延续调整 机器人概念股掀涨停潮

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 市场情绪 大小指数今日略显分化&#xff0c;沪指全天震荡调整&#xff0c;深成指、创业板指则拉升翻红。机器人概念股掀涨停…

手把手教你使用抖音DOU+创作个性化视频!

抖音DOU是抖音推出的一项全新功能&#xff0c;旨在为用户提供更加个性化、丰富多彩的内容创作和交流体验。DOU提供了一系列强大的创作工具&#xff0c;包括音频剪辑、特效滤镜、场景转换、字幕编辑等&#xff0c;让用户可以轻松地将自己的创意通过视频分享到抖音平台上来。下面…

数值分析第四章节 用Python实现数值积分与数值微分

参考书籍&#xff1a;数值分析 第五版 李庆杨 王能超 易大义编 第4章 数值积分与数值微分 文章声明&#xff1a;如有发现错误&#xff0c;欢迎批评指正 文章目录 梯形公式矩形公式辛普森公式柯特斯公式复合梯形公式复合辛普森公式 4.1数值积分概论 4.1.1数值积分基本思想 使用某…

【Vue全家桶高仿小米商城】——(四)项目基础架构

第四章&#xff1a;项目基础架构 此章节全力讲解前端基本项目架构&#xff0c;通过此章节可搭建一个通用性的前端架构&#xff0c;内容涵盖跨域方案、路由封装、错误拦截等。 文章目录 第四章&#xff1a;项目基础架构一、前端跨域解决什么是前端跨域&#xff1f;怎么解决前端…

将h5项目转成uniapp小程序

打开微信开发者工具&#xff0c;新建项目&#xff1b;pages下index文件中index.wxml文件打开内容全删除&#xff1b;写入<web-view srchttp://域名.com/></web-view>&#xff1b;编译&#xff0c;成功在小程序中展示&#xff1b;其后&#xff0c;正常按照小程序流程…

scp命令及后台运行

将项目从一个服务器迁移到另外一个服务器的时候 当项目很大的时候 可以用到如下 1、scp -r 本地项目路径 需要迁移服务器的IP:/存放路径 scp -r /u01/media/Disk1/ 192.168.1.31:/u01/media/ reverse mapping checking getaddrinfo for bogon failed - POSSIBLE BREAK-IN ATTEM…

算法篇——动态规划 01背包问题 (js版)——更新新题

416. 分割等和子集 给你一个 只包含正整数 的 非空 数组 nums 。请你判断是否可以将这个数组分割成两个子集&#xff0c;使得两个子集的元素和相等。 链接&#xff1a;力扣 解题思路&#xff1a; 这道题看似是比较简单的背包问题&#xff1a; 首先可以通过判断数组和是否是…

【ZenUML】时序图之ZenUML详解

时序图 序列图是一种交互图&#xff0c;显示进程如何彼此操作以及顺序。 Mermaid可以使用ZenUML渲染序列图。请注意&#xff0c;ZenUML使用的语法与mermaid中的原始序列图不同。 目前&#xff0c;最新版本mermaid v10.2.3 暂时不单独支持zenuml语法,需要配合mermaid-zenuml一…

动态规划_可视化校园导航Floyd算法应用

目录 引言 图片展示 视频展示 针对校园导航问题的分析 关键技术和算法介绍 详细介绍&#xff1a;算法的实现 总结 代码 附件&#xff1a;Map.png 引言 本文主要通过详细的程序打印和作者的推理过程&#xff0c;描述作者对Floyd算法的理解&#xff0c;阐述其中的动态规划思想是如…

突然发现CSDN变得不一样了【建议】【活动】

突然发现CSDN变得不一样了【活动】 前言推荐突然发现CSDN变得不一样了关于上传代码包关于上传视频关于运行代码关于插入代码1关于插入代码2关于社区的建立关于社区的管理关于此次活动的评选关于排行突然发现说明一下关于我 最后 前言 2023-6-19 23:34:04 本文章仅用于参加 20…

【Python 随练】年龄计算问题

题目&#xff1a; 有 5 个人坐在一起&#xff0c;问第五个人多少岁&#xff1f;他说比第 4 个人大 2 岁。问第 4 个人岁数&#xff0c;他说比第3 个人大 2 岁。问第三个人&#xff0c;又说比第 2 人大两岁。问第 2 个人&#xff0c;说比第一个人大两岁。最后问第一个人&#x…

C++基础(8)——类和对象(6)

前言 本文主要介绍了C中多态的基本知识 4.7.1&#xff1a;多态的基本概念和原理剖析 1&#xff1a;基本概念 静态多态&#xff1a;函数重载、运算符重载 动态多态&#xff1a;派生类和虚函数实现运行时多态 静态多态在编译阶段确定函数地址&#xff1b;动态多态在运行阶段…

微信小程序uniapp+springboot实现小程序服务通知

微信小程序uniappspringboot实现小程序服务通知 1. 实现效果 2. 模板选用及字段类型判断 2.1 开通订阅消息,并选用模板 如果点击订阅消息让开启消息订阅开启后就可以出现以下页面,我本次使用的模板是月卡到期提醒模板,点击选用即可 2.2 查看模板字段类型 TemplateId后续会使用…

面试官问:Redis 分布式锁如何自动续期?

资深面试官&#xff1a;你们项目中的分布式锁是怎么实现的&#xff1f; 老任&#xff1a;基于redis的set命令&#xff0c;该命令有nx和ex选项。 资深面试官&#xff1a;那如果锁到期了&#xff0c;业务还没结束&#xff0c;如何进行自动续期呢&#xff1f; 老任&#xff1a;…

第九章 番外篇:TORCHSCRIPT

下文中的代码都使用参考教程中的例子。 会给出一点自己的解释。 参考教程&#xff1a; 文章目录 Introduction复习一下nn.Module()Torchscripttorch.jit.ScriptModule()torch.jit.script()torch.jit.trace()一个小区别 使用示例tracing Modulesscripting ModuleMixing scripti…