channels官方文档:Django Channels — Channels 4.0.0 documentation
效果如下:
主要实现功能
基于Django认证的群聊
具体实现
websocket连接建立之前,首先会有一次http握手请求,我们可以从这次请求中拿到cookie等信息进行认证。本次使用的是jwt认证,因此需要在建立连接的时候,将jwt认证信息通过cookie发送给后端,进行通信认证,具体方法如下:
from rest_framework_simplejwt.tokens import AccessToken
from django.http.cookie import parse_cookie
from rest_framework_simplejwt.authentication import JWTAuthentication
class ServerAccessToken(AccessToken):
    """Access token that is rejected once it has been blacklisted.

    The custom ``verify`` exists so that an access token can be disabled
    when the user logs out.
    """

    def verify(self):
        """Check the blacklist cache, then run the parent verification.

        Raises:
            TokenError: If the blacklist cache returns a truthy entry for
                this user id and token digest. Anything raised by the
                parent ``verify`` propagates unchanged.
        """
        user_id = self.payload.get('user_id')
        # hashlib only accepts bytes-like input, but a token may be handed
        # over as ``str``; normalise it so the digest can always be computed
        # (the digest of the UTF-8 bytes is the same either way).
        raw_token = self.token.encode('utf-8') if isinstance(self.token, str) else self.token
        token_digest = hashlib.md5(raw_token).hexdigest()
        if BlackAccessTokenCache(user_id, token_digest).get_storage_cache():
            raise TokenError('Token is invalid or expired')
        super().verify()
class CookieJWTAuthentication(JWTAuthentication):
    """JWT authentication that falls back to the ``X-Token`` cookie.

    Cookie support exists so that pages served through django-proxy
    (for example flower) can be accessed.
    """

    def get_header(self, request):
        """Return the authorization header, falling back to the cookie.

        Args:
            request: The incoming request; ``request.META['HTTP_COOKIE']``
                is consulted when the parent class finds no header.

        Returns:
            The header found by the parent class, or ``b"Bearer <token>"``
            built from the ``X-Token`` cookie. When neither is available the
            parent's (falsy) result is returned unchanged.
        """
        header = super().get_header(request)
        if header:
            return header

        cookies = request.META.get('HTTP_COOKIE')
        if not cookies:
            return header

        token = parse_cookie(cookies).get('X-Token')
        # Only synthesise a header when the cookie really exists; formatting
        # a missing value would produce the bogus header ``Bearer None``.
        if token:
            header = f"Bearer {token}".encode('utf-8')
        return header
async def token_auth(scope):
    """Authenticate a connection from the ``X-Token`` cookie in its scope.

    Args:
        scope: Connection scope; its ``cookies`` entry (if any) is expected
            to be a mapping of cookie names to values.

    Returns:
        A ``(status, payload)`` tuple:
        ``(True, user)`` when the token validates and the user is resolved,
        ``(False, message)`` when token validation raises ``TokenError``,
        ``(False, False)`` when no ``X-Token`` cookie is present.
    """
    cookies = scope.get('cookies')
    token = cookies.get('X-Token') if cookies else None
    # Test the raw cookie value: once formatted, a missing cookie would turn
    # into the truthy string "None" and be sent off for validation.
    if not token:
        return False, False

    try:
        auth_class = CookieJWTAuthentication()
        validated_token = ServerAccessToken(f"{token}".encode('utf-8'))
        # get_user is synchronous, so it is run through sync_to_async.
        user = await sync_to_async(auth_class.get_user)(validated_token)
    except TokenError as e:
        return False, e.args[0] if e.args else str(e)
    return True, user
然后在建立连接的时候,进行认证
class MessageNotify(AsyncJsonWebsocketConsumer):
    """WebSocket consumer that authenticates the client on connect."""

    def __init__(self, *args, **kwargs):
        # Unpack the arguments; passing ``args, kwargs`` as-is would hand the
        # parent a tuple and a dict as two positional arguments.
        super().__init__(*args, **kwargs)
        self.room_group_name = None
        self.disconnected = True
        self.username = ""

    async def connect(self):
        """Run token authentication and close the connection on failure."""
        status, user_obj = await token_auth(self.scope)
        if not status:
            logger.error(f"auth failed {user_obj}")
            # https://developer.mozilla.org/zh-CN/docs/Web/API/CloseEvent#status_codes
            await self.close(4401)
