今日已办
进度和问题汇总
- 请求合并
- feature/venus trace
- feature/venus metric
- feature/profile-otel-baserunner-style
- bugfix/profile-logger-Sync
- feature/profile_otelclient_enable_config
- 完成otel 开关
- trace-采样
- metric-reader
- 已经都在各自服务器运行,并接入了云clickhouse集群,开始准备测试【详细需求】
- 测试的用例,并发的数目-【用例拓展-kafka的消息积压】
- clickhouse的哪些指标,cpu、内存,耗时等
- 以什么形式来输出这个性能对比?(表格or图形)
- 指标采集的性能消耗,复杂指标查询的消耗
- 对比对象-Jaeger
- 存储后端-elasticsearch 【手动部署或者购买】
- 收集存储,查询
golang pprof
抓取文件 CPU 占用和耗时,内存-火焰图- 不同方案做对比
- ck 的指标
- **数据库的延时,(五分钟)入库成功率**【压测】
- 通过指标或者链路耗时,定位哪个环节卡住
- 压测 jaeger 数据收集出现问题-【qps】,降低配置,突出优势
- 内存和cpu占有,profile 手动收集指标
- profile服务器3301的端口
- watermill 和 baserunner 的 benchmark 做得差不多了;将 publisher 改为使用 kafka-client 的异步生产者后,耗时减少了很多
- 需要启动其他监控工具(zipkin,jaeger【已经接入,正在尝试连入ck】,Prometheus等来进行对比吗)
- 一个优化点:代码中接入 otel-sdk 时,如何减少显式声明,提高代码的可扩展性
- profile 已经将otel逻辑嵌入到baserunner的handler中
- venus 待办
- profile-watermill 待办
分工
- 测试用例 - 1
- jaeger - 2
- pprof - 1
- 测试对比两种方案的 clickhouse 指标
- docker-compose拉低配置
watermill-benchmark
代码实现
- 先初始化 producer
- watermill 初始化并启动 router / baserunner 初始化 consumer
- 在 for 循环中同步生产完固定数量的消息【开始计时】
- 阻塞等待固定数量的消息被消费,解析,处理,异步推回 kafka 完成【结束计时】
- 本机和服务器测试单个topic的100条消息的结果见下列表格
- watermill 的性能和资源利用率均好于 baserunner
- 在核心数多的情况下,优势会更加明显
// Package internal holds the benchmarks comparing the watermill router with the baserunner consumer.
// @Author xzx 2023/8/19 14:13:00
package internal
import (
"context"
"fmt"
kc "github.com/Kevinello/kafka-client"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/bytedance/sonic"
"github.com/garsue/watermillzap"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"profile/cmd"
"profile/internal/config"
baseconsumer "profile/internal/context/consumer"
"profile/internal/log"
"profile/internal/schema"
"profile/internal/watermill/consumer"
"profile/internal/watermill/watermillkafka"
"testing"
"time"
)
// BenchmarkWatermill measures how long the watermill router takes to consume
// one batch of 100 messages. Each iteration resets
// watermillkafka.MessageCount, publishes the batch with the timer stopped,
// and then times the wait until the counter reaches the batch size.
//
// Recorded results:
//
//	BenchmarkWatermill-16 240 5631314 ns/op 3684370 B/op 37997 allocs/op
//	BenchmarkWatermill-16 153 7084305 ns/op 3706966 B/op 38168 allocs/op
//	BenchmarkWatermill-16 145 6917486 ns/op 3712511 B/op 38175 allocs/op
func BenchmarkWatermill(b *testing.B) {
	// Number of messages published and awaited per iteration.
	const batchSize = 100

	router := newRouter()
	// Deferred so the router is also closed when the benchmark aborts via b.Fatalf.
	defer func() {
		if err := router.Close(); err != nil {
			b.Errorf("close router: %v", err)
		}
	}()

	// runDone receives the result of router.Run so that a router that dies
	// during start-up is reported instead of leaving the benchmark spinning.
	runDone := make(chan error, 1)
	go func() {
		err := router.Run(context.Background())
		if err != nil {
			log.Logger.Error("router run error", zap.Error(err))
		}
		runDone <- err
	}()

	producer := newProducer()
	defer func() {
		if err := producer.Close(); err != nil {
			b.Errorf("close producer: %v", err)
		}
	}()

	// Wait for the router to report that it is running rather than sleeping
	// for a fixed, arbitrary duration.
	select {
	case <-router.Running():
	case err := <-runDone:
		b.Fatalf("router stopped before reaching the running state: %v", err)
	}

	b.ResetTimer()
	b.StopTimer()
	for i := 0; i < b.N; i++ {
		watermillkafka.MessageCount = 0
		if err := publishMessage(producer, batchSize); err != nil {
			// A failed publish would leave the wait loop below spinning or
			// produce meaningless numbers, so fail loudly.
			b.Fatalf("publish batch %d: %v", i, err)
		}

		b.StartTimer()
		// Block until the expected number of messages has been consumed.
		// NOTE(review): MessageCount is read without synchronization here; if
		// it is updated from handler goroutines this is a data race — confirm
		// and consider an atomic counter.
		for watermillkafka.MessageCount < batchSize || !router.IsRunning() {
		}
		b.StopTimer()

		// Logged with the timer stopped so log I/O is not measured.
		log.Logger.Error("PubSub Count End", zap.Any("count", watermillkafka.MessageCount))
	}
}
// BenchmarkBaseRunner measures how long the baserunner consumer (a
// kafka-client consumer dispatching to cmd.ConsumerDispatchHandler) takes to
// consume one batch of 100 messages. Each iteration resets
// baseconsumer.ConsumeCount, publishes the batch with the timer stopped, and
// then times the wait until the counter reaches the batch size.
//
// Recorded results:
//
//	BenchmarkBaseRunner-16 12 100429542 ns/op 4959836 B/op 42119 allocs/op
//	BenchmarkBaseRunner-16 10 100110220 ns/op 4946421 B/op 42132 allocs/op
//	BenchmarkBaseRunner-16 10 106747810 ns/op 4942656 B/op 42107 allocs/op
func BenchmarkBaseRunner(b *testing.B) {
	// Number of messages published and awaited per iteration.
	const batchSize = 100

	producer := newProducer()
	defer func() {
		if err := producer.Close(); err != nil {
			b.Errorf("close producer: %v", err)
		}
	}()

	myConsumer, err := kc.NewConsumer(
		context.Background(),
		kc.ConsumerConfig{
			Bootstrap: config.Profile.GetString("kafka.bootstrap"),
			GroupID:   config.Profile.GetString("kafka.group"),
			GetTopics: func(broker string) (topics []string, err error) {
				return []string{
					"to_analyzer__0.PERF_CRASH",
					"to_analyzer__0.PERF_LAG",
				}, nil
			},
			MessageHandler: cmd.ConsumerDispatchHandler,
			LogLevel:       int(zapcore.InfoLevel),
		},
	)
	if err != nil {
		// Fail only this benchmark instead of exiting the whole test binary.
		b.Fatalf("create consumer error: %v", err)
	}
	// Close (not Closed, which only returns the notification channel) actually
	// shuts the consumer down; deferred so it also runs on b.Fatalf.
	defer func() {
		if err := myConsumer.Close(); err != nil {
			b.Errorf("close consumer: %v", err)
		}
	}()

	// Report when the consumer has shut down.
	go func() {
		<-myConsumer.Closed()
		log.Logger.Info("consumer Closed")
	}()

	// Give the consumer a moment to start before the first batch is published.
	time.Sleep(10 * time.Millisecond)

	b.ResetTimer()
	b.StopTimer()
	for i := 0; i < b.N; i++ {
		baseconsumer.ConsumeCount = 0
		if err := publishMessage(producer, batchSize); err != nil {
			// A failed publish would leave the wait loop below spinning or
			// produce meaningless numbers, so fail loudly.
			b.Fatalf("publish batch %d: %v", i, err)
		}

		b.StartTimer()
		// Block until the expected number of messages has been consumed.
		// NOTE(review): ConsumeCount is read without synchronization here; if
		// it is updated from consumer goroutines this is a data race — confirm
		// and consider an atomic counter.
		for baseconsumer.ConsumeCount < batchSize {
		}
		b.StopTimer()

		// Logged with the timer stopped, matching BenchmarkWatermill, so log
		// I/O does not skew the comparison.
		log.Logger.Error("PubSub Count End", zap.Any("count", baseconsumer.ConsumeCount))
	}
}
// publishMessage builds nums copies of a fixed PERF_CRASH sample event, each
// with a fresh UploadTime and BackendID, and writes them to the topic
// "to_analyzer__0.<Category>" in a single WriteMessages call.
//
// It returns the first marshal error or the write error; in both cases the
// error is also printed to stdout. No messages are written when marshalling
// fails.
func publishMessage(producer *kc.Producer, nums int) (err error) {
	var event = &schema.Event{
		Meta: schema.Meta{
			AppID:    "1024",
			Category: "PERF_CRASH",
			Model:    "xiaomi13",
			DeviceID: "1b201ff9-5002-4fae-8d22-507a1c1a10b6",
			Os:       "ios",
			OsVer:    "13.1",
			UserID:   "28865194-fd08-480f-957d-ee9f21b32c3c",
			Version:  "100.24.56.7.19",
			Arch:     "aarch64",
			SdkVer:   "5.12.6",
			Platform: "ios",
		},
		Data: schema.Data{
			Time:         1688491757512,
			IP:           "119.147.10.203",
			ID:           "a4b838db-4f34-4da8-a27b-e725477ed336",
			NetType:      "5G",
			NetOp:        "CT",
			BatteryLevel: 92,
			PageID:       "com.tencent.test.page1",
			Dimensions: map[string]string{
				"crashed_thread": "com.tencent.thread1",
				"crash_type":     "native",
				"lose_data":      "true",
				"repeat_occur":   "false",
			},
			Values: map[string]int64{
				"memory_free":  600,
				"memory_max":   1200,
				"memory_total": 1600,
				"remain_disk":  4000,
			},
		},
		VenusData: schema.VenusData{
			UploadTime: time.Now().UnixMilli(),
			BackendID:  uuid.NewString(),
			Country:    "China",
			Region:     "Guangdong",
			City:       "Shenzhen",
		},
	}

	topic := fmt.Sprintf("to_analyzer__0.%s", event.Category)
	messages := make([]kafka.Message, 0, nums)
	for i := 0; i < nums; i++ {
		event.UploadTime = time.Now().UnixMilli()
		event.BackendID = uuid.NewString()

		// Declared separately and assigned with "=" so the named result err is
		// not shadowed and a marshal failure reaches the caller.
		var payload []byte
		payload, err = sonic.Marshal(event)
		if err != nil {
			fmt.Printf("failed to marshal event: %v\n", err)
			// Do not queue a message with an empty payload.
			return err
		}
		messages = append(messages, kafka.Message{
			Topic: topic,
			Value: payload,
		})
	}

	if err = producer.WriteMessages(context.Background(), messages...); err != nil {
		fmt.Printf("failed to write messages: %v\n", err)
	}
	return err
}
// newProducer creates a synchronous kafka-client producer for the local
// broker at 127.0.0.1:9092 with automatic topic creation enabled.
//
// It panics when the producer cannot be created, because the benchmarks
// cannot run without one; the panic message includes the underlying error.
func newProducer() *kc.Producer {
	// Broker address used by the benchmarks.
	const bootstrap = "127.0.0.1:9092"

	producer, err := kc.NewProducer(context.Background(), kc.ProducerConfig{
		Bootstrap:              bootstrap,
		Async:                  false,
		AllowAutoTopicCreation: true,
		Logger:                 &log.LogrLogger,
	})
	if err != nil {
		// Keep the cause in the message so a failed setup can be diagnosed.
		panic(fmt.Sprintf("cannot connect to kafka with address %s: %v", bootstrap, err))
	}
	return producer
}
// newRouter assembles the watermill router used by the benchmark: it creates
// the publisher/subscriber pair, registers the SignalsHandler plugin and the
// router-level middlewares, and wires the "crash" and "lag" handlers from the
// to_analyzer__0.* topics to the solar-dev.* topics.
//
// The process exits through log.Logger.Fatal when the router cannot be created.
func newRouter() *message.Router {
	const (
		crashSource = "to_analyzer__0.PERF_CRASH"
		crashTarget = "solar-dev.PERF_CRASH"
		lagSource   = "to_analyzer__0.PERF_LAG"
		lagTarget   = "solar-dev.PERF_LAG"
	)

	wmLogger := watermillzap.NewLogger(log.Logger)
	pub, sub := consumer.NewPubSub(wmLogger)

	r, err := message.NewRouter(message.RouterConfig{}, wmLogger)
	if err != nil {
		log.Logger.Fatal("create router error", zap.Error(err))
	}

	r.AddPlugin(plugin.SignalsHandler)

	// Generic watermill middlewares are registered first, then the
	// project-specific processing stages, in the same order as before.
	r.AddMiddleware(middleware.InstantAck, middleware.Recoverer)
	r.AddMiddleware(
		consumer.UnpackKafkaMessage,
		consumer.InitPerformanceEvent,
		consumer.AnalyzeEvent,
	)

	r.AddHandler("crash", crashSource, sub, crashTarget, pub, consumer.CrashHandler)
	r.AddHandler("lag", lagSource, sub, lagTarget, pub, consumer.LagHandler)

	return r
}
本地测试
BenchmarkWatermill-16 | BenchmarkBaseRunner-16 |
---|---|
240 563,1314 ns/op 368,4370 B/op 3,7997 allocs/op | 12 1,0042,9542 ns/op 495,9836 B/op 4,2119 allocs/op |
153 708,4305 ns/op 370,6966 B/op 3,8168 allocs/op | 10 1,0011,0220 ns/op 494,6421 B/op 4,2132 allocs/op |
145 691,7486 ns/op 371,2511 B/op 3,8175 allocs/op | 10 1,0674,7810 ns/op 494,2656 B/op 4,2107 allocs/op |
服务器上测试
单个topic的100条消息
BenchmarkWatermill-4 | BenchmarkBaseRunner-4 |
---|---|
10 4339,8240 ns/op 363,0762 B/op 3,7820 allocs/op | 25 4616,7095 ns/op 315,8836 B/op 3,9902 allocs/op |
78 4065,2822 ns/op 360,0755 B/op 3,7893 allocs/op | 26 4330,6776 ns/op 317,8770 B/op 3,9880 allocs/op |
100 3549,3863 ns/op 360,5322 B/op 3,7899 allocs/op | 100 4489,2327 ns/op 316,3158 B/op 3,9775 allocs/op |
386 1427,4034 ns/op 358,7454 B/op 3,7876 allocs/op | 10000 4949,4435 ns/op 319,7664 B/op 3,9874 allocs/op |
本地测试单个topic的100条消息
test | b.n | ns/op | B/op | allocs/op |
---|---|---|---|---|
BenchmarkWatermill-16 | 153 | 7084305 | 3706966 | 38168 |
BenchmarkWatermill-16 | 145 | 6917486 | 3712511 | 38175 |
BenchmarkBaseRunner-16 | 10 | 100110220 | 4946421 | 42132 |
BenchmarkBaseRunner-16 | 10 | 106747810 | 4942656 | 42107 |
服务器测试单个topic的100条消息
test | b.n | ns/op | B/op | allocs/op |
---|---|---|---|---|
BenchmarkWatermill-4 | 78 | 40652822 | 3600755 | 37893 |
BenchmarkBaseRunner-4 | 26 | 43306776 | 3178770 | 39880 |
明日待办
- 协助部署 jaeger