1. Types of upcall messages
In Open vSwitch's packet forwarding path, when a packet cannot be fully handled in kernel space (for example, it matches no flow entry), an upcall is triggered: the packet is handed from the kernel-space datapath module up to the user-space ovs-vswitchd daemon for processing. A flow-table miss is not the only reason an upcall can be generated, however; there are other scenarios as well, so the vswitchd daemon has to handle each type of upcall message accordingly.
In the vswitchd daemon, the relevant parts of the upcall structure are defined in ovs-main/ofproto/ofproto-dpif-upcall.c:
struct upcall {
......
enum upcall_type type; /* Type of the upcall. */
......
};
The upcall message types themselves are defined as an enum:
enum upcall_type {
BAD_UPCALL, /* Some kind of bug somewhere. */
MISS_UPCALL, /* A flow miss. */
SLOW_PATH_UPCALL, /* Slow path upcall. */
SFLOW_UPCALL, /* sFlow sample. */
FLOW_SAMPLE_UPCALL, /* Per-flow sampling. */
IPFIX_UPCALL, /* Per-bridge sampling. */
CONTROLLER_UPCALL /* Destined for the controller. */
};
Of these, MISS_UPCALL, SLOW_PATH_UPCALL, and CONTROLLER_UPCALL are the ones discussed in most detail below.
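Before walking through the handlers, it helps to know roughly where these types come from: a datapath flow miss becomes MISS_UPCALL directly, while the other types are recovered from the cookie that vswitchd attached to the userspace action when it built the datapath actions. The snippet below is only a simplified, self-contained sketch of that idea; the real logic lives in classify_upcall() in ofproto-dpif-upcall.c, and the helper and cookie names here (classify, cookie_kind, etc.) are made up for illustration.

/* Illustrative sketch only -- not the real OVS code.  The cookie kinds and
 * the classify() helper are hypothetical; in OVS the same job is done by
 * classify_upcall() using struct user_action_cookie. */
enum dp_upcall_kind { DP_MISS, DP_ACTION };   /* what the datapath reports */
enum cookie_kind { CK_SLOW_PATH, CK_SFLOW, CK_FLOW_SAMPLE, CK_IPFIX, CK_CONTROLLER };

static enum upcall_type
classify(enum dp_upcall_kind dp_kind, bool have_cookie, enum cookie_kind ck)
{
    if (dp_kind == DP_MISS) {
        return MISS_UPCALL;            /* no matching datapath flow */
    }
    if (!have_cookie) {
        return BAD_UPCALL;             /* userspace action without a cookie: a bug */
    }
    switch (ck) {                      /* cookie set by vswitchd when it built the actions */
    case CK_SLOW_PATH:   return SLOW_PATH_UPCALL;
    case CK_SFLOW:       return SFLOW_UPCALL;
    case CK_FLOW_SAMPLE: return FLOW_SAMPLE_UPCALL;
    case CK_IPFIX:       return IPFIX_UPCALL;
    case CK_CONTROLLER:  return CONTROLLER_UPCALL;
    default:             return BAD_UPCALL;
    }
}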
2. Upcall message processing: process_upcall()
The function process_upcall() is the core routine the vswitchd daemon uses to process upcall messages. It is also defined in ovs-main/ofproto/ofproto-dpif-upcall.c:
static int process_upcall(struct udpif *udpif, struct upcall *upcall, struct ofpbuf *odp_actions, struct flow_wildcards *wc) {
const struct dp_packet *packet = upcall->packet;
const struct flow *flow = upcall->flow;
size_t actions_len = 0;
switch (upcall->type) {
case MISS_UPCALL:
case SLOW_PATH_UPCALL:
......
case SFLOW_UPCALL:
......
case IPFIX_UPCALL:
case FLOW_SAMPLE_UPCALL:
......
case CONTROLLER_UPCALL:
......
case BAD_UPCALL:
......
}
return EAGAIN;
}
As the switch statement shows, each upcall message type gets its own handling. Let's walk through the cases.
(1) Handling the MISS_UPCALL and SLOW_PATH_UPCALL types
case MISS_UPCALL:
case SLOW_PATH_UPCALL:
upcall_xlate(udpif, upcall, odp_actions, wc);
return 0;
This branch handles the miss and slow-path upcall types by calling upcall_xlate(udpif, upcall, odp_actions, wc), which is also defined in ovs-main/ofproto/ofproto-dpif-upcall.c:
static void upcall_xlate(struct udpif *udpif, struct upcall *upcall, struct ofpbuf *odp_actions, struct flow_wildcards *wc) {
struct dpif_flow_stats stats;
enum xlate_error xerr;
struct xlate_in xin;
struct ds output;
stats.n_packets = 1;
stats.n_bytes = dp_packet_size(upcall->packet);
stats.used = time_msec();
stats.tcp_flags = ntohs(upcall->flow->tcp_flags);
xlate_in_init(&xin, upcall->ofproto, ofproto_dpif_get_tables_version(upcall->ofproto), upcall->flow, upcall->ofp_in_port, NULL, stats.tcp_flags, upcall->packet, wc, odp_actions);
if (upcall->type == MISS_UPCALL) {
xin.resubmit_stats = &stats;
if (xin.frozen_state) {
/* We may install a datapath flow only if we get a reference to the recirculation context (otherwise we could have recirculation upcalls using recirculation ID for which no context can be found).
* We may still execute the flow's actions even if we don't install the flow. */
upcall->recirc = recirc_id_node_from_state(xin.frozen_state);
upcall->have_recirc_ref = recirc_id_node_try_ref_rcu(upcall->recirc);
}
} else {
/* For non-miss upcalls, we are either executing actions (one of which is an userspace action) for an upcall, in which case the stats have already been taken care of, or there's a flow in the datapath which this packet was accounted to.
* Presumably the revalidators will deal with pushing its stats eventually. */
}
upcall->reval_seq = seq_read(udpif->reval_seq);
xerr = xlate_actions(&xin, &upcall->xout);
/* Translate again and log the ofproto trace for these two error types. */
if (xerr == XLATE_RECURSION_TOO_DEEP || xerr == XLATE_TOO_MANY_RESUBMITS) {
static struct vlog_rate_limit rll = VLOG_RATE_LIMIT_INIT(1, 1);
/* This is a huge log, so be conservative. */
if (!VLOG_DROP_WARN(&rll)) {
ds_init(&output);
ofproto_trace(upcall->ofproto, upcall->flow, upcall->packet, NULL, 0, NULL, &output);
VLOG_WARN("%s", ds_cstr(&output));
ds_destroy(&output);
}
}
if (wc) {
/* Convert the input port wildcard from OFP to ODP format.
* There's no real way to do this for arbitrary bitmasks since the numbering spaces aren't the same.
* However, flow translation always exact matches the whole thing, so we can do the same here. */
WC_MASK_FIELD(wc, in_port.odp_port);
}
upcall->xout_initialized = true;
if (upcall->fitness == ODP_FIT_TOO_LITTLE) {
upcall->xout.slow |= SLOW_MATCH;
}
if (!upcall->xout.slow) {
ofpbuf_use_const(&upcall->put_actions, odp_actions->data, odp_actions->size);
} else {
/* upcall->put_actions already initialized by upcall_receive(). */
compose_slow_path(udpif, &upcall->xout, upcall->flow->in_port.odp_port, upcall->ofp_in_port, &upcall->put_actions, upcall->ofproto->up.slowpath_meter_id, &upcall->ofproto->uuid);
}
/* This function is also called for slow-pathed flows.
* As we are only going to create new datapath flows for actual datapath misses, there is no point in creating a ukey otherwise. */
if (upcall->type == MISS_UPCALL) {
upcall->ukey = ukey_create_from_upcall(upcall, wc);
}
}
The first parameter, struct udpif *udpif, is the datapath interface context; the second, struct upcall *upcall, is the received upcall message; the third, struct ofpbuf *odp_actions, receives the generated ODP (datapath) actions; and the fourth, struct flow_wildcards *wc, records the wildcard masks of the resulting flow entry.
The function first fills in per-packet statistics and then calls xlate_in_init(&xin, upcall->ofproto, ofproto_dpif_get_tables_version(upcall->ofproto), upcall->flow, upcall->ofp_in_port, NULL, stats.tcp_flags, upcall->packet, wc, odp_actions) to initialize the xin structure, which carries the context needed for flow translation: the switch (ofproto) instance, the flow-table version, the input port, and so on.
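For reference, here is the same xlate_in_init() call spread across lines, with each argument annotated (the annotations are my reading of the call site above, not comments from the source):

xlate_in_init(&xin,
              upcall->ofproto,                                  /* bridge (ofproto-dpif instance) */
              ofproto_dpif_get_tables_version(upcall->ofproto), /* flow-table version to translate against */
              upcall->flow,                                     /* extracted flow key of the packet */
              upcall->ofp_in_port,                              /* OpenFlow input port */
              NULL,                                             /* no rule looked up yet */
              stats.tcp_flags,                                  /* TCP flags recorded in the stats */
              upcall->packet,                                   /* the packet that caused the upcall */
              wc,                                               /* wildcards to fill in during translation */
              odp_actions);                                     /* buffer for the resulting datapath actions */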
If the message is a MISS_UPCALL, the packet's statistics are attached (xin.resubmit_stats) and, if the flow carries frozen state, the function tries to take a reference to the recirculation context so that a datapath flow may later be installed. For other upcall types the statistics are considered already accounted for, so nothing extra is done here.
xlate_actions() is then called to perform the actual flow translation. This function is defined in ovs-main/ofproto/ofproto-dpif-xlate.c:
/* Translates the flow, actions, or rule in 'xin' into datapath actions in 'xout'.
* The caller must take responsibility for eventually freeing 'xout', with xlate_out_uninit().
* Returns 'XLATE_OK' if translation was successful.
* In case of an error an empty set of actions will be returned in 'xin->odp_actions' (if non-NULL), so that most callers may ignore the return value and transparently install a drop flow when the translation fails. */
enum xlate_error xlate_actions(struct xlate_in *xin, struct xlate_out *xout) {
*xout = (struct xlate_out) {
.slow = 0,
.recircs = RECIRC_REFS_EMPTY_INITIALIZER,
};
struct xlate_cfg *xcfg = ovsrcu_get(struct xlate_cfg *, &xcfgp);
struct xbridge *xbridge = xbridge_lookup(xcfg, xin->ofproto);
if (!xbridge) {
return XLATE_BRIDGE_NOT_FOUND;
}
struct flow *flow = &xin->flow;
uint8_t stack_stub[1024];
uint64_t action_set_stub[1024 / 8];
uint64_t frozen_actions_stub[1024 / 8];
uint64_t actions_stub[256 / 8];
struct ofpbuf scratch_actions = OFPBUF_STUB_INITIALIZER(actions_stub);
struct xlate_ctx ctx = {
.xin = xin,
.xout = xout,
.base_flow = *flow,
.orig_tunnel_ipv6_dst = flow_tnl_dst(&flow->tunnel),
.xcfg = xcfg,
.xbridge = xbridge,
.stack = OFPBUF_STUB_INITIALIZER(stack_stub),
.rule = xin->rule,
.wc = (xin->wc ? xin->wc : &(struct flow_wildcards) { .masks = { .dl_type = 0 } }),
.odp_actions = xin->odp_actions ? xin->odp_actions : &scratch_actions,
.depth = xin->depth,
.resubmits = xin->resubmits,
.in_action_set = false,
.in_packet_out = xin->in_packet_out,
.pending_encap = false,
.pending_decap = false,
.encap_data = NULL,
.table_id = 0,
.rule_cookie = OVS_BE64_MAX,
.orig_skb_priority = flow->skb_priority,
.sflow_n_outputs = 0,
.sflow_odp_port = 0,
.nf_output_iface = NF_OUT_DROP,
.exit = false,
.error = XLATE_OK,
.mirrors = 0,
.freezing = false,
.recirc_update_dp_hash = false,
.frozen_actions = OFPBUF_STUB_INITIALIZER(frozen_actions_stub),
.pause = NULL,
.was_mpls = false,
.conntracked = false,
.ct_nat_action = NULL,
.action_set_has_group = false,
.action_set = OFPBUF_STUB_INITIALIZER(action_set_stub),
};
......
if (!xin->frozen_state && process_special(&ctx, in_port)) {
......
} else {
......
/* Output only fully processed packets. */
if (!ctx.freezing
&& xbridge->has_in_band && in_band_must_output_to_local_port(flow) && !actions_output_to_local_port(&ctx)) {
WC_MASK_FIELD(ctx.wc, nw_proto);
WC_MASK_FIELD(ctx.wc, tp_src);
WC_MASK_FIELD(ctx.wc, tp_dst);
WC_MASK_FIELD(ctx.wc, dl_type);
xlate_report(&ctx, OFT_DETAIL, "outputting DHCP packet " "to local port for in-band control");
compose_output_action(&ctx, OFPP_LOCAL, NULL, false, false);
}
if (user_cookie_offset) {
fix_sflow_action(&ctx, user_cookie_offset);
}
}
if (nl_attr_oversized(ctx.odp_actions->size)) {
/* These datapath actions are too big for a Netlink attribute, so we can't hand them to the kernel directly.
* dpif_execute() can execute them one by one with help, so just mark the result as SLOW_ACTION to prevent the flow from being installed. */
COVERAGE_INC(xlate_actions_oversize);
ctx.xout->slow |= SLOW_ACTION;
} else if (too_many_output_actions(ctx.odp_actions)) {
COVERAGE_INC(xlate_actions_too_many_output);
ctx.xout->slow |= SLOW_ACTION;
}
/* Update NetFlow for non-frozen traffic. */
if (xbridge->netflow && !xin->frozen_state) {
if (ctx.xin->resubmit_stats) {
netflow_flow_update(xbridge->netflow, flow,
ctx.nf_output_iface,
ctx.xin->resubmit_stats);
}
if (ctx.xin->xcache) {
struct xc_entry *entry;
entry = xlate_cache_add_entry(ctx.xin->xcache, XC_NETFLOW);
entry->nf.netflow = netflow_ref(xbridge->netflow);
entry->nf.flow = xmemdup(flow, sizeof *flow);
entry->nf.iface = ctx.nf_output_iface;
}
}
/* Translate tunnel metadata masks to udpif format if necessary. */
if (xin->upcall_flow->tunnel.flags & FLOW_TNL_F_UDPIF) {
if (ctx.wc->masks.tunnel.metadata.present.map) {
const struct flow_tnl *upcall_tnl = &xin->upcall_flow->tunnel;
struct geneve_opt opts[TLV_TOT_OPT_SIZE /
sizeof(struct geneve_opt)];
tun_metadata_to_geneve_udpif_mask(&flow->tunnel,
&ctx.wc->masks.tunnel,
upcall_tnl->metadata.opts.gnv,
upcall_tnl->metadata.present.len,
opts);
memset(&ctx.wc->masks.tunnel.metadata, 0,
sizeof ctx.wc->masks.tunnel.metadata);
memcpy(&ctx.wc->masks.tunnel.metadata.opts.gnv, opts,
upcall_tnl->metadata.present.len);
}
ctx.wc->masks.tunnel.metadata.present.len = 0xff;
ctx.wc->masks.tunnel.metadata.tab = NULL;
ctx.wc->masks.tunnel.flags |= FLOW_TNL_F_UDPIF;
}
......
xlate_wc_finish(&ctx);
......
return ctx.error;
}
xlate_actions() takes care of, among other things, tunnel metadata, special packets such as DHCP for in-band control, fixing up sFlow actions, and updating NetFlow records (there is far too much to cover in detail here). The end result of the translation is stored in the xout structure.
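As the comment block above spells out, the caller owns xout and must eventually release it with xlate_out_uninit(). A minimal usage sketch of that contract (error handling simplified; upcall_xlate() itself keeps the result in upcall->xout rather than freeing it right away):

struct xlate_out xout;
enum xlate_error xerr = xlate_actions(&xin, &xout);  /* xin prepared with xlate_in_init() */
if (xerr != XLATE_OK) {
    /* On error 'xin->odp_actions' holds an empty action set, so many
     * callers simply fall through and install a drop flow. */
}
/* ... use xout.slow and the generated odp_actions ... */
xlate_out_uninit(&xout);   /* the caller must eventually free 'xout' */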
Back in upcall_xlate(), once xout has been filled in, if a wc argument was supplied the input-port wildcard is converted from OpenFlow (OFP) format to datapath (ODP) format. Then, based on upcall->fitness, the SLOW_MATCH flag may be added to upcall->xout.slow, which in turn decides how the actions end up in upcall->put_actions: for the fast path, put_actions simply points at the data in odp_actions; for slow-pathed flows, compose_slow_path() builds the actions instead. Finally, a ukey is created only for MISS_UPCALLs, since only actual datapath misses lead to new datapath flows.
(2) Handling the SFLOW_UPCALL type
case SFLOW_UPCALL:
if (upcall->sflow) {
struct dpif_sflow_actions sflow_actions;
memset(&sflow_actions, 0, sizeof sflow_actions);
actions_len = dpif_read_actions(udpif, upcall, flow, upcall->type, &sflow_actions);
dpif_sflow_received(upcall->sflow, packet, flow, flow->in_port.odp_port, &upcall->cookie, actions_len > 0 ? &sflow_actions : NULL);
}
break;
This branch handles sFlow upcall messages. It first zero-initializes a dpif_sflow_actions structure, which will hold the sampled action information. It then calls dpif_read_actions(udpif, upcall, flow, upcall->type, &sflow_actions) to read the actions associated with the upcall into sflow_actions, and finally calls dpif_sflow_received(upcall->sflow, packet, flow, flow->in_port.odp_port, &upcall->cookie, actions_len > 0 ? &sflow_actions : NULL) to hand the packet, flow, input port, cookie, and action information over to the sFlow module.
Tips: sFlow is a packet-sampling technology used to monitor and analyze network traffic.
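To make the "sampling" idea concrete: sFlow does not copy every packet to the collector; it picks roughly one out of every N packets, and in OVS those sampled packets are exactly what arrive at vswitchd as SFLOW_UPCALL messages. The snippet below is only a toy illustration of 1-in-N selection, not OVS code.

#include <stdbool.h>
#include <stdlib.h>

/* Toy illustration of 1-in-N packet sampling (not OVS code):
 * each packet is independently chosen with probability 1/n. */
static bool should_sample(unsigned int n)
{
    return n > 0 && (rand() % n) == 0;
}

/* With n = 1000, on average one packet in a thousand is sampled and
 * would be punted to the monitoring module. */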
(3) Handling the IPFIX_UPCALL and FLOW_SAMPLE_UPCALL types
case IPFIX_UPCALL:
case FLOW_SAMPLE_UPCALL:
if (upcall->ipfix) {
struct flow_tnl output_tunnel_key;
struct dpif_ipfix_actions ipfix_actions;
memset(&ipfix_actions, 0, sizeof ipfix_actions);
if (upcall->out_tun_key) {
odp_tun_key_from_attr(upcall->out_tun_key, &output_tunnel_key, NULL);
}
actions_len = dpif_read_actions(udpif, upcall, flow, upcall->type, &ipfix_actions);
if (upcall->type == IPFIX_UPCALL) {
dpif_ipfix_bridge_sample(upcall->ipfix, packet, flow, flow->in_port.odp_port, upcall->cookie.ipfix.output_odp_port, upcall->out_tun_key ? &output_tunnel_key : NULL, actions_len > 0 ? &ipfix_actions: NULL);
} else {
/* The flow reflects exactly the contents of the packet.
* Sample the packet using it. */
dpif_ipfix_flow_sample(upcall->ipfix, packet, flow, &upcall->cookie, flow->in_port.odp_port, upcall->out_tun_key ? &output_tunnel_key : NULL, actions_len > 0 ? &ipfix_actions: NULL);
}
}
break;
This branch handles the IPFIX and FLOW_SAMPLE upcall types. It declares a flow_tnl structure for the output tunnel key and zero-initializes a dpif_ipfix_actions structure for the IPFIX-related action information; if the upcall carries an output tunnel key, odp_tun_key_from_attr() decodes it into output_tunnel_key. It then calls dpif_read_actions(udpif, upcall, flow, upcall->type, &ipfix_actions) to read the actions from the upcall into ipfix_actions. Finally it dispatches on the upcall type: for IPFIX_UPCALL it calls dpif_ipfix_bridge_sample(), passing the packet, flow, input port, the output port from the cookie, the output tunnel key, and the action information to the IPFIX module; for FLOW_SAMPLE_UPCALL it calls dpif_ipfix_flow_sample(), passing the packet, flow, cookie, input port, output tunnel key, and action information instead.
Tips: IPFIX is a flow-export protocol used to collect and analyze network traffic data.
IPFIX_UPCALL and FLOW_SAMPLE_UPCALL are two variants of IPFIX sampling: per-bridge sampling and per-flow sampling, respectively, as the comments in the enum above indicate.
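As a rough picture of what an IPFIX exporter ultimately reports, a flow record is essentially a flow key plus counters. The struct below is a toy example for orientation only; it is neither the IPFIX wire format nor an OVS structure.

#include <stdint.h>

/* Toy flow record (illustration only, not the IPFIX wire format). */
struct toy_flow_record {
    uint32_t src_ip, dst_ip;      /* IPv4 addresses of the flow */
    uint16_t src_port, dst_port;  /* transport-layer ports */
    uint8_t  ip_proto;            /* TCP, UDP, ... */
    uint64_t packet_count;        /* packets accounted to this flow */
    uint64_t byte_count;          /* bytes accounted to this flow */
    uint64_t start_ms, end_ms;    /* first/last time the flow was seen */
};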
(4) Handling the CONTROLLER_UPCALL type
case CONTROLLER_UPCALL:
{
struct user_action_cookie *cookie = &upcall->cookie;
if (cookie->controller.dont_send) {
return 0;
}
uint32_t recirc_id = cookie->controller.recirc_id;
if (!recirc_id) {
break;
}
const struct recirc_id_node *recirc_node = recirc_id_node_find(recirc_id);
if (!recirc_node) {
break;
}
const struct frozen_state *state = &recirc_node->state;
struct ofproto_async_msg *am = xmalloc(sizeof *am);
*am = (struct ofproto_async_msg) {
.controller_id = cookie->controller.controller_id,
.oam = OAM_PACKET_IN,
.pin = {
.up = {
.base = {
.packet = xmemdup(dp_packet_data(packet), dp_packet_size(packet)),
.packet_len = dp_packet_size(packet),
.reason = cookie->controller.reason,
.table_id = state->table_id,
.cookie = get_32aligned_be64(&cookie->controller.rule_cookie),
.userdata = (recirc_node->state.userdata_len ? xmemdup(recirc_node->state.userdata, recirc_node->state.userdata_len) : NULL),
.userdata_len = recirc_node->state.userdata_len,
},
},
.max_len = cookie->controller.max_len,
},
};
if (cookie->controller.continuation) {
am->pin.up.stack = (state->stack_size ? xmemdup(state->stack, state->stack_size) : NULL),
am->pin.up.stack_size = state->stack_size,
am->pin.up.mirrors = state->mirrors,
am->pin.up.conntracked = state->conntracked,
am->pin.up.actions = (state->ofpacts_len ? xmemdup(state->ofpacts, state->ofpacts_len) : NULL),
am->pin.up.actions_len = state->ofpacts_len,
am->pin.up.action_set = (state->action_set_len ? xmemdup(state->action_set, state->action_set_len) : NULL),
am->pin.up.action_set_len = state->action_set_len,
am->pin.up.bridge = upcall->ofproto->uuid;
am->pin.up.odp_port = upcall->packet->md.in_port.odp_port;
}
/* We don't want to use the upcall 'flow', since it may be more specific than the point at which the "controller" action was specified. */
struct flow frozen_flow;
frozen_flow = *flow;
if (!state->conntracked) {
flow_clear_conntrack(&frozen_flow);
}
frozen_metadata_to_flow(&upcall->ofproto->up, &state->metadata, &frozen_flow);
flow_get_metadata(&frozen_flow, &am->pin.up.base.flow_metadata);
ofproto_dpif_send_async_msg(upcall->ofproto, am);
}
break;
This branch handles controller upcall messages. It first inspects the cookie: if dont_send is set the packet is simply dropped, and the recirculation ID in the cookie must resolve to a recirc_id_node whose frozen state can be used. It then allocates an ofproto_async_msg describing a packet-in (packet data, reason, table ID, rule cookie, userdata, and so on), and for continuations also copies additional frozen state such as the stack, mirrors, conntrack flag, actions, and action set. Next, a frozen_flow is built from the upcall's flow and the frozen metadata, and its metadata is written into the message's flow_metadata field. Finally, ofproto_dpif_send_async_msg(upcall->ofproto, am) sends the constructed ofproto_async_msg to the controller.
(5) Handling the BAD_UPCALL type
case BAD_UPCALL:
break;
}
For a BAD_UPCALL (which indicates a bug somewhere), nothing is done: the case simply breaks, and process_upcall() falls through to return EAGAIN.
Summary:
Reading through the source gives a clearer picture of the different upcall message types. MISS_UPCALL means the datapath hit a flow-table miss; SLOW_PATH_UPCALL means the packet has to be handled on the slow path. In a sense the two are the same here, since a flow-table miss sends the packet to the slow path anyway, and process_upcall() handles both through upcall_xlate(). SFLOW_UPCALL, IPFIX_UPCALL, and FLOW_SAMPLE_UPCALL are all upcalls for traffic-monitoring modules, with IPFIX_UPCALL and FLOW_SAMPLE_UPCALL being the per-bridge and per-flow variants of IPFIX sampling. CONTROLLER_UPCALL means the packet needs to be sent to the controller for instructions, and BAD_UPCALL means something went wrong somewhere in the upcall path.
Now for a question:
When a packet cannot be matched by the kernel-space datapath module during forwarding and an upcall is triggered, which upcall message type is used?
Answer: generally MISS_UPCALL.
Once more for emphasis: generally MISS_UPCALL!
My knowledge is limited, so if anything above is inaccurate, corrections are welcome (in the comments or by private message).