今日待办
- 使用watermill框架替代当前的base_runner框架
a. 参考官方提供的sarama kafka Pub/Sub(https://github.com/ThreeDotsLabs/watermill-kafka/)实现kafka-go(https://github.com/segmentio/kafka-go)的Pub/Sub(sarama需要cgo,会导致一些额外的镜像依赖)
b. 参考 https://watermill.io/docs/messages-router/ 实现不同topic(不同事件)走不同的Handler处理逻辑,相同处理逻辑则可以使用Middleware(https://watermill.io/docs/middlewares/)
- 调研HyperScan golang implement(https://github.com/flier/gohs/)的使用和benchmark,扩充profile事件处理handler,添加一些正则处理任务(这个会给一些示例)
Watermill
what?
- 高效处理消息流,事件驱动程序
- 用于 event sourcing, rpc over messages, sagas
- pub/sub
why?
- 微服务模式,异步通信
- 减少复杂性
core?
- Publisher
- Subscriber
- Message
官方示例
结合对 kafka 的 pub/sub 的实现和对 router、middleware 的使用,尝试替换掉 baserunner
// Package watermillx
// @Author xzx 2023/8/11 18:53:00
package watermillx
import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/Shopify/sarama"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"

	"profile/internal/config"
	"profile/internal/schema"
)
// WatermillContext groups the test fields used while trying out watermill.
type WatermillContext struct {
Status int
StageErr error
Event schema.Event
AppID string // reported via the API
FetchScenario string // reported via the API
}
// logger is the package-level watermill logger.
// NOTE(review): the two arguments are presumably the debug and trace switches
// (both off here) — confirm against watermill.NewStdLogger.
var logger = watermill.NewStdLogger(false, false)
// Init creates a Kafka subscriber and a watermill router, registers the
// router-level middleware and a single debug handler, and then runs the
// router. It blocks for as long as the router is running and panics if the
// subscriber or router cannot be created or if Run returns an error.
//
// The receiver is deliberately not named ctx: that name would collide with a
// local context variable in the method's scope.
func (wc *WatermillContext) Init() {
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// equivalent of auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         "test_consumer_group",
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// SignalsHandler will gracefully shutdown Router when SIGTERM is received.
	// You can also close the router by just calling `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// Router level middleware are executed for every message sent to the router
	router.AddMiddleware(
		// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
		middleware.CorrelationID,

		// The handler function is retried if it returns an error.
		// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,

		// Recoverer handles panics from handlers.
		// In this case, it passes them as errors to the Retry middleware.
		middleware.Recoverer,
	)

	// Just for debugging: print every message received on the
	// `to_analyzer__0.PERF_CRASH` topic. The returned handler is not needed
	// here because no handler-level middleware is attached.
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"to_analyzer__0.PERF_CRASH",
		subscriber,
		printMessages,
	)

	// Now that all handlers are registered, we're running the Router.
	// Run is blocking while the router is running.
	if err := router.Run(context.Background()); err != nil {
		panic(err)
	}
}
// printMessages is a debugging handler: it writes the UUID, payload and
// metadata of the received message to stdout and always returns nil.
func printMessages(msg *message.Message) error {
	const layout = "\n> Received message: %s\n> %s\n> metadata: %v\n\n"

	payload := string(msg.Payload)
	fmt.Printf(layout, msg.UUID, payload, msg.Metadata)
	return nil
}
思考:
`ConsumerDispatchHandler` 每次消费消息都会执行,注册完 4 个 `stageHandler` 后再执行 `Run` 去异步调用 `BaseContext` 的 `Handler`。
考虑在此处进行订阅者的初始化,以及 `Middleware`、`Handler` 的注册。
// Package watermillx
// @Author xzx 2023/8/11 18:53:00
package watermillx
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"log"
"profile/internal/config"
"time"
)
// logger is the package-level watermill logger.
// NOTE(review): the two arguments are presumably the debug and trace switches
// (debug on, trace off here) — confirm against watermill.NewStdLogger.
var logger = watermill.NewStdLogger(true, false)
// Init wires a Kafka subscriber into a watermill router and runs it.
//
// The broker address and consumer group are read from config.Profile
// ("kafka.bootstrap" and "kafka.group"). One handler is registered on the
// "to_analyzer__0.PERF_CRASH" topic; it prints every message and is wrapped
// by a handler-level middleware that logs the message UUID. Init blocks while
// the router is running and panics on any setup or run error.
func Init() {
	// equivalent of auto.offset.reset: earliest
	saramaCfg := kafka.DefaultSaramaSubscriberConfig()
	saramaCfg.Consumer.Offsets.Initial = sarama.OffsetOldest

	subCfg := kafka.SubscriberConfig{
		Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},
		Unmarshaler:           kafka.DefaultMarshaler{},
		OverwriteSaramaConfig: saramaCfg,
		ConsumerGroup:         config.Profile.GetString("kafka.group"),
	}
	sub, subErr := kafka.NewSubscriber(subCfg, logger)
	if subErr != nil {
		panic(subErr)
	}

	r, routerErr := message.NewRouter(message.RouterConfig{}, logger)
	if routerErr != nil {
		panic(routerErr)
	}

	// SignalsHandler gracefully shuts the router down when SIGTERM arrives;
	// calling Close on the router achieves the same thing manually.
	r.AddPlugin(plugin.SignalsHandler)

	// A failing handler is retried up to three times, starting with a 100ms
	// interval. After MaxRetries the message is Nacked and it is up to the
	// Pub/Sub to resend it.
	retry := middleware.Retry{
		MaxRetries:      3,
		InitialInterval: time.Millisecond * 100,
		Logger:          logger,
	}

	// Router-level middleware runs for every message sent to the router:
	// CorrelationID copies the correlation id from the incoming message's
	// metadata to produced messages, and Recoverer turns handler panics into
	// errors that are passed to the Retry middleware.
	r.AddMiddleware(middleware.CorrelationID, retry.Middleware, middleware.Recoverer)

	// Debug handler; the returned handle lets us attach handler-level
	// middleware below.
	debugHandler := r.AddNoPublisherHandler(
		"print_incoming_messages",
		"to_analyzer__0.PERF_CRASH",
		sub,
		printMessages,
	)

	// Handler-level middleware runs only for this handler and is added the
	// same way as the router-level ones.
	debugHandler.AddMiddleware(func(next message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			log.Println("executing handler specific middleware for ", msg.UUID)
			return next(msg)
		}
	})

	// All handlers are registered; Run blocks while the router is running.
	runErr := r.Run(context.Background())
	if runErr != nil {
		panic(runErr)
	}
}
// printMessages is the debug handler registered by Init: it prints the
// message UUID, its payload as a string and its metadata to stdout, then
// returns nil.
func printMessages(msg *message.Message) error {
	body := string(msg.Payload)
	fmt.Printf("\n> Received message: %s\n> %s\n> metadata: %v\n\n", msg.UUID, body, msg.Metadata)
	return nil
}
可以正常消费到消息,接着就可以自定义 Handler 来对接到 Baserunner 的功能
测试如下:
// Package watermillx
// @Author xzx 2023/8/11 18:53:00
package watermillx
import (
"context"
"encoding/json"
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
kafkago "github.com/segmentio/kafka-go"
"go.uber.org/zap"
"profile/internal/config"
"profile/internal/connector"
"profile/internal/log"
"profile/internal/schema"
"profile/internal/schema/performance"
"profile/internal/state"
"time"
)
// ProfileContext
// @Description: state shared by the handler methods defined on this type
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {
// Properties that can be called by inherited subclasses
// Status is set to a state.Status* code by a handler method when it fails.
Status int
// Ctx is the context passed to the Kafka producer in WriteKafka.
Ctx context.Context
// Event is filled from the message payload by UnpackKafkaMessage and then
// read and updated by the later handler methods.
Event schema.Event
AppID string // reported via the API
FetchScenario string // reported via the API
}
// NewProfileContext
// @Description allocates a ProfileContext that carries the given context
// @Author xzx 2023-08-11 22:49:00
// @Param ctx context stored in the Ctx field
// @Return *ProfileContext new value whose other fields are left at their zero values
func NewProfileContext(ctx context.Context) *ProfileContext {
	profileCtx := new(ProfileContext)
	profileCtx.Ctx = ctx
	return profileCtx
}
// Init
// @Description creates the Kafka subscriber and the watermill router,
// registers the router-level middleware and the UnpackKafkaMessage handler on
// the "to_analyzer__0.PERF_CRASH" topic, then runs the router. The call blocks
// while the router is running. Setup and run errors are logged and then
// raised as a panic.
//
// The router runs with profileCtx.Ctx, so cancelling the context handed to
// NewProfileContext reaches the router; when Ctx is nil it falls back to
// context.Background().
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) Init() {
	logger := watermill.NewStdLogger(true, false)
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	// equivalent of auto.offset.reset: earliest
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         config.Profile.GetString("kafka.group"),
		},
		logger,
	)
	if err != nil {
		log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))
		panic(err)
	}

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))
		panic(err)
	}

	router.AddPlugin(plugin.SignalsHandler)
	router.AddMiddleware(
		middleware.CorrelationID,
		// Retry a failing handler up to three times, starting at 100ms.
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,
		middleware.Recoverer,
	)

	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"to_analyzer__0.PERF_CRASH",
		subscriber,
		profileCtx.UnpackKafkaMessage,
	)

	// NOTE(review): the disabled registrations below reuse the handler name
	// "print_incoming_messages"; the router presumably rejects duplicate
	// handler names, so each one needs its own name before being enabled —
	// confirm against the watermill Router documentation.
	/*router.AddNoPublisherHandler(
		"print_incoming_messages",
		"to_analyzer__0.PERF_CRASH",
		subscriber,
		profileCtx.InitPerformanceEvent,
	)
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"to_analyzer__0.PERF_CRASH",
		subscriber,
		profileCtx.AnalyzeEvent,
	)
	router.AddNoPublisherHandler(
		"print_incoming_messages",
		"to_analyzer__0.PERF_CRASH",
		subscriber,
		profileCtx.WriteKafka,
	)*/

	// Honour the caller-supplied context instead of always using Background;
	// keep Background as the fallback for values built without a context.
	runCtx := profileCtx.Ctx
	if runCtx == nil {
		runCtx = context.Background()
	}
	if err = router.Run(runCtx); err != nil {
		log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))
		panic(err)
	}
}
// UnpackKafkaMessage
// @Description decodes the JSON payload of msg and stores the result in
// profileCtx.Event. On a decode failure Status is set to
// state.StatusUnmarshalError and Event is left unchanged.
// @Author xzx 2023-08-11 22:29:21
// @Param msg message whose Payload holds the JSON-encoded event
// @Return contextErr the json.Unmarshal error, or nil on success
func (profileCtx *ProfileContext) UnpackKafkaMessage(msg *message.Message) (contextErr error) {
	// Decode into a fresh value. The same ProfileContext handles every
	// message, and json.Unmarshal leaves fields that are absent from the
	// payload untouched, so decoding straight into profileCtx.Event would
	// carry data over from the previously handled message.
	var event schema.Event
	if contextErr = json.Unmarshal(msg.Payload, &event); contextErr != nil {
		profileCtx.Status = state.StatusUnmarshalError
		return contextErr
	}
	profileCtx.Event = event

	log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", profileCtx.Event))
	return nil
}
// InitPerformanceEvent
// @Description builds the performance event for the current Event through
// performance.EventFactory and stores it in Event.ProfileData. On failure
// Status is set to state.StatusEventFactoryError.
// @Author xzx 2023-08-11 22:30:36
// @Param msg not read here; the method works on profileCtx.Event
// @Return contextErr the factory error, or nil on success
func (profileCtx *ProfileContext) InitPerformanceEvent(msg *message.Message) (contextErr error) {
	perfEvent, contextErr := performance.EventFactory(
		profileCtx.Event.Category,
		profileCtx.Event.Dimensions,
		profileCtx.Event.Values,
	)
	if contextErr != nil {
		profileCtx.Status = state.StatusEventFactoryError
		return contextErr
	}

	log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", perfEvent))
	profileCtx.Event.ProfileData = perfEvent
	return nil
}
// AnalyzeEvent
// @Description runs Analyze on Event.ProfileData. On success the raw
// Dimensions and Values of the event are cleared; on failure Status is set to
// state.StatusAnalyzeError and the event is left as it was.
// @Author xzx 2023-08-11 22:30:44
// @Param msg not read here; the method works on profileCtx.Event
// @Return contextErr the Analyze error, or nil on success
func (profileCtx *ProfileContext) AnalyzeEvent(msg *message.Message) (contextErr error) {
	if contextErr = profileCtx.Event.ProfileData.Analyze(); contextErr != nil {
		profileCtx.Status = state.StatusAnalyzeError
		return contextErr
	}

	// The raw inputs are dropped once the analysis has succeeded.
	profileCtx.Event.Dimensions, profileCtx.Event.Values = nil, nil
	return nil
}
// WriteKafka
// @Description serialises profileCtx.Event to JSON and writes it through the
// shared producer to the topic returned by connector.GetTopic for the event's
// category, using the event ID as the message key.
// @Author xzx 2023-08-11 22:30:47
// @Param msg not read here; the method works on profileCtx.Event
// @Return contextErr the marshal or write error, or nil on success
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {
	payload, contextErr := json.Marshal(profileCtx.Event)
	if contextErr != nil {
		// NOTE(review): a marshal failure is recorded with the unmarshal
		// status code — confirm whether a dedicated status should be used.
		profileCtx.Status = state.StatusUnmarshalError
		return contextErr
	}

	topic := connector.GetTopic(profileCtx.Event.Category)
	eventID := profileCtx.Event.ID
	outgoing := kafkago.Message{
		Topic: topic,
		Key:   []byte(eventID),
		Value: payload,
	}
	if contextErr = connector.GetProducer().WriteMessages(profileCtx.Ctx, outgoing); contextErr != nil {
		profileCtx.Status = state.StatusWriteKafkaError
		return contextErr
	}

	log.Logger.Info(
		"[WriteKafka] write kafka success",
		zap.String("topic", topic),
		zap.String("id", eventID),
		zap.String("msg", string(payload)),
	)
	return nil
}
可以正常执行 Handler 的逻辑
明日待办
- 为一个主题添加多个 Handler