PostgreSQL_数据下载并保存(psycopg2)

news2025/3/23 16:49:24

目录

前置:

1 数据下载

1.1 多个股票多个交易日

1.2 一个交易日所有股票 

2 数据保存,使用python中的psycopg2包

2.1 在PyCharm中创建新项目,并安装包

2.2  代码-多个股票多个交易日

2.3 代码-一个交易日所有股票

2.4 在 pgAdmin4 中查看数据存储是否正确

1)t_daily

​编辑 2) t_stock_daily


前置:

本博文是一个系列。在本人“数据库专栏”-》“PostgreSQL_”开头的博文。

1 数据下载

本实例的数据都下载自优矿,具体如何下载可自行上优矿官网,这里不赘述。

下载后的csv文件如下:

1.1 多个股票多个交易日

这些是沪深两市 2023-07-11 到 2025- 02-12 所有股票日数据

1.2 一个交易日所有股票 

2 数据保存,使用python中的psycopg2包

2.1 在PyCharm中创建新项目,并安装包

pip install psycopg2、pip install pandas

2.2  代码-多个股票多个交易日

import psycopg2
import os
import pandas as pd
from datetime import datetime

def connect_db():
    try:
        conn = psycopg2.connect(database='db_stock',user='postgres',password='写你自己的密码',host='127.0.0.1',port=5432)
    except Exception as e:
        print(f'connection failed。{e}')
    else:
        return conn
    pass

# 批量插入数据
def many_day_insert(data_dir:str):
    '''
    1 纵向数据
    1.1 查询 ticker,把新股筛出来
    1.2 新股,用 insert; 非新股,用 update,array_cat
    2 横向数据
    2.1 按日期分组,直接 insert
    :param date_dir:
    :return:
    '''
    file_list = os.listdir(data_dir)
    col_list = ['secID','tradeDate','openPrice','highestPrice','lowestPrice','closePrice','turnoverVol','turnoverValue','dealAmount','turnoverRate','negMarketValue','marketValue','chgPct','PE','PE1','PB','isOpen','vwap']
    df = None
    for file_one in file_list:
        file_path = data_dir + file_one
        df00 = pd.read_csv(file_path,encoding='utf-8')
        df00 = df00.loc[:,col_list].copy()
        if df is None:
            df = df00
        else:
            df = pd.concat([df,df00])

    df['ticker'] = df['secID'].str.slice(0,6)
    ticker_list = df['ticker'].to_list()
    ticker_list = list(set(ticker_list))
    ticker_list_str = '\',\''.join(ticker_list)
    ticker_list_str = '\''+ticker_list_str+'\''

    # sql 判断 t_stock_daily 是否为空表
    sql_vertical_null_str = f"select count(*) from t_stock_daily;"
    # sql 查询 ticker
    sql_vertical_q_str = f"select ticker from t_stock_daily where ticker not in ({ticker_list_str});"

    sql_vertical_i_str = '''
    insert into t_stock_daily(ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
    '''

    sql_vertical_u_str = '''
    update t_stock_daily set tradeDate=array_cat(tradeDate,%s),openPrice=array_cat(openPrice,%s),highestPrice=array_cat(highestPrice,%s),lowestPrice=array_cat(lowestPrice,%s),closePrice=array_cat(closePrice,%s),turnoverVol=array_cat(turnoverVol,%s),turnoverValue=array_cat(turnoverValue,%s),dealAmount=array_cat(dealAmount,%s),turnoverRate=array_cat(turnoverRate,%s),negMarketValue=array_cat(negMarketValue,%s),marketValue=array_cat(marketValue,%s),chgPct=array_cat(chgPct,%s),PE=array_cat(PE,%s),PE1=array_cat(PE1,%s),PB=array_cat(PB,%s),isOpen=array_cat(isOpen,%s),vwap=array_cat(vwap,%s) where ticker=%s;
    '''

    sql_horizonal_i_str = '''
    insert into t_daily(tradeDate,tradeDateOj,ticker,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
    '''

    conn = connect_db()
    cur = conn.cursor()
    cur.execute(sql_vertical_null_str)
    res0 = cur.fetchone()
    if res0[0] == 0:
        new_ticker_list = ticker_list
    else:
        cur.execute(sql_vertical_q_str)
        res = cur.fetchall()
        new_ticker_list = []
        if res is not None:
            for one_node in res:
                new_ticker_list.append(one_node[0])

    v_i_list = []
    v_u_list = []
    h_i_list = []

    df_group_v = df.groupby(by='ticker')
    for ticker,group in df_group_v:
        if ticker in new_ticker_list:
            one_node = (
                ticker,
                group['tradeDate'].to_list(),
                group['openPrice'].to_list(),
                group['highestPrice'].to_list(),
                group['lowestPrice'].to_list(),
                group['closePrice'].to_list(),
                group['turnoverVol'].to_list(),
                group['turnoverValue'].to_list(),
                group['dealAmount'].to_list(),
                group['turnoverRate'].to_list(),
                group['negMarketValue'].to_list(),
                group['marketValue'].to_list(),
                group['chgPct'].to_list(),
                group['PE'].to_list(),
                group['PE1'].to_list(),
                group['PB'].to_list(),
                group['isOpen'].to_list(),
                group['vwap'].to_list()
            )
            v_i_list.append(one_node)
            pass
        else:
            one_node = (
                group['tradeDate'].to_list(),
                group['openPrice'].to_list(),
                group['highestPrice'].to_list(),
                group['lowestPrice'].to_list(),
                group['closePrice'].to_list(),
                group['turnoverVol'].to_list(),
                group['turnoverValue'].to_list(),
                group['dealAmount'].to_list(),
                group['turnoverRate'].to_list(),
                group['negMarketValue'].to_list(),
                group['marketValue'].to_list(),
                group['chgPct'].to_list(),
                group['PE'].to_list(),
                group['PE1'].to_list(),
                group['PB'].to_list(),
                group['isOpen'].to_list(),
                group['vwap'].to_list(),
                ticker
            )
            v_u_list.append(one_node)
            pass
        pass
    df_group_h = df.groupby(by='tradeDate')
    for date_str,group in df_group_h:
        one_node = (
            date_str,
            datetime.strptime(date_str,'%Y-%m-%d'),
            group['ticker'].to_list(),
            group['openPrice'].to_list(),
            group['highestPrice'].to_list(),
            group['lowestPrice'].to_list(),
            group['closePrice'].to_list(),
            group['turnoverVol'].to_list(),
            group['turnoverValue'].to_list(),
            group['dealAmount'].to_list(),
            group['turnoverRate'].to_list(),
            group['negMarketValue'].to_list(),
            group['marketValue'].to_list(),
            group['chgPct'].to_list(),
            group['PE'].to_list(),
            group['PE1'].to_list(),
            group['PB'].to_list(),
            group['isOpen'].to_list(),
            group['vwap'].to_list()
        )
        h_i_list.append(one_node)
        pass

    try:
        cur.executemany(sql_vertical_i_str,v_i_list)
        cur.executemany(sql_vertical_u_str,v_u_list)
        cur.executemany(sql_horizonal_i_str,h_i_list)
        conn.commit()
        pass
    except Exception as e:
        print(f'error: {e}')
        conn.rollback()
    finally:
        cur.close()
        conn.close()
    pass


