使用kafka完成数据的实时同步,同步到es中。(使用kafka实现自动上下架 upper、lower)

news2024/12/23 15:21:30

文章目录

  • 1、发送消息 KafkaService
  • 2、生产者 service-album -> AlbumInfoServiceImpl
    • 2.1、新增 saveAlbumInfo()
    • 2.2、更新 updateAlbumInfo()
    • 2.3、删除 removeAlbumInfo()
  • 3、消费者 service-search - > AlbumListener.java

  • 上架:新增专辑到 es
  • 下架:删除专辑
  1. 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据
  2. 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据
            如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
  3. 删除:发送消息给kafka,search通过监听器获取消息es删除数据

1、发送消息 KafkaService

package com.atguigu.tingshu.common.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.CompletableFuture;

@Service
public class KafkaService {

    private static final Logger logger = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 向指定主题发送消息
     * 此方法通过调用重载的sendMsg方法,向指定主题发送消息,使用默认的消息标签和消息键
     *
     * @param topic 发送消息的主题
     * @param msg   需要发送的消息内容
     */
    public void sendMsg(String topic, String msg){
        // 调用重载的sendMsg方法,传入默认值以简化调用
        this.sendMsg(topic, null, null, msg);
    }

    /**
     * 发送消息到指定的Kafka主题
     *
     * @param topic 消息主题
     * @param partition 分区编号
     * @param key 消息键值
     * @param msg 消息内容
     */
    public void sendMsg(String topic, Integer partition, String key, String msg){
        // 发生消息并返回异步结果
        CompletableFuture<SendResult> future = this.kafkaTemplate.send(topic, partition, key, msg);

        // 异步处理发送结果
        future.whenCompleteAsync((result, ex) -> {
            if (ex != null){
                // 如果发送过程中出现异常
                logger.error("生产者发送消息失败!原因:{}", ex.getMessage());
            }
        });
    }

}

  • whenCompleteAsync:异步完成时的处理、当异步操作完成时

在这里插入图片描述

2、生产者 service-album -> AlbumInfoServiceImpl

在这里插入图片描述

2.1、新增 saveAlbumInfo()

  • 新增:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步新增数据
    在这里插入图片描述
    在这里插入图片描述

在这里插入图片描述

@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {
    @Autowired
    private AlbumAttributeValueMapper attributeValueMapper;

    @Autowired
    private AlbumStatService albumStatService;

    @Autowired
    private KafkaService kafkaService;

    @Transactional(rollbackFor = Exception.class)
    @Override
    public void saveAlbumInfo(AlbumInfoVo albumInfoVo) throws FileNotFoundException {
        // 1.保存专辑信息表
        AlbumInfo albumInfo = new AlbumInfo();
        BeanUtils.copyProperties(albumInfoVo, albumInfo);
        // 设置当前用户的id
        Long userId = AuthContextHolder.getUserId();
        albumInfo.setUserId(userId == null ? 1 : userId);
        albumInfo.setTracksForFree(5);
        albumInfo.setSecondsForFree(30);
        albumInfo.setStatus(SystemConstant.ALBUM_STATUS_PASS);
        this.save(albumInfo);
        // 主键回写获取专辑id
        Long albumInfoId = albumInfo.getId();

        // 2.保存专辑标签值表
        List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();
        if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {
            albumAttributeValueVoList.forEach(albumAttributeValueVo -> {
                AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();
                BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);
                albumAttributeValue.setAlbumId(albumInfoId);
                this.attributeValueMapper.insert(albumAttributeValue);
            });
        }

//		new FileInputStream("xxx");

//		try {
//			TimeUnit.SECONDS.sleep(3);
//		} catch (InterruptedException e) {
//			throw new RuntimeException(e);
//		}

        // 3.保存统计信息:专辑状态表
        // this.saveAlbumStat(albumInfoId);
        this.albumStatService.saveAlbumStat(albumInfoId);

        if (StringUtils.equals(albumInfo.getIsOpen(), "1")) {
            this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumInfoId.toString());
        }

