SUNRPC: Add a transmission queue for RPC requests
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Fri, 10 Aug 2018 03:33:21 +0000 (23:33 -0400)
committerTrond Myklebust <trond.myklebust@hammerspace.com>
Sun, 30 Sep 2018 19:35:15 +0000 (15:35 -0400)
Add the queue that will enforce the ordering of RPC task transmission.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
include/linux/sunrpc/xprt.h
net/sunrpc/clnt.c
net/sunrpc/xprt.c

index 9cec2d0811f2206151baa91f9f563ad947dbb10e..81a6c2c8dfc7b9566204d43de43e77e576d40599 100644 (file)
@@ -88,6 +88,8 @@ struct rpc_rqst {
                struct list_head        rq_recv;        /* Receive queue */
        };
 
+       struct list_head        rq_xmit;        /* Send queue */
+
        void                    *rq_buffer;     /* Call XDR encode buffer */
        size_t                  rq_callsize;
        void                    *rq_rbuffer;    /* Reply XDR decode buffer */
@@ -242,6 +244,9 @@ struct rpc_xprt {
        spinlock_t              queue_lock;     /* send/receive queue lock */
        u32                     xid;            /* Next XID value to use */
        struct rpc_task *       snd_task;       /* Task blocked in send */
+
+       struct list_head        xmit_queue;     /* Send queue */
+
        struct svc_xprt         *bc_xprt;       /* NFSv4.1 backchannel */
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
        struct svc_serv         *bc_serv;       /* The RPC service which will */
@@ -339,6 +344,7 @@ void                        xprt_free_slot(struct rpc_xprt *xprt,
                                       struct rpc_rqst *req);
 void                   xprt_lock_and_alloc_slot(struct rpc_xprt *xprt, struct rpc_task *task);
 bool                   xprt_prepare_transmit(struct rpc_task *task);
+void                   xprt_request_enqueue_transmit(struct rpc_task *task);
 void                   xprt_request_enqueue_receive(struct rpc_task *task);
 void                   xprt_request_wait_receive(struct rpc_task *task);
 void                   xprt_transmit(struct rpc_task *task);
index be0f06a8156b5e6170c2ac36d61ef52eb5a8fe41..c1a19a3e135698d9be316900a3030e1285d97df7 100644 (file)
@@ -1156,11 +1156,11 @@ struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
         */
        xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
                        xbufp->tail[0].iov_len;
-       set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 
        task->tk_action = call_bc_transmit;
        atomic_inc(&task->tk_count);
        WARN_ON_ONCE(atomic_read(&task->tk_count) != 2);
+       xprt_request_enqueue_transmit(task);
        rpc_execute(task);
 
        dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
@@ -1759,8 +1759,6 @@ rpc_xdr_encode(struct rpc_task *task)
 
        task->tk_status = rpcauth_wrap_req(task, encode, req, p,
                        task->tk_msg.rpc_argp);
-       if (task->tk_status == 0)
-               set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
 }
 
 /*
@@ -1964,6 +1962,7 @@ call_transmit(struct rpc_task *task)
        /* Add task to reply queue before transmission to avoid races */
        if (rpc_reply_expected(task))
                xprt_request_enqueue_receive(task);
+       xprt_request_enqueue_transmit(task);
 
        if (!xprt_prepare_transmit(task))
                return;
@@ -1998,7 +1997,6 @@ call_transmit_status(struct rpc_task *task)
                xprt_end_transmit(task);
                break;
        case -EBADMSG:
-               clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
                task->tk_action = call_transmit;
                task->tk_status = 0;
                xprt_end_transmit(task);
index d527dc08540e46d0ac22738afb596423f2d055d3..1f69d9f219af97cfe8e6b829830f9db218f1693d 100644 (file)
@@ -1058,6 +1058,72 @@ void xprt_request_wait_receive(struct rpc_task *task)
        spin_unlock(&xprt->queue_lock);
 }
 
+static bool
+xprt_request_need_transmit(struct rpc_task *task)
+{
+       return !(task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) ||
+               xprt_request_retransmit_after_disconnect(task);
+}
+
+static bool
+xprt_request_need_enqueue_transmit(struct rpc_task *task, struct rpc_rqst *req)
+{
+       return xprt_request_need_transmit(task) &&
+               !test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+}
+
+/**
+ * xprt_request_enqueue_transmit - queue a task for transmission
+ * @task: pointer to rpc_task
+ *
+ * Add a task to the transmission queue.
+ */
+void
+xprt_request_enqueue_transmit(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       if (xprt_request_need_enqueue_transmit(task, req)) {
+               spin_lock(&xprt->queue_lock);
+               list_add_tail(&req->rq_xmit, &xprt->xmit_queue);
+               set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
+               spin_unlock(&xprt->queue_lock);
+       }
+}
+
+/**
+ * xprt_request_dequeue_transmit_locked - remove a task from the transmission queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmission queue
+ * Caller must hold xprt->queue_lock
+ */
+static void
+xprt_request_dequeue_transmit_locked(struct rpc_task *task)
+{
+       xprt_task_clear_bytes_sent(task);
+       if (test_and_clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
+               list_del(&task->tk_rqstp->rq_xmit);
+}
+
+/**
+ * xprt_request_dequeue_transmit - remove a task from the transmission queue
+ * @task: pointer to rpc_task
+ *
+ * Remove a task from the transmission queue
+ */
+static void
+xprt_request_dequeue_transmit(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       spin_lock(&xprt->queue_lock);
+       xprt_request_dequeue_transmit_locked(task);
+       spin_unlock(&xprt->queue_lock);
+}
+
 /**
  * xprt_prepare_transmit - reserve the transport before sending a request
  * @task: RPC task about to send a request
@@ -1077,12 +1143,8 @@ bool xprt_prepare_transmit(struct rpc_task *task)
                        task->tk_status = req->rq_reply_bytes_recvd;
                        goto out_unlock;
                }
-               if ((task->tk_flags & RPC_TASK_NO_RETRANS_TIMEOUT) &&
-                   !xprt_request_retransmit_after_disconnect(task)) {
-                       xprt->ops->set_retrans_timeout(task);
-                       rpc_sleep_on(&xprt->pending, task, xprt_timer);
+               if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate))
                        goto out_unlock;
-               }
        }
        if (!xprt->ops->reserve_xprt(xprt, task)) {
                task->tk_status = -EAGAIN;
@@ -1116,11 +1178,11 @@ void xprt_transmit(struct rpc_task *task)
 
        if (!req->rq_bytes_sent) {
                if (xprt_request_data_received(task))
-                       return;
+                       goto out_dequeue;
                /* Verify that our message lies in the RPCSEC_GSS window */
                if (rpcauth_xmit_need_reencode(task)) {
                        task->tk_status = -EBADMSG;
-                       return;
+                       goto out_dequeue;
                }
        }
 
@@ -1135,7 +1197,6 @@ void xprt_transmit(struct rpc_task *task)
        xprt_inject_disconnect(xprt);
 
        dprintk("RPC: %5u xmit complete\n", task->tk_pid);
-       clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
        task->tk_flags |= RPC_TASK_SENT;
        spin_lock_bh(&xprt->transport_lock);
 
@@ -1147,6 +1208,8 @@ void xprt_transmit(struct rpc_task *task)
        spin_unlock_bh(&xprt->transport_lock);
 
        req->rq_connect_cookie = connect_cookie;
+out_dequeue:
+       xprt_request_dequeue_transmit(task);
 }
 
 static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)
@@ -1420,9 +1483,11 @@ xprt_request_dequeue_all(struct rpc_task *task, struct rpc_rqst *req)
 {
        struct rpc_xprt *xprt = req->rq_xprt;
 
-       if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
+       if (test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) ||
+           test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) ||
            xprt_is_pinned_rqst(req)) {
                spin_lock(&xprt->queue_lock);
+               xprt_request_dequeue_transmit_locked(task);
                xprt_request_dequeue_receive_locked(task);
                while (xprt_is_pinned_rqst(req)) {
                        set_bit(RPC_TASK_MSG_PIN_WAIT, &task->tk_runstate);
@@ -1493,6 +1558,7 @@ static void xprt_init(struct rpc_xprt *xprt, struct net *net)
 
        INIT_LIST_HEAD(&xprt->free);
        INIT_LIST_HEAD(&xprt->recv_queue);
+       INIT_LIST_HEAD(&xprt->xmit_queue);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
        spin_lock_init(&xprt->bc_pa_lock);
        INIT_LIST_HEAD(&xprt->bc_pa_list);