Done today

Transparent tracing for Venus
Define a handler function type that carries the main processing logic of a fiber.Handler:

- returns any error raised during processing
- returns a function that writes the JSON response
```go
// handler
// @Description:
// @Author xzx 2023-08-26 18:00:03
// @Param c
// @Return error
// @Return func() error : function that writes the JSON response
type handler func(c *fiber.Ctx) (error, func() error)
```
Define a TraceWrapper function that embeds the otel-trace logic around a handler:

- start a span
- execute the handler and record the span's attributes
- branch on the returned err and jsonRespFunc:

| Condition | Handling |
| --- | --- |
| err != nil && jsonRespFunc == nil | An error occurred: record it on the span, end the span, then run c.Next() |
| err != nil && jsonRespFunc != nil | An error occurred: record it on the span, end the span, then respond with JSON |
| err == nil && jsonRespFunc == nil | End the span, then run c.Next() |
| err == nil && jsonRespFunc != nil | End the span, then respond with JSON |

Implementation:
```go
// TraceWrapper returns a fiber.Handler that integrates otel-trace reporting
// @Description integrate otel-trace logic in handler
// @Author xzx 2023-08-26 18:00:03
// @Param f
// @Param opts
// @Return fiber.Handler
func TraceWrapper(f handler, opts ...trace.SpanStartOption) fiber.Handler {
	return func(c *fiber.Ctx) error {
		_, span := otelclient.Tracer.Start(c.UserContext(), runtime.GetFunctionName(f), opts...)
		// execute handler logic
		err, jsonRespFunc := f(c)
		// todo: setSpanAttributes
		if err != nil {
			// record span error
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			span.End()
			if jsonRespFunc != nil {
				// respond with the error result
				return jsonRespFunc()
			}
			// ignore error, continue handlers
			return c.Next()
		}
		span.End()
		if jsonRespFunc != nil {
			// respond with the success result
			return jsonRespFunc()
		}
		// err == nil, jsonRespFunc == nil
		return c.Next()
	}
}
```
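With this wrapper, each stage stays a plain `handler` while tracing happens transparently. Chaining the wrapped stages as fiber middleware might look like the following minimal sketch (the app setup and route path are assumptions, not taken from the original code; it assumes the handlers live in the same package):

```go
package main

import "github.com/gofiber/fiber/v2"

func main() {
	app := fiber.New()

	// Hypothetical route: each step is a `handler` wrapped by TraceWrapper and
	// chained as middleware; c.Next() advances to the next step.
	app.Post("/v1/events", // assumed path
		TraceWrapper(SplitAndValidate),
		TraceWrapper(HandleEvent),
		TraceWrapper(WriteKafka),
	)

	_ = app.Listen(":8080")
}
```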
The concrete handler logic:
```go
// SplitAndValidate splits multi-event uploads and validates the uploaded data
// @Description
// @Author xzx 2023-08-26 17:54:21
// @Param c
// @Return Err
// @Return f
func SplitAndValidate(c *fiber.Ctx) (Err error, f func() error) {
	log.Logger().Debug("split and validate", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
	otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
		attribute.String("type", "all_upload"),
	)))
	RequestIdBaggage, err := baggage.NewMember("request_id", c.GetRespHeader(fiber.HeaderXRequestID))
	if err != nil {
		log.Logger().Error("Create Baggage Member Failed", zap.Error(err))
	}
	Baggage, err := baggage.New(RequestIdBaggage)
	if err != nil {
		log.Logger().Error("Create Baggage Failed", zap.Error(err))
	}
	c.SetUserContext(baggage.ContextWithBaggage(c.UserContext(), Baggage))
	c.Accepts(fiber.MIMEMultipartForm)
	uploadedData, err := parseJSON(c)
	if err != nil {
		log.Logger().Error("failed to parse json", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
		c.Status(fiber.StatusBadRequest)
		return err, func() error {
			return c.JSON(protocol.Response{
				Code: protocol.AllFail,
				Data: err.Error(),
			})
		}
	}
	if err = uploadedData.Meta.Validate(); err != nil {
		log.Logger().Error("failed to validate meta", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
		otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
			attribute.String("type", "wrong_meta"),
		)))
		c.Status(fiber.StatusBadRequest)
		return err, func() error {
			return c.JSON(protocol.Response{
				Code: protocol.AllFail,
				Data: err.Error(),
			})
		}
	}
	// values stored in fiber's Locals must be pointers if later handlers will modify them
	events := make([]*schema.Event, 0, len(uploadedData.Data))
	parts := make([]*protocol.Part, 0, len(uploadedData.Data))
	for idx, data := range uploadedData.Data {
		log.Logger().Debug("split event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))
		event := &schema.Event{}
		part := &protocol.Part{}
		if err = data.Validate(); err != nil {
			Err = err
			otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
				attribute.String("type", "wrong_data"),
			)))
			part.Code = protocol.ValidationError
			part.Data = err.Error()
		}
		part.ID = data.ID
		event.Meta = uploadedData.Meta
		event.Data = data
		events = append(events, event)
		parts = append(parts, part)
	}
	c.Locals("meta", uploadedData.Meta)
	c.Locals("events", events)
	c.Locals("parts", parts)
	return Err, nil
}
```
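The `protocol.Part` and `protocol.Response` types are not included in this note; the following is a hypothetical sketch of their shape, inferred purely from the fields used above (field types and json tags are assumptions):

```go
// Hypothetical shapes inferred from usage; the real definitions live in the protocol package.
type Part struct {
	ID        string `json:"id"`
	BackendID string `json:"backend_id"`
	Code      int    `json:"code"` // 0 means success; otherwise ValidationError, IPParsingError, SerializationError, ...
	Data      string `json:"data"` // error detail when Code != 0
}

type Response struct {
	Code  int     `json:"code"` // AllSuccess / PartialSuccess / AllFail
	Data  string  `json:"data,omitempty"`
	Parts []*Part `json:"parts,omitempty"`
}
```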
```go
// HandleEvent handles the split events
// @Description
// @Author xzx 2023-08-26 17:54:23
// @Param c
// @Return Err
// @Return f
func HandleEvent(c *fiber.Ctx) (Err error, f func() error) {
	log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
	events := c.Locals("events").([]*schema.Event)
	parts := c.Locals("parts").([]*protocol.Part)
	for idx, event := range events {
		log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))
		event.BackendID = uuid.NewString()
		event.UploadTime = time.Now().UnixMilli()
		part := parts[idx]
		part.BackendID = event.BackendID
		country, region, city, err := ip.ParseIP(event.IP)
		if err != nil {
			Err = err
			log.Logger().Warn("failed to parse ip", zap.Error(err), zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())))
			part.Code = protocol.IPParsingError
			part.Data = err.Error()
		}
		log.Logger().Debug("parsed ip", zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())), zap.String("country", country), zap.String("region", region), zap.String("city", city))
		event.Country = country
		event.Region = region
		event.City = city
	}
	return Err, nil
}
```
```go
// WriteKafka writes the events to kafka
// @Description
// @Author xzx 2023-08-26 17:54:26
// @Param c
// @Return Err
// @Return f
func WriteKafka(c *fiber.Ctx) (Err error, f func() error) {
	log.Logger().Debug("write kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
	meta := c.Locals("meta").(schema.Meta)
	events := c.Locals("events").([]*schema.Event)
	parts := c.Locals("parts").([]*protocol.Part)
	// marks whether all parts succeeded, which determines the response code
	isAllSuccess := true
	// topic is like to_analyzer__0.PERF_CRASH
	topic := fmt.Sprintf("to_analyzer__0.%s", meta.Category)
	traceparent := c.Get(traceparentHeaderKey)
	// a well-formed W3C traceparent is 55 chars: "00-" + 32-hex trace-id + "-" +
	// 16-hex span-id + "-" + 2-hex flags; splice the current span's ID into the
	// span-id segment (bytes 36..51)
	if len(traceparent) == 55 {
		spanId := trace.SpanFromContext(c.UserContext()).SpanContext().SpanID().String()
		traceparent = traceparent[:36] + spanId + traceparent[52:]
	}
	messages := make([]kafka.Message, 0, len(events))
	for idx, event := range events {
		// skip events that already failed
		if parts[idx].Code != 0 {
			isAllSuccess = false
			continue
		}
		bytes, err := sonic.Marshal(event)
		if err != nil {
			Err = err
			log.Logger().Error("failed to marshal event", zap.Error(err), zap.Any("event", event), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
			parts[idx].Code = protocol.SerializationError
			parts[idx].Data = err.Error()
			// don't enqueue an empty message; mark the batch as partially failed
			isAllSuccess = false
			continue
		}
		messages = append(messages, kafka.Message{
			Topic: topic,
			Value: bytes,
			Headers: []kafka.Header{
				{Key: traceparentHeaderKey, Value: []byte(traceparent)},
			},
		})
	}
	if len(messages) == 0 { // nothing to write to kafka: every part failed for some reason
		log.Logger().Warn("every data were failed to handle, would not write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
		c.Status(fiber.StatusBadRequest)
		return errors.New("every data were failed to handle, check their code and data"), func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.AllFail,
				Data:  "every data were failed to handle, check their code and data",
				Parts: parts,
			})
		}
	}
	log.Logger().Info("would write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
	kafkaProducer := connector.GetEventKafka()
	if err := kafkaProducer.WriteMessages(context.Background(), messages...); err != nil {
		log.Logger().Error("failed to write to kafka", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Any("messages", messages))
		c.Status(fiber.StatusInternalServerError)
		return err, func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.AllFail,
				Data:  fmt.Sprintf("failed to write to kafka: %s", err.Error()),
				Parts: parts,
			})
		}
	}
	if isAllSuccess {
		c.Status(fiber.StatusOK)
		otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
			attribute.String("type", "success_upload"))))
		return nil, func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.AllSuccess,
				Parts: parts,
			})
		}
	}
	c.Status(fiber.StatusPartialContent)
	return Err, func() error {
		return c.JSON(protocol.Response{
			Code:  protocol.PartialSuccess,
			Data:  "some data were failed to handle, check their code and data",
			Parts: parts,
		})
	}
}
```
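On the consuming side, the traceparent header attached above can be restored into a context with otel's propagation API, so downstream spans join the same trace. A minimal sketch; the consumer wiring is an assumption, only the header format comes from the code above:

```go
package main

import (
	"context"
	"fmt"

	"github.com/segmentio/kafka-go"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
)

// extractTraceContext rebuilds the upstream trace context from a kafka
// message's headers using the W3C TraceContext propagator.
func extractTraceContext(ctx context.Context, msg kafka.Message) context.Context {
	carrier := propagation.MapCarrier{}
	for _, h := range msg.Headers {
		carrier[h.Key] = string(h.Value)
	}
	return propagation.TraceContext{}.Extract(ctx, carrier)
}

func main() {
	// hypothetical message, shaped like the ones produced by WriteKafka above
	msg := kafka.Message{Headers: []kafka.Header{
		{Key: "traceparent", Value: []byte("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")},
	}}
	sc := trace.SpanContextFromContext(extractTraceContext(context.Background(), msg))
	fmt.Println(sc.TraceID(), sc.SpanID()) // same trace-id/span-id as the header
}
```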
Sync meeting

Progress

- Completed the non-intrusive otel-trace handler integration for the venus and profile logic, improving code extensibility
- The otel reporting switch has been reviewed and merged
- Deployed the full jaeger stack
- Benchmarks of watermill vs. baserunner
Review and test plan

- Code review
  - Non-intrusive otel integration for fiber.Handler; improved code extensibility
  - Fixed some issues flagged for naming conventions, comment conventions, and code-quality checks
  - Removed the grafana container from profile-compose
  - Log initialization
  - Removed unused configuration from venus-compose
- Test plan
  - Connect to the ck cluster, benchmark otel-sdk reporting, and load-test the otel-collector and the ck cluster
    - [Blocker] Load-testing the collector has so far not put much pressure on ck
  - Deployed the full jaeger stack, with es storing traces and Prometheus storing metrics
    - [Difference] In jaeger's trace visualization, spans linked via span_references - FollowsFrom are rendered as parent-child spans
    - The es cluster has been purchased; waiting to be connected
  - [Plan] Load-test jaeger to find the qps at which it starts to fail, then use that qps to test plans A and B
  - [Method] During load testing, use pprof to capture the cpu and memory usage of venus and profile
  - Metrics for the ck cluster
    - [Insert] ck write latency; 1-minute / 5-minute write success rate
    - [Query] ck query latency; query success rate
    - [Resource utilization] ck cpu and memory utilization
  - [Question] How to substantiate our claims about the differences and relative merits of the monitoring services: the concrete test procedure, test subjects, and comparison of test metrics
Summary

- With identical metrics reporting, compare the collectors (jaeger vs. otel, as containers): show otel's advantage in cpu and memory by monitoring the collector containers with cadvisor
- With identical reporting (e.g. traces), compare the storage footprint of es and ck, via kibana
- Brief extension: otel-log, pinpointing where a trace failed and the root cause of the concrete problem
- Query performance: performance-test response latency by load-testing the signoz and jaeger web URLs (mainly the Web side), with controlled variables (same trace and span, caches cleared, mainly tcp load testing)
- Extension: comparison of watermill vs. baserunner, which performs well under high concurrency
- Extension: benchmark hyperscan against the standard regexp engine (see the sketch below)
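A minimal shape for that benchmark, covering only the standard-library side; the hyperscan side would plug into the same harness through a Go binding (e.g. github.com/flier/gohs, an assumption, not named in this note), and the pattern and input are placeholders:

```go
package bench

import (
	"regexp"
	"testing"
)

// Placeholder pattern and input; the real benchmark should use the
// production patterns and representative event payloads.
var (
	pattern = regexp.MustCompile(`(?i)panic: .+`)
	input   = []byte("2023-08-26 18:00:03 panic: runtime error: index out of range")
)

// BenchmarkStdRegexp measures the standard library regexp engine.
func BenchmarkStdRegexp(b *testing.B) {
	for i := 0; i < b.N; i++ {
		if !pattern.Match(input) {
			b.Fatal("expected a match")
		}
	}
}

// A hyperscan counterpart would compile the same expression into a block
// database once (outside the loop) and call Scan on the same input inside
// the loop; omitted here since the binding's API is not confirmed by this note.
```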
Data recording and controlled variables
cadvisor
```yaml
services:
  cadvisor:
    image: gcr.io/cadvisor/cadvisor:latest
    container_name: cadvisor
    networks:
      - backend
    command: --url_base_prefix=/cadvisor
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:rw
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro
      - /dev/disk/:/dev/disk:ro
```
Deploying jaeger-all-in-one as separate components

To collect metrics from jaeger-collector, the jaeger-all-in-one deployment is split into its components:
```yaml
services:
  jaeger-collector:
    image: jaegertracing/jaeger-collector
    container_name: jaeger-collector
    networks:
      - backend
    command: [
      "--es.server-urls=http://elasticsearch:9200",
      "--es.num-shards=1",
      "--es.num-replicas=0",
      "--log-level=info"
    ]
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - METRICS_STORAGE_TYPE=prometheus
      - PROMETHEUS_SERVER_URL=http://prometheus:9090
    depends_on:
      - elasticsearch

  jaeger-query:
    image: jaegertracing/jaeger-query
    container_name: jaeger-query
    networks:
      - backend
    command: [
      "--es.server-urls=http://elasticsearch:9200",
      "--span-storage.type=elasticsearch",
      "--log-level=info"
    ]
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - METRICS_STORAGE_TYPE=prometheus
      - PROMETHEUS_SERVER_URL=http://prometheus:9090
      - no_proxy=localhost
      - QUERY_BASE_PATH=/jaeger
    depends_on:
      - jaeger-agent

  jaeger-agent:
    image: jaegertracing/jaeger-agent
    container_name: jaeger-agent
    networks:
      - backend
    command: [ "--reporter.grpc.host-port=jaeger-collector:14250" ]
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - METRICS_STORAGE_TYPE=prometheus
      - PROMETHEUS_SERVER_URL=http://prometheus:9090
    depends_on:
      - jaeger-collector
```
Todo tomorrow

- Load testing