背景
公司需要爬取指定网站的产品数据，但我对 Python 的多进程和协程不是特别熟悉，所以想借助 PHP（Swoole）的异步协程发起 URL 请求来调度 Python 爬虫脚本，把爬取任务分发到多台服务器上，达到分布式爬取的效果。
准备
- 1.准备一个mongodb数据库用于存放爬取数据
- 2.引入flask包,方便php通过调用url发起请求控制脚本
- 3.安装 selenium（含 webdriver）、BeautifulSoup4、pymongo 等 Python 包，并在爬虫服务器上安装 Chrome 浏览器（Selenium 通过 webdriver 驱动它）
- 4.使用php的swoole异步协程发送url请求
python爬虫脚本
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
import time
from flask import Flask,jsonify,request
from pymongo import MongoClient
from datetime import datetime
from gevent import pywsgi
import sys
import requests
import re
from tornado.httpserver import HTTPServer
from tornado.wsgi import WSGIContainer
from tornado.ioloop import IOLoop
# Build the Flask application object; the route handlers below register on it.
app = Flask(import_name=__name__)
# Category ids that should not be handled with the "stock == 0 -> stop" rule.
# NOTE(review): nothing reads this list yet (it was an unused local inside
# get_ic_product_list); kept here so the intent is not lost.
NON_ZERO_STOCK_CATEGORIES = (
    '21', '43', '175', '179', '184', '241', '250', '268', '306', '317',
    '325', '343', '397', '394', '409', '467', '547', '556', '606', '638',
    '657', '678', '1150', '1158', '2315', '2383',
)
# Hard cap on the number of list pages crawled per request.
MAX_PAGES = 55
# Page count assumed when the pager text does not contain a usable total.
DEFAULT_PAGES = 20
# With is_continue_stock == 'yes', stop paging once more than this many
# zero-stock rows ('-' in the stock column) have been seen.
MAX_ZERO_STOCK_ROWS = 10
# Pause after clicking "next page" so the new page has loaded before it is read.
PAGE_DELAY_SECONDS = 5
# Pause per parsed product row; preserves the original crawl pacing.
ROW_DELAY_SECONDS = 2


def _page_count(pager_text):
    """Return how many list pages to crawl, given pager text such as "1/37".

    Falls back to DEFAULT_PAGES when the text has no numeric total and never
    returns more than MAX_PAGES.
    """
    _, sep, total = pager_text.strip().partition('/')
    total = total.strip()
    if not sep or not total.isdecimal():
        return DEFAULT_PAGES
    return min(int(total), MAX_PAGES)


def _parse_product_rows(page_source, nowtime, spidertime):
    """Parse one product-list page into MongoDB documents.

    Every ``table.prilist`` holds one product's tiered prices; the table's
    grandparent element is treated as the product row, whose children's texts
    are the columns read below.

    Returns:
        (table_count, documents, zero_stock_rows): the number of price tables
        found, the documents for rows that have stock, and the number of rows
        skipped because their stock column is '-'.
    """
    soup = BeautifulSoup(page_source, 'html.parser')
    tables = soup.findAll(name="table", attrs={"class": "prilist"})
    documents = []
    zero_stock_rows = 0
    for table in tables:
        # Tiered prices
        price = [tr.text.strip() for tr in table.findAll('tr')]
        # Row data
        item = [col.text.strip() for col in table.parent.parent]
        print('==========item==========')
        print(item)
        if len(item) < 12:
            # Layout differs from the column indexes used below; skip the row
            # instead of aborting the whole crawl with an IndexError.
            print('==========skip malformed row==========')
            continue
        # Zero stock: count it; the caller decides whether to stop paging.
        if item[11] == '-':
            zero_stock_rows += 1
            continue
        documents.append({
            'datasheet': item[0],
            'img_url': item[1],
            'productDescEn': item[2],
            # Strip the promotion marker from the part number.
            'part': item[3].replace('(Promotion)', ''),
            'manufacturer': item[5],
            'description': item[7],
            'pdfLinkUrl': item[10],
            'availability': item[11],
            'price': price,
            'created_at': nowtime,
            'spider_time': spidertime,
            'sync_time': 20010101,
            'updated_at': '2001-01-01 00:00:00',
        })
    return len(tables), documents, zero_stock_rows


