Python Tornado 实现SSE服务端主动推送方案

news2025/1/11 2:34:27

一、SSE 服务端消息推送

SSEServer-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source 接口被制定为HTML5 的一部分。相比于 WebSocket,服务器端和客户端工作量都要小很多、简单很多,而 Tornado 又是Python中的一款优秀的高性能web框架,本文带领大家一起实践下 Tornado SSE 的实现。

本文主要探索两个方面的实践:一个是客户端发送请求,服务端的返回是分多次进行传输的,直到传输完成,这种情况下请求结束后,就可以考虑关闭 SSE了,所以这种连接可以认为是暂时的。另一种是由服务端在特定的时机下主动推送消息给到客户端,推送的时机具有不确定性,随时性,所以这种情况下需要客户端和服务端保持长久连接。

本次使用的 Tornado 版本:

tornado==6.3.2

二、短暂性场景下的 SSE 实现

短暂性场景下就是对应上面的第一点,客户端主动发送请求后,服务端分多次传输,直到完成,数据获取完成后连接就可以断开了,适用于一些接口复杂,操作步骤多的场景,可以提前告诉客户端现在进行到了哪一步了,并且这种方式也有利于服务端的横向扩展。

Tornado 中实现,需要注意的是要关闭 _auto_finish ,这样的话就不会被框架自己主动停止连接了,下面是一个实现的案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor

class SSE(RequestHandler):

    def initialize(self):
        # 关闭自动结束
        self._auto_finish = False
        print("initialize")

    def set_default_headers(self):
        # 设置为事件驱动模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用缓存
        self.set_header('Content-Control', "no-cache")
        # 保持长连接
        self.set_header('Connection', "keep-alive")
        # 允许跨域
        self.set_header('Access-Control-Allow-Origin', "*")

    def prepare(self):
        # 准备线程池
        self.executor = self.application.pool

    @tornado.gen.coroutine
    def get(self):
        result = yield self.doHandle()
        self.write(result)
        # 结束
        self.finish()

    @run_on_executor
    def doHandle(self):
        tornado.ioloop.IOLoop.current()
        # 分十次推送信息
        for i in range(10):
            time.sleep(1)
            self.flush()
            self.callback(f"current: {i}")
        return f"data: end\n\n"

    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)


def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)

    print(f"Start server success", f"The prot = {port}")

    tornado.ioloop.IOLoop.current().start()


if __name__ == '__main__':
    startServer(8020)

运行后可以到浏览器访问:http://localhost:8020/sse,此时就可以看到服务端在不断地推送数据过来了:

在这里插入图片描述

那如何在前端用 JS 获取数据呢,前面提到在 JS 层面,有封装好的 Event Source 组件可以直接拿来使用,例如:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>测试服务器推送技术</title>
</head>
<body>
    <div id="messages"></div>
</body>
<script>
    const eventSource = new EventSource('http://localhost:8020/sse');

    // 事件回调
    eventSource.onmessage = (event) => {
      console.log(event.data)
      const messagesDiv = document.getElementById('messages');
      messagesDiv.innerHTML += '<p>' + event.data + '</p>';
    };

    // 异常
    eventSource.onerror = (error) => {
      console.error('EventSource failed:', error);
      eventSource.close();
    };

    eventSource.onopen = ()=>{
        console.log("开启")
    }
  </script>
</html>

运行后可以看到服务端分阶段推送过来的数据:

在这里插入图片描述

三、长连接场景下的 SSE 实现

上面实现了客户端请求后,分批次返回,但是有些情况下是客户端连接后没有东西返回,而是在某个特定的时机下返回给某几个客户端,所以这种情况,我们需要和客户端保持长久的连接,同时进行客户端连接的缓存,因为同时有可能有 100 个用户,但是推送时可能只需要给 10 个用户推送,这种方式相当于将一个客户端和一个服务端进行了绑定,一定程度上不利于服务端的横向扩展,但也可以通过一些消息订阅的方式解决类似问题。

下面是一个实现案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor


# 单例
def singleton(cls):
    instances = {}
    def wrapper(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return wrapper


# 订阅推送工具类
@singleton
class Pusher():

    def __init__(self):
        self.clients = {}

    def add_client(self, client_id, callback):
        if client_id not in self.clients:
            self.clients[client_id] = callback
            print(f"{client_id} 连接")

    def send_all(self, message):
        for client_id in self.clients:
            callback = self.clients[client_id]
            print("发送消息给:", client_id)
            callback(message)

    def send(self, client_id, message):
        callback = self.clients[client_id]
        print("发送消息给:", client_id)
        callback(message)


class SSE(RequestHandler):
    # 定义推送者
    pusher = Pusher()

    def initialize(self):
        # 关闭自动结束
        self._auto_finish = False
        print("initialize")

    def set_default_headers(self):
        # 设置为事件驱动模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用缓存
        self.set_header('Content-Control', "no-cache")
        # 保持长连接
        self.set_header('Connection', "keep-alive")
        # 允许跨域
        self.set_header('Access-Control-Allow-Origin', "*")

    @tornado.gen.coroutine
    def get(self):
        # 客户端唯一标识
        client_id = self.get_argument("client_id")
        self.pusher.add_client(client_id, self.callback)

    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()


# 定义推送接口,模拟推送
class Push(RequestHandler):
    # 定义推送者
    pusher = Pusher()

    def prepare(self):
        # 准备线程池
        self.executor = self.application.pool

    @tornado.gen.coroutine
    def get(self):
        # 客户端标识
        client_id = self.get_argument("client_id")
        # 推送的消息
        message = self.get_argument("message")
        result = yield self.doHandle(client_id, message)
        self.write(result)

    @run_on_executor
    def doHandle(self, client_id, message):
        tornado.ioloop.IOLoop.current()
        self.pusher.send(client_id, message)
        return "success"


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/push", Push),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)

def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)

    print(f"Start server success", f"The prot = {port}")

    tornado.ioloop.IOLoop.current().start()


if __name__ == '__main__':
    startServer(8020)

这里我定义了一个 Pusher 订阅推送工具类,用来存储客户端的连接,以及给指定客户端或全部客户端发送消息,然后我又定义 Push 接口,模拟不定时的指定客户端发送信息的场景。

同样前端也要修改,需要给自己定义 client_id ,例如:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>测试服务器推送技术</title>
</head>
<body>
    <div id="client"></div>
    <div id="messages"></div>
</body>
<script>
    function generateUUID() {
      let uuid = 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, function(c) {
        const r = Math.random() * 16 | 0;
        const v = c === 'x' ? r : (r & 0x3 | 0x8);
        return v.toString(16);
      });

      return uuid;
    }
    // 利用uuid 模拟生成唯一的客户端ID
    let client_id = generateUUID();
    document.getElementById('client').innerHTML = "当前 client_id = "+client_id;
    const eventSource = new EventSource('http://localhost:8020/sse?client_id='+client_id);

    // 事件回调
    eventSource.onmessage = (event) => {
      console.log(event.data)
      const messagesDiv = document.getElementById('messages');
      messagesDiv.innerHTML += '<p>' + event.data + '</p>';
    };

    // 异常
    eventSource.onerror = (error) => {
      console.error('EventSource failed:', error);
      eventSource.close();
    };

    eventSource.onopen = ()=>{
        console.log("开启")
    }
  </script>
</html>

这里我用 uuid 模拟客户端的唯一ID,在真实使用时可不要这么做。

下面使用浏览器打开三个页面,可以看到三个不同的 client_id :

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在服务端的日志中也能看到这三个客户端的连接:

在这里插入图片描述

下面调用 push 接口来给任意一个客户端发送消息,例如这里发给client_id = 2493045e-84dd-4118-8d96-0735c4ac186b 的用户 :

在这里插入图片描述

下面看到 client_id2493045e-84dd-4118-8d96-0735c4ac186b的页面:

