u64 length; /* bytes from offset */
unsigned long flags;
- struct rbd_img_request *img_request;
- u64 img_offset; /* image relative offset */
- struct list_head links; /* img_request->obj_requests */
+ /*
+ * An object request associated with an image will have its
+ * img_data flag set; a standalone object request will not.
+ *
+ * A standalone object request will have which == BAD_WHICH
+ * and a null obj_request pointer.
+ *
+ * An object request initiated in support of a layered image
+ * object (to check for its existence before a write) will
+ * have which == BAD_WHICH and a non-null obj_request pointer.
+ *
+ * Finally, an object request for rbd image data will have
+ * which != BAD_WHICH, and will have a non-null img_request
+ * pointer. The value of which will be in the range
+ * 0..(img_request->obj_request_count-1).
+ */
+ union {
+ struct rbd_obj_request *obj_request; /* STAT op */
+ struct {
+ struct rbd_img_request *img_request;
+ u64 img_offset;
+ /* links for img_request->obj_requests list */
+ struct list_head links;
+ };
+ };
u32 which; /* posn image request list */
enum obj_request_type type;
INIT_LIST_HEAD(&img_request->obj_requests);
kref_init(&img_request->kref);
- (void) obj_request_existence_set;
- (void) obj_request_known_test;
- (void) obj_request_exists_test;
-
rbd_img_request_get(img_request); /* Avoid a warning */
rbd_img_request_put(img_request); /* TEMPORARY */
return -ENOMEM;
}
+static void rbd_img_obj_exists_callback(struct rbd_obj_request *obj_request)
+{
+ struct rbd_device *rbd_dev;
+ struct ceph_osd_client *osdc;
+ struct rbd_obj_request *orig_request;
+ int result;
+
+ rbd_assert(!obj_request_img_data_test(obj_request));
+
+ /*
+ * All we need from the object request is the original
+ * request and the result of the STAT op. Grab those, then
+ * we're done with the request.
+ */
+ orig_request = obj_request->obj_request;
+ obj_request->obj_request = NULL;
+ rbd_assert(orig_request);
+ rbd_assert(orig_request->img_request);
+
+ result = obj_request->result;
+ obj_request->result = 0;
+
+ dout("%s: obj %p for obj %p result %d %llu/%llu\n", __func__,
+ obj_request, orig_request, result,
+ obj_request->xferred, obj_request->length);
+ rbd_obj_request_put(obj_request);
+
+ rbd_assert(orig_request);
+ rbd_assert(orig_request->img_request);
+ rbd_dev = orig_request->img_request->rbd_dev;
+ osdc = &rbd_dev->rbd_client->client->osdc;
+
+ /*
+ * Our only purpose here is to determine whether the object
+ * exists, and we don't want to treat the non-existence as
+ * an error. If something else comes back, transfer the
+ * error to the original request and complete it now.
+ */
+ if (!result) {
+ obj_request_existence_set(orig_request, true);
+ } else if (result == -ENOENT) {
+ obj_request_existence_set(orig_request, false);
+ } else if (result) {
+ orig_request->result = result;
+ goto out_err;
+ }
+
+ /*
+ * Resubmit the original request now that we have recorded
+ * whether the target object exists.
+ */
+ orig_request->result = rbd_obj_request_submit(osdc, orig_request);
+out_err:
+ if (orig_request->result)
+ rbd_obj_request_complete(orig_request);
+ rbd_obj_request_put(orig_request);
+}
+
+static int rbd_img_obj_exists_submit(struct rbd_obj_request *obj_request)
+{
+ struct rbd_obj_request *stat_request;
+ struct rbd_device *rbd_dev;
+ struct ceph_osd_client *osdc;
+ struct page **pages = NULL;
+ u32 page_count;
+ size_t size;
+ int ret;
+
+ /*
+ * The response data for a STAT call consists of:
+ * le64 length;
+ * struct {
+ * le32 tv_sec;
+ * le32 tv_nsec;
+ * } mtime;
+ */
+ size = sizeof (__le64) + sizeof (__le32) + sizeof (__le32);
+ page_count = (u32)calc_pages_for(0, size);
+ pages = ceph_alloc_page_vector(page_count, GFP_KERNEL);
+ if (IS_ERR(pages))
+ return PTR_ERR(pages);
+
+ ret = -ENOMEM;
+ stat_request = rbd_obj_request_create(obj_request->object_name, 0, 0,
+ OBJ_REQUEST_PAGES);
+ if (!stat_request)
+ goto out;
+
+ rbd_obj_request_get(obj_request);
+ stat_request->obj_request = obj_request;
+ stat_request->pages = pages;
+ stat_request->page_count = page_count;
+
+ rbd_assert(obj_request->img_request);
+ rbd_dev = obj_request->img_request->rbd_dev;
+ stat_request->osd_req = rbd_osd_req_create(rbd_dev, false,
+ stat_request);
+ if (!stat_request->osd_req)
+ goto out;
+ stat_request->callback = rbd_img_obj_exists_callback;
+
+ osd_req_op_init(stat_request->osd_req, 0, CEPH_OSD_OP_STAT);
+ osd_req_op_raw_data_in_pages(stat_request->osd_req, 0, pages, size, 0,
+ false, false);
+ rbd_osd_req_format(stat_request, false);
+
+ osdc = &rbd_dev->rbd_client->client->osdc;
+ ret = rbd_obj_request_submit(osdc, stat_request);
+out:
+ if (ret)
+ rbd_obj_request_put(obj_request);
+
+ return ret;
+}
+
static int rbd_img_request_submit(struct rbd_img_request *img_request)
{
struct rbd_device *rbd_dev = img_request->rbd_dev;
struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
struct rbd_obj_request *obj_request;
struct rbd_obj_request *next_obj_request;
+ bool write_request = img_request_write_test(img_request);
+ bool layered = img_request_layered_test(img_request);
dout("%s: img %p\n", __func__, img_request);
for_each_obj_request_safe(img_request, obj_request, next_obj_request) {
+ bool known;
+ bool object_exists;
int ret;
- ret = rbd_obj_request_submit(osdc, obj_request);
+ /*
+ * We need to know whether the target object exists
+ * for a layered write. Issue an existence check
+ * first if we need to.
+ */
+ known = obj_request_known_test(obj_request);
+ object_exists = known && obj_request_exists_test(obj_request);
+ if (!write_request || !layered || object_exists)
+ ret = rbd_obj_request_submit(osdc, obj_request);
+ else
+ ret = rbd_img_obj_exists_submit(obj_request);
if (ret)
return ret;
}