From 70e299c87f3b31f71d679d55e9ef002b8af077e8 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 22 Aug 2024 11:15:27 -0400 Subject: [PATCH 01/13] Initial WIP function --- src/platform/datapath_raw_xdp_win.c | 89 +++++++++++++++++++++++++++-- 1 file changed, 85 insertions(+), 4 deletions(-) diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index 473569ec86..22947e4850 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -107,6 +107,84 @@ CxPlatXdpExecute( _Inout_ CXPLAT_EXECUTION_STATE* State ); +QUIC_STATUS +CxPlatGetRssQueueProcessors( + _In_ XDP_DATAPATH* Xdp, + _In_ HANDLE XdpHandle, + _In_ uint32_t InterfaceIndex, + _Inout_count_(*Count) PROCESSOR_NUMBER* Queues, + _Inout_ uint16_t* Count + ) +{ + const uint16_t MaxCount = *Count; + + *Count = 0; + for (*Count = 0; i < MaxCount; ++i) { // TODO - Add cleanup code + XDP_TX_PACKET TxPacket = { 0 }; // TODO - Write Payload + HANDLE TxXsk = NULL; + uint32_t TxRingSize = 1; + QUIC_STATUS Status = Xdp->XdpApi->XskCreate(&TxXsk); + if (QUIC_FAILED(Status)) { return Status; } + + XSK_UMEM_REG TxUmem = {0}; + TxUmem.Address = &TxPacket; + TxUmem.ChunkSize = sizeof(XDP_TX_PACKET); + TxUmem.Headroom = FIELD_OFFSET(XDP_TX_PACKET, FrameBuffer); + TxUmem.TotalSize = sizeof(XDP_TX_PACKET); + + Status = Xdp->XdpApi->XskSetSockopt(TxXsk, XSK_SOCKOPT_UMEM_REG, &TxUmem, sizeof(TxUmem)); + if (QUIC_FAILED(Status)) { return Status; } + + Status = Xdp->XdpApi->XskSetSockopt(TxXsk, XSK_SOCKOPT_TX_RING_SIZE, &TxRingSize, sizeof(TxRingSize)); + if (QUIC_FAILED(Status)) { return Status; } + + Status = Xdp->XdpApi->XskSetSockopt(TxXsk, XSK_SOCKOPT_TX_COMPLETION_RING_SIZE, &TxRingSize, sizeof(TxRingSize)); + if (QUIC_FAILED(Status)) { return Status; } + + uint32_t Flags = XSK_BIND_FLAG_TX; + Status = Xdp->XdpApi->XskBind(TxXsk, Interface->ActualIfIndex, *Count, Flags); + if (QUIC_FAILED(Status)) { break; } // No more queues. Break out. + + Status = Xdp->XdpApi->XskActivate(TxXsk, 0); + if (QUIC_FAILED(Status)) { return Status; } + + XSK_RING_INFO_SET TxRingInfo; + uint32_t TxRingInfoSize = sizeof(TxRingInfo); + Status = Xdp->XdpApi->XskGetSockopt(TxXsk, XSK_SOCKOPT_RING_INFO, &TxRingInfo, &TxRingInfoSize); + if (QUIC_FAILED(Status)) { return Status; } + + XSK_RING TxRing, TxCompletionRing; + XskRingInitialize(&TxRing, &TxRingInfo.Tx); + XskRingInitialize(&TxCompletionRing, &TxRingInfo.Completion); + + uint32_t TxIndex; + uint32_t TxAvailable = XskRingProducerReserve(&TxRing, MAXUINT32, &TxIndex); + + XSK_BUFFER_DESCRIPTOR* Buffer = XskRingGetElement(&TxRing, TxIndex++); + Buffer->Address.BaseAddress = 0; + Buffer->Address.Offset = FIELD_OFFSET(XDP_TX_PACKET, FrameBuffer); + Buffer->Length = TxPacket.Buffer.Length; + XskRingProducerSubmit(&Queue->TxRing, 1); + + XSK_NOTIFY_RESULT_FLAGS OutFlags; + Status = Xdp->XdpApi->XskNotifySocket(TxXsk, XSK_NOTIFY_FLAG_POKE_TX, 0, &OutFlags); + + uint32_t CompIndex; + uint32_t CompAvailable = XskRingConsumerReserve(&TxCompletionRing, MAXUINT32, &CompIndex); + while (XskRingConsumerReserve(&TxCompletionRing, MAXUINT32, &CompIndex) == 0) { + Sleep(0); // TODO - Wait? + } + + XskRingConsumerRelease(&Queue->TxCompletionRing, 1); + + uint32_t ProcNumberSize = sizeof(PROCESSOR_NUMBER); + Status = Xdp->XdpApi->XskGetSockopt(TxXsk, XSK_SOCKOPT_TX_PROCESSOR_AFFINITY, Queues + *Count, &ProcNumberSize); + if (QUIC_FAILED(Status)) { return Status; } + } + + return QUIC_STATUS_SUCCESS; +} + QUIC_STATUS CxPlatGetInterfaceRssQueueCount( _In_ uint32_t InterfaceIndex, @@ -480,6 +558,9 @@ CxPlatDpRawInterfaceInitialize( Interface->OffloadStatus.Transmit.NetworkLayerXsum = Xdp->SkipXsum; Interface->Xdp = Xdp; + PROCESSOR_NUMBER Processors[256]; + uint16_t ProcessorCount = ARRAYSIZE(Processors); + Status = Xdp->XdpApi->XdpInterfaceOpen(Interface->ActualIfIndex, &Interface->XdpHandle); if (QUIC_FAILED(Status)) { QuicTraceEvent( @@ -490,18 +571,18 @@ CxPlatDpRawInterfaceInitialize( goto Error; } - Status = CxPlatGetInterfaceRssQueueCount(Interface->ActualIfIndex, &Interface->QueueCount); + Status = CxPlatGetRssQueueProcessors(Xdp, Interface->XdpHandle, Interface->ActualIfIndex, Processors, &ProcessorCount); if (QUIC_FAILED(Status)) { goto Error; } - if (Interface->QueueCount == 0) { + if (ProcessorCount == 0) { Status = QUIC_STATUS_INVALID_STATE; QuicTraceEvent( LibraryErrorStatus, "[ lib] ERROR, %u, %s.", Status, - "CxPlatGetInterfaceRssQueueCount"); + "CxPlatGetRssQueueProcessors"); goto Error; } @@ -518,7 +599,7 @@ CxPlatDpRawInterfaceInitialize( CxPlatZeroMemory(Interface->Queues, Interface->QueueCount * sizeof(*Interface->Queues)); - for (uint8_t i = 0; i < Interface->QueueCount; i++) { + for (uint8_t i = 0; i < Interface->QueueCount; i++) { // TODO - Get the right queue ID to match this processor XDP_QUEUE* Queue = &Interface->Queues[i]; Queue->Interface = Interface; From 4f1a4144f076e6f0ffe3abca955c699348fb694d Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 22 Aug 2024 12:18:06 -0400 Subject: [PATCH 02/13] Successfully compiles --- src/platform/datapath_raw_xdp_win.c | 93 ++++++++++++++++++++++------- 1 file changed, 73 insertions(+), 20 deletions(-) diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index 22947e4850..623fa9a68b 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -107,22 +107,69 @@ CxPlatXdpExecute( _Inout_ CXPLAT_EXECUTION_STATE* State ); +void CreateNoOpEthernetPacket( + _Inout_ XDP_TX_PACKET* Packet + ) +{ + ETHERNET_HEADER* Ethernet = (ETHERNET_HEADER*)Packet->FrameBuffer; + IPV4_HEADER* IPv4 = (IPV4_HEADER*)(Ethernet + 1); + UDP_HEADER* UDP = (UDP_HEADER*)(IPv4 + 1); + + // Set Ethernet header + memset(Ethernet->Destination, 0xFF, sizeof(Ethernet->Destination)); // Broadcast address + memset(Ethernet->Source, 0x00, sizeof(Ethernet->Source)); // Source MAC address + Ethernet->Type = htons(0x0800); // IPv4 + + // Set IPv4 header + IPv4->VersionAndHeaderLength = 0x45; // Version 4, Header length 20 bytes + IPv4->TypeOfService = 0; + IPv4->TotalLength = htons(sizeof(IPV4_HEADER) + sizeof(UDP_HEADER)); + IPv4->Identification = 0; + IPv4->FlagsAndFragmentOffset = 0; + IPv4->TimeToLive = 64; + IPv4->Protocol = 17; // UDP + IPv4->HeaderChecksum = 0; // Will be calculated later + *(uint32_t*)IPv4->Source = htonl(0xC0A80001); // 192.168.0.1 + *(uint32_t*)IPv4->Destination = htonl(0xC0A80002); // 192.168.0.2 + + // Set UDP header + UDP->SourcePort = htons(12345); + UDP->DestinationPort = htons(80); + UDP->Length = htons(sizeof(UDP_HEADER)); + UDP->Checksum = 0; // Optional for IPv4 + + // Calculate IPv4 header checksum + uint32_t sum = 0; + uint16_t* header = (uint16_t*)IPv4; + for (int i = 0; i < sizeof(IPV4_HEADER) / 2; ++i) { + sum += header[i]; + } + while (sum >> 16) { + sum = (sum & 0xFFFF) + (sum >> 16); + } + IPv4->HeaderChecksum = (uint16_t)~sum; + + // Set packet length + Packet->Buffer.Length = sizeof(ETHERNET_HEADER) + sizeof(IPV4_HEADER) + sizeof(UDP_HEADER); +} + QUIC_STATUS CxPlatGetRssQueueProcessors( _In_ XDP_DATAPATH* Xdp, - _In_ HANDLE XdpHandle, _In_ uint32_t InterfaceIndex, - _Inout_count_(*Count) PROCESSOR_NUMBER* Queues, - _Inout_ uint16_t* Count + _Inout_ uint16_t* Count, + _Out_writes_(*Count) PROCESSOR_NUMBER* Queues ) { const uint16_t MaxCount = *Count; + uint32_t TxRingSize = 1; + XDP_TX_PACKET TxPacket = { 0 }; + CreateNoOpEthernetPacket(&TxPacket); *Count = 0; - for (*Count = 0; i < MaxCount; ++i) { // TODO - Add cleanup code - XDP_TX_PACKET TxPacket = { 0 }; // TODO - Write Payload + for (*Count = 0; *Count < MaxCount; ++(*Count)) { // TODO - Add cleanup code + HANDLE TxXsk = NULL; - uint32_t TxRingSize = 1; QUIC_STATUS Status = Xdp->XdpApi->XskCreate(&TxXsk); if (QUIC_FAILED(Status)) { return Status; } @@ -133,53 +180,54 @@ CxPlatGetRssQueueProcessors( TxUmem.TotalSize = sizeof(XDP_TX_PACKET); Status = Xdp->XdpApi->XskSetSockopt(TxXsk, XSK_SOCKOPT_UMEM_REG, &TxUmem, sizeof(TxUmem)); - if (QUIC_FAILED(Status)) { return Status; } + if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } Status = Xdp->XdpApi->XskSetSockopt(TxXsk, XSK_SOCKOPT_TX_RING_SIZE, &TxRingSize, sizeof(TxRingSize)); - if (QUIC_FAILED(Status)) { return Status; } + if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } Status = Xdp->XdpApi->XskSetSockopt(TxXsk, XSK_SOCKOPT_TX_COMPLETION_RING_SIZE, &TxRingSize, sizeof(TxRingSize)); - if (QUIC_FAILED(Status)) { return Status; } + if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } uint32_t Flags = XSK_BIND_FLAG_TX; - Status = Xdp->XdpApi->XskBind(TxXsk, Interface->ActualIfIndex, *Count, Flags); - if (QUIC_FAILED(Status)) { break; } // No more queues. Break out. + Status = Xdp->XdpApi->XskBind(TxXsk, InterfaceIndex, *Count, Flags); + if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); break; } // No more queues. Break out. Status = Xdp->XdpApi->XskActivate(TxXsk, 0); - if (QUIC_FAILED(Status)) { return Status; } + if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } XSK_RING_INFO_SET TxRingInfo; uint32_t TxRingInfoSize = sizeof(TxRingInfo); Status = Xdp->XdpApi->XskGetSockopt(TxXsk, XSK_SOCKOPT_RING_INFO, &TxRingInfo, &TxRingInfoSize); - if (QUIC_FAILED(Status)) { return Status; } + if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } XSK_RING TxRing, TxCompletionRing; XskRingInitialize(&TxRing, &TxRingInfo.Tx); XskRingInitialize(&TxCompletionRing, &TxRingInfo.Completion); uint32_t TxIndex; - uint32_t TxAvailable = XskRingProducerReserve(&TxRing, MAXUINT32, &TxIndex); + XskRingProducerReserve(&TxRing, MAXUINT32, &TxIndex); XSK_BUFFER_DESCRIPTOR* Buffer = XskRingGetElement(&TxRing, TxIndex++); Buffer->Address.BaseAddress = 0; Buffer->Address.Offset = FIELD_OFFSET(XDP_TX_PACKET, FrameBuffer); Buffer->Length = TxPacket.Buffer.Length; - XskRingProducerSubmit(&Queue->TxRing, 1); + XskRingProducerSubmit(&TxRing, 1); XSK_NOTIFY_RESULT_FLAGS OutFlags; Status = Xdp->XdpApi->XskNotifySocket(TxXsk, XSK_NOTIFY_FLAG_POKE_TX, 0, &OutFlags); uint32_t CompIndex; - uint32_t CompAvailable = XskRingConsumerReserve(&TxCompletionRing, MAXUINT32, &CompIndex); while (XskRingConsumerReserve(&TxCompletionRing, MAXUINT32, &CompIndex) == 0) { Sleep(0); // TODO - Wait? } - XskRingConsumerRelease(&Queue->TxCompletionRing, 1); + XskRingConsumerRelease(&TxCompletionRing, 1); uint32_t ProcNumberSize = sizeof(PROCESSOR_NUMBER); Status = Xdp->XdpApi->XskGetSockopt(TxXsk, XSK_SOCKOPT_TX_PROCESSOR_AFFINITY, Queues + *Count, &ProcNumberSize); - if (QUIC_FAILED(Status)) { return Status; } + if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } + + CloseHandle(TxXsk); } return QUIC_STATUS_SUCCESS; @@ -558,7 +606,7 @@ CxPlatDpRawInterfaceInitialize( Interface->OffloadStatus.Transmit.NetworkLayerXsum = Xdp->SkipXsum; Interface->Xdp = Xdp; - PROCESSOR_NUMBER Processors[256]; + PROCESSOR_NUMBER Processors[256]; // TODO - Use max processor count uint16_t ProcessorCount = ARRAYSIZE(Processors); Status = Xdp->XdpApi->XdpInterfaceOpen(Interface->ActualIfIndex, &Interface->XdpHandle); @@ -571,8 +619,13 @@ CxPlatDpRawInterfaceInitialize( goto Error; } - Status = CxPlatGetRssQueueProcessors(Xdp, Interface->XdpHandle, Interface->ActualIfIndex, Processors, &ProcessorCount); + Status = CxPlatGetRssQueueProcessors(Xdp, Interface->ActualIfIndex, &ProcessorCount, Processors); if (QUIC_FAILED(Status)) { + QuicTraceEvent( + LibraryErrorStatus, + "[ lib] ERROR, %u, %s.", + Status, + "CxPlatGetRssQueueProcessors"); goto Error; } From 47eb702cba3f5f7cf75223d2847ecdbf03f114e4 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 22 Aug 2024 12:24:28 -0400 Subject: [PATCH 03/13] Find right processor queue --- src/platform/datapath_raw_xdp_win.c | 31 ++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index 623fa9a68b..11b083f1a1 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -158,7 +158,7 @@ CxPlatGetRssQueueProcessors( _In_ XDP_DATAPATH* Xdp, _In_ uint32_t InterfaceIndex, _Inout_ uint16_t* Count, - _Out_writes_(*Count) PROCESSOR_NUMBER* Queues + _Out_writes_(*Count) uint32_t* Queues ) { const uint16_t MaxCount = *Count; @@ -223,10 +223,14 @@ CxPlatGetRssQueueProcessors( XskRingConsumerRelease(&TxCompletionRing, 1); + PROCESSOR_NUMBER ProcNumber; uint32_t ProcNumberSize = sizeof(PROCESSOR_NUMBER); - Status = Xdp->XdpApi->XskGetSockopt(TxXsk, XSK_SOCKOPT_TX_PROCESSOR_AFFINITY, Queues + *Count, &ProcNumberSize); + Status = Xdp->XdpApi->XskGetSockopt(TxXsk, XSK_SOCKOPT_TX_PROCESSOR_AFFINITY, &ProcNumber, &ProcNumberSize); if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } + const CXPLAT_PROCESSOR_GROUP_INFO* Group = &CxPlatProcessorGroupInfo[ProcNumber.Group]; + Queues[*Count] = Group->Offset + (ProcNumber.Number % Group->Count); + CloseHandle(TxXsk); } @@ -606,7 +610,7 @@ CxPlatDpRawInterfaceInitialize( Interface->OffloadStatus.Transmit.NetworkLayerXsum = Xdp->SkipXsum; Interface->Xdp = Xdp; - PROCESSOR_NUMBER Processors[256]; // TODO - Use max processor count + uint32_t Processors[256]; // TODO - Use max processor count uint16_t ProcessorCount = ARRAYSIZE(Processors); Status = Xdp->XdpApi->XdpInterfaceOpen(Interface->ActualIfIndex, &Interface->XdpHandle); @@ -666,6 +670,23 @@ CxPlatDpRawInterfaceInitialize( CxPlatDatapathSqeInitialize(&Queue->TxIoSqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); Queue->TxIoSqe.IoType = DATAPATH_XDP_IO_SEND; + uint32_t QueueIndex = UINT32_MAX; + for (uint32_t j = 0; j < ProcessorCount; j++) { + if (Processors[j] == i) { + QueueIndex = j; + break; + } + } + if (QueueIndex == UINT32_MAX) { + Status = QUIC_STATUS_INVALID_STATE; + QuicTraceEvent( + LibraryErrorStatus, + "[ lib] ERROR, %u, %s.", + Status, + "QueueIndex"); + goto Error; + } + // // RX datapath. // @@ -733,7 +754,7 @@ CxPlatDpRawInterfaceInitialize( } uint32_t Flags = XSK_BIND_FLAG_RX; - Status = Xdp->XdpApi->XskBind(Queue->RxXsk, Interface->ActualIfIndex, i, Flags); + Status = Xdp->XdpApi->XskBind(Queue->RxXsk, Interface->ActualIfIndex, QueueIndex, Flags); if (QUIC_FAILED(Status)) { QuicTraceEvent( LibraryErrorStatus, @@ -855,7 +876,7 @@ CxPlatDpRawInterfaceInitialize( } Flags = XSK_BIND_FLAG_TX; // TODO: support native/generic forced flags. - Status = Xdp->XdpApi->XskBind(Queue->TxXsk, Interface->ActualIfIndex, i, Flags); + Status = Xdp->XdpApi->XskBind(Queue->TxXsk, Interface->ActualIfIndex, QueueIndex, Flags); if (QUIC_FAILED(Status)) { QuicTraceEvent( LibraryErrorStatus, From b1ff11c7437ec7d608f9ffaabd8d0f4f1cfe4ec3 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 22 Aug 2024 12:30:11 -0400 Subject: [PATCH 04/13] Fix build --- src/platform/datapath_raw_xdp_win.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index 11b083f1a1..0db3972804 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -158,16 +158,14 @@ CxPlatGetRssQueueProcessors( _In_ XDP_DATAPATH* Xdp, _In_ uint32_t InterfaceIndex, _Inout_ uint16_t* Count, - _Out_writes_(*Count) uint32_t* Queues + _Out_writes_to_(*Count, *Count) uint32_t* Queues ) { - const uint16_t MaxCount = *Count; uint32_t TxRingSize = 1; XDP_TX_PACKET TxPacket = { 0 }; CreateNoOpEthernetPacket(&TxPacket); - *Count = 0; - for (*Count = 0; *Count < MaxCount; ++(*Count)) { // TODO - Add cleanup code + for (uint16_t i = 0; i < *Count; ++i) { // TODO - Add cleanup code HANDLE TxXsk = NULL; QUIC_STATUS Status = Xdp->XdpApi->XskCreate(&TxXsk); @@ -189,8 +187,12 @@ CxPlatGetRssQueueProcessors( if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } uint32_t Flags = XSK_BIND_FLAG_TX; - Status = Xdp->XdpApi->XskBind(TxXsk, InterfaceIndex, *Count, Flags); - if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); break; } // No more queues. Break out. + Status = Xdp->XdpApi->XskBind(TxXsk, InterfaceIndex, i, Flags); + if (QUIC_FAILED(Status)) { // No more queues. Break out. + *Count = i; + CloseHandle(TxXsk); + break; + } Status = Xdp->XdpApi->XskActivate(TxXsk, 0); if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } @@ -229,7 +231,7 @@ CxPlatGetRssQueueProcessors( if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } const CXPLAT_PROCESSOR_GROUP_INFO* Group = &CxPlatProcessorGroupInfo[ProcNumber.Group]; - Queues[*Count] = Group->Offset + (ProcNumber.Number % Group->Count); + Queues[i] = Group->Offset + (ProcNumber.Number % Group->Count); CloseHandle(TxXsk); } From 14330bfdf9d935e5a14dc72384aba7f366cce10a Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 22 Aug 2024 12:31:18 -0400 Subject: [PATCH 05/13] nits --- src/platform/datapath_raw_xdp_win.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index 0db3972804..c486d93cf4 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -107,7 +107,8 @@ CxPlatXdpExecute( _Inout_ CXPLAT_EXECUTION_STATE* State ); -void CreateNoOpEthernetPacket( +void +CreateNoOpEthernetPacket( _Inout_ XDP_TX_PACKET* Packet ) { @@ -165,8 +166,7 @@ CxPlatGetRssQueueProcessors( XDP_TX_PACKET TxPacket = { 0 }; CreateNoOpEthernetPacket(&TxPacket); - for (uint16_t i = 0; i < *Count; ++i) { // TODO - Add cleanup code - + for (uint16_t i = 0; i < *Count; ++i) { HANDLE TxXsk = NULL; QUIC_STATUS Status = Xdp->XdpApi->XskCreate(&TxXsk); if (QUIC_FAILED(Status)) { return Status; } From d6fbdec5b95e7636a5d37dbe574264fcf6797bfe Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Fri, 23 Aug 2024 09:31:53 -0400 Subject: [PATCH 06/13] Works on my machine now --- src/platform/datapath_raw_xdp_win.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index c486d93cf4..fc66b80674 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -613,7 +613,7 @@ CxPlatDpRawInterfaceInitialize( Interface->Xdp = Xdp; uint32_t Processors[256]; // TODO - Use max processor count - uint16_t ProcessorCount = ARRAYSIZE(Processors); + Interface->QueueCount = ARRAYSIZE(Processors); Status = Xdp->XdpApi->XdpInterfaceOpen(Interface->ActualIfIndex, &Interface->XdpHandle); if (QUIC_FAILED(Status)) { @@ -625,7 +625,7 @@ CxPlatDpRawInterfaceInitialize( goto Error; } - Status = CxPlatGetRssQueueProcessors(Xdp, Interface->ActualIfIndex, &ProcessorCount, Processors); + Status = CxPlatGetRssQueueProcessors(Xdp, Interface->ActualIfIndex, &Interface->QueueCount, Processors); if (QUIC_FAILED(Status)) { QuicTraceEvent( LibraryErrorStatus, @@ -635,7 +635,7 @@ CxPlatDpRawInterfaceInitialize( goto Error; } - if (ProcessorCount == 0) { + if (Interface->QueueCount == 0) { Status = QUIC_STATUS_INVALID_STATE; QuicTraceEvent( LibraryErrorStatus, @@ -673,7 +673,7 @@ CxPlatDpRawInterfaceInitialize( Queue->TxIoSqe.IoType = DATAPATH_XDP_IO_SEND; uint32_t QueueIndex = UINT32_MAX; - for (uint32_t j = 0; j < ProcessorCount; j++) { + for (uint32_t j = 0; j < Interface->QueueCount; j++) { if (Processors[j] == i) { QueueIndex = j; break; From 25150a2cf03e5ea67e6b772ebb7cf73bc2981bb8 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Fri, 23 Aug 2024 09:58:57 -0400 Subject: [PATCH 07/13] No sleep --- src/platform/datapath_raw_xdp_win.c | 253 +--------------------------- 1 file changed, 6 insertions(+), 247 deletions(-) diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index fc66b80674..d2f38d3cfd 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -216,13 +216,14 @@ CxPlatGetRssQueueProcessors( XskRingProducerSubmit(&TxRing, 1); XSK_NOTIFY_RESULT_FLAGS OutFlags; - Status = Xdp->XdpApi->XskNotifySocket(TxXsk, XSK_NOTIFY_FLAG_POKE_TX, 0, &OutFlags); + Status = Xdp->XdpApi->XskNotifySocket(TxXsk, XSK_NOTIFY_FLAG_POKE_TX|XSK_NOTIFY_FLAG_WAIT_TX, 1000, &OutFlags); + if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } uint32_t CompIndex; - while (XskRingConsumerReserve(&TxCompletionRing, MAXUINT32, &CompIndex) == 0) { - Sleep(0); // TODO - Wait? + if (XskRingConsumerReserve(&TxCompletionRing, MAXUINT32, &CompIndex) == 0) { + CloseHandle(TxXsk); + return E_ABORT; } - XskRingConsumerRelease(&TxCompletionRing, 1); PROCESSOR_NUMBER ProcNumber; @@ -239,248 +240,6 @@ CxPlatGetRssQueueProcessors( return QUIC_STATUS_SUCCESS; } -QUIC_STATUS -CxPlatGetInterfaceRssQueueCount( - _In_ uint32_t InterfaceIndex, - _Out_ uint16_t* Count - ) -{ - HRESULT hRes; - IWbemLocator *pLoc = NULL; - IEnumWbemClassObject *pEnum = NULL; - IWbemServices *pSvc = NULL; - DWORD ret = 0; - uint16_t cnt = 0; - NET_LUID if_luid = { 0 }; - WCHAR if_alias[256 + 1] = { 0 }; - - ret = ConvertInterfaceIndexToLuid(InterfaceIndex, &if_luid); - if (ret != NO_ERROR) { - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - ret, - "ConvertInterfaceIndexToLuid"); - return HRESULT_FROM_WIN32(ret); - } - - ret = ConvertInterfaceLuidToAlias(&if_luid, if_alias, RTL_NUMBER_OF(if_alias)); - if (ret != NO_ERROR) { - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - ret, - "ConvertInterfaceLuidToAlias"); - return HRESULT_FROM_WIN32(ret); - } - - // Step 1: -------------------------------------------------- - // Initialize COM. ------------------------------------------ - hRes = CoInitializeEx(0, COINIT_MULTITHREADED); - if (FAILED(hRes)) { - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - hRes, - "CoInitializeEx"); - return hRes; - } - - // Step 2: --------------------------------------------------- - // Obtain the initial locator to WMI ------------------------- - hRes = CoCreateInstance( - &CLSID_WbemLocator, - 0, - CLSCTX_INPROC_SERVER, - &IID_IWbemLocator, (LPVOID *) &pLoc); - if (FAILED(hRes)) { - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - hRes, - "CoCreateInstance IWbemLocator"); - goto Cleanup; - } - - // Step 3: ----------------------------------------------------- - // Connect to WMI through the IWbemLocator::ConnectServer method - // Connect to the root\cimv2 namespace with - // the current user and obtain pointer pSvc - // to make IWbemServices calls. - BSTR Namespace = SysAllocString(L"ROOT\\STANDARDCIMV2"); - hRes = pLoc->lpVtbl->ConnectServer(pLoc, - Namespace, // Object path of WMI namespace - NULL, // User name. NULL = current user - NULL, // User password. NULL = current - 0, // Locale. NULL indicates current - 0, // Security flags. - 0, // Authority (for example, Kerberos) - 0, // Context object - &pSvc // pointer to IWbemServices proxy - ); - SysFreeString(Namespace); - if (FAILED(hRes)) { - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - hRes, - "ConnectServer"); - goto Cleanup; - } - - // Step 4: -------------------------------------------------- - // Set security levels on the proxy ------------------------- - hRes = CoSetProxyBlanket( - (IUnknown*)pSvc, // Indicates the proxy to set - RPC_C_AUTHN_WINNT, // RPC_C_AUTHN_xxx - RPC_C_AUTHZ_NONE, // RPC_C_AUTHZ_xxx - NULL, // Server principal name - RPC_C_AUTHN_LEVEL_CALL, // RPC_C_AUTHN_LEVEL_xxx - RPC_C_IMP_LEVEL_IMPERSONATE, // RPC_C_IMP_LEVEL_xxx - NULL, // client identity - EOAC_NONE // proxy capabilities - ); - if (FAILED(hRes)) { - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - hRes, - "CoSetProxyBlanket"); - goto Cleanup; - } - - // Step 5: -------------------------------------------------- - // Use the IWbemServices pointer to make requests of WMI ---- - wchar_t query[512] = { '\0' }; - (void)wcscat_s(query, 512, L"SELECT * FROM MSFT_NetAdapterRssSettingData WHERE Name='"); - (void)wcscat_s(query, 512, if_alias); - (void)wcscat_s(query, 512, L"'"); - //AF_XDP_LOG(INFO, "WMI query = \"%ws\"\n", query); - - BSTR Language = SysAllocString(L"WQL"); - BSTR Query = SysAllocString(query); - hRes = pSvc->lpVtbl->ExecQuery(pSvc, - Language, - Query, - WBEM_FLAG_FORWARD_ONLY, // Flags - 0, // Context - &pEnum - ); - SysFreeString(Query); - SysFreeString(Language); - if (FAILED(hRes)) { - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - hRes, - "ExecQuery"); - goto Cleanup; - } - - // Step 6: ------------------------------------------------- - // Get the data from the query in step 6 ------------------- - IWbemClassObject *pclsObj = NULL; - ULONG uReturn = 0; - while (pEnum) { - HRESULT hr = pEnum->lpVtbl->Next(pEnum, WBEM_INFINITE, 1, - &pclsObj, &uReturn); - - if (0 == uReturn) { - break; - } - - VARIANT vtProp; - - // Get the value of the IndirectionTable property - hr = pclsObj->lpVtbl->Get(pclsObj, L"IndirectionTable", 0, &vtProp, 0, 0); - if ((vtProp.vt == VT_NULL) || (vtProp.vt == VT_EMPTY)) { - //AF_XDP_LOG(INFO, "No RSS indirection table, assuming 1 default queue\n"); - cnt++; - CXPLAT_FRE_ASSERT(cnt != 0); - } else if ((vtProp.vt & VT_ARRAY) == 0) { - //AF_XDP_LOG(ERR, "not ARRAY\n"); - } else { - long lLower, lUpper; - SAFEARRAY *pSafeArray = vtProp.parray; - UINT8 *rssTable = NULL; - DWORD rssTableSize; - DWORD numberOfProcs; - DWORD numberOfProcGroups; - - SafeArrayGetLBound(pSafeArray, 1, &lLower); - SafeArrayGetUBound(pSafeArray, 1, &lUpper); - - IUnknown** rawArray; - SafeArrayAccessData(pSafeArray, (void**)&rawArray); - - // Set up the RSS table according to number of procs and proc groups. - numberOfProcs = GetActiveProcessorCount(ALL_PROCESSOR_GROUPS); - numberOfProcGroups = GetActiveProcessorGroupCount(); - rssTableSize = numberOfProcs * numberOfProcGroups; - rssTable = malloc(rssTableSize); - memset(rssTable, 0, rssTableSize); - - for (long i = lLower; i <= lUpper; i++) - { - IUnknown* pIUnk = rawArray[i]; - IWbemClassObject *obj = NULL; - pIUnk->lpVtbl->QueryInterface(pIUnk, &IID_IWbemClassObject, (void **)&obj); - if (obj == NULL) { - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - hRes, - "QueryInterface"); - free(rssTable); - hRes = QUIC_STATUS_OUT_OF_MEMORY; - goto Cleanup; - } - - hr = obj->lpVtbl->Get(obj, L"ProcessorNumber", 0, &vtProp, 0, 0); - UINT32 procNum = vtProp.iVal; - VariantClear(&vtProp); - hr = obj->lpVtbl->Get(obj, L"ProcessorGroup", 0, &vtProp, 0, 0); - UINT32 groupNum = vtProp.iVal; - VariantClear(&vtProp); - CXPLAT_DBG_ASSERT(groupNum < numberOfProcGroups); - CXPLAT_DBG_ASSERT(procNum < numberOfProcs); - *(rssTable + groupNum * numberOfProcs + procNum) = 1; - obj->lpVtbl->Release(obj); - } - - SafeArrayUnaccessData(pSafeArray); - - // Count unique RSS procs by counting ones in rssTable. - for (DWORD i = 0; i < rssTableSize; ++i) { - cnt += rssTable[i]; - } - - free(rssTable); - } - - VariantClear(&vtProp); - pclsObj->lpVtbl->Release(pclsObj); - } - - //AF_XDP_LOG(INFO, "counted %u active queues on %s\n", cnt, if_name); - *Count = cnt; - -Cleanup: - - if (pEnum != NULL) { - pEnum->lpVtbl->Release(pEnum); - } - if (pSvc != NULL) { - pSvc->lpVtbl->Release(pSvc); - } - if (pLoc != NULL) { - pLoc->lpVtbl->Release(pLoc); - } - CoUninitialize(); - - return hRes; -} - _IRQL_requires_max_(PASSIVE_LEVEL) void CxPlatXdpReadConfig( @@ -658,7 +417,7 @@ CxPlatDpRawInterfaceInitialize( CxPlatZeroMemory(Interface->Queues, Interface->QueueCount * sizeof(*Interface->Queues)); - for (uint8_t i = 0; i < Interface->QueueCount; i++) { // TODO - Get the right queue ID to match this processor + for (uint8_t i = 0; i < Interface->QueueCount; i++) { XDP_QUEUE* Queue = &Interface->Queues[i]; Queue->Interface = Interface; From ca5d8e6142ef8f8ffd593bf809bd1a6a9d1c037e Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 29 Aug 2024 11:55:54 -0400 Subject: [PATCH 08/13] Let's try this instead --- src/platform/datapath_raw_xdp.h | 5 ++- src/platform/datapath_raw_xdp_linux.c | 8 +++- src/platform/datapath_raw_xdp_win.c | 61 ++++++++++++--------------- 3 files changed, 39 insertions(+), 35 deletions(-) diff --git a/src/platform/datapath_raw_xdp.h b/src/platform/datapath_raw_xdp.h index f51080f465..d264ce5d4a 100644 --- a/src/platform/datapath_raw_xdp.h +++ b/src/platform/datapath_raw_xdp.h @@ -57,6 +57,7 @@ typedef struct QUIC_CACHEALIGN XDP_PARTITION { CXPLAT_EVENTQ* EventQ; XDP_QUEUE* Queues; // A linked list of queues, accessed by Next. uint16_t PartitionIndex; + uint16_t Processor; } XDP_PARTITION; void XdpWorkerAddQueue(_In_ XDP_PARTITION* Partition, _In_ XDP_QUEUE* Queue) { @@ -78,7 +79,9 @@ CxPlatDpRawAssignQueue( ) { const XDP_INTERFACE_COMMON* Interface = (const XDP_INTERFACE_COMMON*)_Interface; - Route->Queue = &((XDP_QUEUE_COMMON*)Interface->Queues)[0]; + XDP_QUEUE_COMMON* Queues = (XDP_QUEUE_COMMON*)Interface->Queues; + CXPLAT_FRE_ASSERT(Queues[0].Partition != NULL); // What if there was no partition? + Route->Queue = &Queues[0]; // TODO - Can we do better than just the first queue? } _IRQL_requires_max_(DISPATCH_LEVEL) diff --git a/src/platform/datapath_raw_xdp_linux.c b/src/platform/datapath_raw_xdp_linux.c index eaef116164..96fb14724a 100644 --- a/src/platform/datapath_raw_xdp_linux.c +++ b/src/platform/datapath_raw_xdp_linux.c @@ -332,7 +332,7 @@ static uint64_t XskUmemFrameAlloc(struct XskSocketInfo *Xsk) if (Xsk->UmemFrameFree == 0) { QuicTraceLogVerbose( XdpUmemAllocFails, - "[ xdp][umem] Out of UMEM frame, OOM"); + "[ xdp][umem] Out of UMEM frame, OOM"); return INVALID_UMEM_FRAME; } Frame = Xsk->UmemFrameAddr[--Xsk->UmemFrameFree]; @@ -684,8 +684,14 @@ CxPlatDpRawInitialize( if (Config && Config->ProcessorCount) { Xdp->PartitionCount = Config->ProcessorCount; + for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { + Xdp->Partitions[i].Processor = Config->ProcessorList[i]; + } } else { Xdp->PartitionCount = CxPlatProcCount(); + for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { + Xdp->Partitions[i].Processor = (uint16_t)i; + } } QuicTraceLogVerbose( diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index d2f38d3cfd..b0c25366b5 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -57,6 +57,7 @@ typedef struct XDP_INTERFACE { typedef struct XDP_QUEUE { XDP_QUEUE_COMMON; + uint16_t RssProcessor; uint8_t* RxBuffers; HANDLE RxXsk; DATAPATH_XDP_IO_SQE RxIoSqe; @@ -420,6 +421,7 @@ CxPlatDpRawInterfaceInitialize( for (uint8_t i = 0; i < Interface->QueueCount; i++) { XDP_QUEUE* Queue = &Interface->Queues[i]; + Queue->RssProcessor = (uint16_t)Processors[i]; // TODO - Should memory be aligned with this? Queue->Interface = Interface; InitializeSListHead(&Queue->RxPool); InitializeSListHead(&Queue->TxPool); @@ -431,23 +433,6 @@ CxPlatDpRawInterfaceInitialize( CxPlatDatapathSqeInitialize(&Queue->TxIoSqe.DatapathSqe, CXPLAT_CQE_TYPE_SOCKET_IO); Queue->TxIoSqe.IoType = DATAPATH_XDP_IO_SEND; - uint32_t QueueIndex = UINT32_MAX; - for (uint32_t j = 0; j < Interface->QueueCount; j++) { - if (Processors[j] == i) { - QueueIndex = j; - break; - } - } - if (QueueIndex == UINT32_MAX) { - Status = QUIC_STATUS_INVALID_STATE; - QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - Status, - "QueueIndex"); - goto Error; - } - // // RX datapath. // @@ -515,7 +500,7 @@ CxPlatDpRawInterfaceInitialize( } uint32_t Flags = XSK_BIND_FLAG_RX; - Status = Xdp->XdpApi->XskBind(Queue->RxXsk, Interface->ActualIfIndex, QueueIndex, Flags); + Status = Xdp->XdpApi->XskBind(Queue->RxXsk, Interface->ActualIfIndex, i, Flags); if (QUIC_FAILED(Status)) { QuicTraceEvent( LibraryErrorStatus, @@ -637,7 +622,7 @@ CxPlatDpRawInterfaceInitialize( } Flags = XSK_BIND_FLAG_TX; // TODO: support native/generic forced flags. - Status = Xdp->XdpApi->XskBind(Queue->TxXsk, Interface->ActualIfIndex, QueueIndex, Flags); + Status = Xdp->XdpApi->XskBind(Queue->TxXsk, Interface->ActualIfIndex, i, Flags); if (QUIC_FAILED(Status)) { QuicTraceEvent( LibraryErrorStatus, @@ -694,10 +679,20 @@ CxPlatDpRawInterfaceInitialize( } // - // Add each queue to a partition (round robin). + // Add each queue to the correct partition. // - for (uint8_t i = 0; i < Interface->QueueCount; i++) { - XdpWorkerAddQueue(&Xdp->Partitions[i % Xdp->PartitionCount], &Interface->Queues[i]); + for (uint16_t i = 0; i < Interface->QueueCount; i++) { + BOOLEAN Found = FALSE; + for (uint16_t j = 0; j < Xdp->PartitionCount; j++) { + if (Xdp->Partitions[j].Processor == Interface->Queues[i].RssProcessor) { + XdpWorkerAddQueue(&Xdp->Partitions[j], &Interface->Queues[i]); + Found = TRUE; + break; + } + } + if (!Found) { + CXPLAT_FRE_ASSERT(FALSE); // TODO - What do we do if there is no partition for this processor? + } } Error: @@ -915,8 +910,14 @@ CxPlatDpRawInitialize( if (Config && Config->ProcessorCount) { Xdp->PartitionCount = Config->ProcessorCount; + for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { + Xdp->Partitions[i].Processor = Config->ProcessorList[i]; + } } else { Xdp->PartitionCount = CxPlatProcCount(); + for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { + Xdp->Partitions[i].Processor = (uint16_t)i; + } } QuicTraceLogVerbose( @@ -1050,15 +1051,7 @@ CxPlatDpRawInitialize( for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { XDP_PARTITION* Partition = &Xdp->Partitions[i]; - if (Partition->Queues == NULL) { - // - // Because queues are assigned in a round-robin manner, subsequent - // partitions will not have a queue assigned. Stop the loop and update - // partition count. - // - Xdp->PartitionCount = i; - break; - } + if (Partition->Queues == NULL) { continue; } // No RSS queues for this partition. Partition->Xdp = Xdp; Partition->PartitionIndex = (uint16_t)i; @@ -1164,8 +1157,10 @@ CxPlatDpRawUninitialize( Xdp); Xdp->Running = FALSE; for (uint32_t i = 0; i < Xdp->PartitionCount; i++) { - Xdp->Partitions[i].Ec.Ready = TRUE; - CxPlatWakeExecutionContext(&Xdp->Partitions[i].Ec); + if (Xdp->Partitions[i].Queues != NULL) { + Xdp->Partitions[i].Ec.Ready = TRUE; + CxPlatWakeExecutionContext(&Xdp->Partitions[i].Ec); + } } CxPlatDpRawRelease(Xdp); } From 4a6929c5c009f2343771a9bce124c685f2dd8d85 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 29 Aug 2024 13:27:04 -0400 Subject: [PATCH 09/13] Logs and round robin --- src/platform/datapath_raw_xdp_win.c | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index b0c25366b5..46b598c94a 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -405,6 +405,12 @@ CxPlatDpRawInterfaceInitialize( goto Error; } + QuicTraceLogVerbose( + XdpInterfaceQueues, + "[ixdp][%p] Initializing %u queues on interface", + Interface, + Interface->QueueCount); + Interface->Queues = CxPlatAlloc(Interface->QueueCount * sizeof(*Interface->Queues), QUEUE_TAG); if (Interface->Queues == NULL) { QuicTraceEvent( @@ -681,6 +687,7 @@ CxPlatDpRawInterfaceInitialize( // // Add each queue to the correct partition. // + uint16_t RoundRobinIndex = 0; for (uint16_t i = 0; i < Interface->QueueCount; i++) { BOOLEAN Found = FALSE; for (uint16_t j = 0; j < Xdp->PartitionCount; j++) { @@ -691,7 +698,12 @@ CxPlatDpRawInterfaceInitialize( } } if (!Found) { - CXPLAT_FRE_ASSERT(FALSE); // TODO - What do we do if there is no partition for this processor? + // + // Assign leftovers based on round robin. + // + XdpWorkerAddQueue( + &Xdp->Partitions[RoundRobinIndex++ % Xdp->PartitionCount], + &Interface->Queues[i]); } } @@ -1011,6 +1023,12 @@ CxPlatDpRawInitialize( } }*/ + QuicTraceLogVerbose( + XdpInterfaceInitialize, + "[ixdp][%p] Initializing interface %u", + Interface, + Interface->ActualIfIndex); + Status = CxPlatDpRawInterfaceInitialize( Xdp, Interface, ClientRecvContextLength); From 984d142c78d6473b4115325fd4a4165b639b9362 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 29 Aug 2024 14:24:17 -0400 Subject: [PATCH 10/13] clog again --- .../linux/datapath_raw_xdp_win.c.clog.h | 40 ++++++++++++++++ .../datapath_raw_xdp_win.c.clog.h.lttng.h | 46 +++++++++++++++++++ 2 files changed, 86 insertions(+) diff --git a/src/generated/linux/datapath_raw_xdp_win.c.clog.h b/src/generated/linux/datapath_raw_xdp_win.c.clog.h index d9cf18709e..4768dedd8a 100644 --- a/src/generated/linux/datapath_raw_xdp_win.c.clog.h +++ b/src/generated/linux/datapath_raw_xdp_win.c.clog.h @@ -51,6 +51,26 @@ tracepoint(CLOG_DATAPATH_RAW_XDP_WIN_C, FoundVF , arg2, arg3, arg4);\ +/*---------------------------------------------------------- +// Decoder Ring for XdpInterfaceQueues +// [ixdp][%p] Initializing %u queues on interface +// QuicTraceLogVerbose( + XdpInterfaceQueues, + "[ixdp][%p] Initializing %u queues on interface", + Interface, + Interface->QueueCount); +// arg2 = arg2 = Interface = arg2 +// arg3 = arg3 = Interface->QueueCount = arg3 +----------------------------------------------------------*/ +#ifndef _clog_4_ARGS_TRACE_XdpInterfaceQueues +#define _clog_4_ARGS_TRACE_XdpInterfaceQueues(uniqueId, encoded_arg_string, arg2, arg3)\ +tracepoint(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpInterfaceQueues , arg2, arg3);\ + +#endif + + + + /*---------------------------------------------------------- // Decoder Ring for XdpInitialize // [ xdp][%p] XDP initialized, %u procs @@ -71,6 +91,26 @@ tracepoint(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpInitialize , arg2, arg3);\ +/*---------------------------------------------------------- +// Decoder Ring for XdpInterfaceInitialize +// [ixdp][%p] Initializing interface %u +// QuicTraceLogVerbose( + XdpInterfaceInitialize, + "[ixdp][%p] Initializing interface %u", + Interface, + Interface->ActualIfIndex); +// arg2 = arg2 = Interface = arg2 +// arg3 = arg3 = Interface->ActualIfIndex = arg3 +----------------------------------------------------------*/ +#ifndef _clog_4_ARGS_TRACE_XdpInterfaceInitialize +#define _clog_4_ARGS_TRACE_XdpInterfaceInitialize(uniqueId, encoded_arg_string, arg2, arg3)\ +tracepoint(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpInterfaceInitialize , arg2, arg3);\ + +#endif + + + + /*---------------------------------------------------------- // Decoder Ring for XdpQueueStart // [ xdp][%p] XDP queue start on partition %p diff --git a/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h b/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h index 6752517578..6e0c0c2b44 100644 --- a/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h +++ b/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h @@ -28,6 +28,29 @@ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, FoundVF, +/*---------------------------------------------------------- +// Decoder Ring for XdpInterfaceQueues +// [ixdp][%p] Initializing %u queues on interface +// QuicTraceLogVerbose( + XdpInterfaceQueues, + "[ixdp][%p] Initializing %u queues on interface", + Interface, + Interface->QueueCount); +// arg2 = arg2 = Interface = arg2 +// arg3 = arg3 = Interface->QueueCount = arg3 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpInterfaceQueues, + TP_ARGS( + const void *, arg2, + unsigned int, arg3), + TP_FIELDS( + ctf_integer_hex(uint64_t, arg2, (uint64_t)arg2) + ctf_integer(unsigned int, arg3, arg3) + ) +) + + + /*---------------------------------------------------------- // Decoder Ring for XdpInitialize // [ xdp][%p] XDP initialized, %u procs @@ -51,6 +74,29 @@ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpInitialize, +/*---------------------------------------------------------- +// Decoder Ring for XdpInterfaceInitialize +// [ixdp][%p] Initializing interface %u +// QuicTraceLogVerbose( + XdpInterfaceInitialize, + "[ixdp][%p] Initializing interface %u", + Interface, + Interface->ActualIfIndex); +// arg2 = arg2 = Interface = arg2 +// arg3 = arg3 = Interface->ActualIfIndex = arg3 +----------------------------------------------------------*/ +TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpInterfaceInitialize, + TP_ARGS( + const void *, arg2, + unsigned int, arg3), + TP_FIELDS( + ctf_integer_hex(uint64_t, arg2, (uint64_t)arg2) + ctf_integer(unsigned int, arg3, arg3) + ) +) + + + /*---------------------------------------------------------- // Decoder Ring for XdpQueueStart // [ xdp][%p] XDP queue start on partition %p From 99361666859a65103ac6555af07e578430629423 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Thu, 29 Aug 2024 14:28:41 -0400 Subject: [PATCH 11/13] Missed the sidecar --- src/manifest/clog.sidecar | 42 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/src/manifest/clog.sidecar b/src/manifest/clog.sidecar index 226b674417..d93d46054c 100644 --- a/src/manifest/clog.sidecar +++ b/src/manifest/clog.sidecar @@ -12418,6 +12418,38 @@ ], "macroName": "QuicTraceLogVerbose" }, + "XdpInterfaceInitialize": { + "ModuleProperites": {}, + "TraceString": "[ixdp][%p] Initializing interface %u", + "UniqueId": "XdpInterfaceInitialize", + "splitArgs": [ + { + "DefinationEncoding": "p", + "MacroVariableName": "arg2" + }, + { + "DefinationEncoding": "u", + "MacroVariableName": "arg3" + } + ], + "macroName": "QuicTraceLogVerbose" + }, + "XdpInterfaceQueues": { + "ModuleProperites": {}, + "TraceString": "[ixdp][%p] Initializing %u queues on interface", + "UniqueId": "XdpInterfaceQueues", + "splitArgs": [ + { + "DefinationEncoding": "p", + "MacroVariableName": "arg2" + }, + { + "DefinationEncoding": "u", + "MacroVariableName": "arg3" + } + ], + "macroName": "QuicTraceLogVerbose" + }, "XdpLoadBpfObjectError": { "ModuleProperites": {}, "TraceString": "[ xdp] ERROR:, loading BPF-OBJ file:%s, %d: [%s].", @@ -17201,6 +17233,16 @@ "TraceID": "XdpInitialize", "EncodingString": "[ xdp][%p] XDP initialized, %u procs" }, + { + "UniquenessHash": "1b9f0d89-3821-2037-08c6-88d59b4e4dae", + "TraceID": "XdpInterfaceInitialize", + "EncodingString": "[ixdp][%p] Initializing interface %u" + }, + { + "UniquenessHash": "8101710f-c411-8be4-bddb-9cffaaa1cac3", + "TraceID": "XdpInterfaceQueues", + "EncodingString": "[ixdp][%p] Initializing %u queues on interface" + }, { "UniquenessHash": "83b58d1d-bb4b-653f-d379-3631b103ed34", "TraceID": "XdpLoadBpfObjectError", From b0573f363b284bc6a7d9c1b153c0fb272d942187 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Tue, 1 Oct 2024 09:14:21 -0400 Subject: [PATCH 12/13] PR feedback --- src/platform/datapath_raw_xdp_win.c | 56 +++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 14 deletions(-) diff --git a/src/platform/datapath_raw_xdp_win.c b/src/platform/datapath_raw_xdp_win.c index 3f0a80a14a..f8d7810f1c 100644 --- a/src/platform/datapath_raw_xdp_win.c +++ b/src/platform/datapath_raw_xdp_win.c @@ -23,6 +23,8 @@ #include "datapath_raw_xdp_win.c.clog.h" #endif +#define XDP_MAX_SYNC_WAIT_TIMEOUT_MS 1000 // Used for querying XDP RSS capabilities. + typedef struct XDP_DATAPATH { CXPLAT_DATAPATH_RAW; DECLSPEC_CACHEALIGN @@ -189,14 +191,30 @@ CxPlatGetRssQueueProcessors( uint32_t Flags = XSK_BIND_FLAG_TX; Status = Xdp->XdpApi->XskBind(TxXsk, InterfaceIndex, i, Flags); - if (QUIC_FAILED(Status)) { // No more queues. Break out. - *Count = i; + if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); - break; + if (Status == E_INVALIDARG) { // No more queues. Break out. + *Count = i; + break; // Expected failure if there is no more queue. + } + QuicTraceEvent( + LibraryErrorStatus, + "[ lib] ERROR, %u, %s.", + Status, + "XskBind (GetRssQueueProcessors)"); + return Status; } Status = Xdp->XdpApi->XskActivate(TxXsk, 0); - if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } + if (QUIC_FAILED(Status)) { + QuicTraceEvent( + LibraryErrorStatus, + "[ lib] ERROR, %u, %s.", + Status, + "XskActivate (GetRssQueueProcessors)"); + CloseHandle(TxXsk); + return Status; + } XSK_RING_INFO_SET TxRingInfo; uint32_t TxRingInfoSize = sizeof(TxRingInfo); @@ -217,7 +235,7 @@ CxPlatGetRssQueueProcessors( XskRingProducerSubmit(&TxRing, 1); XSK_NOTIFY_RESULT_FLAGS OutFlags; - Status = Xdp->XdpApi->XskNotifySocket(TxXsk, XSK_NOTIFY_FLAG_POKE_TX|XSK_NOTIFY_FLAG_WAIT_TX, 1000, &OutFlags); + Status = Xdp->XdpApi->XskNotifySocket(TxXsk, XSK_NOTIFY_FLAG_POKE_TX|XSK_NOTIFY_FLAG_WAIT_TX, XDP_MAX_SYNC_WAIT_TIMEOUT_MS, &OutFlags); if (QUIC_FAILED(Status)) { CloseHandle(TxXsk); return Status; } uint32_t CompIndex; @@ -372,8 +390,15 @@ CxPlatDpRawInterfaceInitialize( Interface->OffloadStatus.Transmit.NetworkLayerXsum = Xdp->SkipXsum; Interface->Xdp = Xdp; - uint32_t Processors[256]; // TODO - Use max processor count - Interface->QueueCount = ARRAYSIZE(Processors); + Interface->QueueCount = (uint16_t)CxPlatProcCount(); + uint32_t* Processors = + CXPLAT_ALLOC_NONPAGED( + Interface->QueueCount * sizeof(uint32_t), + QUIC_POOL_PLATFORM_TMP_ALLOC); + if (Processors == NULL) { + Status = QUIC_STATUS_OUT_OF_MEMORY; + goto Error; + } Status = Xdp->XdpApi->XdpInterfaceOpen(Interface->ActualIfIndex, &Interface->XdpHandle); if (QUIC_FAILED(Status)) { @@ -411,7 +436,7 @@ CxPlatDpRawInterfaceInitialize( Interface, Interface->QueueCount); - Interface->Queues = CxPlatAlloc(Interface->QueueCount * sizeof(*Interface->Queues), QUEUE_TAG); + Interface->Queues = CXPLAT_ALLOC_NONPAGED(Interface->QueueCount * sizeof(*Interface->Queues), QUEUE_TAG); if (Interface->Queues == NULL) { QuicTraceEvent( AllocFailure, @@ -443,7 +468,7 @@ CxPlatDpRawInterfaceInitialize( // RX datapath. // - Queue->RxBuffers = CxPlatAlloc(Xdp->RxBufferCount * RxPacketSize, RX_BUFFER_TAG); + Queue->RxBuffers = CXPLAT_ALLOC_NONPAGED(Xdp->RxBufferCount * RxPacketSize, RX_BUFFER_TAG); if (Queue->RxBuffers == NULL) { QuicTraceEvent( AllocFailure, @@ -565,7 +590,7 @@ CxPlatDpRawInterfaceInitialize( // TX datapath. // - Queue->TxBuffers = CxPlatAlloc(Xdp->TxBufferCount * sizeof(XDP_TX_PACKET), TX_BUFFER_TAG); + Queue->TxBuffers = CXPLAT_ALLOC_NONPAGED(Xdp->TxBufferCount * sizeof(XDP_TX_PACKET), TX_BUFFER_TAG); if (Queue->TxBuffers == NULL) { QuicTraceEvent( AllocFailure, @@ -711,6 +736,9 @@ CxPlatDpRawInterfaceInitialize( if (QUIC_FAILED(Status)) { CxPlatDpRawInterfaceUninitialize(Interface); } + if (Processors != NULL) { + CXPLAT_FREE(Processors, QUIC_POOL_PLATFORM_TMP_ALLOC); + } return Status; } @@ -793,7 +821,7 @@ CxPlatDpRawInterfaceAddRules( const size_t OldSize = sizeof(XDP_RULE) * (size_t)Interface->RuleCount; const size_t NewSize = sizeof(XDP_RULE) * ((size_t)Interface->RuleCount + Count); - XDP_RULE* NewRules = CxPlatAlloc(NewSize, RULE_TAG); + XDP_RULE* NewRules = CXPLAT_ALLOC_NONPAGED(NewSize, RULE_TAG); if (NewRules == NULL) { QuicTraceEvent( AllocFailure, @@ -962,7 +990,7 @@ CxPlatDpRawInitialize( GAA_FLAG_SKIP_DNS_INFO; do { - Adapters = (IP_ADAPTER_ADDRESSES*)CxPlatAlloc(AdaptersBufferSize, ADAPTER_TAG); + Adapters = (IP_ADAPTER_ADDRESSES*)CXPLAT_ALLOC_NONPAGED(AdaptersBufferSize, ADAPTER_TAG); if (Adapters == NULL) { QuicTraceEvent( AllocFailure, @@ -990,7 +1018,7 @@ CxPlatDpRawInitialize( if (Adapter->IfType == IF_TYPE_ETHERNET_CSMACD && Adapter->OperStatus == IfOperStatusUp && Adapter->PhysicalAddressLength == ETH_MAC_ADDR_LEN) { - XDP_INTERFACE* Interface = CxPlatAlloc(sizeof(XDP_INTERFACE), IF_TAG); + XDP_INTERFACE* Interface = CXPLAT_ALLOC_NONPAGED(sizeof(XDP_INTERFACE), IF_TAG); if (Interface == NULL) { QuicTraceEvent( AllocFailure, @@ -1395,7 +1423,7 @@ CxPlatDpRawPlumbRulesOnSocket( CxPlatLockRelease(&Interface->RuleLock); XDP_RULE NewRule = { .Match = MatchType, - .Pattern.IpPortSet.PortSet.PortSet = CxPlatAlloc(XDP_PORT_SET_BUFFER_SIZE, PORT_SET_TAG), + .Pattern.IpPortSet.PortSet.PortSet = CXPLAT_ALLOC_NONPAGED(XDP_PORT_SET_BUFFER_SIZE, PORT_SET_TAG), .Action = XDP_PROGRAM_ACTION_REDIRECT, .Redirect.TargetType = XDP_REDIRECT_TARGET_TYPE_XSK, .Redirect.Target = NULL, From 802a1cd8ec4310b087d96bf45770311e931892c0 Mon Sep 17 00:00:00 2001 From: Nick Banks Date: Tue, 1 Oct 2024 09:14:49 -0400 Subject: [PATCH 13/13] clog --- src/generated/linux/datapath_raw_xdp_win.c.clog.h | 10 +++++----- .../linux/datapath_raw_xdp_win.c.clog.h.lttng.h | 10 +++++----- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/generated/linux/datapath_raw_xdp_win.c.clog.h b/src/generated/linux/datapath_raw_xdp_win.c.clog.h index 4768dedd8a..b8e8a3d927 100644 --- a/src/generated/linux/datapath_raw_xdp_win.c.clog.h +++ b/src/generated/linux/datapath_raw_xdp_win.c.clog.h @@ -317,12 +317,12 @@ tracepoint(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpPartitionShutdownComplete , arg2);\ // Decoder Ring for LibraryErrorStatus // [ lib] ERROR, %u, %s. // QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - Status, - "XdpInterfaceOpen"); + LibraryErrorStatus, + "[ lib] ERROR, %u, %s.", + Status, + "XskBind (GetRssQueueProcessors)"); // arg2 = arg2 = Status = arg2 -// arg3 = arg3 = "XdpInterfaceOpen" = arg3 +// arg3 = arg3 = "XskBind (GetRssQueueProcessors)" = arg3 ----------------------------------------------------------*/ #ifndef _clog_4_ARGS_TRACE_LibraryErrorStatus #define _clog_4_ARGS_TRACE_LibraryErrorStatus(uniqueId, encoded_arg_string, arg2, arg3)\ diff --git a/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h b/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h index 6e0c0c2b44..891a5d88f3 100644 --- a/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h +++ b/src/generated/linux/datapath_raw_xdp_win.c.clog.h.lttng.h @@ -318,12 +318,12 @@ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, XdpPartitionShutdownComplete, // Decoder Ring for LibraryErrorStatus // [ lib] ERROR, %u, %s. // QuicTraceEvent( - LibraryErrorStatus, - "[ lib] ERROR, %u, %s.", - Status, - "XdpInterfaceOpen"); + LibraryErrorStatus, + "[ lib] ERROR, %u, %s.", + Status, + "XskBind (GetRssQueueProcessors)"); // arg2 = arg2 = Status = arg2 -// arg3 = arg3 = "XdpInterfaceOpen" = arg3 +// arg3 = arg3 = "XskBind (GetRssQueueProcessors)" = arg3 ----------------------------------------------------------*/ TRACEPOINT_EVENT(CLOG_DATAPATH_RAW_XDP_WIN_C, LibraryErrorStatus, TP_ARGS(