【kafka】简单运用go语言操作kafka实现生产者和消费者功能的包,confluent-kafka-go和sarama

news2024/12/18 17:05:09

confluent-kafka-go和sarama对比

特性confluent-kafka-gosarama
底层实现基于 librdkafka C 库完全用 Go 实现
性能高吞吐量、低延迟吞吐量较低,适合常规应用
安装依赖需要 C 编译器和 librdkafka无需外部依赖,纯 Go 实现
功能支持 Kafka 所有功能,包括事务支持 Kafka 核心功能,事务支持较弱
使用难度配置复杂,需理解底层 C 库使用简便,快速上手
社区支持由 Confluent 官方支持社区驱动,文档丰富
错误处理和日志错误处理较为复杂,日志记录较为详细错误处理简单,日志记录清晰
适用场景高性能要求、高吞吐量的生产环境一般的生产和消费场景,快速开发

基础使用案例

  1. 使用 confluent-kafka-go 发送消息
    使用 confluent-kafka-go 库向 Kafka 发送消息。
package main

import (
	"fmt"
	"log"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // Kafka服务器地址
	}

	producer, err := kafka.NewProducer(config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	message := &kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &"test_topic", Partition: kafka.PartitionAny},
		Value:          []byte("Hello Kafka from Go!"),
	}

	err = producer.Produce(message, nil)
	if err != nil {
		log.Fatal("Failed to produce message:", err)
	} else {
		fmt.Println("Message sent successfully!")
	}

	producer.Flush(15 * 1000)
}
  1. 使用 confluent-kafka-go 消费消息
    使用 confluent-kafka-go 库从 Kafka 中消费消息。
package main

import (
	"fmt"
	"log"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // Kafka服务器地址
		"group.id":          "test-group",     // 消费者组ID
		"auto.offset.reset": "earliest",        // 自动从最早的消息开始消费
	}

	consumer, err := kafka.NewConsumer(config)
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	err = consumer.Subscribe("test_topic", nil)
	if err != nil {
		log.Fatal("Failed to subscribe:", err)
	}

	for {
		msg, err := consumer.ReadMessage(-1)
		if err == nil {
			fmt.Printf("Consumed message: %s\n", string(msg.Value))
		} else {
			fmt.Printf("Error while consuming: %v\n", err)
		}
	}
}
  1. 使用 sarama 发送消息
    使用 sarama 库向 Kafka 发送消息。
package main

import (
	"fmt"
	"log"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal("Failed to create producer:", err)
	}
	defer producer.Close()

	message := &sarama.ProducerMessage{
		Topic: "test_topic",
		Value: sarama.StringEncoder("Hello Kafka from Go (Sarama)!"),
	}

	partition, offset, err := producer.SendMessage(message)
	if err != nil {
		log.Fatal("Failed to send message:", err)
	}

	fmt.Printf("Message sent to partition %d with offset %d\n", partition, offset)
}
  1. 使用 sarama 消费消息
    使用 sarama 库从 Kafka 中消费消息。
package main

import (
	"fmt"
	"log"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal("Failed to create consumer:", err)
	}
	defer consumer.Close()

	partitions, err := consumer.Partitions("test_topic")
	if err != nil {
		log.Fatal("Failed to get partitions:", err)
	}

	for _, partition := range partitions {
		pc, err := consumer.ConsumePartition("test_topic", partition, sarama.OffsetNewest)
		if err != nil {
			log.Fatal("Failed to start consumer for partition:", err)
		}
		defer pc.Close()

		for msg := range pc.Messages() {
			fmt.Printf("Consumed message: %s\n", string(msg.Value))
		}
	}
}

性能案例对比

  1. 性能
    confluent-kafka-go:
    由于底层使用了 librdkafka,confluent-kafka-go 通常在吞吐量、延迟和连接管理方面表现得更加优越。
    适合用于高吞吐量、低延迟的生产环境。
// 高吞吐量性能优化示例:
package main

import (
	"fmt"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
	}

	producer, _ := kafka.NewProducer(config)
	defer producer.Close()

	for i := 0; i < 10000; i++ {
		producer.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &"test_topic", Partition: kafka.PartitionAny},
			Value:          []byte(fmt.Sprintf("Message %d", i)),
		}, nil)
	}

	producer.Flush(15 * 1000)
}

sarama:
虽然 sarama 的性能不及 confluent-kafka-go,但它对于大多数常规用途仍然足够快,特别是在吞吐量要求不是特别高的场景中。

// sarama 吞吐量示例:
package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true

	producer, _ := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	defer producer.Close()

	for i := 0; i < 10000; i++ {
		producer.SendMessage(&sarama.ProducerMessage{
			Topic: "test_topic",
			Value: sarama.StringEncoder(fmt.Sprintf("Message %d", i)),
		})
	}
}
  1. 功能
    confluent-kafka-go:
    提供了丰富的功能,支持 Kafka 的所有核心功能,如生产者、消费者、消费者组管理、消息传递、事务支持、数据压缩等。
    支持 Kafka 的最新特性,如消息事务、压缩、性能调优等。
    由于是 librdkafka 的封装,confluent-kafka-go 与 Kafka 的版本兼容性更好,能够快速支持 Kafka 的新功能。
