kafka-顺序消息实现

news2025/1/18 18:08:51

kafka-顺序消息实现

场景

在购物付款的时候,订单会有不同的订单状态,对应不同的状态事件,比如:待支付,支付成功,支付失败等等,我们会将这些消息推送给消息队列 ,后续的服务会根据订单状态进行不同的业务处理,这就要求订单状态推送就要有状态的保证

解决方案

  • 生产者将相同的key的订单状态事件推送到kafka的同一分区
  • kafka 消费者接收消息
  • 消费者将消息提交给线程池
  • 线程池根据接收到的消息,将订单状态事件使用路由策略选择其中一个线程,将具有相同路由key的事件发送到同一个线程的阻塞队列中
  • 单个线程不停的从阻塞队列获取订单状态消息消费

在这里插入图片描述

代码实现

引入依赖
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.2</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>boot-kafka</name>
<description>boot-kafka</description>
<properties>
    <java.version>17</java.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>2.0.39</version>
    </dependency>
</dependencies>

使用到的DTO
@Data
public class InterOrderDto extends OrderDto implements OrderMessage{

    /**
     * 属于哪个分区
     */
    private String partition;

    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}


@Data
public class InterOrderDto extends OrderDto implements OrderMessage{

    /**
     * 属于哪个分区
     */
    private String partition;

    @Override
    public String getUniqueNo() {
        return getOrderNo();
    }
}

public interface OrderMessage {

    /**
     * 线程池路由key
     * @return
     */
    String getUniqueNo();

}
定义topic

这里是 3个分区,2个副本

@Configuration
public class KafkaConfiguration {

    @Bean
    public NewTopic topic(){
        return new NewTopic(Constants.TOPIC_ORDER,3,(short) 2);
    }
}

public interface Constants {

     String TOPIC_ORDER = "order";
}
消费者

消费者:OrderListener

@Component
@Slf4j
public class OrderListener {

    @Autowired
    private OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool;

    @KafkaListener(topics = Constants.TOPIC_ORDER, groupId = "orderGroup", concurrency = "3")
    public void logListener(ConsumerRecord<String, String> record) {
        log.debug("> receive log event: {}-{}", record.partition(), record.value());
        try {
            OrderDto orderDto = JSON.parseObject(record.value(), OrderDto.class);
            InterOrderDto interOrderDto = new InterOrderDto();
            BeanUtils.copyProperties(orderDto, interOrderDto);
            interOrderDto.setPartition(record.partition() + "");
            orderThreadPool.dispatch(interOrderDto);
        } catch (Exception e) {
            log.error("# kafka log listener error: {}", record.value(), e);
        }
    }

}

线程池: OrderThreadPool

/**
 * @Date: 2024/1/24 10:23
 * 线程池实现
 *
 * @param W: worker
 * @param D: message
 */
@Slf4j
public class OrderThreadPool<W extends SingleThreadWorker<D>, D extends OrderMessage> {
    private List<W> workers;
    private int size;

    public OrderThreadPool(int size, Supplier<W> provider) {
        this.size = size;
        workers = new ArrayList<>(size);
        for (int i = 0; i < size; i++) {
            workers.add(provider.get());
        }
        if (CollectionUtils.isEmpty(workers)) {
            throw new RuntimeException("worker size is 0");
        }
        start();
    }

    /**
     * route message to single thread
     *
     * @param data
     */
    public void dispatch(D data) {
        W w = getUniqueQueue(data.getUniqueNo());
        w.offer(data);
    }

    private W getUniqueQueue(String uniqueNo) {
        int queueNo = uniqueNo.hashCode() % size;
        for (W worker : workers) {
            if (queueNo == worker.getQueueNo()) {
                return worker;
            }
        }
        throw new RuntimeException("worker 路由失败");
    }

    /**
     * start worker, only start once
     */
    private void start() {
        for (W worker : workers) {
            new Thread(worker, "OWorder-" + worker.getQueueNo()).start();
        }
    }

    /**
     * 关闭所有 workder, 等待所有任务执行完
     */
    public void shutdown() {
        for (W worker : workers) {
            worker.shutdown();
        }
    }

}

工作线程:SingleThreadWorker, 内部使用阻塞队列使其串行化

