爬虫中使用多进程、多线程的混合方式遇到的数据丢失问题

news2024/11/14 13:48:17

多进程爬虫

项目场景:

网络爬虫项目,主要实现多进程、多线程方式快速缓存网页资源到MongoDB,并解析网页数据,将信息写入到csv文件中。


问题描述

在单独使用多线程的过程中,是没有问题的,比如这个爬虫示例是爬取豆瓣电影排行榜TOP250,解析到csv中数据还是250条,在实现多进程的方式中,主要是通过MongoDB来实现一个队列的效果,多条进程从数据库中取出待解析的链接进行解析,在实现的过程中,发现解析数据是没有问题的,打印到控制台的数据是没有丢失数据的情况,但是在最终写出的csv文件中,数据只有一小部分。
在尝试了国内所有能用的AI之后无果,AI只能对逻辑问题判断,而对一些Runtime问题还是差点意思,好在CSDN有大佬,将问题发布到问答区后,大佬一句话就点醒了我,在此表示感谢。
在这里插入图片描述

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime,timedelta
from multiprocessing.dummy import Pool
import os
import random
import re
import threading
import time
import urllib.parse
import urllib.request
import urllib3
from urllib.parse import urlparse, urlsplit
from urllib.parse import urljoin
import urllib.robotparser
from lxml import html as lhtml
import csv
import pickle
import zlib
from bson.binary import Binary
from pymongo import MongoClient
from zipfile import ZipFile
from io import StringIO
# 多线程爬虫
# 封装MongoDB缓存类
class MongoCache:
    def __init__(self,client=None,expires=timedelta(days=30)):
        if client == None:
            self.client = MongoClient('localhost',27017)
        else:
            self.client = client
        self.db = self.client['cache']
        self.webpage = self.db['webcrawler']
        self.expires = expires
        self.webpage.create_index('timestamp',expireAfterSeconds=expires.total_seconds())

    def __getitem__(self,url):
        '''
            根据url从磁盘提取缓存
        '''
        record = self.webpage.find_one({'_id':url})
        if record:
            return pickle.loads(zlib.decompress(record['result']))
        else:
            raise KeyError(url + "不存在")

    def __setitem__(self,url,result):
        '''
            将数据存入磁盘缓存中
        '''
        record = {'result':Binary(zlib.compress(pickle.dumps(result))),'timestamp':datetime.now()}
        self.webpage.update_one({'_id':url},{'$set':record},upsert=True)
