ceph: add inline data to pagecache
authorYan, Zheng <zyan@redhat.com>
Fri, 14 Nov 2014 13:41:55 +0000 (21:41 +0800)
committerIlya Dryomov <idryomov@redhat.com>
Wed, 17 Dec 2014 17:09:52 +0000 (20:09 +0300)
Request reply and cap message can contain inline data. add inline data
to the page cache if there is Fc cap.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/inode.c
fs/ceph/super.h
fs/ceph/super.h.rej [new file with mode: 0644]
include/linux/ceph/ceph_fs.h

index f2c7aa878aa42b068750e47c215b003b4b8a43fe..4a3f55f27ab4d524fc1950935494cee94fe6e8e2 100644 (file)
@@ -1318,6 +1318,49 @@ out:
        return ret;
 }
 
+void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
+                          char *data, size_t len)
+{
+       struct address_space *mapping = inode->i_mapping;
+       struct page *page;
+
+       if (locked_page) {
+               page = locked_page;
+       } else {
+               if (i_size_read(inode) == 0)
+                       return;
+               page = find_or_create_page(mapping, 0,
+                                          mapping_gfp_mask(mapping) & ~__GFP_FS);
+               if (!page)
+                       return;
+               if (PageUptodate(page)) {
+                       unlock_page(page);
+                       page_cache_release(page);
+                       return;
+               }
+       }
+
+       dout("fill_inline_data %p %llx.%llx len %lu locked_page %p\n",
+            inode, ceph_vinop(inode), len, locked_page);
+
+       if (len > 0) {
+               void *kaddr = kmap_atomic(page);
+               memcpy(kaddr, data, len);
+               kunmap_atomic(kaddr);
+       }
+
+       if (page != locked_page) {
+               if (len < PAGE_CACHE_SIZE)
+                       zero_user_segment(page, len, PAGE_CACHE_SIZE);
+               else
+                       flush_dcache_page(page);
+
+               SetPageUptodate(page);
+               unlock_page(page);
+               page_cache_release(page);
+       }
+}
+
 static struct vm_operations_struct ceph_vmops = {
        .fault          = ceph_filemap_fault,
        .page_mkwrite   = ceph_page_mkwrite,
index b88ae601f309a961e939ba0ae99be1d44b468d89..6372eb9ce491f5ef9c58a9c05e6304db8247acf1 100644 (file)
@@ -2405,6 +2405,7 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
        bool queue_invalidate = false;
        bool queue_revalidate = false;
        bool deleted_inode = false;
+       bool fill_inline = false;
 
        dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
             inode, cap, mds, seq, ceph_cap_string(newcaps));
@@ -2578,6 +2579,13 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
        }
        BUG_ON(cap->issued & ~cap->implemented);
 
+       if (inline_version > 0 && inline_version >= ci->i_inline_version) {
+               ci->i_inline_version = inline_version;
+               if (ci->i_inline_version != CEPH_INLINE_NONE &&
+                   (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
+                       fill_inline = true;
+       }
+
        spin_unlock(&ci->i_ceph_lock);
 
        if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
@@ -2591,6 +2599,9 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
                        wake = true;
        }
 
+       if (fill_inline)
+               ceph_fill_inline_data(inode, NULL, inline_data, inline_len);
+
        if (queue_trunc) {
                ceph_queue_vmtruncate(inode);
                ceph_queue_revalidate(inode);
index 72607c17e6fd479d246fcf164db8c948488270ae..feea6a8f88ae5158c8520d5596fbb342101bfd58 100644 (file)
@@ -387,6 +387,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        spin_lock_init(&ci->i_ceph_lock);
 
        ci->i_version = 0;
+       ci->i_inline_version = 0;
        ci->i_time_warp_seq = 0;
        ci->i_ceph_flags = 0;
        ci->i_ordered_count = 0;
@@ -676,6 +677,7 @@ static int fill_inode(struct inode *inode,
        bool wake = false;
        bool queue_trunc = false;
        bool new_version = false;
+       bool fill_inline = false;
 
        dout("fill_inode %p ino %llx.%llx v %llu had %llu\n",
             inode, ceph_vinop(inode), le64_to_cpu(info->version),
@@ -875,8 +877,22 @@ static int fill_inode(struct inode *inode,
                           ceph_vinop(inode));
                __ceph_get_fmode(ci, cap_fmode);
        }
+
+       if (iinfo->inline_version > 0 &&
+           iinfo->inline_version >= ci->i_inline_version) {
+               int cache_caps = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
+               ci->i_inline_version = iinfo->inline_version;
+               if (ci->i_inline_version != CEPH_INLINE_NONE &&
+                   (le32_to_cpu(info->cap.caps) & cache_caps))
+                       fill_inline = true;
+       }
+
        spin_unlock(&ci->i_ceph_lock);
 
+       if (fill_inline)
+               ceph_fill_inline_data(inode, NULL,
+                                     iinfo->inline_data, iinfo->inline_len);
+
        if (wake)
                wake_up_all(&ci->i_cap_wq);
 
index fc1c8255dead1645a5153ce933fad8884e5bfe98..b0de9162758b7079b42e20174369304daecdf272 100644 (file)
@@ -253,6 +253,7 @@ struct ceph_inode_info {
        spinlock_t i_ceph_lock;
 
        u64 i_version;
+       u64 i_inline_version;
        u32 i_time_warp_seq;
 
        unsigned i_ceph_flags;
@@ -858,7 +859,7 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
                                      int mds, int drop, int unless);
 
 extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
-                        int *got, loff_t endoff);
+                        loff_t endoff, int *got, struct page **pinned_page);
 
 /* for counting open files by mode */
 static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
@@ -880,6 +881,8 @@ extern int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                            struct file *file, unsigned flags, umode_t mode,
                            int *opened);
 extern int ceph_release(struct inode *inode, struct file *filp);
+extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
+                                 char *data, size_t len);
 
 /* dir.c */
 extern const struct file_operations ceph_dir_fops;
diff --git a/fs/ceph/super.h.rej b/fs/ceph/super.h.rej
new file mode 100644 (file)
index 0000000..88fe3df
--- /dev/null
@@ -0,0 +1,10 @@
+--- fs/ceph/super.h
++++ fs/ceph/super.h
+@@ -254,6 +255,7 @@
+       spinlock_t i_ceph_lock;
+
+       u64 i_version;
++      u64 i_inline_version;
+       u32 i_time_warp_seq;
+
+       unsigned i_ceph_flags;
index 31d8b98b7f96b8746c7ab8c086d08cf9c08f4e7d..2d4acfa2c7b7ab3d68414c18120ce97f700492e3 100644 (file)
@@ -552,6 +552,7 @@ struct ceph_filelock {
 
 int ceph_flags_to_mode(int flags);
 
+#define CEPH_INLINE_NONE       ((__u64)-1)
 
 /* capability bits */
 #define CEPH_CAP_PIN         1  /* no specific capabilities beyond the pin */