【Python】以邮件的方式定时发送一天的股票分析报告
文章目录
- 【Python】以邮件的方式定时发送一天的股票分析报告
- 1、Python发送邮件
- 1)`EmailSender`封装
- 2)可能存在的问题
- 2、jinja2动态渲染html页面
- 3、阿里云OSS搭建图床
- 1)Python上传图片到OSS中
- 2)使用`PicGo`上传图片到OSS中
- 3)图片链接访问报错解决
- 4、APScheduler定时任务
- 1)APScheduler四个组件
- 2)配置scheduler调度器
假设我现在的需求是:
假设我已经实现了对某只股票的分时/日/周/月K线,以及对应指标(RSI,BOLL,OBV,MACD)的绘制。
如果我想让系统每天在股市结束后,给我发送关于几只自选股票的分析报告,我可以通过如下方式实现:
1)让系统对今天几只股票的分时/日/周/月K线以及对应指标进行计算和绘制;
2)绘制的图片自动上传到阿里云OSS中,并返回关于已上传图片的
url
链接;3)根据给定的
html
模板,利用Jinja2
工具包,将今天的股票绘制结果动态渲染到html
中4)系统将生成的
html
文件以邮件的形式发送到指定的收件人中;
这里分成4个模块,依次对邮件发送、html模板渲染、图床搭建、定时任务进行介绍,这里并没有给出实现这个需求的完整代码。
1、Python发送邮件
参考资料:
-
简单三步,用 Python 发邮件_程序员王饱饱的博客-CSDN博客_python 发送邮件
-
Python SMTP发送邮件 | 菜鸟教程
1)EmailSender
封装
封装好的邮件发送类EmailSender
代码如下:包括文本内容、带txt
、pdf
、图片和html
的附件发送。
-
MIMEMultipart
可以允许带附件; -
如果想添加一个
txt
或者html
文本附件,用MIMEText
封装; -
如果想添加一个
pdf
文本附件, 用MIMEApplication
封装,参考# 【Mail小技巧】如何使用Python优雅的发送带有pdf附件的电子邮件; -
如果想添加一个照片附件,用
MIMEImage
封装;
# 参考 https://blog.csdn.net/weixin_55154866/article/details/128098092
# 参考 https://www.runoob.com/python/python-email.html
# 参考 https://blog.csdn.net/YP_FlowerSky/article/details/124451913
import smtplib
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.image import MIMEImage
import pdfplumber
from pathlib import Path
class EmailSender:
def __init__(self,mail_host,mail_user,mail_pass,sender):
'''
@param mail_host 设置登录及服务器信息,比如网易邮箱是smtp.163.com; type=str
@param mail_user 邮箱用户名; type=str
@param mail_pass 邮箱授权码; type=str
@param sender 邮件发送方邮箱地址; type=str
'''
self.mail_host = mail_host
self.mail_user = mail_user
self.mail_pass = mail_pass
self.sender = sender
'''初始化一封邮件'''
def init_email(self,receivers,content,subject):
'''
@param receivers 接收方邮箱集合,适合群发; type=list
@param content 文本内容;type=str
@param subject 主题; type=str
'''
# 参考 https://blog.csdn.net/YP_FlowerSky/article/details/124451913
self.receivers = receivers
# 添加一个MIMEmultipart类,处理正文及附件
self.message = MIMEMultipart() #MIMEMultipart可以允许带附件
self.message['From'] = self.sender #发送方邮箱地址
self.message['To'] = ','.join(receivers) #接收方邮箱地址, 将['1046474088@qq.com','2802428220@qq.com']处理成'1046474088@qq.com,2802428220@qq.com'的str
self.message['Subject'] = subject
self.message.attach(MIMEText(content,'plain', 'utf-8')) # 文本内容 (plain文本格式,utf-8编码)
'''为邮件添加附件'''
def email_wrapper(self,filePath,fileType="text"):
'''
@param filePath 文件路径; type=str
@param fileType 文件类型,可选 ['text','html','image']; type=str
'''
if(fileType == 'text'):
suffix = filePath.split(".")[-1]
# 添加一个pdf文本附件, 用MIMEApplication封装 参考 https://blog.csdn.net/popboy29/article/details/126396549
if (suffix == 'pdf'):
with open(filePath, "rb") as f:
pdf_attach = MIMEApplication(f.read(), _subtype="pdf")
#如果出现邮件发送成功,但邮箱接收到的附件变为bin格式的情况时,检查add_header是否出错 参考https://blog.csdn.net/hxchuadian/article/details/125773738
pdf_attach.add_header('Content-Disposition', 'attachment', filename=str(Path(filePath).name))
self.message.attach(pdf_attach)
else:
#添加一个txt文本附件,用MIMEText封装
with open(filePath,'r')as h:
content2 = h.read()
#设置txt参数
text_attach = MIMEText(content2,'plain','utf-8')
#附件设置内容类型,方便起见,设置为二进制流
text_attach['Content-Type'] = 'application/octet-stream'
#设置附件头,添加文件名
text_attach['Content-Disposition'] = f'attachment;filename="{filePath}"'
self.message.attach(text_attach)
if (fileType == 'html'):
# 推荐使用html格式的正文内容,这样比较灵活,可以附加图片地址,调整格式等
with open(filePath,'r') as f:
# 设置html格式参数
html_attach = MIMEText(f.read(), 'base64', 'gb2312') # 将html文件以附件的形式发送
html_attach['Content-Type'] = 'application/octet-stream'
html_attach.add_header('Content-Disposition', 'attachment',filename=str(Path(filePath).name)) # filename是指下载的附件的命名
self.message.attach(html_attach)
if (fileType == 'image'):
# 添加照片附件,用MIMEImage封装
with open(filePath, 'rb') as fp:
picture_attach = MIMEImage(fp.read())
# 与txt文件设置相似
picture_attach['Content-Type'] = 'application/octet-stream'
picture_attach['Content-Disposition'] = f'attachment;filename="{filePath}"'
# 将内容附加到邮件主体中
self.message.attach(picture_attach)
#登录并发送
def sendEmail(self):
try:
smtpObj = smtplib.SMTP()
smtpObj.connect(self.mail_host, 25)
smtpObj.login(self.mail_user, self.mail_pass)
smtpObj.sendmail(
self.sender, self.receivers, self.message.as_string()) #receivers群发, receivers是一个列表['1046474088@qq.com','2802428220@qq.com']
print('success')
smtpObj.quit()
except smtplib.SMTPException as e:
print('error', e)
2)可能存在的问题
参考
-
如果出现
list‘ object has no attribute ‘encode‘_list' object has no attribute 'encode
,主要原因是self.message['To']
赋值有误,要想实现群发,需要将['1046474088@qq.com','2802428220@qq.com']
处理成'1046474088@qq.com,2802428220@qq.com'
的str
类型参考Selenium /Python 配置QQ邮箱后台自动发送邮件unittest//发送多人邮件报错: ‘list‘ object has no attribute ‘encode‘_list’ object has no attribute 'encode
-
如果出现邮件发送成功,但邮箱接收到的附件变为
bin
格式的情况时,检查add_header
是否出错。可参考Python 发送邮件时图片附件变为bin格式的解决方案
2、jinja2动态渲染html页面
参考Python之jinja2模板引擎生成HTML_宗而研之的博客-CSDN博客_python 生成html
html
模板如下:
<meta http-equiv="Content-Type" content="text/html;charset=utf-8">
<html align='left'>
<body>
<h1>{{today}}股票分析报告</h1>
<table>
{% for stock in stocks %}
<tr align='center'>
<td>{{ stock.code }}</td>
<td>{{ stock.codeName }}</td>
<td><a href="{{stock.minute_kline_path}}">分时K线图</a> </td>
<td><a href="{{stock.daily_kline_path}}">日K线图</a> </td>
<td><a href="{{stock.week_kline_path}}">周K线图</a> </td>
<td><a href="{{stock.month_kline_path}}">月K线图</a> </td>
</tr>
{% endfor%}
</table>
</body>
</html>
python
代码如下:
import datetime
from jinja2 import Environment, FileSystemLoader
import datetime
import sys
import os
from pathlib import Path
curPath = os.path.abspath(os.path.dirname(__file__))
rootPath = os.path.split(curPath)[0]
sys.path.append(rootPath)
imgDir = os.path.join(rootPath,"html_task/temp/")
def generate_html(today, stocks):
env = Environment(loader=FileSystemLoader('./'))
template = env.get_template('template.html')
with open("result.html", 'w+') as fout:
html_content = template.render(today=today,
stocks=stocks)
fout.write(html_content)
if __name__ == "__main__":
today = datetime.datetime.now().strftime("%Y-%m-%d")
stocks = []
stock1_path = os.path.join(imgDir,"sh601728")
stock2_path = os.path.join(imgDir,"sz000722")
stock1 = {'code': 'sh601728', 'codeName': '中国电信',
'minute_kline_path': stock1_path + "/" + "minute_K_line.png",
'daily_kline_path': stock1_path + "/" + "daily_K_line.png",
'week_kline_path': stock1_path + "/" + "week_K_line.png",
"month_kline_path" : stock1_path + "/" + "month_K_line.png", }
stock2 = {'code': 'sz000722', 'codeName': '湖南发展',
'minute_kline_path': stock2_path + "/" + "minute_K_line.png",
'daily_kline_path': stock2_path + "/" + "daily_K_line.png",
'week_kline_path': stock2_path + "/" + "week_K_line.png",
"month_kline_path" : stock2_path + "/" + "month_K_line.png", }
stocks.append(stock1)
stocks.append(stock2)
generate_html(today, stocks) #图片无法正常显示,会报错:Not allowed to load local resource
# 图片无法正常显示 解决方法参考 http://www.kuazhi.com/post/319149.html
生成的html
可视化效果如下:
但是存在一个问题 - 点击链接并不能正常下载或访问图片,主要原因是:浏览器出于安全方面的考虑,禁止网页访问本地文件,因为图片是存在项目目录下的,所以无法通过本地的url进行访问。(参考浏览器报错:Not allowed to load local resource 原因及解决办法_扭不开瓶盖的三水的博客-CSDN博客)
因此这里打算用图床返回的图片url
链接来解决Not allowed to load local resource
问题。
3、阿里云OSS搭建图床
参考
-
阿里云 oss 服务 —— 上传图片,获取url
-
阿里云OSS使用流程
-
使用阿里云OSS搭建图床 - 简书
1)Python上传图片到OSS中
使用python
将图片上传到阿里云OSS(挺便宜的,买了1年9 rmb的资源包)中,然后通过url
链接访问图片。其中建议使用RAM用户的ACCESS_KEY_ID
和ACCESS_KEY_SECRET
,BUCKET_NAME
是购买的OSS
实例名称,ENDPOINT
是这个OSS
实例的地域节点,具体获取方式参考阿里云OSS使用流程
# 使用阿里云OSS + picGo搭建图床 参考 https://www.jianshu.com/p/111ce9603ea6l
# -*- coding: utf-8 -*-
import datetime
import oss2
import unittest
# 阿里云OSS使用流程 参考 https://zhuanlan.zhihu.com/p/567771838
ACCESS_KEY_ID = "LTAI5*****Hu6m" #RAM账号access_key_id,如果没有用主账号登录工作台创建并授权,关于RAM角色参考 https://ram.console.aliyun.com/roles
ACCESS_KEY_SECRET = "mkit8YsLh*****TYmoh1QRzK" #RAM账号access_key_secret
ENDPOINT = "oss-cn-shenzhen.aliyuncs.com" #可以在bucket中获取地域节点endpoint 参考 https://zhuanlan.zhihu.com/p/567771838
BUCKET_NAME = "w*****i-20200401"
#参考 https://www.likecs.com/show-308529932.html#sc=900
class Oss:
"""
oss存储类
上传bytes流,返回状态码和url
"""
def __init__(self, access_key_id=ACCESS_KEY_ID, access_key_secret=ACCESS_KEY_SECRET,
endpoint=ENDPOINT, bucket_name=BUCKET_NAME):
# 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
auth = oss2.Auth(access_key_id, access_key_secret)
# Endpoint以杭州为例,其它Region请按实际情况填写。'http://oss-cn-hangzhou.aliyuncs.com'
self.bucket = oss2.Bucket(auth, endpoint, bucket_name)
def upload_bytes(self, file_bytes, image_name):
"""上传bytes文件"""
result = self.bucket.put_object('{}'.format(image_name), file_bytes)
class OSSTest(unittest.TestCase):
def test_oss_uploadFile(self):
oss_obj = Oss()
with open("temp/sh601728/minute_K_line.png","rb") as f:
oss_obj.upload_bytes(f.read(),"minute_K_line.png")
def test_oss_downloadFile(self):
# 上传后,可以访问的 url 的组成
photo_name = 'minute_K_line.png'
domain = f'https://{BUCKET_NAME}.{ENDPOINT}/'
url_photo = domain + photo_name
print(url_photo)
#访问该图片时可能会报错:You have no right to access this object because of bucket acl.
# 解决方法:在bucket的权限控制中,将私有修改为公共读 参考 https://blog.csdn.net/zsy3757486/article/details/126938973
2)使用PicGo
上传图片到OSS中
在官网[Releases · Molunerfinn/PicGo · GitHub](https://github.com/Molunerfinn/PicGo/releases)
下载安装好PicGo
之后,在图床配置中配置好阿里云OSS的ACCESS_KEY_ID
、ACCESS_KEY_SECRET
、BUCKET_NAME
和存储区域地址
后,即可实现图片上传,具体参考使用阿里云OSS搭建图床 - 简书
3)图片链接访问报错解决
正常情况下上传到图片可以通过如下链接访问:
https://{BUCKET_NAME}.{ENDPOINT}/photo_name #photo_name是上传到图片名称
但是在测试图片url
连接时可能会报错:You have no right to access this object because of bucket acl.
解决方法:在bucket
的权限控制中,将私有修改为公共读。参考【阿里云OSS】You have no right to access this object because of bucket acl._路遥叶子的博客-CSDN博客
4、APScheduler定时任务
参考
- Python 定时任务的实现方式
- BlockingScheduler与BackgroundScheduler区别
- APScheduler 定时任务详解
APScheduler定时框架:终于找到了可以每天定时喊我起床的方式了
APScheduler是一个 Python 定时任务框架,使用起来十分方便。提供了基于日期、固定时间间隔以及 crontab 类型的任务,并且可以持久化任务、并以 daemon 方式运行应用。
使用 APScheduler 需要安装
pip install apscheduler
首先来看一个周一到周五每天早上6点半喊我起床的例子
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
# 输出时间
def job():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# BlockingScheduler
scheduler = BlockingScheduler()
scheduler.add_job(job, 'cron', day_of_week='1-5', hour=6, minute=30)
scheduler.start()
1)APScheduler四个组件
APScheduler 四个组件分别为:触发器(trigger),作业存储(job store),执行器(executor),调度器(scheduler)。
a、触发器(trigger)
包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的
APScheduler 有三种内建的 trigger:
date: 特定的时间点触发
interval: 固定时间间隔触发
cron: 在特定时间周期性地触发
b、作业存储(job store)
存储被调度的作业,默认的作业存储是简单地把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据讲在保存在持久化作业存储时被序列化,并在加载时被反序列化。调度器不能分享同一个作业存储。
APScheduler 默认使用 MemoryJobStore,可以修改使用 DB 存储方案
c、执行器(executor)
处理作业的运行,他们通常通过在作业中提交制定的可调用对象到一个线程或者进城池来进行。当作业完成时,执行器将会通知调度器。
最常用的 executor 有两种:
ProcessPoolExecutor
ThreadPoolExecutor
d、调度器(scheduler)
通常在应用中只有一个调度器,应用的开发者通常不会直接处理作业存储、调度器和触发器,相反,调度器提供了处理这些的合适的接口。配置作业存储和执行器可以在调度器中完成,例如添加、修改和移除作业。
2)配置scheduler调度器
APScheduler
提供了许多不同的方式来配置调度器,你可以使用一个配置字典或者作为参数关键字的方式传入。你也可以先创建调度器,再配置和添加作业,这样你可以在不同的环境中得到更大的灵活性。
下面来看一个简单的 BlockingScheduler
例子
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime
def job():
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 定义BlockingScheduler
sched = BlockingScheduler()
sched.add_job(job, 'interval', seconds=5)
sched.start()
上述代码创建了一个 BlockingScheduler
,并使用默认内存存储和默认执行器。(默认选项分别是 MemoryJobStore
和 ThreadPoolExecutor
,其中线程池的最大线程数为10)。配置完成后使用 start()
方法来启动。
如果要给job传参,可以在add_job
中使用args
参数,如果要给job设置指定id,可以使用id
参数
rom datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def func(name):
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(now + f" Hello world, {name}")
scheduler = BlockingScheduler()
scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="func")
scheduler.start()
移除job:
- 1)通过job的ID来调用
remove_job
方法 - 2)通过在
add_job()
中得到的job实例调用remove()
方法 - 如果一个job完成了调度(例如他的触发器不会再被触发), 它会自动被移除
如果job_id
不存在,remove_job
会报错,可以用try - except来处理
# remove
job = scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")
job.remove()
# remove_job
scheduler.add_job(func, 'interval', seconds=3, args=["desire"], id="job_remove")
scheduler.remove_job(job_id="job_remove")
终止调度器中的执行器:
scheduler.shutdown() #终止调度器中的任务存储器以及执行器
scheduler.shutdown(wait=False)
默认情况,会终止任务存储器以及执行器,然后等待所有目前执行的job完成后(自动终止),wait=False
此参数不会等待任何运行中的任务完成,直接终止。但是如果scheduler没有执行,shutdown()
会报错。