# 将下载功能封装成一个类
class Downloader:
    def __init__(self,delay=5,user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0',proxies=None,request_max=3,cache=None):
        self.throttle = Throttle(delay=delay)
        self.user_agent = user_agent
        self.proxies = proxies
        self.request_max = request_max
        self.cache = cache
        # 定义连接管理池
        self.http = urllib3.PoolManager()
    def __call__(self,url):
        result = None
        if self.cache:
            try:
                result = self.cache[url]
            except KeyError:
                pass
            else:
                if self.request_max > 0 and 500 <= result['code'] < 600:
                    result = None
        if result is None:
            self.throttle.wait(url)
            proxy = random.choice(self.proxies) if self.proxies else None
            headers = {'User-agent':self.user_agent}
            result = self.download(url,headers,self.request_max,proxy)
            if self.cache:
                self.cache[url] = result
        return result['html']
    def download(self,url,headers,request_max,proxy=None):
        print("正在下载, URL==>{}".format(url))
        # 发起GET请求
        request = urllib.request.Request(url,headers=headers)
        response = self.http.request('GET', url,headers=headers)
        # 如果使用代理的话
        if proxy:
            opener = urllib.request.build_opener()
            proxy_params = {urlparse.urlparse(url).scheme:proxy}
            opener.add_handler(urllib.request.ProxyHandler(proxy_params))
            try:
                response = opener.open(request)
                if response.status == 200:
                    html = response.data
                else:
                    print("遇到了错误,状态码是:{}".format(response.status))
                    if request_max >= 0:
                        self.download(url,headers,request_max-1,proxy)
            except Exception as e:
                print("下载遇到了错误,错误代码是==>{}".format(e))
                html = None
                if request_max >=3:
                    html = self.download(url,headers,request_max-1,proxy)
            finally:
                response.release_conn()
            return {'html':html,'code':response.status}
        else:
            # 如果没有选择代理,那就正常请求
            try:
                if response.status == 200:
                    html_file = response.data  # 或者 response.data.decode('utf-8') 如果需要字符串
                    # 在这里处理 htmlfile,比如保存到文件或进行解析等
                    return {'html':html_file,'code':response.status}
                else:
                    print("遇到了错误,状态码是:{}".format(response.status))
                    if request_max >= 0:
                        self.download(url,headers,request_max-1)
            except urllib3.exceptions.HTTPError as e:
                print("遇到了错误,错误代码是==>{}".format(e))
            except Exception as ex:
                print("遇到了错误,错误代码是==>{}".format(ex))
            finally:
                response.release_conn()
# 定义一个scrape_callback类,用于存储解析到的数据
class Scrape_callback:
    def __init__(self):
        self.writer = csv.writer(open('D:/Crawl_Results/downloaded_data.csv','w', encoding='utf-8',newline='',errors='replace'))
        self.fields = ('中文名','外文名','评分','上映时间','国家','导演','时长','类型')
        self.writer.writerow(self.fields)
    def __call__(self,html):
        if not self.writer:
            raise RuntimeError("CSV writer is not initialized. Call open_writer() first.")
        html_string = html.decode("utf-8")
        root = lhtml.fromstring(html_string)
        result_list = []
        try:
            # 解析电影标题
            title_content = root.cssselect("div#content")[0]
            span_title = title_content.cssselect('span[property="v:itemreviewed"]')[0]
            title_text = span_title.text_content().split(" ",1)
            for name in title_text:
                result_list.append(name)
                if len(title_text) == 1:
                    result_list.append('--')
            # 解析电影评分
            rate_span = root.cssselect('strong[property="v:average"]')[0]
            rate_text = rate_span.text_content()
            result_list.append(rate_text)
            # 解析上映国家及日期
            date_span = root.cssselect('span[property="v:initialReleaseDate"]')[0]
            date_text = date_span.text_content()
            parenthesis_index = date_text.find('(')
            if parenthesis_index != -1:
                # 提取日期部分(括号前的所有字符)
                date = date_text[:parenthesis_index]
                # 提取国家部分(括号内及之后的字符,再去除括号)
                country = date_text[parenthesis_index + 1:-1]
            else:
                # 如果没有找到括号,则只有日期部分
                date = date_text
                country = "--"
            result_list.append(date)
            result_list.append(country)
            # 解析导演
            direct_by_a = root.cssselect('a[rel="v:directedBy"]')[0]
            direct_by_text = direct_by_a.text_content()
            result_list.append(direct_by_text)
            # 解析片长
            runtime_span = root.cssselect('span[property="v:runtime"]')[0]
            runtime_text = runtime_span.text_content()
            result_list.append(runtime_text)
            gener_text=''
            # 解析类型
            gener_spans = root.cssselect('span[property="v:genre"]')
            for gener_span in gener_spans:
                gener_text += gener_span.text_content() + '|'
            gener_text = gener_text.rstrip('|')
            result_list.append(gener_text)
            print("{}|{}|{}|{}|{}".format(title_text,rate_text,date,country,direct_by_text,gener_text))
            self.writer.writerow(result_list)
            result_list.clear()  # 清空列表以备下次使用,而不是重新创建
        except IndexError:
            print("未找到指定的元素")
        except Exception as e:
            print(f"处理过程中发生错误: {e}")
    def do_write(self,result_list):
        if not self.writer == None:
            self.writer.writerow(result_list)
        else:
            print("打开文件失败")
    def close_writer(self):
        # 如果writer是外部创建的,则不应在此关闭文件
        # 但在当前上下文中,文件是在这个类中打开的,所以应该在这里关闭
        if self.writer:
            #self.writer.writerow([])  # 写入空行作为结束标记(可选)
            # 注意:在with块外不需要手动关闭文件,它会自动处理
            self.writer = None  # 清除writer引用,帮助垃圾回收
# 定义一个类,用于控制延时
class Throttle:
    '''
    用于控制爬虫访问统一域名资源时的延时
    '''
    # 初始化函数
    def __init__(self,delay):
        self.delay = delay
        self.domains = {}
    # 控制延时
    def wait(self,url):
        # 解析url,获取域名
        domain = urlparse(url).netloc
        # 获取上一次访问的时间
        last_accessed = self.domains.get(domain)
        # 如果设置到延时并且已经访问过了
        if self.delay > 0 and last_accessed is not None:
            # 计算从上次访问到当前时间过去的秒数与规定的延迟时长的差值
            sleep_secs = self.delay - (datetime.now() - last_accessed).seconds
            # 判断距离上次访问的时间间隔是否达到了延迟要求
            if sleep_secs > 0:
                print("正在休眠,将等待{}秒后再次连接".format(sleep_secs))
                # 如果时间还没有达到,就调用time.sleep,进行休眠
                time.sleep(sleep_secs)
        # 更新本次访问的时间
        self.domains[domain] = datetime.now()
# 爬取网页的函数
def threaded_crawler(delay,request_max,seed_url,link_regex,max_deepth=5,max_threads=6,scrape_callback=Scrape_callback(),cache=MongoCache(),proxies=None):
    # 定义一个User_agent列表
    user_agent_list = ['BadCrawler','GoodCrawler']
    # 解析网站的robots.txt
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(f"{seed_url}/robots.txt")
    rp.read()
    # 定义一个用户当前设置的user_agent
    current_user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0'
    # 只有当默认的火狐这个User-agent被禁,再从user_agent_list中找看还有合适的没
    if not rp.can_fetch(current_user_agent,seed_url):
        # 从列表中找一个网站允许的user_agent
        for user_agent in user_agent_list:
            if rp.can_fetch(user_agent,seed_url):
                current_user_agent = user_agent
                break
            else:
                print("该网站的robots.txt禁止我们访问")
    # 从提供的种子url生成一个待解析的url列表
    crawl_url_queue = [seed_url]
    # 定义一个字典,记录链接和深度,用于判断链接是否已经下载,避免在不同页面中反复横跳
    have_crawl_url_queue = {seed_url:0}
    downloader = Downloader(delay=delay,user_agent=current_user_agent,cache=cache,request_max=request_max,proxies=proxies)

    for item_count in range(1,10):
        current_url = "{}?start={}".format(seed_url,item_count*25)
        crawl_url_queue.append(current_url)
        have_crawl_url_queue[current_url] = 0

    def process_queue():
        current_thread = threading.current_thread()
        thread_name = current_thread.name
        while crawl_url_queue:
            try:
                # 只要列表中有值,则弹出一个url用于解析
                url = crawl_url_queue.pop()
                print("线程{}==>正在处理:{}".format(thread_name,url))
            except IndexError as index_error:
                break
            else:
                # 读取当前要解析url的深度,如果深度超过最大值,则停止
                deepth = have_crawl_url_queue[url]
                if deepth <= max_deepth:
                    # 执行下载
                    html = downloader(url)
                    if not html == None:
                        # 如果有传入提取数据的回调函数,则调用它
                        if scrape_callback:
                            scrape_callback(html)
                        # 从下载到的html网页中递归的获取链接
                        links_from_html = get_links(html)
                        if not links_from_html == None:
                            for link in links_from_html:
                                link = urljoin(seed_url,link)
                                # 判断找到的链接是否符合我们想要的正则表达式
                                if re.match(link_regex,link):
                                    # 如果符合,再判断是否已经下载过了,如果没有下载过,就把它加到待解析的url列表和已下载集合中
                                    if link not in have_crawl_url_queue:
                                        have_crawl_url_queue[link] = deepth + 1
                                        crawl_url_queue.append(link)
    threads = []
    while threads or crawl_url_queue:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and crawl_url_queue:
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)

