Done today
- watermill: set the Kafka key on the message
  - modified the watermill-kafka source so that the key is carried in message.Metadata (a sketch of the marshaler idea follows this list)
- integrated the otel-sdk
  - added the resolveUpstreamCtx middleware: resolves the upstream context and starts the root span
  - added middleware.InstantAck: acks immediately so that multiple messages can be processed in parallel (they still go through the middleware and handler logic); see the router wiring sketch below
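A minimal sketch of the marshaler change, assuming watermill-kafka v2's `kafka.DefaultMarshaler` and its `Marshaler`/`Unmarshaler` interfaces on top of sarama (the sarama import path depends on the watermill-kafka version); the `HeaderKey` value is illustrative, only the name mirrors the constant referenced in the handler code further down:

```go
package watermillkafka

import (
	"github.com/Shopify/sarama"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
)

// HeaderKey is the metadata key that carries the Kafka message key (value is illustrative).
const HeaderKey = "key"

// keyMarshaler wraps the default marshaler and round-trips the Kafka key
// through message.Metadata[HeaderKey].
type keyMarshaler struct {
	kafka.DefaultMarshaler
}

// Marshal reads the key from metadata and sets it as the Kafka message key.
func (m keyMarshaler) Marshal(topic string, msg *message.Message) (*sarama.ProducerMessage, error) {
	producerMsg, err := m.DefaultMarshaler.Marshal(topic, msg)
	if err != nil {
		return nil, err
	}
	if key := msg.Metadata.Get(HeaderKey); key != "" {
		producerMsg.Key = sarama.StringEncoder(key)
	}
	return producerMsg, nil
}

// Unmarshal copies the Kafka key into metadata so that consumer-side
// middleware and handlers can read it.
func (m keyMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error) {
	msg, err := m.DefaultMarshaler.Unmarshal(kafkaMsg)
	if err != nil {
		return nil, err
	}
	msg.Metadata.Set(HeaderKey, string(kafkaMsg.Key))
	return msg, nil
}
```

Such a marshaler would be plugged in via kafka.PublisherConfig.Marshaler and kafka.SubscriberConfig.Unmarshaler when the publisher and subscriber are built.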
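And a minimal sketch of how the middleware chain and middleware.InstantAck might be wired into a watermill router; the handler and topic names are illustrative, and the subscriber/publisher construction is left out:

```go
package pubsub

import (
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
)

// newRouter wires the consumer middleware chain in processing order.
func newRouter(logger watermill.LoggerAdapter, sub message.Subscriber, pub message.Publisher) (*message.Router, error) {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		return nil, err
	}
	router.AddMiddleware(
		resolveUpstreamCtx,    // resolve the upstream trace context and start the root span
		middleware.InstantAck, // ack immediately so multiple messages can be processed in parallel
		unpackKafkaMessage,    // unmarshal the raw payload into the generic event
		initPerformanceEvent,  // build the concrete performance event
		analyzeEvent,          // run the analysis
	)
	// the terminal handler marshals the event and publishes it back to Kafka
	router.AddHandler(
		"profile_crash_handler", // handler name (illustrative)
		"crash-input-topic",     // subscribe topic (illustrative)
		sub,
		"crash-output-topic", // publish topic (illustrative)
		pub,
		crashHandler,
	)
	return router, nil
}
```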
// Package pubsub
// @Author xzx 2023/8/12 10:01:00
package pubsub

import (
	"context"
	"encoding/json"
	"github.com/ThreeDotsLabs/watermill/message"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/propagation"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/otelclient"
	"profile/internal/schema"
	"profile/internal/schema/performance"
	"profile/internal/state"
	"profile/internal/watermill/watermillkafka"
)

// consumeCtxData
// @Description: Data collected across one message-processing context
// @Author xzx 2023-08-17 13:36:12
type consumeCtxData struct {
	Status        int
	Event         schema.Event
	RootSpan      trace.Span
	RootSpanCtx   context.Context
	AppID         string // reported via the API
	FetchScenario string // reported via the API
}

// resolveUpstreamCtx
// @Description resolve the upstream producer's trace context and start the root consumer span
// @Author xzx 2023-08-18 11:15:09
// @Param h
// @Return message.HandlerFunc
func resolveUpstreamCtx(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		var data consumeCtxData
		// get the upstream producer's W3C trace context via propagation
		headerCarrier := make(propagation.HeaderCarrier)
		headerCarrier.Set("Traceparent", msg.Metadata.Get("Traceparent"))
		upstreamProducerCtx := otel.GetTextMapPropagator().Extract(msg.Context(), headerCarrier)
		// set the upstream trace ID on the consumer context
		consumerCtx := trace.ContextWithRemoteSpanContext(msg.Context(),
			trace.NewSpanContext(trace.SpanContextConfig{
				TraceID: trace.SpanContextFromContext(upstreamProducerCtx).TraceID(),
			}))
		// start tracing: the root span links back to the upstream producer span
		data.RootSpanCtx, data.RootSpan = otelclient.ConsumerTracer.Start(consumerCtx, "Profile-Consumer",
			trace.WithSpanKind(trace.SpanKindConsumer),
			trace.WithLinks(trace.LinkFromContext(upstreamProducerCtx, semconv.OpentracingRefTypeFollowsFrom)))
		msg.SetContext(context.WithValue(msg.Context(), "data", &data))
		return h(msg)
	}
}

// unpackKafkaMessage
// @Description unmarshal the Kafka payload into the generic event struct
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func unpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		data := msg.Context().Value("data").(*consumeCtxData)
		unpackKafkaMessageCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "unpackKafkaMessage",
			trace.WithSpanKind(trace.SpanKindClient))
		// unmarshal the payload into the generic event struct
		if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {
			data.Status = state.StatusUnmarshalError
			handlerErr(unpackKafkaMessageCtx, "unmarshal error", contextErr)
			span.End()
			return nil, contextErr
		}
		log.Logger.InfoContext(unpackKafkaMessageCtx, "[UnpackKafkaItem] unpack kafka item success",
			zap.Any("event", data.Event),
			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))
		setSpanAttributes(span, data)
		msg.SetContext(context.WithValue(msg.Context(), "data", data))
		span.End()
		return h(msg)
	}
}

// initPerformanceEvent
// @Description build the concrete performance event from the generic event
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func initPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		data := msg.Context().Value("data").(*consumeCtxData)
		initPerformanceEventCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "initPerformanceEvent",
			trace.WithSpanKind(trace.SpanKindInternal))
		event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)
		if contextErr != nil {
			data.Status = state.StatusEventFactoryError
			handlerErr(initPerformanceEventCtx, "event factory error", contextErr)
			span.End()
			return nil, contextErr
		}
		log.Logger.InfoContext(initPerformanceEventCtx, "[initPerformanceEvent] init performance event success",
			zap.Any("event", event),
			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))
		data.Event.ProfileData = event
		setSpanAttributes(span, data)
		msg.SetContext(context.WithValue(msg.Context(), "data", data))
		span.End()
		return h(msg)
	}
}

// analyzeEvent
// @Description run the event's analysis and drop the raw dimensions/values
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func analyzeEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		data := msg.Context().Value("data").(*consumeCtxData)
		analyzeEventCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "analyzeEvent",
			trace.WithSpanKind(trace.SpanKindInternal))
		contextErr := data.Event.ProfileData.Analyze()
		if contextErr != nil {
			data.Status = state.StatusAnalyzeError
			handlerErr(analyzeEventCtx, "analyze event error", contextErr)
			span.End()
			return nil, contextErr
		}
		log.Logger.InfoContext(analyzeEventCtx, "[analyzeEvent] analyze performance event success",
			zap.Any("event", data.Event),
			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))
		// clear dimensions and values
		data.Event.Dimensions = nil
		data.Event.Values = nil
		setSpanAttributes(span, data)
		msg.SetContext(context.WithValue(msg.Context(), "data", data))
		span.End()
		return h(msg)
	}
}

// crashHandler
// @Description marshal the processed crash event and produce it back to Kafka
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func crashHandler(msg *message.Message) ([]*message.Message, error) {
	data := msg.Context().Value("data").(*consumeCtxData)
	writeKafkaCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "crashHandler",
		trace.WithSpanKind(trace.SpanKindProducer))
	toWriteBytes, contextErr := json.Marshal(data.Event)
	if contextErr != nil {
		data.Status = state.StatusUnmarshalError
		handlerErr(writeKafkaCtx, "marshal error", contextErr)
		span.End()
		return nil, contextErr
	}
	msg = message.NewMessage(data.Event.BackendID, toWriteBytes)
	// set the Kafka key via metadata so the custom marshaler can pick it up
	msg.Metadata.Set(watermillkafka.HeaderKey, data.Event.ID)
	log.Logger.Info("[4-crashHandler] write kafka ...",
		zap.String("topic", connector.GetTopic(data.Event.Category)),
		zap.String("id", data.Event.ID),
		zap.String("msg", string(toWriteBytes)))
	setSpanAttributes(span, data)
	span.End()
	data.RootSpan.End()
	return message.Messages{msg}, nil
}

// lagHandler
// @Description marshal the processed lag event and produce it back to Kafka
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func lagHandler(msg *message.Message) ([]*message.Message, error) {
	data := msg.Context().Value("data").(*consumeCtxData)
	writeKafkaCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "lagHandler",
		trace.WithSpanKind(trace.SpanKindProducer))
	toWriteBytes, contextErr := json.Marshal(data.Event)
	if contextErr != nil {
		data.Status = state.StatusUnmarshalError
		handlerErr(writeKafkaCtx, "marshal error", contextErr)
		span.End()
		return nil, contextErr
	}
	msg = message.NewMessage(data.Event.BackendID, toWriteBytes)
	log.Logger.Info("[4-lagHandler] write kafka ...",
		zap.String("topic", connector.GetTopic(data.Event.Category)),
		zap.String("id", data.Event.ID),
		zap.String("msg", string(toWriteBytes)))
	setSpanAttributes(span, data)
	span.End()
	data.RootSpan.End()
	return message.Messages{msg}, nil
}

// setSpanAttributes
// @Description set the shared event attributes on a span
// @Author xzx 2023-08-03 23:19:17
// @Param span
// @Param data
func setSpanAttributes(span trace.Span, data *consumeCtxData) {
	if span.IsRecording() {
		span.SetAttributes(
			attribute.String("event.category", data.Event.Category),
			attribute.String("event.backend_id", data.Event.BackendID),
		)
	}
}

// handlerErr
// @Description log the error together with the span context
// @Author xzx 2023-07-20 15:36:46
// @Param ctx
// @Param msg
// @Param err
func handlerErr(ctx context.Context, msg string, err error) {
	log.Logger.ErrorContext(ctx, msg, zap.Error(err))
}
Meeting minutes
Progress
- venus: metrics are being developed on a separate branch
- venus: fixed a few trace bugs
  - call span.End() explicitly before returning an error (see the sketch after this list)
- profile: the watermill pub/sub and trace reporting now restore the original functionality
- profile: hyperscan research is still ongoing
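The span.End() point above, as a tiny standalone sketch (package, tracer, and function names are illustrative):

```go
package venustrace // illustrative

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/codes"
)

// doWork shows the fix: span.End() is called on every return path, including errors.
func doWork(ctx context.Context, step func(context.Context) error) error {
	ctx, span := otel.Tracer("venus").Start(ctx, "doWork") // tracer name is illustrative
	if err := step(ctx); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		span.End() // end explicitly before returning the error
		return err
	}
	span.End()
	return nil
}
```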
TODO
- investigate how to turn otel off via a config switch
  - found sdktrace.WithSampler(sdktrace.NeverSample()), which stops traces from being reported (see the sketch after this list)
- performance benchmark: compare the original profile code with the current watermill-based code in single-topic and multi-topic scenarios
- investigate watermill's otelEnabled option and other third-party libraries that integrate otel
- hyperscan benchmark
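A minimal sketch of what that switch could look like, assuming the tracer provider is built from go.opentelemetry.io/otel/sdk/trace; the otelEnabled flag is an illustrative config field:

```go
package otelclient

import (
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// newSampler turns otel reporting on or off: NeverSample() drops every span,
// so nothing is exported when the switch is off.
func newSampler(otelEnabled bool) sdktrace.Sampler {
	if !otelEnabled {
		return sdktrace.NeverSample()
	}
	// respect the parent's sampling decision, sample root spans locally
	return sdktrace.ParentBased(sdktrace.AlwaysSample())
}
```

The sampler would then be passed when building the provider, e.g. sdktrace.NewTracerProvider(sdktrace.WithSampler(newSampler(cfg.OtelEnabled)), ...), where cfg.OtelEnabled is a hypothetical config field.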
TODO for tomorrow
- performance benchmark: compare the original profile code with the current watermill-based code in single-topic and multi-topic scenarios
- on top of the watermill-kafka source, add otel trace & log logic to the publisher's write-back to Kafka (a rough sketch of the idea follows)
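A rough sketch of that direction, written here as a decorator around watermill's message.Publisher rather than an in-tree patch of watermill-kafka; the tracer name is illustrative and the log call is left as a comment:

```go
package watermillkafka

import (
	"github.com/ThreeDotsLabs/watermill/message"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
)

// tracingPublisher wraps a publisher with one producer span per message and
// injects the W3C trace context into the metadata (which ends up as Kafka headers),
// so downstream consumers can resolve the upstream context.
type tracingPublisher struct {
	inner message.Publisher
}

func (p tracingPublisher) Publish(topic string, messages ...*message.Message) error {
	tracer := otel.Tracer("profile/watermillkafka") // tracer name is illustrative
	spans := make([]trace.Span, 0, len(messages))
	for _, msg := range messages {
		ctx, span := tracer.Start(msg.Context(), "kafka.publish "+topic,
			trace.WithSpanKind(trace.SpanKindProducer))
		carrier := make(propagation.HeaderCarrier)
		otel.GetTextMapPropagator().Inject(ctx, carrier)
		for _, k := range carrier.Keys() {
			msg.Metadata.Set(k, carrier.Get(k)) // e.g. the Traceparent header read by resolveUpstreamCtx
		}
		msg.SetContext(ctx)
		spans = append(spans, span)
		// a structured log entry for the outgoing message could be emitted here as well
	}
	err := p.inner.Publish(topic, messages...)
	for _, span := range spans {
		if err != nil {
			span.RecordError(err)
		}
		span.End() // end explicitly on both the success and error paths
	}
	return err
}

func (p tracingPublisher) Close() error { return p.inner.Close() }
```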