python的websocket方法教程

news2024/11/28 3:50:45

WebSocket是一种网络通信协议,它在单个TCP连接上提供全双工的通信信道。在本篇文章中,我们将探讨如何在Python中使用WebSocket实现实时通信。

websockets是Python中最常用的网络库之一,也是websocket协议的Python实现。它不仅作为基础组件在众多项目中发挥着重要作用,其源码也值得广大“Python玩家”研究。
官网:https://github.com/python-websockets/websockets

1. 什么是WebSocket?

WebSocket协议是在2008年由Web应用程序设计师和开发人员创建的,目的是为了在Web浏览器和服务器之间提供更高效、更低延迟的双向通信。它允许客户端和服务器在任何时候发送消息,无需重新建立TCP连接。WebSocket可以在Web浏览器和服务器之间传输文本和二进制数据,使得构建实时Web应用程序变得更加简单。

2. 在Python中使用WebSocket

Python中有多个库可以帮助我们使用WebSocket,如:websockets、aiohttp等。在本文中,我们将使用websockets库来演示WebSocket编程。

要安装websockets库,你可以使用pip:

pip install websockets

3. 创建WebSocket服务器

使用websockets库,我们可以轻松地创建一个WebSocket服务器。以下是一个简单的示例:

import asyncio
import websockets

async def echo(websocket, path):
    async for message in websocket:
        print(f"Received message: {message}")
        await websocket.send(f"Echo: {message}")

