【Python 图片下载器】一款专门为爬虫制作的图片下载器,多线程下载,速度快,支持续传/图片缩放/图片压缩/图片转换

news2024/12/24 6:32:47

文章日期:2024.12.23

使用工具:Python

本章知识:制作一款图片下载器_DOS窗口(爬虫专用)

文章难度:低等(没难度)

文章全程已做去敏处理!!!  【需要做的可联系我】 

AES解密处理(直接解密即可)(crypto-js.js 标准算法):​​​​​​在线AES加解密工具

注意:网络的带宽会对下载速度有所影响,包括图片链接的服务器所在地区,如果图片链接的服务器所在地区是海外的,那么下载图片的时候尽量使用代理流量包下载会更快。如有其他问题可以留言,勿喷!!!

23年7月左右测速最高达到了800M/s,实际速度请按照你们自己测出的来

讲的不是很详细,有问题请留言,欢迎大家指导

程序的界面是DOS窗口,并非GUI窗口。下载器不依赖于任何数据库,方便使用和转移

本章内的源代码是由23年4月份制作,经过多次修改,现已稳定使用,如有不同场景使用不了,可以告知作者,作者将会根据实际情况进行整改。

此项目不是针对于单个固定的网站下载图片,所以我采用的是读取本地文件实时保存本地图片路径数据的方法进行读取和存储(excel格式),此方法比较灵活,不依赖数据库,也不依赖GUI窗口。如有介意可以自行修改源代码添加自定义功能

源代码大部分是在23年制作和修改,代码的命名可能不是很友好,但也进行了部分优化,凑合着看吧。

⬇️⬇️⬇️为了照顾部分使用人群,我将程序打包成了exe文件,供大家使用。源代码在文章最底部
名称下载地址(百度盘)
极速图片下载器 EXE(支持win10/win11)下载 (提取码 bfri ) 22M
演示 / 测试文件(部分图片链接可能会失效导致报错)下载 (提取码 k3tr ) 3M 

给大家看一下程序运行时的图片,gif动图太大放不了,凑合看吧

出现错误连接的是不会被保存的,一整条都不会被保存 

下面讲解整个运转流程步骤,文章结尾附上源码

【简单的流程:设置配置 -> 读取文件 -> 线程分配 -> 数据存储】

        1、【配置设置】线程量、下载速度、超时时间、图片的处理、路径的存放、请求的参数

        2、【读取文件】读取文件并将文件内的数据转为程序可识别的统一模板

        3、【线程分配】按照设置好的线程量进行并发下载

        4、【数据存储】下载完成的图片路径和信息会存储到队列,然后又独立的线程把队列里的数据进行保存到本地

1、【配置设置】

[headers][UA]:自定义可以直接输入你自己的UA。如果用固定默认的UA就输入1。如果想要用默认且不固定的就输入2。有效应对大部分网站的限制

[headers][Cookie]:自定义Cookie, 直接粘贴你要的cookie即可。这个功能基本不用,除非是特殊网站

[proxies][ip]:自定义海外流量包,专门用于下载海外图片,只要输入流量包的ip和端口即可,采用Socks4协议,可以通过源代码自行修改

[根目录][目录设置]:这个是设置所有图片的根目录,比如我设置了img,那么他会自动在img里创建多个字文件夹,每个文件夹对应每条不同的数据图片。

[每张图片下载的间隔时间][速度限制]:设置图片的下载间隔时间,一般情况设置0.001,如果网站卡,或网络卡,就不要设置太低,设置0.09也可以

[下载超时时间][请求超时设置]:这个是每个图片请求的总时间,超过这个时间就会直接返回错误,当然,软件内默认会重试3次。还是适当调整即可

[启用多少线程][并发每行数据]:根据自身情况设置并发线程量,如果不着急下载,就不要设置太高,因为对你自身的网卡占用也高,如果把人家网站搞崩了,本人是不负责的

[图片功能][是否启用]:内置了一些图片处理功能,可以边下载边处理图片

[图片压缩][压缩质量]:压缩图片专用,一般情况不设置,或设置90-100即可

[图片放大缩小][像素大小]:等比例缩放图片,程序会根据你设置的大小,把图片等比例缩放到你设置的大小范围,也是根据情况设置,不要超过4000就行,因为会模糊

[读取配置]:这个是链接的分割方法,如果你的链接是用列表保存的,则输入1,如果是用一写符号作为分割,那你就要输入对应的分割符,让程序可以读取识别出每一条链接

2、【读取文件】

我们要创建一个固定的文件,名称也是固定的【000.txt】,然后把表格里要下的链接粘贴到这个文件内,然后再去执行程序下载图片。

【000.txt】只能读取的两列,第一列为【唯一值】第二列为链接,这个链接可以是列表形式,也可以是连在一起的,但要有分隔符

下面这个图片是按照分隔符保存的

3、【线程分配】

程序就是进程,他会在程序刚开始的时候先创建一个线程用于监测数据保存队列(下面图片内没有展示),只有所有图片都下载完成后,他才会停止。

下载图片的线程分配逻辑如下图,程序会根据你设置的线程量进行创建父线程,每个父线程是对应一个条数据,父线程会创建多个子线程,子线程数量是和当前数据的连接数量匹配,然后由子线程下载图片,父线程监测子线程,并完成最后的数据整理,存放到数据保存队列内,由队列线程进行保存,这是为了防止资源争夺。如果保存数据的时候,采用的是数据库,则无需使用队列线程

4、【数据存储】

这是一条独立的线程执行的函数,当程序下载图片开始的时候,此线程就会启动,并一直处于监测状态,监测队列有没有数据,有则保存无则等待。如果不想要这个累赘的东西,可以采用数据库保存的方法执行,就可以去除这个烦人的队列。

下面给大家看一下源码,并略微讲一下每个模块的作用

1、给大家先看一下类,程序内只有这三个类,也是为了方法修改和整理,有些代码还是比较分散的,大家可以自行整理,我都用习惯了。

2、先来看一下 【配置】 的类

