封装Python脚本:使用pymysql+sshtunnel,支持通过SSH隧道方式链接mysql数据库

news2024/12/28 3:03:10

一、前言

通常为了保证数据库安全,不会允许直接连接数据库,而是需要通过SSH隧道去连接服务器背后的数据库;

通过Navicat操作如下:
在这里插入图片描述
在这里插入图片描述

二、python封装脚本

# -*- coding: utf-8 -*-
# @Time    : 2023/5/12 11:04
# @Author  : chenyinhua
# @File    : mysql_handle.py
# @Software: PyCharm
# @Desc: 使用pymysql模块连接mysql数据库的公共方法
import pymysql
from typing import Union
import json
from datetime import datetime
from sshtunnel import SSHTunnelForwarder
from loguru import logger


class MysqlServer:
    """
    初始化数据库连接(支持通过SSH隧道的方式连接),并指定查询的结果集以字典形式返回
    """

    def __init__(self, db_host, db_port, db_user, db_pwd, db_database, ssh=False,
                 **kwargs):
        """
        初始化方法中, 连接mysql数据库, 根据ssh参数决定是否走SSH隧道方式连接mysql数据库
        """
        self.server = None
        if ssh:
            self.server = SSHTunnelForwarder(
                ssh_address_or_host=(kwargs.get("ssh_host"), kwargs.get("ssh_port")),  # ssh 目标服务器 ip 和 port
                ssh_username=kwargs.get("ssh_user"),  # ssh 目标服务器用户名
                ssh_password=kwargs.get("ssh_pwd"),  # ssh 目标服务器用户密码
                remote_bind_address=(db_host, db_port),  # mysql 服务ip 和 part
                local_bind_address=('127.0.0.1', 5143),  # ssh 目标服务器的用于连接 mysql 或 redis 的端口,该 ip 必须为 127.0.0.1
            )
            self.server.start()
            db_host = self.server.local_bind_host  # server.local_bind_host 是 参数 local_bind_address 的 ip
            db_port = self.server.local_bind_port  # server.local_bind_port 是 参数 local_bind_address 的 port
        # 建立连接
        self.conn = pymysql.connect(host=db_host,
                                    port=db_port,
                                    user=db_user,
                                    password=db_pwd,
                                    database=db_database,
                                    charset="utf8",
                                    cursorclass=pymysql.cursors.DictCursor  # 加上pymysql.cursors.DictCursor这个返回的就是字典
                                    )
        # 创建一个游标对象
        self.cursor = self.conn.cursor()

    def query_all(self, sql):
        """
        查询所有符合sql条件的数据
        :param sql: 执行的sql
        :return: 查询结果
        """
        try:
            self.conn.commit()
            self.cursor.execute(sql)
            data = self.cursor.fetchall()
            # 关闭数据库链接和隧道
            self.close()
            return self.verify(data)
        except Exception as e:
            logger.error(f"查询所有符合sql条件的数据报错: {e}")
            raise e

    def query_one(self, sql):
        """
        查询符合sql条件的数据的第一条数据
        :param sql: 执行的sql
        :return: 返回查询结果的第一条数据
        """
        try:
            self.conn.commit()
            self.cursor.execute(sql)
            data = self.cursor.fetchone()
            # 关闭数据库链接和隧道
            self.close()
            return self.verify(data)
        except Exception as e:
            logger.error(f"查询符合sql条件的数据的第一条数据报错: {e}")
            raise e

    def insert(self, sql):
        """
        插入数据
        :param sql: 执行的sql
        """
        try:
            self.cursor.execute(sql)
            # 提交  只要数据库更新就要commit
            self.conn.commit()
            # 关闭数据库链接和隧道
            self.close()
        except Exception as e:
            logger.error(f"插入数据报错: {e}")
            raise e

    def update(self, sql):
        """
        更新数据
        :param sql: 执行的sql
        """
        try:
            self.cursor.execute(sql)
            # 提交 只要数据库更新就要commit
            self.conn.commit()
            # 关闭数据库链接和隧道
            self.close()
        except Exception as e:
            logger.error(f"更新数据报错: {e}")
            raise e

    def query(self, sql, one=True):
        """
        根据传值决定查询一条数据还是所有
        :param sql: 查询的SQL语句
        :param one: 默认True. True查一条数据,否则查所有
        :return:
        """
        try:
            if one:
                return self.query_one(sql)
            else:
                return self.query_all(sql)
        except Exception as e:
            logger.error(f"查询数据报错: {e}")
            raise e

    def close(self):
        """
        断开游标,关闭数据库
        如果开启了SSH隧道,也关闭
        :return:
        """
        # 关闭游标
        self.cursor.close()
        # 关闭数据库链接
        self.conn.close()
        # 如果开启了SSH隧道,则关闭
        if self.server:
            self.server.close()

    def verify(self, result: dict) -> Union[dict, None]:
        """验证结果能否被json.dumps序列化"""
        # 尝试变成字符串,解决datetime 无法被json 序列化问题
        try:
            json.dumps(result)
        except TypeError:  # TypeError: Object of type datetime is not JSON serializable
            for k, v in result.items():
                if isinstance(v, datetime):
                    result[k] = str(v)
        return result

