Nodejs 第七十九章(Kafka进阶)

news2025/1/23 7:22:03

在这里插入图片描述

kafka前置知识在上一章讲过了 不再复述

kafka进阶

1. server.properties配置文件

server.properties是Kafka服务器的配置文件,它用于配置Kafka服务的各个方面,包括网络设置、日志存储、消息保留策略、安全认证

#broker的全局唯一编号,不能重复
broker.id=0
#端口号
port=9092
#处理网络请求的线程数量
#接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3
#用来处理磁盘IO的线程数量
#消息从内存中写入磁盘是时候使用的线程数量。
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=./logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#每个topic的分区数
offsets.topic.replication.factor=1
#每个topic的副本数
transaction.state.log.replication.factor=1
#每个topic的最小副本数
transaction.state.log.min.isr=1
#日志保留时间,单位小时 168就是7天
log.retention.hours=168
#定期检查日志是否过期的间隔,单位毫秒
log.retention.check.interval.ms=300000
#日志清理器是否启用
log.cleaner.enable=true
#zookeeper地址
zookeeper.connect=localhost:2181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=18000
#zookeeper会话超时时间
group.initial.rebalance.delay.ms=0
2.producer.properties配置文件

producer.properties是Kafka生产者客户端的配置文件,用于配置Kafka生产者的行为和属性。当你使用Kafka生产者API发送消息到Kafka集群时,可以使用该配置文件哟

#配置生产者的broker列表 可以配置多个,以逗号隔开 也就是做集群的
#来获取每一个topic的分片数等元数据信息。
bootstrap.servers=localhost:9092
# 配置数据压缩方式 有none,gzip,snappy,lz4,zstd
compression.type=none
#客户端等待请求的响应的最长时间 超时时间
#request.timeout.ms=
#定期发送消息的时间间隔,一般配合batch.size使用,例如设置了50ms,那么每50ms就会发送一次消息合集
#linger.ms=
#每次发送给Kafka服务器请求消息的最大大小
#max.request.size=
#批量发送消息比如说设置了值16KB,那么消息内容凑够16KB就会被发送出去,否则就不会发送,这样可以避免单条消息太大导致的发送失败
#batch.size=
#约束producer缓存池的大小,默认是32MB,可以根据实际情况调整
#buffer.memory=
3.consumer.properties配置文件

用于配置Kafka消费者的属性。它包含了一系列用于定义消费者行为的参数和数值

#定义Kafka的Broker列表 可以配置多个,以逗号隔开 也就是做集群的
bootstrap.servers=localhost:9092
#定义消费者组的ID
group.id=test-consumer-group
#用于指定当消费者加入一个消费者组但没有可用的消费位移时的行为
#有三种选项 earliest/latest/none
#earliest:表示消费者将从最早的可用消费位移开始消费。消费者将从主题的最早消息开始消费,即使这些消息已经过期。
#latest:表示消费者将从最新的可用消费位移开始消费。消费者将从主题的最新消息开始消费,即跳过已经过期的消息。
#none:表示如果没有可用的消费位移,消费者将抛出异常。这样可以确保消费者只消费已经提交的消费位移。
#auto.offset.reset=
#心跳间隔用于保持消费者活跃状态
#session.timeout.ms
#指定消费者一次性获取最大的消息数量,如果为0表示不限制
#fetch.max.bytes=1048576
#指定消费者一次性获取的最大等待时间,如果为0表示不限制
#fetch.max.wait.ms=500

消息模式

kafka同样支持发布订阅的方式发送消息 我们来编写一下案例

官方文档 https://kafka.js.org/docs/getting-started

1. 压缩

引入CompressionTypes 选择压缩模式 GIZP LZ4 zSTD

import { Kafka,CompressionTypes } from 'kafkajs'

await producer.send({
    topic: 'xiaoman',
    compression: CompressionTypes.GZIP,
    messages: [
        {
            value: '测试数据1',
            headers: {
                'name': Buffer.from('小满')
            }
        },
        { value: Buffer.from('测试数据2') },
    ],
})
2. 标头

允许使用标头传递对象元数据,把需要传递的数据放在headers即可数据将一起被发送过去

await producer.send({
    topic: 'xiaoman',
    messages: [
        {
            value: '测试数据1',
            headers: {
                'name': Buffer.from('小满')
            }
        },
        { value: Buffer.from('测试数据2') },
    ],
})

消费者获取headers 元数据

await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            topic,
            partition,
            value: message.value.toString(),
            headers: message.headers?.name?.toString(),
        })
    },
})
3. 多主题派发

