40分钟学 Go 语言高并发:服务监控与追踪

news2024/12/13 0:41:03

服务监控与追踪

一、知识要点总览

模块核心内容技术选型难度
监控指标请求量、响应时间、错误率、资源使用Prometheus + Grafana
链路追踪分布式调用链、性能瓶颈分析Jaeger, OpenTelemetry
日志处理日志收集、分析、存储ELK Stack
告警系统告警规则、通知渠道、告警分级AlertManager

二、详细实现

1. 监控指标采集系统

让我们先看监控指标的实现:

// metrics/collector.go
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "sync"
    "time"
)

type Collector struct {
    // HTTP请求相关指标
    requestCounter   *prometheus.CounterVec
    requestDuration  *prometheus.HistogramVec
    requestInFlight  *prometheus.GaugeVec
    
    // 系统资源指标
    cpuUsage        *prometheus.GaugeVec
    memoryUsage     *prometheus.GaugeVec
    goroutineCount  prometheus.Gauge
    
    // 业务指标
    businessCounter  *prometheus.CounterVec
    queueLength     *prometheus.GaugeVec
    
    // 错误指标
    errorCounter    *prometheus.CounterVec
    
    mu sync.RWMutex
}

func NewCollector(namespace string) *Collector {
    return &Collector{
        requestCounter: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Namespace: namespace,
                Name:      "http_requests_total",
                Help:      "Total number of HTTP requests",
            },
            []string{"method", "path", "status"},
        ),
        
        requestDuration: promauto.NewHistogramVec(
            prometheus.HistogramOpts{
                Namespace: namespace,
                Name:      "http_request_duration_seconds",
                Help:      "HTTP request duration in seconds",
                Buckets:   []float64{0.1, 0.3, 0.5, 0.7, 0.9, 1.0, 1.5, 2.0},
            },
            []string{"method", "path"},
        ),
        
        requestInFlight: promauto.NewGaugeVec(
            prometheus.GaugeOpts{
                Namespace: namespace,
                Name:      "http_requests_in_flight",
                Help:      "Current number of HTTP requests being processed",
            },
            []string{"method"},
        ),
        
        cpuUsage: promauto.NewGaugeVec(
            prometheus.GaugeOpts{
                Namespace: namespace,
                Name:      "cpu_usage_percent",
                Help:      "Current CPU usage percentage",
            },
            []string{"core"},
        ),
        
        memoryUsage: promauto.NewGaugeVec(
            prometheus.GaugeOpts{
                Namespace: namespace,
                Name:      "memory_usage_bytes",
                Help:      "Current memory usage in bytes",
            },
            []string{"type"},
        ),
        
        goroutineCount: promauto.NewGauge(
            prometheus.GaugeOpts{
                Namespace: namespace,
                Name:      "goroutine_count",
                Help:      "Current number of goroutines",
            },
        ),
        
        businessCounter: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Namespace: namespace,
                Name:      "business_operations_total",
                Help:      "Total number of business operations",
            },
            []string{"operation", "status"},
        ),
        
        queueLength: promauto.NewGaugeVec(
            prometheus.GaugeOpts{
                Namespace: namespace,
                Name:      "queue_length",
                Help:      "Current queue length",
            },
            []string{"queue"},
        ),
        
        errorCounter: promauto.NewCounterVec(
            prometheus.CounterOpts{
                Namespace: namespace,
                Name:      "errors_total",
                Help:      "Total number of errors",
            },
            []string{"type"},
        ),
    }
}

// RecordRequest 记录HTTP请求
func (c *Collector) RecordRequest(method, path string, status int, duration time.Duration) {
    c.requestCounter.WithLabelValues(method, path, string(status)).Inc()
    c.requestDuration.WithLabelValues(method, path).Observe(duration.Seconds())
}

// TrackInFlightRequests 跟踪正在处理的请求
func (c *Collector) TrackInFlightRequests(method string, delta int) {
    c.requestInFlight.WithLabelValues(method).Add(float64(delta))
}

// UpdateCPUUsage 更新CPU使用率
func (c *Collector) UpdateCPUUsage(core string, usage float64) {
    c.cpuUsage.WithLabelValues(core).Set(usage)
}

// UpdateMemoryUsage 更新内存使用情况
func (c *Collector) UpdateMemoryUsage(memType string, bytes float64) {
    c.memoryUsage.WithLabelValues(memType).Set(bytes)
}

