🙋♀️Tiktok APP的基于关键字检索的视频及评论信息爬虫共分为两期,希望对大家有所帮助。
第一期见下文。
第二期:基于视频URL的评论信息爬取
1. Node.js环境配置
首先配置 JavaScript 运行环境(如 Node.js),用于执行加密签名代码。
Node.js下载网址:https://nodejs.org/en
Node.js的安装方法(环境配置非常关键,决定了后面的程序是否可以使用):https://blog.csdn.net/liufeifeihuawei/article/details/132425239
2. Py环境配置
import time
import requests
import execjs
import os
from datetime import datetime
from urllib.parse import urlencode
from loguru import logger
import json
import random
from typing import Optional, Dict, List, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
3. 基于关键字检索的视频信息爬取
1. 主程序:设定爬取的关键字
通过文件 topics.csv 导入你希望爬取的关键字。
通过文件 videosInfo.json 存储爬取的结果,以字典格式存储。
if __name__ == '__main__':
    # Make sure the results directory exists before any worker tries to write to it.
    os.makedirs('../results', exist_ok=True)
    keywords, fields = read_csv(file_path='topics.csv')  # keywords (and their topic fields) to crawl
    output_file = '../results/videosInfo.json'  # plain string: no placeholders, f-string not needed
    cookie_str = read_cookie()

    # Submit one crawl task per (keyword, field) pair. max_workers=1 keeps the
    # requests effectively serial, which is gentler on TikTok's rate limiting;
    # raise it to crawl keywords concurrently.
    with ThreadPoolExecutor(max_workers=1) as executor:
        futures = [
            executor.submit(crawl_keyword, keyword, output_file, cookie_str, field, 20)
            for keyword, field in zip(keywords, fields)
        ]
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                logger.error(f"爬取过程中发生错误: {str(e)}")
    logger.info("所有主题的视频爬取完成")
2. 多线程爬取单个关键词,限制最大请求次数
通过参数 max_requests 设定爬取的最大请求次数(request_count 用于计数)。
def crawl_keyword(keyword: str, output_file: str, cookie_str: str, field: str, max_requests: int = 10):
    """Crawl TikTok search results for one keyword, capped at max_requests pages.

    Args:
        keyword: Search keyword to query.
        output_file: JSON file that TiktokUserSearch appends parsed videos to.
        cookie_str: Raw cookie header string (must contain msToken).
        field: Topic/category label stored alongside each video record.
        max_requests: Hard cap on the number of search-page requests.
    """
    tiktok = TiktokUserSearch(output_file=output_file)
    # NOTE(review): has_more is never updated (the update is commented out below),
    # so the loop is effectively bounded by max_requests alone.
    has_more = 1
    cursor = '0'
    search_id = None
    request_count = 0  # number of successful page requests so far
    while has_more and request_count < max_requests:
        data = tiktok.main(keyword, field, cookie_str, cursor, search_id)
        logger.info(f"Request {request_count + 1}: {data}")
        if not (data and isinstance(data, dict)):
            logger.error("Invalid response format")
            break
        # has_more = data.get('has_more', 0)
        cursor = data.get('cursor', '0')
        search_id = data.get('log_pb', {}).get('impr_id')
        if 'data' not in data:
            logger.error("No data found in response")
            break
        # (removed dead `data = data['data']` reassignment — the value was never used)
        request_count += 1
        time.sleep(random.randint(0, 5))  # random delay to avoid hammering the API
    write_csv(keyword, request_count, file_path='../results/records.csv')
    logger.info(f"爬取 {keyword} 的视频完成,共请求 {request_count} 次")
