kafka入门到实战三(单线程实现顺序消费,含demo)

news2025/1/13 13:09:32

这里需要前面两章的基础,如果没有环境或者看不懂在说什么,就翻一翻前两章。

kafka顺序消费(单线程)

顺序消费

顺序消费:是指消息的产生顺序和消费顺序相同。不管你用的是什么q或者kafka还是sofa,顺序依赖都是一样的意思。

举个例子:订单A的消息为A1,A2,A3,发送顺序也如此,订单B的消息为B1,B2,B3,A订单消息先发送,B订单消息后发送。

  • A1,A2,A3,B1,B2,B3是全局顺序消息,严重降低了系统的并发度。完全的FIFO,导致无法使用多线程,速度极慢。
  • A1,B1,A2,A3,B2,B3是局部顺序消息,可以被接受。假设A1,A2,A3是创造,支付,完成订单。不论两者如何穿插,只要A1,A2,A3的顺序没有变,即可。
  • A2,B1,A1,B2,A3,B3不可接受,因为A2出现在了A1的前面。

要保证顺序消费,无非就是发送到topic的过程,发到同一个Partitioning(同一个分区视为一个队列,顺序一定是正确的)。消费的时候,按顺序获取即可,单线程直接取,没什么说的。多线程则需要

整体结构

在这里插入图片描述

  • yml文件:
spring:
  application:
    name: kafka-sort-consume
  kafka:
    bootstrap-servers: 192.168.56.101:9092,192.168.56.101:9093
    consumer:
      group-id: sort-consume
server:
  port: 8088
kafka:
  server: 192.168.56.101:9092
  order:
    topic: sort-consume
    concurrent: 3
  • log配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- 文件输出格式 -->
    <property name="PATTERN" value="- %d{yyyy-MM-dd HH:mm:ss.SSS}, %5p, [%thread], %logger{39} - %m%n" />

    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <encoder charset="UTF-8">
            <pattern>${PATTERN}</pattern>
        </encoder>
    </appender>

    <root level="info">
        <appender-ref ref="CONSOLE" />
    </root>

</configuration>
  • 生产者
@RestController
@RequestMapping
@Slf4j
public class OrderController {
    @Autowired
    KafkaTemplate kafkaTemplate;
    @GetMapping
    public void send(){
        for (long i = 0; i <100 ; i++) {
            CreateOrderDTO createOrderDTO =new CreateOrderDTO();
            PayOrderDTO payOrderDTO = new PayOrderDTO();
            FinishOrderDTO finishOrderDTO = new FinishOrderDTO();
            createOrderDTO.setOrderName("创建订单号:"+i);payOrderDTO.setOrderName("支付订单号:"+i);finishOrderDTO.setOrderName("完成订单号:"+i);
            createOrderDTO.setId(i);payOrderDTO.setId(i);finishOrderDTO.setId(i);
            kafkaTemplate.send("sort-consume",GsonUtil.gsonToString(createOrderDTO));
            kafkaTemplate.send("sort-consume",GsonUtil.gsonToString(payOrderDTO));
            kafkaTemplate.send("sort-consume",GsonUtil.gsonToString(finishOrderDTO));

        }
    }
}

  • 消费者
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"sort-consume"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        System.out.println(record);
    }
}
  • DTO:这里三个类放一起了,要用的话,回头自行拆分下。
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CreateOrderDTO {
    private Long id;
    private Long status;
    private String orderName;

}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class PayOrderDTO {
    private Long id;
    private Long status;
    private String orderName;
}

@Data
@AllArgsConstructor
@NoArgsConstructor
public class FinishOrderDTO {
    private Long id;
    private Long status;
    private String orderName;
}
  • Gson工具包
public class GsonUtil {

    private static Gson gson = null;

