142. Go操作Kafka(confluent-kafka-go库)

news2025/1/10 23:50:43

文章目录

  • Apache kafka简介
  • 开始使用Apache Kafka
    • 构建生产者
    • 构建消费者
  • 总结

之前已经有两篇文章介绍过 Go如何操作 kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)

Apache kafka简介

在这里插入图片描述
Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种能力。

Kafka的用例和能力

  • 流数据管道: Kafka提供了一个分布式发布-订阅消息系统,可以在系统或应用程序之间流式传输数据。它提供了具有数据复制和容错能力的强大队列。

  • 实时分析:Kafka允许使用工具如Kafka StreamsKSQL处理实时数据流,用于构建流式分析和数据处理应用程序。

  • 数据集成 :Kafka可以用来通过在不同数据源和格式之间流式传输数据来集成不同的系统。这使它对流式ETL非常有用。

  • 事件源 : Kafka提供了可以重放的事件时间日志,用于重构应用程序状态,适用于事件源和CQRS模式。

  • 日志聚合 : Kafka通常用于将不同服务器和应用程序的日志聚合到一个中央存储库中。这允许统一访问日志数据。

为什么将Golang与Apache Kafka结合使用

Golang这一高效并发的编程语言与Apache Kafka这一分布式事件流平台结合起来,提供了一个在构建尖端现代应用程序方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:

  • 性能 : GolangApache Kafka都提供高性能。Golang快速、高效和轻量级。Kafka为速度而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。

  • 可扩展性 : GolanggoroutinesKafka的分区允许应用程序水平扩展以处理大量数据。Kafka可以轻松扩展生产者和消费者。

  • 并发性 : Golang通过goroutineschannels提供了出色的并发编程能力。Kafka并发传递消息并支持并行性。

  • 可用性 : Kafka的分布式架构使其高度可用和容错。Golang应用可以利用这一点来构建弹性系统。

  • 互操作性 : Kafka有多种语言的客户端,允许Golang应用与多语言环境互动。Kafka还使用二进制TCP协议以提高效率。

  • 现代设计 : KafkaGolang都采用现代设计理念,使它们非常适合云原生和微服务架构。

  • 开发人员体验 : Kafka的客户端库结合Goroutines、channels和接口,使其易于使用。

Kafka和Golang将性能、可扩展性和并发与生产力结合在一起 - 使它们成为构建可扩展的服务、管道和流应用程序的绝佳选择。

开始使用Apache Kafka

在开始使用GolangApache Kafka之前,我们必须确保golangKafka已经安装并在我们的机器上运行。

安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)

Apache Kafka的Golang包
您可以使用go get安装confluent-kafka-go包:

go get -u github.com/confluentinc/confluent-kafka-go/kafka

安装后,您可以在Go代码中导入并使用confluent-kafka-go

package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        fmt.Printf("创建生产者失败: %s\n", err)
        return
    }

    // 生产消息到主题,处理交付报告等。

    // 使用后记得关闭生产者
    defer p.Close()
}

构建生产者

Kafka生产者是Apache Kafka生态系统中的一个关键组成部分,作为一个客户端应用程序,负责向Kafka集群发布(写入)事件。这一部分提供了关于Kafka生产者的全面概述,以及针对调整其行为的配置设置的初步探讨。

下面是一个Golang应用程序的示例,它生产数据并将其发布到Kafka的具体topic。它还说明了如何在Golang中为Kafka消息序列化数据,并演示了如何处理错误和重试。

package main

import (
    "fmt"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
    kafkaBroker = "localhost:9092"
    topic       = "test-topic"
)

type Message

 struct {
    Key   string `json:"key"`
    Value string `json:"value"`
}

func main() {
    // 创建一个新的Kafka生产者
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})
    if err != nil {
        fmt.Printf("创建生产者失败: %s\n", err)
        return
    }
    defer p.Close()

    // 定义要发送的消息
    message := Message{
        Key:   "example_key",
        Value: "Hello, Kafka!",
    }

    // 序列化消息
    serializedMessage, err := serializeMessage(message)
    if err != nil {
        fmt.Printf("消息序列化失败: %s\n", err)
        return
    }

    // 将消息生产到Kafka主题
    err = produceMessage(p, topic, serializedMessage)
    if err != nil {
        fmt.Printf("消息生产失败: %s\n", err)
        return
    }

    fmt.Println("消息成功生产!")
}

func serializeMessage(message Message) ([]byte, error) {
    // 将消息结构体序列化为JSON
    serialized, err := json.Marshal(message)
    if err != nil {
        return nil, fmt.Errorf("消息序列化失败: %w", err)
    }
    return serialized, nil
}

func produceMessage(p *kafka.Producer, topic string, message []byte) error {
    // 创建一个新的要生产的Kafka消息
    kafkaMessage := &kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          message,
    }

    // 生产Kafka消息
    deliveryChan := make(chan kafka.Event)
    err := p.Produce(kafkaMessage, deliveryChan)
    if err != nil {
        return fmt.Errorf("消息生产失败: %w", err)
    }

    // 等待交付报告或错误
    e := <-deliveryChan
    m := e.(*kafka.Message)

    // 检查交付错误,即生成者方确保发送到Broker的消息不丢失
    // 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error
    // 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理
    if m.TopicPartition.Error != nil {
        return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)
    }

    // 关闭交付频道
    close(deliveryChan)

    return nil
}

步骤解释:

  1. 创建一个Kafka生产者。

  2. 使用json.Marshal函数将自定义消息结构体(Message)序列化为JSON

  3. 使用生产者将序列化的消息生产到Kafka topic

  4. 使用交付报告和错误检查处理错误和重试。

确保将localhost:9092替换为您的Kafka代理地址,将test-topic替换为所需的主题名称。此外,您可能需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑

构建消费者

Kafka消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探讨这些消费者的内部工作原理和调整其性能的配置。

下面是一个Golang应用程序的示例,它从Kafka主题消费消息。如下代码包括了如何处理消费到的消息的说明,以及对不同消费模式(如单个消费者和消费者组)的讨论。

package main

import (
    "fmt"
    "os"
    "os/signal"
    "github.com/confluentinc/confluent-kafka-go/kafka"
)

const (
    kafkaBroker = "localhost:9092"
    topic       = "test-topic"
    groupID     = "test-group"
)

func main() {
    // 创建一个新的Kafka消费者
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  kafkaBroker,
        "group.id":           groupID, // 消费者组标识
        "auto.offset.reset":  "earliest", // 从头开始消费
    })
    if err != nil {
        fmt.Printf("创建消费者失败: %s\n", err)
        return
    }
    defer c.Close()

    // 订阅Kafka主题
    err = c.SubscribeTopics([]string{topic}, nil)
    if err != nil {
        fmt.Printf("订阅主题失败: %s\n", err)
        return
    }

    // 设置一个通道来处理操作系统信号,以便优雅地关闭
    sigchan := make(chan os.Signal, 1)
    signal.Notify(sigchan, os.Interrupt)

    // 开始消费消息
    run := true
    for run == true {
        select {
        case sig := <-sigchan:
            fmt.Printf("接收到信号 %v: 正在终止\n", sig)
            run = false
        default:
            // 轮询Kafka消息,1次最多拉取100条消息
            ev := c.Poll(100) 
            if ev == nil {
                continue
            }

            switch e := ev.(type) {
            case *kafka.Message:
                // 处理消费的消息
                fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))
            case kafka.Error:
                // 处理Kafka错误
                fmt.Printf("错误: %v\n", e)
            }
        }
    }
}

步骤解释

  1. 创建一个Kafka消费者。

  2. 订阅一个Kafka主题。

  3. 设置一个通道来处理操作系统信号(如SIGINT)以优雅地关闭。

  4. 开始从订阅的Topic消费消息。

  5. 处理消费的消息以及Kafka错误。

不同的消费模式:

  • 单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用程序实例来处理来自Topic的所有消息时,这很有用。

  • 消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为可能,提供了容错能力和高吞吐量。

在提供的示例中,group.id配置设置用于指定消费者组ID。这允许消费者应用程序的多个实例在消费者组中一起工作,从Kafka Topic消费消息。

总结

总之,Apache Kafka作为构建实时数据管道和流应用程序的强大解决方案,得益于其分布式、可扩展和容错的架构。当与Golang结合时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常适合现代应用程序。通过利用Kafka的功能和Golang的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用程序,这些应用程序可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成不同的系统还是聚合日志,KafkaGolang提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2094401.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

django外键表查询

Django外键&#xff08;ForeignKey&#xff09;操作以及related_name的作用-CSDN博客 django模型中外键操作_django的model的contain外键-CSDN博客 通过基本表可以查外键表 删基本表可以删外键表

【Redis】Redis 持久化 AOF、RDB—(七)

目录 一、AOF 日志二、RDB 内存快照 Redis 一旦服务器宕机&#xff0c;内存中的数据将全部丢失&#xff0c;从后端数据库恢复这些数据&#xff0c;对数据库压力很大&#xff0c;且性能肯定比不上从 Redis 中读取&#xff0c;会拖慢应用程序。所以&#xff0c;对 Redis 来说&…

临时性解决斐讯K3 路由器端口转发限制

几年前&#xff0c;原来买的斐讯路由器被我折腾坏掉了。然后那时候刚好K3出来。差不多2000块&#xff0c;因为之前的一个路由器顺利下车&#xff0c;然后就傻傻的上了K3的车。结局&#xff0c;你懂的。 最近因为需要&#xff0c;在折腾远程办公&#xff0c;大概目的就是方便连…

Python | Leetcode Python题解之第386题字典序排数

题目&#xff1a; 题解&#xff1a; class Solution:def lexicalOrder(self, n: int) -> List[int]:ans [0] * nnum 1for i in range(n):ans[i] numif num * 10 < n:num * 10else:while num % 10 9 or num 1 > n:num // 10num 1return ans

pycharm破解教程

下载pycharm https://www.jetbrains.com/pycharm/download/other.html 破解网站 https://hardbin.com/ipfs/bafybeih65no5dklpqfe346wyeiak6wzemv5d7z2ya7nssdgwdz4xrmdu6i/ 点击下载破解程序 安装pycharm 自己选择安装路径 安装完成后运行破解程序 等到Done图标出现 选择Ac…

数据安全法实施三周年 | 天空卫士引领关键技术突破

2024.09.01星期日 三年前的今天数据安全法正式实施&#xff0c;标志着我国数据安全防护体系迈入了一个崭新的发展阶段。 《数据安全法》提出&#xff1a;国家建立数据分类分级保护制度&#xff0c;对数据实行分类分级保护。建立健全全流程数据安全管理制度&#xff0c;组织开展…

NumPy实现线性回归

1 单变量线性回归 1.1 sklearn实现(最小二乘法) import osimport pandas as pd import matplotlib.pyplot as plt import syscurrent_diros.getcwd() pathcurrent_dir\\"Salary Data.csv"def plot_data(path):tablepd.read_csv(path)experience table["Exper…

六、Selenium操作指南(三)

文章目录 七、模拟鼠标操作&#xff08;一&#xff09;左键 click()&#xff08;二&#xff09;右键 context_click()&#xff08;三&#xff09;双击 double_click()&#xff08;四&#xff09;拖拽 drag_and_drop(source,target)&#xff08;五&#xff09;悬停 move_to_elem…

python-禁止抽烟

题目描述 小理的朋友有 n 根烟&#xff0c;他每吸完一根烟就把烟蒂保存起来&#xff0c;k&#xff08; k>1&#xff09;个烟蒂可以换一个新的烟&#xff0c;那么小理的朋友最终能吸到多少根烟呢&#xff1f; 与某些脑筋急转弯不同的是&#xff0c;小理的朋友并不能从异次元借…

AI 通过python脚本自动化导出交易软件某一天的分笔成交明细

一.背景需求 打开交易软件,我们想要导出非今日的日线股票成交分笔明细,其实,很麻烦的。你得在日线图上点击某一天的柱状图,然后双击,就会出现当日的成交明细,然后导出。如果你想到导出30天或者1年的数据呢?你难道盯着电脑一步一步的操作?不,我不允许你还不知道用pytho…

应急响应-爆破漏洞应急响应流程(以SSH爆破为例)

目录 概述研判分析登录成功登录失败历史命令authorized_keys 定损止损攻击链路还原清理恢复总结复盘参考 概述 爆破漏洞是比较常见漏洞&#xff0c;端口开放&#xff0c;管理后台没有做登录频率限制等情况都可能遭受到爆破攻击&#xff0c;本文以SSH爆破为例&#xff0c;介绍下…

【SpringCloud Alibaba】(十二)学习 Sleuth + ZipKin

目录 1、ZipKin 核心架构1.1、ZipKin 概述1.2、ZipKin 核心架构 2、集成 ZipKin2.1、下载安装 ZipKin 服务端2.2、集成 ZipKin 客户端 3、ZipKin 数据持久化3.1、ZipKin 数据持久化到 MySQL 在前面整合 Sleuth 实现链路追踪时&#xff0c;我们是通过查看日志的情况来了解系统调…

【书生大模型实战营】进阶岛 第2关 Lagent 自定义你的 Agent 智能体

文章目录 【书生大模型实战营】进阶岛 第2关 Lagent 自定义你的 Agent 智能体学习任务Lagent 介绍环境配置Lagent Web Demo 使用基于 Lagent 自定义智能体 【书生大模型实战营】进阶岛 第2关 Lagent 自定义你的 Agent 智能体 学习任务 使用 Lagent 自定义一个智能体&#xff…

Nginx: 负载均衡基础配置, 加权轮序, hash算法, ip_hash算法, least_conn算法

负载均衡 在真正的反向代理场景中&#xff0c;必然涉及到的一个概念&#xff0c;就是负载均衡所谓负载均衡&#xff0c;也就是将Nginx的请求发送给后端的多台应用程序服务器通常的应用程序服务器&#xff0c;后面的每台服务器都是一个同等的角色&#xff0c;提供相同的功能 用…

阿里巴巴发布 Qwen2-VL 人工智能模型,具备先进的视频分析和推理能力

中国阿里巴巴集团的云计算部门阿里云周四宣布推出一款名为 Qwen2-VL 的新型人工智能模型&#xff0c;该模型具有高级视觉理解能力和多语言对话能力。 该公司在 Qwen-VL 人工智能模型的基础上&#xff0c;历时一年研发出了新模型&#xff0c;并表示它可以实现对长度超过 20 分钟…

easy_spring_boot Java 后端开发框架

Easy SpringBoot 基于 Java 17、SpringBoot 3.3.2 开发的后端框架&#xff0c;集成 MyBits-Plus、SpringDoc、SpringSecurity 等插件&#xff0c;旨在提供一个高效、易用的后端开发环境。该框架通过清晰的目录结构和模块化设计&#xff0c;帮助开发者快速构建和部署后端服务。…

基于Java+SpringBoot+Vue的学生评奖评优管理系统的设计与实现

基于JavaSpringBootVue的学生评奖评优管理系统的设计与实现 前言 ✌全网粉丝20W,csdn特邀作者、博客专家、CSDN[新星计划]导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345; 某信 gzh 搜索【智…

CGAL 2D Polygons

CGAL 2D Polygons 简单概述 CGAL 2D Polygons使用。 简述 2D Polygon多边形是由一条封闭的边链表组成。对于多边形的操作有若干种常见的算法&#xff0c;有些算法要求多边形是简单多边形。如果边不相交&#xff0c;则多边形为简单多边形&#xff0c;除非连续的边相交于它们的…

django外键表查询存储删除

查询 之前用get 现在用filter,get返回对象&#xff0c;filter返回列表django model的get和filter方法的区别_django模型objects.get-CSDN博客 存储 删除

[001-07-001].Redis中的BigKey使用分析

1、常见面试题&#xff1a; 1.阿里的广告平台&#xff0c;海量数据里面查询某一固定前缀的key2.小红书&#xff0c;如何在生产限制keys*/flushdb/flushall等危险命令以防止误删除误使用3.美团&#xff0c;MEMORU USAGE命令你使用过吗4.Bikey问题&#xff0c;多大算big&#xf…