消息队列(kafka简单使用)

news2025/1/2 0:12:44

Dubbo远程调用的性能问题

Dubbo调用普遍存在于我们的微服务项目中

这些Dubbo调用全部是同步的操作

这里的"同步"指:消费者A调用生产者B之后,A的线程会进入阻塞状态,等待生产者B运行结束返回之后,A才能运行之后的代码

Dubbo消费者发送调用后进入阻塞状态,这个状态表示该线程仍占用内存资源,但是什么动作都不做

如果生产者运行耗时较久,消费者就一直等待,如果消费者利用这个时间,那么可以处理更多请求,业务整体效率会提升

实际情况下,Dubbo有些必要的返回值必须等待,但是不必要等待的服务返回值,我们可以不等待去做别的事情

这种情况下我们就要使用消息队列

什么是消息队列

消息队列(Message Queue)简称MQ,也称:"消息中间件"

消息队列是采用"异步(两个微服务项目并不需要同时完成请求)"的方式来传递数据完成业务操作流程的业务处理方式

消息队列的特征

常见面试题:消息队列的特征(作用)
  • 利用异步的特性,提高服务器的运行效率,减少因为远程调用出现的线程等待\阻塞时间

  • 削峰填谷:在并发峰值超过当前系统处理能力时,我们将没处理的信息保存在消息队列中,在后面出现的较闲的时间中去处理,直到所有数据依次处理完成,能够防止在并发峰值时短时间大量请求而导致的系统不稳定

  • 消息队列的延时:因为是异步执行,请求的发起者并不知道消息何时能处理完,如果业务不能接受这种延迟,就不要使用消息队列

常见消息队列软件

  • Kafka:性能好\功能弱:适合大数据量,高并发的情况,大数据领域使用较多

  • RabbitMQ:功能强\性能一般:适合发送业务需求复杂的消息队列,java业务中使用较多

  • RocketMQ:阿里的

  • ActiveMQ:前几年流行的,老项目可能用到

  • .....

消息队列的事务处理

当接收消息队列中信息的模块运行发生异常时,怎么完成事务的回滚?

当消息队列中(stock)发生异常时,在异常处理的代码中,我们可以向消息的发送者(order)发送消息,然后通知发送者(order)处理,消息的发送者(order)接收到消息后,一般要手写代码回滚,如果回滚代码过程中再发生异常,就又要思考回滚方式,如果一直用消息队列传递消息的话,可能发生异常的情况是无止境的

所以我们在处理消息队列异常时,经常会设置一个"死信队列",将无法处理的异常信息发送到这个队列中

死信队列没有任何处理者,通常情况下会有专人周期性的处理死信队列的消息

Kafka

什么是Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka最初是由LinkedIn开发,并随后于2011年初开源。

kafka软件结构

Kafka是一个结构相对简单的消息队列(MQ)软件

kafka软件结构图

Kafka Cluster(Kafka集群)

Producer:消息的发送方,也就是消息的来源,Kafka中的生产者

order就是消息的发送方,在Dubbo中order是消费者,这个身份变化了

Consumer:消息的接收方,也是消息的目标,Kafka中的消费者

stock就是消息的接收方,在Dubbo中stock是生产者,这个身份变化了

Topic:话题或主题的意思,消息的收发双方要依据同一个话题名称,才不会将信息错发给别人

Record:消息记录,就是生产者和消费者传递的信息内容,保存在指定的Topic中

Kafka的特征与优势

Kafka作为消息队列,它和其他同类产品相比,突出的特点就是性能强大

Kafka将消息队列中的信息保存在硬盘中

Kafka对硬盘的读取规则进行优化后,效率能够接近内存

硬盘的优化规则主要依靠"顺序读写,零拷贝,日志压缩等技术"

Kafka处理队列中数据的默认设置:

  • Kafka队列信息能够一直向硬盘中保存(理论上没有大小限制)

  • Kafka默认队列中的信息保存7天,可以配置这个时间,缩短这个时间可以减少Kafka的磁盘消耗

