TCP实现之:RPS、XPS、GSO和GRO分析

RPS、XPS、GSO、GRO等这几个技术都是内核协议栈里面用于加速网卡报文处理(收包或者发包)所使用到的一些技术,这里我们来把它们放一块研究一下具体是怎么实现的。

一、RPS

RPS是一种网络负载均衡的技术,其核心原理是在网卡驱动收到报文后,将不同的数据流哈希到不同的CPU上来进行处理,从而更加充分地利用空闲CPU核的资源,推升网络带宽。RPS动作分别会在以下路径中被执行:

  • netif_receive_skb → netif_receive_skb_internal

    这个是在NAPI POLL过程中被调用的,不使用GRO的情况下的路径

  • netif_rx → netif_rx_internal

    这个是在上半部(硬中断)中被调用的,一般发生在在不支持NAPI机制的网卡驱动中

  • napi_gro_receive → napi_skb_finish → gro_normal_one → gro_normal_list → netif_receive_skb_list_internal

    这个是在支持GRO网卡驱动的流程

无论是在哪种路径上执行的RPS动作,其动作都是一样的,即:先调用get_rps_cpu来获取用来处理报文的CPU,再将报文放到那个CPU的收包队列上进行处理。核心代码如下所示:

	if (static_branch_unlikely(&rps_needed)) {struct rps_dev_flow voidflow, *rflow = &voidflow;int cpu = get_rps_cpu(skb->dev, skb, &rflow);if (cpu >= 0) {ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);rcu_read_unlock();return ret;}}

RFS与RPS类似,都是用来进行负载均衡的,不同的是:RPS是将报文根据哈希算法投递到某个CPU上;而RFS是尝试将报文投递到用户态收包进程所在的CPU上。这部分的核心就在于CPU选择的逻辑,而这部分的代码如下所示:

static int get_rps_cpu(struct net_device *dev, struct sk_buff *skb,struct rps_dev_flow **rflowp)
{const struct rps_sock_flow_table *sock_flow_table;struct netdev_rx_queue *rxqueue = dev->_rx;struct rps_dev_flow_table *flow_table;struct rps_map *map;int cpu = -1;u32 tcpu;u32 hash;if (skb_rx_queue_recorded(skb)) {u16 index = skb_get_rx_queue(skb);if (unlikely(index >= dev->real_num_rx_queues)) {WARN_ONCE(dev->real_num_rx_queues > 1,"%s received packet on queue %u, but number ""of RX queues is %u\n",dev->name, index, dev->real_num_rx_queues);goto done;}rxqueue += index;}/* Avoid computing hash if RFS/RPS is not active for this rxqueue */flow_table = rcu_dereference(rxqueue->rps_flow_table);map = rcu_dereference(rxqueue->rps_map);if (!flow_table && !map)goto done;skb_reset_network_header(skb);hash = skb_get_hash(skb);if (!hash)goto done;/* 这里会首先检查是否启用了RFS功能。其中,sock_flow_table是全局哈希表,* 通过/proc/sys/net/core/rps_sock_flow_entries设置,记录的是用户态* 收包进程所处的CPU。* * flow_table是通过网卡队列的rps_flow_cnt参数设置的,只有在sock_flow_table* 和flow_table都启用了的情况下,才会进入到RFS的流程。* * 这里需要关注两个CPU指标:用户CPU和设备CPU。设备CPU指的是内核用来处理报文* 的CPU,也就是经过负载均衡当前处理这个流的CPU;用户CPU指的是进行收包的* 用户进程所在的CPU,这个CPU可能会随着进程调度而迁移到别的CPU上。而RFS的* 目标就是将报文负载均衡到用户CPU上,也就是将设备CPU切换成用户CPU,从而* 降低报文处理的时延和cache miss。* * 用户CPU是通过rps_sock_flow_table这个全局哈希表来记录的,其存储了每个流对应* 的CPU。设备CPU是通过每个网卡队列上的局部哈希表flow_table来记录的。* * 当用户CPU和设备CPU不同的时候,会尝试在以下情况下将设备CPU切换成用户CPU(* 保证将报文放到用户进程所在的CPU,提升cache命中率):* - 设备CPU下线了* - 设备CPU上面的报文都已经处理完成了,切换CPU不会导致乱序出现* * 如何判断这个CPU上的当前流的报文都处理完成了?每个CPU上的softnet_data存储了* 两个值:input_queue_head - 当前CPU上已经处理的报文数据;* input_queue_tail - 当前CPU上已经处理和将要处理的报文数量。当前流上的* last_qtail记录着这个流所看到的最后的input_queue_tail值。如果这个值* <= input_queue_head,那么说明当前流的报文肯定已经处理完成了。*/sock_flow_table = rcu_dereference(rps_sock_flow_table);if (flow_table && sock_flow_table) {struct rps_dev_flow *rflow;u32 next_cpu;u32 ident;/* 还没有记录到用户CPU,那么就不需要考虑CPU迁移的问题,直接使用* map中的cpu信息进行哈希。*/ident = sock_flow_table->ents[hash & sock_flow_table->mask];/* 检查哈希表中对应的实体是不是属于这个流的,哈希表中的实体可能会因为* 哈希冲突,而被别的流占用。一旦被占用,那么就按照传统的方式进行哈希。* (不会乱序吗?)* * 假如这里ents的长度为1,有进程A/B交替收包,那么A在收包期间,其CPU* 会频繁从用户CPU直接切换到原始哈希CPU,而没有保证时序性?需要做个* 实验。* * 经过实验,这里的确会产生强烈的实例竞争问题,多个进程可能哈希到同一个* 实例上,从而产生相互影响,造成乱序。一旦发生冲突,rps的CPU就会从* 用户CPU直接切换到原始CPU。*/if ((ident ^ hash) & ~rps_cpu_mask)goto try_rps;next_cpu = ident & rps_cpu_mask;/* 这里的流不代表一个连接,所有的哈希目标相同的连接都会共享同一个rflow* 对象。也就是说,共享同一个rflow的连接,其设备CPU都一样。这里由于* 设置了序列检查,所以不会产生竞争的问题?只有在序列检查OK的情况下,* 才会进行设备CPU的切换。*/rflow = &flow_table->flows[hash & flow_table->mask];tcpu = rflow->cpu;if (unlikely(tcpu != next_cpu) &&(tcpu >= nr_cpu_ids || !cpu_online(tcpu) ||((int)(per_cpu(softnet_data, tcpu).input_queue_head -rflow->last_qtail)) >= 0)) {tcpu = next_cpu;/* 采用用户CPU,并更新设备CPU为用户CPU。这里更新的不是当前* 收包队列上的rflow,而是映射到用户CPU的收包队列上的rflow。** 这里会调用ndo_rx_flow_steer方法,来使得后续网卡(驱动)* 都会将当前流的报文从新的收包队列上进行收包。*/rflow = set_rps_cpu(dev, skb, rflow, next_cpu);}if (tcpu < nr_cpu_ids && cpu_online(tcpu)) {*rflowp = rflow;/* 这样搞的话,岂不是可以突破rps_cpus的限制了? */cpu = tcpu;goto done;}}try_rps:/* 对于未启用RFS功能的队列(rps_flow_cnt没有设置),这里会简单地从备选CPU中* 哈希出来一个CPU,也就是RPS的逻辑。*/if (map) {tcpu = map->cpus[reciprocal_scale(hash, map->len)];if (cpu_online(tcpu)) {cpu = tcpu;goto done;}}done:return cpu;
}

二、XPS

XPS与RPS的原理类似,不同的是XPS针对的是发包路径。在发包路径中__dev_queue_xmit → netdev_core_pick_tx → netdev_pick_tx会调用get_xps_queue来获取用于发送报文的网卡队列,因此搞清楚这个函数的逻辑,也就搞清楚了XPS的原理。其实现如下所示:

static int get_xps_queue(struct net_device *dev, struct net_device *sb_dev,struct sk_buff *skb)
{
#ifdef CONFIG_XPSstruct xps_dev_maps *dev_maps;struct sock *sk = skb->sk;int queue_index = -1;if (!static_key_false(&xps_needed))return -1;rcu_read_lock();if (!static_key_false(&xps_rxqs_needed))goto get_cpus_map;/* rxq部分的代码。常规的xps会通过哈希的方式,将报文投递到当前CPU映射到的队列* 中的一个。而rxq则是建立了rx队列和tx队列之间的映射关系,将报文投递到* 报文的收包队列映射到的发包队列中的一个。* * 如果没有找到合适的队列,那么调用者会直接将报文哈希到某个队列上(CPU无关)。*/dev_maps = rcu_dereference(sb_dev->xps_rxqs_map);if (dev_maps) {/* 这里的tci获取到的是套接口上记录的sk_rx_queue_mapping,表示的* 是当前套接口对应的收包队列的索引。因为一般一个流会对应一个收包* 队列,因此可以把这个信息持久化地保存到套接口上。*/int tci = sk_rx_queue_get(sk);if (tci >= 0 && tci < dev->num_rx_queues)queue_index = __get_xps_queue_idx(dev, skb, dev_maps,tci);}get_cpus_map:if (queue_index < 0) {dev_maps = rcu_dereference(sb_dev->xps_cpus_map);if (dev_maps) {/* 这里的sender_cpu记录的是发送skb的CPU。这里取出该* CPU上的map,并哈希出对应的队列。*/unsigned int tci = skb->sender_cpu - 1;queue_index = __get_xps_queue_idx(dev, skb, dev_maps,tci);}}rcu_read_unlock();return queue_index;
#elsereturn -1;
#endif
}

三、GSO/TSO

GSO的全称是Generic Segmentation Offload,即通用的分段卸载技术,这里我们以TCP协议为例来讲解GSO的实现细节。在内核函数tcp_sendmsg_locked中,它会构造skb并将用户态的数据拷贝进来。其中,size_goal变量中存储的是一个skb可以存储的数据的长度,其数值是在函数tcp_send_mss → tcp_xmit_size_goal中计算出来的。在未开启GSO的情况下,这个值的大小就是mss,也就意味着一个skb中存储的数据量不大于mss。从某种意义上讲,这种情况下,在数据拷贝阶段就已经完成了TCP数据的分段过程。

在支持GSO的情况下,size_goal的值会被设置为网卡驱动中设置的max_gso_size。在进行数据拷贝的时候,所有的数据都会被拷贝到skb的聚合/分散IO区,即采用page物理页的方式来存储数据。采用这种数据存储方式,一方面可以为后续的TSO做准备;另一方面,如果后续不支持GSO,也可以很方便地进行数据的切割(无拷贝)和分段。

在内核将skb交给网卡驱动之前,会进行GSO的检查,在__dev_queue_xmit → validate_xmit_skb → netif_needs_gso中:

static inline bool netif_needs_gso(struct sk_buff *skb,netdev_features_t features)
{/* skb是GSO类型的skb,并且网卡驱动特性不满足GSO的要求,那么就需要进行软件* 分段。*/return skb_is_gso(skb) && (!skb_gso_ok(skb, features) ||unlikely((skb->ip_summed != CHECKSUM_PARTIAL) &&(skb->ip_summed != CHECKSUM_UNNECESSARY)));
}static inline bool skb_gso_ok(struct sk_buff *skb, netdev_features_t features)
{/* 检查网卡驱动支持的特性是否满足gso_type的要求(GSO-TCP,GSO-UDP等),* 并且检查网卡驱动是否支持链表(还能这样?)*/return net_gso_ok(features, skb_shinfo(skb)->gso_type) &&(!skb_has_frag_list(skb) || (features & NETIF_F_FRAGLIST));
}

在需要进行GSO的情况下,__skb_gso_segment → skb_mac_gso_segment会被调用。这里会根据当前报文的协议,从offload_base链表中遍历并找出对应的三层协议注册的钩子函数来进行GSO分段。对于IP协议,这里会调用inet_gso_segment函数,这个函数会做一些准备工作(设置协议头等)并从inet_offloads找到对应的四层协议处理函数。对于TCP协议,这里会调用tcp4_gso_segment函数。需要注意的是,四层钩子函数中只会处理四层头部,而IP头部是在inet_gso_segment函数中处理的。

四、GRO

GRO可以理解为GSO的反向操作:将网卡驱动收到的多个属于同一个流的skb组装成一个skb链表,一次性作为一个单独的大的报文上送到内核协议栈。通过这种方式,避免了网络协议栈的频繁调度,降低了系统开销,提升网络性能。下面我们来看一下内核是如何进行skb的组装的。

napi_poll

一切的一切还要从napi_poll开始说起,这个函数会调用网卡驱动注册的poll函数从当前网卡上收取不超过budget数量的网络报文:

static int napi_poll(struct napi_struct *n, struct list_head *repoll)
{xxxxxxwork = 0;if (test_bit(NAPI_STATE_SCHED, &n->state)) {/* 调用网卡驱动上的POLL函数进行底层的报文收包。常规情况下,在准备好* 数据后,网卡驱动会将skb交给 napi_gro_receive 函数处理。*/work = n->poll(n, weight);trace_napi_poll(n, work, weight);}if (unlikely(work > weight))pr_err_once("NAPI poll function %pS returned %d, exceeding its budget of %d.\n",n->poll, work, weight);/* 网卡驱动处理的报文数量没有达到budget,直接结束poll流程。 */if (likely(work < weight))goto out_unlock;/* Drivers must not modify the NAPI state if they* consume the entire weight.  In such cases this code* still "owns" the NAPI instance and therefore can* move the instance around on the list at-will.*/if (unlikely(napi_disable_pending(n))) {napi_complete(n);goto out_unlock;}if (n->gro_bitmask) {/* 本次poll周期结束了,如果当前HZ < 1000,那么flush掉当前* napi哈希表中所有的skb。否则,只flush掉比较老的skb。** (flush指的是将skb从哈希表gro_hash中移动到rx_list中)** 这里估计考虑到如果HZ比较小的话,会存在一定的处理延迟,不能等待* 下一个周期。*/napi_gro_flush(n, HZ >= 1000);}/* 在完成当前轮次的napi实例poll后,会调用gro_normal_list将GRO链表中的* 等待上送到协议栈的skb一股脑的处理掉。理论上来说,这里应该会处理掉当前* poll周期内所有的skb。*/gro_normal_list(n);/* Some drivers may have called napi_schedule* prior to exhausting their budget.*/if (unlikely(!list_empty(&n->poll_list))) {pr_warn_once("%s: Budget exhausted after napi rescheduled\n",n->dev ? n->dev->name : "backlog");goto out_unlock;}list_add_tail(&n->poll_list, repoll);out_unlock:netpoll_poll_unlock(have);return work;
}

而网卡驱动的poll函数会调用napi_gro_receive来处理从网卡上获取到的网络报文,这个函数会完成skb的重组,并将其放到当前网卡对应的napi->rx_list链表中。具体的重组的逻辑我们下面再说,先看总体的流程。每个napi对象上有两个比较重要的字段用来处理GRO相关的逻辑:gro_hashrx_list

struct napi_struct {/* The poll_list must only be managed by the entity which* changes the state of the NAPI_STATE_SCHED bit.  This means* whoever atomically sets that bit can add this napi_struct* to the per-CPU poll_list, and whoever clears that bit* can remove from the list right before clearing the bit.*/struct list_head	poll_list;unsigned long		state;int			weight;int			defer_hard_irqs_count;/* 和下面的gro_hash对应的,标志着哈希表中哪个位置存在数据 */unsigned long		gro_bitmask;int			(*poll)(struct napi_struct *, int);
#ifdef CONFIG_NETPOLLint			poll_owner;
#endifstruct net_device	*dev;/* 用于保存属于不同的流的skb的哈希表。这是一个链表式哈希表,每个数组中存储的* 都是一个链表,以链表的形式将skb保存起来。对于TCP协议,这个哈希表中的* 一个skb就代表着一个TCP流,属于同一个流的skb会通过 frag_list 的形式* 并入到这个skb中。*/struct gro_list		gro_hash[GRO_HASH_BUCKETS];struct sk_buff		*skb;/* 已经经过GRO组装好的准备上送到内核协议栈的skb链表 */struct list_head	rx_list; /* Pending GRO_NORMAL skbs */int			rx_count; /* length of rx_list */struct hrtimer		timer;struct list_head	dev_list;struct hlist_node	napi_hash_node;unsigned int		napi_id;
};

