Kafka实战案例

news2025/1/12 18:06:54

kafka系统的生成,自顶向下

1. kafaka发送消息

  • 1.1 是最初始外部调用kafaka的地方
  • 1.6 是最初调用kafaka的函数。中间是对kafaka的构建

1.1 向Kafka发送一条发布视频的message

  • 在videoHandler的发布视频逻辑中,向Kafka发送一条发布视频的mq,之后就解耦,先返回状态告知发布成功,不再等待具体执行
	// 通过MQ异步处理视频的上传操作, 包括上传到OSS,截帧, 保存到MySQL, 更新redis
	zap.L().Info("上传视频发送到消息队列", zap.String("videoPath", videoPath))
	kafka.VideoMQInstance.Produce(&kafka.VideoMessage{
		VideoPath:     videoPath,
		VideoFileName: videoFileName,
		UserID:        uint(request.GetUserId()),
		Title:         request.GetTitle(),
	})

	return &video.PublishVideoResponse{
		StatusCode: common.CodeSuccess,
		StatusMsg:  common.MapErrMsg(common.CodeSuccess),
	}, nil

1.2. 构造MQ结构体,核心包括Topic,GroupId,Producer,Consumer

  • 上面的VideoMQInstance 是*VideoMQ的类型,实际上就是包括Topic,GroupId,Producer,Consumer这几个成员的结构体
    在这里插入图片描述

1.3 对MQ结构体进行初始化

  • 对上面这个结构体VideoMQInstance中的几个成员进行初始化
func InitVideoKafka() {
	VideoMQInstance = &VideoMQ{
		MQ{
			Topic:   "videos",
			GroupId: "video_group",
		},
	}

	// 创建 Video 业务的生产者和消费者实例
	VideoMQInstance.Producer = kafkaManager.NewProducer(VideoMQInstance.Topic)
	VideoMQInstance.Consumer = kafkaManager.NewConsumer(VideoMQInstance.Topic, VideoMQInstance.GroupId)

	go VideoMQInstance.Consume()
}

Topic、GroupId都很简单,赋一个string的字符串就好了,关键在Producer和Consumer需要一步步创建

1.4 Producer和Consumer的创建流程

先看代码:

type Manager struct {
	Brokers []string
}

var kafkaManager *Manager

func (m *Manager) NewProducer(topic string) *kafka.Writer {
	return &kafka.Writer{
		Addr:                   kafka.TCP(m.Brokers...),
		Topic:                  topic,
		Balancer:               &kafka.Hash{}, // 使用Hash算法按照key将消息均匀分布到不同的partition上
		WriteTimeout:           1 * time.Second,
		RequiredAcks:           kafka.RequireAll, // 需要确保Leader和所有Follower都写入成功才可以发送下一条消息, 确保消息成功写入, 不丢失
		AllowAutoTopicCreation: true,             // Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目
	}
}

func (m *Manager) NewConsumer(topic, groupId string) *kafka.Reader {
	// TODO reader 优雅关闭
	return kafka.NewReader(kafka.ReaderConfig{
		Brokers: m.Brokers,
		Topic:   topic,
		GroupID: groupId,
		// CommitInterval: 1 * time.Second, // 不配置此项, 默认每次读取都会自动提交offset
		StartOffset: kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效
	})
}

可以看到,Producer实际上就是kafka.Writer,consumer实际上就是kafka.Reader,其中writer肯定需要绑定Topic,而reader肯定需要Topic和GroupId,去消费这些消息。

1.5 创建Kafaka的manager

  • 发现上述创建Producer和Consumer的代码都是Manager的成员方法,Manager是什么呢?
  • 是Manager的成员方法说明肯定是使用Manager这个结构体去创建Producer和Consumer,而Manager核心包含的就是Brokers(存的是broker的url地址)
type Manager struct {
	Brokers []string
}

var kafkaManager *Manager

type MQ struct {
	Topic    string
	GroupId  string
	Producer *kafka.Writer
	Consumer *kafka.Reader
}