Kafka的安装和配置

必须将我们kafka软件的解压位置设置在一个根目录,文件夹名称尽量短(例如:kafka)

然后路径不要有空格和中文

我们要创建一个空目录用于保存Kafka运行过程中产生的数据

本次创建名称为data的空目录

下面进行Kafka启动前的配置

先到F:\kafka\config下配置有文件zookeeper.properties

找到dataDir属性修改如下

dataDir=F:/data

修改完毕之后要Ctrl+S进行保存,否则修改无效!!!!

注意F盘和data文件夹名称,匹配自己电脑的真实路径和文件夹名称

还要修改server.properties配置文件

log.dirs=F:/data

修改注意事项和上面相同

启动kafka

要想启动Kafka必须先启动Zookeeper

Zookeeper介绍

zoo:动物园

keeper:园长

可以引申为管理动物的人

Linux服务器中安装的各种软件,很多都是有动物形象的

如果这些软件在Linux中需要修改配置信息的话,就需要进入这个软件,去修改配置,每个软件都需要单独修改配置的话,工作量很大

我们使用Zookeeper之后,可以创建一个新的管理各种软件配置的文件管理系统

Linux系统中各个软件的配置文件集中到Zookeeper中

实现在Zookeeper中,可以修改服务器系统中的各个软件配置信息

长此以往,很多软件就删除了自己写配置文件的功能,而直接从Zookeeper中获取

Kafka就是需要将配置编写在Zookeeper中的软件之一

所以要先启动zookeeper才能启动kafka

Zookeeper启动

进入路径F:\kafka\bin\windows

输入cmd进入dos命令行

F:\kafka\bin\windows>zookeeper-server-start.bat ..\..\config\zookeeper.properties

kafka启动

总体方式一样,输入不同指令

F:\kafka\bin\windows>kafka-server-start.bat ..\..\config\server.properties

附录

Mac系统启动Kafka服务命令(参考):

# 进入Kafka文件夹cd Documents/kafka_2.13-2.4.1/bin/# 动Zookeeper服务./zookeeper-server-start.sh -daemon ../config/zookeeper.properties # 启动Kafka服务./kafka-server-start.sh -daemon ../config/server.properties 

Mac系统关闭Kafka服务命令(参考):

# 关闭Kafka服务./kafka-server-stop.sh # 启动Zookeeper服务./zookeeper-server-stop.sh

在启动kafka时有一个常见错误

wmic不是内部或外部命令

这样的提示,需要安装wmic命令,安装方式参考

https://zhidao.baidu.com/question/295061710.html

如果启动kafka无响应

在“环境变量”的“用户变量路径”中Path属性添加一行后

%SystemRoot%\System32\Wbem;%SystemRoot%\System32\;%SystemRoot%

Kafka使用演示

启动的zookeeper和kafka的窗口不要关闭

我们在csmall项目中编写一个kafka使用的演示

csmall-cart-webapi模块

添加依赖

<!--  SpringBoot整合Kafka的依赖  -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<!--   google提供的可以将java对象和json格式字符串转换的工具   -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
</dependency>

修改yml文件进行配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    # consumer.group-id是Spring-Kafka框架要求的必须配置的内容,不配置启动会报错
    # 作用是给话题分组,防止不同项目恰巧相同的话题名称混淆
    # 本质上,在当前项目发送给kafka消息时,会使用这个分组作为话题名称的前缀
    # 例如发送一个message的话题名称,实际上会发送的话题名称是csmall_message
    consumer:
      group-id: csmall

在SpringBoot启动类中添加启动Kafka的注解

@SpringBootApplication
@EnableDubbo
// 项目启动时启用对kafka的支持
@EnableKafka
// 我们为了测试kafka消息的收发效果
// 利用SpringBoot自带的定时任务工具,实现周期性向kafka发送消息的功能
// 明确我们SpringBoot自带定时任务和Kafka没有必然联系
@EnableScheduling
public class CsmallCartWebapiApplication {