//		int i = 1/0;
    }
}

在这里插入图片描述

2.2、更新 updateAlbumInfo()

  • 更新:如果是公开的专辑则发送消息给kafka,search通过监听器获取消息同步更新数据
            如果是私有的专辑则发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {

    @Autowired
    private AlbumAttributeValueMapper attributeValueMapper;

    @Autowired
    private KafkaService kafkaService;

    @Transactional
    @Override
    public void updateAlbumInfo(Long albumId, AlbumInfoVo albumInfoVo) {
        AlbumInfo albumInfo = new AlbumInfo();
        BeanUtils.copyProperties(albumInfoVo, albumInfo);
        albumInfo.setId(albumId);
        this.updateById(albumInfo);

        // 更新专辑标签值表:先删除该专辑所有的标签及值 再去新增
        this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));
        List<AlbumAttributeValueVo> albumAttributeValueVoList = albumInfoVo.getAlbumAttributeValueVoList();
        if (!CollectionUtils.isEmpty(albumAttributeValueVoList)) {
            albumAttributeValueVoList.forEach(albumAttributeValueVo -> {
                AlbumAttributeValue albumAttributeValue = new AlbumAttributeValue();
                BeanUtils.copyProperties(albumAttributeValueVo, albumAttributeValue);
                albumAttributeValue.setAlbumId(albumId);
                this.attributeValueMapper.insert(albumAttributeValue);
            });
        }

        if (StringUtils.equals(albumInfoVo.getIsOpen(), "1")) {
            this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_UPPER, albumId.toString());
        } else {
            this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());
        }
    }
}

在这里插入图片描述

2.3、删除 removeAlbumInfo()

  • 删除:发送消息给kafka,search通过监听器获取消息es删除数据
@Slf4j
@Service
@SuppressWarnings({"unchecked", "rawtypes"})
public class AlbumInfoServiceImpl extends ServiceImpl<AlbumInfoMapper, AlbumInfo> implements AlbumInfoService {

    @Autowired
    private AlbumAttributeValueMapper attributeValueMapper;

    @Autowired
    private AlbumStatMapper albumStatMapper;

    @Autowired
    private KafkaService kafkaService;

    @Transactional
    @Override
    public void removeAlbumInfo(Long albumId) {
        this.removeById(albumId);

        this.albumStatMapper.delete(new LambdaUpdateWrapper<AlbumStat>().eq(AlbumStat::getAlbumId, albumId));

        this.attributeValueMapper.delete(new LambdaUpdateWrapper<AlbumAttributeValue>().eq(AlbumAttributeValue::getAlbumId, albumId));

        this.kafkaService.sendMsg(KafkaConstant.QUEUE_ALBUM_LOWER, albumId.toString());
    }
}

在这里插入图片描述

3、消费者 service-search - > AlbumListener.java

在这里插入图片描述

package com.atguigu.tingshu.search.listener;

@Component
public class AlbumListener {

    @Autowired
    private AlbumInfoFeignClient albumInfoFeignClient;

    @Autowired
    private UserInfoFeignClient userInfoFeignClient;

    @Autowired
    private CategoryFeignClient categoryFeignClient;

    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;

