NAPI简介
它的核心概念就是不采用中断的方式读取数据,而代之以首先采用中断唤醒数据接收的服务程序,然后 POLL 的方法来轮询数据。NAPI是综合中断方式与轮询方式的技术。
中断的好处是响应及时,如果数据量较小,则不会占用太多的CPU事件;缺点是数据量大时,会产生过多中断,而每个中断都要消耗不少的CPU时间,从而导致效率反而不如轮询高。
轮询方式与中断方式相反,它更适合处理大量数据,因为每次轮询不需要消耗过多的CPU时间;缺点是即使只接收很少数据或不接收数据时,也要占用CPU
时间。
NAPI是两者的结合,数据量低时采用中断,数据量高时采用轮询。当有数据到达时,会触发中断处理函数执行,中断处理函数关闭中断开始处理。如果此时有数据到达,则没必要再触发中断了,因为中断处理函数中会轮询处理数据,直到没有新数据时才打开中断。很明显,数据量很低与很高时,NAPI可以发挥中断与轮询方式的优点,性能较好。如果数据量不稳定,且说高不高说低不低,则NAPI则会在两种方式切换上消耗不少时间,效率反而较低一些。
2.实现区别分析:
NAPI目前要求驱动设备提供poll 方法
非NAPI的内核接口为netif_rx(),NAPI的内核接口为napi_schedule()。
非NAPI使用共享的CPU队列softnet_data->input_pkt_queue,NAPI使用设备内存或者驱动程序的接受环。
*/
struct napi_struct {
/* The poll_list must only be managed by the entity which
* changes the state of the NAPI_STATE_SCHED bit. This means
* whoever atomically sets that bit can add this napi_struct
* to the per-CPU poll_list, and whoever clears that bit
* can remove from the list right before clearing the bit.
*/
struct list_head poll_list;/* 用于加入处于轮询状态的设备队列 */
unsigned long state; /* 设备的状态 */
int weight; /* 每次处理的最大数量,非NAPI有默认值*/
unsigned long gro_bitmask;
int (*poll)(struct napi_struct *, int);/* 设备注册的轮询方法,非NAPI为process_backlog() */
#ifdef CONFIG_NETPOLL
int poll_owner;
#endif
struct net_device *dev;
struct gro_list gro_hash[GRO_HASH_BUCKETS];
struct sk_buff *skb;
struct list_head rx_list; /* Pending GRO_NORMAL skbs */
int rx_count; /* length of rx_list */
struct hrtimer timer;
struct list_head dev_list;
struct hlist_node napi_hash_node;
unsigned int napi_id;
struct task_struct *thread;
};
NAPI方式下收包流程:
NAPI方式下,过程如下:
1、驱动中调用netif_napi_add注册NAPI
// netif_napi_add(ð->dummy_dev, ð->tx_napi, mtk_napi_tx, MTK_NAPI_WEIGHT);
// netif_napi_add(ð->dummy_dev, ð->rx_napi[0].napi, mtk_napi_rx, MTK_NAPI_WEIGHT);
void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
int (*poll)(struct napi_struct *, int), int weight)
{
INIT_LIST_HEAD(&napi->poll_list);
hrtimer_init(&napi->timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL_PINNED);
napi->timer.function = napi_watchdog;
init_gro_hash(napi);
napi->skb = NULL;
INIT_LIST_HEAD(&napi->rx_list);
napi->rx_count = 0;
napi->poll = poll;
if (weight > NAPI_POLL_WEIGHT)
netdev_err_once(dev, "%s() called with weight %d\n", __func__,
weight);
napi->weight = weight;
napi->dev = dev;
#ifdef CONFIG_NETPOLL
napi->poll_owner = -1;
#endif
set_bit(NAPI_STATE_SCHED, &napi->state);
set_bit(NAPI_STATE_NPSVC, &napi->state);
list_add_rcu(&napi->dev_list, &dev->napi_list);
napi_hash_add(napi);
/* Create kthread for this napi if dev->threaded is set.
* Clear dev->threaded if kthread creation failed so that
* threaded mode will not be enabled in napi_enable().
*/
if (dev->threaded && napi_kthread_create(napi))
dev->threaded = 0;
}
NAPI 方式在硬中断处理函数中调用napi_schedule 来获取数据报文
static irqreturn_t mtk_handle_irq_rx(int irq, void *priv)
{
struct mtk_napi *rx_napi = priv;
struct mtk_eth *eth = rx_napi->eth;
struct mtk_rx_ring *ring = rx_napi->rx_ring;
if (likely(napi_schedule_prep(&rx_napi->napi))) {
mtk_rx_irq_disable(eth, MTK_RX_DONE_INT(ring->ring_no));
__napi_schedule(&rx_napi->napi);
}
return IRQ_HANDLED;
}
/* Called with irq disabled */
static inline void ____napi_schedule(struct softnet_data *sd,
struct napi_struct *napi)
{
struct task_struct *thread;
if (test_bit(NAPI_STATE_THREADED, &napi->state)) {
/* Paired with smp_mb__before_atomic() in
* napi_enable()/dev_set_threaded().
* Use READ_ONCE() to guarantee a complete
* read on napi->thread. Only call
* wake_up_process() when it's not NULL.
*/
thread = READ_ONCE(napi->thread);
if (thread) {
if (thread->state != TASK_INTERRUPTIBLE)
set_bit(NAPI_STATE_SCHED_THREADED, &napi->state);
wake_up_process(thread);
return;
}
}
list_add_tail(&napi->poll_list, &sd->poll_list); //将napi->poll_list加入到sd->poll_list链表尾部
__raise_softirq_irqoff(NET_RX_SOFTIRQ); //触发软中断,软中断处理函数中回调注册的poll接口
}
软中断处理接口net_rx_action:
static __latent_entropy void net_rx_action(struct softirq_action *h)
{
/* 设置了两个限制:budget和time_limit。前者限制本次处理数据包的总量,后者限制本次处理总时间。
只有二者均有剩余的情况下,才会继续处理 */
struct softnet_data *sd = this_cpu_ptr(&softnet_data);
unsigned long time_limit = jiffies +
usecs_to_jiffies(netdev_budget_usecs); //设置软中断处理程序一次允许的最大执行时间2个jiffes
int budget = netdev_budget;
LIST_HEAD(list);
LIST_HEAD(repoll);
local_irq_disable();
list_splice_init(&sd->poll_list, &list);
local_irq_enable();
for (;;) {
struct napi_struct *n;
if (list_empty(&list)) {
if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
goto out;
break;
}
/* 从sd->poll_list头部取出一个napi,即使现在硬中断抢占软中断,会把一个napi挂到pool_list的尾端软中断只会从pool_list 头部移除一个pool_list,这样不存在临界区*/
n = list_first_entry(&list, struct napi_struct, poll_list);
budget -= napi_poll(n, &repoll); //减去poll的数量
/* 若时间已经过去2jiffes或无数据可poll */
if (unlikely(budget <= 0 ||
time_after_eq(jiffies, time_limit))) {
sd->time_squeeze++;
break;
}
}
/*下面会有把没执行完的NAPI挂到softnet_data尾部的操作,和硬中断存在临界区,所以关闭CPU本地中断(使CPU无法响应系统硬件中断) */
local_irq_disable();
list_splice_tail_init(&sd->poll_list, &list);
list_splice_tail(&repoll, &list);
list_splice(&list, &sd->poll_list);
if (!list_empty(&sd->poll_list))//还有数据需要处理 就打开软中断
__raise_softirq_irqoff(NET_RX_SOFTIRQ);// 设置softirq bitmask 等待 执行软中断回调
net_rps_action_and_irq_enable(sd); //里面会打开CPU本地中断
out:
__kfree_skb_flush();
}
static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{
bool do_repoll = false;
void *have;
int work;
list_del_init(&n->poll_list);
have = netpoll_poll_lock(n);
work = __napi_poll(n, &do_repoll);
if (do_repoll)
list_add_tail(&n->poll_list, repoll);
netpoll_poll_unlock(have);
return work;
}
static int __napi_poll(struct napi_struct *n, bool *repoll)
{
int work, weight;
weight = n->weight;
/* This NAPI_STATE_SCHED test is for avoiding a race
* with netpoll's poll_napi(). Only the entity which
* obtains the lock and sees NAPI_STATE_SCHED set will
* actually make the ->poll() call. Therefore we avoid
* accidentally calling ->poll() when NAPI is not scheduled.
*/
work = 0;
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
/* NAPI的napi_struct是自己构造的,该结构上的poll钩子函数也是自己定义的。
非NAPI的napi_struct结构是默认的,也就是per cpu的softnet_data>backlog,
其poll钩子函数为process_backlog,在net_dev_init注册。
*/
work = n->poll(n, weight);
trace_napi_poll(n, work, weight);
}
WARN_ON_ONCE(work > weight);
if (likely(work < weight))
return work;
/* Drivers must not modify the NAPI state if they
* consume the entire weight. In such cases this code
* still "owns" the NAPI instance and therefore can
* move the instance around on the list at-will.
*/
if (unlikely(napi_disable_pending(n))) {
napi_complete(n);
return work;
}
if (n->gro_bitmask) {
/* flush too old packets
* If HZ < 1000, flush all packets.
*/
napi_gro_flush(n, HZ >= 1000);
}
gro_normal_list(n);
/* Some drivers may have called napi_schedule
* prior to exhausting their budget.
*/
if (unlikely(!list_empty(&n->poll_list))) {
pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
n->dev ? n->dev->name : "backlog");
return work;
}
*repoll = true;
return work;
}
static int mtk_napi_rx(struct napi_struct *napi, int budget)
{
struct mtk_napi *rx_napi = container_of(napi, struct mtk_napi, napi);
struct mtk_eth *eth = rx_napi->eth;
struct mtk_rx_ring *ring = rx_napi->rx_ring;
u32 status, mask;
int rx_done = 0;
int remain_budget = budget;
mtk_handle_status_irq(eth);
poll_again:
mtk_w32(eth, MTK_RX_DONE_INT(ring->ring_no), MTK_PDMA_INT_STATUS);
rx_done = mtk_poll_rx(napi, remain_budget, eth);
if (unlikely(netif_msg_intr(eth))) {
status = mtk_r32(eth, MTK_PDMA_INT_STATUS);
mask = mtk_r32(eth, MTK_PDMA_INT_MASK);
dev_info(eth->dev,
"done rx %d, intr 0x%08x/0x%x\n",
rx_done, status, mask);
}
if (rx_done == remain_budget)
return budget;
status = mtk_r32(eth, MTK_PDMA_INT_STATUS);
if (status & MTK_RX_DONE_INT(ring->ring_no)) {
remain_budget -= rx_done;
goto poll_again;
}
if (napi_complete(napi))
mtk_rx_irq_enable(eth, MTK_RX_DONE_INT(ring->ring_no));
return rx_done + budget - remain_budget;
}
napi的状态NAPI_STATE_SCHED:
初始化时:netif_napi_add 会将 napi->state 设置set为调度状态。// set_bit(NAPI_STATE_SCHED, &napi->state);
open网卡时会enable napi,此时会clear 掉标志也就是非调度状态。// clear_bit(NAPI_STATE_SCHED, &n->state);
后续 net_rx_action 会处理报文,如果报文处理完了,就回调用napi_complete,此时会将状态设置位非调度状态,同时enable 网卡中断。