SUNRPC: Replace direct task wakeups from softirq context
authorTrond Myklebust <trond.myklebust@hammerspace.com>
Wed, 1 May 2019 20:28:29 +0000 (16:28 -0400)
committerTrond Myklebust <trond.myklebust@hammerspace.com>
Sat, 6 Jul 2019 18:54:48 +0000 (14:54 -0400)
Replace the direct task wakeups from inside a softirq context with
wakeups from a process context.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
include/linux/sunrpc/xprtsock.h
net/sunrpc/xprtsock.c

index b81d0b3e0799f43c67057bdbbdb20467205f96be..7638dbe7bc50026eebcd8cf8f519ea49b4999a41 100644 (file)
@@ -56,6 +56,7 @@ struct sock_xprt {
         */
        unsigned long           sock_state;
        struct delayed_work     connect_worker;
+       struct work_struct      error_worker;
        struct work_struct      recv_worker;
        struct mutex            recv_mutex;
        struct sockaddr_storage srcaddr;
@@ -84,6 +85,10 @@ struct sock_xprt {
 #define XPRT_SOCK_CONNECTING   1U
 #define XPRT_SOCK_DATA_READY   (2)
 #define XPRT_SOCK_UPD_TIMEOUT  (3)
+#define XPRT_SOCK_WAKE_ERROR   (4)
+#define XPRT_SOCK_WAKE_WRITE   (5)
+#define XPRT_SOCK_WAKE_PENDING (6)
+#define XPRT_SOCK_WAKE_DISCONNECT      (7)
 
 #endif /* __KERNEL__ */
 
index 36652352a38c3da3e035f0d350fef489e2039585..92af57019b96777311d9028171ac8dd9ea03d8a1 100644 (file)
@@ -1211,6 +1211,15 @@ static void xs_sock_reset_state_flags(struct rpc_xprt *xprt)
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 
        clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
+       clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state);
+       clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state);
+       clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state);
+}
+
+static void xs_run_error_worker(struct sock_xprt *transport, unsigned int nr)
+{
+       set_bit(nr, &transport->sock_state);
+       queue_work(xprtiod_workqueue, &transport->error_worker);
 }
 
 static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
@@ -1231,6 +1240,7 @@ static void xs_sock_reset_connection_flags(struct rpc_xprt *xprt)
  */
 static void xs_error_report(struct sock *sk)
 {
+       struct sock_xprt *transport;
        struct rpc_xprt *xprt;
        int err;
 
@@ -1238,13 +1248,14 @@ static void xs_error_report(struct sock *sk)
        if (!(xprt = xprt_from_sock(sk)))
                goto out;
 
+       transport = container_of(xprt, struct sock_xprt, xprt);
        err = -sk->sk_err;
        if (err == 0)
                goto out;
        dprintk("RPC:       xs_error_report client %p, error=%d...\n",
                        xprt, -err);
        trace_rpc_socket_error(xprt, sk->sk_socket, err);
-       xprt_wake_pending_tasks(xprt, err);
+       xs_run_error_worker(transport, XPRT_SOCK_WAKE_ERROR);
  out:
        read_unlock_bh(&sk->sk_callback_lock);
 }
@@ -1507,7 +1518,7 @@ static void xs_tcp_state_change(struct sock *sk)
                        xprt->stat.connect_count++;
                        xprt->stat.connect_time += (long)jiffies -
                                                   xprt->stat.connect_start;
-                       xprt_wake_pending_tasks(xprt, -EAGAIN);
+                       xs_run_error_worker(transport, XPRT_SOCK_WAKE_PENDING);
                }
                spin_unlock(&xprt->transport_lock);
                break;
@@ -1525,7 +1536,7 @@ static void xs_tcp_state_change(struct sock *sk)
                /* The server initiated a shutdown of the socket */
                xprt->connect_cookie++;
                clear_bit(XPRT_CONNECTED, &xprt->state);
-               xs_tcp_force_close(xprt);
+               xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
                /* fall through */
        case TCP_CLOSING:
                /*
@@ -1547,7 +1558,7 @@ static void xs_tcp_state_change(struct sock *sk)
                        xprt_clear_connecting(xprt);
                clear_bit(XPRT_CLOSING, &xprt->state);
                /* Trigger the socket release */
-               xs_tcp_force_close(xprt);
+               xs_run_error_worker(transport, XPRT_SOCK_WAKE_DISCONNECT);
        }
  out:
        read_unlock_bh(&sk->sk_callback_lock);
