【SkyWalking】SkyWalking是如何实现跨进程传播链路数据?

news2025/1/18 9:59:33

文章目录

  • 一、简介
    • 1 为什么写这篇文章
    • 2 跨进程传播协议-简介
  • 二、协议
    • 1 Standard Header项
    • 2 Extension Header项
    • 3 Correlation Header项
  • 三、跨进程传播协议的源码分析
    • 1 OpenTracing规范
    • 2 通过dubbo插件分析跨进程数据传播
    • 3 分析跨进程传播协议的核心源码
  • 四、小结
  • 参考

一、简介

1 为什么写这篇文章

写这篇文章是为了让自己和大家梳理这些内容:

  1. SkyWalking的链路串联依赖跨进程数据传播,他的跨进程传播协议是怎样的?
  2. 如果我想借助SkyWalking的跨进程传播协议实现传递全链路业务数据(如全局userId等),该如何实现?

2 跨进程传播协议-简介

SkyWalking 跨进程传播协议是用于上下文的传播,之前经历过sw3协议、sw6协议,本文介绍是当前(2023年)最新的sw8协议。
该协议适用于不同语言、系统的探针之间传递上下文。

二、协议

Header项分为三类:

  • Standard Header项,Header名称:sw8
  • Extension Header项,Header名称:sw8-x
  • Correlation Header项,Header名称:sw8-correlation

协议的整体设计:
在这里插入图片描述

下面详细讲解协议的Header项:

1 Standard Header项

该Header项是上下文传播必须包含的。

  • Header名称:sw8.
  • Header值:由-分隔的8个字段组成。Header值的长度应该小于2KB。

Header值中具体包含以下8个字段:

  • 采样(Sample),0 或 1,0 表示上下文存在,但是可以(也很可能)被忽略而不做采样;1 表示这个trace需要采样并发送到后端。
  • 追踪ID(Trace Id),是 Base64 编码的字符串,其内容是由 . 分割的三个 long 类型值, 表示此trace的唯一标识。
  • 父追踪片段ID(Parent trace segment Id),是 Base64 编码的字符串,其内容是字符串且全局唯一。
  • 父跨度ID(Parent span Id),是一个从 0 开始的整数,这个跨度ID指向父追踪片段(segment)中的父跨度(span)。
  • 父服务名称(Parent service),是 Base64 编码的字符串,其内容是一个长度小于或等于50个UTF-8编码的字符串。
  • 父服务实例标识(Parent service instance),是 Base64 编码的字符串,其内容是一个长度小于或等于50个UTF-8编码的字符串。
  • 父服务的端点(Parent endpoint),是 Base64 编码的字符串,其内容是父追踪片段(segment)中第一个入口跨度(span)的操作名,由长度小于或等于50个UTF-8编码的字符组成。
  • 本请求的目标地址(Peer),是 Base64 编码的字符串,其内容是客户端用于访问目标服务的网络地址(不一定是 IP + 端口)。

示例值: 1-TRACEID-SEGMENTID-3-PARENT_SERVICE-PARENT_INSTANCE-PARENT_ENDPOINT-IPPORT

2 Extension Header项

该Header项是可选的。扩展Header项是为高级特性设计的,它提供了部署在上游和下游服务中的探针之间的交互功能。

Header名称:sw8-x

Header值:由-分割,字段可扩展。

扩展Header值
当前值包括的字段:

追踪模式(Tracing Mode),空、0或1,默认为空或0。表示在这个上下文中生成的所有跨度(span)应该跳过分析。在默认情况下,这个应该在上下文中传播到服务端,除非它在跟踪过程中被更改。
客户端发送的时间戳:用于异步RPC,如MQ。一旦设置,消费端将计算发送和接收之间的延迟,并使用key transmission.latency自动在span中标记延迟。

示例值:1-1621344125000

3 Correlation Header项

该Header项是是可选的。并非所有语言的探针都支持,已知的是Java的探针是支持该协议。
该Header项用于跨进程传递用户自定义数据,例如userId、orgId。
这个协议跟OpenTracing 的 Baggage很类似,但是Correlation Header项相比,在默认设置下会更有更严格的限制,例如,只能存放3个字段,且有字段长度限制,这个是为了安全、性能等考虑。
数据格式:

Header名称:sw8-correlation

Header值:由,分割一对对key、value,每对key、value逗号分割,key、value的由Base64编码。

示例值:a2V5MQ==:dmFsdWUx,a2V5LTI=:dmFsdWUy

三、跨进程传播协议的源码分析

1 OpenTracing规范

SkyWalking是基于OpenTracing标准的追踪系统,参考吴晟老师翻译的OpenTracing规范的文章opentracing之Inject和Extract,OpenTracing定义了跨进程传播的几个要素:

SpanContext:SpanContext代表跨越进程边界,传递到下级span的状态。在SkyWalking中的实现类是org.apache.skywalking.apm.agent.core.context.TracingContext
Carrier:传递跨进程数据的搬运工,负责将追踪状态从一个进程"carries"(携带,传递)到另一个进程
Inject 和 Extract:SpanContexts可以通过Inject(注入)操作向Carrier增加,或者通过Extract(提取)从Carrier中获取,跨进程通讯数据(例如:HTTP头)。通过这种方式,SpanContexts可以跨越进程边界,并提供足够的信息来建立跨进程的span间关系(因此可以实现跨进程连续追踪)

2 通过dubbo插件分析跨进程数据传播

我们以SkyWalking java agent的dubbo-2.7.x-plugin插件为例,其中跨进程传播数据的核心代码在org.apache.skywalking.apm.plugin.asf.dubbo.DubboInterceptor,下面是该类跨进程传播的核心代码:

public class DubboInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Consumer: The serialized trace context data will
     * inject to the {@link RpcContext#attachments} for transport to provider side.
     * <p>
     * Provider: The serialized trace context data will extract from
     * {@link RpcContext#attachments}. current trace segment will ref if the serialization context data is not null.
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        ......
        if (isConsumer) { // 1、consumer端
            // ContextCarrier
            final ContextCarrier contextCarrier = new ContextCarrier();
            // 1.1 createExitSpan()内部会调用TracerContext.inject(carrier),将TracerContext中的context数据inject(注入)到ContextCarrier的context中
            span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);

            CarrierItem next = contextCarrier.items();
            // 1.2 遍历ContextCarrier,从ContextCarrier的context获取数据,注入到dubbo的attachment,从consumer端传递到provider端
            while (next.hasNext()) {
                next = next.next();
                rpcContext.setAttachment(next.getHeadKey(), next.getHeadValue());
                if (invocation.getAttachments().containsKey(next.getHeadKey())) {
                    invocation.getAttachments().remove(next.getHeadKey());
                }
            }
        } else { // 2 provider端
            // 2.1 从consumer端传递到provider端的attachment中获取跨进程协议数据,然后设置到context
            ContextCarrier contextCarrier = new ContextCarrier();
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
            }
            // 2.2 createEntrySpan()内部会调用TracerContext.extract(carrier),将ContextCarrier的context数据extract(提取)到将TracerContext中的context中
            span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
            span.setPeer(rpcContext.getRemoteAddressString());
        }
    }
}

从上面的源码可以看出在服务调用方和被调用方,都会用到ContextCarrier,他是临时搬运工,负责两个进程的TracerContext数据的传递。
下面分析ContextCarrier等类的核心源码。

3 分析跨进程传播协议的核心源码

TracingContext
org.apache.skywalking.apm.agent.core.context.TracingContext是OpenTracing的SpanContext的一种实现,里面包含了span的上下文,包含在segment、correlationContext、extensionContext,而inject()、extract()负责跨进程上下文透传。

public class TracingContext implements AbstractTracerContext {

     /**
     * The final {@link TraceSegment}, which includes all finished spans.
     */
    private TraceSegment segment;
  
    @Getter(AccessLevel.PACKAGE)
    private final CorrelationContext correlationContext;
    @Getter(AccessLevel.PACKAGE)
    private final ExtensionContext extensionContext;

     /**
     * Prepare for the cross-process propagation. How to initialize the carrier, depends on the implementation.
     *
     * @param carrier to carry the context for crossing process.
     */
    void inject(ContextCarrier carrier);

    /**
     * Build the reference between this segment and a cross-process segment. How to build, depends on the
     * implementation.
     *
     * @param carrier carried the context from a cross-process segment.
     */
    void extract(ContextCarrier carrier);
}

ContextCarrier
ContextCarrier作为传递跨进程数据的搬运工,负责将追踪状态从一个进程"carries"(携带,传递)到另一个进程,其中包含了sw8协议里的Standard Header项、Extension Header项、Correlation Header项相关的上下文数据,具体参考下面的代码:

public class ContextCarrier implements Serializable {
    /**
     * extensionContext包含了在某些特定场景中用于增强分析的可选上下文,对应sw8的Extension Header项
     */
    private ExtensionContext extensionContext = new ExtensionContext();
    /**
     * 用户的自定义上下文容器。此上下文与主追踪上下文一同传播。对应sw8的Correlation Header项
     */
    private CorrelationContext correlationContext = new CorrelationContext();

    /**
     * @return 存在于当前tracing上下文中的item清单
     */
    public CarrierItem items() {
        SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null);
        SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(
            correlationContext, sw8ExtensionCarrierItem);
        SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem);
        return new CarrierItemHead(sw8CarrierItem);
    }

    /**
     * Extract the extension context to tracing context
     */
    void extractExtensionTo(TracingContext tracingContext) {
        tracingContext.getExtensionContext().extract(this);
        // The extension context could have field not to propagate further, so, must use the this.* to process.
        this.extensionContext.handle(tracingContext.activeSpan());
    }

    /**
     * Extract the correlation context to tracing context
     */
    void extractCorrelationTo(TracingContext tracingContext) {
        tracingContext.getCorrelationContext().extract(this);
        // The correlation context could have field not to propagate further, so, must use the this.* to process.
        this.correlationContext.handle(tracingContext.activeSpan());
    }

    /**
     * 序列化sw8的Standard Header项,使用 '-' 分割各个字段
     * Serialize this {@link ContextCarrier} to a {@link String}, with '|' split.
     * @return the serialization string.
     */
    String serialize(HeaderVersion version) {
        if (this.isValid(version)) {
            return StringUtil.join(
                '-',
                "1",
                Base64.encode(this.getTraceId()),
                Base64.encode(this.getTraceSegmentId()),
                this.getSpanId() + "",
                Base64.encode(this.getParentService()),
                Base64.encode(this.getParentServiceInstance()),
                Base64.encode(this.getParentEndpoint()),
                Base64.encode(this.getAddressUsedAtClient())
            );
        }
        return "";
    }

    /**
     * 反序列化sw8的Standard Header项
     * Initialize fields with the given text.
     * @param text carries {@link #traceSegmentId} and {@link #spanId}, with '|' split.
     */
    ContextCarrier deserialize(String text, HeaderVersion version) {
        if (text == null) {
            return this;
        }
        if (HeaderVersion.v3.equals(version)) {
            String[] parts = text.split("-", 8);
            if (parts.length == 8) {
                try {
                    // parts[0] is sample flag, always trace if header exists.
                    this.traceId = Base64.decode2UTFString(parts[1]);
                    this.traceSegmentId = Base64.decode2UTFString(parts[2]);
                    this.spanId = Integer.parseInt(parts[3]);
                    this.parentService = Base64.decode2UTFString(parts[4]);
                    this.parentServiceInstance = Base64.decode2UTFString(parts[5]);
                    this.parentEndpoint = Base64.decode2UTFString(parts[6]);
                    this.addressUsedAtClient = Base64.decode2UTFString(parts[7]);
                } catch (IllegalArgumentException ignored) {

                }
            }
        }
        return this;
    }
}

