ceph: single workqueue for inode related works
authorYan, Zheng <zyan@redhat.com>
Sat, 18 May 2019 03:18:44 +0000 (11:18 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Wed, 5 Jun 2019 18:34:39 +0000 (20:34 +0200)
We have three workqueue for inode works. Later patch will introduce
one more work for inode. It's not good to introcuce more workqueue
and add more 'struct work_struct' to 'struct ceph_inode_info'.

Signed-off-by: "Yan, Zheng" <zyan@redhat.com>
Reviewed-by: Jeff Layton <jlayton@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/super.c
fs/ceph/super.h

index 305daf043eb0eaca81e17dfbd405cd784edaab81..183c37c0a8fcdfe4e2d84b096c7e398b950d6027 100644 (file)
@@ -791,7 +791,7 @@ static void ceph_aio_complete_req(struct ceph_osd_request *req)
                if (aio_work) {
                        INIT_WORK(&aio_work->work, ceph_aio_retry_work);
                        aio_work->req = req;
-                       queue_work(ceph_inode_to_client(inode)->wb_wq,
+                       queue_work(ceph_inode_to_client(inode)->inode_wq,
                                   &aio_work->work);
                        return;
                }
index f85355bf49c4c7b3830822264c16f7b25c31d731..3acdd3cc603966e8fd201cffc0cf6732122f8153 100644 (file)
@@ -33,9 +33,7 @@
 
 static const struct inode_operations ceph_symlink_iops;
 
-static void ceph_invalidate_work(struct work_struct *work);
-static void ceph_writeback_work(struct work_struct *work);
-static void ceph_vmtruncate_work(struct work_struct *work);
+static void ceph_inode_work(struct work_struct *work);
 
 /*
  * find or create an inode, given the ceph ino number
@@ -509,10 +507,8 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        INIT_LIST_HEAD(&ci->i_snap_realm_item);
        INIT_LIST_HEAD(&ci->i_snap_flush_item);
 
-       INIT_WORK(&ci->i_wb_work, ceph_writeback_work);
-       INIT_WORK(&ci->i_pg_inv_work, ceph_invalidate_work);
-
-       INIT_WORK(&ci->i_vmtruncate_work, ceph_vmtruncate_work);
+       INIT_WORK(&ci->i_work, ceph_inode_work);
+       ci->i_work_mask = 0;
 
        ceph_fscache_inode_init(ci);
 
@@ -1746,51 +1742,62 @@ bool ceph_inode_set_size(struct inode *inode, loff_t size)
  */
 void ceph_queue_writeback(struct inode *inode)
 {
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       set_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask);
+
        ihold(inode);
-       if (queue_work(ceph_inode_to_client(inode)->wb_wq,
-                      &ceph_inode(inode)->i_wb_work)) {
+       if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+                      &ci->i_work)) {
                dout("ceph_queue_writeback %p\n", inode);
        } else {
-               dout("ceph_queue_writeback %p failed\n", inode);
+               dout("ceph_queue_writeback %p already queued, mask=%lx\n",
+                    inode, ci->i_work_mask);
                iput(inode);
        }
 }
 
-static void ceph_writeback_work(struct work_struct *work)
-{
-       struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
-                                                 i_wb_work);
-       struct inode *inode = &ci->vfs_inode;
-
-       dout("writeback %p\n", inode);
-       filemap_fdatawrite(&inode->i_data);
-       iput(inode);
-}
-
 /*
  * queue an async invalidation
  */
 void ceph_queue_invalidate(struct inode *inode)
 {
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       set_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask);
+
        ihold(inode);
-       if (queue_work(ceph_inode_to_client(inode)->pg_inv_wq,
-                      &ceph_inode(inode)->i_pg_inv_work)) {
+       if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+                      &ceph_inode(inode)->i_work)) {
                dout("ceph_queue_invalidate %p\n", inode);
        } else {
-               dout("ceph_queue_invalidate %p failed\n", inode);
+               dout("ceph_queue_invalidate %p already queued, mask=%lx\n",
+                    inode, ci->i_work_mask);
                iput(inode);
        }
 }
 
 /*
- * Invalidate inode pages in a worker thread.  (This can't be done
- * in the message handler context.)
+ * Queue an async vmtruncate.  If we fail to queue work, we will handle
+ * the truncation the next time we call __ceph_do_pending_vmtruncate.
  */
