Kafka+PostgreSql,构建一个总线服务

news2025/1/11 0:26:20

之前开发的系统,用到了RabbitMQ和SQL Server作为总线服务的传输层和存储层,最近一直在看Kafka和PostgreSql相关的知识,想着是不是可以把服务总线的技术栈切换到这个上面。今天花了点时间试了试,过程还是比较顺利的,后续就是搭建基础服务的事情了。这里简单分享一下。

环境安装

安装Kafka

官方文档:Apache Kafka,可以直接参考,我这里简单介绍下我在本地搭建开发环境的过程,还是遇到了一个小坑。

我这里是在本地WSL 2环境下进行的安装,安装过程就参考官方文档的推荐流程即可

下载安装包

注意,这里要下载编译后的包,不嫌麻烦的话,可以下载源代码,编译后再使用。

wget -c https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz

安装

tar -xzf kafka_2.13-3.8.0.tgz
cd kafka_2.13-3.8.0

这里安装完成后的路径是这样子的

重点关注的就是bin,config和logs这3个目录。

启动服务

官方提供了2中启动策略,一个是KRaft,一个是Zookeeper,我这里用的zookeeper

先启动zookeeper服务

bin/zookeeper-server-start.sh config/zookeeper.properties

在启动kafka服务

bin/kafka-server-start.sh config/server.properties

后面的zookeeper.properties和server.properties是配置文件,后续有配置需求的时候可以修改,比如监听地址,brokerid等等,长这样👇

启动后控制台的输出是这样

这样,一个kafka的服务节点就启动了。

对了,kafka是依赖java环境的,安装之前本地要安装jdk,我这里使用的是openjdk,也是ok的。

*端口转发(仅WSL2环境)

在WSL2环境下,需要配置下端口转发,不然宿主机连接不到broker,

netsh interface portproxy add v4tov4 listenport=9092 listenaddress=0.0.0.0 connectport=9092 connectaddress=172.28.240.79

后面那个ip地址就写宿主机给WSL环境下发的地址

此外,宿主机和wsl环境都放开9092(或者你设置的)端口

链接测试

这里有很多客户端的ui工具或者插件可以连接Kafka,官方本身也提供了测试命令,比如官方文档里给的测试案例就是用这几个命令

本地开发的话,我这里用的vs code的tools for apache kafka@ 这个插件,在插件市场用关键字搜索完成,安装即可

至此,一个本地的Kafka节点就基本配置完成了

安装PostgreSql

这个我老早就装好了,一些安装过程没有截图,就忽略吧,大家有需求的可以问一下各种GPT

也可以用docker,快速部署一个节点做本地的测试。

docker run --name postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres

开发测试

新建项目

这里因为我是用的IDE做开发,所以直接创建个web项目就好,也可以用命令行来创建。

总之创建完成后,我的项目长这样

安装依赖

我这里是用的是dotnet.cap这个系列组件,然后为了测试方便,数据库的orm适用的是dapper,主要是图快,大家实际项目中可以用习惯的orm就好。

这里我的项目文件长这样

<Project Sdk="Microsoft.NET.Sdk.Web">

  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <Nullable>enable</Nullable>
    <ImplicitUsings>enable</ImplicitUsings>
  </PropertyGroup>
	
  <ItemGroup>
    <PackageReference Include="Dapper" Version="2.1.35" />
    <PackageReference Include="DotNetCore.CAP" Version="8.2.0" />
    <PackageReference Include="DotNetCore.CAP.Dashboard" Version="8.2.0" />
    <PackageReference Include="DotNetCore.CAP.Kafka" Version="8.2.0" />
    <PackageReference Include="DotNetCore.CAP.PostgreSql" Version="8.2.0" />
  </ItemGroup>

</Project>

注入服务

这里主要注入pg和Kafka

builder.Services.AddCap(x =>
{
    x.UsePostgreSql("User ID={pg用户名};Password={pg密码};Host={pg地址};Port=5432;Database=maigcTestDb;");

    x.UseKafka("localhost:9092");

    x.UseDashboard();
});

测试的业务代码

在常规的controller中注入服务

public class ValuesController(ICapPublisher producer) : Controller, ICapSubscribe
{
    /*
    业务代码
    */
}
//上面这是最新的写法,以前那种构造函数的写法也是ok的
public class Values2Controller : Controller
{
    private ICapPublisher _capPublisher;
    public Values2Controller(ICapPublisher capPublisher)
    {
        _capPublisher = capPublisher;
    }
}

写一个生产者接口

public async Task<IActionResult> Producer()
{
    Console.WriteLine("生产者发布消息: " + DateTime.Now);
    await producer.PublishAsync("sample.kafka.postgrsql", DateTime.Now);

    return Ok();
}

再写一个延时发送消息的生产者接口

public async Task<IActionResult> ProducerDelay()
{
    Console.WriteLine("生产者发布延时消息: " + DateTime.Now);
    await producer.PublishDelayAsync(TimeSpan.FromSeconds(delaySeconds), "sample.kafka.postgrsql", DateTime.Now);

    return Ok();
}

创建消费者

[CapSubscribe("sample.kafka.postgrsql")]
public void Test2(DateTime value)
{
    Console.WriteLine("订阅到消息: " + value);
}

我们访问下接口看下控制台的打印效果

可以看到,订阅到的时间和生产者发送的实际是一致的。

再试下延时发送

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。

我们延时了10秒发布,这里生产者执行生产消息后,过了10秒,被消费者订阅到。

再看下PostG里保存的消息记录

这是生产记录

这是消费记录

注意,在CAP的机制里,这些持久化的消息记录是可以设置过期时间的,也就是如果我们每天的并发量很高,产生的消息非常多,可以设置一个过期时间,比如7天,一个月,到期后,这些持久化的数据就会自动清除掉。

CAP的官方文档里,还有更多案例,大家感兴趣也可以去试试,当然除了CAP还有MediatR,MassTransit这类组件,也可以轻松实现消息总线的机制。

好了,到此我们的测试就结束了,从安装Kafka,到创建这个新项目并跑通这个测试服务,也就2个小时,所以,这个迁移成本应该还是非常高效的。

小总结

实际上,我们的生产环境中,正正常运行的一套总线服务,依赖的是RabbitMQ和SQL Server,RabbitMQ还好,SQL Server在以后应该不会是做项目的首选数据库了,尤其是做一些高并发的项目,不是说它性能不够,而是成本太高,社区版的限制有太多,还是要早做规划,提前准备更加适合未来发展的方案,而PostgreSql是目前最受全球开发者欢迎的关系数据库,社区活跃度非常高,开源协议对企业也十分友好,即便是面对国内高标准的信创要求,也完全没问题,是绝佳的首选。

至于Kafka,这是目前世界上最为流行的消息队列,性能,可用性,可扩展性等各方面都比其他消息队列要好上一点。阿里后来推出的RocketMQ,也是基于Kafka的设计原理做了简化和更加适应国内环境的一些调整,根骨还是来自Kafka。而且就生态环境而言,无论国内还是国外,Kafka都是遥遥领先,对dotnet框架的支持,Kafka也远比RocketMQ更好(RocketMQ更多的还是用在java环境里),所以我们再选型的时候,优先考虑的还是Kafka。

更多关于这些内容的知识,大家感兴趣可以去搜一下或者找个AI问一下。

好了,就这些吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2136538.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

破解AI生成检测:如何用ChatGPT降低论文的AIGC率

学境思源&#xff0c;一键生成论文初稿&#xff1a; AcademicIdeas - 学境思源AI论文写作 降低论文的“AIGC率”是个挑战&#xff0c;但有一些策略可以尝试。使用ChatGPT逐步调整和改进内容&#xff0c;使其更加自然和原创&#xff0c;降低AI检测工具识别出高“AIGC率”的概率…

专访阿里云:AI 时代服务器操作系统洗牌在即,生态合作重构未来

