首先在 Cargo.toml 里添加 serde 和 toml。我们计划使用 toml 做配置文件,serde 用来处理配置的序列化和反序列化:
[dependencies]
...
serde = { version = "1", features = ["derive"] } # 序列化/反序列化
...
toml = "0.5" # toml 支持
...
然后来创建一个 src/config.rs,构建 KV server 的配置:
use crate::KvError;
use serde::{Deserialize, Serialize};
use std::fs;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ServerConfig {
pub general: GeneralConfig,
pub storage: StorageConfig,
pub tls: ServerTlsConfig,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ClientConfig {
pub general: GeneralConfig,
pub tls: ClientTlsConfig,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct GeneralConfig {
pub addr: String,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", content = "args")]
pub enum StorageConfig {
MemTable,
SledDb(String),
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ServerTlsConfig {
pub cert: String,
pub key: String,
pub ca: Option<String>,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct ClientTlsConfig {
pub domain: String,
pub identity: Option<(String, String)>,
pub ca: Option<String>,
}
impl ServerConfig {
pub fn load(path: &str) -> Result<Self, KvError> {
let config = fs::read_to_string(path)?;
let config: Self = toml::from_str(&config)?;
Ok(config)
}
}
impl ClientConfig {
pub fn load(path: &str) -> Result<Self, KvError> {
let config = fs::read_to_string(path)?;
let config: Self = toml::from_str(&config)?;
Ok(config)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn server_config_should_be_loaded() {
let result: Result<ServerConfig, toml::de::Error> =
toml::from_str(include_str!("../fixtures/server.conf"));
assert!(result.is_ok());
}
#[test]
fn client_config_should_be_loaded() {
let result: Result<ClientConfig, toml::de::Error> =
toml::from_str(include_str!("../fixtures/client.conf"));
assert!(result.is_ok());
}
}
你可以看到,在 Rust 下,有了 serde 的帮助,处理任何已知格式的配置文件,是多么容易的一件事情。我们只需要定义数据结构,并为数据结构使用 Serialize/Deserialize 派生宏,就可以处理任何支持 serde 的数据结构。
我还写了个 examples/gen_config.rs(你可以自行去查阅它的代码),用来生成配置文件,下面是生成的服务端的配置:
[general]
addr = '127.0.0.1:9527'
[storage]
type = 'SledDb'
args = '/tmp/kv_server'
[tls]
cert = """
-----BEGIN CERTIFICATE-----\r
MIIBdzCCASmgAwIBAgIICpy02U2yuPowBQYDK2VwMDMxCzAJBgNVBAYMAkNOMRIw\r
EAYDVQQKDAlBY21lIEluYy4xEDAOBgNVBAMMB0FjbWUgQ0EwHhcNMjEwOTI2MDEy\r
NTU5WhcNMjYwOTI1MDEyNTU5WjA6MQswCQYDVQQGDAJDTjESMBAGA1UECgwJQWNt\r
ZSBJbmMuMRcwFQYDVQQDDA5BY21lIEtWIHNlcnZlcjAqMAUGAytlcAMhAK2Z2AjF\r
A0uiltNuCvl6EVFl6tpaS/wJYB5IdWT2IISdo1QwUjAcBgNVHREEFTATghFrdnNl\r
cnZlci5hY21lLmluYzATBgNVHSUEDDAKBggrBgEFBQcDATAMBgNVHRMEBTADAQEA\r
MA8GA1UdDwEB/wQFAwMH4AAwBQYDK2VwA0EASGOmOWFPjbGhXNOmYNCa3lInbgRy\r
iTNtB/5kElnbKkhKhRU7yQ8HTHWWkyU5WGWbOOIXEtYp+5ERUJC+mzP9Bw==\r
-----END CERTIFICATE-----\r
"""
key = """
-----BEGIN PRIVATE KEY-----\r
MFMCAQEwBQYDK2VwBCIEIPMyINaewhXwuTPUufFO2mMt/MvQMHrGDGxgdgfy/kUu\r
oSMDIQCtmdgIxQNLopbTbgr5ehFRZeraWkv8CWAeSHVk9iCEnQ==\r
-----END PRIVATE KEY-----\r
"""
性能测试
基本的想法是我们连上 100 个 subscriber 作为背景,然后看 publisher publish 的速度。
因为 BROADCAST_CAPACITY 有限,是 128,当 publisher 速度太快,而导致 server 不能及时往 subscriber 发送时,server 接收 client 数据的速度就会降下来,无法接收新的 client,整体的 publish 的速度也会降下来,所以这个测试能够了解 server 处理 publish 的速度。为了确认这一点,我们在 start_tls_server() 函数中,在 process() 之前,再加个 100ms 的延时,人为减缓系统的处理速度:
async move {
let stream = ProstServerStream::new(stream.compat(), svc1.clone());
// 延迟 100ms 处理
time::sleep(Duration::from_millis(100)).await;
stream.process().await.unwrap();
Ok(())
}
在 Rust 下,我们可以用 criterion 库。它可以处理基本的性能测试,并生成漂亮的报告。所以在 Cargo.toml 中加入:
[dev-dependencies]
...
criterion = { version = "0.3", features = ["async_futures", "async_tokio", "html_reports"] } # benchmark
...
rand = "0.8" # 随机数处理
...
[[bench]]
name = "pubsub"
harness = false
最后这个 bench section,描述了性能测试的名字,它对应 benches 目录下的同名文件。我们创建和 src 平级的 benches,然后再创建 benches/pubsub.rs,添入如下代码:
use anyhow::Result;
use criterion::{criterion_group, criterion_main, Criterion};
use futures::StreamExt;
use kv6::{
start_client_with_config, start_server_with_config, ClientConfig, CommandRequest, ServerConfig,
StorageConfig, YamuxCtrl,
};
use rand::prelude::SliceRandom;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::runtime::Builder;
use tokio::time;
use tokio_rustls::client::TlsStream;
use tracing::info;
async fn start_server() -> Result<()> {
let addr = "127.0.0.1:9999";
let mut config: ServerConfig = toml::from_str(include_str!("../fixtures/server.conf"))?;
config.general.addr = addr.into();
config.storage = StorageConfig::MemTable;
tokio::spawn(async move {
start_server_with_config(&config).await.unwrap();
});
Ok(())
}
async fn connect() -> Result<YamuxCtrl<TlsStream<TcpStream>>> {
let addr = "127.0.0.1:9999";
let mut config: ClientConfig = toml::from_str(include_str!("../fixtures/client.conf"))?;
config.general.addr = addr.into();
Ok(start_client_with_config(&config).await?)
}
async fn start_subscribers(topic: &'static str) -> Result<()> {
let mut ctrl = connect().await?;
let stream = ctrl.open_stream().await?;
info!("C(subscriber): stream opened");
let cmd = CommandRequest::new_subscribe(topic.to_string());
tokio::spawn(async move {
let mut stream = stream.execute_streaming(&cmd).await.unwrap();
while let Some(Ok(data)) = stream.next().await {
drop(data);
}
});
Ok(())
}
async fn start_publishers(topic: &'static str, values: &'static [&'static str]) -> Result<()> {
let mut rng = rand::thread_rng();
let v = values.choose(&mut rng).unwrap();
let mut ctrl = connect().await.unwrap();
let mut stream = ctrl.open_stream().await.unwrap();
info!("C(publisher): stream opened");
let cmd = CommandRequest::new_publish(topic.to_string(), vec![(*v).into()]);
stream.execute_unary(&cmd).await.unwrap();
Ok(())
}
fn pubsub(c: &mut Criterion) {
// tracing_subscriber::fmt::init();
// 创建 Tokio runtime
let runtime = Builder::new_multi_thread()
.worker_threads(4)
.thread_name("pubsub")
.enable_all()
.build()
.unwrap();
let values = &["Hello", "Tyr", "Goodbye", "World"];
let topic = "lobby";
// 运行服务器和 100 个 subscriber,为测试准备
runtime.block_on(async {
eprint!("preparing server and subscribers");
start_server().await.unwrap();
time::sleep(Duration::from_millis(50)).await;
for _ in 0..100 {
start_subscribers(topic).await.unwrap();
eprint!(".");
}
eprintln!("Done!");
});
// 进行 benchmark
c.bench_function("publishing", move |b| {
b.to_async(&runtime)
.iter(|| async { start_publishers(topic, values).await })
});
}
criterion_group! {
name = benches;
config = Criterion::default().sample_size(10);
targets = pubsub
}
criterion_main!(benches);
大部分的代码都很好理解,就是创建服务器和客户端,为测试做准备。说一下这里面核心的 benchmark 代码:
c.bench_function("publishing", move |b| {
b.to_async(&runtime)
.iter(|| async { start_publishers(topic, values).await })
});
对于要测试的代码,我们可以封装成一个函数进行测试。这里因为要做 async 函数的测试,需要使用 runtime。普通的函数不需要调用 to_async。对于更多有关 criterion 的用法,可以参考它的文档。运行 cargo bench 后,会见到如下打印(如果你的代码无法通过,可以参考 repo 里的 diff_benchmark,我顺便做了一点小重构)
preparing server and subscribers....................................................................................................Done!
publishing time: [419.73 ms 426.84 ms 434.20 ms]
change: [-1.6712% +1.0499% +3.6586%] (p = 0.48 > 0.05)
No change in performance detected.
可以看到,单个 publish 的处理速度要 426ms,好慢!我们把之前在 start_tls_server() 里加的延迟去掉,再次测试:
preparing server and subscribers....................................................................................................Done!
publishing time: [318.61 ms 324.48 ms 329.81 ms]
change: [-25.854% -23.980% -22.144%] (p = 0.00 < 0.05)
Performance has improved.
嗯,这下 324ms,正好是减去刚才加的 100ms。可是这个速度依旧不合理,凭直觉我们感觉一下这个速度,是 Python 这样的语言还正常,如果是 Rust 也太慢了吧?
测量和监控
工业界有句名言:如果你无法测量,那你就无法改进(If you can’t measure it, you can’t improve it)。现在知道了 KV server 性能有问题,但并不知道问题出在哪里。我们需要使用合适的测量方式。
目前,比较好的端对端的性能监控和测量工具是 jaeger,我们可以在 KV server/client 侧收集监控信息,发送给 jaeger 来查看在服务器和客户端的整个处理流程中,时间都花费到哪里去了。
之前我们在 KV server 里使用的日志工具是 tracing,不过日志只是它的诸多功能之一,它还能做 instrument,然后配合 opentelemetry 库,我们就可以把 instrument 的结果发送给 jaeger 了。好,在 Cargo.toml 里添加新的依赖:
[dependencies]
...
opentelemetry-jaeger = "0.15" # opentelemetry jaeger 支持
...
tracing-appender = "0.1" # 文件日志
tracing-opentelemetry = "0.15" # opentelemetry 支持
tracing-subscriber = { version = "0.2", features = ["json", "chrono"] } # 日志处理
有了这些依赖后,在 benches/pubsub.rs 里,我们可以在初始化 tracing_subscriber 时,使用 jaeger 和 opentelemetry tracer:
fn pubsub(c: &mut Criterion) {
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("kv-bench")
.install_simple()
.unwrap();
let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
tracing_subscriber::registry()
.with(EnvFilter::from_default_env())
.with(opentelemetry)
.init();
let root = span!(tracing::Level::INFO, "app_start", work_units = 2);
let _enter = root.enter();
// 创建 Tokio runtime
...
}
设置好 tracing 后,就在系统的主流程上添加相应的 instrument:
新添加的代码你可以看 repo 中的 diff_telemetry。注意 instrument 可以用不同的名称,比如,对于 TlsConnector::new() 函数,可以用 #[instrument(name = “tls_connector_new”)],这样它的名字辨识度高一些。
为主流程中的函数添加完 instrument 后,你需要先打开一个窗口,运行 jaeger(需要 docker):
docker run -d -p6831:6831/udp -p6832:6832/udp -p16686:16686 -p14268:14268 jaegertracing/all-in-one:latest
然后带着 RUST_LOG=info 运行 benchmark:
RUST_LOG=info cargo bench
由于我的 OS X 上没装 docker(docker 不支持 Mac,需要 Linux VM 中转),我就在一个 Ubuntu 虚拟机里运行这两条命令:
preparing server and subscribers....................................................................................................Done!
publishing time: [1.7464 ms 1.9556 ms 2.2343 ms]
Found 2 outliers among 10 measurements (20.00%)
1 (10.00%) high mild
1 (10.00%) high severe
并没有做任何事情,似乎只是换了个系统,性能就提升了很多,这给我们一个 tip:也许问题出在 OS X 和 Linux 系统相关的部分。不管怎样,已经发送了不少数据给 jaeger,我们到 jaeger 上看看问题出在哪里。
打开 http://localhost:16686/,service 选 kv-bench,Operation 选 app_start,点击 “Find Traces”,我们可以看到捕获的 trace。因为运行了两次 benchmark,所以有两个 app_start 的查询结果:
可以看到,每次 start_client_with_config 都要花 1.6-2.5ms,其中有差不多一小半时间花在了 TlsClientConnector::new() 上:
如果说 TlsClientConnector::connect() 花不少时间还情有可原,因为这是整个 TLS 协议的握手过程,涉及到网络调用、包的加解密等。
但 TlsClientConnector::new() 就是加载一些证书、创建 TlsConnector 这个数据结构而已,为何这么慢?仔细阅读 TlsClientConnector::new() 的代码,你可以对照注释看:
#[instrument(name = "tls_connector_new", skip_all)]
pub fn new(
domain: impl Into<String> + std::fmt::Debug,
identity: Option<(&str, &str)>,
server_ca: Option<&str>,
) -> Result<Self, KvError> {
let mut config = ClientConfig::new();
// 如果有客户端证书,加载之
if let Some((cert, key)) = identity {
let certs = load_certs(cert)?;
let key = load_key(key)?;
config.set_single_client_cert(certs, key)?;
}
// 加载本地信任的根证书链
config.root_store = match rustls_native_certs::load_native_certs() {
Ok(store) | Err((Some(store), _)) => store,
Err((None, error)) => return Err(error.into()),
};
// 如果有签署服务器的 CA 证书,则加载它,这样服务器证书不在根证书链
// 但是这个 CA 证书能验证它,也可以
if let Some(cert) = server_ca {
let mut buf = Cursor::new(cert);
config.root_store.add_pem_file(&mut buf).unwrap();
}
Ok(Self {
config: Arc::new(config),
domain: Arc::new(domain.into()),
})
}
可以发现,它的代码唯一可能影响性能的就是加载本地信任的根证书链的部分。这个代码会和操作系统交互,获取信任的根证书链。也许,这就是影响性能的原因之一?
那我们将其简单重构一下。因为根证书链,只有在客户端没有提供用于验证服务器证书的 CA 证书时,才需要,所以可以在没有 CA 证书时,才加载本地的根证书链:
#[instrument(name = "tls_connector_new", skip_all)]
pub fn new(
domain: impl Into<String> + std::fmt::Debug,
identity: Option<(&str, &str)>,
server_ca: Option<&str>,
) -> Result<Self, KvError> {
let mut config = ClientConfig::new();
// 如果有客户端证书,加载之
if let Some((cert, key)) = identity {
let certs = load_certs(cert)?;
let key = load_key(key)?;
config.set_single_client_cert(certs, key)?;
}
// 如果有签署服务器的 CA 证书,则加载它,这样服务器证书不在根证书链
// 但是这个 CA 证书能验证它,也可以
if let Some(cert) = server_ca {
let mut buf = Cursor::new(cert);
config.root_store.add_pem_file(&mut buf).unwrap();
} else {
// 加载本地信任的根证书链
config.root_store = match rustls_native_certs::load_native_certs() {
Ok(store) | Err((Some(store), _)) => store,
Err((None, error)) => return Err(error.into()),
};
}
Ok(Self {
config: Arc::new(config),
domain: Arc::new(domain.into()),
})
}
完成这个修改后,我们再运行 RUST_LOG=info cargo bench,现在的性能达到了 1.64ms,相比之前的 1.95ms,提升了 16%。打开 jaeger,看最新的 app_start 结果,发现 TlsClientConnector::new() 所花时间降到了 ~12us 左右。嗯,虽然没有抓到服务器本身的 bug,但客户端的 bug 倒是解决了一个。
至于服务器,如果我们看 Service::execute 的主流程,执行速度在 40-60us,问题不大:
再看服务器的主流程 server_process:
这是我们在 start_tls_server() 里额外添加的 tracing span:
loop { let root = span!(tracing::Level::INFO, “server_process”);
let _enter = root.enter(); …}
把右上角的 trace timeline 改成 trace graph,然后点右侧的 time:
可以看到,主要的服务器时间都花在了 TLS accept 上,所以,目前服务器没有太多值得优化的地方。
由于 tracing 本身也占用不少 CPU,所以我们直接 cargo bench 看看目前的结果:
preparing server and subscribers....................................................................................................Done!
publishing time: [1.3986 ms 1.4140 ms 1.4474 ms]
change: [-26.647% -19.977% -10.798%] (p = 0.00 < 0.05)
Performance has improved.
Found 2 outliers among 10 measurements (20.00%)
2 (20.00%) high severe
不加 RUST_LOG=info 后,整体性能到了 1.4ms。这是我在 Ubuntu 虚拟机下的结果。我们再回到 OS X 下测试,看看 TlsClientConnector::new() 的修改,对 OS X 是否有效:
preparing server and subscribers....................................................................................................Done!
publishing time: [1.4086 ms 1.4229 ms 1.4315 ms]
change: [-99.570% -99.563% -99.554%] (p = 0.00 < 0.05)
Performance has improved.
嗯,在我的 OS X 下,现在整体性能也到了 1.4ms 的水平。这也意味着,在有 100 个 subscribers 的情况下,我们的 KV server 每秒钟可以处理 714k publish 请求;而在 1000 个 subscribers 的情况下,性能在 11.1ms 的水平,也就是每秒可以处理 90k publish 请求: