easy-es使用以及Es和MySQL同步

news2025/1/3 9:57:21

Easy-Es使用

介绍

官方地址Easy-Es,它主要就是简化了ES相关的API, 使用起来像MP一样舒服

SpringBoot接入Easy-Es

相关依赖

已进入Es和Easy-Es依赖

<properties>
    <java.version>11</java.version>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <spring-boot.version>2.6.13</spring-boot.version>
    <es.vsersion>7.12.0</es.vsersion>
    <easy_es.vsersion>2.0.0</easy_es.vsersion>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <!-- es依赖 -->
    <!-- 排除springboot中内置的es依赖,以防和easy-es中的依赖冲突-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${es.vsersion}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${es.vsersion}</version>
    </dependency>

    <!-- easy-es -->
    <dependency>
        <groupId>org.dromara.easy-es</groupId>
        <artifactId>easy-es-boot-starter</artifactId>
        <version>${easy_es.vsersion}</version>
    </dependency>

    <!-- hutool -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.32</version>
    </dependency>
</dependencies>

SQL初始化

-- 创建whitebrocade数据库
DROP DATABASE IF EXISTS whitebrocade;
CREATE DATABASE whitebrocade;
USER whitebrocade;
-- 创建student表
CREATE TABLE `student` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `description` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4

-- 插入数据
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `description`) VALUES (1, '小牛马', '我是小牛马');
INSERT INTO `whitebrocade`.`student`(`id`, `name`, `description`) VALUES (2, '中牛马', '我是中牛马');

application.yaml文件

你只需要更改address更改成你自己的ES地址和端口即可, 如果有设置账号密码的, 那么可以见username和password注释打开,进行填写

# 应用服务 WEB 访问端口
server:
  port: 9999

easy-es:
  # 动态数据源配置
  dynamic:
    datasource:
      # 默认数据源名称
      master:
        #填你的es连接地址
        address: localhost:9200
  enable: true # 默认为true,若为false时,则认为不启用本框架
  # 如果是https, 那就改成https
  schema: http
  # address: localhost:9200
  # username: 有设置才填写,非必须
  # password: 有设置才填写,非必须
  # 不打印banner
  banner: false
  keep-alive-millis: 30000 # 心跳策略时间 单位:ms
  connect-timeout: 5000 # 连接超时时间 单位:ms
  socket-timeout: 600000 # 通信超时时间 单位:ms
  connection-request-timeout: 5000 # 连接请求超时时间 单位:ms
  max-conn-total: 100 # 最大连接数 单位:个
  max-conn-per-route: 100 # 最大连接路由数 单位:个
  global-config:
    # 是否开启小黑子模式,默认关闭, 开启后日志将更有趣,提升编码乐趣,仅供娱乐,切勿用于其它任何用途
    i-kun-mode: true
    # 开启控制台打印通过本框架生成的DSL语句,默认为开启,测试稳定后的生产环境建议关闭,以提升少量性能
    print-dsl: true
    # 当前项目是否分布式项目,默认为true,在非手动托管索引模式下,若为分布式项目则会获取分布式锁,非分布式项目只需synchronized锁.
    distributed: false
    # 重建索引超时时间 单位小时,默认72H 可根据ES中存储的数据量调整
    reindexTimeOutHours: 72
    # 异步处理索引是否阻塞主线程 默认阻塞 数据量过大时调整为非阻塞异步进行 项目启动更快
    async-process-index-blocking: true
    # 分布式环境下,平滑模式,当前客户端激活最新索引最大重试次数,若数据量过大,重建索引数据迁移时间超过4320/60=72H,可调大此参数值,此参数值决定最大重试次数,超出此次数后仍未成功,则终止重试并记录异常日志
    active-release-index-max-retry: 4320
    # 分布式环境下,平滑模式,当前客户端激活最新索引最大重试次数 分布式环境下,平滑模式,当前客户端激活最新索引重试时间间隔 若您期望最终一致性的时效性更高,可调小此值,但会牺牲一些性能
    active-release-index-fixed-delay: 60
    db-config:
      # 是否开启下划线转驼峰 默认为false
      map-underscore-to-camel-case: true
      # 索引前缀,可用于区分环境  默认为空 用法和MP的tablePrefix一样的作用和用法
      # index-prefix:
      # id生成策略 none由ES自动生成,是默认的配置,无需您额外配置 推荐
      id-type: none
      # 字段更新策略 默认为not_null
      field-strategy: not_empty
      # 默认开启,开启后查询所有匹配数据,若不开启,会导致无法获取数据总条数,其它功能不受影响,若查询数量突破1W条时,需要同步调整@IndexName注解中的maxResultWindow也大于1w,并重建索引后方可在后续查询中生效(不推荐,建议分页查询).
      enable-track-total-hits: true
      # 数据刷新策略,默认为不刷新,若对数据时效性要求比较高,可以调整为immediate,但性能损耗高,
      # 也可以调整为折中的wait_until, 等待请求提交数据后,等待数据完成刷新(1s),再结束请求 性能损耗适中
      refresh-policy: wait_until
      # 批量更新接口的阈值 默认值为1万,突破此值需要同步调整enable-track-total-hits=true,@IndexName.maxResultWindow > 1w,并重建索引
      batch-update-threshold: 10000
      # 是否智能为字段添加.keyword后缀 默认开启,开启后会根据当前字段的索引类型及当前查询类型自动推断本次查询是否需要拼接.keyword后缀
      # 是否自动拼接.keyword后缀是基于自动推断的,如果你当前实体类的字段类型是String,并且其字段类型未指定或指定为keyword_text双类型时,才会有自动拼接后缀,并且是否拼接取决于查询本身,如果是match查询时不会拼接后缀的,拼接会违背初衷
      smartAddKeywordSuffix: true

