学习了一年的 rust 了,但是不知道用来做些什么,也没能赋能到工作中,现在前端基建都已经开始全面进入 rust 领域了,rust 的前端生态是越来越好。但是自己奈何水平不够,想贡献点什么,无从下手。
遂想自己捣鼓个什么东西,可以帮助到日常工作的。
记录一下在完成功能时遇到的一些问题,以及是怎么解决的。
解决的需求
公司有很多项目,都是依赖公司技术部门的一个框架,虽然说不行,但还是要用,里面有一些基础业务功能,也是避免了重复。公司框架的依赖常常更新不及时,依赖安装经常会报错,比如经常要使用--legacy-peer-deps
,对于新项目使用框架创建项目后还需要二次调整。
但随着时间的推移,项目越来越多,依赖也会慢慢变成旧版本,一个一个查看升级实属是一个体力活。那就写个脚本吧,它能做什么:
- 解析
package.json
文件,获取到dependencies/devDependencies
依赖列表。 - 远程请求依赖包,获取依赖的版本信息,并从中过滤出当前版本的最新版本信息
- 通过 web 页面展示这些数据,并通过 tag 表示版本。
- 选择性升级或者批量升级依赖到最新版本。
使用
- 直接下载并安装
github 下载地址
gitee 下载地址
解压文件,使用解压工具解压,或者使用命令行工具解压,得到一个可以执行文件。
$> tar -xzvf rsup.tar.gz
在终端将执行文件移动到/usr/local/bin
目录下,使得rsup
命令全局可用。或者直接在解压后的文件夹中执行。
$> sudo mv rsup /usr/local/bin/
- 使用安装脚本安装
在终端执行命令
# github
$> curl -fsSL https://github.com/ngd-b/rsup/raw/main/install.sh | bash
# gitee
$> curl -fsSL https://gitee.com/hboot/rsup/raw/master/install.sh | bash
提示安装成功后,就可以在终端执行rsup
命令。必须是当前目录下存在package.json
文件或者通过参数设置绝对路径rsup -d $path
。
执行效果,展示项目下的依赖版本、最新版本;可以点击操作进行升级。
如何设计
命令行工具分为三个功能包:
core
主程序执行入口,根据参数执行不同的功能pkg
解析指定目录下的package.json
文件,获取依赖列表并远程请求依赖包信息,过滤出最新版本信息。web
提供 web 页面展示数据,并提供升级功能。通过 websocket 实现数据交互。
为了各个子包之间的工作互不影响我们开启线程去处理异步任务,并通过channel
实现线程之间的信息交互。
使用tokio
实现异步任务,就不开启线程了,因为我们每个功能都是异步任务,开启线程没必要。还需要去处理线程调用异步任务时需要异步运行时呢。
为了保持主线程的活跃性,我们将 web 服务运行在主线程中,通过tokio::task::spawn
开启异步任务,当然了我们是不需要任务阻塞主线程的。
使用到的主要 crate 以及其能力,具体可查看文档。包括但不限于:
clap
解析命令行参数reqwest
处理网络请求serde \ serde_derive \ serde_json
结构体数据序列化与反序列化tokio
异步运行时actix-web
提供 web 服务actix-ws
实现 websocket 通信actix_cors
actix-web 中间键,设置 web 服务的响应头信息,设置跨域actix_files
提供静态文件服务futures_util
处理异步任务扩展库
遇到的问题
大致包括数据共享、传递。websocket 数据通信。结构体定义、数据序列化与反序列化。
使用clap
处理命令行参数
我们有多个子命令,在执行命令时通过指定某个子命令来执行不同的功能,比如cargo run --bin core pkg
,就是指定执行pkg
子命令。
可以通过clap
提供的#[command(subcommand)]
属性来定义子命令。
#[derive(Parser, Debug)]
#[command(name = "rsup", author, version, about)]
struct Cli {
#[command(subcommand)]
command: Commands
}
#[derive(Subcommand, Debug)]
enum Commands {
Pkg(pkg::Args),
}
然后在解析参数时,通过match
匹配子命令,执行不同的功能。并且针对于子命令的参数也可以被解析。
#[tokio::main]
async fn main() {
let args = Cli::parse();
// ... 省略其他代码
match args.command {
Commands::Pkg(args) => {
let data_clone = data.clone();
let tx_clone = tx.clone();
task::spawn(async move {
if let Err(e) = pkg::run(args, data_clone, tx_clone).await {
println!("Error run subcommand pgk {}", e);
};
});
}
}
}
后来我需要默认执行该子命令pkg
,不需要在运行时指定子命令,只需要在运行时指定参数即可。但是每一个子包如pkg
都有自己需要接收的参数,但是我们执行的是core
包,所以需要处理合并各个子包的参数。
通过clap
提供的#[flatten]
属性,去合并各个子命令的参数。
#[derive(Parser, Debug)]
#[command(name = "rsup", author, version, about)]
struct Cli {
#[clap(flatten)]
pkg_args: pkg::Args,
}
#[tokio::main]
async fn main() {
let args = Cli::parse();
// ... 省略其他代码
// 默认启动pkg解析服务
let data_clone = data.clone();
let tx_clone = tx.clone();
task::spawn(async move {
if let Err(e) = pkg::run(args.pkg_args, data_clone, tx_clone).await {
println!("Error run subcommand pgk {}", e);
};
});
}
使用Arc<Mutex<>>
共享数据
在文章中rust 自动化测试、迭代器与闭包、智能指针、无畏并发 中提到过使用Arc<Mutex<>>
共享数据。
文章里使用的是std::sync::Mutex
也就是标准库提供的Mutext<>
, 它是同步阻塞的,在阻塞式代码中更加高效,而我们的项目需要异步非阻塞,所以我们需要使用tokio::sync::Mutex
来更好的处理异步代码。
我们在主包中定义共享数据,然后克隆引用传递给各个子包。
#[tokio::main]
async fn main() {
// ... 省略其他代码
let data: Arc<Mutex<pkg::Pkg>> = Arc::new(Mutex::new(pkg::Pkg::new()));
}
数据结构是在子包pkg
中定义的,因为所有的数据操作、包括更新都在pkg
中完成。通过clone()
方法将数据传递给各个子包。我们的web
子包主要是将数据传递给页面。
mpsc::channel
创建通信通道
在解析pkg
获取到package.json
文件后,数据就需要去更新,并且需要通知web
子包数据变更要向前端页面发送数据了。
在上面的文章 👆 也给出了如何在线程间传递信息。里面使用的std::sync::mpsc
同样的,我们的任务是 I/O 密集型,使用异步编程更高效。我们采用了tokio::sync::mpsc
tokio::sync::mpsc
和标准库不同的是需要设置容量,防止数据溢出。通信通道是多生产者单消费者,web 服务就是消费者,它接收到数据更新消息后就像前端发送数据,而pkg
子包就是生产者,负责更新数据。
#[tokio::main]
async fn main() {
// ... 省略其他代码
let (tx, rx) = channel(100);
// web 服务 将rx 传递给 web 子包
let _ = web::run(data.clone(), rx).await;
}
因为我们单数据对象、单数据源,所以不会发生数据锁死的情况,因为每次更新整个数据都会全部锁定,然后去做的更新。一旦我们的业务出现多数据源,互相依赖时就需要认真考虑锁的粒度,一旦数据全锁,其他数据有依赖时需要读取更新就会等待造成阻塞。
使用serde 、 serde_json
序列化与反序列化
通过网络请求或者直接读取的package.json
都是返回的 json 格式的数据,我们需要将数据反序列化成我们需要的结构体。
{
"id": "",
"name": ""
}
通过serde_derive
的#[derive(Deserialize, Serialize)]
属性,可以很方便的将 json 数据反序列化成结构体。
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PkgJson {
// ... 省略其他字段
}
我们读取的package.json
,直接使用serde_json::from_reader()
进行反序列化。
pub fn read_pkg_json<P: AsRef<Path>>(
path: P,
) -> Result<PkgJson, Box<dyn std::error::Error + Send + Sync>> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let package = serde_json::from_reader(reader)?;
Ok(package)
}
通过网络读取的依赖包信息,通过使用serde_json::from_str()
进行反序列化。
pub async fn fetch_pkg_info(
client: &Client,
pkg_name: &str,
) -> Result<PkgInfo, Box<dyn std::error::Error>> {
// ... 省略其他代码
if res.status().is_success() {
let body = res.text().await?;
let info: PkgInfo = serde_json::from_str(&body)?;
Ok(info)
} else {
// ... 省略错误处理代码
}
}
如何进行序列化操作呢,我们在接收到数据更新后,需要将数据序列化成 json 格式发送给前端页面。
通过serde_json::json
宏函数json!()
将结构体序列化成 json 格式。
可以直接通过serde_json::to_string()
将 json 数据转为 json 字符串发送给前端。
// ... 省略其他代码
pub async fn send_message(&self, mut session: Session) {
let locked_data = self.data.lock().await;
if let Err(e) = session
.text(serde_json::to_string(&locked_data.clone()).unwrap())
.await
{
eprintln!("Failed to send message to client: {:?}", e);
}
}
为了符合 rust 的字段命名风格,我们需要将一些驼峰式的命名转换成下划线命名。通过 #[serde(rename = "devDependencies")]
属性定义
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PkgJson {
// ... 省略其他字段
#[serde(rename = "devDependencies")]
pub dev_dependencies: Option<HashMap<String, String>>,
}
除了反序列化给定的数据为结构体,我们可能还需要自定义数据字段,这时如果转换的数据里没有这个字段,我们就需要给它默认值。通过使用#[serde(default)]
属性定义该字段取默认值,我们需要为这个结构体实现Default
trait。
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct PkgInfo {
// ... 省略其他字段
#[serde(default)]
pub is_finish: bool,
}
impl Default for PkgInfo {
fn default() -> Self {
Self {
// ... 省略其他字段
is_finish: false,
}
}
}
rsup-web
前端页面
将前端部分独立一个项目rsup-web
,使用了vite-vue3
开发,配置了unocss
减少 css 的编写。
项目打包后放在web
包下的static
目录,并提供静态资源访问服务。
/// 获取静态文件路径
pub fn static_file_path() -> String {
format!("{}/src/static", env!("CARGO_MANIFEST_DIR"))
}
pub async fn run(
data: Arc<Mutex<Pkg>>,
rx: Receiver<()>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// ... 省略其他代码
HttpServer::new(move || {
App::new()
.app_data(web::Data::new(Arc::clone(&data)))
.service(index)
.service(Files::new("/static", static_file_path()).prefer_utf8(true))
.app_data(ms.clone())
.route("/ws", web::get().to(socket_index))
})
.bind(format!("0.0.0.0:{}", port))?
.run()
.await?;
}
因为我们是在 core 主包中调用的 web 子包目录,需要处理资源路径文件,通过env!("CARGO_MANIFEST_DIR")
获取当前项目路径。
记得在前端项目中配置base
,资源是通过/static
访问的。
前端 socket 服务连接后立即发送数据
在页面连接 socket 服务后,需要立即发送数据给前端。是为了处理这种情况:后台服务消息已接收处理完,前端连接后没有数据展示。
一种简单的方法就是前端连接后发送一条消息,然后后台接收到消息后再向前端发送数据。
我们想要实现的是后端监听前端连接,成功时发送数据给前端。socket_index
函数处理 socket 连接服务,在通过 actix_web::rt::spawn
启动了一个异步任务,调用了Ms::handle_message
处理消息。
async fn socket_index(
req: HttpRequest,
stream: web::Payload,
ms: web::Data<Arc<Mutex<Ms>>>,
) -> Result<HttpResponse, Error> {
let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
let ms = ms.get_ref().clone();
let client_ip = req.connection_info().realip_remote_addr().unwrap()
actix_web::rt::spawn(async move {
println!(
"new connection client's ip : {} ",
clinent_ip
);
Ms::handle_message(ms, session, msg_stream).await;
});
Ok(res)
}
在Ms::handle_message
处理消息时,通过loop { }
语法循环检测是否有消息过来,当通道有消息时,rx.recv()
接收数据更新,然后向前端发送数据。这就造成了ms_lock
一直被锁定,我们想要在开始执行发送数据,但是由于ms
数据对象一致被循环锁定,异步任务无法获取到数据对象,就无法发送数据。
pub async fn handle_message(
ms: Arc<Mutex<Ms>>,
mut session: Session,
mut msg_stream: MessageStream,
) {
// ... 省略其他代码
// 向前端发送消息
let ms_clone = ms.clone();
let session_clone = session.clone();
tokio::spawn(async move {
let ms_lock = ms_clone.lock().await;
ms_lock.send_message(session_clone).await;
});
loop {
let mut ms_lock = ms.lock().await;
// ... 省略其他代码
Some(_) = ms_lock.rx.recv()=> {
println!("Got message");
drop(ms_lock);
let ms_lock = ms.lock().await;
ms_lock.send_message(session.clone()).await;
}
}
}
在不改变现有的逻辑下,采取超时没有接收到消息时,结束本次循环。这样就释放了当前数据锁,给了一段异步任务获取数据对象的时间,从而可以发送数据。
use tokio::time::{timeout, Duration};
pub async fn handle_message(
ms: Arc<Mutex<Ms>>,
mut session: Session,
mut msg_stream: MessageStream,
) {
// ... 省略其他代码
loop {
// ... 省略其他代码
result = timeout(Duration::from_millis(100),ms_lock.rx.recv())=>{
match result{
Ok(Some(_))=>{
drop(ms_lock);
let ms_lock = ms.lock().await;
ms_lock.send_message(session.clone()).await;
}
Ok(None)=>{
break;
}
Err(_)=>{
continue;
}
}
}
}
}
设置了 100ms 的超时时间,没有消息时,结束本次循环。在释放ms_lock
数据锁后,异步任务获取到数据对象,发送数据。
这一块的逻辑会导致很多问题。已完全重构。
数据更新
我们通过创建通信通道tokio::sync::channel
发送数据更新的消息。共享数据data
和通道tx\rx
都是分开的,这就导致了在所有数据更新的地方都需要发送更新通知tx
实例引用,需要同时传送多个参数。
#[tokio::main]
async fn main() {
// ... 省略其他代码
let data: Arc<Mutex<pkg::Pkg>> = Arc::new(Mutex::new(pkg::Pkg::new()));
let (tx, rx) = channel(100);
// ... 省略其他代码
}
为了方便,我们定义结构体Package
,将data
和tx
封装到结构体中。为了实现克隆,我们需要使用Arc<Mutex<T>>
包装它们。并需要实现Clone
特性。
pub struct Package {
pub pkg: Arc<Mutex<Pkg>>,
pub sender: Arc<Mutex<Sender<()>>>,
pub receiver: Arc<Mutex<Receiver<()>>>,
}
impl Clone for Package {
fn clone(&self) -> Self {
Self {
pkg: self.pkg.clone(),
sender: self.sender.clone(),
receiver: self.receiver.clone(),
}
}
}
这样在主入口中,我们就可以通过Package::new()
创建实例,然后传递给需要更新的地方。
之前理解的
channel
通道,以为多生产单消费是不能引用receiver
实例的,原来是可以通过Arc<Mutex<T>>
包装引用的,只是在消费时,如果有多个地方消费,只会有一个地方收到消息。
依赖版本对比
获取到目录下的package.json
文件以及通过请求https://registry.npmjs.org/{pkg_name}
获取到依赖包信息后,怎么过滤出需要更新的版本呢。
使用semver
crate 包进行版本对比。数据格式要求是MAJOR.MINOR.PATCH
MAJOR
主版本更新,不兼容的 API 修改MINOR
次要版本更新,兼容的功能性新增PATCH
补丁版本更新,兼容的 bug 修复
pub fn compare_version(
current_v: &str,
latest_v: &str,
all_v: HashMap<String, VersionInfo>,
) -> HashMap<String, VersionInfo> {
// ... 省略其他代码
// 当前版本
let c_v = Version::parse(&clear_current_v).unwrap();
// 最新版本
let l_v = Version::parse(&latest_v).unwrap();
let mut vs: Vec<Version> = all_v
.keys()
.filter_map(|k| Version::parse(k).ok())
.filter(|v| *v > c_v && *v <= l_v)
.collect();
// ... 省略其他代码
}
总结
文章中的某些设计逻辑可能现在已经优化改掉了,只是作为过程中的想法记录一下。
往期关联学习文章:
- 模式匹配、trait 特征行为、必包、宏
- 多线程任务执行
- 并发线程间的数据共享
- 包、模块,引用路径
- 开发一个命令行工具