python链接池和pymysql批量入库——从0实现大规模异步爬虫框架项目4

news2024/11/15 22:53:21

我将这个链接池和批量入库封装了一个工具类上传了pypi,可以直接import使用

使用也较为简单,导入PooledDBhelper的DBhelper,调用DBhelper.PooledDBhelper()方法传入数据库链接信息创建一个链接池即可

pip install PooledDBhelper==1.0.0

 

 -------------------------------

正文

众所周知,不管是爬虫也好后端也好,Python开发最常用的ORM就是sqlAlchemy,他很完善很强大,但是为了更快更轻,不用学习新的语法,而且可以理解一些sql概念而不是直接使用工具。

我们先来维护一个链接池,然后做一个sqlhelper工具类,实现更简单好用的数据存储入库。

数据库:现在的数据库很多,关系型数据库 MySQL(MariaDB), PostgreSQL 等,NoSQL数据库,还有NewSqL数据库。但MySQL(Mariadb)从易获取性、易使用性、稳定性、社区活跃性方面都有较大优势,所以,我们在够用的情况下都选择MySQL。

数据库客户端模块:然后我们选择PyMySQL这个库,它可以和Python 3的异步模块aysncio结合起来,形成了aiomysql 模块,后面我们写异步爬虫时就可以对数据库进行异步操作了。链接池模块: 我们考虑到创建和释放数据库连接是一个很耗时的操作,所以通常创建一个连接池,需要就获取,用完则放回连接池。这个模块有主要有两个模块PooledDB和PersistentDB,我们选择PooledDB

一个简单的链接池案例:引入pymysql和PooledDB,实例化PooledDB在参数中传入数据库链接配置,creator参数选择pymysql,得到一个链接池,使用链接池的connection()方法获取一个链接,使用链接的cursor()方法获得游标,然后execute()执行sql,从cursor.fetchall()中获取结果即可。下面task函数就是简单的使用,然后如果需要我们可以开线程去跑。
import pymysql
from dbutils.pooled_db import PooledDB

test_POOL=PooledDB(
    creator=pymysql,
    maxconnections=10,
    mincached=2,
    blocking=True,
    host='127.0.0.1',
    # sshtunnel='',
    port=3306,
    user='root',
    password='root',
    database='jxc',
    charset='utf8'
)
def task(num):
    sql = "SELECT * FROM duty"
    conn=test_POOL.connection()
    cursor=conn.cursor(cursor=pymysql.cursors.DictCursor)
    cursor.execute(sql)
    data = cursor.fetchall()
    print(num, '-' * 8)
    for i in data:
        print(i)
    conn.close()

from threading import Thread
for i in range(32):
    t=Thread(target=task,args=(i,))
    t.start()

查询的方法就像上面这样,接下来我们写一个类

包含创建链接池和一些数据库操作,具体解释一下解释使用dbutils库 传入数据库链接信息创建一个链接池,我这用了一个{}接受参数也方便后面做扩展。

import pymysql
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB

class PooledDBhelper:
    def __init__(self, dbconfig: {}):
        '''
        :param dbconfig: {
            'host': '192.168.0.1',
            'user': 'username',
            'password': 'password',
            'port': 3306,
            'db': 'db_name'
        }
        '''
        self.pool = self.connectionPool(dbconfig)
    def connectionPool(self, dbconfig):
        try:
            pool = PooledDB(
                creator=pymysql,
                maxconnections=10,  # 连接池允许的最大连接数,0和None表示不限制连接数
                mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建

                # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                blocking=True,  
                host=dbconfig['host'],
                user=dbconfig['user'],
                passwd=dbconfig['password'],
                db=dbconfig['db'],
                cursorclass=DictCursor
            )
            return pool
        except Exception as e:
            raise Exception("数据库链接失败(create connect failed):{}".format(e))

写完这个类我们就可以通过  pool=PooledDBhelper(dbconfig)获得一个链接池了,这个池一般开局注册一个作为全局变量,而不是每次数据库操作新注册一个池子。


if __name__ == "__main__":
    pool=PooledDBhelper({
        'host': '192.168.0.1',
        'user': 'username',
        'password': 'password',
        'port': 3306,
        'db': 'db_name'
    })

然后接下来我们继续给这个类添加功能,常用的两个执行一个sql语句获取一条结果(或插入单条)和执行一个sql语句获取多条结果这个直接就写了,分别用了两种写法

    def task(self, sql, *args):
        '''
        fetchall
        :param sql:
        :param args:
        :return:
        '''
        conn = self.pool.connection()
        cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
        try:
            cursor.execute(sql, args)
            data = cursor.fetchall()
        except Exception as e:
            raise ("SQL execution failure", e)
        else:
            return data
        finally:
            cursor.close()
            conn.close()

    def fetchone(self, sql):
        with self.pool.connection() as connection:
            connection.autocommit = True
            with connection.cursor() as cursor:
                '''
                在创建连接的时候,增加参数 autocommit = 1 ,当发生update等操作时,会实时更新到数据库内。避免 conn.commit() 来提交到数据库
                如果没有设置自动提交,也没有手动提交,当进行插入或更新等操作时,只在本地客户端能看到更新,在其他客户端或数据库内,数据无变化。
                适合实时操作,随时少量、频繁的更新'''
                row=cursor.execute(sql)
                result = cursor.fetchone()
                connection.commit()
        return result

