基于LSTM、GRU和RNN的交通时间序列预测(Python)

news2024/11/23 17:09:04

近年来,人工智能技术的发展推动了智慧交通领域的进步,交通流预测日益成为研究热点之一。交通流预测是基于历史的交通数据对未来时段的交通流状态参数进行预测。作为交通流状态的直接反映,交通流参数的预测结果可以直接应用于 ATIS 和ATMS 中,为交通管理者实施管控、诱导措施提供有效参考。

根据基础数据的时间窗和预测步长,交通流预测存在长时、短时预测之分。长时预测的预测步长往往是一小时或更长的时间,预测结果常应用于交通规划、交通影响评价。短时预测的预测步长一般在 15 分钟以内(如:2 分钟、5 分钟),预测结果常作为实时交通管控和交通诱导的有效参考,也更具研究价值,预测方法可分为模型驱动方法和数据驱动方法两大类。

交通流参数预测的模型驱动方法主要以动态交通流仿真模型为代表,如DynaMIT-P,DynaSmart-X,Visum-online 等。数据驱动方法主要通过数理分析、机器学习等方法,以历史交通数据为主要研究对象进行预测,该方法从数据本身出发,避免了分析复杂的交通系统,且运算效率更适用于实时的在线运行。数据驱动的交通流预测方法可分为线形预测方法、非线性预测方法和智能预测方法三类。

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error
import math
from models import LSTMModel, RNNModel, GRUModel, SimpleTransformerModel
import pandas as pd
import matplotlib.pyplot as plt




def train_model(model, train_inputs, train_labels, learning_rate, epochs, batch_size=64):
    # Wrap training data and labels into TensorDataset and use DataLoader
    train_dataset = TensorDataset(train_inputs, train_labels)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)


    # Define loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.AdamW(model.parameters(), lr=learning_rate)


    # Train the model
    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()


        # Print average loss after each epoch
        print(f'Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}')




def evaluate_model(model, test_inputs, test_labels, means, stds):
    model.eval()
    with torch.no_grad():
        predictions = model(test_inputs)
        predictions = predictions.squeeze().detach().numpy()
        test_labels = test_labels.squeeze().detach().numpy()


        predictions = predictions * stds.reshape(-1) + means.reshape(-1)
        # Initialize lists to store evaluation metrics
        rmses = []
        maes = []
        mapes = []


        # Define a threshold to filter out samples with near-zero true values
        threshold = 1e-6


        # Calculate metrics for each dimension of test_labels
        for i in range(test_labels.shape[1]):
            true = test_labels[:, i]
            pred = predictions[:, i]
            rmse = math.sqrt(mean_squared_error(true, pred))
            mae = mean_absolute_error(true, pred)


            # Calculate MAPE only for samples with true values above the threshold
            valid_indices = true > threshold
            valid_true = true[valid_indices]
            valid_pred = pred[valid_indices]
            if len(valid_true) > 0:
                mape = np.mean(np.abs((valid_true - valid_pred) / valid_true)) * 100
            else:
                mape = float('nan')


            rmses.append(rmse)
            maes.append(mae)
            mapes.append(mape)


        # Create a DataFrame to store and display evaluation metrics
        metrics_df = pd.DataFrame({
            'RMSE': rmses,
            'MAE': maes,
            'MAPE': mapes
        }, index=[f'Time Step {i + 1}' for i in range(test_labels.shape[1])])


        print(metrics_df)
        return metrics_df




def plot_predictions(model, inputs, labels, means, stds, vis_len, vis_step):
    model.eval()
    with torch.no_grad():
        # Obtain the predictions
        predictions = model(inputs[:vis_len]).squeeze().detach().numpy()
        # Denormalize the predictions and actual values
        predictions = predictions * stds.reshape(-1) + means.reshape(-1)
        predictions = predictions[:, vis_step - 1]
        actuals = labels[:vis_len, vis_step - 1].squeeze().detach().numpy()


    # Plot the graph
    plt.figure(figsize=(10, 6))
    plt.plot(predictions, label=f'Prediction (Step {vis_step})', color='red', linewidth=2, linestyle='-')
    plt.plot(actuals, label='Actual', color='black', linewidth=2, linestyle='--')
    plt.title('Predictions vs Actual')
    plt.xlabel('Time')
    plt.ylabel('Number of vehicles')
    plt.legend()
    plt.savefig('visualisation/predictions_vs_actual.png', dpi=300)