CorrelationContext
ContextCarrier里包含里sw8的Correlation Header项存放于CorrelationContext,这个类非常有用,适合我们去在全链路跨进程传递自定义的数据。
sw8协议里的Standard Header项、Extension Header项是比较固定的协议格式,我们可以扩展这些协议,例如Standard Header项,当前固定是8位的,对应8个字段,我们可以扩展为9位,第九位可以定义为userId。但是如果要这样改造,就得修改ContextCarrier类序列化、反序列的逻辑,要重新发布agent,并考虑好新旧版本兼容性问题、以及不同语言的agent是否兼容。
而sw8的Correlation Header项使用起来就非常方便。先看下对应实现了CorrelationContext的源码:

/**
 * Correlation context, use to propagation user custom data.
 * Correlation上下文,用于传播用户自定义数据
 */
public class CorrelationContext {

    private final Map<String, String> data;

    /**
     * Add or override the context. 添加或覆盖上下文数据
     *
     * @param key   to add or locate the existing context
     * @param value as new value
     * @return old one if exist.
     */
    public Optional<String> put(String key, String value) {
        // 可以存放于span的tag中
        if (AUTO_TAG_KEYS.contains(key) && ContextManager.isActive()) {
            ContextManager.activeSpan().tag(new StringTag(key), value);
        }
        // setting
        data.put(key, value);
        return Optional.empty();
    }

    /**
     * @param key to find the context 获取上下文数据
     * @return value if exist.
     */
    public Optional<String> get(String key) {
        return Optional.ofNullable(data.get(key));
    }

    /**
     * Serialize this {@link CorrelationContext} to a {@link String} 序列化
     *
     * @return the serialization string.
     */
    String serialize() {
        if (data.isEmpty()) {
            return "";
        }

        return data.entrySet().stream()
                   .map(entry -> Base64.encode(entry.getKey()) + ":" + Base64.encode(entry.getValue()))
                   .collect(Collectors.joining(","));
    }

    /**
     * Deserialize data from {@link String} 反序列化
     */
    void deserialize(String value) {
        if (StringUtil.isEmpty(value)) {
            return;
        }

        for (String perData : value.split(",")) {
            // Only data with limited count of elements can be added
            if (data.size() >= Config.Correlation.ELEMENT_MAX_NUMBER) {
                break;
            }
            final String[] parts = perData.split(":");
            if (parts.length != 2) {
                continue;
            }
            data.put(Base64.decode2UTFString(parts[0]), Base64.decode2UTFString(parts[1]));
        }
    }

    /**
     * Prepare for the cross-process propagation. Inject the {@link #data} into {@link
     * ContextCarrier#getCorrelationContext()}
     */
    void inject(ContextCarrier carrier) {
        carrier.getCorrelationContext().data.putAll(this.data);
    }

    /**
     * Extra the {@link ContextCarrier#getCorrelationContext()} into this context.
     */
    void extract(ContextCarrier carrier) {
        ......
    }

    /**
     * Clone the context data, work for capture to cross-thread. 克隆数据,用于跨线程传递
     */
    @Override
    public CorrelationContext clone() {
        final CorrelationContext context = new CorrelationContext();
        context.data.putAll(this.data);
        return context;
    }

    /**
     * Continue the correlation context in another thread.传递到另外的线程
     *
     * @param snapshot holds the context.
     */
    void continued(ContextSnapshot snapshot) {
        this.data.putAll(snapshot.getCorrelationContext().data);
    }
}

通过源码可知,CorrelationContext通过Map<String, String>来存放数据,CorrelationContext数据支持跨线程、跨进程透传。

四、小结

分析Dubbo插件的跨进程核心代码,了解了跨进程传播协议的核心实现逻辑。

其实在其他分布式追踪系统(如Zipkin、Jager)、全链路灰度系统等涉及到跨进程数据传播的系统中,也是使用了类似于上面SkyWalking协议的思路。

参考

SkyWalking Cross Process Propagation Headers Protocol
SkyWalking Cross Process Correlation Headers Protocol
详解 Apache SkyWalking 的跨进程传播协议

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1070194.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ERDAS 2022 安装教程

