1 Django中集成celery
- web项目- - 》用浏览器访问
- 定时任务
- 延迟任务
- 异步任务
1.1 安装模块
pip install Django== 3.2 .22
pip install celery
pip install redis
pip install eventlet
1.2 在项目路径下创建 celery.py
import os
from celery import Celery
os. environ. setdefault( 'DJANGO_SETTINGS_MODULE' , 'django_celery_crawl.settings' )
app = Celery( 'proj' )
app. config_from_object( 'django.conf:settings' )
app. autodiscover_tasks( )
1.3 django 的配置文件 settings.py
BROKER_URL = 'redis://127.0.0.1:6379/1'
RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
ACCEPT_CONTENT = [ 'json' ]
TASK_SERIALIZER = 'json'
RESULT_SERIALIZER = 'json'
TASK_RESULT_EXPIRES = 60 * 60 * 24
TIMEZONE = 'Asia/Shanghai'
1.4 以后只需要再不同app中,写tasks.py
from celery import shared_task
@shared_task
def add ( a, b) :
return a + b
1.5 在项目目录下的 init .py中加入
from . celery import app as celery_app
__all__ = ( 'celery_app' , )
1.6 启动celery的worker-写一个提交异步任务的视图函数
1.6.1 启动worker
celery - A django_celery_crawl worker - l debug - P eventlet
1.6.2 写一个视图函数测试
from django. contrib import admin
from django. urls import path, include
urlpatterns = [
path( 'admin/' , admin. site. urls) ,
path( 'app01/' , include( 'app01.urls' ) ) ,
]
from django. contrib import admin
from django. urls import path
from . views import send_mail_view
urlpatterns = [
path( 'send/' , send_mail_view) ,
]
from django. shortcuts import render, HttpResponse
from . tasks import send_email
def send_mail_view ( request) :
to_user = request. GET. get( 'user' )
res = send_email. delay( to_user)
print ( res. id )
return HttpResponse( '邮件已发送,id号为%s' % res. id )
1.6.3 在浏览器输入:
http: // 127.0 .0 .1 : 8000 / app01/ send/ ?user= 616564099 @qq. com
1.7 celery 架构图
2 实现定时任务
2.1 django的配置文件配置
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'every_5_second' : {
'task' : 'app01.tasks.add' ,
'schedule' : timedelta( seconds= 5 ) ,
'args' : ( 33 , 44 ) ,
}
}
2.2 重启worker—启动beat
celery - A django_celery_crawl worker - l debug - P eventlet
celery - A django_celery_crawl beat - l debug
3 通过Admin 后台管理添加定时任务
如果要再新加一个定时任务(爬美女图片)
我们只能修改代码 :settings. py- - > 加入代码
重启worker,重启beat- - 》才能行
有些麻烦
- Admin 后台管理添加定时任务
3.1 引入–安装djiango-celery-beat
pip install django- celery- beat
3.2 在app中注册djiango-celery-beat
INSTALLED_APPS = [
. . .
'app01' ,
'django_celery_beat'
]
3.3 配置django的时区
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
USE_TZ = False
3.4 在setting中配置调度器
CELERYBEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'
3.5 数据迁移
DATABASES = {
'default' : {
'ENGINE' : 'django.db.backends.mysql' ,
'NAME' : 'django_celery_crawl' ,
'HOST' : '127.0.0.1' ,
'PORT' : 3306 ,
'USER' : 'root' ,
'PASSWORD' : '123'
}
}
pip install mysqlclient
python manage. py makemigrations
python manage. py migrate
3.6 使用admin后台管理插入数据
- 可以使用navicat插入
- django提供了一个后台管理- - 》登录到后台管理,可以图形化界面录入数据
- 但是需要个账号登录: 命令创建账号
python manage. py createsuperuser
- 我创建的是:
账号: admin
密码 : 123456
3.7 美化admin
https: // gitee. com/ tompeppa/ simpleui
https: // newpanjing. github. io/ simpleui_docs/ config. html
pip3 install django- simpleui
INSTALLED_APPS = [
'simpleui' ,
. . .
]
3.8 手动添加任务
celery - A django_celery_crawl worker - l debug - P eventlet
celery - A django_celery_crawl beat - l debug
4 通过Admin查看任务运行情况
这里要用到django- celery- results插件。
通过插件可以使用Django的orm作为结果存储,这样的好处在于我们可以直接通过django的数据查看到任务状态,同时为可以制定更多的操作
4.1 安装 django-celery-results
pip install django- celery- results
4.2 app中注册
INSTALLED_APPS = (
. . . ,
'django_celery_results' ,
)
4.3 修改配置文件
CELERY_RESULT_BACKEND = 'django-db'
4.4 迁移数据库
python manage. py makemigrations
python manage. py migrate
4.5 在admin中查看即可
5 通过Flower监控celery运行情况
如果不想通django的管理界面监控任务的执行,还可以通过Flower插件来进行任务的监控。Flower的界面更加丰富,可以监控的信息更全
Flower 是一个用于监控和管理 Celery 集群的开源 Web 应用程序。它提供有关 Celery workers 和tasks状态的实时信息
1 实时监控celery的Events
- 查看任务进度和历史记录
- 查看任务详细信息(参数、开始时间、运行时间等)
2 远程操作
- 查看workers 状态和统计数据
- 关闭并重新启动workers 实例
- 控制工作池大小和自动缩放设置
- 查看和修改工作实例消耗的队列
- 查看当前正在运行的任务
- 查看计划任务(预计到达时间/ 倒计时)
- 查看保留和撤销的任务
- 应用时间和速率限制
- 撤销或终止任务
3 Broker 监控
- 查看所有 Celery 队列的统计信息
5.1 修改结果存储为redis
RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
5.2 安装flower和启动
pip install flower
celery - A django_celery_crawl flower - - port- 5555
http: // 127.0 .0 .1 : 5555 /
- 一般先启动flower
6 任务执行成功或失败告警(邮件)
虽然可以通过界面来监控了,但是我们想要得更多,人不可能天天盯着界面看吧,如果能实现任务执行失败或成功就自动发邮件告警就好了。这个Celery当然也是没有问题的
6.1 task.py 中加入
import time
from celery import shared_task
from celery import Task
from django. core. mail import send_mail
from django. conf import settings
class SendEmailTask ( Task) :
def on_success ( self, retval, task_id, args, kwargs) :
info = f'爬虫任务成功-- 任务id是: { task_id} , 参数是: { args} , 执行成功 !'
send_mail( 'celery监控成功告警' , info, settings. EMAIL_HOST_USER, [ '616564099@qq.com' ] )
def on_failure ( self, exc, task_id, args, kwargs, einfo) :
info = f'爬虫任务失败-- 任务id是: { task_id} , 参数是: { args} , 执行失败,请去flower中查看原因 !'
send_mail( 'celery监控爬虫失败告警' , info, settings. EMAIL_HOST_USER, [ '616564099@qq.com' ] )
def on_retry ( self, exc, task_id, args, kwargs, einfo) :
print ( '重试了!!!' )
@shared_task
def add ( a, b) :
return a + b
@shared_task
def send_email ( to_user) :
time. sleep( 2 )
return '发送邮件成功:%s' % to_user
@shared_task ( base= SendEmailTask, bind= True )
def crawl_cnblogs ( self) :
print ( '爬去cnblogs网站技术博客了' )
return True
6.2 setting中 邮箱配置
EMAIL_HOST = 'smtp.qq.com'
EMAIL_PORT = 465
EMAIL_HOST_USER = ''
EMAIL_HOST_PASSWORD = ''
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
EMAIL_USE_SSL = True
6.3 在admin中添加任务执行
7 定时爬取技术类文章-邮件通知
- 爬取它首页的推荐文章
- 文章标题
- 文章作者
- 文章url地址
- 文章摘要
- 每隔一天爬一次
- 可能会重复- - 》去重
- pip install requests
- pip install beautifulsoup4
7.1 models.py 创建表
from django. db import models
class Article ( models. Model) :
title = models. CharField( max_length= 64 )
url = models. CharField( max_length= 64 )
author = models. CharField( max_length= 64 )
desc = models. TextField( )
python manage. py makemigrations
python manage. py migrate
7.2 在task中写
import time
from celery import shared_task
from celery import Task
from django. core. mail import send_mail
from django. conf import settings
class SendEmailTask ( Task) :
def on_success ( self, retval, task_id, args, kwargs) :
info = f'爬虫任务成功-- 任务id是: { task_id} , 参数是: { args} , 执行成功 !'
send_mail( 'celery监控成功告警' , info, settings. EMAIL_HOST_USER, [ '616564099@qq.com' ] )
def on_failure ( self, exc, task_id, args, kwargs, einfo) :
info = f'爬虫任务失败-- 任务id是: { task_id} , 参数是: { args} , 执行失败,请去flower中查看原因 !'
send_mail( 'celery监控爬虫失败告警' , info, settings. EMAIL_HOST_USER, [ '616564099@qq.com' ] )
def on_retry ( self, exc, task_id, args, kwargs, einfo) :
print ( '重试了!!!' )
import requests
from bs4 import BeautifulSoup
from . models import Article
from redis import Redis
@shared_task ( base= SendEmailTask, bind= True )
def crawl_cnblogs ( self) :
conn = Redis( host= '127.0.0.1' , port= 6379 )
res = requests. get( 'https://www.cnblogs.com/' )
soup = BeautifulSoup( res. text, 'html.parser' )
article_list = soup. find_all( name= 'article' , class_= 'post-item' )
for article in article_list:
title = article. find( name= 'a' , class_= 'post-item-title' ) . text
url = article. find( name= 'a' , class_= 'post-item-title' ) . attrs. get( 'href' )
author = article. find( name= 'a' , class_= 'post-item-author' ) . span. text
desc = article. find( name= 'p' , class_= 'post-item-summary' ) . text. strip( )
print ( f'''
文章名字: { title}
文章地址: { url}
文章作者: { author}
文章摘要: { desc}
''' )
res = conn. sadd( 'urls' , url)
if res:
Article. objects. create( title= title, url= url, author= author, desc= desc)
return True
7.3 以后只需要在后台管理添加定时任务
- 所有服务都启动,不用管了
8 定时爬取美女图片-告警案例
- 爬给定的页面中的图片保存到本地
https: // pic. netbian. com/ tupian/ 33985 . html
https: // pic. netbian. com/ tupian/ 32967 . html
import os
@shared_task ( base= SendEmailTask, bind= True )
def crawl_photo ( self, url) :
res = requests. get( url)
res. encoding = 'gbk'
soup = BeautifulSoup( res. text, 'html.parser' )
ul = soup. find( 'ul' , class_= 'clearfix' )
img_list = ul. find_all( name= 'img' , src= True )
for img in img_list:
try :
url = img. attrs. get( 'src' )
if not url. startswith( 'http' ) :
url = 'https://pic.netbian.com' + url
print ( url)
res1 = requests. get( url)
name = url. split( '-' ) [ - 1 ]
with open ( os. path. join( settings. BASE_DIR, 'img' , name) , 'wb' ) as f:
for line in res1. iter_content( ) :
f. write( line)
except Exception as e:
continue