// UpdateGoroutineCount 更新goroutine数量
func (c *Collector) UpdateGoroutineCount(count int64) {
    c.goroutineCount.Set(float64(count))
}

// RecordBusinessOperation 记录业务操作
func (c *Collector) RecordBusinessOperation(operation, status string) {
    c.businessCounter.WithLabelValues(operation, status).Inc()
}

// UpdateQueueLength 更新队列长度
func (c *Collector) UpdateQueueLength(queue string, length int) {
    c.queueLength.WithLabelValues(queue).Set(float64(length))
}

// RecordError 记录错误
func (c *Collector) RecordError(errorType string) {
    c.errorCounter.WithLabelValues(errorType).Inc()
}

2. 链路追踪实现

接下来实现分布式链路追踪:

// tracing/tracer.go
package tracing

import (
    "context"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/exporters/jaeger"
    "go.opentelemetry.io/otel/sdk/resource"
    tracesdk "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
    "go.opentelemetry.io/otel/trace"
)

type Tracer struct {
    tracer     trace.Tracer
    provider   *tracesdk.TracerProvider
}

func NewTracer(serviceName, jaegerEndpoint string) (*Tracer, error) {
    // 创建Jaeger导出器
    exporter, err := jaeger.New(jaeger.WithCollectorEndpoint(
        jaeger.WithEndpoint(jaegerEndpoint),
    ))
    if err != nil {
        return nil, err
    }

    // 创建资源
    res, err := resource.New(context.Background(),
        resource.WithAttributes(
            semconv.ServiceNameKey.String(serviceName),
            attribute.String("environment", "production"),
        ),
    )
    if err != nil {
        return nil, err
    }

    // 创建TracerProvider
    provider := tracesdk.NewTracerProvider(
        tracesdk.WithBatcher(exporter),
        tracesdk.WithResource(res),
        tracesdk.WithSampler(tracesdk.AlwaysSample()),
    )

    // 设置全局TracerProvider
    otel.SetTracerProvider(provider)

    return &Tracer{
        tracer:   provider.Tracer(serviceName),
        provider: provider,
    }, nil
}

// StartSpan 开启新的追踪Span
func (t *Tracer) StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
    return t.tracer.Start(ctx, name, opts...)
}

// Shutdown 关闭追踪器
func (t *Tracer) Shutdown(ctx context.Context) error {
    return t.provider.Shutdown(ctx)
}

// HTTP中间件
func (t *Tracer) HTTPMiddleware(next http.Handler) http.Handler {
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
        spanCtx, span := t.StartSpan(r.Context(), "http_request",
            trace.WithAttributes(
                attribute.String("http.method", r.Method),
                attribute.String("http.url", r.URL.String()),
            ),
        )
        defer span.End()

        // 包装ResponseWriter以捕获状态码
        ww := NewResponseWriter(w)
        
        // 使用新的上下文调用下一个处理器
        next.ServeHTTP(ww, r.WithContext(spanCtx))
        
        // 记录响应状态码
        span.SetAttributes(attribute.Int("http.status_code", ww.statusCode))
    })
}

// gRPC拦截器
func (t *Tracer) UnaryServerInterceptor() grpc.UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, 
        handler grpc.UnaryHandler) (interface{}, error) {
        
        spanCtx, span := t.StartSpan(ctx, info.FullMethod,
            trace.WithAttributes(
                attribute.String("rpc.system", "grpc"),
                attribute.String("rpc.method", info.FullMethod),
            ),
        )
        defer span.End()

        resp, err := handler(spanCtx, req)
        if err != nil {
            span.RecordError(err)
        }
        return resp, err
    }
}

// ResponseWriter包装器
type ResponseWriter struct {
    http.ResponseWriter
    statusCode int
}

func NewResponseWriter(w http.ResponseWriter) *ResponseWriter {
    return &ResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}
}

func (w *ResponseWriter) WriteHeader(code int) {
    w.statusCode = code
    w.ResponseWriter.WriteHeader(code)
}

3. 日志处理系统

现在来实现日志处理系统:

// logging/logger.go
package logging

import (
    "context"
    "encoding/json"
    "github.com/olivere/elastic/v7"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
    "time"
)

type Logger struct {
    zap     *zap.Logger
    elastic *elastic.Client
}

