python 高效读取多个geojson 写入一个sq3(Sqlite) 、效率提高90%+

news2024/9/28 20:53:41

1.问题缘由:

        由于工作需求,需要将多个(总量10G+)geojson文件写入到sq3库,众所周知,sqlite 不支持多线程写入,那该怎么办呢,在网上也查了很多策略,都没有达到立竿见影的效果。于是还是回到写文件的本质:多线程写多文件,就绕开加锁的机制。

2.单线程读取的效果

单线程读写原始26个geojson文件,共294M,耗时:547S

写完的sq3文件大小:73.3M

3.多进程并发

多进程并发读写geojson,生成多个sq3文件,再合并到一个sq3文件耗时:16.5S

4.工具代码:

4.1 rw_data_geojson.py: 读写geojson文件

import os
import json

GEOMETRY = 'geometry'


def read_all_layer(src_path):
    """
    读取geojson 文件,传入读取文件路径,返回dict
    dict 是以layername为key,获取每个layer的dict,子字典的key为要素ID
    :param src_path: 读取geojson文件路径
    :return: 封装dict  {layerName:[id:{要素dict}]}
    """
    filenames = os.listdir(src_path)
    # 过滤出你想要处理的文件,例如只读取.txt文件
    txt_filenames = [f for f in filenames if f.endswith('.geojson')]
    geo_properties_map = {}
    # 循环读取每个文件
    for filename in txt_filenames:
        file_path = os.path.join(src_path, filename)
        with open(file_path, 'r') as file:
            content = file.read()
            geojson_data = json.loads(content)

        features = geojson_data.get('features', [])
        dict = {}
        for feature in features:
            properties = feature.get('properties')
            if GEOMETRY in feature:
                properties[GEOMETRY] = feature.get('geometry')
            dict[properties["id"]] = properties
        layername = filename.replace(".geojson", "")
        geo_properties_map[layername] = dict

    return geo_properties_map


def read_single_layer(geojson_path):
    """
    读取指定geojson 文件,返回dict  dict 是以layername为key,获取每个layer的dict,子字典的key为要素ID
    :param geojson_path: 读取geojson文件
    :return: 封装dict  {layerName:[id:{要素dict}]}
    """
    geo_properties_map = {}
    if not geojson_path.endswith('.geojson'):
        return geo_properties_map

    with open(geojson_path, 'r') as file:
        content = file.read()
        geojson_data = json.loads(content)
        features = geojson_data.get('features', [])
        dict = {}
        for feature in features:
            properties = feature.get('properties')
            if GEOMETRY in feature:
                properties[GEOMETRY] = feature.get('geometry')
            dict[properties["id"]] = properties
        key = os.path.basename(geojson_path).replace(".geojson", "")
        geo_properties_map[key] = dict

    return geo_properties_map


def build_geojson(src_feats, layer_name='', epsg_crs=None):
    """按照图层,格式化成geojson规格"""
    attrs = []
    for attr in [attr for key, attr in src_feats.items()]:
        geos_obj = attr.get(GEOMETRY)
        gjson_dict = {"properties": attr, "type": "Feature"}
        if geos_obj is not None:
            gjson_dict[GEOMETRY] = geos_obj
            del attr[GEOMETRY]
        attrs.append(gjson_dict)

    layer = {"type": "FeatureCollection", "features": attrs}

    if layer_name:
        layer['name'] = layer_name
        if epsg_crs and src_feats and any(GEOMETRY in a for a in attrs):
            if isinstance(epsg_crs, int) or (isinstance(epsg_crs, str) and epsg_crs.isdigit()):
                crs_str = "urn:ogc:def:crs:EPSG::%s" % epsg_crs
            else:
                crs_str = epsg_crs
            layer['crs'] = {"type": "name", "properties": {"name": crs_str}}

    return layer


def write_layer(target_path, layer_name, node_data):
    '''
    按图层写geojson数据到磁盘
    :param target_path: 目标文件目录
    :param layer_name: 目标文件名
    :param node_data: 写入的dict嵌套类型数据{dict:{[id:value]}}
    :return:
    '''
    if not os.path.exists(target_path):
        os.makedirs(target_path)
    with open(target_path + "/" + layer_name + ".geojson", 'w') as f:
        json.dump(node_data, f)

    print(target_path + "/" + layer_name + ".geojson 写入完毕")

