首先安装celery
pip install celery
安装redis
一、Redis for Windows下载
之前微软维护了一份Windows版本的Redis,但是版本停留在3.2,并且也关闭了项目更新渠道。这里我们使用另外一位大神提供的Windows Redis,更新及时,用户量也很大。
下载地址为:https://github.com/tporadowski/redis/releases
我们选择下载Redis-x64-5.0.14.msi
二、安装Redis:
这里以图文的形式讲解Redis的安装过程。首先双击Redis-x64-5.0.14.msi,然后点击运行。
三、Redis使用:
默认情况下,在安装完Redis后,就已经启动了Redis服务。如果想要手动启动Redis,则可以打开cmd,然后输入以下命令:
其中path/to/redis.windows.conf是存放redis.windows.conf的路径,默认是在Redis安装路径下。比如:
启动Redis后,可以通过redis-cli命令连接到Redis服务,然后输入命令操作了。比如:
至此我们就完成了Redis的安装和基本使用。
项目安装redis
pip install hiredis
pip install redis
启动命令
celery -A app.mycelery worker --loglevel=info
虽然启动了但是不起作用
另外在windows下运行celery需要安装一个第三方库gevent
pip install gevent
#重新启动
celery -A app.mycelery worker --loglevel=info -P gevent
# @bp.get("/email/captcha")
# def email_captcha():
# # /email/captcha?email=xx@q.com
# email = request.args.get('email')
# if not email:
# return jsonify({'code':400,'message':'请先传入邮箱!'})
# # 产生随机六位数字
# source = list(string.digits)
# captcha = "".join(random.sample(source,6))
# # subject 主题 recipients 接收者
# message = Message(subject='【滴滴】注册验证码',recipients=[email],body='【滴滴】您的注册验证码为:%s'%captcha)
# try:
# mail.send(message)
# except Exception as e:
# print('邮件发送失败!')
# print(e)
# return jsonify({'code':500,'message':'邮件发送失败'})
# return jsonify({'code': 200, 'message': '邮件发送成功'})
@bp.get("/email/captcha")
def email_captcha():
# /email/captcha?email=xx@q.com
email = request.args.get('email')
if not email:
return jsonify({'code':400,'message':'请先传入邮箱!'})
# 产生随机六位数字
source = list(string.digits)
captcha = "".join(random.sample(source,6))
subject = '【滴滴】注册验证码'
body = '【滴滴】您的注册验证码为:%s' % captcha
# subject 主题 recipients 接收者
current_app.celery.send_task("send_mail",(email,subject,body))
return jsonify({'code': 200, 'message': '邮件发送成功'})
from flask_mail import Message
from exts import mail
from celery import Celery
# 定义任务函数 发送给谁 主题 内容
def send_mail(recipient, subject, body):
message = Message(subject=subject, recipients=[recipient], body=body)
try:
mail.send(message)
return {"status": "SUCCESS"}
except Exception:
return {"status": "FAILURE"}
# 创建celery对象工厂函数
def make_celery(app):
celery = Celery(app.import_name, backend=app.config['CELERY_RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
app.celery = celery
# 添加任务
celery.task(name="send_mail")(send_mail)
return celery
发送邮件的同时缓存验证码