bool xsk_umem_has_addrs(struct xdp_umem *umem, u32 cnt)
{
- return xskq_has_addrs(umem->fq, cnt);
+ return xskq_cons_has_entries(umem->fq, cnt);
}
EXPORT_SYMBOL(xsk_umem_has_addrs);
u64 *xsk_umem_peek_addr(struct xdp_umem *umem, u64 *addr)
{
- return xskq_peek_addr(umem->fq, addr, umem);
+ return xskq_cons_peek_addr(umem->fq, addr, umem);
}
EXPORT_SYMBOL(xsk_umem_peek_addr);
void xsk_umem_discard_addr(struct xdp_umem *umem)
{
- xskq_discard_addr(umem->fq);
+ xskq_cons_release(umem->fq);
}
EXPORT_SYMBOL(xsk_umem_discard_addr);
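/*
 * Illustration only (not part of the patch): a minimal sketch of how a
 * zero-copy driver could consume fill-ring addresses through the exported
 * wrappers above. The function name, the "budget" parameter and the
 * "post to HW" step are assumptions made for the example, not taken from
 * any in-tree driver.
 */
static void example_rx_refill(struct xdp_umem *umem, u32 budget)
{
	u64 addr;

	if (!xsk_umem_has_addrs(umem, budget))
		return;

	while (budget-- && xsk_umem_peek_addr(umem, &addr)) {
		/* addr is a chunk address inside the umem; a real driver
		 * would post it to its hardware Rx ring here.
		 */
		xsk_umem_discard_addr(umem);
	}
}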
u32 metalen;
int err;
- if (!xskq_peek_addr(xs->umem->fq, &addr, xs->umem) ||
+ if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
xs->rx_dropped++;
return -ENOSPC;
addr = xsk_umem_adjust_offset(xs->umem, addr, offset);
err = xskq_prod_reserve_desc(xs->rx, addr, len);
if (!err) {
- xskq_discard_addr(xs->umem->fq);
+ xskq_cons_release(xs->umem->fq);
xdp_return_buff(xdp);
return 0;
}
goto out_unlock;
}
- if (!xskq_peek_addr(xs->umem->fq, &addr, xs->umem) ||
+ if (!xskq_cons_peek_addr(xs->umem->fq, &addr, xs->umem) ||
len > xs->umem->chunk_size_nohr - XDP_PACKET_HEADROOM) {
err = -ENOSPC;
goto out_drop;
if (err)
goto out_drop;
- xskq_discard_addr(xs->umem->fq);
+ xskq_cons_release(xs->umem->fq);
xskq_prod_submit(xs->rx);
spin_unlock_bh(&xs->rx_lock);
rcu_read_lock();
list_for_each_entry_rcu(xs, &umem->xsk_list, list) {
- if (!xskq_peek_desc(xs->tx, desc, umem))
+ if (!xskq_cons_peek_desc(xs->tx, desc, umem))
continue;
if (xskq_prod_reserve_addr(umem->cq, desc->addr))
goto out;
- xskq_discard_desc(xs->tx);
+ xskq_cons_release(xs->tx);
rcu_read_unlock();
return true;
}
if (xs->queue_id >= xs->dev->real_num_tx_queues)
goto out;
- while (xskq_peek_desc(xs->tx, &desc, xs->umem)) {
+ while (xskq_cons_peek_desc(xs->tx, &desc, xs->umem)) {
char *buffer;
u64 addr;
u32 len;
skb->destructor = xsk_destruct_skb;
err = dev_direct_xmit(skb, xs->queue_id);
- xskq_discard_desc(xs->tx);
+ xskq_cons_release(xs->tx);
/* Ignore NET_XMIT_CN as packet might have been sent */
if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) {
/* SKB completed but not sent */
if (xs->rx && !xskq_prod_is_empty(xs->rx))
mask |= EPOLLIN | EPOLLRDNORM;
- if (xs->tx && !xskq_full_desc(xs->tx))
+ if (xs->tx && !xskq_cons_is_full(xs->tx))
mask |= EPOLLOUT | EPOLLWRNORM;
return mask;
u32 ring_mask;
u32 nentries;
u32 cached_prod;
- u32 cons_head;
- u32 cons_tail;
+ u32 cached_cons;
struct xdp_ring *ring;
u64 invalid_descs;
};
return q ? q->invalid_descs : 0;
}
-static inline u32 xskq_nb_avail(struct xsk_queue *q)
+static inline void __xskq_cons_release(struct xsk_queue *q)
{
- u32 entries = q->cached_prod - q->cons_tail;
+ smp_mb(); /* D, matches A */
+ WRITE_ONCE(q->ring->consumer, q->cached_cons);
+}
- if (entries == 0) {
- /* Refresh the local pointer */
- q->cached_prod = READ_ONCE(q->ring->producer);
- entries = q->cached_prod - q->cons_tail;
- }
+static inline void __xskq_cons_peek(struct xsk_queue *q)
+{
+ /* Refresh the local pointer */
+ q->cached_prod = READ_ONCE(q->ring->producer);
+ smp_rmb(); /* C, matches B */
+}
- return entries;
+static inline void xskq_cons_get_entries(struct xsk_queue *q)
+{
+ __xskq_cons_release(q);
+ __xskq_cons_peek(q);
}
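/*
 * For orientation (not part of the patch): the letters in the barrier
 * comments refer to the producer/consumer pairing documented at the top
 * of xsk_queue.h, roughly:
 *
 *	producer				consumer
 *
 *	if (LOAD ->consumer) {			LOAD ->producer
 *			   (A)			smp_rmb()	(C)
 *		STORE $data			LOAD $data
 *		smp_wmb()  (B)			smp_mb()	(D)
 *		STORE ->producer		STORE ->consumer
 *	}
 *
 * __xskq_cons_peek() is the consumer's C side (read ->producer before the
 * data), and __xskq_cons_release() is its D side (finish reading the data
 * before the updated ->consumer becomes visible to the producer).
 */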
static inline bool xskq_prod_is_full(struct xsk_queue *q)
{
- u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail);
+ u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
if (free_entries)
return false;
/* Refresh the local tail pointer */
- q->cons_tail = READ_ONCE(q->ring->consumer);
- free_entries = q->nentries - (q->cached_prod - q->cons_tail);
+ q->cached_cons = READ_ONCE(q->ring->consumer);
+ free_entries = q->nentries - (q->cached_prod - q->cached_cons);
return !free_entries;
}
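/* Note (not part of the patch): the producer side now mirrors the
 * consumer's single cached copy. cached_cons above is only re-read from
 * the shared ring when the queue looks full, just as __xskq_cons_peek()
 * only re-reads ->producer when the consumer's cached view is empty.
 */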
-static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
+static inline bool xskq_cons_has_entries(struct xsk_queue *q, u32 cnt)
{
- u32 entries = q->cached_prod - q->cons_tail;
+ u32 entries = q->cached_prod - q->cached_cons;
if (entries >= cnt)
return true;
- /* Refresh the local pointer. */
- q->cached_prod = READ_ONCE(q->ring->producer);
- entries = q->cached_prod - q->cons_tail;
+ __xskq_cons_peek(q);
+ entries = q->cached_prod - q->cached_cons;
return entries >= cnt;
}
static inline u64 *xskq_validate_addr(struct xsk_queue *q, u64 *addr,
struct xdp_umem *umem)
{
- while (q->cons_tail != q->cons_head) {
- struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
- unsigned int idx = q->cons_tail & q->ring_mask;
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+ while (q->cached_cons != q->cached_prod) {
+ u32 idx = q->cached_cons & q->ring_mask;
*addr = READ_ONCE(ring->desc[idx]) & q->chunk_mask;
return addr;
out:
- q->cons_tail++;
+ q->cached_cons++;
}
return NULL;
}
-static inline u64 *xskq_peek_addr(struct xsk_queue *q, u64 *addr,
- struct xdp_umem *umem)
+static inline u64 *xskq_cons_peek_addr(struct xsk_queue *q, u64 *addr,
+ struct xdp_umem *umem)
{
- if (q->cons_tail == q->cons_head) {
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cons_tail);
- q->cons_head = q->cons_tail + xskq_nb_avail(q);
-
- /* Order consumer and data */
- smp_rmb();
- }
-
+ if (q->cached_prod == q->cached_cons)
+ xskq_cons_get_entries(q);
return xskq_validate_addr(q, addr, umem);
}
-static inline void xskq_discard_addr(struct xsk_queue *q)
+static inline void xskq_cons_release(struct xsk_queue *q)
{
- q->cons_tail++;
+ /* To improve performance, only update local state here.
+ * Do the actual release operation when we get new entries
+ * from the ring in xskq_cons_get_entries() instead.
+ */
+ q->cached_cons++;
}
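/*
 * A minimal sketch (not from the patch) of the pattern the comment above
 * enables: every successful peek is paired with one xskq_cons_release(),
 * but the shared ->consumer pointer is only written back by
 * __xskq_cons_release() the next time the cached view runs dry and
 * xskq_cons_get_entries() is called. The loop body is an assumption for
 * illustration.
 */
static inline void example_drain_fill_queue(struct xsk_queue *fq,
					    struct xdp_umem *umem)
{
	u64 addr;

	while (xskq_cons_peek_addr(fq, &addr, umem)) {
		/* consume the chunk at addr ... */
		xskq_cons_release(fq);	/* bumps cached_cons only */
	}
}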
static inline int xskq_prod_reserve(struct xsk_queue *q)
struct xdp_desc *desc,
struct xdp_umem *umem)
{
- while (q->cons_tail != q->cons_head) {
+ while (q->cached_cons != q->cached_prod) {
struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
- unsigned int idx = q->cons_tail & q->ring_mask;
+ u32 idx = q->cached_cons & q->ring_mask;
*desc = READ_ONCE(ring->desc[idx]);
if (xskq_is_valid_desc(q, desc, umem))
return desc;
- q->cons_tail++;
+ q->cached_cons++;
}
return NULL;
}
-static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
- struct xdp_desc *desc,
- struct xdp_umem *umem)
+static inline struct xdp_desc *xskq_cons_peek_desc(struct xsk_queue *q,
+ struct xdp_desc *desc,
+ struct xdp_umem *umem)
{
- if (q->cons_tail == q->cons_head) {
- smp_mb(); /* D, matches A */
- WRITE_ONCE(q->ring->consumer, q->cons_tail);
- q->cons_head = q->cons_tail + xskq_nb_avail(q);
-
- /* Order consumer and data */
- smp_rmb(); /* C, matches B */
- }
-
+ if (q->cached_prod == q->cached_cons)
+ xskq_cons_get_entries(q);
return xskq_validate_desc(q, desc, umem);
}
-static inline void xskq_discard_desc(struct xsk_queue *q)
-{
- q->cons_tail++;
-}
-
static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
u64 addr, u32 len)
{
return 0;
}
-static inline bool xskq_full_desc(struct xsk_queue *q)
+static inline bool xskq_cons_is_full(struct xsk_queue *q)
{
/* No barriers needed since data is not accessed */
return READ_ONCE(q->ring->producer) - READ_ONCE(q->ring->consumer) ==