go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

news2025/1/15 22:46:27

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:
Windows下安装Kafka(图文记录详细步骤)

sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:
    在这里插入图片描述
    在这里插入图片描述
  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademo

require (
	github.com/Shopify/sarama v1.19
)

go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy
在这里插入图片描述
等待命令运行完毕,打开.mod文件,看到如下内容就OK了:
在这里插入图片描述

利用sarama向Kafka发送消息(消息的生产)

代码

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()                              //创建config实例
	config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认
	config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区
	config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回

	//创建信息
	msg := &sarama.ProducerMessage{}
	msg.Topic = "web.log"
	msg.Value = sarama.StringEncoder("this is a test log")

	//连接KafKa
	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
	if err != nil {
		fmt.Println("producer closed, err:", err)
		return
	}
	defer client.Close()

	//发送消息
	pid, offset, err := client.SendMessage(msg)
	if err != nil {
		fmt.Println("send msg failed,err:", err)
		return
	}
	fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
    zkServer
    
    在这里插入图片描述
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述
最后运行程序即可,输出结果为:
在这里插入图片描述

补充:消息的消费

代码

package main

import (
	"fmt"
	"github.com/Shopify/sarama"
	"time"
)

func main() {
	customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
	if err != nil {
		fmt.Println("failed init customer,err:", err)
		return
	}

	partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区
	if err != nil {
		fmt.Println("failed get partition list,err:", err)
		return
	}

	fmt.Println("partitions:", partitionlist)

	for partition := range partitionlist { // 遍历所有分区
		//根据消费者对象创建一个分区对象
		pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)
		if err != nil {
			fmt.Println("failed get partition consumer,err:", err)
			return
		}
		defer pc.Close() // 移动到这里

		go func(consumer sarama.PartitionConsumer) {
			defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了
			for msg := range pc.Messages() {
				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)
			}
		}(pc)
		time.Sleep(time.Second * 10)
	}
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1603653.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

全国产化无风扇嵌入式车载电脑农耕车辆/钢厂天车行业应用

农耕车辆行业应用 背景介绍 当前农耕车车载电脑主要的功能,是要实现农耕车的精确的定位和导航,更加先进的系统则要实现农耕车自动驾驶,与农耕车上相关传感器的通讯(例如耕土深度的传感器, 油量存量传感器…)来实现更多的自动化、信息化的功能…

28岁转行嵌入式适合转嵌入式吗?

转行到嵌入式领域是一个很好的选择,特别是如果你对电子技术、嵌入式系统和软硬件交互感兴趣的话。我这里有一套嵌入式入门教程,不仅包含了详细的视频 讲解,项目实战。如果你渴望学习嵌入式,不妨点个关注,给个评论222&a…

Java面试八股之JDK和JRE的区别

JDK和JRE的区别 定义:JDK(Java Development Kit)是Java开发工具包的缩写,它是Java开发人员必备的工具。JDK包含了编译器(javac)、Java虚拟机(JVM)和Java类库等开发工具和资源。它提供了开发、编译、调试和运行Java程序所需的一切…

NLP学习(1)-搭建环境

前言 仅记录学习笔记,如有错误欢迎指正。 环境搭建 一、环境软件安装: 1、Anaconda安装(一款可以同时创建和管理多个python环境的软件) (1) 安装链接: https://blog.csdn.net/m0_61531676/article/details/126290…

适用于Windows电脑的最佳数据恢复软件是哪些?10佳数据恢复软件

丢失我们系统中可用的宝贵信息是很烦人的。我们可以尝试几种手动方法来重新获取丢失的数据。然而,当我们采用非自动方法来恢复数据时,这是一项令人厌烦和乏味的工作。在这种情况下,我们可以尝试使用一些正版硬盘恢复软件进行数据恢复。此页面…

【honggfuzz学习笔记】honggfuzz的基本特性

