sctp: introduce priority based stream scheduler
authorMarcelo Ricardo Leitner <marcelo.leitner@gmail.com>
Tue, 3 Oct 2017 22:20:16 +0000 (19:20 -0300)
committerDavid S. Miller <davem@davemloft.net>
Tue, 3 Oct 2017 23:27:29 +0000 (16:27 -0700)
This patch introduces RFC Draft ndata section 3.4 Priority Based
Scheduler (SCTP_SS_PRIO).

It works by having a struct sctp_stream_priority for each priority
configured. This struct is then enlisted on a queue ordered per priority
if, and only if, there is a stream with data queued, so that dequeueing
is very straightforward: either finish current datamsg or simply dequeue
from the highest priority queued, which is the next stream pointed, and
that's it.

If there are multiple streams assigned with the same priority and with
data queued, it will do round robin amongst them while respecting
datamsgs boundaries (when not using idata chunks), to be reasonably
fair.

We intentionally don't maintain a list of priorities nor a list of all
streams with the same priority to save memory. The first would mean at
least 2 other pointers per priority (which, for 1000 priorities, that
can mean 16kB) and the second would also mean 2 other pointers but per
stream. As SCTP supports up to 65535 streams on a given asoc, that's
1MB. This impacts when giving a priority to some stream, as we have to
find out if the new priority is already being used and if we can free
the old one, and also when tearing down.

The new fields in struct sctp_stream_out_ext and sctp_stream are added
under a union because that memory is to be shared with other schedulers.

See-also: https://tools.ietf.org/html/draft-ietf-tsvwg-sctp-ndata-13
Tested-by: Xin Long <lucien.xin@gmail.com>
Signed-off-by: Marcelo Ricardo Leitner <marcelo.leitner@gmail.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
include/net/sctp/structs.h
include/uapi/linux/sctp.h
net/sctp/Makefile
net/sctp/stream_sched.c
net/sctp/stream_sched_prio.c [new file with mode: 0644]

index 3c22a30fd71b4ef87419a77cf69b00807a5986bb..40eb8d66a37c3ecee39141dc111663f7aac7326a 100644 (file)
@@ -1328,10 +1328,27 @@ struct sctp_inithdr_host {
        __u32 initial_tsn;
 };
 
