基于自编码器的心电图信号异常检测(Python)

news2025/1/19 14:14:09

使用的数据集来自PTB心电图数据库,包括14552个心电图记录,包括两类:正常心跳和异常心跳,采样频率为125Hz。

import numpy as np
np.set_printoptions(suppress=True)
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.io import arff
from sklearn.model_selection import train_test_split
import matplotlib
matplotlib.rcParams["figure.figsize"] = (6, 4)
plt.style.use("ggplot")
import tensorflow as tf
from tensorflow import data
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import mae
from tensorflow.keras import layers
from tensorflow import keras
from sklearn.metrics import accuracy_score, recall_score, precision_score, confusion_matrix, f1_score, classification_report
import os
isGPU = tf.config.list_physical_devices('GPU')
directory_path = r'ECG Heartbeat Categorization Dataset'
for dirname, _, filenames in os.walk(directory_path):
    for filename in filenames:
        print(os.path.join(dirname, filename))
normal_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_normal.csv").iloc[:, :-1]
anomaly_df = pd.read_csv("ECG Heartbeat Categorization Dataset/ptbdb_abnormal.csv").iloc[:, :-1]
print("Shape of Normal data", normal_df.shape)
print("Shape of Abnormal data", anomaly_df.shape)
Shape of Normal data (4045, 187)
Shape of Abnormal data (10505, 187)
def plot_sample(normal, anomaly):
    index = np.random.randint(0, len(normal_df), 2)


    fig, ax = plt.subplots(1, 2, sharey=True, figsize=(10, 4))
    ax[0].plot(normal.iloc[index[0], :].values, label=f"Case {index[0]}")
    ax[0].plot(normal.iloc[index[1], :].values, label=f"Case {index[1]}")
    ax[0].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
    ax[0].set_title("Normal")


    ax[1].plot(anomaly.iloc[index[0], :].values, label=f"Case {index[0]}")
    ax[1].plot(anomaly.iloc[index[1], :].values, label=f"Case {index[1]}")
    ax[1].legend(shadow=True, frameon=True, facecolor="inherit", loc=1, fontsize=9)
    ax[1].set_title("Anomaly")


    plt.tight_layout()
    plt.show()
plot_sample(normal_df, anomaly_df)

CLASS_NAMES = ["Normal", "Anomaly"]


normal_df_copy = normal_df.copy()
anomaly_df_copy = anomaly_df.copy()
print(anomaly_df_copy.columns.equals(normal_df_copy.columns))
normal_df_copy = normal_df_copy.set_axis(range(1, 188), axis=1)
anomaly_df_copy = anomaly_df_copy.set_axis(range(1, 188), axis=1)
normal_df_copy = normal_df_copy.assign(target = CLASS_NAMES[0])
anomaly_df_copy = anomaly_df_copy.assign(target = CLASS_NAMES[1])




df = pd.concat((normal_df_copy, anomaly_df_copy))
def plot_smoothed_mean(data, class_name = "normal", step_size=5, ax=None):
    df = pd.DataFrame(data)
    roll_df = df.rolling(step_size)
    smoothed_mean = roll_df.mean().dropna().reset_index(drop=True)
    smoothed_std = roll_df.std().dropna().reset_index(drop=True)
    margin = 3*smoothed_std
    lower_bound = (smoothed_mean - margin).values.flatten()
    upper_bound = (smoothed_mean + margin).values.flatten()


    ax.plot(smoothed_mean.index, smoothed_mean)
    ax.fill_between(smoothed_mean.index, lower_bound, y2=upper_bound, alpha=0.3, color="red")
    ax.set_title(class_name, fontsize=9)
fig, axes = plt.subplots(1, 2, figsize=(8, 4), sharey=True)
axes = axes.flatten()
for i, label in enumerate(CLASS_NAMES, start=1):
    data_group = df.groupby("target")
    data = data_group.get_group(label).mean(axis=0, numeric_only=True).to_numpy()
    plot_smoothed_mean(data, class_name=label, step_size=20, ax=axes[i-1])
fig.suptitle("Plot of smoothed mean for each class", y=0.95, weight="bold")
plt.tight_layout()

normal_df.drop("target", axis=1, errors="ignore", inplace=True)
normal = normal_df.to_numpy()
anomaly_df.drop("target", axis=1, errors="ignore", inplace=True)
anomaly = anomaly_df.to_numpy()


