struct page *rc_pages[RPCSVC_MAXPAGES];
};
+/*
+ * State for sending a Write chunk.
+ * - Tracks progress of writing one chunk over all its segments
+ * - Stores arguments for the SGL constructor functions
+ */
+struct svc_rdma_write_info {
+ struct svcxprt_rdma *wi_rdma;
+
+ const struct svc_rdma_chunk *wi_chunk;
+
+ /* write state of this chunk */
+ unsigned int wi_seg_off;
+ unsigned int wi_seg_no;
+
+ /* SGL constructor arguments */
+ const struct xdr_buf *wi_xdr;
+ unsigned char *wi_base;
+ unsigned int wi_next_off;
+
+ struct svc_rdma_chunk_ctxt wi_cc;
+ struct work_struct wi_work;
+};
+
struct svc_rdma_send_ctxt {
struct llist_node sc_node;
struct rpc_rdma_cid sc_cid;
struct ib_cqe sc_cqe;
struct xdr_buf sc_hdrbuf;
struct xdr_stream sc_stream;
+ struct svc_rdma_write_info sc_reply_info;
void *sc_xprt_buf;
int sc_page_count;
int sc_cur_sge_no;
const struct xdr_buf *xdr);
extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
const struct xdr_buf *xdr);
extern int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
struct svc_rqst *rqstp,
llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}
-/* State for sending a Write or Reply chunk.
- * - Tracks progress of writing one chunk over all its segments
- * - Stores arguments for the SGL constructor functions
- */
-struct svc_rdma_write_info {
- struct svcxprt_rdma *wi_rdma;
-
- const struct svc_rdma_chunk *wi_chunk;
-
- /* write state of this chunk */
- unsigned int wi_seg_off;
- unsigned int wi_seg_no;
-
- /* SGL constructor arguments */
- const struct xdr_buf *wi_xdr;
- unsigned char *wi_base;
- unsigned int wi_next_off;
-
- struct svc_rdma_chunk_ctxt wi_cc;
- struct work_struct wi_work;
-};
-
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
const struct svc_rdma_chunk *chunk)
queue_work(svcrdma_wq, &info->wi_work);
}
+static void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_chunk_ctxt *cc)
+{
+ svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
+ svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
+}
+
+/**
+ * svc_rdma_reply_done - Reply chunk Write completion handler
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion report
+ *
+ * Pages under I/O are released by a subsequent Send completion.
+ */
+static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct ib_cqe *cqe = wc->wr_cqe;
+ struct svc_rdma_chunk_ctxt *cc =
+ container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+ struct svcxprt_rdma *rdma = cq->cq_context;
+
+ switch (wc->status) {
+ case IB_WC_SUCCESS:
+ trace_svcrdma_wc_reply(&cc->cc_cid);
+ svc_rdma_reply_chunk_release(rdma, cc);
+ return;
+ case IB_WC_WR_FLUSH_ERR:
+ trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
+ break;
+ default:
+ trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
+ }
+
+ svc_rdma_reply_chunk_release(rdma, cc);
+ svc_xprt_deferred_close(&rdma->sc_xprt);
+}
+
/**
* svc_rdma_write_done - Write chunk completion
* @cq: controlling Completion Queue
/**
* svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
* @rdma: controlling RDMA transport
- * @rctxt: Write and Reply chunks from client
+ * @rctxt: Write and Reply chunks provisioned by the client
+ * @sctxt: Send WR resources
* @xdr: xdr_buf containing an RPC Reply
*
* Returns a non-negative number of bytes the chunk consumed, or
*/
int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
const struct xdr_buf *xdr)
{
- struct svc_rdma_write_info *info;
- struct svc_rdma_chunk_ctxt *cc;
- struct svc_rdma_chunk *chunk;
+ struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
+ struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
int ret;
- if (pcl_is_empty(&rctxt->rc_reply_pcl))
- return 0;
+ if (likely(pcl_is_empty(&rctxt->rc_reply_pcl)))
+ return 0; /* client provided no Reply chunk */
- chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
- info = svc_rdma_write_info_alloc(rdma, chunk);
- if (!info)
- return -ENOMEM;
- cc = &info->wi_cc;
+ info->wi_rdma = rdma;
+ info->wi_chunk = pcl_first_chunk(&rctxt->rc_reply_pcl);
+ info->wi_seg_off = 0;
+ info->wi_seg_no = 0;
+ svc_rdma_cc_init(rdma, &info->wi_cc);
+ info->wi_cc.cc_cqe.done = svc_rdma_reply_done;
ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
svc_rdma_xb_write, info);
if (ret < 0)
- goto out_err;
+ return ret;
trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
ret = svc_rdma_post_chunk_ctxt(rdma, cc);
if (ret < 0)
- goto out_err;
+ return ret;
return xdr->len;
-
-out_err:
- svc_rdma_write_info_free(info);
- return ret;
}
/**