循序渐进丨使用 Python 向 MogDB 数据库批量操作数据的方法

news2025/1/12 7:43:37

当我们有时候需要向数据库里批量插入数据,或者批量导出数据时,除了使用传统的gsql copy命令,也可以通过Python的驱动psycopg2进行批量操作。本文介绍了使用psycopg2里的executemany、copy_from、copy_to、copy_expert等方式来批量操作 MogDB 数据库里的数据的方法。

outside_default.png

安装psycopg2驱动

适配 MogDB 的psycopg2可以从以下链接下载:

https://www.mogdb.io/downloads/psycopg2/all

下载之前,先确定所用的Python版本:

python3 --version
Python 3.7.9

目前只支持Python 3.6以上版本:

8760cbf461f04f2ff00afb4b5007940e.png

下载之后,将文件上传到运行程序的主机上,并运行以下命令:

python3 -m pip install psycopg*.whl

当然,如果该主机可以联通互联网,也可以直接从以下链接获取下载地址:https://www.mogdb.io/downloads/psycopg2/all 

396e6482dc40bab9c1342474cbf6f5f9.png

不用真正下载,而是直接运行(其中url对应实际Python版本的url):

python3 -m pip install https://cdn-mogdb.enmotech.com/drivers/python/psycopg2-whl/5.0.0.3/psycopg2-5.0.0.3-cp37-cp37m-linux_aarch64.whl

数据库端创建测试用户和测试表:

gsql
create user pytest sysadmin password 'Enmotech@123';
create table test_psycopg2(id int,source varchar(32), create_time date,name varchar(32),val int);

outside_default.png

建立连接并创建cursor对象

Python中:

import psycopg2
conn=psycopg2.connect(database="postgres",user="pytest",password="Test@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
conn.close()

参数就不用介绍含义了,一目了然。

outside_default.png

使用execute进行逐条插入

execute可用于逐条插入,它是cursor中的一个方法,接收的参数为语句+一个可索引的列表,列表中每一列对应语句中的 %s.

cursor.execute(sqltext,var_List)

假设现在我们需要往数据库中插入10000条随机数据,可以循环调用execute:

import psycopg2
import datetime
import random
import time

conn=psycopg2.connect(database="postgres",user="pytest",password="Test@123",host="127.0.0.1",port=26000)
cur=conn.cursor() 
currtime=datetime.datetime.now()
data1=[(id,"execute",(currtime+datetime.timedelta(minutes=id)).strftime('%Y-%m-%d %H:%M:%S'),"name"+str(id),random.randint(0,100000)) for id in range(1,10001)];

start=time.perf_counter()
for row in data1:
    cur.execute ("insert into test_psycopg2 values(%s,%s,%s,%s,%s)",row); #此处有5个占位符,对应的row中有5列

end=time.perf_counter();
print(f"use execute to insert 10000 rows, cost {int((end-start)*1000)} ms");
conn.commit()

26002c3dbeab15a5a6eefa1ef49e76ad.png

outside_default.png

使用executemany进行批量插入

executemany可用于批量插入,它是cursor中的一个方法,接收的参数为语句+一个嵌套了可索引的列表的列表,列表中每一行对应语句中的 %s.

cursor.executemany(sqltext,var_List)

假设现在我们需要往数据库中插入10000条随机数据,可以调用一次executemany:

import psycopg2
import datetime
import random
import time
conn=psycopg2.connect(database="postgres",user="pytest",password="Test@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
data2=[(id,"executemany",(currtime+datetime.timedelta(minutes=id)).strftime('%Y-%m-%d %H:%M:%S'),"name"+str(id),random.randint(0,100000)) for id in range(1,10001)];
start=time.perf_counter()
cur.executemany("insert into test_psycopg2 values(%s,%s,%s,%s,%s)",data2);
end=time.perf_counter();
print(f"use executemany to insert 10000 rows, cost {int((end-start)*1000)} ms");
conn.commit()

14df9a64ed6c39a5276e5b2ebf6b4b3a.png

从执行时间看executemany和execute相差无几,并没有使用类似于JDBC驱动中的批量绑定功能。

outside_default.png

使用copy_from(文件)从文件中批量插入

无论是execute还是executemany,性能都不够理想。如果对性能有要求,批量插入的时候,应该使用copy相关的方法,如copy_from。

copy_from接收多个参数,主要是前面的4个参数:

copy_from(input_file, table_name, sep, columns)

其中第一个参数设计上是文件句柄,但是也可以用其他带有read()和readline()方法的对象来代替。

第三个参数则是分隔符,因为使用copy时,需要自己先按行把数据拼好的,列之间用分隔符来分隔,通常可以使用"tab"或者",". 当然,这需要考虑你的数据中会不会包含这两个字符,避免二义性。

第四个参数则是列名的列表,如果不提供,则是所有列,但是要注意列顺序。

假设现在我们需要往数据库中插入10000条随机数据,可以先生成一个文件,再调用copy_from(文件):

import psycopg2
import datetime
import random
import time
from io import StringIO
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=open("test_psycopg2_from.csv","w");
filesize=0
for id in range(1,10001):
    filesize=filesize+tmpfile.write(f"{id},copy_from_File,{(currtime+datetime.timedelta(minutes=id)).strftime('%Y-%m-%d %H:%M:%S')},name{id},{random.randint(0,100000)}\n")

tmpfile.close()

tmpfile2=open("test_psycopg2_from.csv","r");

start=time.perf_counter()
cur.copy_from(tmpfile2,"test_psycopg2",sep=",");
end=time.perf_counter();
print(f"use copy_from(File) to insert 10000 rows, cost {int((end-start)*1000)} ms");
conn.commit()
tmpfile.close()

af73baac17c8d4ce6238a9581fd93715.png

19ms,相比execute和executemany,有百倍的提升。

outside_default.png

使用copy_from(StringIO)进行批量插入

对于在内存中处理的数据,专门放到文件系统再取出来显然没有必要,因此,copy_from第一参数可以使用StringIO这个模块来模拟内存中的一个类文件对象(关于StringIO,限于篇幅,不展开解释)。

假设现在我们需要往数据库中插入10000条随机数据,可以把记录按行写到StringIO对象,再调用copy_from(StringIO):

import psycopg2
import datetime
import random
import time
from io import StringIO
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=StringIO("");
filesize=0
for id in range(1,10001):
    filesize=filesize+tmpfile.write(f"{id},copy_from_StringIO,{(currtime+datetime.timedelta(minutes=id)).strftime('%Y-%m-%d %H:%M:%S')},name{id},{random.randint(0,100000)}\n")

tmpfile.seek(0)
start=time.perf_counter()
cur.copy_from(tmpfile,"test_psycopg2",sep=",");
end=time.perf_counter();
print(f"use copy_from(StringIO) to insert 10000 rows, cost {int((end-start)*1000)} ms");
conn.commit()

2b577bd0d528a3a9e840bf49f4de288d.jpeg

同样19ms。

这里面需要注意的是,StringIO写完了(write)之后,必须调用seek(0)回到文件头,否则,相当于从文件末端开始,是无法完成copy的操作的。

outside_default.png

使用copy_to(文件)导出到文件

有进就有出,pyscopg2里面也支持把表copy到文件中。

copy_to接收多个参数,主要是前面的4个参数。

copy_to(out_file, table_name, sep, columns)

其中第一个参数设计上是文件句柄,但是也可以用其他带有write()方法的对象来代替。

第三个参数则是分隔符。

第四个参数则是列名的列表,如果不提供,则是所有列。

假设现在我们把表导出成本地文件,可以直接调用copy_to(文件):

import psycopg2
import time
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=open("test_psycopg2_to.csv","w");
start=time.perf_counter()
cur.copy_to(tmpfile,"test_psycopg2",sep=",");
end=time.perf_counter();
print(f"use copy_to(File) to dump tables, cost {int((end-start)*1000)} ms");
tmpfile.close()

aed001013d8a9217e774bcf5d218f2de.png

58ms,不过表里面目前已经有4万条记录。

outside_default.png

使用copy_to(StringIO)导出到内存对象

同理,也可以把表导出到StringIO的内存对象中,调用copy_to(StringIO),如下所示:

import psycopg2
import time
from io import StringIO
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=StringIO("");
start=time.perf_counter()
cur.copy_to(tmpfile,"test_psycopg2",sep=",");
end=time.perf_counter();
print(f"use copy_to(StringIO) to dump tables, cost {int((end-start)*1000)} ms");
print(f"filelength:{len(tmpfile.getvalue())}")
conn.commit()
tmpfile.close()

db138d9fcf2554110fbec412120ecb2b.png

outside_default.png

使用copy_expert()进行复杂copy操作

gsql的copy命令其实提供了很多复杂的功能,但显然copy_from/copy_to无法全部做到,比如,从表中根据条件过滤出需要的数据并进行copy。

因此,除了copy_from/copy_to, pyscopg2提供了更专业的copy_expert函数,更接近原始的copy命令,它接收两个参数,第一个是SQL语句,第二个是类文件句柄, 你可以认为是把它嫁接给了gsql里面的STDIN/STDOUT。

copy_export(sql,file_handle)

假设现在我们需要把之前的source为"copy_from_File"的部分取出来,然后,对表的数据做一定运算,还希望带表header, 可以调用copy_expert,进行精细调控:

import psycopg2
import time
conn=psycopg2.connect(database="postgres",user="pytest",password="Enmotech@123",host="127.0.0.1",port=26000)
cur=conn.cursor()
currtime=datetime.datetime.now()
tmpfile=open("test_psycopg2_copy_expert.csv","w");
start=time.perf_counter()
cur.copy_expert("copy (select *,sqrt(val) val2 from test_psycopg2 test_psycopg2 where source = 'copy_from_File') to STDIN with(format 'csv', header on ) ",tmpfile);
end=time.perf_counter();
print(f"use copy_expert() to dump tables, cost {int((end-start)*1000)} ms");
conn.commit()
tmpfile.close()

f6da111643db536a8aa64397ebfc7400.png

看看导出效果,没问题:

81ba0d4363eecca0c134569f8077af58.png

outside_default.png

参考文档

  • https://docs.mogdb.io/zh/mogdb/v5.0/1-psycopg-based-development

  • https://www.psycopg.org/docs/cursor.html

关于作者

罗海雄,云和恩墨数据库研发架构师,性能优化专家,2012年 ITPUB 全国SQL大赛冠军。他拥有超十年企业级系统设计与优化经验,对SQL优化理解尤其深入,曾服务于甲骨文公司。

f03719b383c77a81f33e6c4955117909.gif

数据驱动,成就未来,云和恩墨,不负所托!


云和恩墨创立于2011年,是业界领先的“智能的数据技术提供商”。公司总部位于北京,在国内外35个地区设有本地办公室并开展业务。

云和恩墨以“数据驱动,成就未来”为使命,致力于将创新的数据技术产品和解决方案带给全球的企业和组织,帮助客户构建安全、高效、敏捷且经济的数据环境,持续增强客户在数据洞察和决策上的竞争优势,实现数据驱动的业务创新和升级发展。

自成立以来,云和恩墨专注于数据技术领域,根据不断变化的市场需求,创新研发了系列软件产品,涵盖数据库、数据库存储、数据库云管和数据智能分析等领域。这些产品已经在集团型、大中型、高成长型客户以及行业云场景中得到广泛应用,证明了我们的技术和商业竞争力,展现了公司在数据技术端到端解决方案方面的优势。

在云化、数字化和智能化的时代背景下,云和恩墨始终以正和多赢为目标,感恩每一位客户和合作伙伴的信任与支持,“利他先行”,坚持投入于数据技术核心能力,为构建数据驱动的智能未来而不懈努力。

我们期待与您携手,共同探索数据力量,迎接智能未来。

5dc8bf55ca34ffbfa93bf5dfad3641ab.gif

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1609520.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Darknet框架优化介绍

一、DarkNet框架简介 1.DarkNet的简介 Darknet是一个完全使用C语言编写的人工智能框架,可以使用CUDA的开源框架。主要应用于图像识别领域。 它具有可移植性好,安装间接,查看源码方便等优势,提供了OpenCV等附加选项,还…

【SpringBoot实战篇】获取用户详细信息-ThreadLocal优化

1 分析问题 对token的解析当初在拦截器中已经写过。期待的是在拦截器里写了,在其他地方就不写了,应该去复用拦截器里面得到的结果 2 解决方式-ThreadLocal 2.1提供线程局部变量 用来存取数据: set()/get()使用ThreadLocal存储的数据, 线程安全 2.2过程图…

Java如何用EasyExcel插件对Excel进行数据导入和数据导出

文章目录 一、EasyExcel的示例导入依赖创建实体类数据导入和导出 二、EasyExcel的作用三、EasyExcel的注解 EasyExcel是一个阿里巴巴开源的excel处理框架,它以使用简单、节省内存著称。在解析Excel时,EasyExcel没有将文件数据一次性全部加载到内存中&…

java-springmvc 01

MVC就是和Tomcat有关。 01.MVC启动的第一步,启动Tomcat 02.Tomcat会解析web-inf的web.xml文件

【Flutter】自动生成图片资源索引插件二:FlutterAssetsGenerator

介绍 FlutterAssetsGenerator 插件 :没乱码,生成的图片索引命名是小驼峰 目录 介绍一、安装二、使用 一、安装 1.安装FlutterAssetsGenerator 插件 生成的资源索引类可以修改名字,我这里改成R 2. 根目录下创建assets/images 3. 点击image…

上网行为管理系统功能介绍_上网行为管理实现的功能

上网行为管理系统是一种集成了网络监控、行为分析、策略管理和安全控制等功能的综合性软件解决方案。 它通过对企业内部网络的全面监控和深度分析,帮助管理者了解员工的网络使用习惯、识别潜在风险、优化网络资源配置,并最终实现网络安全和效率的双重提…

实在IDP文档审阅产品导引

实在IDP文档审阅:智能文档处理的革新者 一、引言 在数字化转型的浪潮中,文档处理的智能化成为企业提效的关键。实在智能科技有限公司推出的实在IDP文档审阅,是一款利用AI技术快速理解、处理文档的智能平台,旨在为企业打造专属的…

偏微分方程算法之二阶双曲型方程显式差分法

目录 一、研究目标 二、理论推导 2.1 三层显格式建立 2.2 三层显格式改进 三、算例实现 3.1 一阶显格式 3.2 二阶显格式 一、研究目标 介绍完一阶双曲型偏微分方程的几种差分格式后,我们继续探讨二阶方程的差分格式。这里以非齐次二阶双曲型偏微分方程的初边…

Android11 SystemUI clock plugin 插件入门

插件的编写 参照ExamplePlugin,需要系统签名。 需要先编译以下模块得到jar,引用在项目中。 m SystemUIPluginLibcom.android.systemui.permission.PLUGIN PluginManager.addPluginListener SystemUI 是如何发现 clock plugin 的? Syste…

【管理咨询宝藏78】MBB大型城投集团核心能力建设分析报告

本报告首发于公号“管理咨询宝藏”,如需阅读完整版报告内容,请查阅公号“管理咨询宝藏”。 【管理咨询宝藏78】MBB大型城投集团核心能力建设分析报告 【格式】PDF版本 【关键词】战略规划、商业分析、管理咨询、MBB顶级咨询公司 【强烈推荐】 这是一套…

【服务器部署篇】Linux下Redis安装

作者介绍:本人笔名姑苏老陈,从事JAVA开发工作十多年了,带过刚毕业的实习生,也带过技术团队。最近有个朋友的表弟,马上要大学毕业了,想从事JAVA开发工作,但不知道从何处入手。于是,产…

【数据结构】单链表经典算法题的巧妙解题思路

目录 题目 1.移除链表元素 2.反转链表 3.链表的中间节点 4.合并两个有序链表 5.环形链表的约瑟夫问题 解析 题目1:创建新链表 题目2:巧用三个指针 题目3:快慢指针 题目4:哨兵位节点 题目5:环形链表 介绍完了…

cdh cm界面HDFS爆红:不良 : 该 DataNode 当前有 1 个卷故障。 临界阈值:任意。(Linux磁盘修复)

一、表现 1.cm界面 报错卷故障 检查该节点,发现存储大小和其他节点不一致,少了一块物理磁盘 2.查看该磁盘 目录无法访问 dmesg检查发现错误 dmesg | grep error二、解决办法 移除挂载 umount /data10 #可以移除挂载盘,或者移除挂载目…

【爬虫】多线程爬取图片

多线程爬虫 多线程爬虫概述1.1 多线程的优势1.2 多线程的挑战 设计多线程爬虫1.1 项目设计1.2 项目流程1.3注意事项 总结 多线程爬虫概述 在当今信息爆炸的时代,网络爬虫(Web Scraper)已成为获取和分析网络数据的重要工具。而多线程爬虫&…

Codigger GT模块:GUI融合Terminal,重塑开发体验

在信息技术日新月异的今天,开发者与计算机系统进行交互的界面,其体验的优化与升级显得尤为关键。Codigger G&T正是应这一需求而生,它巧妙地将现代图形用户界面(GUI)的优势融入传统的Terminal中,为开发者…

算法训练营day16

一、二叉树的最大深度 递归解法 后序遍历(DFS) class Solution {public int maxDepth(TreeNode root) {if (root null) return 0;return Math.max(maxDepth(root.left), maxDepth(root.right)) 1;} }算法解析: 终止条件: 当 root 为空,…

openAI tts Java文本转语音完整前后端代码 html

Java后端代码 maven 仓库&#xff1a; <!--openAI 请求工具--> <dependency><groupId>com.unfbx</groupId><artifactId>chatgpt-java</artifactId><version>1.1.5</version> </dependency>maven 仓库官方 tts 使用案例…

Odoo讨论+聊天模块:一体化内部协作平台,赋能高效沟通与业务流程协作

Odoo讨论聊天模块&#xff1a;一体化内部协作平台&#xff0c;赋能高效沟通与业务流程协作 Odoo 讨论模块是一个集成了即时通讯、文件共享、业务关联、权限控制等功能于一体的内部协作工具&#xff0c;允许用户通过跨模块的聊天窗口或通过专用的“讨论”面板互相发送消息、分享…

利用redis和fastapi实现本地与平台策略进行交互

redis简介: 在pandas一文有详细使用方法(一文教会pandas-CSDN博客)&#xff0c;具体可视化软件有redisstudio等。它是一个由 Salvatore Sanfilippo 写的 key-value 存储系统&#xff0c;是跨平台的非关系型数据库。 Redis 是一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支…

Redis: 在项目中的应用

文章目录 一、Redis的共享session应用二、分布式缓存1、缓存2、缓存一致性问题解决方案&#xff08;缓存更新策略&#xff09;&#xff08;1&#xff09;作用&#xff08;2&#xff09;三种策略&#xff08;3&#xff09;主动更新策略&#xff08;数据库、缓存不一致解决方案&a…