Overview
This covers taking data from its raw file form, normalizing it into the required format, and then storing it.
Contents
1 File Operations
Compress a file
rar a some_file.rar some_file.csv
Fetch the file
rsync -rvltz -e 'ssh -p 22' --progress root@IP:/data/graph_data graph_data
Extract the file
apt install unrar
# e extracts without preserving the archive's directory structure; x preserves it
unrar e filename.csv.rar
After obtaining the files, we need helpers for working with very large files (see the earlier post "Python一些可能用的到的函数系列28 超大文本文件分割读取" on splitting and reading huge text files).
Get the last n lines of a file:
import os

# Read the last n_lines of a file by seeking backwards with a growing offset
def read_tail_n(fname, n_lines):
    off = -100                          # initial backwards offset, in bytes
    min_size = -os.path.getsize(fname)  # lower bound: the whole file
    with open(fname, 'rb') as f:
        while True:
            if off < min_size:
                # Offset exceeds the file size: just read the whole file
                f.seek(min_size, 2)
                res_lines = f.readlines()
                break
            f.seek(off, 2)  # whence=2: seek off bytes back from the end of file
            lines = f.readlines()
            if len(lines) > n_lines + 1:
                # The first line read may be partial, hence n_lines + 1
                res_lines = lines[-n_lines:]
                break
            off *= 2  # not enough lines yet: double the offset and retry
    return res_lines
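For comparison, when a single streaming pass over the file is acceptable, collections.deque gives a much shorter tail implementation than the offset approach above. A sketch (the demo file name is made up):

```python
from collections import deque

def tail_simple(fname, n_lines):
    """Stream the file once, keeping only the last n_lines in memory."""
    with open(fname, 'r') as f:
        return list(deque(f, maxlen=n_lines))

# Tiny demo on a throwaway file.
with open('tail_demo.txt', 'w') as f:
    f.writelines('line %d\n' % i for i in range(100))

last3 = tail_simple('tail_demo.txt', 3)
print(last3)  # the last three lines
```

The offset version above avoids reading the whole file, so it stays the better choice for multi-gigabyte inputs.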
Read n lines from the middle of a file
def read_somelines_mem(filename, start_line=1, end_line=100):
    """Return lines start_line..end_line (1-based, inclusive)."""
    lines = []
    start_line -= 1  # align with 0-based indexing
    with open(filename, 'r') as f:
        line_count = 0  # current line pointer
        for _ in range(end_line):  # scan from the beginning of the file
            line = f.readline()
            if not line:  # reached EOF before end_line
                break
            if line_count >= start_line:
                lines.append(line)
            line_count += 1
    return lines
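An equivalent, slightly more idiomatic version uses itertools.islice; it keeps the same 1-based, inclusive convention as read_somelines_mem (the demo file name is invented):

```python
from itertools import islice

def read_somelines_islice(filename, start_line=1, end_line=100):
    """Return lines start_line..end_line (1-based, inclusive)."""
    with open(filename, 'r') as f:
        # islice skips start_line - 1 lines, then yields through line end_line
        return list(islice(f, start_line - 1, end_line))

# Demo on a throwaway 10-line file.
with open('mid_demo.txt', 'w') as f:
    f.writelines('row %d\n' % i for i in range(1, 11))

middle = read_somelines_islice('mid_demo.txt', 3, 5)
print(middle)
```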
Get the total number of lines in a file
# Count lines in a large file by streaming it
def nrow_big_txt(filename):
    cnt = 0
    with open(filename, 'r') as f:
        for line in f:
            cnt += 1
    print('total lines: %s' % cnt)
    return cnt
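For files this size, counting newline bytes in large binary chunks is usually faster than iterating line by line, since it skips text decoding entirely. A sketch (note the caveat about a missing trailing newline):

```python
def nrow_big_txt_fast(filename, chunk_size=1 << 20):
    """Count lines by counting newline bytes in 1 MiB binary chunks.

    Caveat: a file without a trailing newline reports one fewer line.
    """
    cnt = 0
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            cnt += chunk.count(b'\n')
    return cnt

# Demo: a throwaway file with a known line count.
with open('count_demo.txt', 'w') as f:
    f.writelines('x\n' for _ in range(1234))

print(nrow_big_txt_fast('count_demo.txt'))
```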
When processing, suppose we handle only ten million rows at a time.
# Read lines 50,000,000 through 60,000,000
test1 = read_somelines_mem('filename.csv', 50000000, 60000000)
# Clean the rare malformed rows (in theory, none of the 4 fields should be missing)
test2 = [x.replace(',,', ',').replace('\n', '').split(',') for x in test1]
test3 = [x for x in test2 if len(x) == 4]
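Note that each call to read_somelines_mem rescans the file from line 1, so walking a huge file batch by batch this way is quadratic overall. For sequential batches, a single streaming pass with a generator is much cheaper; a sketch (chunk size and file name are illustrative):

```python
from itertools import islice

def process_in_chunks(filename, chunk_rows=10_000_000):
    """Yield successive chunks of chunk_rows lines in one pass over the file."""
    with open(filename, 'r') as f:
        while True:
            chunk = list(islice(f, chunk_rows))
            if not chunk:  # EOF
                break
            yield chunk

# Demo: 10 lines in chunks of 4 -> chunk sizes 4, 4, 2.
with open('chunk_demo.txt', 'w') as f:
    f.writelines('r%d\n' % i for i in range(10))

sizes = [len(c) for c in process_in_chunks('chunk_demo.txt', 4)]
print(sizes)
```

The same cleaning list comprehensions can then be applied to each yielded chunk.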
The resulting files total roughly 30 GB:
2.9G company_node_batch1.csv
2.8G company_node_batch2.csv
2.8G company_node_batch3.csv
2.0G company_node_batch4.csv
2.1G contact_rel_part1.csv
2.1G contact_rel_part2.csv
2.1G contact_rel_part3.csv
2.1G contact_rel_part4.csv
1.9G xxx.csv
310M invest_rel_v2.csv
2.3G xxx1.csv
2 Data Normalization
Keep entities as nodes where they belong
I once made a careless mistake: I dropped a class of contact nodes and connected their neighbors directly with edges. The result was a data explosion, because a single contact node can be linked to many nodes, and removing it produces on the order of n-squared edges (100 × 100 = 10,000). This was a basic modeling error.
Thinking in graph terms, every entity class that deserves its own node type should get one, regardless of the quirks of the business scenario. The test is simply whether the result reads as fluent natural language, e.g. Company(A) HasContact Phone(B).
Save the data as CSV; the general format is shown below.
For nodes, ID and LABEL are required. ID can be thought of as the data's primary key, and LABEL as its table (Table) or collection (Collection).
For edges, START_ID, END_ID, and TYPE together define an edge; START_ID and END_ID are the start and end nodes. Note that both nodes referenced by an edge must already exist in the graph, or the import fails.
One extra point: the column declared as ID is treated as a string by default, so even an id that looks numeric is handled as text.
company.csv
id:ID,name,:LABEL
100,a01,Company
101,a02,Company
102,a03,Company
contact.csv
id:ID,name,:LABEL
200,1311111111,Phone
201,1322222222,Phone
company_invest_rel.csv
:START_ID,:END_ID,per:float,:TYPE
100,101,0.5,Invest
contact_rel.csv
:START_ID,:END_ID,:TYPE
100,200,HasContact
101,200,HasContact
101,201,HasContact
102,201,HasContact
When saving these CSVs with pandas, you can use something like:
invest_rel_df.to_csv('invest_rel_v2.csv', index=False, encoding='utf-8', quoting=1)
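Here quoting=1 is csv.QUOTE_ALL, which quotes every field on write. A quick sketch of why that matters when values themselves contain commas (file name and values are made up):

```python
import csv
import pandas as pd

# quoting=1 == csv.QUOTE_ALL: every field is quoted, so an embedded
# comma inside a value cannot shift the remaining columns.
df = pd.DataFrame({'id:ID': [100, 101],
                   'name': ['a01', 'a,02'],  # note the comma inside 'a,02'
                   ':LABEL': ['Company', 'Company']})
df.to_csv('company_demo.csv', index=False, encoding='utf-8',
          quoting=csv.QUOTE_ALL)
content = open('company_demo.csv').read()
print(content)
```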
Data model [Nice To Have]
The raw data may contain a small number of quality issues. To make sure the subsequent bulk processing runs correctly, define a data model that constrains and converts the values.
from typing import List, Dict, Optional

import pandas as pd
from pydantic import BaseModel

class Relation(BaseModel):
    rid: int
    from_id: int
    to_id: int
    link_attr: str
    link_typr: int

class Relation_s(BaseModel):
    data_list: List[Relation]

test1 = read_somelines_mem('ds_lianxi_relation_e0.csv', 0, 10000)
test2 = [x.replace(',,', ',').replace('\n', '').split(',') for x in test1]
test3 = [x for x in test2 if len(x) == 4]
tem_df = pd.DataFrame(test3, columns=['from_id', 'to_id', 'link_attr', 'link_typr'])
tem_df['rid'] = list(range(len(tem_df)))
sample_lod = tem_df.to_dict(orient='records')
rs = Relation_s(data_list=sample_lod[:3])
rs1 = [x.dict() for x in rs.data_list]
[{'rid': 0,
'from_id': 76247745,
'to_id': 25278409,
'link_attr': '111111111',
'link_typr': 1},
{'rid': 1,
'from_id': 24115962,
'to_id': 22426271,
'link_attr': '22222222',
'link_typr': 1},
{'rid': 2,
'from_id': 68525645,
'to_id': 66453181,
'link_attr': '3333333@qq.com',
'link_typr': 3}]
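The payoff of the model is that bad rows fail loudly instead of flowing downstream. A small sketch with the same Relation model (the row values are invented):

```python
from pydantic import BaseModel, ValidationError

class Relation(BaseModel):
    rid: int
    from_id: int
    to_id: int
    link_attr: str
    link_typr: int

# A numeric string is coerced to int on validation...
ok_row = Relation(rid='7', from_id=1, to_id=2, link_attr='x', link_typr=1)

# ...but a non-numeric value raises ValidationError instead of slipping through.
try:
    Relation(rid=0, from_id='not-a-number', to_id=2, link_attr='x', link_typr=1)
    failed = False
except ValidationError:
    failed = True
print(ok_row.rid, failed)
```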
3 Data Import
When the data volume is large, neo4j-admin import is the right tool.
Assume Neo4j is run via Docker.
Start the container with bash (the default entrypoint would launch Neo4j directly):
proc_path=/opt/aprojects/Neo4j_24535_36
data_path=/data/aprojects/Neo4j_24535_36
image_name="registry.cn-hangzhou.aliyuncs.com/andy08008/neo4j_5:v100"
# start the container
docker run -it \
--name='Neo4j_24535_36' \
--restart=always \
-v /etc/localtime:/etc/localtime \
-v /etc/timezone:/etc/timezone \
-v /etc/hostname:/workspace/hostname \
-e "LANG=C.UTF-8" \
-v ${data_path}/data:/data \
-v ${data_path}/logs:/logs \
-v ${proc_path}/conf4:/var/lib/neo4j/conf/ \
-v /data/neo4j_import:/var/lib/neo4j/import \
-v ${proc_path}/plugins4:/var/lib/neo4j/plugins \
--env NEO4J_AUTH=neo4j/xxxxxx \
-p 24535:7474 \
-p 24536:7687 \
${image_name} bash
Then make sure Neo4j is stopped (neo4j stop) before importing; multiple node and relationship files can be imported at once.
neo4j-admin database import full \
  --nodes=import/company_node_batch1.csv \
  --nodes=import/company_node_batch2.csv \
  --nodes=import/company_node_batch3.csv \
  --nodes=import/company_node_batch4.csv \
  --nodes=import/xxx.csv \
  --nodes=import/xxxx.csv \
  --relationships=import/invest_rel_v2.csv \
  --relationships=import/contact_rel_part1.csv \
  --relationships=import/contact_rel_part2.csv \
  --relationships=import/contact_rel_part3.csv \
  --relationships=import/contact_rel_part4.csv \
  --overwrite-destination --verbose
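With a dozen input files the command gets long, so generating it is less error-prone than typing it. A sketch that assembles the same command from file lists (the placeholder xxx files are omitted):

```python
def build_import_cmd(node_files, rel_files):
    """Assemble the neo4j-admin full-import command line from file lists."""
    parts = ['neo4j-admin database import full']
    parts += ['--nodes=import/%s' % f for f in node_files]
    parts += ['--relationships=import/%s' % f for f in rel_files]
    parts += ['--overwrite-destination', '--verbose']
    # Join with shell line continuations for readability.
    return ' \\\n  '.join(parts)

cmd = build_import_cmd(
    ['company_node_batch%d.csv' % i for i in range(1, 5)],
    ['invest_rel_v2.csv'] + ['contact_rel_part%d.csv' % i for i in range(1, 5)])
print(cmd)
```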
During the import, progress is reported continuously:
Neo4j version: 5.23.0
Importing the contents of these files into /data/databases/neo4j:
Nodes:
/var/lib/neo4j/import/company_node_batch1.csv
/var/lib/neo4j/import/company_node_batch2.csv
/var/lib/neo4j/import/company_node_batch3.csv
/var/lib/neo4j/import/company_node_batch4.csv
/var/lib/neo4j/import/xxx.csv
/var/lib/neo4j/import/xxxx.csv
Relationships:
/var/lib/neo4j/import/invest_rel_v2.csv
/var/lib/neo4j/import/contact_rel_part1.csv
/var/lib/neo4j/import/contact_rel_part2.csv
/var/lib/neo4j/import/contact_rel_part3.csv
/var/lib/neo4j/import/contact_rel_part4.csv
Available resources:
Total machine memory: 47.04GiB
Free machine memory: 20.71GiB
Max heap memory : 11.77GiB
Max worker threads: 8
Configured max memory: 483.9MiB
High parallel IO: true
Cypher type normalization is enabled (disable with --normalize-types=false):
Property type of 'regcap' normalized from 'float' --> 'double' in /var/lib/neo4j/import/company_node_batch1.csv
Property type of 'socnum' normalized from 'int' --> 'long' in /var/lib/neo4j/import/company_node_batch1.csv
Property type of 'regcap' normalized from 'float' --> 'double' in /var/lib/neo4j/import/company_node_batch2.csv
Property type of 'socnum' normalized from 'int' --> 'long' in /var/lib/neo4j/import/company_node_batch2.csv
Property type of 'regcap' normalized from 'float' --> 'double' in /var/lib/neo4j/import/company_node_batch3.csv
Property type of 'socnum' normalized from 'int' --> 'long' in /var/lib/neo4j/import/company_node_batch3.csv
Property type of 'regcap' normalized from 'float' --> 'double' in /var/lib/neo4j/import/company_node_batch4.csv
Property type of 'socnum' normalized from 'int' --> 'long' in /var/lib/neo4j/import/company_node_batch4.csv
Nodes, started 2024-09-23 06:28:28.070+0000
[*Nodes:0B/s 2.192GiB-------------------------------------------------------------------------] 224M ∆3.92M
Done in 3m 1s 796ms
Prepare node index, started 2024-09-23 06:31:29.907+0000
[*:3.030GiB-----------------------------------------------------------------------------------] 674M ∆ 0
Done in 1m 25s 278ms
Relationships, started 2024-09-23 06:32:55.193+0000
[*Relationships:0B/s 3.030GiB-----------------------------------------------------------------] 245M ∆ 560K
Done in 6m 59s 93ms
Node Degrees, started 2024-09-23 06:39:57.991+0000
[*>(2)================================================|CALCULATE:2.593GiB(5)==================] 245M ∆ 8.4M
Done in 22s 771ms
Relationship --> Relationship 1/2, started 2024-09-23 06:40:21.526+0000
[>(2)=================|*LINK(4)==================================|v:130.4MiB/s----------------] 245M ∆7.32M
Done in 1m 1s 948ms
RelationshipGroup 1/2, started 2024-09-23 06:41:23.480+0000
[>:2.097GiB/s--|>|*v:240.6KiB/s---------------------------------------------------------------] 106M ∆ 106M
Done in 1s 469ms
Node --> Relationship, started 2024-09-23 06:41:24.958+0000
[>:122.6MiB/s---------|*>(3)========================|LINK--------------|v:196.1MiB/s(3)=======] 191M ∆35.7M
Done in 14s 914ms
Relationship <-- Relationship 1/2, started 2024-09-23 06:41:39.882+0000
[>--------------------------------|*LINK(5)===========================|v:142.0MiB/s-----------] 245M ∆ 7.2M
Done in 56s 596ms
Relationship --> Relationship 2/2, started 2024-09-23 06:42:39.009+0000
[>(2)================================|*LINK(4)============================|v:33.04MiB/s-------] 245M ∆8.64M
Done in 45s 314ms
RelationshipGroup 2/2, started 2024-09-23 06:43:24.325+0000
[*>(6)=============================================================================|v:8.104MiB] 220M ∆ 220M
Done in 1s 616ms
Relationship <-- Relationship 2/2, started 2024-09-23 06:43:25.965+0000
[*>(2)=============================================================|LINK(4)==========|v:40.18M] 245M ∆6.84M
Done in 37s 627ms
Count groups, started 2024-09-23 06:44:04.283+0000
[>|*>--------------------------------------------------------------------------------|COUNT:93] 349K ∆ 349K
Done in 143ms
Gather, started 2024-09-23 06:44:11.745+0000
[>----------------|*CACHE:2.619GiB------------------------------------------------------------] 349K ∆ 349K
Done in 179ms
Write, started 2024-09-23 06:44:11.935+0000
[*>:??-----------------------------------------------------------------------|EN|v:??---------] 349K ∆ 349K
Done in 676ms
Node --> Group, started 2024-09-23 06:44:12.717+0000
[>---|*FIRST-----------------------------------------------------------------------------|v:1.] 340K ∆ 119K
Done in 3s 319ms
Node counts and label index build, started 2024-09-23 06:44:18.512+0000
[*>(3)===================================|LABEL INDEX-------------------------|COUNT:2.174GiB(] 224M ∆24.1M
Done in 16s 843ms
Relationship counts and relationship type index build, started 2024-09-23 06:44:35.898+0000
[>-------------------------|RELATIONSHIP TYPE|*COUNT------------------------------------------] 245M ∆ 380K
Done in 1m 51s 188ms
IMPORT DONE in 18m 1s 685ms.
Imported:
224894968 nodes
245297705 relationships
878241672 properties
Peak memory usage: 3.030GiB
After a successful import, start Neo4j (neo4j start):
root@457931a3e173:/var/lib/neo4j# neo4j start
Directories in use:
home: /var/lib/neo4j
config: /var/lib/neo4j/conf
logs: /logs
plugins: /var/lib/neo4j/plugins
import: /var/lib/neo4j
data: /var/lib/neo4j/data
certificates: /var/lib/neo4j/certificates
licenses: /var/lib/neo4j/licenses
run: /var/lib/neo4j/run
Starting Neo4j.
Started neo4j (pid:2429). It is available at http://0.0.0.0:7474
There may be a short delay until the server is ready.
Neo4j is now reachable from the browser front end, and step one, the data import, is complete.
Before using the graph, create indexes first; that step is also reasonably fast.
From data import to index completion, the whole process took about 30 minutes.
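Index creation itself is plain Cypher, run in the Neo4j browser or through a driver. A small helper that generates the statements; the index names and the id property are assumptions based on the CSV examples above, so adjust them to your schema:

```python
def create_index_stmt(label, prop):
    """Build one Cypher CREATE INDEX statement (Neo4j 5 syntax)."""
    return ('CREATE INDEX %s_%s_idx IF NOT EXISTS FOR (n:%s) ON (n.%s)'
            % (label.lower(), prop, label, prop))

# Hypothetical label/property pairs following the node CSVs above.
for stmt in [create_index_stmt('Company', 'id'), create_index_stmt('Phone', 'id')]:
    print(stmt)
```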