X_train, X_test = train_test_split(normal, test_size=0.15, random_state=45, shuffle=True)
print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}, anomaly shape: {anomaly.shape}")
Train shape: (3438, 187), Test shape: (607, 187), anomaly shape: (10505, 187)
class AutoEncoder(Model):
    def __init__(self, input_dim, latent_dim):
        super(AutoEncoder, self).__init__()
        self.input_dim = input_dim
        self.latent_dim = latent_dim


        self.encoder = tf.keras.Sequential([
            layers.Input(shape=(input_dim,)),
            layers.Reshape((input_dim, 1)),  # Reshape to 3D for Conv1D
            layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
            layers.Conv1D(128, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
            layers.Conv1D(latent_dim, 3, strides=1, activation='relu', padding="same"),
            layers.BatchNormalization(),
            layers.MaxPooling1D(2, padding="same"),
        ])
        # Previously, I was using UpSampling. I am trying Transposed Convolution this time around.
        self.decoder = tf.keras.Sequential([
            layers.Conv1DTranspose(latent_dim, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Conv1DTranspose(128, 3, strides=1, activation='relu', padding="same"),
#             layers.UpSampling1D(2),
            layers.BatchNormalization(),
            layers.Flatten(),
            layers.Dense(input_dim)
        ])


    def call(self, X):
        encoded = self.encoder(X)
        decoded = self.decoder(encoded)
        return decoded




input_dim = X_train.shape[-1]
latent_dim = 32


model = AutoEncoder(input_dim, latent_dim)
model.build((None, input_dim))
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.01), loss="mae")
model.summary()
Model: "auto_encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 sequential (Sequential)     (None, 24, 32)            63264     
                                                                 
 sequential_1 (Sequential)   (None, 187)               640603    
                                                                 
=================================================================
Total params: 703867 (2.69 MB)
Trainable params: 702715 (2.68 MB)
Non-trainable params: 1152 (4.50 KB)
epochs = 100
batch_size = 128
early_stopping = EarlyStopping(patience=10, min_delta=1e-3, monitor="val_loss", restore_best_weights=True)




history = model.fit(X_train, X_train, epochs=epochs, batch_size=batch_size,
                    validation_split=0.1, callbacks=[early_stopping])

plt.plot(history.history['loss'], label="Training loss")
plt.plot(history.history['val_loss'], label="Validation loss", ls="--")
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.title("Training loss")
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.show()

train_mae = model.evaluate(X_train, X_train, verbose=0)
test_mae = model.evaluate(X_test, X_test, verbose=0)
anomaly_mae = model.evaluate(anomaly_df, anomaly_df, verbose=0)


print("Training dataset error: ", train_mae)
print("Testing dataset error: ", test_mae)
print("Anormaly dataset error: ", anomaly_mae)

Training dataset error:  0.014224529266357422
Testing dataset error:  0.01488062646239996
Anormaly dataset error:  0.043484628200531006
def predict(model, X):
    pred = model.predict(X, verbose=False)
    loss = mae(pred, X)
    return pred, loss

_, train_loss = predict(model, X_train)
_, test_loss = predict(model, X_test)
_, anomaly_loss = predict(model, anomaly)
threshold = np.mean(train_loss) + np.std(train_loss) # Setting threshold for distinguish normal data from anomalous data


bins = 40
plt.figure(figsize=(9, 5), dpi=100)
sns.histplot(np.clip(train_loss, 0, 0.5), bins=bins, kde=True, label="Train Normal")
sns.histplot(np.clip(test_loss, 0, 0.5), bins=bins, kde=True, label="Test Normal")
sns.histplot(np.clip(anomaly_loss, 0, 0.5), bins=bins, kde=True, label="anomaly")


ax = plt.gca()  # Get the current Axes
ylim = ax.get_ylim()
plt.vlines(threshold, 0, ylim[-1], color="k", ls="--")
plt.annotate(f"Threshold: {threshold:.3f}", xy=(threshold, ylim[-1]), xytext=(threshold+0.009, ylim[-1]),
             arrowprops=dict(facecolor='black', shrink=0.05), fontsize=9)
plt.legend(shadow=True, frameon=True, facecolor="inherit", loc="best", fontsize=9)
plt.show()

