7.5. 配图生成
-
目的:由于小红书发布文章要求图文格式,因此在生成文案的基础上,我们还需要生成配图,与文案搭配后进行发布。
-
原实现思路:
起初我打算使用deepseek的文生图模型Janus进行本地部署生成,参考博客:《Deepseek发布的Janus-Pro-1B初体验》。但后来尝试使用后发现Janus现阶段对于这类特定任务的生成图还不太能够胜任。以下是我尝试使用文案让Janus生成的图片:
-
现实现思路:
- 当下普遍的方案是使用文案生成一段相关的html代码,再使用python中的自动化库来进行相应部分的截图,最后将截图与文案进行组合,形成图文格式。
-
代码实现:
-
html生成:
'''
Author: yeffky
Date: 2025-02-14 08:43:28
LastEditTime: 2025-02-15 19:28:28
'''
import json
import os
from datetime import datetime


def build_prompt(drafts):
    """Wrap the Xiaohongshu draft copy in an instruction asking DeepSeek
    for a card-style HTML page (cover card + point cards with ids
    'card1', 'card2', ...).

    Args:
        drafts: The draft copy text to embed in the prompt.

    Returns:
        The full prompt string sent to the model.
    """
    prompt = "根据下面的小红书文案,帮我生成一个html页面,包含小红书的封面(需要一个卡片状的封面,上面只需文案内容即可,需要吸引眼球),以及下方几个要点内容,要点内容和封面我希望制作成卡片形式,并且每一部分的div请为我附上属性id,id为'card1', 'card2', ...。要求符合小红书平台的图文要求规则以及平替风格,还要符合小红书平台的用户审美。回复只要给出代码即可,请不要添加多余表达"
    return f"""{prompt} \n\n小红书文案:\n\n{drafts}"""


def get_deepseek_response(prompt, api_key, max_retries=10):
    """Call the DeepSeek chat-completions API and return the reply text.

    Retries failed or empty responses up to ``max_retries`` times instead
    of looping forever (the original retried unboundedly on any failure).

    Args:
        prompt: User prompt for the model.
        api_key: DeepSeek API key (Bearer token).
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The content string of the first choice in the API response.

    Raises:
        RuntimeError: If no valid response is obtained after all retries.
    """
    # Local import: lets the pure helpers in this module be used without
    # the requests package installed.
    import requests

    url = "https://api.deepseek.com/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }
    payload = json.dumps({
        "messages": [
            {
                "content": prompt,
                "role": "user"
            }
        ],
        "model": "deepseek-reasoner",
        "frequency_penalty": 0,
        "max_tokens": 2048,
        "presence_penalty": 0,
        "response_format": {
            "type": "text"
        },
        "stop": None,
        "stream": False,
        "stream_options": None,
        "temperature": 1,
        "top_p": 1,
        "tools": None,
        "tool_choice": "none",
        "logprobs": False,
        "top_logprobs": None
    })
    for _attempt in range(max_retries):
        try:
            print("发送请求")
            response = requests.post(url, data=payload, headers=headers, timeout=200)
            response.raise_for_status()
            body = response.json()
            if body:
                return body['choices'][0]['message']['content']
        except requests.exceptions.RequestException as e:
            print(f"请求失败:{str(e)},开始重试...")
    raise RuntimeError("DeepSeek API request failed after retries")


def generate_html():
    """Read today's draft file, ask DeepSeek for the HTML page, and write
    it to ./pic_generate/pic.html."""
    api_key = os.getenv("DEEPSEEK_API_KEY")
    today = datetime.now().strftime("%Y-%m-%d")
    file_path = "./xiaohongshu_drafts/小红书_推广文案_千战系列" + today + ".txt"
    # Close the draft file promptly instead of leaking the handle.
    with open(file_path, "r", encoding="utf-8") as f:
        drafts = f.read()
    prompt = build_prompt(drafts=drafts)
    response = get_deepseek_response(prompt, api_key)
    print(response)
    with open('./pic_generate/pic.html', 'w', encoding='utf-8') as f:
        f.write(response)
