go语言kafka入门

news2025/1/19 11:19:37

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据

消息队列相关概念:

生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。

消费者(Consumer):从消息队列中接收和处理消息的应用程序或系统组件。

主题(Topic):消息队列中用于分类和分组消息的逻辑概念,生产者将消息发送到指定的主题,而消费者可以订阅特定的主题以接收相应的消息。

队列(Queue):消息队列中存储消息的容器,遵循先进先出(FIFO)的原则。

发布-订阅模式(Publish-Subscribe Pattern):一种消息分发模式,生产者将消息发送到一个或多个主题,而消费者通过订阅感兴趣的主题来接收相应的消息。

点对点模式(Point-to-Point Pattern):一种消息传递模式,生产者将消息发送到特定的队列中,而消费者从队列中接收并处理消息。

消息序列化(Message Serialization):将消息从应用程序的数据结构转换为可以在消息队列中传输和存储的格式,通常使用如JSON、XML 或二进制等格式。

消息持久化(Message Persistence):将消息保存到持久化存储中,以确保即使在消息队列或应用程序重启之后也不会丢失

在这里插入图片描述

安装kafka-go

go get github.com/segmentio/kafka-go

简单示例

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/segmentio/kafka-go"
)

// writeByConn 基于Conn发送消息
func writeByConn(wg *sync.WaitGroup) {
	defer wg.Done()
	topic := "my-topic"
	partition := 0

	// 连接至Kafka集群的Leader节点
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置发送消息的超时时间
	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))

	// 发送消息
	_, err = conn.WriteMessages(
		kafka.Message{Value: []byte("one!")},
		kafka.Message{Value: []byte("two!")},
		kafka.Message{Value: []byte("three!")},
	)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}

// readByConn 连接至kafka后接收消息
func readByConn(wg *sync.WaitGroup) {
	defer wg.Done()
	// 指定要连接的topic和partition
	topic := "my-topic"
	partition := 0

	// 连接至Kafka的leader节点
	conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)
	if err != nil {
		log.Fatal("failed to dial leader:", err)
	}

	// 设置读取超时时间
	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
	// 读取一批消息,得到的batch是一系列消息的迭代器
	batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max

	// 遍历读取消息
	b := make([]byte, 10e3) // 10KB max per message
	for {
		n, err := batch.Read(b)
		if err != nil {
			break
		}
		fmt.Println(string(b[:n]))
	}

	// 关闭batch
	if err := batch.Close(); err != nil {
		log.Fatal("failed to close batch:", err)
	}

	// 关闭连接
	if err := conn.Close(); err != nil {
		log.Fatal("failed to close connection:", err)
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(2)
	go writeByConn(&wg)
	go readByConn(&wg)
	wg.Wait()
}

运行结果

one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
2023/08/25 10:14:10 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request

可以看到,利用两个goroutine,成功在一个文件里实现了生产者写入消息,消费者消费消息并打印出来

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/925942.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Node基础--WebStorm整合Node

通过上面的课程,我们对Node有了一个初步的体验,下面我们就把Node和开发工具WebStrom进行整合。 1.安装开发工具WebStrom (1).查找官方下载地址: https://www.jetbrains.com/webstorm/ (2).下载之后开始点击安装 (3).设置安装路径。注意:安装路径自定义 (4).按照自己的…

vue 简单实验 自定义组件 独立模块

