* now and again after circling through the ring.
*/
-/* Common functions operating for both RXTX and umem queues */
-
-static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
-{
- return q ? q->invalid_descs : 0;
-}
-
-static inline void __xskq_cons_release(struct xsk_queue *q)
-{
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cached_cons);
-}
-
-static inline void __xskq_cons_peek(struct xsk_queue *q)
-{
- /* Refresh the local pointer */
- q->cached_prod = READ_ONCE(q->ring->producer);
- smp_rmb(); /* C, matches B */
-}
-
-static inline void xskq_cons_get_entries(struct xsk_queue *q)
-{
- __xskq_cons_release(q);
- __xskq_cons_peek(q);
-}
-
-static inline bool xskq_prod_is_full(struct xsk_queue *q)
-{
- u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
-
- if (free_entries)
- return false;
-
- /* Refresh the local tail pointer */
- q->cached_cons = READ_ONCE(q->ring->consumer);
- free_entries = q->nentries - (q->cached_prod - q->cached_cons);
-
- return !free_entries;
-}
-
-static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
-{
- u32 entries = q->cached_prod - q->cached_cons;
-
- if (entries >= cnt)
- return true;
-
- __xskq_cons_peek(q);
- entries = q->cached_prod - q->cached_cons;
-
- return entries >= cnt;
-}
+/* The operations on the rings are the following:
+ *
+ * producer consumer
+ *
+ * RESERVE entries PEEK in the ring for entries
+ * WRITE data into the ring READ data from the ring
+ * SUBMIT entries RELEASE entries
+ *
+ * The producer reserves one or more entries in the ring. It can then
+ * fill in these entries and finally submit them so that they can be
+ * seen and read by the consumer.
+ *
+ * The consumer peeks into the ring to see if the producer has written
+ * any new entries. If so, the producer can then read these entries
+ * and when it is done reading them release them back to the producer
+ * so that the producer can use these slots to fill in new entries.
+ *
+ * The function names below reflect these operations.
+ */
-/* UMEM queue */
+/* Functions that read and validate content from consumer rings. */
static inline bool xskq_cons_crosses_non_contig_pg(struct xdp_umem *umem,
u64 addr,
return cross_pg && !next_pg_contig;
}
-static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
-{
- if (addr >= q->size) {
- q->invalid_descs++;
- return false;
- }
-
- return true;
-}
-
static inline bool xskq_cons_is_valid_unaligned(struct xsk_queue *q,
u64 addr,
u64 length,
return true;
}
+static inline bool xskq_cons_is_valid_addr(struct xsk_queue *q, u64 addr)
+{
+ if (addr >= q->size) {
+ q->invalid_descs++;
+ return false;
+ }
+
+ return true;
+}
+
static inline bool xskq_cons_read_addr(struct xsk_queue *q, u64 *addr,
struct xdp_umem *umem)
{
return false;
}
-static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
- struct xdp_umem *umem)
-{
- if (q->cached_prod == q->cached_cons)
- xskq_cons_get_entries(q);
- return xskq_cons_read_addr(q, addr, umem);
-}
-
-static inline void xskq_cons_release(struct xsk_queue *q)
-{
- /* To improve performance, only update local state here.
- * Do the actual release operation when we get new entries
- * from the ring in xskq_cons_get_entries() instead.
- */
- q->cached_cons++;
-}
-
-static inline int xskq_prod_reserve(struct xsk_queue *q)
-{
- if (xskq_prod_is_full(q))
- return -ENOSPC;
-
- /* A, matches D */
- q->cached_prod++;
- return 0;
-}
-
-static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
-{
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-
- if (xskq_prod_is_full(q))
- return -ENOSPC;
-
- /* A, matches D */
- ring->desc[q->cached_prod++ & q->ring_mask] = addr;
- return 0;
-}
-
-static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
-{
- /* Order producer and data */
- smp_wmb(); /* B, matches C */
-
- WRITE_ONCE(q->ring->producer, idx);
-}
-
-static inline void xskq_prod_submit(struct xsk_queue *q)
-{
- __xskq_prod_submit(q, q->cached_prod);
-}
-
-static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
-{
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- u32 idx = q->ring->producer;
-
- ring->desc[idx++ & q->ring_mask] = addr;
-
- __xskq_prod_submit(q, idx);
-}
-
-static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
-{
- __xskq_prod_submit(q, q->ring->producer + nb_entries);
-}
-
-/* Rx/Tx queue */
-
static inline bool xskq_cons_is_valid_desc(struct xsk_queue *q,
struct xdp_desc *d,
struct xdp_umem *umem)
return false;
}
+/* Functions for consumers */
+
+static inline void __xskq_cons_release(struct xsk_queue *q)
+{
+ smp_mb(); /* D, matches A */
+ WRITE_ONCE(q->ring->consumer, q->cached_cons);
+}
+
+static inline void __xskq_cons_peek(struct xsk_queue *q)
+{
+ /* Refresh the local pointer */
+ q->cached_prod = READ_ONCE(q->ring->producer);
+ smp_rmb(); /* C, matches B */
+}
+
+static inline void xskq_cons_get_entries(struct xsk_queue *q)
+{
+ __xskq_cons_release(q);
+ __xskq_cons_peek(q);
+}
+
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
+{
+ u32 entries = q->cached_prod - q->cached_cons;
+
+ if (entries >= cnt)
+ return true;
+
+ __xskq_cons_peek(q);
+ entries = q->cached_prod - q->cached_cons;
+
+ return entries >= cnt;
+}
+
+static inline bool xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
+ struct xdp_umem *umem)
+{
+ if (q->cached_prod == q->cached_cons)
+ xskq_cons_get_entries(q);
+ return xskq_cons_read_addr(q, addr, umem);
+}
+
static inline bool xskq_cons_peek_desc(struct xsk_queue *q,
struct xdp_desc *desc,
struct xdp_umem *umem)
return xskq_cons_read_desc(q, desc, umem);
}
+static inline void xskq_cons_release(struct xsk_queue *q)
+{
+ /* To improve performance, only update local state here.
+ * Reflect this to global state when we get new entries
+ * from the ring in xskq_cons_get_entries().
+ */
+ q->cached_cons++;
+}
+
+static inline bool xskq_cons_is_full(struct xsk_queue *q)
+{
+ /* No barriers needed since data is not accessed */
+ return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
+ q->nentries;
+}
+
+/* Functions for producers */
+
+static inline bool xskq_prod_is_full(struct xsk_queue *q)
+{
+ u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
+
+ if (free_entries)
+ return false;
+
+ /* Refresh the local tail pointer */
+ q->cached_cons = READ_ONCE(q->ring->consumer);
+ free_entries = q->nentries - (q->cached_prod - q->cached_cons);
+
+ return !free_entries;
+}
+
+static inline int xskq_prod_reserve(struct xsk_queue *q)
+{
+ if (xskq_prod_is_full(q))
+ return -ENOSPC;
+
+ /* A, matches D */
+ q->cached_prod++;
+ return 0;
+}
+
+static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
+{
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+ if (xskq_prod_is_full(q))
+ return -ENOSPC;
+
+ /* A, matches D */
+ ring->desc[q->cached_prod++ & q->ring_mask] = addr;
+ return 0;
+}
+
static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
u64 addr, u32 len)
{
return 0;
}
-static inline bool xskq_cons_is_full(struct xsk_queue *q)
+static inline void __xskq_prod_submit(struct xsk_queue *q, u32 idx)
{
- /* No barriers needed since data is not accessed */
- return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==
- q->nentries;
+ smp_wmb(); /* B, matches C */
+
+ WRITE_ONCE(q->ring->producer, idx);
+}
+
+static inline void xskq_prod_submit(struct xsk_queue *q)
+{
+ __xskq_prod_submit(q, q->cached_prod);
+}
+
+static inline void xskq_prod_submit_addr(struct xsk_queue *q, u64 addr)
+{
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+ u32 idx = q->ring->producer;
+
+ ring->desc[idx++ & q->ring_mask] = addr;
+
+ __xskq_prod_submit(q, idx);
+}
+
+static inline void xskq_prod_submit_n(struct xsk_queue *q, u32 nb_entries)
+{
+ __xskq_prod_submit(q, q->ring->producer + nb_entries);
}
static inline bool xskq_prod_is_empty(struct xsk_queue *q)
return READ_ONCE(q->ring->consumer) == READ_ONCE(q->ring->producer);
}
+/* For both producers and consumers */
+
+static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
+{
+ return q ? q->invalid_descs : 0;
+}
+
void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
struct xsk_queue *xskq_create(u32 nentries, bool umem_queue);
void xskq_destroy(struct xsk_queue *q_ops);