启动类

这里需要关注的是EsMapperScan注解, 发现是和MP的注解很像的, 如果项目中也引入的MP, 那么需要注意mapper的包下要进行区分,

mapper

– ee 这里存放easy-es的mapper

– mp 这里存放mp的mapper

如果不这么做会有问题的, 详细可以看避坑指南 | Easy-Es](https://www.easy-es.cn/pages/4c01d7/#项目中同时使用mybatis-plus和easy-es), 这里详细说明如果项目中同时引入Easy-Es和MP的如何应对

/**
 * @author whiteBrocade
 * @description: 启动类
 */
// 这里替换成你项目的ES所在的Mapper
@EsMapperScan("com.whitebrocade.easy_es.mapper.ee")
@SpringBootApplication
public class EasyEsApplication {

    public static void main(String[] args) {
        SpringApplication.run(EasyEsApplication.class, args);
    }

}

model模型

domain
ES相关domain
/**
 * @author whiteBrocade
 * @description: Es索引模型, 所有的Es索引都要继承这个基类
 */
@Data
public class BaseEsEntity {
    /**
     * es中的唯一id, 此时id值将由es自动生成
     */
    @IndexId(type= IdType.NONE)
    private String id;
}
/**
 * @author whiteBrocade
 * @description: 学生ES模型
 */
@Data
@EqualsAndHashCode(callSuper = true)
// @IndexName("studentesentity")
// 当您想直接把类名当作索引名,且并不需要对索引进行其它配置时,可省略此注解, 索引名规则如下,必须全部小写
// 如果类名为StudentEsEntity这种, 那么就是studentesentity
public class StudentEsEntity extends BaseEsEntity {

    /**
     * 对应mysql中主键Id
     */
    private Long mysqlId;

    /**
     * 学生姓名
     */
    @HighLight // 高亮注解
    @IndexField(analyzer = Analyzer.IK_SMART)
    private String name;

    /**
     * 描述
     */
    @HighLight // 高亮注解
    // 高亮只对text类型字段有效,高亮是对分词的高亮,keyword类型不会有高亮的,这是es的规则,非框架
    // 是否自动拼接.keyword后缀是基于自动推断的,如果你当前实体类的字段类型是String,并且其字段类型未指定或指定为keyword_text双类型时,才会有自动拼接后缀,并且是否拼接取决于查询本身,如果是match查询时不会拼接后缀的,拼接会违背初衷
    // 见easy-es的issues: https://gitee.com/dromara/easy-es/issues/I73IXA
    // https://gitee.com/dromara/easy-es/issues/I5J86T
    @IndexField(analyzer = Analyzer.IK_SMART)
    private String description;
}
mysql相关domain
/**
 * @author whiteBrocade
 * @version 1.0
 * @description 学生类
 */
@Data
public class Student {
    /**
     * id
     */
    private Long id;

    /**
     * 姓名
     */
    private String name;

    /**
     * 描述
     */
    private String description;
}

DTO
Es相关DTO
/**
 * @author whiteBrocade
 * @description: 学生Es DTO
 */
@Data
public class StudentEsDTO {
    /**
     * 姓名
     */
    private String name;

    /**
     * 描述
     */
    private String description;
}
Query
Es相关Query
/**
 * @author whiteBrocade
 * @description: 学生Es Query
 */
@Data
public class StudentEsQuery {
    /**
     * 学生ID, 这里用String类型代替Long类型, 防止精度丢失问题
     */
    private String id;

    /**
     * 姓名
     */
    private String name;

    /**
     * 描述
     */
    private String description;
}
VO
Es相关VO
/**
 * @author whiteBrocade
 * @description: 学生Es VO模型
 */
@Data
public class StudentEsVO {
    /**
     * 学生Id
     */
    private Long id;

    /**
     * 学生姓名
     */
    private String name;

    /**
     * 描述
     */
    private String description;
}

StudentEsMapper

这里和MP的很像, 不同之处就是多个一个Es开头

/**
 * @author whiteBrocade
 * @description: 学生ES EsMapper
 */
@Component
public interface StudentEsMapper extends BaseEsMapper<StudentEsEntity> {
}

Controller

/**
 * @author whiteBrocade
 * @description: TestUseEeController
 */
@Slf4j
@RestController
@RequestMapping("/es")
@RequiredArgsConstructor
public class TestUseEeController {
    private final StudentEsMapper studentEsMapper;

    /**
     * 创建索引(相当于mysql中的表)
     */
    @PostMapping("/createIndex")
    public Boolean createIndex() {
        Class<StudentEsEntity> studentClass = studentEsMapper.getEntityClass();
        // 类名小写作为索引名称
        String indexName = studentClass.getSimpleName().toLowerCase();
        Boolean existsIndex = studentEsMapper.existsIndex(indexName);
        Boolean createIndex = null;
        if (! existsIndex) {
            log.info("{}索引不存在, 准备创建索引", indexName);
            createIndex = studentEsMapper.createIndex();
            log.info("是否成功创建{}索引: {}", indexName, createIndex);
        } else {
            throw new RuntimeException(StrUtil.format("索引已经存在: {}", indexName));
        }
        return createIndex;
    }

    /**
     * 插入数据
     */
    @PostMapping("/insert")
    public Integer insert(@RequestBody StudentEsDTO dto) {
        // 2.初始化-> 新增数据
        StudentEsEntity studentEsEntity = new StudentEsEntity();
        BeanUtil.copyProperties(dto, studentEsEntity);
        // 雪花ID
        studentEsEntity.setMysqlId(IdUtil.getSnowflakeNextId());

        return studentEsMapper.insert(studentEsEntity);
    }

    /**
     * 搜索数据
     */
    @GetMapping("/search")
    public List<StudentEsVO> search(@RequestBody StudentEsQuery query) {
        // ES条件查询
        List<StudentEsEntity> esEntityList = EsWrappers.lambdaChainQuery(studentEsMapper)
                // 注意, 用的是mysqlId
                .eq(StrUtil.isNotBlank(query.getId()), StudentEsEntity::getMysqlId, Long.parseLong(query.getId()))
                .like(StrUtil.isNotBlank(query.getName()), StudentEsEntity::getName, query.getName())
                .like(StrUtil.isNotBlank(query.getDescription()), StudentEsEntity::getDescription, query.getDescription())
                // 根据score排序, score高的在前面
                .sortByScore()
                .list();

        List<StudentEsVO> esVOList = new ArrayList<>(esEntityList.size());
        // 为空直接返回
        if (CollUtil.isEmpty(esEntityList)) {
            return esVOList;
        }

        // 进行转换
        for (StudentEsEntity esEntity : esEntityList) {
            StudentEsVO esVO = new StudentEsVO();
            // 这里的ID跳过, 因为类型不兼容
            BeanUtil.copyProperties(esEntity, esVO, "id");
            esVO.setId(esEntity.getMysqlId());
            esVOList.add(esVO);
        }

        return esVOList;
    }
}

ES和MySQL同步

分为两种

  • Flink-CDC监听MySQL直接写入ES
  • Flink-CDC监听MySQL写入ActiveMQ, MQ写入到ES

Flink-CDC内容详细见博主另外一个篇文章SpringBoot集成Flink CDC实现binlog监听

直接写入

相关依赖

在上一个依赖上引入Flink-CDC

<properties>
    <flink.version>1.19.0</flink.version>
</properties>
<!-- Flink CDC依赖 start-->
<!-- Flink核心依赖, 提供了Flink的核心API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<!--  Flink流处理Java API依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink客户端工具依赖, 包含命令行界面和实用函数 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink连接器基础包, 包含连接器公共功能 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink Kafka连接器, 用于和Apache Kafka集成, 这里不需要集成, 所以注释掉, 代码可以使用其它的MQ代替 -->
<!--<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.2.0-1.19</version>
    </dependency>-->
<!-- Flink Table Planner, 用于Table API和SQL的执行计划生成 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink Table API桥接器, 连接DataStream API和Table API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- Flink JSON格式化数据依赖 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- 开启Web UI支持, 端口为8081, 默认为不开启-->
<!--<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>1.19.1</version>
    </dependency>-->

<!-- MySQL CDC依赖
    org.apache.flink的适用MySQL 8.0

    具体参照这篇博客 https://blog.csdn.net/kakaweb/article/details/129441408
-->
<dependency>
    <!--MySQL 8.0适用-->
    <!--<groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>3.1.0</version>-->

    <!-- MySQL 5.7适用 , 2.3.0, 3.0.1均可用-->
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
    <!--            <version>3.0.1</version>-->
</dependency>

<!-- gson工具类 -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.11.0</version>
</dependency>

<!-- ognl表达式 -->
<dependency>
    <groupId>ognl</groupId>
    <artifactId>ognl</artifactId>
    <version>3.1.1</version>
</dependency>

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>2.0.31</version>
</dependency>
代码

代码参考博主另外一篇文章SpringBoot集成Flink CDC实现binlog监听

有两处地方需要修改

  1. MysqlEventListener
  2. StudentLogHandler
  3. application.yaml
MysqlEventListener

这里改动点:

  1. 将发序列化器从Flink CDC自带的反序列化器切换成自定义的反序列化器
  2. 将sink从customSink切换成dataChangeSink
/**
 * @author whiteBrocade
 * @version 1.0
 * @description MySQL变更监听
 */
@Component
@AllArgsConstructor
public class MysqlEventListener implements ApplicationRunner {

    /**
     * Flink CDC相关配置
     */
    private final FlinkCDCConfig flinkCDCConfig;

    /**
     * 自定义Sink
     * customSink: 通过ognl解析ddl语句类型
     * dataChangeSink: 通过struct解析ddl语句类型
     * 通常两个选择一个就行
     */
    private final CustomSink customSink;
    private final DataChangeSink dataChangeSink;

    /**
     * 自定义反序列化处理器
     */
    private final MySQLDeserialization mySQLDeserialization;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置整个Flink程序的默认并行度
        env.setParallelism(flinkCDCConfig.getParallelism());
        // 设置checkpoint 间隔
        env.enableCheckpointing(flinkCDCConfig.getEnableCheckpointing());
        // 设置任务关闭的时候保留最后一次 CK 数据
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // todo 下列的两个MySqlSource选择一个
        // 自定义的反序列化器
        MySqlSource<DataChangeInfo> mySqlSource = this.buildBaseMySqlSource(DataChangeInfo.class)
                .deserializer(mySQLDeserialization)
                .build();

        // Flink CDC自带的反序列化器
        // MySqlSource<String> mySqlSource = this.buildBaseMySqlSource(String.class)
        //     .deserializer(new JsonDebeziumDeserializationSchema())
        //     .build();


        env.fromSource(mySqlSource,
                       WatermarkStrategy.noWatermarks(),
                       "mysql-source")
            // 设置该数据源的并行度
            .setParallelism(flinkCDCConfig.getParallelism())
            // todo 根据上述的选择,选择对应的Sink
            .addSink(dataChangeSink); // 添加Sink, 这里配合mySQLDeserialization+dataChangeSink
            // .addSink(customSink);

        env.execute("mysql-stream-cdc");
    }

    /**
     * 构建基本的MySqlSourceBuilder
     *
     * @param clazz 返回的数据类型Class对象
     * @param <T>   源数据中存储的类型
     * @return MySqlSourceBuilder
     */
    private <T> MySqlSourceBuilder<T> buildBaseMySqlSource(Class<T> clazz) {
        return MySqlSource.<T>builder()
            .hostname(flinkCDCConfig.getHostname())
            .port(flinkCDCConfig.getPort())
            .username(flinkCDCConfig.getUsername())
            .password(flinkCDCConfig.getPassword())
            .databaseList(flinkCDCConfig.getDatabaseList())
            .tableList(flinkCDCConfig.getTableList())
            /* initial: 初始化快照,即全量导入后增量导入(检测更新数据写入)
                 * latest: 只进行增量导入(不读取历史变化)
                 * timestamp: 指定时间戳进行数据导入(大于等于指定时间错读取数据)
                 */
            .startupOptions(StartupOptions.latest())
            .includeSchemaChanges(flinkCDCConfig.getIncludeSchemaChanges()) // 包括schema的改变
            .serverTimeZone("GMT+8"); // 时区
    }
}

StudentLogHandler

StudentLogHandler需要修改

监听到日志变化后, 操作ES

/**
 * @author whiteBrocade
 * @version 1.0
 * @description Student对应处理器
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class StudentLogHandler implements BaseLogHandler<Student> {

    private final StudentEsMapper studentEsMapper;

    @Override
    public void handleInsertLog(Student data, Long operatorTime) {
        log.info("处理Student表的新增日志: {}", data);
    public void handleInsertLog(Student student, Long operatorTime) {
        log.info("处理Student表的新增日志: {}", student);
        // 同步新增到Es中
        StudentEsEntity studentEsEntity = new StudentEsEntity();
        BeanUtil.copyProperties(student, studentEsEntity);
        studentEsEntity.setMysqlId(student.getId());
        studentEsMapper.insert(studentEsEntity);
    }

    @Override
    public void handleUpdateLog(Student data, Long operatorTime) {
        log.info("处理Student表的修改日志: {}", data);
    public void handleUpdateLog(Student student, Long operatorTime) {
        log.info("处理Student表的修改日志: {}", student);
        // 修改mysql, 再删除ES
        LambdaEsQueryWrapper<StudentEsEntity> wrapper = new LambdaEsQueryWrapper<>();
        wrapper.eq(StudentEsEntity::getMysqlId, student.getId());
        studentEsMapper.delete(wrapper);
    }

    @Override
    public void handleDeleteLog(Student data, Long operatorTime) {
        log.info("处理Student表的删除日志: {}", data);
    public void handleDeleteLog(Student student, Long operatorTime) {
        log.info("处理Student表的删除日志: {}", student);
        LambdaEsQueryWrapper<StudentEsEntity> wrapper = new LambdaEsQueryWrapper<>();
        wrapper.eq(StudentEsEntity::getMysqlId, student.getId());
        studentEsMapper.delete(wrapper);
    }
}
application.yaml
# Flink CDC相关配置
flink-cdc:
  mysql:
    hostname: localhost
    port: 3306
    username: root
    password: root
    databaseList: whitebrocade
    tableList: whitebrocade.student
    includeSchemaChanges: false
    parallelism: 1
    enableCheckpointing: 5000

引入MQ解耦

第一个方案存在以下问题

  1. 没有持久化保证数据安全, 单节点的ES宕机了, 那么是数据就不同步了

引入MQ保证同步的一个持久性

这里采用ActiveMQ(因为它有个基于内存的模式, 不用额外安装, 当然生产不能这么玩…)

application.yaml新增配置

我这里启动的是内存模式的MQ

spring:
  activemq:
    # activemq url
    broker-url: tcp://localhost:61616
    # 用户名&密码
    user: admin
    password: admin
    # 是否使用基于内存的ActiveMQ, 实际生产中使用基于独立安装的ActiveMQ
    in-memory: true
    pool:
      # 如果此处设置为true,需要添加activemq-pool的依赖包,否则会⾃动配置失败,⽆法注⼊JmsMessagingTemplate
      enabled: false
  # 我们需要在配置⽂件 application.yml 中添加⼀个配置
  # 发布/订阅消息的消息和点对点不同,订阅消息支持多个消费者一起消费。其次,SpringBoot中默认的点对点消息,所以在使用Topic时会不起作用。
  jms:
    # 该配置是 false 的话,则为点对点消息,也是 Spring Boot 默认的
    # 这样是可以解决问题,但是如果这样配置的话,上⾯提到的点对点消息⼜不能正常消费了。所以⼆者不可兼得,这并⾮⼀个好的解决办法
    # ⽐较好的解决办法是,我们定义⼀个⼯⼚,@JmsListener 注解默认只接收 queue 消息,如果要接收 topic 消息,需要设置⼀下containerFactory
    pub-sub-domain: true
ActiveMqConfig配置类
/**
 * @author whiteBrocade
 * @version 1.0
 * @description ActiveMqConfig配置
 */
