牛客项目(五)-使用kafka实现发送系统通知

news2025/1/21 15:44:19

kafka入门以及与spring整合

在这里插入图片描述

Message.java

import java.util.Date;

public class Message {
    private int id;
    private int fromId;
    private int toId;
    private String conversationId;
    private String content;
    private int status;
    private Date createTime;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public int getFromId() {
        return fromId;
    }

    public void setFromId(int fromId) {
        this.fromId = fromId;
    }

    public int getToId() {
        return toId;
    }

    public void setToId(int toId) {
        this.toId = toId;
    }

    public String getConversationId() {
        return conversationId;
    }

    public void setConversationId(String conversationId) {
        this.conversationId = conversationId;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public int getStatus() {
        return status;
    }

    public void setStatus(int status) {
        this.status = status;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", fromId=" + fromId +
                ", toId=" + toId +
                ", conversationId='" + conversationId + '\'' +
                ", content='" + content + '\'' +
                ", status=" + status +
                ", createTime=" + createTime +
                '}';
    }
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);
    @Autowired
    private MessageService messageService;
    @Autowired
    private DiscussPostService discussPostService;
    @Autowired
    private ElasticsearchService elasticsearchService;
    //加一个监听相关主题的listener
    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record){
        if(record == null || record.value()==null){
            System.out.println("错误发帖");
            return;
        }
        Event event= JSONObject.parseObject(record.value().toString(),Event.class);
        if(event == null){
            System.out.println("错误发帖");
            return;
        }
        //发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USERID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());
        //message的内容
        Map<String,Object> content=new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
        if(!event.getData().isEmpty()){
            for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }
        message.setContent(JSONObject.toJSONString(content));
        System.out.println(message);
        messageService.addMessage(message);
        System.out.println("成功处理事件");
    }
    }

Event.java

定义一个事件实体 以方便在消息的发送与处理

import java.util.HashMap;
import java.util.Map;

//用于事件驱动的kafka消息队列开发
public class Event {
    private String topic;
    //事件触发的人
    private int userId;
    //事件发生在哪个实体
    private int entityType;
    private int entityId;
    //实体作者
    private int entityUserId;
    //存储额外数据
    private Map<String,Object> data = new HashMap<>();

    public String getTopic() {
        return topic;
    }

    public Event setTopic(String topic) {
        this.topic = topic;
        return this;
    }

    public int getUserId() {
        return userId;
    }

    public Event setUserId(int userId) {
        this.userId = userId;
        return this;
    }

    public int getEntityType() {
        return entityType;
    }

    public Event setEntityType(int entityType) {
        this.entityType = entityType;
        return this;
    }

    public int getEntityId() {
        return entityId;
    }

    public Event setEntityId(int entityId) {
        this.entityId = entityId;
        return this;
    }

    public int getEntityUserId() {
        return entityUserId;
    }

    public Event setEntityUserId(int entityUserId) {
        this.entityUserId = entityUserId;
        return this;
    }

    public Map<String, Object> getData() {
        return data;
    }

    public Event setData(String key,Object value) {
        this.data.put(key,value);
        return this;
    }

}

EventProducer.java

定义事件的生产者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.Event;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class EventProducer {
//生产者使用kafkaTemplate发送消息
    @Autowired
    KafkaTemplate kafkaTemplate;
    //处理事件
    public void fireEvent(Event event){
        //将事件发布到指定的主题
        //将event转换为json数据进行消息发送
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
        System.out.println("成功发送"+event.getTopic());

    }
}

EventConsumer.java

定义事件消费者

import com.alibaba.fastjson.JSONObject;
import edu.npu.newcoder.community.community.entity.DiscussPost;
import edu.npu.newcoder.community.community.entity.Event;
import edu.npu.newcoder.community.community.entity.Message;
import edu.npu.newcoder.community.community.service.DiscussPostService;
import edu.npu.newcoder.community.community.service.ElasticsearchService;
import edu.npu.newcoder.community.community.service.MessageService;
import edu.npu.newcoder.community.community.util.CommunityConstant;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

@Component
public class EventConsumer implements CommunityConstant {
//    private static final Logger logger= LoggerFactory.getLogger(EventConsumer.class);
    @Autowired
    private MessageService messageService;
    @Autowired
    private DiscussPostService discussPostService;
    @Autowired
    private ElasticsearchService elasticsearchService;
    //加一个监听相关主题的listener
    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record){
        if(record == null || record.value()==null){
            System.out.println("错误发帖");
            return;
        }
        Event event= JSONObject.parseObject(record.value().toString(),Event.class);
        if(event == null){
            System.out.println("错误发帖");
            return;
        }
        //发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USERID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());
        //message的内容
        Map<String,Object> content=new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
        if(!event.getData().isEmpty()){
            for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }
        message.setContent(JSONObject.toJSONString(content));
        System.out.println(message);
        messageService.addMessage(message);
        System.out.println("成功处理事件");
    }
    }

