svcrdma: Use the new parsed chunk list when pulling Read chunks
authorChuck Lever <chuck.lever@oracle.com>
Thu, 17 Sep 2020 17:04:17 +0000 (13:04 -0400)
committerChuck Lever <chuck.lever@oracle.com>
Mon, 30 Nov 2020 18:00:23 +0000 (13:00 -0500)
As a pre-requisite for handling multiple Read chunks in each Read
list, convert svc_rdma_recv_read_chunk() to use the new parsed Read
chunk list.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
include/linux/sunrpc/svc_rdma.h
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
net/sunrpc/xprtrdma/svc_rdma_rw.c

index 6f247d043731e5b640b9f74d2a8e5761bfca500e..294b56e61522b537c358252137eb24ff021006c0 100644 (file)
@@ -188,15 +188,15 @@ extern int svc_rdma_recvfrom(struct svc_rqst *);
 
 /* svc_rdma_rw.c */
 extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
-extern int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma,
-                                   struct svc_rqst *rqstp,
-                                   struct svc_rdma_recv_ctxt *head, __be32 *p);
 extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
                                     const struct svc_rdma_chunk *chunk,
                                     const struct xdr_buf *xdr);
 extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
                                     const struct svc_rdma_recv_ctxt *rctxt,
                                     const struct xdr_buf *xdr);
+extern int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
+                                     struct svc_rqst *rqstp,
+                                     struct svc_rdma_recv_ctxt *head);
 
 /* svc_rdma_sendto.c */
 extern void svc_rdma_send_ctxts_destroy(struct svcxprt_rdma *rdma);
index dd10b1de227d6be6043bf6a9394c5bce14ae16f6..cbdb71247755038e98ac182e5fe78a084a374714 100644 (file)
@@ -824,7 +824,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_recv_ctxt *ctxt;
-       __be32 *p;
        int ret;
 
        rqstp->rq_xprt_ctxt = NULL;
@@ -857,7 +856,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
        rqstp->rq_respages = rqstp->rq_pages;
        rqstp->rq_next_page = rqstp->rq_respages;
 
-       p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
        ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt);
        if (ret < 0)
                goto out_err;
@@ -870,9 +868,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 
        svc_rdma_get_inv_rkey(rdma_xprt, ctxt);
 
-       p += rpcrdma_fixed_maxsz;
-       if (*p != xdr_zero)
-               goto out_readchunk;
+       if (!pcl_is_empty(&ctxt->rc_read_pcl) ||
+           !pcl_is_empty(&ctxt->rc_call_pcl))
+               goto out_readlist;
 
 complete:
        rqstp->rq_xprt_ctxt = ctxt;
@@ -880,10 +878,10 @@ complete:
        svc_xprt_copy_addrs(rqstp, xprt);
        return rqstp->rq_arg.len;
 
-out_readchunk:
-       ret = svc_rdma_recv_read_chunk(rdma_xprt, rqstp, ctxt, p);
+out_readlist:
+       ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
        if (ret < 0)
-               goto out_postfail;
+               goto out_readfail;
        return 0;
 
 out_err:
@@ -891,7 +889,7 @@ out_err:
        svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
        return 0;
 
-out_postfail:
+out_readfail:
        if (ret == -EINVAL)
                svc_rdma_send_error(rdma_xprt, ctxt, ret);
        svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
index 3154c7ab1ca841144dd28ace97f56252988230ad..2d454e37b2fe65b4c113d18a8385795b749b1582 100644 (file)
@@ -258,8 +258,8 @@ static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
 /* State for pulling a Read chunk.
  */
 struct svc_rdma_read_info {
+       struct svc_rqst                 *ri_rqst;
        struct svc_rdma_recv_ctxt       *ri_readctxt;
-       unsigned int                    ri_position;
        unsigned int                    ri_pageno;
        unsigned int                    ri_pageoff;
        unsigned int                    ri_totalbytes;
@@ -658,17 +658,29 @@ out_err:
        return ret;
 }
 
+/**
+ * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
+ * @info: context for ongoing I/O
+ * @segment: co-ordinates of remote memory to be read
+ *
+ * Returns:
+ *   %0: the Read WR chain was constructed successfully
+ *   %-EINVAL: there were not enough rq_pages to finish
+ *   %-ENOMEM: allocating a local resources failed
+ *   %-EIO: a DMA mapping error occurred
+ */
 static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