    public static void main(String[] args) {
        SpringApplication.run(CsmallCartWebapiApplication.class, args);
    }

}

下面我们就可以实现周期性的向kafka发送消息并接收的操作了

编写消息的发送

cart-webapi包下创建kafka包

包中创建Producer类来发送消息

消息的发送

// SpringBoot启动时,将当前类对象实例化后保存到Spring容器,才能实现周期运行
@Component
public class Producer {
    // 配置Spring-Kafka框架提供的能够连接kafka操作的对象
    // 这个对象需要指定泛型<String,String>
    // KafkaTemplate<[话题类型],[消息类型]>
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    // 定义话题常量
    public static final String TOPIC_KEY="myCart";
    
    int i=1;
    // 发送消息的方法
    // 实现每隔10秒(10000毫秒)运行一次,需要添加下面的注解
    @Scheduled(fixedRate = 10000)
    public void sendMessage(){
        // 实例化一个要发送的对象
        Cart cart=new Cart();
        cart.setId(i++);
        cart.setCommodityCode("PC100");
        cart.setUserId("UU100");
        cart.setPrice(10+ RandomUtils.nextInt(90));
        cart.setCount(1+RandomUtils.nextInt(10));
        // 因为当前kafka连接只能发送字符串类型对象
        // 所以我们需要将上面的cart对象转换为json格式字符串
        // 例如 {"id":"1","userId":"UU100","price":"10",...}
        Gson gson=new Gson();
        // 使用gson转换cart对象称为json格式字符串
        String json=gson.toJson(cart);
        System.out.println("要发送的json格式字符串为:"+json);
        // 执行发送
        kafkaTemplate.send(TOPIC_KEY,json);
    }

}

消息的接收

下面开始接收

kafka包中创建一个叫Consumer的类来接收消息

接收消息的类可以是本模块的类,也可以是其它模块的类,编写的代码是完全一致
// 当前Consumer是用于接收kafka消息的类
// 要求将这个类也保存到Spring容器中,因为SpringKafka框架使用Spring容器中的对象
@Component
public class Consumer {

    // Spring-Kafka接收消息,使用了"监听机制"
    // 框架设计了一条线程,实时关注Kafka话题接收的情况
    // 我们可以指定一个话题名称(myCart),设置一旦这个话题中有消息,监听线程就通知下面方法运行
    @KafkaListener(topics = Producer.TOPIC_KEY)
    // 上面注解就实现了监听的机制
    // 当前Kafka的myCart话题出现消息时,会自动运行下面方法
    // 下面方法的参数和返回值是不能修改的(方法名随意)
    public void received(ConsumerRecord<String,String> record){
        // 方法的返回值必须是void ,参数必须是ConsumerRecord
        // ConsumerRecord<[话题类型],[消息类型]>
        // 从record对象中获取消息内容
        String json=record.value();
        // json可能是这样的字符串:{"id":"1","userId":"UU100","price":"10",...}
        // 最终要活的Cart对象,需要将json字符串转换为java对象
        // 仍然使用Gson工具类实现
        Gson gson=new Gson();
        Cart cart=gson.fromJson(json, Cart.class);
        // 转换完成输出cart,测试验证消息的接收效果
        System.out.println(cart);

    }
}

Nacos\Seata\Zookeeper\Kafka启动

启动cart模块

观察是否10秒出现一次输出

有发送消息和接收消息的效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/358615.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

再学C语言39:指针操作(2)

在编写处理int这样的基本类型的函数时&#xff0c;可以向函数传递int数值&#xff0c;也可以传递指向int的指针 通常直接传递int数值&#xff0c;只有需要在函数中修改该值时&#xff0c;才传递指针 对于处理数组的函数&#xff0c;只能传递指针&#xff0c;这样能使程序效率…

如何运行YOLOv6的代码实现目标识别?

