并发下载方案调研和测试
需求: 下载10亿图片
代理按流量计费,动态变化,不计入耗时内
测试:
1. 使用多线程+代理: 100个线程,10分钟下载了3000张
2. 使用多进程+多线程+代理: 5个进程+100个线程,10分钟下载了1.2万张
3. 使用aiohttp: 200个并发,10分钟下载了8万+张
结果:
1. 多线程下,线程数量超过100后,继续增加线程基本不再提升速度。
2. 多进程+多线程,速率提升明显,但进程数开多之后,速度提升也不再明显。
3. 使用aiohttp,下载极快,服务器带宽迅速拉升,提升了10倍以上。
样例代码
from loguru import logger
import asyncio
import os
import aiohttp
from aiohttp_socks import SocksConnector
# Upper bound on the number of downloads allowed to run at the same time.
MAX_CONCURRENCY = 200
# URL of the SOCKS5 proxy that every request is routed through.
proxy = 'socks5://127.0.0.1:12345'
async def download_image(result: dict, save_dir: str, semaphore: asyncio.Semaphore):
    """Download one image through the SOCKS5 proxy and save it in ``save_dir``.

    Args:
        result: Record describing the image. ``result['url']`` is the address
            to fetch and ``result['file_name']`` is the name of the file to
            write.
        save_dir: Directory the file is written into. It is not created here.
        semaphore: Semaphore held for the whole download, so the number of
            simultaneous downloads is bounded by the semaphore's value.

    Returns:
        None. A non-200 response or any ``Exception`` raised while
        downloading or saving is logged and not re-raised.
    """
    # Bound before the try block so the except handler can always format it,
    # even when the failure happens before the URL has been read.
    url = None
    async with semaphore:
        try:
            url = result['url']
            file_name = result['file_name']
            connector = SocksConnector.from_url(proxy)
            async with aiohttp.ClientSession(connector=connector) as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        logger.error(f"下载失败,状态码 {response.status}: {url}")
                        return
                    # Read the full body before touching the disk so a failed
                    # transfer does not leave an empty or truncated file.
                    content = await response.read()
            save_path = os.path.join(save_dir, file_name)
            with open(save_path, 'wb') as f:
                f.write(content)
            logger.info(f"图片下载成功: {file_name}")
        except Exception as e:
            logger.error(f"下载出现错误: {str(e)} - {url}")
async def download_all_images(results: list, save_dir: str):
    """Download every record in ``results`` into ``save_dir`` concurrently.

    Args:
        results: Records that are each passed to ``download_image``.
        save_dir: Target directory; created if it does not exist yet.
    """
    os.makedirs(save_dir, exist_ok=True)
    # One semaphore is shared by all downloads to cap how many run at once.
    limiter = asyncio.Semaphore(MAX_CONCURRENCY)
    logger.info('创建任务 ')
    # Build one coroutine per record, then wait for all of them to finish.
    pending = []
    for record in results:
        pending.append(download_image(record, save_dir, limiter))
    await asyncio.gather(*pending)
def main():
    """Download a fixed sample of four images into ``/data/images/``."""
    # (image id, source URL) pairs; each file name is the id plus ".jpg".
    samples = (
        ("3124234095", "https://live.staticflickr.com/3168/3124234095_1b35adf327_k.jpg"),
        ("51798293924", "https://live.staticflickr.com/65535/51798293924_e2000d4e47_k.jpg"),
        ("51796948257", "https://live.staticflickr.com/65535/51796948257_abdcd580bf_k.jpg"),
        ("51795816842", "https://live.staticflickr.com/65535/51795816842_b20475e1c2_k.jpg"),
    )
    results = [
        {"id": image_id, "file_name": f"{image_id}.jpg", "url": image_url}
        for image_id, image_url in samples
    ]
    save_path = '/data/images/'
    asyncio.run(download_all_images(results, save_path))
# Script entry point: run the sample download only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()