截止到目前,对以太网数据包的处理包含了以下几种协议:

  • udp

  • arp

  • icmp

从代码结构来看,目前所有的数据包捕获和处理都放在同一个线程里,收到消息就直接打印出来。这样显然是不合理的:作为一个网络协议栈,需要有多个缓冲区并进行分层设计。针对目前已经实现的部分,本文增加环形缓冲区和适当的分层设计,以提高项目的性能和可用性。

架构设计

https://imagehyj.oss-cn-hangzhou.aliyuncs.com/blog/20240723092608.png

根据功能的不同,开启多个线程执行相关操作。

  • 线程1:执行以太网数据包捕获,并且写入输入环形缓冲区(in)。

  • 线程2:从输入环形缓冲区(in)读取数据包并解析,将需要发送出去的数据包写入输出环形缓冲区(out)。

实现过程

ringbuf

/*
 * Holder for the two DPDK rings that decouple packet capture from packet
 * processing.
 */
struct inout_ring {

    struct rte_ring* in;   // received mbufs; dequeued by ln_pkt_process
    struct rte_ring* out;  // mbufs to transmit; enqueued by ln_pkt_process
};

// Lazily allocated singleton instance, returned by get_ioring_instance().
static struct inout_ring* iorpt = NULL;


/*
 * Function: get_ioring_instance
 * Author:   黄彦杰 Lenn
 * Date:     2024-07-23
 * Purpose:  Lazily allocate and return the single inout_ring holder.
 *           Both ring pointers start out NULL; the caller creates the rings.
 * Returns:  The ring holder, or NULL if rte_malloc fails. On failure the
 *           singleton stays NULL, so a later call retries the allocation
 *           instead of handing out a NULL-backed instance.
 *
 * NOTE(review): the lazy initialization is not synchronized; it assumes the
 * first call happens before any worker lcore is launched - confirm at callers.
 */
struct inout_ring* get_ioring_instance(void) {

    if (iorpt == NULL) {

        struct inout_ring* instance = rte_malloc("io ring", sizeof(struct inout_ring), 0);
        if (instance == NULL) {
            // Previously memset() was applied to a NULL pointer here.
            return NULL;
        }

        memset(instance, 0, sizeof(struct inout_ring));
        iorpt = instance;
    }

    return iorpt;
}

struct inout_ring* ring = get_ioring_instance();
if (ring == NULL) {

    rte_exit(EXIT_FAILURE, "Malloc io buffer failed");
}

// "in" carries received mbufs to the processing worker; "out" carries the
// worker's replies back for transmission. rte_ring_create returns NULL on
// failure (e.g. size not a power of two, name already used, no memory);
// without these checks the worker would later dereference a NULL ring.
if (ring->in == NULL) {

    ring->in = rte_ring_create("in buffer", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
    if (ring->in == NULL) {

        rte_exit(EXIT_FAILURE, "Create in buffer ring failed\n");
    }
}
if (ring->out == NULL) {

    ring->out = rte_ring_create("out buffer", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
    if (ring->out == NULL) {

        rte_exit(EXIT_FAILURE, "Create out buffer ring failed\n");
    }
}

格式化输出hwaddr

/*
 * Function: ln_print_ethaddr
 * Author:   黄彦杰 Lenn
 * Date:     2024-07-23
 * Purpose:  Print a label immediately followed by a MAC address in the text
 *           form produced by rte_ether_format_addr, then a newline.
 * Params:   name     - label printed verbatim before the address
 *           eth_addr - address to format
 * Returns:  None
 */
static void ln_print_ethaddr(const char* name, const struct rte_ether_addr* eth_addr) {

    char mac_text[RTE_ETHER_ADDR_FMT_SIZE];
    const size_t mac_text_size = sizeof mac_text;

    rte_ether_format_addr(mac_text, mac_text_size, eth_addr);

    printf("%s%s\n", name, mac_text);
}

多线程

rte_eal_remote_launch(ln_pkt_process, mbuf_pool, rte_get_next_lcore(lcore_id, 1, 0));
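  • rte_eal_remote_launch(f, arg, worker_id) 会在编号为 worker_id 的 lcore 上启动函数 f,并把 arg 作为唯一参数传给它。这里的 worker_id 由 rte_get_next_lcore(i, skip_main, wrap) 给出:第二个参数为 1 表示跳过主 lcore,第三个参数为 0 表示不回绕;如果 lcore_id 之后已经没有可用的 lcore,它会返回 RTE_MAX_LCORE,因此调用前应当检查返回值。这样就可以按核心 ID 把不同的任务分配到不同的核心上。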

  • ln_pkt_process 就是一个专门处理数据包的线程函数:原来在主线程 while 循环里完成的数据包处理被移到了这个独立的线程函数中,收包和处理可以在不同的核心上并行进行,从而提高了性能。

  • 传入的参数是内存池 mbuf_pool,处理线程在构造要发送的数据包(ARP 应答、UDP 回复、ICMP 应答)时需要从中分配 mbuf。

https://imagehyj.oss-cn-hangzhou.aliyuncs.com/blog/20240726084712.png

static int ln_pkt_process(void* argv) {

    struct rte_mempool* mbuf_pool = (struct rte_mempool*)argv;
    struct inout_ring* ring = get_ioring_instance();

    while (1) {

        struct rte_mbuf* mbufs[BURST_SIZE];
        unsigned nb_recv = rte_ring_mc_dequeue_burst(ring->in, (void**)mbufs, BURST_SIZE, NULL);

        int i = 0;
        for (i = 0; i < nb_recv; i++) {

            struct rte_ether_hdr* ethdr = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr*);

#if ARP_ENABLE

            if (ethdr->ether_type == rte_cpu_to_be_16(RTE_ETHER_TYPE_ARP)) {

                struct rte_arp_hdr* arphdr = (struct rte_arp_hdr*)(ethdr + 1);

                struct in_addr addr;
                addr.s_addr = arphdr->arp_data.arp_sip;
                printf("---> arp pkt src %s, ", inet_ntoa(addr));
                addr.s_addr = arphdr->arp_data.arp_tip;
                printf("dst %s\n", inet_ntoa(addr));

                if (arphdr->arp_data.arp_tip == nLocalIp) {

                    if (arphdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REQUEST)) {

                        struct rte_mbuf* arpbuf = ln_arp_send(mbuf_pool, RTE_ARP_OP_REPLY,arphdr->arp_data.arp_sha.addr_bytes,
                        arphdr->arp_data.arp_tip, arphdr->arp_data.arp_sip);
//                         rte_eth_tx_burst(nDevPortId, 0, &arpbuf, 1);
//                         rte_pktmbuf_free(arpbuf);

//                         rte_pktmbuf_free(mbufs[i]);
                        rte_ring_mp_enqueue_burst(ring->out, (void**)&arpbuf, 1, NULL);

                    }
                    else if (arphdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REPLY)) {

                        struct arp_table* table = get_arp_instace();

                        uint8_t* hwaddr = get_mac_from_arp(arphdr->arp_data.arp_sip);
                        if (hwaddr) {

                            struct arp_entry* entry = rte_malloc("arp entry", sizeof(struct arp_entry), 0);

                            if (entry) {

                                memset(entry, 0, sizeof(struct arp_entry));

                                entry->ip = arphdr->arp_data.arp_sip;
                                rte_memcpy(entry->hwaddr, arphdr->arp_data.arp_sha.addr_bytes, RTE_ETHER_ADDR_LEN);
                                entry->type = 0;

                                LL_ADD(table->entries, entry);
                                table->count++;
                            }
                        }
                    }
                }

                rte_pktmbuf_free(mbufs[i]);
                continue;

            }

#endif
            if (ethdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4)) {

                rte_pktmbuf_free(mbufs[i]);
                continue;
            }

            struct rte_ipv4_hdr* iphdr = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_ipv4_hdr*, sizeof(struct rte_ether_hdr));

            if(iphdr->next_proto_id == IPPROTO_UDP) {

                struct rte_udp_hdr* udphdr = (struct rte_udp_hdr*)(iphdr + 1);

#if UDP_ENABLE

                //while前自动获取,不依赖udp 20240719
                //rte_memcpy(nSrcMac, ehdr->d_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
                rte_memcpy(nDstMac, ethdr->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
                rte_memcpy(&nSrcIp, &iphdr->dst_addr, sizeof(uint32_t));
                rte_memcpy(&nDstIp, &iphdr->src_addr, sizeof(uint32_t));
                nSrcPort = udphdr->dst_port;
                nDstPort = udphdr->src_port;

#endif

                int length = ntohs(udphdr->dgram_len);
                *((char*)udphdr + length) = '\0';

                struct in_addr addr;
                addr.s_addr = iphdr->src_addr;
                printf("---> udp pkt src %s:%d, ", inet_ntoa(addr), ntohs(udphdr->src_port));

                addr.s_addr = iphdr->dst_addr;
                printf("dst %s:%d %s\n", inet_ntoa(addr), ntohs(udphdr->dst_port), (char*)(udphdr + 1));

#if UDP_ENABLE

                struct rte_mbuf* txbuf = ln_udp_send(mbuf_pool, (uint8_t*)(udphdr + 1), length - sizeof(struct rte_udp_hdr));
//                 rte_eth_tx_burst(nDevPortId, 0, &txbuf, 1);
//                 rte_pktmbuf_free(txbuf);
                rte_ring_mp_enqueue_burst(ring->out, (void**)&txbuf, 1, NULL);

#endif

                rte_pktmbuf_free(mbufs[i]);
                continue;
            }

#if ICMP_ENABLE

            if (iphdr->next_proto_id == IPPROTO_ICMP) {

                struct rte_icmp_hdr* ichdr = (struct rte_icmp_hdr*)(iphdr + 1);

                if (ichdr->icmp_type == RTE_IP_ICMP_ECHO_REQUEST) {

                    struct in_addr addr;
                    addr.s_addr = iphdr->src_addr;
                    printf("---> icmp pkt src %s", inet_ntoa(addr));
                    addr.s_addr = iphdr->dst_addr;
                    printf(" dst %s type %d\n", inet_ntoa(addr), ichdr->icmp_type);
                    struct rte_mbuf* txbuf = ln_icmp_send(mbuf_pool, ethdr->s_addr.addr_bytes, iphdr->dst_addr, iphdr->src_addr, 
                        ichdr->icmp_ident, ichdr->icmp_seq_nb);

//                     rte_eth_tx_burst(nDevPortId, 0, &txbuf, 1);
//                     rte_pktmbuf_free(txbuf);
                    rte_ring_mp_enqueue_burst(ring->out, (void**)&txbuf, 1, NULL);

                    rte_pktmbuf_free(mbufs[i]);
                    continue;
                }

            }

#endif
        }       
    }

    return 0;
}

项目地址

项目地址