本文架构 1.动机2.honggfuzz的基本概念官网描述解读 3. honggfuzz的反馈驱动(Feedback-Driven)软件驱动反馈(software-based coverage-guided fuzzing)代码覆盖率代码覆盖率的计量单位 代码覆盖率的统计方式 硬件驱动反馈( hardware-based co…

.NET开源免费的跨平台框架 - MAUI(附学习资料)

前言 前几天分享了一个.NET MAUI开源免费的UI工具包 - Uranium,然后技术群有不少同学问.NET MAUI是不是免费的?能做什么?今天特意写这篇文章来介绍一下.NET开源、免费(基于MIT License)的跨平台框架:MAUI。…

使用WebSocket实现答题积分排名实时更新的功能

需求分析 接到一个需求,是一个答题积分小程序,其中有一个功能需求是需要实时更新答题积分排名的。之前通常比较常见的需求,都是指定某个时间点才更新答题排行榜的数据的。 经过技术调研,要实现答题积分排名实时更新的功能&#…

Stable Diffusion XL优化终极指南

如何在自己的显卡上获得SDXL的最佳质量和性能,以及如何选择适当的优化方法和工具,这一让GenAI用户倍感困惑的问题,业内一直没有一份清晰而详尽的评测报告可供参考。直到全栈开发者Flix San出手。 在本文中,Flix介绍了相关SDXL优化…

李沐45_SSD实现——自学笔记

主体思路: 1.生成一堆锚框 2.根据真实标签为每个锚框打标(类别、偏移、mask) 3.模型为每个锚框做一个预测(类别、偏移) 4.计算上述二者的差异损失,以更新模型weights 先读取一张图像。 它的高度和宽度分别为561和728像素。 %matplotlib inline import …

openjudge_2.5基本算法之搜索_1804:小游戏

题目 1804:小游戏 总时间限制: 1000ms 内存限制: 65536kB 描述 一天早上,你起床的时候想:“我编程序这么牛,为什么不能靠这个赚点小钱呢?”因此你决定编写一个小游戏。 游戏在一个分割成w * h个正方格子的矩形板上进行。如图所示…

如何使用自定义Promptbooks优化您的安全工作流程

在当今的数字化时代,安全工作流程的优化变得前所未有的重要。安全团队需要快速、有效地响应安全事件,以保护组织的数据和资产。Microsoft Copilot for Security提供了一种强大的工具——自定义Promptbooks,它可以帮助安全专家通过自动化和定制…

【shell】利用k9s和config文件进入k8s集群脚本

1、需要自行安装k9s 2、config文件放在home的.kube隐藏文件夹下 #!/bin/bash# define log & color readonly FG_GREY"30" #灰色 readonly FG_RED"31" readonly FG_GREEN"32" readonly FG_YELLOW"33" readonly FG_BLU…

element-plus中的图标和文字水平对齐

<span><el-icon size"14px"><Delete /></el-icon> <span>删除</span> </span>解决方法&#xff1a;加上vertical-align: middle样式就可以了 <span><el-icon size"14px" style"vertical-align: …

搜维尔科技:【工业仿真】煤矿安全知识基础学习VR系统

产品概述 煤矿安全知识基础学习VR系统 系统内容&#xff1a; 煤矿安全知识基础学习VR系统内容包括&#xff1a;下井流程&#xff08;正确乘坐罐笼、班前会、井下行走注意事项、工作服穿戴、入井检身及人员清点、下井前准备工作、提升运输安全&#xff09;&#xff1b;运煤流程…

Web前端 Javascript笔记5

1、事件类型 ①、HTML事件 事件说明load当页面完全加载后在window上面触发&#xff0c;或当框架集加载完毕后在框架集上触发&#xff1b;unload当页面完全卸载后在window上面触发&#xff0c;或当框架集卸载后在框架集上触发&#xff1b;select当用户选择文本框&#xff08;i…

如何在原生项目中集成flutter

两个前提条件&#xff1a; 从flutter v1.17版本开始&#xff0c;flutter module仅支持AndroidX的应用在release模式下flutter仅支持一下架构&#xff1a;x84_64、armeabi-v7a、arm6f4-v8a,不支持mips和x86;所以引入flutter前需要在app/build.gradle下配置flutter支持的架构 a…

springSecurity用户认证和授权

一&#xff0c;框架介绍 Spring 是一个非常流行和成功的 Java 应用开发框架。Spring Security 基于 Spring 框架&#xff0c;提供了一套 Web 应用安全性的完整解决方案。一般来说&#xff0c;Web 应用的安全性包括用户认证&#xff08;Authentication&#xff09;和用户授权&am…

EelasticSearch的docker安装-----》es客户端使用!!!

1.Docker安装 docker run -d --name es7 -e ES_JAVA_POTS"-Xms256m -Xmx256m" -e "discovery.typesingle-node" -v /opt/es7/data/:/usr/share/elasticsearch/data -p 9200:9200 -p 9300:9300 elasticsearch:7.14.02.客户端UI工具&#xff0c;Edge浏览器…

量子密钥分发系统设计与实现(一):系统基本架构讨论

经过一段时间讨论&#xff0c;我们了解到量子密钥分发设备是当前量子保密通信系统的基础。从本文开始&#xff0c;我将开启量子密钥分发系统设计与实现系列&#xff0c;详细讨论量子密钥分发设备如何从0到1的搭建。 1.QKD系统总体讨论 QKD系统的核心功能就是为通信双方提供理论…