/**
 * @Date: 2024/1/24 10:58
 * single thread with a blocking-queue
 */
@Slf4j
public abstract class SingleThreadWorker<T> implements Runnable {

    private static AtomicInteger cnt = new AtomicInteger(0);
    private BlockingQueue<T> queue;
    private boolean started = true;

    /**
     * worker 唯一id
     */
    @Getter
    private int queueNo;

    public SingleThreadWorker(int size) {
        this.queue = new LinkedBlockingQueue<>(size);
        this.queueNo = cnt.getAndIncrement();
        log.info("init worker {}", this.queueNo);
    }

    /**
     * 提交消息
     *
     * @param data
     */
    public void offer(T data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            log.info("{} offer error: {}", Thread.currentThread().getName(), JSON.toJSONString(data), e);
        }
    }

    @Override
    public void run() {
        log.info("{} worker start take ", Thread.currentThread().getName());
        while (started) {
            try {
                T data = queue.take();
                doConsumer(data);
            } catch (InterruptedException e) {
                log.error("queue take error", e);
            }
        }
    }

    /**
     * do real consume message
     *
     * @param data
     */
    protected abstract void doConsumer(T data);

    /**
     * consume rest of message in the queue when thread-pool shutdown
     */
    public void shutdown() {
        this.started = false;
        ArrayList<T> rest = new ArrayList<>();
        int i = queue.drainTo(rest);
        if (i > 0) {
            log.info("{} has rest in queue {}", Thread.currentThread().getName(), i);
            for (T t : rest) {
                doConsumer(t);
            }
        }
    }


}

工作线程实现:OrderWorker, 这里就单独处理订单事件

/**
 * @Date: 2024/1/24 13:42
 * 具体消费者
 */
@Slf4j
public class OrderWorker extends SingleThreadWorker<InterOrderDto>{
    public OrderWorker(int size) {
        super(size);
    }

    @Override
    protected void doConsumer(InterOrderDto data) {
        log.info("{} consume msg: {}", Thread.currentThread().getName(), JSON.toJSONString(data));

    }
}

生产者

生产者:OrderController, 模拟发送不同的事件类型的订单

@RestController
public class OrderController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @GetMapping("/send")
    public String send() throws InterruptedException {
        int size = 1000;
        for (int i = 0; i < size; i++) {
            OrderDto orderDto = new InterOrderDto();
            orderDto.setOrderNo(i + "");
            orderDto.setPayStatus(getStatus(0));
            orderDto.setTimestamp(System.currentTimeMillis());
            //相同的key发送到相同的分区
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);
            orderDto.setPayStatus(getStatus(1));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
            TimeUnit.MILLISECONDS.sleep(10);
            orderDto.setPayStatus(getStatus(2));
            orderDto.setTimestamp(System.currentTimeMillis());
            kafkaTemplate.send(Constants.TOPIC_ORDER, orderDto.getOrderNo(), JSON.toJSONString(orderDto));
        }
        return "success";
    }

    private String getStatus(int status){
        return status == 0 ? "待支付" : status == 1 ? "已支付" : "支付失败";
    }
}

application.properties 配置

# kafka地址
spring.kafka.bootstrap-servers=192.168.x.x:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
启动类
@Slf4j
@SpringBootApplication
public class BootKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(BootKafkaApplication.class, args);
    }

    /**
     * 配置线程池
     * @return
     */
    @Bean
    public OrderThreadPool<OrderWorker, InterOrderDto> orderThreadPool(){
        OrderThreadPool<OrderWorker, InterOrderDto> threadPool =
            new OrderThreadPool<>(3, () -> new OrderWorker(100));
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("shutdown orderThreadPool");
            //容器关闭时让工作线程中的任务都被消费完
            threadPool.shutdown();
        }));
        return threadPool;
    }

}

测试

访问: http://localhost:8080/send, 结果:

OWorder-0 worker start take 
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"待支付","timestamp":1706084482134,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"已支付","timestamp":1706084482271,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"0","partition":"2","payStatus":"支付失败","timestamp":1706084482282,"uniqueNo":"0"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"待支付","timestamp":1706084482326,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"已支付","timestamp":1706084482336,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"3","partition":"2","payStatus":"支付失败","timestamp":1706084482347,"uniqueNo":"3"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"待支付","timestamp":1706084482391,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"已支付","timestamp":1706084482401,"uniqueNo":"6"}
OWorder-0 consume msg: {"orderNo":"6","partition":"1","payStatus":"支付失败","timestamp":1706084482412,"uniqueNo":"6"}

