【案例分享】跨机房ES同步实战

news2025/1/23 1:10:44

背景

众所周知单个机房在出现不可抗拒的问题(如断电、断网等因素)时,会导致无法正常提供服务,会对业务造成潜在的损失。所以在协同办公领域,一种可以基于同城或异地多活机制的高可用设计,在保障数据一致性的同时,能够最大程度降低由于机房的仅单点可用所导致的潜在高可用问题,最大程度上保障业务的用户体验,降低单点问题对业务造成的潜在损失显得尤为重要。

同城双活,对于生产的高可用保障,重大的意义和价值是不可言喻的。表面上同城双活只是简单的部署了一套生产环境而已,但是在架构上,这个改变的影响是巨大的,无状态应用的高可用管理、请求流量的管理、版本发布的管理、网络架构的管理等,其提升的架构复杂度巨大。

结合真实的协同办公产品:京办(为北京市政府提供协同办公服务的综合性平台)生产环境面对的复杂的政务网络以及京办同城双活架构演进的案例,给大家介绍下京办持续改进、分阶段演进过程中的一些思考和实践经验的总结。本文仅针对ES集群在跨机房同步过程中的方案和经验进行介绍和总结。

架构

1.部署Logstash在金山云机房上,Logstash启动多个实例(按不同的类型分类,提高同步效率),并且和金山云机房的ES集群在相同的VPC

2.Logstash需要配置大网访问权限,保证Logstash和ES原集群和目标集群互通。

3.数据迁移可以全量迁移和增量迁移,首次迁移都是全量迁移后续的增加数据选择增量迁移。

4.增量迁移需要改造增加识别的增量数据的标识,具体方法后续进行介绍。





原理

Logstash工作原理





Logstash分为三个部分input 、filter、ouput:

1.input处理接收数据,数据可以来源ES,日志文件,kafka等通道.

2.filter对数据进行过滤,清洗。

3.ouput输出数据到目标设备,可以输出到ES,kafka,文件等。

增量同步原理

1. 对于T时刻的数据,先使用Logstash将T以前的所有数据迁移到有孚机房京东云ES,假设用时∆T

2. 对于T到T+∆T的增量数据,再次使用logstash将数据导入到有孚机房京东云的ES集群

3. 重复上述步骤2,直到∆T足够小,此时将业务切换到华为云,最后完成新增数据的迁移

适用范围:ES的数据中带有时间戳或者其他能够区分新旧数据的标签

流程





准备工作

1.创建ECS和安装JDK忽略,自行安装即可

2.下载对应版本的Logstash,尽量选择与Elasticsearch版本一致,或接近的版本安装即可

Download Logstash Free | Get Started Now | Elastic

1) 源码下载直接解压安装包,开箱即用

2)修改对内存使用,logstash默认的堆内存是1G,根据ECS集群选择合适的内存,可以加快集群数据的迁移效率。





3. 迁移索引

Logstash会帮助用户自动创建索引,但是自动创建的索引和用户本身的索引会有些许差异,导致最终数据的搜索格式不一致,一般索引需要手动创建,保证索引的数据完全一致。

以下提供创建索引的python脚本,用户可以使用该脚本创建需要的索引。

create_mapping.py文件是同步索引的python脚本,config.yaml是集群地址配置文件。

注:使用该脚本需要安装相关依赖

yum install -y PyYAML
yum install -y python-requests

拷贝以下代码保存为 create_mapping.py:

import yaml
import requests
import json
import getopt
import sys

def help():
    print
    """
    usage:
    -h/--help print this help.
    -c/--config config file path, default is config.yaml
    
    example:  
    python create_mapping.py -c config.yaml 
    """
def process_mapping(index_mapping, dest_index):
    print(index_mapping)
    # remove unnecessary keys
    del index_mapping["settings"]["index"]["provided_name"]
    del index_mapping["settings"]["index"]["uuid"]
    del index_mapping["settings"]["index"]["creation_date"]
    del index_mapping["settings"]["index"]["version"]

    # check alias
    aliases = index_mapping["aliases"]
    for alias in list(aliases.keys()):
        if alias == dest_index:
            print(
                "source index " + dest_index + " alias " + alias + " is the same as dest_index name, will remove this alias.")
            del index_mapping["aliases"][alias]
    if index_mapping["settings"]["index"].has_key("lifecycle"):
        lifecycle = index_mapping["settings"]["index"]["lifecycle"]
        opendistro = {"opendistro": {"index_state_management":
                                         {"policy_id": lifecycle["name"],
                                          "rollover_alias": lifecycle["rollover_alias"]}}}
        index_mapping["settings"].update(opendistro)
        # index_mapping["settings"]["opendistro"]["index_state_management"]["rollover_alias"] = lifecycle["rollover_alias"]
        del index_mapping["settings"]["index"]["lifecycle"]
    print(index_mapping)
    return index_mapping
def put_mapping_to_target(url, mapping, source_index, dest_auth=None):
    headers = {'Content-Type': 'application/json'}
    create_resp = requests.put(url, headers=headers, data=json.dumps(mapping), auth=dest_auth)
    if create_resp.status_code != 200:
        print(
            "create index " + url + " failed with response: " + str(create_resp) + ", source index is " + source_index)
        print(create_resp.text)
        with open(source_index + ".json", "w") as f:
            json.dump(mapping, f)
def main():
    config_yaml = "config.yaml"
    opts, args = getopt.getopt(sys.argv[1:], '-h-c:', ['help', 'config='])
    for opt_name, opt_value in opts:
        if opt_name in ('-h', '--help'):
            help()
            exit()
        if opt_name in ('-c', '--config'):
            config_yaml = opt_value

    config_file = open(config_yaml)
    config = yaml.load(config_file)
    source = config["source"]
    source_user = config["source_user"]
    source_passwd = config["source_passwd"]
    source_auth = None
    if source_user != "":
        source_auth = (source_user, source_passwd)
    dest = config["destination"]
    dest_user = config["destination_user"]
    dest_passwd = config["destination_passwd"]
    dest_auth = None
    if dest_user != "":
        dest_auth = (dest_user, dest_passwd)
    print(source_auth)
    print(dest_auth)

    # only deal with mapping list
    if config["only_mapping"]:
        for source_index, dest_index in config["mapping"].iteritems():
            print("start to process source index" + source_index + ", target index: " + dest_index)
            source_url = source + "/" + source_index
            response = requests.get(source_url, auth=source_auth)
            if response.status_code != 200:
                print("*** get ElasticSearch message failed. resp statusCode:" + str(
                    response.status_code) + " response is " + response.text)
                continue
            mapping = response.json()
            index_mapping = process_mapping(mapping[source_index], dest_index)

            dest_url = dest + "/" + dest_index
            put_mapping_to_target(dest_url, index_mapping, source_index, dest_auth)
            print("process source index " + source_index + " to target index " + dest_index + " successed.")
    else:
        # get all indices
        response = requests.get(source + "/_alias", auth=source_auth)
        if response.status_code != 200:
            print("*** get all index failed. resp statusCode:" + str(
                response.status_code) + " response is " + response.text)
            exit()
        all_index = response.json()
        for index in list(all_index.keys()):
            if "." in index:
                continue
            print("start to process source index" + index)
            source_url = source + "/" + index
            index_response = requests.get(source_url, auth=source_auth)
            if index_response.status_code != 200:
                print("*** get ElasticSearch message failed. resp statusCode:" + str(
                    index_response.status_code) + " response is " + index_response.text)
                continue
            mapping = index_response.json()

            dest_index = index
            if index in config["mapping"].keys():
                dest_index = config["mapping"][index]
            index_mapping = process_mapping(mapping[index], dest_index)

            dest_url = dest + "/" + dest_index
            put_mapping_to_target(dest_url, index_mapping, index, dest_auth)
            print("process source index " + index + " to target index " + dest_index + " successed.")

if __name__ == '__main__':
    main()

配置文件保存为config.yaml:

# 源端ES集群地址,加上http://
source: http://ip:port
source_user: "username"
source_passwd: "password"
# 目的端ES集群地址,加上http://
destination: http://ip:port
destination_user: "username"
destination_passwd: "password"

# 是否只处理这个文件中mapping地址的索引
# 如果设置成true,则只会将下面的mapping中的索引获取到并在目的端创建
# 如果设置成false,则会取源端集群的所有索引,除去(.kibana)
# 并且将索引名称与下面的mapping匹配,如果匹配到使用mapping的value作为目的端的索引名称
# 如果匹配不到,则使用源端原始的索引名称
only_mapping: true