# 从下载到的html中继续解析连接
def get_links(html):
    webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']',re.IGNORECASE)
    if not html == None:
        html_string = html.decode("utf-8")
        return webpage_regex.findall(html_string)
    else:
        return None

# 测试
seed_url="https://movie.douban.com/top250"
link_regex="^https://(?!music\\.douban\\.com/subject/)movie\\.douban\\.com/subject/(\\d+)/$"
threaded_crawler(5,5,seed_url,link_regex,5)


原因分析:

当多个进程或线程试图同时写入同一个CSV文件时,因为文件I/O操作不是线程安全的,特别是在没有适当锁定机制的情况下,在这个脚本中,虽然使用了锁,但是锁只是锁定了线程间的竞争,多个进程在写入的时候,实际上是存在文件覆盖的情况的;为了解决这个问题,我们可以采用“分而治之”的策略:让每个进程将其结果写入一个独立的CSV文件,然后再合并这些文件。


解决方案:

在调用Scrape_callback()类时,为其传入进程的ID,让每一条进程单独处理一个csv文件,这样就不存在文件覆盖的问题,在解析完所有的文件后,再将这些csv文件合并为一个文件输出。

from datetime import datetime,timedelta
import multiprocessing
import os
import random
import re
import threading
import time
import urllib.parse
import urllib.request
import urllib3
from urllib.parse import urlparse, urlsplit
from urllib.parse import urljoin
import urllib.robotparser
from lxml import html as lhtml
import csv
import pickle
import zlib
from bson.binary import Binary
from pymongo import MongoClient,errors
from zipfile import ZipFile
from io import StringIO
# 多进程
# 封装MongoDB进程队列
class MongoQueue:
    OUTSTANDING,PROCESSING,COMPLETE = range(3)
    def __init__(self,client=None,timeout=300):
        if client == None:
            self.client = MongoClient('localhost',27017)
        else:
            self.client = client
        self.db = self.client['cache']
        self.webpage = self.db['crawler_queue']
        self.timeout = timeout
        self.lock = threading.Lock()

    def __bool__(self):
        record = self.webpage.find_one({'status':{'$ne':self.COMPLETE}})
        if record:
            return True
        else:
            return False

    def push(self,url):
        with self.lock:
            try:
                self.webpage.insert_one({'_id':url,'status':self.OUTSTANDING,'timestamp':datetime.now()})
            except errors.DuplicateKeyError as e:
                self.repair()
                pass

    def pop(self):
        with self.lock:
            record = self.webpage.find_one_and_update(
                filter = {'status':self.OUTSTANDING},
                update={'$set':{'status':self.PROCESSING,'timestamp':datetime.now()}}
            )
            if record:
                return record['_id']
            else:
                self.repair()
                raise KeyError()

    def complete(self,url):
        #self.webpage.update_one({'_id':url},{'$set':{'status':self.COMPLETE}})
        self.webpage.delete_one({'_id':url})

    def repair(self):
        record = self.webpage.find_one_and_update(
            filter={'timestamp':{'$lt':datetime.now() - timedelta(seconds=self.timeout)},
                    'status':{'$ne':self.OUTSTANDING}},
                    update={'$set':{'status':self.OUTSTANDING}}
        )
        if record:
            print("Released:{}".format(record['_id']))

    def clear(self):
        self.webpage.delete_many({'status':{'$ne':self.OUTSTANDING}})