@Configuration
public class ActiveMqConfig {
    /**
     * 用于接受student表的消费信息
     */
    public static final String TOPIC_NAME = "activemq:topic:student";
    public static final String QUEUE_NAME = "activemq:queue:student";

    @Bean
    public Topic topic() {
        return new ActiveMQTopic(TOPIC_NAME);
    }

    @Bean
    public Queue queue() {
        return new ActiveMQQueue(QUEUE_NAME);
    }

    /**
     * 接收topic消息,需要设置containerFactory
     */
    @Bean
    public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 相当于在application.yml中配置:spring.jms.pub-sub-domain=true
        factory.setPubSubDomain(true);
        return factory;
    }
}
生产者CustomProducer
/**
 * @author whiteBrocade
 * @version 1.0
 * @description CustomProducer
 */
@Service
@RequiredArgsConstructor
public class CustomProducer {
    private final JmsMessagingTemplate jmsMessagingTemplate;

    @SneakyThrows
    public void sendQueueMessage(Queue queue, String msg) {
        String queueName = queue.getQueueName();
        jmsMessagingTemplate.convertAndSend(queueName, msg);
    }

    @SneakyThrows
    public void sendTopicMessage(Topic topic, String msg) {
        String topicName = topic.getTopicName();
        jmsMessagingTemplate.convertAndSend(topicName, msg);
    }
}
消费者
/**
 * @author whiteBrocade
 * @version 1.0
 * @description CustomQueueConsumer
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class CustomQueueConsumer {

    private final StudentEsMapper studentEsMapper;

    @JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
    public void receiveQueueMsg(String msg) {
        log.info("消费者1111收到Queue消息: {}", msg);

        StudentMqDTO mqDTO = JSONUtil.toBean(msg, StudentMqDTO.class);
        Student student = mqDTO.getStudent();
        Integer operatorType = mqDTO.getOperatorType();
        OperatorTypeEnum operatorTypeEnum = OperatorTypeEnum.getEnumByType(operatorType);
        switch (operatorTypeEnum) {
            case INSERT:
                // 同步新增到Es中
                StudentEsEntity studentEsEntity = new StudentEsEntity();
                BeanUtil.copyProperties(student, studentEsEntity);
                studentEsEntity.setMysqlId(student.getId());
                studentEsMapper.insert(studentEsEntity);
                break;
            case UPDATE:
            case DELETE:
                // 修改mysql, 再删除ES
                LambdaEsQueryWrapper<StudentEsEntity> wrapper = new LambdaEsQueryWrapper<>();
                wrapper.eq(StudentEsEntity::getMysqlId, student.getId());
                studentEsMapper.delete(wrapper);
                break;
        }
    }

    @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
    public void receiveTopicMsg(String msg) {
        log.info("消费者1111收到Topic消息: {}", msg);
    }
}
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Custom2QueueConsumer
 */
