社区系统项目复盘-5

news2024/10/7 18:26:10

文章目录

      • Kafka消息队列实现系统通知功能
      • 什么是Kafka?
      • Spring是怎么整合Kafka的?
      • 发送系统通知
      • 显示系统通知

Kafka消息队列实现系统通知功能

  • 阻塞队列

    可以用阻塞队列来实现消息队列,阻塞队列是一个接口:BlockingQueue,可以用来解决线程通信问题,依靠两个方法,这两个方法都是阻塞的,put方法往队列里存数据,take从队列里拿数据。

    https://res.craft.do/user/full/fd148a50-4a5b-9a85-bec3-e1645571e2c7/doc/6C708C00-2D5F-456E-9347-E93ADC36F3FB/296D14C4-3C6D-45C5-906D-E3DCEC0BDF9F_2/vY6efFTgZHTVJayyxExIASo6BbgkZavx4u2WWRLPLOoz/Image.png

生产者消费者模式:

  • 生产者:产生数据的线程
  • 消费者:使用数据的线程

阻塞队列BlockingQueue接口的实现类:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue、DelayQueue等。

代码示例:

public class BlockingQueueTests {

    public static void main(String[] args) {
        BlockingQueue queue = new ArrayBlockingQueue(10);
        new Thread(new Producer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
        new Thread(new Consumer(queue)).start();
    }
}

// 生产者线程
class Producer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) {
                Thread.sleep(20);
                queue.put(i);
                System.out.println(Thread.currentThread().getName() + "生产:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

// 消费者线程
class Consumer implements Runnable {

    private BlockingQueue<Integer> queue;

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(new Random().nextInt(1000));
                queue.take();
                System.out.println(Thread.currentThread().getName() + "消费:" + queue.size());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

什么是Kafka?

  • Kafka 简介
    • Kafaka是一个分布式的流处理平台
    • 应用:消息系统、日志收集、用户行为追踪、流式处理
  • Kafaka 特点
    • 高吞吐量、消息持久化、高可靠性、高扩展性
  • Kafka术语
    • Broker:Kafka的服务器,kafka集群中的每一台服务器称为一个broker
    • Zookeeper:同来管理其他的集群,kafka中内置zookeeper
    • Topic、Partition、Offset:kafka采用发布订阅模式,生产者把消息发布到的那个地方就叫topic,partition是分区的意思,一个主题分为多个分区,offset表示消息在分区内存放的索引。
    • Leader Replica、Follow Replica:Kafka通过副本的形式对数据存储多份,主副本可以处理请求,从副本只是备份,不负责做响应,当主副本挂掉时,集群会从众多从副本中选择一个作为leader。
  • 下载安装kafka
    • 从Kafka官网下载安装包到本地,解压缩就可以
    • 修改配置文件 config/zookeeeper.properties,dataDir
    • 修改配置文件 config/server.properties,log.dirs
    • 启动zookeeper:
      • cd /usr/local/kafka_2.13-3.2.1
      • sh bin/zookeeper-server-start.sh config/zookeeper.properties
    • 启动kafka:
      • cd /usr/local/kafka_2.13-3.2.1
      • sh bin/kafka-server-start.sh config/server.properties

Spring是怎么整合Kafka的?

  • 第一步:引入依赖 spring-kafka

    <dependency>
    	<groupId>org.springframework.kafka</groupId>
    	<artifactId>spring-kafka</artifactId>
    </dependency>
    
  • 第二步:配置Kafka

    • 配置server、consumer
    # KafkaProperties
    spring.kafka.bootstrap-servers=localhost:9092
    spring.kafka.consumer.group-id=community-consumer-group
    spring.kafka.consumer.enable-auto-commit=true
    spring.kafka.consumer.auto-commit-interval=3000
    
  • 第三步:访问Kafka

    • 生产者:kafkaTemplate.send(topic,data);
    • 消费者:
      • @KafkaListener(topics={“test”})
      • public void handleMessage(ConsumerRecord record) { }

    代码示例:

    public class KafkaTests {
        @Autowired
        private KafkaProducer kafkaProducer;
        @Test
        public void testKafka() {
            kafkaProducer.sendMessage("test", "你好");
            kafkaProducer.sendMessage("test", "在吗");
            try {
                Thread.sleep(1000 * 10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    
    @Component
    class KafkaProducer {
        @Autowired
        private KafkaTemplate kafkaTemplate;
        public void sendMessage(String topic, String content) {
            kafkaTemplate.send(topic, content);
        }
    }
    
    @Component
    class KafkaConsumer {
        @KafkaListener(topics = {"test"})
        public void handleMessage(ConsumerRecord record) {
            System.out.println(record.value());
        }
    }
    

发送系统通知

功能描述:当用户收到评论、点赞和关注后,会收到来自系统的通知信息。

  • 实现:从技术上来讲,使用kafka消息队列的方式,根据不同的主题处理不同的任务。从业务的角度来讲,采用事件驱动的方式,评论、点赞、关注分别为事件。开发时,在Kafka框架的基础上,基于事件对代码逻辑进一步的封装。

    https://res.craft.do/user/full/fd148a50-4a5b-9a85-bec3-e1645571e2c7/doc/6C708C00-2D5F-456E-9347-E93ADC36F3FB/B54638E1-E998-4C07-A7CC-D87D5D3AF893_2/a7thjomjJ16dZx8NUwC6dYgi2sNZDpDw1HdYgbgWmOMz/Image.png

  • 触发事件

    • 评论后,发布通知
    • 点赞后,发布通知
    • 关注后,发布通知
  • 处理事件

    • 封装事件对象
    • 开发事件的生产者
    • 开发事件的消费者

在这里插入图片描述

具体实现:

1.封装事件对象 Event,对类中的set函数都进行改写,使得构造对象时可以连续set

private String topic;
private int userId;
private int entityType;
private int entityId;
private int entityUserId;
private Map<String,Object> data = new HashMap<>();
// 思考重写set方法的用处,全部都做这样的处理,以topic为例。
// 好处:可以连续set
public Event setTopic(String topic) {
    this.topic = topic;
    return this;
}

2.开发事件的生产者EventProducer,将事件发布到指定的主题,主要有评论、点赞、关注3个topic

@Component
public class EventProducer {
    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 处理事件
    public void fireEvent(Event event){
        // 将事件发布到指定的主题
        kafkaTemplate.send(event.getTopic(), JSONObject.toJSONString(event));
    }
}

3.开发事件的消费者EventConsumer

@Component
public class EventConsumer implements CommunityConstant {
    private static final Logger logger = LoggerFactory.getLogger(EventConsumer.class);

	@Autowired
    private MessageService messageService;

	@KafkaListener(topics = {TOPIC_COMMENT,TOPIC_LIKE,TOPIC_FOLLOW})
    public void handleCommentMessage(ConsumerRecord record){
        if(record == null || record.value() == null){
            logger.error("消息的内容为空!");
            return;
        }

        Event event = JSONObject.parseObject(record.value().toString(),Event.class);
        if(event == null){
            logger.error("消息格式错误!");
            return;
        }

        // 发送站内通知
        Message message = new Message();
        message.setFromId(SYSTEM_USER_ID);
        message.setToId(event.getEntityUserId());
        message.setConversationId(event.getTopic());
        message.setCreateTime(new Date());

        Map<String,Object> content = new HashMap<>();
        content.put("userId",event.getUserId());
        content.put("entityType",event.getEntityType());
        content.put("entityId",event.getEntityId());
        if(!event.getData().isEmpty()){
            for(Map.Entry<String,Object> entry:event.getData().entrySet()){
                content.put(entry.getKey(),entry.getValue());
            }
        }
        message.setContent(JSONObject.toJSONString(content));

        messageService.addMessage(message);
    }
}

4.触发事件,有3个地方会触发该事件

  • 发布评论后,触发事件,系统向被评论用户发送通知
// 触发评论事件
Event event = new Event()
        .setTopic(TOPIC_COMMENT)
        .setUserId(hostHolder.getUser().getId())
        .setEntityType(comment.getEntityType())
        .setEntityId(comment.getEntityId())
        .setData("postId",discussPostId);
if(comment.getEntityType() == ENTITY_TYPE_POST){
    DiscussPost target = discussPostService.findDiscussPostById(comment.getEntityId());
    event.setEntityUserId(target.getUserId());
}else if(comment.getEntityType() == ENTITY_TYPE_COMMENT){
    Comment target = commentService.findCommentById(comment.getEntityId());
    event.setEntityUserId(target.getUserId());
}
eventProducer.fireEvent(event);
  • 点赞后,触发事件,系统向被赞用户发送通知。⚠️ 需要判断点赞状态。
// 触发点赞事件
if(likeStatus == 1){
    Event event = new Event()
            .setTopic(TOPIC_LIKE)
            .setUserId(hostHolder.getUser().getId())
            .setEntityType(entityType)
            .setEntityId(entityId)
            .setEntityUserId(entityUserId)
            .setData("postId",postId);
    eventProducer.fireEvent(event);
}
  • 关注后,触发事件,系统向被关注用户发送通知
// 触发关注事件
Event event = new Event()
        .setTopic(TOPIC_FOLLOW)
        .setUserId(hostHolder.getUser().getId())
        .setEntityType(entityType)
        .setEntityId(entityId)
        .setEntityUserId(entityId);
eventProducer.fireEvent(event);

**小结:**首先需要创建事件生产者类,将事件发布到指定主题,主题包括评论、点赞、关注这3种,然后创建事件消费者类,定义消费该类事件的方法,使用@KafkaListener注解,属性topics中是消费的时间类型,然后在相应的controller类中加入触发事件的代码块,发布评论时、点赞后、关注后都需要触发事件。

显示系统通知

功能描述:

  • 通知列表:显示评论、点赞、关注三种类型的通知
  • 通知详情:分页显示某一类主题所包含的通知
  • 未读消息:在页面头部显示所有的未读消息数量;使用拦截器实现,拦截器首先获取当前用户,如果用户不为null,说明已经登录了,那么就查询未读私信数量和未读通知数量,总数显示在页面上

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/45548.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

易云维医院后勤综合管理平台为医院智慧后勤的建设与发展做出贡献

近年来&#xff0c;随着国家医疗卫生改革进程的不断推进&#xff0c;越来越多的医院开始关注运营成本控制问题&#xff0c;医院后勤管理服务模式的创新和优化变得越来越重要。利用医院后勤综合管理平台将医院后勤管理信息化将极大地提高医院智慧后勤建设与发展。在这种形势下&a…

Mac下安装Hadoop

1、引言 如果想在Mac下安装Hadoop而且让Hadoop能正常运行&#xff0c;那安装之前需要先安装java&#xff0c;在Mac环境下安装Hadoop。 2、配置ssh环境 在Mac下如果想使用Hadoop&#xff0c;必须要配置ssh环境&#xff0c; 如果不执行这一步&#xff0c;后面启动hadoop时会出现…

Spring MVC应该怎么学?这份教程带你快速入门,深入剖析源码!

前言: 什么是MVC&#xff1f; MVC&#xff08;Model-View-Controller&#xff09;&#xff1a;它是一种软件架构设计模式&#xff0c;分为三个部分&#xff1a; Model&#xff08;模型&#xff09;&#xff1a;业务的数据模型&#xff1b; View&#xff08;视图&#xff09;&…

xss-labs/level5

输入 <script>alert(xss)</script> 查看回显 如下所示 能够发现script被恶意替换为scr_ipt 查看源代码 第一个输出点被转义了 所以没有利用价值了 第二个输出点如同刚才所言被进行了关键字的恶意替换操作 那没办法 我们只能继续尝试一下在标签内部构造一个新…

91183-98-1,UDP-N-acetylglucosamine,5′-二磷酸尿嘧啶核苷-N-乙酰半乳糖胺二钠盐

5′-二磷酸尿嘧啶核苷-N-乙酰半乳糖胺二钠盐 英文名称&#xff1a;UDPAG&#xff1b;UDP-GlcNAc&#xff1b;UDP-N-acetylglucosamine&#xff1b;Uridine 5′-diphospho-N-acetylglucosamine sodium salt 其他名称&#xff1a;尿苷-5′-二磷酸-N-乙酰基-葡糖胺钠盐 CAS号&am…

Linux进阶-进程

目录 终端查询进程参数 进程状态 进程状态转换 子进程被Linux内核调入CPU执行的过程 子进程进入睡眠状态 子进程结束 进程控制 pid_t fork(void)&#xff1a;创建子进程 exec()函数族&#xff1a;运行一个可执行文件。 void exit(int status)&#xff1a;结束进程 w…

Illuminate/22圆桌回顾:Web3互操作性的未来现已到来

Illuminate/22 由Moonbeam主办的Illuminate/22于2022年11月10-11日成功举办。为期2天的线上会议聚集了60演讲嘉宾超过40个话题讨论。通过本次会议&#xff0c;来自行业领先的项目及负责人分享了通过互操作性和跨互连合约实现的最新进展。 本次以“Web3互操作性的未来现已到来”…

AI是如何影响全球的安防监控产业

全球AI安防市场现状 人工智能安防监控技术正在以更快的速度传播到更广泛的国家。全球176个国家中&#xff0c;至少有75个国家正在积极将AI技术用于监视目的。其中包括&#xff1a;智慧城市/安全城市平台&#xff08;56个国家&#xff09;&#xff0c;面部识别系统&#xff08;6…

【torch】如何把给定mask按比例选取再次划分mask?

背景 在以torch为基础的很多框架下有一些集成的数据集&#xff0c;数据集往往自带已经划分好的mask。但是如何能够把框架给出的mask再次划分&#xff1f;比如按比例划分出来80%的train mask。 解决 新生成一个每个元素都是0-1分布的与mask2的true位置相同的矩阵&#xff0c;…

xss-labs/level4

首先还是输入我们最熟悉的payload <script>alert(xss)</script> 查看界面回显 发现表单中的尖括号都消失了 说明后台服务器将尖括号删除了 再去查看源代码 通过源代码我们可以知道存在两个有意义的输出点 第一个输出点被转义了 没办法利用了script标签去执行js代…

亲戚小孩月薪17k,而我只有4k+,好慌......

我们总是在悲观与乐观中反复折磨自己&#xff0c;感觉自己一事无成。总是眼高手低&#xff0c;总以为大运会砸到自己&#xff0c;遇到挫折就会感到很沮丧。 大学四年没考到英语六级证书&#xff0c;小学教资考了两次。现在想要考研&#xff0c;但总是觉得来不及&#xff0c;或…

SpringBoot概念、创建和运行

文章目录什么是Spring Boot &#xff1f;为什么要学Spring Boot &#xff1f;Spring Boot 优点Spring Boot 项目创建项目目录介绍和运行约定大于配置什么是Spring Boot &#xff1f;为什么要学Spring Boot &#xff1f; Spring 的诞生是为了简化 Java 程序的开发的&#xff0c…

外汇天眼:外汇市场为何在周末休市?为什么周末行情有波动?

虽然从理论上而言&#xff0c;货币市场从不休市&#xff0c;但您很难见到有人在周日交易。新手甚至会认为&#xff0c;交易活动的停止是因为外汇经纪商周末休息。但如下文所述&#xff0c;实际情况并非如此。 外汇市场中的主要参与者 货币交易主要是为了促进贸易和旅游业。而且…

助推专精特新企业数字化的低代码

近两年&#xff0c;“专精特新”成为行业的热门词。根据工信部的定义&#xff0c;“专精特新”中小企业&#xff0c;是指具有专业化、精细化、特色化、新颖化等特点的企业。它们多专注于产业链上某个环节&#xff0c;主营业务聚焦&#xff0c;同时具有较强大的创新能力、创新活…

PG::Potato

nmap -Pn -p- -T4 --min-rate1000 192.168.171.101 nmap -Pn -p 22,80,2112 -sCV 192.168.171.101 打开80端口未发现可利用的服务 尝试对路径爆破&#xff0c;同时FTP可匿名访问&#xff0c;查看FTP内是否有可用信息 dirb http://192.168.171.101 在FTP中得到了网站源码的…

跑通Intellij Platform Plugin项目

目录需求描述尝试方向1. 用2022.1版本运行2. 用2019.2版本运行结论需求描述 在研究DDD逆向建模支持工具的过程中&#xff0c;需要复现期刊作者的cargo项目&#xff0c;实现C2MD&#xff0c;即代码转UML的功能。虽然按照文章的要求安装DddTool插件&#xff0c;但是不能成功使用…

第三次上机作业 大连理工大学

8.7 某种电子元件的寿命x(单位:小时)服从正态分布。现测得16只元件的寿命如下 是否有理由认为元件的平均寿命显著地大于225小时(a=0.05)? data = read.csv("习题8.7.csv") t.test(data$寿命,mu = 225,alternative = "greater")p值小于0.05,不能认为认…

Python CNN卷积神经网络实例讲解,CNN实战,CNN代码实例,超实用

一、CNN简介 1. 神经网络基础 输入层&#xff08;Input layer&#xff09;&#xff0c;众多神经元&#xff08;Neuron&#xff09;接受大量非线形输入讯息。输入的讯息称为输入向量。 输出层&#xff08;Output layer&#xff09;&#xff0c;讯息在神经元链接中传输、分析、权…

3.Linux传统性能检测工具——vmstat

命令&#xff1a;vmstat <duration> 参数duration&#xff1a;统计间隔如果不加参数则输出自启动以来的统计结果&#xff08;注意&#xff1a;输出的第一行总是为该结果&#xff08;除memory counter相关数据以外&#xff09;&#xff09; 输出示例&#xff1a; 虽然vmst…

nVisual部署之nginx配置说明

Nginx 是一个高性能的HTTP和反向代理web服务器&#xff0c;因此nvisual在部署前端包时便采用了它作为服务器&#xff0c;版本使用1.14.1以上。在默认的配置下&#xff0c;还需要向nginx各模块添加配置才能达到生产需要。 接下来&#xff0c;从http模块开始&#xff0c;再到ser…