golang—kafka架构原理快速入门以及自测环境搭建(docker单节点部署)

news2024/11/18 3:49:21

kafka

Apache Kafka 是一个分布式的流处理平台。它具有以下特点:

  • 支持消息的发布和订阅,类似于 RabbtMQ、ActiveMQ 等消息队列
  • 支持数据实时处理
  • 能保证消息的可靠性投递
  • 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错
  • 高吞吐率,单 Broker 可以轻松处理数千个分区以及每秒百万级的消息量

架构简介

在这里插入图片描述

Messages and Batches

kafka基本数据单元为消息,为了提高网络使用效率,采用批写入方式

Topics and Partitions

topic为kafka消费主题,每个主题下有若干分区(partitions),Kafka 通过分区来实现数据的冗余和伸缩性,分区可以分布在不同的服务器上。由于多个partition的特性,kafka无法保证topic范围内的消息顺序,但是可以保证单个分区内消息的顺序

broker

broker 对应着一个 kafka 的进程;一个 kafka 集群会包含多个 broker;同时需要在这些 broker中选举出一个controller,选举是通过 zk 来实现;controller 负责协调管理集群状态,同时也负责 partition 的 leader 选举;

Producers And Consumers
  • 消息的生产者,负责将消息发送到不同的 partition 中;消息的生产需要考虑幂等性、正确性以及安全性;kafka 引入了 ack 机制;ack 为 0,则不需要 kafka 回复,此时可能造成数据丢失;ack为 1, 则需要等待 leader 回复,此时其他 replica 可能还没同步 leader 挂掉,数据安全性没法得到保证;ack 为 -1,则需要等待其他 replica 同步完成后,才回复,此时数据最健壮,但是效率最低;
  • 消息的消费者,负责消费消息;一个 partition 对应一个consumer, 而一个 consumer 可以对应多个 partition;消费同一类消息的高吞吐量,可以设置 consumer group;
副本同步策略

每个分区里有多个副本,这些副本有一个leader。只有副本全部同步完成才发送ack。这里指同步策略,是全量同步,而不是半数以上同步了就认为该数据已经commit。不过也可以设置最少同步副本数提高性能(min.insync.replicas)

ISR

Leader 维护了一个动态的 in-sync replica set (ISR),意为和 leader 保持同步的 follower 集合。当 ISR 中的 follower 完成数据的同步之后,leader 就会给 producer 发送 ack。如果 follower 长时间未向 leader 同步数据,则该 follower 将被踢出 ISR,该时间阈值由 replica.lag.time.max.ms 参数设定。Leader 发生故障之后,就会从 ISR 中选举新的 leader。

数据可见性

需要注意的是,并不是所有保存在分区首领上的数据都可以被客户端读取到,为了保证数据一致性,只有被所有同步副本 (ISR 中所有副本) 都保存了的数据才能被客户端读取到。
在这里插入图片描述

kafka读写机制

producer写流程

producer写入消息流程如下:

  • 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息

  • 连接到对应Leader对应的broker

  • 将消息按批次发送到partition的Leader上

  • 其他Follower从Leader上复制数据

  • 依次返回ACK

  • 直到所有ISR中的数据写完成,才完成提交,整个写过程结束
    在这里插入图片描述

consumer 读流程
  • 连接ZK集群,从ZK中拿到对应topic的partition信息和partition的Leader的相关信息

  • 连接到对应Leader对应的broker

  • consumer将自己保存的offset发送给Leader

  • Leader根据offset等信息定位到segment(索引文件和日志文件)

  • 根据索引文件中的内容,定位到日志文件中该偏移量对应的开始位置读取相应长度的数据并返回给consumer

kafka集群选举

副本leader选举

只有完全追上Leader数据的follower才能进行选举,Leader发生故障之后,会从ISR中选出一个新的Leader

controller选举

这部分由ZK完成,不过高本版kafka引入kratf,就可以完成去ZK化了。 ratf是一种简单易理解并且严格复合数学归纳的共识算法。

自测环境搭建

zk

docker pull wurstmeister/zookeeper
docker run -itd --name zookeeper -p 2181:2181 wurstmeister/zookeeper

kafka

 docker pull wurstmeister/kafka
 docker run -itd --name kafka -p 9092:9092 -e HOST_IP=10.74.18.61 -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 -e KAFKA_ADVERTISED_HOST_NAME=10.74.18.61 --link zookeeper:zookeeper wurstmeister/kafka

go链接kafka生产消费

go版本:1.21
生产者

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