可以发现,在我们工作线程中,事件消费是有序的

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1416436.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Redis数据类型-string

Redis-string类型 Redis中的数据类型全局命令get&setredis中变量设置的过期时间是如何检测的 keysexistsdelexpirettlpexpirepttltype string数据类型的底层的数据结构操作string类型的常用命令get&setmset&mgetsetnxsetexpsetexincr&decrincrby&decrbyinc…

前端实现弹小球功能

这篇文章将会做弹小球游戏&#xff0c;弹小球游戏大家小时候都玩过&#xff0c;玩家需要在小球到达游戏区域底部时候控制砖块去承接小球&#xff0c;并不断的将小球弹出去。 首先看一下实现的效果。 效果演示 玩家需要通过控制鼠标来实现砖块的移动&#xff0c;保证在小球下落…

借款还款记录账本,助你轻松地应对借还款带来的种种问题

借还款明细管理看似琐碎&#xff0c;实则关乎我们的切身利益。现在有【晨曦记账本】为你的财务健康保驾护航&#xff0c;让你可以更加轻松地应对借款和还款带来的种种问题&#xff0c;让生活更加简单、有序。 所需工具&#xff1a; 一个【晨曦记账本】软件 操作步骤&#xf…

使用大模型检索增强 Rerank 模型,检索效果提升太明显了!

Rerank 在 RAG&#xff08;Retrieval-Augmented Generation&#xff09;过程中扮演了一个非常重要的角色&#xff0c;普通的 RAG 可能会检索到大量的文档&#xff0c;但这些文档可能并不是所有的都跟问题相关&#xff0c;而 Rerank 可以对文档进行重新排序和筛选&#xff0c;让…

【C++】——类和对象(中)

一、前言 好久没有更新内容了&#xff0c;今天为大家带来类和对形中期的内容 &#xff01; 二、正文 1.this指针 1.1this指针的引入 class Date { public:void Init(int year, int month, int day){_year year;_month month;_day day;}void Print(){cout << _year …

2. HarmonyOS 应用开发 DevEco Studio 准备-2

2. HarmonyOS 应用开发 DevEco Studio 准备-2 首选项设置 中文设置 主题 字体 插件安装和使用 保存时操作 编辑器 工程树管理 代码树管理 标记 字符串可视化编辑 参考文档 常用快捷键 编辑 查找或替换 编译与运行 调试 其他 预览 页面预览 自定义组件预览 预览…

2023年中国工控自动化市场现状及竞争分析,美日占主角,国产品牌初崭头角

工控自动化是一种运用控制理论、仪器仪表理论、计算机和信息技术&#xff0c;对工业生产过程实现检测、控制、优化、调度、管理和决策&#xff0c;达到增加产量、提高质量、降低消耗、确保安全等目的综合性技术。产品应用领域广泛&#xff0c;可分为OEM型行业和项目型行业。 近…

Metaphor(EXA) 基于大语言模型的搜索引擎

文章目录 关于 Metaphor使用示例 关于 Metaphor Metaphor是基于大语言模型的搜索引擎&#xff0c;允许用户使用完整的句子和自然语言搜索&#xff0c;还可以模拟人们在互联网上分享和谈论链接的方式进行查询内容。 Metaphor同时还能与LLMs结合使用&#xff0c;允许LLMs连接互联…

༺༽༾ཊ—Unity之-05-抽象工厂模式—ཏ༿༼༻

首先创建一个项目&#xff0c; 在这个初始界面我们需要做一些准备工作&#xff0c; 建基础通用文件夹&#xff0c; 创建一个Plane 重置后 缩放100倍 加一个颜色&#xff0c; 任务&#xff1a;使用 抽象工厂模式 创建 人物与宠物 模型&#xff0c; 首先资源商店下载 人物与宠物…

【JavaWeb】【C00153】基于SSM的大学生家教平台管理系统(论文+PPT)

