sanic + webSocket:股票实时行情推送服务实现

news2025/1/11 8:17:41

💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。
在这里插入图片描述

  • 推荐:「stormsha的主页」👈,「stormsha的知识库」👈持续学习,不断总结,共同进步,为了踏实,做好当下事儿~

  • 专栏导航

    • Python系列: Python面试题合集,剑指大厂
    • Git系列: Git操作技巧
    • GO系列: 记录博主学习GO语言的笔记,该笔记专栏尽量写的试用所有入门GO语言的初学者
    • 数据库系列: 详细总结了常用数据库 mysql 技术点,以及工作中遇到的 mysql 问题等
    • 运维系列: 总结好用的命令,高效开发
    • 算法与数据结构系列: 总结数据结构和算法,不同类型针对性训练,提升编程思维

    非常期待和您一起在这个小小的网络世界里共同探索、学习和成长。💝💝💝 ✨✨ 欢迎订阅本专栏 ✨✨

    💖The Start💖点点关注,收藏不迷路💖

    📒文章目录

      • 1、Sanic WebSocket 实现股票实时推送服务
        • 1.1. 设置 Sanic WebSocket 服务端
        • 1.2. 创建 HTML 客户端代码
      • 2. 实现客户端与服务端的交互
      • 3. 总结


1、Sanic WebSocket 实现股票实时推送服务

在本文中,我们将详细介绍如何使用 Sanic 框架来实现一个股票实时推送服务,并且展示如何在客户端通过 HTML 和 JavaScript 代码接收这些实时更新。Sanic 是一个异步的 Python 框架,非常适合构建高性能的 Web 应用程序和 API。我们将通过以下几个步骤来实现这个系统:

  1. 设置 Sanic WebSocket 服务端
  2. 创建 HTML 客户端代码
  3. 实现客户端与服务端的交互

1.1. 设置 Sanic WebSocket 服务端

首先,我们需要安装 Sanic 和相关的 WebSocket 库。可以使用以下命令来安装:

pip install sanic sanic_cors websockets

接下来,我们创建一个简单的 Sanic 应用程序,并使用 WebSocket 实现实时的股票数据推送。

import json
import uuid
from json import JSONDecodeError

from sanic import Sanic, response
from sanic_cors import CORS, cross_origin
from sanic.websocket import WebSocketProtocol
from websockets import ConnectionClosedError, ConnectionClosedOK
import asyncio
import time
import random

app = Sanic("StockWebSocket")
CORS(app)

# 以股票代码为键的字典,存储订阅者列表
subscriptions = {}

# 模拟股票行情数据,实际开发中用真实行情源代替
stock_data = {
    "a": {"stock_name": "阿里", "price": 150.00, "change": 0.25},
    "b": {"stock_name": "百度", "price": 2750.00, "change": 0.25},
    "t": {"stock_name": "腾讯", "price": 3400.00, "change": 0.25}
}


class SubscriptObject(object):
    def __init__(self, register_id, ws):
        self.register_id = register_id
        self.ws = ws

    async def recv(self):
        try:
            while True:
                msg = await self.ws.recv()
                if not msg:
                    continue
                data = json.loads(msg)
                action = data.get('action')
                stock_symbol = data.get('stock_symbol')
                if action == "subscribe":
                    if stock_symbol in stock_data:
                        if not subscriptions.get(stock_symbol):
                            subscriptions[stock_symbol] = set()
                        subscriptions[stock_symbol].add(self.ws)
                    else:
                        await self.ws.send(json.dumps({"error": "Unknown stock symbol"}))
                else:
                    print(f"当前状态 {data}")
        except ConnectionClosedError:
            print(f'{self.register_id} 断开连接, 取消监听')
        except ConnectionClosedOK:
            print(f'{self.register_id} 断开连接, 取消监听')
        except JSONDecodeError:
            print(f'收到的数据解析失败')
        finally:
            # 清理客户端订阅
            for symbol in subscriptions:
                subscriptions[symbol].discard(self.ws)
            print("Client disconnected")


