Python服务器和客户端功能库之websockets使用详解

news2025/1/13 9:30:04


概要

WebSockets 是一种在单个 TCP 连接上进行全双工通信的协议,特别适用于需要低延迟和高频率数据传输的实时应用,例如在线游戏、聊天应用和实时数据流。websockets 是一个基于 asyncio 的 Python 库,旨在提供简单易用的 WebSockets 服务器和客户端功能。本文将详细介绍 websockets 库,包括其安装方法、主要特性、基本和高级功能,以及实际应用场景,帮助全面了解并掌握该库的使用。


安装

要使用 websockets 库,首先需要安装它。以下是安装步骤:

使用 pip 安装

可以通过 pip 直接安装 websockets

pip install websockets

确认安装

安装完成后,可以通过以下命令确认安装是否成功:

python -c "import websockets; print(websockets.__version__)"

特性

  1. 简单易用:提供简洁的 API,方便快速上手。

  2. 基于 asyncio:利用 Python 的 asyncio 库实现异步 I/O 操作,支持高并发。

  3. 全双工通信:支持在单个连接上同时进行数据发送和接收。

  4. 支持多种协议:兼容 WebSocket 协议,支持 SSL/TLS 加密。

  5. 灵活扩展:支持自定义协议和中间件,方便扩展功能。

基本功能

创建 WebSocket 服务器

可以使用 websockets.serve 创建一个简单的 WebSocket 服务器:

import asyncio
import websockets

async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(message)