-
截图:
''' Author: yeffky Date: 2025-02-14 09:41:09 LastEditTime: 2025-02-15 10:44:51 ''' from playwright.sync_api import sync_playwright import time import re def generate_pic(url): # 启动浏览器 player = sync_playwright().start() # 初始化Playwright并启动 chrome_driver = player.chromium # 获取Chromium浏览器实例 browser = chrome_driver.launch(headless=False) # 启动浏览器,headless=False表示以非无头模式启动,即显示浏览器窗口 context = browser.new_context() # 创建一个新的浏览器上下文(类似于一个新的浏览器窗口) page = context.new_page() # 在该上下文中创建一个新的页面(标签页) # 访问页面 card_cnt = 0 with(open('./pic_generate/pic.html', 'r', encoding='utf-8')) as f: page_content = f.read() card_cnt = len(re.findall(r'<div class="card" id="card\d+">', page_content)) print(card_cnt) page.goto(url) # 导航到指定的URL # 截取相关卡片的截图 for i in range(1, card_cnt + 1): card_pic = page.query_selector(f"id=card{i}") # 使用CSS选择器查找页面中的搜索按钮元素 card_pic.screenshot(path=f"./pictures/card{i}.png") # 对搜索按钮元素进行截图并保存为b.png # 停止访问 context.close() # 关闭浏览器上下文 browser.close() # 关闭浏览器 player.stop() # 停止Playwright if __name__ == '__main__': url = 'D:/Project/UUCrawl/Code/pic_generate/pic.html' generate_pic(url)
-
7.6. 自动化发布
- 目的:将生成的图片和文案自动发布到小红书
- 实现思路:
- 1.使用python中的selenium库,模拟页面操作,登录后需要将cookie保存下来,下次使用时直接读取cookie,避免重复登录。同时保存一份token,每次调用登录时检查token是否过期,如未过期则无需登录操作。
- 2.登录后,模拟页面操作前往发布页面,使用send_keys()方法输入标题和正文,使用click()方法点击发布按钮。
- 参考开源项目:xhs_ai_publisher
- 代码实现:
'''
Author: yeffky
Date: 2025-02-15 20:28:32
LastEditTime: 2025-02-17 14:08:45
'''
import sys

sys.path.append("./")

import json
import os
import time
from datetime import datetime

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