    @KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_UPPER)
    public void upper(String albumId){
        if (StringUtils.isBlank(albumId)){
            return;
        }

        // 根据专辑id查询专辑
        Result<AlbumInfo> albumInfoResult = this.albumInfoFeignClient.getAlbumInfo(Long.valueOf(albumId));
        Assert.notNull(albumInfoResult, "同步数据时,获取专辑信息失败!");
        AlbumInfo albumInfo = albumInfoResult.getData();
        Assert.notNull(albumInfo, "同步数据时,没有对应的专辑!");

        AlbumInfoIndex albumInfoIndex = new AlbumInfoIndex();
        // 把专辑信息中的数据复制到index对象
        BeanUtils.copyProperties(albumInfo, albumInfoIndex);

        // 查询主播获取主播信息
        Result<UserInfoVo> userInfoVoResult = this.userInfoFeignClient.getUserById(albumInfo.getUserId());
        Assert.notNull(userInfoVoResult, "数据导入时,获取主播信息失败!");
        UserInfoVo userInfoVo = userInfoVoResult.getData();
        if (userInfoVo != null){
            albumInfoIndex.setAnnouncerId(userInfoVo.getId());
            albumInfoIndex.setAnnouncerName(userInfoVo.getNickname());
        }

        // 根据三级分类id查询一二三级分类
        Result<BaseCategoryView> categoryResult = this.categoryFeignClient.getAllLevelCategories(albumInfo.getCategory3Id());
        Assert.notNull(categoryResult, "数据导入时,获取分类信息失败!");
        BaseCategoryView baseCategoryView = categoryResult.getData();
        if (baseCategoryView != null) {
            albumInfoIndex.setCategory1Id(baseCategoryView.getCategory1Id());
            albumInfoIndex.setCategory2Id(baseCategoryView.getCategory2Id());
        }

        // 查询专辑统计信息
//                Result<AlbumStatVo> albumStatesResult = this.albumInfoFeignClient.getAlbumStates(albumInfo.getId());
//                Assert.notNull(albumStatesResult, "数据导入时,获取专辑统计信息失败!");
//                AlbumStatVo albumStatVo = albumStatesResult.getData();
//                if (albumStatVo != null) {
//                    BeanUtils.copyProperties(albumStatVo, albumInfoIndex);
//                }
        // 假数据:
        int playNum = (new Random().nextInt(100) + 1) * 10000;
        albumInfoIndex.setPlayStatNum(playNum);
        int subscribeNum = (new Random().nextInt(100) + 1) * 10000;
        albumInfoIndex.setSubscribeStatNum(subscribeNum);
        int buyNum = (new Random().nextInt(100) + 1) * 10000;
        albumInfoIndex.setBuyStatNum(buyNum);
        int commentNum = (new Random().nextInt(100) + 1) * 10000;
        albumInfoIndex.setCommentStatNum(commentNum);
        // 热度
        albumInfoIndex.setHotScore(playNum * 0.1 + commentNum * 0.2 + subscribeNum * 0.3 + buyNum * 0.4);

        // 标签
        Result<List<AlbumAttributeValue>> albumAttributeValueResult = this.albumInfoFeignClient.getAlbumAttributeValue(albumInfo.getId());
        Assert.notNull(albumAttributeValueResult, "数据导入时,获取标签及值失败!");
        List<AlbumAttributeValue> albumAttributeValues = albumAttributeValueResult.getData();
        if (!CollectionUtils.isEmpty(albumAttributeValues)){
            // 把List<AlbumAttributeValue> 转化成  List<AttributeValueIndex>
            albumInfoIndex.setAttributeValueIndexList(albumAttributeValues.stream().map(albumAttributeValue -> {
                AttributeValueIndex attributeValueIndex = new AttributeValueIndex();
                BeanUtils.copyProperties(albumAttributeValue, attributeValueIndex);
                return attributeValueIndex;
            }).collect(Collectors.toList()));
        }

        this.elasticsearchTemplate.save(albumInfoIndex);
    }

    @KafkaListener(topics = KafkaConstant.QUEUE_ALBUM_LOWER)
    public void lower(String albumId){
        if (StringUtils.isBlank(albumId)){
            return;
        }
        this.elasticsearchTemplate.delete(albumId, AlbumInfoIndex.class);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2085760.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

信息学奥赛初赛天天练-78-NOIP2015普及组-基础题3-中断、计算机病毒、文件传输协议FTP、线性表、链式存储、栈

NOIP 2015 普及组 基础题3 8 所谓的“中断”是指( ) A 操作系统随意停止一个程序的运行 B 当出现需要时&#xff0c;CPU 暂时停止当前程序的执行转而执行处理新情况的过程 C 因停机而停止一个程序的运行 D 电脑死机 9 计算机病毒是( ) A 通过计算机传播的危害人体健康的一种病…

有什么还原空白试卷的免费软件?2024快速进行空白还原的软件

有什么还原空白试卷的免费软件&#xff1f;2024快速进行空白还原的软件 还原空白试卷通常是指将已经填写的试卷还原为未填写的空白版本&#xff0c;以便重复使用或进行其他用途。以下是五款可以帮助你快速还原空白试卷的免费软件&#xff0c;这些工具提供了不同的功能&#xf…

补录.day43动态规划

300.最长递增子序列 给你一个整数数组 nums &#xff0c;找到其中最长严格递增子序列的长度。 子序列 是由数组派生而来的序列&#xff0c;删除&#xff08;或不删除&#xff09;数组中的元素而不改变其余元素的顺序。例如&#xff0c;[3,6,2,7] 是数组 [0,3,1,6,2,2,7] 的子序…

【RNN】循环神经网络RNN学习笔记

时间序列任务场景&#xff1a; 语音识别生成一段音乐情感分析DNA序列分析机器翻译 如何理解时间序列&#xff1a;特点&#xff1a;前后关联强&#xff0c;前因后果&#xff0c;后面产生的结果依赖于之前的结果 标准神经网络建模的弊端&#xff1a; 输入和输出在不同例子中可…

MosaicML-面向生成式AI的机器学习平台

前段时间&#xff0c;大数据巨头 Databricks 宣布已签署最终协议&#xff0c;将以13亿美元的价格&#xff0c;收购位于旧金山的人工智能初创公司MosaicML 。这篇文章来自 MosaicML官方的技术博客&#xff0c;是对 MosaicML 大模型训练平台的一个简单介绍。 AIGC领域最大收购&am…

三防平板:定制化服务的趋势——以智慧医疗为例

随着科技的飞速发展&#xff0c;三防平板产品凭借其坚固耐用、适应复杂环境的特性&#xff0c;在众多行业领域中崭露头角。而在AI迅速增长的今天&#xff0c;AI智慧医疗成为了一个备受关注的热点&#xff0c;它不仅推动了医疗行业的数字化转型&#xff0c;也为三防平板产品的定…

OS向量测试方法-PPMU

1.OS向量测试方法 详细步骤&#xff1a; 检查工作&#xff1a; ①检查每根pin连接到指定的PPMU资源是否正确 ②继电器资源是否一一对应 代码编写步骤: 1、 ①设计者设计的测试电路继电器重置初始化 ②close应该闭合的继电器 2、 ①DPS pin电压置0V&#xff0c;同时考虑电流量…

1500万“黑悟空”,打醒一线大厂了吗?

《黑神话&#xff1a;悟空》“霸占”热搜以来&#xff0c;几乎每天都在创造新纪录。 近日&#xff0c;有机构称全平台已售1500万份&#xff0c;仅以268标准版计算&#xff0c;已然拿下40亿元人民币收入&#xff0c;这下10万天兵天将&#xff0c;每人要打150只猴子了。 发售之…

私有云仓库Harbor,docker-compose容器编排

一、私有云仓库 1.pip工具 是python的包管理工具&#xff0c;和yum对rehat的关系是一样的 pip install --upgrade pip 升级版本&#xff0c;会报错&#xff0c;需要指定源 pip install --upgrade pip20.3 -i https://mirrors.aliyun.com/pypi/simple pip …

本地部署一个WordPress博客结合内网穿透实现异地远程访问本地站点

文章目录 前言1. 安装WordPress2. 创建WordPress数据库3. 安装相对URL插件4. 安装内网穿透发布网站4.1 命令行方式&#xff1a;4.2. 配置wordpress公网地址 5. 配置WordPress固定公网地址 前言 本文主要介绍如何在Linux Ubuntu系统上使用WordPress搭建一个本地网站&#xff0c…

React学习day04-useEffect、自定义Hook函数

11、useEffect&#xff08;一个React Hook函数&#xff09; &#xff08;1&#xff09;作用&#xff1a;用于在React组件中创建不是由事件引起而是由渲染本身引起的操作&#xff0c;比如发送AJAX请求&#xff0c;更改DOM等&#xff08;即&#xff1a;视图渲染完后会触发一些事…

网络游戏服务器如何有效防护DDoS与CC攻击

随着网络游戏行业的蓬勃发展&#xff0c;其背后的服务器架构日益复杂&#xff0c;同时也面临着前所未有的网络安全威胁。其中&#xff0c;分布式拒绝服务&#xff08;DDoS&#xff09;和CC&#xff08;Challenge Collapsar&#xff09;攻击尤为突出&#xff0c;它们通过大量伪造…

90.游戏安全项目-项目搭建与解析

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a;易道云信息技术研究院 上一个内容&#xff1a;89.游戏安全项目-htdSdk安装 79.游戏分析工具闪屏问题优化与数据被修改高亮 这里面有注…

权力迷宫:皇权、律法与人性的深度博弈

权力迷宫&#xff1a;皇权、律法与人性的深度博弈 - 孔乙己大叔在人类社会的复杂织锦中&#xff0c;权力与律法的关系往往呈现出一种微妙而深刻的悖论。律法&#xff0c;这一社会秩序的基石&#xff0c;常被视作维护公正、约束行为的利器&#xff0c;然而&#xff0c;在金字塔的…

深度理解指针(5)----指针完结

hello&#xff0c;各位小伙伴们我们现在已经对指针有了深刻的理解&#xff0c;指针来到了收尾环节&#xff01;让我们来做几题例题来复习之前学习的内容吧&#xff01; 最近爆火的黑神话悟空不知道小伙伴们体验了没有&#xff0c;小编对八戒还有蜘蛛精的凄惨爱情深深打动特意找…

Springboot使用Mongo数据库实现文件的上传下载预览等服务接口

MongoDB GridFS 简介 MongoDB GridFS是一个用于存储和检索大型文件的规范&#xff0c;它允许在MongoDB数据库中存储超过16MB的文件&#xff0c;如图片、音频、视频等。GridFS通过将文件分割成多个小的chunk&#xff08;文件片段&#xff09;&#xff0c;每个chunk通常为255KB&…

记URL重定向漏洞骚技巧

0x1 前言 这几天跟着我那几个师傅们在学习URL重定向漏洞&#xff0c;学习了比较多的对于这个漏洞的骚技巧&#xff0c;以及在挖掘edusrc漏洞和企业src相关的URL重定向漏洞时的一些技巧和不错的思路。 最近在跟我那几个师傅们研究学习URL重定向漏洞&#xff0c;然后在一些厂商…

EHS综合管理解决方案落地:管理效率飞升70%!

所有制造企业都面临着一个问题&#xff1a;如何保证EHS制度高质、高效执行&#xff1f;——上海斯歌EHS综合管理解决方案应运而生。 前不久&#xff0c;上海斯歌EHS综合管理解决方案&#xff08;企业安环综合管理解决方案&#xff09;在某全球领先的汽配公司成功落地&#xff0…

安灯系统在汽车电子工厂应用案例汇总

在汽车电子工厂中&#xff0c;高效的生产管理和及时的问题解决至关重要。安灯系统作为精益制造执行中的核心工具也是 MES 制造执行系统的重要组成部分&#xff0c;为汽车电子工厂带来了显著的效益。安灯系统是一个面向制造业生产现场&#xff0c;快速联络生产、物料、维修、主管…

VSCode中TypeScript调试配置

一、背景 最近想用TypeScript编译项目&#xff0c;在创建完项目后&#xff0c;我发现VSCode只有在调试TypeScript的单个文件时生效&#xff0c;如果存在引用&#xff0c;再进行断点调试&#xff0c;则调试功能不生效了。 随后&#xff0c;我让Chatgpt 生成一个一套配置&#…