【通用消息通知服务】0x3 - 发送我们第一条消息(Websocket)

news2025/1/23 4:01:31

【通用消息通知服务】0x3 - 发送我们第一条消息

项目地址: A generic message notification system[Github]

实现接收/发送Websocket消息

Websocket Connection Pool

import asyncio
from asyncio.queues import Queue
from asyncio.queues import QueueEmpty
from contextlib import suppress
from typing import Any

import async_timeout
import orjson
from sanic.log import logger
from ulid import ULID

from common.depend import Dependency

PING = "#ping"
PONG = "#pong"


class WebsocketConnectionPoolDependency(
    Dependency, dependency_name="WebsocketPool", dependency_alias="ws_pool"
):
    def __init__(self, app) -> None:
        super().__init__(app)
        self.lock = asyncio.Lock()
        self.connections = {}	# 存储websocket connections
        self.send_queues = {}   # 各websocket发送队列
        self.recv_queues = {}   # 各websocket接收消息队列
        self.close_callbacks = {} # websocket销毁回调
        self.listeners = {} # 连接监听函数

    def _gen_id(self) -> str:
        return str(ULID())

    async def add_connection(self, connection) -> str:
        async with self.lock:
            id = self._gen_id()
            self.connections[id] = connection
            self.send_queues[id] = Queue()
            self.app.add_task(
                self.send_task(self.send_queues[id], connection),
                name=f"websocket_{id}_send_task",
            )
            self.recv_queues[id] = Queue()
            self.app.add_task(
                self.recv_task(self.recv_queues[id], connection),
                name=f"websocket_{id}_recv_task",
            )
            self.app.add_task(self.notify_task(id), name=f"websocket_{id}_notify_task")
            self.app.add_task(
                self.is_alive_task(id), name=f"websocket_{id}_is_alive_task"
            )
            setattr(connection, "_id", id)
            return connection._id

    def get_connection(self, connection_id: str):
        return self.connections.get(connection_id)

    async def add_listener(self, connection_id, handler) -> str:
        async with self.lock:
            id = self._gen_id()
            self.listeners.setdefault(connection_id, {}).update({id: handler})
            return id

    async def remove_listener(self, connection_id, listener_id):
        async with self.lock:
            self.listeners.get(connection_id, {}).pop(listener_id, None)

    async def add_close_callback(self, connection_id, callback):
        async with self.lock:
            self.close_callbacks.setdefault(connection_id, []).append(callback)

    def is_alive(self, connection_id: str):
        if hasattr(connection_id, "_id"):
            connection_id = connection_id._id
        return connection_id in self.connections

    async def remove_connection(self, connection: Any):
        if hasattr(connection, "_id"):
            connection_id = connection._id
        else:
            connection_id = connection

            if connection_id not in self.connections:
                # removed already
                return

        async with self.lock:
            logger.info(f"remove connection: {connection_id}")

            with suppress(Exception):
                await self.app.cancel_task(f"websocket_{connection_id}_send_task")
            with suppress(Exception):
                await self.app.cancel_task(f"websocket_{connection_id}_recv_task")
            with suppress(Exception):
                await self.app.cancel_task(f"websocket_{connection_id}_notify_task")
            with suppress(Exception):
                await self.app.cancel_task(f"websocket_{connection_id}_is_alive_task")

            if connection_id in self.send_queues:
                del self.send_queues[connection_id]

            if connection_id in self.recv_queues:
                del self.recv_queues[connection_id]

            if connection_id in self.listeners:
                del self.listeners[connection_id]

            if connection_id in self.close_callbacks:
                await self.do_close_callbacks(connection_id)
                del self.close_callbacks[connection_id]

            if connection_id in self.connections:
                del self.connections[connection_id]

    async def do_close_callbacks(self, connection_id):
        for cb in self.close_callbacks.get(connection_id, []):
            self.app.add_task(cb(connection_id))

    async def prepare(self):
        self.is_prepared = True
        logger.info("dependency:WebsocketPool is prepared")
        return self.is_prepared

    async def check(self):
        return True

    async def send_task(self, queue, connection):
        while self.is_alive(connection):
            try:
                data = queue.get_nowait()
            except QueueEmpty:
                await asyncio.sleep(0)
                continue
            try:
                if isinstance(data, (bytes, str, int)):
                    await connection.send(data)
                else:
                    await connection.send(orjson.dumps(data).decode())
                queue.task_done()
            except Exception as err:
                break

    async def recv_task(self, queue, connection):
        while self.is_alive(connection):
            try:
                data = await connection.recv()
                await queue.put(data)
                logger.info(f"recv message: {data} from connection: {connection._id}")
            except Exception as err:
                break

    async def notify_task(self, connection_id):
        while self.is_alive(connection_id):
            try:
                logger.info(f"notify connection: {connection_id}'s listeners")
                data = await self.recv_queues[connection_id].get()
                for listener in self.listeners.get(connection_id, {}).values():
                    await listener(connection_id, data)
            except Exception as err:
                pass

    async def is_alive_task(self, connection_id: str):
        if hasattr(connection_id, "_id"):
            connection_id = connection_id._id

        get_pong = asyncio.Event()

        async def wait_pong(connection_id, data):
            if data != PONG:
                return
            get_pong.set()

        while True:
            get_pong.clear()
            await self.send(connection_id, PING)
            listener_id = await self.add_listener(connection_id, wait_pong)

            with suppress(asyncio.TimeoutError):
                async with async_timeout.timeout(
                    self.app.config.WEBSOCKET_PING_TIMEOUT
                ):
                    await get_pong.wait()

            await self.remove_listener(connection_id, listener_id)
            if get_pong.is_set():
                # this connection is closed
                await asyncio.sleep(self.app.config.WEBSOCKET_PING_INTERVAL)
            else:
                await self.remove_connection(connection_id)

    async def wait_closed(self, connection_id: str):
        """
        if negative=True, only release when client close this connection.
        """
        while self.is_alive(connection_id):
            await asyncio.sleep(0)
        return False

    async def send(self, connection_id: str, data: Any) -> bool:
        if not self.is_alive(connection_id):
            return False
        if connection_id not in self.send_queues:
            return False
        await self.send_queues[connection_id].put(data)

        return True