func main() {
	config := sarama.NewConfig()
	// 等待服务器所有副本都保存成功后的响应,对应ack=-1
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 是否等待成功和失败后的响应
	config.Producer.Return.Successes = true

	// 使用给定代理地址和配置创建一个同步生产者
	producer, err := sarama.NewSyncProducer([]string{"10.74.18.61:9092"}, config)
	if err != nil {
		panic(err)
	}

	defer producer.Close()

	//构建发送的消息,
	msg := &sarama.ProducerMessage{
		//Topic: "test",//包含了消息的主题
		Partition: int32(10),                   //
		Key:       sarama.StringEncoder("key"), //
	}

	var value string
	var msgType string
	for {
		_, err := fmt.Scanf("%s", &value)
		if err != nil {
			break
		}
		fmt.Scanf("%s", &msgType)
		fmt.Println("msgType = ", msgType, ",value = ", value)
		msg.Topic = msgType
		//将字符串转换为字节数组
		msg.Value = sarama.ByteEncoder(value)
		//fmt.Println(value)
		//SendMessage:该方法是生产者生产给定的消息
		//生产成功的时候返回该消息的分区和所在的偏移量
		//生产失败的时候返回error
		partition, offset, err := producer.SendMessage(msg)

		if err != nil {
			fmt.Println("Send message Fail", err)
		}
		fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
	}
}

消费者

package main

import (
	"fmt"
	"sync"

	"github.com/IBM/sarama"
)

var (
	wg sync.WaitGroup
)