if __name__ == '__main__':
    many_day_insert(r'E:/temp005/')
    pass

2.3 代码-一个交易日所有股票

def one_day_insert(file_path:str):
    '''
    1 纵向数据
    1.1 新股,insert
    1.2 非新股,update, array_append
    2 横向数据
    2.1 直接 insert 一条数据
    :param file_path:
    :return:
    '''
    df = pd.read_csv(file_path,encoding='utf-8')

    col_list = ['secID', 'tradeDate', 'openPrice', 'highestPrice', 'lowestPrice', 'closePrice', 'turnoverVol',
                'turnoverValue', 'dealAmount', 'turnoverRate', 'negMarketValue', 'marketValue', 'chgPct', 'PE', 'PE1',
                'PB', 'isOpen', 'vwap']
    df = df.loc[:,col_list].copy()
    df['ticker'] = df['secID'].str.slice(0,6)
    ticker_list = df['ticker'].to_list()
    ticker_list = list(set(ticker_list))
    ticker_list_str = '\',\''.join(ticker_list)
    ticker_list_str = '\'' + ticker_list_str + '\''

    # sql 判断 t_stock_daily 是否为空表
    sql_vertical_null_str = f"select count(*) from t_stock_daily;"
    # sql 查询 ticker
    sql_vertical_q_str = f"select ticker from t_stock_daily where ticker not in ({ticker_list_str});"

    sql_vertical_i_str = '''
        insert into t_stock_daily(ticker,tradeDate,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
        '''

    sql_vertical_u_str = '''
            update t_stock_daily set tradeDate=array_append(tradeDate,%s),openPrice=array_append(openPrice,%s),highestPrice=array_append(highestPrice,%s),lowestPrice=array_append(lowestPrice,%s),closePrice=array_append(closePrice,%s),turnoverVol=array_append(turnoverVol,%s),turnoverValue=array_append(turnoverValue,%s),dealAmount=array_append(dealAmount,%s),turnoverRate=array_append(turnoverRate,%s),negMarketValue=array_append(negMarketValue,%s),marketValue=array_append(marketValue,%s),chgPct=array_append(chgPct,%s),PE=array_append(PE,%s),PE1=array_append(PE1,%s),PB=array_append(PB,%s),isOpen=array_append(isOpen,%s),vwap=array_append(vwap,%s) where ticker=%s;
            '''

    sql_horizonal_i_str = '''
        insert into t_daily(tradeDate,tradeDateOj,ticker,openPrice,highestPrice,lowestPrice,closePrice,turnoverVol,turnoverValue,dealAmount,turnoverRate,negMarketValue,marketValue,chgPct,PE,PE1,PB,isOpen,vwap) values (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s);
        '''

    conn = connect_db()
    cur = conn.cursor()
    cur.execute(sql_vertical_null_str)
    res0 = cur.fetchone()
    if res0[0] == 0:
        new_ticker_list = ticker_list
    else:
        cur.execute(sql_vertical_q_str)
        res = cur.fetchall()
        new_ticker_list = []
        if res is not None:
            for one_node in res:
                new_ticker_list.append(one_node[0])

    v_i_list = []
    v_u_list = []
    h_one = None
    for i,row in df.iterrows():
        if row['ticker'] in new_ticker_list:
            one_node = (
                row['ticker'],
                [row['tradeDate']],
                [row['openPrice']],
                [row['highestPrice']],
                [row['lowestPrice']],
                [row['closePrice']],
                [row['turnoverVol']],
                [row['turnoverValue']],
                [row['dealAmount']],
                [row['turnoverRate']],
                [row['negMarketValue']],
                [row['marketValue']],
                [row['chgPct']],
                [row['PE']],
                [row['PE1']],
                [row['PB']],
                [row['isOpen']],
                [row['vwap']]
            )
            v_i_list.append(one_node)
            pass
        else:
            one_node = (
                row['tradeDate'],
                row['openPrice'],
                row['highestPrice'],
                row['lowestPrice'],
                row['closePrice'],
                row['turnoverVol'],
                row['turnoverValue'],
                row['dealAmount'],
                row['turnoverRate'],
                row['negMarketValue'],
                row['marketValue'],
                row['chgPct'],
                row['PE'],
                row['PE1'],
                row['PB'],
                row['isOpen'],
                row['vwap'],
                row['ticker']
            )
            v_u_list.append(one_node)
            pass
        pass
    date_str = df['tradeDate'][0]
    h_one = (
        date_str,
        datetime.strptime(date_str,'%Y-%m-%d'),
        df['ticker'].to_list(),
        df['openPrice'].to_list(),
        df['highestPrice'].to_list(),
        df['lowestPrice'].to_list(),
        df['closePrice'].to_list(),
        df['turnoverVol'].to_list(),
        df['turnoverValue'].to_list(),
        df['dealAmount'].to_list(),
        df['turnoverRate'].to_list(),
        df['negMarketValue'].to_list(),
        df['marketValue'].to_list(),
        df['chgPct'].to_list(),
        df['PE'].to_list(),
        df['PE1'].to_list(),
        df['PB'].to_list(),
        df['isOpen'].to_list(),
        df['vwap'].to_list()
    )

    try:
        cur.executemany(sql_vertical_i_str,v_i_list)
        cur.executemany(sql_vertical_u_str,v_u_list)
        cur.execute(sql_horizonal_i_str,h_one)
        conn.commit()
        pass
    except Exception as e:
        print(f'error: {e}')
        conn.rollback()
    finally:
        cur.close()
        conn.close()
    pass

