rbd: new request handling code
authorIlya Dryomov <idryomov@gmail.com>
Mon, 29 Jan 2018 13:04:08 +0000 (14:04 +0100)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 2 Apr 2018 08:12:40 +0000 (10:12 +0200)
The notable changes are:

- instead of explicitly stat'ing the object to see if it exists before
  issuing the write, send the write optimistically along with the stat
  in a single OSD request
- zero copyup optimization
- all object requests are associated with an image request and have
  a valid ->img_request pointer; there are no standalone (!IMG_DATA)
  object requests anymore
- code is structured as a state machine (vs a bunch of callbacks with
  implicit state)

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
drivers/block/rbd.c

index bff3e138543f5b70e2093f1f9522e83131ee0317..1bffad122dc229fc0fd0a809ab867da9be4a52d0 100644 (file)
@@ -235,11 +235,37 @@ enum obj_req_flags {
        OBJ_REQ_EXISTS,         /* target exists: no = 0, yes = 1 */
 };
 
+/*
+ * Writes go through the following state machine to deal with
+ * layering:
+ *
+ *                       need copyup
+ * RBD_OBJ_WRITE_GUARD ---------------> RBD_OBJ_WRITE_COPYUP
+ *        |     ^                              |
+ *        v     \------------------------------/
+ *      done
+ *        ^
+ *        |
+ * RBD_OBJ_WRITE_FLAT
+ *
+ * Writes start in RBD_OBJ_WRITE_GUARD or _FLAT, depending on whether
+ * there is a parent or not.
+ */
+enum rbd_obj_write_state {
+       RBD_OBJ_WRITE_FLAT = 1,
+       RBD_OBJ_WRITE_GUARD,
+       RBD_OBJ_WRITE_COPYUP,
+};
+
 struct rbd_obj_request {
        u64                     object_no;
        u64                     offset;         /* object start byte */
        u64                     length;         /* bytes from offset */
        unsigned long           flags;
+       union {
+               bool                    tried_parent;   /* for reads */
+               enum rbd_obj_write_state write_state;   /* for writes */
+       };
 
        /*
         * An object request associated with an image will have its
@@ -1282,6 +1308,27 @@ static void zero_bvecs(struct ceph_bvec_iter *bvec_pos, u32 off, u32 bytes)
        }));
 }
 
+/*
+ * Zero a range in @obj_req data buffer defined by a bio (list) or
+ * bio_vec array.
+ *
+ * @off is relative to the start of the data buffer.
+ */
+static void rbd_obj_zero_range(struct rbd_obj_request *obj_req, u32 off,
+                              u32 bytes)
+{
+       switch (obj_req->type) {
+       case OBJ_REQUEST_BIO:
+               zero_bios(&obj_req->bio_pos, off, bytes);
+               break;
+       case OBJ_REQUEST_BVECS:
+               zero_bvecs(&obj_req->bvec_pos, off, bytes);
+               break;
+       default:
+               rbd_assert(0);
+       }
+}
+
 /*
  * The default/initial value for all object request flags is 0.  For
  * each flag, once its value is set to 1 it is never reset to 0
@@ -1567,6 +1614,35 @@ rbd_img_request_op_type(struct rbd_img_request *img_request)
                return OBJ_OP_READ;
 }
 
+static bool rbd_obj_is_entire(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+       return !obj_req->offset &&
+              obj_req->length == rbd_dev->layout.object_size;
+}
+
+static bool rbd_obj_is_tail(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+       return obj_req->offset + obj_req->length ==
+                                       rbd_dev->layout.object_size;
+}
+
+static bool rbd_img_is_write(struct rbd_img_request *img_req)
+{
+       switch (rbd_img_request_op_type(img_req)) {
+       case OBJ_OP_READ:
+               return false;
+       case OBJ_OP_WRITE:
+       case OBJ_OP_DISCARD:
+               return true;
+       default:
+               rbd_assert(0);
+       }
+}
+
 static void
 rbd_img_obj_request_read_callback(struct rbd_obj_request *obj_request)
 {
@@ -1697,63 +1773,28 @@ static void rbd_osd_call_callback(struct rbd_obj_request *obj_request)
                obj_request_done_set(obj_request);
 }
 
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req);
+
 static void rbd_osd_req_callback(struct ceph_osd_request *osd_req)
 {
-       struct rbd_obj_request *obj_request = osd_req->r_priv;
-       u16 opcode;
-
-       dout("%s: osd_req %p\n", __func__, osd_req);
-       rbd_assert(osd_req == obj_request->osd_req);
-       if (obj_request_img_data_test(obj_request)) {
-               rbd_assert(obj_request->img_request);
-               rbd_assert(obj_request->which != BAD_WHICH);
-       } else {
-               rbd_assert(obj_request->which == BAD_WHICH);
-       }
+       struct rbd_obj_request *obj_req = osd_req->r_priv;
 
-       if (osd_req->r_result < 0)
-               obj_request->result = osd_req->r_result;
+       dout("%s osd_req %p result %d for obj_req %p\n", __func__, osd_req,
+            osd_req->r_result, obj_req);
+       rbd_assert(osd_req == obj_req->osd_req);
 
-       /*
-        * We support a 64-bit length, but ultimately it has to be
-        * passed to the block layer, which just supports a 32-bit
-        * length field.
-        */
-       obj_request->xferred = osd_req->r_ops[0].outdata_len;
-       rbd_assert(obj_request->xferred < (u64)UINT_MAX);
-
-       opcode = osd_req->r_ops[0].op;
-       switch (opcode) {
-       case CEPH_OSD_OP_READ:
-               rbd_osd_read_callback(obj_request);
-               break;
-       case CEPH_OSD_OP_SETALLOCHINT:
-               rbd_assert(osd_req->r_ops[1].op == CEPH_OSD_OP_WRITE ||
-                          osd_req->r_ops[1].op == CEPH_OSD_OP_WRITEFULL);
-               /* fall through */
-       case CEPH_OSD_OP_WRITE:
-       case CEPH_OSD_OP_WRITEFULL:
-               rbd_osd_write_callback(obj_request);
-               break;
-       case CEPH_OSD_OP_STAT:
-               rbd_osd_stat_callback(obj_request);
-               break;
-       case CEPH_OSD_OP_DELETE:
-       case CEPH_OSD_OP_TRUNCATE:
-       case CEPH_OSD_OP_ZERO:
-               rbd_osd_discard_callback(obj_request);
-               break;
-       case CEPH_OSD_OP_CALL:
-               rbd_osd_call_callback(obj_request);
-               break;
-       default:
-               rbd_warn(NULL, "unexpected OSD op: object_no %016llx opcode %d",
-                        obj_request->object_no, opcode);
-               break;
-       }
+       obj_req->result = osd_req->r_result < 0 ? osd_req->r_result : 0;
+       if (!obj_req->result && !rbd_img_is_write(obj_req->img_request))
+               obj_req->xferred = osd_req->r_result;
+       else
+               /*
+                * Writes aren't allowed to return a data payload.  In some
+                * guarded write cases (e.g. stat + zero on an empty object)
+                * a stat response makes it through, but we don't care.
+                */
+               obj_req->xferred = 0;
 
-       if (obj_request_done_test(obj_request))
-               rbd_obj_request_complete(obj_request);
+       rbd_obj_handle_request(obj_req);
 }
 
 static void rbd_osd_req_format_read(struct rbd_obj_request *obj_request)
@@ -1806,12 +1847,6 @@ err_req:
        return NULL;
 }
 
-/*
- * Create an osd request.  A read request has one osd op (read).
- * A write request has either one (watch) or two (hint+write) osd ops.
- * (All rbd data writes are prefixed with an allocation hint op, but
- * technically osd watch is a write request, hence this distinction.)
- */
 static struct ceph_osd_request *rbd_osd_req_create(
                                        struct rbd_device *rbd_dev,
                                        enum obj_operation_type op_type,
@@ -1831,8 +1866,6 @@ static struct ceph_osd_request *rbd_osd_req_create(
                snapc = img_request->snapc;
        }
 
-       rbd_assert(num_ops == 1 || ((op_type == OBJ_OP_WRITE) && num_ops == 2));
-
        return __rbd_osd_req_create(rbd_dev, snapc, num_ops,
            (op_type == OBJ_OP_WRITE || op_type == OBJ_OP_DISCARD) ?
            CEPH_OSD_FLAG_WRITE : CEPH_OSD_FLAG_READ, obj_request);
@@ -2251,6 +2284,211 @@ static void rbd_img_obj_request_fill(struct rbd_obj_request *obj_request,
                rbd_osd_req_format_read(obj_request);
 }
 
+static void rbd_osd_req_setup_data(struct rbd_obj_request *obj_req, u32 which)
+{
+       switch (obj_req->type) {
+       case OBJ_REQUEST_BIO:
+               osd_req_op_extent_osd_data_bio(obj_req->osd_req, which,
+                                              &obj_req->bio_pos,
+                                              obj_req->length);
+               break;
+       case OBJ_REQUEST_BVECS:
+               rbd_assert(obj_req->bvec_pos.iter.bi_size ==
+                                                       obj_req->length);
+               osd_req_op_extent_osd_data_bvec_pos(obj_req->osd_req, which,
+                                                   &obj_req->bvec_pos);
+               break;
+       default:
+               rbd_assert(0);
+       }
+}
+
+static int rbd_obj_setup_read(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+
+       obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1, obj_req);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
+
+       osd_req_op_extent_init(obj_req->osd_req, 0, CEPH_OSD_OP_READ,
+                              obj_req->offset, obj_req->length, 0, 0);
+       rbd_osd_req_setup_data(obj_req, 0);
+
+       rbd_osd_req_format_read(obj_req);
+       return 0;
+}
+
+static int __rbd_obj_setup_stat(struct rbd_obj_request *obj_req,
+                               unsigned int which)
+{
+       struct page **pages;
+
+       /*
+        * The response data for a STAT call consists of:
+        *     le64 length;
+        *     struct {
+        *         le32 tv_sec;
+        *         le32 tv_nsec;
+        *     } mtime;
+        */
+       pages = ceph_alloc_page_vector(1, GFP_NOIO);
+       if (IS_ERR(pages))
+               return PTR_ERR(pages);
+
+       osd_req_op_init(obj_req->osd_req, which, CEPH_OSD_OP_STAT, 0);
+       osd_req_op_raw_data_in_pages(obj_req->osd_req, which, pages,
+                                    8 + sizeof(struct ceph_timespec),
+                                    0, false, true);
+       return 0;
+}
+
+static void __rbd_obj_setup_write(struct rbd_obj_request *obj_req,
+                                 unsigned int which)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       u16 opcode;
+
+       osd_req_op_alloc_hint_init(obj_req->osd_req, which++,
+                                  rbd_dev->layout.object_size,
+                                  rbd_dev->layout.object_size);
+
+       if (rbd_obj_is_entire(obj_req))
+               opcode = CEPH_OSD_OP_WRITEFULL;
+       else
+               opcode = CEPH_OSD_OP_WRITE;
+
+       osd_req_op_extent_init(obj_req->osd_req, which, opcode,
+                              obj_req->offset, obj_req->length, 0, 0);
+       rbd_osd_req_setup_data(obj_req, which++);
+
+       rbd_assert(which == obj_req->osd_req->r_num_ops);
+       rbd_osd_req_format_write(obj_req);
+}
+
+static int rbd_obj_setup_write(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       unsigned int num_osd_ops, which = 0;
+       int ret;
+
+       if (obj_request_overlaps_parent(obj_req)) {
+               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+               num_osd_ops = 3; /* stat + setallochint + write/writefull */
+       } else {
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+               num_osd_ops = 2; /* setallochint + write/writefull */
+       }
+
+       obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE,
+                                             num_osd_ops, obj_req);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
+
+       if (obj_request_overlaps_parent(obj_req)) {
+               ret = __rbd_obj_setup_stat(obj_req, which++);
+               if (ret)
+                       return ret;
+       }
+
+       __rbd_obj_setup_write(obj_req, which);
+       return 0;
+}
+
+static void __rbd_obj_setup_discard(struct rbd_obj_request *obj_req,
+                                   unsigned int which)
+{
+       u16 opcode;
+
+       if (rbd_obj_is_entire(obj_req)) {
+               if (obj_request_overlaps_parent(obj_req)) {
+                       opcode = CEPH_OSD_OP_TRUNCATE;
+               } else {
+                       osd_req_op_init(obj_req->osd_req, which++,
+                                       CEPH_OSD_OP_DELETE, 0);
+                       opcode = 0;
+               }
+       } else if (rbd_obj_is_tail(obj_req)) {
+               opcode = CEPH_OSD_OP_TRUNCATE;
+       } else {
+               opcode = CEPH_OSD_OP_ZERO;
+       }
+
+       if (opcode)
+               osd_req_op_extent_init(obj_req->osd_req, which++, opcode,
+                                      obj_req->offset, obj_req->length,
+                                      0, 0);
+
+       rbd_assert(which == obj_req->osd_req->r_num_ops);
+       rbd_osd_req_format_write(obj_req);
+}
+
+static int rbd_obj_setup_discard(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       unsigned int num_osd_ops, which = 0;
+       int ret;
+
+       if (rbd_obj_is_entire(obj_req)) {
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+               num_osd_ops = 1; /* truncate/delete */
+       } else {
+               if (obj_request_overlaps_parent(obj_req)) {
+                       obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+                       num_osd_ops = 2; /* stat + truncate/zero */
+               } else {
+                       obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+                       num_osd_ops = 1; /* truncate/zero */
+               }
+       }
+
+       obj_req->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_DISCARD,
+                                             num_osd_ops, obj_req);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
+
+       if (!rbd_obj_is_entire(obj_req) &&
+           obj_request_overlaps_parent(obj_req)) {
+               ret = __rbd_obj_setup_stat(obj_req, which++);
+               if (ret)
+                       return ret;
+       }
+
+       __rbd_obj_setup_discard(obj_req, which);
+       return 0;
+}
+
+/*
+ * For each object request in @img_req, allocate an OSD request, add
+ * individual OSD ops and prepare them for submission.  The number of
+ * OSD ops depends on op_type and the overlap point (if any).
+ */
+static int __rbd_img_fill_request(struct rbd_img_request *img_req)
+{
+       struct rbd_obj_request *obj_req;
+       int ret;
+
+       for_each_obj_request(img_req, obj_req) {
+               switch (rbd_img_request_op_type(img_req)) {
+               case OBJ_OP_READ:
+                       ret = rbd_obj_setup_read(obj_req);
+                       break;
+               case OBJ_OP_WRITE:
+                       ret = rbd_obj_setup_write(obj_req);
+                       break;
+               case OBJ_OP_DISCARD:
+                       ret = rbd_obj_setup_discard(obj_req);
+                       break;
+               default:
+                       rbd_assert(0);
+               }
+               if (ret)
+                       return ret;
+       }
+
+       return 0;
+}
+
 /*
  * Split up an image request into one or more object requests, each
  * to a different object.  The "type" parameter indicates whether
@@ -2268,7 +2506,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
        struct rbd_obj_request *next_obj_request;
        struct ceph_bio_iter bio_it;
        struct ceph_bvec_iter bvec_it;
-       enum obj_operation_type op_type;
        u64 img_offset;
        u64 resid;
 
@@ -2278,7 +2515,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
        img_offset = img_request->offset;
        resid = img_request->length;
        rbd_assert(resid > 0);
-       op_type = rbd_img_request_op_type(img_request);
 
        if (type == OBJ_REQUEST_BIO) {
                bio_it = *(struct ceph_bio_iter *)data_desc;
@@ -2289,7 +2525,6 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
        }
 
        while (resid) {
-               struct ceph_osd_request *osd_req;
                u64 object_no = img_offset >> rbd_dev->header.obj_order;
                u64 offset = rbd_segment_offset(rbd_dev, img_offset);
                u64 length = rbd_segment_length(rbd_dev, img_offset, resid);
@@ -2317,23 +2552,14 @@ static int rbd_img_request_fill(struct rbd_img_request *img_request,
                        ceph_bvec_iter_advance(&bvec_it, length);
                }
 
-               osd_req = rbd_osd_req_create(rbd_dev, op_type,
-                                       (op_type == OBJ_OP_WRITE) ? 2 : 1,
-                                       obj_request);
-               if (!osd_req)
-                       goto out_unwind;
-
-               obj_request->osd_req = osd_req;
                obj_request->callback = rbd_img_obj_callback;
                obj_request->img_offset = img_offset;
 
-               rbd_img_obj_request_fill(obj_request, osd_req, op_type, 0);
-
                img_offset += length;
                resid -= length;
        }
 
-       return 0;
+       return __rbd_img_fill_request(img_request);
 
 out_unwind:
        for_each_obj_request_safe(img_request, obj_request, next_obj_request)
@@ -2712,16 +2938,171 @@ static int rbd_img_request_submit(struct rbd_img_request *img_request)
 
        rbd_img_request_get(img_request);
        for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
-               ret = rbd_img_obj_request_submit(obj_request);
-               if (ret)
-                       goto out_put_ireq;
+               rbd_obj_request_submit(obj_request);
        }
 
-out_put_ireq:
        rbd_img_request_put(img_request);
        return ret;
 }
 
+static void rbd_img_end_child_request(struct rbd_img_request *img_req);
+
+static int rbd_obj_read_from_parent(struct rbd_obj_request *obj_req,
+                                   u64 img_offset, u32 bytes)
+{
+       struct rbd_img_request *img_req = obj_req->img_request;
+       struct rbd_img_request *child_img_req;
+       int ret;
+
+       child_img_req = rbd_parent_request_create(obj_req, img_offset, bytes);
+       if (!child_img_req)
+               return -ENOMEM;
+
+       child_img_req->callback = rbd_img_end_child_request;
+
+       if (!rbd_img_is_write(img_req)) {
+               switch (obj_req->type) {
+               case OBJ_REQUEST_BIO:
+                       ret = rbd_img_request_fill(child_img_req,
+                                                  OBJ_REQUEST_BIO,
+                                                  &obj_req->bio_pos);
+                       break;
+               case OBJ_REQUEST_BVECS:
+                       ret = rbd_img_request_fill(child_img_req,
+                                                  OBJ_REQUEST_BVECS,
+                                                  &obj_req->bvec_pos);
+                       break;
+               default:
+                       rbd_assert(0);
+               }
+       } else {
+               struct ceph_bvec_iter it = {
+                       .bvecs = obj_req->copyup_bvecs,
+                       .iter = { .bi_size = bytes },
+               };
+
+               ret = rbd_img_request_fill(child_img_req, OBJ_REQUEST_BVECS,
+                                          &it);
+       }
+       if (ret) {
+               rbd_img_request_put(child_img_req);
+               return ret;
+       }
+
+       rbd_img_request_submit(child_img_req);
+       return 0;
+}
+
+static bool rbd_obj_handle_read(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       int ret;
+
+       if (obj_req->result == -ENOENT &&
+           obj_req->img_offset < rbd_dev->parent_overlap &&
+           !obj_req->tried_parent) {
+               u64 obj_overlap = min(obj_req->length,
+                             rbd_dev->parent_overlap - obj_req->img_offset);
+
+               obj_req->tried_parent = true;
+               ret = rbd_obj_read_from_parent(obj_req, obj_req->img_offset,
+                                              obj_overlap);
+               if (ret) {
+                       obj_req->result = ret;
+                       return true;
+               }
+               return false;
+       }
+
+       /*
+        * -ENOENT means a hole in the image -- zero-fill the entire
+        * length of the request.  A short read also implies zero-fill
+        * to the end of the request.  In both cases we update xferred
+        * count to indicate the whole request was satisfied.
+        */
+       if (obj_req->result == -ENOENT ||
+           (!obj_req->result && obj_req->xferred < obj_req->length)) {
+               rbd_assert(!obj_req->xferred || !obj_req->result);
+               rbd_obj_zero_range(obj_req, obj_req->xferred,
+                                  obj_req->length - obj_req->xferred);
+               obj_req->result = 0;
+               obj_req->xferred = obj_req->length;
+       }
+
+       return true;
+}
+
+/*
+ * copyup_bvecs pages are never highmem pages
+ */
+static bool is_zero_bvecs(struct bio_vec *bvecs, u32 bytes)
+{
+       struct ceph_bvec_iter it = {
+               .bvecs = bvecs,
+               .iter = { .bi_size = bytes },
+       };
+
+       ceph_bvec_iter_advance_step(&it, bytes, ({
+               if (memchr_inv(page_address(bv.bv_page) + bv.bv_offset, 0,
+                              bv.bv_len))
+                       return false;
+       }));
+       return true;
+}
+
+static int rbd_obj_issue_copyup(struct rbd_obj_request *obj_req, u32 bytes)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       unsigned int num_osd_ops = obj_req->osd_req->r_num_ops;
+
+       dout("%s obj_req %p bytes %u\n", __func__, obj_req, bytes);
+       rbd_assert(obj_req->osd_req->r_ops[0].op == CEPH_OSD_OP_STAT);
+       rbd_osd_req_destroy(obj_req->osd_req);
+
+       /*
+        * Create a copyup request with the same number of OSD ops as
+        * the original request.  The original request was stat + op(s),
+        * the new copyup request will be copyup + the same op(s).
+        */
+       obj_req->osd_req = rbd_osd_req_create(rbd_dev,
+                               rbd_img_request_op_type(obj_req->img_request),
+                               num_osd_ops, obj_req);
+       if (!obj_req->osd_req)
+               return -ENOMEM;
+
+       /*
+        * Only send non-zero copyup data to save some I/O and network
+        * bandwidth -- zero copyup data is equivalent to the object not
+        * existing.
+        */
+       if (is_zero_bvecs(obj_req->copyup_bvecs, bytes)) {
+               dout("%s obj_req %p detected zeroes\n", __func__, obj_req);
+               bytes = 0;
+       }
+
+       osd_req_op_cls_init(obj_req->osd_req, 0, CEPH_OSD_OP_CALL, "rbd",
+                           "copyup");
+       osd_req_op_cls_request_data_bvecs(obj_req->osd_req, 0,
+                                         obj_req->copyup_bvecs, bytes);
+
+       switch (rbd_img_request_op_type(obj_req->img_request)) {
+       case OBJ_OP_WRITE:
+               __rbd_obj_setup_write(obj_req, 1);
+               break;
+       case OBJ_OP_DISCARD:
+               rbd_assert(!rbd_obj_is_entire(obj_req));
+               __rbd_obj_setup_discard(obj_req, 1);
+               break;
+       default:
+               rbd_assert(0);
+       }
+
+       rbd_obj_request_submit(obj_req);
+       /* FIXME: in lieu of rbd_img_obj_callback() */
+       rbd_img_request_put(obj_req->img_request);
+       return 0;
+}
+
 static int setup_copyup_bvecs(struct rbd_obj_request *obj_req, u64 obj_overlap)
 {
        u32 i;
@@ -2850,6 +3231,149 @@ out_err:
        obj_request_done_set(obj_request);
 }
 