from utils import line_process
class XiaohongshuClient:
    """Automates Xiaohongshu (RED) creator-platform login and posting via Selenium.

    Cookies and a login token are persisted as JSON files next to this
    script so later runs can skip the SMS-verification login flow.
    """

    def __init__(self):
        self.driver = webdriver.Chrome()
        self.wait = WebDriverWait(self.driver, 10)
        # Resolve storage paths relative to this file, not the CWD.
        current_dir = os.path.dirname(os.path.abspath(__file__))
        self.token_file = os.path.join(current_dir, "xiaohongshu_token.json")
        self.cookies_file = os.path.join(current_dir, "xiaohongshu_cookies.json")
        self.token = self._load_token()
        self._load_cookies()

    def _load_token(self):
        """Load the saved token from disk.

        Returns:
            The token string if the file exists and has not expired,
            otherwise None.
        """
        if os.path.exists(self.token_file):
            try:
                with open(self.token_file, 'r') as f:
                    token_data = json.load(f)
                # Honour the stored expiry timestamp.
                if token_data.get('expire_time', 0) > time.time():
                    return token_data.get('token')
            except (OSError, ValueError):
                # Unreadable or corrupt token file: treat as no token.
                pass
        return None

    def _save_token(self, token):
        """Persist the token to disk with a 30-day expiry."""
        token_data = {
            'token': token,
            # Token validity window: 30 days from now.
            'expire_time': time.time() + 30 * 24 * 3600
        }
        with open(self.token_file, 'w') as f:
            json.dump(token_data, f)

    def _load_cookies(self):
        """Load saved cookies (if any) and install them into the browser.

        Navigates to the creator domain first because Selenium only accepts
        cookies for the currently loaded domain.
        """
        if os.path.exists(self.cookies_file):
            try:
                with open(self.cookies_file, 'r') as f:
                    cookies = json.load(f)
                self.driver.get("https://creator.xiaohongshu.com")
                for cookie in cookies:
                    self.driver.add_cookie(cookie)
            except (OSError, ValueError, Exception):
                # Best-effort: a failed cookie restore just means manual login.
                pass

    def _save_cookies(self):
        """Dump the browser's current cookies to disk."""
        cookies = self.driver.get_cookies()
        with open(self.cookies_file, 'w') as f:
            json.dump(cookies, f)

    def login(self, phone, country_code="+86"):
        """Log in to the Xiaohongshu creator platform.

        Tries, in order: a cached token, saved cookies, and finally an
        interactive SMS-code login (prompts on stdin for the code).

        Args:
            phone: Phone number used to receive the verification code.
            country_code: Dialing prefix typed into the country selector.
        """
        # A still-valid token means no login action is needed.
        if self.token:
            return
        # Try cookie-based login first.
        self.driver.get("https://creator.xiaohongshu.com/login")
        self._load_cookies()
        self.driver.refresh()
        time.sleep(3)
        # If we were redirected away from /login, the cookies worked.
        if self.driver.current_url != "https://creator.xiaohongshu.com/login":
            print("使用cookies登录成功")
            self.token = self._load_token()
            self._save_cookies()
            time.sleep(2)
            return
        else:
            # Stale cookies: clear them and fall through to manual login.
            self.driver.delete_all_cookies()
            print("无效的cookies,已清理")
        # Manual login flow.
        self.driver.get("https://creator.xiaohongshu.com/login")
        # Wait for the login page to finish loading.
        time.sleep(5)
        # Open the country-code selector.
        country_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='请选择选项']")))
        country_input.click()
        time.sleep(5)
        # Type the dialing code into the selector's search box.
        # NOTE(review): absolute XPaths are brittle and will break on any
        # page-layout change — confirm against the live page when it fails.
        try:
            self.driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div/div[2]/div[1]/div[2]/div/div/div/div/div/div[2]/div[1]/div[1]/div/div/div[1]/input").click()
            time.sleep(3)
            self.driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div/div[2]/div[1]/div[2]/div/div/div/div/div/div[2]/div[1]/div[1]/div/div/div[1]/input").send_keys(country_code)
            time.sleep(3)
            time.sleep(5)
        except Exception as e:
            print("无法找到国家区号选项")
            print(e)
        # Fill in the phone number.
        phone_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='手机号']")))
        phone_input.clear()
        phone_input.send_keys(phone)
        # Click "send verification code"; the button's class name varies,
        # so several selectors are tried in turn.
        try:
            send_code_btn = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".css-uyobdj")))
            send_code_btn.click()
        except Exception:
            try:
                send_code_btn = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".css-1vfl29")))
                send_code_btn.click()
            except Exception:
                try:
                    send_code_btn = self.wait.until(EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'发送验证码')]")))
                    send_code_btn.click()
                except Exception:
                    print("无法找到发送验证码按钮")
        # Ask the operator for the SMS code and submit it.
        verification_code = input("请输入验证码: ")
        code_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='验证码']")))
        code_input.clear()
        code_input.send_keys(verification_code)
        # Submit the login form.
        login_button = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".beer-login-btn")))
        login_button.click()
        # Give the login a moment to complete, then persist the session.
        time.sleep(3)
        self._save_cookies()

    def post_article(self, title, content, images=None):
        """Publish an image-text post through the creator UI.

        Args:
            title: Article title typed into the title field.
            content: Article body injected into the editor.
            images: Iterable of absolute image paths to upload (optional).
        """
        time.sleep(3)
        print("点击发布按钮")
        # Open the publish dialog.
        publish_btn = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".btn.el-tooltip__trigger.el-tooltip__trigger")))
        publish_btn.click()
        # Switch to the image-text tab (second tab); skipped automatically
        # when only one tab exists (e.g. video-only flow).
        time.sleep(3)
        tabs = self.driver.find_elements(By.CSS_SELECTOR, ".creator-tab")
        if len(tabs) > 1:
            tabs[1].click()
        time.sleep(3)
        # Upload all images in one go: the file input accepts multiple
        # newline-separated paths.
        if images:
            upload_input = self.driver.find_element(By.CSS_SELECTOR, 'input[type="file"]')
            upload_input.send_keys('\n'.join(images))
            time.sleep(1)
        time.sleep(3)
        # The title field ignores plain send_keys in this UI, so set its
        # value via JS and fire a change event.
        JS_ADD_TEXT_TO_INPUT = """
        var elm = arguments[0], txt = arguments[1];
        elm.value += txt;
        elm.dispatchEvent(new Event('change'));
        """
        title_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".d-text")))
        self.driver.execute_script(JS_ADD_TEXT_TO_INPUT, title_input, title)
        print(content)
        # The body is a Quill rich-text editor; write into its first <p>.
        JS_ADD_TEXT_TO_P = """
        var elm = arguments[0], txt = arguments[1];
        elm.textContent = txt;
        """
        content_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".ql-editor")))
        p_element = content_input.find_element(By.CSS_SELECTOR, "p")
        print(p_element)
        self.driver.execute_script(JS_ADD_TEXT_TO_P, p_element, content)
        # NOTE(review): 10-minute pause before clicking publish — presumably
        # a manual-review window; confirm whether this is intentional.
        time.sleep(600)
        submit_btn = self.driver.find_element(By.CSS_SELECTOR, ".el-button.publishBtn")
        submit_btn.click()

    def close(self):
        """Quit the browser and release the WebDriver."""
        self.driver.quit()