if __name__ == '__main__':
	# 以下调试链接方式,请使用自己的测试数据库配置进行测试。
	# ----------需要走SSH隧道的数据库---------------
    # db_host = "x.x.x.x"
    # db_port = 3306
    # db_user = "root"
    # db_pwd = "*********"
    # db_database = "*******"
    # ssh = True
    ssh_info = {
        "ssh_host": "x.x.x.x",
        "ssh_port": x,
        "ssh_user": "root",
        "ssh_pwd": "*********"
    }
    # ----------不需要走SSH隧道的数据库---------------
    db_host = "127.0.0.1"
    db_port = 3306
    db_user = "root"
    db_pwd = "123456"
    db_database = "mytools"
    ssh = False
    
    db = MysqlServer(db_host, db_port, db_user, db_pwd, db_database, ssh,
                     **ssh_info)
    print(db.query_one(sql="select * from users limit 1;"))

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/518062.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Activity内存泄漏时包含的view还有没有的救?

Activity泄漏会导致该Activity引用到的Bitmap、DrawingCache等无法释放,对内存造成大的压力,兜底回收是指对于已泄漏Activity,尝试回收其持有的资源,泄漏的仅仅是一个Activity空壳,从而降低对内存的压力。 做法也非常简…

ssh终端工具推荐-WindTerm

什么是WindTerm 官方github https://github.com/kingToolbox/WindTerm A Quicker and better SSH/Telnet/Serial/Shell/Sftp client for DevOps. 按官方说明,WindTerm是一个更快更好的SSH/Telnet/Serial/Shell/Sftp的DevOps工具。 WindTerm目前对商业是免费无限制…

QML APP开发套路(二):前/后端交互概述

(1)QML开发简介 Qt应用框架在传统UI(QWidget窗体)的基础上,提供了Qt Quick模块,该模块基于 QML 语言来定义UI及交互方式。区别于 QWidget 定义UI的方式,QML利于将UI交互与业务逻辑处理剥离成前…

什么是智慧校园?

什么是智慧校园? 智慧校园平台是目前教育信息化领域的热点之一。 随着数字化转型的加速,越来越多的学校开始寻求解决方案,以提高教育管理的效率和质量。 在使用智慧校园平台的过程中,一些痛点问题也浮现出来。为解决这些问题&a…

基于AT89C51单片机的贪吃蛇游戏设计

点击链接获取Keil源码与Project Backups仿真图: https://download.csdn.net/download/qq_64505944/87778030 源码获取 主要内容: 设计一个贪吃蛇游戏,使其具有以下游戏规则:①当没有改变方向时,贪吃蛇沿原来路径一直前进②贪吃蛇无法回头,只能异于当前方向改变行动③蛇…

网页更新提醒是什么?如何自动监控网页并自动记录或发送通知?

网页更新提醒是什么? 在互联网信息资源丰富,且更新速度快的情况下,如果需要监控一些网页变化,实现例如热点/热搜/热评监测、商品上新/价格/库存监测、作品上新/评论/点赞监测、招标/投标/拍卖/竞价监测、公告/通知/活动监测等情况…

南京大学主办 | EIScopus检索 | 2023年人工智能与统计学前沿国际会议

2023年人工智能与统计学前沿国际会议 会议简介 Brief Introduction 2023年人工智能与统计学前沿国际会议(CFAIS 2023) 会议时间:2023年8月18日-20日 召开地点:中国南京 大会官网:www.cfais.org 2023年人工智能与统计学国际会议(CFAIS 2023)将…

测评,补单是什么神仙利器?能提高速卖通,国际站,newegg店铺的销量

