workqueue: split apply_workqueue_attrs() into 3 stages
authorLai Jiangshan <laijs@cn.fujitsu.com>
Mon, 27 Apr 2015 09:58:38 +0000 (17:58 +0800)
committerTejun Heo <tj@kernel.org>
Mon, 27 Apr 2015 15:13:40 +0000 (11:13 -0400)
Current apply_workqueue_attrs() includes pwqs-allocation and pwqs-installation,
so when we batch multiple apply_workqueue_attrs()s as a transaction, we can't
ensure the transaction must succeed or fail as a complete unit.

To solve this, we split apply_workqueue_attrs() into three stages.
The first stage does the preparation: allocation memory, pwqs.
The second stage does the attrs-installaion and pwqs-installation.
The third stage frees the allocated memory and (old or unused) pwqs.

As the result, batching multiple apply_workqueue_attrs()s can
succeed or fail as a complete unit:
1) batch do all the first stage for all the workqueues
2) only commit all when all the above succeed.

This patch is a preparation for the next patch ("Allow modifying low level
unbound workqueue cpumask") which will do a multiple apply_workqueue_attrs().

The patch doesn't have functionality changed except two minor adjustment:
1) free_unbound_pwq() for the error path is removed, we use the
   heavier version put_pwq_unlocked() instead since the error path
   is rare. this adjustment simplifies the code.
2) the memory-allocation is also moved into wq_pool_mutex.
   this is needed to avoid to do the further splitting.

tj: minor updates to comments.

Suggested-by: Tejun Heo <tj@kernel.org>
Cc: Christoph Lameter <cl@linux.com>
Cc: Kevin Hilman <khilman@linaro.org>
Cc: Lai Jiangshan <laijs@cn.fujitsu.com>
Cc: Mike Galbraith <bitbucket@online.de>
Cc: Paul E. McKenney <paulmck@linux.vnet.ibm.com>
Cc: Tejun Heo <tj@kernel.org>
Cc: Viresh Kumar <viresh.kumar@linaro.org>
Cc: Frederic Weisbecker <fweisbec@gmail.com>
Signed-off-by: Lai Jiangshan <laijs@cn.fujitsu.com>
Signed-off-by: Tejun Heo <tj@kernel.org>
kernel/workqueue.c

index 586ad91300b0f3924b41bd417cda9cbb076f1a9d..26ff249240160b6fb79ee07a71691555ab91bb9a 100644 (file)
@@ -3425,17 +3425,6 @@ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
        return pwq;
 }
 
-/* undo alloc_unbound_pwq(), used only in the error path */
-static void free_unbound_pwq(struct pool_workqueue *pwq)
-{
-       lockdep_assert_held(&wq_pool_mutex);
-
-       if (pwq) {
-               put_unbound_pool(pwq->pool);
-               kmem_cache_free(pwq_cache, pwq);
-       }
-}
-
 /**
  * wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node
  * @attrs: the wq_attrs of interest
@@ -3498,42 +3487,48 @@ static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
        return old_pwq;
 }
 
-/**
- * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
- * @wq: the target workqueue
- * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
- *
- * Apply @attrs to an unbound workqueue @wq.  Unless disabled, on NUMA
- * machines, this function maps a separate pwq to each NUMA node with
- * possibles CPUs in @attrs->cpumask so that work items are affine to the
- * NUMA node it was issued on.  Older pwqs are released as in-flight work
- * items finish.  Note that a work item which repeatedly requeues itself
- * back-to-back will stay on its current pwq.
- *
- * Performs GFP_KERNEL allocations.
- *
- * Return: 0 on success and -errno on failure.
- */
-int apply_workqueue_attrs(struct workqueue_struct *wq,
-                         const struct workqueue_attrs *attrs)
+/* context to store the prepared attrs & pwqs before applying */
+struct apply_wqattrs_ctx {
+       struct workqueue_struct *wq;            /* target workqueue */
+       struct workqueue_attrs  *attrs;         /* attrs to apply */
+       struct pool_workqueue   *dfl_pwq;
+       struct pool_workqueue   *pwq_tbl[];
+};
+
+/* free the resources after success or abort */
+static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
+{
+       if (ctx) {
+               int node;
+
+               for_each_node(node)
+                       put_pwq_unlocked(ctx->pwq_tbl[node]);
+               put_pwq_unlocked(ctx->dfl_pwq);
+
+               free_workqueue_attrs(ctx->attrs);
+
+               kfree(ctx);
+       }
+}
+
+/* allocate the attrs and pwqs for later installation */
+static struct apply_wqattrs_ctx *
+apply_wqattrs_prepare(struct workqueue_struct *wq,
+                     const struct workqueue_attrs *attrs)
 {
+       struct apply_wqattrs_ctx *ctx;
        struct workqueue_attrs *new_attrs, *tmp_attrs;
-       struct pool_workqueue **pwq_tbl, *dfl_pwq;
-       int node, ret;
+       int node;
 
-       /* only unbound workqueues can change attributes */
-       if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
-               return -EINVAL;
+       lockdep_assert_held(&wq_pool_mutex);
 
-       /* creating multiple pwqs breaks ordering guarantee */
-       if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
-               return -EINVAL;
+       ctx = kzalloc(sizeof(*ctx) + nr_node_ids * sizeof(ctx->pwq_tbl[0]),
+                     GFP_KERNEL);
 
-       pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);
        new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
        tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
-       if (!pwq_tbl || !new_attrs || !tmp_attrs)
-               goto enomem;
+       if (!ctx || !new_attrs || !tmp_attrs)
+               goto out_free;
 
        /* make a copy of @attrs and sanitize it */
        copy_workqueue_attrs(new_attrs, attrs);
@@ -3546,76 +3541,112 @@ int apply_workqueue_attrs(struct workqueue_struct *wq,
         */
        copy_workqueue_attrs(tmp_attrs, new_attrs);
 
-       /*
-        * CPUs should stay stable across pwq creations and installations.
-        * Pin CPUs, determine the target cpumask for each node and create
-        * pwqs accordingly.
-        */
-       get_online_cpus();
-
-       mutex_lock(&wq_pool_mutex);
-
        /*
         * If something goes wrong during CPU up/down, we'll fall back to
         * the default pwq covering whole @attrs->cpumask.  Always create
         * it even if we don't use it immediately.
         */
-       dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
-       if (!dfl_pwq)
-               goto enomem_pwq;
+       ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+       if (!ctx->dfl_pwq)
+               goto out_free;
 
        for_each_node(node) {
                if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
-                       pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
-                       if (!pwq_tbl[node])
-                               goto enomem_pwq;
+                       ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
+                       if (!ctx->pwq_tbl[node])
+                               goto out_free;
                } else {
-                       dfl_pwq->refcnt++;
-                       pwq_tbl[node] = dfl_pwq;
+                       ctx->dfl_pwq->refcnt++;
+                       ctx->pwq_tbl[node] = ctx->dfl_pwq;
                }
        }
 
-       mutex_unlock(&wq_pool_mutex);
+       ctx->attrs = new_attrs;
+       ctx->wq = wq;
+       free_workqueue_attrs(tmp_attrs);
+       return ctx;
+
+out_free:
+       free_workqueue_attrs(tmp_attrs);
+       free_workqueue_attrs(new_attrs);
+       apply_wqattrs_cleanup(ctx);
+       return NULL;
+}
+
+/* set attrs and install prepared pwqs, @ctx points to old pwqs on return */
+static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
+{
+       int node;
 
        /* all pwqs have been created successfully, let's install'em */
-       mutex_lock(&wq->mutex);
+       mutex_lock(&ctx->wq->mutex);
 
-       copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
+       copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
 
        /* save the previous pwq and install the new one */
        for_each_node(node)
-               pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+               ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node,
+                                                         ctx->pwq_tbl[node]);
 
        /* @dfl_pwq might not have been used, ensure it's linked */
-       link_pwq(dfl_pwq);
-       swap(wq->dfl_pwq, dfl_pwq);
+       link_pwq(ctx->dfl_pwq);
+       swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
 
-       mutex_unlock(&wq->mutex);
+       mutex_unlock(&ctx->wq->mutex);
+}
 
-       /* put the old pwqs */
-       for_each_node(node)
-               put_pwq_unlocked(pwq_tbl[node]);
-       put_pwq_unlocked(dfl_pwq);
+/**
+ * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
+ * @wq: the target workqueue
+ * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
+ *
+ * Apply @attrs to an unbound workqueue @wq.  Unless disabled, on NUMA
+ * machines, this function maps a separate pwq to each NUMA node with
+ * possibles CPUs in @attrs->cpumask so that work items are affine to the
+ * NUMA node it was issued on.  Older pwqs are released as in-flight work
+ * items finish.  Note that a work item which repeatedly requeues itself
+ * back-to-back will stay on its current pwq.
+ *
+ * Performs GFP_KERNEL allocations.
+ *
+ * Return: 0 on success and -errno on failure.
+ */
+int apply_workqueue_attrs(struct workqueue_struct *wq,
+                         const struct workqueue_attrs *attrs)
+{
+       struct apply_wqattrs_ctx *ctx;
+       int ret = -ENOMEM;
 
-       put_online_cpus();
-       ret = 0;
-       /* fall through */
-out_free:
-       free_workqueue_attrs(tmp_attrs);
-       free_workqueue_attrs(new_attrs);
-       kfree(pwq_tbl);
-       return ret;
+       /* only unbound workqueues can change attributes */
+       if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
+               return -EINVAL;
 
-enomem_pwq:
-       free_unbound_pwq(dfl_pwq);
-       for_each_node(node)
-               if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
-                       free_unbound_pwq(pwq_tbl[node]);
+       /* creating multiple pwqs breaks ordering guarantee */
+       if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
+               return -EINVAL;
+
+       /*
+        * CPUs should stay stable across pwq creations and installations.
+        * Pin CPUs, determine the target cpumask for each node and create
+        * pwqs accordingly.
+        */
+       get_online_cpus();
+
+       mutex_lock(&wq_pool_mutex);
+       ctx = apply_wqattrs_prepare(wq, attrs);
        mutex_unlock(&wq_pool_mutex);
+
+       /* the ctx has been prepared successfully, let's commit it */
+       if (ctx) {
+               apply_wqattrs_commit(ctx);
+               ret = 0;
+       }
+
        put_online_cpus();
-enomem:
-       ret = -ENOMEM;
-       goto out_free;
+
+       apply_wqattrs_cleanup(ctx);
+
+       return ret;
 }
 
 /**