SUNRPC: rpc_timeout_upcall_queue should not sleep
authorTrond Myklebust <Trond.Myklebust@netapp.com>
Wed, 1 Feb 2006 17:18:44 +0000 (12:18 -0500)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Wed, 1 Feb 2006 17:52:24 +0000 (12:52 -0500)
 The function rpc_timeout_upcall_queue runs from a workqueue, and hence
 sleeping is not recommended. Convert the protection of the upcall queue
 from being mutex-based to being spinlock-based.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
net/sunrpc/rpc_pipe.c

index 9764c80ab0b26927eaa23dd90be7e35bc158ef93..7281746e6532928a04378a8308f813280eb3ac69 100644 (file)
@@ -38,44 +38,42 @@ static kmem_cache_t *rpc_inode_cachep __read_mostly;
 
 #define RPC_UPCALL_TIMEOUT (30*HZ)
 
-static void
-__rpc_purge_list(struct rpc_inode *rpci, struct list_head *head, int err)
+static void rpc_purge_list(struct rpc_inode *rpci, struct list_head *head,
+               void (*destroy_msg)(struct rpc_pipe_msg *), int err)
 {
        struct rpc_pipe_msg *msg;
-       void (*destroy_msg)(struct rpc_pipe_msg *);
 
-       destroy_msg = rpci->ops->destroy_msg;
-       while (!list_empty(head)) {
+       if (list_empty(head))
+               return;
+       do {
                msg = list_entry(head->next, struct rpc_pipe_msg, list);
-               list_del_init(&msg->list);
+               list_del(&msg->list);
                msg->errno = err;
                destroy_msg(msg);
-       }
-}
-
-static void
-__rpc_purge_upcall(struct inode *inode, int err)
-{
-       struct rpc_inode *rpci = RPC_I(inode);
-
-       __rpc_purge_list(rpci, &rpci->pipe, err);
-       rpci->pipelen = 0;
+       } while (!list_empty(head));
        wake_up(&rpci->waitq);
 }
 
 static void
 rpc_timeout_upcall_queue(void *data)
 {
+       LIST_HEAD(free_list);
        struct rpc_inode *rpci = (struct rpc_inode *)data;
        struct inode *inode = &rpci->vfs_inode;
+       void (*destroy_msg)(struct rpc_pipe_msg *);
 
-       mutex_lock(&inode->i_mutex);
-       if (rpci->ops == NULL)
-               goto out;
-       if (rpci->nreaders == 0 && !list_empty(&rpci->pipe))
-               __rpc_purge_upcall(inode, -ETIMEDOUT);
-out:
-       mutex_unlock(&inode->i_mutex);
+       spin_lock(&inode->i_lock);
+       if (rpci->ops == NULL) {
+               spin_unlock(&inode->i_lock);
+               return;
+       }
+       destroy_msg = rpci->ops->destroy_msg;
+       if (rpci->nreaders == 0) {
+               list_splice_init(&rpci->pipe, &free_list);
+               rpci->pipelen = 0;
+       }
+       spin_unlock(&inode->i_lock);
+       rpc_purge_list(rpci, &free_list, destroy_msg, -ETIMEDOUT);
 }
 
 int
@@ -84,7 +82,7 @@ rpc_queue_upcall(struct inode *inode, struct rpc_pipe_msg *msg)
        struct rpc_inode *rpci = RPC_I(inode);
        int res = -EPIPE;
 
-       mutex_lock(&inode->i_mutex);
+       spin_lock(&inode->i_lock);
        if (rpci->ops == NULL)
                goto out;
        if (rpci->nreaders) {
@@ -100,7 +98,7 @@ rpc_queue_upcall(struct inode *inode, struct rpc_pipe_msg *msg)
                res = 0;
        }
 out:
-       mutex_unlock(&inode->i_mutex);
+       spin_unlock(&inode->i_lock);
        wake_up(&rpci->waitq);
        return res;
 }
