架构师系列- 消息中间件(15)-kafka业务实战

news2025/2/23 10:15:02

7.1 顺序性场景

7.1.1 场景概述

假设我们要传输一批订单到另一个系统,那么订单对应状态的演变是有顺序性要求的。

已下单 → 已支付 → 已确认

不允许错乱!

7.1.2 顺序级别

1)全局有序:

串行化。每条经过kafka的消息必须严格保障有序性。

这就要求kafka单通道,每个groupid下单消费者

极大的影响性能,现实业务下几乎没必要

2)局部有序:

业务局部有序。同一条订单有序即可,不同订单可以并行处理。不同订单的顺序前后无所谓

充分利用kafka多分区的并发性,只需要想办法让需要顺序的一批数据进同一分区即可。

7.1.3 实现方案

1)发送端:

指定key发送,key=order.id即可,案例回顾:4.2.3,PartitionProducer

2)发送中:

给队列配置多分区保障并发性。

3)读取端:

单消费者:显然不合理

吞吐量显然上不去,kafka开多个分区还有何意义?

所以开多个消费者指定分区消费,理想状况下,每个分区配一个。

但是,这个吞吐量依然有限,那如何处理呢?

方案:多线程

在每个消费者上再开多线程,是个解决办法。但是,要警惕顺序性被打破!

参考下图:thread处理后,会将data变成 2-1-3

改进:接收后分发二级内存队列

消费者取到消息后不做处理,根据key二次分发到多个阻塞队列。

再开启多个线程,每个队列分配一个线程处理。提升吞吐量

 

 

7.1.4 代码验证

1)新建一个sort队列,2个分区

2)启动order项目

源码参考:

SortedProducer(顺序性发送端)

SortedConsumer(顺序性消费端 - 阻塞队列实现,方便大家理解设计思路)

SortedConsumer2(顺序性消费端 - 线程池实现,现实中推荐这种方式!)

 3)通过swagger请求

 

7.2 海量同步场景

假设大数据部门需要大屏来展示用户的打车订单情况,需要把订单数据送入druid

这里不涉及顺序,只要下单就传输,但是对实时性和并发量要求较高

7.2.1 常规架构

在下单完成mysql后,通过程序代码打印,直接进入kafka

或者logback和kafka集成,通过log输送

优点:

更符合常规的思维。将数据送给想要的部门

缺点:

耦合度高,将kafka发送消息嵌入了订单下单的主业务,形成代码入侵。

下单不关心,也不应该关注送入kafka的情况,一旦kafka不可用,程序受影响

7.2.2 解耦合

借助canal,监听订单表的数据变化,不再影响主业务。

 

7.2.3 部署实现

1)mysql部署

注意,需要打开binlog,8.0 默认处于开启状态

#启动mysql8
docker run --name mysql8 -v /opt/kafka/data/mysql8:/var/lib/mysql -p 3306:3306 -e TZ=Asia/Shanghai -e MYSQL_ROOT_PASSWORD=123456 -d daocloud.io/mysql:8.0
连上mysql,执行以下sql,添加canal用户

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
创建订单表

CREATE TABLE `orders` (
  `id` int unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
);
 

2)canal部署

#canal.properties
#附带资料里有,放到服务器 /opt/kafka/data/canal/ 目录下
#修改servers为你的kafka的机器地址
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.10.30:10903,192.168.10.30:10904
#docker-compose.yml
#附带资料里有canal.yml,随便找个目录,重命名为docker-compose.yml
#修改mysql的链接信息的链接信息
#然后在当前目录下执行 docker-compose up -d
version: '2'
services:
    canal:
        image: canal/canal-server
        container_name: canal
        restart: always
        ports:
            - "10908:11111"
        environment:
                #mysql的链接信息
            canal.instance.master.address: 192.168.10.30:3306
            canal.instance.dbUsername: canal
            canal.instance.dbPassword: canal
            #投放到kafka的哪个主题?要提前准备好!
            canal.mq.topic: canal
        volumes:
            - "/opt/kafka/data/canal/canal.properties:/home/admin/canal-server/conf/canal.properties"
 

3)数据通道验证

进入kafka容器,用上面3.2.4里的命令行方式监听canal队列

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal
 

在mysql上创建orders表,增删数据试一下

mysql> insert into orders (name) values ('张三');
Query OK, 1 row affected (0.03 sec)
 

在kafka控制台,可以看到同步的消息

{"data":[{"id":"1","name":"张三"}],"database":"canal","es":1611657853000,"id":5,"isDdl":false,"mysqlType":{"id":"int unsigned","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"orders","ts":1611657853802,"type":"INSERT"}
 

数据通道已打通,还缺少的是druid作为消费端来接收消息

 

4)druid部署

#druid.yml
#在附带资料里有
#随便找个目录,执行
docker-compose -f druid.yml up -d
 

5)验证

