HeaderForwarder组件不仅能够从当前接收请求提取指定的HTTP报头,并自动将其添加到任何一个通过HttpClient发出的请求中,它同时也提供了一种基于Context/ContextScope的编程模式是我们可以很方便地将任何报头添加到指定范围内的所有由HttpClient发出的请求中。现在我们来简单聊聊该组件的设计和实现原理。
一、HeaderForwardObserver
HeaderForwarder组件利用HeaderForwardObserver对HttpClient进行拦截,并将需要的报头添加到由它发出的请求消息中,我们曾经介绍过这种方案,这也是大部分APM自动添加跟踪报头的解决方案。具体的原理其实很简单:当HttpClient发送请求过程中会利用DiagnosticListener触发一些列事件,并在事件中提供相应的对象,比如发送的HttpRequestMessage和接收的HttpResponseMessage。如果我们需要这个过程进行干预,只需要订阅相应的事件并将干预操作实现在提供的回调中。
HeaderForwarder用来添加请求报头的是一个类型为HeaderForwardObserver的对象。在介绍该类型之前,我们得先来介绍如下这个IOutgoingHeaderCollectionProvider接口,顾名思义,它用来提供需要被添加的所有HTTP请求报头。
public interface IOutgoingHeaderCollectionProvider
{
IDictionary<string, StringValues> GetHeaders();
}
如下所示的是HeaderForwardObserver的定义。如代码片段所示,HeaderForwardObserver实现了IObserver<KeyValuePair<string, object>> 接。在实现的OnNext中,通过对事件名称(System.Net.Http.HttpRequestOut.Start)的比较订阅了HttpClient在发送请求前触发的事件,并从提供的参数提取出表示待发送请求的HttpRequestMessage对象(对应Request属性)。有了这个待发送的请求,我们只需要从构造函数中注入的IOutgoingHeaderCollectionProvider 对象提取出所有报头列表,并将其添加这个HttpRequestMessage对象中即可。
public sealed class HeaderForwardObserver : IObserver<KeyValuePair<string, object>>
{
private static Func<object, HttpRequestMessage> _requestAccessor;
private readonly IOutgoingHeaderCollectionProvider _provider;
public HeaderForwardObserver(IOutgoingHeaderCollectionProvider provider)
{
_provider = provider ?? throw new ArgumentNullException(nameof(provider));
}
public void OnCompleted() { }
public void OnError(Exception error) { }
public void OnNext(KeyValuePair<string, object> value)
{
if (headers.Any() && value.Key == "System.Net.Http.HttpRequestOut.Start")
{
var headers = _provider.GetHeaders();
_requestAccessor ??= CreateRequestAccessor(value.Value.GetType());
var outgoingHeaders = _requestAccessor(value.Value).Headers;
foreach (var kv in headers)
{
outgoingHeaders.Add(kv.Key, kv.Value.AsEnumerable());
}
}
}
private static Func<object, HttpRequestMessage> CreateRequestAccessor(Type type)
{
var requestProperty = type.GetProperty("Request");
var payload = Expression.Parameter(typeof(object));
var convertToPayload = Expression.Convert(payload, type);
var getRequest = Expression.Call(convertToPayload, requestProperty.GetMethod);
var convertToRequest = Expression.Convert(getRequest, typeof(HttpRequestMessage));
return Expression.Lambda<Func<object, HttpRequestMessage>>(convertToRequest, payload).Compile();
}
}
二、HttpClientObserver
HeaderForwardObserver借助于如下这个HttpClientObserver进行注册。如代码片段所示,HttpClientObserver 实现了IObserver<DiagnosticListener>接口,在实现的OnNext方法中,它创建出HeaderForwardObserver对象并将其订阅到HttpClient使用的DiagnosticListener对象上(该对象的名称为HttpHandlerDiagnosticListener)。
public sealed class HttpClientObserver : IObserver<DiagnosticListener>
{
private readonly IOutgoingHeaderCollectionProvider _provider;
public HttpClientObserver(IOutgoingHeaderCollectionProvider provider)
{
_provider = provider ?? throw new ArgumentNullException(nameof(provider));
}
public void OnCompleted() { }
public void OnError(Exception error) { }
public void OnNext(DiagnosticListener value)
{
if (value.Name == "HttpHandlerDiagnosticListener")
{
value.Subscribe(new HeaderForwardObserver(_provider));
}
}
}
三、HeaderForwaderStartupFilter
我们将针对HttpClientObserver的注册实现在如下这个HeaderForwaderStartupFilter类型中。如代码片段所示,HeaderForwaderStartupFilter实现了IStartupFilter接口,针对HttpClientObserver的注册就实现在Configure方法中。
public sealed class HeaderForwaderStartupFilter : IStartupFilter
{
public Action<IApplicationBuilder> Configure(Action<IApplicationBuilder> next)
{
return app => {
DiagnosticListener.AllListeners.Subscribe(app.ApplicationServices.GetRequiredService<HttpClientObserver>());
next(app);
};
}
}
四、HttpInvocationContext/HttpInvocationContextScope
接下来我们讨论待转发HTTP报头的来源问题。带转发报头有两种来源,一种是从当前请求中提取出来的,另一种是手工添加到HttpInvocationContext上下文中。如下所示的是HttpInvocationContext的定义,我们添加的报头就存储在它的OutgoingHeaders 属性中,表示当前上下文的HttpInvocationContext对象存储在AsyncLocal<HttpInvocationContext>对象上。
public sealed class HttpInvocationContext
{
internal static readonly AsyncLocal<HttpInvocationContext> _current = new AsyncLocal<HttpInvocationContext>();
public static HttpInvocationContext Current => _current.Value;
public IDictionary<string, StringValues> OutgoingHeaders { get; } = new Dictionary<string, StringValues>();
internal HttpInvocationContext() { }
}
HttpInvocationContextScope用来控制HttpInvocationContext的范围(生命周期),从定义可以看出,只有在创建该Scope的using block范围为才能得到当前的HttpInvocationContext上下文。
public sealed class HttpInvocationContextScope : IDisposable
{
public HttpInvocationContextScope()
{
HttpInvocationContext._current.Value = new HttpInvocationContext();
}
public void Dispose() => HttpInvocationContext._current.Value = null;
}
五、OutgoingHeaderCollectionProvider
HeaderForwardObserver添加到请求消息中的报头是通过注入的IOutgoingHeaderCollectionProvider对象提供的,现在我们来看看该接口的实现类型OutgoingHeaderCollectionProvider。我们说过,所有的报头具有两个来源,其中一个来源于当前接收的请求,但是并不是请求中携带的所有报头都需要转发,所以我们需要利用如下这个HeaderForwarderOptions类型来配置转发的报头名称。
public class HeaderForwarderOptions
public ISet<string> AutoForwardHeaderNames { get; } = new HashSet<string>();
public void AddHeaderNames(params string[] headerNames) => Array.ForEach(headerNames, it => AutoForwardHeaderNames.Add(it));
}
如下所示的是OutgoingHeaderCollectionProvider类型的定义。在实现的GetHeaders方法中,它利用注入的IHttpContextAccessor 对象得到当前HttpContext,并结合HeaderForwarderOptions上的配置得到需要自动转发的报头。然后通过当前HttpInvocationContext上下文你得到手工指定的报头,两者合并之后成为了最终需要添加到请求消息的报头列表。
public sealed class OutgoingHeaderCollectionProvider : IOutgoingHeaderCollectionProvider
{
private readonly IHttpContextAccessor _httpContextAccessor;
private readonly ISet<string> _autoForwardedHeaderNames;
public OutgoingHeaderCollectionProvider(IHttpContextAccessor httpContextAccessor, IOptions<HeaderForwarderOptions> optionsAccessor)
{
_httpContextAccessor = httpContextAccessor ?? throw new ArgumentNullException(nameof(httpContextAccessor));
_autoForwardedHeaderNames = (optionsAccessor?? throw new ArgumentNullException(nameof(optionsAccessor))).Value.AutoForwardHeaderNames;
}
public IDictionary<string, StringValues> GetHeaders()
{
var headers = new Dictionary<string, StringValues>();
try
{
var incomingHeaders = _httpContextAccessor.HttpContext?.Request?.Headers;
if (incomingHeaders != null)
{
foreach (var headerName in _autoForwardedHeaderNames)
{
if (incomingHeaders.TryGetValue(headerName, out var values))
{
headers.Add(headerName, values);
}
}
}
}
catch (ObjectDisposedException) {}
var outgoingHeaders = HttpInvocationContext.Current?.OutgoingHeaders;
if (outgoingHeaders != null)
{
foreach (var kv in outgoingHeaders)
{
if (headers.TryGetValue(kv.Key, out var values))
{
headers[kv.Key] = new StringValues(values.Concat(kv.Value).ToArray());
}
else
{
headers.Add(kv.Key, kv.Value);
}
}
}
return headers;
}
}
到目前为止,HeaderForwarder的核心成员均已介绍完毕,这些接口/类型之间的关系体现在如下所示的UML中。
六、服务注册
HeaderForwarder涉及的服务通过如下这个AddHeaderForwarder扩展方法进行注册
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddHeaderForwarder(this IServiceCollection services, Action<HeaderForwarderOptions> setup = null)
{
services = services ?? throw new ArgumentNullException(nameof(services));
services.AddOptions();
services.AddHttpContextAccessor();
services.TryAddSingleton<IOutgoingHeaderCollectionProvider, OutgoingHeaderCollectionProvider>();
services.TryAddSingleton<HttpClientObserver>();
services.TryAddEnumerable(ServiceDescriptor.Singleton<IStartupFilter, HeaderForwaderStartupFilter>());
if (null != setup)
{
services.Configure(setup);
}
return services;
}
}
我们进一步定义了针对IHostBuilder接口的扩展方法,我们在前面演示实例中正是使用的这个方法。
public static class HostBuilderExtensions
{
public static IHostBuilder UseHeaderForwarder(this IHostBuilder hostBuilder, Action<HeaderForwarderOptions> setup = null)
{
hostBuilder = hostBuilder ?? throw new ArgumentNullException(nameof(hostBuilder));
hostBuilder.ConfigureServices((_,services) => services.AddHeaderForwarder(setup));
return hostBuilder;
}
}