if __name__ == "__main__":
    # a. Load data
    file_path = 'data/'
    month = 1
    target = "Observations"
    input_seq_len = 12
    output_seq_len = 6
    file_name = f'traffic_data_M{month}_{target}_IN{input_seq_len}_OUT{output_seq_len}.npz'
    data = np.load(file_path + file_name)
    train_inputs = torch.Tensor(data['train_inputs'])
    train_labels = torch.Tensor(data['train_labels'])
    test_inputs = torch.Tensor(data['test_inputs'])
    test_labels = torch.Tensor(data['test_labels'])
    means = data['means']
    stds = data['stds']


    # b. Construct model
    input_dim = 1  # Default number of features is 1
    hidden_dim = 32  # hidden layer dimension
    output_seq_len = 6  # Output dimension
    model = LSTMModel(input_dim, hidden_dim, output_seq_len)
    #model = RNNModel(input_dim, hidden_dim, output_seq_len)
    #model = GRUModel(input_dim, hidden_dim, output_seq_len)


    # c. Train the model
    learning_rate = 0.005
    epochs = 20
    train_model(model, train_inputs, train_labels, learning_rate, epochs)


    # d. Evaluate the model
    evaluate_model(model, test_inputs, test_labels, means, stds)


    # e. Plot model prediction results
    vis_len = 100 # Length of time for visualization
    vis_step = 6 # Prediction step number for visualization
    plot_predictions(model, test_inputs, test_labels, means, stds, vis_len, vis_step)
import torch
import torch.nn as nn
import torch.nn.functional as F
import math




class RNNModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_seq_len):
        super(RNNModel, self).__init__()
        self.rnn = nn.RNN(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_seq_len)


    def forward(self, x):
        # x shape: (batch_size, seq_length, input_dim)
        x, _ = self.rnn(x)
        # Select the last time step's output
        x = x[:, -1, :]
        x = self.fc(F.relu(x))
        return x




class GRUModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_seq_len):
        super(GRUModel, self).__init__()
        self.rnn = nn.GRU(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_seq_len)


    def forward(self, x):
        # x shape: (batch_size, seq_length, input_dim)
        x, _ = self.rnn(x)
        # Select the last time step's output
        x = x[:, -1, :]
        x = self.fc(F.relu(x))
        return x




class LSTMModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_seq_len):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_seq_len)  # Output dim remains 1 since we're predicting one step at a time


    def forward(self, x):
        x, (hn, cn) = self.lstm(x)
        # Assuming x is of shape (batch_size, seq_length, hidden_dim)
        # We take the output of the last time step and repeat it output_seq_len times
        x = x[:, -1, :]
        # Now pass each time step output through the fully connected layer
        x = self.fc(F.relu(x))
        return x




class TransformerModel(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_seq_len, nhead=2, num_encoder_layers=1):
        super(TransformerModel, self).__init__()
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=input_dim, nhead=nhead, dim_feedforward=hidden_dim)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_encoder_layers)
        self.fc = nn.Linear(input_dim, output_seq_len)


    def forward(self, x):
        # x shape: (seq_length, batch_size, input_dim)
        x = x.permute(1, 0, 2)  # Transformer expects (seq_length, batch_size, input_dim)
        x = self.transformer_encoder(x)
        # Select the last time step's output
        x = x[-1, :, :]
        x = self.fc(x)
        return x




class SimpleTransformerModel(nn.Module):
    def __init__(self, input_dim, d_model, nhead, num_encoder_layers, output_seq_len):
        super(SimpleTransformerModel, self).__init__()
        self.d_model = d_model
        self.input_embedding = nn.Linear(input_dim, d_model)  # 将输入映射到较高维度
        self.pos_encoder = PositionalEncoding(d_model)
        encoder_layers = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dim_feedforward=d_model * 4)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer=encoder_layers, num_layers=num_encoder_layers)
        self.fc_out = nn.Linear(d_model, output_seq_len)  # 假设输出序列长度固定


    def forward(self, src):
        src = self.input_embedding(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        src = src.permute(1, 0, 2)  # 转换为Transformer期望的格式
        output = self.transformer_encoder(src)
        output = output.permute(1, 0, 2)  # 转换回(batch_size, seq_len, features)
        output = self.fc_out(output[:, -1, :])  # 只取序列的最后一步
        return output


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)


        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)


    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)