@app.route('/get_ic_product_list', methods=['post'])  # endpoint path and allowed HTTP method
def get_ic_product_list():
    """Crawl one IC category list page by page and store the products in MongoDB.

    Form fields:
        url: category list URL, e.g. https://www.ic.com/ClassList2.aspx?id=3
        is_continue_stock: 'yes' (default) stops paging once more than
            MAX_ZERO_STOCK_ROWS zero-stock rows were seen; any other value
            keeps paging.

    Returns:
        JSON ``{'status': 1, 'msg': 'ok'}`` on success, otherwise
        ``{'status': 0, 'msg': <reason>}``.
    """
    uri = request.form.get('url')
    # Whether to stop on zero stock: 'yes' / 'no'
    is_continue = request.form.get('is_continue_stock', 'yes')
    # A missing field yields None, so test for falsiness rather than ''.
    if not uri:
        return jsonify({'status': 0, 'msg': 'uri is null'})

    chrome_options = webdriver.ChromeOptions()
    for argument in ('--no-sandbox', '--headless', '--disable-gpu', '--ignore-certificate-errors'):
        chrome_options.add_argument(argument)

    browser = None
    conn = None
    try:
        # The options must be passed in, otherwise Chrome does not run headless.
        browser = webdriver.Chrome(options=chrome_options)
        conn = MongoClient('192.168.0.143', 27017)
        collection = conn.ic.ic_product

        browser.get(uri)
        browser.implicitly_wait(3)
        time.sleep(2)

        now = datetime.now()
        nowtime = now.strftime('%Y-%m-%d %H:%M:%S')
        spidertime = int(now.strftime('%Y%m%d'))

        total_pages = _page_count(browser.find_element(By.CLASS_NAME, 'P_Page').text)

        # Page 1 is the page loaded by browser.get().
        table_count, documents, stop = _parse_product_rows(browser.page_source, nowtime, spidertime)
        time.sleep(ROW_DELAY_SECONDS * table_count)
        if documents:
            collection.insert_many(documents)

        # Pages 2..total_pages: each one is requested by clicking "next".
        for page in range(2, total_pages + 1):
            print('==========page==========')
            print(page)
            print('==========stop==========')
            print(stop)
            if stop > MAX_ZERO_STOCK_ROWS and is_continue == 'yes':
                break
            browser.find_element(By.XPATH, './/div[@id="Pager1"]/a[9]').click()
            time.sleep(PAGE_DELAY_SECONDS)
            table_count, documents, zero_stock = _parse_product_rows(browser.page_source, nowtime, spidertime)
            if table_count == 0:
                return jsonify({'status': 0, 'msg': 'product list empty'})
            stop += zero_stock
            time.sleep(ROW_DELAY_SECONDS * table_count)
            if documents:
                collection.insert_many(documents)
            print('================mongoData================')
            print(len(documents))
        return jsonify({'status': 1, 'msg': 'ok'})
    except Exception as error:
        # Exception objects are not JSON serializable; send the message text.
        return jsonify({'status': 0, 'msg': str(error)})
    finally:
        if conn is not None:
            conn.close()
        if browser is not None:
            # quit() also terminates chromedriver; close() only closed the window.
            browser.quit()
if __name__ == '__main__':
    # Start the Flask development server on port 8005 (all interfaces).
    # debug must stay off here: Werkzeug's interactive debugger allows
    # arbitrary code execution by anyone who can reach 0.0.0.0:8005.
    app.run(host='0.0.0.0', port=8005, debug=False)
    # Alternative: serve the app with Tornado instead of the Flask dev server.
    # s = HTTPServer(WSGIContainer(app))
    # s.listen(8005)  # listen on port 8005
    # IOLoop.current().start()
