Kafka-多线程消费及分区设置

news2025/1/14 4:58:01

目录

  • 一、Kafka是什么?
    • 消息系统:Publish/subscribe(发布/订阅者)模式
    • 相关术语
  • 二、初步使用
    • 1.yml文件配置
    • 2.生产者类
    • 3.消费者类
    • 4.发送消息
  • 三、减少分区数量
    • 1.停止业务服务进程
    • 2.停止kafka服务进程
    • 3.重新启动kafka服务
    • 4.重新启动业务服务
  • 参考文章

一、Kafka是什么?

Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统。可满足每秒百万级的消息生产和消费;有一套完善的消息存储机制,确保数据高效安全且持久化;Kafka作为一个集群运行在一个或多个服务器上,可以跨多个机房,当某台故障时,生产者和消费者转而使用其他的Kafka。

消息系统:Publish/subscribe(发布/订阅者)模式

1.消息发布者发布消息到主题中,有多个订阅者消费该消息。
2.当发布者发布消息时,不管是否有订阅者都不会报错。
3.一定要先有消息发布者,后有消息订阅者。

相关术语

1.Broker:Kafka服务器,负责创建topic、消息存储和转发。
2.Topic:消息类别(主题),用于区分消息。
3.Partition:分区,真正的存储数据单元。每个Topic包含一个或多个分区,用于保存消息和维护偏移量。(一般为kafka节点数CPU的总核心数量)
4.offset:分区消息此时被消费的位置。分区中消息的唯一id。
5.Producer:消息生产者。
6.Consumer:消息消费者。
7.Consumer Group:消费者组。由消费不同的分区的多个消费者实例组成,共用同一个Group-id。
8.Message:消息,由offset(分区上的消息id)、MessageSize(消息内容data大小)、data(消息具体内容)组成。

二、初步使用

1.yml文件配置

spring:
	kafka:
	    bootstrap-servers: http://127.0.0.1:9002
	    properties:
	      security:
	        protocol: SASL_PLAINTEXT
	      sasl:
	        mechanism: PLAIN
	        jaas:
	          config: org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="123456";
	    producer:
	      # 发生错误后,消息重发的次数。
	      retries: 0
	      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
	      batch-size: 16384
	      # 设置生产者内存缓冲区的大小。
	      buffer-memory: 33554432
	      # 键的序列化方式
	      key-serializer: org.apache.kafka.common.serialization.StringSerializer
	      # 值的序列化方式
	      value-serializer: org.apache.kafka.common.serialization.StringSerializer
	      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
	      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
	      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
	      acks: 1
	    consumer:
	      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
	      auto-commit-interval: 1S
	      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
	      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
	      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
	      auto-offset-reset: earliest
	      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
	      enable-auto-commit: false
	      # 键的反序列化方式
	      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
	      # 值的反序列化方式
	      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
	      # 消费者超时时间 6properties:
	        max:
	          poll:
	            interval:
	              ms: 6000
	    listener:
	      # 在侦听器容器中运行的线程数。消费者组中的实例数量。 【本次重点】
	      concurrency: 5
	      #listner负责ack,每调用一次,就立即commit
	      ack-mode: manual_immediate
	      missing-topics-fatal: false

2.生产者类

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
@Slf4j
public class KafkaProducer {

    // 消费者组
    public static final String TOPIC_GROUP2 = "topic.group2";


    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic,Object obj) {
        String obj2String = JSONObject.toJSONString(obj);
        log.info("准备发送消息为:{}", obj2String);
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, obj);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                //发送失败的处理
                log.info(topic + " - 生产者 发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                log.info(topic + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }
}

3.消费者类

使用注解的方式来创建主题和分区。

package com.lezhi.szxy.oa.core.kafka;

import com.alibaba.fastjson.JSON;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ServiceException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.poi.ss.formula.functions.T;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.RetryingBatchErrorHandler;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.util.backoff.FixedBackOff;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class KafkaConsumer {

    @Resource
    private addService addService;

    @Resource
    private RedisLockUtil redisLockUtil;
    @Resource
    RedissonClient redissonClient;

    @Resource
    RedisTemplate<String,String> redisTemplate;

    private static final String ADD_LOCK_PREFIX = "ADD_LOCK_PREFIX";
  
    ObjectMapper objectMapper = new ObjectMapper();


    /**
     * 初始化主题分区
     * @return
     */
    @Bean
    public NewTopic batchTopic() {
        log.info("初始化主题分区batchTopic : add_topic,分区:5,副本数:1 >>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
        return new NewTopic("add_topic", 5, (short) 1);
    }

    /**
     * 添加消息
     * @param ack
     */
    @KafkaListener(topics = "add_topic"C,groupId = KafkaProducer.TOPIC_GROUP2)
    public void handleAddMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        log.info("add_topic-队列消费端 topic:{}, 收到消息>>>>>>>>>>>>>>>>>", topic);
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            try {
                ParamImport param =  objectMapper.readValue(String.valueOf(msg) , ParamImport .class);
                String fullKey = redisLockUtil.getFullKey(ADD_LOCK_PREFIX , String.valueOf(msg));
                if(redisLockUtil.getLock(fullKey , 10000)){
					// 业务代码...
                    
                    log.info("add_topic 消费了: Topic:" + topic + ",Message:" + String.valueOf(msg));
                }else {
                    log.info("add_topic 已经被消费: Topic:" + topic + ",Message:" + String.valueOf(msg));
                }
                ack.acknowledge();
            } catch (Exception e) {
                e.printStackTrace();
                log.error("解析 <"+OaConstant.SALARY_SEND_MESSAGE_KAFKA_TOPIC+"> 数据异常");
            }
        }
    }
}