# 封装磁盘缓存类
class DiskCache:
    def __init__(self,max_length,cache_dir='D:\\Crawl_Results\\cache',expires=timedelta(days=30)):
        self.cache_dir = cache_dir
        self.max_length = max_length
        self.expires = expires

    def url_to_path(self,url):
        '''
        从传入的url中创建文件路径
        '''
        components = urlsplit(url)
        path = components.path
        if not path:
            path = '/index.html'
        elif path.endswith('/'):
            path += 'index.html'
        filename = components.netloc + path + components.query
        filename = re.sub('[^/0-9a-zA-Z\\-.,;]','_',filename)
        filename =  '/'.join(segment[:255] for segment in filename.split('/'))
        return os.path.join(self.cache_dir,filename)

    def __getitem__(self,url):
        '''
            根据url从磁盘提取缓存
        '''
        path = self.url_to_path(url)
        if os.path.exists(path):
            with open(path,'rb') as fp:
                #result,timestamp = pickle.loads(zlib.decompress(fp.read()))
                result = fp.read()
                # if self.has_expired(timestamp):
                #     raise KeyError(url + '缓存资源已过期')
                # return result
        else:
            raise KeyError(url + "不存在")

    def __setitem__(self,url,result):
        '''
            将数据存入磁盘缓存中
        '''
        path = self.url_to_path(url)
        folder = os.path.dirname(path)
        # 时间戳
        # timestamp = datetime.now()
        # data = pickle.dumps((result,timestamp))
        if not os.path.exists(folder):
            os.makedirs(folder)
            print("保存到了{}".format(folder))
        with open(path,'wb') as fp:
            #fp.write(zlib.compress(data))
            fp.write(result)

    def has_expired(self, timestamp):
        '''
            判断缓存是否过期
        '''
        return datetime.now() > timestamp + self.expires

# 封装MongoDB缓存类
class MongoCache:
    def __init__(self,client=None,expires=timedelta(days=30)):
        if client == None:
            self.client = MongoClient('localhost',27017)
        else:
            self.client = client
        self.db = self.client['cache']
        self.webpage = self.db['webcrawler']
        self.expires = expires
        self.webpage.create_index('timestamp',expireAfterSeconds=expires.total_seconds())

    def __getitem__(self,url):
        '''
            根据url从磁盘提取缓存
        '''
        record = self.webpage.find_one({'_id':url})
        if record:
            return pickle.loads(zlib.decompress(record['result']))
        else:
            raise KeyError(url + "不存在")

    def __setitem__(self,url,result):
        '''
            将数据存入磁盘缓存中
        '''
        record = {'result':Binary(zlib.compress(pickle.dumps(result))),'timestamp':datetime.now()}
        self.webpage.update_one({'_id':url},{'$set':record},upsert=True)

