#define RPC_TASK_RUNNING 0
#define RPC_TASK_QUEUED 1
#define RPC_TASK_ACTIVE 2
-#define RPC_TASK_MSG_RECV 3
-#define RPC_TASK_MSG_RECV_WAIT 4
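+/*
+ * RPC_TASK_NEED_XMIT: the request has been encoded but not yet fully
+ * transmitted; cleared once transmission completes.
+ * RPC_TASK_NEED_RECV: the request is on the receive list, still
+ * waiting for the reply; cleared by xprt_complete_rqst().
+ */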
+#define RPC_TASK_NEED_XMIT 3
+#define RPC_TASK_NEED_RECV 4
+#define RPC_TASK_MSG_RECV 5
+#define RPC_TASK_MSG_RECV_WAIT 6
#define RPC_IS_RUNNING(t) test_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
#define rpc_set_running(t) set_bit(RPC_TASK_RUNNING, &(t)->tk_runstate)
/*
 * Set up the xdr_buf length.
 * This also indicates that the buffer is XDR encoded already.
 */
xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
xbufp->tail[0].iov_len;
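+ /* The buffer is already XDR encoded, so it is ready to transmit */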
+ set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_action = call_bc_transmit;
atomic_inc(&task->tk_count);
rpc_exit(task, -ERESTARTSYS);
}
-static inline int
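+/*
+ * A request needs encoding until rpc_xdr_encode() has run and set
+ * RPC_TASK_NEED_XMIT; clearing that bit forces a re-encode.
+ */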
+static int
rpc_task_need_encode(struct rpc_task *task)
{
- return task->tk_rqstp->rq_snd_buf.len == 0;
-}
-
-static inline void
-rpc_task_force_reencode(struct rpc_task *task)
-{
- task->tk_rqstp->rq_snd_buf.len = 0;
- task->tk_rqstp->rq_bytes_sent = 0;
+ return test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate) == 0;
}
/*
 * 3.	Encode arguments of an RPC call
 */
task->tk_status = rpcauth_wrap_req(task, encode, req, p,
task->tk_msg.rpc_argp);
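+ /* On a successful encode, mark the request as ready to transmit */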
+ if (task->tk_status == 0)
+ set_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
}
/*
 * Common case: success.  Force the compiler to put this
 * test first.
 */
if (task->tk_status == 0) {
xprt_end_transmit(task);
- rpc_task_force_reencode(task);
return;
}
default:
dprint_status(task);
xprt_end_transmit(task);
- rpc_task_force_reencode(task);
break;
/*
 * Special cases: if we've been waiting on the
 * socket's write_space() callback, or if the
 * socket just returned a connection error,
 * then hold onto the transport lock.
 */
case -EADDRINUSE:
case -ENOTCONN:
case -EPIPE:
- rpc_task_force_reencode(task);
+ break;
}
}
rpc_exit(task, status);
break;
case -EBADMSG:
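+ /* Clear NEED_XMIT so rpc_task_need_encode() re-encodes the request */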
+ clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_action = call_transmit;
break;
default:
/* Ensure all writes are done before we update */
/* req->rq_reply_bytes_recvd */
smp_wmb();
req->rq_reply_bytes_recvd = copied;
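+ /* The reply has been copied in full; stop waiting for receive data */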
+ clear_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
rpc_wake_up_queued_task(&xprt->pending, task);
}
EXPORT_SYMBOL_GPL(xprt_complete_rqst);
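+
+/*
+ * Return true once the full reply has arrived: the request is no
+ * longer waiting to receive data and a reply length has been recorded.
+ */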
+static bool
+xprt_request_data_received(struct rpc_task *task)
+{
+ return !test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate) &&
+ task->tk_rqstp->rq_reply_bytes_recvd != 0;
+}
+
static void xprt_timer(struct rpc_task *task)
{
struct rpc_rqst *req = task->tk_rqstp;
/* Add request to the receive list */
spin_lock(&xprt->recv_lock);
list_add_tail(&req->rq_list, &xprt->recv);
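+ /* Mark the request as waiting for the reply data to arrive */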
+ set_bit(RPC_TASK_NEED_RECV, &task->tk_runstate);
spin_unlock(&xprt->recv_lock);
xprt_reset_majortimeo(req);
/* Turn off autodisconnect */
del_singleshot_timer_sync(&xprt->timer);
}
- } else if (!req->rq_bytes_sent)
+ } else if (xprt_request_data_received(task) && !req->rq_bytes_sent)
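+ /* Reply already received and no send in progress: nothing to do */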
return;
connect_cookie = xprt->connect_cookie;
task->tk_status = status;
return;
}
+
xprt_inject_disconnect(xprt);
dprintk("RPC: %5u xmit complete\n", task->tk_pid);
+ clear_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate);
task->tk_flags |= RPC_TASK_SENT;
spin_lock_bh(&xprt->transport_lock);
spin_unlock_bh(&xprt->transport_lock);
req->rq_connect_cookie = connect_cookie;
- if (rpc_reply_expected(task) && !READ_ONCE(req->rq_reply_bytes_recvd)) {
+ if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
/*
* Sleep on the pending queue if we're expecting a reply.
* The spinlock ensures atomicity between the test of
- * req->rq_reply_bytes_recvd, and the call to rpc_sleep_on().
+ * the RPC_TASK_NEED_RECV flag, and the call to rpc_sleep_on().
*/
spin_lock(&xprt->recv_lock);
- if (!req->rq_reply_bytes_recvd) {
+ if (test_bit(RPC_TASK_NEED_RECV, &task->tk_runstate)) {
rpc_sleep_on(&xprt->pending, task, xprt_timer);
/*
* Send an extra queue wakeup call if the