这是一些默认值,我所规定的默认值比较严格,就是太快要求有点高,大家还是要根据自己情况进行设置

这是dos窗口所需要的配置输入功能,这没什么可说的,比较简单。代码丑不要介意啊

分别是获取局域网的ip,方便大家使用(支持获取无线网 / 支持获取有线网)。还有函数一个是用于输出控制台不同颜色字体的功能

第一个是检测文件是否存在,另一个是用于图片的处理,比如图片压缩/等比例缩放/png背景填充/格式转换。注意:想使用图片处理的功能,那么线程就要启动少一点,否则线程过多运算不过来,CPU会瞬间沾满,会卡顿的,为了避免这种请求,使用图片处理功能,就不要开太多线程

3、来看一下 【下载模块】 的类

这个是下载图片的核心部位,是采用 requests 模块下载,我只想将他包装了一下,也没用什么可稀奇的

4、来看一下 【线程资源分配】 的类

直接看图吧,图更容易理解,结合源代码去看去理解

【附上源码】 切记,只能读取【000.txt】文件。不喜勿喷

"""
File Name: 超级下载器_DOS版本
Author: 小木_.
Date Created: 2022-11-2
Last Modified: 2024-12-23
Version: 2.0
Python version: 3.12.7
Description: 一款图片超级下载装置,拉满你的带宽下载图片,可以边下载边处理图片,支持jpg/png格式图的下载与处理。支持压缩/有损放大缩小/图片检测功能。支持多级路径保存图片
"""
import json
import subprocess
import filetype
import time
import random
import os
import re
import copy
import requests
from concurrent.futures import ThreadPoolExecutor
import colorama
from typing import Union, Dict, Literal, List
from io import BytesIO

colorama.init(autoreset=True)
from PIL import Image
'''
安装模块
filetype==1.2.0
requests==2.32.3
colorama==0.4.6
Pillow==11.0.0
'''

'''
模板:
——-——-——-——-——-——-——-——
|         |          |
|  唯一值  |  图片链接  |
|         |          |
——-——-——-——-——-——-——-——
注意:如果遇到图片有问题的,比如图片的后缀不是jpg或png的,可以尝试修改链接的后缀。或者在下载图片的时候不要添加UA头,已确保图片的内容是正常的jpg/png
'''

'''[配置信息] - 只实列化一次'''
class config:
    def __init__(self,
                 ua_str: str = '',
                 cookie: str = '',
                 proxies_ip: str = '',
                 root_name: str = 'root_directory',
                 rewrite_sku: bool = False,
                 time_sleep: float = 0.09,
                 timeout: int = 10,
                 thread_num: int = 1,
                 img_function: bool = False,
                 compress_img: Union[int, None] = None,
                 conversion_img: bool = False,
                 enlarge_img: Union[int, None] = None,
                 img_segmentation: str = '|'):

        '''[头部设置]'''
        self.headers_ua = ua_str
        self.headers_cookie = cookie
        '''[代理设置]:127.0.0.1:7890'''
        self.proxies_ip = proxies_ip
        '''[根目录名称/文件夹]'''
        self.root_name = root_name
        '''[是否启用重写SKU唯一值]'''
        self.rewrite_sku = rewrite_sku
        '''[每张图片下载的间隔时间]:0.001-0.09-值越小越稳定越慢'''
        self.time_sleep = time_sleep
        '''[下载图片时超时时间]'''
        self.timeout = timeout
        '''[线程量/同时下载多少个商品]'''
        self.thread_num = thread_num

        '''[图片功能是否启用]'''
        self.img_function = img_function
        '''[图片压缩]'''
        self.compress_img = compress_img
        '''[图片转换]'''
        self.conversion_img = conversion_img
        '''[图片放大]'''
        self.enlarge_img = enlarge_img

        '''[读取文件]'''
        self.FILE_data = '000.txt'
        '''[完成数据存储]'''
        self.FILE_data_done = '[下载]完成-完成-完成.txt'
        # '''[查重数据存储]'''
        # self.FILE_data_check = '[下载]查重-查重-查重.txt'
        '''[图片错误数据存储]'''
        self.FILE_data_error = '[下载]图片错误-图片错误-图片错误.txt'
        # '''[全部错误数据存储]'''
        # self.FILE_data_all_error = '[下载]全部错误-全部错误-全部错误.txt'

        '''[读取配置]'''
        self.img_segmentation = img_segmentation

        '''[整个程序核心开关]'''
        self.run_switch: bool = True
        '''[启动] 如果不用DOS窗口,可以直接移除此手动选项'''
        self.run()

    '''配置自动设置'''
    def run(self):
        '''run仅用于DOS输入选择,如果该配置模块预装到程序软件内,则无需run模块执行'''
        print(self.colors('Red', r'''
         ___                                           
        |_ _|_ __ ___   __ _  __ _  ___                
         | || '_ ` _ \ / _` |/ _` |/ _ \               
         | || | | | | | (_| | (_| |  __/               
        |___|_| |_| |_|\__,_|\__, |\___|             _ 
        |  _ \  _____      __|___/| | ___   __ _  __| |
        | | | |/ _ \ \ /\ / / '_ \| |/ _ \ / _` |/ _` |
        | |_| | (_) \ V  V /| | | | | (_) | (_| | (_| |
        |____/ \___/ \_/\_/ |_| |_|_|\___/ \__,_|\__,_|   图片下载器...'''))
        # 检测 000.txt文件是否存在,不存在则执行停止
        if not self.check_file(self.FILE_data):
            self.run_switch = False
            print(self.colors('Red',
                              '\n\t[配置设置]:要读取的文件不存在,请检查文件是否和当前程序在同一路径!\n\t[error]:缺少000.txt文件!!!读取不到'))
            input('\n按回车退出程序....')
            quit()

        print(self.colors('Red',
                          '\n\n\t[配置设置快捷操作]:请按照要求进行配置\n\t1、不需要设置的内容请直接按回车,程序会自动使用默认值,没有默认值的则不会启动\n\t2、凡是布尔类型的选项直接回车默认都是False'))

        __ua = input(
            '\n\n\t[headers][UA]设置User-Agent / 直接输入UA / 1为内置固定UA / 2为防指纹UA(默认为空)\n\t请输入:')
        if __ua == '1': self.headers_ua = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36 Edg/91.0.864.48'
        if __ua and __ua != '1': self.headers_ua = __ua

        __cookie = input('\n\t[headers][Cookie]设置Cookie(默认为空)\n\t请输入:')
        if __cookie: self.headers_cookie = __cookie

        __ip = input(
            f'\n\t[proxies][ip]设置代理IP 格式为:ip:port | 当前局域网ip为 {self.get_local_ip()}:8080(默认为空)\n\t请输入:')
        if __ip: self.proxies_ip = __ip

        __root_name = input(
            f'\n\t[根目录][目录设置]设置图片根文件夹名称(自动创建/纯英文+数字) 可设置多级 img/nike(默认为 root_directory文件夹)\n\t请输入:')
        if __root_name: self.root_name = __root_name

        # __rewrite_sku = input(f'\n\t[图片文件夹][SKU设置]【研发中..】设置唯一值是否要重置刷 - 新请输入选项 / 1-False / 2-True:')
        # if str(__rewrite_sku) == '2':self.rewrite_sku = True

        __time_sleep = input(
            f'\n\t[每张图片下载的间隔时间][速度限制]请输入小数-值大则慢则稳定-值小则快则不稳定 / 0.001~0.09(默认为0.09)\n\t请输入:')
        if __time_sleep: self.time_sleep = float(__time_sleep)

        __timeout = input(
            f'\n\t[下载超时时间][请求超时设置]请输入整数-值小则快则不稳定-值大则慢则稳定 / 1~50(默认为10)\n\t请输入:')
        if __timeout: self.timeout = int(__timeout)

        __thread_num = input(
            f'\n\t[启用多少线程][并发每行数据]请输入整数-值小则线程少则慢则稳定-值大则线程越多则快则不稳定 / 1~100(默认为1)\n\t请输入:')
        if __thread_num: self.thread_num = int(__thread_num)

        __img_function = input(f'\n\t[图片功能][是否启用]请输入数字选项 / 1-False / 2-True(默认为1)\n\t请输入:')
        if __img_function == '2': self.img_function = True

        if self.img_function:
            __compress_img = input(f'\n\t[图片压缩][压缩质量]请输入整数-值越小越模糊 / 1~100(默认为空)\n\t请输入:')
            if __compress_img: self.compress_img = int(__compress_img)

            # __conversion_img = input(f'\n\t[图片转换][是否启用]【默认为启动无法关闭】转换为jpg 透明底会被填充为白色 请输入选项 / 1-False / 2-True:')
            # if str(__conversion_img) == '2': self.conversion_img = True

            __enlarge_img = input(
                f'\n\t[图片放大缩小][像素大小]请设置图片最大宽高的限制(程序会自动等比例缩放) / 500~4000(默认为空)\n\t请输入:')
            if __enlarge_img: self.enlarge_img = int(__enlarge_img)
            if not self.compress_img and not self.enlarge_img:
                self.img_function = False
                print(
                    self.colors('Yellow', '\t[图片功能][是否启用]图片压缩未设置/图片放大缩小未设置,已自动关闭图片功能'))

        __img_segmentation = input(
            f'\n\t[读取配置]请输入图片的分割方式 / 通常是【|#$@】分割,如果是列表请输入1(默认为|符号)\n\t请输入:')
        if __img_segmentation == '1': self.img_segmentation = '列表'

        # 头部
        if self.headers_ua or self.headers_cookie:
            self.headers = {}
            (lambda x: self.headers.update({'User-Agent': x}) if bool(x) else None)(self.headers_ua)
            (lambda x: self.headers.update({'Cookie': x}) if bool(x) else None)(self.headers_cookie)
        else:
            self.headers = None

        # 代理
        if self.proxies_ip:
            self.proxies = {'https': f'Socks4://{self.proxies_ip}', 'http': f'Socks4://{self.proxies_ip}'}
        else:
            self.proxies = None

    # 获取本地局域网ip地址
    def get_local_ip(self) -> str:
        try:
            # 使用Windows命令 ipconfig 获取局域网IP
            result = subprocess.run(['ipconfig'], capture_output=True, text=True)
            # 处理一下dos输出内容,方便提取 为了防止出现问题,结尾补上两个空的
            output = result.stdout.replace('\t', '').replace(' ', '').split('\n\n') + [''] * 2
            for i in range(len(output) - 1):
                network1 = output[i].replace('\n', '')
                network2 = output[i + 1].replace('\n', '')
                if '以太网适配器以太网' in network1 and '媒体状态............:媒体已断开连接' not in network2:
                    _ip = re.findall('IPv4地址.*?:(.*?)子', network2)
                    ip_address = _ip and _ip[0] or '获取失败'
                    return ip_address
                if '无线局域网适配器WLAN' in network1 and '媒体状态............:媒体已断开连接' not in network2:
                    _ip = re.findall('IPv4地址.*?:(.*?)子', network2)
                    ip_address = _ip and _ip[0] or '获取失败'
                    return ip_address
            return '未联网'
        except:
            return '报错'

    # 控制台颜色
    def colors(self,
               value: Literal["Red", "Green", "Yellow", "Blue", "Cyan", "White", "darkWhite", "darkGreen", "darkRed"],
               text: str) -> str:
        return {
            'Red': "\033[91m{}\033[00m".format(text),
            'Green': "\033[92m{}\033[00m".format(text),
            'Yellow': "\033[93m{}\033[00m".format(text),
            'Blue': "\033[94m{}\033[00m".format(text),
            'Cyan': "\033[96m{}\033[00m".format(text),
            'White': "\033[97m{}\033[00m".format(text),
            'darkWhite': "\033[2;97m{}\033[00m".format(text),
            'darkGreen': "\033[32;11m{}\033[00m".format(text),
            'darkRed': "\033[31;12m{}\033[00m".format(text),
        }[value]

    # 检测文件是否存在
    def check_file(self, FILE_data: str) -> bool:
        if not os.path.exists(FILE_data):
            return False
        else:
            return True

    # 图片转换压缩调整
    def image_convert(self, img_file: str, background_color: str = '#ffffff', format: Literal["JPEG", "PNG"] = 'JPEG',
                      max_size: Union[int, None] = None, quality_size: Union[int, None] = None):
        '''
        :param img_file: 图片路径名称
        :param background_color:  背景填充颜色 rgb(255,255,255) 背景填充颜色 hxe #ffffff  默认填充 #ffffff
        :param format: 图片保存格式 PNG JPEG  默认JPEG
        :param max_size:  图片大小等比例调整,会将图片等比例缩放到当前设置的大小范围  默认无
        :param quality_size:  图片压缩 100-0 None为不压缩,0为极致压缩  50比较好  默认无
        该模块的作用:  注意,该模块会对图片里的属性内容进行清除,清除后图片的占用大小会自然降低,也有可能会变高一点,对质量的影响可以忽略不计。如果不想清除原有图片的内容信息,可以不使用该模块或将该模块进行二次开发
        1、支持压缩图片质量
        2、支持将png透明背景进行填充
        3、支持将图片转换为jpg/png
        4、支持等比例缩放图片大小
        '''
        dict_data = {'msg': '', 'code': ''}
        # 打开图片并转换为RGBA模式
        image = Image.open(img_file).convert('RGBA')
        if background_color:  # (255,255,255)  #ffffff
            # 创建白色背景图片
            background = Image.new('RGB', image.size, background_color)
            dict_data['REG'] = str(background_color)
        else:
            return {'code': 'False', 'msg': '[background_color]背景颜色必须设置', 'file': img_file}

        # 将源图片粘贴到背景图片上
        background.paste(image, (0, 0), image)
        image = background
        # 确定调整后的尺寸
        width, height = image.size
        dict_data['size'] = f'{width}x{height}'
        # 图片大小设置
        if max_size:
            if width > height:
                new_width = max_size
                new_height = int((height / width) * max_size)
            else:
                new_width = int((width / height) * max_size)
                new_height = max_size
            # 调整图像大小
            image = image.resize((new_width, new_height))
            dict_data['size'] = f'{new_width}x{new_height}'
            dict_data['max_size'] = max_size

        # 确定输出文件名和路径
        if format == 'PNG':
            output_filename = os.path.splitext(os.path.basename(img_file))[0] + '.png'
            output_filename = os.path.split(img_file)[0] + '/' + output_filename
            dict_data['format'] = 'PNG'
        else:
            if format == 'JPEG':
                output_filename = os.path.splitext(os.path.basename(img_file))[0] + '.jpg'
                output_filename = os.path.split(img_file)[0] + '/' + output_filename
                dict_data['format'] = 'JPEG'
            else:
                return {'code': 'False', 'msg': '设置图片格式错误', 'file': img_file}
        dict_data['file'] = img_file
        # 保存转换后的图像
        if quality_size:
            image.save(output_filename, format=format, quality=quality_size)
            dict_data['file_save'] = output_filename
            dict_data['quality_size'] = quality_size
            dict_data['code'] = 'True'
            dict_data['msg'] = '数据保存完成 - 已成功转换'
            return dict_data
        else:
            image.save(output_filename, format=format)
            dict_data['file_save'] = output_filename
            dict_data['code'] = 'True'
            dict_data['msg'] = '数据保存完成 - 已成功转换'
            return dict_data


'''[新版图片下载器]'''
class Image_Downloader:
    # 获取图片像素大小
    def get_image_size(self, image_path: Union[str, None] = None, image_content: Union[bytes, None] = None) -> Union[
        list, str]:
        if not image_path and not image_content:
            return '图片像素大小获取失败,未接收到文件或数据'
        try:
            if image_content:
                image_path = BytesIO(image_content)
            with Image.open(image_path) as img:
                width, height = img.size
            return [width, height]
        except Exception as e:
            return f'图片有问题,未读取出像素大小 - {e}'

    # 正常下载图片模块 单
    def XZTP(self, URL: str, FILE_data1: str = 'img', FILE_data2: str = '1.jpg', headers: Union[Dict, None] = None,
             proxies: Union[Dict, None] = None, timeout: Union[int, None] = 1, verify: bool = True) -> Dict[
        str, Union[str, int, list, float, requests.Response]]:
        '''
        :param URL: 图片链接
        :param FILE_data1: 图片路径 不含文件名称   D:/图片
        :param FILE_data2: 要保存的图片名称+后缀  11.jpg
        :param headers: 伪装头
        :param proxies: 代理 请用字典格式 默认None 未开启
        :param timeout: 超时时间 默认1秒内
        :param verify:         是否验证SSL证书  【False=禁用 SSL 证书验证。   True=开启】默认开启    提示:禁用可以有效提高速度
        :return: 返回数据 字典格式 code为固定值
        '''
        try:
            # 如果文件夹不存在,则直接创建,exist_ok忽略已存在的文件夹
            try:
                os.makedirs(FILE_data1, exist_ok=True)
            except Exception as e:
                return {'msg': f'地址路径错误 - {e}', 'code': -1, 'URL': URL}
            # 进行检测
            if not os.path.exists(FILE_data1) and (FILE_data1 != ''):
                return {'msg': '地址路径错误', 'code': -1, 'URL': URL}
            try:  # 开始请求
                Conin = requests.get(URL, timeout=timeout, headers=headers, proxies=proxies, verify=verify)
            except Exception as e:
                return {'msg': f'[Get]请求失败,请检查 网络/代理/URL链接 - {e}', 'code': -1, 'Response': '...',
                        'URL': URL}
            # 字节
            data_length = int(Conin.headers.get("Content-Length", len(Conin.content)))
            # 计算 MB/KB
            ContentLength = ['MB', round(data_length / 1024 / 1024, 3)] if data_length > 2048 else ['KB', round(
                data_length / 1024, 3)]
            if Conin.status_code != 200:
                return {'msg': '[Get]请求成功,状态码非200', 'code': 0, 'Response': Conin, 'URL': URL,
                        'TimeElapsed': (Conin.elapsed.total_seconds() if type(
                            Conin.elapsed) != float else Conin.elapsed),
                        f"Content-Length({ContentLength[0]})": ContentLength[1]}
            # 请求成功  wb+ 强制顶替
            with open(FILE_data1 + '/' + FILE_data2, 'wb+') as file:
                file.write(Conin.content)
        except Exception as e:
            return {'msg': f'[Get]错误语句 - {e}', 'code': -1, 'Response': '...', 'URL': URL}
        # 检查图片是否存在
        if os.path.exists(FILE_data1 + '/' + FILE_data2):
            # 检查文件是否是正常图片
            if filetype.guess(FILE_data1 + '/' + FILE_data2):
                return {'msg': '[Get]保存成功', 'code': 1, 'Response': Conin,
                        # 'X-Y': self.get_image_size(image_path=str(FILE_data1 + '/' + FILE_data2)),
                        'X-Y': self.get_image_size(image_content=Conin.content),
                        'file': str(FILE_data1 + '/' + FILE_data2), 'URL': URL,
                        f"Content-Length({ContentLength[0]})": ContentLength[1]}
            return {'msg': '[Get]保存成功 - 非正常图片', 'code': -1, 'Response': Conin,
                    'file': str(FILE_data1 + '/' + FILE_data2), 'URL': URL,
                    f"Content-Length({ContentLength[0]})": ContentLength[1]}
        return {'msg': '[Get]保存失败 - 图片不存在', 'code': -1, 'Response': Conin,
                'file': str(FILE_data1 + '/' + FILE_data2), 'URL': URL,
                f"Content-Length({ContentLength[0]})": ContentLength[1]}

    # 正常下载图片模块 多
    def XZTP_D(self, URL: str, FILE_data1: str = 'img', FILE_data2: str = '1.jpg', headers: Union[Dict, None] = None,
               proxies: Union[Dict, None] = None, timeout: Union[int, None] = 1, verify: bool = True,
               time_s: Union[int, float] = 1,
               SetRequestsFrequency: Literal["A", "B"] = 'A') -> Dict[
        str, Union[str, int, list, float, requests.Response]]:
        '''
        :param URL:  ...
        :param FILE_data1: 图片路径 不含文件名称   D:/图片
        :param FILE_data2: 要保存的图片名称+后缀  11.jpg
        :param headers: ...
        :param proxies: ...
        :param timeout:  超时时间
        :param time_s:  重试的间隔时间
        :param SetRequestsFrequency:   A:请求一次  B:请求三次
        :return:
        '''

        # 计算重试次数
        Frequency_limit: int = 1
        while True:
            # 请求1次 无论是好是坏都返回
            if SetRequestsFrequency == 'A':
                Conin = self.XZTP(URL, FILE_data1=FILE_data1, FILE_data2=FILE_data2, headers=headers, proxies=proxies,
                                  timeout=timeout, verify=verify)
                Conin['msg'] = f'[{Frequency_limit}]' + Conin['msg']
                return Conin

            # 请求3次  三次过后无论是好是坏都返回,如果三次内有200则直接返回
            if SetRequestsFrequency == 'B':
                Conin = self.XZTP(URL, FILE_data1=FILE_data1, FILE_data2=FILE_data2, headers=headers, proxies=proxies,
                                  timeout=timeout, verify=verify)
                if Conin['code'] == 1:
                    Conin['msg'] = f'[{Frequency_limit}]' + Conin['msg']
                    return Conin

                if Frequency_limit == 3:
                    Conin['msg'] = f'[{Frequency_limit}]' + Conin['msg']
                    return Conin
                Frequency_limit += 1
                # 重试的间隔时间
                time.sleep(time_s)