4.2 db_sq3_tool.py :处理sq3数据库

import sqlite3
import os
from shapely.geometry import shape
from read_file import rw_data_geojson
import random
import time
import multiprocessing
import datetime


def create_connection(db_file):
    """ 创建与SQLite数据库的连接 """
    conn = None
    try:
        conn = sqlite3.connect(db_file)
        return conn
    except sqlite3.Error as e:
        print(e)
    return conn


def create_table(conn, create_table_sql):
    """ 使用给定的SQL语句创建表 """
    try:
        cursor = conn.cursor()
        cursor.execute(create_table_sql)
        conn.commit()
    except sqlite3.Error as e:
        print(e)


def insert_data(conn, insert_sql, data):
    """ 向数据库插入数据 """
    try:
        cursor = conn.cursor()
        cursor.execute(insert_sql, data)
        conn.commit()
    except sqlite3.Error as e:
        print(e)


def batch_insert_data(conn, data_list, table_name, columns):
    '''
    批量插入数据
    :param conn: 数据库连接
    :param data_list: 插入数据list
    :param table_name: 表名
    :param columns: 表的列名list
    :return:
    '''
    cursor = conn.cursor()
    # 构建插入语句的占位符
    placeholders = ', '.join(['?'] * len(columns))
    insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"

    try:
        cursor.executemany(insert_sql, data_list)
        conn.commit()
    except sqlite3.Error as e:
        print(f"An error occurred: {e}")
        conn.rollback()


def select_data(conn, select_sql):
    """ 从数据库查询数据 """
    try:
        cursor = conn.cursor()
        cursor.execute(select_sql)
        rows = cursor.fetchall()
        return rows
    except sqlite3.Error as e:
        print(e)


def update_data(conn, update_sql, data):
    """ 更新数据库中的数据 """
    try:
        cursor = conn.cursor()
        cursor.execute(update_sql, data)
        conn.commit()
    except sqlite3.Error as e:
        print(e)


def delete_data(conn, delete_sql, data):
    """ 从数据库中删除数据 """
    try:
        cursor = conn.cursor()
        cursor.execute(delete_sql, data)
        conn.commit()
    except sqlite3.Error as e:
        print(e)


def dict_data_write_sqlite(node_data, table_name, conn, batch=1):
    '''
    将读完的dict 结构图层内容写入到sq3
    :param node_data: 要写入的数据dict
    :param table_name: 表名称
    :param conn: 数据库连接
    :param batch: 是否批量插入
    :return:
    '''
    try:
        if len(node_data) == 0:
            return
        # 获取第一行的key转保存表的列名
        random_key, random_value = random.choice(list(node_data.items()))
        row0 = random_value
        flag2type = {'str': 'TEXT', 'int': 'BIGINT', 'float': 'REAL', 'dict': 'TEXT'}
        fld_types = []
        columns = []
        for key, value in row0.items():
            value_type = type(value).__name__
            # print(f'{table_name} key:{key} 类型: {value_type}')
            fld_types.append((key, flag2type[value_type]))
            columns.append(key)

        fld_sql = ','.join(f'{fld} {typ}' for fld, typ in fld_types if fld != 'id')
        pk_sql = 'id BIGINT PRIMARY KEY'
        create_tab_sql = f'CREATE TABLE IF NOT EXISTS {table_name} ({pk_sql}, {fld_sql});'

        if conn is not None:
            # 1.创建表结构
            create_table(conn, create_tab_sql)

            if batch:
                # 方式1:批量插入,一次提交,效率高
                data_list = []
                for id, data in node_data.items():
                    feature_list = []
                    for key, value in data.items():
                        if 'geometry' == key:
                            geometry = shape(value)
                            feature_list.append(str(geometry.wkt))
                        else:
                            feature_list.append(str(value))
                    data_list.append(feature_list)

                batch_insert_data(conn, data_list, table_name, columns)
            else:
                # 方式2:一条一条插入,适合小数据,效率低下
                for id, data in node_data.items():
                    # 插入数据的SQL语句和数据
                    cur_values = []
                    for key, value in data.items():
                        if 'geometry' == key:
                            geometry = shape(value)
                            cur_values.append("'" + str(geometry.wkt) + "'")
                        else:
                            cur_values.append(str(value))
                    flds_str = ','.join(columns)
                    vals_str = ','.join(cur_values)
                    insert_sql = f"insert into {table_name} ({flds_str}) values ({vals_str})"
                    insert_data(conn, insert_sql, data)

        print(f"{table_name} sq3写入成功")

    except Exception as e:
        print(" 写入sq3异常: " + e)