def plot_examples(model, data, ax, title):
    pred, loss = predict(model, data)
    ax.plot(data.flatten(), label="Actual")
    ax.plot(pred[0], label = "Predicted")
    ax.fill_between(range(1, 188), data.flatten(), pred[0], alpha=0.3, color="r")
    ax.legend(shadow=True, frameon=True,
              facecolor="inherit", loc=1, fontsize=7)
#                bbox_to_anchor = (0, 0, 0.8, 0.25))


    ax.set_title(f"{title} (loss: {loss[0]:.3f})", fontsize=9.5)
fig, axes = plt.subplots(2, 5, sharey=True, sharex=True, figsize=(12, 6))
random_indexes = np.random.randint(0, len(X_train), size=5)


for i, idx in enumerate(random_indexes):
    data = X_train[[idx]]
    plot_examples(model, data, ax=axes[0, i], title="Normal")


for i, idx in enumerate(random_indexes):
    data = anomaly[[idx]]
    plot_examples(model, data, ax=axes[1, i], title="anomaly")


plt.tight_layout()
fig.suptitle("Sample plots (Actual vs Reconstructed by the CNN autoencoder)", y=1.04, weight="bold")
fig.savefig("autoencoder.png")
plt.show()

Model Evaluation

def evaluate_model(model, data):
    pred, loss = predict(model, data)
    if id(data) == id(anomaly):
        accuracy = np.sum(loss > threshold)/len(data)
    else:
        accuracy = np.sum(loss <= threshold)/len(data)
    return f"Accuracy: {accuracy:.2%}"
print("Training", evaluate_model(model, X_train))
print("Testing", evaluate_model(model, X_test))
print("Anomaly", evaluate_model(model, anomaly))
Training Accuracy: 88.66%
Testing Accuracy: 84.51%
Anomaly Accuracy: 77.34%
def prepare_labels(model, train, test, anomaly, threshold=threshold):
    ytrue = np.concatenate((np.ones(len(X_train)+len(X_test), dtype=int), np.zeros(len(anomaly), dtype=int)))
    _, train_loss = predict(model, train)
    _, test_loss = predict(model, test)
    _, anomaly_loss = predict(model, anomaly)
    train_pred = (train_loss <= threshold).numpy().astype(int)
    test_pred = (test_loss <= threshold).numpy().astype(int)
    anomaly_pred = (anomaly_loss < threshold).numpy().astype(int)
    ypred = np.concatenate((train_pred, test_pred, anomaly_pred))


    return ytrue, ypred
def plot_confusion_matrix(model, train, test, anomaly, threshold=threshold):
    ytrue, ypred = prepare_labels(model, train, test, anomaly, threshold=threshold)
    accuracy = accuracy_score(ytrue, ypred)
    precision = precision_score(ytrue, ypred)
    recall = recall_score(ytrue, ypred)
    f1 = f1_score(ytrue, ypred)
    print(f"""\
        Accuracy: {accuracy:.2%}
        Precision: {precision:.2%}
        Recall: {recall:.2%}
        f1: {f1:.2%}\n
        """)


    cm = confusion_matrix(ytrue, ypred)
    cm_norm = confusion_matrix(ytrue, ypred, normalize="true")
    data = np.array([f"{count}\n({pct:.2%})" for count, pct in zip(cm.ravel(), cm_norm.ravel())]).reshape(cm.shape)
    labels = ["Anomaly", "Normal"]


    plt.figure(figsize=(5, 4))
    sns.heatmap(cm, annot=data, fmt="", xticklabels=labels, yticklabels=labels)
    plt.ylabel("Actual")
    plt.xlabel("Predicted")
    plt.title("Confusion Matrix", weight="bold")
    plt.tight_layout()
plot_confusion_matrix(model, X_train, X_test, anomaly, threshold=threshold)
Accuracy: 80.32%
Precision: 59.94%
Recall: 88.03%
f1: 71.32%

ytrue, ypred = prepare_labels(model, X_train, X_test, anomaly, threshold=threshold)
print(classification_report(ytrue, ypred, target_names=CLASS_NAMES))

precision recall f1-score support
Normal 0.94 0.77 0.85 10505
Anomaly 0.60 0.88 0.71 4045
accuracy 0.80 14550
macro avg 0.77 0.83 0.78 14550
weighted avg 0.85 0.80 0.81 14550