具体群聊核心代码如下:
class MessageNotify(AsyncJsonWebsocketConsumer):
    """Group-chat consumer: authenticates on connect and relays messages.

    Clients send JSON objects of the form ``{"action": ..., "data": ...}``.
    The ``message`` action is broadcast to the room group; other actions are
    dispatched to this consumer only if they are listed in
    ``client_actions``.
    """

    # Actions (besides "message") that a client may dispatch to its own
    # consumer. The action name becomes the channel message type, so leaving
    # it unrestricted would let a client invoke any handler on this class.
    client_actions = frozenset({"userinfo"})

    def __init__(self, *args, **kwargs):
        # Unpack the arguments; passing ``args, kwargs`` as-is would hand the
        # parent a tuple and a dict as two positional arguments.
        super().__init__(*args, **kwargs)
        self.room_group_name = None
        self.disconnected = True
        self.username = ""

    async def connect(self):
        """Authenticate the client, then join the room named in the URL route."""
        status, user_obj = await token_auth(self.scope)
        if not status:
            logger.error(f"auth failed {user_obj}")
            # https://developer.mozilla.org/zh-CN/docs/Web/API/CloseEvent#status_codes
            await self.close(4401)
            return

        logger.info(f"{user_obj} connect success")
        route_kwargs = self.scope["url_route"]["kwargs"]
        room_name = route_kwargs.get('room_name')
        username = route_kwargs.get('username')
        if not (username and room_name):
            logger.error(f"room_name:{room_name} token:{username} auth failed")
            await self.close()
            return

        self.disconnected = False
        self.username = username
        self.room_group_name = f"message_{room_name}"
        # Join room group
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        """Mark the consumer as disconnected and leave the room group, if joined."""
        self.disconnected = True
        if self.room_group_name:
            await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    # Receive message from WebSocket
    async def receive_json(self, content, **kwargs):
        """Handle one decoded JSON frame from the client.

        Frames without an ``action`` close the connection. ``message`` frames
        are stamped with the sender's channel name and username and sent to
        the room group. Other actions are dispatched to this consumer only
        when allow-listed; anything else is logged and ignored.
        """
        action = content.get('action') if isinstance(content, dict) else None
        if not action:
            await self.close()
            # Stop here: falling through would dispatch a message whose type
            # is the missing action.
            return

        data = content.get('data', {})
        if action == "message":
            if not isinstance(data, dict):
                logger.warning(f"ignored message with non-object data from {self.username}")
                return
            data['uid'] = self.channel_name
            data['username'] = self.username
            # Send message to room group
            await self.channel_layer.group_send(
                self.room_group_name, {"type": "chat_message", "data": data}
            )
        elif action in self.client_actions:
            await self.channel_layer.send(self.channel_name, {"type": action, "data": data})
        else:
            logger.warning(f"ignored unsupported action {action!r} from {self.username}")

    async def userinfo(self, event):
        """Send this connection's username and channel name back to the client."""
        data = {
            'username': self.username,
            'uid': self.channel_name
        }
        await self.send_data('userinfo', {'data': data})

    async def chat_message(self, event):
        """Forward a room-group message to the client, stamped with local time."""
        data = event["data"]
        data['time'] = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
        # Send message to WebSocket
        await self.send_data('message', {'data': data})

    async def send_data(self, action, content, close=False):
        """Send ``content`` to the client wrapped with ``time`` and ``action`` keys.

        Keys in ``content`` override the envelope keys of the same name.
        """
        data = {
            'time': time.time(),
            'action': action
        }
        data.update(content)
        return await super().send_json(data, close)
中间还涉及消息队列,本次使用的是基于redis的消息队列,需要在settings.py进行配置
# Channel layer configuration. The Redis URL is assembled from
# REDIS_PASSWORD, REDIS_HOST, REDIS_PORT and CHANNEL_LAYERS_CACHE_ID, which
# are not defined in this snippet.
# NOTE(review): the password is interpolated into the URL as-is — confirm it
# never contains characters that would need URL escaping.
CHANNEL_LAYERS = {
    "default": {
        # Alternative backend, currently disabled:
        # "BACKEND": "channels_redis.core.RedisChannelLayer",
        "BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
        "CONFIG": {
            "hosts": [f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{CHANNEL_LAYERS_CACHE_ID}"],
        },
    },
}
代码已经开源,GitHub地址:GitHub - nineaiyu/xadmin-server: xadmin-基于Django+vue3的rbac权限管理系统