-static void ceph_invalidate_work(struct work_struct *work)
+void ceph_queue_vmtruncate(struct inode *inode)
 {
-       struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
-                                                 i_pg_inv_work);
-       struct inode *inode = &ci->vfs_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       set_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask);
+
+       ihold(inode);
+       if (queue_work(ceph_inode_to_client(inode)->inode_wq,
+                      &ci->i_work)) {
+               dout("ceph_queue_vmtruncate %p\n", inode);
+       } else {
+               dout("ceph_queue_vmtruncate %p already queued, mask=%lx\n",
+                    inode, ci->i_work_mask);
+               iput(inode);
+       }
+}
+
+static void ceph_do_invalidate_pages(struct inode *inode)
+{
+       struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        u32 orig_gen;
        int check = 0;
@@ -1842,44 +1849,6 @@ static void ceph_invalidate_work(struct work_struct *work)
 out:
        if (check)
                ceph_check_caps(ci, 0, NULL);
-       iput(inode);
-}
-
-
-/*
- * called by trunc_wq;
- *
- * We also truncate in a separate thread as well.
- */
-static void ceph_vmtruncate_work(struct work_struct *work)
-{
-       struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
-                                                 i_vmtruncate_work);
-       struct inode *inode = &ci->vfs_inode;
-
-       dout("vmtruncate_work %p\n", inode);
-       __ceph_do_pending_vmtruncate(inode);
-       iput(inode);
-}
-
-/*
- * Queue an async vmtruncate.  If we fail to queue work, we will handle
- * the truncation the next time we call __ceph_do_pending_vmtruncate.
- */
-void ceph_queue_vmtruncate(struct inode *inode)
-{
-       struct ceph_inode_info *ci = ceph_inode(inode);
-
-       ihold(inode);
-
-       if (queue_work(ceph_sb_to_client(inode->i_sb)->trunc_wq,
-                      &ci->i_vmtruncate_work)) {
-               dout("ceph_queue_vmtruncate %p\n", inode);
-       } else {
-               dout("ceph_queue_vmtruncate %p failed, pending=%d\n",
-                    inode, ci->i_truncate_pending);
-               iput(inode);
-       }
 }
 
 /*
@@ -1943,6 +1912,25 @@ retry:
        wake_up_all(&ci->i_cap_wq);
 }
 
+static void ceph_inode_work(struct work_struct *work)
+{
+       struct ceph_inode_info *ci = container_of(work, struct ceph_inode_info,
+                                                i_work);
+       struct inode *inode = &ci->vfs_inode;
+
+       if (test_and_clear_bit(CEPH_I_WORK_WRITEBACK, &ci->i_work_mask)) {
+               dout("writeback %p\n", inode);
+               filemap_fdatawrite(&inode->i_data);
+       }
+       if (test_and_clear_bit(CEPH_I_WORK_INVALIDATE_PAGES, &ci->i_work_mask))
+               ceph_do_invalidate_pages(inode);
+
+       if (test_and_clear_bit(CEPH_I_WORK_VMTRUNCATE, &ci->i_work_mask))
+               __ceph_do_pending_vmtruncate(inode);
+
+       iput(inode);
+}
+
 /*
  * symlinks
  */
index 01be7c1bc4c686c45f24f01a4a24642722aa9dac..d57fa60dcd43603562d9a12a8c100b0f276fb6ba 100644 (file)
@@ -672,18 +672,12 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
         * The number of concurrent works can be high but they don't need
         * to be processed in parallel, limit concurrency.
         */
-       fsc->wb_wq = alloc_workqueue("ceph-writeback", 0, 1);
-       if (!fsc->wb_wq)
+       fsc->inode_wq = alloc_workqueue("ceph-inode", WQ_UNBOUND, 0);
+       if (!fsc->inode_wq)
                goto fail_client;
-       fsc->pg_inv_wq = alloc_workqueue("ceph-pg-invalid", 0, 1);
-       if (!fsc->pg_inv_wq)
-               goto fail_wb_wq;
-       fsc->trunc_wq = alloc_workqueue("ceph-trunc", 0, 1);
-       if (!fsc->trunc_wq)
-               goto fail_pg_inv_wq;
        fsc->cap_wq = alloc_workqueue("ceph-cap", 0, 1);
        if (!fsc->cap_wq)
-               goto fail_trunc_wq;
+               goto fail_inode_wq;
 
        /* set up mempools */
        err = -ENOMEM;
@@ -697,12 +691,8 @@ static struct ceph_fs_client *create_fs_client(struct ceph_mount_options *fsopt,
 
 fail_cap_wq:
        destroy_workqueue(fsc->cap_wq);
-fail_trunc_wq:
-       destroy_workqueue(fsc->trunc_wq);
-fail_pg_inv_wq:
-       destroy_workqueue(fsc->pg_inv_wq);
-fail_wb_wq:
-       destroy_workqueue(fsc->wb_wq);
+fail_inode_wq:
+       destroy_workqueue(fsc->inode_wq);
 fail_client:
        ceph_destroy_client(fsc->client);
 fail:
@@ -715,9 +705,7 @@ fail:
 
 static void flush_fs_workqueues(struct ceph_fs_client *fsc)
 {
-       flush_workqueue(fsc->wb_wq);
-       flush_workqueue(fsc->pg_inv_wq);
-       flush_workqueue(fsc->trunc_wq);
+       flush_workqueue(fsc->inode_wq);
        flush_workqueue(fsc->cap_wq);
 }
 
@@ -725,9 +713,7 @@ static void destroy_fs_client(struct ceph_fs_client *fsc)
 {
        dout("destroy_fs_client %p\n", fsc);
 
-       destroy_workqueue(fsc->wb_wq);
-       destroy_workqueue(fsc->pg_inv_wq);
-       destroy_workqueue(fsc->trunc_wq);
+       destroy_workqueue(fsc->inode_wq);
        destroy_workqueue(fsc->cap_wq);
 
        mempool_destroy(fsc->wb_pagevec_pool);
index 6edab9a750f8a00211f7ccf2aa659982c684330a..3fb866a979cef5f1b6d43aa2949ff118063496cd 100644 (file)
@@ -109,9 +109,7 @@ struct ceph_fs_client {
        mempool_t *wb_pagevec_pool;
        atomic_long_t writeback_count;
 
-       struct workqueue_struct *wb_wq;
-       struct workqueue_struct *pg_inv_wq;
-       struct workqueue_struct *trunc_wq;
+       struct workqueue_struct *inode_wq;
        struct workqueue_struct *cap_wq;
 
 #ifdef CONFIG_DEBUG_FS
@@ -387,10 +385,8 @@ struct ceph_inode_info {
        struct list_head i_snap_realm_item;
        struct list_head i_snap_flush_item;
 
-       struct work_struct i_wb_work;  /* writeback work */
-       struct work_struct i_pg_inv_work;  /* page invalidation work */
-
-       struct work_struct i_vmtruncate_work;
+       struct work_struct i_work;
+       unsigned long  i_work_mask;
 
 #ifdef CONFIG_CEPH_FSCACHE
        struct fscache_cookie *fscache;
@@ -512,6 +508,13 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_ERROR_FILELOCK  (1 << 12) /* have seen file lock errors */
 
 
+/*
+ * Masks of ceph inode work.
+ */
+#define CEPH_I_WORK_WRITEBACK          0 /* writeback */
+#define CEPH_I_WORK_INVALIDATE_PAGES   1 /* invalidate pages */
+#define CEPH_I_WORK_VMTRUNCATE         2 /* vmtruncate */
+
 /*
  * We set the ERROR_WRITE bit when we start seeing write errors on an inode
  * and then clear it when they start succeeding. Note that we do a lockless