SUNRPC: Simplify dealing with aborted partially transmitted messages
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Fri, 31 Aug 2018 14:00:02 +0000 (10:00 -0400)
committerTrond Myklebust <trond.myklebust@hammerspace.com>
Sun, 30 Sep 2018 19:35:14 +0000 (15:35 -0400)
If the previous message was only partially transmitted, we need to close
the socket in order to avoid corruption of the message stream. To do so,
we currently hijack the unlocking of the socket in order to schedule
the close.
Now that we track the message offset in the socket state, we can move
that kind of checking out of the socket lock code, which is needed to
allow messages to remain queued after dropping the socket lock.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
net/sunrpc/xprtsock.c

index 629cc45e1e6cf34b8fcc33c539bf90e8ff053293..3fbccebd0b109155702120687f94fe43c84f2ab2 100644 (file)
@@ -491,6 +491,16 @@ static int xs_nospace(struct rpc_task *task)
        return ret;
 }
 
+/*
+ * Determine if the previous message in the stream was aborted before it
+ * could complete transmission.
+ */
+static bool
+xs_send_request_was_aborted(struct sock_xprt *transport, struct rpc_rqst *req)
+{
+       return transport->xmit.offset != 0 && req->rq_bytes_sent == 0;
+}
+
 /*
  * Construct a stream transport record marker in @buf.
  */
@@ -522,6 +532,12 @@ static int xs_local_send_request(struct rpc_task *task)
        int status;
        int sent = 0;
 
+       /* Close the stream if the previous transmission was incomplete */
+       if (xs_send_request_was_aborted(transport, req)) {
+               xs_close(xprt);
+               return -ENOTCONN;
+       }
+
        xs_encode_stream_record_marker(&req->rq_snd_buf);
 
        xs_pktdump("packet data:",
@@ -665,6 +681,13 @@ static int xs_tcp_send_request(struct rpc_task *task)
        int status;
        int sent;
 
+       /* Close the stream if the previous transmission was incomplete */
+       if (xs_send_request_was_aborted(transport, req)) {
+               if (transport->sock != NULL)
+                       kernel_sock_shutdown(transport->sock, SHUT_RDWR);
+               return -ENOTCONN;
+       }
+
        xs_encode_stream_record_marker(&req->rq_snd_buf);
 
        xs_pktdump("packet data:",
@@ -755,30 +778,6 @@ static int xs_tcp_send_request(struct rpc_task *task)
        return status;
 }
 
-/**
- * xs_tcp_release_xprt - clean up after a tcp transmission
- * @xprt: transport
- * @task: rpc task
- *
- * This cleans up if an error causes us to abort the transmission of a request.
- * In this case, the socket may need to be reset in order to avoid confusing
- * the server.
- */
-static void xs_tcp_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
-{
-       struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-
-       if (task != xprt->snd_task)
-               return;
-       if (task == NULL)
-               goto out_release;
-       if (transport->xmit.offset == 0 || !xprt_connected(xprt))
-               goto out_release;
-       set_bit(XPRT_CLOSE_WAIT, &xprt->state);
-out_release:
-       xprt_release_xprt(xprt, task);
-}
-
 static void xs_save_old_callbacks(struct sock_xprt *transport, struct sock *sk)
 {
        transport->old_data_ready = sk->sk_data_ready;
@@ -2764,7 +2763,7 @@ static void bc_destroy(struct rpc_xprt *xprt)
 
 static const struct rpc_xprt_ops xs_local_ops = {
        .reserve_xprt           = xprt_reserve_xprt,
-       .release_xprt           = xs_tcp_release_xprt,
+       .release_xprt           = xprt_release_xprt,
        .alloc_slot             = xprt_alloc_slot,
        .free_slot              = xprt_free_slot,
        .rpcbind                = xs_local_rpcbind,
@@ -2806,7 +2805,7 @@ static const struct rpc_xprt_ops xs_udp_ops = {
 
 static const struct rpc_xprt_ops xs_tcp_ops = {
        .reserve_xprt           = xprt_reserve_xprt,
-       .release_xprt           = xs_tcp_release_xprt,
+       .release_xprt           = xprt_release_xprt,
        .alloc_slot             = xprt_lock_and_alloc_slot,
        .free_slot              = xprt_free_slot,
        .rpcbind                = rpcb_getport_async,