编者按&#xff1a;近日&#xff0c;2024 龙蜥操作系统大会已于北京圆满举办。大会期间&#xff0c;CSDN 采访了阿里云基础软件部资深技术总监、龙蜥社区技术委员会主席杨勇&#xff0c;前瞻性宏观解读面向 AI 智算时代&#xff0c;服务器操作系统面临的挑战与机遇。以下为采访…

云曦2024秋考核

真正的hacker 进去以后一眼就能看出来&#xff0c;是ThinkphpV5漏洞&#xff0c;只是版本不能确定&#xff0c;一开始考核的时候是&#xff0c;抓包看了php的版本&#xff0c;是7.23&#xff0c;是手注了几个尝试出来的&#xff08;后面才发现报错信息里面就有&#xff09;。漏…

记录word转xml文件踩坑

word文件另存为xml文件后&#xff0c;xml文件乱码 解决方法&#xff1a; 1.用word打开.docx文件 2.另存为xml文件 3.点击工具 -> Web选项 -> 编码&#xff0c;选择UTF-8 4.点击确定 5.使用notpad打开xml文件 6.使用xml tool进行xml格式化即可。

【免费资料推荐】数据资产管理实践白皮书(6.0版)

荐言&#xff1a;随着数字经济的快速发展&#xff0c;数据已成为企业最重要的资产之一。为有效管理和利用数据资产&#xff0c;各行业纷纷推出数据管理框架和标准。数据资产管理实践白皮书&#xff08;6.0版&#xff09;由中国信息通信研究院联合相关企业共同编写&#xff0c;是…

利士策分享,细品礼仪之美:在日常中优雅相处的艺术

利士策分享&#xff0c;细品礼仪之美&#xff1a;在日常中优雅相处的艺术 在当今这个快节奏、高压力的社会里&#xff0c;人与人之间的交往似乎被简化成了快餐式的信息交流。 然而&#xff0c;根植于文化深处的礼仪之花&#xff0c;依然是促进社会和谐、深化人际关系的宝贵财富…

python使用Pandas读取excel的行列内容

我的Excel文件名称是“测试.xlsx” 首先读取excle的文件内容 import pandas as pd dfpd.read_excel(测试.xlsx) #这个会直接默认读取到这个Excel的第一个sheet print(df)可以看看输出的是什么&#xff1a; 2. df.loc[0]&#xff0c;表示读取Excel的第一行&#xff08;这里…

docker容器中的内存占用高的问题分析

文章目录 问题描述原因分析分析1分析2验证猜想 结论和经验 问题描述 运维新增对某服务的监控后发现&#xff1a;内存不断上涨的现象。进一步确认&#xff0c;是因为有多个导出日志操作导致的内存上涨问题。 进一步的测试得出的结果是&#xff1a;容器刚启动是占用内存约为50M…

白话:大型语言模型中的幻觉(Hallucinations)

大型语言模型&#xff08;LLM&#xff09;可是自然语言处理和人工智能的一大步。它们能做的事情可多了&#xff0c;比如生成听起来挺靠谱的文本&#xff0c;翻译语言&#xff0c;总结文档&#xff0c;甚至写诗。但你知道吗&#xff0c;这些模型有时候会出现 “幻觉&#xff08;…

音视频开发常见的开源项目

FFmpeg 地址&#xff1a;https://ffmpeg.org/介绍&#xff1a;FFmpeg 是一个非常强大的开源多媒体框架&#xff0c;它可以用来处理视频和音频文件。它支持多种格式的转换、编码、解码、转码、流处理等。FFmpeg 包括了 libavformat、libavcodec、libavutil、libswscale、libpos…

Matlab求解微分方程(解析解与数值解)

matlab求解微分方程解析解和数值解 Matlab求微分方程解析解例题1例题2例题3 Matlab求微分方程数值解一阶微分方程例题一例题二 高阶微分方程例题 Matlab求微分方程解析解 dsolve(eqns,conds,options) eqns:微分方程&#xff08;组&#xff09;、conds&#xff1a;初值条件、opt…