-                                      struct svc_rqst *rqstp,
-                                      u32 rkey, u32 len, u64 offset)
+                                      const struct svc_rdma_segment *segment)
 {
        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
        struct svc_rdma_chunk_ctxt *cc = &info->ri_cc;
+       struct svc_rqst *rqstp = info->ri_rqst;
        struct svc_rdma_rw_ctxt *ctxt;
-       unsigned int sge_no, seg_len;
+       unsigned int sge_no, seg_len, len;
        struct scatterlist *sg;
        int ret;
 
+       len = segment->rs_length;
        sge_no = PAGE_ALIGN(info->ri_pageoff + len) >> PAGE_SHIFT;
        ctxt = svc_rdma_get_rw_ctxt(cc->cc_rdma, sge_no);
        if (!ctxt)
@@ -702,8 +714,8 @@ static int svc_rdma_build_read_segment(struct svc_rdma_read_info *info,
                        goto out_overrun;
        }
 
-       ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, offset, rkey,
-                                  DMA_FROM_DEVICE);
+       ret = svc_rdma_rw_ctx_init(cc->cc_rdma, ctxt, segment->rs_offset,
+                                  segment->rs_handle, DMA_FROM_DEVICE);
        if (ret < 0)
                return -EIO;
 
@@ -716,48 +728,63 @@ out_overrun:
        return -EINVAL;
 }
 
-/* Walk the segments in the Read chunk starting at @p and construct
- * RDMA Read operations to pull the chunk to the server.
+/**
+ * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
+ * @info: context for ongoing I/O
+ * @chunk: Read chunk to pull
+ *
+ * Return values:
+ *   %0: the Read WR chain was constructed successfully
+ *   %-EINVAL: there were not enough resources to finish
+ *   %-ENOMEM: allocating a local resources failed
+ *   %-EIO: a DMA mapping error occurred
  */
