【ElasticSearch】ES与MySQL数据同步方案及Java实现

news2025/1/23 0:53:31

文章目录

  • 一、同步实现思路
    • 1、方案一:同步调用
    • 2、方案二:异步通知
    • 3、方案三:监听binlog
  • 二、实现ES与MySQL数据同步
    • 1、导入hotel-admin工程
    • 2、项目分析
    • 3、SpringAMQP整合
    • 4、声明队列和交换机
    • 5、发送消息MQ
    • 6、监听MQ消息
    • 7、测试同步功能

一、同步实现思路

elasticsearch中的酒店数据来自于mysql数据库,当mysql中的数据发生改变时,es中的数据也要跟着改变,即es与mysql之间的数据同步。
在这里插入图片描述

1、方案一:同步调用

操作mysql的微服务hotel-admin不能直接更新es的索引库,那就由操作es索引库的微服务hotel-demo来暴露一个更新索引库的接口给hotel-admin调用

在这里插入图片描述
同步调用方式下,业务耦合太多。

2、方案二:异步通知

引入消息队列,hotel-admin将数据写入mysql,并且自己再往MQ发条消息,说"数据更新了",任务就算完成,至于后面的hotel-demo什么时候更新ES,花费了多久,那是hotel-demo自己的事情。
在这里插入图片描述

3、方案三:监听binlog

使用canal中间件去监听mysql的binlog,当binlog发生改变,就通知hotel-demo,和上面不同的时,更加解放了hotel-admin这个上游服务,它往mysql写完数据就算任务完成,不用发消息,也不用调用其他服务,达到了完全解耦合
在这里插入图片描述
其实mysql的binlog发生改变,搭配cancel,就像方案二的hotel-admin服务发了一条消息到MQ。


三种实现方式的对比:
在这里插入图片描述

二、实现ES与MySQL数据同步

1、导入hotel-admin工程

启动服务,访问localhost:{spring.service.port}

在这里插入图片描述
在hotel-admin服务中,模拟MySQL数据的增删改查。

2、项目分析

mysql的增删改动作需要同步到es中,但对es来说,增和改可以归为一类,就看文档id存在不存在了。接下来使用方案二,及MQ实现数据同步,思路如下;

  • 声明exchange、queue、RoutingKey
  • 在hotel-admin中的增、删、改业务中完成消息发送
  • 在hotel-demo中完成消息监听,并更新elasticsearch中数据

模型如下:

在这里插入图片描述

3、SpringAMQP整合

  • 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>    		
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


  • 临时启动个rabbitmq
docker run -d -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# 访问host:15672,用户和密码为默认的guest
  • 在hotel-admin中的application.yml,添加mq连接信息
spring:
  rabbitmq:
      host: 192.168.150.101 # 主机名
      port: 5672 # 端口
      virtual-host: / # 虚拟主机 
      username: guestt # 用户名
      password: guest # 密码


  • 最后,记得给消费方也引入AMQP依赖,并添加上mq的连接信息

4、声明队列和交换机

  • 在常量目录下定义队列和交换机的名字
package cn.llg.hotel.constants;

public class HotelMqConstants {
	//交换机名称
    public static final String EXCHANGE_NAME = "hotel.topic";
    //新增和修改队列
    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
    //删除队列
    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
    //RoutingKey
    public static final String INSERT_KEY = "hotel.insert";
    public static final String DELETE_KEY = "hotel.delete";
}

  • 接下来声明队列和交换机,可以基于注解,也可以基于Bean,后者复杂些,这里演示后者
package cn.llg.hotel.config;

import cn.llg.hotel.constants.HotelMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @date 2023/7/12
 */