'''线程调用 主程序'''
class start_up:
    def __init__(self):
        '''[当前线程数量]'''
        self.thread_num: int = 0
        '''[数据存储]'''
        self.data_root: List[Dict] = []
        '''[总数据行数-固定]'''
        self.data_rows_num: int = 0
        '''[总图片数量-计算]'''
        self.data_img_num: int = 0
        '''[成功下载数据行数/成功下载图片总数/失败下载数据行数/失败下载图片总数]'''
        self.data_num_assemble: List[int] = [0, 0, 0, 0]
        '''[采集的数据行数-实时刷新]'''
        self.data_gather_num: int = 0
        '''[采集的开始时间 秒单位(时:分:秒:毫 = --:--:--:---)]'''
        self.time_consuming: int = int(time.time() * 1000)
        '''[所有的采集图片总大小计算 未开发]'''
        # self.img_occupancy: int = 0
        # 检测是否可以执行程序 - 配置有没有完成
        if configs.run_switch:
            self.run()
    def Identify_content(self, data: str) -> list:
        '''内容识别 模板内的链接转为列表返回'''
        if configs.img_segmentation == '列表':
            try:
                # 将列表格式化成字典格式
                # data: list = json.loads(data)
                data: list = json.loads(json.dumps(eval(data)))
                # 检测是否是列表
                if type(data) != list:
                    print(configs.colors('Red', f'你的图片并非列表格式,程序已退出'))
                    return []
                return data
            except Exception as e:
                # 关闭核心全部线程
                configs.run_switch = False
                print(configs.colors('Red', f'抱歉,你的图片并非列表格式,或者json格式有误,程序已退出 - {e}'))
                return []
        return data.split(configs.img_segmentation)

    def run(self):
        # 创建 ThreadPoolExecutor 一个线程,用于队列存储数据
        # 创建后直接运行线程,该线程会伴随着程序一直运行,直到程序停止
        ThreadPoolExecutor(max_workers=1).submit(self.thread_coroutine_transfer)

        # 读取000文件数据
        with open(configs.FILE_data, 'r', encoding='utf-8') as f:
            data_read = f.readlines()
        # 总数据量
        self.data_rows_num = len(data_read)
        # 循环每一行数据
        for data in data_read:
            # 去掉开头结尾的空格和回车字符,并转为列表方便提取数据
            data_list = data.strip().split('\t')
            '''[唯一值/SKU]'''
            SKU = data_list[0]
            '''[url链接/列表形式] 无论图片链接是怎么存储的,都要转换为列表才能进行分配线程进行采集'''
            url_list = self.Identify_content(data_list[1])
            # 用while循环是为了方便监控线程
            while True:
                # 监测线程的数量,有缺位立马补上
                if self.thread_num != configs.thread_num:
                    # 线程量+1
                    self.thread_num += 1
                    # 创建 ThreadPoolExecutor,创建一个父线程,然后在父线程里创建多个子线程
                    # 子线程下载图片,父线程用于监控子线程的执行,子线程都支持完成后,父线程将把数据进行整理然后传递给存储队列
                    # 创建后直接运行线程即可,while循环将会一直监测线程数量的创建,不用担心
                    ThreadPoolExecutor(max_workers=1).submit(self.download_main, SKU, url_list)
                    # 略微给一个休息时间,不给也可以,自己看着办
                    time.sleep(0.01)
                    break
                # 等待某一个线程结束,然后开启新线程
                time.sleep(0.4)
        # 等待线程全部结束
        while self.thread_num != 0:
            time.sleep(0.3)
        # 运行完成了,可以关闭核心进程了,提醒其他线程,我们已经完事了,都收工吧
        configs.run_switch = False

    # 父-下载模块-用于线程监控 链接处理 线程资源分配
    def download_main(self, SKU: str, url_list: list):
        """
        :param SKU: 就是一个唯一值,这个sku是用于对图片的命名和文件夹的创建
        :param url_list: 图片列表
        :return:
        """
        '''[线程量]'''
        thread_count = len(url_list)
        # 数据传输,每次调用都要实列化一个新的容器壳子,防止数据修改或数据混乱
        resource = {
            "z_img": [""] * thread_count,
            "b_img": [""] * thread_count
        }
        # 实时采集数量+1
        self.data_gather_num += 1
        # 临时获取数量
        num = self.data_gather_num
        # 创建 ThreadPoolExecutor,最大线程数为10
        with ThreadPoolExecutor(max_workers=thread_count) as executor:
            # 循环开启线程,一个一个启动
            futures = []

            '''[图片的命名/图片的存储位置]'''
            fixed = 0
            # 将列表内的链接循环出来 单个执行
            for url in url_list:
                # 总图片进行叠加计算
                self.data_img_num += 1
                # 将链接进行简单处理 去掉头部尾部的空格和换行符 防止出现请求错误
                url_handle = url.strip()
                # 每张图片下载的间隔时间 每次循环都会+1 所以我们在刚开始的时候可以设置1~100(越小越慢) 用来控制此处的间隔速度 防止高并发
                time.sleep(configs.time_sleep)
                # 创建图片存放的文件夹
                os.makedirs(f'{configs.root_name}/{SKU}', exist_ok=True)
                FILE_data1 = f'{configs.root_name}/{SKU}'
                FILE_data2 = f'{SKU}_{fixed}.jpg'
                # 启动线程
                futures += [executor.submit(self.download, url_handle, FILE_data1, FILE_data2, fixed, resource, num)]
                fixed += 1

            # 等待所有任务完成 进行阻塞等待
            for future in futures:
                future.result()
        resource['SKU'] = SKU
        resource['code'] = 1
        # 检测下载结果有没有空的,出现空的图片,说明下载失败
        if '' in resource['b_img']:
            resource['code'] = -1
        self.data_root += [resource]

        # 【外部线程】告诉线程我执行完成了 可以开启下一个线程了
        self.thread_num -= 1

    # 子下载模块-用于下载图片 返回结果
    def download(self, url: str, FILE_data1: str, FILE_data2: str, fixed: int, resource: dict, num: int):
        """
        :param url: 图片链接
        :param FILE_data1:  图片路径 不含文件名称   D:/图片
        :param FILE_data2:  要保存的图片名称+后缀  11.jpg
        :param fixed:  fixed 关联着图片的位置(链接对应本地图片) 和 图片的命名(防重复)
        :param resource: resource 图片的存储遍历,此线程会把指定的图片按照 fixed 的位置存储到字典里,返回给父线程
        :param num: 当前采集的数据编号(排序号)
        :return:
        """
        # 如果是给空的,则直接跳过
        if url == "":
            resource['z_img'][fixed] = ""
            resource['b_img'][fixed] = ""
            return
        print(configs.colors('Cyan', f'开始下载[{num}/{self.data_rows_num}]:{url}'))
        file_img_filetype = None
        if os.path.exists(FILE_data1 + '/' + FILE_data2):
            file_img_filetype = filetype.guess(FILE_data1 + '/' + FILE_data2)
            print(configs.colors('White',
                                 str(file_img_filetype) + '  --  ' + str(file_img_filetype.MIME) + '  --  ' + str(
                                     file_img_filetype.EXTENSION)))

        # 如果图片不存在或图片有问题,则重新下载
        if not file_img_filetype:
            # 一定要在线程内实列化,防止发生数据交换,每次使用都要重新实列化
            Image_Downloaders = Image_Downloader()
            headers = copy.deepcopy(configs.headers)
            # UA防指纹
            if configs.headers and configs.headers.get('User-Agent') == '2':
                headers[
                    "User-Agent"] = f"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/{random.randint(512, 538)}.{random.randint(9, 37)} (KHTML, like Gecko) Chrome/{random.randint(100, 121)}.0.0.0 Safari/{random.randint(512, 538)}.{random.randint(9, 37)}"
                headers[
                    "Sec-Ch-Ua"] = f"\"Not A(Brand\";v=\"{random.randint(70, 99)}\", \"Brave\";v=\"{random.randint(70, 120)}\", \"Chromium\";v=\"{random.randint(70, 120)}\""
                headers[
                    f"F{random.randint(10000000, 99999999)}"] = f"D{random.randint(100, 999)}{random.randint(100, 999)}{random.randint(100, 999)}"
            data_dict = Image_Downloaders.XZTP_D(URL=url, FILE_data1=FILE_data1, FILE_data2=FILE_data2,
                                                 headers=headers,
                                                 proxies=configs.proxies, timeout=configs.timeout, time_s=1,
                                                 SetRequestsFrequency='B')
            if data_dict['code'] == 1:
                # 成功下载图片总数
                self.data_num_assemble[1] += 1
                print(configs.colors('Green', str(data_dict)))
                resource['z_img'][fixed] = url
                resource['b_img'][fixed] = data_dict['file']
                # 如果启动了图片功能则执行
                if configs.img_function:
                    time.sleep(0.001)
                    __data = configs.image_convert(data_dict['file'], format='JPEG', max_size=(
                        lambda x: configs.enlarge_img if configs.enlarge_img else None)(configs.enlarge_img),
                                                   quality_size=(lambda
                                                                     x: configs.compress_img if configs.compress_img else None)(
                                                       configs.compress_img))
                    print(configs.colors('White', str(__data)))
            else:
                # 失败下载图片总数
                self.data_num_assemble[3] += 1
                print(configs.colors('Red', str(data_dict)))
        else:
            # 成功下载图片总数
            self.data_num_assemble[1] += 1
            print(configs.colors('Green', str({'msg': '[Get]保存已存在 - 跳过下载 - 直接保存', 'code': 'True',
                                               'file': str(FILE_data1 + '/' + FILE_data2)})))
            resource['z_img'][fixed] = url
            resource['b_img'][fixed] = FILE_data1 + '/' + FILE_data2

    # 存储队列
    def thread_coroutine_transfer(self):
        # Temporary 的作用就是方便while检测,可延迟两次,防止数据有遗漏
        Temporary = ['', '']
        # 这个循环主要是检测线程有没有全部关闭,如果全部关闭了,则要经过两次的 Temporary 删除才能关闭整个程序的运行,都是为了防止数据遗漏未保存
        while configs.run_switch or Temporary:
            # 当线程都说已经结束了,我们需要等待几秒,再运行2次,防止数据有遗漏
            if not configs.run_switch:
                # 删除
                Temporary.pop(0)
                time.sleep(1)
            else:
                time.sleep(2)
            for _ in range(len(self.data_root)):
                data = self.data_root.pop(0)
                z_img = (
                    configs.img_segmentation.join(data['z_img']) if configs.img_segmentation != '列表' else json.dumps(
                        data['z_img']))
                b_img = (
                    configs.img_segmentation.join(data['b_img']) if configs.img_segmentation != '列表' else json.dumps(
                        data['b_img']))
                if data['code'] == -1:
                    '''[图片错误数据存储]'''
                    # 失败下载数据行数
                    self.data_num_assemble[2] += 1
                    with open(configs.FILE_data_error, 'a', encoding='utf-8') as f:
                        f.write(f'{data["SKU"]}\t' + z_img + '\t' + b_img + '\n')
                elif data['code'] == 1:
                    '''[完好数据存储]'''
                    # 成功下载数据行数
                    self.data_num_assemble[0] += 1
                    with open(configs.FILE_data_done, 'a', encoding='utf-8') as f:
                        f.write(f'{data["SKU"]}\t' + z_img + '\t' + b_img + '\n')