type LogEntry struct {
    Timestamp   time.Time              `json:"@timestamp"`
    Level       string                 `json:"level"`
    Message     string                 `json:"message"`
    ServiceName string                 `json:"service_name"`
    TraceID     string                 `json:"trace_id"`
    SpanID      string                 `json:"span_id"`
    Fields      map[string]interface{} `json:"fields,omitempty"`
}

func NewLogger(serviceName string, elasticURL string) (*Logger, error) {
    // 配置zap logger
    config := zap.NewProductionConfig()
    config.EncoderConfig.TimeKey = "@timestamp"
    config.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
    
    zapLogger, err := config.Build(
        zap.AddCallerSkip(1),
        zap.Fields(zap.String("service", serviceName)),
    )
    if err != nil {
        return nil, err
    }

    // 创建Elasticsearch客户端
    client, err := elastic.NewClient(
        elastic.SetURL(elasticURL),
        elastic.SetSniff(false),
    )
    if err != nil {
        return nil, err
    }

    return &Logger{
        zap:     zapLogger,
        elastic: client,
    }, nil
}

func (l *Logger) Info(ctx context.Context, msg string, fields ...zap.Field) {
    // 添加追踪信息
    fields = append(fields, l.extractTraceInfo(ctx)...)
    
    // 本地日志记录
    l.zap.Info(msg, fields...)
    
    // 异步发送到Elasticsearch
    go l.sendToElasticsearch("info", msg, fields)
}

func (l *Logger) Error(ctx context.Context, msg string, fields ...zap.Field) {
    fields = append(fields, l.extractTraceInfo(ctx)...)
    l.zap.Error(msg, fields...)
    go l.sendToElasticsearch("error", msg, fields)
}

func (l *Logger) extractTraceInfo(ctx context.Context) []zap.Field {
    var fields []zap.Field
    if span := trace.SpanFromContext(ctx); span != nil {
        spanCtx := span.SpanContext()
        fields = append(fields,
            zap.String("trace_id", spanCtx.TraceID().String()),
            zap.String("span_id", spanCtx.SpanID().String()),
        )
    }
    return fields
}

func (l *Logger) sendToElasticsearch(level string, msg string, fields []zap.Field) {
    // 构建日志条目
    entry := LogEntry{
        Timestamp:   time.Now(),
        Level:       level,
        Message:     msg,
        ServiceName: "service_name",
        Fields:      make(map[string]interface{}),
    }

    // 提取字段信息
    for _, field := range fields {
        entry.Fields[field.Key] = field.Interface
    }

    // 发送到Elasticsearch
    _, err := l.elastic.Index().
        Index(fmt.Sprintf("logs-%s", time.Now().Format("2006.01.02"))).
        Type("_doc").
        BodyJson(entry).
        Do(context.Background())

    if err != nil {
        l.zap.Error("Failed to send log to elasticsearch",
            zap.Error(err),
            zap.Any("entry", entry),
        )
    }
}

// 日志查询功能
type QueryOptions struct {
    StartTime time.Time
    EndTime   time.Time
    Level     string
    Service   string
    TraceID   string
    Limit     int
    Offset    int
}

func (l *Logger) Query(ctx context.Context, opts QueryOptions) ([]LogEntry, error) {
    query := elastic.NewBoolQuery()
    
    // 添加时间范围
    if !opts.StartTime.IsZero() {
        query = query.Must(elastic.NewRangeQuery("@timestamp").Gte(opts.StartTime))
    }
    if !opts.EndTime.IsZero() {
        query = query.Must(elastic.NewRangeQuery("@timestamp").Lte(opts.EndTime))
    }
    
    // 添加日志级别过滤
    if opts.Level != "" {
        query = query.Must(elastic.NewTermQuery("level", opts.Level))
    }
    
    // 添加服务名过滤
    if opts.Service != "" {
        query = query.Must(elastic.NewTermQuery("service_name", opts.Service))
    }
    
    // 添加TraceID过滤
    if opts.TraceID != "" {
        query = query.Must(elastic.NewTermQuery("trace_id", opts.TraceID))
    }
    
    // 执行查询
    searchResult, err := l.elastic.Search().
        Index(fmt.Sprintf("logs-%s", time.Now().Format("2006.01.02"))).
        Query(query).
        Sort("@timestamp", false).
        From(opts.Offset).
        Size(opts.Limit).
        Do(ctx)
        
    if err != nil {
        return nil, err
    }
    
    // 解析结果
    var logs []LogEntry
    for _, hit := range searchResult.Hits.Hits {
        var entry LogEntry
        if err := json.Unmarshal(hit.Source, &entry); err != nil {
            return nil, err
        }
        logs = append(logs, entry)
    }
    
    return logs, nil
}

// 日志查询API处理器
type LogQueryHandler struct {
    logger *Logger
}

func NewLogQueryHandler(logger *Logger) *LogQueryHandler {
    return &LogQueryHandler{logger: logger}
}

func (h *LogQueryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    var opts QueryOptions
    
    // 解析查询参数
    startTime := r.URL.Query().Get("start_time")
    if startTime != "" {
        t, err := time.Parse(time.RFC3339, startTime)
        if err == nil {
            opts.StartTime = t
        }
    }
    
    endTime := r.URL.Query().Get("end_time")
    if endTime != "" {
        t, err := time.Parse(time.RFC3339, endTime)
        if err == nil {
            opts.EndTime = t
        }
    }
    
    opts.Level = r.URL.Query().Get("level")
    opts.Service = r.URL.Query().Get("service")
    opts.TraceID = r.URL.Query().Get("trace_id")
    
    limit := r.URL.Query().Get("limit")
    if limit != "" {
        if l, err := strconv.Atoi(limit); err == nil {
            opts.Limit = l
        }
    }
    
    offset := r.URL.Query().Get("offset")
    if offset != "" {
        if o, err := strconv.Atoi(offset); err == nil {
            opts.Offset = o
        }
    }
    
    // 执行查询
    logs, err := h.logger.Query(r.Context(), opts)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    
    // 返回结果
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(logs)
}

4. 告警系统实现

让我们来实现告警系统:

// alerting/alert.go
package alerting

import (
    "context"
    "time"
)

type AlertLevel string

const (
    AlertLevelInfo     AlertLevel = "info"
    AlertLevelWarning  AlertLevel = "warning"
    AlertLevelError    AlertLevel = "error"
    AlertLevelCritical AlertLevel = "critical"
)

type Alert struct {
    ID          string                 `json:"id"`
    Name        string                 `json:"name"`
    Level       AlertLevel             `json:"level"`
    Message     string                 `json:"message"`
    Labels      map[string]string      `json:"labels"`
    Value       float64                `json:"value"`
    Threshold   float64                `json:"threshold"`
    ServiceName string                 `json:"service_name"`
    Timestamp   time.Time              `json:"timestamp"`
    Metadata    map[string]interface{} `json:"metadata,omitempty"`
}

type AlertRule struct {
    Name         string
    Description  string
    Level        AlertLevel
    Expression   string
    Threshold    float64
    Duration     time.Duration
    Labels       map[string]string
    Annotations  map[string]string
}

type AlertManager struct {
    rules     []AlertRule
    notifiers []Notifier
    history   *AlertHistory
}

type Notifier interface {
    Notify(context.Context, *Alert) error
}

func NewAlertManager() *AlertManager {
    return &AlertManager{
        rules:     make([]AlertRule, 0),
        notifiers: make([]Notifier, 0),
        history:   NewAlertHistory(),
    }
}

func (am *AlertManager) AddRule(rule AlertRule) {
    am.rules = append(am.rules, rule)
}

func (am *AlertManager) AddNotifier(notifier Notifier) {
    am.notifiers = append(am.notifiers, notifier)
}

func (am *AlertManager) Start(ctx context.Context) {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            am.evaluate(ctx)
        }
    }
}

func (am *AlertManager) evaluate(ctx context.Context) {
    for _, rule := range am.rules {
        alerts := am.evaluateRule(ctx, rule)
        for _, alert := range alerts {
            // 检查是否是重复告警
            if am.history.IsDuplicate(alert) {
                continue
            }

            // 记录告警历史
            am.history.Add(alert)

            // 发送告警通知
            for _, notifier := range am.notifiers {
                go func(n Notifier, a *Alert) {
                    if err := n.Notify(ctx, a); err != nil {
                        // 记录发送失败的错误
                        log.Printf("Failed to send alert: %v", err)
                    }
                }(notifier, alert)
            }
        }
    }
}

// 告警历史记录
type AlertHistory struct {
    alerts map[string]*Alert
    mu     sync.RWMutex
}

func NewAlertHistory() *AlertHistory {
    return &AlertHistory{
        alerts: make(map[string]*Alert),
    }
}

func (h *AlertHistory) Add(alert *Alert) {
    h.mu.Lock()
    defer h.mu.Unlock()
    h.alerts[alert.ID] = alert
}