配置druid的数据源,从kafka读取数据,验证数据可以正确进入druid。

7.3 kafka监控

7.3.1 eagle简介

Kafka Eagle监控系统是一款用来监控Kafka集群的工具,支持管理多个Kafka集群、管理Kafka主题(包含查看、删除、创建等)、消费者组合消费者实例监控、消息阻塞告警、Kafka集群健康状态查看等。

7.3.2 部署

推荐docker-compose启动

将配备的资料中 eagle.yml , 拷贝到服务器任意目录

修改对应的ip地址为你服务器的地址

 

#注意ip地址:192.168.10.30,全部换成你自己服务器的

version: '3'
services:
    zookeeper:
        image: zookeeper:3.4.13

    kafka-1:
        container_name: kafka-1
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10903:9092
            - 10913:10913
        environment:
            KAFKA_BROKER_ID: 1 
            HOST_IP: 192.168.10.30
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            #docker部署必须设置外部可访问ip和端口,否则注册进zk的地址将不可达造成外部无法连接
            KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
            KAFKA_ADVERTISED_PORT: 10903 
            KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10913"
            JMX_PORT: 10913
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper           
    kafka-2:
        container_name: kafka-2
        image: wurstmeister/kafka:2.12-2.2.2
        ports:
            - 10904:9092
            - 10914:10914
        environment:
            KAFKA_BROKER_ID: 2 
            HOST_IP: 192.168.10.30
            KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
            KAFKA_ADVERTISED_HOST_NAME: 192.168.10.30
            KAFKA_ADVERTISED_PORT: 10904 
            KAFKA_JMX_OPTS: "-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=192.168.10.30 -Dcom.sun.management.jmxremote.rmi.port=10914"
            JMX_PORT: 10914
        volumes:
            - /etc/localtime:/etc/localtime
        depends_on:
            - zookeeper 
    eagle:
        image: gui66497/kafka_eagle
        container_name: ke
        restart: always
        depends_on:
            - kafka-1
            - kafka-2
        ports:
            - "10907:8048"
        environment:
            ZKSERVER: "zookeeper:2181"

执行 docker-compose -f eagle.yml up -d

7.3.3 使用说明

访问 : http://192.168.10.30:10907/ke/

默认用户名密码: admin/ 123456

如果要删除topic等操作,需要管理token: keadmin

 

与km到底选哪个呢?

  • 界面美观程度和监控曲线优于km,有登录权限控制
  • 功能操作上不如km简单直白,但是km需要配置一定的连接信息

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1628737.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

系统设计 --- E2E Test System

系统设计 --- E2E Test System 什么是E2EE2E Architecture Example 什么是E2E E2E(端到端)测试是一种软件测试方法,旨在模拟真实的用户场景,测试整个应用程序或系统的端到端功能和交互流程。E2E 测试涵盖了从用户界面到后端系统的…

uniapp获取当前位置及检测授权状态

uniapp获取当前位置及检测授权定位权限 文章目录 uniapp获取当前位置及检测授权定位权限效果图创建js文件permission.jslocation.js 使用 效果图 Android设备 点击 “设置”,跳转应用信息,打开“权限即可”; 创建js文件 permission.js 新建…

视频怎么批量压缩?5个好用的电脑软件和在线网站

视频怎么批量压缩?有时候我们需要批量压缩视频来节省存储空间,便于管理文件和空间,快速的传输发送给他人。有些快捷的视频压缩工具却只支持单个视频导入,非常影响压缩效率,那么今天就向大家从软件和在线网站2个角度介绍…

ProcessOn已凉,绘图就用谷歌云盘+draw.io达到了巅峰

神器draw.io 首先:对不起ProcessOn,无意冒犯,ProcessOn并没有凉,而且还一直在用。谁让你们火呢,借词一用,哈哈哈哈。 4年前我用谷歌云盘时就无意间发现了draw.io这个绘图工具。 先说下如何发现的&#x…

NumPy 1.26 中文官方指南(一)

NumPy 用户指南 原文:numpy.org/doc/1.26/user/index.html 本指南是一个概述,解释了重要特性;细节请参阅 NumPy 参考文档。 入门指南 什么是 NumPy? 安装 NumPy 快速入门 NumPy:初学者的绝对基础 基础知识和用法 NumPy 基础…

java数据结构泛型

泛型 一.了解包装类二.基本数据类型对应的包装类三.装箱与拆箱四.什么是泛型?五.语法六.泛型类使用七.泛型的上界八.泛型方法 在学习泛型之前,我们需要先了解一下包装类。 一.了解包装类 在Java中,由于基本类型不是继承自Object,为了在泛型代…

爬虫的实战应用之短信炸弹playwright现代网页测试工具

