kafka快速入门+应用

news2025/1/8 4:05:16

Kafka, 构建TB级异步消息系统

1.快速入门

1.1 阻塞队列

        在生产线程  和  消费线程  之间起到了 , 缓冲作用,即避免CPU  资源被浪费掉

  • BlockingQueue
    • 解决  线程通信  的问题
    • 阻塞方法  put 、 take
  • 生产者、消费者 模式
    • 生产者:产生数据的线程
    • 消费者:使用数据的线程
  • 实现类
    • ArrayBlockingQueue
    • LinkedBlockingQueue
    • PriorityBlockingQueue、SynchronousQueue、DelayQueue等

1.2 Kafaka 入门

Kafaka 简介

  • Kafaka 是一个分布式的(消息队列    发展到---》 功能更加丰富) 流媒体平台
  • 应用:消息系统、日志收集、用户行为追踪、流式处理

Kafaka 特点

  • 高吞吐量(对于硬盘的顺序读写,性能很高,高于对于内存的随机读写)
  • 消息持久化(将数据存储在硬盘中)
  • 高可靠性 (分布式集群部署,一台服务器挂了,切换到其他服务器)
  • 高扩展性(服务器不够用,则加一台服务器)

Kafaka 术语

  • Broker(集群中的每一台服务器称为一个Broker)、Zookeeper(管理集群的软件)
  • Topic(发布订阅模式,生产者把消息发送的位置-》topic)、Partition(表示对于主题位置的分区,每一个分区按照顺序往其中追加数据)、Offset(消息在分区内存在的索引)
  • Leader Replica(主副本可以提供想要读取某个分区的数据,如果主副本挂了,则从集群中选用一个从副本作为主副本) 、Follower Replica(从副本只是 备份,不负责做响应)
    • 每一个分区有多个副本,如果一个副本坏了,还有备份,提高容错率

1.3 Spring 整合 Kafka

  1. 引入依赖
    1. spring-kafka
  2. 配置Kafka
    1. 配置server
    2. 配置consumer
  3. 访问Kafka
  • 生产者
    • kafkaTemplate.send(topic, data);
  • 消费者
    • @KafkaListener(topics = {"test"})  public void handleMessage(ConsumerRecord record) {}

2.应用

2.1 安装

(1)安装包位置

链接:https://pan.baidu.com/s/1QjZdOUYdmSCSf0zjprJQIQ
提取码:9630

下载后解压缩

(2)相关配置

1

 2

2.2 启动+使用

(1)启动zookeeper
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

(2)启动kafka
C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0>bin\windows\kafka-server-start.bat config\server.properties

 

(3)创建主题

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --create --bootstrap-server localhost:9092 -replication-factor 1 --partitions 1 --topic test1

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-topics.bat --list --bootstrap-server localhost:9092
__consumer_offsets
test
test1

(4)创建生产者

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic test1

(5)创建消费者

C:\Desktop\软件开发\项目\niuke_coder\code\kafka_2.12-2.2.0\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

再次生产消息,会自动消费消息

 

2.3 spring中整合kafka

(1)配置Properties

#9. kafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
# 是否自动提交(记录)   消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000

其中spring.kafka.consumer.group-id=community-consumer-group与。。。保持一致

(2)导入依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

(3)test

1.定义producer

@Component
class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    public void sendMessage(String topic, String content) {
        kafkaTemplate.send(topic,content);
    }
}

2.定义consumer

@Component
class KafkaConsumer {
    @KafkaListener(topics ={"test"})
    public void handleMessage(ConsumerRecord record){
        System.out.println(record.value());
        //PHDVB干嘛呢
        //呕吼
    }
}

