workqueue: separate out drain_workqueue() from destroy_workqueue()
authorTejun Heo <tj@kernel.org>
Tue, 5 Apr 2011 16:01:44 +0000 (18:01 +0200)
committerTejun Heo <tj@kernel.org>
Fri, 20 May 2011 11:54:46 +0000 (13:54 +0200)
There are users which want to drain workqueues without destroying it.
Separate out drain functionality from destroy_workqueue() into
drain_workqueue() and make it accessible to workqueue users.

To guarantee forward-progress, only chain queueing is allowed while
drain is in progress.  If a new work item which isn't chained from the
running or pending work items is queued while draining is in progress,
WARN_ON_ONCE() is triggered.

Signed-off-by: Tejun Heo <tj@kernel.org>
Cc: James Bottomley <James.Bottomley@hansenpartnership.com>
include/linux/workqueue.h
kernel/workqueue.c

index 57b31b3d83bdd159601c0e1b720416e630d49d9d..2be2887c6958059db65241338bfaed4a9aa61945 100644 (file)
@@ -255,7 +255,7 @@ enum {
        WQ_HIGHPRI              = 1 << 4, /* high priority */
        WQ_CPU_INTENSIVE        = 1 << 5, /* cpu instensive workqueue */
 
-       WQ_DYING                = 1 << 6, /* internal: workqueue is dying */
+       WQ_DRAINING             = 1 << 6, /* internal: workqueue is draining */
        WQ_RESCUER              = 1 << 7, /* internal: workqueue has rescuer */
 
        WQ_MAX_ACTIVE           = 512,    /* I like 512, better ideas? */
@@ -355,6 +355,7 @@ extern int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
                        struct delayed_work *work, unsigned long delay);
 
 extern void flush_workqueue(struct workqueue_struct *wq);
+extern void drain_workqueue(struct workqueue_struct *wq);
 extern void flush_scheduled_work(void);
 
 extern int schedule_work(struct work_struct *work);
index e3378e8d3a5ce7e16fbbbb35ebd30a1c51fde650..25c8afeaeae80d4ab093dd69730b55a5cbb1bd3e 100644 (file)
@@ -221,7 +221,7 @@ typedef unsigned long mayday_mask_t;
  * per-CPU workqueues:
  */
 struct workqueue_struct {
-       unsigned int            flags;          /* I: WQ_* flags */
+       unsigned int            flags;          /* W: WQ_* flags */
        union {
                struct cpu_workqueue_struct __percpu    *pcpu;
                struct cpu_workqueue_struct             *single;
@@ -240,6 +240,7 @@ struct workqueue_struct {
        mayday_mask_t           mayday_mask;    /* cpus requesting rescue */
        struct worker           *rescuer;       /* I: rescue worker */
 
+       int                     nr_drainers;    /* W: drain in progress */
        int                     saved_max_active; /* W: saved cwq max_active */
        const char              *name;          /* I: workqueue name */
 #ifdef CONFIG_LOCKDEP
@@ -990,7 +991,7 @@ static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
        debug_work_activate(work);
 
        /* if dying, only works from the same workqueue are allowed */
-       if (unlikely(wq->flags & WQ_DYING) &&
+       if (unlikely(wq->flags & WQ_DRAINING) &&
            WARN_ON_ONCE(!is_chained_work(wq)))
                return;
 
@@ -2381,6 +2382,54 @@ out_unlock:
 }
 EXPORT_SYMBOL_GPL(flush_workqueue);
 
+/**
+ * drain_workqueue - drain a workqueue
+ * @wq: workqueue to drain
+ *
+ * Wait until the workqueue becomes empty.  While draining is in progress,
+ * only chain queueing is allowed.  IOW, only currently pending or running
+ * work items on @wq can queue further work items on it.  @wq is flushed
+ * repeatedly until it becomes empty.  The number of flushing is detemined
+ * by the depth of chaining and should be relatively short.  Whine if it
+ * takes too long.
+ */
+void drain_workqueue(struct workqueue_struct *wq)
+{
+       unsigned int flush_cnt = 0;
+       unsigned int cpu;
+
+       /*
+        * __queue_work() needs to test whether there are drainers, is much
+        * hotter than drain_workqueue() and already looks at @wq->flags.
+        * Use WQ_DRAINING so that queue doesn't have to check nr_drainers.
+        */
+       spin_lock(&workqueue_lock);
+       if (!wq->nr_drainers++)
+               wq->flags |= WQ_DRAINING;
+       spin_unlock(&workqueue_lock);
+reflush:
+       flush_workqueue(wq);
+
+       for_each_cwq_cpu(cpu, wq) {
+               struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
+
+               if (!cwq->nr_active && list_empty(&cwq->delayed_works))
+                       continue;
+
+               if (++flush_cnt == 10 ||
+                   (flush_cnt % 100 == 0 && flush_cnt <= 1000))
+                       pr_warning("workqueue %s: flush on destruction isn't complete after %u tries\n",
+                                  wq->name, flush_cnt);
+               goto reflush;
+       }
+
+       spin_lock(&workqueue_lock);
+       if (!--wq->nr_drainers)
+               wq->flags &= ~WQ_DRAINING;
+       spin_unlock(&workqueue_lock);
+}
+EXPORT_SYMBOL_GPL(drain_workqueue);
+
 static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
                             bool wait_executing)
 {
@@ -3011,34 +3060,10 @@ EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
  */
 void destroy_workqueue(struct workqueue_struct *wq)
 {
-       unsigned int flush_cnt = 0;
        unsigned int cpu;
 
-       /*
-        * Mark @wq dying and drain all pending works.  Once WQ_DYING is
-        * set, only chain queueing is allowed.  IOW, only currently
-        * pending or running work items on @wq can queue further work
-        * items on it.  @wq is flushed repeatedly until it becomes empty.
-        * The number of flushing is detemined by the depth of chaining and
-        * should be relatively short.  Whine if it takes too long.
-        */
-       wq->flags |= WQ_DYING;
-reflush:
-       flush_workqueue(wq);
-
-       for_each_cwq_cpu(cpu, wq) {
-               struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
-
-               if (!cwq->nr_active && list_empty(&cwq->delayed_works))
-                       continue;
-
-               if (++flush_cnt == 10 ||
-                   (flush_cnt % 100 == 0 && flush_cnt <= 1000))
-                       printk(KERN_WARNING "workqueue %s: flush on "
-                              "destruction isn't complete after %u tries\n",
-                              wq->name, flush_cnt);
-               goto reflush;
-       }
+       /* drain it before proceeding with destruction */
+       drain_workqueue(wq);
 
        /*
         * wq list is used to freeze wq, remove from list after