不讲废话,先上原理: 短信炸弹,也就是说持续对一个手机进行发送短信,实现的方式就是,利用某些网站的登录 ,注册的时候,发送短信验证码来实现。 如下图,其中有一个id为phone的输入框&a…

golang beego结合wire依赖注入及自动路由

1 安装wire 1.1 通过命令直接安装 go install github.com/google/wire/cmd/wirelatest 1.2 通过go get方式安装 go get github.com/google/wire/cmd/wire进入目录编译 cd C:\Users\leell\go\pkg\mod\github.com\google\wirev0.6.0\cmd\wire go build 然后将wire.exe移动到…

代码随想录:二叉树22-24

目录 700.二叉搜索树的搜索 题目 代码(二叉搜索树迭代) 代码(二叉搜索树递归) 代码(普通二叉树递归) 代码(普通二叉树迭代) 98.验证二叉搜索树 题目 代码(中序递…

嵌入式全栈开发学习笔记---Linux基本命令2

目录 cp 源路径 目的路径 cp -r 源路径 目的路径 mv 源路径 目的路径 mv oldname newname 接下来我们继续介绍两个常用的命令 一个是拷贝文件,一个是剪切文件 ,或者也可以用来改名字。 cp 源路径 目的路径 “cp”用来拷贝文件或者目录,…

Swagger3.0(Springdoc)日常使用记录

文章目录 前言一、默认地址二、注解OperationTag 三、SpringBoot基础配置四、Swagger导入apifox五、Swagger其他配置六 knife4j 参考文章 前言 本文并不是Swagger的使用教程,只是记录一下本人的操作,感兴趣的可以看下 一、默认地址 http://localhost:…

38-数组 _ 一维数组

38-1 数组的创建 数组是一组相同类型元素的集合。 数组的创建方式: type_t arr_name [const_n]; //type_t 是指数组的元素类型 //const_n是一个常量表达式,用来指定数组的大小 举例: int arr[10]; char ch[5]; double data[20]; 问&…

Vue基础:为什么要学Vue3,Vue3相较于Vue2有那些优势?

为什么要学Vue3? 1.框架层面 1.响应式底层API的变化 Proxy 数组下标的修改 对象动态添加属性 解释说明:1.vue2采用的是Object.definePrototype,它每次只能对单个对象中的单个数据进行劫持,所以在Vue2中data()中的数据一多就要进行…

香港BTC、ETH现货ETF同时通过,对行业意义几何?

香港比美国更快一步通过以太坊现货 ETF。 2024 年 4 月 15 日,香港嘉实国际资产管理有限公司(Harvest Global Investments)今天宣布,得到香港证监会的原则上批准,将推出两大数字资产(比特币及以太坊&#…

计算公式基础

文章目录 MASMAEXMPA加权移动平均线成交量换手率MACDKDJ MA 均线一般指移动平均线。 移动平均线,Moving Average,简称MA,MA是用统计分析的方法,将一定时期内的证券价格(指数)加以平均,并把不同…

RabbitMQ工作模式(5) - 主题模式

概念 主题模式(Topic Exchange)是 RabbitMQ 中一种灵活且强大的消息传递模式,它允许生产者根据消息的特定属性将消息发送到一个交换机,并且消费者可以根据自己的需求来接收感兴趣的消息。主题交换机根据消息的路由键和绑定队列的路…

梦境绘师:揭秘生成对抗网络(GAN)的魔法

梦境绘师:揭秘生成对抗网络(GAN)的魔法 1 引言 在今日的深度学习领域,生成对抗网络(GAN)已成为一项无人能外的技术,以其独特的数据生成能力俘获了无数研究者和工程师的心。这项技术不仅在理论上…

Golang基础2-Array、Slice、Map

Array 数组 var a [5]int b:[5]int{} c:[...]int{}这样格式定义var a[5]int 和var a[10]int是不同类型从0开始下标&#xff0c;到len(a)-1遍历方式&#xff1a; for i : 0; i < len(a); i { }for index, v : range a { } 注意越界问题&#xff0c;panic值类型&#xff0c;…

知识图谱嵌入领域的重要研究:编辑基于语言模型的知识图谱嵌入

今天&#xff0c;向大家介绍一篇在知识图谱嵌入领域具有重要意义的研究论文——Editing Language Model-based Knowledge Graph Embeddings。这项工作由浙江大学和腾讯公司的研究人员联合完成&#xff0c;为我们在动态更新知识图谱嵌入方面提供了新的视角和方法。 研究背景 在…

go设计模式之工厂方法模式

工厂方法模式 什么是工厂方法模式 工厂方法模式是一种创建型设计模式&#xff0c;它定义了一个用于创建对象的接口&#xff0c;让子类决定实例化哪一个类。工厂方法使一个类的实例化推迟到其子类。 这个接口就是工厂接口&#xff0c;子类就是具体工厂类&#xff0c;而需要创…