async def stock_data_generator():
    while True:
        # 模拟股票数据更新
        stock_code_list = list(subscriptions.keys())
        for symbol in stock_code_list:
            ws_list = subscriptions.get(symbol, []).copy()
            dd = stock_data.get(symbol)
            data = dict()
            data['stock_name'] = dd['stock_name']
            data['price'] = dd['price'] + random.uniform(-5, 5)
            data['change'] = dd['change'] + random.uniform(-1, 1)
            data['timestamp'] = time.time()
            for ws in ws_list:
                try:
                    await ws.send(json.dumps(data))
                except ConnectionClosedOK:
                    print("连接取消前", subscriptions[symbol])
                    subscriptions[symbol].discard(ws)
                    print("连接取消后", subscriptions[symbol])
        await asyncio.sleep(1)  # 每2秒更新一次


@app.websocket("/ws/stock")
@cross_origin(app)  # 为这个路由启用CORS
async def stock_websocket(request, ws):
    # 模拟实时更新股票价格
    register_id = uuid.uuid4().hex
    sub_obj = SubscriptObject(register_id, ws)
    await sub_obj.recv()


# 启动股票数据更新任务
app.add_task(stock_data_generator())


@app.route('/')
async def index(request):
    return await response.file('index.html')


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000, protocol=WebSocketProtocol)

1.2. 创建 HTML 客户端代码

我们需要一个简单的 HTML 页面,允许用户选择股票符号,并通过 WebSocket 连接到服务端以接收实时更新。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Stock Real-time Update</title>
    <style>
        body {
            font-family: Arial, sans-serif;
        }

        .stock-data {
            margin: 20px;
            padding: 10px;
            border: 1px solid #ddd;
        }

        select {
            margin: 20px;
            padding: 5px;
        }
    </style>
</head>
<body>
<h1>实时股票行情</h1>
<select id="stock-select">
    <option value="a">阿里</option>
    <option value="b">百度</option>
    <option value="t">腾讯</option>
</select>
<div id="stock-container">
    <!-- 初始化一个空的 stock-data 元素,用于动态添加新的 stock-data 元素 -->
    <div class="stock-data" id="stock-data-template" style="display: none;">
        股票名称: <span class="stock-name"></span><br>
        涨跌幅: <span class="change"></span><br>
        时间戳: <span class="timestamp"></span><br>
    </div>
</div>

<script>
    const stockSelect = document.getElementById('stock-select');
    const stockContainer = document.getElementById('stock-container');
    const stockDataTemplate = document.getElementById('stock-data-template');

    function updateStock(symbol) {
        const ws = new WebSocket(`ws://localhost:8000/ws/stock`);
        const stockDataElement = stockDataTemplate.cloneNode(true); // 复制 stock-data 模板元素
        stockDataElement.id = `stock-data-${symbol}`; // 根据股票代码设置 ID
        stockDataElement.style.display = 'block'; // 显示元素
        stockContainer.appendChild(stockDataElement); // 将元素添加到容器中

        ws.onopen = function () {
            console.log("WebSocket connection established.");
            ws.send(JSON.stringify({
                action: 'subscribe',
                stock_symbol: symbol
            }));
        };

        ws.onmessage = function (event) {
            const data = JSON.parse(event.data);
            const stockData = stockDataElement.getElementsByClassName('stock-data')[0];
            if (data.error) {
                stockData.innerHTML = `<p>${data.error}</p>`;
            } else {
                console.log("ddddddd", data)
                stockDataElement.getElementsByClassName('stock-name')[0].textContent = data.stock_name;
                stockDataElement.getElementsByClassName('change')[0].textContent = data.change;
                stockDataElement.getElementsByClassName('timestamp')[0].textContent = data.timestamp;
            }
        };

        ws.onerror = function (error) {
            console.log("WebSocket Error: ", error);
        };
    }

    stockSelect.addEventListener('change', function () {
        const selectedSymbol = stockSelect.value;
        updateStock(selectedSymbol);
    });

    // 初始化加载第一个股票的数据
    updateStock(stockSelect.value);
</script>
</body>
</html>

访问地址:http://localhost:8000

2. 实现客户端与服务端的交互

在上面的代码中,客户端通过 WebSocket 连接到服务端,并根据输入的股票符号接收实时更新。服务端定期推送模拟的股票数据到所有订阅该股票符号的客户端。

3. 总结

通过以上步骤,我们实现了一个基于 Sanic 的 WebSocket 股票实时推送服务,并创建了一个简单的 HTML 客户端以接收这些实时更新。这个示例可以根据实际需求进行扩展和优化,如连接管理、数据来源的整合、错误处理等。希望这个示例能帮助你在实际项目中实现实时数据推送功能。


