Kafka 之顺序消息

news2024/12/29 11:07:16

前言:

在分布式消息系统中,消息的顺序性是一个重要的问题,也是一个常见的业务场景,那 Kafka 作为一个高性能的分布式消息中间件,又是如何实现顺序消息的呢?本篇我们将对 Kafka 的顺序消息展开讨论。

Kafka 系列文章传送门

Kafka 简介及核心概念讲解

Spring Boot 整合 Kafka 详解

Kafka @KafkaListener 注解的详解及使用

Kafka 客户端工具使用分享【offsetexplorer】

Kafka 之消息同步/异步发送

Kafka 之批量消息发送消费

Kafka 之消息广播消费

Kafka 之消息并发消费

顺序消息的使用场景

顺序消息的使用场景众多,这里我简单列举几个如下:

  • 即时消息中的单对单聊天和群聊,保证发送方消息发送顺序与接收方的顺序一致。
  • 电商中下单后,订单创建、支付、订单发货和物流更新的顺序性。
  • 手机充值过程中的扣款短信和重置成功的短信应该有顺序性。
  • 。。。。等等等场景。

Kafka 如何保证消息的顺序性

讨论 Kafka 消息的顺序性,需要分单分区和多分区来讨论,具体如下:

  • 单分区:单分区的消息顺序性相对简单,因为消息在单分区中是相对有序的,只需要保证消息发送顺序和消费顺序即可。
  • 多分区:多分区要保证消息有序,就需要额外的设计来保证消息全局有序了。

根据上面的简单分析,我们知道 Kafka 单分区的消息有序相对简单,接下来我们分析一下 Kafka 如何保证单分区消息有序。

Kafka 如何保证单分区消息有序

Kafka 保证单分区消息有序需要从两个方面来讲,一个是消息生产者,一个是消息消费者,具体如下:

消息生产者:

  • 使用相同的分区键(Partition Key):生产者发送消息时,指定相同的分区键,使得所有消息都发送到同一个分区。
  • 指定消息 key,如果没有指定分区,我们指定一个相同的消息 Key,Kafka 会根据 Key 进行 Hash 计算出一个分区号,如果消息的 Key 相同,那么也会计算一个相同的分区号,消息也会发送到同一个分区了。
  • 自定义分区器:如果想要实现更复杂的分区逻辑,可以实现自定义分区器,来达到消息最终到达同一个分区。

消息消费者:

生产这已经保证了消费的发送有序,因此消息消费者使用单线程消费即可。

Kafka 顺序消息实现案例

上面我们对 Kafka 顺序消息的实现做了基本分析,下面我们就使用代码来实现 Kafka 的顺序消息。

Kafka 顺序消息 Producer

在 Producer 中分别实现了两种顺序消息的方式,分别是指定分区和指定 Key,具体代码如下:

package com.order.service.kafka.producer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutionException;

/**
 * @ClassName: MyKafkaOrderlyProducer
 * @Author: Author
 * @Date: 2024/10/22 19:22
 * @Description: 顺序消息发送者
 */