    static {
        if (Objects.isNull(gson)) {
            gson = new GsonBuilder()
                    .registerTypeAdapter(LocalDateTime.class, (JsonDeserializer<LocalDateTime>) (json, type, jsonDeserializationContext) -> {
                        String datetime = json.getAsJsonPrimitive().getAsString();
                        return LocalDateTime.parse(datetime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                    })
                    .registerTypeAdapter(LocalDate.class, (JsonDeserializer<LocalDate>) (json, type, jsonDeserializationContext) -> {
                        String datetime = json.getAsJsonPrimitive().getAsString();
                        return LocalDate.parse(datetime, DateTimeFormatter.ofPattern("yyyy-MM-dd"));
                    })
                    .registerTypeAdapter(Date.class, (JsonDeserializer<Date>) (json, type, jsonDeserializationContext) -> {
                        String datetime = json.getAsJsonPrimitive().getAsString();
                        LocalDateTime localDateTime = LocalDateTime.parse(datetime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
                        return Date.from(localDateTime.atZone(ZoneId.systemDefault()).toInstant());
                    })
                    .registerTypeAdapter(LocalDateTime.class, (JsonSerializer<LocalDateTime>) (src, typeOfSrc, context) -> new JsonPrimitive(src.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))))
                    .registerTypeAdapter(LocalDate.class, (JsonSerializer<LocalDate>) (src, typeOfSrc, context) -> new JsonPrimitive(src.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))))
                    .registerTypeAdapter(Date.class, (JsonSerializer<Date>) (src, typeOfSrc, context) -> {
                        LocalDateTime localDateTime = LocalDateTime.ofInstant(src.toInstant(), ZoneId.systemDefault());
                        return new JsonPrimitive(localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
                    })
                    .create();
        }
    }

    public GsonUtil() {
    }

    /**
     * 将object对象转成json字符串
     *
     * @param object
     * @return
     */
    public static String gsonToString(Object object) {
        String gsonString = null;
        if (gson != null) {
            gsonString = gson.toJson(object);
        }
        return gsonString;
    }

    /**
     * 将gsonString转成泛型bean
     *
     * @param gsonString
     * @param cls
     * @return
     */
    public static <T> T gsonToBean(String gsonString, Class<T> cls) {
        T t = null;
        if (gson != null) {
            t = gson.fromJson(gsonString, cls);
        }
        return t;
    }

    /**
     * 转成list
     * 泛型在编译期类型被擦除导致报错
     * @param gsonString
     * @param cls
     * @return
     */
    public static <T> List<T> gsonToList(String gsonString, Class<T> cls) {
        List<T> list = null;
        if (gson != null) {
            list = gson.fromJson(gsonString, TypeToken.getParameterized(List.class,cls).getType());
        }
        return list;
    }

    /**
     * 转成list中有map的
     *
     * @param gsonString
     * @return
     */
    public static <T> List<Map<String, T>> gsonToListMaps(String gsonString) {
        List<Map<String, T>> list = null;
        if (gson != null) {
            list = gson.fromJson(gsonString,
                    new TypeToken<List<Map<String, T>>>() {
                    }.getType());
        }
        return list;
    }


    /**
     * 转成map的
     *
     * @param gsonString
     * @return
     */
    public static <T> Map<String, T> gsonToMaps(String gsonString) {
        Map<String, T> map = null;
        if (gson != null) {
            map = gson.fromJson(gsonString, new TypeToken<Map<String, T>>() {
            }.getType());
        }
        return map;
    }

    /**
     * 把一个bean(或者其他的字符串什么的)转成json
     * @param object
     * @return
     */
    public static String beanToJson(Object object){
        return gson.toJson(object);
    }
}
  • 启动类
@SpringBootApplication
public class KafkaSortConsumeDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaSortConsumeDemoApplication.class, args);
    }
}
  • 加下来,启动项目,然后访问http://localhost:8089/,访问结果如下:
ConsumerRecord(topic = sort-consume, partition = 2, leaderEpoch = 4, offset = 1404, CreateTime = 1677746104033, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":8,"orderName":"支付订单号:8"})
ConsumerRecord(topic = sort-consume, partition = 2, leaderEpoch = 4, offset = 1405, CreateTime = 1677746104033, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":8,"orderName":"完成订单号:8"})
ConsumerRecord(topic = sort-consume, partition = 2, leaderEpoch = 4, offset = 1406, CreateTime = 1677746104033, serialized key size = -1, serialized value size = 44, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":10,"orderName":"支付订单号:10"})
ConsumerRecord(topic = sort-consume, partition = 2, leaderEpoch = 4, offset = 1407, CreateTime = 1677746104033, serialized key size = -1, serialized value size = 44, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":10,"orderName":"完成订单号:10"})
ConsumerRecord(topic = sort-consume, partition = 2, leaderEpoch = 4, offset = 1408, CreateTime = 1677746104033, serialized key size = -1, serialized value size = 44, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":11,"orderName":"创建订单号:11"})
ConsumerRecord(topic = sort-consume, partition = 2, leaderEpoch = 4, offset = 1409, CreateTime = 1677746104033, serialized key size = -1, serialized value size = 44, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":11,"orderName":"支付订单号:11"})
ConsumerRecord(topic = sort-consume, partition = 0, leaderEpoch = 4, offset = 1182, CreateTime = 1677746104038, serialized key size = -1, serialized value size = 44, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":35,"orderName":"支付订单号:35"})
ConsumerRecord(topic = sort-consume, partition = 0, leaderEpoch = 4, offset = 1181, CreateTime = 1677746104033, serialized key size = -1, serialized value size = 44, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":10,"orderName":"创建订单号:10"})
ConsumerRecord(topic = sort-consume, partition = 0, leaderEpoch = 4, offset = 1183, CreateTime = 1677746104039, serialized key size = -1, serialized value size = 44, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":35,"orderName":"完成订单号:35"})
ConsumerRecord(topic = sort-consume, partition = 0, leaderEpoch = 4, offset = 1184, CreateTime = 1677746104039, serialized key size = -1, serialized value size = 44, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":36,"orderName":"创建订单号:36"})
  • 结果分析:先看10号,他先支付了订单,然后完成订单,最后支付订单,明显不符合我们所说的部分顺序。
  • 原因:因为kafka默认随机分配分区,不同分区下的消费顺序并不能得到保障,所以我们需要为发送的消息指定好分区。
@RestController
@RequestMapping
@Slf4j
public class OrderController {
    @Autowired
    KafkaTemplate kafkaTemplate;
    @GetMapping
    public void send(){
        for (long i = 0; i <100 ; i++) {
            CreateOrderDTO createOrderDTO =new CreateOrderDTO();
            PayOrderDTO payOrderDTO = new PayOrderDTO();
            FinishOrderDTO finishOrderDTO = new FinishOrderDTO();
            createOrderDTO.setOrderName("创建订单号:"+i);payOrderDTO.setOrderName("支付订单号:"+i);finishOrderDTO.setOrderName("完成订单号:"+i);
            createOrderDTO.setId(i);payOrderDTO.setId(i);finishOrderDTO.setId(i);
            kafkaTemplate.send("sort-consume",new Integer((int) i%3),null ,GsonUtil.gsonToString(createOrderDTO));
            kafkaTemplate.send("sort-consume",new Integer((int) i%3),null,GsonUtil.gsonToString(payOrderDTO));
            kafkaTemplate.send("sort-consume",new Integer((int) i%3),null,GsonUtil.gsonToString(finishOrderDTO));

        }
    }
}
  • 再次执行,查看结果:发现结果都正确
ConsumerRecord(topic = sort-consume, partition = 1, leaderEpoch = 4, offset = 1344, CreateTime = 1677746610654, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1,"orderName":"创建订单号:1"})
ConsumerRecord(topic = sort-consume, partition = 1, leaderEpoch = 4, offset = 1345, CreateTime = 1677746610654, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1,"orderName":"支付订单号:1"})
ConsumerRecord(topic = sort-consume, partition = 1, leaderEpoch = 4, offset = 1346, CreateTime = 1677746610654, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":1,"orderName":"完成订单号:1"})
ConsumerRecord(topic = sort-consume, partition = 1, leaderEpoch = 4, offset = 1347, CreateTime = 1677746610656, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":4,"orderName":"创建订单号:4"})
ConsumerRecord(topic = sort-consume, partition = 1, leaderEpoch = 4, offset = 1348, CreateTime = 1677746610656, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":4,"orderName":"支付订单号:4"})
ConsumerRecord(topic = sort-consume, partition = 1, leaderEpoch = 4, offset = 1349, CreateTime = 1677746610656, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":4,"orderName":"完成订单号:4"})
ConsumerRecord(topic = sort-consume, partition = 1, leaderEpoch = 4, offset = 1350, CreateTime = 1677746610657, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":7,"orderName":"创建订单号:7"})
ConsumerRecord(topic = sort-consume, partition = 1, leaderEpoch = 4, offset = 1351, CreateTime = 1677746610657, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":7,"orderName":"支付订单号:7"})
ConsumerRecord(topic = sort-consume, partition = 1, leaderEpoch = 4, offset = 1352, CreateTime = 1677746610657, serialized key size = -1, serialized value size = 42, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":7,"orderName":"完成订单号:7"})
  • 至此,单线程保证顺序消费的方法,就掌握了。单线程可以放在不同的分区(分区就是队列),然后先后拿取即可。但,消费者有可能是并发的。同一个分区,A线程拿了支付订单,B拿了完成订单。但B线程执行的比较快,这就会出现先完成了订单然后才支付订单。这就很尴尬了,怎么解决呢?下篇章再来一起探讨。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/384535.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

启动框架 Anchors接入和分析

参考:https://juejin.cn/post/6844904128443858958https://blog.csdn.net/gqg_guan/article/details/127760207从哪下手整个冷启动过程中&#xff0c;系统方法我们无法进行优化&#xff0c;主要需要优化的是系统暴露出来的一些生命周期方法&#xff0c;从Application的attachBa…

OSCP学习踩过的坑

OSCP终于拿到证&#xff0c;感觉参加考试备考的日子才过去没有多久&#xff0c;想起了那几个月被“虐待”的日子&#xff0c;我想总结下在课程和考试中的犯的错误&#xff01; 计划 我制定了一个学习计划&#xff0c;计划是学习、练习&#xff0c;然后再学习、练习一些&#…

SpringCloud:服务拆分及远程调用

目录 SpringCloud&#xff1a;服务拆分及远程调用 1、服务拆分 2、远程调用 SpringCloud&#xff1a;服务拆分及远程调用 SpringCloud是目前国内使用最广泛的微服务框架。 官网地址: Spring Cloud SpringCloud集成了各种微服务功能组件&#xff0c;并基于SpringBoot实现了…

【10】SCI易中期刊推荐——工程技术-计算机:人工智能(中科院2区)

🚀🚀🚀NEW!!!SCI易中期刊推荐栏目来啦 ~ 📚🍀 SCI即《科学引文索引》(Science Citation Index, SCI),是1961年由美国科学信息研究所(Institute for Scientific Information, ISI)创办的文献检索工具,创始人是美国著名情报专家尤金加菲尔德(Eugene Garfield…

JAVA开发(Eureka基本原理)

Eureka基本原理。 通过上图我们可以看出&#xff0c;服务提供者在启动的时候需要向注册中心注册自己的信息&#xff0c;而注册中心把向自己注册的服务提供者都保存下来&#xff0c;以便服务消费者获取用来发起请求&#xff0c;而服务消费者需要从注册中心获取服务提供者列表&am…

网络层:IP协议

目录 基本概念 IP报头 IP报文分片 为什么要分片&#xff1f; 如何分片&#xff1f; 分片的报文如何组装&#xff1f; 分片策略如何&#xff1f; 网段划分 IP地址被分成了五类IP&#xff1a; CIDR 特殊的IP地址&#xff1a; 私有IP和公网IP 路由 如何转发数据包&a…

「ABAP」一文带你入门OPEN SQL中的SELECT查询(附超详细案例解析)

&#x1f482;作者简介&#xff1a; THUNDER王&#xff0c;一名热爱财税和SAP ABAP编程以及热爱分享的博主。目前于江西师范大学会计学专业大二本科在读&#xff0c;同时任汉硕云&#xff08;广东&#xff09;科技有限公司ABAP开发顾问。在学习工作中&#xff0c;我通常使用偏后…

阿里云服务器价格,阿里云轻量应用服务器最新价格表

阿里云服务器从地域上来区分可分为ECS云服务器和轻量应用服务器&#xff0c;从活动内容来区分又可分为秒杀云服务器和新用户特惠云服务器&#xff0c;下面是截止目前&#xff0c;阿里云服务器最新的秒杀及新用户特惠购买价格&#xff0c;以表格形式形式展示出来以供参考。 阿里…

​ICLR 2023 | 图数据分布外检测:从能量模型出发

©PaperWeekly 原创 作者 | 吴齐天单位 | 上海交通大学博士生研究方向 | 机器学习与图深度学习继续探索 Graph OOD 的相关问题&#xff0c;与以往工作不同的是&#xff0c;这篇工作避开了复杂的数学推导和琐碎的数据生成过程&#xff0c;直接从简单有效的判别模型入手研究…

【Spring】掌握 Spring Validation 数据校验

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ Spring Validation 数据校验一、什么是 Spring…

RAN-in-the-Cloud:为 5G RAN 提供云经济性

RAN-in-the-Cloud&#xff1a;为 5G RAN 提供云经济性 5G 部署在全球范围内一直在加速。 许多电信运营商已经推出了5G服务并正在快速扩张。 除了电信运营商之外&#xff0c;企业也对使用 5G 建立私有网络产生了浓厚的兴趣&#xff0c;这些私有网络利用了更高的带宽、更低的延迟…

【编程基础之Python】8、Python复合数据类型

【编程基础之Python】8、Python复合数据类型Python复合数据类型列表&#xff08;List&#xff09;创建列表访问元素内置方法列表操作元组&#xff08;Tuple&#xff09;创建元组访问元素集合&#xff08;Set&#xff09;创建集合基本操作其他操作字典&#xff08;Dictionary&am…

4.ffmpeg命令转码规则、过滤器介绍、手动流map选项

在上章我们学习了ffmpeg命令行帮助以及选项查找 本章我们来深入学习ffmpeg命令转码规则、过滤器介绍、手动流map选项 参考链接: 1.ffmpeg命令行转码流程 ffmpeg命令行转码流程如下图所示: 对应中文则是: 步骤如下所示: ffmpeg调用libavformat库(包含解复用器)来读取输入文件…

Linux -- 查看进程 PS 命令 详解

我们上篇介绍了&#xff0c; Linux 中的进程等概念&#xff0c;那么&#xff0c;在Linux 中如何查看进程呢 &#xff1f;&#xff1f;我们常用到的有两个命令&#xff0c; PS 和 top 两个命令&#xff0c;今天先来介绍下 PS 命令~&#xff01;PS 命令 &#xff1a;作用 &#x…

C语言拔高知识——指针的进阶(万字大文超详细)

在之前的文章中&#xff0c;我已经讲解过了初阶指针的内容&#xff0c;今天就来讲一讲指针的进阶&#xff01; 上篇指针地址&#xff1a;保姆式指针讲解&#xff0c;超详细&#xff0c;适合初学者_指针详解_陈大大陈的博客-CSDN博客 目录 1. 字符指针 2. 指针数组 3. 数组指…

3年测试经验的人来面试,简历都没写明白,一开口就要给20K的offer?

​我最近阅读了大约15份简历&#xff0c;他们都在申请我的团队的测试工程师职位。但是没有一份表达清楚了他是如何进行测试的。 下面我摘录了一些 信息&#xff1a; 几乎所有的应聘者都罗列了成串儿的他们熟悉的“技术”&#xff08;包括但不限于….Net&#xff0c; Unix&#…

c++基础/类和对象

c基础 2.1名字空间 namespace 防止命名冲突 说明&#xff1a;名字空间可以在全局作用域或其他作用域&#xff08;另一个名字空间&#xff09;内部定义&#xff0c;但不能在函数或类的内部定义。 使用&#xff1a; #include<iostream> using namespace std; //std中包…

【C/C++ 数据结构】-八大排序之 冒泡排序快速排序

作者&#xff1a;学Java的冬瓜 博客主页&#xff1a;☀冬瓜的主页&#x1f319; 专栏&#xff1a;【C/C数据结构与算法】 分享&#xff1a;那我便像你一样&#xff0c;永远躲在水面之下&#xff0c;面具之后&#xff01; ——《画江湖之不良人》 主要内容&#xff1a;八大排序选…

前端构建工具大盘点:gulp、webpack、vite、rollup、esbuild、snowpack、babel、parcel、swc、tsc

文章目录背景分类转译器打包器对比gulp VS webpackBundle vs Bundleless&#xff08;代表就是webpack VS vite&#xff09;其他比较个人理解总结官网背景 做前端也有好多年了&#xff0c;从最早的 jQuery 时代到现在的三大框架&#xff0c;这过程中用到了很多构建工具&#xf…

婴幼儿常见八大疾病及护理方法

在1岁之前&#xff0c;婴儿的体质还没有完全发育&#xff0c;很容易生病&#xff0c;大多数婴儿在1岁之后都会更好。今天&#xff0c;新的稀有婴儿育儿专家组织了一些婴儿最容易患的疾病和护理方法。1、新生儿黄疸宝宝出生后&#xff0c;你可能会注意到他的皮肤发黄。别担心&am…