# 将下载功能封装成一个类
class Downloader:
    def __init__(self,delay=5,user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0',proxies=None,request_max=3,cache=None):
        self.throttle = Throttle(delay=delay)
        self.user_agent = user_agent
        self.proxies = proxies
        self.request_max = request_max
        self.cache = cache
        # 定义连接管理池
        self.http = urllib3.PoolManager()

    def __call__(self,url):
        result = None
        if self.cache:
            try:
                result = self.cache[url]
            except KeyError:
                pass
            else:
                if self.request_max > 0 and 500 <= result['code'] < 600:
                    result = None
        if result is None:
            self.throttle.wait(url)
            proxy = random.choice(self.proxies) if self.proxies else None
            headers = {'User-agent':self.user_agent}
            result = self.download(url,headers,self.request_max,proxy)
            if self.cache:
                self.cache[url] = result
        return result['html']

    def download(self,url,headers,request_max,proxy=None):
        print("正在下载, URL==>{}".format(url))
        # 发起GET请求
        request = urllib.request.Request(url,headers=headers)
        response = self.http.request('GET', url,headers=headers)
        # 如果使用代理的话
        if proxy:
            opener = urllib.request.build_opener()
            proxy_params = {urlparse.urlparse(url).scheme:proxy}
            opener.add_handler(urllib.request.ProxyHandler(proxy_params))
            try:
                response = opener.open(request)
                if response.status == 200:
                    html = response.data
                else:
                    print("遇到了错误,状态码是:{}".format(response.status))
                    if request_max >= 0:
                        self.download(url,headers,request_max-1,proxy)
            except Exception as e:
                print("下载遇到了错误,错误代码是==>{}".format(e))
                html = None
                if request_max >=3:
                    html = self.download(url,headers,request_max-1,proxy)
            finally:
                response.release_conn()
            return {'html':html,'code':response.status}
        else:
            # 如果没有选择代理,那就正常请求
            try:
                if response.status == 200:
                    html_file = response.data  # 或者 response.data.decode('utf-8') 如果需要字符串
                    # 在这里处理 htmlfile,比如保存到文件或进行解析等
                    return {'html':html_file,'code':response.status}
                else:
                    print("遇到了错误,状态码是:{}".format(response.status))
                    if request_max >= 0:
                        self.download(url,headers,request_max-1)
            except urllib3.exceptions.HTTPError as e:
                print("遇到了错误,错误代码是==>{}".format(e))
            except Exception as ex:
                print("遇到了错误,错误代码是==>{}".format(ex))
            finally:
                response.release_conn()

# 定义一个scrape_callback类,用于存储解析到的数据
class Scrape_callback:
    def __init__(self,process_id):
        self.writer = csv.writer(open(f'D:/Crawl_Results/downloaded_data_{process_id}.csv','w', encoding='utf-8',newline='',errors='replace'))
        self.fields = ('中文名','外文名','评分','上映时间','国家','导演','时长','类型')
        self.writer.writerow(self.fields)
        self.process_id = process_id
        self.lock = threading.Lock()


    def __call__(self,html):
            with self.lock:
                if not self.writer:
                    raise RuntimeError("CSV writer is not initialized. Call open_writer() first.")
                html_string = html.decode("utf-8")
                root = lhtml.fromstring(html_string)
                result_list = []
                try:
                    # 解析电影标题
                    title_content = root.cssselect("div#content")[0]
                    span_title = title_content.cssselect('span[property="v:itemreviewed"]')[0]
                    title_text = span_title.text_content().split(" ",1)
                    for name in title_text:
                        result_list.append(name)
                        if len(title_text) == 1:
                            result_list.append('--')
                    # 解析电影评分
                    rate_span = root.cssselect('strong[property="v:average"]')[0]
                    rate_text = rate_span.text_content()
                    result_list.append(rate_text)
                    # 解析上映国家及日期
                    date_span = root.cssselect('span[property="v:initialReleaseDate"]')[0]
                    date_text = date_span.text_content()
                    parenthesis_index = date_text.find('(')
                    if parenthesis_index != -1:
                        # 提取日期部分(括号前的所有字符)
                        date = date_text[:parenthesis_index]
                        # 提取国家部分(括号内及之后的字符,再去除括号)
                        country = date_text[parenthesis_index + 1:-1]
                    else:
                        # 如果没有找到括号,则只有日期部分
                        date = date_text
                        country = "--"
                    result_list.append(date)
                    result_list.append(country)
                    # 解析导演
                    direct_by_a = root.cssselect('a[rel="v:directedBy"]')[0]
                    direct_by_text = direct_by_a.text_content()
                    result_list.append(direct_by_text)
                    # 解析片长
                    runtime_span = root.cssselect('span[property="v:runtime"]')[0]
                    runtime_text = runtime_span.text_content()
                    result_list.append(runtime_text)
                    gener_text=''
                    # 解析类型
                    gener_spans = root.cssselect('span[property="v:genre"]')
                    for gener_span in gener_spans:
                        gener_text += gener_span.text_content() + '|'
                    gener_text = gener_text.rstrip('|')
                    result_list.append(gener_text)
                    print("{}|{}|{}|{}|{}".format(title_text,rate_text,date,country,direct_by_text,gener_text))
                    self.writer.writerow(result_list)
                    result_list.clear()  # 清空列表以备下次使用,而不是重新创建
                except IndexError:
                    print("未找到指定的元素")
                except Exception as e:
                    print(f"处理过程中发生错误: {e}")

    def do_write(self,result_list):
        if not self.writer == None:
            self.writer.writerow(result_list)
        else:
            print("打开文件失败")

    def close_writer(self):
        # 如果writer是外部创建的,则不应在此关闭文件
        # 但在当前上下文中,文件是在这个类中打开的,所以应该在这里关闭
        if self.writer:
            #self.writer.writerow([])  # 写入空行作为结束标记(可选)
            # 注意:在with块外不需要手动关闭文件,它会自动处理
            self.writer = None  # 清除writer引用,帮助垃圾回收

# 定义一个类,用于控制延时
class Throttle:
    '''
    用于控制爬虫访问统一域名资源时的延时
    '''
    # 初始化函数
    def __init__(self,delay):
        self.delay = delay
        self.domains = {}
    # 控制延时
    def wait(self,url):
        # 解析url,获取域名
        domain = urlparse(url).netloc
        # 获取上一次访问的时间
        last_accessed = self.domains.get(domain)
        # 如果设置到延时并且已经访问过了
        if self.delay > 0 and last_accessed is not None:
            # 计算从上次访问到当前时间过去的秒数与规定的延迟时长的差值
            sleep_secs = self.delay - (datetime.now() - last_accessed).seconds
            # 判断距离上次访问的时间间隔是否达到了延迟要求
            if sleep_secs > 0:
                print("正在休眠,将等待{}秒后再次连接".format(sleep_secs))
                # 如果时间还没有达到,就调用time.sleep,进行休眠
                time.sleep(sleep_secs)
        # 更新本次访问的时间
        self.domains[domain] = datetime.now()


# 爬取网页的函数
def threaded_crawler(seed_url,link_regex,process_id,max_threads=3,crawl_queue = MongoQueue()):
    # 创建用于文件解析的类
    scrape_callback = Scrape_callback(process_id)
    # 定义一个User_agent列表
    user_agent_list = ['BadCrawler','GoodCrawler']
    # 解析网站的robots.txt
    rp = urllib.robotparser.RobotFileParser()
    rp.set_url(f"{seed_url}/robots.txt")
    rp.read()
    # 定义一个用户当前设置的user_agent
    current_user_agent = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:128.0) Gecko/20100101 Firefox/128.0'
    # 只有当默认的火狐这个User-agent被禁,再从user_agent_list中找看还有合适的没
    if not rp.can_fetch(current_user_agent,seed_url):
        # 从列表中找一个网站允许的user_agent
        for user_agent in user_agent_list:
            if rp.can_fetch(user_agent,seed_url):
                current_user_agent = user_agent
                break
            else:
                print("该网站的robots.txt禁止我们访问")
    # 创建队列并把种子url添加进去
    crawl_queue.push(seed_url)
    downloader = Downloader(delay=5,user_agent=current_user_agent,cache=MongoCache(),request_max=5,proxies=None)
    for item_count in range(1,10):
        current_url = "{}?start={}".format(seed_url,item_count*25)
        crawl_queue.push(current_url)
    def process_queue():
        current_thread = threading.current_thread()
        thread_name = current_thread.name
        while crawl_queue:
            try:

                #print("当前有带解析的链接共{}条".format(len(crawl_queue)))
                # 只要列表中有值,则弹出一个url用于解析
                url = crawl_queue.pop()
            except IndexError as index_error:
                print("出错了")
                break
            except KeyError as keyerror:
                pass
            else:
                # 执行下载
                html = downloader(url)
                if not html == None:
                    # 如果有传入提取数据的回调函数,则调用它
                    scrape_callback(html)
                    # 从下载到的html网页中递归的获取链接
                    links_from_html = get_links(html)
                    if not links_from_html == None:
                        for link in links_from_html:
                            link = urljoin(seed_url,link)
                            # 判断找到的链接是否符合我们想要的正则表达式
                            if re.match(link_regex,link):
                                crawl_queue.push(link)

                # 修改url的状态
                crawl_queue.complete(url)
    threads = []
    while threads or crawl_queue:
        for thread in threads:
            if not thread.is_alive():
                threads.remove(thread)
        while len(threads) < max_threads and crawl_queue:
            thread = threading.Thread(target=process_queue)
            thread.setDaemon(True)
            thread.start()
            threads.append(thread)
    scrape_callback.close_writer()
    crawl_queue.clear()


