rxrpc: Move error processing into the local endpoint I/O thread
authorDavid Howells <dhowells@redhat.com>
Mon, 10 Oct 2022 10:47:31 +0000 (11:47 +0100)
committerDavid Howells <dhowells@redhat.com>
Thu, 1 Dec 2022 13:36:40 +0000 (13:36 +0000)
Move the processing of error packets into the local endpoint I/O thread,
leaving the handover from UDP to merely transfer them into the local
endpoint queue.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

net/rxrpc/ar-internal.h
net/rxrpc/io_thread.c
net/rxrpc/peer_event.c

index 044815ba2b493afc3f808bc9f2c5e0b939b2e6f6..566377c641848b6fc0d0f0b214b60116cdff1a89 100644 (file)
@@ -37,6 +37,7 @@ struct rxrpc_txbuf;
  */
 enum rxrpc_skb_mark {
        RXRPC_SKB_MARK_PACKET,          /* Received packet */
+       RXRPC_SKB_MARK_ERROR,           /* Error notification */
        RXRPC_SKB_MARK_REJECT_BUSY,     /* Reject with BUSY */
        RXRPC_SKB_MARK_REJECT_ABORT,    /* Reject with ABORT (code in skb->priority) */
 };
@@ -959,6 +960,7 @@ void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection
  * io_thread.c
  */
 int rxrpc_encap_rcv(struct sock *, struct sk_buff *);
+void rxrpc_error_report(struct sock *);
 int rxrpc_io_thread(void *data);
 static inline void rxrpc_wake_up_io_thread(struct rxrpc_local *local)
 {
@@ -1063,7 +1065,7 @@ void rxrpc_send_keepalive(struct rxrpc_peer *);
 /*
  * peer_event.c
  */
-void rxrpc_error_report(struct sock *);
+void rxrpc_input_error(struct rxrpc_local *, struct sk_buff *);
 void rxrpc_peer_keepalive_worker(struct work_struct *);
 
 /*
index ee2e36c46ae26dfbb15a050fa8a4f27f739cecae..416c6101cf78150da0e60f6f5a673258791f50c4 100644 (file)
@@ -37,6 +37,31 @@ int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
        return 0;
 }
 
+/*
+ * Handle an error received on the local endpoint.
+ */
+void rxrpc_error_report(struct sock *sk)
+{
+       struct rxrpc_local *local;
+       struct sk_buff *skb;
+
+       rcu_read_lock();
+       local = rcu_dereference_sk_user_data(sk);
+       if (unlikely(!local)) {
+               rcu_read_unlock();
+               return;
+       }
+
+       while ((skb = skb_dequeue(&sk->sk_error_queue))) {
+               skb->mark = RXRPC_SKB_MARK_ERROR;
+               rxrpc_new_skb(skb, rxrpc_skb_new_error_report);
+               skb_queue_tail(&local->rx_queue, skb);
+       }
+
+       rxrpc_wake_up_io_thread(local);
+       rcu_read_unlock();
+}
+
 /*
  * post connection-level events to the connection
  * - this includes challenges, responses, some aborts and call terminal packet
@@ -405,6 +430,10 @@ int rxrpc_io_thread(void *data)
                                rxrpc_input_packet(local, skb);
                                rcu_read_unlock();
                                break;
+                       case RXRPC_SKB_MARK_ERROR:
+                               rxrpc_input_error(local, skb);
+                               rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
+                               break;
                        default:
                                WARN_ON_ONCE(1);
                                rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
index f35cfc458dcf9d6e8c304c6df66f018f09852c62..94f63fb1bd6720a97f8c7c47e96cb294a5ae2419 100644 (file)
@@ -131,51 +131,26 @@ static void rxrpc_adjust_mtu(struct rxrpc_peer *peer, unsigned int mtu)
 /*
  * Handle an error received on the local endpoint.
  */
-void rxrpc_error_report(struct sock *sk)
+void rxrpc_input_error(struct rxrpc_local *local, struct sk_buff *skb)
 {
-       struct sock_exterr_skb *serr;
+       struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
        struct sockaddr_rxrpc srx;
-       struct rxrpc_local *local;
        struct rxrpc_peer *peer = NULL;
-       struct sk_buff *skb;
 
-       rcu_read_lock();
-       local = rcu_dereference_sk_user_data(sk);
-       if (unlikely(!local)) {
-               rcu_read_unlock();
-               return;
-       }
-       _enter("%p{%d}", sk, local->debug_id);
+       _enter("L=%x", local->debug_id);
 
-       /* Clear the outstanding error value on the socket so that it doesn't
-        * cause kernel_sendmsg() to return it later.
-        */
-       sock_error(sk);
-
-       skb = sock_dequeue_err_skb(sk);
-       if (!skb) {
-               rcu_read_unlock();
-               _leave("UDP socket errqueue empty");
-               return;
-       }
-       rxrpc_new_skb(skb, rxrpc_skb_new_error_report);
-       serr = SKB_EXT_ERR(skb);
        if (!skb->len && serr->ee.ee_origin == SO_EE_ORIGIN_TIMESTAMPING) {
                _leave("UDP empty message");
-               rcu_read_unlock();
-               rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
                return;
        }
 
+       rcu_read_lock();
        peer = rxrpc_lookup_peer_local_rcu(local, skb, &srx);
        if (peer && !rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input_error))
                peer = NULL;
-       if (!peer) {
-               rcu_read_unlock();
-               rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
-               _leave(" [no peer]");
+       rcu_read_unlock();
+       if (!peer)
                return;
-       }
 
        trace_rxrpc_rx_icmp(peer, &serr->ee, &srx);
 
@@ -188,11 +163,7 @@ void rxrpc_error_report(struct sock *sk)
 
        rxrpc_store_error(peer, serr);
 out:
-       rcu_read_unlock();
-       rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
        rxrpc_put_peer(peer, rxrpc_peer_put_input_error);
-
-       _leave("");
 }
 
 /*