napi_gro_receive

这个函数会先调用dev_gro_receive来进行GRO的处理,并将处理的结果传递给napi_skb_finish做后续的处理。

gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{gro_result_t ret;/* NAPI模式下,网卡驱动中的POLL方法在分配好skb后会将其交给本函数处理。 *//* 对当前报文设置NAPI的ID,该ID用于在网口handler处理过程中。 */skb_mark_napi_id(skb, napi);trace_napi_gro_receive_entry(skb);skb_gro_reset_offset(skb, 0);ret = napi_skb_finish(napi, skb, dev_gro_receive(napi, skb));trace_napi_gro_receive_exit(ret);return ret;
}

这里先简单介绍一下dev_gro_receive的返回值所代表的意义吧:

  • GRO_NORMAL:这个skb不需要(不能)进行GRO并包,走常规路线
  • GRO_MERGED_FREE:成功地被GRO,且当前skb的数据被合并到前一个skb中了(前一个skb中还有剩余空间),这个skb可以释放了
  • GRO_MERGED:成功地被GRO,且当前这个skb被加入到了frag_list链表中了

下面我们来看一下dev_gro_receive的具体实现:

static enum gro_result dev_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
{u32 hash = skb_get_hash_raw(skb) & (GRO_HASH_BUCKETS - 1);struct list_head *head = &offload_base;struct packet_offload *ptype;__be16 type = skb->protocol;struct list_head *gro_head;struct sk_buff *pp = NULL;enum gro_result ret;int same_flow;int grow;/* 在网口未开启GRO支持或者网口上使用了XDP的情况下,不进行聚合,走常规路线。 */if (netif_elide_gro(skb->dev))goto normal;/* 根据当前skb的hash值,从napi的hash表里获取skb链表。 */gro_head = gro_list_prepare(napi, skb);rcu_read_lock();/* 对skb的一些GRO属性进行初始化,并调用当前skb的协议对应的GRO回调函数。* 其中,这里的ptype为每个协议(如IP协议)注册的packet_offload类型的* 钩子函数。* * 以IP协议为例,这里会调用 inet_gro_receive() 函数。*/list_for_each_entry_rcu(ptype, head, list) {if (ptype->type != type || !ptype->callbacks.gro_receive)continue;skb_set_network_header(skb, skb_gro_offset(skb));skb_reset_mac_len(skb);NAPI_GRO_CB(skb)->same_flow = 0;NAPI_GRO_CB(skb)->flush = skb_is_gso(skb) || skb_has_frag_list(skb);NAPI_GRO_CB(skb)->free = 0;NAPI_GRO_CB(skb)->encap_mark = 0;NAPI_GRO_CB(skb)->recursion_counter = 0;NAPI_GRO_CB(skb)->is_fou = 0;NAPI_GRO_CB(skb)->is_atomic = 1;NAPI_GRO_CB(skb)->gro_remcsum_start = 0;/* Setup for GRO checksum validation */switch (skb->ip_summed) {case CHECKSUM_COMPLETE:NAPI_GRO_CB(skb)->csum = skb->csum;NAPI_GRO_CB(skb)->csum_valid = 1;NAPI_GRO_CB(skb)->csum_cnt = 0;break;case CHECKSUM_UNNECESSARY:NAPI_GRO_CB(skb)->csum_cnt = skb->csum_level + 1;NAPI_GRO_CB(skb)->csum_valid = 0;break;default:NAPI_GRO_CB(skb)->csum_cnt = 0;NAPI_GRO_CB(skb)->csum_valid = 0;}pp = INDIRECT_CALL_INET(ptype->callbacks.gro_receive,ipv6_gro_receive, inet_gro_receive,gro_head, skb);break;}rcu_read_unlock();/* offload_base链表中没有找到合适的钩子用来处理报文,走正常路径 */if (&ptype->list == head)goto normal;if (PTR_ERR(pp) == -EINPROGRESS) {ret = GRO_CONSUMED;goto ok;}same_flow = NAPI_GRO_CB(skb)->same_flow;/* 检查当前skb是否需要被释放。GRO_MERGED_FREE代表当前skb的数据被合并到了* 上一个skb,当前skb可以释放。GRO_MERGED代表当前skb被加入到了frag_list* 链表,不能释放。*/ret = NAPI_GRO_CB(skb)->free ? GRO_MERGED_FREE : GRO_MERGED;/* 在找到了属于同一个流的skb链表,且需要flush数据的情况下(没有并包,或者* 并包了但是需要flush),这个pp指向的是那个skb链表。*/if (pp) {/* 将这个skb从GRO的哈希链表中删除 */skb_list_del_init(pp);/* 做一些收尾的工作,这里会将GRO映射为GSO。随后,调用* gro_normal_one将报文传递上去。*/napi_gro_complete(napi, pp);napi->gro_hash[hash].count--;}/* 如果并包了,那么当前skb的same_flow会被置为1 */if (same_flow)goto ok;if (NAPI_GRO_CB(skb)->flush)goto normal;/* 走到这里,应该是没有找到同一个流的skb,这种情况下当前skb将要作为领头skb* 被加入到哈希表中了。*/if (unlikely(napi->gro_hash[hash].count >= MAX_GRO_SKBS)) {gro_flush_oldest(napi, gro_head);} else {napi->gro_hash[hash].count++;}NAPI_GRO_CB(skb)->count = 1;NAPI_GRO_CB(skb)->age = jiffies;NAPI_GRO_CB(skb)->last = skb;skb_shinfo(skb)->gso_size = skb_gro_len(skb);list_add(&skb->list, gro_head);ret = GRO_HELD;pull:grow = skb_gro_offset(skb) - skb_headlen(skb);if (grow > 0)gro_pull_from_frag0(skb, grow);
ok:/* 在不需要flush的情况下,当前skb是不会被加入到rx_list链表中的。 */if (napi->gro_hash[hash].count) {if (!test_bit(hash, &napi->gro_bitmask))__set_bit(hash, &napi->gro_bitmask);} else if (test_bit(hash, &napi->gro_bitmask)) {__clear_bit(hash, &napi->gro_bitmask);}return ret;normal:ret = GRO_NORMAL;goto pull;
}

