6.Kafka发布和显示系统通知

news2025/1/19 10:18:29

1.阻塞队列

  1. 生产者线程

  1. 线程需要实现 Runnable 接口

  1. 重写接口的run方法

  1. 声明变量private BlockingQueue<Integer> queue接受传入的阻塞队列

  1. 创建有参构造器

  1. 实现示例逻辑,生产100个数据,put进阻塞队列,每生产一个数据停顿20毫秒,输出信息

class Producer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  1. 消费者线程

  1. 线程需要实现 Runnable 接口

  1. 重写接口的run方法

  1. 声明变量private BlockingQueue<Integer> queue接受传入的阻塞队列

  1. 创建有参构造器

  1. 实现示例逻辑,不停的从队列中take,每生产一个数据停顿0-1000随机毫秒,输出信息

class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  1. main函数

  1. 实例化阻塞队列BlockingQueue queue = new ArrayBlockingQueue(10);

  1. 实例化一个生产者线程

  1. 实例化三个消费者线程

public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }

2.Kafka入门

  • Kafka简介

早先只是消息队列,慢慢扩展功能不止消息队列

消息系统:消息队列的功能,核心功能

通过日志可以分析很多内容,用户追踪等

  • Kfaka特点

高吞吐量:可以处理TB级别数据

消息持久化:把数据永久保存到类似硬盘的某一介质。硬盘空间大,价格低。误解,读取硬盘速率高与低取决于对硬盘使用,对硬盘的顺序读取效率甚至高于对内存的随机读取,Kafka利用这一点保证能处理海量数据

高可靠性:分布式的服务,可以做集群部署,有容错能力

高扩展性:集群服务器不够用了简单的加一个服务器就可以

  • Kafka术语

Broker:Kafka的服务器,集群中每一台服务器成为一个Broker

Zookeeper:管理集群软件,Kafka内置了Zookeeper

Topic:消息队列实现的方式两种,一种点对点,如上面的BlockingQueue,生产者把消息放到一个队列里,消费者就从这里面取值,消费者可能有多个,如果A消费者取到了这个数据这数据就出队了,每个数据只被一个消费者消费;还有一种方式发布订阅方式,生产者把消息队列放到某一个位置,消息可以被多个消费者读到。生产者把消息发布到的位置(空间)就叫Topic

Partition:分区,对主题位置的分区,增强了并发能力

Offsrt:消息在分区内存放的索引

Leader Replica:主副本,从分区读数据时,主副本做响应

Follower Replica:从副本只是备份,不负责响应

  • 开启kafka环境:

在此文件夹中打开终端,先开启的是zookeeper服务:

bin/zookeeper-server-start.sh config/zookeeper.properties

然后再打开一个终端,这次开启的是kafka服务:

bin/kafka-server-start.sh config/server.properties

如果两条命令都没有报错,那么就可以认定kafka的运行环境启动成功了

3. Spring整合kafka

  1. 引入以来

  1. application.properties配置

# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092 服务器端口
spring.kafka.consumer.group-id=community-consumer-group 消费者分组id
spring.kafka.consumer.enable-auto-commit=true 是否自动提交消费者的偏移量
spring.kafka.consumer.auto-commit-interval=3000 自动提交频率
  1. 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test", "你好");
        kafkaProducer.sendMessage("test", "在吗");

        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic, content);
    }

}

@Component
class KafkaConsumer {

    @KafkaListener(topics = {"test"})
    public void handleMessage(ConsumerRecord record) {
        System.out.println(record.value());
    }


}

4.发布系统通知

在评论点赞关注以后,就不用管他,扔进队列,并发异步。

解决方式:基于事件驱动的方式

  1. 开发事件实体Event

ps:创建get,set方法时

set方法改一下,有返回值为Event,可以连续.set进行处理,比全参构造器更灵活。

setData改一下只传key和value,调用更方便

2.开发事件生产者

建立新包 event,包下实现 EventProducer

@Component
public class EventProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 处理事件
    public void fireEvent(Event event) {
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }
}