YOLOv6是由美团视觉团队开发的1.环境配置我们先把YOLOv6的代码clone下来git clone https://github.com/meituan/YOLOv6.git安装一些必要的包pip install pycocotools2.0作者要求pytorch的版本是1.8.0,我的环境是1.7.0&#xff0c;也是可以正常运行的pip install -r requirement…

C#服务号推送微信公众号模板消息

一、准备工作微信公众平台&#xff1a;https://mp.weixin.qq.com/申请测试账号&#xff1a;https://mp.weixin.qq.com/debug/cgi-bin/sandboxinfo?actionshowinfo&tsandbox/index微信推送消息模板不需要发布服务器&#xff0c;也不需要填写授权回调域名&#xff0c;只需要…

【Vagrant】下载安装与基本操作

文章目录概述软件安装安装VirtualBox安装Vagrant配置环境用Vagrant创建一个VMVagrantfile文件配置常用命令概述 Vagrant是一个创建虚拟机的技术&#xff0c;是用来创建和管理虚拟机的工具&#xff0c;本身自己并不能创建管理虚拟机。创建和管理虚拟机必须依赖于其他的虚拟化技…

11 OpenCV图像识别之人脸识别

文章目录1 Eigenfaces1.1 建模流程1.2 示例代码2 Fisherfaces2.1 建模流程2.2 示例代码3 Local Binary Histogram3.1 建模流程3.2 示例代码OpenCV 提供了三种人脸识别方法&#xff1a;Eigenfaces Eigenfaces是一种基于PCA&#xff08;Principal Component Analysis&#xff0c…

多城市二手车买卖发布管理小程序开发

多城市二手车买卖发布管理小程序开发 功能特性: 为你介绍二手车微信小程序的功能特性。 车辆分类搜索&#xff0c;支持按品牌、售价、年龄、上牌时间、排量等筛选。 车源发布&#xff0c;支持用户一键发布二手车&#xff0c;平台审核上线&#xff0c;发布可编辑、删除等操作。…

【结构体版】通讯录

&#x1f466;个人主页&#xff1a;Weraphael ✍&#x1f3fb;作者简介&#xff1a;目前是C语言学习者 ✈️专栏&#xff1a;项目 &#x1f40b; 希望大家多多支持&#xff0c;咱一起进步&#xff01;&#x1f601; 如果文章对你有帮助的话 欢迎 评论&#x1f4ac; 点赞&#x…

扬帆优配|五千亿巨头一度涨停! 4天3倍,港股又现“狂飙”股!

周一&#xff0c;A股三大指数走势分化。到午间收盘&#xff0c;沪指震荡走高涨近1%&#xff0c;深证成指涨0.75%&#xff0c;创业板指继续弱势调整。 盘面上&#xff0c;钢铁、煤炭、大金融等权重板块团体走强&#xff0c;三大通讯运营商一同拉升&#xff0c;其间我国电信盘中一…

合作协议书合同怎么写?

合作协议书合同怎么写&#xff1f;以品牌推广的合作协议书合同为例&#xff0c;参考内容如下&#xff1a;业务合作协议甲方&#xff08;项目方&#xff09;&#xff1a;乙方&#xff08;客户推荐方&#xff09;&#xff1a;甲乙双方本着平等自愿、互惠互利原则&#xff0c;就结…

(十五)docker安装sentinel,客户端配置规则本地持久化

一、简介 操作系统&#xff1a;Linux CentOS 7.3 64位 docker版本&#xff1a;19.03.8 sentinel版本&#xff1a;1.8.0 二、实践 1、拉取镜像 docker pull bladex/sentinel-dashboard:1.8.0 2、运行容器 docker run --name sentinel \ -p 8858:8858 \ --privilegedtrue …

django项目实战三(django+bootstrap实现增删改查)进阶分页

目录 一、分页 1、修改case_list.html页面 2、修改views.py的case_list方法&#xff08;分页未封装&#xff09; 二、分页封装 1、新建类Pagination 2、修改views.py的case_list方法 三、再优化&#xff0c;实现搜索分页qing情况 四、优化其他查询页面实现分页和查询 五…