def read_single_geojson_write_sq3(args):
    '''
    单文件读写sq3
    :param args:
    :return:
    '''
    file_name, target_path = args
    layer_name = file_name.replace(".geojson", "")
    db_file = target_path + '/' + layer_name + '.sq3'
    # 创建数据库连接
    conn = create_connection(db_file)
    # 读取数据,写数据库
    geojson_file = os.path.join(folder_path, file_name)
    node_data = rw_data_geojson.read_single_layer(geojson_file)
    if len(node_data[layer_name]) > 0:
        for layer_name, layer_value in node_data.items():
            dict_data_write_sqlite(layer_value, layer_name, conn, batch=1)
    else:
        # 删除空sq3
        os.remove(db_file)
    conn.close()
    return file_name


def merge_sq3(target_path):
    # 连接到目标数据库(要拷贝到的数据库)
    localtime = time.localtime()
    merge_folder = target_path + "/" + "merge_sq3_finish"
    if not os.path.exists(merge_folder):
        os.makedirs(merge_folder)
    target_db = merge_folder + "/" + str(time.strftime('%Y%m%d', localtime)) + ".sq3"
    if os.path.exists(target_db):
        os.remove(target_db)
        print(f"{target_db} 已被删除。")
    target_conn = create_connection(target_db)
    target_cursor = target_conn.cursor()
    # 连接到源数据库(要拷贝的数据库)
    for item in os.listdir(target_path):
        table_name = item.replace(".sq3", "")
        source_db_file = os.path.join(target_path, item)
        if os.path.isfile(source_db_file) and item != '.DS_Store':
            # 附加源数据库到目标数据库连接
            target_cursor.execute(f"ATTACH DATABASE '{source_db_file}' AS source_db;")
            # 将源sq3中的 table_name 表 复制到 目标.sq3
            target_cursor.execute(f"CREATE TABLE {table_name} AS SELECT * FROM source_db.{table_name}")
            # 分离附加的数据库
            target_cursor.execute("DETACH DATABASE source_db;")
            target_conn.commit()

    # 提交更改并关闭连接
    target_conn.close()

5.单线程读写代码

    folder_path = '/Users/admin/Desktop/123/sq3效率/geojson'
    target_path = "/Users/admin/Desktop/123/sq3效率/merge_sq3"

    # 1.单线程全量读写
    start_time = time.time()
    node_data = rw_data_geojson.read_all_layer(folder_path)

    # 创建数据库连接
    db_file = target_path + '/' + '20240928.sq3'
    if os.path.exists(db_file):
        os.remove(db_file)
        print(f"{db_file} 已被删除。")
    conn = create_connection(db_file)
    for layer_name, layer_value in node_data.items():
        if len(node_data[layer_name]) > 0:
            dict_data_write_sqlite(layer_value, layer_name, conn, batch=0)
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"写入sq3 函数执行时间:{execution_time} 秒")
    exit()

6.多线程读写,合并到一个sq3数据库

# 2.多文件多线程读写
    start_time = time.time()
    for root, dirs, files in os.walk(target_path):
        for file in files:
            db_file = os.path.join(root, file)
            os.remove(db_file)
            print(f"{db_file} 已被删除。")
    with multiprocessing.Pool(processes=5) as pool:
        for file_name in os.listdir(folder_path):
            if file_name == '.DS_Store':
                continue
            params = [(file_name, target_path)]
            pool.map(read_single_geojson_write_sq3, params)

    # 合并多个sq3文件
    merge_sq3(target_path)
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"写入sq3 函数执行时间:{execution_time} 秒")
    exit()