完整代码:https://mbd.pub/o/bread/mbd-ZZ6VmJly

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1888121.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QT Designer中的qrc文件如何创建,将图片添加进qrc文件

创建qrc文件可以在qt中给空间添加个性化属性 一、创建qrc文件的方式 1、将以下代码复制到txt文件文件中 <!DOCTYPE RCC> <RCC version"1.0"> <qresource prefix"/"><file>background_img.png</file><file>backgrou…

第二证券:可转债基础知识?想玩可转债一定要搞懂的交易规则!

可转债&#xff0c;全称是“可转化公司债券”&#xff0c;是上市公司为了融资&#xff0c;向社会公众所发行的一种债券&#xff0c;具有股票和债券的双重特点&#xff0c;投资者可以选择按照发行时约定的价格将债券转化成公司一般股票&#xff0c;也可作为债券持有到期后收取本…

计算机网络网络层复习题1

一. 单选题&#xff08;共27题&#xff09; 1. (单选题)以太网 MAC 地址、IPv4 地址、IPv6 地址的地址空间大小分别是&#xff08; &#xff09;。 A. 2^48&#xff0c;2^32&#xff0c;2^128B. 2^32&#xff0c;2^32&#xff0c;2^96C. 2^16&#xff0c;2^56&#xff0c;2^6…

【51单片机入门】矩阵键盘

文章目录 前言矩阵键盘介绍与检测原理原理图代码讲解总结 前言 在嵌入式系统设计中&#xff0c;键盘输入是一种常见的人机交互方式。其中&#xff0c;矩阵键盘因其简单、方便和易于扩展的特性&#xff0c;被广泛应用于各种设备中。本文将介绍如何使用51单片机来实现矩阵键盘的…

修改Springboot项目名称

修改Springboot项目名称 1. 整体描述2. 具体步骤2.1 修改module名称2.2 修改程序包名2.3 mybatis/mybatis-plus配置修改2.4 logback文件2.5 yml配置2.6 Application启动类2.7 其他 3. 总结 1. 整体描述 开发过程中&#xff0c;经常遇到新来个项目&#xff0c;需要一份初始代码…

数字化精益生产系统--RD研发管理系统

R&D研发管理系统是一种用于管理和监督科学研究和技术开发的软件系统&#xff0c;其设计和应用旨在提高企业研发活动的效率、质量和速度。以下是对R&D研发管理系统的功能设计&#xff1a;

学习springMVC

第四章 Spring MVC 第一节 Spring MVC 简介 1. Spring MVC SpringMVC是一个Java 开源框架&#xff0c; 是Spring Framework生态中的一个独立模块&#xff0c;它基于 Spring 实现了Web MVC&#xff08;数据、业务与展现&#xff09;设计模式的请求驱动类型的轻量级Web框架&am…

和鲸“101”计划领航!和鲸科技携手北中医,共话医学+AI 实验室建设及创新人才培养

为进一步加强医学院校大数据管理与应用、信息管理与信息系统&#xff0c;医学信息工程等专业建设&#xff0c;交流实验室建设、专业发展与人才培养经验&#xff0c;6 月 22 日&#xff0c;由北京中医药大学&#xff08;简称“北中医”&#xff09;主办&#xff0c;上海和今信息…

使用Spring Boot实现博客管理系统

文章目录 引言第一章 Spring Boot概述1.1 什么是Spring Boot1.2 Spring Boot的主要特性 第二章 项目初始化第三章 用户管理模块3.1 创建用户实体类3.2 创建用户Repository接口3.3 实现用户Service类3.4 创建用户Controller类 第四章 博客文章管理模块4.1 创建博客文章实体类4.2…

to_json 出现乱码的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

PyCharm远程开发

PyCharm远程开发 1- 远程环境说明 每个人的本地电脑环境差别很大。各自在自己电脑上开发功能&#xff0c;测试/运行正常。但是将多个人的代码功能合并&#xff0c;运行服务器上&#xff0c;会出现各种版本兼容性问题。 在实际企业中&#xff0c;一般会有两套环境。第一套是测…

