.net core 中使用confluent kafka构建生产者

news2025/1/12 23:41:09
  1. 创建.net 6 API
  2. 安装依赖包
    在这里插入图片描述
  3. 创建kafka生产者
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using KafkaHelper.Config;
using Microsoft.Extensions.Options;

namespace KafkaHelper
{
    public class KafkaProducer
    {
        public IOptionsMonitor<KafkaConfig> _kafkaconfig;

        public KafkaProducer(IOptionsMonitor<KafkaConfig> kafkaconfig)
        {
            _kafkaconfig = kafkaconfig;
        }

        public void sendMessage()
        {

        }
        //创建topic
        public async Task<bool> createTopic(string topicName,short factorNum,int partitionNum)
        {
            using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}"}).Build())
            {
                try
                {
                    await adminClient.CreateTopicsAsync(new TopicSpecification[] {
                                new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }});
                    return true;
                }
                catch (CreateTopicsException e)
                {
                    Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
                    return false;
                }
            }
        }

        //删除topic
        public async Task<bool> deleteTopic(List<string> topicName)
        {
            using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build())
            {
                try
                {
                    await adminClient.DeleteTopicsAsync(topicName, null);
                    return true;
                }
                catch (Exception)
                {

                    return false;
                }
                
            }
        }

        //判断topic存在与否
        public async Task<bool> checkTopic(string topicName)
        {
            using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build())
            {
                var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));
                var topicsMetadata = metadata.Topics;
                var topicNames = metadata.Topics.Select(a => a.Topic).ToList();

                return topicNames.Contains(topicName);
            }
        }

        public static void handler(DeliveryReport<Null, string> deliveryReport)
        {
            Console.WriteLine(!deliveryReport.Error.IsError
                    ? $"Delivered message to {deliveryReport.TopicPartitionOffset}"
                    : $"Delivery Error: {deliveryReport.Error.Reason}");
        }


        public bool sendMessage(string topicName)
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",
                MessageSendMaxRetries = 2
            };
            try
            {
                using (var p = new ProducerBuilder<Null, string>(producerConfig).Build())
                {
                    p.Produce(topicName, new Message<Null, string> { Value = $"my message" }, handler);

                    p.Flush(TimeSpan.FromSeconds(10));
                }
                return true;
            }
            catch (Exception)
            {

                return false;
            }
            
        }

        //自定义将消息发送到某个topic的分区中,以保证这个分区只存储某一个特定类型的数据
        public bool sendMessagePartition(string topicName)
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",
                MessageSendMaxRetries = 2
            };
            try
            {
                using (var p = new ProducerBuilder<Null, string>(producerConfig).Build())
                {
                    var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build();
                    var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));

                    var topic = meta.Topics.SingleOrDefault(t => t.Topic == topicName);

                    var topicPartitions = topic.Partitions;

                    TopicPartition topicPartition = new TopicPartition(topicName, new Partition(1));
                    p.Produce(topicPartition, new Message<Null, string> { Value = $"my message" }, handler);

                    p.Flush(TimeSpan.FromSeconds(10));
                }
                return true;
            }
            catch (Exception)
            {

                return false;
            }

        }

        //添加标头
        public bool sendMessageHeader(string topicName)
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",
                MessageSendMaxRetries = 2
            };
            try
            {
                using (var p = new ProducerBuilder<Null, string>(producerConfig).Build())
                {
                    var header = new Headers();


                    header.Add("ellis", Encoding.UTF8.GetBytes("{\"ellis\":\"dalian\"}"));

                    p.Produce(topicName, new Message<Null, string> { Value = $"my message",Headers=header }, handler);

                    p.Flush(TimeSpan.FromSeconds(10));
                }
                return true;
            }
            catch (Exception)
            {

                return false;
            }

        }


        public static void keyhandler(DeliveryReport<string, string> deliveryReport)
        {
            Console.WriteLine(!deliveryReport.Error.IsError
                    ? $"Delivered message to {deliveryReport.TopicPartitionOffset}"
                    : $"Delivery Error: {deliveryReport.Error.Reason}");
        }

        //通过指定key,让kafka按照key的hash值进行message的分区选择,相同的key会发送到相同的分区
        public bool sendMessageKey(string topicName,string key)
        {
            var producerConfig = new ProducerConfig
            {
                BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",
                MessageSendMaxRetries = 2,
                CompressionType= CompressionType.Gzip,
                Acks= Acks.All,
                EnableIdempotence= true,
            };
            try
            {
                using (var p = new ProducerBuilder<string, string>(producerConfig).Build())
                {
                    p.Produce(topicName, new Message<string, string> { Key=key, Value = $"my test" }, keyhandler);

                    p.Flush(TimeSpan.FromSeconds(10));
                }
                return true;
            }
            catch (Exception)
            {

                return false;
            }

        }

    }
}

