Ubuntu下C语言操作kafka示例

news2025/1/24 17:39:09

目录

安装kafka:

安装librdkafka

consumer

Producer

测试运行


安装kafka:

Ubuntu下Kafka安装及使用_ubuntu安装kafka-CSDN博客

安装librdkafka

github地址:GitHub - confluentinc/librdkafka: The Apache Kafka C/C++ library

$ apt install librdkafka-dev

安装路径如下:

consumer

/**
 * Simple high-level balanced Apache Kafka consumer
 * using the Kafka driver from librdkafka
 * (https://github.com/edenhill/librdkafka)
 */

#include <stdio.h>
#include <signal.h>
#include <string.h>
#include <ctype.h>


/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
//#include <librdkafka/rdkafka.h>
#include "rdkafka.h"


static volatile sig_atomic_t run = 1;

/**
 * @brief Signal termination of program
 */
static void stop (int sig) {
        run = 0;
}



/**
 * @returns 1 if all bytes are printable, else 0.
 */
static int is_printable (const char *buf, size_t size) {
        size_t i;

        for (i = 0 ; i < size ; i++)
                if (!isprint((int)buf[i]))
                        return 0;

        return 1;
}


int main (int argc, char **argv) {
        rd_kafka_t *rk;          /* Consumer instance handle */
        rd_kafka_conf_t *conf;   /* Temporary configuration object */
        rd_kafka_resp_err_t err; /* librdkafka API error code */
        char errstr[512];        /* librdkafka API error reporting buffer */
        const char *brokers;     /* Argument: broker list */
        const char *groupid;     /* Argument: Consumer group id */
        char **topics;           /* Argument: list of topics to subscribe to */
        int topic_cnt;           /* Number of topics to subscribe to */
        rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics */
        int i;

        /*
         * Argument validation
         */
        if (argc < 4) {
                fprintf(stderr,
                        "%% Usage: "
                        "%s <broker> <group.id> <topic1> <topic2>..\n",
                        argv[0]);
                return 1;
        }

        brokers   = argv[1];
        groupid   = argv[2];
        topics    = &argv[3];
        topic_cnt = argc - 3;


        /*
         * Create Kafka client configuration place-holder
         */
        conf = rd_kafka_conf_new();	// 创建配置文件

        /* Set bootstrap broker(s) as a comma-separated list of
         * host or host:port (default port 9092).
         * librdkafka will use the bootstrap brokers to acquire the full
         * set of brokers from the cluster. */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        /* Set the consumer group id.
         * All consumers sharing the same group id will join the same
         * group, and the subscribed topic' partitions will be assigned
         * according to the partition.assignment.strategy
         * (consumer config property) to the consumers in the group. */
        if (rd_kafka_conf_set(conf, "group.id", groupid,
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        /* If there is no previously committed offset for a partition
         * the auto.offset.reset strategy will be used to decide where
         * in the partition to start fetching messages.
         * By setting this to earliest the consumer will read all messages
         * in the partition if there was no previously committed offset. */
        if (rd_kafka_conf_set(conf, "auto.offset.reset", "earliest",
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                rd_kafka_conf_destroy(conf);
                return 1;
        }

        /*
         * Create consumer instance.
         *
         * NOTE: rd_kafka_new() takes ownership of the conf object
         *       and the application must not reference it again after
         *       this call.
         */
         // 创建一个kafka消费者
        rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr,
                        "%% Failed to create new consumer: %s\n", errstr);
                return 1;
        }

        conf = NULL; /* Configuration object is now owned, and freed,
                      * by the rd_kafka_t instance. */


        /* Redirect all messages from per-partition queues to
         * the main queue so that messages can be consumed with one
         * call from all assigned partitions.
         *
         * The alternative is to poll the main queue (for events)
         * and each partition queue separately, which requires setting
         * up a rebalance callback and keeping track of the assignment:
         * but that is more complex and typically not recommended. */
        rd_kafka_poll_set_consumer(rk);// poll机制,设置消费者实例到poll中


        /* Convert the list of topics to a format suitable for librdkafka */
        // 创建主题分区列表
        subscription = rd_kafka_topic_partition_list_new(topic_cnt);
        for (i = 0 ; i < topic_cnt ; i++)
                rd_kafka_topic_partition_list_add(subscription,
                                                  topics[i],
                                                  /* the partition is ignored
                                                   * by subscribe() */
                                                  RD_KAFKA_PARTITION_UA);

        /* Subscribe to the list of topics */
        err = rd_kafka_subscribe(rk, subscription);
        if (err) {
                fprintf(stderr,
                        "%% Failed to subscribe to %d topics: %s\n",
                        subscription->cnt, rd_kafka_err2str(err));
                rd_kafka_topic_partition_list_destroy(subscription);
                rd_kafka_destroy(rk);
                return 1;
        }

        fprintf(stderr,
                "%% Subscribed to %d topic(s), "
                "waiting for rebalance and messages...\n",
                subscription->cnt);

        rd_kafka_topic_partition_list_destroy(subscription);


        /* Signal handler for clean shutdown */
        signal(SIGINT, stop);

        /* Subscribing to topics will trigger a group rebalance
         * which may take some time to finish, but there is no need
         * for the application to handle this idle period in a special way
         * since a rebalance may happen at any time.
         * Start polling for messages. */

        while (run) {
                rd_kafka_message_t *rkm;
				
                rkm = rd_kafka_consumer_poll(rk, 100);
                if (!rkm)
                        continue; /* Timeout: no message within 100ms,
                                   *  try again. This short timeout allows
                                   *  checking for `run` at frequent intervals.
                                   */

                /* consumer_poll() will return either a proper message
                 * or a consumer error (rkm->err is set). */
                if (rkm->err) {
                        /* Consumer errors are generally to be considered
                         * informational as the consumer will automatically
                         * try to recover from all types of errors. */
                        fprintf(stderr,
                                "%% Consumer error: %s\n",
                                rd_kafka_message_errstr(rkm));
                        rd_kafka_message_destroy(rkm);
                        continue;
                }

                /* Proper message. */
                printf("Message on %s [%"PRId32"] at offset %"PRId64":\n",
                       rd_kafka_topic_name(rkm->rkt), rkm->partition,
                       rkm->offset);

                /* Print the message key. */
                if (rkm->key && is_printable(rkm->key, rkm->key_len))
                        printf(" Key: %.*s\n",
                               (int)rkm->key_len, (const char *)rkm->key);
                else if (rkm->key)
                        printf(" Key: (%d bytes)\n", (int)rkm->key_len);

                /* Print the message value/payload. */
                if (rkm->payload && is_printable(rkm->payload, rkm->len))
                        printf(" Value: %.*s\n",
                               (int)rkm->len, (const char *)rkm->payload);
                else if (rkm->payload)
                        printf(" Value: (%d bytes)\n", (int)rkm->len);

                rd_kafka_message_destroy(rkm);
        }


        /* Close the consumer: commit final offsets and leave the group. */
        fprintf(stderr, "%% Closing consumer\n");
        rd_kafka_consumer_close(rk);


        /* Destroy the consumer */
        rd_kafka_destroy(rk);

        return 0;
}

编译:

gcc consumer.c -o consumer -I/usr/include/librdkafka -L/usr/lib/x86_64-linux-gnu -lrdkafka++ -lrdkafka

Producer

/**
 * Simple Apache Kafka producer
 * using the Kafka driver from librdkafka
 * (https://github.com/edenhill/librdkafka)
 */

#include <stdio.h>
#include <signal.h>
#include <string.h>


/* Typical include path would be <librdkafka/rdkafka.h>, but this program
 * is builtin from within the librdkafka source tree and thus differs. */
#include "rdkafka.h"


static volatile sig_atomic_t run = 1;

/**
 * @brief Signal termination of program
 */
static void stop (int sig) {
        run = 0;
        fclose(stdin); /* abort fgets() */
}


/**
 * @brief Message delivery report callback.
 *
 * This callback is called exactly once per message, indicating if
 * the message was succesfully delivered
 * (rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) or permanently
 * failed delivery (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR).
 *
 * The callback is triggered from rd_kafka_poll() and executes on
 * the application's thread.
 */
static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
                fprintf(stderr, "%% Message delivery failed: %s\n",
                        rd_kafka_err2str(rkmessage->err));
        else
                fprintf(stderr,
                        "%% Message delivered (%zd bytes, "
                        "partition %"PRId32")\n",
                        rkmessage->len, rkmessage->partition);

        /* The rkmessage is destroyed automatically by librdkafka */
}



int main (int argc, char **argv) {
        rd_kafka_t *rk;         /* Producer instance handle */
        rd_kafka_conf_t *conf;  /* Temporary configuration object */
        char errstr[512];       /* librdkafka API error reporting buffer */
        char buf[512];          /* Message value temporary buffer */
        const char *brokers;    /* Argument: broker list */
        const char *topic;      /* Argument: topic to produce to */

        /*
         * Argument validation
         */
        if (argc != 3) {
                fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
                return 1;
        }

        brokers = argv[1];
        topic   = argv[2];


        /*
         * Create Kafka client configuration place-holder
         */
        conf = rd_kafka_conf_new();

        /* Set bootstrap broker(s) as a comma-separated list of
         * host or host:port (default port 9092).
         * librdkafka will use the bootstrap brokers to acquire the full
         * set of brokers from the cluster. */
        if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers,
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        /* Set the delivery report callback.
         * This callback will be called once per message to inform
         * the application if delivery succeeded or failed.
         * See dr_msg_cb() above.
         * The callback is only triggered from rd_kafka_poll() and
         * rd_kafka_flush(). */
        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

        /*
         * Create producer instance.
         *
         * NOTE: rd_kafka_new() takes ownership of the conf object
         *       and the application must not reference it again after
         *       this call.
         */
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr,
                        "%% Failed to create new producer: %s\n", errstr);
                return 1;
        }

        /* Signal handler for clean shutdown */
        signal(SIGINT, stop);

        fprintf(stderr,
                "%% Type some text and hit enter to produce message\n"
                "%% Or just hit enter to only serve delivery reports\n"
                "%% Press Ctrl-C or Ctrl-D to exit\n");

        while (run && fgets(buf, sizeof(buf), stdin)) {
                size_t len = strlen(buf);
                rd_kafka_resp_err_t err;

                if (buf[len-1] == '\n') /* Remove newline */
                        buf[--len] = '\0';

                if (len == 0) {
                        /* Empty line: only serve delivery reports */
                        rd_kafka_poll(rk, 0/*non-blocking */);
                        continue;
                }

                /*
                 * Send/Produce message.
                 * This is an asynchronous call, on success it will only
                 * enqueue the message on the internal producer queue.
                 * The actual delivery attempts to the broker are handled
                 * by background threads.
                 * The previously registered delivery report callback
                 * (dr_msg_cb) is used to signal back to the application
                 * when the message has been delivered (or failed).
                 */
        retry:
                err = rd_kafka_producev(
                        /* Producer handle */
                        rk,
                        /* Topic name */
                        RD_KAFKA_V_TOPIC(topic),
                        /* Make a copy of the payload. */
                        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                        /* Message value and length */
                        RD_KAFKA_V_VALUE(buf, len),
                        /* Per-Message opaque, provided in
                         * delivery report callback as
                         * msg_opaque. */
                        RD_KAFKA_V_OPAQUE(NULL),
                        /* End sentinel */
                        RD_KAFKA_V_END);

                if (err) {
                        /*
                         * Failed to *enqueue* message for producing.
                         */
                        fprintf(stderr,
                                "%% Failed to produce to topic %s: %s\n",
                                topic, rd_kafka_err2str(err));

                        if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) {
                                /* If the internal queue is full, wait for
                                 * messages to be delivered and then retry.
                                 * The internal queue represents both
                                 * messages to be sent and messages that have
                                 * been sent or failed, awaiting their
                                 * delivery report callback to be called.
                                 *
                                 * The internal queue is limited by the
                                 * configuration property
                                 * queue.buffering.max.messages */
                                rd_kafka_poll(rk, 1000/*block for max 1000ms*/);
                                goto retry;
                        }
                } else {
                        fprintf(stderr, "%% Enqueued message (%zd bytes) "
                                "for topic %s\n",
                                len, topic);
                }


                /* A producer application should continually serve
                 * the delivery report queue by calling rd_kafka_poll()
                 * at frequent intervals.
                 * Either put the poll call in your main loop, or in a
                 * dedicated thread, or call it after every
                 * rd_kafka_produce() call.
                 * Just make sure that rd_kafka_poll() is still called
                 * during periods where you are not producing any messages
                 * to make sure previously produced messages have their
                 * delivery report callback served (and any other callbacks
                 * you register). */
                rd_kafka_poll(rk, 0/*non-blocking*/);
        }


        /* Wait for final messages to be delivered or fail.
         * rd_kafka_flush() is an abstraction over rd_kafka_poll() which
         * waits for all messages to be delivered. */
        fprintf(stderr, "%% Flushing final messages..\n");
        rd_kafka_flush(rk, 10*1000 /* wait for max 10 seconds */);

        /* If the output queue is still not empty there is an issue
         * with producing messages to the clusters. */
        if (rd_kafka_outq_len(rk) > 0)
                fprintf(stderr, "%% %d message(s) were not delivered\n",
                        rd_kafka_outq_len(rk));

        /* Destroy the producer instance */
        rd_kafka_destroy(rk);

        return 0;
}

编译:

gcc producer.c -o producer -I/usr/include/librdkafka -L/usr/lib/x86_64-linux-gnu -lrdkafka++ -lrdkafka

测试运行

启动kafka:

bin/kafka-server-start.sh config/server.properties&

创建主题:demo1是主机名,mydemo1是主题。

./bin/kafka-topics.sh --create --bootstrap-server demo1:9092 --replication-factor 1 --partitions 1 --topic mydemo1

启动producer:

./producer demo1:9092 mydemo1

启动consumer:

./consumer demo1:9092 0 mydemo1

在producer终端输入测试消息,在consumer终端能够看到测试消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2263182.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

线程池ForkJoinPool详解

由一道算法题引发的思考 算法题&#xff1a;如何充分利用多核CPU的性能&#xff0c;快速对一个2千万大小的数组进行排序&#xff1f; 这道算法题可以拆解来看&#xff1a; 1&#xff09;首先这是一道排序的算法题&#xff0c;而且是需要使用高效的排序算法对2千万大小的数组…

基于多尺度动态卷积的图像分类

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

[Linux] 信号保存与处理

&#x1fa90;&#x1fa90;&#x1fa90;欢迎来到程序员餐厅&#x1f4ab;&#x1f4ab;&#x1f4ab; 主厨&#xff1a;邪王真眼 主厨的主页&#xff1a;Chef‘s blog 所属专栏&#xff1a;青果大战linux 总有光环在陨落&#xff0c;总有新星在闪烁 信号的保存 下面的概…

计算机网络-GRE Over IPSec实验

一、概述 前情回顾&#xff1a;上次基于IPsec VPN的主模式进行了基础实验&#xff0c;但是很多高级特性没有涉及&#xff0c;如ike v2、不同传输模式、DPD检测、路由方式引入路由、野蛮模式等等&#xff0c;以后继续学习吧。 前面我们已经学习了GRE可以基于隧道口实现分支互联&…

进网许可认证、交换路由设备检测项目更新25年1月起

实施时间 2025年1月1日起实施 涉及设备范围 核心路由器、边缘路由器、以太网交换机、三层交换机、宽带网络接入服务器&#xff08;BNAS&#xff09; 新增检测依据 GBT41266-2022网络关键设备安全检测方法交换机设备 GBT41267-2022网络关键设备安全技术要求交换机设备 GB/…

用C#(.NET8)开发一个NTP(SNTP)服务

完整源码&#xff0c;附工程下载&#xff0c;工程其实也就下面两个代码。 想在不能上网的服务器局域网中部署一个时间服务NTP&#xff0c;当然系统自带该服务&#xff0c;可以开启&#xff0c;本文只是分享一下该协议报文和能跑的源码。网上作为服务的源码不太常见&#xff0c;…

Connection lease request time out 问题分析

Connection lease request time out 问题分析 问题背景 使用apache的HttpClient&#xff0c;我们知道可以通过setConnectionRequestTimeout()配置从连接池获取链接的超时时间&#xff0c;而Connection lease request time out正是从连接池获取链接超时的报错&#xff0c;这通常…

【课程论文系列实战】:随机对照实验驱动的电商落地页优化

数据与代码见文末 摘要 随机对照试验&#xff08;Randomized Controlled Trial&#xff0c;RCT&#xff09;被认为是因果推断的“金标准”方法。通过随机分配实验参与者至不同组别&#xff0c;确保了组间可比性&#xff0c;RCT能够有效地消除样本选择偏差和混杂变量问题。本文…

UML 建模实验

文章目录 实验一 用例图一、安装并熟悉软件EnterpriseArchitect16二、用例图建模 实验二 类图、包图、对象图类图第一题第二题 包图对象图第一题第二题 实验三 顺序图、通信图顺序图银行系统学生指纹考勤系统饮料自动销售系统“买到饮料”“饮料已售完”“无法找零”完整版 通信…

高质量翻译如何影响软件用户体验 (UX)

在软件开发领域&#xff0c;用户体验 (UX) 是决定产品成败的关键因素之一。一个流畅、吸引人且直观的用户体验可以决定一款软件的成功与否。在影响优秀用户体验的众多因素中&#xff0c;高质量翻译尤为重要&#xff0c;尤其是在当今全球化的市场环境中。确保软件为不同语言和文…

ArcGIS Pro 3.4新功能2:Spatial Analyst新特性,密度、距离、水文、太阳能、表面、区域分析

Spatial Analyst 扩展模块在 ArcGIS Pro 3.4 中引入了新功能和增强功能。此版本为您提供了用于表面和区域分析的新工具以及改进的密度和距离分析功能&#xff0c;多种用于水文分析的工具性能的提高&#xff0c;一些新的太阳能分析功能。 目录 1.密度分析 2.距离分析 3.水文…

Linux C 程序 【05】异步写文件

1.开发背景 Linux 系统提供了各种外设的控制方式&#xff0c;其中包括文件的读写&#xff0c;存储文件的介质可以是 SSD 固态硬盘或者是 EMMC 等。 其中常用的写文件方式是同步写操作&#xff0c;但是如果是写大文件会对 CPU 造成比较大的负荷&#xff0c;采用异步写的方式比较…

凯酷全科技抖音电商服务的卓越践行者

在数字经济蓬勃发展的今天&#xff0c;电子商务已成为企业增长的新引擎。随着短视频平台的崛起&#xff0c;抖音作为全球领先的短视频社交平台&#xff0c;不仅改变了人们的娱乐方式&#xff0c;也为品牌和商家提供了全新的营销渠道。厦门凯酷全科技有限公司&#xff08;以下简…

精准提升:从94.5%到99.4%——目标检测调优全纪录

&#x1f680; 目标检测模型调优过程记录 在进行目标检测模型的训练过程中&#xff0c;我们面对了许多挑战与迭代。从初始模型的训练结果到最终的调优优化&#xff0c;每一步的实验和调整都有其独特的思路和收获。本文记录了我在优化目标检测模型的过程中进行的几次尝试&#…

STM8单片机学习笔记·GPIO的片上外设寄存器

目录 前言 IC基本定义 三极管基础知识 单片机引脚电路作用 STM8GPIO工作模式 GPIO外设寄存器 寄存器含义用法 CR1&#xff1a;Control Register 1 CR2&#xff1a;Control Register 2 ODR&#xff1a;Output Data Register IDR&#xff1a;Input Data Register 赋值…

国标GB28181平台EasyGBS在安防视频监控中的信号传输(电源/视频/音频)特性及差异

在现代安防视频监控系统中&#xff0c;国标GB28181协议作为公共安全视频监控联网系统的国家标准&#xff0c;该协议不仅规范了视频监控系统的信息传输、交换和控制技术要求&#xff0c;还为不同厂商设备之间的互联互通提供了统一的框架。EasyGBS平台基于GB28181协议&#xff0c…

如何使用checkBox组件实现复选框

文章目录 概念介绍使用方法示例代码我们在上一章回中介绍了DatePickerDialog Widget相关的内容,本章回中将介绍Checkbox Widget.闲话休提,让我们一起Talk Flutter吧。 概念介绍 我们在这里说的Checkbox也是叫复选框,没有选中时是一个正方形边框,边框内容是空白的,选中时会…

基于“2+1 链动模式商城小程序”的微商服务营销策略探究

摘要&#xff1a;本文探讨在竞争激烈的市场经济与移动互联网时代背景下&#xff0c;微商面临的机遇与挑战。着重分析“21 链动模式商城小程序”如何助力微商改变思路&#xff0c;通过重视服务、提升服务质量&#xff0c;以服务营销放大利润&#xff0c;实现从传统微商模式向更具…

1-1 STM32-0.96寸OLED显示与控制

1.0 模块原理图 2.0 0.96OLED简介 资料下载&#xff1a;https://jiangxiekeji.com/download.html 程序介绍&#xff1a;https://jiangxiekeji.com/tutorial/oled.html SSD1306是一款OLED/PLED点阵显示屏的控制器&#xff0c;可以嵌入在屏幕中&#xff0c;用于执行接收数据、显…

在Visual Studio 2022中配置C++计算机视觉库Opencv

本文主要介绍下载OpenCV库以及在Visual Studio 2022中配置、编译C计算机视觉库OpenCv的方法 1.Opencv库安装 ​ 首先&#xff0c;我们需要安装OpenCV库&#xff0c;作为一个开源库&#xff0c;我们可以直接在其官网下载Releases - OpenCV&#xff0c;如果官网下载过慢&#x…