配置消费端主题分区启动后,查看kafka,add_topic主题生成五个分区实例
kafka配置
注意:一个消费线程,可以对应若干分区。但是为了保证数据的一致性,同一个分区同时只能备一个消费者实例消费,所以超过分区数量的消费者实例个数是多余的,会被闲置。

将消费者实例(消费线程)比为一个人,分区消息相当于一个办公位。办公位数>人数时,哪个办公位有消息待消费,人就到哪一个工位处理消息。当办公位数<人数时,后面的人数需要排队等待前面的人离开,才可以进入办公位消费。
当人再多时,只有一个办公位,人也得排队办公,属于同步消费;当办公位有多个时,才能实现多人同时操作。

单机kafka分区最好不超过5。默认使用轮询策略。

4.发送消息

public void addTopicMsg(ParamImport param) throws ServiceException {
        String json;
        try {
            json = objectMapper.writeValueAsString(param);
        } catch (JsonProcessingException e) {
            log.error("addTopicMsg-发送消息,kafka消息转换失败:{}", e);
            throw new ServiceException("发送失败");
        }
        log.info("addTopicMsg-发送消息,发送kafka请求>>>>>>>>>>>>>>>>>>>>>>>");
        kafkaTemplate.send("add_topic", json);
    }

三、减少分区数量

上文中,我们使用了new NewTopic()的方式创建分区,分区数量只能动态增加不能减少。所以我们需要根据以下步骤来重新生成分区,达成减少分区的目的。

1.停止业务服务进程

停止业务服务进程,使得不会重复生成分区。修改代码内配置的new NewTopic()配置分区数。

2.停止kafka服务进程

停止kafka服务进程,清空分区、主题等数据。

3.重新启动kafka服务

4.重新启动业务服务

此时就会根据修改后的分区设置重新生成分区。

参考文章

【SpringBoot】在Springboot中怎么设置Kafka自动创建Topic
SpringBoot+Kafka之如何优雅的创建topic
想弄明白Kafka到底是什么吗?看完这篇你就知道了!(概念、数据存储、生产者、消费者)
图解Kafka,看本篇就足够啦!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1395288.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PPT大神带你飞!!!

1、OneKeyTools 官网&#xff1a;http://oktools.xyz/ OneKeyTools是一款免费开源的PowerPoint第三方平面设计辅助插件&#xff0c;功能涵盖了形状、调色、三维、图片处理、辅助功能等等方面。 插件功能&#xff1a; 插件从面世逐步受到广大PPT设计师和爱好者的追捧&#x…

感谢大家的支持,继续耕耘

有什么大家感兴趣的领域&#xff0c;欢迎留言

SSH数据流量监控

简介 检查网络连接的数据传输情况有以下一些实际意义&#xff1a; 安全监控&#xff1a;检查数据传输情况可以帮助你识别异常活动或潜在的安全威胁。如果发现大量不寻常的数据传输活动&#xff0c;可能表示有未经授权的访问或恶意行为。通过监控数据传输&#xff0c;可以及时发…

软件测评中心▏正式验收测试和非正式验收的优缺点简析

软件正式验收测试需要非常严格的管理&#xff0c;是对系统测试的延续&#xff0c;这种验收测试的测试用例应是系统测试的子集。非正式验收测试不像正式验收测试那么严格&#xff0c;不需要执行指定的测试用例&#xff0c;可以由测试人员自主决定&#xff0c;但制定测试用例时必…

node.js(express.js)+mysql实现登录功能

文章目录 前言实现步骤 实现步骤一、检测登录表单的数据是否合法&#xff08;3&#xff09;新建schema/user.js&#xff08;4&#xff09;在routes/use.js中引入schema/user.js中的方法reg_login_schema&#xff0c;代码如下&#xff1a; 二、根据用户名查询用户的数据三、判断…

浅析Redis①:命令处理核心源码分析(上)

写在前面 Redis作为我们日常工作中最常使用的缓存数据库&#xff0c;其重要性不言而喻&#xff0c;作为普调开发者&#xff0c;我们在日常开发中使用Redis&#xff0c;主要聚焦于Redis的基层数据结构的命令使用&#xff0c;很少会有人对Redis的内部实现机制进行了解&#xff0c…

C++:类与结构体的对比

2024年1月18日 内容来自The Cherno:C系列 -------------------------------------------------------------------------------------------------------------------------------- C中关于class与struct&#xff0c;几乎没有区别&#xff0c;只有一个关于“可见度”的区别…