Websocket Provider

from typing import Dict
from typing import List
from typing import Union

from pydantic import BaseModel
from pydantic import field_serializer
from sanic.log import logger

from apps.message.common.constants import MessageProviderType
from apps.message.common.constants import MessageStatus
from apps.message.common.interfaces import SendResult
from apps.message.providers.base import MessageProviderModel
from apps.message.validators.types import EndpointExID
from apps.message.validators.types import EndpointTag
from apps.message.validators.types import ETag
from apps.message.validators.types import ExID
from utils import get_app


class WebsocketMessageProviderModel(MessageProviderModel):
    class Info:
        name = "websocket"
        description = "Bio-Channel Communication"
        type = MessageProviderType.WEBSOCKET

    class Capability:
        is_enabled = True
        can_send = True

    class Message(BaseModel):
        connections: List[Union[EndpointTag, EndpointExID, str]]
        action: str
        payload: Union[List, Dict, str, bytes]

        @field_serializer("connections")
        def serialize_connections(self, connections):
            return list(set(map(str, connections)))

    async def send(self, provider_id, message: Message) -> SendResult:
        app = get_app()
        websocket_pool = app.ctx.ws_pool

        sent_list = set()

        connections = []
        for connection in message.connections:
            if isinstance(connection, ETag):
                connections.extend(
                    [
                        w
                        for c in await connection.decode()
                        for w in c.get("websockets", [])
                    ]
                )
            elif isinstance(connection, ExID):
                endpoint = await connection.decode()
                if endpoint:
                    connections.extend(endpoint.get("websockets", []))
            else:
                connections.append(connection)

        connections = list(
            set(filter(lambda x: app.ctx.ws_pool.is_alive(connection), connections))
        )

        # logger.info(f"sending websocket message to {connections}")
        for connection in connections:
            if await websocket_pool.send(
                connection, data=message.model_dump_json(exclude=["connections"])
            ):
                sent_list.add(connection)

        if sent_list:
            return SendResult(
                provider_id=provider_id, message=message, status=MessageStatus.SUCCEEDED
            )
        else:
            return SendResult(
                provider_id=provider_id, message=message, status=MessageStatus.FAILED
            )