PHP 协程调度封装（协程通过 Swoole TCP 客户端把爬取任务投递给服务端，再由服务端调用 curl 请求 Python 接口）
在 Laravel 框架下创建 Artisan command。
这里用到了 Redis 队列（列表），不熟悉的读者请先阅读 Redis 相关资料。
<?php
/**
* 爬取IC任务
*/
namespace App\Console\Commands;
use App\Models\IcProductCategoryModel;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\Redis;
use Swoole\Timer;
class SpiderIcProductList extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'spider:ic_product';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = '每天定时爬取IC网站数据';

    /** Number of category ids taken from the Redis queue per timer tick. */
    private const BATCH_SIZE = 10;

    /** Timer interval in milliseconds (15 minutes). */
    private const TICK_INTERVAL_MS = 60 * 15 * 1000;

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * @return int
     */
    public function handle()
    {
        $this->getProduct();
        return 0;
    }

    /**
     * Register a 15-minute timer that dispatches queued IC categories.
     *
     * On each tick, up to BATCH_SIZE category ids are read from the head of the
     * Redis list returned by IcProductCategoryModel::getCategoryQueue(). Each id
     * is sent, in its own coroutine, as a RemoteApiSpider::getProduct task to
     * the local Swoole TCP server. The ids that were read are then trimmed from
     * the list.
     *
     * Capacity: about 345 categories; 96 ticks/day * 10 = 960 categories/day,
     * i.e. roughly two full passes per day.
     */
    public function getProduct()
    {
        Timer::tick(self::TICK_INTERVAL_MS, function () {
            $categoryModel = new IcProductCategoryModel();
            $key = $categoryModel::getCategoryQueue();
            echo 'time:' . date('H:i:s') . space();
            if (Redis::lLen($key) <= 0) {
                return;
            }
            $categoryIds = Redis::lRange($key, 0, self::BATCH_SIZE - 1);
            foreach ($categoryIds as $categoryId) {
                go(function () use ($categoryId) {
                    $this->dispatchCategory($categoryId);
                });
            }
            // Drop the ids just read; reuse $key so read and trim hit the same list.
            Redis::lTrim($key, self::BATCH_SIZE, -1);
        });
        // Printed right after the timer is registered, not when crawling finishes.
        echo 'spider product end time:' . date('H:i:s') . space();
    }

    /**
     * Send one category to the Swoole task server and print its reply.
     *
     * Uses the coroutine TCP client. The blocking \Swoole\Client does not yield
     * inside go(), so with it the requests of one batch ran one after another.
     *
     * @param mixed $categoryId category id as stored in the Redis queue
     * @return void
     */
    private function dispatchCategory($categoryId)
    {
        $client = new \Swoole\Coroutine\Client(SWOOLE_SOCK_TCP);
        // IP and port must match the Swoole server; connect timeout is 0.5 s.
        if (!$client->connect("127.0.0.1", 9505, 0.5)) {
            echo "connect failed." . ' errCode=' . $client->errCode . space();
            return;
        }
        $sendData = jsonE([
            'service' => 'RemoteApiSpider',
            'method' => 'getProduct',
            'param' => [
                'category_id' => $categoryId,
                'platform' => PLATFORM_IC,
            ],
        ]);
        echo 'send=' . $sendData . ' ,' . date('H:i:s') . space();
        $client->send($sendData);
        // Print the server's acknowledgement.
        echo $client->recv();
        $client->close();
    }
}
创建服务端Server
<?php
namespace App\Console\Commands;
use App\Services\RemoteApiSpider;
use App\Services\TcpService;
use App\Services\TestService;
use Swoole\Database\MysqliConfig;
use Swoole\Database\MysqliPool;
use Swoole\Server;
class Swoole extends Base
{
    protected $ws;

    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'swoole {action} {--d}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'swoole service command';

    /**
     * Create a new command instance.
     *
     * @return void
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * NOTE(review): {action} and --d are copied into the global $argv, but
     * startServer() does not read them; it always starts a daemonized server.
     * Confirm whether anything else consumes $argv.
     *
     * @return int
     */
    public function handle()
    {
        global $argv;
        $action = $this->argument('action');
        $argv[0] = 'wk';
        $argv[1] = $action;
        $argv[2] = $this->option('d') ? '-d' : '';
        $this->startServer();
        return 0;
    }

    /**
     * Start the TCP task server on 127.0.0.1:9505.
     *
     * Each received message is queued as an async task (handled by onTask),
     * and the client immediately receives an acknowledgement.
     *
     * @return void
     */
    public function startServer()
    {
        $server = new \Swoole\Server("127.0.0.1", 9505);
        $log_path = storage_path() . '/logs/server.log';
        $server->set([
            'daemonize' => true,
            'log_file' => $log_path,
            // Number of task worker processes for async tasks.
            'task_worker_num' => 50,
        ]);
        // Fired when a request arrives: queue it as an async task.
        $server->on('receive', function (\Swoole\Server $server, $fd, $from_id, $data) {
            $task_id = $server->task($data);
            // task() returns false when the task could not be queued (0 is a valid id).
            if ($task_id === false) {
                echo "异步任务投递失败: recv={$data} " . his() . space();
                $server->send($fd, "task投递失败,recv={$data} " . his() . space());
                return;
            }
            echo "异步任务投递成功: id=$task_id , recv={$data} " . his() . space();
            $server->send($fd, "task_id[{$task_id}],recv={$data},数据已接收,处理中... " . his() . space());
        });
        // Run async tasks.
        $server->on('Task', [$this, 'onTask']);
        // Result of an async task (the value returned from, or finished by, onTask).
        $server->on('finish', function (\Swoole\Server $server, $task_id, $data) {
            echo "异步任务[$task_id] 处理完成: $data " . his() . space();
        });
        $server->start();
    }