工学博士,担任《Mechanical System and Signal Processing》《中国电机工程学报》《控制与决策》等期刊审稿专家,擅长领域:现代信号处理,机器学习,深度学习,数字孪生,时间序列分析,设备缺陷检测、设备异常检测、设备智能故障诊断与健康管理PHM等。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1832336.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

架构设计 - nginx 的核心机制与主要应用场景

一、nginx 的核心机制&#xff1a; 1. 事件驱动模型&#xff08;epoll 多路复用&#xff09; 事件循环&#xff1a; Nginx的核心组件是一个事件循环&#xff0c;它不断地监听事件&#xff08;如新连接的到来、请求数据的可读性等&#xff09;。 当有事件发生时&#xff0c;事…

双层循环和循环控制语句的使用,以及while和until的语法使用

echo 打印 -n 表示不换行输出 -e 输出转义字符 /b&#xff1a;相当于退格键&#xff08;backspace&#xff09; /n&#xff1a; 换行&#xff0c;相当于回车 /f&#xff1a; 换行&#xff0c;换行后的新行的开头连着上一行的行尾 /t&#xff1a; 相当于tab键 又叫做横向制…

Python自动化测试面试题精选(一)

今天大家介绍一些Python自动化测试中常见的面试题&#xff0c;涵盖了Python基础、测试框架、测试工具、测试方法等方面的内容&#xff0c;希望能够帮助你提升自己的水平和信心。 项目相关 什么项目适合做自动化测试&#xff1f; 答&#xff1a;一般来说&#xff0c;适合做自…

Minillama3->训练tokenizer