+struct sctp_stream_priorities {
+       /* List of priorities scheduled */
+       struct list_head prio_sched;
+       /* List of streams scheduled */
+       struct list_head active;
+       /* The next stream stream in line */
+       struct sctp_stream_out_ext *next;
+       __u16 prio;
+};
+
 struct sctp_stream_out_ext {
        __u64 abandoned_unsent[SCTP_PR_INDEX(MAX) + 1];
        __u64 abandoned_sent[SCTP_PR_INDEX(MAX) + 1];
        struct list_head outq; /* chunks enqueued by this stream */
+       union {
+               struct {
+                       /* Scheduled streams list */
+                       struct list_head prio_list;
+                       struct sctp_stream_priorities *prio_head;
+               };
+       };
 };
 
 struct sctp_stream_out {
@@ -1351,6 +1368,13 @@ struct sctp_stream {
        __u16 incnt;
        /* Current stream being sent, if any */
        struct sctp_stream_out *out_curr;
+       union {
+               /* Fields used by priority scheduler */
+               struct {
+                       /* List of priorities scheduled */
+                       struct list_head prio_list;
+               };
+       };
 };
 
 #define SCTP_STREAM_CLOSED             0x00
index 00ac417d2c4f8468ea2aad32e59806be5c5aa08d..850fa8b29d7e8163dc4ee88af192309bb2535ae9 100644 (file)
@@ -1099,7 +1099,8 @@ struct sctp_add_streams {
 /* SCTP Stream schedulers */
 enum sctp_sched_type {
        SCTP_SS_FCFS,
-       SCTP_SS_MAX = SCTP_SS_FCFS
+       SCTP_SS_PRIO,
+       SCTP_SS_MAX = SCTP_SS_PRIO
 };
 
 #endif /* _UAPI_SCTP_H */
index 0f6e6d1d69fd336b4a99f896851b0120f9a0d1e0..647c9cfd4e95be4429d25792e5832d7be2efc5c8 100644 (file)
@@ -12,7 +12,7 @@ sctp-y := sm_statetable.o sm_statefuns.o sm_sideeffect.o \
          inqueue.o outqueue.o ulpqueue.o \
          tsnmap.o bind_addr.o socket.o primitive.o \
          output.o input.o debug.o stream.o auth.o \
-         offload.o stream_sched.o
+         offload.o stream_sched.o stream_sched_prio.o
 
 sctp_probe-y := probe.o
 
index 40a9a9de2b98a56786a4c8585f5ad514be9189af..115ddb7651695cca7417cb63004a1a59c93523b8 100644 (file)
@@ -121,8 +121,11 @@ static struct sctp_sched_ops sctp_sched_fcfs = {
 
 /* API to other parts of the stack */
 
+extern struct sctp_sched_ops sctp_sched_prio;
+
 struct sctp_sched_ops *sctp_sched_ops[] = {
        &sctp_sched_fcfs,
+       &sctp_sched_prio,
 };
 
 int sctp_sched_set_sched(struct sctp_association *asoc,
diff --git a/net/sctp/stream_sched_prio.c b/net/sctp/stream_sched_prio.c
new file mode 100644 (file)
index 0000000..384dbf3
--- /dev/null
@@ -0,0 +1,347 @@
+/* SCTP kernel implementation
+ * (C) Copyright Red Hat Inc. 2017
+ *
+ * This file is part of the SCTP kernel implementation
+ *
+ * These functions manipulate sctp stream queue/scheduling.
+ *
+ * This SCTP implementation is free software;
+ * you can redistribute it and/or modify it under the terms of
+ * the GNU General Public License as published by
+ * the Free Software Foundation; either version 2, or (at your option)
+ * any later version.
+ *
+ * This SCTP implementation is distributed in the hope that it
+ * will be useful, but WITHOUT ANY WARRANTY; without even the implied
+ *                 ************************
+ * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ * See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU CC; see the file COPYING.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Please send any bug reports or fixes you make to the
+ * email addresched(es):
+ *    lksctp developers <linux-sctp@vger.kernel.org>
+ *
+ * Written or modified by:
+ *    Marcelo Ricardo Leitner <marcelo.leitner@gmail.com>
+ */
+
+#include <linux/list.h>
+#include <net/sctp/sctp.h>
+#include <net/sctp/sm.h>
+#include <net/sctp/stream_sched.h>
+
+/* Priority handling
+ * RFC DRAFT ndata section 3.4
+ */
+
+static void sctp_sched_prio_unsched_all(struct sctp_stream *stream);
+
+static struct sctp_stream_priorities *sctp_sched_prio_new_head(
+                       struct sctp_stream *stream, int prio, gfp_t gfp)
+{
+       struct sctp_stream_priorities *p;
+
+       p = kmalloc(sizeof(*p), gfp);
+       if (!p)
+               return NULL;
+
+       INIT_LIST_HEAD(&p->prio_sched);
+       INIT_LIST_HEAD(&p->active);
+       p->next = NULL;
+       p->prio = prio;
+
+       return p;
+}
+
+static struct sctp_stream_priorities *sctp_sched_prio_get_head(
+                       struct sctp_stream *stream, int prio, gfp_t gfp)
+{
+       struct sctp_stream_priorities *p;
+       int i;
+
+       /* Look into scheduled priorities first, as they are sorted and
+        * we can find it fast IF it's scheduled.
+        */
+       list_for_each_entry(p, &stream->prio_list, prio_sched) {
+               if (p->prio == prio)
+                       return p;
+               if (p->prio > prio)
+                       break;
+       }
+
+       /* No luck. So we search on all streams now. */
+       for (i = 0; i < stream->outcnt; i++) {
+               if (!stream->out[i].ext)
+                       continue;
+
+               p = stream->out[i].ext->prio_head;
+               if (!p)
+                       /* Means all other streams won't be initialized
+                        * as well.
+                        */
+                       break;
+               if (p->prio == prio)
+                       return p;
+       }
+
+       /* If not even there, allocate a new one. */
+       return sctp_sched_prio_new_head(stream, prio, gfp);
+}
+
+static void sctp_sched_prio_next_stream(struct sctp_stream_priorities *p)
+{
+       struct list_head *pos;
+
+       pos = p->next->prio_list.next;
+       if (pos == &p->active)
+               pos = pos->next;
+       p->next = list_entry(pos, struct sctp_stream_out_ext, prio_list);
+}
+
+static bool sctp_sched_prio_unsched(struct sctp_stream_out_ext *soute)
+{
+       bool scheduled = false;
+
+       if (!list_empty(&soute->prio_list)) {
+               struct sctp_stream_priorities *prio_head = soute->prio_head;
+
+               /* Scheduled */
+               scheduled = true;
+
+               if (prio_head->next == soute)
+                       /* Try to move to the next stream */
+                       sctp_sched_prio_next_stream(prio_head);
+
+               list_del_init(&soute->prio_list);
+
+               /* Also unsched the priority if this was the last stream */
+               if (list_empty(&prio_head->active)) {
+                       list_del_init(&prio_head->prio_sched);
+                       /* If there is no stream left, clear next */
+                       prio_head->next = NULL;
+               }
+       }
+
+       return scheduled;
+}
+
+static void sctp_sched_prio_sched(struct sctp_stream *stream,
+                                 struct sctp_stream_out_ext *soute)
+{
+       struct sctp_stream_priorities *prio, *prio_head;
+
+       prio_head = soute->prio_head;
+
+       /* Nothing to do if already scheduled */
+       if (!list_empty(&soute->prio_list))
+               return;
+
+       /* Schedule the stream. If there is a next, we schedule the new
+        * one before it, so it's the last in round robin order.
+        * If there isn't, we also have to schedule the priority.
+        */
+       if (prio_head->next) {
+               list_add(&soute->prio_list, prio_head->next->prio_list.prev);
+               return;
+       }
+
+       list_add(&soute->prio_list, &prio_head->active);
+       prio_head->next = soute;
+
+       list_for_each_entry(prio, &stream->prio_list, prio_sched) {
+               if (prio->prio > prio_head->prio) {
+                       list_add(&prio_head->prio_sched, prio->prio_sched.prev);
+                       return;
+               }
+       }
+
+       list_add_tail(&prio_head->prio_sched, &stream->prio_list);
+}
+
+static int sctp_sched_prio_set(struct sctp_stream *stream, __u16 sid,
+                              __u16 prio, gfp_t gfp)
+{
+       struct sctp_stream_out *sout = &stream->out[sid];
+       struct sctp_stream_out_ext *soute = sout->ext;
+       struct sctp_stream_priorities *prio_head, *old;
+       bool reschedule = false;
+       int i;
+
+       prio_head = sctp_sched_prio_get_head(stream, prio, gfp);
+       if (!prio_head)
+               return -ENOMEM;
+
+       reschedule = sctp_sched_prio_unsched(soute);
+       old = soute->prio_head;
+       soute->prio_head = prio_head;
+       if (reschedule)
+               sctp_sched_prio_sched(stream, soute);
+
+       if (!old)
+               /* Happens when we set the priority for the first time */
+               return 0;
+
+       for (i = 0; i < stream->outcnt; i++) {
+               soute = stream->out[i].ext;
+               if (soute && soute->prio_head == old)
+                       /* It's still in use, nothing else to do here. */
+                       return 0;
+       }
+
+       /* No hits, we are good to free it. */
+       kfree(old);
+
+       return 0;
+}
+
+static int sctp_sched_prio_get(struct sctp_stream *stream, __u16 sid,
+                              __u16 *value)
+{
+       *value = stream->out[sid].ext->prio_head->prio;
+       return 0;
+}
+
+static int sctp_sched_prio_init(struct sctp_stream *stream)
+{
+       INIT_LIST_HEAD(&stream->prio_list);
+
+       return 0;
+}
+
+static int sctp_sched_prio_init_sid(struct sctp_stream *stream, __u16 sid,
+                                   gfp_t gfp)
+{
+       INIT_LIST_HEAD(&stream->out[sid].ext->prio_list);
+       return sctp_sched_prio_set(stream, sid, 0, gfp);
+}
+
+static void sctp_sched_prio_free(struct sctp_stream *stream)
+{
+       struct sctp_stream_priorities *prio, *n;
+       LIST_HEAD(list);
+       int i;
+
+       /* As we don't keep a list of priorities, to avoid multiple
+        * frees we have to do it in 3 steps:
+        *   1. unsched everyone, so the lists are free to use in 2.
+        *   2. build the list of the priorities
+        *   3. free the list
+        */
+       sctp_sched_prio_unsched_all(stream);
+       for (i = 0; i < stream->outcnt; i++) {
+               if (!stream->out[i].ext)
+                       continue;
+               prio = stream->out[i].ext->prio_head;
+               if (prio && list_empty(&prio->prio_sched))
+                       list_add(&prio->prio_sched, &list);
+       }
+       list_for_each_entry_safe(prio, n, &list, prio_sched) {
+               list_del_init(&prio->prio_sched);
+               kfree(prio);
+       }
+}
+
+static void sctp_sched_prio_enqueue(struct sctp_outq *q,
+                                   struct sctp_datamsg *msg)
+{
+       struct sctp_stream *stream;
+       struct sctp_chunk *ch;
+       __u16 sid;
+
+       ch = list_first_entry(&msg->chunks, struct sctp_chunk, frag_list);
+       sid = sctp_chunk_stream_no(ch);
+       stream = &q->asoc->stream;
+       sctp_sched_prio_sched(stream, stream->out[sid].ext);
+}
+
+static struct sctp_chunk *sctp_sched_prio_dequeue(struct sctp_outq *q)
+{
+       struct sctp_stream *stream = &q->asoc->stream;
+       struct sctp_stream_priorities *prio;
+       struct sctp_stream_out_ext *soute;
+       struct sctp_chunk *ch = NULL;
+
+       /* Bail out quickly if queue is empty */
+       if (list_empty(&q->out_chunk_list))
+               goto out;
+
+       /* Find which chunk is next. It's easy, it's either the current
+        * one or the first chunk on the next active stream.
+        */
+       if (stream->out_curr) {
+               soute = stream->out_curr->ext;
+       } else {
+               prio = list_entry(stream->prio_list.next,
+                                 struct sctp_stream_priorities, prio_sched);
+               soute = prio->next;
+       }
+       ch = list_entry(soute->outq.next, struct sctp_chunk, stream_list);
+       sctp_sched_dequeue_common(q, ch);
+
+out:
+       return ch;
+}
+
+static void sctp_sched_prio_dequeue_done(struct sctp_outq *q,
+                                        struct sctp_chunk *ch)
+{
+       struct sctp_stream_priorities *prio;
+       struct sctp_stream_out_ext *soute;
+       __u16 sid;
+
+       /* Last chunk on that msg, move to the next stream on
+        * this priority.
+        */
+       sid = sctp_chunk_stream_no(ch);
+       soute = q->asoc->stream.out[sid].ext;
+       prio = soute->prio_head;
+
+       sctp_sched_prio_next_stream(prio);
+
+       if (list_empty(&soute->outq))
+               sctp_sched_prio_unsched(soute);
+}
+
+static void sctp_sched_prio_sched_all(struct sctp_stream *stream)
+{
+       struct sctp_association *asoc;
+       struct sctp_stream_out *sout;
+       struct sctp_chunk *ch;
+
+       asoc = container_of(stream, struct sctp_association, stream);
+       list_for_each_entry(ch, &asoc->outqueue.out_chunk_list, list) {
+               __u16 sid;
+
+               sid = sctp_chunk_stream_no(ch);
+               sout = &stream->out[sid];
+               if (sout->ext)
+                       sctp_sched_prio_sched(stream, sout->ext);
+       }
+}
+
+static void sctp_sched_prio_unsched_all(struct sctp_stream *stream)
+{
+       struct sctp_stream_priorities *p, *tmp;
+       struct sctp_stream_out_ext *soute, *souttmp;
+
+       list_for_each_entry_safe(p, tmp, &stream->prio_list, prio_sched)
+               list_for_each_entry_safe(soute, souttmp, &p->active, prio_list)
+                       sctp_sched_prio_unsched(soute);
+}
+
+struct sctp_sched_ops sctp_sched_prio = {
+       .set = sctp_sched_prio_set,
+       .get = sctp_sched_prio_get,
+       .init = sctp_sched_prio_init,
+       .init_sid = sctp_sched_prio_init_sid,
+       .free = sctp_sched_prio_free,
+       .enqueue = sctp_sched_prio_enqueue,
+       .dequeue = sctp_sched_prio_dequeue,
+       .dequeue_done = sctp_sched_prio_dequeue_done,
+       .sched_all = sctp_sched_prio_sched_all,
+       .unsched_all = sctp_sched_prio_unsched_all,
+};