今日已办
Watermill
Handler
将 4 个阶段的逻辑处理定义为 Handler
测试发现,添加的 handler 会被覆盖掉;同时 4 个阶段的处理逻辑对不同 topic 是相同的,故考虑将这些阶段改为 middleware。
参考 https://watermill.io/docs/messages-router/ :不同 topic(不同事件)走不同的 Handler 处理逻辑,相同的处理逻辑则可以使用 Middleware(https://watermill.io/docs/middlewares/ )。
Middleware
ProfileContext 实现 context.Context 接口
// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumer
import (
"context"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"go.uber.org/zap"
"profile/internal/config"
"profile/internal/log"
"profile/internal/schema"
"time"
)
// ProfileContext
// @Description Carries the state used by the consumer pipeline: a status code, the
// underlying context, the Watermill router and the event being processed. The
// Done/Err/Deadline/Value methods defined on this type delegate to Ctx.
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {
// Exported fields shared by the middleware and handler methods defined on this type.
// Status holds a state.Status* code; the processing stages set it when they fail.
Status int
// Ctx is the wrapped context that Done/Err/Deadline/Value forward to.
Ctx context.Context
// Router is the Watermill router assigned by init and started by Run.
Router *message.Router
// Event is the event decoded from the Kafka message payload.
Event schema.Event
AppID string // reported via API
FetchScenario string // reported via API
}
// NewProfileContext
// @Description Allocates a ProfileContext backed by context.Background() and runs
// init on it before handing it back to the caller.
// @Author xzx 2023-08-11 22:49:00
// @Return *ProfileContext
func NewProfileContext() *ProfileContext {
	profileCtx := new(ProfileContext)
	profileCtx.Ctx = context.Background()
	profileCtx.init()
	return profileCtx
}
// init
// @Description Creates the Kafka subscriber and the Watermill router, installs the
// router-level plugin and middlewares, registers the processing pipeline for the
// crash topic and stores the router in profileCtx.Router. If the subscriber or the
// router cannot be created, the error is logged and init returns early, leaving
// profileCtx.Router nil.
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) init() {
	logger := watermill.NewStdLogger(false, false)

	// Use sarama.OffsetOldest as the consumer's initial offset.
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         config.Profile.GetString("kafka.group"),
		},
		logger,
	)
	if err != nil {
		log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))
		// Without a subscriber there is nothing to register the handler on.
		return
	}

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))
		// A failed NewRouter yields no usable router; using it below would panic.
		return
	}

	router.AddPlugin(plugin.SignalsHandler)
	// Router-level middlewares apply to every handler added to this router.
	router.AddMiddleware(
		middleware.CorrelationID,
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,
		middleware.Recoverer,
	)

	// The topic is currently fixed; the three handler-level middlewares are the
	// first three processing stages and WriteKafka is the final handler.
	topic := "to_analyzer__0.PERF_CRASH"
	router.AddNoPublisherHandler("WriteKafka", topic, subscriber, profileCtx.WriteKafka).
		AddMiddleware(
			profileCtx.UnpackKafkaMessage,
			profileCtx.InitPerformanceEvent,
			profileCtx.AnalyzeEvent,
		)

	profileCtx.Router = router
}
// Run
// @Description Starts the router with profileCtx.Ctx and blocks until Router.Run
// returns; an error returned by the router is logged. When no router has been
// assigned, an error is logged and Run returns immediately instead of
// dereferencing a nil pointer.
// @Author xzx 2023-08-12 13:52:53
func (profileCtx *ProfileContext) Run() {
	if profileCtx.Router == nil {
		log.Logger.Error("router is not initialized, nothing to run")
		return
	}
	// router.Run contains defer cancel()
	if err := profileCtx.Router.Run(profileCtx.Ctx); err != nil {
		log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))
	}
}
// Done
// @Description Part of the context.Context method set; forwards to the wrapped Ctx.
// @Return <-chan struct{}
func (profileCtx *ProfileContext) Done() <-chan struct{} {
	inner := profileCtx.Ctx
	return inner.Done()
}
// Err
// @Description Part of the context.Context method set; forwards to the wrapped Ctx.
// @Return error
func (profileCtx *ProfileContext) Err() error {
	inner := profileCtx.Ctx
	return inner.Err()
}
// Deadline
// @Description Part of the context.Context method set; forwards to the wrapped Ctx.
// @Return deadline, ok exactly as reported by Ctx.Deadline
func (profileCtx *ProfileContext) Deadline() (deadline time.Time, ok bool) {
	deadline, ok = profileCtx.Ctx.Deadline()
	return deadline, ok
}
// Value
// @Description Part of the context.Context method set; looks the key up in the wrapped Ctx.
// @Param key
// @Return any
func (profileCtx *ProfileContext) Value(key any) any {
	inner := profileCtx.Ctx
	return inner.Value(key)
}
【测试】前 3 个阶段处理为 Middleware,最后一个阶段为 Handler
// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumer
import (
	"encoding/json"
	"fmt"

	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/segmentio/kafka-go"
	"go.uber.org/zap"

	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/schema/performance"
	"profile/internal/state"
)
// UnpackKafkaMessage
// @Description Middleware registered in init as the first handler-level stage. It
// decodes the message payload as JSON into the Event of a fresh, per-message
// ProfileContext and attaches that context to the message for the following stages.
// On a decode failure it records state.StatusUnmarshalError on the attached context
// and returns the error, so the remaining stages are not run for that message.
// @Author xzx 2023-08-12 12:27:30
// @Param h next handler in the chain
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		// Every message gets its own context with a zero-valued Event. Decoding into
		// the shared receiver would let fields of an earlier message survive whenever
		// the new payload omits them.
		msgCtx := &ProfileContext{
			Ctx:           profileCtx.Ctx,
			Router:        profileCtx.Router,
			AppID:         profileCtx.AppID,
			FetchScenario: profileCtx.FetchScenario,
		}
		// Attach before decoding so the status stays reachable through the message
		// even when decoding fails.
		msg.SetContext(msgCtx)

		// Deserialize the payload into the generic event structure.
		if err := json.Unmarshal(msg.Payload, &msgCtx.Event); err != nil {
			msgCtx.Status = state.StatusUnmarshalError
			return nil, fmt.Errorf("unmarshal kafka message payload: %w", err)
		}
		log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", msgCtx.Event))
		return h(msg)
	}
}
// InitPerformanceEvent
// @Description Middleware that reads the *ProfileContext attached to the message,
// builds the category-specific event via performance.EventFactory from the decoded
// Category, Dimensions and Values, and stores it in Event.ProfileData. If the message
// carries no *ProfileContext, or the factory fails (Status is then set to
// state.StatusEventFactoryError), an error is returned and the next handler is not called.
// @Author xzx 2023-08-12 12:27:35
// @Param h next handler in the chain
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		// Checked assertion: an unexpected context type becomes an error, not a panic.
		msgCtx, ok := msg.Context().(*ProfileContext)
		if !ok {
			return nil, fmt.Errorf("init performance event: unexpected message context type %T", msg.Context())
		}

		event, err := performance.EventFactory(msgCtx.Event.Category, msgCtx.Event.Dimensions, msgCtx.Event.Values)
		if err != nil {
			msgCtx.Status = state.StatusEventFactoryError
			// Stop here: later stages rely on Event.ProfileData being set.
			return nil, fmt.Errorf("init performance event: %w", err)
		}
		log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", msgCtx.Event))

		// msgCtx is the pointer already attached to the message, so the update is
		// visible to the next handler without re-attaching it.
		msgCtx.Event.ProfileData = event
		return h(msg)
	}
}
// AnalyzeEvent
// @Description Middleware that reads the *ProfileContext attached to the message and
// calls Event.ProfileData.Analyze(). On success it clears Event.Dimensions and
// Event.Values before calling the next handler. If the message carries no
// *ProfileContext, or the analysis fails (Status is then set to
// state.StatusAnalyzeError), an error is returned and the next handler is not called.
// @Author xzx 2023-08-12 12:27:38
// @Param h next handler in the chain
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		// Checked assertion: an unexpected context type becomes an error, not a panic.
		msgCtx, ok := msg.Context().(*ProfileContext)
		if !ok {
			return nil, fmt.Errorf("analyze event: unexpected message context type %T", msg.Context())
		}

		if err := msgCtx.Event.ProfileData.Analyze(); err != nil {
			msgCtx.Status = state.StatusAnalyzeError
			// Stop here so an event that failed analysis is not passed on.
			return nil, fmt.Errorf("analyze event: %w", err)
		}
		log.Logger.Info("[AnalyzeEvent] analyze event success", zap.Any("event", msgCtx.Event))

		// clear dimensions and values
		msgCtx.Event.Dimensions = nil
		msgCtx.Event.Values = nil
		return h(msg)
	}
}
// WriteKafka
// @Description Final handler of the pipeline. Reads the *ProfileContext attached to
// the message, marshals its Event to JSON and writes it with the producer from
// connector.GetProducer() to the topic returned by connector.GetTopic for the event's
// category, using the event ID as the message key. Status is set on marshal or write
// failure and the error is returned.
// @Author xzx 2023-08-11 22:30:47
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {
	// Checked assertion: an unexpected context type becomes an error, not a panic.
	msgCtx, ok := msg.Context().(*ProfileContext)
	if !ok {
		return fmt.Errorf("write kafka: unexpected message context type %T", msg.Context())
	}

	toWriteBytes, err := json.Marshal(msgCtx.Event)
	if err != nil {
		// NOTE(review): a marshal failure is reported with StatusUnmarshalError, as
		// before; confirm whether a dedicated marshal status should be used.
		msgCtx.Status = state.StatusUnmarshalError
		return err
	}

	topic := connector.GetTopic(msgCtx.Event.Category)
	err = connector.GetProducer().WriteMessages(msgCtx.Ctx, kafka.Message{
		Topic: topic,
		Key:   []byte(msgCtx.Event.ID),
		Value: toWriteBytes,
	})
	if err != nil {
		msgCtx.Status = state.StatusWriteKafkaError
		return err
	}

	log.Logger.Info("[WriteKafka] write kafka success", zap.String("topic", topic), zap.String("id", msgCtx.Event.ID), zap.String("msg", string(toWriteBytes)))
	return nil
}
可以实现正常的效果
Router
- 目前的 topic 是固定写死的,要考虑正则表达式,将不同 topic 的共有逻辑抽出为 Middleware,特定逻辑抽出为 Handler
- 消息处理似乎不是并发的
pub/sub kafka-go
-
custom pub/sub
-
Kafka Pub/Sub for the Watermill project, based on Shopify’s Sarama
-
qiulin/watermill-kafkago: Kafka Pub/Sub for the Watermill project, based on segmentio/kafka-go (github.com)
明日待办
- 组内开会
- 继续开发需求