2.4 在 pgAdmin4 中查看数据存储是否正确

上面的操作 存储的是  2023年7月11日 到 2025年2月13日 数据

1)t_daily

查询 t_daily 中最小日期和最大日期

select max(tradeDateOj) from t_daily;
select min(tradeDateOj) from t_daily;

 2) t_stock_daily

查看某一股票的第一条数据和最后一条数据

select tradeDate[1],openPrice[1],highestPrice[1],lowestPrice[1],closePrice[1],turnoverVol[1],turnoverValue[1],dealAmount[1],turnoverRate[1],negMarketValue[1],marketValue[1],chgPct[1],PE[1],PE1[1],PB[1],isOpen[1],vwap[1] from t_stock_daily where ticker='000001';

 select tradeDate[array_length(tradeDate,1)],openPrice[array_length(openPrice,1)],highestPrice[array_length(highestPrice,1)],lowestPrice[array_length(lowestPrice,1)],closePrice[array_length(closePrice,1)],turnoverVol[array_length(turnoverVol,1)],turnoverValue[array_length(turnoverValue,1)],dealAmount[array_length(dealAmount,1)],turnoverRate[array_length(turnoverRate,1)],negMarketValue[array_length(negMarketValue,1)],marketValue[array_length(marketValue,1)],chgPct[array_length(chgPct,1)],PE[array_length(PE,1)],PE1[array_length(PE1,1)],PB[array_length(PB,1)],isOpen[array_length(isOpen,1)],vwap[array_length(vwap,1)] from t_stock_daily where ticker='000001';

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2319607.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

