当前的系统基本都是多核系统。为了充分利用多核优势,数据包可以在底层就进行分流,这样多个线程/进程就可以对同一个接口的数据进行并行处理。
一、实验
- 一个server/client程序
- 一个简单的抓包程序,抓取server/client之间的通信数据
1.1 server.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
/* Print the command-line arguments the server expects. */
void
Usage() {
    fputs("ip port\n", stdout);
}
/*
 * Run a one-shot TCP server for the capture experiment.
 *
 * Binds to ip:port, accepts a single client, reads one message from it,
 * waits one second and answers with "hello world".
 *
 * ip   - dotted-quad IPv4 address to bind to.
 * port - TCP port in host byte order.
 *
 * Returns 0 on success, or a negative code naming the step that failed:
 *   -1 socket(), -2 bind(), -3 listen(), -4 accept(),
 *   -5 ip is not a valid IPv4 address, -6 read(), -7 write().
 */
int
start_srv(const char *ip, int port) {
    static const char reply[] = "hello world";
    struct sockaddr_in srv;
    char buf[128] = {0};
    int ret = 0;
    int sockfd = -1;
    int cli_fd = -1;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket:");
        ret = -1;
        goto out;
    }

    /* Zero the whole structure so sin_zero and sin_addr hold no stack garbage. */
    memset(&srv, 0, sizeof(srv));
    srv.sin_family = AF_INET;
    srv.sin_port = htons((unsigned short)port);
    /* inet_pton returns 1 only when the text was a valid address. */
    if (inet_pton(AF_INET, ip, &srv.sin_addr) != 1) {
        fprintf(stderr, "invalid IPv4 address: %s\n", ip);
        ret = -5;
        goto out;
    }

    if (bind(sockfd, (const struct sockaddr *)&srv, sizeof(srv))) {
        perror("bind:");
        ret = -2;
        goto out;
    }
    if (listen(sockfd, 5)) {
        perror("listen:");
        ret = -3;
        goto out;
    }

    cli_fd = accept(sockfd, NULL, NULL);
    if (cli_fd < 0) {
        perror("accept:");
        ret = -4;
        goto out;
    }

    /* Leave room for the terminating NUL that the zero-initialised buf provides. */
    if (read(cli_fd, buf, sizeof(buf) - 1) < 0) {
        perror("read:");
        ret = -6;
        goto out;
    }

    /* Deliberate pause so request and reply are separated in the capture. */
    sleep(1);

    if (write(cli_fd, reply, sizeof(reply) - 1) < 0) {
        perror("write:");
        ret = -7;
        goto out;
    }

out:
    /* 0 is a valid descriptor, so test against -1 rather than "> 0". */
    if (cli_fd >= 0) {
        close(cli_fd);
    }
    if (sockfd >= 0) {
        close(sockfd);
    }
    return ret;
}
/*
 * Entry point of the test server: argv[1] is the bind address, argv[2] the
 * TCP port. Exits with status 1 on a usage error and EXIT_FAILURE when the
 * server itself fails, so scripts can detect a broken run.
 */
int
main(int argc, char **argv) {
    char *end = NULL;
    long port;

    if (argc < 3) {
        Usage();
        exit(1);
    }

    /* strtol instead of atoi: reject non-numeric or out-of-range ports. */
    port = strtol(argv[2], &end, 10);
    if (end == argv[2] || *end != '\0' || port < 0 || port > 65535) {
        fprintf(stderr, "invalid port: %s\n", argv[2]);
        Usage();
        exit(1);
    }

    return start_srv(argv[1], (int)port) == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
}
1.2 client.c
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include <stdlib.h>
/*
 * Run the one-shot TCP client for the capture experiment.
 *
 * Connects to remote_ip:port, sends "I'm client." and reads the reply.
 *
 * remote_ip - dotted-quad IPv4 address of the server.
 * port      - TCP port in host byte order.
 *
 * Returns 0 on success, or a negative code naming the step that failed:
 *   -1 socket(), -2 connect(), -3 remote_ip is not a valid IPv4 address,
 *   -4 write(), -5 read().
 */
int
start_cli(const char *remote_ip, int port) {
    static const char greeting[] = "I'm client.";
    struct sockaddr_in srv;
    char buf[128];
    int ret = 0;
    int sockfd = -1;

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket:");
        ret = -1;
        goto out;
    }

    /* Zero the whole structure so sin_zero and sin_addr hold no stack garbage. */
    memset(&srv, 0, sizeof(srv));
    srv.sin_family = AF_INET;
    srv.sin_port = htons((unsigned short)port);
    /* inet_pton returns 1 only when the text was a valid address. */
    if (inet_pton(AF_INET, remote_ip, &srv.sin_addr) != 1) {
        fprintf(stderr, "invalid IPv4 address: %s\n", remote_ip);
        ret = -3;
        goto out;
    }

    if (connect(sockfd, (const struct sockaddr *)&srv, sizeof(srv))) {
        perror("connect:");
        ret = -2;
        goto out;
    }

    if (write(sockfd, greeting, sizeof(greeting) - 1) < 0) {
        perror("write:");
        ret = -4;
        goto out;
    }
    /* The reply content is not used; only the exchange itself matters. */
    if (read(sockfd, buf, sizeof(buf)) < 0) {
        perror("read:");
        ret = -5;
        goto out;
    }

out:
    /* 0 is a valid descriptor, so test against -1 rather than "> 0". */
    if (sockfd >= 0) {
        close(sockfd);
    }
    return ret;
}
/* Print the command-line arguments the client expects. */
void
Usage() {
    fputs("remote_ip port\n", stdout);
}
/*
 * Entry point of the test client: argv[1] is the server address, argv[2] the
 * TCP port. Exits with status 1 on a usage error and EXIT_FAILURE when the
 * exchange with the server fails.
 */
int
main(int argc, char **argv) {
    char *end = NULL;
    long port;

    if (argc < 3) {
        Usage();
        exit(1);
    }

    /* strtol instead of atoi: reject non-numeric or out-of-range ports. */
    port = strtol(argv[2], &end, 10);
    if (end == argv[2] || *end != '\0' || port < 1 || port > 65535) {
        fprintf(stderr, "invalid port: %s\n", argv[2]);
        Usage();
        exit(1);
    }

    return start_cli(argv[1], (int)port) == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
}
1.3 fanouttest.c
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pcap.h>
#include <pcap-int.h>
#include <pcap/bpf.h>
/* Print the command-line options understood by the capture program. */
void
Usage() {
    fputs("-i device\n", stdout);
    fputs("-w filename\n", stdout);
    fputs("-f filter\n", stdout);
}
/* Number of packets handed to handler_cb so far. */
int recv_pkt = 0;

/*
 * Per-packet callback passed to pcap_loop.
 *
 * priv - the dumper cast to u_char *, or NULL when no output file is used.
 * h    - capture header of the packet.
 * pkt  - packet bytes.
 *
 * Counts the packet, then either writes it through pcap_dump (priv set) or
 * prints its captured length (priv NULL).
 */
void
handler_cb(u_char *priv, const struct pcap_pkthdr *h, const u_char *pkt) {
    recv_pkt++;

    if (!priv) {
        printf("recv pkt length %u\n", h->caplen);
        return;
    }
    pcap_dump(priv, h, pkt);
}
/* Capture handle shared between fanout_test and the signal handler. */
pcap_t *handle = NULL;

/*
 * Signal handler: asks the running capture loop to stop via pcap_breakloop.
 * Does nothing while no capture handle is set.
 */
void
sig_handler(int num) {
    (void)num;

    if (handle == NULL) {
        return;
    }
    pcap_breakloop(handle);
}
int
fanout_test(const char *device, const char *filter, const char *filename) {
signal(SIGUSR1, sig_handler);
pcap_dumper_t *dumper = NULL;
struct bpf_program bpf = {0};
int ret = 0;
char errbuf[PCAP_ERRBUF_SIZE];
handle = pcap_open_live(device, 65535, 1, 1000, errbuf);
if (!handle) {
printf("open %s failed. %s\n", device, errbuf);
ret = -1;
goto out;
}
if (filter) {
bpf_u_int32 localnet, netmask;
if (pcap_lookupnet(device, &localnet, &netmask, errbuf)) {
localnet = 0;
netmask = 0;
printf("lookupnet err: %s\n", errbuf);
}
if (pcap_compile(handle, &bpf, filter, 1, netmask)) {
printf("compile %s failed. %s\n", filter, pcap_geterr(handle));
ret = -2;
goto out;
}
if (pcap_setfilter(handle, &bpf)) {
printf("set filter failed. %s\n", pcap_geterr(handle));
ret = -3;
goto out;
}
}
if (filename) {
dumper = pcap_dump_open(handle, filename);
if (!dumper) {
printf("dump open %s failed. %s", filename, pcap_geterr(handle));
ret = -4;
goto out;
}
}
pcap_loop(handle, -1, handler_cb, (u_char *)dumper);
printf("recv %d packet\n", recv_pkt);
out:
if (dumper) {
pcap_dump_close(dumper);
}
if (handle) {
pcap_close(handle);
}
pcap_freecode(&bpf);
return ret;
}
/*
 * Entry point of the capture program.
 *
 * Options: -i device (required), -w savefile, -f filter expression.
 * Exit status: 0 on success, 1 for an unknown option, 2 when -i is missing,
 * 3 when the capture itself failed.
 */
