kafka-go使用:以及kafka一些基本概念说明

news2025/1/10 20:51:16

关于kafka

作为开发人员kafka中最常关注的几个概念,是topic,partition和group这几个概念。topic是主题的意思,简单的说topic是数据主题,这样解释好像显得很苍白,只是做了个翻译。一图胜前言,我们还是通过图解来说明。

生产者负责写数据,一个topic可以有多个分区。如下图所示,生产者写数据的示意。从这个图中,我们可以得出一个很重要的信息:分区有序,即消息在某个分区上是有顺序的,而全局是没有顺序的。这个意味着如果我们需要保证顺序,我们在写消息时需要往同一个分区中写数据。比如,我们有一个场景,有一个订单。首先,创建支付订单,发送一个kafka消息,然后,实际支付,发送一个kafka消息,最后,又想退款,又发起了退款,又发送了一个kafka消息。如果,这三个消息在不同的分区上,我们就无法保证,我们按照创建支付订单——支付——退款这个顺序执行。依据分区有序的特点,我们可以把跟这个订单相关的所有操作的消息都写到一个分区上,比如,可以通过根据订单id进行hash。

group只跟消费者有关系,消费者通过group进行标识,一个消费者实例表示一个消费者,消费者实例可以在不同的线程中开启,也可以在不同的进程中开启。这可以提升消费的并发能力。那么,这是否意味着可以无限开启消费者实例,以提升消费者消费消息的速度呢?

这就涉及到消费的逻辑,我先给出结论。接下来,我们还是通过示意图的方式来做补充说明。group跟partition的数量有关系,当消费者数量小于等于partition数量时,每个消费者都能消费到消息。

如果一个topic有4个分区,分别为p0,p1,p2,p3。现在有两个消费者组,分别是groupA和groupB。一个消费者就是一个消费实例,比如,C0就是在一个进程中启的一个消费者实例。因为这个topic有4个分区,groupA有4个消费者,则每个消费者被分配到一个对应的分区上消费,假如,这个分组又启了一个消费者consumer1,即消费者数量大于分区数量,则这个消费者不会读到消息。

而groupB只有两个消费者,则每一个消费者分别获取两个分区上的消息,如果,groupB只有一个消费者,那么,所有分区上的消息都只有这一个消费者获取。

开源项目kafka-go的使用

kafka-go是一个用go语言开发的kafka客户端。

生产者

我们看一个基于kafka-go实现的生产者的示例:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// 创建Kafka写入器配置
	writerConfig := kafka.WriterConfig{
		Brokers:  []string{"10.10.37.100:30001"},
		Topic:    "my-topic",
		Balancer: &kafka.Hash{}, //消息分区的策略,这个策略是通过hash算法根据kafka.Message的key值来选择分区的
	}

	// 创建写入器
	writer := kafka.NewWriter(writerConfig)

	// 发送消息
	messages := []string{"message 1", "message 2", "message 3", "message 4", "message 5", "message 6", "message 7", 
	"message 8", "message 9", "message 10", "message 11", "message 12", "message 13", "message 14", "message 15", "message 16"}
	for _, msg := range messages {
		time.Sleep(1 * time.Second)
		if err := writer.WriteMessages(context.Background(), kafka.Message{
			Key:   []byte(fmt.Sprintf("key-%d", time.Now().UnixNano())),
			Value: []byte(msg),
		}); err != nil {
			fmt.Printf("Failed to write message: %v\n", err)
		} else {
			fmt.Printf("Message written: %s\n", msg)
		}
	}

	// 关闭写入器
	if err := writer.Close(); err != nil {
		fmt.Printf("Failed to close writer: %v\n", err)
	}
}

WriterConfig中有一个字段我们在开发过程中可能会需要关注到,Balancer:这个用于把消息分发到不同的分区上。这个策略可以自定义。当然,kafka-go中提供几种常用的方法,我以示例中的hash这个为例做简要说明,我们直接看源码。生产者发送消息的逻辑主要在这个WriteMessages方法中,我们直接定位到消息分发到对应分区的逻辑。

