SUNRPC: Replace the queue timer with a delayed work function
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Wed, 1 May 2019 14:49:27 +0000 (10:49 -0400)
committerTrond Myklebust <trond.myklebust@hammerspace.com>
Sat, 6 Jul 2019 18:54:48 +0000 (14:54 -0400)
The queue timer function, which walks the RPC queue in order to locate
candidates for waking up is one of the current constraints against
removing the bh-safe queue spin locks. Replace it with a delayed
work queue, so that we can do the actual rpc task wake ups from an
ordinary process context.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
include/linux/sunrpc/sched.h
net/sunrpc/sched.c

index d0e451868f02b1d0288e9ce106e27c3d87c614e1..7d8db5dcac04c7ff2f38d6cd58e66f9ec8ef27de 100644 (file)
@@ -183,8 +183,9 @@ struct rpc_task_setup {
 #define RPC_NR_PRIORITY                (1 + RPC_PRIORITY_PRIVILEGED - RPC_PRIORITY_LOW)
 
 struct rpc_timer {
-       struct timer_list timer;
        struct list_head list;
+       unsigned long expires;
+       struct delayed_work dwork;
 };
 
 /*
index a2c1148127172520ba85b7f7e48685076d6128a0..e0a0cf381ebac0aa0a161b33c469c1d6e5779487 100644 (file)
@@ -46,7 +46,7 @@ static mempool_t      *rpc_buffer_mempool __read_mostly;
 
 static void                    rpc_async_schedule(struct work_struct *);
 static void                     rpc_release_task(struct rpc_task *task);
-static void __rpc_queue_timer_fn(struct timer_list *t);
+static void __rpc_queue_timer_fn(struct work_struct *);
 
 /*
  * RPC tasks sit here while waiting for conditions to improve.
@@ -87,13 +87,19 @@ __rpc_disable_timer(struct rpc_wait_queue *queue, struct rpc_task *task)
        task->tk_timeout = 0;
        list_del(&task->u.tk_wait.timer_list);
        if (list_empty(&queue->timer_list.list))
-               del_timer(&queue->timer_list.timer);
+               cancel_delayed_work(&queue->timer_list.dwork);
 }
 
 static void
 rpc_set_queue_timer(struct rpc_wait_queue *queue, unsigned long expires)
 {
-       timer_reduce(&queue->timer_list.timer, expires);
+       unsigned long now = jiffies;
+       queue->timer_list.expires = expires;
+       if (time_before_eq(expires, now))
+               expires = 0;
+       else
+               expires -= now;
+       mod_delayed_work(rpciod_workqueue, &queue->timer_list.dwork, expires);
 }
 
 /*
@@ -107,7 +113,8 @@ __rpc_add_timer(struct rpc_wait_queue *queue, struct rpc_task *task,
                task->tk_pid, jiffies_to_msecs(timeout - jiffies));
 
        task->tk_timeout = timeout;
-       rpc_set_queue_timer(queue, timeout);
+       if (list_empty(&queue->timer_list.list) || time_before(timeout, queue->timer_list.expires))
+               rpc_set_queue_timer(queue, timeout);
        list_add(&task->u.tk_wait.timer_list, &queue->timer_list.list);
 }
 
@@ -250,7 +257,8 @@ static void __rpc_init_priority_wait_queue(struct rpc_wait_queue *queue, const c
        queue->maxpriority = nr_queues - 1;
        rpc_reset_waitqueue_priority(queue);
        queue->qlen = 0;
-       timer_setup(&queue->timer_list.timer, __rpc_queue_timer_fn, 0);
+       queue->timer_list.expires = 0;
+       INIT_DEFERRABLE_WORK(&queue->timer_list.dwork, __rpc_queue_timer_fn);
        INIT_LIST_HEAD(&queue->timer_list.list);
        rpc_assign_waitqueue_name(queue, qname);
 }
@@ -269,7 +277,7 @@ EXPORT_SYMBOL_GPL(rpc_init_wait_queue);
 
 void rpc_destroy_wait_queue(struct rpc_wait_queue *queue)
 {
-       del_timer_sync(&queue->timer_list.timer);
+       cancel_delayed_work_sync(&queue->timer_list.dwork);
 }
 EXPORT_SYMBOL_GPL(rpc_destroy_wait_queue);
 
@@ -759,13 +767,15 @@ void rpc_wake_up_status(struct rpc_wait_queue *queue, int status)
 }
 EXPORT_SYMBOL_GPL(rpc_wake_up_status);
 
-static void __rpc_queue_timer_fn(struct timer_list *t)
+static void __rpc_queue_timer_fn(struct work_struct *work)
 {
-       struct rpc_wait_queue *queue = from_timer(queue, t, timer_list.timer);
+       struct rpc_wait_queue *queue = container_of(work,
+                       struct rpc_wait_queue,
+                       timer_list.dwork.work);
        struct rpc_task *task, *n;
        unsigned long expires, now, timeo;
 
-       spin_lock(&queue->lock);
+       spin_lock_bh(&queue->lock);
        expires = now = jiffies;
        list_for_each_entry_safe(task, n, &queue->timer_list.list, u.tk_wait.timer_list) {
                timeo = task->tk_timeout;
@@ -780,7 +790,7 @@ static void __rpc_queue_timer_fn(struct timer_list *t)
        }
        if (!list_empty(&queue->timer_list.list))
                rpc_set_queue_timer(queue, expires);
-       spin_unlock(&queue->lock);
+       spin_unlock_bh(&queue->lock);
 }
 
 static void __rpc_atrun(struct rpc_task *task)