SUNRPC: Refactor the transport request pinning
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Mon, 6 Aug 2018 16:55:34 +0000 (12:55 -0400)
committerTrond Myklebust <trond.myklebust@hammerspace.com>
Sun, 30 Sep 2018 19:35:14 +0000 (15:35 -0400)
We are going to need to pin for both send and receive.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
include/linux/sunrpc/sched.h
include/linux/sunrpc/xprt.h
net/sunrpc/xprt.c

index 9e655df70131ab4a912c39887b36028199bc6807..8062ce6b18e5c80e688913b9076d950bcde382eb 100644 (file)
@@ -142,8 +142,7 @@ struct rpc_task_setup {
 #define RPC_TASK_ACTIVE                2
 #define RPC_TASK_NEED_XMIT     3
 #define RPC_TASK_NEED_RECV     4
-#define RPC_TASK_MSG_RECV      5
-#define RPC_TASK_MSG_RECV_WAIT 6
+#define RPC_TASK_MSG_PIN_WAIT  5
 
 #define RPC_IS_RUNNING(t)      test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
 #define rpc_set_running(t)     set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
index 3d80524e92d6c18a076a1ef8ec4aa83ca5804b95..bd743c51a8652787c68910ef22a11b71c574cc99 100644 (file)
@@ -103,6 +103,7 @@ struct rpc_rqst {
                                                /* A cookie used to track the
                                                   state of the transport
                                                   connection */
+       atomic_t                rq_pin;
        
        /*
         * Partial send handling
index 45d580cd93ac3716385d3a3c0e4079c658e12be1..649a40cfae6dd157290857467e5b63325c5c877d 100644 (file)
@@ -847,16 +847,22 @@ struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
 }
 EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
 
+static bool
+xprt_is_pinned_rqst(struct rpc_rqst *req)
+{
+       return atomic_read(&req->rq_pin) != 0;
+}
+
 /**
  * xprt_pin_rqst - Pin a request on the transport receive list
  * @req: Request to pin
  *
  * Caller must ensure this is atomic with the call to xprt_lookup_rqst()
- * so should be holding the xprt transport lock.
+ * so should be holding the xprt receive lock.
  */
 void xprt_pin_rqst(struct rpc_rqst *req)
 {
-       set_bit(RPC_TASK_MSG_RECV, &req->rq_task->tk_runstate);
+       atomic_inc(&req->rq_pin);
 }
 EXPORT_SYMBOL_GPL(xprt_pin_rqst);
 
@@ -864,31 +870,22 @@ EXPORT_SYMBOL_GPL(xprt_pin_rqst);
  * xprt_unpin_rqst - Unpin a request on the transport receive list
  * @req: Request to pin
  *
- * Caller should be holding the xprt transport lock.
+ * Caller should be holding the xprt receive lock.
  */
 void xprt_unpin_rqst(struct rpc_rqst *req)
 {
-       struct rpc_task *task = req->rq_task;
-
-       clear_bit(RPC_TASK_MSG_RECV, &task->tk_runstate);
-       if (test_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate))
-               wake_up_bit(&task->tk_runstate, RPC_TASK_MSG_RECV);
+       if (!test_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate)) {
+               atomic_dec(&req->rq_pin);
+               return;
+       }
+       if (atomic_dec_and_test(&req->rq_pin))
+               wake_up_var(&req->rq_pin);
 }
 EXPORT_SYMBOL_GPL(xprt_unpin_rqst);
 
 static void xprt_wait_on_pinned_rqst(struct rpc_rqst *req)
-__must_hold(&req->rq_xprt->recv_lock)
 {
-       struct rpc_task *task = req->rq_task;
-
-       if (task && test_bit(RPC_TASK_MSG_RECV, &task->tk_runstate)) {
-               spin_unlock(&req->rq_xprt->recv_lock);
-               set_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
-               wait_on_bit(&task->tk_runstate, RPC_TASK_MSG_RECV,
-                               TASK_UNINTERRUPTIBLE);
-               clear_bit(RPC_TASK_MSG_RECV_WAIT, &task->tk_runstate);
-               spin_lock(&req->rq_xprt->recv_lock);
-       }
+       wait_var_event(&req->rq_pin, !xprt_is_pinned_rqst(req));
 }
 
 /**
@@ -1388,7 +1385,13 @@ void xprt_release(struct rpc_task *task)
        spin_lock(&xprt->recv_lock);
        if (!list_empty(&req->rq_list)) {
                list_del_init(&req->rq_list);
-               xprt_wait_on_pinned_rqst(req);
+               if (xprt_is_pinned_rqst(req)) {
+                       set_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate);
+                       spin_unlock(&xprt->recv_lock);
+                       xprt_wait_on_pinned_rqst(req);
+                       spin_lock(&xprt->recv_lock);
+                       clear_bit(RPC_TASK_MSG_PIN_WAIT, &req->rq_task->tk_runstate);
+               }
        }
        spin_unlock(&xprt->recv_lock);
        spin_lock_bh(&xprt->transport_lock);