func (h *AlertHistory) IsDuplicate(alert *Alert) bool {
    h.mu.RLock()
    defer h.mu.RUnlock()
    
    if prev, exists := h.alerts[alert.ID]; exists {
        // 检查是否在静默期内(1小时内的相同告警被视为重复)
        return time.Since(prev.Timestamp) < time.Hour
    }
    return false
}

// 钉钉通知器实现
type DingTalkNotifier struct {
    webhook string
}

func NewDingTalkNotifier(webhook string) *DingTalkNotifier {
    return &DingTalkNotifier{webhook: webhook}
}

func (d *DingTalkNotifier) Notify(ctx context.Context, alert *Alert) error {
    message := map[string]interface{}{
        "msgtype": "markdown",
        "markdown": map[string]string{
            "title": fmt.Sprintf("【%s】%s", alert.Level, alert.Name),
            "text": fmt.Sprintf("### %s\n\n"+
                "> **服务**: %s\n\n"+
                "> **级别**: %s\n\n"+
                "> **详情**: %s\n\n"+
                "> **时间**: %s\n\n"+
                "> **值**: %.2f (阈值: %.2f)\n\n",
                alert.Name,
                alert.ServiceName,
                alert.Level,
                alert.Message,
                alert.Timestamp.Format("2006-01-02 15:04:05"),
                alert.Value,
                alert.Threshold,
            ),
        },
    }

    payload, err := json.Marshal(message)
    if err != nil {
        return err
    }

    resp, err := http.Post(d.webhook, "application/json", bytes.NewBuffer(payload))
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return fmt.Errorf("dingtalk notification failed: %d", resp.StatusCode)
    }

    return nil
}

5. 监控大盘设计

让我们为以上功能设计一个完整的监控大盘:
在这里插入图片描述

6. 系统集成示例

下面是一个完整的系统集成示例:

// main.go
package main

import (
    "context"
    "log"
    "net/http"
    "time"
)

func main() {
    // 初始化监控收集器
    collector := metrics.NewCollector("example_service")

    // 初始化追踪器
    tracer, err := tracing.NewTracer("example_service", "http://jaeger:14268/api/traces")
    if err != nil {
        log.Fatal(err)
    }

    // 初始化日志系统
    logger, err := logging.NewLogger("example_service", "http://elasticsearch:9200")
    if err != nil {
        log.Fatal(err)
    }

    // 初始化告警管理器
    alertManager := alerting.NewAlertManager()
    
    // 添加钉钉通知器
    dingTalk := alerting.NewDingTalkNotifier("your-webhook-url")
    alertManager.AddNotifier(dingTalk)

    // 添加告警规则
    alertManager.AddRule(alerting.AlertRule{
        Name:        "High Error Rate",
        Description: "Service error rate is too high",
        Level:       alerting.AlertLevelError,
        Expression:  "rate(errors_total[5m]) > 0.1",
        Threshold:   0.1,
        Duration:    5 * time.Minute,
        Labels: map[string]string
        // main.go (续)
        {
            "service": "example_service",
            "severity": "error",
        },
        Annotations: map[string]string{
            "description": "Error rate exceeded 10% in the last 5 minutes",
            "runbook": "https://wiki.example.com/runbook/high-error-rate",
        },
    })

    alertManager.AddRule(alerting.AlertRule{
        Name:        "High Latency",
        Description: "Service latency is too high",
        Level:       alerting.AlertLevelWarning,
        Expression:  "histogram_quantile(0.95, rate(http_request_duration_seconds[5m])) > 0.5",
        Threshold:   0.5,
        Duration:    5 * time.Minute,
        Labels: map[string]string{
            "service": "example_service",
            "severity": "warning",
        },
    })

    // 创建HTTP处理器
    mux := http.NewServeMux()

    // 注册指标采集端点
    mux.Handle("/metrics", promhttp.Handler())

    // 注册健康检查端点
    mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
        w.WriteHeader(http.StatusOK)
        w.Write([]byte("OK"))
    })

    // 创建示例API端点
    mux.HandleFunc("/api/example", func(w http.ResponseWriter, r *http.Request) {
        // 记录请求开始时间
        start := time.Now()

        // 创建span
        ctx, span := tracer.StartSpan(r.Context(), "example_api")
        defer span.End()

        // 记录正在处理的请求
        collector.TrackInFlightRequests(r.Method, 1)
        defer collector.TrackInFlightRequests(r.Method, -1)

        // 模拟业务处理
        time.Sleep(time.Millisecond * 100)

        // 记录业务指标
        collector.RecordBusinessOperation("example_api", "success")

        // 计算处理时间
        duration := time.Since(start)

        // 记录请求指标
        collector.RecordRequest(r.Method, "/api/example", http.StatusOK, duration)

        // 记录日志
        logger.Info(ctx, "API request processed",
            zap.String("method", r.Method),
            zap.String("path", "/api/example"),
            zap.Duration("duration", duration),
        )

        w.WriteHeader(http.StatusOK)
        w.Write([]byte("Success"))
    })

    // 创建带有中间件的服务器
    server := &http.Server{
        Addr: ":8080",
        Handler: chainMiddleware(mux,
            metricsMiddleware(collector),
            traceMiddleware(tracer),
            logMiddleware(logger),
        ),
    }

    // 启动告警管理器
    ctx := context.Background()
    go alertManager.Start(ctx)

    // 启动资源监控
    go monitorResources(collector)

    // 启动服务器
    log.Printf("Server starting on :8080")
    if err := server.ListenAndServe(); err != nil {
        log.Fatal(err)
    }
}