p.Flush(TimeSpan.FromSeconds(10)),这里Flush函数的作用是等待所有回调函数执行完成,参数是超时时间,也就是最大的等待时间,这个操作无法被取消,所以应该设置较短的时间。还有需要注意的是,Flush函数的位置,不要让阻塞出现在循环中。

需要说明的是,kafka生产者在不指定key的时候,消息会均衡的分布在各个分区,我们可以指定消息的key,使得同一个key的消息发送到同一个分区。也可以指定消息发送的partition。

同一个分区消息是有序的。

关于kafka生产者的配置
https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html

样例
https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples

本博客源码
https://github.com/xdqt/asp.net-core-efcore-jwt-middleware/tree/master/CoreKafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/141511.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Cuk拓扑产生负压

1、基础拓扑的输入输出电流连续情况 1>Buck电路 图中绿色波形为输入端的电流&#xff08;测的图中MOS上的电流&#xff09;&#xff0c;蓝色的输出端的电流&#xff08;图中电感L4的电流&#xff09;&#xff0c;可以看出输入端电流不连续&#xff0c;输出端电流连续。 2&…

Revit中项目特别大如何将项目完整的体现在图纸中?

一、Revit中项目特别大如何将项目完整的体现在图纸中? 遇到项目特别大&#xff0c;在一张图纸是放置不下时&#xff0c;如图1所示&#xff0c;怎样才能将项目完整的体现在图纸当中? 在遇到特别大的项目可能会在图纸中放不下&#xff0c;在这种情况下我们要用拼接线来处理。在…

【JavaEE】SSM框架

文章目录一、Spring1、Spring相关概念1.1 Spring Framework系统架构1.2 核心概念(lOC、lOC容器、Bean、DI)2、入门案例2.1 IOC入门案例2.2 DI入门案例3、lOC相关内容3.1 bean配置3.2 bean实例化3.3 bean的生命周期3.3.1 控制bean生命周期执行的方法3.3.2 bean销毁时机4、DI相关…

AI智能分析在智慧电厂的典型应用

电力供应是整个社会生产、人民生活的基本保证之一。智慧电力作为城市智能化发展的客观需求&#xff0c;是智慧城市的重要基础&#xff0c;也是智慧城市建设的一项重要内容。 智慧能源用最前沿技术淋漓尽致地表达着对未来能源发展趋势的理解与实践。智慧电力将多项创新成果应用于…

【前端】Vue项目:旅游App-(6)city:隐藏TabBar的2种方法

文章目录目标过程与代码方法1&#xff1a;通过路由隐藏方法2&#xff1a;用样式隐藏对方法2封装总代码修改的文件common.cssindex.jscity.vue目标 city页是点击上篇“广州”位置所跳转的页面。此页面要隐藏TabBar。 过程与代码 city页要隐藏TabBar。我们这里有两种隐藏的方法…

【Effective Objective - C】—— 读书笔记(五)

【Effective Objective - C】—— 读书笔记&#xff08;五&#xff09; 内存管理 文章目录【Effective Objective - C】—— 读书笔记&#xff08;五&#xff09;内存管理29.理解引用计数引用计数工作原理属性存取方法中的内存管理自动释放池保留环要点30.以ARC简化引用计数使…

Qt扫盲-QSystemTrayIcon理论总结

QSystemTrayIcon理论总结一、概述二、使用对象三、使用四、常用函数介绍1. 静态函数2. 公共槽函数3. 信号一、概述 现代操作系统通常在桌面上提供一个特殊的区域&#xff0c;称为系统托盘或通知区域&#xff0c;长期运行的应用程序可以在这里显示图标和短消息。什么意思呢&…

【Spring】1. Java对象序列化和反序列化

1. 概念 1.1 序列化 将数据结构或对象转换成二进制字节流的过程 1.2 反序列化 序列化的反过程把二进制字节流恢复为数据结构或对象的过程1.3 序列化的目的&#xff1a; 通过网络传输对象或者说是将对象存储到文件系统、数据库、内存中。 2. 为什么要进行序列化&#xff1f;&…

【Linux】Linux开发工具(一)——vim工具

作者&#xff1a;一个喜欢猫咪的的程序员 专栏&#xff1a;《Linux》 喜欢的话&#xff1a;世间因为少年的挺身而出&#xff0c;而更加瑰丽。 ——《人民日报》 目录 1.什么是vim 1.1什么是vim 1.2vim和vi的区别&#xff1a; 2.vim基础 2.…

字符串的模式匹配

字符串的模式匹配引言应用方法一 暴力匹配算法 (C语言实现)程序实现暴力算法思想暴力算法的时间复杂度方法二 KMP 算法程序实现KMP 算法思想KMP 算法的时间复杂度暴力匹配算法 vs KMP 算法next 数组的训练KMP 算法的优化next 数组 转换成 nextval 数组的思想引言 在我们日常生…

第12章 角色页的修改、添加

1 定义src\components\Users\EditRole.vue <template> <el-dialog width"30%"> <!-- <span>{{propParent}}</span> --> <template #header> <div class"my-header"> <h1 style"margin: 0px; padding: …

快速指南 :ESP-IDF 自定义以太网 PHY 驱动程序

“我想用我最喜欢的芯片开始新的产品设计&#xff0c;但它断货了&#xff01;哦&#xff0c;不&#xff01;我必须设计一个新的 PCB&#xff0c;并重新开发驱动程序&#xff01;”如今&#xff0c;每个设计师都非常清楚这种感觉…好消息是&#xff0c;至少在 ESP-IDF 以太网 PH…

[C语言]浮点型在内存中的存储

在上一篇文章&#xff0c;我们讲述了整型在内存中的存储&#xff0c;这篇文章我们就一起来看一下“浮点型在内存中的存储” 回顾&#xff1a;整型在内存中的存储[C语言]和我一起来认识“整型在内存中的存储”_HY_PIGIE的博客-CSDN博客 目录 1.浮点数家族 2.整型和浮点型的存储…

教你从零开始搭建自己的魔兽世界服务器

首先需要一份 WOW 的程序底包:1底包使用方法: 解压后,放到 d: 目录即可, 如下图 &#xff08;最好是D盘下 因为有很多东西都是D:/连接的 &#xff09;2运行http-mysql/下的文件INIT.CM_重命名为INIT.CMD 运行3设置登录器下载 &#xff0c;在http-mysql/htdocs下创建DOWNLOAD文件…

再次改进MBR(从磁盘读入Loader加载器)

文章目录前言前置知识代码说明实验操作前言 本博客记录《操作系统真象还原》第二章第2个实验操作~ 实验环境&#xff1a;ubuntu18.04VMware &#xff0c; Bochs下载安装 实验内容&#xff1a;从磁盘读入Loader加载器 实验思路&#xff1a; MBR 受到512字节大小的限制&#…

kernel pwn gdb调试

前言 对于Linux的二进制程序&#xff0c;gdb调试是十分重要的&#xff0c;可以清楚的了解程序是如何运行的&#xff0c;这里单独拉一篇记录我在kernel pwn中遇到的一些调试 GDB选择 在三大件pwndbg,gef,peda中&#xff0c;用了一圈下来感觉gef和pwndbg都挺好 gdb安装 简单…

PaddleNLP教程文档

文章目录一、快速开始1.1 安装PaddleNLP并 加载数据集1.2 数据预处理1.3 加载预训练模型1.4 设置评价指标和训练策略1.5 模型训练与评估1.6 模型预测二、数据处理2.1 整体介绍2.2 加载内置数据集2.3 自定义数据集2.3.1 从本地文件创建数据集2.3.2 paddle.io.Dataset/IterableDa…

OpenShift Security - 用 RHACS 为应用自动生成 NetworkPolicy

《OpenShift / RHEL / DevSecOps / Ansible 汇总目录》 说明&#xff1a;本文已经在 OpenShift 4.12 RHACS 3.73.1 环境中验证 文章目录什么是 NP-Guard用 NP-Guard 自动生成 NetworkPolicy参考什么是 NP-Guard NP-Guard 是 IBM 发起的一个开源项目&#xff0c;用来自动创建 …

WindowsTerminal 安装 oh-my-posh

文章目录1 前言2 安装过程3 Posh Themes 自定义主题参考1 前言 在Linux中&#xff0c;有非常好用的oh-my-zsh&#xff0c;最近使用WindowsTerminal时想想有没有和oh-my-zsh相同好用的插件呢&#xff0c;答案是&#xff1a;oh-my-posh 2 安装过程 进入最新版PowerShell&#…

干货 | 解决 App 自动化测试的常见痛点(弹框及首页启动加载完成判断处理)

1. 常见痛点App 自动化测试中有些常见痛点问题&#xff0c;如果框架不能很好的处理&#xff0c;就可能出现元素定位超时找不到的情况&#xff0c;自动化也就被打断终止了。很容易打消做自动化的热情&#xff0c;导致从入门到放弃。比如下面的两个问题&#xff1a;一是 App 启动…