背景
日常开发过程中肯定会存在MySQL表数据迁移至ES的情况,以canal为例,数据迁移时需要提前在ES中创建索引Mapping,但是如果碰到字段特别的表时,创建Mapping将是一件耗费心神的事情。为了解决这些重复工作,我使用Python编写了一个脚本,自动将MySQL中的表结构同步到ES中,本脚本只同步表结构,并不同步表数据,如需同步数据可以采用canal或者logstash等方式进行同步
脚本内容
如果不需要直接同步到es中,需要注释脚本最后一行,该脚本会将转换后的mapping信息打印到控制台中
import mysql.connector
import requests
import json
# MySQL连接配置
mysql_config = {
'host': '127.0.0.1',
'port': '3306',
'user': 'root',
'password': '123456',
'database': 'test'
}
# Elasticsearch配置
es_host = '127.0.0.1'
es_port = '9200'
es_index = 'order1'
# 新版本es不需求type字段
# es_type = '_doc'
def fetch_mysql_table_fields(mysql_config):
connection = mysql.connector.connect(**mysql_config)
cursor = connection.cursor()
# 获取MySQL表字段信息,指定需要转换得表名
cursor.execute(f"DESCRIBE {"`order`"}")
fields = cursor.fetchall()
cursor.close()
connection.close()
return fields
def generate_es_mapping(fields):
mapping = {
"mappings": {
"properties": {}
}
}
for field in fields:
field_name = field[0]
field_type = field[1]
# 根据MySQL字段类型设置Elasticsearch映射类型
es_field_type = "text" # 默认为文本类型
if "int" in field_type:
es_field_type = "integer"
elif "bigint" in field_type:
es_field_type = "long"
elif "tinyint" in field_type:
es_field_type = "short"
elif "float" in field_type:
es_field_type = "float"
elif "double" in field_type:
es_field_type = "double"
elif "decimal" in field_type:
es_field_type = "double"
elif "date" in field_type or "datetime" in field_type or "timestamp" in field_type or "time" in field_type:
es_field_type = "date"
elif "json" in field_type:
es_field_type = "object"
# 这里可以根据需要添加更多类型的映射
mapping["mappings"]["properties"][field_name] = {
"type": es_field_type
}
return mapping
def print_es_mapping(mapping):
print(json.dumps(mapping, indent=2))
def create_es_index_mapping(es_host, es_port, es_index, mapping):
url = f"http://{es_host}:{es_port}/{es_index}"
headers = {'Content-Type': 'application/json'}
payload = json.dumps(mapping)
response = requests.put(url, headers=headers, data=payload)
if response.status_code == 200:
print(f"Elasticsearch index mapping created for index '{es_index}'")
else:
print(f"Failed to create Elasticsearch index mapping. Status code: {response.status_code}")
print(response.text)
if __name__ == "__main__":
# 获取MySQL表字段信息
table_fields = fetch_mysql_table_fields(mysql_config)
# 生成Elasticsearch Mapping
es_mapping = generate_es_mapping(table_fields)
# 打印Elasticsearch Mapping到控制台
print_es_mapping(es_mapping)
# 创建Elasticsearch Index Mapping
create_es_index_mapping(es_host, es_port, es_index, es_mapping)