You辉编程_kafka

news2025/1/17 3:13:38

一、什么是kafka?

是分布式(项目部署于多个服务器)的基于发布/订阅模式的消息队列,主要用于处理活跃的数据,如:登录、浏览、点击、分享等用户行为产生的数据,说白了就是一个消息系统(消息队列)。

进一步理解:

1.消息队列

消息(Message):网络中的两台计算机或者两个通讯设备之间传递的数据,如:文本、音频等。

队列(Queue):直接把它想象成羽毛球筒,羽毛球先进先出,是一种特殊的线性表,特殊之处在于,只能在头部删除元素,在尾部添加元素。

消息队列(MQ):保存消息的队列,相当于消息传输过程中的一个容器,主要有两个作用,一个是给外部提供存入消息的接口,另一个是提供取出消息的接口。

保存消息的队列,是消息在传输过程中的容器。主要提供生产和消费接口供外部调用,进行数据的存储和获取。

二、MQ的分类

1.主要有两大类:点对点(Peer-to-Peer)、发布/订阅(Publish/Subsribe)。

2.共同点:生产消息发送到队列中,消费者从队列中读取并消费消息。

3.不同点:

(1)点对点

组成:消息队列、发送者(Sender)、接受者(Receiver)。

注:一个生产者生产的消息只能有一个消费者消费,一旦被消费了,消息就不会存在于消息队列中。

(2)发布/订阅

组成:消息队列、发布者(Publisher)、订阅者(Subscriber)、主题(Topic)

注:每个消息可以有多个消费者,彼此互不影响,如:我在微信公共号发了一篇文章,关注我的人都能看到,即消息被多个人接受到(订阅者)。

三、常见的消息系统

ActiveMQ:实现了JMS(Java Message Service)规范,支持性较好,性能相对不高。

RabbitMQ:可靠性高、安全。

Kafka:分布式、高性能、跨语言。

RockeMQ:阿里开源的消息中间件,纯Java实现。

四、kafka特性

1.高吞吐量、低延迟:每秒可以处理几十万条消息,其延迟只有几毫秒,每个主题可以分多个分区,消费组对其分区进行消费。

2.可扩展:集群支持热扩展。

3.持久性、可靠性:可以持久化到本地磁盘,支持数据备份防止数据丢失。

4.容错性:允许节点中节点失败。

5.高并发:支持数千个客户端同时读写。

五、kafka的组成

1.Broder:kafka集群中包含多个kafka服务节点,每个kafka服务节点就是一个broker。

2.Topic:主题(相当于消息的类型),用来存储不同类别的消息(kafka消息数据村存放于硬盘)。

3.Partition:分区,每个Topic可以包含一个或多个分区,分区的数据量是在创建主题时决定的,

目的在于进行分布式存储。

4.Replication:副本,每个分区可以有多个副本,分布在不同的Broker上,会选出一个副本呢作为Leader,所有请求都会通过Leader完成,Follower只负责备份数据。

5.Message:消息,是通信的基本单位,每个消息都存于Partition。

6.Producer:消息的生产者,向topic发布消息。

7.Consumer:消息的消费之,订阅topic并读取其发布消息。

8.Consumer Group:每个Consumer属于一个特定的Consumer Group,多个Consumer可以属于同一个Consumer Group中。

9.Zookeeper:协调kafka正常运行,同时存储kafka元数据信息如,topic的数量等。注:发送给Topic本身的数据不是存在Zookeeper中,而是存在于磁盘文件中。

 

 六、kafka的安装和配置

1.安装



1.解压kafka_2.12-2.3.0.tgz

cd ~/software tar -zxf kafka_2.12-2.3.0.tgz

2.配置

# 创建存放数据的文件夹 cd kafka_2.12-2.3.0 mkdir data # 修改kafka配置文件 cd config vi server.properties #listeners=PLAINTEXT://:9092 # kafka默认监听的端口号为9092 log.dirs=../data # 指定数据的存放目录 zookeeper.connect=localhost:2181 # zookeeper的连接信息

3.启动zookeeper

使用kafka内置zookeeper

cd ~/software/kafka_2.12-2.3.0/bin/ ./zookeeper-server-start.sh ../config/zookeeper.properties

4.使用外部zk(推荐)

cd ~/software/zookeeper-3.4.13/bin/ ./zkServer.sh start

5.启动kafka

./kafka-server-start.sh ../config/server.properties & # &表示后台运行,也可使用-daemon选项 jps # 查看进程,jps是jdk提供的用来查看所有java进程的命令

6.创建Topic(主题)

./kafka-topics.sh \ --create \ --zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 3 \ --topic hello

