股票量化实时行情接口WebSocket接入Python封装

news2024/11/13 23:17:57

Python做量化,如果是日内策略,需要更实时的行情数据,不然策略滑点太大,容易跑偏结果。

之前用行情网站提供的level1行情接口,实测平均更新延迟达到了6秒,超过10只股票并发请求频率过快很容易封IP。后面又尝试了买代理IP来请求,成本太高而且不稳定。

在Github上看到一个可转债的Golang高频T+0策略,对接的是WebSocket协议,拿来改了改,封装了一个Python版本的包,记录一下:

#!python3
# -*- coding:utf-8 -*-
import time
import datetime
import websocket
import zlib
import requests
import threading

# 行情订阅推送封装
class Construct:
    __token = ""
    __server_req_url = "http://jvquant.com/query/server?market=ab&type=websocket&token="
    __ws_ser_addr = ""
    __ws_conn = ""
    __lv1_field = ["time", "name", "price", "ratio", "volume", "amount", "b1", "b1p", "b2", "b2p", "b3", "b3p", "b4",
                   "b4p", "b5", "b5p", "s1", "s1p", "s2", "s2p", "s3", "s3p", "s4", "s4p", "s5", "s5p"]
    __lv2_field = ["time", "oid", "price", "vol"]

    def __init__(self, logHandle, token, onRevLv1, onRevLv2):
        if logHandle == "" or token == "" or onRevLv1 == "" or onRevLv2 == "":
            msg = "行情初始化失败:logHandle或token或onRevLv1或onRevLv2必要参数缺失。"
            print(msg)
            exit(-1)
        self.__log = logHandle
        self.__token = token
        self.__deal_lv1 = onRevLv1
        self.__deal_lv2 = onRevLv2
        self.__getSerAddr()
        self.__conn_event = threading.Event()
        self.th_handle = threading.Thread(target=self.__conn)
        self.th_handle.start()
        self.__conn_event.wait()

    def __getSerAddr(self):
        url = self.__server_req_url + self.__token
        try:
            res = requests.get(url=url)
        except Exception as e:
            self.__log(e)
            return
        if (res.json()["code"] == "0"):
            self.__ws_ser_addr = res.json()["server"]
            print("获取行情服务器地址成功:" + self.__ws_ser_addr)
        else:
            msg = "获取行情服务器地址失败:" + res.text
            self.__log(msg)
            exit(-1)

    def __conn(self):
        wsUrl = self.__ws_ser_addr + "?token=" + self.__token
        self.__ws_conn = websocket.WebSocketApp(wsUrl,
                                                on_open=self.__on_open,
                                                on_data=self.__on_message,
                                                on_error=self.__on_error,
                                                on_close=self.__on_close)
        self.__ws_conn.run_forever()
        self.__conn_event.set()
        self.__log("ws thread exited")

    def addLv1(self, codeArr):
        cmd = "add="
        lv1Codes = []
        for code in codeArr:
            lv1Codes.append("lv1_" + code)

        cmd = cmd + ",".join(lv1Codes)
        self.__log("cmd:" + cmd)
        self.__ws_conn.send(cmd)

    def addLv2(self, codeArr):
        cmd = "add="
        lv1Codes = []
        for code in codeArr:
            lv1Codes.append("lv2_" + code)

        cmd = cmd + ",".join(lv1Codes)
        self.__log("cmd:" + cmd)
        self.__ws_conn.send(cmd)

    def dealLv1(self, data):
        self.__deal_lv1(data)

    def dealLv2(self, data):
        self.__deal_lv1(data)

    def __on_open(self, ws):
        self.__conn_event.set()
        self.__log("行情连接已创建")

    def __on_error(self, ws, error):
        self.__log("行情处理error:", error)

    def __on_close(self, ws, code, msg):
        self.__log("行情服务未连接")

    def close(self):
        self.__ws_conn.close()

    def __on_message(self, ws, message, type, flag):
        # 命令返回文本消息
        if type == websocket.ABNF.OPCODE_TEXT:
            self.__log("Text响应:" + message)
        # 行情推送压缩二进制消息,在此解压缩
        if type == websocket.ABNF.OPCODE_BINARY:
            now = datetime.datetime.now()
            nStamp = time.mktime(now.timetuple())
            date = now.strftime('%Y-%m-%d')

            rb = zlib.decompress(message, -zlib.MAX_WBITS)
            text = rb.decode("utf-8")
            # self.__log("Binary响应:" + text)
            ex1 = text.split("\n")
            for e1 in ex1:
                ex2 = e1.split("=")
                if len(ex2) != 2:
                    continue
                code = ex2[0]
                hqs = ex2[1]
                if code.startswith("lv1_"):
                    code = code.replace("lv1_", "")
                    hq = hqs.split(",")
                    if len(hq) == len(self.__lv1_field):
                        hqMap = dict(zip(self.__lv1_field, hq))
                        timeStr = hqMap['time']
                        date_obj = datetime.datetime.strptime(date + ' ' + timeStr, '%Y-%m-%d %H:%M:%S')
                        tStamp = int(time.mktime(date_obj.timetuple()))
                        if abs(tStamp - nStamp) <= 2:
                            self.__deal_lv1(code, hqMap)

                if code.startswith("lv2_"):
                    code = code.replace("lv2_", "")
                    hqArr = hqs.split("|")
                    for hq in hqArr:
                        hqEx = hq.split(",")
                        if len(hqEx) == len(self.__lv2_field):
                            hqMap = dict(zip(self.__lv2_field, hqEx))
                            timeEx = hqMap['time'].split('.')
                            if len(timeEx) == 2:
                                timeStr = timeEx[0]
                                date_obj = datetime.datetime.strptime(date + ' ' + timeStr, '%Y-%m-%d %H:%M:%S')
                                tStamp = int(time.mktime(date_obj.timetuple()))
                                if abs(tStamp - nStamp) <= 2:
                                    self.__deal_lv2(code, hqMap)

