tipc: fix unlimited bundling of small messages
authorTuong Lien <tuong.t.lien@dektech.com.au>
Wed, 2 Oct 2019 11:49:43 +0000 (18:49 +0700)
committerDavid S. Miller <davem@davemloft.net>
Wed, 2 Oct 2019 15:02:05 +0000 (11:02 -0400)
We have identified a problem with the "oversubscription" policy in the
link transmission code.

When small messages are transmitted, and the sending link has reached
the transmit window limit, those messages will be bundled and put into
the link backlog queue. However, bundles of data messages are counted
at the 'CRITICAL' level, so that the counter for that level, instead of
the counter for the real, bundled message's level is the one being
increased.
Subsequent, to-be-bundled data messages at non-CRITICAL levels continue
to be tested against the unchanged counter for their own level, while
contributing to an unrestrained increase at the CRITICAL backlog level.

This leaves a gap in congestion control algorithm for small messages
that can result in starvation for other users or a "real" CRITICAL
user. Even that eventually can lead to buffer exhaustion & link reset.

We fix this by keeping a 'target_bskb' buffer pointer at each levels,
then when bundling, we only bundle messages at the same importance
level only. This way, we know exactly how many slots a certain level
have occupied in the queue, so can manage level congestion accurately.

By bundling messages at the same level, we even have more benefits. Let
consider this:
- One socket sends 64-byte messages at the 'CRITICAL' level;
- Another sends 4096-byte messages at the 'LOW' level;

When a 64-byte message comes and is bundled the first time, we put the
overhead of message bundle to it (+ 40-byte header, data copy, etc.)
for later use, but the next message can be a 4096-byte one that cannot
be bundled to the previous one. This means the last bundle carries only
one payload message which is totally inefficient, as for the receiver
also! Later on, another 64-byte message comes, now we make a new bundle
and the same story repeats...

With the new bundling algorithm, this will not happen, the 64-byte
messages will be bundled together even when the 4096-byte message(s)
comes in between. However, if the 4096-byte messages are sent at the
same level i.e. 'CRITICAL', the bundling algorithm will again cause the
same overhead.

Also, the same will happen even with only one socket sending small
messages at a rate close to the link transmit's one, so that, when one
message is bundled, it's transmitted shortly. Then, another message
comes, a new bundle is created and so on...

We will solve this issue radically by another patch.

Fixes: 365ad353c256 ("tipc: reduce risk of user starvation during link congestion")
Reported-by: Hoang Le <hoang.h.le@dektech.com.au>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/link.c
net/tipc/msg.c

index 6cc75ffd9e2c2741c7734c173609bc0f30ce679e..999eab592de8fe1bfd796c66f7a4349aa1ba20e6 100644 (file)
@@ -160,6 +160,7 @@ struct tipc_link {
        struct {
                u16 len;
                u16 limit;
+               struct sk_buff *target_bskb;
        } backlog[5];
        u16 snd_nxt;
        u16 window;
@@ -880,6 +881,7 @@ static void link_prepare_wakeup(struct tipc_link *l)
 void tipc_link_reset(struct tipc_link *l)
 {
        struct sk_buff_head list;
+       u32 imp;
 
        __skb_queue_head_init(&list);
 
@@ -901,11 +903,10 @@ void tipc_link_reset(struct tipc_link *l)
        __skb_queue_purge(&l->deferdq);
        __skb_queue_purge(&l->backlogq);
        __skb_queue_purge(&l->failover_deferdq);
-       l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
-       l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
-       l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
-       l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
-       l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
+       for (imp = 0; imp <= TIPC_SYSTEM_IMPORTANCE; imp++) {
+               l->backlog[imp].len = 0;
+               l->backlog[imp].target_bskb = NULL;
+       }
        kfree_skb(l->reasm_buf);
        kfree_skb(l->reasm_tnlmsg);
        kfree_skb(l->failover_reasm_skb);
@@ -947,7 +948,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
        u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
        struct sk_buff_head *transmq = &l->transmq;
        struct sk_buff_head *backlogq = &l->backlogq;
-       struct sk_buff *skb, *_skb, *bskb;
+       struct sk_buff *skb, *_skb, **tskb;
        int pkt_cnt = skb_queue_len(list);
        int rc = 0;
 
@@ -999,19 +1000,21 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
                        seqno++;
                        continue;
                }
-               if (tipc_msg_bundle(skb_peek_tail(backlogq), hdr, mtu)) {
+               tskb = &l->backlog[imp].target_bskb;
+               if (tipc_msg_bundle(*tskb, hdr, mtu)) {
                        kfree_skb(__skb_dequeue(list));
                        l->stats.sent_bundled++;
                        continue;
                }
-               if (tipc_msg_make_bundle(&bskb, hdr, mtu, l->addr)) {
+               if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) {
                        kfree_skb(__skb_dequeue(list));
-                       __skb_queue_tail(backlogq, bskb);
-                       l->backlog[msg_importance(buf_msg(bskb))].len++;
+                       __skb_queue_tail(backlogq, *tskb);
+                       l->backlog[imp].len++;
                        l->stats.sent_bundled++;
                        l->stats.sent_bundles++;
                        continue;
                }
+               l->backlog[imp].target_bskb = NULL;
                l->backlog[imp].len += skb_queue_len(list);
                skb_queue_splice_tail_init(list, backlogq);
        }
@@ -1027,6 +1030,7 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
        u16 seqno = l->snd_nxt;
        u16 ack = l->rcv_nxt - 1;
        u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
+       u32 imp;
 
        while (skb_queue_len(&l->transmq) < l->window) {
                skb = skb_peek(&l->backlogq);
@@ -1037,7 +1041,10 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
                        break;
                __skb_dequeue(&l->backlogq);
                hdr = buf_msg(skb);
-               l->backlog[msg_importance(hdr)].len--;
+               imp = msg_importance(hdr);
+               l->backlog[imp].len--;
+               if (unlikely(skb == l->backlog[imp].target_bskb))
+                       l->backlog[imp].target_bskb = NULL;
                __skb_queue_tail(&l->transmq, skb);
                /* next retransmit attempt */
                if (link_is_bc_sndlink(l))
index e6d49cdc61b40becff116590b668abdfcb99dc0b..922d262e153ff40a03de9125c0fe52300857fc8f 100644 (file)
@@ -543,10 +543,7 @@ bool tipc_msg_make_bundle(struct sk_buff **skb,  struct tipc_msg *msg,
        bmsg = buf_msg(_skb);
        tipc_msg_init(msg_prevnode(msg), bmsg, MSG_BUNDLER, 0,
                      INT_H_SIZE, dnode);
-       if (msg_isdata(msg))
-               msg_set_importance(bmsg, TIPC_CRITICAL_IMPORTANCE);
-       else
-               msg_set_importance(bmsg, TIPC_SYSTEM_IMPORTANCE);
+       msg_set_importance(bmsg, msg_importance(msg));
        msg_set_seqno(bmsg, msg_seqno(msg));
        msg_set_ack(bmsg, msg_ack(msg));
        msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));