数据同步大冒险:PostgreSQL到MySQL的奇妙之旅

news2025/1/13 7:34:52

引言:一场跨数据库的浪漫邂逅 💑

在数据的世界里,不同数据库系统就像是来自不同星球的恋人,它们各自拥有独特的魅力,但偶尔也会渴望一场跨越界限的亲密接触。今天,我们就来见证一场PostgreSQL与MySQL之间的浪漫邂逅——定时获取PostgreSQL中的数据,并将其温柔地同步至MySQL的怀抱中。这不仅是一场技术的挑战,更是一次数据流转的艺术展现!

场景设定:数据的星际旅行 🚀

想象一下,你是一位数据守护者,负责管理着两个星球的数据库:PostgreSQL的“科技星球”和MySQL的“人文星球”。每天,你都需要从“科技星球”收集最新的科研成果(数据),然后运送到“人文星球”上,让那里的居民也能享受到科技进步的果实。

思路分析:星际导航图 🗺️

要完成这场星际旅行,我们需要精心规划航线:

启航准备:确保两艘飞船(数据库连接)都已就绪,且飞船上的储物舱(数据表)结构相似,便于数据转移。
坐标定位:首先,从PostgreSQL中读取最新的数据ID,这是我们的“出发坐标”。然后,在MySQL中查询最新的数据ID,作为我们的“目标坐标”。
航线规划:通过比较两个坐标的差距,我们可以确定哪些数据是新增的,需要被“运送”到MySQL。这就像是在浩瀚的数据海洋中,绘制出一条最优的航线。
数据转移:根据规划好的航线,将选定的数据从PostgreSQL中提取出来,并安全地“降落”在MySQL的相应位置。
实战代码:编写星际导航程序 💻
虽然具体的代码实现需要你们自己来完成(因为你们才是这场冒险的主角!),但我可以给你们一个大致的框架,就像是一个星际导航程序的伪代码:

# 假设这是你的星际导航程序  
  
def connect_to_postgresql():  
    # 连接PostgreSQL数据库,获取连接对象  
    # ...  
    return pg_connection  
  
def connect_to_mysql():  
    # 连接MySQL数据库,获取连接对象  
    # ...  
    return mysql_connection  
  
def fetch_latest_ids(connection, table_name):  
    # 从指定数据库中获取最新数据ID  
    # 使用SQL查询,如 SELECT MAX(id) FROM table_name  
    # ...  
    return latest_id  
  
def sync_data(pg_connection, mysql_connection, source_table, target_table):  
    # 1. 获取两个数据库的最新ID  
    pg_latest_id = fetch_latest_ids(pg_connection, source_table)  
    mysql_latest_id = fetch_latest_ids(mysql_connection, target_table)  
      
    # 2. 确定需要同步的数据范围  
    if pg_latest_id > mysql_latest_id:  
        # 3. 编写SQL查询,选择ID在mysql_latest_id到pg_latest_id之间的数据  
        # 4. 执行查询,获取数据  
        # 5. 编写SQL语句,将获取的数据插入MySQL  
        # ...  
  
# 定时执行数据同步任务  
# 可以使用APScheduler等库来实现定时任务

结尾:星际旅行的意义 🌟
通过这场PostgreSQL到MySQL的数据同步冒险,我们不仅实现了数据的跨库流动,更深刻体会到了数据在不同系统间共享的重要性。正如星际旅行不仅仅是为了探索未知,更是为了促进不同文明之间的交流与融合。希望这次经历能激发你们对数据世界更多奇妙的想象和探索!

实际源码

"""
功能:
监测数据表是否更新
连接postgresql
连接mysql
比较-同步
"""
import sys
import pymysql
import psycopg2
import pandas as pd
import numpy as np

dbname_mysql = 'followup'
table_name_list_mysql = ['s01_issue_table',
                         'np_kickoff','np_rfq',
                         's02_np_kickoff','s02_np_rfq',
                         's06_kick_off_list','s06rfqlist']

dbname_postgresql = 'lcmbigdata'
table_name_list_postgresql = ['s01_issue_table',
                         'np_kickoff','np_rfq',
                         's02_np_kickoff','s02_np_rfq',
                         's06_kick_off_list','s06rfqlist']

col_address_lists = [['create_time', 'update_time'],
                     ['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date'],
                     ['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date'],
                     ['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date']]

