netcore Kafka

news2024/11/18 2:21:00

一、新建项目KafakDemo

  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="2.6.0" />
  </ItemGroup>

二、Program.cs

using Confluent.Kafka;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace KafakDemo
{
    internal class Program
    {
        static void Main(string[] args)
        {
            Task.Run(() =>
            {
                Publish();
            });
            Task.Run(() =>
            {
                Consumer();
            });
            Console.ReadKey();
        }

        static void Publish() {

            var conf = new ProducerConfig { BootstrapServers = "192.168.31.135:9092" };

            Action<DeliveryReport<Null, string>> handler = r =>
                        Console.WriteLine(!r.Error.IsError
                             ? $"Delivered message to {r.TopicPartitionOffset}"
                             : $"Delivery Error: {r.Error.Reason}");

            while (true)
            {
                using (var p = new ProducerBuilder<Null, string>(conf).Build())
                {
                    for (int i = 0; i < 1; ++i)
                    {
                        p.Produce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
                    }
                    p.Flush();
                }
                Thread.Sleep(TimeSpan.FromSeconds(2));
            }
       
        }

        static void Consumer()
        {
            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group",
                BootstrapServers = "192.168.31.135:9092"
                // Note: The AutoOffsetReset property determines the start offset in the event
                // there are not yet any committed offsets for the consumer group for the
                // topic/partitions of interest. By default, offsets are committed
                // automatically, so in this example, consumption will only start from the
                // earliest message in the topic 'my-topic' the first time you run the program.
                //AutoOffsetReset = AutoOffsetReset.Earliest
            };

            using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
            {
                c.Subscribe("my-topic");

                CancellationTokenSource cts = new CancellationTokenSource();
                Console.CancelKeyPress += (_, e) => {
                    // Prevent the process from terminating.
                    e.Cancel = true;
                    cts.Cancel();
                };

                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
                    c.Close();
                }
            }
        }
    }
}

运行效果:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2242530.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在k8s上部署Crunchy Postgres for Kubernetes

目录 一、前言二、安装Crunchy Postgres for Kubernetes三、部署一个简单的postgres集群四、增加pgbouncer五、数据备份六、备份恢复七、postgres配置参数八、数据导入九、权限管理 一、前言 Crunchy Postgres可以帮助我们在k8s上快速部署一个高可用、具有自动备份和恢复功能的…

函数指针示例

目录&#xff1a; 代码&#xff1a; main.c #include <stdio.h> #include <stdlib.h>int Max(int x, int y); int Min(int x, int y);int main(int argc, char**argv) {int x,y;scanf("%d",&x);scanf("%d",&y);int select;printf(&q…

计算机网络:运输层 —— 运输层端口号

文章目录 运输层端口号的分类端口号与应用程序的关联应用举例发送方的复用和接收方的分用 运输层端口号的分类 端口号只具有本地意义&#xff0c;即端口号只是为了标识本计算机网络协议栈应用层中的各应用进程。在因特网中不同计算机中的相同端口号是没有关系的&#xff0c;即…

牛客挑战赛77

#include <iostream>// 函数 kXOR&#xff1a;计算两个数在 k 进制下的异或和 // 参数&#xff1a; // a: 第一个正整数 // b: 第二个正整数 // k: 进制基数 // 返回值&#xff1a; // 两数在 k 进制下的异或和&#xff08;十进制表示&#xff09; long long kXO…

大数据-225 离线数仓 - 目前需求分析 指标口径 日志数据采集 taildir source HDFS Sink Agent Flume 优化配置

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

【流量分析】常见webshell流量分析

免责声明&#xff1a;本文仅作分享&#xff01; 对于常见的webshell工具&#xff0c;就要知攻善防&#xff1b;后门脚本的执行导致webshell的连接&#xff0c;对于默认的脚本要了解&#xff0c;才能更清晰&#xff0c;更方便应对。 &#xff08;这里仅针对部分后门代码进行流量…

springboot基于Web足球青训俱乐部管理后台系统开发(代码+数据库+LW)

摘 要 随着社会经济的快速发展&#xff0c;人们对足球俱乐部的需求日益增加&#xff0c;加快了足球健身俱乐部的发展&#xff0c;足球俱乐部管理工作日益繁忙&#xff0c;传统的管理方式已经无法满足足球俱乐部管理需求&#xff0c;因此&#xff0c;为了提高足球俱乐部管理效率…

电子应用设计方案-12:智能窗帘系统方案设计

一、系统概述 本设计方案旨在打造便捷、高效的全自动智能窗帘系统。 二、硬件选择 1. 电机&#xff1a;选用低噪音、扭矩合适的智能电机&#xff0c;根据窗帘尺寸和重量确定电机功率&#xff0c;确保能平稳拉动窗帘。 2. 轨道&#xff1a;选择坚固、顺滑的铝合金轨道&…

