kafka系统的生成,自顶向下
1. kafaka发送消息
- 1.1 是最初始外部调用kafaka的地方
- 1.6 是最初调用kafaka的函数。中间是对kafaka的构建
1.1 向Kafka发送一条发布视频的message
- 在videoHandler的发布视频逻辑中,向Kafka发送一条发布视频的mq,之后就解耦,先返回状态告知发布成功,不再等待具体执行
// 通过MQ异步处理视频的上传操作, 包括上传到OSS,截帧, 保存到MySQL, 更新redis
zap.L().Info("上传视频发送到消息队列", zap.String("videoPath", videoPath))
kafka.VideoMQInstance.Produce(&kafka.VideoMessage{
VideoPath: videoPath,
VideoFileName: videoFileName,
UserID: uint(request.GetUserId()),
Title: request.GetTitle(),
})
return &video.PublishVideoResponse{
StatusCode: common.CodeSuccess,
StatusMsg: common.MapErrMsg(common.CodeSuccess),
}, nil
1.2. 构造MQ结构体,核心包括Topic,GroupId,Producer,Consumer
- 上面的VideoMQInstance 是*VideoMQ的类型,实际上就是包括Topic,GroupId,Producer,Consumer这几个成员的结构体
1.3 对MQ结构体进行初始化
- 对上面这个结构体VideoMQInstance中的几个成员进行初始化
func InitVideoKafka() {
VideoMQInstance = &VideoMQ{
MQ{
Topic: "videos",
GroupId: "video_group",
},
}
// 创建 Video 业务的生产者和消费者实例
VideoMQInstance.Producer = kafkaManager.NewProducer(VideoMQInstance.Topic)
VideoMQInstance.Consumer = kafkaManager.NewConsumer(VideoMQInstance.Topic, VideoMQInstance.GroupId)
go VideoMQInstance.Consume()
}
Topic、GroupId都很简单,赋一个string的字符串就好了,关键在Producer和Consumer需要一步步创建
1.4 Producer和Consumer的创建流程
先看代码:
type Manager struct {
Brokers []string
}
var kafkaManager *Manager
func (m *Manager) NewProducer(topic string) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(m.Brokers...),
Topic: topic,
Balancer: &kafka.Hash{}, // 使用Hash算法按照key将消息均匀分布到不同的partition上
WriteTimeout: 1 * time.Second,
RequiredAcks: kafka.RequireAll, // 需要确保Leader和所有Follower都写入成功才可以发送下一条消息, 确保消息成功写入, 不丢失
AllowAutoTopicCreation: true, // Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目
}
}
func (m *Manager) NewConsumer(topic, groupId string) *kafka.Reader {
// TODO reader 优雅关闭
return kafka.NewReader(kafka.ReaderConfig{
Brokers: m.Brokers,
Topic: topic,
GroupID: groupId,
// CommitInterval: 1 * time.Second, // 不配置此项, 默认每次读取都会自动提交offset
StartOffset: kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效
})
}
可以看到,Producer实际上就是kafka.Writer,consumer实际上就是kafka.Reader,其中writer肯定需要绑定Topic,而reader肯定需要Topic和GroupId,去消费这些消息。
1.5 创建Kafaka的manager
- 发现上述创建Producer和Consumer的代码都是Manager的成员方法,Manager是什么呢?
- 是Manager的成员方法说明肯定是使用Manager这个结构体去创建Producer和Consumer,而Manager核心包含的就是Brokers(存的是broker的url地址)
type Manager struct {
Brokers []string
}
var kafkaManager *Manager
type MQ struct {
Topic string
GroupId string
Producer *kafka.Writer
Consumer *kafka.Reader
}
func Init(appConfig *config.AppConfig) (err error) {
var conf *config.KafkaConfig
if appConfig.Mode == config.LocalMode {
conf = appConfig.Local.KafkaConfig
} else {
conf = appConfig.Remote.KafkaConfig
}
brokerUrl := conf.Address + ":" + strconv.Itoa(conf.Port)
// 初始化 Kafka Manager
brokers := []string{brokerUrl}
kafkaManager = NewKafkaManager(brokers)
//InitMessageKafka()
//InitCommentKafka()
//InitVideoKafka()
return nil
}
func NewKafkaManager(brokers []string) *Manager {
return &Manager{
Brokers: brokers,
}
}
1.6 VideoMQ 它有个成员方法是Produce(和最早的1.1调用对应)
// Produce 发布将本地视频上传到OSS的消息
func (m *VideoMQ) Produce(message *VideoMessage) {
err := kafkaManager.ProduceMessage(m.Producer, message)
if err != nil {
log.Println("kafka发送添加视频的消息失败:", err)
return
}
}
Produce其中又调用了ProduceMessage方法,方法具体内容如下,就是将通过producer将要发送的消息序列化后发送出去
// ProduceMessage 向 Kafka 写入消息的公共函数, 由于不同业务的消息格式不同, 所以使用 interface{} 代替
func (m *Manager) ProduceMessage(producer *kafka.Writer, message interface{}) error {
messageBytes, err := json.Marshal(message)
if err != nil {
return err
}
return producer.WriteMessages(context.Background(), kafka.Message{
Value: messageBytes,
})
}
2. kafka消费消息
2.1 开启消费goroutine
kafka消费消息的代码之前在initMQ的时候就已经开启一个goroutine开始消费,只要有消息对应上topic就可以消费
func InitVideoKafka() {
VideoMQInstance = &VideoMQ{
MQ{
Topic: "videos",
GroupId: "video_group",
},
}
// 创建 Video 业务的生产者和消费者实例
VideoMQInstance.Producer = kafkaManager.NewProducer(VideoMQInstance.Topic)
VideoMQInstance.Consumer = kafkaManager.NewConsumer(VideoMQInstance.Topic, VideoMQInstance.GroupId)
go VideoMQInstance.Consume()
}
2.2 消费的具体逻辑举例:执行一个上传视频到oss的函数
步骤:
- Consumer.ReadMessage 先拿到序列化的消息msg,并反序列化为最初的结构体
- 现在拿到了msg,利用里面的内容,开启goroutine执行相关函数
- 开启一个goroutine:比如拿到msg中: video的url,和name。那现在就可以调用oss的函数,将指定url地址中name为name的视频上传到oss。上传完成之后,还可以将最开始传来的msg(包含video的消息)的内容上传到mysql
- 再开启一个goroutine:将视频上传到redis
- 再开启一个goroutine:删除用户哈希字段
- 再开启一个goroutine:将视频id加入到布隆过滤器中
上面开的那么多goroutine都是互相不影响的,没有先后执行的需要,因此可以分别开启
// Consume 消费将本地视频上传到OSS的消息
func (m *VideoMQ) Consume() {
for {
msg, err := m.Consumer.ReadMessage(context.Background())
if err != nil {
log.Fatal("[VideoMQ]从消息队列中读取消息失败:", err)
}
videoMsg := new(VideoMessage)
err = json.Unmarshal(msg.Value, videoMsg)
if err != nil {
log.Println("[VideoMQ]解析消息失败:", err)
return
}
go func() {
defer func() {
os.Remove(videoMsg.VideoPath)
}()
zap.L().Info("开始处理视频消息", zap.Any("videoMsg", videoMsg))
// 视频存储到oss
if err = common.UploadToOSS(videoMsg.VideoPath, videoMsg.VideoFileName); err != nil {
zap.L().Error("上传视频到OSS失败", zap.Error(err))
return
}
// 利用oss功能获取封面图
imgName, err := common.GetVideoCover(videoMsg.VideoFileName)
if err != nil {
zap.L().Error("图片截帧失败", zap.Error(err))
return
}
// 视频信息存储到MySQL
video := model.Video{
AuthorId: videoMsg.UserID,
VideoUrl: videoMsg.VideoFileName,
CoverUrl: imgName,
Title: videoMsg.Title,
CreatedAt: time.Now().Unix(),
}
mysql.InsertVideo(&video)
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
redis.AddVideo(&video)
}()
go func() {
defer wg.Done()
// cache aside
redis.DelUserHashField(videoMsg.UserID, redis.WorkCountField)
}()
go func() {
defer wg.Done()
// 添加到布隆过滤器
common.AddToWorkCountBloom(fmt.Sprintf("%d", videoMsg.UserID))
}()
wg.Wait()
zap.L().Info("视频消息处理成功", zap.Any("videoMsg", videoMsg))
}()
}
}