在这里插入图片描述
已经成功收到推送的消息,反之看另外两个:

在这里插入图片描述
在这里插入图片描述
都没有消息,到这里就实现了长连接下不定时的服务端消息推送方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1404287.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Bitcoin Core钱包启动signet测试网络

使用Bitcoin Core钱包启动signet测试网络 为什么要启动测试网络 我们的目的是了解BTC网络的运行原理&#xff0c;学习BTC相关技术。在主网中有较高的手续费和大量的区块数据&#xff0c;不利于我们进行测试。 下载钱包 一定要去可信的网站下载钱包&#xff0c;否则非常容易…

java web servlet 学习系统进度管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 java Web学习系统进度管理系统是一套完善的java web信息管理系统 &#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环 境为TOMCAT7.0,Myeclipse8.5开发&#xff0c;数据库为…

go语言(十四)----反射

变量的结构 2 举个例子 package mainimport "fmt"type Reader interface {ReadBook() }type Writer interface {WriteBook() }//具体类型 type Book struct {}func (this *Book) ReadBook() {fmt.Println("Read a Book")}func (this *Book) WriteBook() {…

Mac上如何设置映射某个网站站点域名的IP

最近某常用的站点换 IP 了&#xff0c;但是 DNS 服务器还没有修改&#xff0c;这就导致无法访问&#xff08;换 DNS 服务器也不行&#xff09;。在用了一段时间的 IP 访问之后&#xff0c;还是没好&#xff0c;不知道是 DNS 污染还是咋了&#xff0c;所以最后还是手动改一下吧。…

【LeetCode】每日一题 2024_1_22 最大交换(模拟)

文章目录 LeetCode&#xff1f;启动&#xff01;&#xff01;&#xff01;题目&#xff1a;最大交换题目描述&#xff1a;代码与解题思路 LeetCode&#xff1f;启动&#xff01;&#xff01;&#xff01; 几百年没有见到题目描述这么短的题目了&#xff0c;泪目了 题目&#x…

电商平台接口自动化框架实践

技术栈 Mimproxy(抓包)pytestalluredockerJenkinsgitlab 语言&#xff1a;电商API接口自动化实现流程 红色为可实现/尚未完成 绿色为需要人工干预部分 自动生成测试用例模板&#xff08;俩种方式二选一&#xff09;&#xff1a; mimproxy&#xff0c;通过浏览器代理抓包方式…

074:vue+mapbox 加载here地图(影像瓦片图 v2版)

第074个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+mapbox中加载here地图的影像瓦片图 v2软件版本。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例效果配置方式示例源代码(共77行)相关API参考:专栏目标示例效果

区块链游戏:Web3时代玩法的全新演绎

随着区块链技术的蓬勃发展&#xff0c;区块链游戏正成为数字娱乐领域的一颗璀璨明珠。在Web3时代&#xff0c;区块链游戏以其去中心化、透明、可验证的特性&#xff0c;为玩家带来了全新的游戏体验。本文将深入探讨区块链游戏在Web3时代的崭新玩法和引领未来的可能性。 1. 去中…

GPTBots:利用FlowBot中的卡片和表单信息,提供丰富的客服体验

在当今的数字化时代&#xff0c;客户服务的形式和体验正在经历着前所未有的变革。传统的文字消息方式已经无法满足现代用户对于服务体验的多元化需求。那么&#xff0c;如何才能在这个信息爆炸的时代&#xff0c;让我们的服务方式更加个性化、多样化&#xff0c;从而提供更丰富…

分享一个“产业级,开箱即用”的NLP自然语言处理工具

NLP的全称是Natuarl Language Processing&#xff0c;中文意思是自然语言处理&#xff0c;是人工智能领域的一个重要方向 自然语言处理&#xff08;NLP&#xff09;的一个最伟大的方面是跨越多个领域的计算研究&#xff0c;从人工智能到计算语言学的多个计算研究领域都在研究计…

记录一次从有道云笔记迁移到语雀笔记

推荐阅读 智能化校园&#xff1a;深入探讨云端管理系统设计与实现&#xff08;一&#xff09; 智能化校园&#xff1a;深入探讨云端管理系统设计与实现&#xff08;二&#xff09; 1、安装git&#xff0c;python3等准备工作 文章中标注python3&#xff0c;为避免与python2 冲…

【正点原子STM32】C语言重点知识(配置MDK支持C99、位操作清零置一、带参数的宏定义、头文件的条件编译和代码条件编译、关键字、结构体指针、代码规范)

一、stdint.h简介 配置MDK支持C99 二、位操作 如何给寄存器某个位赋值&#xff08;清零置一&#xff09; 三、宏定义 带参数的宏定义 四、条件编译 头文件的条件编译和代码条件编译 五、extern声明 六、类型别名(typedef) 类型别名应用 七、结构体 应用举例&#xf…

muduo网络库剖析——线程Thread类

muduo网络库剖析——线程Thread类 前情从muduo到my_muduo 概要框架与细节成员函数使用方法 源码结尾 前情 从muduo到my_muduo 作为一个宏大的、功能健全的muduo库&#xff0c;考虑的肯定是众多情况是否可以高效满足&#xff1b;而作为学习者&#xff0c;我们需要抽取其中的精…

CentOS7安装Docker和docekr-compose

CentOS7安装Docker Docker 分为 CE 和 EE 两大版本。CE免费版&#xff0c;EE付费版 Docker CE 支持 64 位版本 CentOS 7&#xff0c;并且要求内核版本不低于 3.10 第一步、检测系统环境 如果之前或者安装系统时候自带docker的话可以卸载&#xff08;可选&#xff09; 如果之…

用ChatGPT教学、科研!亚利桑那州立大学与OpenAI合作

亚利桑那州立大学&#xff08;简称“ASU”&#xff09;在官网宣布与OpenAI达成技术合作。从2024年2月份开始&#xff0c;为所有学生提供ChatGPT企业版访问权限&#xff0c;主要用于学习、课程作业和学术研究等。 为了帮助学生更好地学习ChatGPT和大语言模型产品&#xff0c;AS…

Linux下软件安装的命令【RPM,YUM】及常用服务安装【JDK,Tomcat,MySQL】

Linux下软件安装的命令 源码安装 以源代码安装软件&#xff0c;每次都需要配置操作系统、配置编译参数、实际编译&#xff0c;最后还要依据个人喜好的方式来安装软件。这个过程很麻烦很累人。 RPM软件包管理 RPM安装软件的默认路径: 注意&#xff1a; /etc 配置文件放置目录…

基于Java SSM框架实现共享单车管理系统项目【项目源码+论文说明】

基于java的SSM框架实现共享单车管理系统演示 摘要 随着科学技术的飞速发展&#xff0c;各行各业都在努力与现代先进技术接轨&#xff0c;通过科技手段提高自身的优势&#xff1b;对于共享单车管理系统当然也不能排除在外&#xff0c;随着网络技术的不断成熟&#xff0c;带动了…

鸿蒙自定义刷新组件使用

前言 DevEco Studio版本&#xff1a;4.0.0.600 1、RefreshLibrary_HarmonyOS.har&#xff0c;用于HarmonyOS "minAPIVersion": 9, "targetAPIVersion": 9, "apiReleaseType": "Release", "compileSdkVersion": "3.…

核苷酸与相对论的数学关系猜想

质量-鸟嘌呤 M-G 金 收缩 能量-胸腺嘧啶 E-T 火 混沌 时间-胞嘧啶 T-C 水 次序 空间-腺嘌呤 S-A 木 扩散 确定了这三种对应关系之后&#xff0c;我们就可以用相对论里面的数学关系来确定基因的关系 四边形理论有六个方…

python24.1.22创建类-定义对象属性

类&#xff1a;创建对象的模板&#xff0c;定义对象的属性和方法 对象&#xff1a;类的实例 Pascal命名法定义类名称 定义类 创建对象 返回对象属性