启明星辰春招面试题

《网安面试指南》https://mp.weixin.qq.com/s/RIVYDmxI9g_TgGrpbdDKtA?token1860256701&langzh_CN 5000篇网安资料库https://mp.weixin.qq.com/s?__bizMzkwNjY1Mzc0Nw&mid2247486065&idx2&snb30ade8200e842743339d428f414475e&chksmc0e4732df793fa3bf39…

边缘计算革命:重构软件架构的范式与未来

摘要 边缘计算通过将算力下沉至网络边缘,正在颠覆传统中心化软件架构的设计逻辑。本文系统分析了边缘计算对软件架构的范式革新,包括分布式分层架构、实时资源调度、安全防护体系等技术变革,并结合工业物联网、智慧医疗等场景案例&#xff0c…

【读点论文】Chain Replication for Supporting High Throughput and Availability

在分布式系统中,强一致性往往和高可用、高吞吐是矛盾的。比如传统的关系型数据库,其保证了强一致性,但往往牺牲了可用性和吞吐量。而像 NoSQL 数据库,虽然其吞吐量、和扩展性很高,但往往只支持最终一致性,无…

Servlet、Servlet的5个接口方法、生命周期、以及模拟实现 HttpServlet 来写接口的基本原理

DAY15.1 Java核心基础 Servlet Servlet是一个接口,是java的基础,java之所以编写web的程序,接收请求并响应,就是因为Sevlet接口 Java 类实现了Servlet接口的时候就可以接收并响应请求,成为web服务器 Web服务器就是接…

贝叶斯公式的一个直观解释

