Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
tcp: implement rb-tree based retransmit queue
Browse files Browse the repository at this point in the history
Using a linear list to store all skbs in write queue has been okay
for quite a while : O(N) is not too bad when N < 500.

Things get messy when N is the order of 100,000 : Modern TCP stacks
want 10Gbit+ of throughput even with 200 ms RTT flows.

40 ns per cache line miss means a full scan can use 4 ms,
blowing away CPU caches.

SACK processing often can use various hints to avoid parsing
whole retransmit queue. But with high packet losses and/or high
reordering, hints no longer work.

Sender has to process thousands of unfriendly SACK, accumulating
a huge socket backlog, burning a cpu and massively dropping packets.

Using an rb-tree for retransmit queue has been avoided for years
because it added complexity and overhead, but now is the time
to be more resistant and say no to quadratic behavior.

1) RTX queue is no longer part of the write queue : already sent skbs
are stored in one rb-tree.

2) Since reaching the head of write queue no longer needs
sk->sk_send_head, we added an union of sk_send_head and tcp_rtx_queue

Tested:

 On receiver :
 netem on ingress : delay 150ms 200us loss 1
 GRO disabled to force stress and SACK storms.

for f in `seq 1 10`
do
 ./netperf -H lpaa6 -l30 -- -K bbr -o THROUGHPUT|tail -1
done | awk '{print $0} {sum += $0} END {printf "%7u\n",sum}'

Before patch :

323.87
351.48
339.59
338.62
306.72
204.07
304.93
291.88
202.47
176.88
   2840

After patch:

1700.83
2207.98
2070.17
1544.26
2114.76
2124.89
1693.14
1080.91
2216.82
1299.94
  18053

Signed-off-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
  • Loading branch information
Eric Dumazet authored and davem330 committed Oct 6, 2017
1 parent f331981 commit 75c119a
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 188 deletions.
7 changes: 5 additions & 2 deletions include/net/sock.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/cgroup-defs.h>

