kafka的介绍和基本使用

news2024/11/15 22:52:37

文章目录

  • Kafka介绍
    • 1.Kafka的使用场景
    • 2.Kafka基本概念
  • kafka基本使用
    • 1.安装前的环境准备
    • 2.启动kafka服务器
    • 3.创建主题topic
    • 4.发送消息
    • 5.消费消息


Kafka介绍

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的
(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理
大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、
Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编 写,Linkedin于 2010
年贡献给了Apache基金会并成为顶级开源 项目。

1.Kafka的使用场景

日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式
开放给各种consumer,例如hadoop、Hbase、Solr等。 消息系统:解耦和生产者和消费者、缓存消息等。
用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网⻚、
搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过
订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖 掘。
运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产 各种操作的集中反馈,比如报警和报告。

2.Kafka基本概念

kafka是一个分布式的,分区的消息(官方称之为commit log)服务。它提供一个消息系统应该
具备的功能,但是确有着独特的设计。可以这样来说,Kafka借鉴了JMS规范的思想,但是确 并 没有完全遵循JMS规范。

首先,让我们来看一下基础的消息(Message)相关术语:

名称解释
Broker消息中间件处理节点,⼀个Kafka节点就是⼀个broker,⼀个或者多个Broker可以组成⼀个Kafka集群
TopicKafka根据topic对消息进⾏归类,发布到Kafka集群的每条消息都需要指定⼀个topic
Producer消息⽣产者,向Broker发送消息的客户端
Consumer消息消费者,从Broker读取消息的客户端
ConsumerGroup每个Consumer属于⼀个特定的Consumer Group,⼀条消息可以被多个不同的Consumer Group消费,但是⼀个Consumer Group中只能有⼀个Consumer能够消费该消息
Partition物理上的概念,⼀个topic可以分为多个partition,每个partition内部消息是有序的

因此,从一个较高的层面上来看,producer通过网络发送消息到Kafka集群,然后consumer 来进行消费,如下图: 输入图片说明
在这里插入图片描述

服务端(brokers)和客户端(producer、consumer)之间通信通过 TCP协议 来完成。

kafka基本使用

1.安装前的环境准备

  • 安装jdk

  • 安装zk

  • 官网下载kafka的压缩包:http://kafka.apache.org/downloads

    解压缩至如下路径

    /usr/local/kafka/
    
  • 修改配置文件:/usr/local/kafka/kafka2.11-2.4/config/server.properties

    #broker.id属性在kafka集群中必须要是唯一
    broker.id= 0
    #kafka部署的机器ip和提供服务的端口号
    listeners=PLAINTEXT://192.168.65.60:9092
    #kafka的消息存储文件
    log.dir=/usr/local/data/kafka-logs
    #kafka连接zookeeper的地址
    zookeeper.connect= 192.168.65.60:2181
    

2.启动kafka服务器

进入到bin目录下。使用命令来启动

./kafka-server-start.sh -daemon../config/server.properties

验证是否启动成功:

进入到zk中的节点看id是 0 的broker有没有存在(上线)

ls /brokers/ids/

server.properties核心配置详解:

PropertyDefaultDescription
broker.id0每个broker都可以⽤⼀个唯⼀的⾮负整数id进⾏标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯⼀的即可。
log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯⼀的,可以是多个,路径之间只需要使⽤逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进⾏。
listenersPLAINTEXT://192.168.65.60:9092server接受客户端连接的端⼝,ip配置kafka本机ip即可
zookeeper.connectlocalhost:2181zooKeeper连接字符串的格式为:hostname:port,此处hostname和port分别是ZooKeeper集群中某个节点的host和port;zookeeper如果是集群,连接⽅式为hostname1:port1, hostname2:port2,hostname3:port3
log.retention.hours168每个⽇志⽂件删除之前保存的时间。默认数据保存时间对所有topic都⼀样。
num.partitions1创建topic的默认分区数
default.replication.factor1⾃动创建topic的默认副本数量,建议设置为⼤于等于2
min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定replicas的最⼩数⽬(必须确认每⼀个repica的写数据都是成功的),如果这个数⽬没有达到,producer发送消息会产⽣异常
delete.topic.enablefalse是否允许删除主题

3.创建主题topic

topic是什么概念?topic可以实现消息的分类,不同消费者订阅不同的topic。

在这里插入图片描述

执行以下命令创建名为“test”的topic,这个topic只有一个partition,并且备份因子也设置为1

./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test

查看当前kafka内有哪些topic

./kafka-topics.sh --list --zookeeper 172.16.253.35:2181

4.发送消息

kafka自带了一个producer命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到kafka集群中。在默认情况下,每一个行会被当做成一个独立的消息。使用kafka的发送消息的客户端,指定发送到的kafka服务器地址和topic

./kafka-console-producer.sh --broker-list 172.16.253.38:9092 --topic test

5.消费消息

对于consumer,kafka同样也携带了一个命令行客户端,会将获取到内容在命令中进行输 出, 默认是消费最新的消息
。使用kafka的消费者消息的客户端,从指定kafka服务器的指定 topic中消费消息

方式一:从最后一条消息的偏移量+1开始消费

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test

方式二:从头开始消费

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test

几个注意点:

  • 消息会被存储
  • 消息是顺序存储
  • 消息是有偏移量的
  • 消费时可以指明偏移量进行消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/173823.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

载波相位误差对BPSK解调性能的影响理论推导

在上一篇博客基础上,继续讨论载波相位误差对解调性能的影响! 【通信原理】通信原理书中解调器输入端信噪比a2/(2σ2)与比特信噪比Eb/No有什么关系? 以BPSK为例,从解调原理图可以看出,当本地参考载波信号与原始载波存在相位误差时,假设相位差为 φ \varphi φ,则解调器低…

【Linux】了解磁盘/文件系统/inode

文章目录一.磁盘1.磁盘的结构2.磁盘的定位(寻找方案)3.磁盘的分区与格式化介绍二.理解inode三.ext2文件系统的存储方案一.磁盘 1.磁盘的结构 问题1:什么是磁盘? 磁盘是在冯诺依曼体系结构中几乎唯一的机械设备,机械设…

AntDB数据库助力中国移动结算中心建设

为响应中国移动集团公司IT集中化的要求:全面落实“十三五”十大战略工程,加快“推动公司IT资源一体化整合“重点专项工作。以IT系统为载体,构建高效运营支撑体系,形成集中化支撑和协同业务支撑模式,打造极致体验、高效…

列表初始化(内置类型、自定义类型)

列表初始化的特性来源于单参数的隐式类型转换。以下面这个赋值为例,我们可以理解成 先创建一个匿名对象Point(2),这个时候就变成了 Point p Point(2);然后会调用拷贝构造。 虽然隐式转换的可以这样理解,但是最后会被编译器优化成直接调用有…

[Android]Bitmap Drawable

在实际开发中&#xff0c;我们可以直接引用原始的图片&#xff0c;但是也可以通过xml的方式来描述它&#xff0c;通过xml来描述的BitmapDrawable可以设置更多效果。 <?xml version"1.0" encoding"utf-8"?> <bitmap xmlns:android"http://…

java spring IOC Bean管理操作讲解 并代码演示xml的实现方式

查看本文 需要您使用spring创建过对象管理 如果之前没有接触过 可以先查看我的文章 java 手把手带你创建一个spring入门案例 IOC 操作中 Bean管理主要有两个部分 分别是创建对象和注入属性 他们都有两种实现方式 分别是xml和注解方式实现 本文只演示xml 后续我会出注解方式的文…

第十三届蓝桥杯省赛 JAVA A组 - 蜂巢

✍个人博客&#xff1a;https://blog.csdn.net/Newin2020?spm1011.2415.3001.5343 &#x1f4da;专栏地址&#xff1a;蓝桥杯题解集合 &#x1f4dd;原题地址&#xff1a;蜂巢 &#x1f4e3;专栏定位&#xff1a;为想参加蓝桥别的小伙伴整理常考算法题解&#xff0c;祝大家都能…

背包问题——“0-1背包”,“完全背包”(这样讲,还能不会?)

目录 一、0-1背包 1.1、0-1背包解决的问题 1.2、dp数组定义 1.3、转移方程 1.3.1、二维dp数组 1.3.2、一维dp数组 1.4、遍历顺序 1.5、测试代码 1.6、练习 二、完全背包 2.1、完全背包解决问题 2.2、与0-1背包的区别 2.3、测试代码 2.4、拓展问题&#xff1a;装满…

【2022】13 年终总结

新年Flag 2023年&#xff0c;为了各方面能有所进步&#xff0c;列一些希望达成的目标和想做的事&#xff0c;到年底看看效果。 撰写一篇英文论文 申请到CSC 和xl去外地玩两次 想到了再加 去年Flag倒了几个&#xff1f; 一维河网水动力学模型导师说不用自己编&#xff0c;看懂…

Numpy文件交互:.npy和.npz有什么区别?

文章目录saveloadsavezsavez_compressedNumpy提供了以.npy为后缀的文件存储方案&#xff0c;与这种文件格式密切相关的读、写函数分别是np.load和np.save。通过savez可以一次性存储多个数组&#xff0c;并可通过load以键值对的形式读取出来&#xff1b;如果觉得文件太大&#x…

Mybatis缓存

内存中的一块存储空间&#xff0c;服务于某个应用程序&#xff0c;旨在将频繁读取的数据临时保存在内存中&#xff0c;便于二次快速访问。 一级缓存 SqlSession级别的缓存&#xff0c;同一个SqlSession的发起多次同构查询&#xff0c;会将数据保存在一级缓存中。 注意&#x…

【NI Multisim 14.0虚拟仪器设计——放置虚拟仪器仪表(频率特性测试仪)】

目录 序言 &#x1f34d;放置虚拟仪器仪表 &#x1f349;频率特性测试仪 &#x1f34a;&#x1f34a;1.“模式”选项组 &#x1f34a;&#x1f34a;2.“水平”选项组 &#x1f34a;&#x1f34a;3.“垂直”选项组 &#x1f34a;&#x1f34a;4.“控件”选项组 序言 N…

SpringBoot+Vue项目大学生租房平台

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 服务器&#xff1a;tomcat7 数据库&#xff1a;mysql 5.7/8.0 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.3.9 浏…

数据结构课程设计[2023-01-19]

数据结构课程设计[2023-01-19] 数据结构课程设计 一、课程设计要求 实现指定的题目&#xff08;学号最后两位%41&#xff09;&#xff0c;并撰写课程设计报告。独立完成&#xff0c;功能不完备也没关系&#xff0c;只要是自己做的 使用 C、C或者 JAVA 语言&#xff0c;采用…

​第四章 Flink 窗口和水位线​

Flink 系列教程传送门 第一章 Flink 简介 第二章 Flink 环境部署 第三章 Flink DataStream API 第四章 Flink 窗口和水位线 第五章 Flink Table API&SQL 第六章 新闻热搜实时分析系统 一、时间概念&#xff1a;事件时间和处理时间 在流式处理的过程中&#xff0c;数据…

详解微信小程序开发中的“数据绑定”和代码样例

简介 首先需要区分微信小程序的运行环境和框架系统。运行环境为小程序在手机当中运行的时候&#xff0c;微信客户端所能提供的环境支持&#xff0c;也就是在这种环境下如何进行数据渲染工作&#xff1b;框架系统则是微信小程序在进行开发的过程中&#xff0c;如何通过代码实现…

数字逻辑理论——组合电路

利用数据选择器设计组合逻辑电路 m&#xff1a;组合电路输入变量个数 n&#xff1a;数据选择器的控制端个数 &#xff08;1&#xff09;mn 利用8选1数据选择器设计函数&#xff1a;FAB’A’CBC’ 待设计卡诺图&#xff1a; F∑(1,2,3,4,5,6) &#xff08;2&#xff09;m&g…

【每日一题】【LeetCode】【第十九天】【Python】汇总区间

解决之路 题目描述 测试案例&#xff08;部分&#xff09; 第一次 没有想到什么更快的方法&#xff0c;先用两个循环来写出来思路。 class Solution(object):def summaryRanges(self, nums):res []index 0n len(nums)while index < n:if index n - 1:res.append(str…

Spring_FrameWork_07(SpringMVC与SSM整合)

SpringMVC&#xff08;一种基于java实现的轻量级web框架&#xff09; 请求与响应 REST风格 SSM整合 拦截器 public class ServletContainersInitConfig extends AbstractDispatcherServletInitializer {Overrideprotected WebApplicationContext createServletApplicationCont…

【工具】用AI辅助论文/博客的写作:Obsidian+Text Generator的详细安装教程

目录 前言 介绍 Obsidian Text-Generator 使用教程 安装Obsidian 安装Text Generator 插件安装 获取开放 AI API 密钥 插件选项配置 初体验 前言 对于作家、博主和学生来说&#xff0c;这是一个很好的工具&#xff0c;它通过使用最强大的语言模型之一&#xff1a;Ope…