start_server = websockets.serve(echo, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在这个示例中,我们定义了一个名为echo的协程函数,它接收两个参数:websocket和path。该函数使用async for循环读取客户端发送的消息,并将消息发送回客户端。

然后,我们使用websockets.serve()函数创建一个WebSocket服务器,监听本地主机的8765端口。最后,我们使用asyncio的事件循环启动服务器。

4. 创建WebSocket客户端

要创建一个WebSocket客户端,我们同样可以使用websockets库。以下是一个简单的客户端示例:

import asyncio
import websockets

async def main():
    async with websockets.connect("ws://localhost:8765") as websocket:
        message = "Hello, server!"
        await websocket.send(message)
        print(f"Sent: {message}")

        response = await websocket.recv()
        print(f"Received: {response}")

asyncio.run(main())

在这个示例中,我们使用websockets.connect()函数建立与WebSocket服务器的连接。然后,我们使用send()方法向服务器发送消息,并使用recv()方法接收服务器的响应。

5. 总结

WebSocket协议为Web浏览器和服务器之间提供了实时双向通信的能力,使得构建实时Web应用程序变得更加容易。在Python中,我们可以使用websockets库轻松地实现WebSocket编程。

6. 通过websockets这个项目,从大型开源项目中学习asyncio库。

一、asyncio.Transport
在官方文档中,Transport被描述成对socket的抽象,它控制着如何传输数据。除了websockets,uvicorn、daphne等ASGI实现都会用到Transport。

Transport继承于ReadTransport和WriteTransport,两者都继承于BaseTransport。顾名思义,Transport兼备读和写的功能,可以类比为读写socket对象。
在这里插入图片描述

Transport对象提供以下常用函数——

is_reading:判断该Transport是否在读。

set_write_buffer_limits:设置写入Transport的高和低水位。考虑到网络状况,有时不希望写入过多的数据。

write、write_eof、write_line:为当前Transport写入数据,分别表示写入二进制数据、eof和二进制行数据。其中eof写入后不会关闭Transport,但会flush数据。

abort:立刻关闭Transport,不接受新的数据。留在缓冲的数据也会丢失,后续调用Protocol的connection_lost函数。

在websockets中,Transport使用场景不多,一般都是通过Protocol对象的回调参数使用的。在websocket的初始化过程中,会设置Transport的最高水位。同样,在这种场景下,该对象也是作为回调参数使用的。
在这里插入图片描述

二、asyncio.Protocol
如果Transport是对socket的抽象,那么Protocol就是对协议的抽象。它提供了如何使用Transport的方式。
在这里插入图片描述

用户使用的Protocol直接继承自BaseProtocol,并提供了六个Unimplemented函数需要用户去实现——

connection_made:当连接建立时会执行该函数,该函数包含一个Transport类型的参数。

connection_lost:当连接丢失或者关闭时会执行该函数,该函数包含一个Exception类型的参数。

pause_writing:当Transport对象写入的数据高于之前设置的高水位时被调用,一般会暂停数据的写入。

resume_writing:当Transport对象写入的数据低于之前设置的低水位时被调用,一般用于恢复数据写入。

data_received:当有数据被接受时回调,该函数包含一个二进制对象data,用来表示接受的数据。

eof_received:当被Transport对象被调用write_eof时被调用。

在websockets中,server端的connection_made实现截图如图所示。在该函数中,websockets将用户实现的handler封装成task对象,并和websocket的server绑定。
在这里插入图片描述

而在client端中实现如第一节截图所示,只是在reader中注册该Transport对象。

websockets的connection_lost函数实现方式如下。主要操作即更新状态、关闭pings、更新对应的waiter状态,以及维护reader对象。
在这里插入图片描述

在其他函数的实现中,websockets也主要用到了reader对象完成数据流的暂停和恢复,以及数据的写入。

从上面代码实现可以看出,websockets通过reader代理完成数据流的操作。这个reader是一个asyncio.StreamReader对象。这个对象具体如何使用将在下一篇介绍。

附录:进阶版本:

python使用websockets库
serve:在server端使用,等待客户端的连接。如果连接成功,返回一个websocket。

connect: 在client端使用,用于建立连接。

send:发送数据

recv:接收数据

close:关闭连接

服务端

#!/usr/bin/python3
# 主要功能:创建1个基本的websocket server, 符合asyncio 开发要求
import asyncio
import websockets
from datetime import datetime


async def handler(websocket):
    data = await websocket.recv()
    reply = f"Data received as \"{data}\".  time: {datetime.now()}"
    print(reply)
    await websocket.send(reply)
    print("Send reply")


async def main():
    async with websockets.serve(handler, "localhost", 9999):
        await asyncio.Future()  # run forever

if __name__ == "__main__":
    asyncio.run(main())

客户端

import asyncio
import websockets
import time


async def ws_client(url):
    for i in range(1, 40):
        async with websockets.connect(url) as websocket:
            await websocket.send("Hello, I am PyPy.")
            response = await websocket.recv()
        print(response)
        time.sleep(1)

asyncio.run(ws_client('ws://localhost:9999'))

服务端

import asyncio
import websockets

IP_ADDR = "127.0.0.1"
IP_PORT = "9090"

# 握手,通过接收Hi,发送"success"来进行双方的握手。
async def serverHands(websocket):
    while True:
        recv_text = await websocket.recv()
        print("recv_text=" + recv_text)
        if recv_text == "Hi":
            print("connected success")
            await websocket.send("success")
            return True
        else:
            await websocket.send("connected fail")



# 接收从客户端发来的消息并处理,再返给客户端success
async def serverRecv(websocket):
    while True:
        recv_text = await websocket.recv()
        print("recv:", recv_text)
        await websocket.send("success,get mess:"+ recv_text)


# 握手并且接收数据
async def serverRun(websocket, path):
    print(path)
    await serverHands(websocket)
    await serverRecv(websocket)


# main function
if __name__ == '__main__':
    print("======server======")
    server = websockets.serve(serverRun, IP_ADDR, IP_PORT)
    asyncio.get_event_loop().run_until_complete(server)
    asyncio.get_event_loop().run_forever()

客户端

import asyncio
import websockets

IP_ADDR = "127.0.0.1"
IP_PORT = "9090"



async def clientHands(websocket):
    while True:
        # 通过发送hello握手
        await websocket.send("Hi")
        response_str = await websocket.recv()
        # 接收"success"来进行双方的握手
        if "success" in response_str:
            print("握手成功")
            return True


# 向服务器端发送消息
async def clientSend(websocket):
    while True:
        input_text = input("input text: ")
        if input_text == "exit":
            print(f'"exit", bye!')
            await websocket.close(reason="exit")
            return False
        await websocket.send(input_text)
        recv_text = await websocket.recv()
        print(f"{recv_text}")


# 进行websocket连接
async def clientRun():
    ipaddress = IP_ADDR + ":" + IP_PORT
    async with websockets.connect("ws://" + ipaddress) as websocket:
        await clientHands(websocket)
        await clientSend(websocket)


# main function
if __name__ == '__main__':
    print("======client======")
    asyncio.get_event_loop().run_until_complete(clientRun())

服务端

# -*- coding:utf8 -*-

import json
import socket
import asyncio
import logging
import websockets
import multiprocessing

IP = '127.0.0.1'
PORT_CHAT = 9090

USERS ={}

#提供聊天的后台
async def ServerWs(websocket,path):
    logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
                        filename="chat.log",
                        level=logging.INFO)
    # 握手
    await websocket.send(json.dumps({"type": "handshake"}))
    async for message in websocket:
        data = json.loads(message)
        message = ''
        # 用户发信息
        if data["type"] == 'send':
            name = '404'
            for k, v in USERS.items():
                if v == websocket:
                    name = k
            data["from"] = name
            if len(USERS) != 0:  # asyncio.wait doesn't accept an empty list
                message = json.dumps(
                    {"type": "user", "content": data["content"], "from": name})
        # 用户注册
        elif data["type"] == 'register':
            try:
                USERS[data["uuid"]] = websocket
                if len(USERS) != 0:  # asyncio.wait doesn't accept an empty list
                    message = json.dumps(
                        {"type": "login", "content": data["content"], "user_list": list(USERS.keys())})
            except Exception as exp:
                print(exp)
        # 用户注销
        elif data["type"] == 'unregister':
            del USERS[data["uuid"]]
            if len(USERS) != 0:  # asyncio.wait doesn't accept an empty list
                message = json.dumps(
                    {"type": "logout", "content": data["content"], "user_list": list(USERS.keys())})
        #打印日志
        logging.info(data)
        # 群发
        await asyncio.wait([user.send(message) for user in USERS.values()])