使用Element UI实现前端分页,及el-table表格跨页选择数据,切换分页保留分页数据,限制多选数量

文章目录 一、前端分页1、模板部分 (\<template>)2、数据部分 (data)3、计算属性 (computed)4、方法 (methods) 二、跨页选择1、模板部分 (\<template>)2、数据部分 (data)3、方法 (methods) 三、限制数量1、模板部分 (\<template>)2、数据部分 (data)3、方法…

mysql时间时区修改、set global、配置文件-default-time-zone

通过查看mysql错误日志或二进制日志可以看到时间和时区并不与国内的东八区时间一致。 查询mysql系统时区时间 show variables where variable_name"system_time_zone"; CST指的是中国标准时间&#xff0c;也是中国的标准时区。 set命令修改时区时间 global&#xf…

零基础利用实战项目学会Pytorch

目录 pytorch简介 1.线性回归 2.数据类型 2.1数据类型检验 2.2Dimension0/Rank0 2.3 Dim1/Rank1 2.4 Dim2/Rank2 3.一些方法 4.Pytorch完成分类任务 4.1模型参数 4.2 前向传播 4.3训练以及验证 4.4 三行搞定&#xff01; 4.5 准确率 5、Pytorch完成回归任务 5.…

信捷PLC转以太网连接电脑方法

信捷XC/XD/XL等系列PLC如何上下载程序?可以选择用捷米特JM-ETH-XJ模块轻松搞定,并不需要编程&#xff0c;即插即用&#xff0c;具体看见以下介绍&#xff1a; 产品介绍 捷米特JM-ETH-XJ是专门为信捷PLC转以太网通讯面设计&#xff0c;可实现工厂设备信息化需求&#xff0c;对…

【Flink】-- flink新版本发布:v2.0-preview1

目录 1、简介 2、非兼容变更 2.1、API 2.2、连接器适配计划 2.3、配置 2.4、其它 3、重要新特性 3.1、存算分离状态管理 3.2、物化表 3.3、批作业的自适应执行 3.4、流式湖仓 4、附加 4.1、非兼容性的 api 程序变更 4.1.2、Removed Classes # 4.1.3、Modified Cl…

头歌-本关任务:使用GmSSL命令行,生成SM2私钥并对文件进行签名验证(第二关)。

第一关在网上找到了&#xff0c;但第二关没找到&#xff0c;在这里做一下补充:) 如果想认真学的话可以看看文档 国密SM2椭圆曲线密码标准http://gmssl.org/docs/sm2.html 内容为 GuetPython 的明文文件msg.txt 私钥sm2.pem 公钥sm2Pub.pem 使用sm2utl对msg.txt进行签名&…

使用 unicorn 和 capstone 库来模拟 ARM Thumb 指令的执行(一)

import binascii import unicorn import capstonedef printArm32Regs(mu):for i in range(66,78):print("R%d,value:%x"%(i-66,mu.reg_read(i)))def testhumb():CODE b\x1C\x00\x0A\x46\x1E\x00"""MOV R3, R0 的机器码&#xff1a;0x1C 0x00&#xf…

【C++初阶】第1课—初识c++

文章目录 1. 学习c之前的开胃菜2. c的发展历程3. c参考文档4. c的第一个程序5. 命名空间5.1 关键字namespace5.2 namespace的嵌套使用5.3 命名空间的使用 6. c输入和输出7. 缺省参数8. 函数重载9. 引用9.1 引用的使用9.2 const引用9.3 引用和指针的关系 10. nullptr11. inline修…

HarmonyOS ArkUI(基于ArkTS) 常用组件

一 Button 按钮 Button是按钮组件&#xff0c;通常用于响应用户的点击操作,可以加子组件 Button(我是button)Button(){Text(我是button)}type 按钮类型 Button有三种可选类型&#xff0c;分别为胶囊类型&#xff08;Capsule&#xff09;、圆形按钮&#xff08;Circle&#xf…

Opengl光照测试

代码 #include "Model.h" #include "shader_m.h" #include "imgui.h" #include "imgui_impl_glfw.h" #include "imgui_impl_opengl3.h" //以上是放在同目录的头文件#include <glad/glad.h> #include <GLFW/glfw3.…

算法沉淀一:双指针

目录 前言&#xff1a; 双指针介绍 对撞指针 快慢指针 题目练习 1.移动零 2.复写零 3.快乐数 4.盛水最多的容器 5.有效三角形的个数 6.和为s的两个数 7.三数之和 8.四数之和 前言&#xff1a; 此章节介绍一些算法&#xff0c;主要从leetcode上的题来讲解&#xff…

js识别二维码

需要下载的js文件&#xff1a;https://download.csdn.net/download/impossible1994727/90001718https://download.csdn.net/download/impossible1994727/90001718 或者直接复制也行&#xff1a; var _aa {}; _aa._ab function (f, e) { var d qrcode.width; var b qrcode…