int
main(int argc, char **argv) {
    char *device = NULL;
    char *save_file = NULL;
    char *filter = NULL;
    int opt;

    while ((opt = getopt(argc, argv, "i:w:f:")) != -1) {
        switch (opt) {
        case 'i':
            device = optarg;
            break;
        case 'w':
            save_file = optarg;
            break;
        case 'f':
            filter = optarg;
            break;
        default:
            Usage();
            exit(1);
        }
    }

    if (!device) {
        Usage();
        exit(2);
    }

    /* Propagate a capture failure instead of always reporting success. */
    return fanout_test(device, filter, save_file) == 0 ? 0 : 3;
}
1.4 实验过程
复用libpcap编译结构,直接在testprogs目录增加源码文件。
testprogs/Makefile
testprogs/fanouttest.c
拷贝获取数据
gcc server.c -o srv
gcc client.c -o cli
可以从结果看,数据确实拷贝了两份,每个文件都是完整的数据。
分流获取数据
在fanout_test函数中、调用pcap_loop之前,通过setsockopt设置PACKET_FANOUT属性。
...
#include <errno.h>
#include <string.h>
#include <linux/if_packet.h>
...
int
fanout_test(const char *device, const char *filter, const char *filename) {
...
    /*
     * PACKET_FANOUT takes one int: the low 16 bits are the fanout group id,
     * the high 16 bits are the fanout type (plus flags). The kernel splits
     * it as fanout_add(sk, val & 0xffff, val >> 16), so the type must be
     * shifted up, not the id. Every process that should share the traffic
     * must use the same id.
     */
    int val;
    int id = 0;
    val = (PACKET_FANOUT_QM << 16) | (id & 0xffff);
    if (setsockopt(handle->fd, SOL_PACKET, PACKET_FANOUT, &val, sizeof(val))) {
        printf("set fanout failed. %s \n", strerror(errno));
    }
    pcap_loop(handle, -1, handler_cb, (u_char *)dumper);
    printf("recv %d packet\n", recv_pkt);
...
}
从结果看,左边抓取到20个包,右边抓取到10个包,数据确实被分流了,没有完全复制。
二、原理
/*
 * Excerpt of the packet-socket setsockopt handler, reduced to the
 * PACKET_FANOUT case (elided parts are marked "...").
 *
 * Returns -ENOPROTOOPT for a level other than SOL_PACKET or an option name
 * that no case handles; otherwise the result of the matching case.
 */
static int
packet_setsockopt(struct socket *sock, int level, int optname, char __user *optval, unsigned int optlen)
{
struct sock *sk = sock->sk;
struct packet_sock *po = pkt_sk(sk);
int ret;
/* Only SOL_PACKET options are handled here. */
if (level != SOL_PACKET)
return -ENOPROTOOPT;
switch (optname) {
...
case PACKET_FANOUT:
{
int val;
/* The option value must be exactly one int. */
if (optlen != sizeof(val))
return -EINVAL;
if (copy_from_user(&val, optval, sizeof(val)))
return -EFAULT;
/*
 * Split the int: the low 16 bits are passed as fanout_add()'s id
 * argument, the bits above them as its type_flags argument.
 */
return fanout_add(sk, val & 0xffff, val >> 16);
}
...
default:
return -ENOPROTOOPT;
}
}
/*
 * Attach socket sk to the fanout group identified by id, creating the group
 * when no group with that id exists in the socket's network namespace.
 *
 * sk         - the packet socket to attach.
 * id         - fanout group id.
 * type_flags - fanout type in the low 8 bits, flags in the high 8 bits.
 *
 * Returns 0 on success or a negative errno: -EINVAL (bad type, flag or
 * group mismatch, socket not eligible), -EALREADY (socket already has a
 * fanout), -ENOMEM (allocation or id search failed), -ENOSPC (group full).
 */