1.概要 2.代码 2.1 const Counter {data() {return {counter: 0}},template:<div>Counter: {{ counter }}</div> }export default Counter 2.2 import Counter from ./t2.jsconst app Vue.createApp({components: {component-a: Counter} })app.mount(#count…

C - 滑动窗口 /【模板】单调队列

Description 有一个长为 n 的序列 a&#xff0c;以及一个大小为 k 的窗口。现在这个从左边开始向右滑动&#xff0c;每次滑动一个单位&#xff0c;求出每次滑动后窗口中的最大值和最小值。 例如&#xff1a; The array is [1,3,−1,−3,5,3,6,7] and k3。 Input 输入一共有…

ssm农产品仓库管理系统系统源码和论文

ssm农产品仓库管理系统系统源码和论文064 开发工具&#xff1a;idea 数据库mysql5.7 数据库链接工具&#xff1a;navcat,小海豚等 技术&#xff1a;ssm 选题的背景 中国是一个农产品生产的大国&#xff0c;可利用的农产品资源相对贫乏&#xff0c;传统的单纯依靠大量物质…

k3s or RKE2 helm安装报错dial tcp 127.0.0.1:8080: connect: connection refused

1.报错&#xff1a; Error: INSTALLATION FAILED: Kubernetes cluster unreachable: Get "http://127.0.0.1:8080/version": dial tcp 127.0.0.1:8080: connect: connection refused 2.问题原因&#xff1a; 1.因为helm默认使用k8s的配置文件&#xff0c;默…

华为云服务器如何安装多个数据库

由于我的项目对数据库版本有限制&#xff0c;需要在先前安装5.7的基础上再安装一个8.0.28版本的数据库。 安装5.7版本数据库的过程&#xff1a;华为云服务器部署mysql_瓜是西瓜的瓜的博客-CSDN博客 1、安装前准备 服务器版本&#xff1a;CentOS 7 将/etc/selinux/config里的…

最新Rimini主题 1.3.0 资源付费WordPress主题下载

RiMini主题介绍&#xff0c;支持微信&#xff0c;支付宝官方支付接口&#xff0c;个人PAYJS&#xff0c;虎皮椒&#xff0c;码支付&#xff0c;免登录购买&#xff0c;会员中心&#xff0c;支付齐全&#xff0c;体验超速&#xff0c;简单粗暴&#xff0c;支付接口齐全&#xff…

【日常积累】Linux下ftp服务安装

概述 FTP是一种在互联网中进行文件传输的协议&#xff0c;基于客户端/服务器模式&#xff0c;默认使用20、21号端口&#xff0c;其中端口20用于进行数据传输&#xff0c;端口21用于接受客户端发出的相关FTP命令与参数。FTP服务器普遍部署于内网中&#xff0c;具有容易搭建、方…

[Go版]算法通关村第十三关白银——数字数学问题之数组实现加法、幂运算

目录 数组实现加法专题题目&#xff1a;数组实现整数加法思路分析&#xff1a;数组末尾开始&#xff0c;逐个元素1&#xff0c;10就进位&#xff0c;!10就退出复杂度&#xff1a;时间复杂度 O ( n ) O(n) O(n)、空间复杂度 O ( n ) O(n) O(n)Go代码 题目&#xff1a;字符串加法…

Navicat导出Postgres表结构并在新环境导入

0、背景及环境 背景 工程升级&#xff0c;新增了一些表。需要把这些表在生产环境中创建。故此写一下操作示例笔记。 工具 Navicat、postgres数据库 1、导出 1.1、打开想要导出表结构的数据库&#xff0c;找到表 1.2、右键目标表选择导出类型 Structure Only 只导出表结构…

c语言调用mciSendString播放音乐

如下所示&#xff0c;这是一个使用c语言调用系统方法mciSendString()&#xff0c;让系统播放音乐的示例&#xff1a; baihuaxiang 代码&#xff1a; #include <graphics.h> #include <Windows.h> #include <mmsystem.h>#pragma comment(lib,"WINMM.LIB…

【C++】抽象类

2023年8月25日&#xff0c;周五上午 目录 声明抽象类 抽象类的特点 举例说明 声明抽象类 要在C中声明一个抽象类&#xff0c;要求类中至少有一个纯虚函数。 在C中&#xff0c;一个类如果包含至少一个纯虚函数&#xff0c;那么这个类就被称为抽象类。 总结起来&#xff0c…

天眼查接口 查询企业信息API 企查查接口

item_get-获得tyc详情 tyc.item_get 公共参数 请求地址: https://api-gw.cn/tyc/item_get 名称类型必须描述keyString是调用key&#xff08;必须以GET方式拼接在URL中&#xff09;secretString是调用密钥api_nameString是API接口名称&#xff08;包括在请求地址中&#xff0…

板卡设计+硬件每日学习十个知识点(44)23.8.24 (检测单元设计,接口部分设计,板卡电源输入设计,电源检测电路)

文章目录 1.检测单元介绍&#xff08;使用GD32单片机&#xff09;2.GD32的最小系统板3.GD32的温度监测4.GD32的电压监测和电流监测5.GD32的布线6.接口部分设计7.板卡电源输入设计8.电源检测电路 1.检测单元介绍&#xff08;使用GD32单片机&#xff09; 答&#xff1a; 首先要为…

告别gazebo开启长时间等待 设置gazebo打开不再联网找模型

学过ros的对gazebo仿真软件应该都不会陌生&#xff0c;但是有时启动真的很烦人&#xff0c;经常卡在这个地方很长时间&#xff0c;查阅资料 gazebo软件开启的时候会自动从国外官网下载模型&#xff0c;因此这个过程比较漫长&#xff0c;原因是网站在国外&#xff0c;下载不顺畅…

基于大核注意力的高效鲁棒脑损伤分割

文章目录 Large-kernel Attention for Efficient and Robust Brain Lesion Segmentation摘要本文方法实验结果 Large-kernel Attention for Efficient and Robust Brain Lesion Segmentation 摘要 ViT是用于医学图像分割等视觉任务的有效深度学习模型。然而&#xff0c;与卷积…

slowhttptest

压力测试工具 kail-linux安装 apt-get install slowhttptest slowhttptest -c 1000 -H -g -o my_header_stats -i 10 -r 200 -t GET -u "http://192.168.3.239:8889" -x 24 -p 3 SlowRead模式 slowhttptest -c 8000 -X -r 200 -w 512 -y 1024 -n 5 -z 32 -k 3 -u …

用于毒情监测和溯源的自动采样器是哪种

最新多领域采样器的功能升级特点&#xff1a; 1.手机APP 实时观测采样状况、采样量&#xff1b; 2.配置高可靠性蠕动泵&#xff0c;采样过程运行平稳精准&#xff0c;待采集水样全程无与外界接触&#xff0c;防止水样产生污染、破坏&#xff1b; 3.可设置全自动样品采集&#x…

Windows逆向项目-LoadPE

Windows逆向项目-LoadPE 这是PE阶段时&#xff0c;老师布置的作业&#xff1a;[ 写一个LordPE ]&#xff1b;木马&#xff0c;免杀&#xff0c;病毒等等都需要对PE结构有足够的了解PE结构详细图&#xff1a; 项目介绍&#xff1a; 项目使用 C &#xff0c; MFC 开发 功能&am…

JAVA 单线程、多线程测试百度网址

JAVA 单线程、多线程访问百度 单线程示例代码多线程示例代码 单线程示例代码 package org.apache.jmeter.functions; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.OutputStream; import java.net.HttpURLConnection; import java.net.U…