// 中间件链接器
func chainMiddleware(handler http.Handler, middlewares ...func(http.Handler) http.Handler) http.Handler {
    for _, middleware := range middlewares {
        handler = middleware(handler)
    }
    return handler
}

// 指标中间件
func metricsMiddleware(collector *metrics.Collector) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()
            
            // 包装ResponseWriter以捕获状态码
            ww := &responseWriter{w: w, status: http.StatusOK}
            
            next.ServeHTTP(ww, r)
            
            // 记录请求指标
            duration := time.Since(start)
            collector.RecordRequest(r.Method, r.URL.Path, ww.status, duration)
        })
    }
}

// 追踪中间件
func traceMiddleware(tracer *tracing.Tracer) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            ctx, span := tracer.StartSpan(r.Context(), "http_request",
                trace.WithAttributes(
                    attribute.String("http.method", r.Method),
                    attribute.String("http.url", r.URL.String()),
                ),
            )
            defer span.End()

            next.ServeHTTP(w, r.WithContext(ctx))
        })
    }
}

// 日志中间件
func logMiddleware(logger *logging.Logger) func(http.Handler) http.Handler {
    return func(next http.Handler) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()
            
            ww := &responseWriter{w: w, status: http.StatusOK}
            
            next.ServeHTTP(ww, r)
            
            // 记录请求日志
            logger.Info(r.Context(), "HTTP Request",
                zap.String("method", r.Method),
                zap.String("path", r.URL.Path),
                zap.Int("status", ww.status),
                zap.Duration("duration", time.Since(start)),
            )
        })
    }
}

// ResponseWriter包装器
type responseWriter struct {
    w      http.ResponseWriter
    status int
}

func (w *responseWriter) Header() http.Header {
    return w.w.Header()
}

func (w *responseWriter) Write(b []byte) (int, error) {
    return w.w.Write(b)
}

func (w *responseWriter) WriteHeader(statusCode int) {
    w.status = statusCode
    w.w.WriteHeader(statusCode)
}

// 资源监控
func monitorResources(collector *metrics.Collector) {
    ticker := time.NewTicker(15 * time.Second)
    defer ticker.Stop()

    for {
        <-ticker.C
        
        // 更新CPU使用率
        cpuUsage := getCPUUsage()
        collector.UpdateCPUUsage("total", cpuUsage)
        
        // 更新内存使用情况
        var m runtime.MemStats
        runtime.ReadMemStats(&m)
        collector.UpdateMemoryUsage("heap", float64(m.Alloc))
        collector.UpdateMemoryUsage("system", float64(m.Sys))
        
        // 更新goroutine数量
        collector.UpdateGoroutineCount(int64(runtime.NumGoroutine()))
    }
}

func getCPUUsage() float64 {
    var cpuUsage float64
    percentage, err := cpu.Percent(time.Second, false)
    if err == nil && len(percentage) > 0 {
        cpuUsage = percentage[0]
    }
    return cpuUsage
}

7. 监控指标说明

让我们通过一个表格来总结需要监控的关键指标:

指标类型指标名称说明告警阈值
性能指标QPS每秒请求数>1000
性能指标P95延迟95%请求的响应时间>500ms
性能指标P99延迟99%请求的响应时间>1s
错误指标错误率请求错误率>1%
错误指标5xx错误服务器错误数>10/分钟
资源指标CPU使用率CPU使用百分比>80%
资源指标内存使用率内存使用百分比>80%
资源指标Goroutine数量协程数量>10000
业务指标成功率业务操作成功率<99%
业务指标处理延迟业务处理时间>1s

8. 链路追踪设计

这个完整的服务监控与追踪系统实现了以下功能:

  1. 监控指标采集

    • HTTP请求监控
    • 系统资源监控
    • 业务指标监控
    • 错误监控
  2. 分布式链路追踪

    • 调用链路记录
    • 性能分析
    • 错误定位
    • 服务依赖分析
  3. 日志处理系统

    • 多级别日志
    • 结构化日志
    • 日志聚合
    • 日志查询
  4. 告警系统

    • 多级别告警
    • 告警规则配置
    • 告警通知
    • 告警历史记录

怎么样今天的内容还满意吗?再次感谢观众老爷的观看,关注GZH:凡人的AI工具箱,回复666,送您价值199的AI大礼包。最后,祝您早日实现财务自由,还请给个赞,谢谢!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2256797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Unity高级】如何实现粒子系统的间歇式喷射

先看下要最终实现的效果&#xff1a; 代码如下&#xff1a; using UnityEngine; using System.Collections;public class ParticleBurstController : MonoBehaviour {private ParticleSystem _particleSystem; // 获取粒子系统public float burstDuration 2f; // 每次…

clipchamp制作视频文字转语音音频

一.准备工作&#xff1a; 1.在浏览器打开 https://app.clipchamp.com/首次打开需要登录&#xff0c;未登录用户注册登录 2.点击右上角头像到Settings页面&#xff0c;点击Language切换到中文&#xff08;英文水平好的可以忽略此步骤&#xff09;因中文英文界面有微小差异&…

开源轻量级文件分享服务Go File本地Docker部署与远程访问

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

三菱伺服通过MR Configurator2进行的试运行模式

(1)试运行模式 (a)JOG运转 可以不使用伺服系统控制器执行J0G运行。请在解除强制停止的状态下使用。无论伺服0N/伺服OFF或伺服系统控制器有无连接均可使用。 通过MR Configurator2的J0G运行画面进行操作。 1)运行模式 2)运行方法 "“仅在长按正转、反转按钮中运行”的复选框…

Sqoop导入数据(mysql---->>hive)

目录 数据传输流程脚本报错和异常说明1. Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.conf.HiveConf2. 数据导入hive后显示NULL 数据传输流程 mysql---->>hdfs---->>hive 数据从mysql表中取出&#xff0c;放到hdfs上&#xff08;由targ…

Flask返回中文Unicode编码(乱码)解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

【C++AVL树】枝叶间的旋律:AVL树的和谐之道

公主请阅 1.AVL树的概念2.AVL树的插入AVL树插入一个值的大概过程平衡因子更新更新原则更新停止条件 3.AVL树的右转旋转的原则右单旋 4.AVL树的左旋左单旋 5.AVL树的左右双旋6.AVL树的右左双旋7.AVL树的模拟实现 1.AVL树的概念 AVL树是最先发明的自平衡二叉查找树&#xff0c;AV…

深入理解C#的TCPIP通信机制

本文还有配套的精品资源&#xff0c;点击获取 简介&#xff1a;在分布式系统和实时数据交换应用中&#xff0c;C#作为一种现代面向对象编程语言&#xff0c;利用其***命名空间下的Socket类&#xff0c;提供强大的TCP/IP通信功能。本文将探讨C#中TCP/IP通信的基本概念、使用方…

基于yolov8的SAR影像目标检测系统,支持图像、视频和摄像实时检测【pytorch框架、python源码】

更多目标检测、图像分类识别、目标追踪等项目可看我主页其他文章 功能演示&#xff1a; 基于yolov8的SAR影像目标检测系统&#xff0c;支持图像、视频和摄像实时检测【pytorch框架、python源码】_哔哩哔哩_bilibili &#xff08;一&#xff09;简介 基于yolov8的SAR影像目标…

Prime2_解法二:openssl解密凭据

Prime2_解法二&#xff1a;openssl解密凭据 本博客提供的所有信息仅供学习和研究目的&#xff0c;旨在提高读者的网络安全意识和技术能力。请在合法合规的前提下使用本文中提供的任何技术、方法或工具。如果您选择使用本博客中的任何信息进行非法活动&#xff0c;您将独自承担全…