static int fanout_add(struct sock *sk, u16 id, u16 type_flags)
{
struct packet_rollover *rollover = NULL;
struct packet_sock *po = pkt_sk(sk);
struct packet_fanout *f, *match;
u8 type = type_flags & 0xff;
u8 flags = type_flags >> 8;
int err;
/* Validate the requested fanout type before taking any lock. */
switch (type) {
case PACKET_FANOUT_ROLLOVER:
/* The rollover type cannot be combined with the rollover flag. */
if (type_flags & PACKET_FANOUT_FLAG_ROLLOVER)
return -EINVAL;
/* no break: a valid ROLLOVER request continues into the accepted types */
case PACKET_FANOUT_HASH:
case PACKET_FANOUT_LB:
case PACKET_FANOUT_CPU:
case PACKET_FANOUT_RND:
case PACKET_FANOUT_QM:
case PACKET_FANOUT_CBPF:
case PACKET_FANOUT_EBPF:
break;
default:
return -EINVAL;
}
/* Everything below runs under fanout_mutex and leaves through "out". */
mutex_lock(&fanout_mutex);
/* A socket that already has a fanout cannot be attached again. */
err = -EALREADY;
if (po->fanout)
goto out;
/* Rollover (as type or as flag) needs a zeroed statistics block. */
if (type == PACKET_FANOUT_ROLLOVER ||
(type_flags & PACKET_FANOUT_FLAG_ROLLOVER)) {
err = -ENOMEM;
rollover = kzalloc(sizeof(*rollover), GFP_KERNEL);
if (!rollover)
goto out;
atomic_long_set(&rollover->num, 0);
atomic_long_set(&rollover->num_huge, 0);
atomic_long_set(&rollover->num_failed, 0);
}
/*
 * UNIQUEID: the caller must pass id 0; fanout_find_new_id() then fills in
 * id (presumably with an id not yet in use - its body is not shown here).
 */
if (type_flags & PACKET_FANOUT_FLAG_UNIQUEID) {
if (id != 0) {
err = -EINVAL;
goto out;
}
if (!fanout_find_new_id(sk, &id)) {
err = -ENOMEM;
goto out;
}
/* ephemeral flag for the first socket in the group: drop it */
flags &= ~(PACKET_FANOUT_FLAG_UNIQUEID >> 8);
}
/* Look for an existing group with this id in the same network namespace. */
match = NULL;
list_for_each_entry(f, &fanout_list, list) {
if (f->id == id &&
read_pnet(&f->net) == sock_net(sk)) {
match = f;
break;
}
}
/* Joining an existing group requires identical flags. */
err = -EINVAL;
if (match && match->flags != flags)
goto out;
/* No such group yet: create it and put it on the global fanout_list. */
if (!match) {
err = -ENOMEM;
match = kzalloc(sizeof(*match), GFP_KERNEL);
if (!match)
goto out;
write_pnet(&match->net, sock_net(sk));
match->id = id;
match->type = type;
match->flags = flags;
INIT_LIST_HEAD(&match->list);
spin_lock_init(&match->lock);
refcount_set(&match->sk_ref, 0);
fanout_init_data(match);
/*
 * The group gets its own packet hook: same protocol type and device as
 * the creating socket, but with packet_rcv_fanout as the receive
 * function and the group itself as private data.
 */
match->prot_hook.type = po->prot_hook.type;
match->prot_hook.dev = po->prot_hook.dev;
match->prot_hook.func = packet_rcv_fanout;
match->prot_hook.af_packet_priv = match;
match->prot_hook.id_match = match_fanout_group;
list_add(&match->list, &fanout_list);
}
/*
 * Join the group under the socket's bind_lock. The socket must be running
 * and agree with the group on fanout type, protocol type and device;
 * otherwise err stays -EINVAL.
 */
err = -EINVAL;
spin_lock(&po->bind_lock);
if (po->running &&
match->type == type &&
match->prot_hook.type == po->prot_hook.type &&
match->prot_hook.dev == po->prot_hook.dev) {
err = -ENOSPC;
/* Only join while the member count is below PACKET_FANOUT_MAX. */
if (refcount_read(&match->sk_ref) < PACKET_FANOUT_MAX) {
/* Take the socket's own hook out; traffic now enters via the group. */
__dev_remove_pack(&po->prot_hook);
po->fanout = match;
/* The socket now owns rollover; clear the local so "out" won't free it. */
po->rollover = rollover;
rollover = NULL;
refcount_set(&match->sk_ref, refcount_read(&match->sk_ref) + 1);
__fanout_link(sk, po);
err = 0;
}
}
spin_unlock(&po->bind_lock);
/* On failure, discard a group that ended up with no members. */
if (err && !refcount_read(&match->sk_ref)) {
list_del(&match->list);
kfree(match);
}
out:
/* rollover is non-NULL here only when it was not handed to the socket. */
kfree(rollover);
mutex_unlock(&fanout_mutex);
return err;
}
通过将入口函数tpacket_rcv替换成packet_rcv_fanout,并且将抓包方式相同的socket放在同一个fanout的数组中,最后将此fanout挂在全局列表fanout_list中。
并且将po从ptype_all列表中剔除掉,最后在ptype_all中只保留一个即可,这样数据就只有一个入口,然后再到packet_rcv_fanout中进行分流处理。
/*
 * Receive hook of a fanout group: picks one member socket for the packet
 * and hands the packet to that socket's own receive function.
 *
 * skb      - the received packet.
 * dev      - device the packet is being delivered for.
 * pt       - the group's packet hook; its af_packet_priv is the group.
 * orig_dev - passed through unchanged to the member's receive function.
 *
 * Returns 0 when the packet is dropped here (or when defragmentation
 * yields no packet), otherwise whatever the member's receive function
 * returns.
 */
static int packet_rcv_fanout(struct sk_buff *skb, struct net_device *dev,
struct packet_type *pt, struct net_device *orig_dev)
{
struct packet_fanout *f = pt->af_packet_priv;
/* Read the member count once; the same value is used for every step below. */
unsigned int num = READ_ONCE(f->num_members);
struct net *net = read_pnet(&f->net);
struct packet_sock *po;
unsigned int idx;
/* Drop packets from another network namespace, or when the group is empty. */
if (!net_eq(dev_net(dev), net) || !num) {
kfree_skb(skb);
return 0;
}
/* With the DEFRAG flag, run the packet through ip_check_defrag() first. */
if (fanout_has_flag(f, PACKET_FANOUT_FLAG_DEFRAG)) {
skb = ip_check_defrag(net, skb, IP_DEFRAG_AF_PACKET);
/* No packet returned: nothing to deliver now (presumably a fragment was queued). */
if (!skb)
return 0;
}
/* Pick the member index with the group's algorithm; unknown types use hash. */
switch (f->type) {
case PACKET_FANOUT_HASH:
default:
idx = fanout_demux_hash(f, skb, num);
break;
case PACKET_FANOUT_LB:
idx = fanout_demux_lb(f, skb, num);
break;
case PACKET_FANOUT_CPU:
idx = fanout_demux_cpu(f, skb, num);
break;
case PACKET_FANOUT_RND:
idx = fanout_demux_rnd(f, skb, num);
break;
case PACKET_FANOUT_QM:
idx = fanout_demux_qm(f, skb, num);
break;
case PACKET_FANOUT_ROLLOVER:
idx = fanout_demux_rollover(f, skb, 0, false, num);
break;
case PACKET_FANOUT_CBPF:
case PACKET_FANOUT_EBPF:
idx = fanout_demux_bpf(f, skb, num);
break;
}
/* With the ROLLOVER flag, the rollover logic may replace the chosen index. */
if (fanout_has_flag(f, PACKET_FANOUT_FLAG_ROLLOVER))
idx = fanout_demux_rollover(f, skb, idx, true, num);
/* Deliver through the selected socket's own hook function. */
po = pkt_sk(f->arr[idx]);
return po->prot_hook.func(skb, dev, &po->prot_hook, orig_dev);
}
进入packet_rcv_fanout后,根据不同的算法选择一个po,然后调用该po相应的func(tpacket_rcv)。