-static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
-                                    struct svc_rdma_read_info *info,
-                                    __be32 *p)
+static int svc_rdma_build_read_chunk(struct svc_rdma_read_info *info,
+                                    const struct svc_rdma_chunk *chunk)
 {
+       const struct svc_rdma_segment *segment;
        int ret;
 
        ret = -EINVAL;
-       while (*p++ != xdr_zero && be32_to_cpup(p++) == info->ri_position) {
-               u32 handle, length;
-               u64 offset;
-
-               p = xdr_decode_rdma_segment(p, &handle, &length, &offset);
-               ret = svc_rdma_build_read_segment(info, rqstp, handle, length,
-                                                 offset);
+       pcl_for_each_segment(segment, chunk) {
+               ret = svc_rdma_build_read_segment(info, segment);
                if (ret < 0)
                        break;
-
-               info->ri_totalbytes += length;
+               info->ri_totalbytes += segment->rs_length;
        }
        return ret;
 }
 
-/* Construct RDMA Reads to pull over a normal Read chunk. The chunk
- * data lands in the page list of head->rc_arg.pages.
+/**
+ * svc_rdma_read_data_items - Construct RDMA Reads to pull data item Read chunks
+ * @info: context for RDMA Reads
+ *
+ * The chunk data lands in the page list of head->rc_arg.pages.
  *
  * Currently NFSD does not look at the head->rc_arg.tail[0] iovec.
  * Therefore, XDR round-up of the Read chunk and trailing
  * inline content must both be added at the end of the pagelist.
+ *
+ * Return values:
+ *   %0: RDMA Read WQEs were successfully built
+ *   %-EINVAL: client provided too many chunks or segments,
+ *   %-ENOMEM: rdma_rw context pool was exhausted,
+ *   %-ENOTCONN: posting failed (connection is lost),
+ *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
  */
-static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
-                                           struct svc_rdma_read_info *info,
-                                           __be32 *p)
+static int svc_rdma_read_data_items(struct svc_rdma_read_info *info)
 {
        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
        struct xdr_buf *buf = &head->rc_arg;
+       struct svc_rdma_chunk *chunk;
        unsigned int length;
        int ret;
 
-       ret = svc_rdma_build_read_chunk(rqstp, info, p);
+       if (head->rc_read_pcl.cl_count > 1)
+               return -EINVAL;
+
+       chunk = pcl_first_chunk(&head->rc_read_pcl);
+       ret = svc_rdma_build_read_chunk(info, chunk);
        if (ret < 0)
                goto out;
 
@@ -768,11 +795,9 @@ static int svc_rdma_build_normal_read_chunk(struct svc_rqst *rqstp,
         * chunk is not included in either the pagelist or in
         * the tail.
         */
-       head->rc_arg.tail[0].iov_base =
-               head->rc_arg.head[0].iov_base + info->ri_position;
-       head->rc_arg.tail[0].iov_len =
-               head->rc_arg.head[0].iov_len - info->ri_position;
-       head->rc_arg.head[0].iov_len = info->ri_position;
+       buf->tail[0].iov_base = buf->head[0].iov_base + chunk->ch_position;
+       buf->tail[0].iov_len = buf->head[0].iov_len - chunk->ch_position;
+       buf->head[0].iov_len = chunk->ch_position;
 
        /* Read chunk may need XDR roundup (see RFC 8166, s. 3.4.5.2).
         *
@@ -792,26 +817,36 @@ out:
        return ret;
 }
 
-/* Construct RDMA Reads to pull over a Position Zero Read chunk.
- * The start of the data lands in the first page just after
- * the Transport header, and the rest lands in the page list of
+/**
+ * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
+ * @info: context for RDMA Reads
+ *
+ * The start of the data lands in the first page just after the
+ * Transport header, and the rest lands in the page list of
  * head->rc_arg.pages.
  *
  * Assumptions:
- *     - A PZRC has an XDR-aligned length (no implicit round-up).
- *     - There can be no trailing inline content (IOW, we assume
- *       a PZRC is never sent in an RDMA_MSG message, though it's
- *       allowed by spec).
+ *     - A PZRC is never sent in an RDMA_MSG message, though it's
+ *       allowed by spec.
+ *
+ * Return values:
+ *   %0: RDMA Read WQEs were successfully built
+ *   %-EINVAL: client provided too many chunks or segments,
+ *   %-ENOMEM: rdma_rw context pool was exhausted,
+ *   %-ENOTCONN: posting failed (connection is lost),
+ *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
  */
-static int svc_rdma_build_pz_read_chunk(struct svc_rqst *rqstp,
-                                       struct svc_rdma_read_info *info,
-                                       __be32 *p)
+static int svc_rdma_read_special(struct svc_rdma_read_info *info)
 {
        struct svc_rdma_recv_ctxt *head = info->ri_readctxt;
        struct xdr_buf *buf = &head->rc_arg;
        int ret;
 
-       ret = svc_rdma_build_read_chunk(rqstp, info, p);
+       if (head->rc_call_pcl.cl_count > 1)
+               return -EINVAL;
+
+       ret = svc_rdma_build_read_chunk(info,
+                                       pcl_first_chunk(&head->rc_call_pcl));
        if (ret < 0)
                goto out;
 
@@ -848,24 +883,31 @@ static void svc_rdma_save_io_pages(struct svc_rqst *rqstp,
 }
 
 /**
- * svc_rdma_recv_read_chunk - Pull a Read chunk from the client
+ * svc_rdma_process_read_list - Pull list of Read chunks from the client
  * @rdma: controlling RDMA transport
  * @rqstp: set of pages to use as Read sink buffers
  * @head: pages under I/O collect here
- * @p: pointer to start of Read chunk
  *
- * Returns:
- *     %0 if all needed RDMA Reads were posted successfully,
- *     %-EINVAL if client provided too many segments,
- *     %-ENOMEM if rdma_rw context pool was exhausted,
- *     %-ENOTCONN if posting failed (connection is lost),
- *     %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
+ * pull each Read chunk as they decode an incoming RPC message.
  *
- * Assumptions:
- * - All Read segments in @p have the same Position value.
+ * On Linux, however, the server needs to have a fully-constructed RPC
+ * message in rqstp->rq_arg when there is a positive return code from
+ * ->xpo_recvfrom. So the Read list is safety-checked immediately when
+ * it is received, then here the whole Read list is pulled all at once.
+ * The ingress RPC message is fully reconstructed once all associated
+ * RDMA Reads have completed.
+ *
+ * Return values:
+ *   %1: all needed RDMA Reads were posted successfully,
+ *   %-EINVAL: client provided too many chunks or segments,
+ *   %-ENOMEM: rdma_rw context pool was exhausted,
+ *   %-ENOTCONN: posting failed (connection is lost),
+ *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
  */
-int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
-                            struct svc_rdma_recv_ctxt *head, __be32 *p)
+int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
+                              struct svc_rqst *rqstp,
+                              struct svc_rdma_recv_ctxt *head)
 {
        struct svc_rdma_read_info *info;
        struct svc_rdma_chunk_ctxt *cc;
@@ -887,16 +929,16 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
        if (!info)
                return -ENOMEM;
        cc = &info->ri_cc;
+       info->ri_rqst = rqstp;
        info->ri_readctxt = head;
        info->ri_pageno = 0;
        info->ri_pageoff = 0;
        info->ri_totalbytes = 0;
 
-       info->ri_position = be32_to_cpup(p + 1);
-       if (info->ri_position)
-               ret = svc_rdma_build_normal_read_chunk(rqstp, info, p);
+       if (pcl_is_empty(&head->rc_call_pcl))
+               ret = svc_rdma_read_data_items(info);
        else
-               ret = svc_rdma_build_pz_read_chunk(rqstp, info, p);
+               ret = svc_rdma_read_special(info);
        if (ret < 0)
                goto out_err;
 
@@ -905,7 +947,7 @@ int svc_rdma_recv_read_chunk(struct svcxprt_rdma *rdma, struct svc_rqst *rqstp,
        if (ret < 0)
                goto out_err;
        svc_rdma_save_io_pages(rqstp, 0, head->rc_page_count);
-       return 0;
+       return 1;
 
 out_err:
        svc_rdma_read_info_free(info);