#include <linux/rbtree.h>
#include <linux/filter.h>
#include <linux/rculist_nulls.h>
#include <linux/poll.h>
Expand Down Expand Up @@ -397,7 +397,10 @@ struct sock {
int sk_wmem_queued;
refcount_t sk_wmem_alloc;
unsigned long sk_tsq_flags;
struct sk_buff *sk_send_head;
union {
struct sk_buff *sk_send_head;
struct rb_root tcp_rtx_queue;
};
struct sk_buff_head sk_write_queue;
__s32 sk_peek_off;
int sk_write_pending;
Expand Down
89 changes: 47 additions & 42 deletions include/net/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,13 @@ void tcp_xmit_retransmit_queue(struct sock *);
void tcp_simple_retransmit(struct sock *);
void tcp_enter_recovery(struct sock *sk, bool ece_ack);
int tcp_trim_head(struct sock *, struct sk_buff *, u32);
int tcp_fragment(struct sock *, struct sk_buff *, u32, unsigned int, gfp_t);
enum tcp_queue {
TCP_FRAG_IN_WRITE_QUEUE,
TCP_FRAG_IN_RTX_QUEUE,
};
int tcp_fragment(struct sock *sk, enum tcp_queue tcp_queue,
struct sk_buff *skb, u32 len,
unsigned int mss_now, gfp_t gfp);

void tcp_send_probe0(struct sock *);
void tcp_send_partial(struct sock *);
Expand Down Expand Up @@ -1608,6 +1614,11 @@ static inline void tcp_skb_tsorted_anchor_cleanup(struct sk_buff *skb)

void tcp_write_queue_purge(struct sock *sk);

static inline struct sk_buff *tcp_rtx_queue_head(const struct sock *sk)
{
return skb_rb_first(&sk->tcp_rtx_queue);
}

static inline struct sk_buff *tcp_write_queue_head(const struct sock *sk)
{
return skb_peek(&sk->sk_write_queue);
Expand All @@ -1630,18 +1641,12 @@ static inline struct sk_buff *tcp_write_queue_prev(const struct sock *sk,
return skb_queue_prev(&sk->sk_write_queue, skb);
}

#define tcp_for_write_queue(skb, sk) \
skb_queue_walk(&(sk)->sk_write_queue, skb)

#define tcp_for_write_queue_from(skb, sk) \
skb_queue_walk_from(&(sk)->sk_write_queue, skb)

#define tcp_for_write_queue_from_safe(skb, tmp, sk) \
skb_queue_walk_from_safe(&(sk)->sk_write_queue, skb, tmp)

static inline struct sk_buff *tcp_send_head(const struct sock *sk)
{
return sk->sk_send_head;
return skb_peek(&sk->sk_write_queue);
}

static inline bool tcp_skb_is_last(const struct sock *sk,
Expand All @@ -1650,29 +1655,30 @@ static inline bool tcp_skb_is_last(const struct sock *sk,
return skb_queue_is_last(&sk->sk_write_queue, skb);
}

static inline void tcp_advance_send_head(struct sock *sk, const struct sk_buff *skb)
static inline bool tcp_write_queue_empty(const struct sock *sk)
{
if (tcp_skb_is_last(sk, skb))
sk->sk_send_head = NULL;
else
sk->sk_send_head = tcp_write_queue_next(sk, skb);
return skb_queue_empty(&sk->sk_write_queue);
}

static inline bool tcp_rtx_queue_empty(const struct sock *sk)
{
return RB_EMPTY_ROOT(&sk->tcp_rtx_queue);
}

static inline bool tcp_rtx_and_write_queues_empty(const struct sock *sk)
{
return tcp_rtx_queue_empty(sk) && tcp_write_queue_empty(sk);
}

static inline void tcp_check_send_head(struct sock *sk, struct sk_buff *skb_unlinked)
{
if (sk->sk_send_head == skb_unlinked) {
sk->sk_send_head = NULL;
if (tcp_write_queue_empty(sk))
tcp_chrono_stop(sk, TCP_CHRONO_BUSY);
}

if (tcp_sk(sk)->highest_sack == skb_unlinked)
tcp_sk(sk)->highest_sack = NULL;
}

static inline void tcp_init_send_head(struct sock *sk)
{
sk->sk_send_head = NULL;
}

static inline void __tcp_add_write_queue_tail(struct sock *sk, struct sk_buff *skb)
{
__skb_queue_tail(&sk->sk_write_queue, skb);
Expand All @@ -1683,8 +1689,7 @@ static inline void tcp_add_write_queue_tail(struct sock *sk, struct sk_buff *skb
__tcp_add_write_queue_tail(sk, skb);

/* Queue it, remembering where we must start sending. */
if (sk->sk_send_head == NULL) {
sk->sk_send_head = skb;
if (sk->sk_write_queue.next == skb) {
tcp_chrono_start(sk, TCP_CHRONO_BUSY);

if (tcp_sk(sk)->highest_sack == NULL)
Expand All @@ -1697,35 +1702,32 @@ static inline void __tcp_add_write_queue_head(struct sock *sk, struct sk_buff *s
__skb_queue_head(&sk->sk_write_queue, skb);
}

/* Insert buff after skb on the write queue of sk. */
static inline void tcp_insert_write_queue_after(struct sk_buff *skb,
struct sk_buff *buff,
struct sock *sk)
{
__skb_queue_after(&sk->sk_write_queue, skb, buff);
}

/* Insert new before skb on the write queue of sk. */
static inline void tcp_insert_write_queue_before(struct sk_buff *new,
struct sk_buff *skb,
struct sock *sk)
{
__skb_queue_before(&sk->sk_write_queue, skb, new);

if (sk->sk_send_head == skb)
sk->sk_send_head = new;
}

static inline void tcp_unlink_write_queue(struct sk_buff *skb, struct sock *sk)
{
list_del(&skb->tcp_tsorted_anchor);
tcp_skb_tsorted_anchor_cleanup(skb);
__skb_unlink(skb, &sk->sk_write_queue);
}

static inline bool tcp_write_queue_empty(struct sock *sk)
void tcp_rbtree_insert(struct rb_root *root, struct sk_buff *skb);

static inline void tcp_rtx_queue_unlink(struct sk_buff *skb, struct sock *sk)
{
return skb_queue_empty(&sk->sk_write_queue);
tcp_skb_tsorted_anchor_cleanup(skb);
rb_erase(&skb->rbnode, &sk->tcp_rtx_queue);
}

static inline void tcp_rtx_queue_unlink_and_free(struct sk_buff *skb, struct sock *sk)
{
list_del(&skb->tcp_tsorted_anchor);
tcp_rtx_queue_unlink(skb, sk);
sk_wmem_free_skb(sk, skb);
}

static inline void tcp_push_pending_frames(struct sock *sk)
Expand Down Expand Up @@ -1754,8 +1756,9 @@ static inline u32 tcp_highest_sack_seq(struct tcp_sock *tp)

static inline void tcp_advance_highest_sack(struct sock *sk, struct sk_buff *skb)
{
tcp_sk(sk)->highest_sack = tcp_skb_is_last(sk, skb) ? NULL :
tcp_write_queue_next(sk, skb);
struct sk_buff *next = skb_rb_next(skb);

tcp_sk(sk)->highest_sack = next ?: tcp_send_head(sk);
}

static inline struct sk_buff *tcp_highest_sack(struct sock *sk)
Expand All @@ -1765,7 +1768,9 @@ static inline struct sk_buff *tcp_highest_sack(struct sock *sk)

static inline void tcp_highest_sack_reset(struct sock *sk)
{
tcp_sk(sk)->highest_sack = tcp_write_queue_head(sk);
struct sk_buff *skb = tcp_rtx_queue_head(sk);

tcp_sk(sk)->highest_sack = skb ?: tcp_send_head(sk);
}

/* Called when old skb is about to be deleted (to be combined with new skb) */
Expand Down Expand Up @@ -1935,7 +1940,7 @@ extern void tcp_rack_reo_timeout(struct sock *sk);
/* At how many usecs into the future should the RTO fire? */
static inline s64 tcp_rto_delta_us(const struct sock *sk)
{
const struct sk_buff *skb = tcp_write_queue_head(sk);
const struct sk_buff *skb = tcp_rtx_queue_head(sk);
u32 rto = inet_csk(sk)->icsk_rto;
u64 rto_time_stamp_us = skb->skb_mstamp + jiffies_to_usecs(rto);

Expand Down
41 changes: 32 additions & 9 deletions net/ipv4/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ void tcp_init_sock(struct sock *sk)
struct tcp_sock *tp = tcp_sk(sk);

tp->out_of_order_queue = RB_ROOT;
sk->tcp_rtx_queue = RB_ROOT;
tcp_init_xmit_timers(sk);
INIT_LIST_HEAD(&tp->tsq_node);
INIT_LIST_HEAD(&tp->tsorted_sent_queue);
Expand Down Expand Up @@ -701,10 +702,9 @@ static void tcp_push(struct sock *sk, int flags, int mss_now,
struct tcp_sock *tp = tcp_sk(sk);
struct sk_buff *skb;

if (!tcp_send_head(sk))
return;

skb = tcp_write_queue_tail(sk);
if (!skb)
return;
if (!(flags & MSG_MORE) || forced_push(tp))
tcp_mark_push(tp, skb);

Expand Down Expand Up @@ -964,14 +964,14 @@ ssize_t do_tcp_sendpages(struct sock *sk, struct page *page, int offset,
int copy, i;
bool can_coalesce;

if (!tcp_send_head(sk) || (copy = size_goal - skb->len) <= 0 ||
if (!skb || (copy = size_goal - skb->len) <= 0 ||
!tcp_skb_can_collapse_to(skb)) {
new_segment:
if (!sk_stream_memory_free(sk))
goto wait_for_sndbuf;

skb = sk_stream_alloc_skb(sk, 0, sk->sk_allocation,
skb_queue_empty(&sk->sk_write_queue));
tcp_rtx_and_write_queues_empty(sk));
if (!skb)
goto wait_for_memory;

Expand Down Expand Up @@ -1199,7 +1199,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
goto out_err;
}

skb = tcp_send_head(sk) ? tcp_write_queue_tail(sk) : NULL;
skb = tcp_write_queue_tail(sk);
uarg = sock_zerocopy_realloc(sk, size, skb_zcopy(skb));
if (!uarg) {
err = -ENOBUFS;
Expand Down Expand Up @@ -1275,7 +1275,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
int max = size_goal;

skb = tcp_write_queue_tail(sk);
if (tcp_send_head(sk)) {
if (skb) {
if (skb->ip_summed == CHECKSUM_NONE)
max = mss_now;
copy = max - skb->len;
Expand All @@ -1295,7 +1295,7 @@ int tcp_sendmsg_locked(struct sock *sk, struct msghdr *msg, size_t size)
process_backlog = false;
goto restart;
}
first_skb = skb_queue_empty(&sk->sk_write_queue);
first_skb = tcp_rtx_and_write_queues_empty(sk);
skb = sk_stream_alloc_skb(sk,
select_size(sk, sg, first_skb),
sk->sk_allocation,
Expand Down Expand Up @@ -1521,6 +1521,13 @@ static int tcp_peek_sndq(struct sock *sk, struct msghdr *msg, int len)

/* XXX -- need to support SO_PEEK_OFF */

skb_rbtree_walk(skb, &sk->tcp_rtx_queue) {
err = skb_copy_datagram_msg(skb, 0, msg, skb->len);
if (err)
return err;
copied += skb->len;
}

skb_queue_walk(&sk->sk_write_queue, skb) {
err = skb_copy_datagram_msg(skb, 0, msg, skb->len);
if (err)
Expand Down Expand Up @@ -2320,6 +2327,22 @@ static inline bool tcp_need_reset(int state)
TCPF_FIN_WAIT2 | TCPF_SYN_RECV);
}

static void tcp_rtx_queue_purge(struct sock *sk)
{
struct rb_node *p = rb_first(&sk->tcp_rtx_queue);

while (p) {
struct sk_buff *skb = rb_to_skb(p);

p = rb_next(p);
/* Since we are deleting whole queue, no need to
* list_del(&skb->tcp_tsorted_anchor)
*/
tcp_rtx_queue_unlink(skb, sk);
sk_wmem_free_skb(sk, skb);
}
}

void tcp_write_queue_purge(struct sock *sk)
{
struct sk_buff *skb;
Expand All @@ -2329,6 +2352,7 @@ void tcp_write_queue_purge(struct sock *sk)
tcp_skb_tsorted_anchor_cleanup(skb);
sk_wmem_free_skb(sk, skb);
}
tcp_rtx_queue_purge(sk);
INIT_LIST_HEAD(&tcp_sk(sk)->tsorted_sent_queue);
sk_mem_reclaim(sk);
tcp_clear_all_retrans_hints(tcp_sk(sk));
Expand Down Expand Up @@ -2392,7 +2416,6 @@ int tcp_disconnect(struct sock *sk, int flags)
* issue in __tcp_select_window()
*/
icsk->icsk_ack.rcv_mss = TCP_MIN_MSS;
tcp_init_send_head(sk);
memset(&tp->rx_opt, 0, sizeof(tp->rx_opt));
__sk_dst_reset(sk);
dst_release(sk->sk_rx_dst);
Expand Down
Loading

0 comments on commit 75c119a

Please sign in to comment.