if __name__ == "__main__":
    # 公用配置信息,实列化
    configs = config()
    # 启动
    start = start_up()
    # 延迟个一秒
    time.sleep(1)

    # 计算出时间戳相差值
    time_diff = int(time.time() * 1000) - start.time_consuming
    # 计算出所消耗的时间
    # 计算小时、分钟、秒、毫秒
    hours = time_diff // (1000 * 3600)  # 计算小时
    minutes = (time_diff % (1000 * 3600)) // (1000 * 60)  # 计算分钟
    seconds = (time_diff % (1000 * 60)) // 1000  # 计算秒
    milliseconds = time_diff % 1000  # 计算毫秒
    print(f'\n\t总用时(时:分:秒:毫):{hours:02}:{minutes:02}:{seconds:02}:{milliseconds:03}')
    print(f'\t数据行数为:{start.data_rows_num}    总图片数量为:{start.data_img_num}')
    print(f'\t成功下载数据行数为:{start.data_num_assemble[0]}    成功下载图片总数为:{start.data_num_assemble[1]}')
    print(f'\t失败下载数据行数为:{start.data_num_assemble[2]}    失败下载图片总数为:{start.data_num_assemble[3]}')
    print('\n\t数据已经爬取完成!!!')
    input('\t请回车进行关闭页面!!!')
    input('\t请再次回车进行关闭页面!!!\n\n')

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2264582.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于博客系统的自动化功能测试报告