@Configuration
public class MqConfig {

    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);
    }

    @Bean
    public Queue insertQueue(){
        return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);
    }

    @Bean
    public Queue deleteQueue(){
        return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);
    }

    /**
     * 绑定队列和交换机关系
     */
    @Bean
    public Binding insertQueueBinding(){
        return BindingBuilder
                .bind(insertQueue())
                .to(topicExchange())
                .with(HotelMqConstants.INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding(){
        return BindingBuilder
                .bind(deleteQueue())
                .to(topicExchange())
                .with(HotelMqConstants.DELETE_KEY);
    }
}


5、发送消息MQ

注入RabbitTemplate的对象之后,这里就直接在controller中发送MQ消息了,convertAndSend()方法的三个参数:

  • 交换机名称
  • routingKey
  • 消息内容,这里消息体尽量小些,别把一整个对象发过去
package cn.llg.hotel.web;

import cn.llg.hotel.constants.HotelMqConstants;
import cn.llg.hotel.pojo.Hotel;
import cn.llg.hotel.pojo.PageResult;
import cn.llg.hotel.service.IHotelService;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

import java.security.InvalidParameterException;

@RestController
@RequestMapping("hotel")
public class HotelController {

    @Autowired
    private IHotelService hotelService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel){
        // 新增酒店
        hotelService.save(hotel);
        // 发送MQ消息,MQ是基于内存的,你把整个酒店对象hotel发过去很容易占满队列,发个主键ID就好,消息体尽量小些
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    @PutMapping()
    public void updateById(@RequestBody Hotel hotel){
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);

        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);

        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);
    }
    //其他接口
    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size
    ){
        Page<Hotel> result = hotelService.page(new Page<>(page, size));

        return new PageResult(result.getTotal(), result.getRecords());
    }
    
    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id){
        return hotelService.getById(id);
    }
}



6、监听MQ消息

hotel-demo整合完SpringAMQP后,在hotel-demo中监听消息。

  • 新建类HotelListener类,并加@Component注解以Bean的形式管理
package cn.llg.hotel.mq;

import cn.llg.hotel.constants.HotelMqConstants;
import cn.llg.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @date 2023/7/13
 */
@Component
public class HotelListener {

    @Resource
    IHotelService hotelService;

    /**
     * 监听酒店新增或者修改的业务
     * id接受一个Long,因为发送过来的是一个Long id
     * @param id 酒店ID
     */
    @RabbitListener(queues = HotelMqConstants.INSERT_QUEUE_NAME)
    public void listenHotelInsertAndUpdate(Long id){
        hotelService.insertDocById(id);
    }

    /**
     * 监听酒店删除业务
     */
    @RabbitListener(queues = HotelMqConstants.DELETE_QUEUE_NAME)
    public void listenHotelDelete(Long id){
        hotelService.deleteDocById(id);
    }
}
  • 拿到MQ中的酒店id后,使用JavaHighLevelClient对象来更新ES数据
package cn.llg.hotel.service;

import cn.llg.hotel.domain.dto.RequestParams;
import cn.llg.hotel.domain.pojo.Hotel;
import cn.llg.hotel.domain.vo.PageResult;
import com.baomidou.mybatisplus.extension.service.IService;


public interface IHotelService extends IService<Hotel> {

    void insertDocById(Long id);