    /**
     * Alternative server built on the project's TcpService: every message is
     * passed to RemoteApiSpider::getProduct() and echoed back. handle() does
     * not call this method.
     *
     * NOTE(review): $msg is passed as the raw message string, while
     * getProduct() reads $data['category_id'] / $data['platform']; confirm
     * whether the message should be decoded first.
     *
     * @return void
     */
    public function process()
    {
        $server = new TcpService();
        // Connection established.
        $server->onConnect = function ($conn) {
            echo "onConnect -- accepted " . stream_socket_get_name($conn, true) . "\n";
        };
        // Message received.
        $server->onMessage = function ($conn, $msg) {
            echo "onMessage --" . $msg . "\n";
            $service = new RemoteApiSpider();
            $service->setDebug(true);
            $service->getProduct($msg);
            fwrite($conn, "received " . $msg . "\n");
        };
        // Connection closed.
        $server->onClose = function ($conn) {
            echo "onClose --" . stream_socket_get_name($conn, true) . "\n";
        };
        // Start the server's main process.
        $server->run();
    }

    /**
     * Generic task handler: instantiate a service class and call one of its methods.
     *
     * Expected payload (a JSON string or an array):
     *   ['service' => 'RemoteApiSpider', 'method' => 'getProduct', 'param' => [...]]
     * The class is resolved under App\Services. On success the JSON-encoded
     * return value is returned, which Swoole hands to the 'finish' callback.
     * On any failure an error message is passed to finish() instead.
     *
     * @param \Swoole\Server $serv
     * @param int $task_id
     * @param int $from_id
     * @param mixed $data
     * @return string|void
     */
    public function onTask($serv, $task_id, $from_id, $data)
    {
        // Names come from the TCP payload and are passed to the autoloader and
        // to a dynamic call, so only plain identifiers are accepted.
        $identifier = '/^[A-Za-z][A-Za-z0-9_]*$/';
        try {
            if (!$data) {
                throw new \Exception(' task_id=' . $task_id . ',暂无需要处理的task任务', 400);
            }
            if (is_string($data)) {
                $data = jsonD($data);
            }
            // jsonD() may yield a non-array for malformed JSON.
            if (!is_array($data) || !isset($data['service'], $data['method'])
                || !is_string($data['service']) || !is_string($data['method'])) {
                throw new \Exception(' task_id=' . $task_id . ',任务数据格式错误', 402);
            }
            echo 'recv: ' . jsonE($data) . space();
            $className = "App\Services\\{$data['service']}";
            if (!preg_match($identifier, $data['service']) || !class_exists($className)) {
                throw new \Exception(' task_id=' . $task_id . ',未找到服务类名', 401);
            }
            $class = new $className;
            $func = $data['method'];
            if (!preg_match($identifier, $func) || !is_callable([$class, $func])) {
                throw new \Exception(' task_id=' . $task_id . ',未找到服务方法', 404);
            }
            $ret = isset($data['param']) ? $class->$func($data['param']) : $class->$func();
            return jsonE($ret);
        } catch (\Throwable $e) {
            // \Throwable also covers \Error (e.g. TypeError raised inside the service),
            // which would otherwise kill the task worker.
            $msg = ' task_id=' . $task_id . ' ,error: ' . $e->getMessage();
            echo jsonE($msg) . space();
        }
        $serv->finish($msg);
    }
}
通过 curl 请求 Python 爬虫接口（在 Swoole 异步 task 进程中执行）
<?php
/**
* 爬取接口封装
*/
namespace App\Services;
use App\Library\Oneyac\Oneyac;
use App\Models\HqchipProductCategoryModel;
use App\Models\HqchipProductMongoModel;
use App\Models\IcProductCategoryModel;
use App\Models\IcProductMongoModel;
use App\Models\LcscProductCategoryModel;
use App\Models\LcscProductMongoModel;
use App\Models\OneyacProductCategoryModel;
use App\Models\OneyacProductMongoModel;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
class RemoteApiSpider extends Base
{
/**
 * Crawl the product list of one category on the requested platform.
 *
 * Only PLATFORM_IC is handled. For it, the category is removed from the
 * category-list hash and the crawl is delegated to getIcProduct().
 *
 * @param array $data keys: category_id (default 0), platform (default PLATFORM_LCSC)
 * @return int the category id when the crawl succeeded, otherwise 0
 * @throws \GuzzleHttp\Exception\GuzzleException
 */
public function getProduct($data)
{
    $categoryId = (int)($data['category_id'] ?? 0);
    $platform = strtolower(trim($data['platform'] ?? PLATFORM_LCSC));

    // Loose comparison, matching switch/case semantics.
    if ($platform != PLATFORM_IC) {
        return 0;
    }

    $cateModel = new IcProductCategoryModel();
    Redis::hDel($cateModel::getCategoryListKey(), $categoryId);

    // TODO: call the Python API to crawl product data
    // (s2 handles ~4 categories per 20 minutes, s1 ~6 per 20 minutes).
    $header = ['Content-Type' => 'application/json;charset=UTF-8'];
    $query_param = ['page' => 1];
    $succeeded = $this->getIcProduct($categoryId, $platform, $query_param, $header);

    return $succeeded ? $categoryId : 0;
}
/**
 * Ask one of the Python spider services to crawl an IC category.
 *
 * Categories whose id is divisible by 3 go to spider s2, all others to s1.
 * Addresses come from config('ic')['spider_uri'], with hard-coded fallbacks.
 *
 * NOTE(review): one crawl request runs for many minutes on the Python side;
 * confirm that curlPost()'s timeout is long enough.
 *
 * @param int $categoryId
 * @param string $platform
 * @param array $query_param currently unused
 * @param array $header currently unused
 * @return bool true when the spider answered {"status": 1, "msg": "ok"};
 *              false on any failure (the failure is logged)
 */
public function getIcProduct($categoryId, $platform, $query_param, $header)
{
    $module = 'product_list';
    $this->getPlatformConf(PLATFORM_IC, $module);
    $msg = ' ' . $platform . '_' . $module . ' ';
    echo $msg . ' ,category_id:' . $categoryId . ' spidering' . space();
    try {
        // e.g. https://www.ic.com/psen/ClassList2.aspx?id=3
        $uri = $this->base_uri . $this->uri . '?id=' . $categoryId;
        $config = config('ic');
        // Spiders can be deployed on several servers; two are used here (s1, s2).
        // `?? null` avoids an undefined-index notice when a key is not configured.
        if (($categoryId % 3) === 0) {
            $curl = ($config['spider_uri']['s2'] ?? null) ?: 'http://192.168.0.124:8005/get_ic_product_list';
        } else {
            $curl = ($config['spider_uri']['s1'] ?? null) ?: 'http://192.168.0.127:8005/get_ic_product_list';
        }
        $response = curlPost($curl, ['url' => $uri, 'is_continue_stock' => 'yes']);
        if (!empty($response) && !is_array($response)) {
            $response = jsonD($response);
        }
        // jsonD() may return a non-array for a malformed body.
        $flag = is_array($response)
            && ($response['msg'] ?? null) == 'ok'
            && ($response['status'] ?? null) == 1;
        if (!$flag) {
            throw new \Exception($msg . 'ic爬虫异常,请检查python爬虫');
        }
        $this->debug && Log::info($msg . 'category_id:' . $categoryId . ' ,mongo-batch-insert:' . $flag);
        return true;
    } catch (\Throwable $e) {
        Log::error($msg . '爬取产品异常,异常原因:' . $e->getMessage());
        return false;
    }
}
nginx配置
# Reverse proxy in front of the Flask spider service on 127.0.0.1:8005.
server {
    listen 8080;
    server_name spidertest.test *.spidertest.test;
    # Not used by the proxied location below.
    root "D:/WWW/spidertest/";
    location / {
        proxy_pass http://127.0.0.1:8005;
        # Forward the original host and client address to Flask.
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        # One crawl request pages through a whole category with sleeps between
        # pages and far exceeds nginx's 60 s default read timeout (-> 504).
        proxy_read_timeout 3600s;
    }
    charset utf-8;
}
- 注:
1.在不同的服务器上部署 Python 爬虫脚本并配置 nginx 服务。配置完成后，按照下面的命令启动脚本，然后启动 PHP 协程任务来调度 Python 爬虫脚本即可。
2.记得启动swoole服务。命令如下:
php artisan swoole start
运行
python脚本启动
postman测试:
php启动协程命令：php artisan spider:ic_product
爬取回来的数据结果: