aio: make the lookup_ioctx() lockless
authorJens Axboe <jens.axboe@oracle.com>
Tue, 9 Dec 2008 07:11:22 +0000 (08:11 +0100)
committerJens Axboe <jens.axboe@oracle.com>
Mon, 29 Dec 2008 07:29:50 +0000 (08:29 +0100)
The mm->ioctx_list is currently protected by a reader-writer lock,
so we always grab that lock on the read side for doing ioctx
lookups. As the workload is extremely reader biased, turn this into
an rcu hlist so we can make lookup_ioctx() lockless. Get rid of
the rwlock and use a spinlock for providing update side exclusion.

There's usually only 1 entry on this list, so it doesn't make sense
to look into fancier data structures.

Reviewed-by: Jeff Moyer <jmoyer@redhat.com>
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
arch/s390/mm/pgtable.c
fs/aio.c
include/linux/aio.h
include/linux/mm_types.h
kernel/fork.c

index ef3635b52fc0ce1420945fdbd42cf32845ecf671..0767827540b1778b0d88fa555d957ca457dc1d41 100644 (file)
@@ -263,7 +263,7 @@ int s390_enable_sie(void)
        /* lets check if we are allowed to replace the mm */
        task_lock(tsk);
        if (!tsk->mm || atomic_read(&tsk->mm->mm_users) > 1 ||
-           tsk->mm != tsk->active_mm || tsk->mm->ioctx_list) {
+           tsk->mm != tsk->active_mm || !hlist_empty(&tsk->mm->ioctx_list)) {
                task_unlock(tsk);
                return -EINVAL;
        }
@@ -279,7 +279,7 @@ int s390_enable_sie(void)
        /* Now lets check again if something happened */
        task_lock(tsk);
        if (!tsk->mm || atomic_read(&tsk->mm->mm_users) > 1 ||
-           tsk->mm != tsk->active_mm || tsk->mm->ioctx_list) {
+           tsk->mm != tsk->active_mm || !hlist_empty(&tsk->mm->ioctx_list)) {
                mmput(mm);
                task_unlock(tsk);
                return -EINVAL;
index f658441d5666c9d108f8ec337b2e0909686723bb..d6f89d3c15e80f0ebd15699e62959712fad4b7bc 100644 (file)
--- a/fs/aio.c
+++ b/fs/aio.c
@@ -191,6 +191,20 @@ static int aio_setup_ring(struct kioctx *ctx)
        kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK), km); \
 } while(0)
 
+static void ctx_rcu_free(struct rcu_head *head)
+{
+       struct kioctx *ctx = container_of(head, struct kioctx, rcu_head);
+       unsigned nr_events = ctx->max_reqs;
+
+       kmem_cache_free(kioctx_cachep, ctx);
+
+       if (nr_events) {
+               spin_lock(&aio_nr_lock);
+               BUG_ON(aio_nr - nr_events > aio_nr);
+               aio_nr -= nr_events;
+               spin_unlock(&aio_nr_lock);
+       }
+}
 
 /* __put_ioctx
  *     Called when the last user of an aio context has gone away,
@@ -198,8 +212,6 @@ static int aio_setup_ring(struct kioctx *ctx)
  */
 static void __put_ioctx(struct kioctx *ctx)
 {
-       unsigned nr_events = ctx->max_reqs;
-
        BUG_ON(ctx->reqs_active);
 
        cancel_delayed_work(&ctx->wq);
@@ -208,14 +220,7 @@ static void __put_ioctx(struct kioctx *ctx)
        mmdrop(ctx->mm);
        ctx->mm = NULL;
        pr_debug("__put_ioctx: freeing %p\n", ctx);
-       kmem_cache_free(kioctx_cachep, ctx);
-
-       if (nr_events) {
-               spin_lock(&aio_nr_lock);
-               BUG_ON(aio_nr - nr_events > aio_nr);
-               aio_nr -= nr_events;
-               spin_unlock(&aio_nr_lock);
-       }
+       call_rcu(&ctx->rcu_head, ctx_rcu_free);
 }
 
 #define get_ioctx(kioctx) do {                                         \
@@ -235,6 +240,7 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
 {
        struct mm_struct *mm;
        struct kioctx *ctx;
+       int did_sync = 0;
 
        /* Prevent overflows */
        if ((nr_events > (0x10000000U / sizeof(struct io_event))) ||
@@ -267,21 +273,30 @@ static struct kioctx *ioctx_alloc(unsigned nr_events)
                goto out_freectx;
 
        /* limit the number of system wide aios */
-       spin_lock(&aio_nr_lock);
-       if (aio_nr + ctx->max_reqs > aio_max_nr ||
-           aio_nr + ctx->max_reqs < aio_nr)
-               ctx->max_reqs = 0;
-       else
-               aio_nr += ctx->max_reqs;
-       spin_unlock(&aio_nr_lock);
+       do {
+               spin_lock_bh(&aio_nr_lock);
+               if (aio_nr + nr_events > aio_max_nr ||
+                   aio_nr + nr_events < aio_nr)
+                       ctx->max_reqs = 0;
+               else
+                       aio_nr += ctx->max_reqs;
+               spin_unlock_bh(&aio_nr_lock);
+               if (ctx->max_reqs || did_sync)
+                       break;
+
+               /* wait for rcu callbacks to have completed before giving up */
+               synchronize_rcu();
+               did_sync = 1;
+               ctx->max_reqs = nr_events;
+       } while (1);
+
        if (ctx->max_reqs == 0)
                goto out_cleanup;
 
        /* now link into global list. */
-       write_lock(&mm->ioctx_list_lock);
-       ctx->next = mm->ioctx_list;
-       mm->ioctx_list = ctx;
-       write_unlock(&mm->ioctx_list_lock);
+       spin_lock(&mm->ioctx_lock);
+       hlist_add_head_rcu(&ctx->list, &mm->ioctx_list);
+       spin_unlock(&mm->ioctx_lock);
 
        dprintk("aio: allocated ioctx %p[%ld]: mm=%p mask=0x%x\n",
                ctx, ctx->user_id, current->mm, ctx->ring_info.nr);
@@ -375,11 +390,12 @@ ssize_t wait_on_sync_kiocb(struct kiocb *iocb)
  */
 void exit_aio(struct mm_struct *mm)
 {
-       struct kioctx *ctx = mm->ioctx_list;
-       mm->ioctx_list = NULL;
-       while (ctx) {
-               struct kioctx *next = ctx->next;
-               ctx->next = NULL;
+       struct kioctx *ctx;
+
+       while (!hlist_empty(&mm->ioctx_list)) {
+               ctx = hlist_entry(mm->ioctx_list.first, struct kioctx, list);
+               hlist_del_rcu(&ctx->list);
+
                aio_cancel_all(ctx);
 
                wait_for_all_aios(ctx);
@@ -394,7 +410,6 @@ void exit_aio(struct mm_struct *mm)
                                atomic_read(&ctx->users), ctx->dead,
                                ctx->reqs_active);
                put_ioctx(ctx);
-               ctx = next;
        }
 }
 
@@ -555,19 +570,21 @@ int aio_put_req(struct kiocb *req)
 
 static struct kioctx *lookup_ioctx(unsigned long ctx_id)
 {
-       struct kioctx *ioctx;
-       struct mm_struct *mm;
+       struct mm_struct *mm = current->mm;
+       struct kioctx *ctx = NULL;
+       struct hlist_node *n;
 
-       mm = current->mm;
-       read_lock(&mm->ioctx_list_lock);
-       for (ioctx = mm->ioctx_list; ioctx; ioctx = ioctx->next)
-               if (likely(ioctx->user_id == ctx_id && !ioctx->dead)) {
-                       get_ioctx(ioctx);
+       rcu_read_lock();
+
+       hlist_for_each_entry_rcu(ctx, n, &mm->ioctx_list, list) {
+               if (ctx->user_id == ctx_id && !ctx->dead) {
+                       get_ioctx(ctx);
                        break;
                }
-       read_unlock(&mm->ioctx_list_lock);
+       }
 
-       return ioctx;
+       rcu_read_unlock();
+       return ctx;
 }
 
 /*
@@ -1215,19 +1232,14 @@ out:
 static void io_destroy(struct kioctx *ioctx)
 {
        struct mm_struct *mm = current->mm;
-       struct kioctx **tmp;
        int was_dead;
 
        /* delete the entry from the list is someone else hasn't already */
-       write_lock(&mm->ioctx_list_lock);
+       spin_lock(&mm->ioctx_lock);
        was_dead = ioctx->dead;
        ioctx->dead = 1;
-       for (tmp = &mm->ioctx_list; *tmp && *tmp != ioctx;
-            tmp = &(*tmp)->next)
-               ;
-       if (*tmp)
-               *tmp = ioctx->next;
-       write_unlock(&mm->ioctx_list_lock);
+       hlist_del_rcu(&ioctx->list);
+       spin_unlock(&mm->ioctx_lock);
 
        dprintk("aio_release(%p)\n", ioctx);
        if (likely(!was_dead))
index f6b8cf99b596a30820215766fb7e680dedf7e6e1..b16a957030f87e16341efe5b41b476a04e742fc2 100644 (file)
@@ -5,6 +5,7 @@
 #include <linux/workqueue.h>
 #include <linux/aio_abi.h>
 #include <linux/uio.h>
+#include <linux/rcupdate.h>
 
 #include <asm/atomic.h>
 
@@ -183,7 +184,7 @@ struct kioctx {
 
        /* This needs improving */
        unsigned long           user_id;
-       struct kioctx           *next;
+       struct hlist_node       list;
 
        wait_queue_head_t       wait;
 
@@ -199,6 +200,8 @@ struct kioctx {
        struct aio_ring_info    ring_info;
 
        struct delayed_work     wq;
+
+       struct rcu_head         rcu_head;
 };
 
 /* prototypes */
index fe825471d5aaf9d59440a7f322989004736ea71f..9cfc9b627fdd745d4702a903e29db36f2676aa34 100644 (file)
@@ -232,8 +232,9 @@ struct mm_struct {
        struct core_state *core_state; /* coredumping support */
 
        /* aio bits */
-       rwlock_t                ioctx_list_lock;        /* aio lock */
-       struct kioctx           *ioctx_list;
+       spinlock_t              ioctx_lock;
+       struct hlist_head       ioctx_list;
+
 #ifdef CONFIG_MM_OWNER
        /*
         * "owner" points to a task that is regarded as the canonical
index 6144b36cd897512273f18c94f0d89958b21666c4..43cbf30669e6de76d4bc315864489deab3336eff 100644 (file)
@@ -415,8 +415,8 @@ static struct mm_struct * mm_init(struct mm_struct * mm, struct task_struct *p)
        set_mm_counter(mm, file_rss, 0);
        set_mm_counter(mm, anon_rss, 0);
        spin_lock_init(&mm->page_table_lock);
-       rwlock_init(&mm->ioctx_list_lock);
-       mm->ioctx_list = NULL;
+       spin_lock_init(&mm->ioctx_lock);
+       INIT_HLIST_HEAD(&mm->ioctx_list);
        mm->free_area_cache = TASK_UNMAPPED_BASE;
        mm->cached_hole_size = ~0UL;
        mm_init_owner(mm, p);