def server_run():
    print("server")
    start_server = websockets.serve(ServerWs, '0.0.0.0', PORT_CHAT)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()


if __name__ == "__main__":
    from multiprocessing import Process
    multiprocessing.freeze_support()
    server = Process(target=server_run, daemon=False)
    server.start()

服务端

import asyncio
import websockets
import time
import json
import threading
# 功能模块
class OutputHandler():
    async def run(self,message,send_ms,websocket):
        # 用户发信息
        await send_ms(message, websocket)
        # 单发消息
        # await send_ms(message, websocket)
        # 群发消息
        #await s('hi起来')


# 存储所有的客户端
Clients = {}

# 服务端
class WS_Server():
    def __init__(self):
        self.ip = "127.0.0.1"
        self.port = 9090

    # 回调函数(发消息给客户端)
    async def callback_send(self, msg, websocket=None):
        await self.sendMsg(msg, websocket)

    # 发送消息
    async def sendMsg(self, msg, websocket):
        print('sendMsg:', msg)
        # websocket不为空,单发,为空,群发消息
        if websocket != None:
            await websocket.send(msg)
        else:
            # 群发消息
            await self.broadcastMsg(msg)
        # 避免被卡线程
        await asyncio.sleep(0.2)

    # 群发消息
    async def broadcastMsg(self, msg):
        for user in Clients:
            await user.send(msg)


    # 针对不同的信息进行请求,可以考虑json文本
    async def runCaseX(self,jsonMsg,websocket):
        print('runCase')
        op = OutputHandler()
        # 参数:消息、方法、socket
        await op.run(jsonMsg,self.callback_send,websocket)

    # 连接一个客户端,起一个循环监听
    async def echo(self,websocket, path):
        # 添加到客户端列表
        # Clients.append(websocket)
        # 握手
        await websocket.send(json.dumps({"type": "handshake"}))
        # 循环监听
        while True:
            # 接受信息
            try:
                # 接受文本
                recv_text = await websocket.recv()
                message = "Get message: {}".format(recv_text)
                # 返回客户端信息
                await websocket.send(message)
                # 转json
                data = json.loads(recv_text)

                # 用户发信息
                if data["type"] == 'send':
                    name = '404'
                    for k, v in Clients.items():
                        if v == websocket:
                            name = k
                    data["from"] = name
                    if len(Clients) != 0:  # asyncio.wait doesn't accept an empty list
                        message = json.dumps({"type": "send", "content": data["content"], "from": name})
                        await self.runCaseX(jsonMsg=message, websocket=websocket)

                # 用户注册
                elif data["type"] == 'register':
                    try:
                        Clients[data["uuid"]] = websocket
                        if len(Clients) != 0:  # asyncio.wait doesn't accept an empty list
                            message = json.dumps({"type": "register", "content": data["content"], "user_list": list(Clients.keys())})
                            await self.runCaseX(jsonMsg=message, websocket=websocket)
                    except Exception as exp:
                        print(exp)

                # 用户注销
                elif data["type"] == 'unregister':
                    del Clients[data["uuid"]]

                # 对message进行解析,跳进不同功能区
                # await self.runCaseX(jsonMsg=data,websocket=websocket)
            # 链接断开
            except websockets.ConnectionClosed:
                print("ConnectionClosed...", path)
                # del Clients
                break
            # 无效状态
            except websockets.InvalidState:
                print("InvalidState...")
                # del Clients
                break
            # 报错
            except Exception as e:
                print("ws连接报错",e)
                # del Clients
                break



    # 启动服务器
    async def runServer(self):
        async with websockets.serve(self.echo, self.ip, self.port):
            await asyncio.Future()  # run forever

	# 多协程模式,防止阻塞主线程无法做其他事情
    def WebSocketServer(self):
        asyncio.run(self.runServer())

    # 多线程启动
    def startServer(self):
        # 多线程启动,否则会堵塞
        thread = threading.Thread(target=self.WebSocketServer)
        thread.start()
        # thread.join()