引用地址:https://github.com/freevolunteer/bondTrader/blob/main/pyscript/jvUtil/HanqQing.py

订阅指令参考:https://jvquant.com/wiki.html#--9

原文地址:https://zhuanlan.zhihu.com/p/6059899873

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2238264.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QT栅格布局的妙用

当groupBox中只有一个控件时&#xff0c;我们想要它满格显示可以对groupBox使用栅格布局

MyBatis快速入门(上)

MyBatis快速入门&#xff08;上&#xff09; 一、MyBatis 简介1、概述2、JDBC、Hibernate、MyBatis 对比 二、MyBatis 框架搭建1、开发环境2、创建maven工程3、创建MyBatis的核心配置文件4、创建mapper接口5、创建MyBatis的映射文件6、通过junit测试功能7、加入log4j2日志功能 …

在Pybullet中加载Cinema4D创建的物体

首先明确我们的目标&#xff0c;是希望在cinema4D中创建自己想要的模型&#xff0c;并生成.obj文件&#xff0c;然后在pybullet中加载.obj文件作为静态物体&#xff0c;可以用于抓取物体&#xff0c;避障物体。&#xff08;本文提到的方法只能实现静态物体的建模&#xff0c;如…

第十三届交通运输研究(上海)论坛┆智能网联汽车技术现状与研究实践

0.简介 交通运输研究&#xff08;上海&#xff09;论坛&#xff08;简称为TRF&#xff09;是按照国际会议的组织原则&#xff0c;为综合交通运输领域学者们构建的良好合作交流平台。交通运输研究&#xff08;上海&#xff09;论坛已经成功举办了十二届&#xff0c;凝聚了全国百…

Pr:视频过渡快速参考(合集 · 2025版)

Adobe Premiere Pro 自带七组约四十多个视频过渡 Video Transitions效果&#xff0c;包含不同风格和用途&#xff0c;可在两个剪辑之间创造平滑、自然的转场&#xff0c;用来丰富时间、地点或情绪的变化。恰当地应用过渡可让观众更好地理解故事或人物。 提示&#xff1a; 点击下…

stm32 踩坑笔记

串口问题&#xff1a; 问题&#xff1a;会改变接收缓冲的下一个字节 串口的初始化如下&#xff0c;位长度选择了9位。因为要奇偶校验&#xff0c;要选择9位。但是接收有用数据只用到1个字节。 问题原因&#xff1a; 所以串口接收时会把下一个数据更改

昇思大模型平台打卡体验活动:项目4基于MindSpore实现Roberta模型Prompt Tuning

基于MindNLP的Roberta模型Prompt Tuning 本文档介绍了如何基于MindNLP进行Roberta模型的Prompt Tuning&#xff0c;主要用于GLUE基准数据集的微调。本文提供了完整的代码示例以及详细的步骤说明&#xff0c;便于理解和复现实验。 环境配置 在运行此代码前&#xff0c;请确保…

后悔没早点知道,Coze 插件 + Cursor 原来可以这样赚钱

最近智能体定制化赛道异常火爆。 打开闲鱼搜索"Coze 定制",密密麻麻的服务报价直接刷屏,即使表明看起来几十块的商家,一细聊,都是几百到上千不等的报价。 有趣的是,这些智能体定制化服务背后,最核心的不只是工作流设计,还有一个被很多人忽视的重要角色 —— …

基于STM32的节能型路灯控制系统设计