这个函数会调用当前skb所属于的协议注册的钩子函数进行处理,对于TCP协议,这里会调用inet_gro_receive → tcp4_gro_receive函数。在inet_gro_receive函数里,会遍历哈希链表中的skb,从IP层的角度将和当前skb属于同一个流的skb的same_flow属性设置为1。同时,它还会从IP协议的角度来判断属于同一个流的skb是否需要进行flush。因为这里是从IP层来进行标注的,因此可能会标注多个skb(多个流)。

tcp4_gro_receive函数中,它首先会再次从之前已经标注成same_flow的skb中找到对应的从TCP层面看属于同一个流的skb。这里的同一个流才是TCP意义上的同一个流。

struct sk_buff *tcp_gro_receive(struct list_head *head, struct sk_buff *skb)
{struct sk_buff *pp = NULL;struct sk_buff *p;struct tcphdr *th;struct tcphdr *th2;unsigned int len;unsigned int thlen;__be32 flags;unsigned int mss = 1;unsigned int hlen;unsigned int off;int flush = 1;int i;off = skb_gro_offset(skb);hlen = off + sizeof(*th);th = skb_gro_header_fast(skb, off);if (skb_gro_header_hard(skb, hlen)) {th = skb_gro_header_slow(skb, hlen, off);if (unlikely(!th))goto out;}thlen = th->doff * 4;if (thlen < sizeof(*th))goto out;hlen = off + thlen;if (skb_gro_header_hard(skb, hlen)) {th = skb_gro_header_slow(skb, hlen, off);if (unlikely(!th))goto out;}skb_gro_pull(skb, thlen);len = skb_gro_len(skb);flags = tcp_flag_word(th);/* 从p链表中找到当前流的skb。属于同一个流的所有skb都会形成一个frag_list链表,* 因此一个流的所有skb都会在同一个skb的frag_list中。这里需要找到那个skb,* 且这个skb应该是唯一的。*/list_for_each_entry(p, head, list) {if (!NAPI_GRO_CB(p)->same_flow)continue;th2 = tcp_hdr(p);if (*(u32 *)&th->source ^ *(u32 *)&th2->source) {NAPI_GRO_CB(p)->same_flow = 0;continue;}goto found;}p = NULL;goto out_check_final;found:/* Include the IP ID check below from the inner most IP hdr */flush = NAPI_GRO_CB(p)->flush;flush |= (__force int)(flags & TCP_FLAG_CWR);flush |= (__force int)((flags ^ tcp_flag_word(th2)) &~(TCP_FLAG_CWR | TCP_FLAG_FIN | TCP_FLAG_PSH));flush |= (__force int)(th->ack_seq ^ th2->ack_seq);for (i = sizeof(*th); i < thlen; i += 4)flush |= *(u32 *)((u8 *)th + i) ^*(u32 *)((u8 *)th2 + i);/* When we receive our second frame we can made a decision on if we* continue this flow as an atomic flow with a fixed ID or if we use* an incrementing ID.*/if (NAPI_GRO_CB(p)->flush_id != 1 ||NAPI_GRO_CB(p)->count != 1 ||!NAPI_GRO_CB(p)->is_atomic)flush |= NAPI_GRO_CB(p)->flush_id;elseNAPI_GRO_CB(p)->is_atomic = false;mss = skb_shinfo(p)->gso_size;flush |= (len - 1) >= mss;flush |= (ntohl(th2->seq) + skb_gro_len(p)) ^ ntohl(th->seq);
#ifdef CONFIG_TLS_DEVICEflush |= p->decrypted ^ skb->decrypted;
#endif/* flush代表不进行GRO,直接对报文进行收包(之前需要对GRO链表进行flush)。* 如果不需要flush,才会调用skb_gro_receive进行GRO并包。*/if (flush || skb_gro_receive(p, skb)) {/* 不需要并包,或者并包失败的情况 */mss = 1;goto out_check_final;}/* 虽然进行了并包,但是th2所指向的内存依然没有变。并包了的情况下,如果当前skb* 里面有fin或者psh标准,需要将其同步到领头skb的TCP数据中。*/tcp_flag_word(th2) |= flags & (TCP_FLAG_FIN | TCP_FLAG_PSH);out_check_final:/* 最后的操作,是否GRO并包了都会走到这里。如果当前skb的有效数据flush = len < mss;/* 如果报文存在以下标准,那边不再进行GRO等待,直接flush */flush |= (__force int)(flags & (TCP_FLAG_URG | TCP_FLAG_PSH |TCP_FLAG_RST | TCP_FLAG_SYN |TCP_FLAG_FIN));/* 找到了同一个流的skb(p),但是没有并包,或者需要进行flush,那么就把找到的* 这个skb返回。内核会调用napi_gro_complete来对这个返回的skb进行处理。*/if (p && (!NAPI_GRO_CB(skb)->same_flow || flush))pp = p;out:NAPI_GRO_CB(skb)->flush |= (flush != 0);return pp;
}

