ceph: record 'offset' for each entry of readdir result
author Yan, Zheng <zyan@redhat.com>
Thu, 28 Apr 2016 07:17:40 +0000 (15:17 +0800)
committer Ilya Dryomov <idryomov@gmail.com>
Wed, 25 May 2016 23:15:35 +0000 (01:15 +0200)
This is preparation for using the hash value as the dentry 'offset'.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/dir.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h

index ebcbd1c946b4c387e9fd59e43bd4943efcaf9ba1..6ae635605be5ae74729c8fbad967c311bd85ef74 100644 (file)
@@ -277,12 +277,12 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_mds_client *mdsc = fsc->mdsc;
        unsigned frag = fpos_frag(ctx->pos);
-       int off = fpos_off(ctx->pos);
+       int i;
        int err;
        u32 ftype;
        struct ceph_mds_reply_info_parsed *rinfo;
 
-       dout("readdir %p file %p frag %u off %u\n", inode, file, frag, off);
+       dout("readdir %p file %p pos %llx\n", inode, file, ctx->pos);
        if (fi->flags & CEPH_F_ATEND)
                return 0;
 
@@ -294,7 +294,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                            inode->i_mode >> 12))
                        return 0;
                ctx->pos = 1;
-               off = 1;
        }
        if (ctx->pos == 1) {
                ino_t ino = parent_ino(file->f_path.dentry);
@@ -304,7 +303,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                            inode->i_mode >> 12))
                        return 0;
                ctx->pos = 2;
-               off = 2;
        }
 
        /* can we use the dcache? */
@@ -320,7 +318,6 @@ static int ceph_readdir(struct file *file, struct dir_context *ctx)
                if (err != -EAGAIN)
                        return err;
                frag = fpos_frag(ctx->pos);
-               off = fpos_off(ctx->pos);
        } else {
                spin_unlock(&ci->i_ceph_lock);
        }
@@ -386,12 +383,12 @@ more:
                rinfo = &req->r_reply_info;
                if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
                        frag = le32_to_cpu(rinfo->dir_dir->frag);
-                       off = req->r_readdir_offset;
-                       fi->next_offset = off;
+                       fi->next_offset = req->r_readdir_offset;
+                       /* adjust ctx->pos to beginning of frag */
+                       ctx->pos = ceph_make_fpos(frag, fi->next_offset);
                }
 
                fi->frag = frag;
-               fi->offset = fi->next_offset;
                fi->last_readdir = req;
 
                if (req->r_did_prepopulate) {
@@ -399,7 +396,8 @@ more:
                        if (fi->readdir_cache_idx < 0) {
                                /* preclude from marking dir ordered */
                                fi->dir_ordered_count = 0;
-                       } else if (ceph_frag_is_leftmost(frag) && off == 2) {
+                       } else if (ceph_frag_is_leftmost(frag) &&
+                                  fi->next_offset == 2) {
                                /* note dir version at start of readdir so
                                 * we can tell if any dentries get dropped */
                                fi->dir_release_count = req->r_dir_release_cnt;
@@ -421,37 +419,54 @@ more:
                        struct ceph_mds_reply_dir_entry *rde =
                                        rinfo->dir_entries + (rinfo->dir_nr-1);
                        err = note_last_dentry(fi, rde->name, rde->name_len,
-                                      fi->next_offset + rinfo->dir_nr);
+                                              fpos_off(rde->offset) + 1);
                        if (err)
                                return err;
                }
        }
 
        rinfo = &fi->last_readdir->r_reply_info;
-       dout("readdir frag %x num %d off %d chunkoff %d\n", frag,
-            rinfo->dir_nr, off, fi->offset);
+       dout("readdir frag %x num %d pos %llx chunk first %llx\n",
+            frag, rinfo->dir_nr, ctx->pos,
+            rinfo->dir_nr ? rinfo->dir_entries[0].offset : 0LL);
 
-       ctx->pos = ceph_make_fpos(frag, off);
-       while (off >= fi->offset && off - fi->offset < rinfo->dir_nr) {
-               struct ceph_mds_reply_dir_entry *rde =
-                       rinfo->dir_entries + (off - fi->offset);
+       i = 0;
+       /* search start position */
+       if (rinfo->dir_nr > 0) {
+               int step, nr = rinfo->dir_nr;
+               while (nr > 0) {
+                       step = nr >> 1;
+                       if (rinfo->dir_entries[i + step].offset < ctx->pos) {
+                               i +=  step + 1;
+                               nr -= step + 1;
+                       } else {
+                               nr = step;
+                       }
+               }
+       }
+       for (; i < rinfo->dir_nr; i++) {
+               struct ceph_mds_reply_dir_entry *rde = rinfo->dir_entries + i;
                struct ceph_vino vino;
                ino_t ino;
 
-               dout("readdir off %d (%d/%d) -> %lld '%.*s' %p\n",
-                    off, off - fi->offset, rinfo->dir_nr, ctx->pos,
+               BUG_ON(rde->offset < ctx->pos);
+
+               ctx->pos = rde->offset;
+               dout("readdir (%d/%d) -> %llx '%.*s' %p\n",
+                    i, rinfo->dir_nr, ctx->pos,
                     rde->name_len, rde->name, &rde->inode.in);
+
                BUG_ON(!rde->inode.in);
                ftype = le32_to_cpu(rde->inode.in->mode) >> 12;
                vino.ino = le64_to_cpu(rde->inode.in->ino);
                vino.snap = le64_to_cpu(rde->inode.in->snapid);
                ino = ceph_vino_to_ino(vino);
+
                if (!dir_emit(ctx, rde->name, rde->name_len,
                              ceph_translate_ino(inode->i_sb, ino), ftype)) {
                        dout("filldir stopping us...\n");
                        return 0;
                }
-               off++;
                ctx->pos++;
        }
 
@@ -464,8 +479,7 @@ more:
        /* more frags? */
        if (!ceph_frag_is_rightmost(frag)) {
                frag = ceph_frag_next(frag);
-               off = 2;
-               ctx->pos = ceph_make_fpos(frag, off);
+               ctx->pos = ceph_make_fpos(frag, 2);
                dout("readdir next frag is %x\n", frag);
                goto more;
        }
@@ -497,7 +511,7 @@ more:
        return 0;
 }
 
-static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
+static void reset_readdir(struct ceph_file_info *fi)
 {
        if (fi->last_readdir) {
                ceph_mdsc_put_request(fi->last_readdir);
@@ -511,6 +525,23 @@ static void reset_readdir(struct ceph_file_info *fi, unsigned frag)
        fi->flags &= ~CEPH_F_ATEND;
 }
 
+/*
+ * Discard buffered readdir content on seekdir(0), on a seek to a new frag,
+ * on a seek prior to the current chunk, or when no entries are buffered.
+ */
+static bool need_reset_readdir(struct ceph_file_info *fi, loff_t new_pos)
+{
+       struct ceph_mds_reply_info_parsed *rinfo;
+       if (new_pos == 0)
+               return true;
+       if (fpos_frag(new_pos) != fi->frag)
+               return true;
+       rinfo = fi->last_readdir ? &fi->last_readdir->r_reply_info : NULL;
+       if (!rinfo || !rinfo->dir_nr)
+               return true;
+       return new_pos < rinfo->dir_entries[0].offset;
+}
+
 static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
 {
        struct ceph_file_info *fi = file->private_data;
@@ -539,13 +570,9 @@ static loff_t ceph_dir_llseek(struct file *file, loff_t offset, int whence)
                }
                retval = offset;
 
-               if (offset == 0 ||
-                   fpos_frag(offset) != fi->frag ||
-                   fpos_off(offset) < fi->offset) {
-                       /* discard buffered readdir content on seekdir(0), or
-                        * seek to new frag, or seek prior to current chunk */
+               if (need_reset_readdir(fi, offset)) {
                        dout("dir_llseek dropping %p content\n", file);
-                       reset_readdir(fi, fpos_frag(offset));
+                       reset_readdir(fi);
                } else if (fpos_cmp(offset, old_offset) > 0) {
                        /* reset dir_release_count if we did a forward seek */
                        fi->dir_release_count = 0;
index 40d081d7028ed716b0c11c0d43f2e5298fa7dfd1..b53c95903aebb83e1b7c85971541d7f6aee14cdb 100644 (file)
@@ -1523,6 +1523,7 @@ retry_lookup:
 
                di = dn->d_fsdata;
                di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
+               rde->offset = di->offset;
 
                update_dentry_lease(dn, rde->lease, req->r_session,
                                    req->r_request_started);
index 1c2befcd24fb378e03a9a3844e97fb47bbad1871..48def22fc7b940e0742756836a4be7e918c4f229 100644 (file)
@@ -214,6 +214,8 @@ static int parse_reply_info_dir(void **p, void *end,
                err = parse_reply_info_in(p, end, &rde->inode, features);
                if (err < 0)
                        goto out_bad;
+               /* ceph_readdir_prepopulate() will update it */
+               rde->offset = 0;
                i++;
                num--;
        }
index 2a865812a41b49005caaf435edd9446055c36192..4ce19d852657caf37c8491f146dda99a93fb6011 100644 (file)
@@ -52,6 +52,7 @@ struct ceph_mds_reply_dir_entry {
        u32                           name_len;
        struct ceph_mds_reply_lease   *lease;
        struct ceph_mds_reply_info_in inode;
+       loff_t                        offset;
 };
 
 /*
index 0ea86406f463cd923600f20677c750feb893820c..0628099ba1f2dec3b026f14f818153be75e38fdd 100644 (file)
@@ -635,7 +635,6 @@ struct ceph_file_info {
        struct ceph_mds_request *last_readdir;
 
        /* readdir: position within a frag */
-       unsigned offset;       /* offset of last chunk, adjusted for . and .. */
        unsigned next_offset;  /* offset of next chunk (last_name's + 1) */
        char *last_name;       /* last entry in previous chunk */
        long long dir_release_count;