在Go项目中二次封装Kafka客户端功能

news2025/1/12 10:45:16

1.摘要

在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。

在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafka Go客户端库, 该开源库地址为:GitHub - IBM/sarama: Sarama is a Go library for Apache Kafka.。

2.功能结构组织

为了能在项目中快速使用, 我在项目目录中专门新建了一个名为kafka的文件夹,在该文件夹下新建了四个文件,分别为:

kafka (目录)
  |
  ----- consumer.go   (消费者方法实现)
  |
  ----- producer.go   (生产者方法实现)
  |
  ----- kafka.go      (定义接口)
  |
  ----- kafka_test.go (单元功能测试)

为方便项目使用,在此基础上做了二次封装。

3.消费者实现

第一步首先定义了一个结构体, 里面包含了Kafka的主机、topic、接收通道和消费者对象信息:

type KafkaConsumer struct {
	Hosts    string          // Kafka主机IP:端口,例如:192.168.201.206:9092
	Ctopic   string          // topic名称
	Kchan    chan string     // 接收信息通道
	Consumer sarama.Consumer // 消费者对象
}

接下来是消费者初始化函数:

func (k *KafkaConsumer) kafkaInit() {
  // 定义配置选项 
	config := sarama.NewConfig()
	config.Consumer.Return.Errors = true
	config.Version = sarama.V0_10_2_0
	
	// 初始化一个消费对象
	consumer, err := sarama.NewConsumer(k.Hosts, config)
	if err != nil {
		err = errors.New("NewConsumer错误,原因:" + err.Error())
		fmt.Println(err.Error())
		return
	}
	
	// 获取所有Topic
	topics, err := consumer.Topics()
	if err != nil {
		fmt.Println(err.Error())
		return
	}
	
	// 判断是否有自定义的Topic
	var topicsName = ""
	for _, e := range topics {
		if e == k.Ctopic {
			topicsName = e
			break
		}
	}
	
	// 没有自定义的Topic则报错
	if topicsName == "" {
		err = errors.New("找不到topics内容")
		fmt.Println(err.Error())
		return
	}
	
	// 将消费对象保存到结构体以备后面使用
	k.Consumer = consumer
}

在上面的初始化函数中, 首先初始化一个消费对象, 然后获取所有的Topic名称,并判断了在这些Topic名称中是否有我自定义的名称,获取成功后则将消费对象保存到我们绑定的结构体中。

接下来是消费监控函数实现,代码如下:

func (k *KafkaConsumer) kafkaProcess() {
	var wg sync.WaitGroup
	
	// 遍历指定Topic分区持续监控消息
	Partitions, _ := k.Consumer.Partitions(k.Ctopic)
	for _, subPartitions := range Partitions {
		pc, err := k.Consumer.ConsumePartition(k.Ctopic, subPartitions, sarama.OffsetNewest)
		if err != nil {
			continue
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			// 这里进入另一个函数可以过滤消息内容
			k.processPartition(pc)
		}()
	}
	wg.Wait()
}

函数processPartition()的实现代码如下:

func (k *KafkaConsumer) processPartition(pc sarama.PartitionConsumer) {
	defer pc.AsyncClose()
	for msg := range pc.Messages() {
	  // 这里可以过滤不需要的Topic的信息
		if strings.Contains(string(msg.Value), "group_state2") {
			continue
		}
		// 这里将获取到的Topic信息发送到通道
		k.Kchan <- string(msg.Value)
	}
}

4.生产者实现

为了跟消费者代码配套,这里也同步实现了生产者代码,主要功能是完成工作后,给指定Topic的生产方返回一个指定消息。

定义生产者的结构体如下:

type KafkaProducer struct {
	hosts         string               // Kafka主机
	sendmsg       string               // 消费方返回给生产方的消息
	ptopic        string               // Topic
	AsyncProducer sarama.AsyncProducer // Kafka生产者接口对象
}

对应的生产者初始化函数实现如下:

func (k *KafkaProducer) kafkaInit() {
  // 定义配置参数
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5
	config.Producer.Return.Successes = true
	config.Version = sarama.V0_10_2_0
	
	// 初始化一个生产者对象
	producer, err := sarama.NewAsyncProducer(k.hosts, config)
	if err != nil {
		err = errors.New("NewAsyncProducer错误,原因:" + err.Error())
		fmt.Println(err.Error())
		return
	}
	
	// 保存对象到结构体
	k.AsyncProducer = producer
}

给生产者回复信息的函数实现如下:

func (k *KafkaProducer) kafkaProcess() {
	msg := &sarama.ProducerMessage{
		Topic: k.ptopic,
	}
	// 信息编码
	msg.Value = sarama.ByteEncoder(k.sendmsg)
	
	// 将信息发送给通道
	k.AsyncProducer.Input() <- msg
}

5.接口定义实现

首先对于生产者和消费者,都有对应的初始化和执行操作,因此定义接口函数如下:

// Kafka方法接口
type IKafkaMethod interface {
	kafkaInit()     // 初始化方法
	kafkaProcess()  // 执行方法
}

为了方便管理接口的赋值操作, 这里定义了一个接口管理方法, 并用Set()函数进行接口类型赋值, Run()函数负责运行对应的成员函数:

// 接口管理结构体
type KafkaManager struct {
	kafkaMethod IKafkaMethod  // 接口对象
}

// 定义实现Set方法
func (km *KafkaManager) Set(m IKafkaMethod) {
	km.kafkaMethod = m  // 将指定的方法赋给接口
}

// 定义实现Run方法
func (km *KafkaManager) Run() {
  km.kafkaMethod.kafkaInit()
  go km.kafkaMethod.kafkaProcess()
}

最后一部分是供外部调用的函数,首先定义一个结构体,该结构体中保存了Kafka的基础信息和三个对象指针:

type KafkaMessager struct {
	KafkaManager  *KafkaManager   // 接口管理对象指针
	KafkaProducer *KafkaProducer  // 生产者对象指针
	KafkaConsumer *KafkaConsumer  // 消费者对象指针
	Hosts         string          // Kafka主机
	topic         string          // topic
}

// 供外部调用初始化的函数,传入Kafka主机IP和Topic,返回操作对象指针,并初始化结构体成员变量
func NewKafkaMessager(hosts, topic string) *KafkaMessager {
	km := &KafkaMessager{
		KafkaManager:  new(KafkaManager),
		KafkaProducer: new(KafkaProducer),
		KafkaConsumer: new(KafkaConsumer),
		Hosts:         hosts,
		topic:         topic,
	}
	return km
}

6.功能调用和验证

在Kafka_test.go文件中,定义一个用于单元测试的函数,格式如下:

func TestKafka(t *testing.T) {
    ....
}

使用单元测试函数的好处是可以单独调试, 专注核心功能本身。

我使用的编辑器是Goland, 在TestKafka函数前面有个三角形小图标,点击可以选择各种调试选项,如图:

下面是我模拟用户调用的客户端代码片段:

// 这里选择我自己搭建的Kafka所在服务器,Topic为test123
// 注意:这里的hosts格式是IP:端口的格式,例如:192.168.201.206:9092
hosts := "192.168.201.206:9092"
topic := "test123"

// 调用初始化函数,并将上面的内容作为参数传进去
nkm := NewKafkaMessager(hosts, topic)

// 初始化消费者,当生产者发出消息,消费者自动消费
nkm.KafkaConsumer.Hosts = hosts             // 消费者host赋值
nkm.KafkaConsumer.Ctopic = topic            // 消费者topic赋值
nkm.KafkaConsumer.Kchan = make(chan string) // 初始化消息通道
nkm.KafkaManager.Set(nkm.KafkaConsumer)     // 接口赋值,设置成操作消费者方法
nkm.KafkaManager.Run()                  // 执行消费者初始化方法


// 监听通道,接收生产客户端发过来的消息
recv := <- nkm.KafkaConsumer.Kchan
fmt.Println(recv)  // 打印接收到的消息

现在我们可以选择直接运行程序了,然后在Kafka的生产者控制台中输入字符:Hello,Goland发送:

可以看到,我们的程序成功接收到Kafka生产者发送过来的信息。