func Init(appConfig *config.AppConfig) (err error) {
	var conf *config.KafkaConfig
	if appConfig.Mode == config.LocalMode {
		conf = appConfig.Local.KafkaConfig
	} else {
		conf = appConfig.Remote.KafkaConfig
	}
	brokerUrl := conf.Address + ":" + strconv.Itoa(conf.Port)
	// 初始化 Kafka Manager
	brokers := []string{brokerUrl}
	kafkaManager = NewKafkaManager(brokers)

	//InitMessageKafka()
	//InitCommentKafka()
	//InitVideoKafka()

	return nil
}

func NewKafkaManager(brokers []string) *Manager {
	return &Manager{
		Brokers: brokers,
	}
}

1.6 VideoMQ 它有个成员方法是Produce(和最早的1.1调用对应)

// Produce 发布将本地视频上传到OSS的消息
func (m *VideoMQ) Produce(message *VideoMessage) {
	err := kafkaManager.ProduceMessage(m.Producer, message)
	if err != nil {
		log.Println("kafka发送添加视频的消息失败:", err)
		return
	}
}

Produce其中又调用了ProduceMessage方法,方法具体内容如下,就是将通过producer将要发送的消息序列化后发送出去

// ProduceMessage 向 Kafka 写入消息的公共函数, 由于不同业务的消息格式不同, 所以使用 interface{} 代替
func (m *Manager) ProduceMessage(producer *kafka.Writer, message interface{}) error {
	messageBytes, err := json.Marshal(message)
	if err != nil {
		return err
	}
	return producer.WriteMessages(context.Background(), kafka.Message{
		Value: messageBytes,
	})
}

2. kafka消费消息

2.1 开启消费goroutine

kafka消费消息的代码之前在initMQ的时候就已经开启一个goroutine开始消费,只要有消息对应上topic就可以消费

func InitVideoKafka() {
	VideoMQInstance = &VideoMQ{
		MQ{
			Topic:   "videos",
			GroupId: "video_group",
		},
	}

	// 创建 Video 业务的生产者和消费者实例
	VideoMQInstance.Producer = kafkaManager.NewProducer(VideoMQInstance.Topic)
	VideoMQInstance.Consumer = kafkaManager.NewConsumer(VideoMQInstance.Topic, VideoMQInstance.GroupId)

	go VideoMQInstance.Consume()
}

2.2 消费的具体逻辑举例:执行一个上传视频到oss的函数

步骤:

  1. Consumer.ReadMessage 先拿到序列化的消息msg,并反序列化为最初的结构体
  2. 现在拿到了msg,利用里面的内容,开启goroutine执行相关函数
  3. 开启一个goroutine:比如拿到msg中: video的url,和name。那现在就可以调用oss的函数,将指定url地址中name为name的视频上传到oss。上传完成之后,还可以将最开始传来的msg(包含video的消息)的内容上传到mysql
  4. 再开启一个goroutine:将视频上传到redis
  5. 再开启一个goroutine:删除用户哈希字段
  6. 再开启一个goroutine:将视频id加入到布隆过滤器中
    上面开的那么多goroutine都是互相不影响的,没有先后执行的需要,因此可以分别开启
// Consume 消费将本地视频上传到OSS的消息
func (m *VideoMQ) Consume() {
	for {
		msg, err := m.Consumer.ReadMessage(context.Background())
		if err != nil {
			log.Fatal("[VideoMQ]从消息队列中读取消息失败:", err)
		}
		videoMsg := new(VideoMessage)
		err = json.Unmarshal(msg.Value, videoMsg)
		if err != nil {
			log.Println("[VideoMQ]解析消息失败:", err)
			return
		}
		go func() {
			defer func() {
				os.Remove(videoMsg.VideoPath)
			}()
			zap.L().Info("开始处理视频消息", zap.Any("videoMsg", videoMsg))
			// 视频存储到oss
			if err = common.UploadToOSS(videoMsg.VideoPath, videoMsg.VideoFileName); err != nil {
				zap.L().Error("上传视频到OSS失败", zap.Error(err))
				return
			}

			// 利用oss功能获取封面图
			imgName, err := common.GetVideoCover(videoMsg.VideoFileName)
			if err != nil {
				zap.L().Error("图片截帧失败", zap.Error(err))
				return
			}

			// 视频信息存储到MySQL
			video := model.Video{
				AuthorId:  videoMsg.UserID,
				VideoUrl:  videoMsg.VideoFileName,
				CoverUrl:  imgName,
				Title:     videoMsg.Title,
				CreatedAt: time.Now().Unix(),
			}
			mysql.InsertVideo(&video)
			var wg sync.WaitGroup
			wg.Add(3)
			go func() {
				defer wg.Done()
				redis.AddVideo(&video)
			}()
			go func() {
				defer wg.Done()
				// cache aside
				redis.DelUserHashField(videoMsg.UserID, redis.WorkCountField)
			}()
			go func() {
				defer wg.Done()
				// 添加到布隆过滤器
				common.AddToWorkCountBloom(fmt.Sprintf("%d", videoMsg.UserID))
			}()
			wg.Wait()

			zap.L().Info("视频消息处理成功", zap.Any("videoMsg", videoMsg))
		}()
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1071230.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu 22.04 安装系统 手动分区 针对只有一块硬盘 lvm 单独分出/home

自动安装的信息 参考自动安装时产生的分区信息 rootyeqiang-MS-7B23:~# fdisk /dev/sdb -l Disk /dev/sdb:894.25 GiB,960197124096 字节,1875385008 个扇区 Disk model: INTEL SSDSC2KB96 单元:扇区 / 1 * 512 512 字节 扇区大…

基于Springboot实现论坛管理系统项目演示【项目源码+论文说明】分享

基于Springboot实现论坛管理系统演示 摘要 在社会快速发展的影响下,论坛管理系统继续发展,使论坛管理系统的管理和运营比过去十年更加信息化。依照这一现实为基础,设计一个快捷而又方便的网上论坛管理系统是一项十分重要并且有价值的事情。对…

排序(order by)

MySQL从小白到总裁完整教程目录:https://blog.csdn.net/weixin_67859959/article/details/129334507?spm1001.2014.3001.5502 语法格式: select */列名 from 表名 order by 列名1 asc/desc, 列名2 asc/desc; 说明: 排序的目的:改变查询结果的返回顺序…

学习笔记(css穿透、vue-cookie、拦截器、vuex、导航守卫、token/Cookie、正则校验)

目录 一、记录 1、CSS穿透 2、输入框是否提示输入 3、插槽 #slot 4、v-deep深入改掉属性值 二、vue-cookie 1、官方文档 2、使用 三、拦截器 1、请求拦截器 2、响应拦截器 四、vuex对信息存取改 五、路由导航守卫 1、登录思路 2、设置白名单 六、Token与Cookie…

vue3 集成 tailwindcss

tailwindcss 介绍 Tailwind CSS 是一个流行的前端框架,用于构建现代、响应式的网页和 Web 应用程序。它的设计理念是提供一组可复用的简单、低级别的 CSS 类,这些类可以直接应用到 HTML 元素上,从而加速开发过程并提高样式一致性。 主要特点…

【数据结构与算法】二叉树的实现以及二叉排序数的实现

目录 通过数组实现二叉树 通过链表实现二叉树 排序二叉树的实现 通过数组实现二叉树 该实现方式只能用于完全二叉树,因为如果是普通二叉数的话,数组中会出现空隙,会导致空间的利用率会降低。 实现思路: 因为假设一个父节点的…

原码反码补码移码的介绍和计算

1.原码 原码的定义:十进制数据的二进制表示形式就是原码。 (1)原码的最左边那位是符号位,其他位为数据位,符号位是0则为正数,符号位是1则为负数。 (2)一个byte有8bit,最…

Node-RED系列教程-25node-red获取天气

安装节点:node-red-contrib-weather 节点图标如下: 使用说明:node-red-contrib-weather (node) - Node-RED 流程图中填写经度和纬度即可。 演示: json内容: {

jmeter 请求发送加密参数

最近在做http加密接口,请求头的uid参数及body的请求json参数都经过加密再发送请求,加密方式为:ase256。所以,jmeter发送请求前也需要对uid及json参数进行加密。我这里是让开发写了个加密、解密的jar,jmeter直接调用这个…

CRM系统如何自动分配线索

分配线索是销售部门很重要的一项工作,大量的线索中潜藏着许多企业未来的忠实客户。如果将大把的线索通过手工的方式分配给多个销售人员是一件棘手的事,就要借助CRM系统自动分配线索。 你的企业是否也面临这些难题: 1.渠道多线索多&#xff…

点击、拖拉拽开发可视化大屏,网友直呼不可思议

可视化大屏既足够炫酷,又能快速整合多业务系统数据,可视化分析数据,是一种可运用于博览中心、会议中心、监控中心、企业大屏看板等场景的常用数据可视化分析形式。但可视化大屏虽然好用,在开发制作上却难倒了不少人,直…

汇编实现点灯实验

.text .global _start _start: 设置GPIOF寄存器的时钟使能LDR R0,0X50000A28LDR R1,[R0]ORR R1,R1,#(0x1<<5)STR R1,[R0]设置GPIOE寄存器的时钟使能LDR R0,0X50000A28LDR R1,[R0] 从r0为起始地址的4字节数据取出放在R1ORR R1,R1,#(0x1<<4) 第4位设置为1STR R1,[…

vue3 + typescript + vite + naive ui + tailwindcss + jsx 仿苹果桌面系统

基于 vue3.x typescript vite naive ui tailwindcss jsx vue-router pinia&#xff0c;项目使用 tsx 作为模版输出&#xff0c;全程没有使用vue提供的SFC&#xff0c; 仿macos桌面前端项目&#xff0c;开源免费模版&#xff0c;希望减少工作量和学习新技术&#xff0c;希…

javascript制作简单的富文本,基本功能都实现,除了上传图片只能用URL

//所有的图标用的字符&#xff0c;以后可以换成网上的css-icon图标库的图标&#xff0c;再设置一下css样式即可简单的使用 //这里所有的标签元素都是直接获取&#xff0c;没有使用委托&#xff0c;如果使用委托性能会更好&#xff0c;这里只做了简单的清理&#xff0c;让内存回…

操作系统对内存的管理:分配与回收,虚拟内存,内存容量的扩充,内存保护,补充(链接方式、装入方式)

内存&#xff1a;即内存条&#xff0c;也称主存储器&#xff08;简称主存&#xff09;&#xff0c;用于存放数据。 为了缓和CPU和外存&#xff08;磁盘&#xff09;的速度矛盾&#xff0c;外存的程序先放入内存才能被CPU处理。 内存地址从0开始&#xff0c;每个内存地址对应一…

以太网基础学习(三)——UDP协议

一、UDP协议概述 UDP&#xff08;User Datagram Protocol&#xff0c;用户数据报协议&#xff09;是一种无连接协议&#xff0c;它不像TCP协议那样需要在发送和接收数据前进行握手和释放&#xff0c;而是直接把数据发送出去&#xff0c;也不会对数据进行可靠传输和流量…

ARM_汇编流水灯

ARM_汇编流水灯 .text .global _start _start: 设置GPIOE寄存器的时钟使能ldr r0,0x50000A28ldr r1,[r0] 从r0为起始地址的4字节数据取出存入r1orr r1,r1,#(0x01<<4) 第4位设置为1 表示开启时钟使能orr r1,r1,#(0x01<<5) 第5位设置为1 表示开启时钟使能str r1…

求推荐好用的可视化大屏软件?强推奥威BI

在博览中心、会议中心、监控中心等场合下&#xff0c;经常看到很多炫酷的企业可视化大屏&#xff0c;将复杂的企业数据可视化展现&#xff0c;高大上、实用性一个不缺。那&#xff0c;可视化大屏做得好的软件有哪些&#xff1f;首推奥威BI软件。 奥威BI软件&#xff1a;零编程…

ST表(RMQ问题)

ST表能够O(1)地解决区间[l,r]之间最值问题 1.建表&#xff0c;首先明白ST[i][j]&#xff0c;表示的是区间[i, i(1<<j)-1]的最值&#xff0c;区间大小为2^j。首先初始化ST[i][0]a[i]。 void init&#xff08;&#xff09;{for(int i1; i<n; i){ST[i][0]a[i];} } 因为…

如何配置防火墙?看这篇就够了

大家好&#xff0c;我是老杨。 在互联网时代&#xff0c;网络安全的问题不用多说了&#xff0c;配置防火墙是非常必要的。 在网络的世界里&#xff0c;要由防火墙过滤的就是承载通信数据的通信包。 防火墙是位于内部网和外部网之间的屏障&#xff0c;也是系统的第一道防线。…