6.在上述基础上,再继续提效

        若单个geojson文件太大时,可多线程分批读取,将读取的块内容,写到一个分块的.sq3,再并发合并到单个图层的sq3,最后将多个图层合并到一个sq3中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2174725.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

甄选范文“论分布式存储系统架构设计”,软考高级论文,系统架构设计师论文

论文真题 分布式存储系统(Distributed Storage System)通常将数据分散存储在多台独立的设备上。传统的网络存储系统采用集中的存储服务器存放所有数据,存储服务器成为系统性能的瓶颈,也是可靠性和安全性的焦点,不能满足大规模存储应用的需要。分布式存储系统采用可扩展的…

车辆重识别(去噪扩散概率模型)论文阅读2024/9/27

[2] Denoising Diffusion Probabilistic Models 作者:Jonathan Ho Ajay Jain Pieter Abbeel 单位:加州大学伯克利分校 摘要: 我们提出了高质量的图像合成结果使用扩散概率模型,一类潜变量模型从非平衡热力学的考虑启发。我们的最…

linux驱动设备程序(内核层、应用层)

一、linux驱动程序 1、分类 字符设备&#xff08;驱动&#xff09;、块设备&#xff08;驱动&#xff09;、网络设备&#xff08;驱动&#xff09;。 2、核心 应用程序运行在用户空间&#xff08;3G&#xff09;&#xff1b;<系统调用>——><陷入>——>&…

正则表达式在过滤交换机lldp信息的应用举例

#include <iostream> #include <string> #include <regex> #include <vector> #include <unordered_map> #include <sstream> #include <unistd.h> // For usleep// 假设存在的 LOG_INFO 和 LOG_WARNING 函数 #define LOG_INFO(...)…

17.第二阶段x86游戏实战2-线程发包和明文包

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 本人写的内容纯属胡编乱造&#xff0c;全都是合成造假&#xff0c;仅仅只是为了娱乐&#xff0c;请不要…

基于docker-compose部署openvas

目录 0.部署openvas 1.编辑docker-compose文件 2.运行compose 3.访问openvas 4.openvas扫描 5.创建任务 6.点击Task Wizard ​编辑 7.输入通讯的IP地址 8.下载报告 9.下载完成 0.部署openvas 1.编辑docker-compose文件 vim docker-compose.yaml version: 3service…

《论文阅读》 用于产生移情反应的迭代联想记忆模型 ACL2024

《论文阅读》 用于产生移情反应的迭代联想记忆模型 ACL2024 前言简介任务定义模型架构Encoding Dialogue InformationCapturing Associated InformationPredicting Emotion and Generating Response损失函数问题前言 亲身阅读感受分享,细节画图解释,再也不用担心看不懂论文啦…

通信工程学习:什么是MAI多址干扰

MAI:多址干扰 MAI多址干扰(Multiple Access Interference)是无线通信领域,特别是在码分多址(CDMA)系统中,一个关键的干扰现象。以下是对MAI多址干扰的详细解释: 一、定义 多址干扰是指在CDMA系统中,由于多个用户的信号在时域和频域上是混叠的,从而导…

区块链可投会议CCF C--FC 2025 截止10.8 附录用率

Conference&#xff1a;Financial Cryptography and Data Security (FC) CCF level&#xff1a;CCF C Categories&#xff1a;network and information security Year&#xff1a;2025 Conference time&#xff1a;14–18 April 2025, Miyakojima, Japan 录用率&#xff1…

阿里云oss配置

阿里云oss配置 我们可以使用阿里云的对象存储服务来存储图片&#xff0c;首先我们要注册阿里云的账号登录后可以免费试用OSS服务。 之后我们打开控制台&#xff0c;选择对象存储服务&#xff0c;就看到我们下面的画面&#xff1a; 我们点击创建Bucket,之后就会出现如下图界面…

退出系统接口代码开发

