今日已办
50字项目价值和重难点
项目价值
通过将指标监控组件接入项目,对比包括其配套工具在功能、性能上的差异、优劣,给出监控服务瘦身的建议
top3难点
- 减少监控服务资源成本,考虑性能优化
- 如何证明我们在监控服务差异、优劣方面的断言
- 监控服务无感化,支持代码可扩展
总监回复
小而美的监控服务
怎么为之小? 怎么为之美? 要小要美的关键点是什么?是你们对于这个服务的核心的把握,最内核的把握,是如何做好减法。多看点做减法的文章,难的地方是,如何先想厚再做薄。
Watermill
Replace sarama.SyncProducer
-> kafka-client.Producer
-
测试发现 WriteKafka 的 Span 耗时较久,Watermill-kafka 的 Publisher 的 Producer 是
sarama.SyncProducer
,同步写入耗时较久,而sarama.ASyncProducer
的 API 不够直观,需求也需要我们更改底层库为 kafka-go -
对比原先写入kafka,发现 kafka-client(依赖 kafka-go) 的 Producer 可以支持同步or异步消息写入,故修改 Publisher 的实现
-
profile/internal/watermill/watermillkafka/marshaler.go
package watermillkafka import ( "github.com/Shopify/sarama" "github.com/ThreeDotsLabs/watermill/message" "github.com/pkg/errors" "github.com/segmentio/kafka-go" ) const UUIDHeaderKey = "_watermill_message_uuid" const HeaderKey = "_key" // Marshaler marshals Watermill's message to Kafka message. type Marshaler interface { Marshal(topic string, msg *message.Message) (*kafka.Message, error) } // Unmarshaler unmarshals Kafka's message to Watermill's message. type Unmarshaler interface { Unmarshal(*sarama.ConsumerMessage) (*message.Message, error) } type MarshalerUnmarshaler interface { Marshaler Unmarshaler } type DefaultMarshaler struct{} func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*kafka.Message, error) { if value := msg.Metadata.Get(UUIDHeaderKey); value != "" { return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey) } headers := []kafka.Header{{ Key: UUIDHeaderKey, Value: []byte(msg.UUID), }} var msgKey string for key, value := range msg.Metadata { if key == HeaderKey { msgKey = value } else { headers = append(headers, kafka.Header{ Key: key, Value: []byte(value), }) } } return &kafka.Message{ Topic: topic, Key: []byte(msgKey), Value: msg.Payload, Headers: headers, }, nil } func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error) { var messageID string metadata := make(message.Metadata, len(kafkaMsg.Headers)) for _, header := range kafkaMsg.Headers { if string(header.Key) == UUIDHeaderKey { messageID = string(header.Value) } else { metadata.Set(string(header.Key), string(header.Value)) } } msg := message.NewMessage(messageID, kafkaMsg.Value) msg.Metadata = metadata return msg, nil }
-
profile/internal/watermill/watermillkafka/publisher.go
package watermillkafka
import (
"context"
kc "github.com/Kevinello/kafka-client"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"go.uber.org/zap"
"profile/internal/connector"
"profile/internal/log"
"profile/internal/watermill/model"
)
type Publisher struct {
config PublisherConfig
producer *kc.Producer
logger watermill.LoggerAdapter
closed bool
}
// NewPublisher creates a new Kafka Publisher.
func NewPublisher(
config PublisherConfig,
logger watermill.LoggerAdapter,
) (*Publisher, error) {
if err := config.Validate(); err != nil {
return nil, err
}
if logger == nil {
logger = watermill.NopLogger{}
}
producer, err := kc.NewProducer(context.Background(), config.KcProducerConfig)
if err != nil {
return nil, errors.Wrap(err, "cannot create Kafka producer")
}
return &Publisher{
config: config,
producer: producer,
logger: logger,
}, nil
}
type PublisherConfig struct {
// Kafka brokers list.
Brokers []string
// Marshaler is used to marshal messages from Watermill format into Kafka format.
Marshaler Marshaler
// KcProducerConfig configuration object used to create new instances of Producer
KcProducerConfig kc.ProducerConfig
}
func (c PublisherConfig) Validate() error {
if len(c.Brokers) == 0 {
return errors.New("missing brokers")
}
if c.Marshaler == nil {
return errors.New("missing marshaler")
}
return c.KcProducerConfig.Validate()
}
// Publish publishes message to Kafka.
//
// Publish is blocking and wait for ack from Kafka.
// When one of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
if p.closed {
return errors.New("publisher closed")
}
logFields := make(watermill.LogFields, 2)
logFields["topic"] = topic
for _, msg := range msgs {
logFields["message_uuid"] = msg.UUID
p.logger.Trace("Sending message to Kafka", logFields)
kafkaMsg, err := p.config.Marshaler.Marshal(topic, msg)
if err != nil {
return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
}
data := msg.Context().Value("data").(*model.ConsumeCtxData)
err = p.producer.WriteMessages(msg.Context(), *kafkaMsg)
if err != nil {
log.Logger.ErrorContext(msg.Context(), "send message to kafka error", zap.Error(err))
data.WriteKafkaSpan.End()
data.RootSpan.End()
return errors.Wrapf(err, "cannot produce message %s", msg.UUID)
}
data.WriteKafkaSpan.End()
log.Logger.Info("[WriteKafka] write kafka success",
zap.String("topic", connector.GetTopic(data.Event.Category)),
zap.String("id", data.Event.ID), zap.Any("msg", data.Event),
zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))
p.logger.Trace("Message sent to Kafka", logFields)
data.RootSpan.End()
}
return nil
}
func (p *Publisher) Close() error {
if p.closed {
return nil
}
p.closed = true
if err := p.producer.Close(); err != nil {
return errors.Wrap(err, "cannot close Kafka producer")
}
return nil
}
- 测试发现性能对比先前修改有了大量提升,(还测试了不同环境-docker/本地,不同配置-同步/异步的区别),docker环境,开启异步是效率最高的
明日待办
- benchmark:watermill 和 baserunner