func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
    //......
    //忽略与分区不相关的代码,我们只关注和分区相关的逻辑
    balancer := w.balancer()
	for i, msg := range msgs {
		topic, err := w.chooseTopic(msg)
		if err != nil {
			return err
		}

		numPartitions, err := w.partitions(ctx, topic)
		if err != nil {
			return err
		}
        //根据msg计算把消息分发到哪个分区
		partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)

		key := topicPartition{
			topic:     topic,
			partition: int32(partition),
		}

		assignments[key] = append(assignments[key], int32(i))
	}

	batches := w.batchMessages(msgs, assignments)
	if w.Async {
		return nil
	}

	.....
    //忽略一些细节
	return werr
}

如果设置了分区策略,则以设置的分区策略进行分发消息。 

func (w *Writer) balancer() Balancer {
	if w.Balancer != nil {
		return w.Balancer
	}
	return &w.roundRobin
}

接下来,我们看一下我的示例中使用的Hash的分发消息的策略。我们看到这个方法的主要逻辑,就是根据msg中key进行hash,然后根据topic下总分区数来计算消息分发到对应的分区号。hasher.Write(msg.Key)

func (h *Hash) Balance(msg Message, partitions ...int) int {
	if msg.Key == nil {
		return h.rr.Balance(msg, partitions...)
	}

	hasher := h.Hasher
	if hasher != nil {
		h.lock.Lock()
		defer h.lock.Unlock()
	} else {
		hasher = fnv1aPool.Get().(hash.Hash32)
		defer fnv1aPool.Put(hasher)
	}

	hasher.Reset()
	if _, err := hasher.Write(msg.Key); err != nil {
		panic(err)
	}

	// uses same algorithm that Sarama's hashPartitioner uses
	// note the type conversions here.  if the uint32 hash code is not cast to
	// an int32, we do not get the same result as sarama.
	partition := int32(hasher.Sum32()) % int32(len(partitions))
	if partition < 0 {
		partition = -partition
	}

	return int(partition)
}

消费者

消费者没有太多的注意事项,只是如果有多个分区,想要提升并发能力,可以启多个消费者。我们直接看示例。

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// 创建Kafka读取器配置
	readerConfig := kafka.ReaderConfig{
		Brokers:  []string{"10.10.37.100:30001"},
		Topic:    "my-topic",
		GroupID:  "my-group",
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	}

	// 创建读取器
	reader := kafka.NewReader(readerConfig)

	// 处理信号以便优雅地关闭
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// 读取消息
	for {
		select {
		case sig := <-sigChan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			return
		default:
			msg, err := reader.ReadMessage(context.Background())
			if err != nil {
				fmt.Printf("Failed to read message: %v\n", err)
				continue
			}
			fmt.Printf("当前时间:%v Message on topic: %s value: %s partion:%d\n", time.Now(), msg.Topic, string(msg.Value), msg.Partition)
		}
	}
}

创建topic

直接上示例。创建topic是个幂等操作。

package main

import (
	"net"
	"strconv"

	"github.com/segmentio/kafka-go"
)