3. 定义TiktokUserSearch类
允许获得 24 类字段,包括:
🖥️视频的URL、视频时长、标题等;
👨视频的发布者个人简介、获赞数据、视频数据等;
👍视频的点赞信息、分享次数、评论数量、播放次数、收藏次数等;
🎶视频的背景音乐ID,音乐来源等… …
class TiktokUserSearch:
    """Search TikTok's web API for videos by keyword and append results to a JSON file.

    Typical flow: ``main()`` → ``get()`` (signed HTTP request) → ``parse_data()``
    (extract video fields and persist them to ``self.output_file``).
    """

    def __init__(self, output_file: Optional[str] = None):
        self.config = read_config()  # project-level config: headers, proxies, ...
        self.headers = self.config.get("headers", {})
        self.cookies: Optional[Dict[str, str]] = None  # populated per-request in get()
        # NOTE(review): the default filename has a '.csv' suffix but parse_data()
        # actually writes JSON — confirm whether the suffix should be '.json'.
        self.output_file = output_file if output_file else f'tiktok_videos_{datetime.now().strftime("%Y%m%d_%H%M%S")}.csv'
        self.proxies = self.config.get("proxies", None)  # optional requests proxy dict
        self.lock = threading.Lock()  # NOTE(review): not used by any visible method

    def cookie_str_to_dict(self, cookie_str: str) -> Dict[str, str]:
        """Convert a raw ``key=value; key=value`` cookie header string into a dict.

        Raises:
            Exception: re-raised after logging if a cookie entry is malformed
                (e.g. contains no '=').
        """
        cookie_dict: Dict[str, str] = {}
        try:
            entries = [part.strip() for part in cookie_str.split('; ') if part.strip() != ""]
            for entry in entries:
                # Split on the first '=' only: cookie values may themselves contain '='.
                key, value = entry.split('=', 1)
                cookie_dict[key] = value
        except Exception as e:
            logger.error(f"转换cookie时出错: {str(e)}")
            raise
        return cookie_dict

    def get(self, keyword: str, cursor: str, search_id: Optional[str], cookie_str: str) -> Dict[str, Any]:
        """Send one signed search request; return parsed JSON or ``{"error": ...}``."""
        self.cookies = self.cookie_str_to_dict(cookie_str)
        url = "https://www.tiktok.com/api/search/general/full/"
        focus_state = "true" if cursor == "0" else "false"
        params = {
            "WebIdLastTime": f"{int(time.time())}",
            "aid": "1988",
            "app_language": "zh-Hans",
            "app_name": "tiktok_web",
            "browser_language": "zh-CN",
            # ... 略
            "webcast_language": "zh-Hans",
            # KeyError here propagates to the caller if the cookie lacks msToken.
            "msToken": self.cookies["msToken"],
        }
        if cursor != "0":
            # Pagination requests must echo the search_id from the first response.
            params.update({"search_id": search_id})
        try:
            # Sign the query string with the site's JS algorithm; use a context
            # manager so the script file handle is always closed.
            with open('../configs/encrypt.js', encoding='utf-8') as js_file:
                js_code = js_file.read()
            x_b = execjs.compile(js_code).call("sign", urlencode(params), self.headers["user-agent"])
            params.update({"X-Bogus": x_b})
        except Exception as e:
            logger.error(f"生成X-Bogus时出错: {str(e)}")
            return {"error": str(e)}
        headers = self.headers.copy()
        headers.update({"referer": "https://www.tiktok.com/search?q=" + keyword})
        max_retries = 3
        for attempt in range(max_retries):
            try:
                response = requests.get(
                    url,
                    headers=headers,
                    cookies=self.cookies,
                    params=params,
                    timeout=(3, 10),  # (connect, read) timeouts in seconds
                    proxies=self.proxies
                )
                response.raise_for_status()
                return response.json()
            # Fix: the original caught undefined names (ex1, ex2, ex3), which would
            # raise NameError instead of retrying. RequestException covers
            # connection errors, timeouts and HTTP errors from raise_for_status().
            except requests.exceptions.RequestException as e:
                logger.warning(f"尝试 {attempt + 1}/{max_retries} 发生网络错误:{e}")
                if attempt < max_retries - 1:
                    time.sleep(2)
                else:
                    return {"error": f"Network error after {max_retries} attempts: {str(e)}"}
            except Exception as e:
                logger.error(f"发生其他错误:{e}")
                return {"error": str(e)}

    def parse_data(self, data_list: List[Dict[str, Any]], keyword: str, field: str) -> List[str]:
        """解析数据并保存到json文件

        Extract per-video fields from the raw search items, append them to
        ``self.output_file`` (JSON array), and return the author profile URLs.
        """
        resultList: List[str] = []
        video_data: List[Dict[str, Any]] = []
        for u in data_list:
            try:
                item = u['item']
                author = item['author']
                stats = item['stats']
                author_stats = item['authorStats']
                # Fix: the original had trailing commas here, which made these
                # 1-tuples and forced `[0]` indexing everywhere below.
                video_id = str(item['id'])  # 视频的唯一标识符(TikTok 视频 ID)
                author_name = str(author['uniqueId'])  # 作者的 TikTok 账号
                video_url = f'https://www.tiktok.com/@{author_name}/video/{video_id}'
                video_info = {
                    'search_keyword': keyword,
                    'video_field': field,
                    'video_id': video_id,  # 视频的唯一标识符(TikTok 视频 ID)
                    'desc': item['desc'],  # 视频的文字描述(caption/标题)
                    'create_time': datetime.fromtimestamp(item['createTime']).strftime('%Y-%m-%d %H:%M:%S'),  # 视频的发布时间
                    'duration': item['video']['duration'],  # 视频时长(单位:秒)
                    'video_url': video_url,  # 视频播放地址
                    'author_id': author['id'],  # 作者的唯一 ID
                    'author_name': author_name,  # 作者的 TikTok 账号(uniqueId,即用户名)
                    # ... 略
                    'author_following_count': author_stats['followingCount'],  # 作者关注的人数
                    'digg_count': stats['diggCount'],  # 视频的点赞(like)数量
                    'share_count': stats['shareCount'],  # 视频的分享次数
                    'comment_count': stats['commentCount'],  # 视频的评论数量
                    'play_count': stats['playCount'],  # 视频的播放次数
                    'collect_count': stats.get('collectCount', 0),  # 视频的收藏次数
                }
                # video_info['comments'] = self.get_comments(video_url)
                if 'challenges' in item:
                    video_info['hashtags'] = ','.join([tag['title'] for tag in item['challenges']])
                else:
                    video_info['hashtags'] = ''
                # 背景音乐
                if 'music' in item:
                    music = item['music']
                    video_info.update({
                        'music_id': music['id'],
                        'music_title': music['title'],
                        'music_author': music['authorName'],
                        'music_original': music['original']
                    })
                video_data.append(video_info)
                resultList.append(f"https://www.tiktok.com/@{author['uniqueId']}")
            except Exception as e:
                # Skip malformed items (e.g. non-video results) without aborting the batch.
                logger.error(f"解析视频数据时出错: {str(e)}")
                continue
        # **追加写入 JSON 文件** (read-modify-write so the file stays one valid JSON array)
        try:
            # 如果文件存在,读取已有数据
            if os.path.exists(self.output_file):
                with open(self.output_file, 'r', encoding='utf-8') as f:
                    try:
                        existing_data = json.load(f)
                    except json.JSONDecodeError:
                        existing_data = []  # 如果 JSON 解析失败,重置为空列表
            else:
                existing_data = []
            # 追加新数据
            existing_data.extend(video_data)
            # 保存回 JSON 文件
            with open(self.output_file, 'w', encoding='utf-8') as f:
                json.dump(existing_data, f, ensure_ascii=False, indent=4)
            logger.info(f"数据已{'追加' if existing_data else '保存'}到文件: {self.output_file}")
        except Exception as e:
            logger.error(f"保存 JSON 文件时出错: {str(e)}")
        return resultList

    def main(self, keyword: str, field: str, cookie_str: str, cursor: str = "0", search_id: Optional[str] = None) -> Dict[str, Any]:
        """主函数,执行搜索并解析数据

        Returns the raw API response dict; on failure a synthetic dict with
        status "-2" (request error) or "-1" (captcha/verify challenge).
        """
        dataJson = self.get(keyword, cursor, search_id, cookie_str)
        if dataJson:
            if "error" in dataJson:
                return {"cursor": cursor, "search_id": search_id, "data": [], "status": "-2",
                        "error": dataJson["error"]}
            elif "verify_event" in str(dataJson):
                # TikTok returned a verification (captcha) challenge instead of results.
                return {"cursor": cursor, "search_id": search_id, "data": [], "status": "-1"}
            else:
                if 'data' in dataJson:
                    self.parse_data(dataJson['data'], keyword, field)
                return dataJson