2.3 主程序和外部IO交互 (文件映射方式)----IO Server实现

2.3 主程序和外部IO交互 &#xff08;文件映射方式&#xff09;----IO Server C实现 效果显示 1 内存共享概念 基本原理&#xff1a;以页面为单位&#xff0c;将一个普通文件映射到内存中&#xff0c;达到共享内存和节约内存的目的&#xff0c;通常在需要对文件进行频繁读写时…

【单片机毕业设计选题24043】-可旋转式电视支架控制系统设计与实现

系统功能: 系统操作说明&#xff1a; 上电后OLED显示 “欢迎使用电视支架系统请稍后”&#xff0c;两秒后进入正常界面显示 第一页面第一行显示 Mode:Key&#xff0c; 第二行显示 TV:Middle 短按B5按键可控制步进电机左转&#xff0c; 第二行显示 TV:Left 后正常显示 TV:…

【操作系统期末速成】 EP04 | 学习笔记(基于五道口一只鸭)

文章目录 一、前言&#x1f680;&#x1f680;&#x1f680;二、正文&#xff1a;☀️☀️☀️2.1 考点七&#xff1a;进程通信2.2 考点八&#xff1a;线程的概念2.3 考点九&#xff1a;处理机调度的概念及原则2.4 考点十&#xff1a;调度方式与调度算法 一、前言&#x1f680;…

因为文件共享不安全,所以你不能连接到文件共享。此共享需要过时的SMB1协议,而此协议是不安全的 解决方法

目录 1. 问题所示2. 解决方法3. 解决方法1. 问题所示 输入共享文件地址的时候,出现如下信息: 因为文件共享不安全,所以你不能连接到文件共享。此共享需要过时的SMB1协议,而此协议是不安全的,可能会是你的系统遭受攻击。你的系统需要SMB2或更高版本截图如下所示: 2. 解决…

查看电脑显卡(NVIDIA)应该匹配什么版本的CUDA Toolkit

被串行计算逼到要吐时&#xff0c;决定重拾CUDa了&#xff0c;想想那光速般的处理感觉&#xff08;夸张了&#xff09;不要太爽&#xff0c;记下我的闯关记录。正好我的电脑配了NVIDIA独显&#xff0c;GTX1650&#xff0c;有菜可以炒呀&#xff0c;没有英伟达的要绕道了。回到正…

茗鹤APS高级计划排程系统,在集团多工厂协同生产下的应用

随着业务规模的扩大和市场的全球化&#xff0c;越来越多的企业选择“总部多工厂基地”的模式&#xff0c;此种模式大幅提升企业的产能与产量&#xff0c;有效分散风险。然后&#xff0c;与之而来的是对企业的管理提出更高的管理要求。多个生产基地不仅面临集团下发的周期性计划…

[漏洞分析] CVE-2024-6387 OpenSSH核弹核的并不是很弹

文章目录 漏洞简介漏洞原理补丁分析漏洞原理 漏洞利用漏洞利用1: SSH-2.0-OpenSSH_3.4p1 Debian 1:3.4p1-1.woody.3 (Debian 3.0r6, from 2005) [无ASLR无NX]漏洞利用原理漏洞利用关键点 漏洞利用2: SSH-2.0-OpenSSH_4.2p1 Debian-7ubuntu3 (Ubuntu 6.06.1, from 2006) [无ASLR…

Python模拟火焰文字效果:炫酷的火焰字动效

文章目录 引言准备工作前置条件 代码实现与解析导入必要的库初始化Pygame定义火焰效果类主循环 完整代码 引言 火焰文字效果是一种炫酷的视觉效果&#xff0c;常用于广告、游戏和艺术设计中。在这篇博客中&#xff0c;我们将使用Python创建一个火焰文字的动画效果。通过利用Py…

元素的宽度和高度未知,如何水平垂直居中,3个小办法。

在前端开发中&#xff0c;如果元素的宽度和高度未知&#xff0c;但需要将其水平和垂直居中&#xff0c;可以使用以下方法之一&#xff1a; 使用Flexbox布局&#xff1a; Flexbox是一种强大的布局模型&#xff0c;可以轻松实现元素的居中。可以通过以下CSS代码实现水平和垂直居…