持久化
# 爬回来,解析完了,想存储,有两种方案
## 方案一:一般不用 parse必须有return值,必须是列表套字典形式--->使用命令,可以保存到json格式中,csv中
scrapy crawl cnblogs -o cnbogs.json #以json形式保存
scrapy crawl cnblogs -o cnbogs.csv #以csv形式保存
#### 方案二: 我们用的,使用pipline存储---》可以存到多个位置
-第一步:在item.py中写一个类
class FirstscrapyItem(scrapy.Item):
title = scrapy.Field()
author_img = scrapy.Field()
author_name = scrapy.Field()
desc = scrapy.Field()
url = scrapy.Field()
# 博客文章内容,但是暂时没有
content = scrapy.Field()
-第二步:在pipline.py中写代码,写一个类:open_spide,close_spider,process_item
-open_spide:开启爬虫会触发
-close_spider:爬完会触发
-process_ite:每次要保存一个对象会触发
class FirstscrapyFilePipeline:
def open_spider(self, spider):
print('我开了')
self.f=open('a.txt','w',encoding='utf-8')
def close_spider(self, spider):
print('我关了')
self.f.close()
# 这个很重要
def process_item(self, item, spider):
self.f.write(item['title']+'\n')
return item
-第三步:配置文件配置
ITEM_PIPELINES = {
"firstscrapy.pipelines.FirstscrapyFilePipeline": 300, # 数字越小,优先级越高
}
-第四步:在解析方法parse中yield item对象
全站爬取cnblgos
# 继续爬取下一页
# 爬取文章详情
# Request创建:在parse中,for循环中,创建Request对象时,传入meta
# item对象一定要在for循环中创建,否则,当前页面都用同一个item导致同一页数据都一样
yield Request(url=url, callback=self.detail_parse,meta={'item':item})
# 在parser_detail中取出来
item=response.meta.get('item')
# Response对象:detail_parse中,通过response取出meta取出item,把文章详情写入
def parser_detail(self,response):
# content = response.css('#cnblogs_post_body').extract_first()
item=response.meta.get('item')
content=str(response.xpath('//div[@id="cnblogs_post_body"]').extract_first())
item['content']=content
yield item
cnblogs.py
import scrapy
from scrapy import Request
# from scrapy.http.request import Request
from mysfirstscrapy.items import CnblogsItem
# 爬虫类,继承了scrapy.Spider
class CnblogsSpider(scrapy.Spider):
name = 'cnblogs' # 爬虫名字
allowed_domains = ['www.cnblogs.com'] # 允许爬取的域---》
start_urls = ['http://www.cnblogs.com/'] # 开始爬取的地址
def parse(self, response):
# item = CnblogsItem() #会有问题,是个引用类型
article_list = response.xpath('//article[contains(@class,"post-item")]') # 列表中放对象
print(len(article_list))
for article in article_list:
item = CnblogsItem() #每次新造一个对象
title = article.xpath('.//a/text()').extract_first()
desc = article.xpath('.//p[contains(@class,"post-item-summary")]/text()').extract()
real_desc = desc[0].replace('\n', '').replace(' ', '')
if real_desc:
desc = real_desc
else:
real_desc = desc[1].replace('\n', '').replace(' ', '')
desc = real_desc
author_img = article.xpath('.//p//img/@src').extract_first()
author_name = article.xpath('.//footer//span/text()').extract_first()
url = article.xpath('.//div[contains(@class,"post-item-text")]//a/@href').extract_first()
item['title'] = title
item['desc'] = desc
item['author_img'] = author_img
item['author_name'] = author_name
item['url'] = url
yield Request(url=url,callback=self.parser_detail,meta={'item':item}) # 详情
next='https://www.cnblogs.com'+response.xpath('//div[contains(@class,"pager")]/a[last()]/@href').extract_first()
print(next) # 拿到地址,继续爬取,组装成一个Request对象
#callback 参数是控制返回response后使用的解析方法
yield Request(url=next,callback=self.parse) # 下一页地址,继续爬取,解析还是用parse
def parser_detail(self,response):
# content = response.css('#cnblogs_post_body').extract_first()
item=response.meta.get('item')
content=str(response.xpath('//div[@id="cnblogs_post_body"]').extract_first())
item['content']=content
yield item
items.py
# django模型类
class CnblogsItem(scrapy.Item):
# title, desc, author_img, author_name, url
title = scrapy.Field()
desc = scrapy.Field()
author_img = scrapy.Field()
author_name = scrapy.Field()
url = scrapy.Field()
#------文章详情,暂时没有-----
content = scrapy.Field()
piplines.py
class MyCnblogsMySqlPipeline:
def open_spider(self, spider):
self.count=0
print('我开了')
self.conn = pymysql.connect(
user='root', # The first four arguments is based on DB-API 2.0 recommendation.
password="123",
host='127.0.0.1',
port=3306,
database='cnblogs')
self.cursor = self.conn.cursor()
def close_spider(self, spider):
print('我关了')
self.cursor.close()
self.conn.close()
def process_item(self, item, spider):
print('我来了-----')
self.count+=1
print(self.count)
sql='insert into article (title,url,`desc`,author_name,author_img,content) values (%s,%s,%s,%s,%s,%s)'
self.cursor.execute(sql,args=[item['title'],item['url'],item['desc'],item['author_name'],item['author_img'],item['content']])
self.conn.commit()
return item
settings.py
ITEM_PIPELINES = {
# 'mysfirstscrapy.pipelines.MyCnblogsPipeline': 300,
'mysfirstscrapy.pipelines.MyCnblogsMySqlPipeline': 301,
}
爬虫中间件和下载中间件
# 爬虫中间件:爬虫和引擎之间
-用的很少,了解即可
# 下载中间件:引擎和下载器之间
-用的多,能干啥?
-进来request对象
-加代理
-加cookie
-加请求头
-出去response对象
-修改响应对象,最后进入到爬虫的parser中就是修改后的response
# 爬虫中间件 (了解) middlewares.py
class MysfirstscrapySpiderMiddleware:
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
def process_spider_input(self, response, spider):
return None
def process_spider_output(self, response, result, spider):
for i in result:
yield i
def process_spider_exception(self, response, exception, spider):
pass
def process_start_requests(self, start_requests, spider):
for r in start_requests:
yield r
def spider_opened(self, spider):
spider.logger.info('Spider opened: %s' % spider.name)
# 下载中间件
class MysfirstscrapyDownloaderMiddleware:
@classmethod
def from_crawler(cls, crawler):
# This method is used by Scrapy to create your spiders.
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
return s
# 请求来了执行
def process_request(self, request, spider):
# 返回值可以是如下
# return None:继续处理本次请求,执行执行下一个中间件的process_request
#return Response:执行当前中间件的process_response回去,进入到引擎,被调度,进入第6步,返回到爬虫的解析方法中
# return a Request:直接返回,给引擎,被调度,进入第2步,进入调度器等待下次被调度爬取
# raise IgnoreRequest:执行 process_exception
return None
# 请求走了
def process_response(self, request, response, spider):
# 返回如下
# return Response :继续往后走,进入到引擎,被调度到爬虫中解析
# return Request :进入到引擎,被调度进调度器
# - or raise IgnoreRequest:会执行process_exception
return response
def process_exception(self, request, exception, spider):
# Called when a download handler or a process_request()
# (from other downloader middleware) raises an exception.
# Must either:
# - return None: continue processing this exception
# - return a Response object: stops process_exception() chain
# - return a Request object: stops process_exception() chain
pass
def spider_opened(self, spider):
spider.logger.info('Spider opened: %s' % spider.name)
# 在配置文件中配置
scrapy加代理,cookie,header
加代理
# 在下载中间件的def process_request(self, request, spider):写代码
# 第一步:
-在下载中间件写process_request方法
def get_proxy(self):
import requests
res = requests.get('http://127.0.0.1:5010/get/').json()
if res.get('https'):
return 'https://' + res.get('proxy')
else:
return 'http://' + res.get('proxy')
def process_request(self, request, spider):
request.meta['proxy'] = self.get_proxy()
return None
# 第二步:代理可能不能用,会触发process_exception,在里面写
def process_exception(self, request, exception, spider):
print('-----',request.url) # 这个地址没有爬
return request
加cookie,修改请求头,随机生成UserAgent
#### 加cookie
def process_request(self, request, spider):
print(request.cookies)
request.cookies['name']='lqz'
return None
# 修改请求头
def process_request(self, request, spider):
print(request.headers)
request.headers['referer'] = 'http://www.lagou.com'
return None
# 动态生成User-agent使用
def process_request(self, request, spider):
# fake_useragent模块
from fake_useragent import UserAgent
ua = UserAgent()
request.headers['User-Agent']=str(ua.random)
print(request.headers)
return None
scrapy集成selenium
# 使用scrapy默认下载器---》类似于requests模块发送请求,不能执行js,有的页面拿回来数据不完整
# 想在scrapy中集成selenium,获取数据更完整,获取完后,自己组装成 Response对象,就会进爬虫解析,现在解析的是使用selenium拿回来的页面,数据更完整
# 集成selenium 因为有的页面,是执行完js后才渲染完,必须使用selenium去爬取数据才完整
# 保证整个爬虫中,只有一个浏览器器
# 只要爬取 下一页这种地址,使用selenium,爬取详情,继续使用原来的
# 第一步:在爬虫类中写
from selenium import webdriver
class CnblogsSpider(scrapy.Spider):
bro = webdriver.Chrome(executable_path='./chromedriver.exe')
bro.implicitly_wait(10)
def close(spider, reason):
spider.bro.close() #浏览器关掉
# 第二步:在中间件中
def process_request(self, request, spider):
# 爬取下一页这种地址---》用selenium,但是文章详情,就用原来的
if 'sitehome/p' in request.url:
spider.bro.get(request.url)
from scrapy.http.response.html import HtmlResponse
response = HtmlResponse(url=request.url, body=bytes(spider.bro.page_source, encoding='utf-8'))
return response
else:
return None
源码去重规则(布隆过滤器)
# 如果爬取过的地址,就不会再爬了
# 调度器可以去重,研究一下,如何去重的---》使用了集合
# 要爬取的Request对象,在进入到scheduler调度器排队之前,先执行enqueue_request,它如果return False,这个Request就丢弃掉,不爬了----》如何判断这个Request要不要丢弃掉,执行了self.df.request_seen(request),它来决定的-----》RFPDupeFilter类中的方法----》request_seen---》会返回True或False----》如果这个request在集合中,说明爬过了,就return True,如果不在集合中,就加入到集合中,然后返回False
# 调度器源码
from scrapy.core.scheduler import Scheduler
# 这个方法如果return True表示这个request要爬取,如果return False表示这个网址就不爬了(已经爬过了)
def enqueue_request(self, request: Request) -> bool:
# request当次要爬取的地址对象
if self.df.request_seen(request):
# 有的请情况,在爬虫中解析出来的网址,不想爬了,就就可以指定
# yield Request(url=url, callback=self.detail_parse, meta={'item': item},dont_filter=True)
# 如果符合这个条件,表示这个网址已经爬过了
return False
return True
# self.df 去重类 是去重类的对象 RFPDupeFilter
-在配置文件中如果配置了:DUPEFILTER_CLASS = 'scrapy.dupefilters.RFPDupeFilter'表示,使用它作为去重类,按照它的规则做去重
-RFPDupeFilter的request_seen
def request_seen(self, request: Request) -> bool:
# request_fingerprint 生成指纹
fp = self.request_fingerprint(request) #request当次要爬取的地址对象
#判断 fp 在不在集合中,如果在,return True
if fp in self.fingerprints:
return True
#如果不在,加入到集合,return False
self.fingerprints.add(fp)
return False
# 传进来是个request对象,生成的是指纹
-爬取的网址:https://www.cnblogs.com/teach/p/17238610.html?name=lqz&age=19
-和 https://www.cnblogs.com/teach/p/17238610.html?age=19&name=lqz
-它俩是一样的,返回的数据都是一样的,就应该是一条url,就只会爬取一次
-所以 request_fingerprint 就是来把它们做成一样的(核心原理是把查询条件排序,再拼接到后面)
-生成指纹,指纹是什么? 生成的指纹放到集合中去重
-www.cnblogs.com?name=lqz&age=19
-www.cnblogs.com?age=19&name=lqz
-上面的两种地址生成的指纹是一样的
# 测试指纹
from scrapy.utils.request import RequestFingerprinter
from scrapy import Request
fingerprinter = RequestFingerprinter()
request1 = Request(url='http://www.cnblogs.com?name=lqz&age=20')
request2 = Request(url='http://www.cnblogs.com?age=20&name=lqz')
res1 = fingerprinter.fingerprint(request1).hex()
res2 = fingerprinter.fingerprint(request2).hex()
print(res1)
print(res2)
# 集合去重,集合中放
# a一个bytes
# 假设爬了1亿条url,放在内存中,占空间非常大
a6af0a0ffa18a9b2432550e1914361b6bffcff1a
a6af0a0ffa18a9b2432550e191361b6bffc34f1a
# 想一种方式,极小内存实现去重---》布隆过滤器
# 总结:scrapy的去重规则
-根据配置的去重类RFPDupeFilter的request_seen方法,如果返回True,就不爬了,如果返回False就爬
-后期咱们可以使用自己定义的去重类,实现去重
# 更小内存实现去重
-如果是集合:存的数据库越多,占内存空间越大,如果数据量特别大,可以使用布隆过滤器实现去重
# 布隆过滤器:https://zhuanlan.zhihu.com/p/94668361
#bloomfilter:是一个通过多哈希函数映射到一张表的数据结构,能够快速的判断一个元素在一个集合内是否存在,具有很好的空间和时间效率。(典型例子,爬虫url去重)
# 原理: BloomFilter 会开辟一个m位的bitArray(位数组),开始所有数据全部置 0 。当一个元素(www.baidu.com)过来时,能过多个哈希函数(h1,h2,h3....)计算不同的在哈希值,并通过哈希值找到对应的bitArray下标处,将里面的值 0 置为 1 。
# Python中使用布隆过滤器
# 测试布隆过滤器
# 可以自动扩容指定错误率,底层数组如果大于了错误率会自动扩容
# from pybloom_live import ScalableBloomFilter
# bloom = ScalableBloomFilter(initial_capacity=100, error_rate=0.001, mode=ScalableBloomFilter.LARGE_SET_GROWTH)
# url = "www.cnblogs.com"
# url2 = "www.liuqingzheng.top"
# bloom.add(url)
# bloom.add(url2)
# print(url in bloom)
# print(url2 in bloom)
from pybloom_live import BloomFilter
bf = BloomFilter(capacity=10)
url = 'www.baidu.com'
bf.add(url)
bf.add('aaaa')
bf.add('ggg')
bf.add('deww')
bf.add('aerqaaa')
bf.add('ae2rqaaa')
bf.add('aerweqaaa')
bf.add('aerwewqaaa')
bf.add('aerereweqaaa')
bf.add('we')
print(url in bf)
print("wa" in bf)
# 如果有去重的情况,就可以使用集合---》但是集合占的内存空间大,如果到了亿级别的数据量,想一种更小内存占用,而去重的方案----》布隆过滤器
# 布隆过滤器:通过不同的hash函数,加底层数组实现的极小内存去重
# python中如何使用:pybloom_live
-指定错误率
-指定大小
# 使用redis实现布隆过滤器
-编译redis---》把第三方扩展布隆过滤器编译进去,才有这个功能
-https://zhuanlan.zhihu.com/p/94668736
# 重写scrapy的过滤类
分布式爬虫
# 原来scrapy的Scheduler维护的是本机的任务队列(待爬取的地址)+本机的去重队列(放在集合中)---》在本机内存中
# 如果把scrapy项目,部署到多台机器上,多台机器爬取的内容是重复的
# 所以实现分布式爬取的关键就是,找一台专门的主机上运行一个共享的队列比如Redis,
然后重写Scrapy的Scheduler,让新的Scheduler到共享队列存取Request,并且去除重复的Request请求,所以总结下来,实现分布式的关键就是三点:
#1、多台机器共享队列
#2、重写Scheduler,让其无论是去重还是任务都去访问共享队列
#3、为Scheduler定制去重规则(利用redis的集合类型)
# scrapy-redis实现分布式爬虫
-公共的去重
-公共的待爬取地址队列
# 使用步骤
1 把之前爬虫类,继承class CnblogsSpider(RedisSpider):
2 去掉起始爬取的地址,加入一个类属性
redis_key = 'myspider:start_urls' # redis列表的key,后期我们需要手动插入起始地址
3 配置文件中配置
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter" # scrapy redis去重类,使用redis的集合去重
# 不使用原生的调度器了,使用scrapy_redis提供的调度器,它就是使用了redis的列表
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
REDIS_HOST = 'localhost' # 主机名
REDIS_PORT = 6379 # 端口
ITEM_PIPELINES = {
# 'mysfirstscrapy.pipelines.MyCnblogsPipeline': 300,
'mysfirstscrapy.pipelines.MyCnblogsMySqlPipeline': 301,
'scrapy_redis.pipelines.RedisPipeline': 400,
}
# 再不同多台机器上运行scrapy的爬虫,就实现了分布式爬虫