GitHub - charent/ChatLM-mini-Chinese: 中文对话0.2B小模型(ChatLM-Chinese-0.2B),开源所有数据集来源、数据清洗、tokenizer训练、模型预训练、SFT指令微调、RLHF优化等流程的全部代码。支持下游任务sft微调,给出三元组信息抽取微调示例。中文对话0.2B小模型(ChatLM-Chi…

Java面试题之MySQL事务详解

事务是什么 MySQL中的事务&#xff08;Transaction&#xff09;是数据库管理系统执行的一个逻辑操作单元&#xff0c;它是由一系列数据库操作组成的逻辑工作单元。事务是并发控制的单位&#xff0c;也是用户定义的一个操作序列。事务的主要目的是确保数据的完整性和一致性&…

2024年上网行为审计软件排名,推荐这五款上网行为管理软件

上网行为审计软件是企业IT管理中不可或缺的一部分&#xff0c;它们旨在帮助组织监控、管理、审计员工的互联网使用情况&#xff0c;确保网络资源的合理利用&#xff0c;提高工作效率&#xff0c;同时维护企业信息安全。下面将介绍几款市场上知名的上网行为审计软件&#xff0c;…

又一个新项目完结,炸裂!

又一个新项目完结&#xff0c;炸裂&#xff01; 大家好&#xff0c;我是程序员鱼皮。 经过了一个多月的爆肝&#xff0c;我在自己的编程导航的第 9 个有 保姆级教程 的大项目 —— 鱼答答 AI 答题应用平台&#xff0c;完结啦&#xff01; 除了全程直播讲解的保姆级视频教程外…

《详解》如何在ROS中建立MQTT通信

观前提醒&#xff1a;本期主要内容为ROS中MQTT通信节点的编程&#xff0c;和ROS部分底层通信机制的浅析 一、复习一下&#xff1a;ROS通信机制&MQTT通信异同点 ROS通信机制概述 ROS中的主要通信机制有以下几种&#xff1a; 话题 (Topics) 发布/订阅模型&#xff08;Pu…

ECharts词云图(案例一)+配置项详解

ECharts词云图&#xff08;案例一&#xff09;配置项详解 ECharts 是一款由百度团队开发的基于 JavaScript 的开源可视化图表库&#xff0c;它提供了丰富的图表类型&#xff0c;包括常见的折线图、柱状图、饼图等&#xff0c;以及一些较为特殊的图表&#xff0c;如词云图。从版…

5个超实用1688选品技巧!轻松出单999+

1、研究市场需求 通过市场调查和分析&#xff0c;了解目标市场的消费者喜好和趋势。选择具有市场需求且竞争相对较小的产品类别。 用店雷达热销商 品榜和飙升商 品榜。比如做女装类目&#xff0c;选择“女士T恤”我们可以根据日、周、月为时间维度下商品的销售笔数、件数、销…

【数据库】世界上使用最多的引擎SqlLite

文章目录 概述特点安装安装方式一安装方式二 命令语法编程操作打开/创建数据库创建表Insert数据SELECT操作UPDATE操作DELETE操作 来源 概述 SQLite 是一个进程内库&#xff0c;它实现了一个独立的、无服务器的、零配置的事务性 SQL 数据库引擎。 SQLite的代码属于公共领域&…

谷歌倾斜摄影覆盖面积究竟有多大?这里有了准确数字

自谷歌地球诞生以来&#xff0c;凭借着数据种类多、覆盖面积广、数据精度高、更新及时、交互体验良好的优势&#xff0c;很多人喜欢在上面恣意浏览&#xff0c;足不出户&#xff0c;俯瞰地球美好河山&#xff0c;探索自然地理奇妙景观。谷歌地球中倾斜摄影数据是继谷歌卫星影像…

RT-Thread简介及启动流程分析

阅读引言&#xff1a; 最近在学习RT-Thread的内部机制&#xff0c;觉得这个启动流程和一些底层原理还是挺重要的&#xff0c; 所以写下此文。 目录 1&#xff0c; RT-Thread简介 2&#xff0c;RT-Thread任务的几种状态 3&#xff0c; 学习资源推荐 4&#xff0c; 启动流程分…

MySQL商品购物数据库建表

goods表 mysql> create table if not exists goods(-> goods_id int primary key auto_increment comment 商品编号,-> goods_name varchar(32) not null comment 商品名称,-> unitprice int not null default 0 comment 单价&#xff0c;单位分,-> category v…

微服务开发与实战Day10 - Redis面试篇

一、Redis主从集群 1. 搭建主从集群 1.1 主从集群结构 单节点Redis的并发能力是有限的&#xff0c;要进一步提高Redis的并发能力&#xff0c;就需要搭建主从集群&#xff0c;实现读写分离。 如图所示&#xff0c;集群中有一个master节点、两个slave节点&#xff08;现在叫re…

思科配置:vlan、两个交换机、两个路由器、四台主机

一、如图配置 各设备ip地址、接口、vlan如图所示。 二、配置各主机ip、子网掩码、默认网关 PC0 PC8 PC1 PC9 PC2 PC10 PC3 PC11 三、配置Switch0 &#xff08;期间报错为拼写错误&#xff09; MySwitch0> MySwitch0>en MySwitch0#conf t Enter configuration co…

OSPF被动接口配置(华为)

#交换设备 OSPF被动接口配置 一、基本概念 OSPF被动接口&#xff0c;也称为抑制接口&#xff0c;即将路由器某一接口配置为被动接口后&#xff0c;该接口不会再接受和发送OSPF报文 二、使用场景 在路由器与终端相近或者直接相连的一侧配置被动接口 因为OSPF会定期发送报文…

MongoDB 多层级查询

多层级查询 注意&#xff1a;要注意代码顺序 查询层级数据代码放前面&#xff0c;查询条件放后面 if (StringUtils.isBlank(params.getDocType())) {params.setDocType(DOC_TDCTYPE);}String docName mapper.findByDocInfo(params.getDocType());List<ExpertApprovalOpin…

【2024最新华为OD-C/D卷试题汇总】[支持在线评测] 任务积分优化问题(100分) - 三语言AC题解(Python/Java/Cpp)

&#x1f36d; 大家好这里是清隆学长 &#xff0c;一枚热爱算法的程序员 ✨ 本系列打算持续跟新华为OD-C/D卷的三语言AC题解 &#x1f4bb; ACM银牌&#x1f948;| 多次AK大厂笔试 &#xff5c; 编程一对一辅导 &#x1f44f; 感谢大家的订阅➕ 和 喜欢&#x1f497; &#x1f…

配置文件-基础配置,applicationproperties.yml

黑马程序员Spring Boot2 文章目录 1、属性配置2、配置文件分类3、yaml文件4、yaml数据读取4.1 读取单个数据4.2 读取全部属性数据4.3 读取引用类型属性数据 1、属性配置 SpringBoot默认配置文件application.properties&#xff0c;通过键值对配置对应属性修改配置 修改服务器端…