退出系统不需要传入参数 请求过滤404的错误--请求次数监听这些都不需要更改 从controller层开始开发代码&#xff0c;因为每个接口都需要增加接口防刷拦截&#xff0c;不然会恶意攻击&#xff0c;所以在这里增加退出系统接口防刷拦截&#xff1b;并退出系统接口没有header和t…

图像分割(九)—— Mask Transfiner for High-Quality Instance Segmentation

Mask Transfiner for High-Quality Instance Segmentation Abstract1. Intrudouction3. Mask Transfiner3.1. Incoherent Regions3.2. Quadtree for Mask RefinementDetection of Incoherent Regions四叉树的定义与构建四叉树的细化四叉树的传播 3.3. Mask Transfiner Architec…

修改Kali Linux的镜像网站

由于官方的镜像可能会出现连接不上的问题导致无法安装我们所需要的包&#xff0c;所以需要切换镜像站为国内的&#xff0c;以下是一些国内常用的Kali Linux镜像网站&#xff0c;它们提供了与Kali Linux官方网站相同的软件包和资源&#xff0c;但访问速度更快&#xff1a; 清华…

Feign:服务挂了也不会走fallback

Feign 本质上是一个 HTTP 客户端&#xff0c;用于简化微服务之间的 HTTP 通信。它允许开发者通过定义接口和注解来声明式地编写 HTTP 客户端&#xff0c;而无需手动编写 HTTP 请求和响应处理的代码。 今天在模拟微服务A feign调用微服务B的时候&#xff0c;把微服务B关了&#…

通过WinCC在ARMxy边缘计算网关上实现智能运维

随着信息技术与工业生产的深度融合&#xff0c;智能化运维成为提升企业竞争力的关键因素之一。ARMxy系列的ARM嵌入式计算机BL340系列凭借其高性能、高灵活性和广泛的适用性&#xff0c;为实现工业现场的智能运维提供了坚实的硬件基础。 1. 概述 ARMxy BL340系列是专为工业应用…

python爬虫案例——抓取链家租房信息(8)

文章目录 1、任务目标2、分析网页3、编写代码1、任务目标 目标站点:链家租房版块(https://bj.lianjia.com/zufang/) 要求:抓取该链接下前5页所有的租房信息,包括:标题、详情信息、详情链接、价格 如: 2、分析网页 用浏览器打开链接,按F12或右键检查,进入开发者模式;因…

【病理图像】如何获取全切片病理图像的信息python版本

1. QuPath 拿到一张全切片病理图像时,我们可以用QuPath查看,如下图: 随着鼠标滚轮的滑动,我们可以看到更加具体的细胞状态,如下图: 当然,我们也可以在Image看到当前全切片图像的一些信息,如下图: 如果是10张以内的图像还好,我们可以一张一张打开查看,但是我们在…

基于VUE的在线手办交易平台购物网站前后端分离系统设计与实现

目录 1. 需求分析 2. 技术选型 3. 系统架构设计 4. 前端开发 5. 后端开发 6. 数据库设计 7. 测试 8. 部署上线 9. 运维监控 随着二次元文化的兴起&#xff0c;手办作为一种重要的周边产品&#xff0c;受到了广大动漫爱好者的喜爱。手办市场的需求日益增长&#xff0c;…

重拾CSS,前端样式精读-布局(表格,浮动,定位)

前言 本文收录于CSS系列文章中&#xff0c;欢迎阅读指正 CSS布局在Web开发中经历了多个阶段的演变&#xff0c;不同的时期出现了不同的布局方法&#xff0c;以适应不断变化的设计需求&#xff0c;从表格布局&#xff0c;浮动布局&#xff0c;到弹性盒&#xff0c;格栅布局&am…

【C++】—— priority_queue与仿函数

【C】—— priority_queue 与仿函数 1 priority_queue 介绍2 priority_queue 的使用2.1 priority_queue 的函数接口2.2 priority_queue 的使用 3 仿函数3.1 什么是仿函数3.2 仿函数的应用 4 需自己写仿函数的情况4.1 类类型不支持比较大小4.2 类中支持的比较方式不是我们想要的…