1. 前言
随着互联网的发展,大数据已经是很普遍的一个现象,已不再是零几年时的神话。数据资源意为着就是信息资源,很好的使用起来说是财富资源一点也不夸张.所以无论当下是小的还是大的互联网公司都会遇到大数的情况。只不过不同的业务逻辑需求与使用情况不一样罢了。
大数据我这大致归类为两种,一种是静态的数据,比如说在一定时间段内的账号信息;另一种是动态数据,比如流量访问信息等。
2. 目标
以上简单的概述了大数量的背景,而本文不会展开叙述太多,而接下来要叙述的是就是如何快速导入百万账号。账号也就是我在上一章节说到的静态资源,这些静态资源有个特性就是在一段时间内是没有多大的变化的,并且与相关联性要求也很高,所以注定存储此类数据会使用选择使用关系型数据库(ACID特性:原子性,一致性,隔离性,持久性),即我们常见的如"MySQL、PostgreSQL、Oracle Database、Microsoft SQL Server等"。这几个典型的数据各有利弊,综合场景与成本等来考虑,大部分会选择使用开源的mysql数据库.所以下文实例都是以mysql数据为例。
3. 方法
常用的有两种方式:一种方法是,直接写sql语句插入数据;另一种方法是,数据文件导入。通常情况下,直接通过SQL语句将数据写入数据库比导入CSV文件数据更快。这里有几点考虑因素:
-
直接SQL语句写入数据库的优势:
- 直接数据传输:通过SQL语句直接写入数据库时,数据可以直接传输到数据库的存储引擎中,跳过了文件读取、解析和加载的过程。
- 少量数据时更快:特别是对于少量数据,直接使用SQL语句可以快速执行,避免了文件操作的额外开销和时间。
- 即时处理:SQL语句的执行几乎是实时的,一旦语句提交,数据就可以立即在数据库中可用。
-
导入CSV文件数据的考虑:
- 大数据量处理:对于大数据量,虽然导入CSV文件可能需要一些预处理步骤(如文件读取、解析和加载),但在某些情况下,使用数据库的批量导入工具或者专门的ETL(Extract, Transform, Load)工具可能会更高效。
- 数据准备和格式化:如果数据已经以CSV文件的形式存在,并且需要进行大量的数据转换或者数据清洗,导入CSV文件可能更具实际可行性和操作便利性。
- 导入工具的优化:一些数据库管理系统提供了优化的导入工具,可以有效地处理大量数据的导入,比如MySQL的LOAD DATA INFILE命令或PostgreSQL的COPY命令。
综合考虑:
-
数据量和频率:如果你经常需要导入大量数据,并且已经准备好了CSV文件,那么使用导入工具可能更为方便和高效。
-
实时性需求:如果你需要实时数据更新,并且数据量不是特别大,那么直接使用SQL语句写入数据库可能更合适。
综上所述,对于大多数情况,直接使用SQL语句写入数据库通常更快速和直接,特别是对于少量数据或需要即时处理的情况。然而,对于大量数据的导入,特别是需要预处理或者已经存在CSV文件的情况下,使用导入工具可能更有效率。
4.如何快速导入百万级账号
了解了背景与目的,又分析了方法,这里就是具体的操作实现了。研究了几种方式发现导入300百万以上的数据还是使用命令"load data infile"命令最快,一个表全量写入32个字段数据.使用python构造数据csv文件耗时18秒左右,csv文件写入库耗时不到6分钟;另一个全量表8个字段,构造数据文件耗时8秒,csv文件写入库耗时不到2分钟.经查看结果与写入的csv数据保持一致。
注:
-
这里有个前提条件,那就是具有可修改my.cnf权限.或是"
--secure-file-priv
"已经做了配置.因为这涉及到安全性考虑没有配置打开"--secure-file-priv
"那就没有办法快捷导入csv数据文件到库里。 -
构造数据还需要有python环境。
-
因不涉及到创建与获取组织架构信息,所以得提前知道组织的名称与组织id.这里简化了一下在构造时输入一个组织名称与id(org_auto,zdy_530853592895066112).实际情况可以写入到一个文件里,然后可以从文件中获取再写入到对应的csv数据表中。
下面以结合业务逻辑写入到三个库表数据为例的具体操作:
4.1 根据实际情况构造数据文件
首先要了解数据结构,那些字段是改写字段不能为空,那些是可以为空,那些是可以为空但实际情况下基本不为空.在此是在线上根据业务根据了真实值情况进行了信息虚拟化改造了一下。
-- base1.table1_user definition
CREATE TABLE `table1_user` (
`userid` varchar(64) NOT NULL,
`emid` varchar(32) DEFAULT '',
`cname` varchar(64) DEFAULT '',
`utype` int DEFAULT '0',
`pswd` varchar(64) DEFAULT '',
`onjob` tinyint(1) DEFAULT '1',
`userstatus` varchar(16) DEFAULT '',
`mobile` varchar(64) DEFAULT '',
`idnumbermd5` varchar(32) DEFAULT '',
`mail` varchar(256) DEFAULT '',
`pin_errorcount` int DEFAULT '0',
`pin_locktime` int DEFAULT '0',
`deptname` text,
`dept_path_id` text,
`dept_path_name` text,
`created` datetime DEFAULT CURRENT_TIMESTAMP,
`updated` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
`expire_time` datetime DEFAULT NULL,
`surname` varchar(64) DEFAULT '',
`givenname` varchar(64) DEFAULT '',
`resetpwd` tinyint(1) DEFAULT '0',
`emp_type` varchar(256) DEFAULT '',
`dept_id` text,
`companynumber` varchar(256) DEFAULT '',
`leader` varchar(64) DEFAULT '',
`ptjs` varchar(256) DEFAULT '',
`errorcount` int DEFAULT '0',
`locktime` int DEFAULT '0',
`lockstatus` int DEFAULT '0',
`note` varchar(256) DEFAULT '',
`mailmember` text,
`modify_pwd` datetime DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`userid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
-- base2.table2_organization definition
CREATE TABLE `table2_organization` (
`org_id` varchar(64) NOT NULL,
`org_name` varchar(255) NOT NULL,
`status` tinyint(1) DEFAULT '1',
`type` varchar(8) DEFAULT NULL,
`note` varchar(255) DEFAULT NULL,
`created` datetime DEFAULT CURRENT_TIMESTAMP,
`updated` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
`source_opcode` int DEFAULT '0',
`inherit` int DEFAULT '0',
PRIMARY KEY (`org_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
-- base2.table2_user definition
CREATE TABLE `table2_user` (
`userid` varchar(64) NOT NULL,
`utype` int DEFAULT '0',
`cname` varchar(64) DEFAULT '',
`surname` varchar(64) DEFAULT '',
`givenname` varchar(64) DEFAULT '',
`emid` varchar(32) DEFAULT '',
`mobile` varchar(64) DEFAULT '',
`onjob` tinyint(1) DEFAULT '1',
`userstatus` varchar(16) DEFAULT '',
`idnumbermd5` varchar(32) DEFAULT '',
`pwd` varchar(64) DEFAULT '',
`resetpwd` tinyint(1) DEFAULT '0',
`emp_type` varchar(256) DEFAULT '',
`mail` varchar(256) DEFAULT '',
`deptname` text,
`dept_id` text,
`companynumber` varchar(256) DEFAULT '',
`leader` varchar(64) DEFAULT '',
`ptjs` varchar(256) DEFAULT '',
`dept_path_id` text,
`dept_path_name` text,
`errorcount` int DEFAULT '0',
`locktime` int DEFAULT '0',
`lockstatus` int DEFAULT '0',
`note` varchar(256) DEFAULT '',
`mailmember` text,
`created` datetime DEFAULT CURRENT_TIMESTAMP,
`updated` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
`modify_pwd` datetime DEFAULT CURRENT_TIMESTAMP,
`expire_time` datetime DEFAULT NULL,
`source_opcode` int DEFAULT '0',
`login_time` datetime DEFAULT NULL,
PRIMARY KEY (`userid`),
KEY `utype` (`utype`) USING BTREE,
KEY `lockstatus` (`lockstatus`) USING BTREE,
KEY `userstatus` (`userstatus`) USING BTREE,
KEY `index_onjob_userstatus` (`onjob`,`userstatus`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3;
结合上表结构,实现的python代码如下,保存为createDate.py文件:
# -*- coding: utf-8 -*-
# @Time : 2024-3-30
# @Author : zhh
# @Version :
# @File : createDate.py
# @Software: PyCharm
import time,sys,os,random
from datetime import datetime
def time_str():
start_date = datetime(2022, 1, 1)
end_date = datetime(2024, 7, 10)
random_date = start_date + random.random() * (end_date - start_date)
time_str = random_date.strftime("%Y-%m-%d %H:%M:%S")
return time_str
# 删除目录下的所有文件
def delete_files_in_directory(directory):
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
try:
if os.path.isfile(file_path):
os.remove(file_path)
print(f"已删除文件: {file_path}")
elif os.path.isdir(file_path):
print(f"跳过目录: {file_path}")
except Exception as e:
print(f"删除 {file_path} 时出错: {e}")
def ensure_directory_exists(directory):
if not os.path.exists(directory):
print(f"目录 '{directory}' 不存在,将会创建。")
os.makedirs(directory)
print(f"目录 '{directory}' 创建成功。")
return 0
else:
print(f"目录 '{directory}' 已经存在,无需创建。")
return 1
# delete_files_in_directory(directory)
def write_file(data, flag):
# with lock:
# 将数据写入到文件中
file=flag
with open(f"{directory_path}/{file}.csv", "a") as f:
# f.writel(data + "\n")
f.writelines(data)
# 创建线程A,B,同时向文件中写入数据
def thread_write(org_name,org_id,numbers):
num = 0
SSHA="{SSHA2}"
created_tiem = time_str()
FILL_TIME = "2099-01-01 01:00:00"
for i in range(int(numbers)):
chian_user_lst = []
chian_org_user_lst = []
chianotp_user_lst = []
for i1 in range(10000):
# 补充0,保持7位数
userid = "addUser" +str(i).zfill(3) + str(i1).zfill(4)
chian_user_row = f"{userid},0,'测试账号','测试','账号','2','+86',1,'Active','','{SSHA}RNFjJb5v4QenLqdfWi8nR9E3anlQcHd2U2RaYw==',0,'正式','',{org_name},{org_id},'','','',{org_id},{org_name},0,0,0,'','',{created_tiem},{created_tiem},{created_tiem},{FILL_TIME},0,{FILL_TIME}\n"
chian_user_lst.append(chian_user_row.replace("'", ""))
chian_org_user_row = f"{org_id},{userid},'',0, \n"
chian_org_user_lst.append(chian_org_user_row.replace("'", ""))
chianotp_user_row = f"{userid},'2','测试账号',0, '', 1,'Active','+86','','',0,0,{org_name},{org_id},{org_name},{created_tiem},{created_tiem},{FILL_TIME},'测试','账号',0,'正式',{org_id},'','','',0,0,0,'','',{FILL_TIME}\n"
chianotp_user_lst.append(chianotp_user_row.replace("'", ""))
write_file(chian_user_lst, "chian_user")
write_file(chian_org_user_lst, "chian_organization_user")
write_file(chianotp_user_lst, "chianotp_user")
# time.sleep(1) # 线程等待1秒钟
num = num + 1
print("写入行数:", num*10000)
if __name__ == '__main__':
# 存储数据目录
directory_path = 'sql/csvfile'
path_exist = ensure_directory_exists(directory_path)
if path_exist:
delete_files_in_directory(directory_path)
# 获取命令行参数
args = sys.argv
org_name = sys.argv[1]
org_id = sys.argv[2]
numbers = sys.argv[3]
# 创建线程
thread_a = threading.Thread(target=thread_write, args=[org_name, org_id, numbers])
startTime = datetime.now()
print("开始时间", startTime)
# 启动线程
thread_a.start()
# 等待线程结束
thread_a.join()
endTime = datetime.now()
duration = (endTime-startTime).total_seconds() # 计算总秒数
print("结束时间", endTime, "共持续", duration, "秒")
把createDate.py文件上传到mysql服务器上,如"/opt/createDate"目录下.执行以下命令:
/data/pubchian/python3/bin/python3 createDate.py org_auto zdy_530853592895066112 1
实际执行结果如下图显示:
确认mysql的secure-file-priv配置信息,打开/etc/my.cnf配置文件,添加要写入csv文件的路径即在[mysqld]项中添加如下:
[mysqld]
secure_file_priv = '/opt/createDate/csvfile/'
添加完成后需要重启一下mysql数据库.
重启 MySQL 服务:
sudo systemctl restart mysql
如果 MySQL 服务名不是 mysql,而是 mysqld,则使用:
sudo systemctl restart mysqld #这会通过 systemd 来重新启动 MySQL 服务。
csv文件数据导入mysql数据库中,执行如下命令:
# 进入数据库
/data/chiansec/mysql/bin/mysql -uroot -pMSFpMmlZ1ZkY6ZLSZrPU
# 确定可以导入数据库的文件目录,如果没有可以在/etc/my.cnf的配置文件[mysqld]中添加,如:secure_file_priv = "/opt/createDate/csvfile/"
SHOW VARIABLES LIKE 'secure_file_priv';
# 导入数据库文件
USE chianotpms;
load data infile '/opt/createDate/csvfile/chianotp_user.csv' replace into table `chianotp_user`
fields terminated by ','
lines terminated by '\n';
#
use chianseciam;
load data infile '/opt/createDate/csvfile/chian_user.csv' replace into table chian_user
fields terminated by ','
lines terminated by '\n';
#
load data infile '/opt/createDate/csvfile/chian_organization_user.csv' replace into table chian_organization_user
fields terminated by ','
lines terminated by '\n';
5.其他
如果是集群部署的话,可以重复以上操作导入相同的数据(csv文件也可以重复使用不用再构造),也可以使用数据迁移的方式mysql数据库直接同步到集群中的另一台mysql服务器上,即保持数据的一致性,完整性。