    void deleteDocById(Long id);
}
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {

    @Resource
    RestHighLevelClient client;

    @Override
    public void insertDocById(Long id) {
        try {
            //0.根据ID查数据,并转为文档类型
            Hotel hotel = getById(id);
            HotelDoc hotelDoc = new HotelDoc(hotel);
            //1.准备request
            IndexRequest request = new IndexRequest("hotel").id(hotelDoc.getId().toString());
            //2.准备DSL
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            //3.发送请求
            client.index(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteDocById(Long id) {

        try {
            //1.准备request
            DeleteRequest request = new DeleteRequest("hotel",id.toString());
            //2.发送请求
            client.delete(request,RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

    }
}

最后补充下上面的Hotel和HotelDoc之间的转换关系:

@Data
@TableName("tb_hotel")
public class Hotel {
    @TableId(type = IdType.INPUT)
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String longitude;
    private String latitude;
    private String pic;
}

@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String location;
    private String pic;
    //距离
    private Object distance;
    //是否充广告
    private Boolean isAD;
    //ES中的completion,后面存数组,这里可以对应成List
    private List<String> suggestion;

    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
        if(this.business.contains("/")){
            //此时business有多个值,需要分开后放入suggestion
            String[] arr = this.business.split("/");
            //添加元素
            this.suggestion = new ArrayList<>();
            Collections.addAll(this.suggestion,arr);
            this.suggestion.add(this.brand);
        }else{
            this.suggestion = Arrays.asList(this.brand,this.business);
        }

    }
}

7、测试同步功能

重启两个服务,查看MQ:

在这里插入图片描述

点击队列查看详情,可以看到绑定交换机成功:

在这里插入图片描述
接下来去酒店管理页面修改一条酒店信息(我直接调接口了,不去页面改)

在这里插入图片描述
在酒店搜索页面搜一下:

在这里插入图片描述

可以看到ES数据跟随MySQL更新成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/747557.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一百二十九、Kettle——从MySQL增量导入到GreenPlum

一、目标 用Kettle从MySQL增量导入数据到GreePlum 二、前提准备 &#xff08;一&#xff09;kettle已连上MySQL &#xff08;二&#xff09;kettle已连上GreenPlum 三、实施步骤 &#xff08;一&#xff09;打开kettle&#xff0c;新建转换任务。拖拽2个表输入、替换NULL…

【Doris的安装和使用】

1.准备工作 1.1集群规划 1.2 操作系统安装要求 设置系统最大打开文件句柄数(注意这里的*不要去掉) sudo vim /etc/security/limits.conf * soft nofile 65536 * hard nproc 65536设置最大虚拟块的大小 sudo vim /etc/sysctl.conf vm.max_map_count2000000将修改的配置分发给…

麒麟-飞腾Kylin-V4桌面arm64系统静态编译QT

1.系统具体版本&#xff1a; 2. 因为此版本的源很老了&#xff0c;需要修改版本的源&#xff0c;才能正常更新各种软件&#xff0c;否则&#xff0c;你连麒麟商店都打不开。 sudo vi /etc/apt/sources.list 选择你系统对应版本的源地址&#xff1a; #4.0.2桌面版本: deb ht…

k8s 中的卷

前面的文章我们分享了 pod &#xff0c;RC&#xff0c;RS&#xff0c;DaemonSet&#xff0c;CJ&#xff0c;Service 等各种资源 今天我们来分享一波如何将磁盘挂载到容器中&#xff0c;在 docker 里面这种技术叫做 数据卷&#xff0c;感兴趣的小伙伴可以查看一下文章&#xff…

JDK环境配置、且运行一个简单程序

目录 JDK环境配置命令行运行java文件 JDK环境配置 下载好jdk,打开jdk下的bin&#xff0c;复制路径。 右击我的电脑&#xff0c;点击属性&#xff0c;找到高级系统设置&#xff0c;点击环境变量。 双击path&#xff0c;新建把路径粘贴进去即可。 打开cmd输入javac -version…

不会编程也可以制作ERP、CRM系统?

在以往的编程开发中&#xff0c;如果想要个人开发一款简单的ERP等流程系统&#xff0c;肯定是需要有编程代码的功底的&#xff0c;再学习编程语言和框架&#xff0c;又得花费大量的时间&#xff0c;而且不能完全确保可以做出来&#xff0c;毕竟编程开发有一定的门槛&#xff0c…

3d Max中的Arnold渲染为黑色,这样处理!

使用Arnold渲染视图(ARV)时&#xff0c;图像保持黑色。 快照功能和常规3ds Max渲染设置可按预期生成图像。 解决方案&#xff1a; 解决方案可能需要执行下面的一项或多项操作&#xff1a; 添加光源 检查场景文件是否包含光源。如果场景中没有光源&#xff0c;渲染结果为黑色…

通信算法之178: 通信信道模型及循环/线性卷积2

上一篇见 通信算法之159: 通信信道模型和循环/线性卷积 一. 衰减 二.多径效应--时延扩展--相干带宽 三. 时变性--多普勒扩展--相干时间 四. 频率选择性衰落&#xff0c;时间选择性衰落 小尺度衰落&#xff0c;小 五.瑞丽和莱斯信道 六循环卷积与线性卷积 线性卷积定义及计算…

C# PaddleInference OCR 表格识别

效果 项目 VS2022.net4.8OpenCvSharp4Sdcb.PaddleInferenceSdcb.PaddleOCR 测试图片 代码 using OpenCvSharp.Extensions; using OpenCvSharp; using Sdcb.PaddleInference; using Sdcb.PaddleOCR; using Sdcb.PaddleOCR.Models; using Sdcb.PaddleOCR.Models.Details; using…

netwox伪造ARP响应【网络工程】(保姆级图文)

目录 伪造ARP响应1) 在模拟之前&#xff0c;验证局域网中是否存在主机 192.168.43.97。在主机 B 上使用 arping 命令 ping 该主机。执行命令如下&#xff1a;2) 在主机 A 上伪造 ARP 响应&#xff0c;创建虚拟主机 192.168.43.97&#xff0c;设置其 MAC 地址为 A1&#xff1a;B…

react 初学(1)

1.安装环境 需要Node.js 自行下载安装 然后全局安装create-react-app npm install -g create-react-app 如果出现报错请参考 create-react-app -V 报错无法将“create-react-app”项识别为 cmdlet、函数、脚本文件或可运行程序的名称_Java-请多指教的博客-CSDN博客 2.创建…

尚医通04:Axios Node Npm bable webpack+前端工程改造

目录 本日学习 内容介绍 Axios Node NPM包管理器 本日学习 1. 了解Axios :他是异步请求用的&#xff0c;前后端。 用于在浏览器和 Node.js 中发送 HTTP 请求。它支持从服务器获取数据、上传数据以及执行其他与 HTTP 相关的操作。 2.Node:它允许你在服务器端运行 JavaScrip…

stm32(adc数模转换)

ADC介绍 ADC是什么&#xff1f; 全称&#xff1a;Analog-to-Digital Converter&#xff0c;指模拟/数字转换器 ADC的性能指标 量程&#xff1a;能测量的电压范围分辨率&#xff1a;ADC能辨别的最小模拟量&#xff0c;通常以输出二进制数的位数表示&#xff0c;比如&#xff1…

Leetcode每日一题:931. 下降路径最小和(2023.7.13 C++)

目录 931. 下降路径最小和 题目描述&#xff1a; 实现代码与解析&#xff1a; 动态规划 原理思路&#xff1a; 931. 下降路径最小和 题目描述&#xff1a; 给你一个 n x n 的 方形 整数数组 matrix &#xff0c;请你找出并返回通过 matrix 的下降路径 的 最小和 。 下降…

mongdb实战

概述 前言 这几天轮播图想用mongdb开发&#xff0c;然后就有了一下代码 效果图 源码如下 package jkw.pojo;import lombok.Data; import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.index.Indexed; import org.springframewo…

【JAVA】穷词——基于嵌入式的数据库derby+BeautyEye的单词字典应用

文章目录 1. 题目2. 项目结构层次3. 环境以及技术栈说明4. 项目报告4.1软件功能描述4.2项目类图4.2.1 src层级下的类图4.2.2 data层级下的类图4.2.3 gui层级下的类图4.2.4 resource层级下的类图4.2.5 view层级下的类图4.2.6 DelWord的类图4.2.7 CustomMessageDialog的类图4.2.8…

Antd List组件增加gutter属性后出现横向滚动,如何解决

第一次使用ant design的List列表组件&#xff0c;设置gutter间隔属性后&#xff0c;页面出现了横向滚动条&#xff0c;查阅文档发现是由于加间隔后导致容器宽度被撑开&#xff0c;ant design官方默认给外层容器加了margin-left和margin-right 解决方法是在外层容器预留一定的pa…

stringstream的使用

写到290题使用stringstream简化步骤&#xff0c;学习一下使用 目录 小问题&#xff1f; 成员函数clear() 那么问题来了&#xff1f;clear在啥时候用呢&#xff1f; 数据类型转换 <sstream>库定义了三种类&#xff1a;istringstream、ostringstream、stringstream &l…

RT1176 LCDIFv2 RGB565引脚不连续

RT1052和RT1176的LCDIF&#xff0c;使用RGB565格式时PIN脚分配是连续的:LCDIF_DATA00~LCDIF_DATA15。 但RT1176的LCDIFv2并不是这样&#xff0c;使用RGB565格式时PIN脚分配不是连续的&#xff0c;而是移位填充8位*324位分配的。 RT1176 LCDIFv2 RGB565LCDIF_DATA00LCDIF_DATA0…

CVE漏洞复现-CVE-2021-36934 Windows 提权漏洞

CVE-2021-36934 Windows 提权漏洞 漏洞描述 7月20日&#xff0c;微软确认了一个新的本地提权漏洞&#xff0c;安全研究成员将其称为HiveNightmare或者SeriousSAM&#xff0c;该漏洞允许低权限的用户访问Windows系统文件。成功利用此漏洞的攻击者可以使用SYSTEM特权运行任意代…