--- END --

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1134696.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习合集】优化目标与评估指标合集 ->(个人学习记录笔记)

文章目录 优化目标与评估指标1. 优化目标1.1 两类基础任务与常见优化目标1.2 分类任务损失0-1损失交叉熵损失与KL散度softmax损失的理解与改进Hinge损失 1.3 回归任务损失L1/L2距离L1/L2距离的改进 Huber loss 2. 评测指标2.1 分类任务中评测指标准确率(查准率)/召回率(查全率)…

应急响应-网站入侵篡改指南_Webshell内存马查杀_漏洞排查_时间分析

1. 前言 一般安服在做项目的时候&#xff0c;经常会遇到需要做应急响应的工作&#xff0c;所谓应急响应就是当网站出现异常的时候&#xff0c;根据相关的问题对其进行溯源分析&#xff0c;发现问题&#xff0c;解决问题。 2. 网络安全异常特征 这里大概汇总一下网络安全异常的…

二叉树相关问题细谈递归

大家好&#xff0c;我是Dark Flame Master&#xff0c;今天给大家带来的介绍的是递归的思想&#xff0c;然后利用递归的方法实现建树的各个函数&#xff0c;例如节点个数&#xff0c;前中后序遍历&#xff0c;判断一棵二叉树是否为完全二叉树等&#xff0c;看完本文相信你会对递…

基于情感词典的情感分析方法

计算用户情绪强弱性&#xff0c;对于每一个文本都可以得到一个情感分值&#xff0c;以情感分值的正负性表示情感极性&#xff0c;大于0为积极情绪&#xff0c;小于0反之&#xff0c;绝对值越大情绪越强烈。 基于情感词典的情感分析方法主要思路&#xff1a; 1、对文本进行分词…

【1】zabbix6.4监控windows电脑操作教程

实验目标&#xff1a; 1.客户端&#xff08;windows&#xff09;安装zabbix agent 并添加到zabbix服务端&#xff1b; 2.可视化常用指标方便快速监控&#xff0c;及时了解客户端情况。 实施1&#xff1a; 步骤1&#xff1a;下载zabbix windows端安装包 官网下载传送门>D…

Android 10.0 Launcher3定制化之动态日历图标功能实现

1.概述 在10.0的系统产品rom开发中,在Launcher3中的相关定制化功能中,对于一些产品要求需要动态日历图标功能,在日期改变的时候,日历图标也需要跟着改变 所以需要自定义日历图标,监听日历改变的广播,收到日期改变的广播后,刷新日历图标,接下来就来分析关于动态日历图标…

5G与无人驾驶:引领未来交通的新潮流

5G与无人驾驶&#xff1a;引领未来交通的新潮流 随着5G技术的快速发展和普及&#xff0c;无人驾驶技术也日益受到人们的关注。5G技术为无人驾驶提供了更高效、更准确、更及时的通信方式&#xff0c;从而改变了我们对交通出行的认知和使用方式。本文将探讨5G技术在无人驾驶领域的…

大数据软件系统的交付流程

大数据软件系统的开发和交付流程通常涉及多个阶段&#xff0c;需要按照一定的计划和方法进行。以下是一个一般性的大数据软件系统开发和交付流程&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎交流合作。 1.需求…

查找算法-顺序查找法(Sequential Search)

目录 查找算法-顺序查找法&#xff08;Sequential Search&#xff09; 1、说明 2、算法分析 3、C代码 查找算法-顺序查找法&#xff08;Sequential Search&#xff09; 1、说明 顺序查找法又称线性查找法&#xff0c;是一种比较简单的查找法。它是将数据一项一项地按顺序…

硬件知识积累 PCIE 接口

1. PCIE 是什么 中文名称&#xff1a;高速串行计算机扩展总线标准 PCI-Express(peripheral component interconnect express)是一种高速串行计算机扩展总线标准&#xff0c;它原来的名称为“3GIO”&#xff0c;是由英特尔在2001年提出的&#xff0c;旨在替代旧的PCI&#xff0c…

nexus 快速搭建-本地私有仓库 -maven

