MySQL数据同步到ES集群(MySQL数据库与ElasticSearch全文检索的同步)

news2025/1/16 8:18:49

简介:MySQL数据库与ElasticSearch全文检索的同步,通过binlog的设置对MySQL数据库操作的日志进行记录,利用Python模块对日志进行操作,再利用kafka的生产者消费者模式进行订阅,最终实现MySQL与ElasticSearch间数据的同步。


视频地址:

  1. mysql与elasticsearch同步1-数据库binlog的设置及python读取
  2. mysql与elasticsearch同步2-kafka生产者消费者模式消费binlog
  3. mysql与elasticsearch同步3-elasticsearch的增删改同步数据库

博客地址:

  1. Python实战案例:elasticsearch与数据库mysql的同步(上)
  2. Python实战案例:elasticsearch与数据库mysql的同步(下)

目录

P01-数据库binlog的设置及python读取

程序汇总

reader.py

运行截图

P02-kafka生产者消费者模式消费binlog

zookeeper安装

kafka安装

程序汇总

kafka_consumer.py

kafka_producer.py

kafka_producer_reader.py

reader_data.py

运行截图

P03-elasticsearch的增删改同步数据库

程序汇总

kafka_consumer.py

kafka_producer.py

kafka_producer_reader.py

reader_data.py

附录

视频word笔记

sql语句-readerbinlog.sql


P01-数据库binlog的设置及python读取

mysql -u root -p

show global variables like "%binlog%";

show binlog events;

set global binlog_format="ROW";

create database readerBinlog default charset=utf8;

use readerBinlog;

create table mytable(id int(11), name varchar(20));

insert into table mytable values(1, "孙大圣");

mysql> use readerbinlog;
Database changed
mysql> select * from mytable;
+------+------+
| id   | name |
+------+------+
|    1 | sds  |
|    2 | zbj  |
+------+------+
2 rows in set (0.00 sec)

 

pip3 install mysql-replication

【MySQL】Server-id导致Slave_IO_Running: No主从复制故障_ITPUB博客

(1236, 'Misconfigured master - server id was not set')

  1. mysql> SET GLOBAL server_id=3028;
  2. Query OK, 0 rows affected (0.00 sec)

程序汇总

reader.py

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

import json
import sys

MYSQL_SETTINGS = {
    "host": "localhost",
    "user": "root",
    "password": "root"
}

stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                            server_id=2,
                            blocking=True,
                            only_schemas="readerbinlog",
                            only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent])

print(stream)

for binlogstream in stream:
    for row in binlogstream.rows:
        print("========================")
        print(row)

运行截图

P02-kafka生产者消费者模式消费binlog

zookeeper安装

zookeeper下载地址:Index of /zookeeper

kafka安装

kafka下载地址:Apache Kafka

cd windows

dir

kafka-server-start

kafka-server-start ..\..\config\server.properties

kafka-console-producer --broker-list localhost:9092 --topic test

kafka-console-consumer --bootstrap-server localhost:9092 --topic test

pip3 install kafka-python

程序汇总

kafka_consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer("message", bootstrap_servers=["localhost:9092"])
for mess in consumer:
    print(mess.value.decode("utf8"))

kafka_producer.py

from kafka import KafkaProducer

# 实例化生产者
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
producer.send("message", "kafka信息".encode())
producer.close()

kafka_producer_reader.py

from kafka import KafkaProducer
import json

# 实例化生产者
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

MYSQL_SETTINGS = {
    "host": "localhost",
    "user": "root",
    "password": "root"
}

stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                            server_id=4,
                            blocking=True,
                            only_schemas="readerbinlog",
                            only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent])

# print(stream)

for binlogstream in stream:
    for row in binlogstream.rows:
        # print("========================")
        # print(row)
        row_json = json.dumps(row, ensure_ascii=False)
        producer.send("message", row_json.encode())
producer.close()

reader_data.py

import pymysql
from elasticsearch import Elasticsearch


def get_data():
    # 连接数据库
    conn = pymysql.connect(host="localhost", port=3306, user="root", password="root", database="readerbinlog")
    # 设置游标
    cursor = conn.cursor()
    # 执行sql语句,查找数据库中的所有的记录
    sql = "select * from mytable"
    cursor.execute(sql)
    # 获取执行sql语句后的所有结果
    results = cursor.fetchall()
    # 返回从数据库中取出的数据
    return results


