staging: lustre: Dynamic LNet Configuration (DLC) dynamic routing
author: Amir Shehata <amir.shehata@intel.com>
Mon, 15 Feb 2016 15:25:53 +0000 (10:25 -0500)
committer: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
Sat, 20 Feb 2016 22:29:23 +0000 (14:29 -0800)
This is the second patch of a set of patches that enables DLC.

This patch adds the following features to LNET.  Currently these
features are not driven by user space.
- Enabling Routing on Demand.  The default number of router
  buffers are allocated.
- Disable Routing on demand. Unused router buffers are freed and
  used router buffers are freed when they are no longer in use.
  The next time routing is enabled the default router buffer
  values are used.  It has been decided that the user-set router
  buffer values should be remembered and re-set by user space
  scripts.
- Increase the number of router buffers on demand, by allocating
  new ones.
- Decrease the number of router buffers.  Excess buffers are freed
  if they are not in use.  Otherwise they are freed once they are
  no longer in use.

Signed-off-by: Amir Shehata <amir.shehata@intel.com>
Intel-bug-id: https://jira.hpdd.intel.com/browse/LU-2456
Change-Id: Id07d4ad424d8f5ba72475d4149380afe2ac54e77
Reviewed-on: http://review.whamcloud.com/9831
Reviewed-by: James Simmons <uja.ornl@gmail.com>
Reviewed-by: Doug Oucharek <doug.s.oucharek@intel.com>
Reviewed-by: Liang Zhen <liang.zhen@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
Signed-off-by: Greg Kroah-Hartman <gregkh@linuxfoundation.org>
drivers/staging/lustre/include/linux/lnet/lib-lnet.h
drivers/staging/lustre/include/linux/lnet/lib-types.h
drivers/staging/lustre/lnet/lnet/api-ni.c
drivers/staging/lustre/lnet/lnet/lib-move.c
drivers/staging/lustre/lnet/lnet/router.c

index 77d8e37c7acaa5cd8c89388bf5d05198d7aee312..3a1cf61018abc1d4ce6494dccb01ac277615f27b 100644 (file)
@@ -461,7 +461,11 @@ int lnet_get_route(int idx, __u32 *net, __u32 *hops,
 void lnet_router_debugfs_init(void);
 void lnet_router_debugfs_fini(void);
 int  lnet_rtrpools_alloc(int im_a_router);
-void lnet_rtrpools_free(void);
+void lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages);
+int lnet_rtrpools_adjust(int tiny, int small, int large);
+int lnet_rtrpools_enable(void);
+void lnet_rtrpools_disable(void);
+void lnet_rtrpools_free(int keep_pools);
 lnet_remotenet_t *lnet_find_net_locked(__u32 net);
 
 int lnet_islocalnid(lnet_nid_t nid);
@@ -481,6 +485,8 @@ void lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
 int lnet_send(lnet_nid_t nid, lnet_msg_t *msg, lnet_nid_t rtr_nid);
 void lnet_return_tx_credits_locked(lnet_msg_t *msg);
 void lnet_return_rx_credits_locked(lnet_msg_t *msg);
+void lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp);
+void lnet_drop_routed_msgs_locked(struct list_head *list, int cpt);
 
 /* portals functions */
 /* portals attributes */
index be650d4ba5f29a4c611b3a328d52166eba99cc01..b0ba9d863dcd63ea72470f5e2fff4d9fe342c54c 100644 (file)
@@ -285,6 +285,7 @@ typedef struct lnet_ni {
 #define LNET_PING_FEAT_INVAL           (0)             /* no feature */
 #define LNET_PING_FEAT_BASE            (1 << 0)        /* just a ping */
 #define LNET_PING_FEAT_NI_STATUS       (1 << 1)        /* return NI status */
+#define LNET_PING_FEAT_RTE_DISABLED    (1 << 2)        /* Routing disabled */
 
 #define LNET_PING_FEAT_MASK            (LNET_PING_FEAT_BASE | \
                                         LNET_PING_FEAT_NI_STATUS)
@@ -410,7 +411,12 @@ typedef struct {
 
 #define LNET_PEER_HASHSIZE     503     /* prime! */
 
-#define LNET_NRBPOOLS          3       /* # different router buffer pools */
+#define LNET_TINY_BUF_IDX      0
+#define LNET_SMALL_BUF_IDX     1
+#define LNET_LARGE_BUF_IDX     2
+
+/* # different router buffer pools */
+#define LNET_NRBPOOLS          (LNET_LARGE_BUF_IDX + 1)
 
 enum {
        /* Didn't match anything */
index cd68ca73e309893efe1c91a9ee766a8d63b6a834..06046b216c93380552d6026de0725717bb69dc58 100644 (file)
@@ -638,7 +638,7 @@ lnet_unprepare(void)
 
        lnet_msg_containers_destroy();
        lnet_peer_tables_destroy();
-       lnet_rtrpools_free();
+       lnet_rtrpools_free(0);
 
        if (the_lnet.ln_counters) {
                cfs_percpt_free(the_lnet.ln_counters);
@@ -1501,6 +1501,8 @@ lnet_create_ping_info(void)
        pinfo->pi_pid     = the_lnet.ln_pid;
        pinfo->pi_magic   = LNET_PROTO_PING_MAGIC;
        pinfo->pi_features = LNET_PING_FEAT_NI_STATUS;
+       if (!the_lnet.ln_routing)
+               pinfo->pi_features |= LNET_PING_FEAT_RTE_DISABLED;
 
        for (i = 0; i < n; i++) {
                lnet_ni_status_t *ns = &pinfo->pi_ni[i];
index cc8c2c5583896a240c194ad7699a0d0cdee03cad..f2b1116e9dd9bfdbe0a93a201ed99e81c733cd32 100644 (file)
@@ -945,9 +945,6 @@ lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
        rbp = lnet_msg2bufpool(msg);
 
        if (!msg->msg_rtrcredit) {
-               LASSERT((rbp->rbp_credits < 0) ==
-                        !list_empty(&rbp->rbp_msgs));
-
                msg->msg_rtrcredit = 1;
                rbp->rbp_credits--;
                if (rbp->rbp_credits < rbp->rbp_mincredits)
@@ -1038,6 +1035,43 @@ lnet_return_tx_credits_locked(lnet_msg_t *msg)
        }
 }
 
+void
+lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
+{
+       lnet_msg_t *msg;
+
+       if (list_empty(&rbp->rbp_msgs))
+               return;
+       msg = list_entry(rbp->rbp_msgs.next,
+                        lnet_msg_t, msg_list);
+       list_del(&msg->msg_list);
+
+       (void)lnet_post_routed_recv_locked(msg, 1);
+}
+
+void
+lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
+{
+       struct list_head drop;
+       lnet_msg_t *msg;
+       lnet_msg_t *tmp;
+
+       INIT_LIST_HEAD(&drop);
+
+       list_splice_init(list, &drop);
+
+       lnet_net_unlock(cpt);
+
+       list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
+               lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
+                            0, 0, 0, msg->msg_hdr.payload_length);
+               list_del_init(&msg->msg_list);
+               lnet_finalize(NULL, msg, -ECANCELED);
+       }
+
+       lnet_net_lock(cpt);
+}
+
 void
 lnet_return_rx_credits_locked(lnet_msg_t *msg)
 {
@@ -1058,27 +1092,41 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
 
                rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
                rbp = rb->rb_pool;
-               LASSERT(rbp == lnet_msg2bufpool(msg));
 
                msg->msg_kiov = NULL;
                msg->msg_rtrcredit = 0;
 
-               LASSERT((rbp->rbp_credits < 0) ==
-                       !list_empty(&rbp->rbp_msgs));
+               LASSERT(rbp == lnet_msg2bufpool(msg));
+
                LASSERT((rbp->rbp_credits > 0) ==
                        !list_empty(&rbp->rbp_bufs));
 
-               list_add(&rb->rb_list, &rbp->rbp_bufs);
-               rbp->rbp_credits++;
-               if (rbp->rbp_credits <= 0) {
-                       msg2 = list_entry(rbp->rbp_msgs.next,
-                                         lnet_msg_t, msg_list);
-                       list_del(&msg2->msg_list);
+               /*
+                * If routing is now turned off, we just drop this buffer and
+                * don't bother trying to return credits.
+                */
+               if (!the_lnet.ln_routing) {
+                       lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+                       goto routing_off;
+               }
 
-                       (void) lnet_post_routed_recv_locked(msg2, 1);
+               /*
+                * It is possible that a user has lowered the desired number of
+                * buffers in this pool.  Make sure we never put back
+                * more buffers than the stated number.
+                */
+               if (rbp->rbp_credits >= rbp->rbp_nbuffers) {
+                       /* Discard this buffer so we don't have too many. */
+                       lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
+               } else {
+                       list_add(&rb->rb_list, &rbp->rbp_bufs);
+                       rbp->rbp_credits++;
+                       if (rbp->rbp_credits <= 0)
+                               lnet_schedule_blocked_locked(rbp);
                }
        }
 
+routing_off:
        if (msg->msg_peerrtrcredit) {
                /* give back peer router credits */
                msg->msg_peerrtrcredit = 0;
@@ -1087,7 +1135,14 @@ lnet_return_rx_credits_locked(lnet_msg_t *msg)
                        !list_empty(&rxpeer->lp_rtrq));
 
                rxpeer->lp_rtrcredits++;
-               if (rxpeer->lp_rtrcredits <= 0) {
+               /*
+                * drop all messages which are queued to be routed on that
+                * peer.
+                */
+               if (!the_lnet.ln_routing) {
+                       lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
+                                                    msg->msg_rx_cpt);
+               } else if (rxpeer->lp_rtrcredits <= 0) {
                        msg2 = list_entry(rxpeer->lp_rtrq.next,
                                          lnet_msg_t, msg_list);
                        list_del(&msg2->msg_list);
@@ -1646,6 +1701,9 @@ lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
 {
        int rc = 0;
 
+       if (!the_lnet.ln_routing)
+               return -ECANCELED;
+
        if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
            lnet_msg2bufpool(msg)->rbp_credits <= 0) {
                if (!ni->ni_lnd->lnd_eager_recv) {
@@ -1799,9 +1857,8 @@ lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
 
        if (the_lnet.ln_routing &&
            ni->ni_last_alive != ktime_get_real_seconds()) {
-               lnet_ni_lock(ni);
-
                /* NB: so far here is the only place to set NI status to "up" */
+               lnet_ni_lock(ni);
                ni->ni_last_alive = ktime_get_real_seconds();
                if (ni->ni_status &&
                    ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
index 67566ca2db02b1d41976f9377b21ebfc3dc8c330..e3a661129163728e1240f2716137f3ae14b841b0 100644 (file)
 #define LNET_NRB_TINY          (LNET_NRB_TINY_MIN * 4)
 #define LNET_NRB_SMALL_MIN     4096    /* min value for each CPT */
 #define LNET_NRB_SMALL         (LNET_NRB_SMALL_MIN * 4)
+#define LNET_NRB_SMALL_PAGES   1
 #define LNET_NRB_LARGE_MIN     256     /* min value for each CPT */
 #define LNET_NRB_LARGE         (LNET_NRB_LARGE_MIN * 4)
+#define LNET_NRB_LARGE_PAGES   ((LNET_MTU + PAGE_CACHE_SIZE - 1) >> \
+                                PAGE_CACHE_SHIFT)
 
 static char *forwarding = "";
 module_param(forwarding, charp, 0444);
@@ -570,7 +573,8 @@ lnet_get_route(int idx, __u32 *net, __u32 *hops,
                                        *hops     = route->lr_hops;
                                        *priority = route->lr_priority;
                                        *gateway  = route->lr_gateway->lp_nid;
-                                       *alive    = route->lr_gateway->lp_alive;
+                                       *alive = route->lr_gateway->lp_alive &&
+                                                !route->lr_downis;
                                        lnet_net_unlock(cpt);
                                        return 0;
                                }
@@ -608,7 +612,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
 {
        lnet_ping_info_t *info = rcd->rcd_pinginfo;
        struct lnet_peer *gw = rcd->rcd_gateway;
-       lnet_route_t *rtr;
+       lnet_route_t *rte;
 
        if (!gw->lp_alive)
                return;
@@ -634,11 +638,16 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
        if (!(gw->lp_ping_feats & LNET_PING_FEAT_NI_STATUS))
                return; /* can't carry NI status info */
 
-       list_for_each_entry(rtr, &gw->lp_routes, lr_gwlist) {
+       list_for_each_entry(rte, &gw->lp_routes, lr_gwlist) {
                int down = 0;
                int up = 0;
                int i;
 
+               if (gw->lp_ping_feats & LNET_PING_FEAT_RTE_DISABLED) {
+                       rte->lr_downis = 1;
+                       continue;
+               }
+
                for (i = 0; i < info->pi_nnis && i < LNET_MAX_RTR_NIS; i++) {
                        lnet_ni_status_t *stat = &info->pi_ni[i];
                        lnet_nid_t nid = stat->ns_nid;
@@ -659,7 +668,7 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
                        }
 
                        if (stat->ns_status == LNET_NI_STATUS_UP) {
-                               if (LNET_NIDNET(nid) == rtr->lr_net) {
+                               if (LNET_NIDNET(nid) == rte->lr_net) {
                                        up = 1;
                                        break;
                                }
@@ -673,10 +682,10 @@ lnet_parse_rc_info(lnet_rc_data_t *rcd)
                }
 
                if (up) { /* ignore downed NIs if NI for dest network is up */
-                       rtr->lr_downis = 0;
+                       rte->lr_downis = 0;
                        continue;
                }
-               rtr->lr_downis = down;
+               rte->lr_downis = down;
        }
 }
 
@@ -1226,7 +1235,7 @@ rescan:
        return 0;
 }
 
-static void
+void
 lnet_destroy_rtrbuf(lnet_rtrbuf_t *rb, int npages)
 {
        int sz = offsetof(lnet_rtrbuf_t, rb_kiov[npages]);
@@ -1273,67 +1282,103 @@ lnet_new_rtrbuf(lnet_rtrbufpool_t *rbp, int cpt)
 }
 
 static void
-lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp)
+lnet_rtrpool_free_bufs(lnet_rtrbufpool_t *rbp, int cpt)
 {
        int npages = rbp->rbp_npages;
-       int nbuffers = 0;
+       struct list_head tmp;
        lnet_rtrbuf_t *rb;
 
        if (!rbp->rbp_nbuffers) /* not initialized or already freed */
                return;
 
-       LASSERT(list_empty(&rbp->rbp_msgs));
-       LASSERT(rbp->rbp_credits == rbp->rbp_nbuffers);
+       INIT_LIST_HEAD(&tmp);
 
-       while (!list_empty(&rbp->rbp_bufs)) {
-               LASSERT(rbp->rbp_credits > 0);
+       lnet_net_lock(cpt);
+       lnet_drop_routed_msgs_locked(&rbp->rbp_msgs, cpt);
+       list_splice_init(&rbp->rbp_bufs, &tmp);
+       rbp->rbp_nbuffers = 0;
+       rbp->rbp_credits = 0;
+       rbp->rbp_mincredits = 0;
+       lnet_net_unlock(cpt);
 
-               rb = list_entry(rbp->rbp_bufs.next,
-                               lnet_rtrbuf_t, rb_list);
+       /* Free buffers on the free list. */
+       while (!list_empty(&tmp)) {
+               rb = list_entry(tmp.next, lnet_rtrbuf_t, rb_list);
                list_del(&rb->rb_list);
                lnet_destroy_rtrbuf(rb, npages);
-               nbuffers++;
        }
-
-       LASSERT(rbp->rbp_nbuffers == nbuffers);
-       LASSERT(rbp->rbp_credits == nbuffers);
-
-       rbp->rbp_nbuffers = 0;
-       rbp->rbp_credits = 0;
 }
 
 static int
-lnet_rtrpool_alloc_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
+lnet_rtrpool_adjust_bufs(lnet_rtrbufpool_t *rbp, int nbufs, int cpt)
 {
+       struct list_head rb_list;
        lnet_rtrbuf_t *rb;
-       int i;
+       int num_rb;
+       int num_buffers = 0;
+       int npages = rbp->rbp_npages;
 
-       if (rbp->rbp_nbuffers) {
-               LASSERT(rbp->rbp_nbuffers == nbufs);
+       /*
+        * If we are called for less buffers than already in the pool, we
+        * just lower the nbuffers number and excess buffers will be
+        * thrown away as they are returned to the free list.  Credits
+        * then get adjusted as well.
+        */
+       if (nbufs <= rbp->rbp_nbuffers) {
+               lnet_net_lock(cpt);
+               rbp->rbp_nbuffers = nbufs;
+               lnet_net_unlock(cpt);
                return 0;
        }
 
-       for (i = 0; i < nbufs; i++) {
-               rb = lnet_new_rtrbuf(rbp, cpt);
+       INIT_LIST_HEAD(&rb_list);
+
+       /*
+        * allocate the buffers on a local list first.  If all buffers are
+        * allocated successfully then join this list to the rbp buffer
+        * list. If not then free all allocated buffers.
+        */
+       num_rb = rbp->rbp_nbuffers;
 
+       while (num_rb < nbufs) {
+               rb = lnet_new_rtrbuf(rbp, cpt);
                if (!rb) {
-                       CERROR("Failed to allocate %d router bufs of %d pages\n",
-                              nbufs, rbp->rbp_npages);
-                       return -ENOMEM;
+                       CERROR("Failed to allocate %d route bufs of %d pages\n",
+                              nbufs, npages);
+                       goto failed;
                }
 
-               rbp->rbp_nbuffers++;
-               rbp->rbp_credits++;
-               rbp->rbp_mincredits++;
-               list_add(&rb->rb_list, &rbp->rbp_bufs);
-
-               /* No allocation "under fire" */
-               /* Otherwise we'd need code to schedule blocked msgs etc */
-               LASSERT(!the_lnet.ln_routing);
+               list_add(&rb->rb_list, &rb_list);
+               num_buffers++;
+               num_rb++;
        }
 
-       LASSERT(rbp->rbp_credits == nbufs);
+       lnet_net_lock(cpt);
+
+       list_splice_tail(&rb_list, &rbp->rbp_bufs);
+       rbp->rbp_nbuffers += num_buffers;
+       rbp->rbp_credits += num_buffers;
+       rbp->rbp_mincredits = rbp->rbp_credits;
+       /*
+        * We need to schedule blocked msg using the newly
+        * added buffers.
+        */
+       while (!list_empty(&rbp->rbp_bufs) &&
+              !list_empty(&rbp->rbp_msgs))
+               lnet_schedule_blocked_locked(rbp);
+
+       lnet_net_unlock(cpt);
+
        return 0;
+
+failed:
+       while (!list_empty(&rb_list)) {
+               rb = list_entry(rb_list.next, lnet_rtrbuf_t, rb_list);
+               list_del(&rb->rb_list);
+               lnet_destroy_rtrbuf(rb, npages);
+       }
+
+       return -ENOMEM;
 }
 
 static void
@@ -1348,7 +1393,7 @@ lnet_rtrpool_init(lnet_rtrbufpool_t *rbp, int npages)
 }
 
 void
-lnet_rtrpools_free(void)
+lnet_rtrpools_free(int keep_pools)
 {
        lnet_rtrbufpool_t *rtrp;
        int i;
@@ -1357,17 +1402,19 @@ lnet_rtrpools_free(void)
                return;
 
        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
-               lnet_rtrpool_free_bufs(&rtrp[0]);
-               lnet_rtrpool_free_bufs(&rtrp[1]);
-               lnet_rtrpool_free_bufs(&rtrp[2]);
+               lnet_rtrpool_free_bufs(&rtrp[LNET_TINY_BUF_IDX], i);
+               lnet_rtrpool_free_bufs(&rtrp[LNET_SMALL_BUF_IDX], i);
+               lnet_rtrpool_free_bufs(&rtrp[LNET_LARGE_BUF_IDX], i);
        }
 
-       cfs_percpt_free(the_lnet.ln_rtrpools);
-       the_lnet.ln_rtrpools = NULL;
+       if (!keep_pools) {
+               cfs_percpt_free(the_lnet.ln_rtrpools);
+               the_lnet.ln_rtrpools = NULL;
+       }
 }
 
 static int
-lnet_nrb_tiny_calculate(int npages)
+lnet_nrb_tiny_calculate(void)
 {
        int nrbs = LNET_NRB_TINY;
 
@@ -1386,7 +1433,7 @@ lnet_nrb_tiny_calculate(int npages)
 }
 
 static int
-lnet_nrb_small_calculate(int npages)
+lnet_nrb_small_calculate(void)
 {
        int nrbs = LNET_NRB_SMALL;
 
@@ -1405,7 +1452,7 @@ lnet_nrb_small_calculate(int npages)
 }
 
 static int
-lnet_nrb_large_calculate(int npages)
+lnet_nrb_large_calculate(void)
 {
        int nrbs = LNET_NRB_LARGE;
 
@@ -1427,16 +1474,12 @@ int
 lnet_rtrpools_alloc(int im_a_router)
 {
        lnet_rtrbufpool_t *rtrp;
-       int large_pages;
-       int small_pages = 1;
        int nrb_tiny;
        int nrb_small;
        int nrb_large;
        int rc;
        int i;
 
-       large_pages = (LNET_MTU + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
-
        if (!strcmp(forwarding, "")) {
                /* not set either way */
                if (!im_a_router)
@@ -1451,15 +1494,15 @@ lnet_rtrpools_alloc(int im_a_router)
                return -EINVAL;
        }
 
-       nrb_tiny = lnet_nrb_tiny_calculate(0);
+       nrb_tiny = lnet_nrb_tiny_calculate();
        if (nrb_tiny < 0)
                return -EINVAL;
 
-       nrb_small = lnet_nrb_small_calculate(small_pages);
+       nrb_small = lnet_nrb_small_calculate();
        if (nrb_small < 0)
                return -EINVAL;
 
-       nrb_large = lnet_nrb_large_calculate(large_pages);
+       nrb_large = lnet_nrb_large_calculate();
        if (nrb_large < 0)
                return -EINVAL;
 
@@ -1473,18 +1516,23 @@ lnet_rtrpools_alloc(int im_a_router)
        }
 
        cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
-               lnet_rtrpool_init(&rtrp[0], 0);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[0], nrb_tiny, i);
+               lnet_rtrpool_init(&rtrp[LNET_TINY_BUF_IDX], 0);
+               rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
+                                             nrb_tiny, i);
                if (rc)
                        goto failed;
 
-               lnet_rtrpool_init(&rtrp[1], small_pages);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[1], nrb_small, i);
+               lnet_rtrpool_init(&rtrp[LNET_SMALL_BUF_IDX],
+                                 LNET_NRB_SMALL_PAGES);
+               rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
+                                             nrb_small, i);
                if (rc)
                        goto failed;
 
-               lnet_rtrpool_init(&rtrp[2], large_pages);
-               rc = lnet_rtrpool_alloc_bufs(&rtrp[2], nrb_large, i);
+               lnet_rtrpool_init(&rtrp[LNET_LARGE_BUF_IDX],
+                                 LNET_NRB_LARGE_PAGES);
+               rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
+                                             nrb_large, i);
                if (rc)
                        goto failed;
        }
@@ -1496,10 +1544,113 @@ lnet_rtrpools_alloc(int im_a_router)
        return 0;
 
  failed:
-       lnet_rtrpools_free();
+       lnet_rtrpools_free(0);
        return rc;
 }
 
+int
+lnet_rtrpools_adjust(int tiny, int small, int large)
+{
+       int nrb = 0;
+       int rc = 0;
+       int i;
+       lnet_rtrbufpool_t *rtrp;
+
+       /*
+        * this function doesn't revert the changes if adding new buffers
+        * failed.  It's up to the user space caller to revert the
+        * changes.
+        */
+
+       if (!the_lnet.ln_routing)
+               return 0;
+
+       /*
+        * If the provided values for each buffer pool are different than the
+        * configured values, we need to take action.
+        */
+       if (tiny >= 0 && tiny != tiny_router_buffers) {
+               tiny_router_buffers = tiny;
+               nrb = lnet_nrb_tiny_calculate();
+               cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+                       rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_TINY_BUF_IDX],
+                                                     nrb, i);
+                       if (rc)
+                               return rc;
+               }
+       }
+       if (small >= 0 && small != small_router_buffers) {
+               small_router_buffers = small;
+               nrb = lnet_nrb_small_calculate();
+               cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+                       rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_SMALL_BUF_IDX],
+                                                     nrb, i);
+                       if (rc)
+                               return rc;
+               }
+       }
+       if (large >= 0 && large != large_router_buffers) {
+               large_router_buffers = large;
+               nrb = lnet_nrb_large_calculate();
+               cfs_percpt_for_each(rtrp, i, the_lnet.ln_rtrpools) {
+                       rc = lnet_rtrpool_adjust_bufs(&rtrp[LNET_LARGE_BUF_IDX],
+                                                     nrb, i);
+                       if (rc)
+                               return rc;
+               }
+       }
+
+       return 0;
+}
+
+int
+lnet_rtrpools_enable(void)
+{
+       int rc;
+
+       if (the_lnet.ln_routing)
+               return 0;
+
+       if (!the_lnet.ln_rtrpools)
+               /*
+                * If routing is turned off, and we have never
+                * initialized the pools before, just call the
+                * standard buffer pool allocation routine as
+                * if we are just configuring this for the first
+                * time.
+                */
+               return lnet_rtrpools_alloc(1);
+
+       rc = lnet_rtrpools_adjust(0, 0, 0);
+       if (rc)
+               return rc;
+
+       lnet_net_lock(LNET_LOCK_EX);
+       the_lnet.ln_routing = 1;
+
+       the_lnet.ln_ping_info->pi_features &= ~LNET_PING_FEAT_RTE_DISABLED;
+       lnet_net_unlock(LNET_LOCK_EX);
+
+       return 0;
+}
+
+void
+lnet_rtrpools_disable(void)
+{
+       if (!the_lnet.ln_routing)
+               return;
+
+       lnet_net_lock(LNET_LOCK_EX);
+       the_lnet.ln_routing = 0;
+       the_lnet.ln_ping_info->pi_features |= LNET_PING_FEAT_RTE_DISABLED;
+
+       tiny_router_buffers = 0;
+       small_router_buffers = 0;
+       large_router_buffers = 0;
+       lnet_net_unlock(LNET_LOCK_EX);
+       lnet_rtrpools_free(1);
+}
+
 int
 lnet_notify(lnet_ni_t *ni, lnet_nid_t nid, int alive, unsigned long when)
 {