3.开发事件消费者

@Component
public class EventConsumer implements CommunityConstant {
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

    @Autowired
    private EventProducer eventProducer;

    @Autowired
    private MessageService messageService;

    //处理 消息事件
    @KafkaListener(topics = {TOPIC_COMMENT,TOPIC_FOLLOW,TOPIC_LIKE})
    private void handleCommentMessage(ConsumerRecord record){
        //判空 并记录日志
        if(record == null || record.value() == null){
            logger.error("消息的内容为空!");
            return;
        }
        //将(json)event 回复成字符串
        Event event = JSONObject.parseObject(record.value().toString(), Event.class);
        //再判断event是否为空
        if(event == null){
            logger.error("消息格式错误!");
            return;
        }
        //发送站内通知
        //构建 message 实体
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        //填充 message.content   将 event.data(map) 填入
        Map<String,Object> content = new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
        if(!event.getData().isEmpty() ){ //event里的 map 有内容
            for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }
        //将content 转成json格式
        message.setContent(JSONObject.toJSONString(content));
        //存入message
        messageService.addMessage(message);
        //!!!!!!!!!!!!!!! message里的 to_id 存错了
        System.out.println(message);
    }
}

4.修改对应的Controller

CommentController like follow

//增加评论
    @RequestMapping(path = "/add/{discussPostId}",method = RequestMethod.POST)
    public String addComment(@PathVariable("discussPostId") int discussPostId, Comment comment){
        comment.setUserId(hostHolder.getUser().getId());
        comment.setStatus(0);
        comment.setCreateTime(new Date());
        commentService.addComment(comment);

        //出发 评论事件 kafka发消息
        //构造事件 event 实体

        Event event = new Event()
                .setTopic(TOPIC_COMMENT)
                .setUserId(hostHolder.getUser().getId())
                .setEntityType(comment.getEntityType())
                .setEntityId(comment.getEntityId())
                .setData("postId",discussPostId);

        //设置 跳转链接时 检查跳转类型 是 帖子 评论 还是关注  即 找到target
        if(comment.getEntityType() == ENTITY_TYPE_POST){ // 帖子类型
            DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        } else if (comment.getEntityType() == ENTITY_TYPE_COMMENT) { //评论类型
            Comment target = commentService.findCommentById(comment.getEntityId());
            event.setEntityUserId(target.getUserId());
        }
        //触发
        eventProducer.fireEvent(event);

        return "redirect:/discuss/detail/"+discussPostId;
    }

5.显示系统通知

  1. 通知列表

MessageController

获取当前用户

查询评论类通知

  1. 把查到的message放入map

  1. 转义字符反转HtmlUtils.htmlUnescape

查询点赞类通知(方法类似同上)

查询关注类通知(方法类似同上)

查询未读消息数量

