diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c index d7585458dfe4..c66030c39fcd 100644 --- a/net/sunrpc/xprt.c +++ b/net/sunrpc/xprt.c @@ -1224,15 +1224,20 @@ void xprt_end_transmit(struct rpc_task *task) } /** - * xprt_transmit - send an RPC request on a transport - * @task: controlling RPC task + * xprt_request_transmit - send an RPC request on a transport + * @req: pointer to request to transmit + * @snd_task: RPC task that owns the transport lock * - * We have to copy the iovec because sendmsg fiddles with its contents. + * This performs the transmission of a single request. + * Note that if the request is not the same as snd_task, then it + * does need to be pinned. + * Returns '0' on success. */ -void xprt_transmit(struct rpc_task *task) +static int +xprt_request_transmit(struct rpc_rqst *req, struct rpc_task *snd_task) { - struct rpc_rqst *req = task->tk_rqstp; - struct rpc_xprt *xprt = req->rq_xprt; + struct rpc_xprt *xprt = req->rq_xprt; + struct rpc_task *task = req->rq_task; unsigned int connect_cookie; int is_retrans = RPC_WAS_SENT(task); int status; @@ -1240,11 +1245,13 @@ void xprt_transmit(struct rpc_task *task) dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen); if (!req->rq_bytes_sent) { - if (xprt_request_data_received(task)) + if (xprt_request_data_received(task)) { + status = 0; goto out_dequeue; + } /* Verify that our message lies in the RPCSEC_GSS window */ if (rpcauth_xmit_need_reencode(task)) { - task->tk_status = -EBADMSG; + status = -EBADMSG; goto out_dequeue; } } @@ -1257,12 +1264,11 @@ void xprt_transmit(struct rpc_task *task) req->rq_ntrans++; connect_cookie = xprt->connect_cookie; - status = xprt->ops->send_request(req, task); + status = xprt->ops->send_request(req, snd_task); trace_xprt_transmit(xprt, req->rq_xid, status); if (status != 0) { req->rq_ntrans--; - task->tk_status = status; - return; + return status; } if (is_retrans) @@ -1284,6 +1290,49 @@ void xprt_transmit(struct rpc_task *task) req->rq_connect_cookie = connect_cookie; out_dequeue: xprt_request_dequeue_transmit(task); + rpc_wake_up_queued_task_set_status(&xprt->sending, task, status); + return status; +} + +/** + * xprt_transmit - send an RPC request on a transport + * @task: controlling RPC task + * + * Attempts to drain the transmit queue. On exit, either the transport + * signalled an error that needs to be handled before transmission can + * resume, or @task finished transmitting, and detected that it already + * received a reply. + */ +void +xprt_transmit(struct rpc_task *task) +{ + struct rpc_rqst *next, *req = task->tk_rqstp; + struct rpc_xprt *xprt = req->rq_xprt; + int status; + + spin_lock(&xprt->queue_lock); + while (!list_empty(&xprt->xmit_queue)) { + next = list_first_entry(&xprt->xmit_queue, + struct rpc_rqst, rq_xmit); + xprt_pin_rqst(next); + spin_unlock(&xprt->queue_lock); + status = xprt_request_transmit(next, task); + if (status == -EBADMSG && next != req) + status = 0; + cond_resched(); + spin_lock(&xprt->queue_lock); + xprt_unpin_rqst(next); + if (status == 0) { + if (!xprt_request_data_received(task) || + test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) + continue; + } else if (!test_bit(RPC_TASK_NEED_XMIT, &task->tk_runstate)) + rpc_wake_up_queued_task(&xprt->pending, task); + else + task->tk_status = status; + break; + } + spin_unlock(&xprt->queue_lock); } static void xprt_add_backlog(struct rpc_xprt *xprt, struct rpc_task *task)