1.项目背景 基于 SSM 的个人博客系统测试 博客系统采用前后端分离的方法来实现,同时使用了数据库来存储相关的数据,前端主要有四个页面构成:登录页、列表页、详情页以及编辑页,模拟实现了个人博客列表页面,其结合后端实现了以下的…

计算机的错误计算(一百八十九)

摘要 用大模型计算 tan(12.345) . 自变量取弧度。结果保留10位有效数字。不同于前面两节的大模型,本节调用了新的两个大模型。然而,很遗憾,它们给出的答案似乎仍然是“匹配”出来的,不是计算出来的。当然,均是错误的。…

IP地址数据信息和爬虫拦截的关联

IP地址数据信息和爬虫拦截的关联主要涉及到两方面的内容,也就是数据信息和爬虫。IP 地址数据信息的内容丰富,包括所属地域、所属网络运营商、访问时间序列、访问频率等。 从IP地址信息中可以窥见多样的数据,那么我们应该怎么利用IP地址信息来…

springboot+logback学习文档

目录 1、前提说明2、引入依赖、将logback配置文件打到classes下2.1、引入依赖2.2、将logback配置文件打到classes下 3、使用说明3.1、配置文件名称和位置3.2、常规用法3.2.1、property标签(普通变量)3.2.2、springProperty标签(spring变量&am…

Laya ios接入goole广告,开始接入 2

开始使用 | iOS | Google for Developers 谷歌广告的官网,需要搭梯子,API你说详细吧,也就那样,主要是没接过 一步步来吧 0.laya导包 前端出包原生 screenorientation 全部 portrait,我这个是竖屏的 注意这个&a…

详解js柯里化原理及用法,探究柯里化在Redux Selector 的场景模拟、构建复杂的数据流管道、优化深度嵌套函数中的精妙应用

目录 详解js柯里化原理及用法,探究柯里化在Redux Selector 的场景模拟、构建复杂的数据流管道、优化深度嵌套函数中的精妙应用 一、什么是柯里化? 1、原理解析 2、一个直观的例子 二、如何实现柯里化? 1、底层实现 2、工作原理解析 3…

EDGE浏览器每次关闭时再次打开保存的密码就消失如何解决

文章目录 EDGE浏览器每次重启的时候保存的密码都消失如何解决? 打开EDGE浏览器点击三个点 点击设置 点击隐私、搜索和服务 找到选择每次关闭浏览器时要清除的内容 将开启的关闭即可

数据流图和流程图的区别

在结构化建模中,数据流图和流程图都是非常重要的工具,它们为开发人员提供了强大的手段来分析和设计系统。尽管两者在表面上看起来有些相似,但它们在功能、用途和表达方式上存在显著的区别。本文将详细探讨数据流图和流程图的区别,…

云计算中的容器技术(如Docker)是什么?

今天想和大家聊聊容器技术,特别是Docker这个大家可能经常听到的名词。记得我刚接触容器技术时也觉得挺抽象的,让我用简单的比喻来说明吧。 想象一下你在搬家。传统方式是把所有家具、电器分散装车,到了新家还要重新组装、调试。这就像我们以…

《Opencv》基础操作详解(2)

接上篇:《Opencv》基础操作详解(1)-CSDN博客 目录 Opencv基础操作 11、B、G、R颜色通道提取 12、显示单个通道颜色 13、 合并颜色通道 14、图像添加马赛克 15、图片区域替换 16、图片的缩放(常用) 17、图像运算…

STM32——“SPI Flash”

引入 在给单片机写程序的时候,有时会用到显示屏,就拿市面上的0.96寸单色显示器来说,一张全屏的图片就占用8x1281024个字节,即1kb的空间,这对于单片机来说确实有点奢侈,于是我买了一个8Mb的SPI Flash&#x…

深入浅出:AWT的基本组件及其应用

目录 前言 1. AWT简介 2. AWT基本组件 2.1 Button:按钮 2.2 Label:标签 ​编辑 2.3 TextField:文本框 2.4 Checkbox:复选框 2.5 Choice:下拉菜单 2.6 List:列表 综合案例 注意 3. AWT事件处理 …

Flutter组件————PageView

PageView 可以创建滑动页面效果的widget&#xff0c;它允许用户通过水平或垂直滑动手势在多个子页面&#xff08;child widgets&#xff09;之间切换。每个子页面通常占据屏幕的全部空间。 参数 参数名类型描述childrenList<Widget>包含在 PageView 中的所有子部件&am…

三种电子画册制作方法

今天教大家三种电子画册的制作方法&#xff0c;很容易上手&#xff0c;需要的赶紧收藏起来 一、 利用在线平台--FLBOOK 1.注册并登录在线平台。 2.选择喜欢的模板&#xff0c;根据需求进行修改 3.批量上传PDF文件一键转换H5翻页电子画册 4.添加图片、文字等元素&#xff0c…

以太坊账户详解

文章目录 一、账户基本概念1.1 外部账户1.2 合约账户1.3 差异对比 二、帐户创建2.1 外部账户创建2.2 合约账户创建 三、账户数据结构3.1 账户状态3.2 账户状态结构 对比比特币的 “UTXO” 余额模型&#xff0c;以太坊使用“账户”余额模型。 以太坊丰富了账户内容&#xff0c;除…

AWS Transfer 系列:简化文件传输与管理的云服务

在数字化转型的今天&#xff0c;企业对文件传输、存储和管理的需求日益增长。尤其是对于需要大量数据交换的行业&#xff0c;如何高效、可靠地传输数据成为了一大挑战。为了解决这一难题&#xff0c;AWS 提供了一系列的文件传输服务&#xff0c;统称为 AWS Transfer 系列。这些…

基础I/O -> 如何谈文件与文件系统?

文件的基础理解 空文件也要在磁盘上占据空间。文件 文件内容文件属性。文件操作 对内容的操作 对属性的操作或者是对内容和属性的操作。标定一个文件&#xff0c;必须使用&#xff1a;文件路径 文件名&#xff08;具有唯一性&#xff09;。如果没有指明对应的文件路径&…

网络安全检测

实验目的与要求 (1) 帮助学生掌握木马和入侵的防护和检测方法、提高学习能力、应用能力和解决实际问题的能力。 (2) 要求学生掌握方法, 学会应用软件的安装和使用方法, 并能将应用结果展示出来。 实验原理与内容 入侵检测是通过对计算机网络或计算机系统中若干关键点收集信…

谷歌浏览器的资源管理功能详解

谷歌浏览器作为一款广受欢迎的网页浏览器&#xff0c;不仅以其快速、简洁和易用著称&#xff0c;还提供了强大的资源管理功能。本文将详细介绍如何在Chrome浏览器中进行资源管理&#xff0c;包括查看网页的渲染性能、禁用标签页的背景更新以及管理正在下载的文件。&#xff08;…

ARM异常处理 M33

1. ARMv8-M异常类型及其详细解释 ARMv8-M Exception分为两类&#xff1a;预定义系统异常(015)和外部中断(1616N)。 各种异常的状态可以通过Status bit查看&#xff0c;获取更信息的异常原因&#xff1a; CFSR是由UFSR、BFSR和MMFSR组成&#xff1a; 下面列举HFSR、MMFSR、…