modelname_postgresql = 'digitalelf'

# 数据库连接参数
conn_params_mysql = {
    "database": dbname_mysql,
    "user": "root",
    "password": "root",
    "host": "localhost",
    "port": 3306  # 端口号应该是整数
}
conn_params_postgresql = {
    "database": dbname_postgresql,
    "user": "postgres",  # 数据库用户
    "password": "root",  # 数据库密码
    "host": "localhost",  # 数据库服务器地址
    "port": 6666  # 数据库端口
}

issue_columns = ['id' ,'model_no' ,'part_no' ,'issue_type', 'issue_from', 'start_date',
 'end_date' ,'status', 'priority', 'fab', 'issue_description', 'root_cause',
 'customer' ,'customer_operation' ,'phase', 'analysis', 'solution', 'progress',
 'lesson_learnt' ,'create_by' ,'create_time', 'update_by', 'update_time',
 'sys_org_code' ,'site', 'issue_owner', 'is_nudd' ,'attachment' ,'issue_dept',
 'issue_update' ,'func', 'material_structure']
kickoff_columns = ['id', 'week', 'jiazhi', 'jishu', 'modelno', 'pn', 'customer', 'technology', 'cellsite', 'kickoffdate', 'mpdate', 'dvtdate', 'fcst', 'design_processremark', 'yingyong', 'kickoff_gopremium', 'jingzhengduishou', 'kehu_shiyong_fangshi', 'odm', 'others_feiyong', 'renli_target', 'fy_target', 'mpiowner', 'mpi_bumen', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code']
rfq_columns = ['id', 'model_no', 'customer', 'evaluation_date', 'rfq_result', 'fail_cause', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code']
s02_kickoff_columns = ['id', 'week', 'customer', 'technology', 'odm', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'renli_target', 'fy_target', 'jiazhi', 'jishu', 'pn', 'cellsite', 'kickoffdate', 'mpdate', 'dvtdate', 'fcst', 'design_processremark', 'yingyong', 'kickoff_gopremium', 'jingzhengduishou', 'kehu_shiyong_fangshi', 'others_feiyong', 'mpiowner', 'mpi_bumen', 'modelno']
s02_rfq_columns = ['id', 'customer', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'model_no', 'evaluation_date', 'rfq_result', 'fail_cause']
s06_kickoff_columns = ['id', 'modelno', 'technology', 'cellsite', 'fcst', 'jingzhengduishou', 'odm', 'mpiowner', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'week', 'jiazhi', 'jishu', 'kickoff_gopremium', 'pn', 'customer', 'kickoffdate', 'mpdate', 'dvtdate', 'yingyong', 'kehu_shiyong_fangshi', 'others_feiyong', 'renli_target', 'fy_target', 'design_processremark', 'mpi_bumen']
s06_rfq_columns = ['id', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'model_no', 'customer', 'evaluation_date', 'rfq_result', 'fail_cause']

columns_names = [issue_columns,
                 kickoff_columns,rfq_columns,
                 s02_kickoff_columns,s02_rfq_columns,
                 s06_kickoff_columns,s06_rfq_columns]

sql_host = 'localhost'
port = 3306
user = 'root'
password = 'root'
sql_db_name = 'followup'
# sql_table_name = 'np_issue'


def replace_nan_with_none(value):
    """将 numpy.nan 替换为 None,其他值保持不变。"""
    return None if np.isnan(value) else value

# #数据库SQL上传函数
def sql_connect():
    try:
        conn = pymysql.connect(
            host = sql_host,
            port = port,
            user = user,
            password = password,
            db = sql_db_name,
            charset='utf8')
        return conn
    except Exception as e:
        logging.error('SQL CONNECT' + str(e))

def sql_upload(raw_data, table_name):
    """
    将DataFrame raw_data写入MySQL数据库指定的table_name表中。
    """
    # 连接数据库
    conn = sql_connect()
    cursor = conn.cursor()

    # 确保表存在,如果不存在则创建表
    # 注意:这里简化了表的创建过程,实际应用中可能需要根据raw_data的列名和数据类型创建合适的表结构
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        -- 假设所有列都是VARCHAR类型,实际情况应根据raw_data的列调整
        flag VARCHAR(255),
        period VARCHAR(255),
        dept VARCHAR(255),
        -- 其他列...
        bd VARCHAR(255)
    );
    """
    cursor.execute(create_table_sql)

    # 构建插入语句
    insert_sql = f"""
    INSERT INTO {table_name} ({', '.join(raw_data.columns)})
    VALUES ({', '.join(['%s'] * len(raw_data.columns))});
    """

    # 执行插入语句
    for index, row in raw_data.iterrows():
        try:
            # 处理列表中的每个元素
            # print("index:\n",index)
            row_list = list(row)
            for i in range(len(row_list)):
                if row_list[i] is np.nan:
                    row_list[i] = replace_nan_with_none(row_list[i])
            # if np.nan in row:
            #     cleaned_row = [replace_nan_with_none(x) for x in list(row)]
            cursor.execute(insert_sql, row_list)
            conn.commit()
        except Exception as e:
            print("上传失败:", e)
            print(insert_sql,list(row))
            conn.rollback()

    cursor.close()
    conn.close()
    print("数据成功上传至数据库")

def data_update_PostgreSQL_MySQL(data_type_tips):
    for i in range(0,len(table_name_list_mysql)):
        # i=1
        print("🦁"*30)
        print(f"第{data_type_tips}次更新{table_name_list_mysql[i]}")
        print("🐎" * 30)
        table_name_mysql = table_name_list_mysql[i]
        table_name_postgresql = table_name_list_postgresql[i]
        col_address_list = col_address_lists[i]
        data_columns = columns_names[i]
        id_list_mysql = []
        id_list_postgresql = []
        id_difference = []
        try:

            # 使用连接参数建立PostgreSQL连接
            try:
                conn_postgresql = psycopg2.connect(**conn_params_postgresql)
                # print("成功连接到PostgreSQL数据库")

                # 创建一个cursor对象来执行SQL命令
                cur_postgresql = conn_postgresql.cursor()

                # 执行SQL查询(例如:选择所有记录)
                # cur_postgresql.execute(f"SELECT * FROM {dbname}.{your_table_name};")
                cur_postgresql.execute(f"SELECT id FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql};")

                # 获取所有查询结果
                rows = cur_postgresql.fetchall()

                for row in rows:
                    id_list_postgresql.append(row[0])
                    # print(row)

            except (Exception, psycopg2.DatabaseError) as error:
                print(f"PostgreSQL数据库错误:{error}")

            # 使用连接参数建立MySQL连接
            try:
                conn_mysql = pymysql.connect(**conn_params_mysql)
                # print("成功连接到MySQL数据库")

                # 创建一个cursor对象来执行SQL命令
                cur_mysql = conn_mysql .cursor()

                # 执行SQL查询(例如:选择所有记录)
                # 修复 SQL 查询字符串中的表名引用
                #cur.execute(f"SELECT * FROM {your_table_name};")
                cur_mysql.execute(f"SELECT id FROM {table_name_mysql};")

                # 获取所有查询结果
                rows = cur_mysql.fetchall()

                for row in rows:
                    id_list_mysql.append(row[0])
                    # print(row)

                for id in id_list_postgresql:
                    if id not in id_list_mysql:
                        # print(id)
                        id_difference.append(id)
                print(len(id_difference))
                # print(id_difference)
                # sys.exit()
                print("🐒"*20)
                if len(id_difference):
                    if len(id_difference) == 1:
                        cur_postgresql.execute(
                            f"SELECT * FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql} where id = '{id_difference[0]}';")
                    else:
                        id_difference = tuple(id_difference)
                        cur_postgresql.execute(f"SELECT * FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql} where id in {id_difference};")

                    """新入新的数据库"""
                    # 获取所有查询结果
                    rows_new = cur_postgresql.fetchall()
                    # print(len(rows_new),len(data_columns))
                    print("🐅" * 20)
                    rows_pd = pd.DataFrame(rows_new, columns=data_columns)
                    print("🐍"*20)
                    # print(len(rows_pd.columns),len(rows_new),len(data_columns))
                    print("🐉" * 20)
                    # 处理 DataFrame 中的日期时间列
                    for col in data_columns:
                        data_type = rows_pd[col].dtype
                        if 'datetime64[ns, UTC+08:00]' in str(data_type):
                            # 移除时区信息
                            rows_pd[col] = rows_pd[col].dt.tz_localize(None)
                            # 将日期时间转换为字符串格式
                            rows_pd[col] = rows_pd[col].dt.strftime('%Y-%m-%d %H:%M:%S')
                    sql_upload(rows_pd, table_name_mysql)
                    print(f"已同步{len(id_difference)}条记录至{table_name_mysql}")
                    # print('★' * 10, '\n', 'successs')
                else:
                    print(f"已同步{len(id_difference)}条记录至{table_name_mysql}")

            except (Exception, pymysql.DatabaseError) as error:
                print(f"MySQL数据库错误:{error}")

            finally:
                # 关闭cursor和连接
                if cur_postgresql:
                    cur_postgresql.close()
                if conn_postgresql:
                    conn_postgresql.close()
                    # print("PostgreSQL数据库连接已关闭")

                # 关闭cursor和连接
                if cur_mysql:
                    cur_mysql.close()
                if conn_mysql :
                    conn_mysql .close()
                    # print("MySQL数据库连接已关闭")
        except Exception as e:
            print(f"数据库同步ERROR {e}")

def data_update():
    for i in range(1,3):
        # print(f"第{i}次更新数据库")
        data_update_PostgreSQL_MySQL(i)

data_update()

运行结果

在这里插入图片描述

增加定时功能

import schedule
import time

def data_update():
    print("Updating data...")

# 每15分钟执行一次
schedule.every(15).minutes.do(data_update)

while True:
    schedule.run_pending()
    time.sleep(1)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2087403.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

初赛试题-2022年CSP-J3

先言 本次试卷 完善程序 三、完善程序(每题3分,共30分) (1) 【答案】 A B C D A (2) 【答案】 A B C D A

探索用于小占用关键词检测的TinyML框架:一个简明概述

目录 摘要 第一部分:引言 第二部分:部署TinyML的常见挑战 第三部分:SF-KWS的不同方法 A. 网络架构(Network Architecture) B. 学习技术(Learning Techniques) C. 模型压缩(Mo…

最新黑名单查询录入系统PHP网站源码

源码介绍: 最新黑名单查询录入系统PHP网站源码 前端html 后端layui 操作部分都采用API接口的方式实线 集结了layui表格的多数据操作,添加,批量删除,分页,单项删除 后台数据修改采用绑定参数的形式来进行修改可以很…

C语言函数递归(含扫雷进阶思路)

文章目录 一、什么是递归二、递归的使用思路和限制条件1.递归的使用思路2.递归的限制条件 三、递归的举例举例1:求n的阶乘2.举例2:顺序打印⼀个整数的每⼀位 四、递归与迭代对比五、递归与迭代对比举例七、扫雷进阶思路 一、什么是递归 递归是学习C语⾔函…

暄桐教室分享“闲人”指南

一种理想的生活状态,叫“做个闲人”,如苏东坡《行香子述怀》那般,“对一张琴,一壶酒,一溪云”,放下纷扰,好自在。然而,闲并不是简单的无事可做,让自己时光充沛、能量聚集…

MacOS使用FileZilla通过ssh密钥文件连接远程服务器(已解决)

需求描述 mac电脑,使用filezilla通过FTP连接远程服务器,使用ssh密钥文件代替密码。 版本信息 MacOS:Sonoma 14.5 M3芯片 FileZilla:3.66.5 在这里插入图片描述 连接 1. 创建站点 打开filezilla工具,右上角选择“文件 -> 站点管理器”,打开站点管理器弹窗。 2.…

vue 动态替换父组件

替换父组件?? 什么鬼??? 这个场景的确很少见!!不过我们要说的的确是要替换父组件!!!!!! 就是子组件内容不变但是父组件变…

【HuggingFace Transformers】LlamaDecoderLayer源码解析

LlamaDecoderLayer源码解析 1. LlamaDecoderLayer 介绍2. LlamaDecoderLayer 类源码解析 1. LlamaDecoderLayer 介绍 LlamaDecoderLayer 是 LLaMA 模型中的一个关键组件,它结合了自注意力机制、全连接层和残差连接,以及对输入数据的归一化。主要流程为&…

SpringCloud之一IDEA导入已有微服务项目并启动服务

一|、导入已有微服务项目 启动idea,file --> open,选择项目根目录;点击屏幕右侧maven projects按钮 -->点那个绿,如果屏幕右侧没有maven projects按钮,点击Help->Find Action,输入maven&#xff…

算法-存在重复元素(219)

这道题一眼看过去暴力,两层循环,找到相等的数字,然后判断一下就行,但是这样的话不符合哈希表使用原则。这道题同样利用了hash表键值配对的规则。 class Solution {public boolean containsNearbyDuplicate(int[] nums, int k) {M…

C语言 | Leetcode C语言题解之第382题链表随机节点

题目: 题解: typedef struct {struct ListNode * head; } Solution;Solution* solutionCreate(struct ListNode* head) {Solution * obj (Solution *)malloc(sizeof(Solution));assert(obj ! NULL);obj->head head;return obj; }int solutionGetRa…

keil中内存的存储规律

keil中内存的存储规律 keil中内存的存储规律 文章目录 keil中内存的存储规律keil中内存的存储规律 keil中内存的存储规律 #include <stdlib.h> #include "gd32f30x.h" #include "led_drv.h" #include "delay.h" #include "key_drv.…

GIT 下载安装使用教程

一. GIT下载 git下载地址https://git-scm.com/downloads 二. git安装 1. 许可声明 看完许可声明&#xff0c;点击Next就好了 2. 选择安装路径 默认为C盘&#xff0c;可以修改&#xff0c;这里修改为D盘&#xff0c;点击Next 3. 组件选择 勾选添加在桌面上&#xff0c;就是…

android gradle特别慢

gradle下载慢 修改gradle-wrapper.properties 替换https://services.gradle.org/distributions为https://mirrors.cloud.tencent.com/gradle distributionBaseGRADLE_USER_HOME distributionPathwrapper/dists distributionUrlhttps\://mirrors.cloud.tencent.com/gradle/g…

jenkins发布文件到远程服务器

jenkins安装 安装教程 后台启动脚本 创建脚本&#xff1a;start_jenkins.sh ls for pid in $(ps -ef|grep jenkins.war|grep -v grep|cut -c 10-16); doecho $pid;kill -9 $pid; done;nohup java -Djava.awt.headlesstrue -jar /usr/local/jenkins/jenkins.war --webroot/…

Linux入门攻坚——30、sudo、vsftpd

su&#xff1a;Switch User&#xff0c;即切换用户 su [-l user] -c ‘COMMAND’ 如&#xff1a;su -l root -c ‘COMMAND’ 如果没有指定-l user&#xff0c;则默认是root sudo&#xff1a;可以让某个用户不需要拥有管理员的密码&#xff0c;而可以执行管理员的权限。 需…

RabbitMQ练习(Topics)

1、RabbitMQ教程 《RabbitMQ Tutorials》https://www.rabbitmq.com/tutorials 2、环境准备 参考&#xff1a;《RabbitMQ练习&#xff08;Hello World&#xff09;》和《RabbitMQ练习&#xff08;Work Queues&#xff09;》。 确保RabbitMQ、Sender、Receiver、Receiver2容器…

云原生向量数据库 PieCloudVector 助力多模态大模型 AI 应用

全球 AGI&#xff08;人工通用智能&#xff09;市场快速增长的背景下&#xff0c;企业应用成为推动这一领域发展的主要力量&#xff0c;企业如何选择合适的技术来支撑其智能化转型显得尤为重要。在墨天轮《数据库技术如何增强 AI 大模型&#xff1f;》数据库沙龙活动中&#xf…

C语言典型例题55

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 题目&#xff1a; 例题4.7 兔子的繁殖。这是一个有趣的古典问题&#xff1a;有一对兔子&#xff0c;从出生后的第3个月开始起每个月都生一对兔子。小兔子长到第3个月又生一对兔子。假设所有兔子都不死&#xff0c;…

深度解读SGM41511电源管理芯片I2C通讯协议REG08寄存器解释

REG08 是 SGM41511 的第九个寄存器&#xff0c;地址为 0x08。这是一个只读&#xff08;R&#xff09;寄存器&#xff0c;用于报告各种状态信息。上电复位值&#xff08;PORV&#xff09;为 xxxxxxxx&#xff0c;表示上电时的初始状态是不确定的。这个寄存器提供了充电器当前状态…