func main() {
	// 根据给定的代理地址和配置创建一个消费者
	consumer, err := sarama.NewConsumer([]string{"10.74.18.61:9092"}, nil)
	if err != nil {
		panic(err)
	}
	//Partitions(topic):该方法返回了该topic的所有分区id
	partitionList, err := consumer.Partitions("test")
	if err != nil {
		panic(err)
	}

	for partition := range partitionList {
		//ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者
		//如果该分区消费者已经消费了该信息将会返回error
		//sarama.OffsetNewest:表明了为最新消息
		pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
		if err != nil {
			panic(err)
		}
		defer pc.AsyncClose()
		wg.Add(1)
		go func(sarama.PartitionConsumer) {
			defer wg.Done()
			//Messages()该方法返回一个消费消息类型的只读通道,由代理产生
			for msg := range pc.Messages() {
				fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
			}
		}(pc)
	}
	wg.Wait()
	consumer.Close()
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1270353.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【代码】考虑区域多能源系统集群协同优化的联合需求侧响应模型(完美复现)

程序名称:考虑区域多能源系统集群协同优化的联合需求侧响应模型 实现平台:matlab-yalmip-cplex/gurobi 代码简介:风电、光伏发电等波动电源接入比例不断提高,使得区域多能源系统中能量转化和协调能力减弱。基于此,该…

orvibo旗下的VS30ZW网关分析之一

概述 从官网的APP支持的智能中枢来看,一共就两种大类: MixPad系列和网关系列 排除MixPad带屏网关外,剩余的设备如下图: 目前在市场上这四种网关已经下市,官方已经宣布停产。所以市场上流通的也几乎绝迹。 从闲鱼市场上可以淘到几个,拿来分析一下,这里我手头有如下的两…

简单字符串处理

答案&#xff1a; #include <stdio.h> #include <string.h> #define MAX 51 //该定义宏为字符串最大长度 int main() {char arr[MAX] { 0 }; gets(arr); //读取存给arrint len 0, i 0, num 0;len strlen(arr); //len代表字符串长度for (i 0; i &l…

【实战教程】PHP如何轻松对接阿里云直播?

1. 配置阿里云直播的推流地址和播放地址 使用阿里云直播功能前&#xff0c;首先需要在阿里云控制台中创建直播应用&#xff0c;然后获取推流地址和播放地址。 推流地址一般格式为&#xff1a; rtmp://{Domain}/{AppName}/{StreamName}?auth_key{AuthKey}-{Timestamp}-{Rand…

Docker安装Elasticsearch以及ik分词器

Elasticsearch 是一个分布式、RESTful 风格的搜索和数据分析引擎&#xff0c;能够解决不断涌现出的各种用例。作为 Elastic Stack 的核心&#xff0c;Elasticsearch 会集中存储您的数据&#xff0c;让您飞快完成搜索&#xff0c;微调相关性&#xff0c;进行强大的分析&#xff…

微服务中配置Nacos热更新

启动Nacos startup.cmd -m standalone 在IDE中启动服务 打开nacos管理后台并选择配置列表 创建配置(这里以日期格式为例) 因为这里配置的是userservice的服务,所以在userservice服务的pom文件中引入依赖 配置一个bootstrap.yml文件 注意这里bootstrap文件中配置过的内容,在app…

一起学docker系列之十五深入了解 Docker Network:构建容器间通信的桥梁

目录 1 前言2 什么是 Docker Network3 Docker Network 的不同模式3.1 桥接模式&#xff08;Bridge&#xff09;3.2 Host 模式3.3 无网络模式&#xff08;None&#xff09;3.4 容器模式&#xff08;Container&#xff09; 4 Docker Network 命令及用法4.1 docker network ls4.2 …

Web安全漏洞分析-XSS(中)

随着互联网的迅猛发展&#xff0c;Web应用的普及程度也愈发广泛。然而&#xff0c;随之而来的是各种安全威胁的不断涌现&#xff0c;其中最为常见而危险的之一就是跨站脚本攻击&#xff08;Cross-Site Scripting&#xff0c;简称XSS&#xff09;。XSS攻击一直以来都是Web安全领…

(动手学习深度学习)第13章 实战kaggle竞赛:树叶分类

文章目录 实战kaggle比赛&#xff1a;树叶分类1. 导入相关库2. 查看数据格式3. 制作数据集4. 数据可视化5. 定义网络模型6. 定义超参数7. 训练模型8. 测试并提交文件 竞赛技术总结1. 技术分析2. 数据方面模型方面3. AutoGluon4. 总结 实战kaggle比赛&#xff1a;树叶分类 kagg…

目标检测常用评价指标

1 基本概念 1.1 IOU(Intersection over Union) 1.2 TP TN FP FN 2. 各种率 3. PR曲线 4. mAP的计算 4.1 AP的计算 4.2 mAP 4.3 mAP0.5和mAP0.5:0.95 1.1 IOU(Intersection over Union) 1.2 TP TN FP FN TP(Truth Positive)&#xff1a; 预测正类&#xff0c;实际正类&#x…

JIT精益理念下SMT物料配送模式的智能化创新

纬湃汽车电子&#xff0c;作为现代汽车电子行业的佼佼者&#xff0c;专注于为全球汽车制造商提供高品质的电子组件。公司致力于通过采用最先进的技术和流程&#xff0c;持续提升产品质量和生产效率。在这一背景下&#xff0c;纬湃汽车电子的SMT车间的转型升级尤为引人关注。 项…

【C++高阶(六)】哈希的应用--位图布隆过滤器

&#x1f493;博主CSDN主页:杭电码农-NEO&#x1f493;   ⏩专栏分类:C从入门到精通⏪   &#x1f69a;代码仓库:NEO的学习日记&#x1f69a;   &#x1f339;关注我&#x1faf5;带你学习C   &#x1f51d;&#x1f51d; 哈希的应用 1. 前言2. 位图的概念以及定义3. 位…

WEB渗透—反序列化(九)

Web渗透—反序列化 课程学习分享&#xff08;课程非本人制作&#xff0c;仅提供学习分享&#xff09; 靶场下载地址&#xff1a;GitHub - mcc0624/php_ser_Class: php反序列化靶场课程&#xff0c;基于课程制作的靶场 课程地址&#xff1a;PHP反序列化漏洞学习_哔哩哔_…

yolov8 原木识别模型

一、模型介绍 模型基于 yolov8数据集采用SKU-110k&#xff0c;这数据集太大了十几个 G&#xff0c;所以只训练了 10 轮左右就拿来微调了原木数据微调&#xff1a;纯手工标注 200 张左右原木图片&#xff0c;训练 20 轮的效果 PS&#xff1a;因为训练时间比较长 Google 的 Cola…

贪心算法的介绍

贪心算法&#xff08;又称贪婪算法&#xff09;是指&#xff0c;在对问题求解时&#xff0c;总是做出在当前看来是最好的选择。也就是说&#xff0c;不从整体最优上加以考虑&#xff0c;他所做出的是在某种意义上的局部最优解。贪心算法不是对所有问题都能得到整体最优解&#…

开放远程访问MySQL的权限

访问远程数据库时&#xff0c;产生Access denied for user ‘root‘‘xxx.xxx.xxx.xxx‘ (using password: YES)异常的解决办法 一. 异常现象 我编写了一个SpringBoot项目&#xff0c;项目中连接的数据库服务器地址是192.168.87.107&#xff0c;然后打包生成了对应的jar包&am…

【开源】基于Vue+SpringBoot的创意工坊双创管理系统

项目编号&#xff1a; S 049 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S049&#xff0c;文末获取源码。} 项目编号&#xff1a;S049&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 管理员端2.2 Web 端2.3 移动端 三、…

EXCEL一对多关系将结果合并到一个单元格

EXCEL一对多关联结果&#xff0c;合并到1个单元格&#xff0c;变成一对一 需求说明 举例说明 假设给出国家省和国家市的对应表&#xff0c;因为每个省都有很多个城市&#xff08;如图1&#xff0c;截取了部分&#xff09;&#xff0c;属于一对多的情况&#xff1b; 如何将同…

NX二次开发UF_CURVE_create_conic 函数介绍

文章作者&#xff1a;里海 来源网站&#xff1a;https://blog.csdn.net/WangPaiFeiXingYuan UF_CURVE_create_conic Defined in: uf_curve.h int UF_CURVE_create_conic(UF_CURVE_conic_p_t conic_data, tag_t * conic ) overview 概述 Creates a conic curve. See the des…

如何成为一名高效的前端开发者(10X开发者)

如今&#xff0c;每个人都想成为我们所说的“10倍开发者”。然而&#xff0c;这个术语经常被误解和高估。 本质上&#xff0c;一个高效或者10倍开发者&#xff0c;在我看来&#xff0c;是指那些能够充分利用所有可用工具的人&#xff0c;通过让这些工具处理冗余和重复的任务&am…