3.编写测试类

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {

    @Autowired
    private KafkaProducer kafkaProducer;

    @Test
    public void testKafka() {
        kafkaProducer.sendMessage("test","PHDVB干嘛呢");
        kafkaProducer.sendMessage("test","呕吼");

        // 生产者发消息是  主动的 ,消费者收消息是  被动地
        try {
            Thread.sleep(1000 * 10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1589791.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Mac自带终端进行远程ssh连接Linux服务器

废话不多说&#xff0c;直接上图 好吧&#xff0c;我承认我是多此一举&#xff0c;脱裤子放pi了&#xff0c;其实只需要在终端输入一行命令就可以了&#xff08;呜呜&#xff5e;&#xff09; ssh rootip -p 22 需要注意的是&#xff0c;命令里的ip地址同样要替换成你自己的服…

云LIS系统源码,ASP.NET区域LIS系统源码,实验室信息系统

云LIS系统源码&#xff0c;ASP.NET区域LIS系统源码&#xff0c;实验室信息系统 LIS技术架构&#xff1a;ASP.NET CORE 3.1 MVC SQLserver Redis等 开发语言&#xff1a;C# 6.0、JavaScript 前端框架&#xff1a;JQuery、EasyUI、Bootstrap 后端框架&#xff1a;MVC、S…

阿里云服务器带宽价格全解析,附报价单

阿里云服务器公网带宽怎么收费&#xff1f;北京地域服务器按固定带宽计费一个月23元/M&#xff0c;按使用流量计费0.8元/GB&#xff0c;云服务器地域不同实际带宽价格也不同&#xff0c;阿里云服务器网aliyunfuwuqi.com分享不同带宽计费模式下带宽收费价格表&#xff1a; 公网…

十五届web模拟题整理

模拟赛一期 1.动态的Tab栏 请在 style.css 文件中补全代码。 当用户向下滚动的高度没有超过标题栏&#xff08;即 .heading 元素&#xff09;的高度时&#xff0c;保持 Tab 栏在其原有的位置。当滚动高度超过标题栏的高度时&#xff0c;固定显示 Tab 栏在网页顶部。 /* TODO…

03 SQL基础 -- 查询与运算符

一、SELECT 语句基础 1.1 从表中选取数据 SELECT 语句 从表中选取数据时需要使用SELECT语句,也就是只从表中选出(SELECT)必要数据的意思。通过SELECT语句查询并选取出必要数据的过程称为匹配查询或查询(query) 基本SELECT语句包含了SELECT和FROM两个子句(clause)。示…

PointNet++函数square_distance(src, dst):计算两组点之间的欧式距离(代码详解)

文章目录 一、计算两组点之间的欧式距离二、举例三、中间结果输出 一、计算两组点之间的欧式距离 def square_distance(src, dst):"""Calculate Euclid distance between each two points.src^T * dst xn * xm yn * ym zn * zm&#xff1b;sum(src^2, dim-1…

模块化组合优势凸显钡铼IOy系列轻松应对大规模工业自动化工程

模块化组合是钡铼IOy系列独立式I/O模块的一大优势&#xff0c;它为大规模工业自动化工程提供了灵活性、可扩展性和定制性&#xff0c;从而轻松应对不同规模和复杂度的工厂应用。以下是关于模块化组合优势的详细解析&#xff1a; 1. 灵活性和定制性 模块化设计使得钡铼IOy系列…

字节Coze实现多Agent模式,文内在线体验,实时给产品经理提需求

摘要&#xff1a; 多Agent模式是一种分布式计算范式&#xff0c;它通过将复杂任务分解为多个子任务&#xff0c;并由独立的智能体&#xff08;Agents&#xff09;并行处理&#xff0c;从而提高系统的处理能力和效率。这种模式在自然语言处理、机器学习和其他数据密集型应用中尤…

未来汽车硬件安全的需求(1)

目录 1.概述 2.EVITA 2.1 EVITA HSM 2.2 EVITA保护范围 3.市场变化对车载网络安全的影响 3.1 非侵入式攻击的风险 3.2 量子计算机的蛮力攻击 3.3 整车E/E架构的变化 3.4 网络安全标准和认证 3.5 汽车工业的网络安全措施 4.汽车安全控制器 4.1 TPM2.0 4.2 安全控…

【2024最新】微信公众号怎么开启留言功能

关注微信公众号&#xff1a;怒码少年&#xff0c;回复关键词【电子书】可以免费获取计算机相关电子书 本文首发于&#xff1a;原文阅读-wx公众号&#xff1a;怒码少年 大家好&#xff0c;我是小码。 微信公众号从18年开始&#xff0c;正式关闭了留言功能。自此以后新注册的公…

华为校园公开课走入上海交大,鸿蒙成为专业核心课程

4月12日&#xff0c;华为校园公开课在中国上海交通大学成功举办&#xff0c;吸引了来自计算机等相关专业的150余名学生参加。据了解&#xff0c;由吴帆、陈贵海、过敏意、吴晨涛、刘生钟等教授在中国上海交通大学面向计算机系本科生开设的《操作系统》课程&#xff0c;是该系学…

python中time库的time.time()函数的作用是什么?

python中time库的time.time()函数的作用是什么&#xff1f; 作用&#xff1a;Python time time() 返回当前时间的时间戳&#xff08;1970纪元后经过的浮点秒数&#xff09;。 time()方法语法&#xff1a;time.time() #!/usr/bin/python # Write Python 3 code in this onlin…

程序“猿”自动化脚本(一)

1.剪贴板管理器&#x1f4cb; 您是否曾经发现自己在处理多个文本片段时忘记了复制的内容&#xff1f;有没有想过有一个工具可以跟踪您一天内复制的所有内容&#xff1f; 该自动化脚本会监视您复制的所有内容&#xff0c;将每个复制的文本无缝存储在时尚的图形界面中&#xff0c…

中国绿色技术助力全球能源转型(国际论坛)

中国的清洁能源发展战略和实践对全球能源结构转型产生了深远影响。作为全球最大的可再生能源生产和消费国&#xff0c;中国在推动国内可再生能源产业发展的同时&#xff0c;也积极与世界各国分享技术和经验&#xff0c;促进全球范围内清洁能源技术的普及和应用成本的降低。例如…

乘势而上 韧性增长丨凡泰极客入选华为首批HarmonyOS开发服务商

3月日&#xff0c;凡泰极客受邀参加华为在南京举办的“鲸鸿动能服务商政策发布暨鸿蒙服务商先锋计划授牌仪式”。此次仪式主题为“乘势而上&#xff0c;韧性增长”&#xff0c;首批HarmonyOS开发服务商汇聚一堂&#xff0c;携手共进&#xff0c;打造鸿蒙生态繁荣未来。凡泰极客…

PCIe总线-存储器域和PCIe总线域访问流程(二)

1.概述 PCIe总线的最大特点是像CPU访问DDR一样&#xff0c;可以直接使用地址访问PCIe设备&#xff08;桥&#xff09;&#xff0c;但不同的是DDR和CPU同属于存储器域&#xff0c;而CPU和PCIe设备属于两个不同的域&#xff0c;PCIe设备&#xff08;桥&#xff09;的地址空间属于…

李彦宏放话:百度AI大模型绝不抢开发者饭碗

关注卢松松&#xff0c;会经常给你分享一些我的经验和观点。 昨晚&#xff0c;李彦宏内部讲话称&#xff1a;AI大模型开源意义不大&#xff0c;百度绝不抢开发者饭碗。 但你一定要说话算话哦&#xff0c;可千万别说&#xff1a;“我永远不做手机&#xff0c;谁再敢提做手机就给…

最新PDD商家端Anti-Content参数逆向分析与纯算法还原

文章目录 1. 写在前面2. 接口分析3. 断点分析4. 扣JS代码 【&#x1f3e0;作者主页】&#xff1a;吴秋霖 【&#x1f4bc;作者介绍】&#xff1a;擅长爬虫与JS加密逆向分析&#xff01;Python领域优质创作者、CSDN博客专家、阿里云博客专家、华为云享专家。一路走来长期坚守并致…

jeecg-boot安装

我看大家都挺关注&#xff0c;所以集中上传了下代码和相关工具&#xff0c;方便大家快速完成 链接&#xff1a;https://pan.baidu.com/s/1-Y9yHVZ-4DQFDjPBWUk4-A 提取码&#xff1a;op1r 1. 下载代码 下载地址 : JEECG官方网站 - 基于BPM的低代码开发平台(低代码平台_零代…

Qt | 对象树与生命期(对象的创建、销毁、擦查找)

一、组合模式与对象树 1、组合模式指的是把类的对象组织成树形结构,这种树形结构也称为对象树,Qt 使用对象树来管理 QObject 及其子类的对象。注意:这里是指的类的对象而不是类。把类组织成树形结构只需使用简单的继承机制便可实现。 2、使用组合模式的主要作用是可以通过…