func main() {
	// to create topics when auto.create.topics.enable='false'
	topic := "my-topic"

	conn, err := kafka.Dial("tcp", "10.10.37.100:30001")
	if err != nil {
		panic(err.Error())
	}
	defer conn.Close()

	controller, err := conn.Controller()
	if err != nil {
		panic(err.Error())
	}
	var controllerConn *kafka.Conn
	controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
	if err != nil {
		panic(err.Error())
	}
	defer controllerConn.Close()

	topicConfigs := []kafka.TopicConfig{
		{
			Topic:             topic,
			NumPartitions:     10,
			ReplicationFactor: 1,
		},
	}

	err = controllerConn.CreateTopics(topicConfigs...)
	if err != nil {
		panic(err.Error())
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1995158.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PDF密码移除技巧: 五大 PDF 密码移除器

如果您想解密或删除 PDF 密码&#xff0c;该怎么办&#xff1f;PDF 是一种经常用于商业的格式&#xff0c;您可以在培训、教育和商业场合使用它。添加这些 PDF 文件的密码可以保护您的安全和隐私。因此&#xff0c;有很多 PDF 都用密码加密&#xff0c;当您想要查看这些 PDF 时…

PTrade常见问题系列22

反馈定义的上午7点执行run_daily函数&#xff0c;但是每周一上午都没法正常执行&#xff1f; 1、run_daily函数加载在initialize函数中&#xff0c;执行后才会创建定时任务&#xff1b; 2、由于周末会有例行重启操作&#xff0c;在重启以后拉起交易时相当于非交易日启动的交易…

【人工智能训练师】2 集群搭建

题目一、基础配置 core-site.xml参数配置详情 官方文档&#xff1a;http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/core-default.xml core-default.xml与core-site.xml的功能是一样的&#xff0c;如果在core-site.xml里没有配置的属性&#xff0c…

【C++二分查找 决策包容性】1300. 转变数组后最接近目标值的数组和

本文涉及的基础知识点 C二分查找 决策包容性 LeetCode1300. 转变数组后最接近目标值的数组和 给你一个整数数组 arr 和一个目标值 target &#xff0c;请你返回一个整数 value &#xff0c;使得将数组中所有大于 value 的值变成 value 后&#xff0c;数组的和最接近 target …

【开端】JAVA中的切面使用

一、绪论 在不使用过滤器和 拦截器的前提下&#xff0c;如果统一对JAVA的 方法进行 管理。比如对一类方法或者类进行日志监控&#xff0c;前后逻辑处理。这时就可以使用到切面。它的本质还是一个拦截器。只是通过注解的方式来标识所切的方法。 二、JAVA中切面的使用实例 Aspec…

如何看待“低代码”开发平台的兴起

目录 1.概述 1.1.机遇 1.2.挑战 1.3.对开发者工作方式的影响 2.技术概览 2.1.主要特点 2.2.市场现状 2.3.主流低代码平台 2.4.分析 3.效率与质量的权衡 3.1.提高开发效率 3.2.质量与安全隐患 3.3.企业应用开发的利弊分析 4.挑战与机遇 4.1.机遇 4.2.挑战 4.3.…

为什么需要在线实时预览3D模型?如何实现?

在线实时预览3D模型在现代设计、产品开发、市场营销、以及娱乐等领域中变得越来越重要&#xff0c;原因可以归结为以下几个方面&#xff1a; 1、多平台兼容性&#xff1a; 在线实时预览通常不依赖于特定的操作系统或软件平台&#xff0c;只要设备能够访问互联网和浏览器&…

21-原理图的可读性的优化处理

1.自定义原理图尺寸 先将原理图移动到左下角 2.划分模块 3.放置模块字符串

第三期书生大模型实战营——基础岛

1.书生大模型全链路开源体系 【书生浦语大模型全链路开源开放体系】 https://www.bilibili.com/video/BV18142187g5/?share_sourcecopy_web&vd_source711f676eb7f61df7d2ea626f48ae1769 视频里介绍了书生浦语大模型的开源开放体系&#xff0c;包括了其的技术发展、模型架…

ubuntu系统下安装LNMP集成环境的详细步骤(保姆级教程)

php开发中集成环境的安装是必不可少的技能,而LNMP代表的是:Linux系统下Nginx+MySQL+PHP这种网站服务器架构。今天就给大家分享下LNMP的安装步骤。 1 Nginx安装 在安装Nginx前先执行下更新命令: sudo apt-get update 接下来开始安装Nginx, 提示:Could not get lock /v…

【mysql 第二篇章】请求到真正执行 SQL 到底是一个怎么样的过程?

从用户调用到SQL执行的流程中间发生了什么事情 1、网络请求使用 线程 来处理&#xff0c;当数据库连接池中监听到有连接请求&#xff0c;这个时候会分配一个线程来处理。 2、SQL接口 负责接收 SQL 语句&#xff0c;当线程监听到有请求和读取数据的之后&#xff0c;将 SQL 语句…

Android Fragment:详解,结合真实开发场景Navigation

目录 1&#xff09;Fragment是什么 2&#xff09;Fragment的应用场景 3&#xff09;为什么使用Fragment? 4&#xff09;Fragment如何使用 5&#xff09;Fragment的生命周期 6&#xff09;Android开发&#xff0c;建议是多个activity&#xff0c;还是activity结合fragment&…

SparkSQL——AnalyzedLogicalPlan生成

Rule和RuleExecutor SparkSQL中对LogicalPlan的解析、优化、还有物理执行计划生成都是分成一个个Rule进行的。 RuleExecutor是一个规则引擎&#xff0c;它收集Rule&#xff0c;并对plan按照rule进行执行。 每一个Rule的实现类都要实现apply方法&#xff0c;具体逻辑都放在这个…

mysql中的时间相关函数

MySQL服务器中有3种时区设置&#xff1a; 系统时区&#xff08;保存在system_time_zone系统变量中&#xff09;服务器时区&#xff08;保存在全局系统变量time_zone中&#xff09;每个客户端连接的时区&#xff08;保存在会话变量time_zone中&#xff09; 其中&#xff0c;客…

极米RS10Plus性价比高吗?7款4-6K价位投影仪测评哪款最好

通常家庭想买个投影仪都会选择4-6K这个价位段的投影仪&#xff0c;3K以下的投影配置太低&#xff0c;6K以上的价格略高&#xff0c;4-6K价位段的中高端投影仪正好满足大部分家庭的使用需求。正好极米投影在8月份上新了一款Plus版本的长焦投影&#xff1a;极米RS10Plus&#xff…

剪切走的照片找回:数据恢复实战指南

一、引言&#xff1a;当珍贵瞬间遭遇剪切失误 在数字化时代&#xff0c;照片不仅是记忆的载体&#xff0c;更是情感与故事的传承。然而&#xff0c;一次不经意的剪切操作失误&#xff0c;却可能让这些珍贵的瞬间面临丢失的风险。面对剪切走的照片&#xff0c;许多用户会感到无…

AI看奥运 | 从巴黎奥运会看人工智能的应用和发展

2024巴黎奥运会火热空前&#xff0c;从开幕式到金牌争夺战&#xff0c;本届奥运会的关注热度持续攀升。与往届不同的是&#xff0c;本届奥运会不仅是首次在体育场馆外举办的户外开幕式的奥运会&#xff0c;同时也是在转播技术上首次广泛应用AI技术的奥运会&#xff0c;包括“时…

C++ 新特性 | C++20 常用新特性介绍

目录 1、模块(Modules) 2、协程(Coroutines) 3、概念(Concepts) 4、范围(Ranges) 5、三向比较符&#xff08;three-way comparison&#xff09; C软件异常排查从入门到精通系列教程&#xff08;专栏文章列表&#xff0c;欢迎订阅&#xff0c;持续更新...&#xff09;https…

哈尔滨等保测评——为工业网络安全保驾护航新航标

哈尔滨&#xff0c;这个以冰雪和美丽闻名世界的城市&#xff0c;现在又树立了一个全新的行业标准&#xff0c;那就是“等保”&#xff0c;正在掀起一场新的安全革命&#xff0c;保卫着这个智能时代&#xff01; ❄️【哈尔滨新视野】❄️ 哈尔滨是一块充满创新活力的土地&…

数据结构之Map和Set(下)

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;数据结构&#xff08;Java版&#xff09; 上一篇文章&#xff0c;我们学习了&#xff1a;二叉搜索树、Map和Set的介绍以及常见方法的基本使用…