# 查看Topic列表 ./kafka-topics.sh --list --zookeeper localhost:2181 # __consumer_offsets是kafka的内部Topic # 查看某一个具体的Topic ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello # 修改Topic:只能增加partition个数,不能减少,且不能修改replication-factor ./kafka-topics.sh --alter --zookeeper localhost:2181 --topic hello --partitions 5 # 删除Topic (需要启用topic删除功能) ./kafka-topics.sh --delete --zookeeper localhost:2181 --topic hello

7.启动kafka的Producer(生产者 )

./kafka-console-producer.sh --broker-list localhost:9092 --topic hello

8.启动kafka的Consumer(消费者)

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello --from-beginning

9.验证

查看data数据存放目录:Topic的每个Partition对应一个目录,数据存储在目录下的00000000000000000000.log文件中

查看zookeeper中的内容:get /brokers/topics/hello/partitions/0/state

2.配置

############################# Server Basics #############################
# broker的id,值为整数,且必须唯一,在一个集群中不能重复
broker.id=0


############################# Socket Server Settings #############################
# kafka默认监听的端口为9092
#listeners=PLAINTEXT://:9092
# 处理网络请求的线程数量,默认为3个
num.network.threads=3
# 执行磁盘IO操作的线程数量,默认为8个
num.io.threads=8
# socket服务发送数据的缓冲区大小,默认100KB
socket.send.buffer.bytes=102400
# socket服务接受数据的缓冲区大小,默认100KB
socket.receive.buffer.bytes=102400
# socket服务所能接受的一个请求的最大大小,默认为100M
socket.request.max.bytes=104857600


############################# Log Basics #############################
# kafka存储消息数据的目录
log.dirs=../data
# 每个topic默认的partition数量
num.partitions=1
# 在启动时恢复数据和关闭时刷新数据时每个数据目录的线程数量
num.recovery.threads.per.data.dir=1


############################# Log Flush Policy #############################
# 消息刷新到磁盘中的消息条数阈值
#log.flush.interval.messages=10000
# 消息刷新到磁盘中的最大时间间隔
#log.flush.interval.ms=1000


############################# Log Retention Policy #############################
# 日志保留小时数,超时会自动删除,默认为7天
log.retention.hours=168
# 日志保留大小,超出大小会自动删除,默认为1G
#log.retention.bytes=1073741824
# 日志分片策略,单个日志文件的大小最大为1G,超出后则创建一个新的日志文件
log.segment.bytes=1073741824
# 每隔多长时间检测数据是否达到删除条件
log.retention.check.interval.ms=300000


############################# Zookeeper #############################
# Zookeeper连接信息,如果是zookeeper集群,则以逗号隔开
zookeeper.connect=localhost:2181
# 连接zookeeper的超时时间
zookeeper.connection.timeout.ms=6000


# 是否可以删除topic,默认为false
delete.topic.enable=true
   

七、Kafka集群搭建

1.搭建Zookeeper集群

可以在不同的服务器上搭建kafka服务形成多个服务节点集群,

也可以在一台主机上启动多个zk服务,使用不同的端口就可以了。

拷贝多个zk目录
zookeeper1、zookeeper2、zookeeper3

分别配置每个zk
   vi zookeeper1/conf/zoo.cfg
    clientPort=2181
     server.1=192.168.2.153:6661:7771
     server.2=192.168.2.153:6662:7772
     server.3=192.168.2.153:6663:7773
   echo 1 > zookeeper1/data/myid  

   
   vi zookeeper2/conf/zoo.cfg
    clientPort=2182
     server.1=192.168.2.153:6661:7771
     server.2=192.168.2.153:6662:7772
     server.3=192.168.2.153:6663:7773
   echo 2 > zookeeper2/data/myid    

   
   vi zookeeper3/conf/zoo.cfg
    clientPort=2183
     server.1=192.168.2.153:6661:7771
     server.2=192.168.2.153:6662:7772
     server.3=192.168.2.153:6663:7773
   echo 3 > zookeeper3/data/myid    
启动zk集群

2.搭建Kafka集群

拷贝多个kafka目录
kafka1、kafka2、kafka3

分别配置每个kafka
   vi kafka1/config/server.properties
    broker.id=1
    listeners=PLAINTEXT://192.168.2.153:9091
    zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183

    
   vi kafka2/config/server.properties
    broker.id=2
    listeners=PLAINTEXT://192.168.2.153:9092
    zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183

    
   vi kafka3/config/server.properties
    broker.id=3
    listeners=PLAINTEXT://192.168.2.153:9093
    zookeeper.connect=192.168.2.153:2181,192.168.2.153:2182,192.168.2.153:2183
启动kafka集群
创建Topic
   ./kafka-topics.sh \
    --create \
    --zookeeper 192.168.7.40:2181,192.168.7.40:2182,192.168.7.40:2183 \
    --replication-factor 3 \
    --partitions 5 \
    --topic aaa
生成数据/发布消息
   ./kafka-console-producer.sh --broker-list 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093 --topic aaa
