* verb (fmr_op_unmap).
*/
-/* Transport recovery
- *
- * After a transport reconnect, fmr_op_map re-uses the MR already
- * allocated for the RPC, but generates a fresh rkey then maps the
- * MR again. This process is synchronous.
- */
-
#include "xprt_rdma.h"
#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
IB_ACCESS_REMOTE_READ,
};
-static struct workqueue_struct *fmr_recovery_wq;
-
-#define FMR_RECOVERY_WQ_FLAGS (WQ_UNBOUND)
-
-int
-fmr_alloc_recovery_wq(void)
-{
- fmr_recovery_wq = alloc_workqueue("fmr_recovery", WQ_UNBOUND, 0);
- return !fmr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-fmr_destroy_recovery_wq(void)
-{
- struct workqueue_struct *wq;
-
- if (!fmr_recovery_wq)
- return;
-
- wq = fmr_recovery_wq;
- fmr_recovery_wq = NULL;
- destroy_workqueue(wq);
-}
-
static int
__fmr_init(struct rpcrdma_mw *mw, struct ib_pd *pd)
{
return rc;
}
-static void
-__fmr_dma_unmap(struct rpcrdma_mw *mw)
-{
- struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
-
- ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
- mw->mw_sg, mw->mw_nents, mw->mw_dir);
- rpcrdma_put_mw(r_xprt, mw);
-}
-
-static void
-__fmr_reset_and_unmap(struct rpcrdma_mw *mw)
-{
- int rc;
-
- /* ORDER */
- rc = __fmr_unmap(mw);
- if (rc) {
- pr_warn("rpcrdma: ib_unmap_fmr status %d, fmr %p orphaned\n",
- rc, mw);
- return;
- }
- __fmr_dma_unmap(mw);
-}
-
static void
__fmr_release(struct rpcrdma_mw *r)
{
+ LIST_HEAD(unmap_list);
int rc;
kfree(r->fmr.fm_physaddrs);
kfree(r->mw_sg);
+ /* In case this one was left mapped, try to unmap it
+ * to prevent dealloc_fmr from failing with EBUSY
+ */
+ rc = __fmr_unmap(r);
+ if (rc)
+ pr_err("rpcrdma: final ib_unmap_fmr for %p failed %i\n",
+ r, rc);
+
rc = ib_dealloc_fmr(r->fmr.fm_mr);
if (rc)
pr_err("rpcrdma: final ib_dealloc_fmr for %p returned %i\n",
r, rc);
}
-/* Deferred reset of a single FMR. Generate a fresh rkey by
- * replacing the MR. There's no recovery if this fails.
+/* Reset of a single FMR.
+ *
+ * There's no recovery if this fails. The FMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
*/
static void
-__fmr_recovery_worker(struct work_struct *work)
+fmr_op_recover_mr(struct rpcrdma_mw *mw)
{
- struct rpcrdma_mw *mw = container_of(work, struct rpcrdma_mw,
- mw_work);
+ struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+ int rc;
- __fmr_reset_and_unmap(mw);
- return;
-}
+ /* ORDER: invalidate first */
+ rc = __fmr_unmap(mw);
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__fmr_queue_recovery(struct rpcrdma_mw *mw)
-{
- INIT_WORK(&mw->mw_work, __fmr_recovery_worker);
- queue_work(fmr_recovery_wq, &mw->mw_work);
+ /* ORDER: then DMA unmap */
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
+ if (rc) {
+ pr_err("rpcrdma: FMR reset status %d, %p orphaned\n",
+ rc, mw);
+ r_xprt->rx_stats.mrs_orphaned++;
+ return;
+ }
+
+ rpcrdma_put_mw(r_xprt, mw);
+ r_xprt->rx_stats.mrs_recovered++;
}
static int
mw = seg1->rl_mw;
seg1->rl_mw = NULL;
- if (!mw) {
- mw = rpcrdma_get_mw(r_xprt);
- if (!mw)
- return -ENOMEM;
- } else {
- /* this is a retransmit; generate a fresh rkey */
- rc = __fmr_unmap(mw);
- if (rc)
- return rc;
- }
+ if (mw)
+ rpcrdma_defer_mr_recovery(mw);
+ mw = rpcrdma_get_mw(r_xprt);
+ if (!mw)
+ return -ENOMEM;
pageoff = offset_in_page(seg1->mr_offset);
seg1->mr_offset -= pageoff; /* start of page */
pr_err("rpcrdma: ib_map_phys_fmr %u@0x%llx+%i (%d) status %i\n",
len, (unsigned long long)dma_pages[0],
pageoff, mw->mw_nents, rc);
- __fmr_dma_unmap(mw);
+ rpcrdma_defer_mr_recovery(mw);
return rc;
}
/* ORDER: Invalidate all of the req's MRs first
*
* ib_unmap_fmr() is slow, so use a single call instead
- * of one call per mapped MR.
+ * of one call per mapped FMR.
*/
for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
seg = &req->rl_segments[i];
}
rc = ib_unmap_fmr(&unmap_list);
if (rc)
- pr_warn("%s: ib_unmap_fmr failed (%i)\n", __func__, rc);
+ goto out_reset;
/* ORDER: Now DMA unmap all of the req's MRs, and return
* them to the free MW list.
mw = seg->rl_mw;
list_del_init(&mw->fmr.fm_mr->list);
- __fmr_dma_unmap(mw);
+ ib_dma_unmap_sg(r_xprt->rx_ia.ri_device,
+ mw->mw_sg, mw->mw_nents, mw->mw_dir);
+ rpcrdma_put_mw(r_xprt, mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
}
req->rl_nchunks = 0;
+ return;
+
+out_reset:
+ pr_err("rpcrdma: ib_unmap_fmr failed (%i)\n", rc);
+
+ for (i = 0, nchunks = req->rl_nchunks; nchunks; nchunks--) {
+ seg = &req->rl_segments[i];
+ mw = seg->rl_mw;
+
+ list_del_init(&mw->fmr.fm_mr->list);
+ fmr_op_recover_mr(mw);
+
+ i += seg->mr_nsegs;
+ }
}
/* Use a slow, safe mechanism to invalidate all memory regions
mw = seg->rl_mw;
if (sync)
- __fmr_reset_and_unmap(mw);
+ fmr_op_recover_mr(mw);
else
- __fmr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
.ro_map = fmr_op_map,
.ro_unmap_sync = fmr_op_unmap_sync,
.ro_unmap_safe = fmr_op_unmap_safe,
+ .ro_recover_mr = fmr_op_recover_mr,
.ro_open = fmr_op_open,
.ro_maxpages = fmr_op_maxpages,
.ro_init = fmr_op_init,
# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
-static struct workqueue_struct *frwr_recovery_wq;
-
-#define FRWR_RECOVERY_WQ_FLAGS (WQ_UNBOUND | WQ_MEM_RECLAIM)
-
-int
-frwr_alloc_recovery_wq(void)
-{
- frwr_recovery_wq = alloc_workqueue("frwr_recovery",
- FRWR_RECOVERY_WQ_FLAGS, 0);
- return !frwr_recovery_wq ? -ENOMEM : 0;
-}
-
-void
-frwr_destroy_recovery_wq(void)
-{
- struct workqueue_struct *wq;
-
- if (!frwr_recovery_wq)
- return;
-
- wq = frwr_recovery_wq;
- frwr_recovery_wq = NULL;
- destroy_workqueue(wq);
-}
-
static int
__frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, unsigned int depth)
{
return 0;
}
+/* Reset of a single FRMR. Generate a fresh rkey by replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
static void
-__frwr_reset_and_unmap(struct rpcrdma_mw *mw)
+frwr_op_recover_mr(struct rpcrdma_mw *mw)
{
struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
struct rpcrdma_ia *ia = &r_xprt->rx_ia;
rc = __frwr_reset_mr(ia, mw);
ib_dma_unmap_sg(ia->ri_device, mw->mw_sg, mw->mw_nents, mw->mw_dir);
- if (rc)
+ if (rc) {
+ pr_err("rpcrdma: FRMR reset status %d, %p orphaned\n",
+ rc, mw);
+ r_xprt->rx_stats.mrs_orphaned++;
return;
- rpcrdma_put_mw(r_xprt, mw);
-}
-
-/* Deferred reset of a single FRMR. Generate a fresh rkey by
- * replacing the MR.
- *
- * There's no recovery if this fails. The FRMR is abandoned, but
- * remains in rb_all. It will be cleaned up when the transport is
- * destroyed.
- */
-static void
-__frwr_recovery_worker(struct work_struct *work)
-{
- struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
- mw_work);
-
- __frwr_reset_and_unmap(r);
-}
+ }
-/* A broken MR was discovered in a context that can't sleep.
- * Defer recovery to the recovery worker.
- */
-static void
-__frwr_queue_recovery(struct rpcrdma_mw *r)
-{
- INIT_WORK(&r->mw_work, __frwr_recovery_worker);
- queue_work(frwr_recovery_wq, &r->mw_work);
+ rpcrdma_put_mw(r_xprt, mw);
+ r_xprt->rx_stats.mrs_recovered++;
}
static int
seg1->rl_mw = NULL;
do {
if (mw)
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
mw = rpcrdma_get_mw(r_xprt);
if (!mw)
return -ENOMEM;
pr_err("rpcrdma: failed to map mr %p (%u/%u)\n",
frmr->fr_mr, n, mw->mw_nents);
rc = n < 0 ? n : -EIO;
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
return rc;
out_senderr:
- pr_err("rpcrdma: ib_post_send status %i\n", rc);
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
return rc;
}
mw = seg->rl_mw;
if (sync)
- __frwr_reset_and_unmap(mw);
+ frwr_op_recover_mr(mw);
else
- __frwr_queue_recovery(mw);
+ rpcrdma_defer_mr_recovery(mw);
i += seg->mr_nsegs;
seg->mr_nsegs = 0;
{
struct rpcrdma_mw *r;
- /* Ensure stale MWs for "buf" are no longer in flight */
- flush_workqueue(frwr_recovery_wq);
-
while (!list_empty(&buf->rb_all)) {
r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
list_del(&r->mw_all);
.ro_map = frwr_op_map,
.ro_unmap_sync = frwr_op_unmap_sync,
.ro_unmap_safe = frwr_op_unmap_safe,
+ .ro_recover_mr = frwr_op_recover_mr,
.ro_open = frwr_op_open,
.ro_maxpages = frwr_op_maxpages,
.ro_init = frwr_op_init,
xprt->stat.bad_xids,
xprt->stat.req_u,
xprt->stat.bklog_u);
- seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu\n",
+ seq_printf(seq, "%lu %lu %lu %llu %llu %llu %llu %lu %lu %lu %lu ",
r_xprt->rx_stats.read_chunk_count,
r_xprt->rx_stats.write_chunk_count,
r_xprt->rx_stats.reply_chunk_count,
r_xprt->rx_stats.failed_marshal_count,
r_xprt->rx_stats.bad_reply_count,
r_xprt->rx_stats.nomsg_call_count);
+ seq_printf(seq, "%lu %lu\n",
+ r_xprt->rx_stats.mrs_recovered,
+ r_xprt->rx_stats.mrs_orphaned);
}
static int
__func__, rc);
rpcrdma_destroy_wq();
- frwr_destroy_recovery_wq();
rc = xprt_unregister_transport(&xprt_rdma_bc);
if (rc)
{
int rc;
- rc = frwr_alloc_recovery_wq();
- if (rc)
- return rc;
-
rc = rpcrdma_alloc_wq();
- if (rc) {
- frwr_destroy_recovery_wq();
+ if (rc)
return rc;
- }
rc = xprt_register_transport(&xprt_rdma);
if (rc) {
rpcrdma_destroy_wq();
- frwr_destroy_recovery_wq();
return rc;
}
if (rc) {
xprt_unregister_transport(&xprt_rdma);
rpcrdma_destroy_wq();
- frwr_destroy_recovery_wq();
return rc;
}
ib_drain_qp(ia->ri_id->qp);
}
+static void
+rpcrdma_mr_recovery_worker(struct work_struct *work)
+{
+ struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
+ rb_recovery_worker.work);
+ struct rpcrdma_mw *mw;
+
+ spin_lock(&buf->rb_recovery_lock);
+ while (!list_empty(&buf->rb_stale_mrs)) {
+ mw = list_first_entry(&buf->rb_stale_mrs,
+ struct rpcrdma_mw, mw_list);
+ list_del_init(&mw->mw_list);
+ spin_unlock(&buf->rb_recovery_lock);
+
+ dprintk("RPC: %s: recovering MR %p\n", __func__, mw);
+ mw->mw_xprt->rx_ia.ri_ops->ro_recover_mr(mw);
+
+ spin_lock(&buf->rb_recovery_lock);
+ };
+ spin_unlock(&buf->rb_recovery_lock);
+}
+
+void
+rpcrdma_defer_mr_recovery(struct rpcrdma_mw *mw)
+{
+ struct rpcrdma_xprt *r_xprt = mw->mw_xprt;
+ struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+
+ spin_lock(&buf->rb_recovery_lock);
+ list_add(&mw->mw_list, &buf->rb_stale_mrs);
+ spin_unlock(&buf->rb_recovery_lock);
+
+ schedule_delayed_work(&buf->rb_recovery_worker, 0);
+}
+
struct rpcrdma_req *
rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
{
buf->rb_max_requests = r_xprt->rx_data.max_requests;
buf->rb_bc_srv_max_requests = 0;
- spin_lock_init(&buf->rb_lock);
atomic_set(&buf->rb_credits, 1);
+ spin_lock_init(&buf->rb_lock);
+ spin_lock_init(&buf->rb_recovery_lock);
+ INIT_LIST_HEAD(&buf->rb_stale_mrs);
+ INIT_DELAYED_WORK(&buf->rb_recovery_worker,
+ rpcrdma_mr_recovery_worker);
rc = ia->ri_ops->ro_init(r_xprt);
if (rc)
{
struct rpcrdma_ia *ia = rdmab_to_ia(buf);
+ cancel_delayed_work_sync(&buf->rb_recovery_worker);
+
while (!list_empty(&buf->rb_recv_bufs)) {
struct rpcrdma_rep *rep;
struct rpcrdma_fmr fmr;
struct rpcrdma_frmr frmr;
};
- struct work_struct mw_work;
struct rpcrdma_xprt *mw_xprt;
struct list_head mw_all;
};
struct list_head rb_allreqs;
u32 rb_bc_max_requests;
+
+ spinlock_t rb_recovery_lock; /* protect rb_stale_mrs */
+ struct list_head rb_stale_mrs;
+ struct delayed_work rb_recovery_worker;
};
#define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
unsigned long bad_reply_count;
unsigned long nomsg_call_count;
unsigned long bcall_count;
+ unsigned long mrs_recovered;
+ unsigned long mrs_orphaned;
};
/*
struct rpcrdma_req *);
void (*ro_unmap_safe)(struct rpcrdma_xprt *,
struct rpcrdma_req *, bool);
+ void (*ro_recover_mr)(struct rpcrdma_mw *);
int (*ro_open)(struct rpcrdma_ia *,
struct rpcrdma_ep *,
struct rpcrdma_create_data_internal *);
void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
+void rpcrdma_defer_mr_recovery(struct rpcrdma_mw *);
+
struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
size_t, gfp_t);
void rpcrdma_free_regbuf(struct rpcrdma_ia *,
int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
-int frwr_alloc_recovery_wq(void);
-void frwr_destroy_recovery_wq(void);
-
int rpcrdma_alloc_wq(void);
void rpcrdma_destroy_wq(void);