说明:
使用 Python 结合多线程技术,请求 Constant.py 中配置的各省市登录地址并记录响应时长,再将这些数据批量写入 MySQL 数据库,便于对数据进行结构化管理。
环境配置:
certifi==2019.6.16
chardet==3.0.4
idna==2.8
PyMySQL==0.9.3
requests==2.22.0
urllib3==1.25.3
将所需要的环境保存到文本中: pip freeze > 1.txt
创建数据库表:
CREATE TABLE tax_web(
id int(10) PRIMARY KEY AUTO_INCREMENT,
tax_name VARCHAR(64),
tax_area_code VARCHAR(32),
request_url VARCHAR(255),
response_time VARCHAR(32) DEFAULT NULL,
create_time datetime DEFAULT NOW(),
update_time datetime DEFAULT NULL,
is_delete int(1) DEFAULT 0
);
查询已经知道的表在哪个数据库里面:
select table_schema from information_schema.tables where table_name = 'tax_web';
查询表中的数据(运行完代码后才会有数据):
select * from tax_web;
要写入的文本(Constant.py):
# Maps each region name (province or city) to the login-page URL that is
# requested for it. The keys are the same names, in the same order, as the
# entries of ALL_TAX_NAME.
ALL_LOGIN_URL = {
'内蒙古': 'https://etax.neimenggu.chinatax.gov.cn/login-web/index.html?v=20190701105613#/',
'甘肃': 'https://etax.gansu.chinatax.gov.cn/bszm-web/apps/views/beforeLogin/indexBefore/pageIndex.html',
'陕西': 'https://etax.shaanxi.chinatax.gov.cn/xxmh/html/index.html',
'青岛': 'https://etax.qingdao.chinatax.gov.cn:7553/sso/login?service=https%3A%2F%2Fetax.qingdao.chinatax.gov.cn%2Fportal%2F',
'江苏': 'https://etax.jiangsu.chinatax.gov.cn/sso/login',
'浙江': 'https://etax.zhejiang.chinatax.gov.cn/zjgfdzswj/main/index.html',
'河南': 'https://dzswj.ha-n-tax.gov.cn/web/dzswj/ythclient/mh.htm',
'河北': 'https://etax.hebei.chinatax.gov.cn/bszm-web/apps/views-zj/index/index.html',
'广东': 'https://www.etax-gd.gov.cn/xxmh/html/index.html',
'上海': 'https://etax.shanghai.chinatax.gov.cn/wszx-web/bszm/apps/views/beforeLogin/indexBefore/pageIndex.html#/',
'北京': 'https://etax.beijing.chinatax.gov.cn/xxmh/html/index.html',
'辽宁': 'https://etax.liaoning.chinatax.gov.cn/sword?ctrl=LoginCtrlTmp_logout',
'安徽': 'https://etax.anhui.chinatax.gov.cn/',
'江西': 'https://etax.jiangxi.chinatax.gov.cn/etax/jsp/index.jsp',
'重庆': 'https://etax.chongqing.chinatax.gov.cn/',
'湖南': 'https://etax.hunan.chinatax.gov.cn/wsbs/toLogin.do',
'四川': 'https://etax.sichuan.chinatax.gov.cn/home/',
'宁波': 'https://etax.ningbo.chinatax.gov.cn/nbdzswj-web/apps/views/beforeLogin/indexBefore/pageIndex.html#/?_t=1563945395113',
'山西': 'https://etax.shanxi.chinatax.gov.cn/login',
# The only entry that is a bare IP address over plain HTTP.
'山东': 'http://218.57.142.38:8080/',
# No URL is configured for this region (empty string).
# NOTE(review): an empty URL presumably cannot be fetched, so this row
# would be stored without a response time - confirm that is intended.
'福建': '',
'天津': 'https://wsswj.tjsat.gov.cn/apps/view/login.html',
'吉林': 'https://etax.jilin.chinatax.gov.cn:10812/sword?ctrl=LoginCtrlTmp_logout',
'黑龙江': 'https://www.hljetax.gov.cn/',
'湖北': 'https://wsswj.hb-n-tax.gov.cn/portal/',
'广西': 'https://etax.guangxi.chinatax.gov.cn:9723/web/dzswj/ythclient/mh.html',
'海南': 'https://etax.hainan.chinatax.gov.cn/sword?ctrl=LoginCtrlTmp_logout',
'贵州': 'https://etax.guizhou.chinatax.gov.cn/xxmh/html/index.html',
'云南': 'https://etax.yunnan.chinatax.gov.cn/zjgfdzswj/main/index.html',
'西藏': 'https://etax.xizang.chinatax.gov.cn:8443/sword?ctrl=DZBSXTKJ027HomePagelCtrl_welcome',
'青海': 'https://etax.qinghai.chinatax.gov.cn/xxmh/html/index.html',
'宁夏': 'https://etax.ningxia.chinatax.gov.cn/sword?ctrl=LoginCtrlTmp_logout',
'新疆': 'https://etax.xinjiang.chinatax.gov.cn/wszx-web/bszm/apps/views/beforeLogin/indexBefore/pageIndex.html',
}
# Region names in the order they are processed; every entry is also a key of
# ALL_LOGIN_URL and ALL_TAX_AREA_CODE.
ALL_TAX_NAME = [
    '内蒙古', '甘肃', '陕西', '青岛', '江苏', '浙江',
    '河南', '河北', '广东', '上海', '北京', '辽宁',
    '安徽', '江西', '重庆', '湖南', '四川', '宁波',
    '山西', '山东', '福建', '天津', '吉林', '黑龙江',
    '湖北', '广西', '海南', '贵州', '云南', '西藏',
    '青海', '宁夏', '新疆',
]
# Maps each region name to the area code stored in the tax_area_code column.
# Codes are kept as strings.
ALL_TAX_AREA_CODE = {
    '内蒙古': '15', '甘肃': '62', '陕西': '61', '青岛': '1',
    '江苏': '32', '浙江': '33', '河南': '41', '河北': '13',
    '广东': '44', '广西': '45', '上海': '31', '北京': '11',
    '辽宁': '21', '安徽': '34', '江西': '36', '重庆': '2',
    '湖南': '43', '四川': '51', '宁波': '66', '山西': '14',
    '山东': '37', '福建': '35', '天津': '12', '吉林': '22',
    '黑龙江': '23', '湖北': '42', '海南': '46', '贵州': '52',
    '云南': '53', '西藏': '54', '青海': '63', '宁夏': '64',
    '新疆': '65',
}
# Database configuration: connection settings handed to pymysql.connect.
DB_CONFIG = dict(
    host='127.0.0.1',
    port=3306,
    user='root',
    passwd='root',
    db='face_recognition',
    charset='utf8',
)
运行的代码(check_url.py):
import datetime
import pymysql
import requests
import threading
from threading import Lock
import queue
import Constant
# Global lock: IPCheckThread.run holds it around every sql_insert call, so
# only one worker thread writes to the database at a time.
g_lock = Lock()
def fetch_web_data(url, timeout=10):
    """Return how long a GET request to *url* took, as a string.

    :param url: address to request.
    :param timeout: forwarded to ``requests.get`` as its ``timeout``.
    :return: ``str`` of the response's ``elapsed.total_seconds()``, or
        ``None`` if anything raised while making the request.
    """
    try:
        elapsed = requests.get(url, timeout=timeout).elapsed
        return str(elapsed.total_seconds())
    except Exception:
        # Best effort: any failure is reported to the caller as None.
        return None
