rxrpc: Differentiate local and remote abort codes in structs
authorDavid Howells <dhowells@redhat.com>
Thu, 7 Apr 2016 16:23:30 +0000 (17:23 +0100)
committerDavid S. Miller <davem@davemloft.net>
Mon, 11 Apr 2016 19:34:40 +0000 (15:34 -0400)
In the rxrpc_connection and rxrpc_call structs, there's one field to hold
the abort code, no matter whether that value was generated locally to be
sent or was received from the peer via an abort packet.

Split the abort code fields in two for cleanliness sake and add an error
field to hold the Linux error number to the rxrpc_call struct too
(sometimes this is generated in a context where we can't return it to
userspace directly).

Furthermore, add a skb mark to indicate a packet that caused a local abort
to be generated so that recvmsg() can pick up the correct abort code.  A
future addition will need to be to indicate to userspace the difference
between aborts via a control message.

Signed-off-by: David Howells <dhowells@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
fs/afs/rxrpc.c
include/net/af_rxrpc.h
net/rxrpc/ar-ack.c
net/rxrpc/ar-call.c
net/rxrpc/ar-connevent.c
net/rxrpc/ar-input.c
net/rxrpc/ar-internal.h
net/rxrpc/ar-output.c
net/rxrpc/ar-proc.c
net/rxrpc/ar-recvmsg.c

index b4d337ad6e36b576cdf9eddb6ccb480e7c86dbbf..63cd9f939f19965b3b2f049b444b9bbba97528f9 100644 (file)
@@ -430,9 +430,11 @@ error_kill_call:
 }
 
 /*
- * handles intercepted messages that were arriving in the socket's Rx queue
- * - called with the socket receive queue lock held to ensure message ordering
- * - called with softirqs disabled
+ * Handles intercepted messages that were arriving in the socket's Rx queue.
+ *
+ * Called from the AF_RXRPC call processor in waitqueue process context.  For
+ * each call, it is guaranteed this will be called in order of packet to be
+ * delivered.
  */
 static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
                               struct sk_buff *skb)
@@ -523,6 +525,12 @@ static void afs_deliver_to_call(struct afs_call *call)
                        call->state = AFS_CALL_ABORTED;
                        _debug("Rcv ABORT %u -> %d", abort_code, call->error);
                        break;
+               case RXRPC_SKB_MARK_LOCAL_ABORT:
+                       abort_code = rxrpc_kernel_get_abort_code(skb);
+                       call->error = call->type->abort_to_error(abort_code);
+                       call->state = AFS_CALL_ABORTED;
+                       _debug("Loc ABORT %u -> %d", abort_code, call->error);
+                       break;
                case RXRPC_SKB_MARK_NET_ERROR:
                        call->error = -rxrpc_kernel_get_error_number(skb);
                        call->state = AFS_CALL_ERROR;
index 4fd3e4a2cadd0c3705bc013446d3f641e25e69d2..ac1bc3c49fbdf9832fdb4f895657c0336bb61926 100644 (file)
@@ -20,11 +20,12 @@ struct rxrpc_call;
 /*
  * the mark applied to socket buffers that may be intercepted
  */
-enum {
+enum rxrpc_skb_mark {
        RXRPC_SKB_MARK_DATA,            /* data message */
        RXRPC_SKB_MARK_FINAL_ACK,       /* final ACK received message */
        RXRPC_SKB_MARK_BUSY,            /* server busy message */
        RXRPC_SKB_MARK_REMOTE_ABORT,    /* remote abort message */
+       RXRPC_SKB_MARK_LOCAL_ABORT,     /* local abort message */
        RXRPC_SKB_MARK_NET_ERROR,       /* network error message */
        RXRPC_SKB_MARK_LOCAL_ERROR,     /* local error message */
        RXRPC_SKB_MARK_NEW_CALL,        /* local error message */
index 54bf43ba9aa8116aa53cef44f16b895225f58486..d0eb98e1391c994baa29c6dd033c9b3e22e9b7f9 100644 (file)
@@ -905,7 +905,7 @@ void rxrpc_process_call(struct work_struct *work)
                                       ECONNABORTED, true) < 0)
                        goto no_mem;
                whdr.type = RXRPC_PACKET_TYPE_ABORT;
-               data = htonl(call->abort_code);
+               data = htonl(call->local_abort);
                iov[1].iov_base = &data;
                iov[1].iov_len = sizeof(data);
                genbit = RXRPC_CALL_EV_ABORT;