最后我们做最主要的一个功能,因为爬虫的数据库操作,大部分都是入库,我们做一个批量入库,

因为大部分时候我们爬的数据都是一个[{"k":"v"}] 这样的形式,所以我做了一个只需传入字典列表

自动获取字典的key作为字段,value作为内容的入库

    def insert_many(self, many_data, table_name):
        '''
        :param [{"k1":"v1","k2":"v2"},{"k1":"v3","k2":"v4"}]:
        :param table_name:
        :return: affected_rows
        '''
        values = [tuple(i.values()) for i in many_data]
        keys = list(many_data[-1].keys())
        sql_1 = "insert into `{}`(`{}`) values({})".format(table_name, '`,`'.join(many_data[-1].keys()),
                                                           ','.join([''.join('%s') for _ in keys]))
        try:
            with self.pool.connection() as conn:
                with conn.cursor() as cursor:
                    row_number = cursor.executemany(sql_1, values)
                    conn.commit()
            return "Successful affected_rows: {}".format(row_number)
        except Exception as e:
            conn.rollback()
            return "ERROR:{}".format(e)

好接下来做一些入库的操作,看看好不好用

if __name__ == "__main__":
    pool=PooledDBhelper({
        'host': '192.168.0.1',
        'user': 'username',
        'password': 'password',
        'port': 3306,
        'db': 'db_name'
    })

    data_list= [{"name":'a', 'info':'1'}, {"name":'b', 'info':'2'},{"name":'none', 'info':'3'}]
    rows=pool.insert_many(data_list,"cy_self_test")
    print(rows)

    result_list=pool.task("select * from cy_self_test")
    print(result_list)

    query = "insert into `cy_self_test`({}) values {}".format("`name`,`info`", ("cy","world"))
    pool.fetchone(query)

    query="select  * from cy_self_test where id=1"
    result=pool.fetchone(query)
    print(result)

 然后整个封装起来,我们就获得了一个数据库工具类,做数据库链接池,支持批量插入和查询,基于PooledDB代码封装,并且简化了开发人员的操作。

下一节讲怎么把一个代码包传到pypi

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/184945.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚析构函数的作用

类的析构函数是为了释放内存资源,析构函数不被调用的话就会造成内存泄漏。虚析构函数定义为虚析构函数是为了当用一个基类的指针删除一个派生类的对象时,派生类的析构函数会被调用。但并不是要把所有类的析构函数都写成虚函数。只有当一个类被用来作为基…

信息论复习—卷积码

目录 卷积码的基本概念: 卷积码与分组码的不同特点: 卷积码的构造与表示方法: 卷积码编码器的结构: 卷积码(3,1,3): 卷积码的卷积关系: 卷积码的生成矩阵: 卷积码的多项式: 系统码结构的卷积码: 卷积码的监督矩阵: 卷积…

Pipenv使用指南:轻量级虚拟环境管理工具详解

前言 终于能够挤出一点时间来总结最近学到的一些技术知识点了,博主这两周被居家隔离-集中隔离-居家隔离来回折腾,现在终于是得到解放能够空出的时间来写写博客了,但是项目又催的紧,写博文的时间还是有限,这周我会尽量…

正则语言的性质

正则语言的性质 一、正则语言的性质 1.正则语言的泵引理 设LLL是正则语言,则存在与LLL相关的常数nnn满足:对于任何LLL中的串www,如果∣w∣≥n|w|\geq n∣w∣≥n,则我们就能把www打断为三个串wxyzwxyzwxyz使得: y̸ϵ…

2011年专业408