# 多进程函数
def process_link_crawler(args,**kwargs):
    # 解包参数以获取seed_url和link_regex
    seed_url, link_regex = args
    num_cpus = multiprocessing.cpu_count()
    processes = []
    csv_files = []

    use_cpu = num_cpus//4

    for i in range(use_cpu):
        process_id = f"pid_{os.getpid()}_{i}"  # 生成唯一的进程ID标识符
        csv_files.append(f'D:/Crawl_Results/downloaded_data_{process_id}.csv')
        p = multiprocessing.Process(target=threaded_crawler, args=(seed_url, link_regex, process_id))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    # 解析完毕,开始合并文件
    # 合并CSV文件
    merge_csv_files(csv_files, 'D:/Crawl_Results/merged_data.csv')

# 用于合并csv的
def merge_csv_files(csv_files, output_file):
    with open(output_file, 'w', encoding='utf-8', newline='') as outfile:
        writer = csv.writer(outfile)
        for csv_file in csv_files:
            with open(csv_file, 'r', encoding='utf-8', errors='replace') as infile:
                reader = csv.reader(infile)
                next(reader)  # 跳过标题行,因为它已经在第一个文件中写入了
                for row in reader:
                    writer.writerow(row)
    print("合并成功!")
    # 清理单独的CSV文件(可选)
    for csv_file in csv_files:
        os.remove(csv_file)