def write_elasticsearch():
    # es = Elasticsearch()
    es = Elasticsearch(['http://localhost:9100'])
    try:
        results = get_data()
        for row in results:
            print(row)
            res = {
                "id": row[0],
                "name": row[1]
            }
            # es.index(index="westjourney", doc_type="test-type", id=row[0], body=res)
            es.index(index="westjourney", doc_type="test-type", id=row[0], body=res)
    except Exception as e:
        print(e)


if __name__ == "__main__":
    # print(get_data())
    write_elasticsearch()

运行截图

P03-elasticsearch的增删改同步数据库

pip3 install elasticsearch

谷歌浏览器es head插件

import pymysql
from elasticsearch import Elasticsearch


def get_data():
    # 连接数据库
    conn = pymysql.connect(host="localhost", port=3306, user="root", password="root", database="readerbinlog")
    # 设置游标
    cursor = conn.cursor()
    # 执行sql语句,查找数据库中的所有的记录
    sql = "select * from mytable"
    cursor.execute(sql)
    # 获取执行sql语句后的所有结果
    results = cursor.fetchall()
    # 返回从数据库中取出的数据
    return results


def write_elasticsearch():
    # es = Elasticsearch()
    es = Elasticsearch(['http://localhost:9100'])
    try:
        results = get_data()
        for row in results:
            print(row)
            res = {
                "id": row[0],
                "name": row[1]
            }
            # es.index(index="westjourney", doc_type="test-type", id=row[0], body=res)
            es.index(index="westjourney", doc_type="test-type", id=row[0], body=res)
    except Exception as e:
        print(e)


if __name__ == "__main__":
    # print(get_data())
    write_elasticsearch()

程序汇总

kafka_consumer.py

from kafka import KafkaConsumer
import json
from elasticsearch import Elasticsearch

consumer = KafkaConsumer("message", bootstrap_servers=["localhost:9092"])
es = Elasticsearch()
for mess in consumer:
    # print(mess.value.decode("utf8"))
    # 传进来的数据需要进行json转换
    result = json.loads(mess.value.decode("utf8"))
    # print(event["event"])
    event = result["event"]
    if event == "insert":
        result_values = result["values"]
        es.index(index="westjourney", doc_type="test-type", id=result_values["id"], body=result_values)
        print("添加数据成功!")
    elif event == "update":
        # 注意更新操作,body内容要加入一个doc键,指示的内容就是要修改的内容
        result_values = result["after_values"]
        es.update(index="westjourney", doc_type="test-type", id=result_values["id"], body={"doc": result_values})
        print("更新数据成功!")
    elif event == "delete":
        result_id = result["values"]["id"]
        es.delete(index="westjourney", doc_type="test-type", id=result_id)
        print("删除数据成功!")

kafka_producer.py

from kafka import KafkaProducer

# 实例化生产者
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
producer.send("message", "kafka信息".encode())
producer.close()

kafka_producer_reader.py

from kafka import KafkaProducer
import json

# 实例化生产者
producer = KafkaProducer(bootstrap_servers=["localhost:9092"])

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

MYSQL_SETTINGS = {
    "host": "localhost",
    "user": "root",
    "password": "root"
}

stream = BinLogStreamReader(connection_settings=MYSQL_SETTINGS,
                            server_id=4,
                            blocking=True,
                            only_schemas="readerbinlog",
                            only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent])

# print(stream)

for binlogstream in stream:
    for row in binlogstream.rows:
        # print("========================")
        # print(row)
        if isinstance(binlogstream, WriteRowsEvent):
            row["event"] = "insert"
        elif isinstance(binlogstream, UpdateRowsEvent):
            row["event"] = "update"
        elif isinstance(binlogstream, DeleteRowsEvent):
            row["event"] = "delete"
        row_json = json.dumps(row, ensure_ascii=False)
        producer.send("message", row_json.encode())
producer.close()

reader_data.py

import pymysql
from elasticsearch import Elasticsearch


def get_data():
    # 连接数据库
    conn = pymysql.connect(host="localhost", port=3306, user="root", password="root", database="readerbinlog")
    # 设置游标
    cursor = conn.cursor()
    # 执行sql语句,查找数据库中的所有的记录
    sql = "select * from mytable"
    cursor.execute(sql)
    # 获取执行sql语句后的所有结果
    results = cursor.fetchall()
    # 返回从数据库中取出的数据
    return results