消费数据/订阅消息
   ./kafka-console-consumer.sh --bootstrap-server 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093 --topic aaa --from-beginning
   

八、SpringBoot集成kafka

引入kafka的依赖
<dependency>
     <groupId>org.springframework.kafka</groupId>
     <artifactId>spring-kafka</artifactId>
</dependency>


配置kafka,编辑application.yml文件
   spring:
     kafka:
       # kafka服务器地址(可以多个)
       bootstrap-servers: 192.168.7.40:9091,192.168.7.40:9092,192.168.7.40:9093
       producer:
        # 每次批量发送消息的数量
         batch-size: 65536
         buffer-memory: 524288
         # key/value的序列化
         key-serializer: org.apache.kafka.common.serialization.StringSerializer
         value-serializer: org.apache.kafka.common.serialization.StringSerializer
       consumer:
         # 指定一个默认的组名
         group-id: test
         # key/value的反序列化
         key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
         value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
创建生产者
   @RestController
   public class KafkaProducer {

   
       @Autowired
       private KafkaTemplate template;

   
       /**
        * 发送消息到Kafka
        * @param topic   主题
        * @param message 消息
        */
       @RequestMapping("/sendMsg")
       public String sendMsg(String topic, String message) {
           template.send(topic, message);
           return "success";
       }
   }
创建消费者
   @Component
   public class KafkaConsumer {

   
       /**
        * 订阅指定主题的消息
        * @param record 消息记录
        */
       @KafkaListener(topics = {"hello","world"})
       public void listen(ConsumerRecord record) {
           // System.out.println(record);
           System.out.println(record.topic()+","+record.value());
       }

   
   }
测试
访问http://localhost:8080/sendMsg?topic=hello&message=aaaa

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/10146.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java项目-第132期ssm学生会管理系统-ssm+shiro+activity社团毕业设计

java项目-第132期ssm学生会管理系统-ssmshiroactivity社团毕业设计 【源码请到资源专栏下载】 今天分享的项目是《学生会管理系统》 该项目分为不同的角色&#xff0c;其中包含超级管理员、生活文体部部长、行政秘书部部长、 外联部部长、策划部部长、学生会干事等角色&#xf…

[附源码]java毕业设计基于的网上饮品店

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

FITC标记葡聚糖(40kDa),FITC Dextran-40,CAS号:60842-46-8

中文名称&#xff1a;FITC标记葡聚糖(40kDa) 英文名称&#xff1a;FITC Dextran-40 CAS号&#xff1a;60842-46-8 产品规格&#xff1a;50mg|250mg|1g 本制品是对平均分子量约40kDa葡聚糖进行标记的荧光素衍生物&#xff0c;即异硫氰酸荧光素葡聚糖40&#xff08;fluoresce…

QT编译Opencv库过程中出现的问题总结

一、人脸识别模块编译出错 出错原因&#xff1a;没有加入opencv_contrib OpenCV 4.4开始需要提供opencv_contrib 如果不需要人脸识别模块可以进行下列操作 [ 77%] Linking CXX executable ..\..\bin\opencv_test_dnn.exe jom: E:\1WT\18.OCR\opencv\build-sources-Desktop_…

计算机网络 3 - 传输层

第3章 传输层(Transport Layer)3.1 传输层服务与协议3.2 复用 分用无连接的分用、复用(UDP)面向连接的分用、复用(TCP)持续/非持续HTTP连接 与 常见端口3.3 无连接传输: UDP3.4 可靠数据传输原理(rdt)rdt 1.0&#xff1a;经完全可靠信道的可靠数据传输rdt 2.0&#xff1a;发现并…

了解操作符的那些事(二)

小叮当的任意门sizeof 和 数组关系操作符逻辑操作符条件操作符逗号表达式下标引用,函数调用和结果成员下标引用函数调用访问一个结构的成员表达式求值隐式类型转换算术转换操作符的属性前言&#xff1a;~ 对一个数的二进制按位取反 *间接访问操作符&#xff08;解引用操作符&am…

linux笔记(3):东山哪吒STU开发板(全志D-1H)开箱初体验helloworld

文章目录1.开发板上电观察串口1.1 从nand flash启动1.2 从SD卡启动2.上传文件到开发板2.1 使用FileZilla软件连接开发板2.2 使用ADB软件双11下单后&#xff0c;经过多日的等待&#xff0c;终于在昨天下午收到了开发板。在等待的过程中&#xff0c;看了一下文档和B站东山老师的视…

Java_封装

目录 1.访问修饰限定符 2.封装扩展之包 导入包中的类 3.自定义包 4.包的访问权限控制举例 5.常见的包 6.通过构造方法进行初始化 面向对象程序三大特性&#xff1a;封装、继承、多态。而类和对象阶段&#xff0c;主要研究的就是封装特性。何为封装呢&#xff1f;简单来…

