rbd: issue stat request before layered write
authorAlex Elder <elder@inktank.com>
Mon, 11 Feb 2013 18:33:24 +0000 (12:33 -0600)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:19:04 +0000 (21:19 -0700)
This is a step toward fully implementing layered writes.

Add checks before request submission for the object(s) associated
with an image request.  For write requests, if we don't know that
the target object exists, issue a STAT request to find out.  When
that request completes, mark the known and exists flags for the
original object request accordingly and re-submit the object
request.  (Note that this still does the existence check only; the
copyup operation is not yet done.)

A new object request is created to perform the existence check.  A
pointer to the original request is added to that object request to
allow the stat request to re-issue the original request after
updating its flags.  If there is a failure with the stat request
the error code is stored with the original request, which is then
completed.

This resolves:
    http://tracker.ceph.com/issues/3418

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
drivers/block/rbd.c

index b1b8ef864d580177ba4f01655dce5c9a3b94fd9c..449847badcd883f2fac408b951ba2e992f35756a 100644 (file)
@@ -183,9 +183,31 @@ struct rbd_obj_request {
        u64                     length;         /* bytes from offset */
        unsigned long           flags;
 
-       struct rbd_img_request  *img_request;
-       u64                     img_offset;     /* image relative offset */
-       struct list_head        links;          /* img_request->obj_requests */
+       /*
+        * An object request associated with an image will have its
+        * img_data flag set; a standalone object request will not.
+        *
+        * A standalone object request will have which == BAD_WHICH
+        * and a null obj_request pointer.
+        *
+        * An object request initiated in support of a layered image
+        * object (to check for its existence before a write) will
+        * have which == BAD_WHICH and a non-null obj_request pointer.
+        *
+        * Finally, an object request for rbd image data will have
+        * which != BAD_WHICH, and will have a non-null img_request
+        * pointer.  The value of which will be in the range
+        * 0..(img_request->obj_request_count-1).
+        */
+       union {
+               struct rbd_obj_request  *obj_request;   /* STAT op */
+               struct {
+                       struct rbd_img_request  *img_request;
+                       u64                     img_offset;
+                       /* links for img_request->obj_requests list */
+                       struct list_head        links;
+               };
+       };
        u32                     which;          /* posn image request list */
 
        enum obj_request_type   type;
@@ -1656,10 +1678,6 @@ static struct rbd_img_request *rbd_img_request_create(
        INIT_LIST_HEAD(&img_request->obj_requests);
        kref_init(&img_request->kref);
 
-       (void) obj_request_existence_set;
-       (void) obj_request_known_test;
-       (void) obj_request_exists_test;
-
        rbd_img_request_get(img_request);       /* Avoid a warning */
        rbd_img_request_put(img_request);       /* TEMPORARY */
 
@@ -1847,18 +1865,147 @@ out_unwind:
        return -ENOMEM;
 }
 
+static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
+{
+       struct rbd_device *rbd_dev;
+       struct ceph_osd_client *osdc;
+       struct rbd_obj_request *orig_request;
+       int result;
+
+       rbd_assert(!obj_request_img_data_test(obj_request));
+
+       /*
+        * All we need from the object request is the original
+        * request and the result of the STAT op.  Grab those, then
+        * we're done with the request.
+        */
+       orig_request = obj_request->obj_request;
+       obj_request->obj_request = NULL;
+       rbd_assert(orig_request);
+       rbd_assert(orig_request->img_request);
+
+       result = obj_request->result;
+       obj_request->result = 0;
+
+       dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
+               obj_request, orig_request, result,
+               obj_request->xferred, obj_request->length);
+       rbd_obj_request_put(obj_request);
+
+       rbd_assert(orig_request);
+       rbd_assert(orig_request->img_request);
+       rbd_dev = orig_request->img_request->rbd_dev;
+       osdc = &rbd_dev->rbd_client->client->osdc;
+
+       /*
+        * Our only purpose here is to determine whether the object
+        * exists, and we don't want to treat the non-existence as
+        * an error.  If something else comes back, transfer the
+        * error to the original request and complete it now.
+        */
+       if (!result) {
+               obj_request_existence_set(orig_request, true);
+       } else if (result == -ENOENT) {
+               obj_request_existence_set(orig_request, false);
+       } else if (result) {
+               orig_request->result = result;
+               goto out_err;
+       }
+
+       /*
+        * Resubmit the original request now that we have recorded
+        * whether the target object exists.
+        */
+       orig_request->result = rbd_obj_request_submit(osdc, orig_request);
+out_err:
+       if (orig_request->result)
+               rbd_obj_request_complete(orig_request);
+       rbd_obj_request_put(orig_request);
+}
+
+static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
+{
+       struct rbd_obj_request *stat_request;
+       struct rbd_device *rbd_dev;
+       struct ceph_osd_client *osdc;
+       struct page **pages = NULL;
+       u32 page_count;
+       size_t size;
+       int ret;
+
+       /*
+        * The response data for a STAT call consists of:
+        *     le64 length;
+        *     struct {
+        *         le32 tv_sec;
+        *         le32 tv_nsec;
+        *     } mtime;
+        */
+       size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
+       page_count = (u32)calc_pages_for(0, size);
+       pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       ret = -ENOMEM;
+       stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
+                                                       OBJ_REQUEST_PAGES);
+       if (!stat_request)
+               goto out;
+
+       rbd_obj_request_get(obj_request);
+       stat_request->obj_request = obj_request;
+       stat_request->pages = pages;
+       stat_request->page_count = page_count;
+
+       rbd_assert(obj_request->img_request);
+       rbd_dev = obj_request->img_request->rbd_dev;
+       stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+                                               stat_request);
+       if (!stat_request->osd_req)
+               goto out;
+       stat_request->callback = rbd_img_obj_exists_callback;
+
+       osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
+       osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
+                                       false, false);
+       rbd_osd_req_format(stat_request, false);
+
+       osdc = &rbd_dev->rbd_client->client->osdc;
+       ret = rbd_obj_request_submit(osdc, stat_request);
+out:
+       if (ret)
+               rbd_obj_request_put(obj_request);
+
+       return ret;
+}
+
 static int rbd_img_request_submit(struct rbd_img_request *img_request)
 {
        struct rbd_device *rbd_dev = img_request->rbd_dev;
        struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
        struct rbd_obj_request *obj_request;
        struct rbd_obj_request *next_obj_request;
+       bool write_request = img_request_write_test(img_request);
+       bool layered = img_request_layered_test(img_request);
 
        dout("%s: img %p\n", __func__, img_request);
        for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
+               bool known;
+               bool object_exists;
                int ret;
 
-               ret = rbd_obj_request_submit(osdc, obj_request);
+               /*
+                * We need to know whether the target object exists
+                * for a layered write.  Issue an existence check
+                * first if we need to.
+                */
+               known = obj_request_known_test(obj_request);
+               object_exists = known && obj_request_exists_test(obj_request);
+               if (!write_request || !layered || object_exists)
+                       ret = rbd_obj_request_submit(osdc, obj_request);
+               else
+                       ret = rbd_img_obj_exists_submit(obj_request);
                if (ret)
                        return ret;
        }