文章目录
- 简介
- 环境配置
- 企业微信机器人
- 创建群聊
- 设置机器人信息
- 脚本详解
- 导入必要的库
- 获取节假日信息
- 判断是否为工作日或节假日
- 获取天气预报
- 获取每日一句
- 发送消息到微信
- 主函数
- 加入定时任务
- 总结
- 完整代码
简介
在日常工作和生活中,自动化任务可以帮助我们节省大量时间和精力。本文将介绍如何编写一个Python脚本,该脚本可以检查指定日期是否为节假日,并在工作日通过企业微信发送每日信息,包括天气预报和每日一句励志话语。此脚本综合了多个功能模块,包括节假日判断、天气预报获取、每日句子读取和消息发送。
环境配置
首先,确保你的环境中安装了以下Python库:
pip install requests parsel
企业微信机器人
每个机器人发送的消息不能超过20条/分钟。
创建群聊
同样也是先创建群,需要在创建群后再手机端创建机器人
设置机器人信息
脚本详解
导入必要的库
import json
import re
import requests
from datetime import datetime
from parsel import Selector
这些库分别用于处理JSON数据、正则表达式、HTTP请求、日期时间和HTML解析。
获取节假日信息
节假日信息来源于万年历。我们通过以下函数获取指定月份的节假日数据:
def get_calendar(year_and_month):
url = "https://wannianrili.bmcx.com/ajax/"
params = {"q": year_and_month, "v": "22121303"}
headers = {"User-Agent": "Mozilla/5.0"}
res = requests.get(url, params=params, headers=headers)
sel = Selector(text=res.text)
rows = sel.css(".wnrl_riqi")
data = {}
for row in rows:
class_name = row.css("a::attr(class)").extract_first("").strip()
day = row.css("a::attr(onclick)").extract_first("").strip()
comment = row.css(".wnrl_td_bzl::text").extract_first("").strip()
ret = re.search("\d{4}-\d{2}-\d{2}", day)
day = ret.group(0)
data[day] = {"class_name": class_name, "comment": comment}
return data
判断是否为工作日或节假日
使用上面获取的数据,我们可以判断某一天是否为工作日或节假日:
def get_day_item(day):
calendar = get_calendar("-".join(day.split("-")[:2]))
return calendar.get(day)
def is_workday(day):
workday_class_list = ["", "wnrl_riqi_ban"]
day_item = get_day_item(day)
if day_item:
return day_item.get("class_name") in workday_class_list
return False
def is_holiday(day):
holiday_class_list = ["wnrl_riqi_xiu", "wnrl_riqi_mo"]
day_item = get_day_item(day)
if day_item:
return day_item.get("class_name") in holiday_class_list
return False
获取天气预报
通过API获取当天的天气预报北京
:
def get_weather():
url = "http://t.weather.itboy.net/api/weather/city/101010100"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
forecast = data.get("data", {}).get("forecast", [])
today = datetime.now().strftime("%Y-%m-%d")
for item in forecast:
if item.get("ymd") == today:
high = item.get("high", "未知")
low = item.get("low", "未知")
weather_type = item.get("type", "未知")
notice = item.get("notice", "未知")
weather_report = (
f"今日气温 {low} - {high},天气情况:{weather_type}\n"
f"温馨提示:{notice}"
)
return weather_report
return "无法获取今天的天气信息"
else:
return f"请求失败,状态码:{response.status_code}"
获取每日一句
从本地文件中读取每日一句:
- 索引文件:
current_index.txt
用于跟踪当前读取的行数。如果这个文件不存在,则初始化为0。 - 文本文件:
XMYX-0.txt
包含要读取的句子。每次调用 get_daily_sentence 时,从这个文件中读取下一行。 - 读取和更新:函数从文件中读取当前行的句子,并在每次调用后更新
current_index.txt
文件中的索引。如果XMYX-0.txt
文件中的行数少于调用次数,函数会返回“没有更多的句子”。
确保 current_index.txt
和 XMYX-0.txt
文件与脚本在同一目录下。如果 XMYX-0.txt
文件的行数少于调用次数,函数会在读取完所有句子后返回“没有更多的句子”。
def get_daily_sentence():
index_file = "/data/script/qqq/current_index.txt"
text_file = "/data/script/qqq/XMYX-0.txt"
try:
with open(index_file, "r") as f:
index = int(f.read().strip())
except (FileNotFoundError, ValueError):
index = 0
try:
with open(text_file, "r", encoding="utf-8") as f:
lines = f.readlines()
if index < len(lines):
sentence = lines[index].strip()
else:
sentence = "没有更多的句子"
except FileNotFoundError:
sentence = "句子文件不存在"
index += 1
with open(index_file, "w") as f:
f.write(str(index))
return sentence
发送消息到微信
通过企业微信Webhook发送消息:
def send_to_wechat(content):
url = "https://qyapi.weixin.qq.com/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
headers = {"Content-Type": "application/json"}
data = {"msgtype": "text", "text": {"content": content}}
response = requests.post(url, headers=headers, data=json.dumps(data))
return response.status_code
主函数
结合以上功能模块,实现每天在工作日发送消息:
if __name__ == "__main__":
today = datetime.now().strftime("%Y-%m-%d")
if is_workday(today) and not is_holiday(today):
weather_report = get_weather()
daily_sentence = get_daily_sentence()
message_content = (
"各位同学早上好!\n"
+ " "
+ weather_report.replace("\n", "\n ")
+ "\n"
+ " "
+ daily_sentence.replace("\n", "\n ")
+ "新的一天,早安!"
)
status_code = send_to_wechat(message_content)
if status_code == 200:
print("消息发送成功")
else:
print(f"消息发送失败,状态码:{status_code}")
加入定时任务
#企业微信通知
0 9 * * * /usr/bin/python3 /data/script/qqq/start.py
在每个小时的第0分钟(即每个小时的开始),如果当前小时是9,
那么就执行 /usr/bin/python3 /data/script/qqq/start.py 这个命令。
但是,这里有一个小的误解点需要澄清。Cron作业中的时间字段并不直接表示“如果当前小时是9”,而是分别独立地指定了分钟、小时、日、月和星期几。在这个例子中,各个字段的含义如下:
0(分钟):表示在每个小时的第0分钟执行。
9(小时):表示在每天的09:00时执行。
接下来的两个*(日和月):分别表示每个月的每一天和每年的每个月。
最后一个*(星期几):表示不考虑星期几,每天都可能执行(但由于前面的小时字段已经明确指定为09:00,所以实际上只会在09:00执行)。
因此,这条Cron作业实际上会在每天的09:00执行指定的Python脚本,而不是仅仅在“如果当前小时是9”的条件下执行。由于分钟字段被设置为0,所以它会精确地在09:00这个时刻执行。
总结来说,这条Cron作业会在每天的09:00使用Python 3解释器执行位于/data/script/qqq/目录下的start.py脚本。
总结
通过这个脚本,我们可以每天自动获取天气信息和每日一句,并在工作日通过企业微信发送给指定的群聊。这不仅节省了人工操作时间,还能提高日常信息传递的效率。你可以根据自己的需求,进一步扩展这个脚本,例如添加更多的数据来源或定制化消息内容。希望这个教程对你有所帮助,欢迎分享和交流你的实现和改进。
完整代码
# -*- coding: utf-8 -*-
"""
@Date : 2024-07-30
"""
import json
import re # 添加此行
import requests
from datetime import datetime
from parsel import Selector
def get_calendar(year_and_month):
"""
获取指定月份的日历信息,包括节假日和特殊备注
数据来源:https://wannianrili.bmcx.com/
@param year_and_month: 年月,格式为 'YYYY-MM'
@return: 字典,键为日期,值为包含 class_name 和 comment 的字典
"""
url = "https://wannianrili.bmcx.com/ajax/"
params = {"q": year_and_month, "v": "22121303"}
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:64.0) Gecko/20100101 Firefox/64.0"
}
res = requests.get(url, params=params, headers=headers)
sel = Selector(text=res.text)
rows = sel.css(".wnrl_riqi")
data = {}
for row in rows:
class_name = row.css("a::attr(class)").extract_first("").strip()
day = row.css("a::attr(onclick)").extract_first("").strip()
comment = row.css(".wnrl_td_bzl::text").extract_first("").strip()
ret = re.search(r"\d{4}-\d{2}-\d{2}", day)
day = ret.group(0)
data[day] = {"class_name": class_name, "comment": comment}
return data
def get_day_item(day):
"""
获取指定日期的节假日信息
@param day: 日期,格式为 'YYYY-MM-DD'
@return: 包含 class_name 和 comment 的字典
"""
calendar = get_calendar("-".join(day.split("-")[:2]))
return calendar.get(day)
def is_workday(day):
"""
判断指定日期是否为工作日
@param day: 日期,格式为 'YYYY-MM-DD'
@return: 如果是工作日返回 True,否则返回 False
"""
workday_class_list = ["", "wnrl_riqi_ban"]
day_item = get_day_item(day)
if day_item:
return day_item.get("class_name") in workday_class_list
return False
def is_holiday(day):
"""
判断指定日期是否为节假日
@param day: 日期,格式为 'YYYY-MM-DD'
@return: 如果是节假日返回 True,否则返回 False
"""
holiday_class_list = ["wnrl_riqi_xiu", "wnrl_riqi_mo"]
day_item = get_day_item(day)
if day_item:
return day_item.get("class_name") in holiday_class_list
return False
def get_weather():
"""
获取今天的天气信息
@return: 包含今天气温和天气情况的字符串
"""
url = "http://t.weather.itboy.net/api/weather/city/101010100"
response = requests.get(url)
if response.status_code == 200:
data = response.json()
forecast = data.get("data", {}).get("forecast", [])
today = datetime.now().strftime("%Y-%m-%d")
for item in forecast:
if item.get("ymd") == today:
high = item.get("high", "未知")
low = item.get("low", "未知")
weather_type = item.get("type", "未知")
notice = item.get("notice", "未知")
weather_report = (
f"今日气温 {low} - {high},天气情况:{weather_type}\n"
f"温馨提示:{notice}"
)
return weather_report
return "无法获取今天的天气信息"
else:
return f"请求失败,状态码:{response.status_code}"
def get_daily_sentence():
"""
从文件中按顺序读取每日一句
@return: 每日一句的字符串
"""
index_file = "/data/script/qqq/current_index.txt"
text_file = "/data/script/qqq/XMYX-0.txt"
try:
with open(index_file, "r") as f:
index = int(f.read().strip())
except (FileNotFoundError, ValueError):
index = 0
try:
with open(text_file, "r", encoding="utf-8") as f:
lines = f.readlines()
if index < len(lines):
sentence = lines[index].strip()
else:
sentence = "没有更多的句子"
except FileNotFoundError:
sentence = "句子文件不存在"
index += 1
with open(index_file, "w") as f:
f.write(str(index))
return sentence
def send_to_wechat(content):
"""
发送消息到企业微信
@param content: 消息内容
@return: HTTP 响应状态码
"""
url = "https://qyapi.weixin.qq.com/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
headers = {"Content-Type": "application/json"}
data = {"msgtype": "text", "text": {"content": content}}
response = requests.post(url, headers=headers, data=json.dumps(data))
return response.status_code
if __name__ == "__main__":
today = datetime.now().strftime("%Y-%m-%d")
# 检查今天是否为工作日且不是节假日
if is_workday(today) and not is_holiday(today):
weather_report = get_weather()
daily_sentence = get_daily_sentence()
# 生成消息内容
message_content = (
"各位同学早上好!\n"
+ " " + weather_report.replace("\n", "\n ") + "\n"
+ " " + daily_sentence.replace("\n", "\n ") + "\n"
+ "新的一天,早安!"
)
# print(message_content)
# 发送消息到企业微信
status_code = send_to_wechat(message_content)
# if status_code == 200:
# print("消息发送成功")
# else:
# print(f"消息发送失败,状态码:{status_code}")
# else:
# print("今天是节假日或非工作日,不发送消息。")