// 使用事务的生产者示例:
package main

import (
	"fmt"
	"log"
	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"acks":               "all",
	}

	producer, _ := kafka.NewProducer(config)
	defer producer.Close()

	// 开启事务
	producer.BeginTransaction()

	producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &"test_topic", Partition: kafka.PartitionAny},
		Value:          []byte("Transactional Message"),
	}, nil)

	// 提交事务
	producer.CommitTransaction()
}

sarama:
提供了 Kafka 的核心功能,但可能在一些高级特性上不如 confluent-kafka-go 丰富。例如,sarama 对事务支持相对较弱,尽管在常规的生产/消费场景中功能足够。
支持 Kafka 的基本功能,如生产者、消费者组、消息传递等,但对一些高级功能(如流控、集群管理等)的支持可能稍有不足。

// sarama 事务支持相对较弱,但基本生产和消费功能已足够:
package main

import (
	"fmt"
	"log"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true

	producer, _ := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
	defer producer.Close()

	// 模拟事务:sarama 本身不直接支持事务,通常通过事务标记和重试来实现
	message := &sarama.ProducerMessage{
		Topic: "test_topic",
		Value: sarama.StringEncoder("Transactional Message"),
	}

	partition, offset, _ := producer.SendMessage(message)
	fmt.Printf("Message sent to partition %d with offset %d\n", partition, offset)
}
  1. 总结:
    如果你需要高性能和 Kafka 高级特性,选择 confluent-kafka-go。
    如果你追求易用性和快速开发,或者不希望依赖 C 库,选择 sarama。
  • 选择 confluent-kafka-go:适用于高性能、高吞吐量的场景。

  • 选择 sarama:适用于不需要复杂配置和高级特性的场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2261693.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实现按键按下(低电平)检测到下降沿

按照流程进行编程 步骤1&#xff1a; 初始化函数 包括时基工作参数配置 输入通道配置 更新中断使能 使能捕获、捕获中断及计数器 HAL_TIM_IC_Init(&ic_handle) //时基参数配置 HAL_TIM_IC_ConfigChannel(&ic_handle,&ic_config,TIM_CHANNEL_2) //输…

2024广东省职业技能大赛云计算——私有云(OpenStack)平台搭建

OpenStack搭建 前言 搭建采用双节点安装&#xff0c;即controller控制节点和compute计算节点。 CentOS7 系统选择 2009 版本&#xff1a;CentOS-7-x86_64-DVD-2009.iso 可从阿里镜像站下载&#xff1a;https://mirrors.aliyun.com/centos/7/isos/x86_64/ OpenStack使用竞赛培…

使用ENSP实现NAT(2)

一、NAT的类型 二、静态NAT 1.项目拓扑 2.项目实现 路由器AR1配置&#xff1a; 进入系统视图 sys将路由器命名为AR1 sysname AR1关闭信息中心 undo info-center enable 进入g0/0/0接口 int g0/0/0将g0/0/0接口IP地址配置为192.168.10.254/24 ip address 192.168.10.254 24进…

RNN LSTM Seq2Seq Attention

非端到端&#xff1a; data -》 cleaning -》 feature Engining &#xff08;70%-80%工作 设计特征&#xff09;-》 分类器 -》预测 端到端 End-to-End&#xff1a; data -》 cleaning -》Deep learning&#xff08;表示学习&#xff0c;从数据中学习特征&#xff09; -》…

PHP排序算法:数组内有A~E,A移到C或者C移到B后排序,还按原顺序排序,循环

效果 PHP代码 public function demo($params){function moveNext($arr){$length count($arr);$lastElement $arr[$length - 1];for ($i $length - 1; $i > 0; $i--) {$arr[$i] $arr[$i - 1];}$arr[0] $lastElement;return $arr;}function moveAndReplace($array, $from…

Nginx主要知识点总结

1下载nginx 到nginx官网nginx: download下载nginx&#xff0c;然后解压压缩包 然后双击nginx.exe就可以启动nginx 2启动nginx 然后在浏览器的网址处输入localhost&#xff0c;进入如下页面说明nginx启动成功 3了解nginx的配置文件 4熟悉nginx的基本配置和常用操作 Nginx 常…

如何跟进项目

在跟进项目的过程中&#xff0c;我们需要通过清晰的沟通和高效的执行来确保目标按时达成。简单来说&#xff0c;“如何跟进项目”可归纳为&#xff1a;明确目标和交付物、建立高效沟通机制、持续监控进度与风险、灵活应对变更。尤其是“明确目标和交付物”这一点&#xff1a;当…

获取微信用户openid

附上开发文档:https://developers.weixin.qq.com/doc/offiaccount/OA_Web_Apps/Wechat_webpage_authorization.html 开发之前,准备事项 一个已认证过的服务号|基本信息配置js域名和网站授权域名配置最后确认当前账号网页授权功能是否开通,没有开通的无法获取到用户授权开发人…

【WRF工具】WRF 模型评估MET(Model Evaluation Tools)

WRF 模型评估MET&#xff08;Model Evaluation Tools&#xff09; METplus 简介WRF 模型评估工具 MET 的安装与使用步骤安装步骤使用步骤 参考 METplus 简介 METplus 是一个增强型的模型评估和验证框架&#xff0c;支持从短期预报&#xff08;如实时警报&#xff09;到长期气候…

ARMS 用户体验监控正式发布原生鸿蒙应用 SDK

作者&#xff1a;杨兰馨&#xff08;楠瑆&#xff09; 背景 2024 年 10 月 22 日&#xff0c;华为正式发布了原生鸿蒙操作系统&#xff08;HarmonyOS NEXT&#xff09;。原生鸿蒙实现了系统底座全部自研&#xff0c;系统的流畅度、性能、安全特性等方面显著提升&#xff0c;也…

云计算HCIP-OpenStack04

书接上回&#xff1a; 云计算HCIP-OpenStack03-CSDN博客 12.Nova计算管理 Nova作为OpenStack的核心服务&#xff0c;最重要的功能就是提供对于计算资源的管理。 计算资源的管理就包含了已封装的资源和未封装的资源。已封装的资源就包含了虚拟机、容器。未封装的资源就是物理机提…

MyBatis-Plus 实用工具:SqlHelper

SqlHelper 是MyBatis-Plus的一款SQL 辅助工具类&#xff0c;提供了一些常用的方法&#xff0c;简便我们的操作&#xff0c;提高开发效率。文档 最常用的是SqlHelper.table(Obj.class) 返回的 TableInfo 对象通常包含以下常用方法&#xff1a; 1. getTableName() 获取表名。示例…

压力测试Jmeter简介

前提条件&#xff1a;要安装JDK 若不需要了解&#xff0c;请直接定位到左侧目录的安装环节。 1.引言 在现代软件开发中&#xff0c;性能和稳定性是衡量系统质量的重要指标。为了确保应用程序在高负载情况下仍能正常运行&#xff0c;压力测试变得尤为重要。Apache JMeter 是一…

QT6 Socket通讯封装(TCP/UDP)

为大家分享一下最近封装的以太网socket通讯接口 效果演示 如图&#xff0c;界面还没优化&#xff0c;后续更新 废话不多说直接上教程 添加库 如果为qmake项目中&#xff0c;在.pro文件添加 QT network QT core gui QT networkgreaterThan(QT_MAJOR_VERS…

函数指针的作用

函数指针的主要作用&#xff0c;就是用来选择不同的调度函数&#xff0c;来满足特殊需求。它的优点&#xff0c;使程序设计更加灵活。缺点&#xff1a;初学者很难理解函数指针&#xff0c;从而引起程序的可读性不高。 1、使用函数指针选择调度函数 #include "stm32f10x.…

DateRangePickerDialog组件的用法

文章目录 概念介绍使用方法示例代码我们在上一章回中介绍了DatePickerDialog Widget相关的内容,本章回中将介绍DateRangePickerDialog Widget.闲话休提,让我们一起Talk Flutter吧。 概念介绍 我们在这里说的DateRangePickerDialog是一种弹出窗口,只不过窗口的内容固定显示为…

【LeetCode每日一题】——220.存在重复元素 III

文章目录 一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【题目提示】七【解题思路】八【时空频度】九【代码实现】十【提交结果】 一【题目类别】 数组 二【题目难度】 困难 三【题目编号】 220.存在重复元素 III 四【题目描述】 给你一个…

SQL server学习07-查询数据表中的数据(下)

目录 一&#xff0c;自连接查询 二&#xff0c;多表查询 三&#xff0c;关系代数运算 1&#xff0c;笛卡尔乘积运算 1&#xff09;交叉连接 2&#xff0c;连接运算 2&#xff09;内连接 四&#xff0c;外连接 1&#xff0c;左外连接 2&#xff0c;右外连接 3&…

Three.js资源-模型下载网站

在使用 Three.js 进行 3D 开发时&#xff0c;拥有丰富的模型资源库可以大大提升开发效率和作品质量。以下是一些推荐的 Three.js 模型下载网站&#xff0c;它们提供了各种类型的 3D 模型&#xff0c;适合不同项目需求。无论你是需要逼真的建筑模型&#xff0c;还是简单的几何体…

景联文科技入选中国信通院发布的“人工智能数据标注产业图谱”

近日&#xff0c;由中国信息通信研究院、中国人工智能产业发展联盟牵头&#xff0c;联合中国电信集团、沈阳市数据局、保定高新区等70多家单位编制完成并发布《人工智能数据标注产业图谱》。景联文科技作为人工智能产业关键环节的代表企业&#xff0c;入选图谱中技术服务板块。…