class FetchListThread(threading.Thread):
    """Producer thread that loads every configured login URL into a queue."""

    def __init__(self, mq):
        """Keep a reference to *mq*, the queue that ``run`` fills."""
        super().__init__()
        self.__mq = mq

    def run(self):
        """Put the login URL of each region onto the queue.

        Walks ``Constant.ALL_TAX_NAME`` in order and enqueues the matching
        ``Constant.ALL_LOGIN_URL`` entry.

        :return: None
        """
        for region in Constant.ALL_TAX_NAME:
            self.__mq.put(Constant.ALL_LOGIN_URL[region])
class IPCheckThread(threading.Thread):
    """Worker thread that times each queued login URL and stores the result.

    Every URL taken from the queue is requested through ``fetch_web_data``
    and the measured response time is written to the ``tax_web`` table.
    """

    def __init__(self, queue):
        """Remember the queue this worker consumes.

        :param queue: queue holding the login URLs to check. Inside this
            method the parameter hides the ``queue`` module.
        """
        threading.Thread.__init__(self)
        self.__queue = queue

    def run(self):
        """Consume URLs until none arrives within 10 seconds.

        :return: ``False`` once the wait for the next URL times out.
        """
        while True:
            try:
                login_url = self.__queue.get(timeout=10)
            except queue.Empty:
                # Nothing left to do: let the thread finish.
                return False
            data = fetch_web_data(login_url, timeout=10)
            print(f'当前url==》{login_url}响应时长为==》{data}')
            # ``with`` releases the lock even if sql_insert raises, so one
            # failing worker cannot block the remaining workers forever.
            with g_lock:
                self.sql_insert(login_url, data)

    def sql_insert(self, request_url, response_time):
        """Insert or update the ``tax_web`` row for *request_url*.

        Each region in ``Constant.ALL_LOGIN_URL`` whose URL equals
        *request_url* has its row updated when one already exists for that
        ``tax_name``, and inserted otherwise. Failures are printed and
        rolled back rather than raised.

        :param request_url: login URL that was checked.
        :param response_time: response time as a string, or ``None`` when
            the request failed.
        :return: None
        """
        for tax_nam, login_url in Constant.ALL_LOGIN_URL.items():
            if login_url != request_url:
                continue
            update_time = datetime.datetime.now()
            conn = None
            cursor = None
            try:
                # The lookup and the connection are made inside the try
                # block so that a missing area code or an unreachable
                # database is reported below instead of killing the thread.
                tax_area_code = Constant.ALL_TAX_AREA_CODE[tax_nam]
                conn = pymysql.connect(host=Constant.DB_CONFIG['host'],
                                       port=Constant.DB_CONFIG['port'],
                                       user=Constant.DB_CONFIG['user'],
                                       passwd=Constant.DB_CONFIG['passwd'],
                                       charset=Constant.DB_CONFIG['charset'],
                                       db=Constant.DB_CONFIG['db'])
                cursor = conn.cursor()
                sql_select = "SELECT tax_name From tax_web where tax_name = %s"
                cursor.execute(sql_select, (tax_nam,))
                # Update the row if it already exists, otherwise add it.
                # The lookup and the write share one transaction, which is
                # committed once at the end.
                if cursor.fetchall():
                    sql = "update tax_web set response_time = %s, update_time = %s where tax_name = %s"
                    cursor.execute(sql, (response_time, update_time, tax_nam))
                else:
                    sql = "INSERT INTO tax_web (tax_name, tax_area_code, request_url, response_time, update_time, is_delete) VALUES (%s,%s,%s,%s,%s,%s)"
                    cursor.execute(sql, (tax_nam, tax_area_code, request_url, response_time, update_time, 0))
                conn.commit()
            except Exception as e:
                # Report first so the message is not lost if the rollback
                # itself fails on a broken connection.
                print(str(e) + tax_nam + ":" + request_url)
                if conn is not None:
                    conn.rollback()
            finally:
                # Close whatever was actually opened.
                if cursor is not None:
                    cursor.close()
                if conn is not None:
                    conn.close()
