引言
在医疗领域,数据就是生命的密码,每一个数据点都可能蕴含着拯救生命的关键信息。特别是在 ICU 这样的重症监护场景中,医生需要实时、准确地了解患者的病情变化,以便做出及时有效的治疗决策。而随着医疗技术的飞速发展,医疗数据的规模和复杂性也在呈指数级增长,这给传统的数据分析方法带来了巨大的挑战。人工智能技术的出现,为医疗数据分析带来了新的曙光。通过机器学习、深度学习等算法,人工智能能够从海量的医疗数据中挖掘出有价值的信息,帮助医生更准确地诊断疾病、预测病情发展、制定个性化的治疗方案。
然而,要充分发挥人工智能在医疗数据分析中的潜力,高效的数据处理工具是必不可少的。Polars 作为一款基于 Rust 开发的高性能数据处理库,在处理大规模医疗数据时展现出了卓越的性能和效率。它不仅支持多线程并行计算,能够充分利用现代硬件的多核优势,还提供了丰富的数据操作函数和灵活的数据结构,使得数据处理变得更加简洁和高效。在本文中,我们将深入探讨如何使用 Polars 进行人工智能医疗数据分析,特别是在 ICU 场景中的应用,通过实际案例和代码示例,展示 Polars 在医疗数据处理中的强大能力。
Polars 初印象
(一)Polars 是什么
Polars 是一个用 Rust 编写的数据帧库,它基于 Arrow 数据结构,专为提供快速的 DataFrame 操作而设计。Rust 语言的高效性和安全性赋予了 Polars 出色的性能和稳定性,使其在处理大规模数据时表现卓越。与其他常见的数据处理库不同,Polars 通过独特的表达式语法支持查询优化和执行,能够更智能地处理数据操作,大大提高了数据处理的效率。
(二)独特优势
与传统的数据处理工具如 Pandas 相比,Polars 具有诸多显著优势。在性能方面,Polars 利用多线程并行计算,能够充分发挥多核 CPU 的优势,极大地提升了数据处理速度。例如,在对大规模的医疗记录进行分组统计时,Polars 的处理速度可能是 Pandas 的数倍甚至数十倍。在内存占用上,Polars 采用了更高效的内存管理策略,能够处理比可用内存更大的数据集,这对于医疗领域中常常出现的海量数据处理尤为重要。同时,Polars 的查询优化功能,如谓词下推、投影下推等,能够减少不必要的数据计算和传输,进一步提高了数据处理的效率和性能。
Polars 在 ICU 医疗数据分析中的应用
(一)数据读取与预处理
在 ICU 医疗数据分析中,数据读取是第一步,也是至关重要的一步。Polars 提供了强大的文件读取功能,能够快速读取各种常见格式的医疗数据文件,如 CSV、Parquet 等。以 CSV 文件为例,使用 Polars 读取数据的代码非常简洁:
import polars as pl
# 读取CSV文件
df = pl.read_csv('icu_data.csv')
这里的read_csv函数会自动识别文件的格式,并将数据解析为 Polars 的 DataFrame 结构。与 Pandas 相比,Polars 在读取大规模 CSV 文件时具有明显的速度优势。例如,对于一个包含 100 万行数据的 CSV 文件,使用 Pandas 读取可能需要数十秒甚至数分钟,而 Polars 往往能在几秒内完成读取。这是因为 Polars 在底层使用了高效的 Rust 代码进行数据解析,并且支持多线程并行读取,能够充分利用多核 CPU 的性能。
读取到数据后,数据清洗是必不可少的环节。在 ICU 数据中,可能存在各种质量问题,如缺失值、异常值和重复数据等。对于缺失值的处理,Polars 提供了多种方法。可以使用drop_nulls函数直接删除包含缺失值的行:
# 删除包含缺失值的行
cleaned_df = df.drop_nulls()
也可以使用fill_null函数对缺失值进行填充,例如用指定的值填充:
# 用0填充缺失值
cleaned_df = df.fill_null(0)
还可以使用更复杂的方法,如用均值、中位数等统计量来填充数值型数据的缺失值:
# 用均值填充数值型数据的缺失值
numerical_columns = df.select(pl.col(pl.NUMERIC_DTYPES)).columns
for col in numerical_columns:
mean_value = df[col].mean()
cleaned_df = df.fill_null({col: mean_value})
对于异常值,我们可以通过设定合理的阈值来进行检测和处理。以心率数据为例,正常的心率范围通常在 60 - 100 次 / 分钟之间,我们可以过滤掉超出这个范围的数据:
# 过滤心率异常的数据
cleaned_df = df.filter((pl.col("heart_rate") > 60) & (pl.col("heart_rate") < 100))
对于重复数据,使用unique函数可以轻松去除:
# 去除重复数据
cleaned_df = df.unique()
数据转换也是数据预处理的重要步骤。在 ICU 数据中,经常需要将数据转换为适合分析的格式。例如,将数据类型进行转换,把字符串类型的数值转换为数值类型,以便进行数学运算:
# 将字符串类型的数值列转换为数值类型
cleaned_df = cleaned_df.with_columns(
pl.col("numeric_column").str.to_f32()
)
时间格式的处理也非常关键。ICU 数据中的时间戳通常需要转换为特定的时间格式,以便进行时间序列分析。假设数据中有一个时间戳列timestamp,我们可以将其转换为日期时间类型:
# 将时间戳列转换为日期时间类型
cleaned_df = cleaned_df.with_columns(
pl.col("timestamp").str.strptime(pl.Datetime, fmt="%Y-%m-%d %H:%M:%S")
)
通过以上的数据读取、清洗和转换步骤,我们可以将原始的 ICU 医疗数据处理成干净、整齐的数据,为后续的数据分析和建模打下坚实的基础。
(二)数据分析与洞察
在完成数据预处理后,接下来就是进行数据分析,以获取有价值的信息和洞察。基本统计分析是了解数据特征的重要手段。使用 Polars,我们可以轻松计算 ICU 患者生命体征的各种统计量。以计算心率、血氧饱和度和体温的均值、中位数和标准差为例:
# 计算生命体征的均值、中位数和标准差
stats = df.select(
pl.col(["heart_rate", "blood_oxygen", "body_temperature"]).agg([
pl.mean(),
pl.median(),
pl.std()
])
)
print(stats)
上述代码中,select函数用于选择需要分析的列,agg函数用于对选中的列进行聚合操作,这里分别计算了均值、中位数和标准差。通过这些统计量,我们可以了解患者生命体征的整体水平和波动情况。
特征工程是从原始数据中提取和创建新特征的过程,它对于提高模型的性能和准确性非常重要。在 ICU 医疗数据分析中,我们可以根据患者的生命体征数据计算一些新的特征,如生命体征的变化率。以计算心率的变化率为例:
# 计算心率变化率
df = df.with_columns(
(pl.col("heart_rate").diff() / pl.col("heart_rate").shift()).alias("heart_rate_change_rate")
)
上述代码中,diff函数用于计算相邻两行数据的差值,shift函数用于将数据向下移动一行,从而实现与前一行数据的比较。通过计算心率变化率,我们可以了解患者心率的动态变化情况,这对于判断患者的病情发展具有重要意义。
关联分析可以帮助我们发现不同医疗指标之间的潜在关系。在 ICU 数据中,我们可以分析心率、血氧饱和度、体温等指标之间的相关性。使用 Polars 结合 Seaborn 库进行关联分析和可视化:
import seaborn as sns
import matplotlib.pyplot as plt
# 计算相关性矩阵
corr_matrix = df.select(pl.col(["heart_rate", "blood_oxygen", "body_temperature"])).corr()
# 绘制热力图
plt.figure(figsize=(10, 8))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm')
plt.title('Correlation Heatmap of Vital Signs')
plt.show()
上述代码中,corr函数用于计算相关性矩阵,然后使用 Seaborn 库的heatmap函数绘制热力图。在热力图中,颜色越深表示相关性越强,通过观察热力图,我们可以直观地看到不同生命体征之间的相关性,例如,可能发现心率与体温之间存在一定的正相关关系,这对于深入了解患者的生理机制和病情诊断提供了有价值的线索。
(三)异常检测与预警
在 ICU 医疗场景中,及时发现患者生命体征的异常变化对于保障患者的生命安全至关重要。我们可以集成异常检测模型来实现这一目标。以 Isolation Forest 模型为例,它是一种基于隔离思想的异常检测算法,能够有效地识别数据中的异常点。首先,我们需要使用历史数据对 Isolation Forest 模型进行训练:
from sklearn.ensemble import IsolationForest
import numpy as np
# 准备训练数据
historical_data = df.select(pl.col(["heart_rate", "blood_oxygen", "body_temperature"])).to_numpy()
# 训练Isolation Forest模型
model = IsolationForest(contamination=0.1)
model.fit(historical_data)
这里的contamination参数表示数据中异常点的比例,我们根据实际情况设置为 0.1。训练好模型后,就可以使用它对实时数据进行异常检测:
# 实时检测异常
def detect_anomaly(data_point):
features = np.array([data_point["heart_rate"], data_point["blood_oxygen"], data_point["body_temperature"]]).reshape(1, -1)
return model.predict(features)[0] == -1
上述代码中,detect_anomaly函数接受一个数据点,将其转换为模型所需的输入格式,然后使用训练好的模型进行预测。如果预测结果为 - 1,则表示该数据点为异常点。
结合实时数据处理,我们可以构建一个实时预警系统。假设我们有一个实时数据生成器simulate_patient_stream,它不断生成患者的生命体征数据:
import time
# 模拟患者生命体征数据流
def simulate_patient_stream():
while True:
patient_id = "patient_1"
timestamp = int(time.time() * 1000)
heart_rate = np.random.randint(60, 100)
blood_oxygen = np.random.uniform(95.0, 100.0)
body_temperature = np.random.uniform(36.0, 38.0)
yield {
"patient_id": patient_id,
"timestamp": timestamp,
"heart_rate": heart_rate,
"blood_oxygen": blood_oxygen,
"body_temperature": body_temperature
}
time.sleep(1)
# 实时处理流数据并触发预警
stream = simulate_patient_stream()
for data in stream:
if detect_anomaly(data):
print(f"⚠️ 异常告警:患者 {data['patient_id']} 生命体征异常!")
上述代码中,simulate_patient_stream函数模拟了患者生命体征的实时数据流,time.sleep(1)表示每秒生成一个数据点。
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1698765432000, 'heart_rate': 75, 'blood_oxygen': 97.5, 'body_temperature': 37.2}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1698765433000, 'heart_rate': 85, 'blood_oxygen': 96.8, 'body_temperature': 36.9}
⚠️ 异常告警:患者 patient_1 生命体征异常!
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1698765434000, 'heart_rate': 110, 'blood_oxygen': 98.1, 'body_temperature': 37.5}
在数据处理循环中,我们调用detect_anomaly函数对每个数据点进行异常检测,如果检测到异常,则打印预警信息,及时通知医护人员进行处理,从而实现对患者生命体征的实时监测和异常预警,为患者的生命安全提供有力保障。
案例实战
(一)案例背景
在本次案例中,我们聚焦于某大型医院的 ICU 病房,该病房肩负着救治各类重症患者的重任。我们的目标是通过对 ICU 患者的医疗数据进行深入分析,挖掘其中潜在的信息,为医疗决策提供有力支持。
数据规模方面,我们收集了近一年来 ICU 病房中有特色的 100例患者的医疗记录,这些数据涵盖了患者的基本信息、生命体征数据、检验检查结果、治疗方案以及病情转归等多个方面,数据总量超过 10 万条。数据来源主要包括医院的电子病历系统(EMR)、重症监护信息系统(ICIS)以及各类医疗设备,如监护仪、呼吸机等。这些设备通过实时监测患者的生命体征,如心率、血氧饱和度、血压、体温等,将数据自动传输并存储到相应的信息系统中。
本次分析的目标明确,旨在通过对这些数据的处理和分析,实现以下几个关键目标:一是及时发现患者生命体征的异常变化,为医护人员提供预警,以便采取及时有效的治疗措施;二是深入分析患者的病情发展趋势,预测患者的治疗效果和预后情况,帮助医生制定更加科学合理的治疗方案;三是评估不同治疗方案的有效性,为临床治疗提供参考依据,推动医疗质量的提升。通过实现这些目标,我们期望能够提高 ICU 患者的救治成功率,降低死亡率,改善患者的预后情况。
(二)实现步骤
- 数据准备:数据获取是整个分析流程的第一步。我们从医院的电子病历系统和重症监护信息系统中提取相关数据,这些数据以 CSV 文件的形式存储。在提取过程中,我们需要确保数据的完整性和准确性,避免数据丢失或错误。例如,对于一些关键的生命体征数据,如心率、血氧饱和度等,我们要检查数据是否存在缺失值或异常值。数据清洗是数据准备阶段的关键环节。在 ICU 数据中,经常会出现各种质量问题,如缺失值、异常值和重复数据等。对于缺失值,我们采用了多种处理方法。对于数值型数据,如年龄、心率等,我们使用均值填充法,即计算该列数据的均值,用均值来填充缺失值。对于分类数据,如性别、疾病类型等,我们使用众数填充法,即使用该列数据中出现次数最多的值来填充缺失值。对于异常值,我们根据数据的实际情况设定合理的阈值进行过滤。例如,正常的心率范围通常在 60 - 100 次 / 分钟之间,我们将超出这个范围的数据视为异常值进行处理。对于重复数据,我们通过比较数据的关键列,如患者 ID、时间戳等,删除完全相同的记录,确保数据的唯一性。
- Polars 分析流程:数据处理是数据分析的核心环节之一。使用 Polars 进行数据处理时,我们首先读取清洗后的数据。以读取 CSV 文件为例,代码如下:
import polars as pl
# 读取CSV文件
df = pl.read_csv('icu_data_cleaned.csv')
读取数据后,我们可以进行各种数据操作。例如,计算患者生命体征的统计量,如均值、中位数和标准差:
# 计算生命体征的均值、中位数和标准差
stats = df.select(
pl.col(["heart_rate", "blood_oxygen", "body_temperature"]).agg([
pl.mean(),
pl.median(),
pl.std()
])
)
print(stats)
在这个代码中,select函数用于选择需要分析的列,agg函数用于对选中的列进行聚合操作,这里分别计算了均值、中位数和标准差。
shape: (1, 9)
┌───────────────┬───────────────┬───────────────┬────────────────┬────────────────┬────────────────┬───────────────┬───────────────┬───────────────┐
│ heart_rate_mean│ blood_oxygen_mean│ body_temperature_mean│ heart_rate_median│ blood_oxygen_median│ body_temperature_median│ heart_rate_std│ blood_oxygen_std│ body_temperature_std│
│ --- │ --- │ --- │ --- │ --- │ --- │ --- │ --- │ --- │
│ f64 │ f64 │ f64 │ f64 │ f64 │ f64 │ f64 │ f64 │ f64 │
├───────────────┼───────────────┼───────────────┼────────────────┼────────────────┼────────────────┼───────────────┼───────────────┼───────────────┤
│ 99.0 │ 97.16 │ 37.08 │ 95.0 │ 97.0 │ 37.0 │ 15.8114 │ 0.9274 │ 0.2631 │
└───────────────┴───────────────┴───────────────┴────────────────┴────────────────┴────────────────┴───────────────┴───────────────┴───────────────┘
我们还可以进行数据过滤,筛选出特定条件的数据。例如,筛选出心率大于 100 次 / 分钟的患者数据:
# 筛选出心率大于100次/分钟的患者数据
filtered_df = df.filter(pl.col("heart_rate") > 100)
print(filtered_df)
输出示例结果为:
shape: (2, 4)
┌────────────┬────────────┬───────────────┬──────────────────┐
│ patient_id ┆ heart_rate ┆ blood_oxygen ┆ body_temperature │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ i64 ┆ f64 ┆ f64 │
├────────────┼────────────┼───────────────┼──────────────────┤
│ 2 ┆ 110 ┆ 96.8 ┆ 36.9 │
│ 4 ┆ 120 ┆ 95.5 ┆ 37.5 │
└────────────┴────────────┴───────────────┴──────────────────┘
异常检测在 ICU 医疗数据分析中至关重要。我们使用 Isolation Forest 模型进行异常检测,该模型能够有效地识别数据中的异常点。首先,我们需要使用历史数据对 Isolation Forest 模型进行训练:
from sklearn.ensemble import IsolationForest
import numpy as np
# 准备训练数据
historical_data = df.select(pl.col(["heart_rate", "blood_oxygen", "body_temperature"])).to_numpy()
# 训练Isolation Forest模型
model = IsolationForest(contamination=0.1)
model.fit(historical_data)
训练好模型后,我们可以使用它对实时数据进行异常检测:
# 实时检测异常
def detect_anomaly(data_point):
features = np.array([data_point["heart_rate"], data_point["blood_oxygen"], data_point["body_temperature"]]).reshape(1, -1)
return model.predict(features)[0] == -1
输出结果示例为:
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1698765432000, 'heart_rate': 75, 'blood_oxygen': 97.5, 'body_temperature': 37.2}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1698765433000, 'heart_rate': 85, 'blood_oxygen': 96.8, 'body_temperature': 36.9}
⚠️ 异常告警:患者 patient_1 生命体征异常!
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1698765434000, 'heart_rate': 110, 'blood_oxygen': 98.1, 'body_temperature': 37.5}
在实际应用中,我们可以结合实时数据处理,构建一个实时预警系统。假设我们有一个实时数据生成器simulate_patient_stream,它不断生成患者的生命体征数据:
import time
# 模拟患者生命体征数据流
def simulate_patient_stream():
while True:
patient_id = "patient_1"
timestamp = int(time.time() * 1000)
heart_rate = np.random.randint(60, 100)
blood_oxygen = np.random.uniform(95.0, 100.0)
body_temperature = np.random.uniform(36.0, 38.0)
yield {
"patient_id": patient_id,
"timestamp": timestamp,
"heart_rate": heart_rate,
"blood_oxygen": blood_oxygen,
"body_temperature": body_temperature
}
time.sleep(1)
# 实时处理流数据并触发预警
stream = simulate_patient_stream()
for data in stream:
if detect_anomaly(data):
print(f"⚠️ 异常告警:患者 {data['patient_id']} 生命体征异常!")
输出示例:
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652571657, 'heart_rate': 74, 'blood_oxygen': 96.1585746478717, 'body_temperature': 37.11160976367698}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652573658, 'heart_rate': 84, 'blood_oxygen': 98.18907140455404, 'body_temperature': 36.8864460600183}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652575659, 'heart_rate': 67, 'blood_oxygen': 98.21842389126833, 'body_temperature': 37.60197714267675}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652577659, 'heart_rate': 97, 'blood_oxygen': 98.99925164545152, 'body_temperature': 37.439169552714986}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652579660, 'heart_rate': 94, 'blood_oxygen': 98.86733605913881, 'body_temperature': 36.71330296790575}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652581660, 'heart_rate': 68, 'blood_oxygen': 97.88245541703247, 'body_temperature': 36.91134320119017}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652583661, 'heart_rate': 83, 'blood_oxygen': 96.45605835515127, 'body_temperature': 37.19832896149465}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652585662, 'heart_rate': 63, 'blood_oxygen': 96.64478822029321, 'body_temperature': 37.93596463389256}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652587663, 'heart_rate': 80, 'blood_oxygen': 99.08248917154252, 'body_temperature': 36.07374289135749}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652589663, 'heart_rate': 72, 'blood_oxygen': 98.85073005324531, 'body_temperature': 36.39633327245946}
📊 当前数据:{'patient_id': 'patient_1', 'timestamp': 1740652591664, 'heart_rate': 76, 'blood_oxygen': 96.89227001790177, 'body_temperature': 37.238381897570875}
- 结果展示与解读:分析结果的展示对于医疗决策具有重要意义。我们可以使用 Polars 结合其他可视化库,如 Matplotlib、Seaborn 等,将分析结果以直观的图表形式呈现出来。例如,绘制患者心率的箱线图,以展示心率数据的分布情况:
import matplotlib.pyplot as plt
import seaborn as sns
# 绘制患者心率的箱线图
sns.boxplot(data=df, x="heart_rate")
plt.title("Box Plot of Heart Rate")
plt.show()
在这个箱线图中,我们可以清晰地看到心率数据的中位数、四分位数以及异常值的分布情况。通过观察箱线图,我们可以了解患者心率的整体水平和波动情况,为医疗决策提供参考。再如,绘制患者生命体征之间的相关性热力图,以展示不同生命体征之间的相关性:
import polars as pl
import seaborn as sns
import matplotlib.pyplot as plt
# 假设df是包含数据的Polars DataFrame
numeric_cols = ["heart_rate", "blood_oxygen", "body_temperature"]
# 转换为Pandas DataFrame并计算相关性矩阵
df_pandas = df.select(numeric_cols).to_pandas()
corr_matrix = df_pandas.corr()
# 绘制热力图
plt.figure(figsize=(10, 8))
sns.heatmap(
corr_matrix,
annot=True,
cmap='coolwarm',
fmt=".2f", # 显示两位小数
linewidths=0.5,
annot_kws={"size": 12}
)
plt.title('Correlation Heatmap of Vital Signs', fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.show()
输出热力图示例如下:
在这个热力图中,颜色越深表示相关性越强。通过观察热力图,我们可以直观地看到不同生命体征之间的相关性,例如,可能发现心率与体温之间存在一定的正相关关系,这对于深入了解患者的生理机制和病情诊断提供了有价值的线索。这些分析结果对于医疗决策具有重要的指导意义。通过及时发现患者生命体征的异常变化,医生可以迅速采取相应的治疗措施,如调整药物剂量、进行紧急抢救等,从而提高患者的救治成功率。通过分析患者的病情发展趋势和治疗效果,医生可以为患者制定更加个性化的治疗方案,提高治疗的针对性和有效性。通过评估不同治疗方案的有效性,医生可以选择最优的治疗方案,为患者提供更好的医疗服务。
优化与拓展
(一)性能优化
- 懒加载模式:在处理大规模医疗数据时,数据量往往非常庞大,传统的立即执行模式可能会导致内存占用过高,甚至出现内存不足的情况。Polars 的懒加载模式,也称为延迟执行模式,能够有效解决这个问题。在懒加载模式下,当我们对数据进行操作时,Polars 并不会立即执行这些操作,而是构建一个计算图,记录下所有的操作步骤。只有当我们真正需要结果时,Polars 才会根据计算图来优化和执行这些操作。例如,当我们读取一个大型的 ICU 医疗数据文件并进行一系列的数据过滤和聚合操作时,如果使用立即执行模式,每一步操作都会立即执行并占用内存,而使用懒加载模式,这些操作会被记录下来,直到最后需要结果时才会一次性执行,这样可以大大减少中间结果的内存占用,提高处理效率。使用懒加载模式的代码示例如下:
import polars as pl
# 以懒加载模式读取CSV文件
lazy_df = pl.scan_csv('icu_data.csv')
# 进行数据过滤和聚合操作,这些操作不会立即执行
filtered_df = lazy_df.filter((pl.col("heart_rate") > 60) & (pl.col("heart_rate") < 100))
aggregated_df = filtered_df.groupby("patient_id").agg(pl.col("heart_rate").mean())
# 执行计算,获取最终结果
result = aggregated_df.collect()
在上述代码中,scan_csv函数以懒加载模式读取数据,返回一个LazyFrame对象。后续的filter和groupby操作也都是在LazyFrame上进行的,不会立即执行。只有当调用collect方法时,Polars 才会根据计算图优化并执行这些操作,返回最终的结果。
2. 多线程与并行计算:现代计算机的 CPU 通常具有多个核心,充分利用这些核心可以显著提高数据处理的速度。Polars 内置了对多线程和并行计算的支持,能够将数据处理任务分配到多个线程中并行执行,从而充分发挥多核 CPU 的优势。在进行数据聚合操作时,Polars 可以自动将数据分成多个块,每个块由一个线程进行处理,最后将各个线程的结果合并起来。这样可以大大缩短数据处理的时间,提高效率。以计算 ICU 患者生命体征的统计量为例,使用多线程并行计算的代码如下:
import polars as pl
# 读取CSV文件
df = pl.read_csv('icu_data.csv')
# 启用多线程计算(Polars 默认启用多线程,无需额外配置)
# pl.Config.set_global_string_cache(True) # 这一行与当前任务无关,可以移除
# 计算生命体征的均值、中位数和标准差
stats = df.select(
pl.col(["heart_rate", "blood_oxygen", "body_temperature"]).agg([
pl.mean().suffix("_mean"), # 添加后缀以区分统计量
pl.median().suffix("_median"),
pl.std().suffix("_std"),
])
)
# 打印结果
print(stats)
输出结果示例图:
shape: (1, 9)
┌───────────────┬───────────────┬───────────────┬────────────────┬────────────────┬────────────────┬───────────────┬───────────────┬───────────────┐
│ heart_rate_mean│ blood_oxygen_mean│ body_temperature_mean│ heart_rate_median│ blood_oxygen_median│ body_temperature_median│ heart_rate_std│ blood_oxygen_std│ body_temperature_std│
│ --- │ --- │ --- │ --- │ --- │ --- │ --- │ --- │ --- │
│ f64 │ f64 │ f64 │ f64 │ f64 │ f64 │ f64 │ f64 │ f64 │
├───────────────┼───────────────┼───────────────┼────────────────┼────────────────┼────────────────┼───────────────┼───────────────┼───────────────┤
│ 80.8333 │ 96.8167 │ 36.85 │ 80.0 │ 97.0 │ 36.95 │ 18.9041 │ 1.6251 │ 0.5244 │
└───────────────┴───────────────┴───────────────┴────────────────┴────────────────┴────────────────┴───────────────┴───────────────┴───────────────┘
在上述代码中,通过pl.Config.set_global_string_cache(True)启用了多线程计算,这样在执行agg操作时,Polars 会自动利用多线程并行计算,提高计算速度。
(二)拓展应用
- 结合机器学习模型:在医疗领域,利用机器学习模型进行疾病预测和诊断是一个重要的研究方向。Polars 处理后的数据可以作为机器学习模型的输入,用于训练和部署各种疾病预测模型。以预测 ICU 患者的病情恶化风险为例,我们可以使用逻辑回归、决策树、随机森林等机器学习算法,结合 Polars 处理后的患者生命体征数据、病史等信息进行模型训练。训练好的模型可以根据实时的患者数据预测病情恶化的风险,为医生提供决策支持。使用 Polars 结合 Scikit - learn 库进行疾病预测模型训练的代码示例如下:
import polars as pl
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
# 读取并处理数据
df = pl.read_csv('icu_data.csv')
# 假设特征为心率、血氧饱和度、体温,目标为病情是否恶化(0表示未恶化,1表示恶化)
features = df.select(["heart_rate", "blood_oxygen", "body_temperature"]).to_numpy()
target = df.select("deterioration_risk").to_numpy().flatten()
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(features, target, test_size=0.2, random_state=42)
# 训练逻辑回归模型
model = LogisticRegression()
model.fit(X_train, y_train)
# 进行预测
y_pred = model.predict(X_test)
# 评估模型
accuracy = accuracy_score(y_test, y_pred)
print(f"模型准确率: {accuracy}")
在上述代码中,首先使用 Polars 读取并处理数据,提取特征和目标变量。然后使用 Scikit - learn 库的train_test_split函数划分训练集和测试集,接着使用逻辑回归模型进行训练和预测,最后使用准确率评估模型的性能。
2. 与其他工具集成:在实际的医疗数据分析场景中,往往需要将 Polars 与其他工具集成,构建更强大的数据处理和分析管道。Polars 可以与 MinIO、Kafka 等工具集成。MinIO 是一个高性能的对象存储服务,与 Polars 集成后,可以将处理后的数据存储到 MinIO 中,实现数据的持久化和共享。Kafka 是一个分布式流处理平台,与 Polars 集成后,可以实现医疗数据的实时采集、处理和分析。以 Polars 与 MinIO 集成为例,将处理后的 ICU 医疗数据存储到 MinIO 中的代码如下:
import polars as pl
from minio import Minio
import io
# 读取并处理数据
df = pl.read_csv('icu_data.csv')
# 进行数据处理操作...
# MinIO配置
minio_url = "localhost:9000"
access_key = "your_access_key"
secret_key = "your_secret_key"
bucket_name = "icu-data-bucket"
# 初始化MinIO客户端
client = Minio(minio_url, access_key=access_key, secret_key=secret_key, secure=False)
# 将Polars DataFrame转换为Parquet格式的字节流
parquet_data = io.BytesIO()
df.write_parquet(parquet_data)
parquet_data.seek(0)
# 将数据上传到MinIO
object_name = "processed_icu_data.parquet"
client.put_object(bucket_name, object_name, parquet_data, length=parquet_data.getbuffer().nbytes)
在上述代码中,首先使用 Polars 读取并处理数据,然后配置 MinIO 的连接信息,初始化 MinIO 客户端。接着将 Polars 的 DataFrame 转换为 Parquet 格式的字节流,并将其上传到 MinIO 的指定存储桶中。通过这种方式,实现了 Polars 与 MinIO 的集成,为医疗数据的存储和管理提供了更强大的支持。
参考资料:
大数据新视界 --大数据大厂之 从 Druid 和 Kafka 到 Polars:大数据处理工具的传承与创新_polars kafka-CSDN博客文章浏览阅读3.5k次,点赞62次,收藏67次。本文介绍 Polars 在大数据处理领域的崛起。提及之前对 Druid、Kafka 的探索,阐述 Polars 的优势、与其他工具的联系与突破,包括其架构、应用案例、对比情况,展现它为大数据处理注入新活力。_polars kafkahttps://blog.csdn.net/atgfg/article/details/142707521?ops_request_misc=&request_id=&biz_id=102&utm_term=Polars%20%E8%BF%9B%E8%A1%8C%E4%BA%BA%E5%B7%A5%E6%99%BA%E8%83%BD%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-6-142707521.142^v101^pc_search_result_base3&spm=1018.2226.3001.4187Python Polars库:高性能的数据处理与分析-CSDN博客文章浏览阅读1.1k次,点赞22次,收藏28次。更多Python学习内容:ipengtao.com在数据分析和处理领域,高效处理大规模数据集是关键。Python的Polars库提供了一种高性能的解决方案,旨在大幅提升数据处理速度和效率。本文将详细介绍Polars库的功能、安装与配置、基本和高级用法,以及如何在实际项目中应用它。Polars库简介Polars是一个基于Apache Arrow的高性能数据处理库,旨在提供快速的数据帧操作。与Pand..._python polars库
https://blog.csdn.net/GitHub_miao/article/details/140681668?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522d080fdf5b1da6fee022aeee9a3f4e558%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=d080fdf5b1da6fee022aeee9a3f4e558&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~first_rank_ecpm_v1~rank_v31_ecpm-4-140681668-null-null.142^v101^pc_search_result_base3&utm_term=Polars%20%E8%BF%9B%E8%A1%8C%E4%BA%BA%E5%B7%A5%E6%99%BA%E8%83%BD%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90&spm=1018.2226.3001.4187从存储到人工智能洞察: 利用 MinIO 和 Polars 简化数据管道_minio 数据湖-CSDN博客文章浏览阅读1k次,点赞28次,收藏13次。将 MinIO 的高性能、可扩展企业对象存储的强大功能与 Polars(闪电般快速的 DataFrame 库)的快速内存数据处理功能相结合,可以显著提高数据管道的性能。在 AI 工作流中尤其如此,其中预处理大型数据集和执行特征选择是关键步骤。在这篇文章中,我们将探讨将 MinIO 与 Polars 集成如何简化您的数据工作流程并优化性能,尤其是对于复杂的分析工作负载。_minio 数据湖
https://blog.csdn.net/miniopro/article/details/142657283?ops_request_misc=&request_id=&biz_id=102&utm_term=Polars%20%E8%BF%9B%E8%A1%8C%E4%BA%BA%E5%B7%A5%E6%99%BA%E8%83%BD%E6%95%B0%E6%8D%AE%E5%88%86%E6%9E%90&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-0-142657283.142^v101^pc_search_result_base3&spm=1018.2226.3001.4187