static void udp_rmem_release(struct sock *sk, int size, int partial)
{
struct udp_sock *up = udp_sk(sk);
+ struct sk_buff_head *sk_queue;
int amt;
if (likely(partial)) {
up->forward_deficit += size;
size = up->forward_deficit;
if (size < (sk->sk_rcvbuf >> 2) &&
- !skb_queue_empty(&sk->sk_receive_queue))
+ !skb_queue_empty(&up->reader_queue))
return;
} else {
size += up->forward_deficit;
}
up->forward_deficit = 0;
+ /* acquire the sk_receive_queue for fwd allocated memory scheduling */
+ sk_queue = &sk->sk_receive_queue;
+ spin_lock(&sk_queue->lock);
+
sk->sk_forward_alloc += size;
amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
sk->sk_forward_alloc -= amt;
__sk_mem_reduce_allocated(sk, amt >> SK_MEM_QUANTUM_SHIFT);
atomic_sub(size, &sk->sk_rmem_alloc);
+
+ /* this can save us from acquiring the rx queue lock on next receive */
+ skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
+
+ spin_unlock(&sk_queue->lock);
}
-/* Note: called with sk_receive_queue.lock held.
+/* Note: called with reader_queue.lock held.
* Instead of using skb->truesize here, find a copy of it in skb->dev_scratch
* This avoids a cache line miss while receive_queue lock is held.
* Look at __udp_enqueue_schedule_skb() to find where this copy is done.
void udp_destruct_sock(struct sock *sk)
{
/* reclaim completely the forward allocated memory */
+ struct udp_sock *up = udp_sk(sk);
unsigned int total = 0;
struct sk_buff *skb;
- while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+ skb_queue_splice_tail_init(&sk->sk_receive_queue, &up->reader_queue);
+ while ((skb = __skb_dequeue(&up->reader_queue)) != NULL) {
total += skb->truesize;
kfree_skb(skb);
}
int udp_init_sock(struct sock *sk)
{
+ skb_queue_head_init(&udp_sk(sk)->reader_queue);
sk->sk_destruct = udp_destruct_sock;
return 0;
}
}
EXPORT_SYMBOL_GPL(skb_consume_udp);
+static struct sk_buff *__first_packet_length(struct sock *sk,
+ struct sk_buff_head *rcvq,
+ int *total)
+{
+ struct sk_buff *skb;
+
+ while ((skb = skb_peek(rcvq)) != NULL &&
+ udp_lib_checksum_complete(skb)) {
+ __UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
+ IS_UDPLITE(sk));
+ __UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
+ IS_UDPLITE(sk));
+ atomic_inc(&sk->sk_drops);
+ __skb_unlink(skb, rcvq);
+ *total += skb->truesize;
+ kfree_skb(skb);
+ }
+ return skb;
+}
+
/**
* first_packet_length - return length of first packet in receive queue
* @sk: socket
*/
static int first_packet_length(struct sock *sk)
{
- struct sk_buff_head *rcvq = &sk->sk_receive_queue;
+ struct sk_buff_head *rcvq = &udp_sk(sk)->reader_queue;
+ struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
struct sk_buff *skb;
int total = 0;
int res;
spin_lock_bh(&rcvq->lock);
- while ((skb = skb_peek(rcvq)) != NULL &&
- udp_lib_checksum_complete(skb)) {
- __UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS,
- IS_UDPLITE(sk));
- __UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS,
- IS_UDPLITE(sk));
- atomic_inc(&sk->sk_drops);
- __skb_unlink(skb, rcvq);
- total += skb->truesize;
- kfree_skb(skb);
+ skb = __first_packet_length(sk, rcvq, &total);
+ if (!skb && !skb_queue_empty(sk_queue)) {
+ spin_lock(&sk_queue->lock);
+ skb_queue_splice_tail_init(sk_queue, rcvq);
+ spin_unlock(&sk_queue->lock);
+
+ skb = __first_packet_length(sk, rcvq, &total);
}
res = skb ? skb->len : -1;
if (total)
}
EXPORT_SYMBOL(udp_ioctl);
+struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
+ int noblock, int *peeked, int *off, int *err)
+{
+ struct sk_buff_head *sk_queue = &sk->sk_receive_queue;
+ struct sk_buff_head *queue;
+ struct sk_buff *last;
+ long timeo;
+ int error;
+
+ queue = &udp_sk(sk)->reader_queue;
+ flags |= noblock ? MSG_DONTWAIT : 0;
+ timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
+ do {
+ struct sk_buff *skb;
+
+ error = sock_error(sk);
+ if (error)
+ break;
+
+ error = -EAGAIN;
+ *peeked = 0;
+ do {
+ int _off = *off;
+
+ spin_lock_bh(&queue->lock);
+ skb = __skb_try_recv_from_queue(sk, queue, flags,
+ udp_skb_destructor,
+ peeked, &_off, err,
+ &last);
+ if (skb) {
+ spin_unlock_bh(&queue->lock);
+ *off = _off;
+ return skb;
+ }
+
+ if (skb_queue_empty(sk_queue)) {
+ spin_unlock_bh(&queue->lock);
+ goto busy_check;
+ }
+
+ /* refill the reader queue and walk it again */
+ _off = *off;
+ spin_lock(&sk_queue->lock);
+ skb_queue_splice_tail_init(sk_queue, queue);
+ spin_unlock(&sk_queue->lock);
+
+ skb = __skb_try_recv_from_queue(sk, queue, flags,
+ udp_skb_destructor,
+ peeked, &_off, err,
+ &last);
+ spin_unlock_bh(&queue->lock);
+ if (skb) {
+ *off = _off;
+ return skb;
+ }
+
+busy_check:
+ if (!sk_can_busy_loop(sk))
+ break;
+
+ sk_busy_loop(sk, flags & MSG_DONTWAIT);
+ } while (!skb_queue_empty(sk_queue));
+
+ /* sk_queue is empty, reader_queue may contain peeked packets */
+ } while (timeo &&
+ !__skb_wait_for_more_packets(sk, &error, &timeo,
+ (struct sk_buff *)sk_queue));
+
+ *err = error;
+ return NULL;
+}
+EXPORT_SYMBOL_GPL(__skb_recv_udp);
+
/*
* This should be easy, if there is something there we
* return it, otherwise we block.
return err;
csum_copy_err:
- if (!__sk_queue_drop_skb(sk, skb, flags, udp_skb_destructor)) {
+ if (!__sk_queue_drop_skb(sk, &udp_sk(sk)->reader_queue, skb, flags,
+ udp_skb_destructor)) {
UDP_INC_STATS(sock_net(sk), UDP_MIB_CSUMERRORS, is_udplite);
UDP_INC_STATS(sock_net(sk), UDP_MIB_INERRORS, is_udplite);
}
unsigned int mask = datagram_poll(file, sock, wait);
struct sock *sk = sock->sk;
+ if (!skb_queue_empty(&udp_sk(sk)->reader_queue))
+ mask |= POLLIN | POLLRDNORM;
+
sock_rps_record_flow(sk);
/* Check for false positives due to checksum errors */