TimesFM(Time Series Foundation Model)时间序列预测的数据研究(3)-CSDN博客文章浏览阅读846次,点赞19次,收藏12次。1. **表示预测区间**:在很多预测任务中,模型给出的不只是一个单一的预测值(比如预测某个时间点的数值),而是一个预测区间来体现不确定性。像如果预测某商品未来销量,通过计算不同 `quantiles` 的值,能知道销量大概率(比如 90% 的置信区间,对应 0.05 和 0.95 分位数)会落在哪个范围,以及最有可能的中间值(0.5 分位数即中位数)是多少。通过比较模型输出的 `quantiles` 对应的预测区间和实际观测值落在该区间的比例等情况,可以判断模型对不确定性的把握能力。https://blog.csdn.net/chenchihwen/article/details/144444583?spm=1001.2014.3001.5501前一回把时间序列进行了安装
利用 baostock 获取宁德时代(SZ.300750)股票数据 为例进行预测
从 start_date='2020-01-01', end_date=system date
并从60 天前开始做隔日的收盘价格预测 然后不断的调整 trainning date 往后加上1天,一直到前天的历史数据预测
预测的结果与实际的价格如下截图
结论,不是很准确。
import os
import time
import pandas as pd
import numpy as np
import timesfm
import baostock as bs
from datetime import datetime, timedelta
# 步骤一:从Baostock获取宁德时代(SZ.300750)股票数据,并保存到Excel
def get_stock_data(code, start_date='2020-01-01', end_date=None):
data_list = []
lg = bs.login()
if lg.error_code == '0':
if end_date is None:
end_date = datetime.now().strftime('%Y-%m-%d') # 设置为系统当前日期
rs = bs.query_history_k_data_plus(code,
"date,open,high,low,close,volume",
start_date=start_date, end_date=end_date,
frequency='d')
while (rs.error_code == '0') & rs.next():
data_list.append(rs.get_row_data())
result = pd.DataFrame(data_list, columns=rs.fields)
bs.logout()
else:
print("登录Baostock失败:", lg.error_msg)
result = None
return result
_MODEL_PATH = "google/timesfm-1.0-200m" # 直接定义模型路径,也可以设置为命令行参数传入,此处简化处理
_BATCH_SIZE = 64 # 定义批次大小,同样可作为参数传入,这里先写死示例
_HORIZON = 128 # 定义预测的水平长度,可按需调整,此处示例写死
_BACKEND = "gpu" # 定义后端,示例写死为gpu,可根据实际情况改变
_NUM_JOBS = 1 # 定义作业数量,示例值
_SAVE_DIR = "./results" # 定义保存结果的目录,示例路径,可按需调整
# QUANTILES = list(np.arange(1, 10) / 10.0) # 定义分位数列表,示例简化写法,和原代码功能类似,可按需细化
QUANTILES = list[0.2, 0.5, 0.8]
def main():
# 获取股票数据
stock_data = get_stock_data('SZ.300750')
if stock_data is not None:
# 将日期列转换为日期时间类型
stock_data['date'] = pd.to_datetime(stock_data['date'])
# 获取系统当前日期往前推60天作为划分依据(tday)
tday = datetime.now() - timedelta(days=60)
# 初始化一个空的DataFrame用于存储最终合并的结果
result_df = pd.DataFrame()
# 获取stock_data['date']中的所有日期数据列表,用于后续遍历
all_dates = stock_data['date'].unique()
# 筛选出大于等于tday的日期,确保从设置的起始日期开始遍历
valid_dates = [date for date in all_dates if date >= tday]
for current_date in valid_dates:
# 划分训练集和测试集(这里只做了简单示意,和原代码逻辑保持一致,可能需要根据实际情况调整)
train = stock_data[stock_data['date'] <= current_date]
test = stock_data[stock_data['date'] >= current_date - timedelta(days=5)]
# 初始化一个空的DataFrame用于存储本次循环调整后的数据(类似原代码中的test_pd作用)
test_pd = pd.DataFrame(columns=['unique_id', 'ds', 'y'])
# 循环处理每个特征列,构建符合要求的结构
for col in train.columns[1:]: # 从第2列开始,跳过日期列(索引为0)
temp_df = pd.DataFrame({
'unique_id': f'sz.300750_{col.lower()}',
'ds': train['date'], # 直接使用stock_data中的日期列
'y': train[col]
})
# 尝试将y列转换为数字类型,如果转换失败会将非数字值设置为NaN
temp_df['y'] = pd.to_numeric(temp_df['y'], errors='coerce')
test_pd = pd.concat([test_pd, temp_df], ignore_index=True)
# 构建TimesFM模型(这部分和原代码保持一致,可能需要根据实际需求调整相关参数等)
tfm = timesfm.TimesFm(
hparams=timesfm.TimesFmHparams(
backend=_BACKEND,
per_core_batch_size=_BATCH_SIZE,
horizon_len=_HORIZON,
),
checkpoint=timesfm.TimesFmCheckpoint(
huggingface_repo_id=_MODEL_PATH),
)
run_id = np.random.randint(100000) # 简单示例设置运行ID为0,你可以根据实际需求调整生成方式,比如用时间戳等
model_name = "timesfm"
print(f"Evaluating model {model_name} on custom data", flush=True)
# 获取test_pd数据对应的频率,这里假设是日频率'd',你可能需要根据实际情况调整获取逻辑
freq = 'd'
init_time = time.time()
# 直接使用test_pd进行预测,调用forecast_on_df方法
fcsts_df = tfm.forecast_on_df(
inputs=test_pd,
freq=freq,
value_name="y",
model_name=model_name,
forecast_context_len=512, # 这里的上下文长度你可以根据实际需求调整
num_jobs=_NUM_JOBS,
)
fcsts_df = fcsts_df[fcsts_df['ds'] <= (current_date + timedelta(days=12))]
test_df12 = test[test['date'] <= (current_date + timedelta(days=12))]
fields_to_join = ['sz.300750_open', 'sz.300750_high', 'sz.300750_low', 'sz.300750_close', 'sz.300750_volume']
# 先将test_df12表赋值给merged_df,后续在此基础上不断合并新列(原代码是复制test_df12,这里改为直接赋值)
merged_df = test_df12
for field in fields_to_join:
temp_merged = pd.merge(
merged_df,
fcsts_df[fcsts_df['unique_id'] == field ][['ds','timesfm']],
left_on='date',
right_on='ds',
how='left',
suffixes=('', '_fcsts') # 指定后缀,避免列名冲突
)
# 根据当前的字段重命名合并后得到的'timesfm'列
new_column_name = field.split('.')[-1]
temp_merged.rename(columns={'timesfm': new_column_name}, inplace=True)
merged_df = temp_merged
# 删除不需要的列
merged_df.drop(columns=['ds_fcsts'], inplace=True)
# 删除不需要的列
merged_df.drop(columns=['ds'], inplace=True)
merged_df['forecastOnTday'] = current_date
# 将本次循环得到的merged_df合并到result_df中
print('merged_df:',merged_df)
result_df = pd.concat([result_df, merged_df], ignore_index=True)
# 打印最终的 DataFrame(这里打印合并后的result_df)
print(result_df)
save_path = os.path.join(_SAVE_DIR, str(run_id))
print(f"Saving results to {save_path}", flush=True)
os.makedirs(save_path, exist_ok=True)
result_df.to_csv(f"{save_path}/results.csv")
else:
print("未能成功获取股票数据,无法进行后续操作。")
if __name__ == "__main__":
main()
执行的截图