Go 爬虫之 colly 从入门到不放弃指南

文章目录 概要介绍如何学习官方文档如何安装快速开始如何配置调试分布式代理层面执行层面存储层面存储多收集器配置优化持久化存储启用异步加快任务执行禁止或限制 KeepAlive 连接扩展总结如果想用 GO 实现爬虫能力,该如何做呢?抽时间研究了 Go 的一款爬虫框架 colly。 概要…

【前后端的那些事】15min快速实现图片上传,预览功能(ElementPlus+Springboot)

文章目录 Element Plus SpringBoot实现图片上传&#xff0c;预览&#xff0c;删除效果展示 1. 后端代码1.1 controller1.2 service 2. 前端代码2.1 路由创建2.2 api接口2.2 文件创建 3. 前端上传组件封装 前言&#xff1a;最近写项目&#xff0c;发现了一些很有意思的功能&…

SD-WAN组网设计原则:灵活、安全、高效

在实现按需、灵活和安全的SD-WAN组网方案中&#xff0c;我们必须遵循一系列关键的设计原则&#xff0c;以确保网络的可靠性和效率。通过以下几点设计原则&#xff0c;SD-WAN能够满足企业对灵活性、安全性和高效性的迫切需求。 灵活的Overlay网络互联 SD-WAN通过IP地址在站点之间…

appium之联动pycharm

前置条件&#xff1a; 1.java环境安装好了 2.android-sdk安装好&#xff08;uiautomatorviewer 也可以把这个启动起来&#xff09; 3.appium安装好 4.adb devices查看下设备是否连接 pycharm入门代码--固定写法 from appium import webdriver# 定义字典变量 desired_caps …

SpringMVC下半篇之异常处理器及日期转换器

3.异常处理器 如果不加以异常处理&#xff0c;错误信息肯定会抛在浏览器页面上&#xff0c;这样很不友好&#xff0c;所以必须进行异常处理。 3.1.异常处理思路 系统的dao、service、controller出现都通过throws Exception向上抛出&#xff0c;最后由springmvc前端控制器交由…

线性代数基础【5】特征值和特征向量

第五章 特征值和特征向量 第一节、特征值和特征向量的基本概念 一、特征值和特征向量的理论背景 在一个多项式中,未知数的个数为任意多个,且每一项次数都是2的多项式称为二次型,二次型分为两种类型:即非标准二次型及标准二次型 注意: ①二次型X^T AX为非标准二次型的充分必…

在线扒站网PHP源码-在线扒站工具网站源码

源码介绍 这是一款在线的网站模板下载程序&#xff0c;也就是我们常说的扒站工具&#xff0c;利用它我们可以很轻松的将别人的网站模板样式下载下来&#xff0c;这样就可以大大提高我们编写前端的速度了&#xff01;注&#xff1a;扒取的任何站点不得用于商业、违法用途&#…

02 MyBatisPlus核心功能之基于Mapper接口CRUD+基于Service接口实现CRUD

项目结构&#xff1a; 1.1 Insert方法 // 插入一条记录 // T 就是要插入的实体对象 // 默认主键生成策略为雪花算法&#xff08;后面讲解&#xff09; //返回值是影响条数 int insert(T entity);1.2 Delete方法 // 根据 entity 条件&#xff0c;删除记录 int delete(Param(…

【PAT甲级】1178 File Path(25分)[文件树,模拟,unordered_map]

问题思路&#xff1a; 在不断输入的过程中&#xff0c;可以通过层深和一个二维的vector数组来建立一棵树。即每输入一个节点&#xff0c;应当作为该节点上一层的最后一个节点的子孩子。用一个哈希value来指定节点的id&#xff0c;通过一个last记录每个节点id&#xff08;此时作…

2023年移远车载全面开花,智能座舱加速进击

作为汽车智能化的关键组件&#xff0c;车载模组正发挥着越来越重要的作用。 移远通信进入车载模组领域近十年&#xff0c;已形成了完善的车载产品队列&#xff0c;不但在5G/4G车载通信、智能座舱、C-V2X车路协同等领域打造了一枝独秀的产品线&#xff0c;也推出了车规级Wi-Fi/蓝…

Verilog基础:强度建模(一)

相关阅读 Verilog基础https://blog.csdn.net/weixin_45791458/category_12263729.html?spm1001.2014.3001.5482 一、强度建模基础 Verilog HDL提供了针对线网信号0、1、x、z的精准强度建模方式&#xff0c;这样可以允许将两个线网信号进行线与操作从而更加精确地描述出硬件行…

【论文阅读】Deep Graph Contrastive Representation Learning

目录 0、基本信息1、研究动机2、创新点3、方法论3.1、整体框架及算法流程3.2、Corruption函数的具体实现3.2.1、删除边&#xff08;RE&#xff09;3.2.2、特征掩盖&#xff08;MF&#xff09; 3.3、[编码器](https://blog.csdn.net/qq_44426403/article/details/135443921)的设…