萌宠宜家商城系统

摘 要 随着现在经济的不断发展和信息技术性日益完善和优化&#xff0c;传统式数据信息的管理升级成手机软件存放、梳理和数据信息集中统一处理的管理方式。本萌宠物宜家商城系统软件起源于这个环境中&#xff0c;能够帮助管理者在短期内进行庞大数据信息。使用这个专业软件能够…

【开源免费】基于SpringBoot+Vue.JS购物商城网站(JAVA毕业设计)

本文项目编号 T 032 &#xff0c;文末自助获取源码 \color{red}{T032&#xff0c;文末自助获取源码} T032&#xff0c;文末自助获取源码 目录 一、系统介绍二、演示录屏三、启动教程四、功能截图五、文案资料5.1 选题背景5.2 国内外研究现状5.3 可行性分析 六、核心代码6.1 查…

文字识别多功能工具箱 | eSearch v13.1.6

eSearch v13.1.6 是一款开源的截屏识屏搜索工具&#xff0c;它提供了丰富的功能&#xff0c;包括截屏、OCR识别、搜索翻译、贴图、以图搜图和录屏等一体化实用工具。该软件基于 Electron 框架开发&#xff0c;适用于 Linux、Windows 和 macOS 平台。 软件的主要特点和功能包括…

Lua发邮件:实现自动化邮件发送教程指南!

Lua发邮件高级技巧有哪些&#xff1f;如何利用Lua发送电子邮件&#xff1f; 自动化邮件发送是一个非常实用的功能&#xff0c;广泛应用于各种场景&#xff0c;如通知、提醒、报告生成等。Lua作为一种轻量级脚本语言&#xff0c;因其简洁和高效而受到广泛欢迎。AokSend将详细介…

金钥匙系列:Kubernetes (K8s) 服务集群技术栈学习路线

维护Kubernetes (K8s) 服务集群是一个复杂且多层次的技术任务&#xff0c;涉及容器化技术、集群管理、网络、安全、监控等多个领域。为了成为一名优秀的K8s集群维护工程师&#xff0c;技术栈需要广泛且深入。本文将为你详细介绍从零开始到深入掌握K8s集群维护的职业技术栈学习路…

在 Mac 上安装双系统会影响性能吗,安装双系统会清除数据吗?

在 Mac 系统安装并使用双系统已经成为了许多用户办公的选择之一&#xff0c;双系统可以让用户在 Mac 上同时运行 Windows 或其他操作系统。然而&#xff0c;许多用户担心这样做会对 Mac 的性能产生影响。 接下来将给大家介绍 Mac 装双系统会影响性能吗&#xff0c;Mac装双系统…

【Hue导入Hive文件类型数据(自动建表)】

1、进入Hue访问界面&#xff0c;点击要导入表的schema&#xff0c;点击号&#xff0c;上传要导入的文件。 2、本次测试文件数据用逗号分隔&#xff0c;也可根据文件分隔符选择具体格式 3、点击下一步&#xff0c;可自定义表名&#xff0c;以及选择字段数据类型&#xff0c;定…

【PyCharm】常用快捷键

此篇文章内容会不定期更新&#xff0c;仅作为学习过程中的笔记记录 PyCharm的所有快捷键&#xff0c;其实均可以自定义&#xff0c;在位于Settings -> Keymap的目录下&#xff08;如图&#xff09;&#xff0c;可以自行改写为自己熟悉的键位组合。 若更改为PyCharm已存在的键…

【网络安全】PHP配置注入漏洞

未经许可&#xff0c;不得转载。 文章目录 正文 正文 前提&#xff1a;通过探测等方式发现某个 PHP 文件存在 PHPRC 参数&#xff1a; curl "https://xxx.com/about.php?PHPRC/dev/fd/0" --data-binary auto_prepend_file"/etc/passwd"PHPRC 用于指定 P…