@RequestMapping(path = "/notice/list", method = RequestMethod.GET)
public String getNoticeList(Model model) {
    User user = hostHolder.getUser();

    // 查询评论类通知
    Message message = messageService.findLatestNotice(user.getId(), TOPIC_COMMENT);
    Map<String, Object> messageVO = new HashMap<>();
    if (message != null) {
        messageVO.put("message", message);

        String content = HtmlUtils.htmlUnescape(message.getContent());
        Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);

        messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
        messageVO.put("entityType", data.get("entityType"));
        messageVO.put("entityId", data.get("entityId"));
        messageVO.put("postId", data.get("postId"));

        int count = messageService.findNoticeCount(user.getId(), TOPIC_COMMENT);
        messageVO.put("count", count);

        int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_COMMENT);
        messageVO.put("unread", unread);
    }
    model.addAttribute("commentNotice", messageVO);

    // 查询点赞类通知
    message = messageService.findLatestNotice(user.getId(), TOPIC_LIKE);
    messageVO = new HashMap<>();
    if (message != null) {
        messageVO.put("message", message);

        String content = HtmlUtils.htmlUnescape(message.getContent());
        Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);

        messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
        messageVO.put("entityType", data.get("entityType"));
        messageVO.put("entityId", data.get("entityId"));
        messageVO.put("postId", data.get("postId"));

        int count = messageService.findNoticeCount(user.getId(), TOPIC_LIKE);
        messageVO.put("count", count);

        int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_LIKE);
        messageVO.put("unread", unread);
    }
    model.addAttribute("likeNotice", messageVO);

    // 查询关注类通知
    message = messageService.findLatestNotice(user.getId(), TOPIC_FOLLOW);
    messageVO = new HashMap<>();
    if (message != null) {
        messageVO.put("message", message);

        String content = HtmlUtils.htmlUnescape(message.getContent());
        Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);

        messageVO.put("user", userService.findUserById((Integer) data.get("userId")));
        messageVO.put("entityType", data.get("entityType"));
        messageVO.put("entityId", data.get("entityId"));

        int count = messageService.findNoticeCount(user.getId(), TOPIC_FOLLOW);
        messageVO.put("count", count);

        int unread = messageService.findNoticeUnreadCount(user.getId(), TOPIC_FOLLOW);
        messageVO.put("unread", unread);
    }
    model.addAttribute("followNotice", messageVO);

    // 查询未读消息数量
    int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
    model.addAttribute("letterUnreadCount", letterUnreadCount);
    int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
    model.addAttribute("noticeUnreadCount", noticeUnreadCount);

    return "/site/notice";
}

2.通知详情

MessageConctroller

获得user信息

分页信息

存入map

设置已读

@RequestMapping(path = "/notice/detail/{topic}", method = RequestMethod.GET)
public String getNoticeDetail(@PathVariable("topic") String topic, Page page, Model model) {
    User user = hostHolder.getUser();

    page.setLimit(5);
    page.setPath("/notice/detail/" + topic);
    page.setRows(messageService.findNoticeCount(user.getId(), topic));

    List<Message> noticeList = messageService.findNotices(user.getId(), topic, page.getOffset(), page.getLimit());
    List<Map<String, Object>> noticeVoList = new ArrayList<>();
    if (noticeList != null) {
        for (Message notice : noticeList) {
            Map<String, Object> map = new HashMap<>();
            // 通知
            map.put("notice", notice);
            // 内容
            String content = HtmlUtils.htmlUnescape(notice.getContent());
            Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
            map.put("user", userService.findUserById((Integer) data.get("userId")));
            map.put("entityType", data.get("entityType"));
            map.put("entityId", data.get("entityId"));
            map.put("postId", data.get("postId"));
            // 通知作者
            map.put("fromUser", userService.findUserById(notice.getFromId()));

            noticeVoList.add(map);
        }
    }
    model.addAttribute("notices", noticeVoList);

    // 设置已读
    List<Integer> ids = getLetterIds(noticeList);
    if (!ids.isEmpty()) {
        messageService.readMessage(ids);
    }

    return "/site/notice-detail";
}

3.未读消息

拦截器处理 MessageInterceptor

@Component
public class MessageInterceptor implements HandlerInterceptor {

    @Autowired
    private HostHolder hostHolder;

    @Autowired
    private MessageService messageService;

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {
        User user = hostHolder.getUser();
        if (user != null && modelAndView != null) {
            int letterUnreadCount = messageService.findLetterUnreadCount(user.getId(), null);
            int noticeUnreadCount = messageService.findNoticeUnreadCount(user.getId(), null);
            modelAndView.addObject("allUnreadCount", letterUnreadCount + noticeUnreadCount);
        }
    }
}

WebMvcConfig