# 从下载到的html中继续解析连接
def get_links(html):
    webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']',re.IGNORECASE)
    if not html == None:
        html_string = html.decode("utf-8")
        return webpage_regex.findall(html_string)
    else:
        return None

def main():
    # 测试
    seed_url2="https://movie.douban.com/annual/2023/"
    seed_url="https://movie.douban.com/top250"
    link_regex="^https://(?!music\\.douban\\.com/subject/)movie\\.douban\\.com/subject/(\\d+)/$"

    process_link_crawler((seed_url,link_regex))

if __name__ == '__main__':
    main()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1984857.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

成都云飞浩容文化传媒有限公司共绘电商服务新蓝图

在这个日新月异的电商时代&#xff0c;每一家企业都渴望在浩瀚的互联网海洋中脱颖而出&#xff0c;实现品牌的飞跃式增长。而在这场没有硝烟的战争中&#xff0c;专业的电商服务成为了企业破局的关键。成都云飞浩容文化传媒有限公司&#xff0c;作为电商服务领域的佼佼者&#…

(ABC三题完整内容)2024年华数杯大学生数学建模竞赛解题思路完整代码论文集合

我是Tina表姐&#xff0c;毕业于中国人民大学&#xff0c;对数学建模的热爱让我在这一领域深耕多年。我的建模思路已经帮助了百余位学习者和参赛者在数学建模的道路上取得了显著的进步和成就。现在&#xff0c;我将这份宝贵的经验和知识凝练成一份全面的解题思路与代码论文集合…

【mars3d】加载超图s3m模型说明

建议替换Cesium库&#xff0c;换成 超图版本Cesium mars3d mars3d-supermap &#xff0c;需要引入的资源为&#xff1a; "mars3d": ["Cesium-supermap/Widgets/widgets.css", //超图版本Cesium "Cesium-supermap/Cesium.js","mars3d/plu…

vue使用响应式API和页面组件ref相同名称问题

最近在使用vue3vite学习前端的东西。在学习form表单时发现&#xff0c;<el-form>里面的<el-input>绑定数据时&#xff0c;页面输入框在键盘输入之后没有反应&#xff0c;每次只能输入一各字母。 <template><div class"login-container"><…

电子产品CE认证标准以及欧盟需要做的认证有哪些?

​一、家电产品GB质检报告测试内容有哪些&#xff1f; 1.家用电器的测试内容包括预检测和测试 预检测是指在测试之前判定产品是否符合标准要求的检查&#xff0c;主要包括以下三个方面&#xff1a; 1.1产品铭牌、使用的标志、说明书检查 1.2产品的外观及内部结构检查 1.3产品…

Java程序员接单分享

作为一名Java程序员&#xff0c;这阵子通过承接些小型项目&#xff0c;我顺利跨过了月薪破万的门槛。这些项目虽小&#xff0c;却如同磨刀石般&#xff0c;让我在实战中发现了自身技术栈的棱角与不足&#xff0c;尤其是意识到了在Java这一浩瀚技术海洋中的诸多未知领域。我深知…

Python 爬虫项目实战(六):爬取大众点评商家数据

前言 网络爬虫&#xff08;Web Crawler&#xff09;&#xff0c;也称为网页蜘蛛&#xff08;Web Spider&#xff09;或网页机器人&#xff08;Web Bot&#xff09;&#xff0c;是一种按照既定规则自动浏览网络并提取信息的程序。爬虫的主要用途包括数据采集、网络索引、内容抓…

开放式耳机有哪些优缺点,2024年开放式耳机推荐

&#xff08;一&#xff09;开放式耳机有哪些优缺点 优点&#xff1a;良好的音质&#xff0c;佩戴舒适不伤害耳朵&#xff0c;便于携带&#xff0c;一句话就是舒适又安全&#xff1b; 缺点&#xff1a;最大的缺点就是隔音效果不佳&#xff0c;而且很大的可能性会出现漏音现象…

亚马逊、速卖通卖家必看:自养号测评助力转化率提升