注意&#xff1a; 演示ERDAS版本为&#xff1a;2022.v16.7.0.1216 安装程序&#xff1a; 1、主程序&#xff1a;点击下载 2、许可文件&#xff1a;点击下载 3、IDM下载器&#xff1a;点击下载 下载速度&#xff1a; 浏览器下载速度慢&#xff0c;可以使用以上提供的IDM下…

[GWCTF 2019]我有一个数据库 phpMyAdmin 4.8.1后台文件包含漏洞

一开始打开是乱码 之前题目做过修复乱码的&#xff0c;得到这个 用dirsearch扫一下 一开始我是看到robots.txt 访问一下 访问一下phpinfo 也没啥&#xff0c;看到phpmyadimin 访问一下 没啥思路&#xff0c;看了wp 看到phpMyAdmin 4.8.1后台文件包含漏洞&#xff08;CV…

LabVIEW中不同颜色连线的含义

LabVIEW中不同颜色连线的含义 LabVIEW中的连线具有不同的颜色&#xff0c;样式和宽度。每个都代表了什么&#xff1f; 下表列出了常见的连线类型&#xff1a; 相关信息 请注意&#xff0c;类的连线颜色是可更改的。该表显示其默认外观。 连线用于在程序框图各对象间传递数据…

016 Spring Boot + Vue 图书管理系统

Spring Boot Vue 图书馆管理系统&#xff08;library-system&#xff09; 本地快捷预览项目 第一步&#xff1a;运行 db 文件夹下的springboot-vue.sql(询问作者获取)&#xff0c;创建springboot-vue数据库 第二步&#xff1a;修改后端数据库配置文件&#xff0c;启动后端 …

二次封装View Design的table组件,实现宽度自适应,内容在一行展示

由于table组件本身并不支持宽度自适应&#xff0c;但实际项目需要&#xff0c;而且多处有用到table组件&#xff0c;所以尝试着自己来二次封装一下组件 想法 刚开始的想法很简单&#xff0c;就是获取每一列中数据和标题在表格中的长度&#xff0c;然后将当中最大的长度作为该列…

Nginx配置文件的通用语法介绍

要是参考《Ubuntu 20.04使用源码安装nginx 1.14.0》安装nginx的话&#xff0c;nginx配置文件在/nginx/conf目录里边&#xff0c;/nginx/conf里边的配置文件结构如下图所示&#xff1a; nginx.conf是主配置文件&#xff0c;它是一个ascii文本文件。配置文件由指令&#xff08;…

分析“由于找不到vcruntime140.dll无法继续执行代码”这个问题的5个解决方法

当使用电脑时&#xff0c;我们难免会遇到各种问题。其中&#xff0c;“由于找不到vcruntime140.dll无法继续执行代码”是一个常见的错误&#xff0c;通常出现在运行使用C编写的应用程序时。这个问题可能会导致软件程序或游戏无法打开或运行。然而&#xff0c;只要我们掌握正确的…

大话机器学习准确率(Accuracy)、精确率(Pecision)、召回率(Recall)以及TP、FP、TN、FN

话说三国时期&#xff0c;乱世出人才&#xff0c;当时刘备让张飞帮忙招兵买马&#xff0c;寻找人才。张飞发公告以后&#xff0c;有10人来面试&#xff0c;这10人分为两类&#xff0c;人才和庸才&#xff0c;各占百分之五十&#xff0c;张飞的主要作用就是从这10人中识别出人才…

放大招,百度文心大模型4.0正在加紧训练,即将发布

插播一条快讯&#xff01; &#xfeff;&#xfeff;刚刚看到一篇报道&#xff0c;说百度正在加紧训练文心大模型4.0&#xff01;百度5月发布了文心大模型3.5&#xff0c;才4个多月又要发布4.0了&#xff0c;这迭代速度简直了。据说这次发布将在10月17日百度世界大会上进行&am…

strcat函数详解:字符串追加的利器

目录 一&#xff0c;strcat函数的简介 二&#xff0c;strcat函数的使用 三&#xff0c;strcat函数的注意事项 四&#xff0c;strcat函数的模拟实现 一&#xff0c;strcat函数的简介 strcat函数用于将源字符串追加到目标字符串的末尾&#xff0c;并返回一个指向目标字符串的…

