场景
业务上有许多发送邮件的场景,这些邮件基本上都是自动发送的,而且邮件内容很重要。每次回归测试都要人工验证邮件有没有发送、发送的时间点对不对,工作量太大,所以考虑把这部分验证加入到自动化测试中。
工具
- python
- gmail
要访问 Gmail,先要去 console.cloud.google.com 创建访问凭证(如图所示),下载后会得到一个 JSON 文件 credentials.json。
实现代码
其中 DateFormat 和 get_path 是自定义的工具(本文未给出实现),分别用于获取时间和获取文件路径。
from __future__ import print_function
import base64
import os.path
import logging
import time
from utils.get_file_path import get_path
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from utils.date_format import DateFormat
from datetime import datetime
# OAuth scopes requested when authorizing against Gmail.
# If modifying these scopes, delete the file token.json.
SCOPES = [
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.modify',
]
class GmailSearch:
    """Search a Gmail inbox through the Gmail API and return a message body.

    Credentials are loaded (or created through the OAuth flow) when the
    object is constructed and are reused by every ``search_email`` call.
    """

    def __init__(self):
        """Load token.json, refreshing or re-creating it when it is not valid."""
        self.creds = None
        # The file token.json stores the user's access and refresh tokens, and is
        # created automatically when the authorization flow completes for the first
        # time.
        file_token = get_path('token.json', 'utils')
        if os.path.exists(file_token):
            self.creds = Credentials.from_authorized_user_file(file_token, SCOPES)
        # If there are no (valid) credentials available, let the user log in.
        if not self.creds or not self.creds.valid:
            if self.creds and self.creds.expired and self.creds.refresh_token:
                self.creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    get_path('credentials.json', 'utils'), SCOPES)
                self.creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open(file_token, 'w') as token:
                token.write(self.creds.to_json())

    @staticmethod
    def _find_body_data(payload):
        """Return the first non-empty base64url body string in ``payload``, or None.

        Parts are searched depth-first, so a body nested inside several
        multipart containers is still found.
        """
        parts = payload.get('parts')
        if not parts:
            return (payload.get('body') or {}).get('data')
        for part in parts:
            data = GmailSearch._find_body_data(part)
            if data:
                return data
        return None

    @staticmethod
    def _decode_body(data):
        """Decode a base64url string (padding optional) into UTF-8 text."""
        # Restore any '=' padding that was stripped, so the decoder accepts it.
        padded = data + "=" * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode('utf-8')

    def search_email(self, **kwargs):
        """Search the inbox and return the body of the first matching email.

        The method sleeps ``interval`` seconds, runs one query, and, when
        nothing matches, calls itself again with ``count`` reduced by one.

        Keyword arguments (all optional):
            subject: text for the ``subject:(...)`` filter.
            sender: address(es) for the ``from:(...)`` filter,
                e.g. "atest@email.com, btest@email.com".
            to: address(es) for the ``to:(...)`` filter.
            attachment: when truthy, adds ``has:attachment``.
            before / after: bounds for the ``before:`` / ``after:`` filters.
                When omitted they default to
                ``DateFormat.from_current_minutes(10)`` and
                ``DateFormat.from_current_minutes(-10)``.
            unread: when falsy or omitted, only unread emails are searched
                (``is:unread``) and the found email is left untouched.
                When truthy, read emails are searched as well and the found
                email is marked as read.
            interval: seconds to sleep before each query; defaults to 10
                (a falsy value also means 10).
            count: number of queries still allowed; defaults to 3.

        Returns:
            The decoded body text of the first message in the result list
            (an empty string when the message carries no body data),
            the string "no email found" once ``count`` is used up,
            or None when the Gmail API raises ``HttpError``.
        """
        count = kwargs.get("count")
        if count is None:
            count = 3
        # ``<=`` so that a negative count cannot recurse without end.
        if count <= 0:
            return "no email found"

        # NOTE(review): the ``unread`` flag works opposite to what its name
        # suggests (truthy turns the is:unread filter OFF and mark-as-read ON).
        # Behaviour is kept as-is here; confirm the intended meaning with callers.
        only_unread = not kwargs.get("unread")
        mark_read = kwargs.get("unread")

        search_str = ["in:inbox"]
        if only_unread:
            search_str.append("is:unread")
        if kwargs.get("attachment"):
            search_str.append("has:attachment")
        if kwargs.get("sender"):  # value like atest@email.com, btest@email.com
            search_str.append(f'from:({kwargs.get("sender")})')
        if kwargs.get("to"):  # value like atest@email.com, btest@email.com
            search_str.append(f'to:({kwargs.get("to")})')
        if kwargs.get("subject"):
            search_str.append(f'subject:({kwargs.get("subject")})')
        if kwargs.get("before"):
            search_str.append(f'before:{kwargs.get("before")}')
        else:  # default value is current + 10 minutes
            search_str.append(f'before:{DateFormat.from_current_minutes(10)}')
        if kwargs.get("after"):
            search_str.append(f'after:{kwargs.get("after")}')
        else:  # default value is current - 10 minutes
            search_str.append(f'after:{DateFormat.from_current_minutes(-10)}')
        query = " ".join(search_str)

        interval = kwargs.get("interval") or 10
        # Wait before every query (including the first) to give the email
        # time to arrive.
        time.sleep(interval)
        logging.info(datetime.now().strftime("%H:%M:%S"))
        logging.info(str(count - 1))
        try:
            # Call the Gmail API
            service = build('gmail', 'v1', credentials=self.creds)
            results = service.users().messages().list(userId='me', q=query).execute()
            messages = results.get("messages")
            if not messages:
                logging.info('No email found, continue to search')
                kwargs.update({"count": count - 1})
                # Retry through this same method; the previously referenced
                # ``search_email_content`` does not exist on the class.
                return self.search_email(**kwargs)

            # Fetch the first message of the result list.
            message_id = messages[0]['id']
            last_email = service.users().messages().get(userId='me', id=message_id).execute()
            payload = last_email['payload']

            # The body is base64url-encoded and may sit inside nested parts.
            data = self._find_body_data(payload)
            if data:
                body = self._decode_body(data)
            else:
                logging.warning('Email found but it carries no body data')
                body = ""

            # mark read
            if mark_read:
                logging.info(" mark read")
                service.users().messages().modify(userId='me', id=message_id,
                                                  body={'removeLabelIds': ['UNREAD']}).execute()
            return body
        except HttpError as error:
            logging.error(f'An error occurred: {error}')
            return None
# test=GmailSearch()
# test.search_email(before='2023/10/3', after='2023/9/25', to="test@test.com", unread=True)
参考文档:Gmail API 文档