RPC初识

一、为什么要学习RPC 要回答这个问题&#xff0c;那就必须先了解下 RPC 的使用场景。 只要涉及到网络通信&#xff0c;我们就可能用到RPC 大型分布式系统中&#xff1a;消息队列、分布式缓存、分布式数据库、统一配置中心等&#xff0c;应用程序与这些依赖的中间件都可以通过 …

【Java】SpringCloud基础知识点

SpringCloud什么是SpringCloud有哪些组件EurekaRibbonHystrixZuulConfigFeign什么是SpringCloud SpringCloud是一套分布式微服务的解决方案&#xff0c;Spring Cloud 的各个项目基于 Spring Boot&#xff0c;将 Netflix 的多个框架进行封装&#xff0c;并且通过自动配置的方式…

数据的标准化处理——基于python

数据的标准化处理——基于R归一化&#xff08;normalization&#xff09;python实现标准化python实现之前写过用R来进行标准化&#xff1a; 数据的标准化处理——基于R归一化&#xff08;normalization&#xff09; 将数据缩放到[0,1]的&#xff08;min—max Normalization&am…

电脑软件:推荐八款图片处理工具,值得收藏

目录 1、Inpaint 图片去水印神器 2、XnView 图片批量管理工具 3、TinyPNG图片压缩网站 4、IrfanView 5、GIMP 开源图片编辑器 6、Paint.NET 好用的图片编辑软件 7、Optimizilla 图片压缩工具 8、iLoveIMG 在线图片编辑工具 日常办公当中&#xff0c;图片处理是经常要用…

.ttf 字体剔除

想在 游戏/应用 中使用字体&#xff0c;让你的应用提升一个逼格&#xff1b;但是发现一个 .ttf 少则 几兆, 大则 十几兆&#xff0c;这时候可以通过 fontTools&#xff0c;来剔除不需要的畸形字体&#xff0c;保留常用字体&#xff1b; 1. 安装 python 环境 自行安装&#xff…

【JavaSE】类和对象 【this引用和构造方法】(二)

目录 1、this引用 1.1、this的三种用法 1.1.1、this.属性名 1.1.2、this.方法名 1.1.3、this ( ) 访问构造方法 详细讲解 1、this引用 1.1、为什么要有this引用 问题1&#xff1a;形参名不小心与成员变量名形同会发生什么问题&#xff1f; 问题2&#xff1a; 1.2、什…

《前端》css总结(上)

前言&#xff1a; css的定义有很多很多&#xff0c;大家不会的就去这个网站现查一下就好&#xff1a;https://developer.mozilla.org/zh-CN/docs/Web/CSS/text-decoration 文章目录样式定义方式行内样式表&#xff08;inline style sheet&#xff09;内部样式表&#xff08;in…

计算机毕业设计Python+Django的学生作业管理系统

项目介绍 在各学校的教学过程中&#xff0c;学生的作业管理是一项非常重要的事情。随着计算机多媒体技术的发展和网络的普及&#xff0c;“基于网络的学习模式”正悄无声息的改变着传统的教室学习模式&#xff0c;“基于网络的教学平台”的研究和设计也成为教育技术领域的热点…

接口高可用

架构决定系统质量上限&#xff0c;代码决定系统质量下限 接口高可用整体框架 雪崩效应&#xff1a;请求量超过系统处理能力后导致系统性能螺旋快速下降 链式效应&#xff1a;某个故障引起后续一连串的故障 限流 用户请求全流程各个环节都可以限流&#xff1a; 请求端限流&a…

区块链溯源相比传统追溯有什么优点?

区块链溯源&#xff1a;通过使用区块链和物联网技术的结合&#xff0c;记录产品的物流信息&#xff0c;并基于区块链不可篡改的特性把商品的物流信息、质量信息、质检信息等相关商品信息全部记录在上。从而实现了产品全过程的质检、物流、管理等&#xff0c;解决了信息缺乏透明…

01_网络概述

知识点1【分组交换】 知识点2【交换方式】存储-转发 知识点3【网络分层结构】&#xff08;重要&#xff09; 知识点4【协议的介绍】 1、IP协议 网际协议&#xff08;网络层&#xff09; 2、TCP协议 传输控制协议 &#xff08;传输层&#xff09;&#xff08;重要&#xff…

工程建设行业智能供应链系统:优化产业链运作效率,实现全链路数字化建设

工程建设行业是对建筑工程、线路管道和设备安装工程、建筑装饰装修工程等工程项目进行新建、扩建和改建的行业&#xff0c;对促进国民经济发展和改善人民生活提供了重要的物质技术基础。近年来&#xff0c;我国城镇化的迅速推进为工程建设行业带来了广阔的市场发展空间&#xf…