@@ -115,21 +113,29 @@ static void
 rpc_close_pipes(struct inode *inode)
 {
        struct rpc_inode *rpci = RPC_I(inode);
+       struct rpc_pipe_ops *ops;
 
        mutex_lock(&inode->i_mutex);
-       if (rpci->ops != NULL) {
+       ops = rpci->ops;
+       if (ops != NULL) {
+               LIST_HEAD(free_list);
+
+               spin_lock(&inode->i_lock);
                rpci->nreaders = 0;
-               __rpc_purge_list(rpci, &rpci->in_upcall, -EPIPE);
-               __rpc_purge_upcall(inode, -EPIPE);
-               rpci->nwriters = 0;
-               if (rpci->ops->release_pipe)
-                       rpci->ops->release_pipe(inode);
+               list_splice_init(&rpci->in_upcall, &free_list);
+               list_splice_init(&rpci->pipe, &free_list);
+               rpci->pipelen = 0;
                rpci->ops = NULL;
+               spin_unlock(&inode->i_lock);
+               rpc_purge_list(rpci, &free_list, ops->destroy_msg, -EPIPE);
+               rpci->nwriters = 0;
+               if (ops->release_pipe)
+                       ops->release_pipe(inode);
+               cancel_delayed_work(&rpci->queue_timeout);
+               flush_scheduled_work();
        }
        rpc_inode_setowner(inode, NULL);
        mutex_unlock(&inode->i_mutex);
-       cancel_delayed_work(&rpci->queue_timeout);
-       flush_scheduled_work();
 }
 
 static struct inode *
@@ -177,16 +183,26 @@ rpc_pipe_release(struct inode *inode, struct file *filp)
                goto out;
        msg = (struct rpc_pipe_msg *)filp->private_data;
        if (msg != NULL) {
+               spin_lock(&inode->i_lock);
                msg->errno = -EAGAIN;
-               list_del_init(&msg->list);
+               list_del(&msg->list);
+               spin_unlock(&inode->i_lock);
                rpci->ops->destroy_msg(msg);
        }
        if (filp->f_mode & FMODE_WRITE)
                rpci->nwriters --;
-       if (filp->f_mode & FMODE_READ)
+       if (filp->f_mode & FMODE_READ) {
                rpci->nreaders --;
-       if (!rpci->nreaders)
-               __rpc_purge_upcall(inode, -EAGAIN);
+               if (rpci->nreaders == 0) {
+                       LIST_HEAD(free_list);
+                       spin_lock(&inode->i_lock);
+                       list_splice_init(&rpci->pipe, &free_list);
+                       rpci->pipelen = 0;
+                       spin_unlock(&inode->i_lock);
+                       rpc_purge_list(rpci, &free_list,
+                                       rpci->ops->destroy_msg, -EAGAIN);
+               }
+       }
        if (rpci->ops->release_pipe)
                rpci->ops->release_pipe(inode);
 out:
@@ -209,6 +225,7 @@ rpc_pipe_read(struct file *filp, char __user *buf, size_t len, loff_t *offset)
        }
        msg = filp->private_data;
        if (msg == NULL) {
+               spin_lock(&inode->i_lock);
                if (!list_empty(&rpci->pipe)) {
                        msg = list_entry(rpci->pipe.next,
                                        struct rpc_pipe_msg,
@@ -218,6 +235,7 @@ rpc_pipe_read(struct file *filp, char __user *buf, size_t len, loff_t *offset)
                        filp->private_data = msg;
                        msg->copied = 0;
                }
+               spin_unlock(&inode->i_lock);
                if (msg == NULL)
                        goto out_unlock;
        }
@@ -225,7 +243,9 @@ rpc_pipe_read(struct file *filp, char __user *buf, size_t len, loff_t *offset)
        res = rpci->ops->upcall(filp, msg, buf, len);
        if (res < 0 || msg->len == msg->copied) {
                filp->private_data = NULL;
-               list_del_init(&msg->list);
+               spin_lock(&inode->i_lock);
+               list_del(&msg->list);
+               spin_unlock(&inode->i_lock);
                rpci->ops->destroy_msg(msg);
        }
 out_unlock: