kafka复习:(22)一个分区只能被消费者组中的一个消费者消费吗?

news2024/11/24 23:07:10

默认情况下,一个分区只能被消费者组中的一个消费者消费。但可以自定义PartitionAssignor来打破这个限制。
一、自定义PartitionAssignor.

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BroadcastAssignor extends AbstractPartitionAssignor {
    @Override
    public String name() {
        return "broadcast";
    }

    private Map<String, List<String>> consumersPerTopic(Map<String, Subscription> consumerMetadata) {
        Map<String, List<String>> res = new HashMap<>();
        for (Map.Entry<String, Subscription> subscriptionEntry : consumerMetadata.entrySet()) {
            String consumerId = subscriptionEntry.getKey();
            for (String topic : subscriptionEntry.getValue().topics())
                put(res, topic, consumerId);
        }
        return res;
    }

    @Override
    public Map<String, List<TopicPartition>> assign(
            Map<String, Integer> partitionsPerTopic,
            Map<String, Subscription> subscriptions) {
        Map<String, List<String>> consumersPerTopic =
                consumersPerTopic(subscriptions);
        Map<String, List<TopicPartition>> assignment = new HashMap<>();
        subscriptions.keySet().forEach(memberId ->
                assignment.put(memberId, new ArrayList<>()));
        consumersPerTopic.entrySet().forEach(topicEntry->{
            String topic = topicEntry.getKey();
            List<String> members = topicEntry.getValue();

            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
            if (numPartitionsForTopic == null || members.isEmpty())
                return;
            List<TopicPartition> partitions = AbstractPartitionAssignor
                    .partitions(topic, numPartitionsForTopic);
            if (!partitions.isEmpty()) {
                members.forEach(memberId ->
                        assignment.get(memberId).addAll(partitions));
            }
        });
        return assignment;
    }
}

二、定义两个消费者,给其配置上述PartitionAssignor.

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class KafkaTest19 {

    private static Properties getProperties(){
        Properties properties=new Properties();

        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup2023");
        properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                BroadcastAssignor.class.getName());
        return properties;
    }
    public static void main(String[] args) {

        KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());
        String topic="study2023";
        myConsumer.subscribe(Arrays.asList(topic));

        while(true){
            ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));
            for(ConsumerRecord record: consumerRecords){
                System.out.println(record.value());
                System.out.println("record offset is: "+record.offset());
            }

        }



    }
}

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class KafkaTest20 {

    private static Properties getProperties(){
        Properties properties=new Properties();

        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"testGroup2023");
        properties.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                BroadcastAssignor.class.getName());
        return properties;
    }
    public static void main(String[] args) {

        KafkaConsumer<String,String> myConsumer=new KafkaConsumer<String, String>(getProperties());
        String topic="study2023";
        myConsumer.subscribe(Arrays.asList(topic));

        while(true){
            ConsumerRecords<String,String> consumerRecords=myConsumer.poll(Duration.ofMillis(5000));
            for(ConsumerRecord record: consumerRecords){
                System.out.println(record.value());
                System.out.println("record offset is: "+record.offset());
            }

        }



    }
}

在kafka创建只有一个分区的topic : study2023

创建一个生产者往study2023这个 topic发送消息:

package com.cisdi.dsp.modules.metaAnalysis.rest.kafka2023;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Date;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaTest01 {
    public static void main(String[] args) {
        Properties properties= new Properties();

        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"xx.xx.xx.xx:9092");
        KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(properties);
        ProducerRecord<String,String> producerRecord=new ProducerRecord<>("study2023",0,"fff","hello sister,now is: "+ new Date());
        Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
        long offset = 0;
        try {
            offset = future.get().offset();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println(offset);

        kafkaProducer.close();
    }
}

分别运行生产者和消费者,可以看到相同消费者组里两个消费者可以消费study2023这个topic的同一个分区的数据
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/941678.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【环境配置】Android-Studio-OpenCV-JNI以及常见错误 ( 持续更新 )

最近一个项目要编译深度学习的库&#xff0c;需要用到 opencv 和 JNI&#xff0c;本文档用于记录环境配置中遇到的常见错误以及解决方案 Invalid Gradle JDK configuration found failed Invalid Gradle JDK configuration foundInvalid Gradle JDK configuration found. Open…

【Acwing905】区间选点贪心策略超清晰证明!!包含题解

题目描述 贪心策略 首先按右端点对区间进行排序 然后从左到右遍历每一个区间&#xff0c;如果这个区间还没有选中的点&#xff0c;那么就选择这个区间的右端点&#xff0c;否则就pass掉这个区间 举一个栗子 贪心策略证明 假设最少的选点数为&#xff1a;ans 利用上述贪心策…

【JUC基础】JUC入门基础

目录 什么是JUC线程和进程锁传统的 synchronizedLock 锁Synchronized 与 Lock 的区别 生产者和消费者问题Synchronized 版Lock版Condition 的优势&#xff1a;精准通知和唤醒线程 8 锁现象问题1&#xff1a;两个同步方法&#xff0c;先执行发短信还是打电话&#xff1f;问题2&a…

代码随想录刷题笔记 (python版本) 持续更新.....

代码随想录刷题笔记总结: https://www.programmercarl.com/ 个人学习笔记 如有错误欢迎指正交流1. 数组 1.1 理论基础 详细介绍:https://www.programmercarl.com/%E6%95%B0%E7%BB%84%E7%90%86%E8%AE%BA%E5%9F%BA%E7%A1%80.html 数组下标都是从0开始的。数组内存空间的地址是…

什么是亚马逊类目核心关键词?

亚马逊类目核心关键词是指在亚马逊平台上&#xff0c;与特定产品类别相关且具有较高搜索量和竞争度的关键词。这些关键词在产品标题、描述、属性和搜索关键字等位置使用&#xff0c;有助于提高产品的曝光度和搜索排名&#xff0c;并吸引潜在买家的注意。选择适当的核心关键词可…

@Configuration 注解的 Full 模式和 Lite 模式!

Configuration 注解相信各位小伙伴经常会用到&#xff0c;但是大家知道吗&#xff0c;这个注解有两种不同的模式&#xff0c;一种叫做 Full 模式&#xff0c;另外一种则叫做 Lite 模式。 准确来说&#xff0c;Full 模式和 Lite 模式其实 Spring 容器在处理 Bean 时的两种不同行…

Nacos基础(2)——nacos的服务器和命名空间 springBoot整合nacos 多个nacos配置的情况

目录 引出nacos服务器和命名空间Nacos服务器命名空间 springBoot整合nacosspringcloud Alibaba 版本与springcloud对应关系引包配置maincontroller 报错以及解决【报错】错误&#xff1a;缺少服务名称报错&#xff1a;9848端口未开放 启动测试引入多个nacos配置多个配置的情况没…

基于java swing和mysql实现的学生选课成绩信息管理系统(源码+数据库+ER图文档+运行指导视频)

一、项目简介 本项目是一套基于java swing和mysql实现的学生选课成绩信息管理系统&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、项目文档、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。…

概念解析 | 无人机集群形状与轨迹建模: 集群舞蹈的艺术

注1:本文系“概念解析”系列之一,致力于简洁清晰地解释、辨析复杂而专业的概念。本次辨析的概念是:无人机集群形状和轨迹建模 无人机集群形状与轨迹建模: 集群舞蹈的艺术 无人机集群的形状和轨迹建模可能听起来像是一部科幻小说的标题,但它实际上是现实中的一个重要研究领…

国内精选五大现货黄金价格实时查询软件最新名单(综合榜单)

随着金融市场的不断发展和投资者的日益增多&#xff0c;现货黄金价格实时查询软件成为了人们关注的焦点。投资者需要一款功能强大、操作简便、数据准确的软件来帮助他们获取实时的黄金价格信息&#xff0c;以便做出更加明智的投资决策。 本文将介绍国内精选五大现货黄金价格实…

iOS - 订阅型内购指南

一、App Store Connect 帮助 二、测试 三、订阅状态 四、问题思考 1、订阅归属&#xff1a; 以往的消耗性内购, 通常会生成订单ID对应到苹果的内购ID及用户id&#xff0c;对于我们来说&#xff0c;内购仅仅只是个支付工具&#xff0c;而订阅型内购有一整套销售模型订阅内购…

zabbix语言无法选择中文--zabbix安装配置中文

You are not able to choose some of the languages, because locales for them are not installed on the web server. 1、安装wget yum -y install wget 2、下载中文中文字体并配置 wget https://github.com/echohn/zabbix-zh_CN/archive/master.zip yum -y install unzip un…

4年经验来面试20K的测试岗,一问三不知,我还真不如去招应届生...

公司前段缺人&#xff0c;也面了不少测试&#xff0c;结果竟然没有一个合适的。一开始瞄准的就是中级的水准&#xff0c;也没指望来大牛&#xff0c;提供的薪资在10-20k&#xff0c;面试的人很多&#xff0c;但平均水平很让人失望。看简历很多都是4年工作经验&#xff0c;但面试…

恒运资本:A股三大指数是什么?A股三大指数怎么看?

炒股并不是盲目跟风&#xff0c;投资者自身要了解股票基本常识。例如指数反映的是股票商场上股票的变动状况&#xff0c;能够给我们的投资决策带来一定的依据。那么&#xff0c;A股三大指数是什么&#xff1f;A股三大指数怎么看&#xff1f;恒运资本为我们准备了相关内容&#…

如何给图片加水印?

如何给图片加水印&#xff1f;在我们的日常生活中&#xff0c;许多热爱摄影的朋友都会选择给自己的照片添加水印。这是因为我们深知&#xff0c;一张出色的照片背后需要付出大量的努力和心血&#xff0c;而通过添加水印可以有效地保护自己照片的版权&#xff0c;这样即使将图片…

MySQL概述,架构原理

一.MySQL简介 MySQL是一个关系型数据库管理系统&#xff0c;由瑞典的MySQL AB公司开发&#xff0c;后被oracle公司收购&#xff0c;MySQL是当下最流行的关系型数据库管理系统之一&#xff0c;在WEB应用方面&#xff0c;MySQL是最好的RDBMS&#xff08;Relational Database Man…

得帆信息西区总经理——何龙:低代码初识

企业数字化建设、数字化转型是近年来企业经营管理必然面对的课题&#xff1b;相较于面对传统经营管理的驾车就熟&#xff0c;这无疑给企业管理者提出了新的课题和新的挑战。在当前新技术新生产力不断涌现、新市场特点不断变化的时代&#xff0c;企业在要练好内功、加强经营管理…

龙讯旷腾Q-Studio新增力场优化功能

Q-Studio新功能 Q-Studio&#xff08;在线建模功能&#xff09;依托Mcloud平台免费向用户开放使用&#xff0c;基于jsmol的建模功能无需安装任何软件/插件&#xff0c;通过web端即可在线完成格式转换和可视化建模工作&#xff0c;并可对模型进行个性化的二次编辑&#xff0c;快…

【80天学习完《深入理解计算机系统》】第九天 3.2 数据传送指令【mov】【栈和堆 就是内存】【leaq】【一元操作】【二元操作】

专注 效率 记忆 预习 笔记 复习 做题 欢迎观看我的博客&#xff0c;如有问题交流&#xff0c;欢迎评论区留言&#xff0c;一定尽快回复&#xff01;&#xff08;大家可以去看我的专栏&#xff0c;是所有文章的目录&#xff09;   文章字体风格&#xff1a; 红色文字表示&#…

开源利器推荐:美团动态线程池框架的接入分享及效果展示

前言 蛮早前有些过关于线程池的使用及参数的一些参考配置&#xff0c;有兴趣的可以翻看以前的博文&#xff0c;但终究无法解决线程池的动态监控和实时修改。 以前读过美团早期发布的动态线程池框架的思路相关文章&#xff0c;但想要独自实现不是一件容易的事。 去年&#xff0c…