+static int rbd_obj_handle_write_guard(struct rbd_obj_request *obj_req)
+{
+       struct rbd_device *rbd_dev = obj_req->img_request->rbd_dev;
+       u64 img_offset;
+       u64 obj_overlap;
+       int ret;
+
+       if (!obj_request_overlaps_parent(obj_req)) {
+               /*
+                * The overlap has become 0 (most likely because the
+                * image has been flattened).  Use rbd_obj_issue_copyup()
+                * to re-submit the original write request -- the copyup
+                * operation itself will be a no-op, since someone must
+                * have populated the child object while we weren't
+                * looking.  Move to WRITE_FLAT state as we'll be done
+                * with the operation once the null copyup completes.
+                */
+               obj_req->write_state = RBD_OBJ_WRITE_FLAT;
+               return rbd_obj_issue_copyup(obj_req, 0);
+       }
+
+       /*
+        * Determine the byte range covered by the object in the
+        * child image to which the original request was to be sent.
+        */
+       img_offset = obj_req->img_offset - obj_req->offset;
+       obj_overlap = rbd_dev->layout.object_size;
+
+       /*
+        * There is no defined parent data beyond the parent
+        * overlap, so limit what we read at that boundary if
+        * necessary.
+        */
+       if (img_offset + obj_overlap > rbd_dev->parent_overlap) {
+               rbd_assert(img_offset < rbd_dev->parent_overlap);
+               obj_overlap = rbd_dev->parent_overlap - img_offset;
+       }
+
+       ret = setup_copyup_bvecs(obj_req, obj_overlap);
+       if (ret)
+               return ret;
+
+       obj_req->write_state = RBD_OBJ_WRITE_COPYUP;
+       return rbd_obj_read_from_parent(obj_req, img_offset, obj_overlap);
+}
+
+static bool rbd_obj_handle_write(struct rbd_obj_request *obj_req)
+{
+       int ret;
+
+again:
+       switch (obj_req->write_state) {
+       case RBD_OBJ_WRITE_GUARD:
+               rbd_assert(!obj_req->xferred);
+               if (obj_req->result == -ENOENT) {
+                       /*
+                        * The target object doesn't exist.  Read the data for
+                        * the entire target object up to the overlap point (if
+                        * any) from the parent, so we can use it for a copyup.
+                        */
+                       ret = rbd_obj_handle_write_guard(obj_req);
+                       if (ret) {
+                               obj_req->result = ret;
+                               return true;
+                       }
+                       return false;
+               }
+               /* fall through */
+       case RBD_OBJ_WRITE_FLAT:
+               if (!obj_req->result)
+                       /*
+                        * There is no such thing as a successful short
+                        * write -- indicate the whole request was satisfied.
+                        */
+                       obj_req->xferred = obj_req->length;
+               return true;
+       case RBD_OBJ_WRITE_COPYUP:
+               obj_req->write_state = RBD_OBJ_WRITE_GUARD;
+               if (obj_req->result)
+                       goto again;
+
+               rbd_assert(obj_req->xferred);
+               ret = rbd_obj_issue_copyup(obj_req, obj_req->xferred);
+               if (ret) {
+                       obj_req->result = ret;
+                       return true;
+               }
+               return false;
+       default:
+               rbd_assert(0);
+       }
+}
+
+/*
+ * Returns true if @obj_req is completed, or false otherwise.
+ */
+static bool __rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+{
+       switch (rbd_img_request_op_type(obj_req->img_request)) {
+       case OBJ_OP_READ:
+               return rbd_obj_handle_read(obj_req);
+       case OBJ_OP_WRITE:
+               return rbd_obj_handle_write(obj_req);
+       case OBJ_OP_DISCARD:
+               if (rbd_obj_handle_write(obj_req)) {
+                       /*
+                        * Hide -ENOENT from delete/truncate/zero -- discarding
+                        * a non-existent object is not a problem.
+                        */
+                       if (obj_req->result == -ENOENT) {
+                               obj_req->result = 0;
+                               obj_req->xferred = obj_req->length;
+                       }
+                       return true;
+               }
+               return false;
+       default:
+               rbd_assert(0);
+       }
+}
+
+static void rbd_img_end_child_request(struct rbd_img_request *img_req)
+{
+       struct rbd_obj_request *obj_req = img_req->obj_request;
+
+       rbd_assert(test_bit(IMG_REQ_CHILD, &img_req->flags));
+
+       obj_req->result = img_req->result;
+       obj_req->xferred = img_req->xferred;
+       rbd_img_request_put(img_req);
+
+       rbd_obj_handle_request(obj_req);
+}
+
+static void rbd_obj_handle_request(struct rbd_obj_request *obj_req)
+{
+       if (!__rbd_obj_handle_request(obj_req))
+               return;
+
+       obj_request_done_set(obj_req);
+       rbd_obj_request_complete(obj_req);
+}
+
 static const struct rbd_client_id rbd_empty_cid;
 
 static bool rbd_cid_equal(const struct rbd_client_id *lhs,