文章目录0 结果1 题目2 思路2.1 思路1(暴力解:排序)2.2 思路2(较优解:归并合并数组)2.3 思路3(较优解:数组指针后移)2.4 思路4(最优解:两个数组的…

ARM BTI指令介绍

目录 一、JOP 二、BTI 三、启用BTI 四、BTI是怎么实现的 一、JOP JOP(Jump-oriented programming)类似于ROP(Return-Oriented Programming)。在 ROP 攻击中,会扫描出useful gadgets(易被攻击的一段代码…

【数据结构】顺序队列的原理及实现

1.什么是队列 队列是一种比较特殊的线性表,特殊就在于它只允许在表的前端来进行删除,在表的后端来进行插入,队列它是一种操作受限制的线性表。插入的一端称为队尾,删除的一端称为队头,队列里没有元素就称它为空队列。…

快速使用代码编辑工具vim+ctags+cscope快捷使用itop3568开发板

当以上配置全部完成后,每当拿到新的工程代码,进入相关代码目录,利用 vim ./命令打 开当前目录, 然后 crtl 生成插件所需文件,最后使用命令:qa!退出 vim。此步只需进行一 次。 在此目录中打开任意代码文件或任意子目录…

jdk8新特性-日期时间

1、介绍 日期时间类在Java开发中是必不可少的,前后端如何传递时间参数、数据库的时间在Java中对应什么类型、Java中时间转换的各种方式有哪些? 2、Date类 Date类是JDK1.0时期推出来的第一代时间类,位于java.util包下,是最常用的…

unctf easy_serialize反序列化字符逃逸

&#xfeff;ctf题目平台&#xff1a;UNCTF - HACKING 4 FUN。web题难度适中 easy_serialize 题目源码&#xff1a; <?php include "function.php"; $action $_POST[action]; $name $_POST[name]; $pass $_POST[pass]; $email $_POST[email]; ​ function …

南京小米java面经(一面)

目录1.java支持多继承吗2.线程的生命周期3.线程和进程的区别4.单例模式有几种5.写一下双重锁的单例6.jvm有哪些区域7.jvm哪些区域是线程共享的&#xff0c;哪些是线程私有的8.gc中判断对象可回收的方式有哪些9.gc垃圾回收算法有哪些10.哪些对象可以作为gc root11.gc中的引用计数…

软件设计师教程(四)程序设计语言基础知识

软件设计师教程 软件设计师教程&#xff08;一&#xff09;计算机系统知识-计算机系统基础知识 软件设计师教程&#xff08;二&#xff09;计算机系统知识-计算机体系结构 软件设计师教程&#xff08;三&#xff09;计算机系统知识-计算机体系结构 程序设计语言知识软件设计师…

工资管理系统

一、系统简介 工资管理涉及企业管理的多个方面&#xff0c;如员工基本信息、员工在职离岗、员工考勤、员工加班等等。根据这些信息&#xff0c;在每个月的固定时间&#xff0c;生成全体员工的月工资&#xff0c;部门月工资以及全厂月工资。对于月工资&#xff0c;能够实现按照员…

机器学习【西瓜书/南瓜书】--- 第五章 神经网络

1.神经元模型 1.1 M-P神经元模型 输出函数&#xff1a; 其中θ为阈值&#xff0c; ω i为第i个神经元的连接权重&#xff0c; xi为来自第i个神经元的输入。 1.2 激活函数 阶跃函数&#xff1a; 理论上我们使用阶跃函数。将输入值映射为输出值为0/1&#xff0c;显然1为神经元…

Windows下编译安装OpenCASCADE

OpenCASCADE (以下简称OCC)是一套开源的几何建模系统&#xff0c;提供了曲面、实体等建模方式&#xff0c;已经广泛应用在CAD、CAE、CAM等软件开发。 OpenCASCADE官网已经提供OpenCASCADE的编译安装方法&#xff0c;本文结合实操过程&#xff0c;简述其过程。 零、环境 操作系…

Databend v0.9.0 版本发布

各位社区小伙伴们&#xff0c;历经数月开发&#xff0c;Databend 于 2023 年 1 月 13 日迎来了 v0.9.0 版本的正式发布&#xff01; 这次新版本是 Databend 迈向 1.0 版本的最后一个大版本&#xff0c;也是迄今为止我们对核心代码重构幅度最大的一个版本&#xff01;相较于 v0…

C++ string容器,cha*

目录 1.string基本概念 2.string构造函数,char* 3.string赋值操作 4.string字符串拼接,append 5.string查找和替换,find,replace 6.string字符串比较,compare 7.string字符存取[].at&#xff0c;取&#xff0c;修改单个字符&#xff0c;size返回字符串长度 8.string插入和…

数学建模学习笔记(13)分类模型

分类模型分类问题的基本概念逻辑斯蒂回归&#xff08;Logistic&#xff09;Fisher线性判别分析多分类问题的SPSS求解分类问题的基本概念 分类问题概述&#xff1a;对于给定的一个对象&#xff0c;根据其特征将其划分到多个已给定的类别中的一个。 二分类和多分类&#xff1a;…

题目:两数之和

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录描述描述 方式1:暴力遍历 public static int[] twoSum(int[] nums, int target) {int i 0;int j 1;int[] result new int[2];for(int m i;m<nums.length-1;m){…

13.3nm粒径水溶性Cds/CdTe-PAA-P4VP/BSA的制备方法

13.3nm粒径水溶性Cds/CdTe-PAA-P4VP/BSA的制备方法今天小编分享量子点偶联聚合物&#xff0c;一起看看吧&#xff1a;Cds/CdTe-PAA-P4VP/BSA的制备过程&#xff1a;称取86.5mgCdCl2于三口瓶中&#xff0c;加入295mL二次水&#xff08;考虑到在调pH值时会增加溶液的体积&#xf…