websocket接口


@app.websocket("/websocket")
async def handle_websocket(request, ws):
    from apps.endpoint.listeners import register_websocket_endpoint
    from apps.endpoint.listeners import unregister_websocket_endpoint

    con_id = None
    try:
        ctx = request.app.ctx
        con_id = await ctx.ws_pool.add_connection(ws)
        logger.info(f"new connection connected -> {con_id}")
        await ctx.ws_pool.add_listener(con_id, register_websocket_endpoint)
        await ctx.ws_pool.add_close_callback(con_id, unregister_websocket_endpoint)
        await ctx.ws_pool.send(
            con_id, data={"action": "on.connect", "payload": {"connection_id": con_id}}
        )
        await ctx.ws_pool.wait_closed(con_id) # 等待连接断开
    finally:
    	# 如果连接被客户端断开, handle_websocket将会被直接销毁, 所以销毁处理需要放在finally。
        request.app.add_task(request.app.ctx.ws_pool.remove_connection(con_id))

结果截图

websocket connected

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/939983.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Triplet Fingerprinting(三元组网站指纹攻击)

文章信息 论文题目:《Triplet Fingerprinting: More Practical and Portable Website Fingerprinting with N-shot Learning》 期刊(会议):Proceedings of the 2019 ACM SIGSAC Conference on Computer and Communications Secur…

【Java 中级】一文精通 Spring MVC - 上传(十)

👉博主介绍: 博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家,WEB架构师,阿里云专家博主,华为云云享专家,51CTO 专家博主 ⛪️ 个人社区&#x…

LeetCode第11~15题解

CONTENTS LeetCode 11. 盛最多水的容器(中等)LeetCode 12. 整数转罗马数字(中等)LeetCode 13. 罗马数字转整数(简单) LeetCode 11. 盛最多水的容器(中等) 【题目描述】 给定一个长…

广州华锐互动:VR垃圾分类虚拟科普系统让学习过程更加丰富有趣

在我们的日常生活中,垃圾分类已成为一项重要的公民责任。然而,由于缺乏对垃圾分类的深入理解和相关知识,许多人在实践中往往感到困惑和挫败。为了解决这个问题,一种创新的解决方案应运而生:垃圾分类VR虚拟仿真教学系统…

ESP32使用Arduino读写SD卡

背景 esp32屏幕包含一个sd卡接口,通过SPI线连接,需要对插入改SD接口中的TF卡进行读写,通过arduino平台实现。 代码中HSPI指ESP32的SPI2; 在Master模式下,SPID是MOSI/data out, SPIQ是MISO/data in: 代码 #include…

【C语言】指针 和 数组 笔试题详解

目录 一、数组 1.一维数组 2.字符数组 3.二维数组 二、指针 笔试题1 笔试题2 笔试题3 笔试题4 笔试题5 笔试题6 笔试题7 笔试题8(有难度)【看明白会有质的收获】 在这里我们需要先了解数组名的意义 sizeof(数组名) ,这里的数组名表示…

7年经验之谈 —— 如何高效的开展app的性能测试

APP性能测试是什么 从网上查了一下,貌似也没什么特别的定义,我这边根据自己的经验给出一个自己的定义,如有巧合纯属雷同。 客户端性能测试就是,从业务和用户的角度出发,设计合理且有效的性能测试场景,制定…

代码随想录Day_48打卡

①、打家劫舍 你是一个专业的小偷,计划偷窃沿街的房屋。每间房内都藏有一定的现金,影响你偷窃的唯一制约因素就是相邻的房屋装有相互连通的防盗系统,如果两间相邻的房屋在同一晚上被小偷闯入,系统会自动报警。 给定一个代表每个房…

