So far, the Ethernet packet handling covers the following protocols:
udp
arp
icmp
From a code-structure point of view, all packet capture and processing currently runs in a single thread: when a packet arrives, it is simply printed. That is clearly not adequate for a network protocol stack, which calls for multiple buffers and a layered design. For the parts already implemented, adding ring buffers and a reasonable amount of layering will improve the project's performance and usability.
Architecture Design
Separate threads are launched according to function:
Thread 1: captures Ethernet packets from the NIC and writes them into the in ring buffer.
Thread 2: reads packets from the in ring buffer, parses them, and enqueues anything that needs to be sent into the out ring buffer (see the sketch below).
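For reference, here is a minimal sketch of what thread 1's loop could look like, assuming nDevPortId, BURST_SIZE, and the ring handle from get_ioring_instance() as defined in the snippets below; this is an illustration of the data flow, not the project's actual capture loop.

/* Hypothetical loop for thread 1: it only moves packets between the NIC
 * and the two rings; all parsing happens in ln_pkt_process on another lcore. */
while (1) {
    struct rte_mbuf* rx[BURST_SIZE];
    unsigned nb_rx = rte_eth_rx_burst(nDevPortId, 0, rx, BURST_SIZE);
    if (nb_rx > 0) {
        /* NIC -> in ring; free whatever did not fit so nothing leaks */
        unsigned nb_q = rte_ring_sp_enqueue_burst(ring->in, (void**)rx, nb_rx, NULL);
        while (nb_q < nb_rx)
            rte_pktmbuf_free(rx[nb_q++]);
    }
    struct rte_mbuf* tx[BURST_SIZE];
    /* out ring -> NIC: transmit what the processing thread produced */
    unsigned nb_tx = rte_ring_sc_dequeue_burst(ring->out, (void**)tx, BURST_SIZE, NULL);
    if (nb_tx > 0) {
        uint16_t nb_sent = rte_eth_tx_burst(nDevPortId, 0, tx, (uint16_t)nb_tx);
        while (nb_sent < nb_tx)   /* free the mbufs the NIC did not take */
            rte_pktmbuf_free(tx[nb_sent++]);
    }
}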
Implementation
ringbuf
struct inout_ring {
    struct rte_ring* in;    /* packets received from the NIC, pending processing */
    struct rte_ring* out;   /* packets produced by processing, pending transmission */
};
static struct inout_ring* iorpt = NULL;
/*
 * Function:    get_ioring_instance
 * Author:      黄彦杰 Lenn
 * Date:        2024-07-23
 * Description: singleton accessor for the in/out ring pair
 * Return:      pointer to the ring-pair handle
 */
struct inout_ring* get_ioring_instance(void) {
    if (iorpt == NULL) {
        iorpt = rte_malloc("io ring", sizeof(struct inout_ring), 0);
        if (iorpt != NULL)   /* rte_malloc can fail; the caller checks for NULL */
            memset(iorpt, 0, sizeof(struct inout_ring));
    }
    return iorpt;
}
struct inout_ring* ring = get_ioring_instance();
if (ring == NULL) {
    rte_exit(EXIT_FAILURE, "Malloc io buffer failed");
}
if (ring->in == NULL) {
    ring->in = rte_ring_create("in buffer", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
}
if (ring->out == NULL) {
    ring->out = rte_ring_create("out buffer", RING_SIZE, rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);
}
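rte_ring_create itself can also return NULL (for example when RING_SIZE is not a power of two, the name is already taken, or hugepage memory is exhausted), so a defensive check after creation is a reasonable addition; a minimal sketch:

/* Sketch: bail out early if either ring could not be created. */
if (ring->in == NULL || ring->out == NULL) {
    rte_exit(EXIT_FAILURE, "Create in/out ring failed: %s", rte_strerror(rte_errno));
}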
Formatting and printing the hwaddr
/*
 * Function:    ln_print_ethaddr
 * Author:      黄彦杰 Lenn
 * Date:        2024-07-23
 * Description: format and print a MAC address
 * Return:      None
 */
static void ln_print_ethaddr(const char* name, const struct rte_ether_addr* eth_addr) {
    char buf[RTE_ETHER_ADDR_FMT_SIZE];
    rte_ether_format_addr(buf, RTE_ETHER_ADDR_FMT_SIZE, eth_addr);
    printf("%s%s\n", name, buf);
}
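A possible usage, printing the local port's MAC address at startup; rte_eth_macaddr_get is the standard DPDK accessor, and nDevPortId is assumed to be the port id used elsewhere in this project:

struct rte_ether_addr local_addr;
rte_eth_macaddr_get(nDevPortId, &local_addr);      /* query the NIC's MAC */
ln_print_ethaddr("local hwaddr: ", &local_addr);   /* e.g. "local hwaddr: 52:54:00:12:34:56" */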
Multithreading
rte_eal_remote_launch(ln_pkt_process, mbuf_pool, rte_get_next_lcore(lcore_id, 1, 0));
rte_eal_remote_launch runs ln_pkt_process on a worker lcore, passing the mempool pointer as its argument. rte_get_next_lcore(lcore_id, 1, 0) picks that lcore: it returns the next enabled lcore after lcore_id; the second argument (1) means the main lcore is skipped, and the third (0) means the search does not wrap around to lcore 0. This guarantees the packet-processing function runs on a core other than the one driving the capture loop.
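rte_eal_remote_launch returns 0 on success and a negative value (e.g. -EBUSY if the target lcore is not in the WAIT state), so the call is worth checking; and if the main lcore ever leaves its own loop, it should join the workers before exiting. A minimal sketch, assuming mbuf_pool is the mempool created elsewhere in the project:

unsigned worker = rte_get_next_lcore(rte_lcore_id(), 1, 0);
if (rte_eal_remote_launch(ln_pkt_process, mbuf_pool, worker) < 0)
    rte_exit(EXIT_FAILURE, "Failed to launch ln_pkt_process on lcore %u\n", worker);
/* ... main lcore runs its own RX/TX loop here ... */
rte_eal_mp_wait_lcore();   /* join all worker lcores before exiting */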
ln_pkt_process
This is a dedicated packet-processing thread function: the packet handling that previously lived in the single-threaded while loop now runs in its own thread, which improves throughput. Its argument is the mbuf mempool, which is needed here to allocate the packets we send back.
static int ln_pkt_process(void* argv) {
    struct rte_mempool* mbuf_pool = (struct rte_mempool*)argv;
    struct inout_ring* ring = get_ioring_instance();
    while (1) {
        struct rte_mbuf* mbufs[BURST_SIZE];
        /* pull a burst of packets deposited into the in ring by the capture thread */
        unsigned nb_recv = rte_ring_mc_dequeue_burst(ring->in, (void**)mbufs, BURST_SIZE, NULL);
        unsigned i = 0;
        for (i = 0; i < nb_recv; i++) {
            struct rte_ether_hdr* ethdr = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr*);
#if ARP_ENABLE
            if (ethdr->ether_type == rte_cpu_to_be_16(RTE_ETHER_TYPE_ARP)) {
                struct rte_arp_hdr* arphdr = (struct rte_arp_hdr*)(ethdr + 1);
                struct in_addr addr;
                addr.s_addr = arphdr->arp_data.arp_sip;
                printf("---> arp pkt src %s, ", inet_ntoa(addr));
                addr.s_addr = arphdr->arp_data.arp_tip;
                printf("dst %s\n", inet_ntoa(addr));
                if (arphdr->arp_data.arp_tip == nLocalIp) {
                    if (arphdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REQUEST)) {
                        /* build an ARP reply and hand it to the out ring; the TX side sends it */
                        struct rte_mbuf* arpbuf = ln_arp_send(mbuf_pool, RTE_ARP_OP_REPLY, arphdr->arp_data.arp_sha.addr_bytes,
                            arphdr->arp_data.arp_tip, arphdr->arp_data.arp_sip);
                        // rte_eth_tx_burst(nDevPortId, 0, &arpbuf, 1);
                        // rte_pktmbuf_free(arpbuf);
                        // rte_pktmbuf_free(mbufs[i]);
                        rte_ring_mp_enqueue_burst(ring->out, (void**)&arpbuf, 1, NULL);
                    }
                    else if (arphdr->arp_opcode == rte_cpu_to_be_16(RTE_ARP_OP_REPLY)) {
                        struct arp_table* table = get_arp_instace();
                        uint8_t* hwaddr = get_mac_from_arp(arphdr->arp_data.arp_sip);
                        if (hwaddr == NULL) { /* only learn the mapping if this IP is not in the table yet */
                            struct arp_entry* entry = rte_malloc("arp entry", sizeof(struct arp_entry), 0);
                            if (entry) {
                                memset(entry, 0, sizeof(struct arp_entry));
                                entry->ip = arphdr->arp_data.arp_sip;
                                rte_memcpy(entry->hwaddr, arphdr->arp_data.arp_sha.addr_bytes, RTE_ETHER_ADDR_LEN);
                                entry->type = 0;
                                LL_ADD(table->entries, entry);
                                table->count++;
                            }
                        }
                    }
                }
                rte_pktmbuf_free(mbufs[i]);
                continue;
            }
#endif
            if (ethdr->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4)) {
                rte_pktmbuf_free(mbufs[i]);
                continue;
            }
            struct rte_ipv4_hdr* iphdr = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_ipv4_hdr*, sizeof(struct rte_ether_hdr));
            if (iphdr->next_proto_id == IPPROTO_UDP) {
                struct rte_udp_hdr* udphdr = (struct rte_udp_hdr*)(iphdr + 1);
#if UDP_ENABLE
                // the source MAC is now obtained automatically before the while loop instead of from udp, 2024-07-19
                // rte_memcpy(nSrcMac, ehdr->d_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
                rte_memcpy(nDstMac, ethdr->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
                rte_memcpy(&nSrcIp, &iphdr->dst_addr, sizeof(uint32_t));
                rte_memcpy(&nDstIp, &iphdr->src_addr, sizeof(uint32_t));
                nSrcPort = udphdr->dst_port;
                nDstPort = udphdr->src_port;
#endif
                int length = ntohs(udphdr->dgram_len);
                /* NUL-terminate the payload in place so it can be printed as a string;
                 * this writes one byte past the datagram and assumes the mbuf has room */
                *((char*)udphdr + length) = '\0';
                struct in_addr addr;
                addr.s_addr = iphdr->src_addr;
                printf("---> udp pkt src %s:%d, ", inet_ntoa(addr), ntohs(udphdr->src_port));
                addr.s_addr = iphdr->dst_addr;
                printf("dst %s:%d %s\n", inet_ntoa(addr), ntohs(udphdr->dst_port), (char*)(udphdr + 1));
#if UDP_ENABLE
                /* echo the payload back; the reply goes to the out ring instead of being sent here */
                struct rte_mbuf* txbuf = ln_udp_send(mbuf_pool, (uint8_t*)(udphdr + 1), length - sizeof(struct rte_udp_hdr));
                // rte_eth_tx_burst(nDevPortId, 0, &txbuf, 1);
                // rte_pktmbuf_free(txbuf);
                rte_ring_mp_enqueue_burst(ring->out, (void**)&txbuf, 1, NULL);
#endif
                rte_pktmbuf_free(mbufs[i]);
                continue;
            }
#if ICMP_ENABLE
            if (iphdr->next_proto_id == IPPROTO_ICMP) {
                struct rte_icmp_hdr* ichdr = (struct rte_icmp_hdr*)(iphdr + 1);
                if (ichdr->icmp_type == RTE_IP_ICMP_ECHO_REQUEST) {
                    struct in_addr addr;
                    addr.s_addr = iphdr->src_addr;
                    printf("---> icmp pkt src %s", inet_ntoa(addr));
                    addr.s_addr = iphdr->dst_addr;
                    printf(" dst %s type %d\n", inet_ntoa(addr), ichdr->icmp_type);
                    /* build an echo reply and enqueue it to the out ring */
                    struct rte_mbuf* txbuf = ln_icmp_send(mbuf_pool, ethdr->s_addr.addr_bytes, iphdr->dst_addr, iphdr->src_addr,
                        ichdr->icmp_ident, ichdr->icmp_seq_nb);
                    // rte_eth_tx_burst(nDevPortId, 0, &txbuf, 1);
                    // rte_pktmbuf_free(txbuf);
                    rte_ring_mp_enqueue_burst(ring->out, (void**)&txbuf, 1, NULL);
                    rte_pktmbuf_free(mbufs[i]);
                    continue;
                }
            }
#endif
            /* anything not handled above (e.g. other IP protocols) must still be freed */
            rte_pktmbuf_free(mbufs[i]);
        }
    }
    return 0;
}
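One subtlety worth noting: the rings were created with RING_F_SP_ENQ | RING_F_SC_DEQ, yet ln_pkt_process calls the explicit rte_ring_mc_dequeue_burst / rte_ring_mp_enqueue_burst variants, which always take the multi-producer/multi-consumer synchronization path regardless of the creation flags. That is safe here, just slightly slower than necessary. With exactly one producer and one consumer per ring, the generic calls honor the flags and use the faster single-threaded path; for example:

/* The generic variants pick the sync mode from the ring's creation flags. */
unsigned nb_recv = rte_ring_dequeue_burst(ring->in, (void**)mbufs, BURST_SIZE, NULL);
rte_ring_enqueue_burst(ring->out, (void**)&txbuf, 1, NULL);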