引言:为什么选择 TDengine 与 Rust?
TDengine 是一款专为物联网、车联网、工业互联网等时序数据场景优化设计的开源时序数据库,支持高并发写入、高效查询及流式计算,通过“一个数据采集点一张表”与“超级表”的概念显著提升性能。
Rust 作为一门系统级编程语言,近年来在数据库、嵌入式系统、分布式服务等领域迅速崛起,以其内存安全、高性能著称,与 TDengine 的高效特性天然契合,适合构建高可靠、高性能的数据处理系统。
TDengine Rust 连接器的设计与架构
TDengine Rust 连接器的主要目标是提供一个高效、安全且易于使用的接口,让开发者能够通过 Rust 语言与 TDengine 数据库进行高效的交互。连接器的设计充分考虑了 Rust 语言的优势,如内存安全、并发处理和高性能,同时确保与 TDengine 数据库之间的通信可靠且高效。
主要模块间的架构如下图:
解释:
-
taos-query 模块作为核心,负责定义公共接口和数据结构。
-
taos-optin 模块和 taos-ws 模块分别实现了这些公共接口,负责原生连接和 WebSocket 连接的业务逻辑实现。
-
taos 模块封装了 taos-optin 和 taos-ws 模块中的实现,同时将 taos-query 模块所定义的公共接口和数据结构予以暴露。
-
最终,用户只需依赖 taos 模块,即可轻松使用整个系统,而无需了解内部复杂的实现细节。
快速入门:从安装到代码开发
1.在 Cargo.toml 中添加以下内容:
[dependencies]
anyhow = "1.0.96"
chrono = "0.4.39"
serde = "1.0.217"
taos = "0.12.3"
tokio = "1.43.0"
2.代码演示,数据写入与查询:
use chrono::{DateTime, Local};
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Establish native connection
let dsn = "taos://localhost:6030";
let taos = TaosBuilder::from_dsn(dsn)?.build().await?;
let db = "power";
// Prepare the database
taos.exec_many([
format!("DROP DATABASE IF EXISTS {db}"),
format!("CREATE DATABASE {db}"),
format!("USE {db}"),
])
.await?;
let inserted = taos.exec_many([
// Create a super table
"CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
TAGS (group_id INT, location varchar(20))",
// Create a subtable
"CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",
// Write one record at a time
"INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",
// Write Null values
"INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",
// Automatically create table on insert
"INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",
// Write multiple records at once
"INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",
]).await?;
assert_eq!(inserted, 6);
let mut result = taos.query("SELECT * FROM meters").await?;
for field in result.fields() {
println!("Get the field: {}", field.name());
}
println!();
// Query option 1, using row stream.
let mut rows = result.rows();
while let Some(row) = rows.try_next().await? {
for (name, value) in row {
println!("Get the value of {}: {}", name, value);
}
println!()
}
// Query option 2, deserialize using serde.
#[derive(Debug, serde::Deserialize)]
#[allow(dead_code)]
struct Record {
// Deserialize timestamp to chrono::DateTime<Local>
ts: DateTime<Local>,
// Deserialize float to f32
current: Option<f32>,
// Deserialize int to i32
voltage: Option<i32>,
// Deserialize float to f32
phase: Option<f32>,
// Deserialize int to i32
group_id: i32,
// Deserialize varchar to String
location: String,
}
let records: Vec<Record> = taos
.query("SELECT * FROM meters")
.await?
.deserialize()
.try_collect()
.await?;
println!("Get records: {:?}", records);
Ok(())
}
高级功能与最佳实践
连接池优化:提升高并发性能
频繁创建和销毁数据库连接会带来显著的开销,尤其是在高并发场景(如物联网设备高频上报数据)中。通过连接池复用连接,可减少 TCP 握手、认证等重复操作,提升整体吞吐量。
代码示例:
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Create a connection pool
let dsn = "taos://localhost:6030";
let pool = TaosBuilder::from_dsn(dsn)?.pool()?;
let taos = pool.get().await?;
let db = "power";
// Prepare the database
taos.exec_many([
format!("DROP DATABASE IF EXISTS {db}"),
format!("CREATE DATABASE {db}"),
format!("USE {db}"),
])
.await?;
let inserted = taos.exec_many([
// Create a super table
"CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
TAGS (group_id INT, location varchar(20))",
// Create a subtable
"CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",
// Write one record at a time
"INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",
// Write Null values
"INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",
// Automatically create table on insert
"INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",
// Write multiple records at once
"INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",
]).await?;
assert_eq!(inserted, 6);
Ok(())
}
原生连接与 WebSocket 连接
对于原生连接和 WebSocket 连接这两种方式,除了建立连接所使用的 DSN 不同外,其余接口调用没有差异。
代码示例:
use taos::*;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// let dsn = "taosws://localhost:6041"; // WebSocket connection
let dsn = "taos://localhost:6030"; // Native connection
let _taos = TaosBuilder::from_dsn(dsn)?.build().await?;
Ok(())
}
同步接口和异步接口
TDengine Rust 连接器的接口分为同步接口和异步接口。通常情况下,同步接口基于异步接口实现,二者的方法签名,除了异步接口多了 async 关键字外,基本一致。
异步接口的代码示例可查阅第 3 章节,同步接口的代码示例如下:
use taos::sync::*;
fn main() -> anyhow::Result<()> {
// Establish native connection
let dsn = "taos://localhost:6030";
let taos = TaosBuilder::from_dsn(dsn)?.build()?;
let db = "power";
// Prepare the database
taos.exec_many([
format!("DROP DATABASE IF EXISTS {db}"),
format!("CREATE DATABASE {db}"),
format!("USE {db}"),
])?;
let inserted = taos.exec_many([
// Create a super table
"CREATE STABLE meters(ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) \
TAGS (group_id INT, location varchar(20))",
// Create a subtable
"CREATE TABLE d0 USING meters TAGS(0, 'Beijing Chaoyang')",
// Write one record at a time
"INSERT INTO d0 VALUES(now, 10.15, 217, 0.33)",
// Write Null values
"INSERT INTO d0 VALUES(now, NULL, NULL, NULL)",
// Automatically create table on insert
"INSERT INTO d1 USING meters TAGS(1, 'Beijing Haidian') VALUES(now, 10.1, 119, 0.32)",
// Write multiple records at once
"INSERT INTO d1 VALUES(now+1s, 10.22, 120, 0.33) (now+2s, 11.23, 121, 0.34) (now+3s, 12.23, 118, 0.32)",
])?;
assert_eq!(inserted, 6);
Ok(())
}
结语:开源社区的星辰大海
TDengine Rust 连接器的成长史,是一个典型的技术民主化故事——从社区萌芽到官方支持,从边缘工具到核心生态。它证明了两个事实:
-
开源协作的效率:在数据库领域,社区贡献者的“需求反哺”比闭门造车更能击中痛点。
-
Rust 的生态潜力:作为系统级编程语言的 Rust,正在时序数据处理这样的垂直领域开辟新战场。
我们期待你的参与,欢迎提交 PR,一起推动 TDengine Rust 连接器的进步,携手开创更加美好的开源未来!
附录
-
https://docs.taosdata.com/
-
https://github.com/taosdata/taos-connector-rust