E E E:抓到娃娃 H H H:坐地铁 H ˉ \bar H Hˉ:坐公交 P ( E ) P ( H ) P ( E ∣ H ) P ( H ‾ ) P ( E ∣ H ‾ ) P({E}) P({H}) P({E} \mid {H}) {P}(\overline{{H}}) {P}({E} \mid \overline{{H}}) P(E)P(H)P(E∣H)P(H)P(E∣H) P (…

Java 大视界 -- Java 大数据分布式计算中的通信优化与网络拓扑设计(145)

💖亲爱的朋友们,热烈欢迎来到 青云交的博客!能与诸位在此相逢,我倍感荣幸。在这飞速更迭的时代,我们都渴望一方心灵净土,而 我的博客 正是这样温暖的所在。这里为你呈上趣味与实用兼具的知识,也…

reconstruct_3d_object_model_for_matching例子

文章目录 1.获取om3文件2.准备可视化3.准备3D可视化4.读取3D模型5.显示成对注册结果16.显示成对注册结果27.联合注册模型8.处理图像8.1子采样8.2 图像计算与平滑8.3 三角测量 9.基于表面做3D匹配10.评估模型准确度10.1 在场景中找到模型10.2 计算模型和场景之间的距离 11.立体系…

【JavaWeb学习Day27】

Tlias前端 员工管理 条件分页查询&#xff1a; 页面布局 搜索栏&#xff1a; <!-- 搜索栏 --><div class"container"><el-form :inline"true" :model"searchEmp" class"demo-form-inline"><el-form-item label…

Webrtc编译官方示例实现视频通话

Webrtc编译官方示例实现视频通话 前言 webrtc官网demo中给了一个供我们学习和应用webrtc的一个很好的例子&#xff1a;peerconnection&#xff0c;这期我们就来编译和运行下这个程序看看视频通话的效果以。 1、打开源码工程 继上期源码编译完成后&#xff0c;我们使用vs打开…

大数据学习(80)-数仓分层

&#x1f34b;&#x1f34b;大数据学习&#x1f34b;&#x1f34b; &#x1f525;系列专栏&#xff1a; &#x1f451;哲学语录: 用力所能及&#xff0c;改变世界。 &#x1f496;如果觉得博主的文章还不错的话&#xff0c;请点赞&#x1f44d;收藏⭐️留言&#x1f4dd;支持一…

刘强东突然发声:不该用算法压榨最底层兄弟!东哥,真正的人民企业家

今天忙了一天&#xff0c;很累&#xff0c;准备睡觉的时候&#xff0c;看到网上盛传的刘强东的朋友圈&#xff0c;东哥又在朋友圈发文了。 说实话&#xff0c;看完之后&#xff0c;感动&#xff0c;真的感动。 尤其是当我看到这两句话的时候。 1、我们所学的知识、商业模式、技…

Java 记忆链表,LinkedList 的升级版

文章目录 记忆链表 MemoryLinkedList实战源代码 众所周知&#xff0c;ArrayList 和 LinkedList 是 Java 集合中两个基本的数据结构&#xff0c;对应数据结构理论中的数组和链表。但在这两个数据结构&#xff0c;开发者们通常使用 ArrayList&#xff0c;而不使用 LinkedList。JD…

poetry安装与使用

文章目录 安装方法创建虚拟环境其他常用命令从 poetry.lock 中安装第三方依赖包 安装方法 安装命令&#xff08;全局安装&#xff0c;不要在虚拟环境中安装&#xff0c;方便后面创建环境使用&#xff09; pip install poetry修改虚拟环境路径&#xff08;首次使用poetry时执行&…

UVM config机制及uvm_resource_pool

目录 1. uvm_config_db 类源码 1.1 set 1.2 get 2. uvm_resource_pool 2.1 uvm_resource_pool::set 2.2 uvm_resource 3. usage 4. 小结 uvm提供一种uvm_config_db机制使得在仿真中通过变量设置来修改环境,使环境更加灵活。本文主要介绍uvm_config_db#(type)::get/set…

JAVA学习*接口

接口 在生活中我们常听说USB接口&#xff0c;那接口是什么呢&#xff1f; 在Java中&#xff0c;接口相当于多个类的一种公共规范&#xff0c;是一种引用数据类型。 定义接口 public interface IUSB {public static final String SIZE "small";public abstract vo…

Python实验:读写文本文件并添加行号

[实验目的] 熟练掌握内置函数open()的用法&#xff1b;熟练运用内置函数len()、max()、和enumerate()&#xff1b;熟练运用字符串的strip()、ljust()和其它方法&#xff1b;熟练运用列表推导式。 [实验和内容] 1.编写一个程序demo.py&#xff0c;要求运行该程序后&#xff0…

IDEA导入jar包后提示无法解析jar包中的类,比如无法解析符号 ‘log4j‘

IDEA导入jar包后提示无法解析jar包中的类 问题描述解决方法 问题描述 IDEA导入jar包的Maven坐标后&#xff0c;使用jar中的类比如log4j&#xff0c;仍然提示比如无法解析符号 log4j。 解决方法 在添加了依赖和配置文件后&#xff0c;确保刷新你的IDE项目和任何缓存&#xff…

数据结构——顺序栈seq_stack

前言&#xff1a;大家好&#x1f60d;&#xff0c;本文主要介绍了数据结构——顺序栈 目录 一、概念 1.1 顺序栈的基本概念 1.2 顺序栈的存储结构 二、基本操作 2.1 结构体定义 2.2 初始化 2.3 判空 2.4 判满 2.5 扩容 2.6 插入 入栈 2.7 删除 出栈 2.8 获取栈顶元…

python3.13.2安装详细步骤(附安装包)

文章目录 前言一、python3.13.2下载二、python3.13.2安装详细步骤1.查看安装文件2.启动安装程序3.安装模式选择4.自定义安装配置5.高级选项设置6.执行安装7.开始安装8.安装完成8.打开软件9.安装验证 前言 在数字化时代&#xff0c;Python 已成为不可或缺的编程语言。无论是开发…

AI-Talk开发板之更换串口引脚

一、默认引脚 CSK6011A使用UART0作为Debug uart&#xff0c;AI-Talk开发板默认使用的GPIOA2和GPIOA3作为Debug uart的RX和TX&#xff0c;通过连接器CN6引出。 二 、更换到其它引脚 查看60xx_iomux_v1.0可以&#xff0c;UART0的tx和rx可以映射到很多管脚上。 结合AI-Talk开发板…