源码地址:
https://gitee.com/liuhaizhang/django2-configuring-websockethttps://gitee.com/liuhaizhang/django2-configuring-websocket
python3.9.0
django==2.2.1
channels==2.2.0
项目结构:
test_websocket_django2
-chat
-home
-test_websocket_django2
-manage.py
一、配置settings.py
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'channels', #1、注册channels
'chat.apps.ChatConfig',
'home.apps.HomeConfig',
]
WSGI_APPLICATION = 'test_websocket_django2.wsgi.application'
#2、新增ASGI应用
ASGI_APPLICATION = 'test_websocket_django2.asgi.application'
二、chat应用:所有websocket都放在这里
新建
routings.py : 存放websocket路由
from django.urls import path
from . import consumers
# 这个变量是存放websocket的路由
websocket_urlpatterns = [
path('chat/socket/', consumers.ChatView),
]
consumers.py :写websocket的类
from channels.generic.websocket import WebsocketConsumer
from channels.exceptions import StopConsumer
from asgiref.sync import async_to_sync
import time
class ChatView(WebsocketConsumer):
def websocket_connect(self, message):
# 客户端与服务端进行握手时,会触发这个方法
# 服务端允许客户端进行连接,就是握手成功
self.accept()
def websocket_receive(self, message):
# 接收到客户端发送的数据
recv = message.get('text')
print('接收到的数据,', recv)
if recv == 'close':
# 服务的主动断开连接
print('服务器断开连接')
self.close()
else:
# 客户端向服务端发送数据
self.send(f'我收到了,{time.strftime("%Y-%m-%d %H:%M:%S")}')
def websocket_disconnect(self, message):
# 客户端端口连接时,会触发该方法,断开连接
print('客户端断开连接')
raise StopConsumer()
三、在settings.py同级目录下创建asgi.py
写入内容:
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import chat.routings #导入websocket的路由
application = ProtocolTypeRouter({
'websocket': AuthMiddlewareStack(
URLRouter(
chat.routings.websocket_urlpatterns, #把websocket的路由注册进去
)
),
})
四、启动项目,测试
1、启动
python manage.py runsever
启动显示,应该是这样:ASGI/Channels
2、测试
测试网站:
EasySwoole-WebSocket在线测试工具