if __name__=='__main__':
    print("server")
    s = WS_Server()
    s.startServer()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1295885.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring AOP 概念及其使用

目录 AOP概述 什么是AOP? 什么是Spring AOP ? Spring AOP 快速入门 1.引⼊ AOP 依赖 2.编写AOP程序 Spring AOP 核心概念 1.切点 2.连接点 3.通知 4.切面 通知类型 注意事项: PointCut(定义切点) 切面优先级 Order 切点表达…

IDEA删除最近打开的文件记录

IDEA删除最近打开的文件记录 遇见问题:如何删除IDEA中最近打开的文件记录 解决方法 先关闭IDEA 找到 recentProjects.xml 文件 windows 位置:(AppData是隐藏文件夹) 1.C:\Users\电脑用户名\AppData\Roaming\JetBrains\IntelliJIde…

Bash脚本调用百度翻译API进行中文到英文的翻译

写一个bash脚本调用百度翻译API进行中文到英文的翻译,首先需要进行相关的申请。看百度给出的文档链接: 百度翻译API文档 需要先注册一个百度账号,然后申请APPID。脚本中会用到appid和key这两个值。按照文档给出的提示可以获得。如下是脚本: #…

零基础如何入门HarmonyOS开发?

HarmonyOS鸿蒙应用开发是当前非常热门的一个领域,许多人都想入门学习这个技术。但是,对于零基础的人来说,如何入门确实是一个问题。下面,我将从以下几个方面来介绍如何零基础入门HarmonyOS鸿蒙应用开发学习。 一、了解HarmonyOS鸿…

markdown记录

文章目录 基础操作使用一级列表、二级列表 博文链接 基础操作 使用一级列表、二级列表 博文链接 CSDN-Markdown语法集锦 CSDN-markdown语法之如何使用LaTeX语法编写数学公式 CSDN Markdown简明教程1-关于Markdown CSDN Markdown简明教程2-基本使用 CSDN Markdown简明教程3-表…

X86汇编语言:从实模式到保护模式(代码+注释)--c6

X86汇编语言:从实模式到保护模式(代码注释)–c6 标志寄存器FLAGS: 6th:ZF位(Zero Flag):零标志,执行算数或者逻辑运算之后,会将该位置位。10th:D…

油猴(Tampermonkey)浏览器插件简单自定义脚本开发

介绍 浏览器插件,包括油猴插件和其他插件,通过它们可以实现浏览器网页的定制化与功能增强。 其他插件一般只有某种具体的功能,且已经写死而不能更改,比如Adblock插件只用于去广告。 油猴插件是一款用于管理用户脚本的插件&…

RocketMQ - SpringBoot整合RocketMQ

SpringBoot整合RocketMQ 1、快速实战 ​ 按照SpringBoot三板斧&#xff0c;快速创建RocketMQ的客户端。创建Maven工程&#xff0c;引入关键依赖&#xff1a; <dependencies><dependency><groupId>org.apache.rocketmq</groupId><artifactId>r…

HarmonyOS应用开发工具DevEco Studio安装与使用

语雀知识库地址&#xff1a;语雀HarmonyOS知识库 飞书知识库地址&#xff1a;飞书HarmonyOS知识库 知识库内容逐步完善中… 工欲善其事必先利其器&#xff0c;要编写HarmonyOS应用就需要用到官方提供的IDE工具来编写相应的代码。 在鸿蒙开发者官网&#xff0c;其提供了官方的开…

Markdown从入门到精通