在特定的地方触发消息产生

CommentController

 //触发评论事件
        Event event=new Event().setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(comment.getEntityType())
                .setEntityId(comment.getEntityId())
                .setData("postId",discussPostId);
        if(comment.getEntityType() == ENTITY_TYPE_POST){
            DiscussPost target=discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }else if(comment.getEntityType()==ENTITY_TYPE_COMMENT){
            //根据评论的id查询评论
            Comment target =commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }
         eventProducer.fireEvent(event);

LikeController

 //触发点赞事件
 if(likeStatus ==1){
            Event event =new Event()
                    .setTopic(TOPIC_LIKE)
                    .setUserId(hostHolder.getUser().getId())
                    .setEntityType(entityType)
                    .setEntityId(entityId)
                    .setEntityUserId(entityUserId)
                    .setData("postId",postId);
            eventProducer.fireEvent(event);
        }

FollowController

 //触发关注事件
 Event event = new Event()
                .setTopic(TOPIC_FOLLOW)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(entityType)
                .setEntityId(entityId)
                .setEntityUserId(entityId);
        eventProducer.fireEvent(event);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1165315.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

不使用屏幕在树莓派4B安装Ubuntu22.04桌面版(64位)

因为时间有限只说一下基本路径&#xff1a; 1首先安装Ubuntu22.04server版本 2设置服务器版本的SSH和WiFi 3通过服务器版本安装Ubuntu-desktop升级到Ubuntu22.04桌面版 4在桌面版上安装远程控制软件&#xff1a;xrdp; 5使用Windows自带的远程桌面连接访问Ubuntu 6完成

Linux -----------------------Shell 流程判断

什么是shell Shell是操作系统的用户界面&#xff0c;负责接收和解释用户的命令&#xff0c;并将这些命令转化为操作系统内部能够理解的指令&#xff0c;然后执行相应的操作。Shell还允许用户编写脚本&#xff0c;以自动化和批处理任务&#xff0c;从而提高效率。在Linux系统中&…

【Nginx38】Nginx学习:SSL模块(二)错误状态码、变量及宝塔配置分析

Nginx学习&#xff1a;SSL模块&#xff08;二&#xff09;错误状态码、变量及宝塔配置分析 继续我们的 SSL 模块的学习。上回其实我们已经搭建起了一个 HTTPS 服务器了&#xff0c;只用了三个配置&#xff0c;其中一个是 listen 的参数&#xff0c;另外两个是指定密钥文件的地址…

【LeetCode刷题-排序】--912.排序数组

912.排序数组 方法&#xff1a;使用插入排序 class Solution {public int[] sortArray(int[] nums) {int len nums.length;//循环不变量:使nums[i]插入区间[0,i),使之成为有序数组for(int i 1;i<len;i){//先缓存这个元素&#xff0c;然后之前的元素逐个后移&#xff0c;…

LeetCode----52. N 皇后 II

 题目 n 皇后问题 研究的是如何将 n 个皇后放置在 n n 的棋盘上,并且使皇后彼此之间不能相互攻击。 给你一个整数 n ,返回 n 皇后问题 不同的解决方案的数量。 示例 1: 输入:n = 4 输出:2 解释:如上图所示,4 皇后问题存在两个不同的解法。 示例 2: 输入:n = …

【CSDN 每日一练 ★★☆】【数学】旋转图像

【CSDN 每日一练 ★★☆】【数学】旋转图像 数学 数组 题目 给定一个 n n 的二维矩阵 matrix 表示一个图像。请你将图像顺时针旋转 90 度。 你必须在 原地 旋转图像&#xff0c;这意味着你需要直接修改输入的二维矩阵。请不要 使用另一个矩阵来旋转图像。 示例 示例 1&a…

前端埋点方式

前言&#xff1a; 想要了解用户在系统中所做的操作&#xff0c;从而得出用户在本系统中最常用的模块、在系统中停留的时间。对于了解用户的行为、分析用户的需求有很大的帮助&#xff0c;想实现这种需求可以通过前端埋点的方式。 埋点方式&#xff1a; 1.什么是埋点&#xff1f…

【Selenium+python】自动化测试登录界面

前言&#xff1a;已经学习selenium许久了&#xff0c;奈何公司的项目还在码代码中...&#xff0c;感觉自己学的东西快忘的差不多了&#xff0c;所以就找个网站练练手&#xff0c;顺便回顾一下UI自动化的知识&#xff0c;也希望跟我一样的小白有所受益。 一、用例分析&#xff…

线扫相机DALSA--常见问题四:修改相机参数,参数保存无效情况