@Slf4j
@Component
public class MyKafkaOrderlyProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;


    //指定分区
    public void sendOrderlyByPartitionMessage() {
        try {
            this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666创建").get();
            this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666支付").get();
            this.kafkaTemplate.send("my-topic", 1, null, "Partition--订单666发货").get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

    //指定 key
    public void sendOrderlyByKeyMessage() {
        try {
            this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();
            this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();
            this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}

在 Producer 代码中我们使用了 Kafka 的同步发送消息。

Kafka 顺序消息 Consumer

顺序消息的消费者代码十分简单,还是使用 @KafkaListener 完成消息消费,注意是单线程消费即可。

package com.order.service.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @ClassName: MyKafkaConsumer
 * @Author: zhangyong
 * @Date: 2024/10/22 19:22
 * @Description: MyKafkaOrderlyConsumer
 */
@Slf4j
@Component
public class MyKafkaOrderlyConsumer {

    @KafkaListener(id = "my-kafka-order-consumer",
            groupId = "my-kafka-consumer-groupId",
            topics = "my-topic",
            containerFactory = "myContainerFactory")
    public void listen(String message) {
        log.info("消息消费成功消息内容:{}", message);
    }

}

Kafka 顺序消息发送消费验证

验证指定分区情况下的顺序消息:

2024-10-28 20:55:18.495  INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Partition--订单666创建
2024-10-28 20:55:18.599  INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Partition--订单666支付
2024-10-28 20:55:18.704  INFO 24876 --- [-consumer-1-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Partition--订单666发货

消息是按照发送顺序来消费的,结果符合预期。

验证指定 Key 情况下的顺序消息:

2024-10-28 20:56:13.238  INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666创建
2024-10-28 20:56:13.341  INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666支付
2024-10-28 20:56:13.443  INFO 24876 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666发货

消息是按照发送顺序来消费的,结果符合预期。

Kafka 自定义分区器

自定义分区器就是按自己的规则来指定消息最终要发送的分区,可以根据自己的需求灵活实现,案例代码中先获取分区数量,然后使用的是 key 的 Hash 值进行 Hash 取模的方式获取分区,具体代码如下:

package com.order.service.kafka;

import com.order.service.exception.BusinessException;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

/**
 * @ClassName: CustomPartitioner
 * @Author: Author
 * @Date: 2024/10/28 20:57
 * @Description:
 */
public class CustomPartitioner implements Partitioner {


    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //获取 分区数量
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);

         if (key == null || keyBytes == null && !(key instanceof String)) {
            throw new BusinessException("key 不能为空且需要是字符串类型");
        }
        String keyStr = key.toString();
        int partition = keyStr.hashCode() % partitionInfos.size();
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

配置自定义分区器

自定义了分区器后还需要再 Kafka 配置中配置上我们自定义的分区器,关键配置如下:

//自定义分区器配置
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

完整的配置 KafkaProducerConfig 配置如下:

package com.order.service.config;

import com.order.service.kafka.CustomPartitioner;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * @author :author
 * @description:
 * @modified By:
 * @version: V1.0
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {


    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Value("${spring.kafka.producer.batch-size}")
    private String batchSize;

    @Value("${spring.kafka.producer.buffer-memory}")
    private String bufferMemory;

    @Value("${spring.kafka.producer.properties.linger.ms}")
    private String lingerMs;

    @Bean("myProducerKafkaProps")
    public Map<String, Object> getMyKafkaProps() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //批量发送消息的大小 默认 16KB
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,batchSize);
        //生产者可用于缓冲等待发送到服务器的消息占用的总内存字节数  默认 32M
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,bufferMemory);
        //批量发送的的最大时间间隔,单位是毫秒
        props.put(ProducerConfig.LINGER_MS_CONFIG,lingerMs);
        //自定义分区器配置
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> newProducerFactory() {
        return new DefaultKafkaProducerFactory<>(getMyKafkaProps());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(newProducerFactory());
    }



}

自定义分区 Consumer 代码案例

自定义分区 Consumer 代码没有什么特殊之处,指定一个 key 即可,key 一致就可以保证消息发送到同一个 Partition 中,保证消息的顺序,具体代码如下:

//自定义分区发送消息
public void sendOrderlyByCustomPartitionerMessage() {
	try {
		this.kafkaTemplate.send("my-topic", "666", "Key--订单666创建").get();
		this.kafkaTemplate.send("my-topic", "666", "Key--订单666支付").get();
		this.kafkaTemplate.send("my-topic", "666", "Key--订单666发货").get();
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	}
}

自定义分区顺序消息验证

触发消息发送后 debugger 如下:

在这里插入图片描述

控制台记录消费日志如下:

2024-10-30 17:24:52.716  INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666创建
2024-10-30 17:24:52.819  INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666支付
2024-10-30 17:24:52.921  INFO 1308 --- [-consumer-0-C-1] c.o.s.k.consumer.MyKafkaOrderlyConsumer  : 消息消费成功消息内容:Key--订单666发货

消息是按顺序消费的,结果符合预期。

总结:Kafka 只能在单个 Partition 中保持消息的顺序存储,要想保证消息的顺序性就必须让需要保持顺序的消息发送到同一个 Partition,对于消费端,消费消息的顺序性只需要保证使用单线程进行消费即可,一般来说比较少用到 Kafka 的顺序消息,这里分享一下还是希望可以帮助到有需要的朋友。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2234449.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

js WebAPI黑马笔记(万字速通)

此笔记来自于黑马程序员&#xff0c;pink老师yyds 复习&#xff1a; splice() 方法用于添加或删除数组中的元素。 注意&#xff1a; 这种方法会改变原始数组。 删除数组&#xff1a; splice(起始位置&#xff0c; 删除的个数) 比如&#xff1a;1 let arr [red, green, b…

C 学习(5)

哈哈哈哈哈&#xff0c;终于想起来了&#xff01;贴一下主要的参考&#xff1a; 基本语法 - 《阮一峰《C 语言教程》》 - 书栈网 BookStack 内容写的比较浅显有疏漏&#xff0c;如果看不明白&#xff0c;再结合一下百度。 注释 C 语言的注释有两种表示方法。 第一种方法是…

redis7学习笔记

文章目录 1. 简介1.1 功能介绍1.1.1 分布式缓存1.1.2 内存存储和持久化(RDBAOF)1.1.3 高可用架构搭配1.1.4 缓存穿透、击穿、雪崩1.1.5 分布式锁1.1.6 队列 1.2 数据类型StringListHashSetZSetGEOHyperLogLogBitmapBitfieldStream 2. 命令2.1 通用命令copydeldumpexistsexpire …

【设计模式系列】建造者模式(十)

目录 一、什么是建造者模式 二、建造者模式的角色 三、建造者模式的典型应用 四、建造者模式在StringBuilder中的应用 五、典型建造者模式的案例 一、什么是建造者模式 建造者模式&#xff08;Builder Pattern&#xff09;是一种创建型设计模式&#xff0c;用于构建复杂对…

用vite创建项目

一. vite vue2 1. 全局安装 create-vite npm install -g create-vite 2. 创建项目 进入你想要创建项目的文件夹下 打开 CMD 用 JavaScript create-vite my-vue2-project --template vue 若用 TypeScript 则 create-vite my-vue2-project --template vue-ts 这里的 …

JVM结构图

JVM&#xff08;Java虚拟机&#xff09;是Java编程语言的核心组件之一&#xff0c;负责将Java字节码翻译成机器码并执行。JVM由多个子系统组成&#xff0c;包括类加载子系统、运行时数据区、执行引擎、Java本地接口和本地方法库。 类加载子系统&#xff08;Class Loading Subsy…

WordPress伪静态设置

为什么要设置WordPress伪静态&#xff0c;搜索引擎优化&#xff08;SEO&#xff09;中&#xff0c;静态URL通常被认为更易于搜索引擎爬虫抓取和索引&#xff0c;有助于提高网站的搜索引擎排名。 WordPress伪静态设置方法主要依赖于服务器环境&#xff0c;以下是针对不同服务器…

【黑盒测试】等价类划分法及实例

本文主要介绍黑盒测试之等价类划分法&#xff0c;如什么是等价类划分法&#xff0c;以及如何划分&#xff0c;设计等价类表。以及关于三角形案例的等价类划分法。 文章目录 一、什么是等价类划分法 二、划分等价类和列出等价类表 三、确定等价类的原则 四、建立等价类表 …

宠物领养救助管理软件有哪些功能 佳易王宠物领养救助管理系统使用操作教程

一、概述 佳易王宠物领养救助管理系统V16.0&#xff0c;集宠物信息登记、查询&#xff0c;宠物领养登记、查询&#xff0c; 宠物领养预约管理、货品进出库库存管理于一体的综合管理系统软件。 概述&#xff1a; 佳易王宠物领养救助管理系统V16.0&#xff0c;集宠物信息登记…

RK3568开发板静态IP地址配置

1. 连接SSH MYD-LR3568 开发板设置了静态 eth0:1 192.168.0.10 和 eth1:1 192.168.1.10&#xff0c;在没有串口时调试开发板&#xff0c;可以用工具 SSH 登陆到开发板。 首先需要用一根网线直连电脑和开发板&#xff0c;或者通过路由器连接到开发板&#xff0c;将电脑 IP 手动设…

Flutter错误: uses-sdk:minSdkVersion 16 cannot be smaller than version 21 declared

前言 今天要做蓝牙通信的功能&#xff0c;我使用了flutter_reactive_ble这个库&#xff0c;但是在运行的时候发现一下错误 Launching lib/main.dart on AQM AL10 in debug mode... /Users/macbook/Desktop/test/flutter/my_app/android/app/src/debug/AndroidManifest.xml Err…

【含开题报告+文档+源码】基于Java的房屋租赁服务系统设计与实现

开题报告 随着城市化进程的加速和人口流动性的增加&#xff0c;租房需求不断增长。传统的租赁方式往往存在信息不对称、流程不规范等问题&#xff0c;使得租户和房东的租赁体验不佳。而而房屋租赁系统能够提供便捷、高效的租赁服务&#xff0c;满足租户和房东的需求。房屋租赁…

斯托克斯矢量,表示电磁波的(不是散射体)平均后,可分解为完全极化电磁波和噪声

可见完全极化分就表示只有一种&#xff0c;在T矩阵中是只有一种散射体&#xff0c;在电磁波协方差矩阵中是只有一种电磁波

微服务day03

导入黑马商城项目 创建Mysql服务 由于已有相关项目则要关闭DockerComponent中的已开启的项目 [rootserver02 ~]# docker compose down WARN[0000] /root/docker-compose.yml: version is obsolete [] Running 4/4✔ Container nginx Removed …

大腾智能荣获盐田区黄金珠宝产业“产业赋能数字化优选能力伙伴”荣誉

11月2日&#xff0c;盐田区黄金珠宝产业数智化转型促进中心&#xff08;简称“促进中心”&#xff09;揭牌仪式圆满举办。盐田区委书记李忠&#xff0c;市工业和信息化局、市市场监督管理局、华为技术有限公司等相关单位、企业负责人共同见证促进中心揭牌启动。 大腾智能也出席…

10天进阶webpack---(2)webpack模块兼容性处理

回顾CMJ和ESM的区别 CMJ的本质可以使用一个函数概括 // require函数的伪代码 function require(path){if(该模块有缓存吗){return 缓存结果;}function _run(exports, require, module, __filename, __dirname){// 模块代码会放到这里}var module {exports: {}}_run.call(mod…

034_Structural_Transient_In_Matlab结构动力学问题求解

结构动态问题 问题描述 我们试着给前面已经做过的问题上加一点有趣的东西。 结构静力学求解 当时求解这个问题&#xff0c;在最外面的竖直切面加载了一个静态的固定的力。下面我们试试看在上方的表面增加一个脉冲压力载荷。 采用统一的有限元框架&#xff0c;定义问题&…

简单的 docker 部署ELK

简单的 docker 部署ELK 这是我的运维同事部署ELK的文档&#xff0c;我这里记录转载一下 服务规划 架构: Filebeat->kafka->logstash->ES kafka集群部署参照: kafka集群部署 部署服务程序路径/数据目录端口配置文件elasticsearch/data/elasticsearch9200/data/elas…

TortoiseSVN小乌龟下载安装(Windows11)

目录 TortoiseSVN 1.14.7工具下载安装 TortoiseSVN 1.14.7 工具 系统&#xff1a;Windows 11 下载 官网&#xff1a;https://tortoisesvn.subversion.org.cn/downloads.html如图选 TortoiseSVN 1.14.7 - 64 位 下载完成 安装 打开 next&#xff0c;next Browse&#xf…

Python实例:爱心代码

前言 在编程的奇妙世界里,代码不仅仅是冰冷的指令集合,它还可以成为表达情感、传递温暖的独特方式。今天,我们将一同探索用 Python 语言绘制爱心的神奇之旅。 爱心,这个象征着爱与温暖的符号,一直以来都在人类的情感世界中占据着特殊的地位。而通过 Python 的强大功能,…