在亚马逊、速卖通等电商平台的激烈竞争中&#xff0c;卖家们深谙流量之于店铺转化率的重要性&#xff0c;而测评补单作为提升业绩的关键策略之一&#xff0c;其重要性不言而喻。它不仅是日常运营中不可或缺的一环&#xff0c;更是助力产品在众多竞品中脱颖而出的竞争利器。特别…

Google Play Instant免安装应用,助力市场推广!

什么是免安装应用&#xff1f; 一句话总结&#xff1a;Google Play 免安装应用就是允许用户在不安装应用的情况下访问应用的内容。 那么它有什么作用呢&#xff1f; 1.增强与用户的互动&#xff0c;推出可吸引用户安装App的活动或者功能进行极致体验&#xff0c;提升安装量并…

09.XSS跨站脚本攻击(超详细!!!)

1、什么是XSS XSS&#xff08;跨站脚本攻击&#xff09;&#xff1a;攻击者利用这个漏洞将恶意脚本注入到网页中&#xff0c;当其它用户浏览这些页面时&#xff0c;恶意脚本会在用户的浏览器中执行。XSS攻击允许攻击者在用户的浏览器上执行脚本&#xff0c;从而可能获取用户的…

好书推荐|大模型必学《Transformer自然语言处理实战》

今天又来给大家分享ai大模型书籍了&#xff0c;今天是这本非常畅销的书----《Transformer自然语言处理实战》涵盖了Transformer在NLP领域的主要应用。 首先介绍Transformer模型和Hugging Face 生态系统。然后重点介绍情感分析任务以及Trainer API、Transformer的架构&#xff…

深入了解App设计流程的7个关键阶段

在当今数字时代&#xff0c;每个人的日常生活都与各种应用密切相关。APP已经成为我们生活中不可或缺的一部分&#xff0c;无论是社交网络、健康服务、购物还是娱乐。优秀的APP设计不仅能提供良好的用户体验&#xff0c;还能吸引用户的注意力&#xff0c;有效传达信息。作为一名…

《python语言程序设计》2018版第6章第33题使用五边形面积,利用函数重写编程3.4题 返回五边形的面积

之前03.04.01version 2024.02.04side_num eval(input("Enter the side: ")) area_num (5 * pow(side_num, 2)) / (4 * math.tan(math.pi / 5)) print("The area of the pentagon is {:>.20f}".format(area_num))本次代码 def area(side_num):side_num…

十分钟带你学会 Vue-router

安装、配置 Router Vue Router 是 Vue.js 官方的路由管理器。它和 Vue.js 的核心深度集成&#xff0c;让构建单页面应用变得易如反掌。 了解路由之前&#xff0c;我们需要先理解一个概念&#xff1a;单页应用。 单页应用 SPA(single page application):单一页面应用程序&am…

【微信小程序实战教程】之微信小程序中的 JavaScript

微信小程序中的 JavaScript 微信小程序的业务逻辑都是通过JavaScript语言来实现的&#xff0c;本章我们将详细的讲解JavaScript的基本概念&#xff0c;以及在小程序中如何使用JavaScript语言。JavaScript是一种轻量的、解释型的、面向对象的头等函数语言&#xff0c;是一种动态…

uniapp用自带的canvas做画板签字

如下图移动端经常需要做此功能,用户签字。用户填表啥的。 先用touch进行监听手指的触摸事件 获取所点击的坐标位置。 这里有很多要注意的地方。 初始化 uniapp里的canvas与原生的canvas有区别,渲染之后会多很多莫名其妙的div节点,并且还有个div直接就把原生的canvas覆盖…

仿真入门必看:怎么用CST软件自带宏提取材料的DK,Df值

我们知道如果在CST中要做精确的仿真&#xff0c;进行仿真测试对比&#xff0c;其中第一步就是要搞清楚仿真模型的参数&#xff0c;如果输入数据不对&#xff0c;那也避免不了垃圾进垃圾出的原则。和仿真相关的数据很多&#xff0c;其中PCB板的介质参数Dk, Df就是介电常数的实部…

吓傻!自有品牌社交电商靠AI 智能名片商城小程序逆天改命!

摘要&#xff1a;本文深入探讨了自有品牌型社交电商的发展历程、显著特点以及未来趋势&#xff0c;特别以微商品牌为典型案例进行了详细剖析。同时&#xff0c;重点阐述了在数字化时代的大背景下&#xff0c;自有品牌型社交电商如何通过与 AI 智能名片商城小程序的有机融合&…

VueRouter 相关信息

VueRouter 是Vue.js官方路由插件&#xff0c;与Vue.js深度集成&#xff0c;用于构建单页面应用。构建的单页面是基于路由和组件&#xff0c;路由设定访问路径&#xff0c;将路径与组件进行映射。VueRouter有两中模式 &#xff1a;hash 和 history &#xff0c;默认是hash模式。…