@@ -968,7 +968,7 @@ void rxrpc_process_call(struct work_struct *work)
                write_lock_bh(&call->state_lock);
                if (call->state <= RXRPC_CALL_COMPLETE) {
                        call->state = RXRPC_CALL_LOCALLY_ABORTED;
-                       call->abort_code = RX_CALL_TIMEOUT;
+                       call->local_abort = RX_CALL_TIMEOUT;
                        set_bit(RXRPC_CALL_EV_ABORT, &call->events);
                }
                write_unlock_bh(&call->state_lock);
index 7c8d300ade9bb32079b716b4fced72dce51f585b..67a211f0ebbaeb307404ebd477fcf0ffc2a33c8f 100644 (file)
@@ -682,7 +682,7 @@ void rxrpc_release_call(struct rxrpc_call *call)
            call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
                _debug("+++ ABORTING STATE %d +++\n", call->state);
                call->state = RXRPC_CALL_LOCALLY_ABORTED;
-               call->abort_code = RX_CALL_DEAD;
+               call->local_abort = RX_CALL_DEAD;
                set_bit(RXRPC_CALL_EV_ABORT, &call->events);
                rxrpc_queue_call(call);
        }
@@ -758,7 +758,7 @@ static void rxrpc_mark_call_released(struct rxrpc_call *call)
                if (call->state < RXRPC_CALL_COMPLETE) {
                        _debug("abort call %p", call);
                        call->state = RXRPC_CALL_LOCALLY_ABORTED;
-                       call->abort_code = RX_CALL_DEAD;
+                       call->local_abort = RX_CALL_DEAD;
                        if (!test_and_set_bit(RXRPC_CALL_EV_ABORT, &call->events))
                                sched = true;
                }
index 1bdaaed8cdc456c419a9da5ce95412d1853ef06c..4dc6ab81fd2f16662249da93dc94e6d2ede97f45 100644 (file)
@@ -40,11 +40,13 @@ static void rxrpc_abort_calls(struct rxrpc_connection *conn, int state,
                write_lock(&call->state_lock);
                if (call->state <= RXRPC_CALL_COMPLETE) {
                        call->state = state;
-                       call->abort_code = abort_code;
-                       if (state == RXRPC_CALL_LOCALLY_ABORTED)
+                       if (state == RXRPC_CALL_LOCALLY_ABORTED) {
+                               call->local_abort = conn->local_abort;
                                set_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
-                       else
+                       } else {
+                               call->remote_abort = conn->remote_abort;
                                set_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events);
+                       }
                        rxrpc_queue_call(call);
                }
                write_unlock(&call->state_lock);
@@ -101,7 +103,7 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn,
        whdr._rsvd      = 0;
        whdr.serviceId  = htons(conn->service_id);
 
-       word = htonl(abort_code);
+       word            = htonl(conn->local_abort);
 
        iov[0].iov_base = &whdr;
        iov[0].iov_len  = sizeof(whdr);