@@ -1556,6 +1567,7 @@ static void xs_tcp_state_change(struct sock *sk)
 static void xs_write_space(struct sock *sk)
 {
        struct socket_wq *wq;
+       struct sock_xprt *transport;
        struct rpc_xprt *xprt;
 
        if (!sk->sk_socket)
@@ -1564,13 +1576,14 @@ static void xs_write_space(struct sock *sk)
 
        if (unlikely(!(xprt = xprt_from_sock(sk))))
                return;
+       transport = container_of(xprt, struct sock_xprt, xprt);
        rcu_read_lock();
        wq = rcu_dereference(sk->sk_wq);
        if (!wq || test_and_clear_bit(SOCKWQ_ASYNC_NOSPACE, &wq->flags) == 0)
                goto out;
 
-       if (xprt_write_space(xprt))
-               sk->sk_write_pending--;
+       xs_run_error_worker(transport, XPRT_SOCK_WAKE_WRITE);
+       sk->sk_write_pending--;
 out:
        rcu_read_unlock();
 }
@@ -2461,6 +2474,56 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
                        delay);
 }
 
+static void xs_wake_disconnect(struct sock_xprt *transport)
+{
+       if (test_and_clear_bit(XPRT_SOCK_WAKE_DISCONNECT, &transport->sock_state))
+               xs_tcp_force_close(&transport->xprt);
+}
+
+static void xs_wake_write(struct sock_xprt *transport)
+{
+       if (test_and_clear_bit(XPRT_SOCK_WAKE_WRITE, &transport->sock_state))
+               xprt_write_space(&transport->xprt);
+}
+
+static void xs_wake_error(struct sock_xprt *transport)
+{
+       int sockerr;
+       int sockerr_len = sizeof(sockerr);
+
+       if (!test_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
+               return;
+       mutex_lock(&transport->recv_mutex);
+       if (transport->sock == NULL)
+               goto out;
+       if (!test_and_clear_bit(XPRT_SOCK_WAKE_ERROR, &transport->sock_state))
+               goto out;
+       if (kernel_getsockopt(transport->sock, SOL_SOCKET, SO_ERROR,
+                               (char *)&sockerr, &sockerr_len) != 0)
+               goto out;
+       if (sockerr < 0)
+               xprt_wake_pending_tasks(&transport->xprt, sockerr);
+out:
+       mutex_unlock(&transport->recv_mutex);
+}
+
+static void xs_wake_pending(struct sock_xprt *transport)
+{
+       if (test_and_clear_bit(XPRT_SOCK_WAKE_PENDING, &transport->sock_state))
+               xprt_wake_pending_tasks(&transport->xprt, -EAGAIN);
+}
+
+static void xs_error_handle(struct work_struct *work)
+{
+       struct sock_xprt *transport = container_of(work,
+                       struct sock_xprt, error_worker);
+
+       xs_wake_disconnect(transport);
+       xs_wake_write(transport);
+       xs_wake_error(transport);
+       xs_wake_pending(transport);
+}
+
 /**
  * xs_local_print_stats - display AF_LOCAL socket-specifc stats
  * @xprt: rpc_xprt struct containing statistics
@@ -2873,6 +2936,7 @@ static struct rpc_xprt *xs_setup_local(struct xprt_create *args)
        xprt->timeout = &xs_local_default_timeout;
 
        INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
+       INIT_WORK(&transport->error_worker, xs_error_handle);
        INIT_DELAYED_WORK(&transport->connect_worker, xs_dummy_setup_socket);
 
        switch (sun->sun_family) {
@@ -2943,6 +3007,7 @@ static struct rpc_xprt *xs_setup_udp(struct xprt_create *args)
        xprt->timeout = &xs_udp_default_timeout;
 
        INIT_WORK(&transport->recv_worker, xs_udp_data_receive_workfn);
+       INIT_WORK(&transport->error_worker, xs_error_handle);
        INIT_DELAYED_WORK(&transport->connect_worker, xs_udp_setup_socket);
 
        switch (addr->sa_family) {
@@ -3024,6 +3089,7 @@ static struct rpc_xprt *xs_setup_tcp(struct xprt_create *args)
                (xprt->timeout->to_retries + 1);
 
        INIT_WORK(&transport->recv_worker, xs_stream_data_receive_workfn);
+       INIT_WORK(&transport->error_worker, xs_error_handle);
        INIT_DELAYED_WORK(&transport->connect_worker, xs_tcp_setup_socket);
 
        switch (addr->sa_family) {