在确定需要进行并包(不需要flush当前流)的情况下,skb_gro_receive函数会被调用。这个函数看起来挺复杂,实际上就是尝试将当前skb的数据合并到p的frga_list链表的最后一个skb上。如果合不上去,就将其加入到frga_list中:

int skb_gro_receive(struct sk_buff *p, struct sk_buff *skb)
{struct skb_shared_info *pinfo, *skbinfo = skb_shinfo(skb);unsigned int offset = skb_gro_offset(skb);unsigned int headlen = skb_headlen(skb);/* skb中存储的纯数据的长度,应该是不包含mac头部,从IP头部开始算的 */unsigned int len = skb_gro_len(skb);unsigned int delta_truesize;struct sk_buff *lp;/* 进行GRO的skb合并。首先尝试将当前skb的数据合并到p链表中的最后一个skb上,* 如果不允许的话,就将当前skb加入到frag_list链表中。*/if (unlikely(p->len + len >= 65536 || NAPI_GRO_CB(skb)->flush))return -E2BIG;lp = NAPI_GRO_CB(p)->last;pinfo = skb_shinfo(lp);if (headlen <= offset) {skb_frag_t *frag;skb_frag_t *frag2;int i = skbinfo->nr_frags;int nr_frags = pinfo->nr_frags + i;/* 如果线性区的长度小于offset,那么说明skb的有效数据都存储在frags* 区域,这里只需要进行frag的合并即可。这里首先检查上一个skb中的* frags的数量有没有超限,没有的话就取出下一个可用的frag的地址。*//* nr_frags是合并后的frags的数量 */if (nr_frags > MAX_SKB_FRAGS)goto merge;offset -= headlen;pinfo->nr_frags = nr_frags;skbinfo->nr_frags = 0;frag = pinfo->frags + nr_frags;frag2 = skbinfo->frags + i;do {/* 将skb的frag地址依次拷贝到p的frags数组的可用区域中 */*--frag = *--frag2;} while (--i);/* 跳过无效的数据,即跳过data_offset之前的数据 */skb_frag_off_add(frag, offset);skb_frag_size_sub(frag, offset);/* all fragments truesize : remove (head size + sk_buff) */delta_truesize = skb->truesize -SKB_TRUESIZE(skb_end_offset(skb));/* 设置frags被挪走之后的skb的各种尺寸信息 */skb->truesize -= skb->data_len;skb->len -= skb->data_len;skb->data_len = 0;/* 这个skb被合并了,已经可以被释放了 */NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE;goto done;} else if (skb->head_frag) {int nr_frags = pinfo->nr_frags;skb_frag_t *frag = pinfo->frags + nr_frags;struct page *page = virt_to_head_page(skb->head);unsigned int first_size = headlen - offset;unsigned int first_offset;/* 有效数据没有(全部)存储到frags里,但是线性区存储到了page fragment的* 情况。这里可以把线性区当做一个frag来对待,逻辑与上面类似。*/if (nr_frags + 1 + skbinfo->nr_frags > MAX_SKB_FRAGS)goto merge;first_offset = skb->data -(unsigned char *)page_address(page) +offset;pinfo->nr_frags = nr_frags + 1 + skbinfo->nr_frags;__skb_frag_set_page(frag, page);skb_frag_off_set(frag, first_offset);skb_frag_size_set(frag, first_size);memcpy(frag + 1, skbinfo->frags, sizeof(*frag) * skbinfo->nr_frags);/* We dont need to clear skbinfo->nr_frags here */delta_truesize = skb->truesize - SKB_DATA_ALIGN(sizeof(struct sk_buff));NAPI_GRO_CB(skb)->free = NAPI_GRO_FREE_STOLEN_HEAD;goto done;}merge:/* 无法将当前skb的数据合并到最后一个skb里,那么就将这个skb加入到当前的skb链表* 中,也就是frag_list中。*/delta_truesize = skb->truesize;if (offset > headlen) {unsigned int eat = offset - headlen;skb_frag_off_add(&skbinfo->frags[0], eat);skb_frag_size_sub(&skbinfo->frags[0], eat);skb->data_len -= eat;skb->len -= eat;offset = headlen;}__skb_pull(skb, offset);if (NAPI_GRO_CB(p)->last == p)skb_shinfo(p)->frag_list = skb;elseNAPI_GRO_CB(p)->last->next = skb;NAPI_GRO_CB(p)->last = skb;__skb_header_release(skb);lp = p;done:NAPI_GRO_CB(p)->count++;/* 更新领头skb的一些长度信息。领头skb里的长度包含了链表skb中所有的数据长度。 */p->data_len += len;p->truesize += delta_truesize;p->len += len;if (lp != p) {/* 进行了数据合并的情况,这里会更新最后一个skb里面的一些长度信息。 */lp->data_len += len;lp->truesize += delta_truesize;lp->len += len;}NAPI_GRO_CB(skb)->same_flow = 1;return 0;
}


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部