QString、QLatin1String、QStringLiteral区别和用法以及效率

QString类 QString是Qt框架中提供的字符串类&#xff0c;用于处理Unicode字符串。它提供了许多方便的方法和功能&#xff0c;可以进行字符串的连接、查找、替换、截取等操作。QString类的对象是可变的&#xff0c;可以在运行时修改字符串内容。 . 由以上引出一个知识点&#xf…

LabVIEW(一)简介

LabVIEW&#xff08;Laboratory Virtual Instrument Engineering Workbench&#xff09;是一种程序开发环境&#xff0c;是由美国国家仪器&#xff08;NI&#xff09;公司研制开发的。LabVIEW与其他计算机语言的显著区别是&#xff1a;其他计算机语言都是采用基于文本的语言产生…

linux驱动开发找不到工作寻求前辈建议?

linux驱动开发找不到工作寻求前辈建议? 不要局限驱动&#xff0c;我毕业的时候不成熟的想法就是做驱动比做应用有技术含量&#xff0c;就努力往这方面做&#xff0c;我就从应用转到驱动最近很多小伙伴找我&#xff0c;说想要一些Linux内核学习资料&#xff0c;然后我根据自己从…

产品经理必备知识——API接口

前言 在古代&#xff0c;我们的传输信息的方式有很多&#xff0c;比如写信、飞鸽传书&#xff0c;以及在战争中使用的烽烟&#xff0c;才有了著名的烽火戏诸侯&#xff0c;但这些方式传输信息的效率终究还是无法满足高速发展的社会需要。如今万物互联的时代&#xff0c;我通过…

嵌入式养成计划-36----C++引用--const--函数重载--结构体--类

八十、 引用 80.1 概念 引用是给目标取了个别名。引用与目标&#xff0c;它俩的地址一样 80.2 格式 数据类型 &引用名 同类型的变量名&#xff1b;数据类型 &引用名 同类型的变量名&#xff1b; eg&#xff1a;int a;int &b a; //b引用a,给a取个别名叫b80.3…

ps安装遇到问题

安装PhotoShop报错 无法写入注册表值请检查权限(错误代码160)_ps安装无法写入注册表值错误160_Zhac的博客-CSDN博客在Visual Studio中创建DLL项目 打开VS 新建一个项目&#xff1a;文件→→新建→→项目 选择Visual C#类库(.NET Framework)Unity当前只支持最高 3.5版本将UnityE…

MySQL增删查改(进阶1)

一、数据库约束 约束&#xff1a;按照一定条件进行规范的做事&#xff1b; 表定义的时候&#xff0c;某些字段保存的数据需要按照一定的约束条件&#xff1b; 1.null约束 字段null&#xff1a;该字段可以为空&#xff1b;not null&#xff1a;该字段不能为空不指定的话就是…

Bigemap是如何在生态林业科技行业去应用的

选择Bigemap的原因&#xff1a; ①之前一直是使用的谷歌地球&#xff0c;现在谷歌不能使用了就在网上搜索找一款可以替代的软件&#xff0c;工作使用需求还是挺大的&#xff0c;谷歌不能用对工作进展也非常影响&#xff0c;在网上搜索到软件大部分功能都可以满足需求 ②软件卫…

小白自学—网络安全(黑客技术)笔记

目录 一、自学网络安全学习的误区和陷阱 二、学习网络安全的一些前期准备 三、网络安全学习路线 四、学习资料的推荐 想自学网络安全&#xff08;黑客技术&#xff09;首先你得了解什么是网络安全&#xff01;什么是黑客&#xff01; 网络安全可以基于攻击和防御视角来分类…

Unity Golang教程-Shader编写一个流动的云效果

创建目录 一个友好的项目&#xff0c;项目目录结构是很重要的。我们先导入一个登录界面模型资源。 我们先创建Art表示是美术类的资源&#xff0c;资源是模型创建Model文件夹&#xff0c;由于是在登录界面所以创建Login文件夹&#xff0c;下面依次是模型对应的资源&#xff0c…