def post_article():
    """Log in and publish today's draft article with its generated images.

    Reads the phone number from ./docs/phone.txt, the draft from the
    date-stamped file the generator wrote today, and all images from
    ./pictures (sorted so card1..N upload in order).
    """
    poster = XiaohongshuClient()
    with open('./docs/phone.txt') as f:
        phone = f.read()
    poster.login(phone)
    print("登录成功")
    print("开始发布文章")
    print(os.getcwd())
    # Use today's date so this stays in sync with the draft generator,
    # which writes 小红书_推广文案_千战系列<today>.txt (was hard-coded to
    # 2025-02-15 and broke on any other day).
    today = datetime.now().strftime("%Y-%m-%d")
    draft_path = './xiaohongshu_drafts/小红书_推广文案_千战系列' + today + '.txt'
    with open(draft_path, 'r', encoding='utf-8') as f:
        title = f.readline()
    article = line_process.get_article(draft_path)
    print(article)
    # os.listdir order is arbitrary — sort so images upload as card1..N.
    images = sorted(
        os.path.join(r"D:\Project\UUCrawl\Code\pictures", name)
        for name in os.listdir('./pictures')
    )
    poster.post_article(title, article, images)
    poster.close()
7.7. 主程序
from crawler import ip_crawler, data_crawler
from analysis import data_analysis
from pic_generate import pic_generate, html_generate
from post import xiaohongshu_post
import file_handler
if __name__ == '__main__':
    # Local HTML file produced by html_generate and screenshotted below.
    url = 'D:/Project/UUCrawl/Code/pic_generate/pic.html'
    # Fetch proxy IPs (return values were unused — dropped the dead locals).
    ip_crawler.crawl_ip()
    # Crawl commodity data.
    data_crawler.crawl_data()
    # Analyse the data and produce the draft copy.
    data_analysis.analysis_data()
    file_handler.start_observer()
    # Generate the HTML card page from the draft.
    html_generate.generate_html()
    # Screenshot the cards into ./pictures.
    pic_generate.generate_pic(url)
    # Publish the post to Xiaohongshu.
    xiaohongshu_post.post_article()