xprtrdma: Allocate and map transport header buffers at connect time
authorChuck Lever <chuck.lever@oracle.com>
Fri, 3 Jan 2020 16:56:53 +0000 (11:56 -0500)
committerAnna Schumaker <Anna.Schumaker@Netapp.com>
Wed, 15 Jan 2020 15:54:32 +0000 (10:54 -0500)
Currently the underlying RDMA device is chosen at transport set-up
time. But it will soon be chosen at connect time instead.

The maximum size of a transport header is based on device
capabilities. Thus transport header buffers have to be allocated
_after_ the underlying device has been chosen (via address and route
resolution); i.e., in the connect worker.

Thus, move the allocation of transport header buffers to the connect
worker, after the point at which the underlying RDMA device has been
chosen.

This also means the RDMA device is available to do a DMA mapping of
these buffers at connect time, instead of in the hot I/O path. Make
that optimization as well.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
net/sunrpc/xprtrdma/backchannel.c
net/sunrpc/xprtrdma/rpc_rdma.c
net/sunrpc/xprtrdma/verbs.c
net/sunrpc/xprtrdma/xprt_rdma.h

index 9d02eae353c678ea1f9be772bbc27b54ab6f0b89..1a0ae0c61353c1786fdedbcb5afcca680d0269aa 100644 (file)
@@ -194,6 +194,10 @@ create_req:
        req = rpcrdma_req_create(r_xprt, size, GFP_KERNEL);
        if (!req)
                return NULL;
+       if (rpcrdma_req_setup(r_xprt, req)) {
+               rpcrdma_req_destroy(req);
+               return NULL;
+       }
 
        xprt->bc_alloc_count++;
        rqst = &req->rl_slot;
index c6dcea06c754f42f09f957f7aecfc14ebb7f709e..28020ec104d491d30bfc5879ec0e8ad5c0d27c63 100644 (file)
@@ -580,22 +580,19 @@ void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
  */
-static bool rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
+static void rpcrdma_prepare_hdr_sge(struct rpcrdma_xprt *r_xprt,
                                    struct rpcrdma_req *req, u32 len)
 {
        struct rpcrdma_sendctx *sc = req->rl_sendctx;
        struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
        struct ib_sge *sge = &sc->sc_sges[req->rl_wr.num_sge++];
 
-       if (!rpcrdma_regbuf_dma_map(r_xprt, rb))
-               return false;
        sge->addr = rdmab_addr(rb);
        sge->length = len;
        sge->lkey = rdmab_lkey(rb);
 
        ib_dma_sync_single_for_device(rdmab_device(rb), sge->addr, sge->length,
                                      DMA_TO_DEVICE);
-       return true;
 }
 
 /* The head iovec is straightforward, as it is usually already
@@ -836,10 +833,9 @@ inline int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
        req->rl_wr.num_sge = 0;
        req->rl_wr.opcode = IB_WR_SEND;
 
-       ret = -EIO;
-       if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
-               goto out_unmap;
+       rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen);
 
+       ret = -EIO;
        switch (rtype) {
        case rpcrdma_noch_pullup:
                if (!rpcrdma_prepare_noch_pullup(r_xprt, req, xdr))
index dcb2073ec5bde257840fe484b8a8a55d71afc861..90c215beef066eb5364628d5b6aff4edc2bbcb8a 100644 (file)
@@ -78,6 +78,7 @@ static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
                                       struct rpcrdma_sendctx *sc);
+static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
 static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
@@ -381,6 +382,8 @@ out_err:
  *
  * Divest transport H/W resources associated with this adapter,
  * but allow it to be restored later.
+ *
+ * Caller must hold the transport send lock.
  */
 void
 rpcrdma_ia_remove(struct rpcrdma_ia *ia)
@@ -388,8 +391,6 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
        struct rpcrdma_xprt *r_xprt = container_of(ia, struct rpcrdma_xprt,
                                                   rx_ia);
        struct rpcrdma_ep *ep = &r_xprt->rx_ep;
-       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-       struct rpcrdma_req *req;
 
        /* This is similar to rpcrdma_ep_destroy, but:
         * - Don't cancel the connect worker.
@@ -412,11 +413,7 @@ rpcrdma_ia_remove(struct rpcrdma_ia *ia)
         * mappings and MRs are gone.
         */
        rpcrdma_reps_unmap(r_xprt);
-       list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
-               rpcrdma_regbuf_dma_unmap(req->rl_rdmabuf);
-               rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
-               rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
-       }
+       rpcrdma_reqs_reset(r_xprt);
        rpcrdma_mrs_destroy(r_xprt);
        rpcrdma_sendctxs_destroy(r_xprt);
        ib_dealloc_pd(ia->ri_pd);
@@ -715,6 +712,11 @@ retry:
                goto out;
        }
 
+       rc = rpcrdma_reqs_setup(r_xprt);
+       if (rc) {
+               rpcrdma_ep_disconnect(ep, ia);
+               goto out;
+       }
        rpcrdma_mrs_create(r_xprt);
 
 out:
@@ -996,32 +998,19 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
                                       gfp_t flags)
 {
        struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
-       struct rpcrdma_regbuf *rb;
        struct rpcrdma_req *req;
-       size_t maxhdrsize;
 
        req = kzalloc(sizeof(*req), flags);
        if (req == NULL)
                goto out1;
 
-       /* Compute maximum header buffer size in bytes */
-       maxhdrsize = rpcrdma_fixed_maxsz + 3 +
-                    r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz;
-       maxhdrsize *= sizeof(__be32);
-       rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
-                                 DMA_TO_DEVICE, flags);
-       if (!rb)
-               goto out2;
-       req->rl_rdmabuf = rb;
-       xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
-
        req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE, flags);
        if (!req->rl_sendbuf)
-               goto out3;
+               goto out2;
 
        req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE, flags);
        if (!req->rl_recvbuf)
-               goto out4;
+               goto out3;
 
        INIT_LIST_HEAD(&req->rl_free_mrs);
        INIT_LIST_HEAD(&req->rl_registered);
@@ -1030,10 +1019,8 @@ struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
        spin_unlock(&buffer->rb_lock);
        return req;
 
-out4:
-       kfree(req->rl_sendbuf);
 out3:
-       kfree(req->rl_rdmabuf);
+       kfree(req->rl_sendbuf);
 out2:
        kfree(req);
 out1:
@@ -1041,23 +1028,82 @@ out1:
 }
 
 /**
- * rpcrdma_reqs_reset - Reset all reqs owned by a transport
+ * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
  * @r_xprt: controlling transport instance
+ * @req: rpcrdma_req object to set up
  *
- * ASSUMPTION: the rb_allreqs list is stable for the duration,
+ * Returns zero on success, and a negative errno on failure.
+ */
+int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
+{
+       struct rpcrdma_regbuf *rb;
+       size_t maxhdrsize;
+
+       /* Compute maximum header buffer size in bytes */
+       maxhdrsize = rpcrdma_fixed_maxsz + 3 +
+                    r_xprt->rx_ia.ri_max_rdma_segs * rpcrdma_readchunk_maxsz;
+       maxhdrsize *= sizeof(__be32);
+       rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
+                                 DMA_TO_DEVICE, GFP_KERNEL);
+       if (!rb)
+               goto out;
+
+       if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
+               goto out_free;
+
+       req->rl_rdmabuf = rb;
+       xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
+       return 0;
+
+out_free:
+       rpcrdma_regbuf_free(rb);
+out:
+       return -ENOMEM;
+}
+
+/* ASSUMPTION: the rb_allreqs list is stable for the duration,
  * and thus can be walked without holding rb_lock. Eg. the
  * caller is holding the transport send lock to exclude
  * device removal or disconnection.
  */
-static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
+static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
 {
        struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
        struct rpcrdma_req *req;
+       int rc;
 
        list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
-               /* Credits are valid only for one connection */
-               req->rl_slot.rq_cong = 0;
+               rc = rpcrdma_req_setup(r_xprt, req);
+               if (rc)
+                       return rc;
        }
+       return 0;
+}
+
+static void rpcrdma_req_reset(struct rpcrdma_req *req)
+{
+       /* Credits are valid for only one connection */
+       req->rl_slot.rq_cong = 0;
+
+       rpcrdma_regbuf_free(req->rl_rdmabuf);
+       req->rl_rdmabuf = NULL;
+
+       rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
+       rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
+}
+
+/* ASSUMPTION: the rb_allreqs list is stable for the duration,
+ * and thus can be walked without holding rb_lock. Eg. the
+ * caller is holding the transport send lock to exclude
+ * device removal or disconnection.
+ */
+static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
+{
+       struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+       struct rpcrdma_req *req;
+
+       list_for_each_entry(req, &buf->rb_allreqs, rl_all)
+               rpcrdma_req_reset(req);
 }
 
 static struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt,
index 0aed1e98f2bf42347231c245e6d4f7fef3efe681..37d5080c250b87b775e29a56d4c919ac3588175c 100644 (file)
@@ -478,6 +478,7 @@ void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, bool temp);
  */
 struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt, size_t size,
                                       gfp_t flags);
+int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req);
 void rpcrdma_req_destroy(struct rpcrdma_req *req);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);