🔥🔥🔥道阻且长,行则将至,让我们一起加油吧!🌙🌙🌙

💖The End💖点点关注,收藏不迷路💖

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2051533.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux中的内核编程

Linux内核是操作系统的核心组件&#xff0c;负责管理系统的资源、提供硬件抽象和执行系统调用。内核编程是一项涉及操作系统核心的高级任务&#xff0c;它允许开发人员直接与系统内核进行交互&#xff0c;实现更高效、更特定的功能。本文将深入探讨Linux中的内核编程&#xff0…

2021年上半年网络工程师考试上午真题

2021年上半年网络工程师考试上午真题 网络工程师历年真题含答案与解析 第 1 题 以下关于RISC和CISC计算机的叙述中&#xff0c;正确的是&#xff08; &#xff09;。 (A) RISC不采用流水线技术&#xff0c;CISC采用流水线技术(B) RISC使用复杂的指令&#xff0c;CISC使用简…

事件驱动架构的事件版本管理

有一种办法&#xff1a;发送会议邀请给所有团队&#xff0c;经过101次会议后&#xff0c;发布维护横幅&#xff0c;所有人同时点击发布按钮。或... 可用适配器&#xff0c;但微调。没错&#xff01;就像软件开发中90%问题一样&#xff0c;有种模式帮助你找到聪明解决方案。 1…

C Primer Plus(中文版)第13章编程练习,仅供参考

第十三章编程练习 对于文件的操作是程序开发过程中必不可少的。首先&#xff0c;来看一下第一题&#xff0c;对13.1程序进行修改&#xff0c;输入文件名&#xff0c;而不是命令行参数。完整程序代码以及运行结果如下&#xff1a; #include<stdio.h> #include<stdlib…

【数据结构篇】~单链表(附源码)

【数据结构篇】~链表 链表前言链表的实现1.头文件2.源文件 链表前言 链表是一种物理存储结构上非连续、非顺序的存储结构&#xff0c;数据元素的逻辑顺序是通过链表中的指针链接次序实现的。 1、链式机构在逻辑上是连续的&#xff0c;在物理结构上不一定连续​ 2、结点一般是从…

Java二十三种设计模式-命令模式(18/23)

命令模式&#xff1a;将请求封装为对象的策略 概要 本文全面探讨了命令模式&#xff0c;从基础概念到实现细节&#xff0c;再到使用场景、优缺点分析&#xff0c;以及与其他设计模式的比较&#xff0c;并提供了最佳实践和替代方案&#xff0c;旨在帮助读者深入理解命令模式并…

【xr-frame】微信小程序xr-frame典型案例

微信小程序xr-frame典型案例 在之前的工作中&#xff0c;我大量使用XR-Frame框架进行AR开发&#xff0c;并积累了一些案例和业务代码。其中包括2D图像识别、手部动作识别、Gltf模型加载、动态模型加载、模型动画等内容。小程序部分使用TypeScript编写&#xff0c;而XR-Frame组…

利用puppeteer将html网页生成图片

1.什么是puppeteer&#xff1f; Puppeteer是一个Node库&#xff0c;它提供了一个高级API来通过DevTools协议控制Chromium或Chrome。 可以使用Puppeteer来自动化完成浏览器的操作&#xff0c;官方给出的一些使用场景如下&#xff1a; 生成页面PDF抓取 SPA&#xff08;单页应用…

3.Windows Login Unlocker-忘记电脑密码也可以解决

想要解锁Windows系统的开机密码&#xff0c;但官网的传统方法只适合Windows本地账户&#xff0c;对微软账户或PIN码()束手无策&#xff1f;别担心&#xff0c;小编之前推荐过的「Windows Login Unlocker」软件能为您排忧解难。这款出色的工具不仅能够轻松绕过各种Windows密码&a…

C语言-写一个用矩形法求定积分的通用函数,分别求积分区间为[0,1]sinx,cosx,e的x方的定积分

一、题目要求&#xff1a; 二、思路 ①数学方面:矩形法求定积分的公式 将积分图形划分成为指定数量的矩形&#xff0c;求取各个矩形的面积&#xff0c;然后最终进行累加得到结果 1.积分区间: [num1, num2] 2.分割数量:count 每个矩形的边长:dx(num2-num1)/count 3.被积分…

智慧草莓基地管理系统--论文pf

TOC springboot359智慧草莓基地管理系统--论文pf 绪论 1.1 研究背景 当前社会各行业领域竞争压力非常大&#xff0c;随着当前时代的信息化&#xff0c;科学化发展&#xff0c;让社会各行业领域都争相使用新的信息技术&#xff0c;对行业内的各种相关数据进行科学化&#xf…

泰安鲁菜根普照店重装开业:传承与创新并举 品味舌尖上的泰安

8月17日&#xff0c;泰安鲁菜根普照店重装开业。店内装修以泰山文化为主题&#xff0c;营造出一种浓郁的地方文化特色氛围。这家以泰山文化为底蕴&#xff0c;以大汶河传统民俗食材为基础的餐饮企业&#xff0c;一直致力于发掘和传承泰山周边及汶河两岸的传统特色美食&#xff…

【漫谈C语言和嵌入式009】探索LVDT:线性可变差动变压器的工作原理与应用

引言 在现代工业和工程领域中&#xff0c;精确的位移测量是许多系统正常运行的关键。线性可变差动变压器&#xff08;LVDT, Linear Variable Differential Transformer&#xff09;是一种广泛使用的位移传感器&#xff0c;因其高精度、可靠性和耐用性在各种应用中得到了普遍认可…

iOS(OC)学习第1天-怎么设置UI

xCode 版本为 新建工程 xCode->iOS->App->输入工程名称 项目结构 storyboard文件&#xff1a;可以通过拖动方式添加UI的UI布局文件&#xff0c;理解为Android的XML布局文件 名字说明Main.storyboard首页LaunchScreen.storyboard引导页 添加布局 引导页 设置组件的属…

复现DOM破坏案例

准备工作&#xff1a; 做好代理然后访问靶场 XSS Game - Learning XSS Made Simple! | Created by PwnFunction 第一关 Ma Spaghet! 这是源代码部分&#xff1a; <!-- Challenge --> <h2 id"spaghet"></h2> <script> spaghet.innerHTM…

【推荐100个unity插件之25】使用Vroid进行二次元建模,并在unity中使用VRM模型——URP-UniVrm插件的使用

最终效果 文章目录 最终效果什么是Vriod官网地址下载安装使用导出模型unity使用VRM模型导入URP-UniVrm插件 Blender使用Blender安装Cats Blender Plugin 插件Blender安装VRM-Addon-for-Blender插件导入VRM模型导出为FBX模型 使用别人的VRM模型完结 什么是Vriod 如果你玩过能捏…

经典动作手机游戏:《艾希》安卓手机游戏下载

《艾希手游》是一款以女性为主角的游戏&#xff0c;玩家在游戏中将扮演主角艾希&#xff0c;在各种场景中进行冒险&#xff0c;探索剧情和解开谜题。这款游戏的画面精美&#xff0c;操作简单易上手&#xff0c;同时提供了丰富的剧情和多样的玩法。 下载地址&#xff1a;https:/…

氙灯老化试验箱试验机

氙灯老化试验箱&#xff0c;采用6.5KW大功率的精密水冷式氙灯&#xff0c;曝晒面积达到了6500cm2 功能强大&#xff0c;测试结果可靠 ◆ 满足国内外所有氙灯测试标准要求。 ◆ 采用氙灯灯管及滤光器组件&#xff0c;保证试验数据的可比性和重现性。 ◆ 自动旋转式三层鼓型样板架…

MATLAB根据数值画直方图

一直很纠结MATLAB为什么不提供根据数值&#xff08;或统计值&#xff09;画直方图的函数&#xff0c;只给一个不专业的bar&#xff0c;原来histogram支持。 edges [0-0.5:70.5]; counts [508 821 898 892 552 181 159 85];figure; histogram(BinEdges,edges,BinCounts,count…

SAP Memory ABAP Memory超级详细解析

SAP Memory & ABAP Memory超级详细解析_abap set parameter id-CSDN博客 FREE MEMORY ID ZTESTMAT. 清空指定的ABAPmemory FREE MEMORY. 清空externalsession内的所有ABAPmemory 最后请注意 IMPORT…