kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

news2024/12/22 20:21:50

目录

  • 1- 单播模式,只有一个消费者组
  • 2- 广播模式,多个消费者组
  • 3- Java实践

kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一般情况下partition-0存储a,c,e3个数据,partition-1存储b,d,f另外3个数据。

1- 单播模式,只有一个消费者组

topic只有1个partition,该组内有多个消费者时,此时同一个partition内的消息只能被该组中的一个consumer消费。当消费者数量多于partition数量时,多余的消费者是处于空闲状态的,如图1所示。topic,test只有一个partition,并且只有1个group,G1,该group内有多个consumer,只能被其中一个消费者消费,其他的处于空闲状态。

在这里插入图片描述
该topic有多个partition,该组内有多个消费者,比如test 有3个partition,该组内有2个消费者,那么可能就是C0对应消费p0,p1内的数据,c1对应消费p2的数据;如果有3个消费者,就是一个消费者对应消费一个partition内的数据了。图解分别如图2,图3.这种模式在集群模式下使用是非常普遍的,比如我们可以起3个服务,对应的topic设置3个partiition,这样就可以实现并行消费,大大提高处理消息的效率。

在这里插入图片描述
在这里插入图片描述

2- 广播模式,多个消费者组

如果想实现广播的模式就需要设置多个消费者组,这样当一个消费者组消费完这个消息后,丝毫不影响其他组内的消费者进行消费,这就是广播的概念。

多个消费者组,1个partition;
该topic内的数据被多个消费者组同时消费,当某个消费者组有多个消费者时也只能被一个消费者消费.

在这里插入图片描述

多个消费者组,多个partition

该topic内的数据可被多个消费者组多次消费,在一个消费者组内,每个消费者又可对应该topic内的一个或者多个partition并行消费,如图

在这里插入图片描述

3- Java实践

这里使用Java服务进行实践,模拟2个parition,然后同一个组内有2个消费者的情况:

首先创建一个发送消息的controller方法:

@ApiOperation(value = "向具有kafka-2个partition的topic发送信息")
    @RequestMapping(value = "/testSendMessage2", method = RequestMethod.POST)
    public String testSendMessage(@RequestParam("msg") String msg) {
        KafkaTemplate.send(KafkaTopicEnum.TEST_TWO_PARTITION_MSG.code,msg);
        System.out.println("发送的消息是:"+msg);
        return "2个partition的topic数据!--ok";
    }

然后再创建一个监听类监听该topic,这里的监听类即为消费者。

/**
     * @date 2020-09-24
     * 两个partition的topic,同一个组的两个消费者就可以并行的消费了,需要kafka也是集群才行,单机版并不支持
     * @param consumerRecord
     * @param acknowledgment
     */
    @KafkaListener(topics = "two-partition-msg",groupId ="serverGroup1",containerFactory = "ackContainerFactory")
    public void receiveKafkaTwoParMsg(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment){
        InetAddress address = null;
        try {
            address = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        System.out.println("当前的IP地址是:"+address.getHostAddress());
        System.out.println("监听服务A-收到的消息是::");
        System.out.println(consumerRecord.value().toString());
        System.out.println("=================== end =================");
//        ack 提交掉,避免服务重启再次拉取到消息
        acknowledgment.acknowledge();
    }

然后我们给该服务起2个实例,即模拟该组内serverGroup1内的2个消费者,然后我们使用测试方法进行测试,向该topic内发送多个消息,观察2个实例的输出日志:

实例1:

发送的消息是:111
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“111=================== end =================
发送的消息是:222
发送的消息是:333
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“333=================== end =================
发送的消息是:444
发送的消息是:555
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“555=================== end =================
发送的消息是:666
发送的消息是:777
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“777=================== end =================
发送的消息是:888
发送的消息是:999
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“999-----------------------------------

实例2:

当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“222=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“444=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“666=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“888”
发现该组内的一个消费者消费到了111,333,555,777,999 ,另外一个消费者消费到了222,444,666,888,起到了均衡消费的效果。

所以在微服务的集群中,我们可以通过给topic设置多个partition,然后让每一个实例对应消费1个partition的数据,从而实现并行的处理数据,可以显著地提高处理消息的速度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1305802.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux实用操作(超级实用)

Linux实用操作篇-上篇&#xff1a;Linux实用操作-上篇-CSDN博客 Linux实用操作篇-下篇&#xff1a;Linux实用操作篇-下篇-CSDN博客 一、各类小技巧&#xff08;快捷键&#xff09; 1.1 ctrl c 强制停止 Linux某些程序的运行&#xff0c;如果想要强制停止它&#xff0c;可以…

Redis缓存异常问题,常用解决方案总结

前言 Redis缓存异常问题分别是&#xff1a;1.缓存雪崩。2.缓存预热。3.缓存穿透。4.缓存降级。5.缓存击穿&#xff0c;以 及对应Redis缓存异常问题解决方案。 1.缓存雪崩 1.1、什么是缓存雪崩 如果缓存集中在一段时间内失效&#xff0c;发生大量的缓存穿透&#xff0c;所有…

【MySQL】MySQL数据库基础--什么是数据库/基本使用/MySQL架构/存储引擎

文章目录 1.什么是数据库2.主流数据库3.基本使用3.1MySQL安装3.2连接服务器3.3服务器管理3.4服务器&#xff0c;数据库&#xff0c;表关系3.5使用案例3.6数据逻辑存储 4.MySQL架构5.SQL分类6.存储引擎6.1什么是存储引擎6.2查看存储引擎6.3存储引擎对比 1.什么是数据库 对于回答…

MySQL笔记-第18章_MySQL8其它新特性

视频链接&#xff1a;【MySQL数据库入门到大牛&#xff0c;mysql安装到优化&#xff0c;百科全书级&#xff0c;全网天花板】 文章目录 第18章_MySQL8其它新特性1. MySQL8新特性概述1.1 MySQL8.0 新增特性1.2 MySQL8.0移除的旧特性 2. 新特性1&#xff1a;窗口函数2.1 使用窗口…

【LeetCode刷题-树】--144.二叉树的前序遍历

144.二叉树的前序遍历 /*** Definition for a binary tree node.* public class TreeNode {* int val;* TreeNode left;* TreeNode right;* TreeNode() {}* TreeNode(int val) { this.val val; }* TreeNode(int val, TreeNode left, TreeNode right) …

『npm』一条命令快速配置npm淘宝国内镜像

&#x1f4e3;读完这篇文章里你能收获到 一条命令快速切换至淘宝镜像恢复官方镜像 文章目录 一、设置淘宝镜像源二、恢复官方镜像源三、查看当前使用的镜像 一、设置淘宝镜像源 npm config set registry https://registry.npm.taobao.org服务器建议全局设置 sudo npm config…

HarmonyOS使用Tabs组件实现页面切换

Tabs组件的使用 概述 在我们常用的应用中&#xff0c;经常会有视图内容切换的场景&#xff0c;来展示更加丰富的内容。比如下面这个页面&#xff0c;点击底部的页签的选项&#xff0c;可以实现“首页”和“我的” 两个内容视图的切换。 ArkUI开发框架提供了一种页签容器组件…

Pytorch中Group Normalization的具体实现

Group Normalization (GN) 是一种用于深度神经网络中的归一化方法&#xff0c;它将每个样本划分为小组&#xff0c;并在每个小组内进行标准化。与批归一化&#xff08;Batch Normalization&#xff09;不同&#xff0c;Group Normalization 不依赖于小批量数据&#xff0c;因此…

【Hadoop_04】HDFS的API操作与读写流程

1、HDFS的API操作1.1 客户端环境准备1.2 API创建文件夹1.3 API上传1.4 API参数的优先级1.5 API文件夹下载1.6 API文件删除1.7 API文件更名和移动1.8 API文件详情和查看1.9 API文件和文件夹判断 2、HDFS的读写流程&#xff08;面试重点&#xff09;2.1 HDFS写数据流程2.2 网络拓…

[Angular] 笔记1:开发设置 , 双向绑定

1 设置开发环境 1.1 安装 node 下载 node&#xff0c;因为要使用 npm 工具&#xff0c;教程中使用 Angualr 14, 最新版 node 20 用不了&#xff0c;安装 node 16 就可以。 1.2 安装 Angular CLI Angular CLI 是用于创建 Angular 工程的工具集&#xff0c;使用如下命令&…

HTML实现页面

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>工商银行电子汇款单</title> </head> &…

数据挖掘目标(客户价值分析)

import numpy as np import pandas as pd import matplotlib.pyplot as plt import seaborn as snsIn [2]: datapd.read_csv(r../教师文件/air_data.csv)In [3]: data.head()Out[3]: Start_timeEnd_timeFareCityAgeFlight_countAvg_discountFlight_mileage02011/08/182014/0…

网络基础(八):路由器的基本原理及配置

目录 1、路由概述 2、路由器 2.1路由器的工作原理 2.2路由器的转发原理 3、路由表 3.1路由表的概述 3.2路由表的形成 4、静态路由配置过程&#xff08;使用eNSP软件配置&#xff09; 4.1两个静态路由器配置过程 4.2三个静态路由器配置过程 5、默认路由配置过程 5.…

得帆云为玉柴打造CRM售后服务管理系统,实现服务全过程管理|基于得帆云低代码的CRM案例系列

广西玉柴机器股份有限公司 广西玉柴机器股份有限公司始建于1992年&#xff0c;是国内行业首家赴境外上市的中外合资企业&#xff0c;产品远销亚欧美非等180多个国家和地区。公司总部设在广西玉林市&#xff0c;下辖11家子公司&#xff0c;生产基地布局广西、江苏、安徽、山东等…

收发货拥抱新技术,纵行科技推ZETag方案实现更精准的自动识别

对于制造及物流企业来说&#xff0c;收发货是影响其运营效率和成本控制的关键因素。然而传统的收发货管理高度依赖人工核对&#xff0c;比如目前国内汽车工厂零件到货验收主要采用人工方式&#xff0c;验收人员需根据送货看板进行数量清点&#xff0c;确认无误后用手持终端扫描…

多维时序 | Matlab实现GA-LSTM-Attention遗传算法优化长短期记忆神经网络融合注意力机制多变量时间序列预测

多维时序 | MATLAB实现BWO-CNN-BiGRU-Multihead-Attention多头注意力机制多变量时间序列预测 目录 多维时序 | MATLAB实现BWO-CNN-BiGRU-Multihead-Attention多头注意力机制多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 多维时序 | Matlab实…

K8S(一)—安装部署

目录 安装部署前提以下的操作指导(在master)之前都是三台机器都需要执行 安装docker服务下面的操作仅在k8smaster执行 安装部署 前提 以下的操作指导(在master)之前都是三台机器都需要执行 关闭防火墙 [rootk8smaster ~]# vim /etc/selinux/config [rootk8smaster ~]# swa…

Axure电商产品移动端交互原型,移动端高保真Axure原型图(RP源文件手机app界面UI设计模板)

本作品是一套 Axure8 高保真移动端电商APP产品原型模板&#xff0c;包含了用户中心、会员成长、优惠券、积分、互动社区、运营推广、内容推荐、商品展示、订单流程、订单管理、售后及服务等完整的电商体系功能架构和业务流程。 本模板由一百三十多个界面上千个交互元件及事件组…

Pytorch-Transformer轴承故障一维信号分类(三)

目录 前言 1 数据集制作与加载 1.1 导入数据 第一步&#xff0c;导入十分类数据 第二步&#xff0c;读取MAT文件驱动端数据 第三步&#xff0c;制作数据集 第四步&#xff0c;制作训练集和标签 1.2 数据加载&#xff0c;训练数据、测试数据分组&#xff0c;数据分batch…

科技提升安全,基于YOLOv6开发构建商超扶梯场景下行人安全行为姿态检测识别系统

在商超等人流量较为密集的场景下经常会报道出现一些行人在扶梯上摔倒、受伤等问题&#xff0c;随着AI技术的快速发展与不断普及&#xff0c;越来越多的商超、地铁等场景开始加装专用的安全检测预警系统&#xff0c;核心工作原理即使AI模型与摄像头图像视频流的实时计算&#xf…