使用pytdx获取股票信息总结

news2024/11/29 8:41:13

使用pytdx获取股票相关信息汇总

  • 行情接口
  • 标准行情
  • 对接总结
    • 界面展示
    • 性能问题
    • 数据可靠性
  • 附录代码

行情接口

pytdx中提供了hq(标准行情)及exhq(扩展市场行情)两种接口,扩展市场目前已经失效无法使用。

标准行情

这里只提供代码,见附录。

对接总结

除了一开始不熟悉股票领域的相关定义,目前是遇到一个概念去查询一个概念,这个只有慢慢来了。

界面展示

目前采用odoo框架来存储和展示获取下来的相关信息。
体验地址:http://111.229.103.209:8090/
用户名:john,密码:123456
在这里插入图片描述

性能问题

目前是获取了深市和沪市的全部5300+股票的所有数据,历史交易数据打算只获取9月1号以后的。后续真实的使用,肯定是选定部分股票去获取数据。

数据可靠性

数据获取太频繁的时候,会遇到timeout,于是统一都增加了重试机制

附录代码

# -*- coding: utf-8 -*-
import time
from pytdx.hq import TdxHq_API


class Tdx_Client():

    def __init__(self, ip='111.229.247.189', port='7709'):
        self.ip = ip
        self.port = port
        self.api = TdxHq_API(auto_retry=True, raise_exception=False)
        self.api.connect(self.ip, int(self.port))

    def get_stock_quotes(self, all_stock, code=None):
        """
        获取股票行情
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_security_quotes(all_stock, code)
                    return data or []
            except Exception as e:
                time.sleep(3)
        return []

    def get_stock_bars(self, mkt_id, code, start, size):
        """
        获取k线
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_security_bars(mkt_id, code, start, size)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_stock_count(self, mkt_id):
        """
        获取股票数量
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_security_count(mkt_id)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_stock_list(self, mkt_id, start):
        """
        获取股票列表
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_security_list(mkt_id, start)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_index_bars(self, category, market, code, start, count):
        """
        获取指数k线
        n数据类型:
        0 5分钟K线
        1 15分钟K线
        2 30分钟K线
        3 1小时K线
        4 日K线
        7 1分钟
        8 1分钟K线
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_index_bars(category, market, code, start, count)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_minute_time_data(self, mkt_id, code):
        """
        查询分时行情
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_minute_time_data(mkt_id, code)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_history_minute_time_data(self, mkt_id, code, date):
        """
        查询历史分时行情
        """
        if isinstance(date, str):
            date = int(date)
        else:
            date = int(date.strftime("%Y%m%d"))
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_history_minute_time_data(
                        mkt_id, code, date)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_transaction_data(self, market, code, start, count):
        """
        查询分时成交
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_transaction_data(market, code, start, count)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_history_transaction_data(self, market, code, start, count, date):
        """
        查询历史分时成交
        """
        if isinstance(date, str):
            date = int(date)
        else:
            date = int(date.strftime("%Y%m%d"))
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_history_transaction_data(
                        market, code, start, count, date)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_company_info_category(self, market, code):
        """
        查询公司信息目录
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_company_info_category(market, code)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_company_info_content(self, market, code, filename, start, length):
        """
        读取公司信息-最新提示
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_company_info_content(market, code, filename, start, length)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_xdxr_info(self, market, code):
        """
        读取除权除息
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_xdxr_info(market, code)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_finance_info(self, market, code):
        """
        获取财务信息
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_finance_info(market, code)
                    return data
            except Exception as e:
                time.sleep(3)
        return {}

    def get_k_data(self, code, start, end):
        """
        日线级别k线获取函数
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    df = self.api.get_k_data(code, start, end)
                    return df.to_dict('records') or []
            except Exception as e:
                time.sleep(3)
        return []

    def get_and_parse_block_info(self, blockfile):
        """
        获取板块信息
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_and_parse_block_info(blockfile)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2192020.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

4. Getter和Setter注解与lombok

文章目录 1. 什么是Getter和Setter注解2. 什么是lombokjava自带的jar包 3. 从maven仓库里找lombok相关jar包4. 把jar包导入项目另一个jar包导入途径 5. 正式使用注解① 问题② 解决方案提示 6. 如果还想对某个成员变量添加限制怎么办7. 内容出处 1. 什么是Getter和Setter注解 官…

【包教包会】速通LLM《从头开始构建大型语言模型》免费pdf分享

介绍 在当今人工智能技术飞速发展的时代,大型语言模型(LLM)作为聊天机器人、文本生成和理解等应用的核心,已经成为研究和商业领域的关注焦点。尽管这些模型的应用无处不在,但对于大多数开发者来说,它们的工…

AI正悄然地影响着企业数字化转型

2022年底发布的ChatGPT将人工智能技术上升到了一个新的高度。如今,人工智能正彻底改变数字转型的进程,为企业提供优化运营和提升客户体验的机会。利用算法、数据分析、机器学习等人工智能技术结合企业自身情况,可以推动企业持续创新&#xff…

每日OJ题_牛客_mari和shiny_线性dp_C++_Java

目录 牛客_mari和shiny_线性dp 题目解析 C代码 Java代码 牛客_mari和shiny_线性dp mari和shiny (nowcoder.com) 描述: mari每天都非常shiny。她的目标是把正能量传达到世界的每个角落! 有一天,她得到了一个仅由小写字母组成的字…

ElasticSearch 备考 -- Snapshot Restore

一、题目 备份集群下的索引 task,存储快照名称为 snapshot_1 二、思考 这个涉及的是集群的备份,主要是通过创建快照,涉及到以下2步骤 Setp1:注册一个备份 snapshot repository Setp2:创建 snapshot 可以通过两种方…

找生网站方案———未来之窗行业应用跨平台架构

1)网站设计方面的考虑 主色调采用于公司深蓝色颜色,网页整体色彩明快、大气、简洁,每个细节均经过精心处 理,网页浏览快速,导航明确清晰。 网页设计要充分考虑网页的整体感觉,每个页面的图片与网站色调的过…

YOLO11改进 | 卷积模块 | 用Ghost卷积轻量化网络【详细步骤】

秋招面试专栏推荐 :深度学习算法工程师面试问题总结【百面算法工程师】——点击即可跳转 💡💡💡本专栏所有程序均经过测试,可成功执行💡💡💡 Ghost 模块可以作为即插即用组件来升级…

C(十四)while、for、do-while循环综合(一)

uu们,小弟我本科在读,文章我会一直坚持更新下去,包括但不限于C初阶、C进阶、数据结构、C、Linux、MySQL、项目、QT开发、各种算法(之后会持续更新),并且站在小白的视角尽可能通俗易懂地把这些写出来&#x…

xss之dom类型

目录 xss关于dom类型 1、闭合方式 2、闭合,直接输入1,成功闭合 上我们的pikachu xss关于dom类型 1、闭合方式 输入123,然后打开f12,审查元素,之前一直没有搞懂为什么要在前面加上个单引号 输入两个双引号&#x…

【STM32开发之寄存器版】(三)-详解NVIC中断

一、前言 STM32F103ZET6具备强大的中断控制能力,其嵌套向量中断控制器(NVIC)和处理器核的接口紧密相连,可以实现低延迟的中断处理和高效地处理晚到的中断。NVIC主要具备以下特性: 68个可屏蔽中断通道(不包含16个Cortex™-M3的中断线)&#xf…

【黑马点评】 使用RabbitMQ实现消息队列——1.Docker与RabbitMQ环境安装

黑马点评中,使用基于Redis的Stream实现消息队列,但是Strema已经不太常用。在此修改为使用RabbitMQ实现消息队列。主要包括RabbitMQ的环境准备(Docker的下载与安装)以及如何修改黑马点评中的代码。 【黑马点评】使用RabbitMQ实现消…

【Java数据结构】栈 (Stack)

【本节目标】 1. 栈的概念及使用 2. 相关 OJ 题 一、概念 栈:一种特殊的线性表,其只允许在固定的一端进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶,另一端称为栈底。栈中的数据元素遵守后进先出LIFO(Last…

C语言—单链表

目录 一、链表的概念及结构 二、单链表实现 (2.1)基本结构定义 (2.2)申请节点 (2.3)打印函数 (2.4)头部插入删除\尾部插入删除 (2.4.1)尾部插入 &…

Anaconda的安装与环境设置

文章目录 一、Anaconda介绍二、Anaconda环境搭建1. 下载Anaconda(1)官网下载(2)清华大学镜像 2. 安装Anaconda3.配置环境变量4.检验conda是否安装成功5.更改镜像源6.若菜单栏没有conda prompt 三、虚拟环境1.创建、查看、删除虚拟环境2.激活、退出虚拟环境 四、CUDA、Pytorch、…

基于SpringBoot+Vue的酒店客房管理系统

作者:计算机学姐 开发技术:SpringBoot、SSM、Vue、MySQL、JSP、ElementUI、Python、小程序等,“文末源码”。 专栏推荐:前后端分离项目源码、SpringBoot项目源码、Vue项目源码、SSM项目源码、微信小程序源码 精品专栏:…

qemu-system-aarch64开启user用户模式网络连接

一、问题 在使用qemu构建arm64的虚拟机时,虚拟机没有网络,桥接方式相对麻烦,我只是需要联网更新即可。与宿主机的通信我使用共享文件夹即可满足要求。 使用指令启动虚拟机时,网络部分的参数为 -net user,hostfwdtcp::10022-:22 …

白板2-数学基础

高斯分布1-极大似然估计 高斯分布2-极大似然估计-无偏&有偏 高斯分布3-从概率密度角度高斯分布4-局限性高斯分布5-边缘概率及条件概率高斯分布6-求联合概率分布

八大排序--01冒泡排序

假设有一组数据 arr[]{2,0,3,4,5,7} 方法:开辟两个指针,指向如图,前后两两进行比较,大数据向后冒泡传递,小数据换到前面。 一次冒泡后,数组中最大…

codetop标签动态规划大全C++讲解(四)!!动态规划刷穿地心!!学吐了家人们o(╥﹏╥)o

一天复习一篇,个人学习记录 1.最大子数组和2.最长的斐波那契子序列的长度3.最大正方形4.最长有效括号5.乘积最大子数组6.可被三整除的最大和7.回文子串数目8.最长回文子序列9.最长回文子串 1.最大子数组和 给你一个整数数组 nums ,请你找出一个具有最大…

SpringBoot在线教育平台:设计与实现的深度解析

2相关技术 2.1 MYSQL数据库 MySQL是一个真正的多用户、多线程SQL数据库服务器。 是基于SQL的客户/服务器模式的关系数据库管理系统,它的有点有有功能强大、使用简单、管理方便、安全可靠性高、运行速度快、多线程、跨平台性、完全网络化、稳定性等,非常…