场景&#xff1a; 需要上传打包starer本地、局域网内 jar包上传、下载搭建后本地有层代理&#xff0c;可节省代宽&#xff0c;无网可拉包等… 下载&#xff1a; https://help.sonatype.com/repomanager3/product-information/download 基本说明&#xff1a; proxy 用来代理远程…

ChatGPT AIGC 快速合并Excel工作薄 Vlookup+INDIRECT

在职场中进行数据处理,数据分析汇报与统计的过程中,经常会遇到这样的一个问题,那就是需要统计的数据源在多个文件中,多个工作薄中,如果要进行数据处理,汇总的时候会很不方便。 如果要汇总6个月的数据可能就得需要手动复制了。 再或者用其它方法来进行数据合并。 例如我…

高效的文件管理方法:如何批量在文件名中间插入特定内容

在高效的文件管理中&#xff0c;批量操作是一项非常重要的技能。通过批量操作&#xff0c;我们可以同时处理多个文件&#xff0c;节省时间和精力。本文将介绍一种实用的方法&#xff0c;即云炫文件管理器如何在文件名中间批量插入特定内容&#xff0c;以实现高效的文件管理。现…

力扣刷题 day55:10-25

1.数组异或操作 给你两个整数&#xff0c;n 和 start 。 数组 nums 定义为&#xff1a;nums[i] start 2*i&#xff08;下标从 0 开始&#xff09;且 n nums.length 。 请返回 nums 中所有元素按位异或&#xff08;XOR&#xff09;后得到的结果。 方法一&#xff1a;位运…

Go学习第九章——面向“对象”编程(三大特性与接口和断言)

Go面向“对象”编程&#xff08;三大特性与接口和断言&#xff09; 1. 封装1.1 介绍1.2 快速入门 2.继承2.1 介绍2.2 快速入门2.3 深入学习 3.接口3.1 接口特点和语法说明3.2 快速入门3.3 注意事项和细节说明3.4 接口和继承关系 4. 多态4.1 基本概念4.2 快速入门4.3 使用场景 5…

大数据调度最佳实践 | 从Airflow迁移到Apache DolphinScheduler

迁移背景 有部分用户原来是使用 Airflow 作为调度系统的&#xff0c;但是由于 Airflow 只能通过代码来定义工作流&#xff0c;并且没有对资源、项目的粒度划分&#xff0c;导致在部分需要较强权限控制的场景下不能很好的贴合客户需求&#xff0c;所以部分用户需要将调度系统从…

ROS笔记之visualization_msgs-Marker学习

ROS笔记之visualization_msgs-Marker学习 code review! 文章目录 ROS笔记之visualization_msgs-Marker学习一.line_strip例程二.line_list例程一二.line_list例程二二.TEXT_VIEW_FACING例程三.附CMakeLists.txt和package.xml五.关于odom、base_link和map坐标系六.关于visualiz…

工作:三菱伺服驱动器连接参数及其电机钢性参数配置与调整

工作&#xff1a;三菱伺服驱动器参数及电机钢性参数配置与调整 一、三菱PLC与伺服驱动器连接参数的设置 1. 伺服配置 单个JET伺服从站链接侧占用点数:Rx/Ry占用64点、RWw/RWr占用32点 图中配置了22个JET伺服从站&#xff0c;占用点数:Rx/Ry占用64222048‬点、RWw/RWr占用322…

薛定谔的猫重出江湖?法国初创公司AliceBob研发猫态量子比特

总部位于巴黎的初创公司Alice&Bob使用超导芯片的两个相反的量子态&#xff08;他们称之为“猫态量子比特”芯片&#xff09;来帮助开发量子计算的不同自旋方式。&#xff08;图片来源&#xff1a;网络&#xff09; 有的人认为&#xff0c;构建量子计算机的模块模仿了著名的…

安科瑞剩余电流继电器在智能建筑中的应用

安科瑞 华楠 【摘 要】 分析了智能建筑应用剩余电流继电器的必要性&#xff0c;介绍了ASJ剩余电流继电器的主要功能、工作原理、分类情况和提出了在选择剩余电流保护断路器时的原则和注意事项。 【关键词】 ASJ剩余电流继电器&#xff1b;智能建筑&#xff1b;应用 一、前言…