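I have recently been building a project with the Sanic framework, and today I needed to share a cache across multiple worker processes. After a lot of searching online I knew the multiprocessing module was the right tool, but using it directly after importing it raised an error, so I turned to the official documentation to solve the problem.
You can also go straight to the official Sanic documentation.
The gist is as follows: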
Python provides a few methods for exchanging objects, synchronizing, and sharing state between processes. This usually involves objects from the multiprocessing and ctypes modules.
If you are familiar with these objects and how to work with them, you will be happy to know that Sanic provides an API for sharing these objects between your worker processes. If you are not familiar, you are encouraged to read through the Python documentation linked above and try some of the examples before proceeding with implementing shared context.
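Before moving on to Sanic, the three ideas named above (exchanging objects, synchronizing, sharing state) can be tried with plain multiprocessing. The following is only a minimal sketch, not something from the Sanic docs: the names worker and run_demo are made up for illustration, and it assumes the "fork" start method (Unix only) so that it stays short.

```python
import multiprocessing as mp


def worker(counter, queue, worker_id):
    # Synchronize: the Value's built-in lock guards the read-modify-write.
    with counter.get_lock():
        counter.value += 1
    # Exchange: send a picklable object back to the parent process.
    queue.put(worker_id)


def run_demo(n_workers=4):
    # Assumption: "fork" is available (Linux/macOS); it is not on Windows.
    ctx = mp.get_context("fork")
    counter = ctx.Value("i", 0)  # shared state: a ctypes int in shared memory
    queue = ctx.Queue()
    procs = [
        ctx.Process(target=worker, args=(counter, queue, i))
        for i in range(n_workers)
    ]
    for p in procs:
        p.start()
    # Drain the queue before joining so no child blocks on a full pipe.
    received = sorted(queue.get(timeout=10) for _ in procs)
    for p in procs:
        p.join()
    return counter.value, received
```

Calling run_demo(4) returns the final counter value together with the sorted worker ids that arrived through the queue. Value and Queue are the kind of object that the passage above says can be shared between workers.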
Similar to how application context allows an application to share state across the lifetime of the application with app.ctx, shared context provides the same for the special objects mentioned above. This context is available as app.shared_ctx and should ONLY be used to share objects intended for this purpose.
The shared_ctx will:
NOT share regular objects like int, dict, or list
NOT share state between Sanic instances running on different machines
NOT share state to non-worker processes
only share state between server workers managed by the same Manager
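The first point in the list above (regular objects are not shared) is easy to see outside Sanic. The following is a rough sketch under stated assumptions: write_entry and run_comparison are hypothetical names, and the "fork" start method (Unix only) is used to keep it compact. A child process writes to a plain dict and to a Manager dict, and only the second write is visible to the parent.

```python
import multiprocessing as mp


def write_entry(store, key, value):
    # Runs in the child process.
    store[key] = value


def run_comparison():
    # Assumption: "fork" is available (Linux/macOS); it is not on Windows.
    ctx = mp.get_context("fork")
    plain = {}  # regular dict: the child only ever sees its own copy
    with ctx.Manager() as manager:
        shared = manager.dict()  # proxy to a dict held by the manager process
        for store in (plain, shared):
            p = ctx.Process(target=write_entry, args=(store, "k", "v"))
            p.start()
            p.join()
        # copy() returns an ordinary dict, taken before the manager shuts down.
        return dict(plain), shared.copy()
```

The plain dict comes back empty because the child modified a private copy, while the Manager dict contains the new key. This is the behavior the small example later in this post relies on.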
Attaching an inappropriate object to shared_ctx will likely result in a warning, and not an error. You should be careful to not accidentally add an unsafe object to shared_ctx as it may not work as expected. If you are directed here because of one of those warnings, you might have accidentally used an unsafe object in shared_ctx.
In order to create a shared object you must create it in the main process and attach it inside of the main_process_start listener.
Putting that into practice, here is a small example:
import multiprocessing

from sanic import Sanic, response
from sanic.log import logger

app = Sanic("Hw-Licence-System")
app.config.REQUEST_TIMEOUT = 180


# Create the shared Manager dict in the main process, before the workers start
@app.main_process_start
async def main_process_start(app):
    app.shared_ctx.cache = multiprocessing.Manager().dict()


@app.route("/api/v1/get_data", methods=["GET"])
async def get_data(request):
    product_name = request.args.get("product_name")
    shared_cache = request.app.shared_ctx.cache

    # Try to read the data from the shared cache first
    if product_name in shared_cache:
        data = shared_cache[product_name]
        return response.json({"status": True, "data": data})

    # Cache miss: store a value in the cache ("123" is a placeholder)
    logger.info("get data from server")
    shared_cache[product_name] = "123"

    # Read the data back and return it
    if product_name in shared_cache:
        data = shared_cache[product_name]
        return response.json({"status": True, "data": data})
    else:
        return response.json({"status": False, "message": "Data not found"})


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=3000, workers=4)