librdkafka的简单使用

news2025/1/10 3:14:55

文章目录

    • 摘要
    • kafka是什么
    • 安装环境
    • librdkafka的简单使用
      • 生产者
      • 消费者

摘要

本文是Getting Started with Apache Kafka and C/C++的中文版, kafka的hello world程序。

本文完整代码见仓库,这里只列出producer/consumer的代码


kafka是什么

本节来源:Kafka - 维基百科,自由的百科全书、Kafka入门简介 - 知乎

首先我们得知道什么是Kafka。

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制。

在这里插入图片描述

kafka有以下一些基本概念:

  • Producer - 消息生产者,就是向kafka broker发消息的客户端。
  • Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。
  • Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。
  • Partition - 消息分区,一个topic可以分为多个 partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  • Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
  • Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消费一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。
  • Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消费者可以指定偏移量来指定要消费的消息。

安装环境

上一节,kafka的概念看着比较简单,发布-订阅/生产-消费的模型。

为了可以调用Kafka的C/C++ API, 需要先安装环境。

# almlinux8
# dnf search kafka
# dnf install librdkafka-devel

# dnf search glib
# dnf install glib2-devel

# ubuntu22
# 开发库apt install librdkafka-dev  libglib2.0-dev

# 安装docker环境apt install docker.io docker-compose

# 本地安装 Kafka
## ref: https://docs.confluent.io/confluent-cli/current/install.html#cpwget -qO - https://packages.confluent.io/confluent-cli/deb/archive.key | sudo apt-key add
➜ add-apt-repository "deb https://packages.confluent.io/confluent-cli/deb stable main"apt install confluent-cli

## 启动kafka
## usage: https://docs.confluent.io/confluent-cli/current/command-reference/local/kafka/confluent_local_kafka_start.html
## error: https://stackoverflow.com/questions/63776518/error-2-matches-found-based-on-name-network-nameofservice-default-is-ambiguo
## error:https://stackoverflow.com/questions/77985757/kafka-in-docker-using-confluent-cli-doesnt-workwhereis confluent
confluent: /usr/bin/confluent

➜ export CONFLUENT_HOME=/usr/bin/confluent

# 我执行下面命令后,没有看到Plaintext Ports信息
➜ confluent local kafka start

# 停止,然后重新启动,管用了
➜ confluent local kafka stop
➜ confluent local kafka start

The local commands are intended for a single-node development environment only, NOT for production usage. See more: https://docs.confluent.io/current/cli/index.html


Pulling from confluentinc/confluent-local
Digest: sha256:ad62269bf4766820c298f7581cf872a49f46a11dbaebcccb4fd2e71049288c5b
Status: Image is up to date for confluentinc/confluent-local:7.6.0
+-----------------+-------+
| Kafka REST Port | 8082  |
| Plaintext Ports | 43465 |
+-----------------+-------+
Started Confluent Local containers "8d72d911a4".
To continue your Confluent Local experience, run `confluent local kafka topic create <topic>` and `confluent local kafka topic produce <topic>`.

# Create a new topic, purchases, which you will use to produce and consume events.
➜ confluent local kafka topic create purchases
Created topic "purchases".

librdkafka的简单使用

confluenceinc/librdkafka是Apache Kafka协议的 C 库实现 ,提供生产者、消费者和管理客户端。

下面运行的程序来自:Apache Kafka and C/C++ - Getting Started Tutorial

代码中kafka的API可以查询:librdkafka: librdkafka documentation

代码中使用了glib库,日常开发我不会使用这个库,因为感觉比较冷,它的API可查询:GLib – 2.0: Automatic Cleanup


生产者

总体逻辑:

  • 从配置文件中加载配置
  • 创建生产者
  • 生产者发送消息
#include <glib.h>
#include <librdkafka/rdkafka.h>

#include "common.c"

#define ARR_SIZE(arr) ( sizeof((arr)) / sizeof((arr[0])) )

/* Optional per-message delivery callback (triggered by poll() or flush())
 * when a message has been successfully delivered or permanently
 * failed delivery (after retries).
 */
static void dr_msg_cb (rd_kafka_t *kafka_handle,
                       const rd_kafka_message_t *rkmessage,
                       void *opaque) {
    if (rkmessage->err) {
        g_error("Message delivery failed: %s", rd_kafka_err2str(rkmessage->err));
    }
}

int main (int argc, char **argv) {
    rd_kafka_t *producer;
    rd_kafka_conf_t *conf;
    char errstr[512];

    // Parse the command line.
    if (argc != 2) {
        g_error("Usage: %s <config.ini>", argv[0]);
        return 1;
    }

    // Parse the configuration.
    // See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    const char *config_file = argv[1];

    g_autoptr(GError) error = NULL;
    g_autoptr(GKeyFile) key_file = g_key_file_new();
    if (!g_key_file_load_from_file (key_file, config_file, G_KEY_FILE_NONE, &error)) {
        g_error ("Error loading config file: %s", error->message);
        return 1;
    }

    // Load the relevant configuration sections.
    conf = rd_kafka_conf_new();
    load_config_group(conf, key_file, "default");

    // Install a delivery-error callback.
    rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

    // Create the Producer instance.
    producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
    if (!producer) {
        g_error("Failed to create new producer: %s", errstr);
        return 1;
    }

    // Configuration object is now owned, and freed, by the rd_kafka_t instance.
    conf = NULL;

    // Produce data by selecting random values from these lists.
    int message_count = 10;
    const char *topic = "purchases";
    const char *user_ids[6] = {"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"};
    const char *products[5] = {"book", "alarm clock", "t-shirts", "gift card", "batteries"};

    for (int i = 0; i < message_count; i++) {
        const char *key =  user_ids[random() % ARR_SIZE(user_ids)];
        const char *value =  products[random() % ARR_SIZE(products)];
        size_t key_len = strlen(key);
        size_t value_len = strlen(value);

        rd_kafka_resp_err_t err;

        err = rd_kafka_producev(producer,
                                RD_KAFKA_V_TOPIC(topic),
                                RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                                RD_KAFKA_V_KEY((void*)key, key_len),
                                RD_KAFKA_V_VALUE((void*)value, value_len),
                                RD_KAFKA_V_OPAQUE(NULL),
                                RD_KAFKA_V_END);

        if (err) {
            g_error("Failed to produce to topic %s: %s", topic, rd_kafka_err2str(err));
            return 1;
        } else {
            g_message("Produced event to topic %s: key = %12s value = %12s", topic, key, value);
        }

        rd_kafka_poll(producer, 0);
    }

    // Block until the messages are all sent.
    g_message("Flushing final messages..");
    rd_kafka_flush(producer, 10 * 1000);

    if (rd_kafka_outq_len(producer) > 0) {
        g_error("%d message(s) were not delivered", rd_kafka_outq_len(producer));
    }

    g_message("%d events were produced to topic %s.", message_count, topic);

    rd_kafka_destroy(producer);

    return 0;
}

消费者

总体逻辑:

  • 从配置文件中加载配置
  • 创建消费者
  • 订阅topic
  • 轮询消费者的消息
#include <glib.h>
#include <librdkafka/rdkafka.h>

#include "common.c"

static volatile sig_atomic_t run = 1;

/**
 * @brief Signal termination of program
 */
static void stop(int sig) { run = 0; }

int main(int argc, char **argv) {
  rd_kafka_t *consumer;
  rd_kafka_conf_t *conf;
  rd_kafka_resp_err_t err;
  char errstr[512];

  // Parse the command line.
  if (argc != 2) {
    g_error("Usage: %s <config.ini>", argv[0]);
    return 1;
  }

  // Parse the configuration.
  // See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
  const char *config_file = argv[1];

  g_autoptr(GError) error = NULL;
  g_autoptr(GKeyFile) key_file = g_key_file_new();
  if (!g_key_file_load_from_file(key_file, config_file, G_KEY_FILE_NONE,
                                 &error)) {
    g_error("Error loading config file: %s", error->message);
    return 1;
  }

  // Load the relevant configuration sections.
  conf = rd_kafka_conf_new();
  load_config_group(conf, key_file, "default");
  load_config_group(conf, key_file, "consumer");

  // Create the Consumer instance.
  consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
  if (!consumer) {
    g_error("Failed to create new consumer: %s", errstr);
    return 1;
  }
  rd_kafka_poll_set_consumer(consumer);

  // Configuration object is now owned, and freed, by the rd_kafka_t instance.
  conf = NULL;

  // Convert the list of topics to a format suitable for librdkafka.
  const char *topic = "purchases";
  rd_kafka_topic_partition_list_t *subscription =
      rd_kafka_topic_partition_list_new(1);
  rd_kafka_topic_partition_list_add(subscription, topic, RD_KAFKA_PARTITION_UA);

  // Subscribe to the list of topics.
  err = rd_kafka_subscribe(consumer, subscription);
  if (err) {
    g_error("Failed to subscribe to %d topics: %s", subscription->cnt,
            rd_kafka_err2str(err));
    rd_kafka_topic_partition_list_destroy(subscription);
    rd_kafka_destroy(consumer);
    return 1;
  }

  rd_kafka_topic_partition_list_destroy(subscription);

  // Install a signal handler for clean shutdown.
  signal(SIGINT, stop);

  // Start polling for messages.
  while (run) {
    rd_kafka_message_t *consumer_message;

    consumer_message = rd_kafka_consumer_poll(consumer, 500);
    if (!consumer_message) {
      g_message("Waiting...");
      continue;
    }

    if (consumer_message->err) {
      if (consumer_message->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        /* We can ignore this error - it just means we've read
         * everything and are waiting for more data.
         */
      } else {
        g_message("Consumer error: %s",
                  rd_kafka_message_errstr(consumer_message));
        return 1;
      }
    } else {
      g_message("Consumed event from topic %s: key = %.*s value = %s",
                rd_kafka_topic_name(consumer_message->rkt),
                (int)consumer_message->key_len, (char *)consumer_message->key,
                (char *)consumer_message->payload);
    }

    // Free the message when we're done.
    rd_kafka_message_destroy(consumer_message);
  }

  // Close the consumer: commit final offsets and leave the group.
  g_message("Closing consumer");
  rd_kafka_consumer_close(consumer);

  // Destroy the consumer.
  rd_kafka_destroy(consumer);

  return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1551807.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

腾讯云优惠券领取及使用常见问题解答

随着云计算的普及&#xff0c;腾讯云作为国内领先的云计算服务提供商&#xff0c;为越来越多的企业和个人提供了丰富的云产品和服务。为了帮助用户更好地了解和使用腾讯云优惠券&#xff0c;本文将为大家解答关于腾讯云优惠券领取及使用的常见问题。 一、腾讯云优惠券概述 腾讯…

幻兽帕鲁服务器多少钱?2024年Palworld服务器价格整理

2024年全网最全的幻兽帕鲁服务器租用价格表&#xff0c;阿里云幻兽帕鲁游戏服务器26元1个月、腾讯云32元一个月、京东云26元一个月、华为云24元1个月&#xff0c;阿腾云atengyun.com整理最新幻兽帕鲁专用4核16G、8核16G、8核32G游戏服务器租用价格表大全&#xff1a; 阿里云幻…

vue3+Pinia的使用 - 封装

目录&#xff1a; persist.ts 可存储到本地 import { PersistedStateOptions } from "pinia-plugin-persistedstate";/*** description pinia 持久化参数配置* param {String} key 存储到持久化的 name* param {Array} paths 需要持久化的 state name* return per…

EfficientVMamba:Atrous Selective Scan for LightWeightVisualMamba

摘要 https://arxiv.org/pdf/2403.09977.pdf 先前的轻量级模型开发努力主要集中在基于CNN和Transformer的设计上&#xff0c;但仍面临持续的挑战。CNN擅长局部特征提取&#xff0c;但会牺牲分辨率&#xff0c;而Transformer提供了全局范围&#xff0c;但会加剧计算需求 O ( N…

苹果CMS影视APP源码,二开版本带视频教程

编译app教程 工具下载&#xff1a;Android Studio 官网地址&#xff1a;https://developer.android.google.cn/studio/ 环境设置&#xff1a; 设置中文&#xff1a;https://blog.csdn.net/qq_37131111/article/details/131492844 汉化包找最新的下载就行了&#xff0c;随便下载…

如何压缩视频到最小?教会你压缩原理~

在网上上传视频时&#xff0c;经常会遇到因为视频体积过大上传失败等情况发生&#xff0c;怎么降低视频体积呢&#xff1f;科普一个小知识&#xff1a;视频体积和视频的时长、编码格式、分辨率和比特率&#xff08;又称码率&#xff09;有关。视频文件大小计算公式&#xff1a;…

脚本实现Ubuntu设置屏幕无人操作,自动黑屏

使用 xrandr 命令可以实现对屏幕的控制&#xff0c;包括调整分辨率、旋转屏幕以及关闭屏幕等。要实现 Ubuntu 设置屏幕在无人操作一段时间后自动黑屏&#xff0c;非待机&#xff0c;并黑屏后点击触摸屏可以唤醒屏幕&#xff0c;可以借助 xrandr 命令来实现。 首先&#xff0c;…

Microsoft Edge浏览器修改网页背景颜色

目录 一、原始页面二、Microsoft Edge网页背景颜色修改三、Microsoft Edge主体颜色修改四、最终效果五、总结 一、原始页面 下面是大多数默认的网页界面。 二、Microsoft Edge网页背景颜色修改 在Microsoft Edge网页地址栏中输入下面指令&#xff1a; edge://flags在搜索框…

快速上手Spring Cloud 十三:探究Spring Cloud在跨境业务中的应用与优势

快速上手Spring Cloud 一&#xff1a;Spring Cloud 简介 快速上手Spring Cloud 二&#xff1a;核心组件解析 快速上手Spring Cloud 三&#xff1a;API网关深入探索与实战应用 快速上手Spring Cloud 四&#xff1a;微服务治理与安全 快速上手Spring Cloud 五&#xff1a;Spring …

Netty剖析 - Why Netty

文章目录 Why NettyI/O 请求的两个阶段I/O 模型Netty 如何实现自己的 I/O 模型线程模型 - 事件分发器&#xff08;Event Dispather&#xff09;弥补 Java NIO 的缺陷更低的资源消耗网络框架的选型Netty 发展现状Netty 的使用思维导图 Why Netty I/O 模型、线程模型和事件处理机…

货币系统(闫氏DP分析法)

题目描述&#xff1a; 给定 V 种货币&#xff08;单位&#xff1a;元&#xff09;&#xff0c;每种货币使用的次数不限。 不同种类的货币&#xff0c;面值可能是相同的。 现在&#xff0c;要你用这 V 种货币凑出 N 元钱&#xff0c;请问共有多少种不同的凑法。 输入格式&am…

深入Facebook的世界:探索数字化社交的无限可能性

引言 随着数字化时代的到来&#xff0c;社交媒体平台已经成为了人们日常生活中不可或缺的一部分&#xff0c;而其中最为突出的代表之一便是Facebook。作为全球最大的社交媒体平台之一&#xff0c;Facebook不仅仅是一个社交网络&#xff0c;更是一个数字化社交的生态系统&#…

Windows/Linux-openEuler系统使用路由侠内网穿透,部署项目详细教程

文章目录 Windows/Linux-openEuler系统使用路由侠内网穿透&#xff0c;部署项目详细教程一、在windows系统下载安装路由侠并实现项目部署1、下载路由侠并注册安装到Windows系统2、点击内网映射&#xff0c;添加映射&#xff0c;注册域名前缀3、选择网站应用4、配置你想要代理项…

【Bug-ModuleNotFoundError: No module named ‘models‘】

&#x1f680; 作者 &#xff1a;“码上有前” &#x1f680; 文章简介 &#xff1a;Python &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; 出现这个错误&#xff1a; 出现了ModuleNotFoundError: No module named models’的问题。 文件在Model…

「吞噬星空」存在哪些境界,不朽级呼延博是否有一席之位?

吞噬星空中浩瀚无垠的宇宙&#xff0c;其深邃与广阔&#xff0c;仿佛一个无尽的迷宫&#xff0c;蕴藏着无数未知的境界。从地球出发&#xff0c;见证了行星级与恒星级的威能&#xff0c;然而这只是宇宙力量的冰山一角。行星级强者&#xff0c;在地球上已是至高无上的存在&#…

MTransE阅读笔记

Multilingual Knowledge Graph Embeddings for Cross-lingual Knowledge Alignment 用于交叉知识对齐的多语言知识图谱嵌入(MTransE) Abstract 最近的许多工作已经证明了知识图谱嵌入在完成单语知识图谱方面的好处。由于相关的知识库是用几种不同的语言构建的&#xff0c;因…

2010-2021年银行网点及员工信息数据

2010-2021年银行网点及员工信息数据 1、时间&#xff1a;2010-2021年 2、来源&#xff1a;整理自csmar 3、指标&#xff1a;银行代码、股票代码、银行中文简称、统计截止日期、分行数量、机构网点数量、其中&#xff1a;境内网点数量、其中&#xff1a;境外网点数量、在职员…

python和c语言的区别是什么

Python可以说是目前最火的语言之一了&#xff0c;人工智能的兴起让Python一夜之间变得家喻户晓&#xff0c;Python号称目前最最简单易学的语言&#xff0c;现在有不少高校开始将Python作为大一新生的入门语言。本萌新也刚开始接触Python&#xff0c;发现Python与其他语言确实有…

在Semantic Kernel中使用Qdrant向量数据库

本文将介绍如何在Semantic Kernel中使用Qdrant向量数据库&#xff0c;并演示如何在Semantic Kernel中进行向量更新和查询操作。 1. 背景 在前一篇文章《Qdrant 向量数据库的部署以及如何在 .NET 中使用 TLS 安全访问》中&#xff0c;我们介绍了如何使用 Docker 部署 Qdrant 向…

9-Dubbo源码分析之:Dubbo Serialize 层:多种序列化算法,总有一款适合你

通过前面课时的介绍&#xff0c;我们知道一个 RPC 框架需要通过网络通信实现跨 JVM 的调用。既然需要网络通信&#xff0c;那就必然会使用到序列化与反序列化的相关技术&#xff0c;Dubbo 也不例外。下面我们从 Java 序列化的基础内容开始&#xff0c;介绍一下常见的序列化算法…