Markdown从入门到精通 文章目录 Markdown从入门到精通前言一、Markdown是什么二、Markdown优点三、Markdown的基本语法3.1 标题3.2 字体3.3 换行3.4 引用3.5 链接3.6 图片3.7 列表3.8 分割线3.9 删除线3.10 下划线3.11 代码块3.12 表格3.13 脚注3.14 特殊符号 四、Markdown的高…

Ubuntu 22.04源码安装yasm 1.3.0

sudo lsb_release -r看到操作系统的版本是22.04&#xff0c;sudo uname -r可以看到内核版本是5.15.0-86-generic&#xff0c;sudo gcc --version可以看到版本是11.2.0&#xff0c;sudo make --version可以看到版本是GNU Make 4.3。 下载yasm http://yasm.tortall.net/Downlo…

深入了解Java 8日期时间新玩法:DateTimeFormatter与ZoneOffset的使用

推荐语 在这篇文章中&#xff0c;我们将深入探讨Java中的DateTimeFormatter和ZoneOffset类的功能和使用方法。这些类是在Java 8中引入的新的日期时间API的一部分&#xff0c;它们为我们提供了更灵活、更易用的日期和时间处理能力。尽管这些类在Java 8中已经出现&#xff0c;但…

SIT3232E高静电防护,单电源供电,双通道,RS232 收发器

SIT3232E 是一款 3.0V~5.5V 供电、双通道、低功耗、高静电防护 ESD 保护&#xff0c;完全满足 TIA/EIA-232 标准要求的 RS-232 收发器。 SIT3232E 包括两个驱动器和两个接收器&#xff0c;具有增强形 ESD 保护功能&#xff0c;达到 15kV 以上 HBM ESD 、 8kV …

拼多多商品详情数据接口在数据分析行业的作用性

在数据分析行业中&#xff0c;拼多多商品详情数据的作用性主要体现在以下几个方面&#xff1a; 了解市场和用户需求&#xff1a;通过拼多多商品详情数据&#xff0c;企业可以了解到市场上什么产品受欢迎&#xff0c;用户对产品的反馈和评价如何&#xff0c;从而调整自己的销售…

Qt/QML编程学习之心得:工程中的文件(十二)

Qt生成了工程之后,尤其在QtCreator产生对应的project项目之后,就如同VisualStudio一样,会产生相关的工程文件,那么这些工程文件都是做什么的呢?这里介绍一下。比如产生了一个Qt Widget application,当然如果Qt Quick Application工程会有所不同。 一、.pro和.pro.user …

神经内科临床常用的焦虑、抑郁评估量表,医生必备!

根据神经内科医生的量表使用情况&#xff0c;常笑医学整理了神经内科临床上常用的焦虑、抑郁评估量表&#xff0c;为大家分享临床常见的焦虑、抑郁、睡眠等量表评估内容&#xff0c;均支持量表下载和在线使用&#xff0c;建议收藏&#xff01; 1.汉密顿抑郁量表&#xff08;Ham…

初识Linux:权限(1)

目录 提示&#xff1a;以下指令均在Xshell 7 中进行 Linux 的权限 内核&#xff1a; 查看操作系统版本 查看cpu信息 查看内存信息 外部程序&#xff1a; 用户&#xff1a; 普通用户变为超级用户&#xff1a; su 和 su-的区别&#xff1a; root用户变成普通用户&#…

一文搞懂全连接算法和它的作用

如果你是搞AI算法的同学&#xff0c;相信你在很多地方都见过全连接层。 无论是处理图片的卷积神经网络&#xff08;CNN&#xff09;&#xff0c;还是处理文本的自然语言处理&#xff08;NLP&#xff09;网络&#xff0c;在网络的结尾做分类的时候&#xff0c;总是会出现一个全…

chfs,简单好用的局域网共享网盘

1. 网盘简介 2. 下载安装包 点击地址下载 3. 部署网盘 &#xff08;1&#xff09;创建文件夹 mkdir -p /opt/chfs/share /opt/chfs/logs share 共享文件夹&#xff0c;也就是存放文件的地方 logs 存放日志文件&#xff08;2&#xff09;上传压缩包到 /opt/chfs目录下…

3dMax vs Cinema4d哪个更好更适合你?

Cinema 4d和3dMax的区别 用于游戏风格、开发和风格可视化的3D建模、动画和渲染软件系统&#xff0c;为用户提供制作和编辑动画、视觉效果和环境的灵活性。4D CINEMA可能是由MAXON构建的强大的3D建模、运动图形、绘画和动画软件系统。Cinema 4D将在每个Windows和MAC操作系统上运…