如何寻找SAP中的增强

文章目录0 简介1 寻找一代增强2 寻找二代增强2.2 在包里也可以看到2.3 在出口对象里输入包的名字也可以找到2.4 通过以下函数可以发现已有的增强2.5 也可以在cmod里直接找2.6 总结3 寻找第三代增强0 简介 在SAP中&#xff0c;对原代码的修改最不容易的是找增强&#xff0c;以下…

Springboot 整合 分布式定时任务 XXL-JOB

起因 恰逢周末&#xff0c; 最近公司接入了分布式定时任务&#xff0c;我是负责接入这块的&#xff0c;正好在网上想起了之前看过的分布式任务的文章&#xff0c;然后学习一下 各路框架发现看了很多框架比如 elasticjob 跟xxl-job不同的是&#xff0c;elasticjob是采用zookeepe…

Cesium 卫星轨迹、卫星通信、卫星过境,模拟数据传输。

起因&#xff1a;看了cesium官网卫星通信示例发现只有cmzl版本的&#xff0c;决定自己动手写一个。欢迎大家一起探讨&#xff0c;评论留言。 效果 全部代码在最后 起步 寻找卫星轨迹数据&#xff0c;在网站space-track上找的&#xff0c;自己注册账号QQ邮箱即可。 卫星轨道类…

stm32f407探索者开发板(十六)——串行通信原理讲解-UART

文章目录一、串口通信接口背景知识1.1 处理器与外部设备通信的两种方式1.2 按照数据传送方向1.3 是否带有时钟信号1.4 常见的串行通信接口二、STM32F4串口通信基础2.1 STM32的串口通信接口2.2 UART异步通信方式引脚连接方法2.3 UART异步通信方式引脚(STM32F407ZGT6)2.4 UART异步…

模拟物流快递系统程序设计-课后程序(JAVA基础案例教程-黑马程序员编著-第四章-课后作业)

【案例4-8】模拟物流快递系统程序设计 欢迎点赞收藏关注 【案例介绍】 案例描述 网购已成为人们生活的重要组成部分&#xff0c;当人们在购物网站中下订单后&#xff0c;订单中的货物就会在经过一系列的流程后&#xff0c;送到客户的手中。而在送货期间&#xff0c;物流管理…

实际项目角度优化App性能

前言&#xff1a;前年替公司实现了一个在线检疫App&#xff0c;接下来一年时不时收到该App的需求功能迭代&#xff0c;部分线下问题跟进。随着新冠疫情防控政策放开&#xff0c;该项目也是下线了。 从技术角度来看&#xff0c;有自己的独特技术处理特点。下面我想记录一下该App…

c++动态内存分布以及和C语言的比较

文章目录 前言一.c/c内存分布 C语言的动态内存管理方式 C内存管理方式 operator new和operator delete函数 malloc/free和new/delete的区别 定位new 内存泄漏的危害总结前言 c是在c的基础上开发出来的&#xff0c;所以关于内存管理这一方面是兼容c的&…

02- OpenCV绘制图形及图像算术变换 (OpenCV基础) (机器视觉)

知识重点 OpenCV用的最多的色彩空间是HSV. 方便OpenCV做图像处理img2 img.view() # 浅拷贝img3 img.copy() # 深拷贝split(mat) 分割图像的通道: b, g, r cv2.split(img) # b, g, r 都是数组merge((ch1, ch2, ch3)) 融合多个通道cvtColor(img, colorspace): 颜…

Centos7系统编译Hadoop3.3.4

1、背景 最近在学习hadoop&#xff0c;此篇文章简单记录一下通过源码来编译hadoop。为什么要重新编译hadoop源码&#xff0c;是因为为了匹配不同操作系统的本地库环境。 2、编译源码 2.1 下载并解压源码 [roothadoop01 ~]# mkdir /opt/hadoop [roothadoop01 ~]# cd /opt/had…