@Autowired
private MessageInterceptor messageInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) {
    registry.addInterceptor(messageInterceptor)
            .excludePathPatterns("/**/*.css", "/**/*.js", "/**/*.png", "/**/*.jpg", "/**/*.jpeg");
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/372018.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu22.04 安装 mysql8,redis7,MongoDB6

服务器的准备 我的服务器是在腾讯云租的&#xff0c;所以服务器的apt源都是默认配好的&#xff0c;没配好的自行网上查找apt源配置。本文同样适用于Ubuntu 22&#xff0c;20。Ubuntu18亦可参考。云服务器一般防火墙未开放端口访问&#xff0c;请自行配置&#xff0c;否则后续远…

【unity游戏制作-mango的冒险】-4.场景二的镜头和法球特效跟随

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! 本文由 秩沅 原创 收录于专栏&#xff1a;unity游戏制作 ⭐mango的冒险场景二——镜头和法球特效跟随⭐ 文章目录⭐mango的冒险场景二——镜…

【2023蓝桥杯】枚举专项题笔记

【枚举】卡片小蓝有很多数字卡片&#xff0c;每张卡片上都是数字 0 到 9。小蓝准备用这些卡片来拼一些数&#xff0c;他想从 1开始拼出正整数&#xff0c;每拼一个&#xff0c;就保存起来&#xff0c;卡片就不能用来拼其它数了。小蓝想知道自己能从 1拼到多少。例如&#xff0c…

JVM垃圾回收器概述

Serial串行回收 Serial收集器是最基本、历史最悠久的垃圾收集器了。JDK1.3之前回收新生代唯一的选择。 Serial收集器作为HotSpot中client模式下的默认新生代垃圾收集器。 Serial收集器采用复制算法、串行回收和"stop-the-World"机制的方式执行内存回收。 除了年轻…

【Redis】概述环境搭建(一)

&#x1f697;Redis学习起始站~ &#x1f6a9;本文已收录至专栏&#xff1a;数据库学习之旅 &#x1f44d;希望您能有所收获 一.初识Redis (1) 概述 Redis诞生于2009年全称是Remote Dictionary Server 远程词典服务器&#xff0c;是一个基于内存的键值型NoSQL数据库。这里有两…

数字IC笔试题---千题解,量大管饱,图文并茂

前言&#xff1a;出笔试题汇总&#xff0c;是为了总结秋招可能遇到的问题&#xff0c;做题不是目的&#xff0c;在做题的过程中发现自己的漏洞&#xff0c;巩固基础才是目的。所有题目结果和解释由笔者给出&#xff0c;答案主观性较强&#xff0c;若有错误欢迎评论区指出&#…

es8集群模式部署

准备3台机器 192.168.1.41 192.168.1.42 192.168.1.43因为es集群有几个节点&#xff0c;所以我对应node1&#xff0c;node2&#xff0c;node3.这几个名称并不是主机名&#xff0c;而是es节点名称 2. 开始部署&#xff0c;基础配置 (三台都做) systemctl stop firewalld syste…

【数据库】SQL语句

第三章 SQL SQL(structured Query Language) SQL概述 SQL特点 综合统一。高度非过程化。面向集合的操作方式。以同一种语法结构提供多种使用方式。语言简洁易学易用。 主要版本 SQL-89SQL-92 ,SQL2SQL-99 ,SQL3 数据库结构 SQL语言是集DDL、DML和DCL于一体的数据库语言…

网安入门,这篇文章足够了(内含海量资料)

随着新一轮科技和产业变革加速演进&#xff0c;人工智能、物联网、大数据、5G等新兴技术在成为经济社会发展的助推器的同时&#xff0c;也让网络空间变得更加复杂。全球范围内网络安全事件日益增加&#xff0c;网络安全的重要性日渐凸显。 “我国网络空间安全人才年培养规模在…

STM32——窗口看门狗

什么是窗口看门狗&#xff1f; 窗口看门狗用于监测单片机程序运行时效是否精准&#xff0c;主要检测软件异常&#xff0c;一般用于需要精准检测 程序运行时间的场合。 窗口看门狗的本质是一个能产生系统复位信号和提前唤醒中断的6位计数器。 产生复位条件&#xff1a; 当递减…

CLion开发图书管理系统项目 (c++ + MySQL实现)

项目仓库 &#xff1a;传送门 需求分析 当下市场日益激烈的竞争迫使图书企业采用一种新的管理方式来加快图书管理操作&#xff0c;而计算机技术的发展为图书管理注入了新的生机。通过调查市场&#xff0c;一款合格的图书管理系统必须具备以下三个特点&#xff1a; 能够对图书…

注解原理剖析与实战

一、注解及其原理 1.注解的基本概念 注解&#xff0c;可以看作是对 一个类/方法的一个扩展的模版&#xff0c;每个类/方法按照注解类中的规则&#xff0c;来为类/方法注解不同的参数&#xff0c;在用到的地方可以得到不同的类/方法中注解的各种参数与值。 从JDK5开始&#xff…

【必学】最流行的云原生监控解决方案:Prometheus+Grafana

文章目录一、Prometheus和Grafana简介1.1、Prometheus是最受欢迎的云原生监控方案之一1.2、Grafana是最流行的开源可视化平台二、Prometheus的优势三、Prometheus架构原理四、Prometheus和Grafana安装部署一、Prometheus和Grafana简介 1.1、Prometheus是最受欢迎的云原生监控方…

如何进行单元测试

前言单元测试是指对软件中最小可测单元进行检查和验证&#xff1b;c语言中单元指一个函数&#xff0c;java中指一个类。图形化软件中可以指一个窗口或者一个菜单。总的来说&#xff0c;单元就是认为规定最小的被测试模块。1.1单元测试对我们开发程序有什么好处首先是一个前端单…

react: input 输入框 中文onChange事件异常问题 对input输入进行防抖处理

当我们使用Input时&#xff0c;我们可能会遇到一个问题&#xff0c;比如需要对用户输入的内容进行搜索时&#xff0c;当用户处于中文输入时&#xff0c;明明没有对内容进行确认&#xff0c;为什么会触发了onChange事件呢&#xff1f;比如以下场景&#xff0c;中文一边输入另外一…

机器学习知识总结 —— 20.使用朴素贝叶斯进行数据分类

文章目录准备基础数据计算先验概率计算条件概率预测分布验证结果作为一种监督学习分类方法&#xff0c;在上一章中我们已经介绍过它的数理原理。现在我们开始来实现一个简单的朴素贝叶斯分类的算法&#xff0c;这样我们能更好的理解它是怎么运作的。 准备基础数据 首先还是有…

加密流量专栏总览

文章目录加密流量专栏1. 原理篇2. 模型篇3. 文章分类总结3.1 研究方向3.2 特征提取3.3 机器学习模型改进3.4 深度学习模型改进3.5 其他模型改进3.7 实时检测3.8 概念漂移检索论文的方法加密流量专栏 1. 原理篇 原理&#xff1a; 会话、流、数据包之间的关系。 流&#xff1a;…

【离线数仓-4-数据仓库设计-分层规划构建流程】

离线数仓-4-数据仓库设计-分层规划&构建流程离线数仓-4-数据仓库设计-分层规划&构建流程1.数据仓库分层规划2.数据仓库构建流程1.数据调研1.业务调研2.需求分析3.总结2.明确数据域3.构建业务总线矩阵&维度模型设计4.明确统计指标1.指标体系相关概念1.原子指标2.派生…

【渝偲医药】DSPE-PEG-RGD;磷脂聚乙二醇多肽试剂级简介

DSPE-PEG-RGD、 二硬脂酰基磷脂酰乙醇胺-聚乙二醇-多肽、磷脂PEG多肽 英文名称: 1,2-Distearoyl-sn-Glycero-3-Phosphoethanolamine-PEG- RGD 溶剂:可溶解在水中和大多数有机溶剂中 外观:白色粉末 用途:用于链接带有链霉亲和素或其他的基团的分子 分子量(PEG ):2000、3400、…

那些开发过程中需要遵守的开发规范

入职公司三天&#xff0c;没干啥其他活&#xff0c;基本在配置本地环境和阅读相关文档。技术方面公司基本用的是主流的技术体系&#xff0c;入职后需要先阅读阿里的开发规范和其他的一些产研文档。今天整理一些平时需要关注的阿里规约和数据库开发规范&#xff0c;方便今后在开…