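1. 在 Azure 门户注册应用程序
微软文档地址：https://learn.microsoft.com/zh-cn/entra/identity-platform/quickstart-register-app
重定向 URI 配置（使用微软提供的地址）：https://login.microsoftonline.com/common/oauth2/nativeclient
注册应用地址：https://portal.azure.com/#view/Microsoft_AAD_RegisteredApps/ApplicationsListBlade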
2. 程序代码
# 安装依赖包以及 Playwright 所需的浏览器驱动
pip3 install playwright
playwright install
import base64
import json
import logging
from io import BytesIO
from playwright.sync_api import sync_playwright
import time
import urllib.parse
import requests
from pia.utils.cache import get_redis_value, set_redis_expire
from pia.utils.constants import OUTLOOK_CLIENT_ID, OUTLOOK_TENANT_ID, OUTLOOK_CLIENT_SECRET, OUTLOOK_REDIRECT_URI, \
COS_OUTLOOK_DIR, OUTLOOK_TOP
from pia.utils.cos_upload import upload_stream_to_cos, check_exists
from pia.utils.reids_key import OUTLOOK_TOKEN
log = logging.getLogger(__name__)
# Account sign-in via the OAuth 2.0 authorization code flow; official docs:
# https://learn.microsoft.com/en-us/azure/active-directory/develop/v2-oauth2-auth-code-flow
def get_authorization_code(_user_name, _pass_word):
    """Sign in with a headless browser and return an OAuth 2.0 authorization code.

    Opens the Microsoft identity platform ``/authorize`` endpoint in headless
    Chromium with the ``zh-CN`` locale (the selectors below match the Chinese
    login UI), types the user name and password, clicks "是" on the prompt
    shown after login, and reads the ``code`` query parameter from the URL the
    browser ends up on (``OUTLOOK_REDIRECT_URI``).

    :param _user_name: account login (e-mail address).
    :param _pass_word: plaintext account password.
    :return: the authorization code, or ``None`` when the final URL carries no
        ``code`` (login failed or Azure returned an ``error``), or when the
        echoed ``state`` does not match the one sent.
    """
    import secrets

    from playwright.sync_api import TimeoutError as PlaywrightTimeoutError

    # Fresh random state per request; the value echoed back is verified below.
    state = secrets.token_urlsafe(16)
    # urlencode percent-encodes every parameter. The redirect URI contains
    # ':' and '/', which the original hand-built f-string left unencoded.
    query_string = urllib.parse.urlencode({
        "client_id": OUTLOOK_CLIENT_ID,
        "response_type": "code",
        "redirect_uri": OUTLOOK_REDIRECT_URI,
        "response_mode": "query",
        "scope": "https://graph.microsoft.com/mail.read",
        "state": state,
    })
    url = (f"https://login.microsoftonline.com/{OUTLOOK_TENANT_ID}"
           f"/oauth2/v2.0/authorize?{query_string}")

    with sync_playwright() as play_wright:
        browser = play_wright.chromium.launch(headless=True)
        try:
            context = browser.new_context(locale="zh-CN", accept_downloads=True)
            page = context.new_page()
            page.goto(url)
            page.click("[placeholder=\"电子邮件、电话或\\ Skype\"]")
            page.fill("[placeholder=\"电子邮件、电话或\\ Skype\"]", _user_name)
            with page.expect_navigation():
                page.click("text=下一步")
            page.click("[placeholder=\"密码\"]")
            page.fill("[placeholder=\"密码\"]", _pass_word)
            with page.expect_navigation():
                page.click("text=登录")
            page.click("text=是")
            try:
                # Wait for the redirect itself instead of a fixed 3 s sleep.
                page.wait_for_url(
                    lambda current_url: current_url.startswith(OUTLOOK_REDIRECT_URI),
                    timeout=15000,
                )
            except PlaywrightTimeoutError:
                # Best effort: the URL is still parsed below, and the missing
                # code is reported there.
                pass
            final_url = page.url
            context.close()
        finally:
            # Release the browser even if a selector or navigation fails.
            browser.close()

    query = dict(urllib.parse.parse_qsl(urllib.parse.urlsplit(final_url).query))
    authorization_code = query.get('code')
    if not authorization_code:
        log.error("outlook authorize redirect carried no code: error=%s description=%s",
                  query.get('error'), query.get('error_description'))
        return None
    if query.get('state') != state:
        log.error("outlook authorize redirect state mismatch")
        return None
    return authorization_code
def get_token(authorization_code):
    """Redeem an authorization code for a Microsoft Graph access token.

    Posts the code to the tenant's v2.0 ``/token`` endpoint using the
    confidential-client credentials ``OUTLOOK_CLIENT_ID`` and
    ``OUTLOOK_CLIENT_SECRET``.

    :param authorization_code: code returned by ``get_authorization_code``.
    :return: ``"Bearer <access_token>"``, ready for an ``Authorization``
        header. Returns ``None`` when no code is given or the response has no
        ``access_token``. The original returned the truthy string
        ``"Bearer None"`` here, which callers then cached as a valid token.
    """
    if not authorization_code:
        return None
    param = {'client_id': OUTLOOK_CLIENT_ID,
             'code': authorization_code,
             'redirect_uri': OUTLOOK_REDIRECT_URI,
             'grant_type': 'authorization_code',
             'client_secret': OUTLOOK_CLIENT_SECRET,
             }
    token_headers = {"Content-Type": "application/x-www-form-urlencoded"}
    token_url = f'https://login.microsoftonline.com/{OUTLOOK_TENANT_ID}/oauth2/v2.0/token'
    # Explicit timeout: requests otherwise waits indefinitely.
    res = requests.post(url=token_url, headers=token_headers, data=param, timeout=30)
    try:
        payload = res.json()
    except ValueError:
        # Non-JSON body (e.g. a proxy error page).
        payload = {}
    if not isinstance(payload, dict):
        payload = {}
    access_token = payload.get('access_token')
    if not access_token:
        log.error("outlook token request failed: status=%s error=%s description=%s",
                  res.status_code, payload.get('error'), payload.get('error_description'))
        return None
    return f'Bearer {access_token}'
# Graph API docs: https://learn.microsoft.com/en-us/graph/api/mailfolder-list-messages?view=graph-rest-1.0
def get_inbox(authorization, str_start, str_end):
    """List the inbox messages received in ``[str_start, str_end)``.

    :param authorization: ``Authorization`` header value (``"Bearer <token>"``).
    :param str_start: inclusive lower bound on ``receivedDateTime`` as an
        OData DateTimeOffset literal (e.g. ``2024-01-01T00:00:00Z``). A falsy
        value means no lower bound.
    :param str_end: exclusive upper bound, same format.
    :return: the decoded JSON of the first page. ``value`` is extended with the
        messages of every following page (``@odata.nextLink``), so
        ``OUTLOOK_TOP`` now acts as the page size rather than a hard cap.
        A Graph error body (``{"error": ...}``) is returned as-is. If a later
        page fails, its error is stored under ``"error"``.
    """
    conditions = []
    if str_start:
        conditions.append(f'receivedDateTime ge {str_start}')
    conditions.append(f'receivedDateTime lt {str_end}')
    # The original '?$filter = ' put spaces around '=' (so the parameter name
    # became "$filter ") and left the filter unencoded. Keep '$' literal and
    # encode spaces as %20.
    query_string = urllib.parse.urlencode(
        {'$filter': ' and '.join(conditions), '$top': OUTLOOK_TOP},
        safe='$',
        quote_via=urllib.parse.quote,
    )
    endpoint = f"https://graph.microsoft.com/v1.0/me/mailFolders/inbox/messages?{query_string}"
    http_headers = {'Authorization': authorization,
                    'Accept': 'application/json',
                    'Content-Type': 'application/json'}
    data = requests.get(endpoint, headers=http_headers, timeout=30).json()
    if not isinstance(data, dict) or not isinstance(data.get("value"), list):
        return data

    # Graph pages the result. Without following nextLink, every message past
    # the first page is dropped while the caller still advances its window.
    next_link = data.pop("@odata.nextLink", None)
    while next_link:
        page = requests.get(next_link, headers=http_headers, timeout=30).json()
        if not isinstance(page, dict) or "error" in page:
            # Surface the failure rather than returning a silently truncated list.
            data["error"] = page.get("error") if isinstance(page, dict) else page
            break
        data["value"].extend(page.get("value") or [])
        next_link = page.get("@odata.nextLink")
    return data
def get_email_attachments(authorization, email_id):
    """Return the decoded JSON listing the attachments of message ``email_id``.

    :param authorization: ``Authorization`` header value (``"Bearer <token>"``).
    :param email_id: Graph message id.
    :return: the parsed response body. The caller in this module reads ``name``
        and ``contentBytes`` from each entry of its ``value`` list.
    """
    # List the attachments of one message.
    endpoint = f"https://graph.microsoft.com/v1.0/me/messages/{email_id}/attachments"
    http_headers = {'Authorization': authorization,
                    'Accept': 'application/json',
                    'Content-Type': 'application/json'}
    # Explicit timeout: requests waits forever by default, and one hung
    # connection would stall the whole batch job.
    return requests.get(endpoint, headers=http_headers, stream=False, timeout=30).json()
# Entry point: fetch one user's inbox for a time window.
def deal_user_email(_user_name, _pass_word, str_start, str_end) -> list:
    """Fetch the user's inbox messages in ``[str_start, str_end)`` and upload their attachments.

    The ``Authorization`` header value is cached in Redis under
    ``f"{OUTLOOK_TOKEN}:{_user_name}"`` for one hour. On a cache miss, the user
    is signed in through the browser flow and the code is redeemed for a token.

    :param _user_name: account login (e-mail address).
    :param _pass_word: plaintext account password.
    :param str_start: inclusive lower bound on ``receivedDateTime`` (OData
        DateTimeOffset literal). A falsy value means no lower bound.
    :param str_end: exclusive upper bound, same format.
    :return: the Graph message dicts, each with an extra ``attachments`` key
        mapping attachment name -> COS URL (empty dict when there are none).
        Returns an empty list when authorization fails.
    :raises RuntimeError: when Graph answers the inbox request with an
        ``error`` body (e.g. an expired or revoked token). The caller can then
        mark the run as failed instead of recording an empty result.
    """
    redis_key = f'{OUTLOOK_TOKEN}:{_user_name}'
    # Reuse a cached token when present.
    authorization = get_redis_value(redis_key)
    if not authorization:
        authorization_code = get_authorization_code(_user_name, _pass_word)
        # Only redeem a real code. get_token(None) can produce a bogus
        # "Bearer None" header, which would then be cached below.
        authorization = get_token(authorization_code) if authorization_code else None
        if authorization:
            set_redis_expire(redis_key, authorization, 60 * 60)
    if not authorization:
        log.error(f"outlook user_name: {_user_name} Authorization Failed")
        return []

    email = get_inbox(authorization, str_start, str_end)
    if email and email.get("error"):
        # NOTE(review): a stale cached token keeps failing until its Redis TTL expires.
        raise RuntimeError(f"outlook user_name: {_user_name} inbox request failed: {email.get('error')}")

    result = []
    for value in (email or {}).get("value") or []:
        email_id = value.get("id")
        attachment_dict = {}
        # hasAttachments is Graph's True/False flag for the message.
        if value.get("hasAttachments") and email_id:
            attachment_dict = upload_attachment_to_cos(authorization, email_id, _user_name)
        value["attachments"] = attachment_dict
        result.append(value)
    return result
def upload_attachment_to_cos(authorization, email_id, _user_name):
    """Upload a message's file attachments to COS and return their URLs.

    Each attachment is stored at
    ``{COS_OUTLOOK_DIR}/{_user_name}/{email_id}/{attachment name}``.
    Objects that already exist are not re-uploaded; the URL from
    ``check_exists`` is used instead.

    :param authorization: ``Authorization`` header value (``"Bearer <token>"``).
    :param email_id: Graph message id.
    :param _user_name: account login, used as a path segment.
    :return: dict mapping attachment name -> COS URL. Attachments without
        ``contentBytes`` are skipped.
    """
    attachment_dict = {}
    attachments = get_email_attachments(authorization, email_id)
    for _value in (attachments or {}).get("value") or []:
        attachment_name = _value.get("name")
        # Base64-encoded attachment body.
        attachment_content = _value.get("contentBytes")
        if attachment_content is None:
            # Non-file attachments (e.g. attached items or links) have no
            # contentBytes. b64decode(None) raises TypeError, which used to
            # abort the user's whole sync.
            log.warning(f"outlook email {email_id} attachment {attachment_name!r} has no contentBytes, skipped")
            continue
        object_name = f'{COS_OUTLOOK_DIR}/{_user_name}/{email_id}/{attachment_name}'
        is_exists, url = check_exists(object_name)
        if not is_exists:
            # Decode only when an upload is actually needed.
            file_stream = BytesIO(base64.b64decode(attachment_content))
            url = upload_stream_to_cos(file_stream, object_name)
        attachment_dict[attachment_name] = url
    return attachment_dict
import traceback
from datetime import datetime, timedelta
from django.core.management import BaseCommand
import logging
import uuid
from django.db import transaction
from pia.models import PiaOutLookTask, PiaOutLookData
from pia.utils.aes import decrypted
from pia.utils.outlook import deal_user_email
# Module logger. Named `logger` so the `logging` module is not shadowed.
logger = logging.getLogger('task')


# Pull Outlook mail as a scheduled task.
class Command(BaseCommand):
    """Sync Outlook inbox mail for every ``PiaOutLookTask`` row.

    Usage: ``manage.py <command> [trace_id]``.

    For each task, the messages received in ``[execution_time, now)`` (UTC)
    are fetched with ``deal_user_email`` and stored in ``PiaOutLookData``.
    On success the task gets ``status=0`` and ``execution_time=now``. On
    failure it gets ``status=1`` and ``execution_time`` is left unchanged, so
    the same window is retried on the next run instead of being skipped.
    """

    def add_arguments(self, parser):
        # Optional; when omitted or empty, a random UUID is used.
        parser.add_argument('trace_id', type=str, nargs='?', default=None)

    def handle(self, *args, **options):
        self.trace_id = options.get("trace_id") or str(uuid.uuid4())
        self.writeLog('outlook email start')
        outlook_task = PiaOutLookTask.objects.values("id", "user_name", "pwd", "execution_time")
        for task in outlook_task:
            self._sync_task(task)
        self.writeLog('outlook email end')

    def _sync_task(self, task):
        """Sync one task row (dict with id, user_name, pwd, execution_time). Errors are logged, not raised."""
        from datetime import timezone

        _id = task.get("id")
        user_name = task.get("user_name")
        pwd = task.get("pwd")
        execution_time = task.get("execution_time")
        self.writeLog(f'############## outlook email user_name:{user_name} start ##############')
        # Window end as YYYY-MM-DDTHH:MM:SS. It must be UTC because it is sent
        # with a 'Z' suffix; the original used local time.
        end_time = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S')
        if not (user_name and pwd):
            self.writeLog(f'outlook email user_name:{user_name} skipped: missing user_name or pwd')
            return
        try:
            _pwd = decrypted(pwd)
            result = deal_user_email(user_name, _pwd, self._graph_time(execution_time), f'{end_time}Z')
            with transaction.atomic():
                PiaOutLookTask.objects.filter(id=_id).update(status=0, execution_time=end_time)
                outlook_data = PiaOutLookData.objects.filter(outlook_task_id=_id, start_time=execution_time,
                                                             end_time=end_time)
                # update() returns the number of matched rows; create only if none matched.
                if not outlook_data.update(content=result):
                    PiaOutLookData.objects.create(outlook_task_id=_id, start_time=execution_time,
                                                  end_time=end_time, content=result)
            self.writeLog(f'############## outlook email user_name:{user_name} end ##############')
        except Exception:
            # Leave execution_time untouched so the failed window is retried.
            PiaOutLookTask.objects.filter(id=_id).update(status=1)
            self.writeLog(
                f'############## outlook email user_name:{user_name} execution failed::::{traceback.format_exc()}',
                level=logging.ERROR)

    @staticmethod
    def _graph_time(value):
        """Format a stored execution_time as a UTC OData literal, or return None when unset.

        Strings (the format this command writes) get 'Z' appended, as before.
        datetime values are converted to UTC first when tz-aware. None or ''
        means "no lower bound", instead of the invalid literal 'NoneZ'.
        """
        from datetime import timezone

        if value is None or value == '':
            return None
        if isinstance(value, datetime):
            if value.tzinfo is not None:
                value = value.astimezone(timezone.utc)
            return value.strftime('%Y-%m-%dT%H:%M:%SZ')
        return f'{value}Z'

    def writeLog(self, msg: str, level: int = logging.INFO):
        """Log ``msg`` prefixed with this run's trace id (default level INFO)."""
        logger.log(level, f'[{self.trace_id}] {msg}')