net: sched: use skb list for skb_bad_txq
author John Fastabend <john.fastabend@gmail.com>
Thu, 7 Dec 2017 17:56:23 +0000 (09:56 -0800)
committer David S. Miller <davem@davemloft.net>
Fri, 8 Dec 2017 18:32:26 +0000 (13:32 -0500)
Similar to how gso is handled, use an skb list for skb_bad_txq. This is
required with lockless qdiscs because multiple cores may attempt to push
skbs into skb_bad_txq concurrently.

Signed-off-by: John Fastabend <john.fastabend@gmail.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
include/net/sch_generic.h
net/sched/sch_generic.c

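As background, the failure mode this avoids is a plain pointer being overwritten
when two cores requeue at the same time, so one skb is lost. The following is a
minimal user-space analogy, not kernel code and not part of the patch: several
threads stand in for cores, a mutex stands in for qdisc_lock() taken under
TCQ_F_NOLOCK, and a linked list stands in for the new skb_bad_txq sk_buff_head.

	/* Illustrative analogy only; build with: cc -pthread bad_txq_demo.c */
	#include <pthread.h>
	#include <stdio.h>
	#include <stdlib.h>

	struct node {
		int id;
		struct node *next;
	};

	static struct node *bad_txq_head;		/* stands in for skb_bad_txq */
	static pthread_mutex_t bad_txq_lock = PTHREAD_MUTEX_INITIALIZER;

	static void enqueue_bad_txq(struct node *n)
	{
		/* analogous to taking qdisc_lock() when TCQ_F_NOLOCK is set */
		pthread_mutex_lock(&bad_txq_lock);
		n->next = bad_txq_head;
		bad_txq_head = n;
		pthread_mutex_unlock(&bad_txq_lock);
	}

	static void *producer(void *arg)
	{
		struct node *n = malloc(sizeof(*n));

		n->id = (int)(long)arg;
		enqueue_bad_txq(n);
		return NULL;
	}

	int main(void)
	{
		pthread_t t[4];
		struct node *n;
		int i, count = 0;

		for (i = 0; i < 4; i++)
			pthread_create(&t[i], NULL, producer, (void *)(long)i);
		for (i = 0; i < 4; i++)
			pthread_join(t[i], NULL);

		/* every producer's entry is still on the list */
		for (n = bad_txq_head; n; n = n->next)
			count++;
		printf("queued %d entries, none lost\n", count);
		return 0;
	}

With a single bare pointer instead of a locked list, concurrent producers could
each overwrite the previous value and leak entries; the patch below applies the
same list-plus-lock pattern to skb_bad_txq.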
index 9b9e4feda1273e68f01fb010be4ef790461bbb01..da2528036e2e52c6fd6075602c5051a094e1e46b 100644 (file)
@@ -95,7 +95,7 @@ struct Qdisc {
        struct gnet_stats_queue qstats;
        unsigned long           state;
        struct Qdisc            *next_sched;
-       struct sk_buff          *skb_bad_txq;
+       struct sk_buff_head     skb_bad_txq;
        int                     padded;
        refcount_t              refcnt;
 
index 482ba2234470732167f57356a9d192ff52ea9e5b..84cef0570862bcf45d1fffcc9de53f283c2e4549 100644 (file)
@@ -45,6 +45,68 @@ EXPORT_SYMBOL(default_qdisc_ops);
  * - ingress filtering is also serialized via qdisc root lock
  * - updates to tree and tree walking are only done under the rtnl mutex.
  */
+
+static inline struct sk_buff *__skb_dequeue_bad_txq(struct Qdisc *q)
+{
+       const struct netdev_queue *txq = q->dev_queue;
+       spinlock_t *lock = NULL;
+       struct sk_buff *skb;
+
+       if (q->flags & TCQ_F_NOLOCK) {
+               lock = qdisc_lock(q);
+               spin_lock(lock);
+       }
+
+       skb = skb_peek(&q->skb_bad_txq);
+       if (skb) {
+               /* check the reason of requeuing without tx lock first */
+               txq = skb_get_tx_queue(txq->dev, skb);
+               if (!netif_xmit_frozen_or_stopped(txq)) {
+                       skb = __skb_dequeue(&q->skb_bad_txq);
+                       if (qdisc_is_percpu_stats(q)) {
+                               qdisc_qstats_cpu_backlog_dec(q, skb);
+                               qdisc_qstats_cpu_qlen_dec(q);
+                       } else {
+                               qdisc_qstats_backlog_dec(q, skb);
+                               q->q.qlen--;
+                       }
+               } else {
+                       skb = NULL;
+               }
+       }
+
+       if (lock)
+               spin_unlock(lock);
+
+       return skb;
+}
+
+static inline struct sk_buff *qdisc_dequeue_skb_bad_txq(struct Qdisc *q)
+{
+       struct sk_buff *skb = skb_peek(&q->skb_bad_txq);
+
+       if (unlikely(skb))
+               skb = __skb_dequeue_bad_txq(q);
+
+       return skb;
+}
+
+static inline void qdisc_enqueue_skb_bad_txq(struct Qdisc *q,
+                                            struct sk_buff *skb)
+{
+       spinlock_t *lock = NULL;
+
+       if (q->flags & TCQ_F_NOLOCK) {
+               lock = qdisc_lock(q);
+               spin_lock(lock);
+       }
+
+       __skb_queue_tail(&q->skb_bad_txq, skb);
+
+       if (lock)
+               spin_unlock(lock);
+}
+
 static inline int __dev_requeue_skb(struct sk_buff *skb, struct Qdisc *q)
 {
        __skb_queue_head(&q->gso_skb, skb);
@@ -117,9 +179,15 @@ static void try_bulk_dequeue_skb_slow(struct Qdisc *q,
                if (!nskb)
                        break;
                if (unlikely(skb_get_queue_mapping(nskb) != mapping)) {
-                       q->skb_bad_txq = nskb;
-                       qdisc_qstats_backlog_inc(q, nskb);
-                       q->q.qlen++;
+                       qdisc_enqueue_skb_bad_txq(q, nskb);
+
+                       if (qdisc_is_percpu_stats(q)) {
+                               qdisc_qstats_cpu_backlog_inc(q, nskb);
+                               qdisc_qstats_cpu_qlen_inc(q);
+                       } else {
+                               qdisc_qstats_backlog_inc(q, nskb);
+                               q->q.qlen++;
+                       }
                        break;
                }
                skb->next = nskb;
@@ -180,19 +248,9 @@ static struct sk_buff *dequeue_skb(struct Qdisc *q, bool *validate,
        }
 validate:
        *validate = true;
-       skb = q->skb_bad_txq;
-       if (unlikely(skb)) {
-               /* check the reason of requeuing without tx lock first */
-               txq = skb_get_tx_queue(txq->dev, skb);
-               if (!netif_xmit_frozen_or_stopped(txq)) {
-                       q->skb_bad_txq = NULL;
-                       qdisc_qstats_backlog_dec(q, skb);
-                       q->q.qlen--;
-                       goto bulk;
-               }
-               skb = NULL;
-               goto trace;
-       }
+       skb = qdisc_dequeue_skb_bad_txq(q);
+       if (unlikely(skb))
+               goto bulk;
        if (!(q->flags & TCQ_F_ONETXQUEUE) ||
            !netif_xmit_frozen_or_stopped(txq))
                skb = q->dequeue(q);
@@ -680,6 +738,7 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
                sch->padded = (char *) sch - (char *) p;
        }
        __skb_queue_head_init(&sch->gso_skb);
+       __skb_queue_head_init(&sch->skb_bad_txq);
        qdisc_skb_head_init(&sch->q);
        spin_lock_init(&sch->q.lock);
 
@@ -753,14 +812,16 @@ void qdisc_reset(struct Qdisc *qdisc)
        if (ops->reset)
                ops->reset(qdisc);
 
-       kfree_skb(qdisc->skb_bad_txq);
-       qdisc->skb_bad_txq = NULL;
-
        skb_queue_walk_safe(&qdisc->gso_skb, skb, tmp) {
                __skb_unlink(skb, &qdisc->gso_skb);
                kfree_skb_list(skb);
        }
 
+       skb_queue_walk_safe(&qdisc->skb_bad_txq, skb, tmp) {
+               __skb_unlink(skb, &qdisc->skb_bad_txq);
+               kfree_skb_list(skb);
+       }
+
        qdisc->q.qlen = 0;
        qdisc->qstats.backlog = 0;
 }
@@ -804,7 +865,11 @@ void qdisc_destroy(struct Qdisc *qdisc)
                kfree_skb_list(skb);
        }
 
-       kfree_skb(qdisc->skb_bad_txq);
+       skb_queue_walk_safe(&qdisc->skb_bad_txq, skb, tmp) {
+               __skb_unlink(skb, &qdisc->skb_bad_txq);
+               kfree_skb_list(skb);
+       }
+
        qdisc_free(qdisc);
 }
 EXPORT_SYMBOL(qdisc_destroy);
@@ -1042,6 +1107,7 @@ static void dev_init_scheduler_queue(struct net_device *dev,
        rcu_assign_pointer(dev_queue->qdisc, qdisc);
        dev_queue->qdisc_sleeping = qdisc;
        __skb_queue_head_init(&qdisc->gso_skb);
+       __skb_queue_head_init(&qdisc->skb_bad_txq);
 }
 
 void dev_init_scheduler(struct net_device *dev)