@@ -112,7 +114,7 @@ static int rxrpc_abort_connection(struct rxrpc_connection *conn,
 
        serial = atomic_inc_return(&conn->serial);
        whdr.serial = htonl(serial);
-       _proto("Tx CONN ABORT %%%u { %d }", serial, abort_code);
+       _proto("Tx CONN ABORT %%%u { %d }", serial, conn->local_abort);
 
        ret = kernel_sendmsg(conn->trans->local->socket, &msg, iov, 2, len);
        if (ret < 0) {
index c947cd13f43562f4479314b0f1113c34c169e52e..c6c784d3a3e8a864a5e4c8b691236ac7f9d33ba8 100644 (file)
@@ -349,7 +349,7 @@ void rxrpc_fast_process_packet(struct rxrpc_call *call, struct sk_buff *skb)
                write_lock_bh(&call->state_lock);
                if (call->state < RXRPC_CALL_COMPLETE) {
                        call->state = RXRPC_CALL_REMOTELY_ABORTED;
-                       call->abort_code = abort_code;
+                       call->remote_abort = abort_code;
                        set_bit(RXRPC_CALL_EV_RCVD_ABORT, &call->events);
                        rxrpc_queue_call(call);
                }
@@ -422,7 +422,7 @@ protocol_error:
 protocol_error_locked:
        if (call->state <= RXRPC_CALL_COMPLETE) {
                call->state = RXRPC_CALL_LOCALLY_ABORTED;
-               call->abort_code = RX_PROTOCOL_ERROR;
+               call->local_abort = RX_PROTOCOL_ERROR;
                set_bit(RXRPC_CALL_EV_ABORT, &call->events);
                rxrpc_queue_call(call);
        }
@@ -494,7 +494,7 @@ protocol_error:
        write_lock_bh(&call->state_lock);
        if (call->state <= RXRPC_CALL_COMPLETE) {
                call->state = RXRPC_CALL_LOCALLY_ABORTED;
-               call->abort_code = RX_PROTOCOL_ERROR;
+               call->local_abort = RX_PROTOCOL_ERROR;
                set_bit(RXRPC_CALL_EV_ABORT, &call->events);
                rxrpc_queue_call(call);
        }
index eeb829e837e10e15aad6423eec2aaac495ec249f..258b74a2a23fb0875a498bbd688f7a92fe17880f 100644 (file)
@@ -289,7 +289,9 @@ struct rxrpc_connection {
                RXRPC_CONN_LOCALLY_ABORTED,     /* - conn aborted locally */
                RXRPC_CONN_NETWORK_ERROR,       /* - conn terminated by network error */
        } state;
-       int                     error;          /* error code for local abort */
+       u32                     local_abort;    /* local abort code */
+       u32                     remote_abort;   /* remote abort code */
+       int                     error;          /* local error incurred */
        int                     debug_id;       /* debug ID for printks */
        unsigned int            call_counter;   /* call ID counter */
        atomic_t                serial;         /* packet serial number counter */
@@ -399,7 +401,9 @@ struct rxrpc_call {
        rwlock_t                state_lock;     /* lock for state transition */
        atomic_t                usage;
        atomic_t                sequence;       /* Tx data packet sequence counter */
-       u32                     abort_code;     /* local/remote abort code */
+       u32                     local_abort;    /* local abort code */
+       u32                     remote_abort;   /* remote abort code */
+       int                     error;          /* local error incurred */
        enum rxrpc_call_state   state : 8;      /* current state of call */
        int                     debug_id;       /* debug ID for printks */
        u8                      channel;        /* connection channel occupied by this call */
@@ -453,7 +457,7 @@ static inline void rxrpc_abort_call(struct rxrpc_call *call, u32 abort_code)
 {
        write_lock_bh(&call->state_lock);
        if (call->state < RXRPC_CALL_COMPLETE) {
-               call->abort_code = abort_code;
+               call->local_abort = abort_code;
                call->state = RXRPC_CALL_LOCALLY_ABORTED;
                set_bit(RXRPC_CALL_EV_ABORT, &call->events);
        }
index d36fb6e1a29ca64a7db3ecb1686738150bd8b50c..94e7d953743758b973b628bbc53bae9192c2b058 100644 (file)
@@ -110,7 +110,7 @@ static void rxrpc_send_abort(struct rxrpc_call *call, u32 abort_code)
 
        if (call->state <= RXRPC_CALL_COMPLETE) {
                call->state = RXRPC_CALL_LOCALLY_ABORTED;
-               call->abort_code = abort_code;
+               call->local_abort = abort_code;
                set_bit(RXRPC_CALL_EV_ABORT, &call->events);
                del_timer_sync(&call->resend_timer);
                del_timer_sync(&call->ack_timer);
index 525b2ba5a8f4095105d27833dbb948cd26c75b7b..225163bc658d518d7fc8ca0276e84fc9ddd8b3d9 100644 (file)
@@ -80,7 +80,7 @@ static int rxrpc_call_seq_show(struct seq_file *seq, void *v)
                   call->conn->in_clientflag ? "Svc" : "Clt",
                   atomic_read(&call->usage),
                   rxrpc_call_states[call->state],
-                  call->abort_code,
+                  call->remote_abort ?: call->local_abort,
                   call->user_call_ID);
 
        return 0;
index 64facba24a4507b97f7612497a2f18aa6ff19466..160f0927aa3e8921ec030a0b2733cab71c2961be 100644 (file)
@@ -288,7 +288,11 @@ receive_non_data_message:
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_BUSY, 0, &abort_code);
                break;
        case RXRPC_SKB_MARK_REMOTE_ABORT:
-               abort_code = call->abort_code;
+               abort_code = call->remote_abort;
+               ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
+               break;
+       case RXRPC_SKB_MARK_LOCAL_ABORT:
+               abort_code = call->local_abort;
                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &abort_code);
                break;
        case RXRPC_SKB_MARK_NET_ERROR:
@@ -303,6 +307,7 @@ receive_non_data_message:
                               &abort_code);
                break;
        default:
+               pr_err("RxRPC: Unknown packet mark %u\n", skb->mark);
                BUG();
                break;
        }
@@ -401,9 +406,14 @@ u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
 {
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 
-       ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_REMOTE_ABORT);
-
-       return sp->call->abort_code;
+       switch (skb->mark) {
+       case RXRPC_SKB_MARK_REMOTE_ABORT:
+               return sp->call->remote_abort;
+       case RXRPC_SKB_MARK_LOCAL_ABORT:
+               return sp->call->local_abort;
+       default:
+               BUG();
+       }
 }
 
 EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);