def write_elasticsearch():
    # es = Elasticsearch()
    es = Elasticsearch(['http://localhost:9100'])
    try:
        results = get_data()
        for row in results:
            print(row)
            res = {
                "id": row[0],
                "name": row[1]
            }
            # es.index(index="westjourney", doc_type="test-type", id=row[0], body=res)
            es.index(index="westjourney", doc_type="test-type", id=row[0], body=res)
    except Exception as e:
        print(e)


if __name__ == "__main__":
    # print(get_data())
    write_elasticsearch()

附录

视频word笔记

  

 

sql语句-readerbinlog.sql

/*
SQLyog Ultimate v12.08 (64 bit)
MySQL - 5.5.40-log : Database - readerbinlog
*********************************************************************
*/


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`readerbinlog` /*!40100 DEFAULT CHARACTER SET utf8 */;

USE `readerbinlog`;

/*Table structure for table `mytable` */

DROP TABLE IF EXISTS `mytable`;

CREATE TABLE `mytable` (
  `id` int(11) DEFAULT NULL,
  `name` varchar(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Data for the table `mytable` */

insert  into `mytable`(`id`,`name`) values (1,'sds'),(2,'zbj'),(3,'lsls'),(4,'shdjsh'),(5,'宋壹');

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

ヾ(◍°∇°◍)ノ゙加油~

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/331155.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++类和对象:面向对象编程的核心。| 面向对象还编什么程啊,活该你是单身狗。

👑专栏内容:C学习笔记⛪个人主页:子夜的星的主页💕座右铭:日拱一卒,功不唐捐 文章目录一、前言二、面向对象编程三、类和对象1、类的引入2、类的定义Ⅰ、声明和定义在一起Ⅱ、声明和定义分开Ⅲ、成员变量命…

ChatGPT 怎么注册使用最新详细教程-新手小白

2022年11月30日chatGPT发布,一年时间风靡全美,甚至有调查,美国89%的大学生用chatGPT做作业,微软用100亿美元投资了该公司,这也引起了google的紧张,神经语言、人工智能、颠覆未来,成为描述chatGP…

VR博物馆带你走进云端,感受数字时代的力量

博物之志,以文化人,为了打破传统线上静态的博物馆图片,VR博物馆给民众带来了全新的视听体验,突破天气、交通、客流量等传统旅游限制问题,在VR全景中还能将线下博物馆的多媒体影响也逐一呈现出来,接下来让我…

ChatGPT给程序员人手一个,这很朋克(由ChatGPT编写)

目录ChatGPT、程序员、朋克为什么程序员需要ChatGPT,为什么这很朋克总结ChatGPT、程序员、朋克 本文由ChatGPT编写。 ChatGPT是由OpenAI开发的大型语言模型。它的核心功能是生成人类语言文本,因此有多种应用场景,如文本生成、对话生成、文本…

FlexGanttFX 11.12.6 Crack

FlexGanttFX 是 JavaFX 的调度和资源规划组件。它允许开发人员通过 CSS 以及可插入渲染器和编辑策略的使用来自定义其外观和行为的每个方面。FlexGanttFX 利用场景图/场景节点和画布 API 的完美组合,确保即使是最大的数据集也可以快速呈现。FlexGanttFX 不仅外表漂亮…

【java】遍历set集合,iterator遍历TreeSet,增强for循环遍历,set排序

目录 1. 增强for循环遍历(底层还是用iterator实现的)2.iterator遍历TreeSet3.说明4.补充测试用的集合来自上篇:https://blog.csdn.net/qq_43622777/article/details/128924730 1. 增强for循环遍历(底层还是用iterator实现的&#…

服务异步通信 RabbitMQ

服务异步通信 RabbitMQRabbitMQ快速入门RabbitMQ概述和安装常见消息模型HelloWorld案例SpringAMQPBasic Queue 简单队列模型消息发送消息接收测试WorkQueue消息发送消息接收测试能者多劳总结发布/订阅Fanout声明队列和交换机消息发送消息接收总结Direct基于注解声明队列和交换机…

Ubuntu 22.04 LTS 入门安装配置优化、开发软件安装一条龙

Ubuntu 22.04 LTS 入门安装配置&优化、开发软件安装 例行前言   最近在抉择手上空余的笔记本(X220 i7-2620M,Sk Hynix ddr3 8G*2 ,Samsung MINISATA 256G)拿来运行什么系统比较好,早年间我或许还会去继续使用Win…

urllib基础+xpath基础(爬虫基础_1)

文章目录1 urllib库的使用1.1 urllib.request发送请求获得响应数据一个类型六个方法内容下载定制请求对象1.2 urllib.parseget请求编码post请求编码1.3 ajax的get请求示例1.4 ajax的post请求示例1.5 Handler处理器1.6 代理服务器2 解析2.1 xpath2.2 JsonPath2.3 BeautifulSoup1…

自动驾驶感知——多传感器融合技术

文章目录1. 运动感知类与环境感知类传感器2. 为什么需要这么多传感器?2.1 从需求侧分析2.2 从供给侧分析3. 多传感器硬件系统的设计思路4. 多传感器系统的时序闭环4.1 传感器时钟闭环构建4.2 成像同步机制5. 多传感器融合算法5.1 多传感器融合问题建模5.2 后融合5.2…

OpenAI ChatGPT 人工智能机器人注册使用,能以中文对答如流的机器人

文章目录一、什么是 ChatGPT二、宇宙最强技术狂魔 马斯克 与 ChatGPT三、在中国大陆如何注册 ChatGPT1. 注册前准备(只适用于中国大陆)2. 注册方法与步骤四、GhatGPT 的使用方法1. 网页直接使用2. 使用 Google Chrome 浏览器插件3. CSDN 已经接入 ChatGP…

创业平台推荐 ⌈ 适和全部开发者 ⌋ | 成为一名开发者原来那么简单 | 获取收益不再困难 | 快来加入这个大家庭吧

💛 前情提要💛 本文是番外篇:在当今生活中,我们都想在业余时间通过不断学习去充实自己、提高自己 而本文就是为大家拓宽一种思路🤩,从身为开发者的角度出发,为大家提供一个全面的平台去开启“…

const在C和C++中的区别

昨天有个学生去做C/C软件工程师的笔试题,遇到了这么一个题目,来问我结果是多少? 看似非常普通的一道C语言题目,如果不指定编译器,还真不知道结果是多少。 不信我来演示给你看下。 首先是用gcc来编译,就是…

Linux系统安全:安全技术和防火墙

目录 一、安全技术 1、安全技术 2、防火墙分类 二、防火墙 1、iptables五表五链 2、黑白名单 3、iptables基本语法 4、iptables选项 5、控制类型 6、隐藏扩展模块 7、显示扩展模块 8、iptables规则保存 9、自定义链使用 一、安全技术 1、安全技术 ①入侵检测系统…

Node.js http 模块详解(1)

http 模块 使用 Node.js 中创建 Web 服务,主要依赖内置的 http 模块。经典的 express.js、koa.js 框架都是以 http 模块为核心,进行的不同程度的封装。 创建一个最简单的 Web 服务只需要几行代码。新建一个 index.js 文件,输入以下内容&…

【GCC】3: webrtc带宽(预估调整)和GCC模块

webrtc源码分析(8)-拥塞控制(上)-码率预估 bandwidth bitrate estimator 整体码控流程 webrtc源码分析(8)-拥塞控制(上)-码率预估 大神绘制的: TWCC TCC算法的流程 TccEstimator 大神用go写的:Transport-CC Algorithm Description This is a Goog

python小游戏——打砖块代码开源

♥️作者:小刘在这里 ♥️每天分享云计算网络运维课堂笔记,努力不一定有收获,但一定会有收获加油!一起努力,共赴美好人生! ♥️夕阳下,是最美的,绽放,愿所有的美好&#…

Redis单线程还快的原因

Redis单线程还快的原因 Redis Server是多线程的,Redis单线程指的是请求处理整个流程是单线程的! 单线程还快的原因 纯内存操作: Redis数据存储在内存中,速度很快。 非阻塞IO多路复用机制: Redis 采用了多路复用机制&a…

从事互联网行业,考一个PMP会有帮助吗?

PMP是项目管理类证书,不要求工作岗位,只要涉及项目、项目管理岗位,就可以考PMP证书,就会有帮助。但这里先说一下PMP的报名条件,先看能不能考,再说考了是否有帮助。 条件如下图,有两个&#xff…

睿智的目标检测64——目标检测中的MixUp数据增强方法

睿智的目标检测64——目标检测中的MixUp数据增强方法学习前言代码下载什么是MixUp数据增强方法实现思路全部代码1、数据增强与MixUp2、调用代码学习前言 哈哈哈!我再来一次数据增强! 代码下载 https://github.com/bubbliiiing/object-detection-augm…