def process():
    """Run one checking round and schedule the next one an hour later.

    A round starts one ``FetchListThread`` that fills a queue with the login
    URLs and ten ``IPCheckThread`` workers that consume it, then waits for
    all of them to finish.

    :return: None
    """
    # threading.Timer takes the delay in seconds, then the callable.
    # One hour is 60 * 60 seconds.
    timer = threading.Timer(60 * 60, process)
    timer.start()

    mq = queue.Queue()
    fth = FetchListThread(mq)
    thread_num = 10
    thread_list = [IPCheckThread(mq) for _ in range(thread_num)]

    fth.start()
    for th in thread_list:
        th.start()

    fth.join()
    for th in thread_list:
        th.join()
    print('all work has done')
# Script entry point: start the first round only when run directly.
if __name__ == "__main__":
    process()
# Reference DDL for the tax_web table that sql_insert reads and writes.
# This is a bare string expression, so it has no effect at runtime.
'''
sql语句:
CREATE TABLE tax_web(
id int(10) PRIMARY KEY AUTO_INCREMENT,
tax_name VARCHAR(64),
tax_area_code VARCHAR(32),
request_url VARCHAR(255),
response_time VARCHAR(32) DEFAULT NULL,
create_time datetime DEFAULT NOW(),
update_time datetime DEFAULT NULL,
is_delete int(1) DEFAULT 0
)
'''
代码运行结果:
项目下载地址:https://download.csdn.net/download/mzl_18353516147/87517478