@Slf4j
@Service
public class Custom2QueueConsumer {
    @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
    public void receiveTopicMsg(String msg) {
        log.info("消费者2222收到Topic消息: {}", msg);
    }
}
Controller
/**
 * @author whiteBrocade
 * @version 1.0
 * @description ActiveMqController
 */
@Slf4j
@RestController
@RequestMapping("/activemq")
@RequiredArgsConstructor
public class ActiveMqController {
    private final CustomProducer customProducer;
    private final Queue queue;
    private final Topic topic;

    @PostMapping("/send/queue")
    public String sendQueueMessage() {
        log.info("开始发送点对点的消息-------------");
        Student student = new Student();
        student.setId(IdUtil.getSnowflakeNextId());
        student.setName("小牛马");
        student.setDescription("我是小牛马");
        StudentMqDTO mqDTO = StudentMqDTO.builder()
                .student(student)
                .operatorType(1)
                .build();
        String jsonStr = JSONUtil.toJsonStr(mqDTO);
        customProducer.sendQueueMessage(queue, jsonStr);
        return "success";
    }

    @PostMapping("/send/topic")
    public String sendTopicMessage() {
        log.info("===开始发送订阅消息===");
        Student student = new Student();
        student.setId(IdUtil.getSnowflakeNextId());
        student.setName("小牛马");
        student.setDescription("我是小牛马");
        StudentMqDTO mqDTO = StudentMqDTO.builder()
                .student(student)
                .operatorType(1)
                .build();
        String jsonStr = JSONUtil.toJsonStr(mqDTO);
        customProducer.sendTopicMessage(topic, jsonStr);
        return "success";
    }
}
改造StudentLogHandler
/**
 * @author whiteBrocade
 * @version 1.0
 * @description Student对应处理器
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class StudentLogHandler implements BaseLogHandler<Student> {
    private final Queue queue;

    @Override
    public void handleInsertLog(Student student, Long operatorTime) {
        log.info("处理Student表的新增日志: {}", student);
        this.sendMq(student, OperatorTypeEnum.INSERT);
    }

    @Override
    public void handleUpdateLog(Student student, Long operatorTime) {
        log.info("处理Student表的修改日志: {}", student);
        this.sendMq(student, OperatorTypeEnum.UPDATE);
    }

    @Override
    public void handleDeleteLog(Student student, Long operatorTime) {
        log.info("处理Student表的删除日志: {}", student);
        this.sendMq(student, OperatorTypeEnum.DELETE);
    }

    /**
     * 发送MQ
     *
     * @param student          Student
     * @param operatorTypeEnum 操作类型枚举
     */
    private void sendMq(Student student, OperatorTypeEnum operatorTypeEnum) {
        StudentMqDTO mqDTO = StudentMqDTO.builder()
                .student(student)
                .operatorType(operatorTypeEnum.getType())
                .build();
        String jsonStr = JSONUtil.toJsonStr(mqDTO);

        CustomProducer customProducer = SpringUtil.getBean(CustomProducer.class);

        // 发送到MQ
        customProducer.sendQueueMessage(queue, jsonStr);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2229267.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

unity中预制体的移动-旋转-放缩

unity中预制体的移动-旋转-放缩 左上侧竖栏图标介绍Tools(手形工具)Move Tool(移动工具&#xff0c;单位米)Rotate Tool(旋转工具&#xff0c;单位角度)Scale Tool(缩放工具&#xff0c;单位倍数)Rect Tool(矩形工具)Transform Tool(变换工具)图标快捷键对照表工具使用的小技巧…

用Pyhon写一款简单的益智类小游戏——2048

文字版——代码及讲解 代码—— import random# 初始化游戏棋盘 def init_board():return [[0] * 4 for _ in range(4)]# 在棋盘上随机生成一个2或4 def add_new_tile(board):empty_cells [(i, j) for i in range(4) for j in range(4) if board[i][j] 0]if empty_cells:i,…

【UBuntu20 配置usb网卡】 记录Ubuntu20配置usb网卡(特别是建立热点)

【UBuntu20 配置usb网卡】 Ubuntu20配置usb网卡&#xff08;特别是建立热点&#xff09; 一、 闲言碎语的前言 usb的外置网卡&#xff0c;相比Windows即插即用&#xff0c;Linux买回来一顿折腾&#xff0c;准备把过程梳理一下记录起来。 网卡的方案其实就那几家&#xff0c;…

Training-free layout control with cross-attention guidance

https://zhuanlan.zhihu.com/p/666445024https://zhuanlan.zhihu.com/p/666445024 支持两种模式,1.sd文生图;2.绑定了dreambooth和text inversion的图像编辑。 # ------------------ example input ------------------examples &

微信网页授权回调地址放多个参数的方法

https://open.weixin.qq.com/connect/oauth2/authorize?appidAPPID&redirect_uriREDIRECT_URI&response_typecode&scopeSCOPE&stateSTATE#wechat_redirect 跳转后地址 redirect_uri/?codeCODE&stateSTATE。 redirect_uri如果不进行urlencode编码, 跳转后…

Virtuoso使用layout绘制版图、使用Calibre验证DRC和LVS

1 绘制版图 1.1 进入Layout XL 绘制好Schmatic后&#xff0c;在原理图界面点击Launch&#xff0c;点击Layout XL进入版图绘制界面。 1.2 导入元件 1、在Layout XL界面左下角找到Generate All from Source。 2、在Generate Layout界面&#xff0c;选中“Instance”&#…

「Mac畅玩鸿蒙与硬件13」鸿蒙UI组件篇3 - TextInput 组件获取用户输入

在鸿蒙应用开发中,TextInput 组件用于接收用户输入,适用于文本、密码等多种输入类型。本文详细介绍鸿蒙 TextInput 组件的使用方法,包括输入限制、样式设置、事件监听及搜索框应用,帮助你灵活处理鸿蒙应用中的用户输入。 关键词 TextInput 组件用户输入输入限制事件监听搜索…

偷懒总结篇|贪心算法|动态规划|单调栈|图论

由于这周来不及了&#xff0c;先过一遍后面的思路&#xff0c;具体实现等下周再开始详细写。 贪心算法 这个图非常好 122.买卖股票的最佳时机 II(妙&#xff0c;拆分利润) 把利润分解为每天为单位的维度&#xff0c;需要收集每天的正利润就可以&#xff0c;收集正利润的区间…

时序数据分析:时序分割

目录 0 工况的定义 1 Changepoint 2 TreeSplit 3 Autoplait 4 应用示例 5.分析结论 0 工况的定义 工业设备系统在不同的外部条件&#xff08;即工况&#xff09;下&#xff0c;往往有多种运行模式&#xff0c;工业生产也往往会分阶段进行&#xff0c;在不同工况下&…

阿里云开源 AI 应用开发框架:Spring AI Alibaba

作者&#xff1a;刘军&#xff0c;Spring AI Alibaba 发起人&#xff0c;Apache Member 编者按&#xff1a; 6 年前&#xff0c;2018 年 10 月&#xff0c;阿里巴巴开源 Spring Cloud Alibaba&#xff0c;旨在帮助 Java 开发者通过 Spring Cloud 编程模型轻松开发微服务应用。…

一年期免费HTTPS证书:网络安全新选择

HTTPS证书的重要性 HTTPS证书&#xff0c;全称为安全套接字层/传输层安全协议证书&#xff0c;是一种在互联网上建立安全连接的数字证书。它通过公钥加密技术&#xff0c;对网站和用户之间的数据传输进行加密&#xff0c;有效防止数据被窃取或篡改&#xff0c;保障用户信息的安…

网络搜索引擎Shodan(7)完结

声明&#xff1a;学习视频来自b站up主 泷羽sec&#xff0c;如涉及侵权马上删除文章 声明&#xff1a;本文主要用作技术分享&#xff0c;所有内容仅供参考。任何使用或依赖于本文信息所造成的法律后果均与本人无关。请读者自行判断风险&#xff0c;并遵循相关法律法规。 感谢泷…

Web-高校教务考试管理系统

目录 一、前言 1.1 实践目的和要求 1.2 实践项目背景及意义 二、实践内容 2.1 实践过程 2.2 实践内容 2.2.1 项目介绍 2.2.2项目开发环境 2.2.3系统组成与功能 2.2.4 开发工作 2.3 主要成果 三、总结 3.1 个人心得 3.2 其它意见 一、前言 1.1 实践目的和…

顺序表排序相关算法题|负数移到正数前面|奇数移到偶数前面|小于x的数移到大于x的数前面|快排思想(C)

负数移到正数前面 已知顺序表 ( a 1 , … , a n ) (a_{1},\dots,a_{n}) (a1​,…,an​)&#xff0c;每个元素都是整数&#xff0c;把所有值为负数的元素移到全部正数值元素前边 算法思想 快排的前后指针版本 排序|冒泡排序|快速排序|霍尔版本|挖坑版本|前后指针版本|非递归版…

预览 PDF 文档

引言 在现代Web应用中&#xff0c;文件预览功能是非常常见的需求之一。特别是在企业级应用中&#xff0c;用户经常需要查看各种类型的文件&#xff0c;如 PDF、Word、Excel 等。本文将详细介绍如何在Vue项目中实现 PDF 文档的预览功能。 实现原理 后端API 后端需要提供一个…

蚁剑的介绍和使用

蚁剑介绍 蚁剑&#xff08;AntSword&#xff09;是一个开源的跨平台网站管理工具&#xff0c;主要用于渗透测试和安全研究。它提供了一个图形化界面&#xff0c;方便用户管理和操作被攻陷的网站。 安装教程&#xff1a; github官网&#xff1a;https://github.com/AntSwordPro…

AppInventor2能否用网络摄像头画面作为屏幕的背景?

// 视频是否可以作为背景&#xff1f; // 有会员提问&#xff1a;能否用网络摄像头的实时画面作为屏幕的背景&#xff1f;就跟这个一样背景全覆盖&#xff1a; 摄像头画面是一个在线的网站链接视频流。 // 原先思路 // 1、目前原生组件无法直接实现这个功能&#xff0c;屏幕…

DBeaver如何查看ER图

前言 我们在使用DBeaver时&#xff0c;有时候需要查看某张表的ER图&#xff0c;这能帮助我们快速看到表的结构&#xff0c;那么&#xff0c;我们应该如何在DBeaver里面查看ER图呢&#xff1f; 如何查看 首先&#xff0c;我们点击下我们要查看的某张表&#xff0c;鼠标右击一…

pytest高版本兼容test_data[“log“] = _handle_ansi(“\n“.join(logs))错误

一、问题现象&#xff1a; 执行seleniumpytest结束时报: INTERNALERROR> File "D:\workspace\pytestframe\.venv\Lib\site-packages\pytest_html\report_data.py", line 141, in add_test INTERNALERROR> test_data["log"] _handle_ansi(&q…

mysql8.0.32升级到8.0.40

上篇8.0.32库的准备&#xff1a;mysql: error while loading shared libraries: libncurses.so.5: cannot open shared object file: No suc-CSDN博客 此篇测试升级到8.0.40 MySQL :: Download MySQL Community Server rootjyc:~# mysql -u root -pabcd1234 mysql: [Warning]…