Jenkins环境一站式教程:从安装到配置,打造高效CI/CD流水线环境-Ubuntu 22.04.5 环境离线安装配置 Jenkins 2.479.1

文章目录 Jenkins环境一站式教程&#xff1a;从安装到配置&#xff0c;打造高效CI/CD流水线环境-Ubuntu 22.04.5 环境离线安装配置 Jenkins 2.479.1一、环境准备1.1 机器规划1.2 环境配置1.2.1 设置主机名1.2.2 停止和禁用防火墙1.2.3 更新系统 二、安装配置Jenkins2.1 安装JDK…

K8S命令部署后端(流水线全自动化部署)

前言 本文为链接: 云效流水线k8s半自动部署java&#xff08;保姆级&#xff09;的补充,本文起初的目的是为了补充完善k8s流水线的全自动化部署,但是也适用于k8s的一键重启,因为使用k8s的web页面容易出现漏点的情况,因此也可以把代码保存为shell脚本,同样可以实现一键重启。关于…

力扣-图论-7【算法学习day.57】

前言 ###我做这类文章一个重要的目的还是给正在学习的大家提供方向和记录学习过程&#xff08;例如想要掌握基础用法&#xff0c;该刷哪些题&#xff1f;&#xff09;我的解析也不会做的非常详细&#xff0c;只会提供思路和一些关键点&#xff0c;力扣上的大佬们的题解质量是非…

TEA系列例题

解析 TEA 加密算法(C语言、python)&#xff1a;_tea加密-CSDN博客 CTF-RE 从0到N: TEA_tea加密原理-CSDN博客 1 字节 8 位 牢记密文的64位和密钥的128位,最好可以自己独立的写出tea解密代码 相当于密文是传入8个字符类型数据或者是2个整型数据, 密钥是16个字符数据或者4个…

首批|云轴科技ZStack成为信通院AI Cloud MSP技术服务实验室成员单位

近日&#xff0c;由全球数字经济大会组委会主办&#xff0c;中国信息通信研究院&#xff08;以下简称“中国信通院”&#xff09;、中国通信企业协会承办的云AI计算国际合作论坛在京举行&#xff0c;会上公布了AI Cloud MSP&#xff08;人工智能云管理服务提供商&#xff09;技…

Docker安装部署RabbitMQ

1. Docker环境准备 1.1 安装Docker 在开始Docker安装部署RabbitMQ之前&#xff0c;确保您的系统环境已经满足Docker的运行要求。以下是在不同操作系统上安装Docker的步骤和命令行演示。 对于Linux系统 在基于Debian的系统&#xff08;如Ubuntu&#xff09;上&#xff0c;您…

Linux下网卡实现NAT转发

目标 在嵌入式Linux设备下&#xff0c;使用单一的网卡&#xff08;前提支持STA&#xff0b;AP共存&#xff09;&#xff0c;使用NAT&#xff08;网络地址转换&#xff09;实现软路由&#xff0c;以自身为热点&#xff0c;将接收到的流量数据全部转发出去。 一&#xff0c;STA…

笔记04--零基础创建个人本地大模型知识库ollama+Dify

ollma安装 官网下载直接下一步下一步即可&#xff0c;没有魔法的朋友可以留言&#xff0c;文章中所用到的文件也给打包了&#xff0c;大家可以直接下载使用通过云盘下载使用。 链接: https://pan.baidu.com/s/12zF9MpQtg1bnMDAQayaSyg 提取码: n9rm 官网地址&#xff1a;http…

Python爬虫:爬取动漫网站的排行榜数据并进行可视化分析

简单介绍 由于哔哩哔哩的网站现在不太方便爬取&#xff0c;我们选择 agefans.com 这个网站完成项目。 我们会爬取排行榜上的数据&#xff0c;并借助可视化手段绘柱状图展示出来。 导入Python库&#xff08;前提&#xff1a;已经安装了所需的库&#xff09; import pandas impo…

MyBatis快速入门(下)

MyBatis快速入门&#xff08;下&#xff09; 六、MyBatis-缓存机制1、一级缓存&#xff08;Local Cache&#xff09;2、二级缓存&#xff08;Global Cache&#xff09;二级缓存标签属性介绍缓存相关设置 3、第三方缓存Mybatis整合ehcache示例 七、MyBatis-逆向工程1、targetRun…