send 发方法换成 sendBatch 增加 topicMessages 是个数组

await producer.sendBatch({
    topicMessages: [
        {
            topic: 'xiaoman',
            messages: [
                { value: Buffer.from('测试数据1') },
            ],
        },
        {
            topic: 'xiaoman2',
            messages: [
                { value: Buffer.from('测试数据2') },
            ],
        },
    ],
})

消费多个消息时候的时候可以根据业务自由选择模式

  1. 逐条处理

  2. 批量处理(批量处理可以减少网络开销)

await consumer.subscribe({ topic: 'xiaoman', fromBeginning: true })
await consumer.subscribe({ topic: 'xiaoman2', fromBeginning: true })
//逐条处理
await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
        console.log({
            topic,
            partition,
            value: message.value.toString(),
            headers: message.headers?.name?.toString(),
        })
    },
})
//批量处理
await consumer.run({
    eachBatch: async ({ batch }) => {
        batch.messages.forEach( (message) => {
            console.log('Received message', message.value.toString())
        })
    },
})

案例演示

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1847468.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

市场价格到底是因为什么而变动?

在外汇及广泛的金融市场中,影响金融工具价格起伏的因素纷繁复杂。然而,万变不离其宗,无论是哪个市场,价格的最终决定力量始终是供需之间的平衡法则。 对于外汇、大宗商品等金融市场而言,表面上似乎受宏观经济数据、央…

极速查询:StarRocks 存算分离 Compaction 原理 调优指南

作者:丁凯,StarRocks TSC member/镜舟科技云原生技术负责人 StarRocks 在数据摄入过程中,每次操作都会创建一个新的数据版本。在查询时,为了得到准确的结果,必须将所有版本合并。然而,随着历史数据版本的累…

Midjourney v6 快速入门指南

Midjourney V6快速入门教程来了,这是Midjourney的AI图像生成器的又一次令人印象深刻的升级。最显著的是,V6在逼真渲染和图像中的文字功能方面取得了重大进展。 在这篇文章中,我们将探讨如何开始使用Midjourney V6,并提供一些示例…

【单片机毕业设计选题24018】-基于STM32和阿里云的农业大棚系统

系统功能: 系统分为手动和自动模式,上电默认为自动模式,自动模式下系统根据采集到的传感器值 自动控制,温度过低后自动开启加热,湿度过高后自动开启通风,光照过低后自动开启补 光,水位过低后自动开启水泵…

【LeetCode最详尽解答】42-接雨水 Trapping-Rain-Water

欢迎收藏Star我的Machine Learning Blog:https://github.com/purepisces/Wenqing-Machine_Learning_Blog。如果收藏star, 有问题可以随时与我交流, 谢谢大家! 链接: 42-接雨水 直觉 通过可视化图形来解决这个问题会更容易理解和解决。 给定输入: height [0,1,…

不破不立,B站终于跳出“舒适圈”?

哔哩哔哩已经很久没有这么振奋人心的时刻了。 6月19日,哔哩哔哩当日股价涨超18%,最高达到145.6元每股,时隔11个月,再次回归高位。从时间线上看,这次的股价大涨明显与哔哩哔哩刚(以下简称“B站”&#xff0…

分享HTML显示2D/3D时间

效果截图 实现代码 <!DOCTYPE html> <head> <title>three.jscannon.js Web 3D</title><meta charset"utf-8"><meta name"viewport" content"widthdevice-width,initial-scale1,maximum-scale1"><meta n…

vulnhub靶场之FunBox-11

一.环境搭建 1.靶场描述 As always, its a very easy box for beginners. Add to your /etc/hosts: funbox11 This works better with VirtualBox rather than VMware. 2.靶场下载 https://www.vulnhub.com/entry/funbox-scriptkiddie,725/ 3.靶场启动 二.信息收集 1.寻找靶…

2. zabbix-agent获取监控数据的三种方式

zabbix-agent获取监控数据的三种方式 一、监控其他主机1、在被监控机安装部署zabbix-agent2、在web界面添加监控主机3、系统级别的监控数据参考 二、zabbix-agent监控的三种方式1、利用自带key监控数据1.1 示例1: 监控node01网卡的流入流量1.2 常用的key 2、自定义key的监控2.1…

Python基础用法 之 运算符

1.算数运算符 符号作用说明举例加与“”相同 - 减与“-”相同*乘 与“ ”相同 9*218/除 与“ ”相同 9/24.5 、6/32.0//求商&#xff08;整数部分&#xff09; 两个数据做除法的 商 9//24%取余&#xff08;余数部分&#xff09; 是两个数据做除法的 余数 9%21**幂、次方2**…

IDEA 配置方法模板无法获取到参数值和返回值(methodParameters()、methodReturnType()获取不到值)

问题现象&#xff1a; 我在 review 同事代码时候&#xff0c;发现方法上有注释&#xff0c;但是注释上又没有方法参数和返回值&#xff0c;这不是IDEA 配置了方法模板就可以自动生成的嘛&#xff0c;我出于好奇去问了下该同事是怎么回事&#xff0c;该同事有点不好意思的说我配…

客户集中度高,毛利率下滑,江苏永成的IPO之路能走通吗?

撰稿|行星 来源|贝多财经 近年来&#xff0c;汽车市场蓬勃向上&#xff0c;助推上游配套产业链进入增长热潮。 行业利好前景下&#xff0c;不少汽车上游供应商开始向资本市场进发&#xff0c;希望借助上市拓宽融资渠道&#xff0c;加速业务拓展和技术创新&#xff0c;在产业…

【数据库备份完整版】物理备份、逻辑备份,mysqldump、mysqlbinlog的备份方法

【数据库备份完整版】物理备份、逻辑备份&#xff0c;mysqldump、mysqlbinlog的备份方法 一、物理备份二、逻辑备份1.mysqldump和binlog备份的方式&#xff1a;2.mysqldump完整备份与恢复数据2.1 mysqldump概念2.2 mysqldump备份2.3 数据恢复2.4 **使用 Cron 自动执行备份**2.5…

Elk安装及使用

es安装及使用 单机版安装 集群安装 132 node-01 133 node-02 135 node-03 日志用户权限有问题 看日志 解决方案&#xff1a; 出现错误后&#xff0c;再次重启前&#xff0c;需要删除三个节点/data/下的内容 9300-http 9300-tcp logstasha安装及使用 Ssh错误 Yum安装默认路…

海量数据处理——bitMap/BloomFilter、hash + 统计 + 堆/归并/快排

前言&#xff1a;海量数据处理是面试中一道常考的问题&#xff0c; 生活中也容易遇到这种问题。 通常就是有一个大文件&#xff0c; 让我们对这个文件进行一系列操作——找出现次数最多的数据、求交集、是否重复出现等等。 因为文件的内容太多&#xff0c; 我们的内存通常是放不…

Java基础 - 练习(五)根据今天日期获取一周内的日期(基姆拉尔森公式)

基姆拉尔森计算公式用于计算一周内的日期。比如给你年月日&#xff0c;从而计算今天是星期几。 基姆拉尔森公式 Week (d2*m3*(m1)/5yy/4-y/100y/4001) mod 7&#xff0c; 3<m<14Week的取值范围是0 ~ 6&#xff0c;其中0代表星期日&#xff0c;1 ~ 6分别代表星期一到星期…

【考研408计算机组成原理】数值表示和运算之快速数值转换

苏泽 “弃工从研”的路上很孤独&#xff0c;于是我记下了些许笔记相伴&#xff0c;希望能够帮助到大家 另外&#xff0c;利用了工作之余的一点点时间&#xff0c;整理了一套考研408的知识图谱&#xff0c; 我根据这一套知识图谱打造了这样一个408知识图谱问答系统 里面的每一…

Java学习笔记(二)变量原理、常用编码、类型转换

Hi i,m JinXiang ⭐ 前言 ⭐ 本篇文章主要介绍Java变量原理、常用编码、类型转换详细使用以及部分理论知识 🍉欢迎点赞 👍 收藏 ⭐留言评论 📝私信必回哟😁 🍉博主收将持续更新学习记录获,友友们有任何问题可以在评论区留言 1、变量原理 1.1、变量的介绍 变量是程…

在寻找电子名片在线制作免费生成?5个软件帮助你快速制作电子名片

在寻找电子名片在线制作免费生成&#xff1f;5个软件帮助你快速制作电子名片 当你需要快速制作电子名片时&#xff0c;有几款免费在线工具可以帮助你实现这个目标。这些工具提供了丰富的设计模板和元素&#xff0c;让你可以轻松地创建个性化、专业水平的电子名片。 1.一键logo…

逻辑回归(Logistic Regression)及其在机器学习中的应用

&#x1f680;时空传送门 &#x1f50d;逻辑回归原理&#x1f4d5;Sigmoid函数&#x1f388;逻辑回归模型 &#x1f4d5;损失函数与优化&#x1f388;损失函数&#x1f680;优化算法 &#x1f50d;逻辑回归的应用场景&#x1f340;使用逻辑回归预测客户流失使用scikit-learn库实…