基于SSM的大学生家教平台管理系统&#xff08;论文PPT&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于ssm大学生家教平台管理系统 本系统分为前台模块、后台管理员模块、用户木块及家教模块。 其中前台的权限为&#xff1a;首页、家教、公告信息…

猫用空气净化器哪款牌子好?好用能吸毛的宠物空气净化器推荐

作为一个养猫多年的铲屎官&#xff0c;我真的无法抗拒猫星人的可爱魅力&#xff01;以前&#xff0c;每当我路过宠物店&#xff0c;我总会忍不住停下来&#xff0c;在玻璃窗前停留半个小时以上。但是后来&#xff0c;我终于有了自己的猫咪。每天都能享受到给它摸小肚子的乐趣&a…

Filter Listener

文章目录 一 过滤器&#xff08;Filter&#xff09;1 什么是过滤器2 为什么使用过滤器3 过滤器执行流程4 过滤器的生命周期5 过滤器的注册5.1 XML方式5.2 WebFilter 注解方式 6 FilterConfig7 过滤器链8 过滤器应用 二 什么是监听器1 监听器分类2 监听器使用2.1 监听对象的创建…

Mac忘记本机MySql怎么办?

Mac忘记本机MySql怎么办&#xff1f; 1.打开系统偏好设置 2.打开Mysql 3.停止服务 4.直接初始化服务上图有一个初始化数据库 5.输入8位密码确认 6.重启服务

Blender教程(基础)-初始用户界面-01

开始第一天的Blender学习、也是业余学习。希望记录下这一份学习的过程、并且分享给大家。今天带大家认识Blender这一款软件&#xff0c;先说说我为什么选择了Blender&#xff0c;我在软件市场找了好久&#xff0c;市场上其他雷同软件都是要么收费要么不好用&#xff0c;最终决定…

【面试深度解析】滴滴后端二面:12306场景设计、Redis缓存设计、MyBatis两级缓存(下)

欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术的推送&#xff01; 在我后台回复 「资料」 可领取编程高频电子书&#xff01; 在我后台回复「面试」可领取硬核面试笔记&#xff01; 文章导读地址…

Mac安装nvm,安装多个不同版本node,指定node版本

一.安装nvm brew install nvm二。配置文件 touch ~/.zshrc echo export NVM_DIR~/.nvm >> ~/.zshrc echo source $(brew --prefix nvm)/nvm.sh >> ~/.zshrc三.查看安装版本 nvm -vnvm常用命令如下&#xff1a;nvm ls &#xff1a;列出所有已安装的 node 版本nvm…

一张图文深入了解信息量概念

通信原理第10页最后一段&#xff1a; 概率论告诉我们&#xff0c;事件的不确定程度可以用其出现的概率来描述。因此&#xff0c;消息中包含的信息量与消息发生的概率密切相关。消息出现的概率越小&#xff0c;则消息中包含的信息量就越大。 这句话怎么理解呢&#xff1f; 比如…

小红构造数组-牛客周赛 Round 29(DFS方法)

题目很直白&#xff0c;方法就是暴力即可。 虽然说数据范围显得很大&#xff0c;但是在长整型范围内&#xff0c;一个数字的素因子数量最多不超64&#xff0c;而如果是不相同的素因子&#xff0c;虽然没有计算过&#xff0c;但是如果是12个不同的素因子应该会超过数据范围了。…

消息中间件之八股面试回答篇:三、RabbitMQ如何解决消息堆积问题(100万条消息堆积)+RabbitMQ高可用性和强一致性机制+回答模板

RabbitMQ中的消息堆积问题 当生产者发送消息的速度超过了消费者处理消息的速度&#xff0c;就会导致队列中的消息堆积&#xff0c;直到队列存储消息达到上限。之后发送的消息就会成为死信&#xff0c;可能会被丢弃&#xff0c;这就是消息堆积问题。 解决消息堆积有三种种思路…

c++阶梯之引用与内联函数

1. 引用 1.1 引用概念 引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名&#xff0c;编译器不会为引用变量开辟内存空间&#xff0c;它和它引用的变量共用同一块内存空间。 语法 类型& 引用变量名(对象名) 引用实体; 示例 很显然&#xff0c;在下面这…