该问题是操作不当&#xff0c;未按照正常步骤保存参数所致&#xff0c;相机为RAM机制&#xff0c;参数需保存在采集卡的ROM内。 保存参数步骤&#xff1a; ①首先将相机参数保存至User Set1&#xff1b; ②然后回到Board(采集卡)参数设置区&#xff0c;鼠标选中Basic Timing&a…

C++——类和对象(中)完结

赋值运算符重载 运算符重载 C 为了增强代码的可读性引入了运算符重载 &#xff0c; 运算符重载是具有特殊函数名的函数 &#xff0c;也具有其 返回值类型&#xff0c;函数名字以及参数列表&#xff0c;其返回值类型与参数列表与普通的函数类似。 函数名字为&#xff1a;关键…

中富转债,章鼓转债上市价格预测

中富转债-123226 基本信息 转债名称&#xff1a;中富转债&#xff0c;评级&#xff1a;AA-&#xff0c;发行规模&#xff1a;5.2亿元。 正股名称&#xff1a;中富电路&#xff0c;今日收盘价&#xff1a;30.03元&#xff0c;转股价格&#xff1a;36.44元。 当前转股价值 转债面…

桥接模式birdge

简介 桥接模式&#xff1a;将抽象与实现相分离&#xff0c;使他们可以独立变化。 角色 抽象化&#xff08;Abstraction&#xff09;角色&#xff1a; 该类持有一个对实现角色的引用&#xff0c;抽象角色中的方法需要实现角色来实现&#xff0c;抽象角色一般为抽象类&#xf…

NOA标配搭载率不足3%!英伟达中国「自研」智驾方案

汽车行业的特殊性&#xff0c;意味着&#xff0c;对于供应链来说&#xff0c;一家独大几乎不可能。对于芯片赛道来说&#xff0c;同样如此。 以智能驾驶为例&#xff0c;目前的供应链形态&#xff0c;也正处于变革的关键周期。各路玩家也都在尝试与车企合作模式的多元化。 高工…

并发请求控制

Chrome 浏览器最多并发6个请求。一般情况下&#xff0c;我们都会设置并发数为 3。 并发请求控制主要有两种区别&#xff1a;假设并发数为 3 三个请求为一组进行并发&#xff0c;这三个请求全部完成了&#xff0c;再进行下一组。在第一种方式的基础上加上滑动补位&#xff0c;…

Express框架开发接口之前台分类导航

1.初始化 const handleDB require(../handleDB/index) // 获取全部导航 exports.allNav (req, res) > {(async function () {})() } // 更新或者添加导航 exports.upNav (req, res) > {(async function () {})() } // 根据id删除 exports.delNav (req, res) > {(a…

Mac电脑风扇控制推荐 Macs Fan Control Pro 中文 for mac

Macs Fan Control Pro是一款功能全面、易于使用且具有良好兼容性和安全性的风扇控制软件&#xff0c;适用于各种Mac用户。 除了能够调整风扇速度外&#xff0c;Macs Fan Control Pro还支持实时监测硬件传感器的温度&#xff0c;例如CPU、硬盘等&#xff0c;同时显示每个传感器…

SpringBoot系列之自定义Jackson对象映射器格式日期数据

开发环境 JDK 1.8SpringBoot2.2.1Maven 3.2Mysql5.7.36开发工具 IntelliJ IDEAsmartGit 背景 在我之前的博客中&#xff0c;有对Springboot2.0集成Mybatis Plus做了比较详细的描述&#xff0c;现在这篇博客介绍&#xff0c;基于开源的jackson api来自定义ObjectMapping&…

MPLAB X IDE 仿真打断点提示已中断的断点?

这种中间带裂缝的是无效断点。 原因可能与XC编译器的优化有关&#xff0c;最后生成的汇编与C语言并不是一一对应的(官方给的解释是效率高)。所以这一行C语言转换的汇编代码可能并不在这个位置&#xff0c;也可能与其它汇编合并后根本就没有 我的解决方法是把优化等级调到最低&a…

【多线程面试题二十二】、 说说你对读写锁的了解

文章底部有个人公众号&#xff1a;热爱技术的小郑。主要分享开发知识、学习资料、毕业设计指导等。有兴趣的可以关注一下。为何分享&#xff1f; 踩过的坑没必要让别人在再踩&#xff0c;自己复盘也能加深记忆。利己利人、所谓双赢。 面试官&#xff1a;说说你对读写锁的了解 …

windows server 2016调优

1. 增加TCP连接的最大数量&#xff1a; 在您当前的注册表路径&#xff08;HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\Services\Tcpip\Parameters&#xff09;中的右侧窗格&#xff0c;右击空白处&#xff0c;选择“新建” -> “DWORD (32位) 值”。为新的值命名为TcpNu…