Yolov1原理详细解读及实战(一)理论篇

什么是Yolov1? Yolo(You Only Look Once)是一种one-stage目标检测算法,即仅需要 “看” 一次就可以识别出图片中物体的class类别和位置。作为one-stage的开山鼻祖,YOLOv1以其简洁的网络结构和GPU上的实时检测速度而一鸣惊人,打破了R-CNN的“…

Linux centos7 bash编程(break和continue)

在学习shell知识时,简单编程要从格式入手。 首先学习好单行注释和多行注释。 先学习简单整数的打印输出,主要学习echo命令,学习选项-e -n的使用。 下面的练习是常用的两个分支程序:break和continue。 #!/bin/bash # 这是单行注…

极氪汽车的云资源治理细探

作者:极氪汽车吴超 前言 2021 年,极氪 001 迅速崭露头角,仅用 110 天便创下了首款车型交付量“最快破万”的纪录。2022 年 11 月,极氪 009 在短短 76 天内便率先完成了首批交付,刷新了中国豪华纯电品牌交付速度的纪录…

算法通过村第四关-栈白银笔记|括号问题

文章目录 前言1. 括号匹配问题2. 最小栈问题3. 最大栈 总结 前言 提示:如果让我送给年轻人四个字,就是:量力而行。 量力而行不会失眠,不会啃老,不会为各种考试焦虑。顺其自然活得轻松。其实,量力而行最易大…

数据库集群的简单了解

Update 关于操作的日志 1.0 redo log 读一次写一次 一共2次, 不安全 注意redo log是顺写 而file是随机 所以Mysql做出类似HDFS的操作 行为日志和数据分离,但是不同的是,Mysql在内存中操作修改,如果不出事故,由内存中的行为来直接…

Yolov1原理详细解读及实战(二)实战篇

在Yolov1原理详细解读及实战(一)理论篇 中,我们对Yolov1网络结构、算法流程、训练及推理原理进行了详细剖析,本章进入实战环节,话不多说,马上开始! 环境 vscodeWSL:Ubuntu 18.04python 3.9.7 …

wireshark过滤器的使用

目录 wiresharkwireshark的基本使用wireshark过滤器的区别 抓包案例 wireshark wireshark的基本使用 抓包采用 wireshark,提取特征时,要对 session 进行过滤,找到关键的stream,这里总结了 wireshark 过滤的基本语法,…

外卖订餐系统源码:数字化时代餐饮服务的创新之道

在如今快节奏的生活中,外卖订餐系统源码正成为餐饮业界的一股创新浪潮。它为餐厅和创业者提供了一个数字化的平台,使订餐与配送更加便捷、高效。本文将为您展示如何使用外卖订餐系统源码创建一个简单但功能强大的订餐平台。 # 导入必要的模块 import d…

离线竞价功能说明及设置

为了更加方便广大用户不再熬夜竞价,西部数码推出了离线竞价功能,现已正式上线,欢迎大家使用反馈。 1、离线竟价功能说明 当您拥有域名的出价权限时,您可在 【我参与的竞价】或【我出价的域名】列表选中域名开启离线竟价。 设置…

固定资产管理系统都需要考虑哪些问题?

企业管理中固定资产的追踪和管理是非常重要的一环。固定资产不仅包括房屋、土地、机器设备等大量的资产,也包括低值易耗品。因为这些资产往往是企业生产或者管理过程中不可或缺的。在使用和管理过程中,往往涉及到转移、借调、维护等方面。如何进行有效的…

健康安全的新定义,照明舒适达到巅峰,SUKER书客护眼台灯L1震撼发售

深耕照明领域多年的SUKER书客,这一次给大家带来一份大惊喜。在最近正式发布新品——SUKER书客护眼台灯L1,这款护眼台灯承载着在照明领域的前沿技术,能保证照明安全健康和舒适度并带来非常优秀的护眼效果。作为书客在护眼台灯领域的颠覆式新品…