测评,补单相信这个词对于大部分卖家来说,想必都不陌生,因为都知道它能够快速帮助自己的产品添加评论,获取排名,打造爆款。于是许多卖家潜意识里认为“测评其实就是刷评”。 简单点来说,测评就是卖家通过【评…

485测试

注意如果十六进制发送数据 数据打印时 如果是%c 打印出来的数据会十进制显示 解决方案 1.改变打印方式 %x 打印 2.因为什么也不勾选 ,默认asii显示 利用串口助手发ltz ,打印也是ltz(发什么就显示什么) 如下图所示

如何使用appuploader制作描述文件​

如何使用appuploader制作描述文件​ 承接上文我们讲述了怎么制作证书,本文我们来看下怎么制作描述文件吧。​制作描述文件前我们首先我们来添加一个测试设备,后面再制作描述文件。 1.添加测试设备​ 其中添加设备一项中,根据提示操作添加…

OpenGL ES特效分析 --- 跳动的心

很早之前就见过一个博主发的shader图片,一个跳动的心https://blog.csdn.net/Kennethdroid/article/details/104536532, 感觉太好玩了,于是想要分析一下原理,上面的博主也已经做了初步分析,但是对于我这个特效小白来说还…

VMware Aria Operations 8.12 - 自动驾驶式 IT 运维管理

VMware Aria Operations 8.12 - 自动驾驶式 IT 运维管理 请访问原文链接:https://sysin.org/blog/vmware-aria-operations/,查看最新版。原创作品,转载请保留出处。 作者主页:sysin.org 自动驾驶式 IT 运维管理 VMware Aria Op…

YOLOv5白皮书-第Y3周:yolov5s.yaml文件解读

🍨 本文为🔗365天深度学习训练营 中的学习记录博客 🍖 原作者:K同学啊|接辅导、项目定制 ● 难度:夯实基础⭐⭐ ● 语言:Python3、Pytorch3 ● 时间:5月8日-5月12日 🍺要求&#xff…

电脑永久删除文件怎么找回来?分享一种数据恢复方法

电脑对于日常生活和工作都起着重要作用,但是在我们日常办公中,有时会误删除一些文档,甚至永久删除,当我们想找回的时候却手足无措,不知道该怎么办。同时在很多用户的观念里,当我们无法从回收站恢复时&#…

Openai+Coursera: ChatGPT Prompt Engineering(一)

想和大家分享一下最近学习的Coursera和openai联合打造ChatGPT Prompt Engineering在线课程,下面是通过API来访问ChatGPT的主要代码: import openaiopenai.api_key XXXXXXXXXdef get_completion(prompt, model"gpt-3.5-turbo"):messages [{&…

【OpenCV】学习课-图像裁剪与拼接(2)!

1. 图像的裁剪 cv2.selectROI() ###可以通过鼠标选择感兴趣的矩形区域(ROI) import cv2img cv2.imread("xxx.png", flags1) # flags1 读取彩色图像(BGR)roi cv2.selectROI(img, showCrosshairTrue, fromCenterFalse) xmin, ymin, w, h…

2022年平均工资出炉,IT行业又是第一

根据5月9日国家统计局最新资料显示,2022年,全国城镇非私营单位就业人员年平均工资为114029元,比上年增长6.7%,扣除通胀后实际增长4.6%。其中,行业间的差距相当明显。根据资料显示,2022年无论是在私营单位还…

ESP8266(1):搭建Linux环境ESP8266_RTOS_SDK,ESP8266使用GPIO控制继电器

0)准备工作,之前一直对esp8266不了解,现在想着给鱼缸加个定时打氧的程序,控制泵的工作。所以购买了一个esp8266 Relay,自己摸索写个简单程序。让泵工作一段时间,再休眠一段时间。 1)宿主机Ubuntu 20.04.5 …

Vue 项目利用 HBuilderX 打包 APP 流程

想要将 Vue 打包成 App,要借助一些插件工具,例如:Electron、Cordova 等,或者直接利用开发工具,例如:Android Studio、HBuilderX 等。本文的目的是带大家通过 HBuilder 开发工具对 Vue 项目进行打包。 一、…

MySQL-函数

1.什么是函数 函数在计算机语言的使用中贯穿始终,函数的作用是什么呢?它可以把我们经常使用的代码封装起来, 需要的时候直接调用即可。这样既 提高了代码效率 ,又 提高了可维护性 。在 SQL 中我们也可以使用函数 对检索出来的数据…