# 要迁移的索引,key为源端的索引名字,value为目的端的索引名字
mapping:
    source_index: dest_index

以上代码和配置文件准备完成,直接执行 python create_mapping.py 即可完成索引同步。

索引同步完成可以取目标集群的kibana上查看或者执行curl查看索引迁移情况:

GET _cat/indices?v



 



全量迁移

Logstash配置位于config目录下。

用户可以参考配置修改Logstash配置文件,为了保证迁移数据的准确性,一般建议建立多组Logstash,分批次迁移数据,每个Logstash迁移部分数据。

配置集群间迁移配置参考:







input{
    elasticsearch{
        # 源端地址
        hosts =>  ["ip1:port1","ip2:port2"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
        # 需要迁移的索引列表,以逗号分隔,支持通配符
        index => "a_*,b_*"
        # 以下三项保持默认即可,包含线程数和迁移数据大小和logstash jvm配置相关
        docinfo=>true
        slices => 10
        size => 2000
        scroll => "60m"
    }
}

filter {
  # 去掉一些logstash自己加的字段
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output{
    elasticsearch{
        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
 # 目的端索引名称,以下配置为和源端保持一致
        index => "%{[@metadata][_index]}"
        # 目的端索引type,以下配置为和源端保持一致
        document_type => "%{[@metadata][_type]}"
        # 目标端数据的_id,如果不需要保留原_id,可以删除以下这行,删除后性能会更好
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }

    # 调试信息,正式迁移去掉
    stdout { codec => rubydebug { metadata => true }}
}

增量迁移

预处理:

1. @timestamp 在elasticsearch2.0.0beta版本后弃用

_timestamp field | Elasticsearch Guide [2.4] | Elastic

2. 本次对于京办从金山云机房迁移到京东有孚机房,所涉及到的业务领域多,各个业务线中所代表新增记录的时间戳字段不统一,所涉及到的兼容工作量大,于是考虑通过elasticsearch中预处理功能pipeline进行预处理添加统一增量标记字段:gmt_created_at,以减少迁移工作的复杂度(各自业务线可自行评估是否需要此步骤)。

PUT _ingest/pipeline/gmt_created_at
{
  "description": "Adds gmt_created_at timestamp to documents",
  "processors": [
    {
      "set": {
        "field": "_source.gmt_created_at",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

3. 检查pipeline是否生效

GET _ingest/pipeline/*

4. 各个index设置对应settings增加pipeline为默认预处理

PUT index_xxxx/_settings
{
  "settings": {
    "index.default_pipeline": "gmt_created_at"
  }
}

5. 检查新增settings是否生效

GET index_xxxx/_settings



 



增量迁移脚本

schedule-migrate.conf

index:可以使用通配符的方式

query: 增量同步的DSL,统一gmt_create_at为增量同步的特殊标记

schedule: 每分钟同步一把,"* * * * *"

input {
elasticsearch {
        hosts =>  ["ip:port"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
        index => "index_*"
        query => '{"query":{"range":{"gmt_create_at":{"gte":"now-1m","lte":"now/m"}}}}'
        size => 5000
        scroll => "5m"
        docinfo => true
        schedule => "* * * * *"
      }
}
filter {
     mutate {
      remove_field => ["source", "@version"]
   }
}
output {
    elasticsearch {
        # 目的端es地址
        hosts => ["http://ip:port"]
        # 安全集群配置登录用户名密码
        user => "username"
        password => "password"
        index => "%{[@metadata][_index]}"
        document_type => "%{[@metadata][_type]}"
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }

# 调试信息,正式迁移去掉
stdout { codec => rubydebug { metadata => true }}
}

问题:

mapping中存在join父子类型的字段,直接迁移报400异常





[2022-09-20T20:02:16,404][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, 
:action=>["index", {:_id=>"xxx", :_index=>"xxx", :_type=>"joywork_t_work", :routing=>nil}, #<LogStash::Event:0x3b3df773>], 
:response=>{"index"=>{"_index"=>"xxx", "_type"=>"xxx", "_id"=>"xxx", "status"=>400, 
"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse", 
"caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"[routing] is missing for join field [task_user]"}}}}}

解决方法:

An routing missing exception is obtained when reindex sets the routing value - Elasticsearch - Discuss the Elastic Stack Reindex API parent set to null, removes routing as well · Issue #26183 · elastic/elasticsearch · GitHub

结合业务特征,通过在filter中加入小量的ruby代码,将_routing的值取出来,放回logstah event中,由此问题得以解决。

示例:



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/71905.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微软CTO谈AI:逃不掉马拉松就准备好跑鞋!30秒自动化妆机;ChatGPT刷爆票圈;剪纸风格的AI绘画 | ShowMeAI资讯日报

&#x1f440;日报合辑 | &#x1f3a1;AI应用与工具大全 | &#x1f514;公众号资料下载 | &#x1f369;韩信子 &#x1f4e2; 对话微软 CTO 凯文斯科特 (Kevin Scott)&#xff1a;人工智能将去向何方&#xff1f; https://blogs.microsoft.com/ai/a-conversation-with-kevi…

【卡塔尔世界杯】空调制冷,全是科技与狠活

半自动化越位技术比赛用球AI Rihla球场智能空调Feelix Palm辅助技术可持续利用的体育场便利的数字设施和App 西班牙队和英格兰队穿外套出场&#xff0c;卡塔尔的空调功率到底有多大&#xff1f; 还是很大的&#xff0c;不管是室外还是室内&#xff0c;到处都安装了空调&#…

Verilog系统函数

Verilog系统函数前言一、$width&#xff08;一&#xff09;简介&#xff08;二&#xff09;$width 参数&#xff08;三&#xff09;例子二、Specify参数三、$display&#xff08;一&#xff09;简介&#xff08;二&#xff09;格式说明&#xff08;三&#xff09;例子1.例12.例…

Bert论文解读及相关代码实践

Bert&#xff1a;Bidirectional Encoder Representations from Transformers Transformer中双向Encoder表达学习。BERT被设计为通过在所有层中对左右上下文进行联合调节&#xff0c;从未标记文本中预训练深度双向表示。预训练的BERT模型可以通过仅一个额外的输出层进行微调&am…

AVS-试听分割-论文阅读

题目: Audio-Visual Segmentation 论文地址:https://arxiv.org/abs/2207.05042 GitHub地址:https://github.com/OpenNLPLab/AVSBench 项目主页:https://opennlplab.github.io/AVSBench/ 相关博客https://arxiv.org/abs/2203.03821 摘要 We propose to explore a new pro…

(附源码)ssm汽车租赁 毕业设计 271621

基于ssm的汽车租赁平台的设计与实现 摘 要 随着社会经济的快速发展,我国机动车保有量大幅增加,城市交通问题日益严重。为缓解用户停车难问题,本文设计并实现了汽车租赁平台.该系统通过错峰停车达到车位利用率最大化.基于现状分析,本文结合实际停车问题,从系统应用流程,系统软硬…

微电网和直流电网中最优潮流(OPF)的凸优化(Matlab代码实现)

&#x1f4cb;&#x1f4cb;&#x1f4cb;本文目录如下&#xff1a;⛳️⛳️⛳️ 目录 1 概述 2 最优潮流 3 电力系统强大的CVX 4 直流电网中最优潮流&#xff08;OPF&#xff09;的凸优化 4.1 Matlab代码 4.2 运行结果 5 微电网中最优潮流&#xff08;OPF&#xff09;的凸优…

go gin web综合教程,包括 mysql redis log 路由

前言 在学习go许久&#xff0c;没看到网上有比较综合的gin web教程&#xff0c;很多都是最基础的教程&#xff0c;完全就是启动个服务返回参数&#xff0c;没有过多的结合实际开发。下面我结合一下我的经验&#xff0c;来写一篇深入的综合教程&#xff0c;包括数据库Mysql、re…

JMeter启动时常见的错误

很多小伙伴在学工具这一块时&#xff0c;安装也是很吃力的一个问题&#xff0c;之前记得有说过怎么安装jmeter这个工具。那么你要启动jmeter的时候&#xff0c;一些粉丝就会碰到如下几个问题。 1.解压下载好的jmeter安装&#xff0c;Windows 平台&#xff0c;双击 jmeter/bin …

基于python实现的SGM半全局立体匹配算法

文章目录前言一、SGM是什么&#xff1f;1.代价计算2.代价聚合3.视察计算4.视察优化二、基于python实现SGM算法&#xff1f;总结前言 开始正是入门立体匹配算法啦&#xff0c;会不断更新立体匹配的算法和代码。   水平有限&#xff0c;旨在先了解和读懂别人的代码的实现方式&a…

3D立体匹配入门 - 视差计算

经典假设 1、左右视图成功匹配的窗口&#xff0c;具有相同的像素 这个是最经典的假设&#xff0c;几乎所有视差图计算都用上了他&#xff0c;通过匹配左右窗口像素&#xff0c;得到最佳匹配对应的x轴坐标差&#xff0c;就是视差 2、像素P的视差只与其领域有关 这个是基于马尔…

外汇天眼:即使与世界第一的差价合约提供商交易也会被骗!

你能想象&#xff0c;当你与世界第一的差价合约提供商进行交易时&#xff0c;也可能会被骗吗&#xff1f; 在投资理财多元化的今天&#xff0c;外汇投资理财也备受大家的关注&#xff0c;而与此同时&#xff0c;骗子的诈骗渠道也与时俱进&#xff0c;各类外汇投资骗局也层出不穷…

VMware Workstation 17.0 Pro SLIC Unlocker for Linux

VMware_Dell_2.6_BIOS-EFI64_Mod&#xff1b;macOS Unlocker&#xff0c;支持 macOS Ventura 请访问原文链接&#xff1a;VMware Workstation 17.0 Pro SLIC & Unlocker for Windows & Linux&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&a…

多模式直方图的视网膜图像增强

论文题目&#xff1a;Retinal Image Enhancement in Multi-Mode Histogram 1 摘要 视网膜图像的评估被广泛用于帮助医生诊断许多疾病&#xff0c;如糖尿病或高血压。从采集过程来看&#xff0c;视网膜图像往往具有较低的灰度对比度和动态范围。本文提出了一种基于直方图分析的…

MySQL回表

1.索引结构 1.1.B-Tree(B树)和BTree(B树) 前面是B-Tree,后面是BTree,两者的区别在于: B-Tree中,所有的节点都会带有指向具体记录的指针;BTree中只有叶子节点才会带有指向具体记录的指针;B-Tree中,不同的叶子之间没有连在一起;BTree中所有的叶子节点通过指针连接在一起;B-Tree中…

java版商城之 Spring Cloud+SpringBoot+mybatis+uniapp b2b2c o2o 多商家入驻商城 直播带货商城 电子商务

一个好的SpringCloudSpringBoot b2b2c 电子商务平台涉及哪些技术、运营方案&#xff1f;以下是我结合公司的产品做的总结&#xff0c;希望可以帮助到大家&#xff01; 搜索体验小程序&#xff1a;海哇 1. 涉及平台 平台管理、商家端&#xff08;PC端、手机端&#xff09;、买…

浴室预约小程序毕业设计,洗澡预约澡堂预约系统设计与实现,微信小程序毕业设计论文怎么写毕设源码开题报告需求分析怎么做

项目背景和意义 目的&#xff1a;本课题主要目标是设计并能够实现一个基于微信小程序浴室预约系统&#xff0c;前台用户使用小程序&#xff0c;后台管理使用JavaMysql开发&#xff0c;后台使用了springboot框架&#xff1b;通过后台添加设定浴室类型、录入浴室和管理浴室、管理…

CTPN+CRNN算法端到端实现文字识别的实战开发

本文分享自华为云社区《CTPNCRNN 算法端到端实现文字识别》&#xff0c;作者&#xff1a;HWCloudAI。 OCR介绍 光学字符识别&#xff08;英语&#xff1a;Optical Character Recognition&#xff0c;OCR&#xff09;是指对文本资料的图像文件进行分析识别处理&#xff0c;获取…

Java规则引擎Drools急速入门

文章目录1.Drools规则引擎简介2.Drools API开发步骤3.SpringBoot整合Drools案例4.Drools基础语法5.Drools条件语法部分6.Drools结果操作部分7.Drools内置属性部分8.Drools高级语法部分1.Drools规则引擎简介 &#xff08;1&#xff09;什么是规则引擎 ​ 全称为业务规则管理系…

类与对象(上篇)

类与对象面向过程和面向对象类的引入类的定义类的访问限定符及封装访问限定符封装类的作用域类的实例化类对象类对象的存储方式类成员函数的this指针this指针的引出this指针的特性面向过程和面向对象 C语言是面向过程&#xff0c;注重的是过程&#xff0c;先分析求解问题的步骤…