引言 本项目基于STM32微控制器设计了一个智能节能型路灯控制系统&#xff0c;通过集成多个传感器模块和控制设备&#xff0c;实现对路灯的自动调节。该系统能够根据周围环境光照强度、车辆和行人活动等情况&#xff0c;自动控制路灯的开关及亮度调节&#xff0c;从而有效减少能…

Qml 模型-视图-代理(贰)之 动态视图学习

目录 动态视图 动态视图用法 ⽅向&#xff08;Orientation&#xff09; 键盘导航和⾼亮 页眉与页脚 网格视图 动态视图 动态视图用法 Repeater 元素适合有限的静态数据&#xff0c; QtQuick 提供了 ListView 和 GridView, 这两个都是基于 Flickable(可滑动) 区域的元素…

新标准大学英语综合教程1课后习题答案PDF第三版

《新标准大学英语&#xff08;第三版&#xff09;综合教程1 》是“新标准大学英语&#xff08;第三版&#xff09;”系列教材之一。本书共包含6个单元&#xff0c;从难度和话题上贴近大一上学生的认知和语言水平&#xff0c;包括与学生个人生活领域和社会文化等相关内容&#x…

Python闭包|你应该知道的常见用例(下)

引言 在 Python 编程语言中&#xff0c;闭包通常指的是一个嵌套函数&#xff0c;即在一个函数内部定义的另一个函数。这个嵌套的函数能够访问并保留其外部函数作用域中的变量。这种结构就构成了一个闭包。 闭包在函数式编程语言中非常普遍。在 Python 中&#xff0c;闭包特别有…

Rocky、Almalinux、CentOS、Ubuntu和Debian系统初始化脚本v9版

Rocky、Almalinux、CentOS、Ubuntu和Debian系统初始化脚本 Shell脚本源码地址&#xff1a; Gitee&#xff1a;https://gitee.com/raymond9/shell Github&#xff1a;https://github.com/raymond999999/shell脚本可以去上面的Gitee或Github代码仓库拉取。 支持的功能和系统&am…

AUTOSAR OS模块详解(一) 概述

AUTOSAR OS模块详解(一) 概述 本文主要介绍AUTOSAR架构下的OS概述。 文章目录 AUTOSAR OS模块详解(一) 概述1 前言1.1 操作系统1.2 嵌入式操作系统1.3 AUTOSAR操作系统 2 AUTOSAR OS2.1 AUTOSAR OS组成2.2 AUTOSAR OS类别2.3 任务管理2.4 调度表2.5 资源管理2.6 多核特性2.7 …

5位机械工程师如何共享一台工作站的算力?

在现代化的工程领域中&#xff0c;算力已成为推动创新与技术进步的关键因素之一。对于机械工程师而言&#xff0c;强大的计算资源意味着能够更快地进行复杂设计、模拟分析以及优化工作&#xff0c;从而明显提升工作效率与项目质量。然而&#xff0c;资源总是有限的&#xff0c;…

Scala 中 set 的实战应用 :图书管理系统

1. 创建书籍集合 首先&#xff0c;我们创建一个可变的书籍集合&#xff0c;用于存储图书馆中的书籍信息。在Scala中&#xff0c;mutable.Set可以用来创建一个可变的集合。 val books mutable.Set("朝花惜拾", "活着") 2. 添加书籍 我们可以使用操作符…

DevCheck Pro手机硬件检测工具v5.33

前言 DevCheck Pro是一款手机硬件和操作系统信息检测查看工具&#xff0c;该软件的功能非常强大&#xff0c;为用户提供了系统、硬件、应用程序、相机、网络、电池等一系列信息查看功能 安装环境 [名称]&#xff1a;DevCheckPro [版本]&#xff1a;5.33 [大小]&a…

cv::intersectConvexConvex返回其中一个输入点集,两个点集不相交

问题&#xff1a;cv::intersectConvexConvex返回其中一个输入点集&#xff0c;但两个点集并不相交 版本&#xff1a;opencv 3.1.0 git上也有人反馈了intersectConvexConvex sometimes returning one of the input polygons in case of empty intersection #10044 是凸包嵌套判…

【刷题12】ctfshow刷题

来源&#xff1a;ctfshow easyPytHon_P 考点&#xff1a;代码审计&#xff0c;源代码查看 打开后查看源码&#xff0c;发现一个源码地址&#xff0c;打开看看 可以知道在此目录下有个flag.txt文件&#xff0c;再观察源码 from flask import request cmd: str request.form.get…

spark的学习-03

RDD的创建的两种方式&#xff1a; 方式一&#xff1a;并行化一个已存在的集合 方法&#xff1a;parallelize 并行的意思 将一个集合转换为RDD 方式二&#xff1a;读取外部共享存储系统 方法&#xff1a;textFile、wholeTextFile、newAPIHadoopRDD等 读取外部存储系统的数…