libceph: introduce BVECS data type
author     Ilya Dryomov <idryomov@gmail.com>
           Sat, 20 Jan 2018 09:30:11 +0000 (10:30 +0100)
committer  Ilya Dryomov <idryomov@gmail.com>
           Mon, 2 Apr 2018 08:12:39 +0000 (10:12 +0200)
In preparation for rbd "fancy" striping, introduce ceph_bvec_iter for
working with bio_vec array data buffers.  The wrappers are trivial, but
they keep the interface consistent with ceph_bio_iter.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
include/linux/ceph/messenger.h
include/linux/ceph/osd_client.h
net/ceph/messenger.c
net/ceph/osd_client.c
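
For illustration, a minimal caller sketch (not part of this patch;
count_bvec_segments and its parameters are hypothetical) of how a bio_vec
array might be wrapped in the new ceph_bvec_iter and walked with
ceph_bvec_iter_advance_step:

#include <linux/bvec.h>
#include <linux/ceph/messenger.h>

/* Count the bio_vec segments covered by the first @bytes bytes. */
static u32 count_bvec_segments(struct bio_vec *bvecs, u32 total, u32 bytes)
{
        struct ceph_bvec_iter it = {
                .bvecs = bvecs,
                .iter = { .bi_size = total },
        };
        u32 nr_segs = 0;

        /* BVEC_STEP runs once per bio_vec; @it is then advanced by @bytes */
        ceph_bvec_iter_advance_step(&it, bytes, ({
                nr_segs++;
        }));

        return nr_segs;
}

Inside BVEC_STEP the current segment is available as the local bio_vec bv
declared by the macro.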

diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index d7b9605fd51ded979fc17afb5199de5ec504a96e..c7dfcb8a1fb2fcb5e253ab0202955a0752d5b844 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -76,6 +76,7 @@ enum ceph_msg_data_type {
 #ifdef CONFIG_BLOCK
        CEPH_MSG_DATA_BIO,      /* data source/destination is a bio list */
 #endif /* CONFIG_BLOCK */
+       CEPH_MSG_DATA_BVECS,    /* data source/destination is a bio_vec array */
 };
 
 static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
@@ -87,6 +88,7 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
                return true;
        default:
                return false;
@@ -139,6 +141,42 @@ struct ceph_bio_iter {
 
 #endif /* CONFIG_BLOCK */
 
+struct ceph_bvec_iter {
+       struct bio_vec *bvecs;
+       struct bvec_iter iter;
+};
+
+#define __ceph_bvec_iter_advance_step(it, n, STEP) do {                              \
+       BUG_ON((n) > (it)->iter.bi_size);                                     \
+       (void)(STEP);                                                         \
+       bvec_iter_advance((it)->bvecs, &(it)->iter, (n));                     \
+} while (0)
+
+/*
+ * Advance @it by @n bytes.
+ */
+#define ceph_bvec_iter_advance(it, n)                                        \
+       __ceph_bvec_iter_advance_step(it, n, 0)
+
+/*
+ * Advance @it by @n bytes, executing BVEC_STEP for each bio_vec.
+ */
+#define ceph_bvec_iter_advance_step(it, n, BVEC_STEP)                        \
+       __ceph_bvec_iter_advance_step(it, n, ({                               \
+               struct bio_vec bv;                                            \
+               struct bvec_iter __cur_iter;                                  \
+                                                                             \
+               __cur_iter = (it)->iter;                                      \
+               __cur_iter.bi_size = (n);                                     \
+               for_each_bvec(bv, (it)->bvecs, __cur_iter, __cur_iter)        \
+                       (void)(BVEC_STEP);                                    \
+       }))
+
+#define ceph_bvec_iter_shorten(it, n) do {                                   \
+       BUG_ON((n) > (it)->iter.bi_size);                                     \
+       (it)->iter.bi_size = (n);                                             \
+} while (0)
+
 struct ceph_msg_data {
        struct list_head                links;  /* ceph_msg->data */
        enum ceph_msg_data_type         type;
@@ -149,6 +187,7 @@ struct ceph_msg_data {
                        u32                     bio_length;
                };
 #endif /* CONFIG_BLOCK */
+               struct ceph_bvec_iter   bvec_pos;
                struct {
                        struct page     **pages;        /* NOT OWNER. */
                        size_t          length;         /* total # bytes */
@@ -170,6 +209,7 @@ struct ceph_msg_data_cursor {
 #ifdef CONFIG_BLOCK
                struct ceph_bio_iter    bio_iter;
 #endif /* CONFIG_BLOCK */
+               struct bvec_iter        bvec_iter;
                struct {                                /* pages */
                        unsigned int    page_offset;    /* offset in page */
                        unsigned short  page_index;     /* index in array */
@@ -336,6 +376,8 @@ extern void ceph_msg_data_add_pagelist(struct ceph_msg *msg,
 void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
                           u32 length);
 #endif /* CONFIG_BLOCK */
+void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
+                            struct ceph_bvec_iter *bvec_pos);
 
 extern struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
                                     bool can_fail);
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 315691490cb0951c8c659b35b11ef7b95aedafb9..528ccc943cee0a5da4bc95e5d200a8591debe836 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -57,6 +57,7 @@ enum ceph_osd_data_type {
 #ifdef CONFIG_BLOCK
        CEPH_OSD_DATA_TYPE_BIO,
 #endif /* CONFIG_BLOCK */
+       CEPH_OSD_DATA_TYPE_BVECS,
 };
 
 struct ceph_osd_data {
@@ -76,6 +77,7 @@ struct ceph_osd_data {
                        u32                     bio_length;
                };
 #endif /* CONFIG_BLOCK */
+               struct ceph_bvec_iter   bvec_pos;
        };
 };
 
@@ -410,6 +412,9 @@ void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
                                    struct ceph_bio_iter *bio_pos,
                                    u32 bio_length);
 #endif /* CONFIG_BLOCK */
+void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
+                                        unsigned int which,
+                                        struct ceph_bvec_iter *bvec_pos);
 
 extern void osd_req_op_cls_request_data_pagelist(struct ceph_osd_request *,
                                        unsigned int which,
@@ -419,6 +424,9 @@ extern void osd_req_op_cls_request_data_pages(struct ceph_osd_request *,
                                        struct page **pages, u64 length,
                                        u32 alignment, bool pages_from_pool,
                                        bool own_pages);
+void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
+                                      unsigned int which,
+                                      struct bio_vec *bvecs, u32 bytes);
 extern void osd_req_op_cls_response_data_pages(struct ceph_osd_request *,
                                        unsigned int which,
                                        struct page **pages, u64 length,
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index b9fa8b869c086293910c1d4607efe6d107e5c2e4..91a57857cf113b90d55259416e5df37c69ee8a6b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -894,6 +894,58 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data_cursor *cursor,
 }
 #endif /* CONFIG_BLOCK */
 
+static void ceph_msg_data_bvecs_cursor_init(struct ceph_msg_data_cursor *cursor,
+                                       size_t length)
+{
+       struct ceph_msg_data *data = cursor->data;
+       struct bio_vec *bvecs = data->bvec_pos.bvecs;
+
+       cursor->resid = min_t(size_t, length, data->bvec_pos.iter.bi_size);
+       cursor->bvec_iter = data->bvec_pos.iter;
+       cursor->bvec_iter.bi_size = cursor->resid;
+
+       BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
+       cursor->last_piece =
+           cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
+}
+
+static struct page *ceph_msg_data_bvecs_next(struct ceph_msg_data_cursor *cursor,
+                                               size_t *page_offset,
+                                               size_t *length)
+{
+       struct bio_vec bv = bvec_iter_bvec(cursor->data->bvec_pos.bvecs,
+                                          cursor->bvec_iter);
+
+       *page_offset = bv.bv_offset;
+       *length = bv.bv_len;
+       return bv.bv_page;
+}
+
+static bool ceph_msg_data_bvecs_advance(struct ceph_msg_data_cursor *cursor,
+                                       size_t bytes)
+{
+       struct bio_vec *bvecs = cursor->data->bvec_pos.bvecs;
+
+       BUG_ON(bytes > cursor->resid);
+       BUG_ON(bytes > bvec_iter_len(bvecs, cursor->bvec_iter));
+       cursor->resid -= bytes;
+       bvec_iter_advance(bvecs, &cursor->bvec_iter, bytes);
+
+       if (!cursor->resid) {
+               BUG_ON(!cursor->last_piece);
+               return false;   /* no more data */
+       }
+
+       if (!bytes || cursor->bvec_iter.bi_bvec_done)
+               return false;   /* more bytes to process in this segment */
+
+       BUG_ON(cursor->last_piece);
+       BUG_ON(cursor->resid < bvec_iter_len(bvecs, cursor->bvec_iter));
+       cursor->last_piece =
+           cursor->resid == bvec_iter_len(bvecs, cursor->bvec_iter);
+       return true;
+}
+
 /*
  * For a page array, a piece comes from the first page in the array
  * that has not already been fully consumed.
@@ -1077,6 +1129,9 @@ static void __ceph_msg_data_cursor_init(struct ceph_msg_data_cursor *cursor)
                ceph_msg_data_bio_cursor_init(cursor, length);
                break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
+               ceph_msg_data_bvecs_cursor_init(cursor, length);
+               break;
        case CEPH_MSG_DATA_NONE:
        default:
                /* BUG(); */
@@ -1125,6 +1180,9 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data_cursor *cursor,
                page = ceph_msg_data_bio_next(cursor, page_offset, length);
                break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
+               page = ceph_msg_data_bvecs_next(cursor, page_offset, length);
+               break;
        case CEPH_MSG_DATA_NONE:
        default:
                page = NULL;
@@ -1163,6 +1221,9 @@ static void ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
                new_piece = ceph_msg_data_bio_advance(cursor, bytes);
                break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_BVECS:
+               new_piece = ceph_msg_data_bvecs_advance(cursor, bytes);
+               break;
        case CEPH_MSG_DATA_NONE:
        default:
                BUG();
@@ -3247,6 +3308,20 @@ void ceph_msg_data_add_bio(struct ceph_msg *msg, struct ceph_bio_iter *bio_pos,
 EXPORT_SYMBOL(ceph_msg_data_add_bio);
 #endif /* CONFIG_BLOCK */
 
+void ceph_msg_data_add_bvecs(struct ceph_msg *msg,
+                            struct ceph_bvec_iter *bvec_pos)
+{
+       struct ceph_msg_data *data;
+
+       data = ceph_msg_data_create(CEPH_MSG_DATA_BVECS);
+       BUG_ON(!data);
+       data->bvec_pos = *bvec_pos;
+
+       list_add_tail(&data->links, &msg->data);
+       msg->data_length += bvec_pos->iter.bi_size;
+}
+EXPORT_SYMBOL(ceph_msg_data_add_bvecs);
+
 /*
  * construct a new message with given type, size
  * the new msg has a ref count of 1.
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 339d8773ebe84c0a2d3188086f41b9f534b69ede..407be0533c183d55b614cee28f1a35471100ab2a 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -155,6 +155,13 @@ static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
 }
 #endif /* CONFIG_BLOCK */
 
+static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
+                                    struct ceph_bvec_iter *bvec_pos)
+{
+       osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
+       osd_data->bvec_pos = *bvec_pos;
+}
+
 #define osd_req_op_data(oreq, whch, typ, fld)                          \
 ({                                                                     \
        struct ceph_osd_request *__oreq = (oreq);                       \
@@ -229,6 +236,17 @@ void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
 EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
 #endif /* CONFIG_BLOCK */
 
+void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
+                                        unsigned int which,
+                                        struct ceph_bvec_iter *bvec_pos)
+{
+       struct ceph_osd_data *osd_data;
+
+       osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
+       ceph_osd_data_bvecs_init(osd_data, bvec_pos);
+}
+EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);
+
 static void osd_req_op_cls_request_info_pagelist(
                        struct ceph_osd_request *osd_req,
                        unsigned int which, struct ceph_pagelist *pagelist)
@@ -266,6 +284,23 @@ void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
 }
 EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);
 
+void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
+                                      unsigned int which,
+                                      struct bio_vec *bvecs, u32 bytes)
+{
+       struct ceph_osd_data *osd_data;
+       struct ceph_bvec_iter it = {
+               .bvecs = bvecs,
+               .iter = { .bi_size = bytes },
+       };
+
+       osd_data = osd_req_op_data(osd_req, which, cls, request_data);
+       ceph_osd_data_bvecs_init(osd_data, &it);
+       osd_req->r_ops[which].cls.indata_len += bytes;
+       osd_req->r_ops[which].indata_len += bytes;
+}
+EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);
+
 void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages, u64 length,
                        u32 alignment, bool pages_from_pool, bool own_pages)
@@ -291,6 +326,8 @@ static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
        case CEPH_OSD_DATA_TYPE_BIO:
                return (u64)osd_data->bio_length;
 #endif /* CONFIG_BLOCK */
+       case CEPH_OSD_DATA_TYPE_BVECS:
+               return osd_data->bvec_pos.iter.bi_size;
        default:
                WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
                return 0;
@@ -831,6 +868,8 @@ static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
                ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
 #endif
+       } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
+               ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
        } else {
                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
        }
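
For reference, a hedged sketch of how the new osd_client entry points might
be used to attach bio_vec data to an extent op (setup_bvecs_op and its
arguments are illustrative; only the exported helpers come from this patch):

#include <linux/bvec.h>
#include <linux/ceph/osd_client.h>

/*
 * Attach a caller-owned bio_vec array to op @which of @osd_req.  The
 * iterator is copied by value, but @bvecs must stay valid until the
 * request completes.
 */
static void setup_bvecs_op(struct ceph_osd_request *osd_req,
                           unsigned int which,
                           struct bio_vec *bvecs, u32 bytes)
{
        struct ceph_bvec_iter bvec_pos = {
                .bvecs = bvecs,
                .iter = { .bi_size = bytes },
        };

        osd_req_op_extent_osd_data_bvec_pos(osd_req, which, &bvec_pos);
}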