中间涉及到的技术点有:
- 模拟登陆
- 模拟下载
- 解析exal文件数据流
- 读取exal文件,拿出订单号
- 还有最后一点请求接口
下面就给大家挨个说一下,刚拿到需求其实还是很模糊的,因为一个都没做过,等静下心来去理解的时候,发现并没有那么难,反而很简单
模拟登陆
一、分析页面请求头
本次登陆地址是https://huoche.alitrip.com/hello.htm
1、先登陆了一遍查看了一下请求头,发现就携带了三个东西,隐藏token,用户名,密码
一看一目了然,就一个后台页面,可想而知相对来说还是很简单,哈哈,下一步我只需要封装一下cookie,然后带上tocken,username,passwd去登陆咯
给大家说下,python的requests模块可以忽略cookie,自己创建一个session对象,他自己去给咱们匹配cookie,不用去挨个试cookie,这样就节省了好多代码和时间
2、代码如下
class TbTomas(object):
def __init__(self):
# 配置初始化
self.session_obj = requests.session()
def download_file(self,thomas_username,thomas_password,):
hello_url = 'https://huoche.alitrip.com/hello.htm'
# 获取原文
hello_response = self.session_obj.get(hello_url)
# 正则匹配原文
h_u_s = re_search('<input type="hidden" id="h_u_s" name="h_u_s" value="(.*?)">', hello_response.text)
h_u_s = base64.b64encode(h_u_s)
headers = {
'Accept': 'text/html, application/xhtml+xml, image/jxr, */*',
'Referer': 'https://huoche.alitrip.com/hello.htm',
'Accept-Language': 'zh-CN',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586',
'Content-Type': 'application/x-www-form-urlencoded',
'Accept-Encoding': 'gzip, deflate',
'Host': 'huoche.alitrip.com',
'Content-Length': '73',
'Connection': 'Keep-Alive',
'Cache-Control': 'no-cache'
}
post_data = {
'h_u_s': base64.b64encode(h_u_s),
'h_u_n': thomas_username,
'h_u_p': base64.b64encode(thomas_password)
}
index_url = 'https://huoche.alitrip.com/index.htm'
index_response = self.session_obj.post(index_url, headers=headers, data=post_data)
最后一提交post请求,就可以判断有没有登录成功了,是不是很简单,哈哈!
数据下载
下载也是和登录是一样的道理,下载的时候肯定也是像网页发一个post请求,然后就回去下载exal文件咯,python有这么一个模块xlrd,可以去操作exal文件,非常方便
1、原文是让我们输入时间看,下载那一天的数据,领导给的任务是下载前一天的,所以上一天时间要写几行代码来实现
代码如下:
today = datetime.datetime.now()
yesterday = today + datetime.timedelta(days=-1)
trade_date = yesterday.strftime('%Y-%m-%d')
2、查看下载文件请求的url,以及提交的数据,一张图一切都明白了
从图中可以看到,该文发送的url,请求方式,请求头,和返回的数据
3、模拟请求下载,只需用提交一下日期就OK搞定,文件下载完毕,接下开要读文件拿自己想要的东西啦
post_data = {
'orderExportDate': trade_date
}
sheet_content = ""
for _ in xrange(3):
try:
# 得到exal文件流
download_response = self.session_obj.post(download_url, data=post_data)
# 打开exal文件
xls_content = xlrd.open_workbook(file_contents=download_response.content)
sheet_content = xls_content.sheets()[0]
break
except Exception as e:
continue
4、这个就众所周知,和读取文件一样,for循环一行一行读取,然后把订单号挨个添加给一个列表啥啦乱七八糟的
order_item = []
for line_num in range(sheet_content.nrows):
line_item = sheet_content.row_values(line_num)
if line_item[2]:
order_item.append(line_item[2], ) # 订单号 order_no
# 获取到所有订单号
order_item = order_item[1:]
拿到订单号要去获取订单详情了,但是领导给我说这个已经有同事写好代码了,只需要调用那个接口就好,所以别人的代码我就不往上面展示了,原理很简单
requests模块,请求url,get传入订单号,发送请求,就可以返回数据咯,web页面展示,那个需求,每个公司都不一样,存入数据库,自己取自己想要的吧。
本文就到这里吧,学到一点东西的请点赞,哈哈
最后附带源码,用户名和密码就不告诉大家啦,啊哈哈
#!/usr/bin/python
# coding:utf-8
import sys
import os
import django
reload(sys)
sys.setdefaultencoding('utf8')
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) # 把manage.py所在目录添加到系统目录
os.environ['DJANGO_SETTINGS_MODULE'] = 'business.settings' # 设置setting文件
django.setup() # 初始化Django环境
import requests
import re
import logging
import base64
import xlrd
import datetime
import time
import MySQLdb
import threadpool
from business import settings
from train.depends.platform import Platform
from train.models import TbTomasOrder,TbTomasEpay,TtTicketThomas,TbTomasLinkman
from train import utils
from train.status import OrderStatus
from django.core.mail import EmailMultiAlternatives
from train.busi import insert_order,insert_ticket,insert_epay,insert_linkman
logger = logging.getLogger('django')
class TbTomas(object):
succ_number = 0
fail_number = 0
fail_order = []
def __init__(self,thread_num = 3):
# 配置初始化
self.session_obj = requests.session()
self.fail_order = []
self.succ_number = 0
self.fail_number = 0
self.thread_num = thread_num
self.start_date = ""
self.end_date = ""
self.trade_date = utils.now()
def login_thomas(self,thomas_username,thomas_password):
hello_url = 'https://huoche.alitrip.com/hello.htm'
hello_response = self.session_obj.get(hello_url)
h_u_s = re_search('<input type="hidden" id="h_u_s" name="h_u_s" value="(.*?)">', hello_response.text)
h_u_s = base64.b64encode(h_u_s)
headers = {
'Accept': 'text/html, application/xhtml+xml, image/jxr, */*',
'Referer': 'https://huoche.alitrip.com/hello.htm',
'Accept-Language': 'zh-CN',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Safari/537.36 Edge/13.10586',
'Content-Type': 'application/x-www-form-urlencoded',
'Accept-Encoding': 'gzip, deflate',
'Host': 'huoche.alitrip.com',
'Content-Length': '73',
'Connection': 'Keep-Alive',
'Cache-Control': 'no-cache'
}
post_data = {
'h_u_s': base64.b64encode(h_u_s),
'h_u_n': thomas_username,
'h_u_p': base64.b64encode(thomas_password)
}
index_url = 'https://huoche.alitrip.com/index.htm'
index_response = self.session_obj.post(index_url, headers=headers, data=post_data)
logger.info(u"登陆成功,等待下载文件...")
def download_file(self,thomas_username,thomas_password,args):
for _ in xrange(3):
try:
self.login_thomas(thomas_username,thomas_password)
break
except Exception as e:
logger.error(e)
continue
# 处理时间
all_time = self.date_time_handle(args)
if not all_time:
logger.error(u"日期格式错误!!")
return
for trade_date in all_time:
try:
self.trade_date = trade_date
post_data = {
'orderExportDate': trade_date
}
download_url = 'https://huoche.alitrip.com/orderlistexp.do'
sheet_content = ""
for _ in xrange(3):
try:
# 得到exal文件流
download_response = self.session_obj.post(download_url, data=post_data)
# 打开exal文件
xls_content = xlrd.open_workbook(file_contents=download_response.content)
sheet_content = xls_content.sheets()[0]
logger.info(u"下载文件成功,正在拿取订单号")
break
except Exception as e:
logger.error(u"下载文件超时,正在等待重新登录后下载...")
self.login_thomas(thomas_username, thomas_password)
continue
order_item = []
if not sheet_content:
logger.error(u'下载文件失败,正在重新登录...')
continue
for line_num in range(sheet_content.nrows):
line_item = sheet_content.row_values(line_num)
if line_item[2] and line_item[2] not in order_item:
order_item.append(line_item[2], ) # 订单号 order_no
# 获取到所有订单号
order_item = order_item[1:]
# 根据订单号去拿订单详情
logger.info(u"正在写入数据库")
# 多线程去执行
pool = threadpool.ThreadPool(self.thread_num)
reqs = threadpool.makeRequests(self.create_order_info, order_item)
[pool.putRequest(req) for req in reqs]
pool.wait()
logger.info(u'写入完成,完成时间为:%s'% self.trade_date)
content = self.add_content(len(order_item), self.succ_number, self.fail_number, self.fail_order)
self.send_mail(content=content)
self.succ_number,self.fail_order = 0,0
self.fail_order = []
# self.create_order_info(order_item)
except Exception as e:
logger.error(e)
def date_time_handle(self,args):
all_time = []
if args:
if len(args) == 1:
self.start_date = datetime.datetime.strptime(args[0], "%Y-%m-%d").date()
self.end_date = datetime.datetime.strptime(datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d").date()
elif len(args) == 2:
self.start_date = datetime.datetime.strptime(args[0], "%Y-%m-%d").date()
self.end_date = datetime.datetime.strptime(args[1], "%Y-%m-%d").date()
elif len(args) == 3:
self.start_date = datetime.datetime.strptime(args[0], "%Y-%m-%d").date()
self.end_date = datetime.datetime.strptime(args[1], "%Y-%m-%d").date()
self.thread_num = int(args[2])
else:
logger.error(u"传入参数错误,请重新执行")
return
i = 0
while True:
tomoary = self.start_date + datetime.timedelta(days=i)
trade_date = tomoary.strftime('%Y-%m-%d')
all_time.append(trade_date)
i += 1
if tomoary == self.end_date:
break
else:
today = datetime.datetime.now()
yesterday = today + datetime.timedelta(days=-1)
trade_date = yesterday.strftime('%Y-%m-%d')
all_time.append(trade_date)
return all_time
def create_order_info(self, order):
platform_obj = Platform()
order_info = platform_obj.get_order(order)
if not order_info:
self.fail_order.append(order)
self.fail_number += 1
logger.error('获取订单号:[%s]失败'%order)
return
try:
# 插入order表
if TbTomasOrder.objects.filter(order_no=order).exists():
logger.error('订单号:[%s]已经存在于TbTomasOrder'%order)
self.fail_order.append(order)
self.fail_number += 1
return
else:
insert_order(order_info,order,self.trade_date)
self.succ_number += 1
# 插入ticket表
insert_ticket(order_info,order,self.trade_date)
# 插入联系人
if TbTomasLinkman.objects.filter(order_no=order).exists():
logger.error('订单号:[%s]已经存在于TbTomasLinkman'%order)
else:
insert_linkman(order_info,order,self.trade_date)
# 插入epay表
if TbTomasEpay.objects.filter(order_no=order).exists():
logger.error('订单号:[%s]已经存在于TbTomasEpay'%order)
else:
insert_epay(order_info,order,self.trade_date)
except Exception as e:
logger.error(e)
self.fail_number +=1
def add_content(self,total,succ_number,fail_number,fail_order):
content = u'''
<h3>托马斯导入订单报表</h3>
<div class="col-xs-12">
<table border="1" cellpadding="3" cellspacing="1">
<tr>
<td>日期</td>
<td>总单数</td>
<td>成功单数</td>
<td>失败单数</td>
<td>失败订单号</td>
</tr>
<tr>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
</table>
</div>
'''%(datetime.datetime.now().strftime("%Y-%m-%d %H:%M"),total,succ_number,fail_number,fail_order)
return content
def send_mail(self, content):
time_target = self.trade_date
subject = u'托马斯数据抓取邮件 %s' % (time_target)
logger.info(u'准备发送邮件....%s', subject)
mail_address = settings.mail_address_thomas
to_addr = []
if isinstance(mail_address, list):
to_addr += mail_address
elif isinstance(mail_address, str):
to_addr.append(mail_address)
logger.debug(to_addr)
from_email = settings.DEFAULT_FROM_EMAIL
msg = EmailMultiAlternatives(subject, 'result', from_email, to_addr)
msg.attach_alternative(content, "text/html")
flag = msg.send()
if flag:
logger.info(u'%s发送成功', subject)
else:
logger.error(u'%s发送失败', subject)
return
def run(self, username,passwd,args):
# 登陆托马斯后台
for _ in xrange(3):
try:
self.download_file(username,passwd,args)
break
except Exception as e:
logger.error(e)
continue
def re_search(regex, subject):
subject = str(subject)
obj = re.compile(regex)
match = obj.search(subject)
if match:
result = match.group(1)
else:
result = ''
return result
def main():
username = base64.b64decode(settings.THOMAS_USERNAME)
passwd = base64.b64decode(settings.THOMAS_PASSWORD)
args = sys.argv[1:] if sys.argv[1:] else ""
TbTomas().run(username,passwd,args)
if __name__ == "__main__":
main()
thread_code