start_server = websockets.serve(echo, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

创建 WebSocket 客户端

可以使用 websockets.connect 创建一个简单的 WebSocket 客户端:

import asyncio
import websockets

async def hello():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        await websocket.send("Hello, World!")
        response = await websocket.recv()
        print(f"< {response}")

asyncio.get_event_loop().run_until_complete(hello())

处理异常

可以在服务器和客户端代码中处理连接和传输中的异常:

import asyncio
import websockets

async def echo(websocket, path):
    try:
        async for message in websocket:
            await websocket.send(message)
    except websockets.ConnectionClosed as e:
        print(f"Connection closed: {e}")

start_server = websockets.serve(echo, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

高级功能

广播消息

可以实现消息广播功能,将消息发送给所有连接的客户端:

import asyncio
import websockets

connected_clients = set()

async def handler(websocket, path):
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            await asyncio.wait([client.send(message) for client in connected_clients])
    finally:
        connected_clients.remove(websocket)

start_server = websockets.serve(handler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

SSL/TLS 加密

可以为 WebSocket 服务器添加 SSL/TLS 加密,确保数据传输安全:

import asyncio
import ssl
import websockets

ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile="path/to/certfile", keyfile="path/to/keyfile")

async def echo(websocket, path):
    async for message in websocket:
        await websocket.send(message)

start_server = websockets.serve(echo, "localhost", 8765, ssl=ssl_context)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

自定义协议

可以实现自定义 WebSocket 协议,扩展 WebSocket 的功能:

import asyncio
import websockets

class CustomProtocol(websockets.WebSocketServerProtocol):
    async def process_request(self, path, request_headers):
        if path != "/custom":
            return (404, [], b"Not Found")

start_server = websockets.serve(echo, "localhost", 8765, create_protocol=CustomProtocol)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

实际应用场景

实时聊天应用

在实时聊天应用中,通过 websockets 实现消息的实时传输和广播。

import asyncio
import websockets

connected_clients = set()

async def chat_handler(websocket, path):
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            await asyncio.wait([client.send(message) for client in connected_clients])
    finally:
        connected_clients.remove(websocket)

start_server = websockets.serve(chat_handler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

实时数据流

在实时数据流应用中,通过 websockets 实现服务器向客户端实时推送数据。

import asyncio
import websockets
import random
import time

async def data_stream(websocket, path):
    while True:
        data = random.randint(1, 100)
        await websocket.send(str(data))
        await asyncio.sleep(1)

start_server = websockets.serve(data_stream, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在线游戏

在多人在线游戏中,通过 websockets 实现玩家间的实时通信和游戏状态同步。

import asyncio
import websockets

players = set()

async def game_handler(websocket, path):
    players.add(websocket)
    try:
        async for message in websocket:
            await asyncio.wait([player.send(message) for player in players])
    finally:
        players.remove(websocket)

start_server = websockets.serve(game_handler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

实时监控系统

在实时监控系统中,通过 websockets 实现服务器向多个客户端实时推送监控数据。

import asyncio
import websockets
import random

connected_clients = set()

async def monitor_handler(websocket, path):
    connected_clients.add(websocket)
    try:
        while True:
            data = random.randint(1, 100)
            await asyncio.wait([client.send(str(data)) for client in connected_clients])
            await asyncio.sleep(1)
    finally:
        connected_clients.remove(websocket)

start_server = websockets.serve(monitor_handler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

总结

websockets 库是一个功能强大且易于使用的工具,能够帮助开发者在各种应用场景中实现 WebSocket 通信。通过支持全双工通信、基于 asyncio 的高并发处理、多种协议支持和灵活扩展,websockets 提供了强大的功能和灵活的扩展能力。本文详细介绍了 websockets 库的安装方法、主要特性、基本和高级功能,以及实际应用场景。希望本文能帮助大家全面掌握 websockets 库的使用,并在实际项目中发挥其优势。无论是在实时聊天、实时数据流还是在线游戏和实时监控系统中,websockets 库都将是一个得力的工具。

如果你觉得文章还不错,请大家 点赞、分享、留言 下,因为这将是我持续输出更多优质文章的最强动力!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1950334.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux进程信号详解【下】

&#x1f30e; Linux进程信号详【下】 文章目录&#xff1a; Linux信号详解 核心转储 信号保存       信号的三种状态       信号集操作函数         sigset_t类型接口         sigprocmask接口         sigpending接口         …

会员信息管理系统-计算机毕业设计源码38258

目 录 摘要 1 绪论 1.1 研究背景 1.2 研究意义 1.3开发技术 1.3.1 Spring Boot框架 1.3.2 Java语言 1.3.3 MySQL数据库 1.4论文结构与章节安排 2系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1 登录流程 2.2.2数据删除流程 2.3 系统功能分析 2.4 系统用例分析…

我的世界!

每位冒险家在《我的世界》中的出生点都各不相同&#xff0c; 有的出生在桦木森林&#xff0c;有的出生在草原&#xff0c; 还有的出生在临近海洋的沙滩。 这些环境叫做生物群系&#xff0c;也常被称为生态系统。 在《我的世界》中的不同生物群系具有不同的地域特色—— 不…

TDS传感器

目录 一、实物图 二、原理图 引脚定义 模块特性 三 、简介 四、注意事项 源文件下载 可访问底部联系方式也可前往电子校园网官网搜索关键词 关键词&#xff1a; TDS传感器 一、实物图 二、原理图 引脚定义 …

稳定、低成本、兼容性强的无线串口通信选择-适用于多场景的高性能无线串口模块

LoRa610Pro是思为无线的一款无线串口通讯模块采用了先进的LoRa扩频调制跳频技术&#xff0c;高效的接收灵敏度&#xff0c;具有超强的抗干扰性&#xff0c;还增强了通信的穿透能力和距离&#xff0c;相较于传统的FSK和GFSK产品有明显的优势。 高效的接收灵敏度 由于采用了LoRa…

学习记录day19——数据结构 查找算法

概念 在给定数据元素的某个值&#xff0c;在查找表中确定一个其关键字等于给定值的数据元素的操作&#xff0c;叫做查找 查找的分类 顺序查找:将待查找数据&#xff0c;进行全部遍历一遍&#xff0c;直到找到要查找的元素 折半查找:每次都去除一半的查找范围的查找方式&#x…

【C++高阶】哈希之美:探索位图与布隆过滤器的应用之旅

&#x1f4dd;个人主页&#x1f339;&#xff1a;Eternity._ ⏩收录专栏⏪&#xff1a;C “ 登神长阶 ” &#x1f921;往期回顾&#x1f921;&#xff1a;模拟实现unordered 的奥秘 &#x1f339;&#x1f339;期待您的关注 &#x1f339;&#x1f339; ❀哈希应用 &#x1f4…

PyQt5 + selenium,自动票务工具,演唱会门票,学习使用

PyQt5 selenium&#xff1b;在damai工具的基础上加入了UI界面&#xff0c;并将应用做了打包工作&#xff0c;主要是方便不会/不想折腾环境的用户使用&#xff0c;抢票的核心代码来自由于原作者不再维护&#xff0c;自己修改了部分代码。 安装教程 解压安装包到任意位置&…

U盘损坏无法访问?解锁两大高效数据恢复秘籍

U盘损坏之痛&#xff1a;数据失联的困境 在日常生活中&#xff0c;U盘作为数据交换与存储的重要工具&#xff0c;其便捷性无可替代。然而&#xff0c;当U盘遭遇损坏&#xff0c;无法被计算机正常访问时&#xff0c;存储在其中的宝贵数据仿佛一夜之间变得遥不可及&#xff0c;这…

关键词查找【Boyer-Moore 算法】

1、【Boyer-Moore 算法】 【算法】哪种算法有分数复杂度&#xff1f;- BoyerMoore字符串匹配_哔哩哔哩_bilibili BM算法的精华就在于BM(text, pattern),也就是BM算法当不匹配的时候一次性可以跳过不止一个字符。即它不需要对被搜索的字符串中的字符进行逐一比较&#xff0c;而…

云盘高速检测的秘密:密封圈外观检测全解析!

密封圈是一种用于填塞、隔离或密封两个相互连接部件之间空隙的圆形密封装置。密封圈通常由橡胶、塑料、金属等材料制成&#xff0c;具有弹性并能在压力作用下填充间隙&#xff0c;防止液体、气体或固体物质泄漏。 密封圈可根据具体应用选择不同材料&#xff0c;如橡胶密封圈适…

UDP网口(3)逻辑组包(下)

文章目录 1.ARP应答验证2.UDP实现思路3.UDP接收验证4.UDP发送验证5.总结与思考6.传送门 1.ARP应答验证 创建一个ARP应答工程&#xff0c;当PC发出ARP请求的时候&#xff0c;手动按下板卡指定按键&#xff0c;将会响应ARP应答。以此验证phy芯片的配置正常&#xff0c;硬件链路正…

node+mysql实现(账户密码,阿里云短信验证,QQ邮箱注册登录,短信验证密码重置,邮箱密码重置)之注册,登录密码重置总篇

node+mysql实现账户登录 注意效果图项目插件代码参数说明短信验证模块邮箱验证模块注册方式登录方式密码重置前端页面部分登录页面账户登录页面(login.html)短信验证登录页面(smsLogin.html)邮箱登录页面(emailLogin.html)注册部分页面短信验证注册页面(register.html)邮…

Profinet 转 EtherCAT 主站网关

一、功能概述 1.1 设备简介 本产品是 PN(Profinet)和 ECAT(EtherCAT)网关&#xff0c;通过数据映射方式工作。 本产品在 PN 侧作为 PN IO 从站&#xff0c;接西门子 PLC 的 Profinet 口&#xff1b;在 ECAT 侧 做为 ECAT 主站&#xff0c;接 ECAT 从站&#xff0c;如伺服驱…

懒人精灵安卓版纯本地离线文字识别插件

目的 懒人精灵是一款可以模拟鼠标和键盘操作的自动化工具。它可以帮助用户自动完成一些重复的、繁琐的任务&#xff0c;节省大量人工操作的时间。懒人精灵也包含图色功能&#xff0c;识别屏幕上的图像&#xff0c;根据图像的变化自动执行相应的操作。本篇文章主要讲解下更优秀的…

nacos2.x作为配置中心和服务注册和发现以及springcloud使用

目录 一、nacos是什么 二、windows下安装配置nacos 1、准备 2、安装nacos 3、配置nacos 4、启动并且访问nacos 三、springcloud使用nacos作为配置中心 四、springcloud使用nacos进行服务注册与发现 五、springcloud使用nacos进行服务消费 六、nacos的一些高级配置 1…

IP地址申请HTTPS证书

申请IP地址的HTTPS证书是一个相对简单但需要仔细操作的过程。选择合适的CA机构&#xff0c;明确所需证书类型&#xff0c;按照规定步骤提交申请并验证信息&#xff0c;最后正确安装和部署&#xff0c;即可实现通过IP地址访问的安全HTTPS连接。 下面是具体的申请流程&#xff0…

云盘高速视觉检测机,如何提高螺丝件的检测效率?

螺纹螺丝钉是一种常见的螺纹结构紧固件&#xff0c;通常由金属制成&#xff0c;具有螺旋状的螺纹结构。这种螺丝钉旨在通过旋入螺纹孔或材料中&#xff0c;实现可靠的固定连接。 螺纹螺丝钉具有螺旋状的螺纹结构&#xff0c;使其能够轻松旋入金属或其他硬质材料。主要用于金属…

Spring Boot 引入 Guava Retry 实现重试机制

为什么要用重试机制 在如今的系统开发中&#xff0c;为了保证接口调用的稳定性和数据的一致性常常会引入许多第三方的库。就拿缓存和数据库一致性这个问题来说&#xff0c;就有很多的实现方案&#xff0c;如先更新数据库再删除缓存、先更新缓存再更新数据库&#xff0c;又或者…

江苏省发改委副主任钱海云一行莅临我司调研指导

近日&#xff0c;江苏省发改委副主任钱海云、支援合作处副处长卢桐、调研员鲁培和一行&#xff0c;在江宁开发区管委会及市、区发改委有关负责人陪同下&#xff0c;莅临南京天洑软件有限公司走访调研。天洑软件总工程师郭阳博士携管理层参与本次调研活动。 在参观过程中&#x…