mirror of
https://github.com/Fishwaldo/Star64_linux.git
synced 2025-04-22 14:23:58 +00:00
net/smc: urgent data support
Add support for out of band data send and receive. Signed-off-by: Stefan Raspl <raspl@linux.ibm.com> Signed-off-by: Ursula Braun <ubraun@linux.ibm.com> Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
parent
b9f227c370
commit
de8474eb9d
8 changed files with 238 additions and 36 deletions
|
@ -8,8 +8,6 @@
|
||||||
*
|
*
|
||||||
* Initial restrictions:
|
* Initial restrictions:
|
||||||
* - support for alternate links postponed
|
* - support for alternate links postponed
|
||||||
* - partial support for non-blocking sockets only
|
|
||||||
* - support for urgent data postponed
|
|
||||||
*
|
*
|
||||||
* Copyright IBM Corp. 2016, 2018
|
* Copyright IBM Corp. 2016, 2018
|
||||||
*
|
*
|
||||||
|
@ -1338,6 +1336,8 @@ static __poll_t smc_poll(struct file *file, struct socket *sock,
|
||||||
if (sk->sk_state == SMC_APPCLOSEWAIT1)
|
if (sk->sk_state == SMC_APPCLOSEWAIT1)
|
||||||
mask |= EPOLLIN;
|
mask |= EPOLLIN;
|
||||||
}
|
}
|
||||||
|
if (smc->conn.urg_state == SMC_URG_VALID)
|
||||||
|
mask |= EPOLLPRI;
|
||||||
|
|
||||||
}
|
}
|
||||||
release_sock(sk);
|
release_sock(sk);
|
||||||
|
@ -1477,10 +1477,13 @@ static int smc_getsockopt(struct socket *sock, int level, int optname,
|
||||||
static int smc_ioctl(struct socket *sock, unsigned int cmd,
|
static int smc_ioctl(struct socket *sock, unsigned int cmd,
|
||||||
unsigned long arg)
|
unsigned long arg)
|
||||||
{
|
{
|
||||||
|
union smc_host_cursor cons, urg;
|
||||||
|
struct smc_connection *conn;
|
||||||
struct smc_sock *smc;
|
struct smc_sock *smc;
|
||||||
int answ;
|
int answ;
|
||||||
|
|
||||||
smc = smc_sk(sock->sk);
|
smc = smc_sk(sock->sk);
|
||||||
|
conn = &smc->conn;
|
||||||
if (smc->use_fallback) {
|
if (smc->use_fallback) {
|
||||||
if (!smc->clcsock)
|
if (!smc->clcsock)
|
||||||
return -EBADF;
|
return -EBADF;
|
||||||
|
@ -1517,6 +1520,23 @@ static int smc_ioctl(struct socket *sock, unsigned int cmd,
|
||||||
else
|
else
|
||||||
answ = smc_tx_prepared_sends(&smc->conn);
|
answ = smc_tx_prepared_sends(&smc->conn);
|
||||||
break;
|
break;
|
||||||
|
case SIOCATMARK:
|
||||||
|
if (smc->sk.sk_state == SMC_LISTEN)
|
||||||
|
return -EINVAL;
|
||||||
|
if (smc->sk.sk_state == SMC_INIT ||
|
||||||
|
smc->sk.sk_state == SMC_CLOSED) {
|
||||||
|
answ = 0;
|
||||||
|
} else {
|
||||||
|
smc_curs_write(&cons,
|
||||||
|
smc_curs_read(&conn->local_tx_ctrl.cons, conn),
|
||||||
|
conn);
|
||||||
|
smc_curs_write(&urg,
|
||||||
|
smc_curs_read(&conn->urg_curs, conn),
|
||||||
|
conn);
|
||||||
|
answ = smc_curs_diff(conn->rmb_desc->len,
|
||||||
|
&cons, &urg) == 1;
|
||||||
|
}
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
return -ENOIOCTLCMD;
|
return -ENOIOCTLCMD;
|
||||||
}
|
}
|
||||||
|
|
|
@ -114,6 +114,12 @@ struct smc_host_cdc_msg { /* Connection Data Control message */
|
||||||
u8 reserved[18];
|
u8 reserved[18];
|
||||||
} __aligned(8);
|
} __aligned(8);
|
||||||
|
|
||||||
|
enum smc_urg_state {
|
||||||
|
SMC_URG_VALID, /* data present */
|
||||||
|
SMC_URG_NOTYET, /* data pending */
|
||||||
|
SMC_URG_READ /* data was already read */
|
||||||
|
};
|
||||||
|
|
||||||
struct smc_connection {
|
struct smc_connection {
|
||||||
struct rb_node alert_node;
|
struct rb_node alert_node;
|
||||||
struct smc_link_group *lgr; /* link group of connection */
|
struct smc_link_group *lgr; /* link group of connection */
|
||||||
|
@ -160,6 +166,15 @@ struct smc_connection {
|
||||||
union smc_host_cursor rx_curs_confirmed; /* confirmed to peer
|
union smc_host_cursor rx_curs_confirmed; /* confirmed to peer
|
||||||
* source of snd_una ?
|
* source of snd_una ?
|
||||||
*/
|
*/
|
||||||
|
union smc_host_cursor urg_curs; /* points at urgent byte */
|
||||||
|
enum smc_urg_state urg_state;
|
||||||
|
bool urg_tx_pend; /* urgent data staged */
|
||||||
|
bool urg_rx_skip_pend;
|
||||||
|
/* indicate urgent oob data
|
||||||
|
* read, but previous regular
|
||||||
|
* data still pending
|
||||||
|
*/
|
||||||
|
char urg_rx_byte; /* urgent byte */
|
||||||
atomic_t bytes_to_rcv; /* arrived data,
|
atomic_t bytes_to_rcv; /* arrived data,
|
||||||
* not yet received
|
* not yet received
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -164,6 +164,28 @@ static inline bool smc_cdc_before(u16 seq1, u16 seq2)
|
||||||
return (s16)(seq1 - seq2) < 0;
|
return (s16)(seq1 - seq2) < 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
|
||||||
|
int *diff_prod)
|
||||||
|
{
|
||||||
|
struct smc_connection *conn = &smc->conn;
|
||||||
|
char *base;
|
||||||
|
|
||||||
|
/* new data included urgent business */
|
||||||
|
smc_curs_write(&conn->urg_curs,
|
||||||
|
smc_curs_read(&conn->local_rx_ctrl.prod, conn),
|
||||||
|
conn);
|
||||||
|
conn->urg_state = SMC_URG_VALID;
|
||||||
|
if (!sock_flag(&smc->sk, SOCK_URGINLINE))
|
||||||
|
/* we'll skip the urgent byte, so don't account for it */
|
||||||
|
(*diff_prod)--;
|
||||||
|
base = (char *)conn->rmb_desc->cpu_addr;
|
||||||
|
if (conn->urg_curs.count)
|
||||||
|
conn->urg_rx_byte = *(base + conn->urg_curs.count - 1);
|
||||||
|
else
|
||||||
|
conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1);
|
||||||
|
sk_send_sigurg(&smc->sk);
|
||||||
|
}
|
||||||
|
|
||||||
static void smc_cdc_msg_recv_action(struct smc_sock *smc,
|
static void smc_cdc_msg_recv_action(struct smc_sock *smc,
|
||||||
struct smc_cdc_msg *cdc)
|
struct smc_cdc_msg *cdc)
|
||||||
{
|
{
|
||||||
|
@ -194,15 +216,25 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
|
||||||
diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old,
|
diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old,
|
||||||
&conn->local_rx_ctrl.prod);
|
&conn->local_rx_ctrl.prod);
|
||||||
if (diff_prod) {
|
if (diff_prod) {
|
||||||
|
if (conn->local_rx_ctrl.prod_flags.urg_data_present)
|
||||||
|
smc_cdc_handle_urg_data_arrival(smc, &diff_prod);
|
||||||
/* bytes_to_rcv is decreased in smc_recvmsg */
|
/* bytes_to_rcv is decreased in smc_recvmsg */
|
||||||
smp_mb__before_atomic();
|
smp_mb__before_atomic();
|
||||||
atomic_add(diff_prod, &conn->bytes_to_rcv);
|
atomic_add(diff_prod, &conn->bytes_to_rcv);
|
||||||
/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
|
/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
|
||||||
smp_mb__after_atomic();
|
smp_mb__after_atomic();
|
||||||
smc->sk.sk_data_ready(&smc->sk);
|
smc->sk.sk_data_ready(&smc->sk);
|
||||||
} else if ((conn->local_rx_ctrl.prod_flags.write_blocked) ||
|
} else {
|
||||||
(conn->local_rx_ctrl.prod_flags.cons_curs_upd_req)) {
|
if (conn->local_rx_ctrl.prod_flags.write_blocked ||
|
||||||
smc->sk.sk_data_ready(&smc->sk);
|
conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
|
||||||
|
conn->local_rx_ctrl.prod_flags.urg_data_pending) {
|
||||||
|
if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
|
||||||
|
conn->urg_state = SMC_URG_NOTYET;
|
||||||
|
/* force immediate tx of current consumer cursor, but
|
||||||
|
* under send_lock to guarantee arrival in seqno-order
|
||||||
|
*/
|
||||||
|
smc_tx_sndbuf_nonempty(conn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* piggy backed tx info */
|
/* piggy backed tx info */
|
||||||
|
@ -212,6 +244,12 @@ static void smc_cdc_msg_recv_action(struct smc_sock *smc,
|
||||||
/* trigger socket release if connection closed */
|
/* trigger socket release if connection closed */
|
||||||
smc_close_wake_tx_prepared(smc);
|
smc_close_wake_tx_prepared(smc);
|
||||||
}
|
}
|
||||||
|
if (diff_cons && conn->urg_tx_pend &&
|
||||||
|
atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
|
||||||
|
/* urg data confirmed by peer, indicate we're ready for more */
|
||||||
|
conn->urg_tx_pend = false;
|
||||||
|
smc->sk.sk_write_space(&smc->sk);
|
||||||
|
}
|
||||||
|
|
||||||
if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
|
if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
|
||||||
smc->sk.sk_err = ECONNRESET;
|
smc->sk.sk_err = ECONNRESET;
|
||||||
|
|
|
@ -146,6 +146,19 @@ static inline int smc_curs_diff(unsigned int size,
|
||||||
return max_t(int, 0, (new->count - old->count));
|
return max_t(int, 0, (new->count - old->count));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* calculate cursor difference between old and new - returns negative
|
||||||
|
* value in case old > new
|
||||||
|
*/
|
||||||
|
static inline int smc_curs_comp(unsigned int size,
|
||||||
|
union smc_host_cursor *old,
|
||||||
|
union smc_host_cursor *new)
|
||||||
|
{
|
||||||
|
if (old->wrap > new->wrap ||
|
||||||
|
(old->wrap == new->wrap && old->count > new->count))
|
||||||
|
return -smc_curs_diff(size, new, old);
|
||||||
|
return smc_curs_diff(size, old, new);
|
||||||
|
}
|
||||||
|
|
||||||
static inline void smc_host_cursor_to_cdc(union smc_cdc_cursor *peer,
|
static inline void smc_host_cursor_to_cdc(union smc_cdc_cursor *peer,
|
||||||
union smc_host_cursor *local,
|
union smc_host_cursor *local,
|
||||||
struct smc_connection *conn)
|
struct smc_connection *conn)
|
||||||
|
|
|
@ -544,6 +544,7 @@ create:
|
||||||
}
|
}
|
||||||
conn->local_tx_ctrl.common.type = SMC_CDC_MSG_TYPE;
|
conn->local_tx_ctrl.common.type = SMC_CDC_MSG_TYPE;
|
||||||
conn->local_tx_ctrl.len = SMC_WR_TX_SIZE;
|
conn->local_tx_ctrl.len = SMC_WR_TX_SIZE;
|
||||||
|
conn->urg_state = SMC_URG_READ;
|
||||||
#ifndef KERNEL_HAS_ATOMIC64
|
#ifndef KERNEL_HAS_ATOMIC64
|
||||||
spin_lock_init(&conn->acurs_lock);
|
spin_lock_init(&conn->acurs_lock);
|
||||||
#endif
|
#endif
|
||||||
|
|
118
net/smc/smc_rx.c
118
net/smc/smc_rx.c
|
@ -47,16 +47,59 @@ static void smc_rx_wake_up(struct sock *sk)
|
||||||
* @conn connection to update
|
* @conn connection to update
|
||||||
* @cons consumer cursor
|
* @cons consumer cursor
|
||||||
* @len number of Bytes consumed
|
* @len number of Bytes consumed
|
||||||
|
* Returns:
|
||||||
|
* 1 if we should end our receive, 0 otherwise
|
||||||
*/
|
*/
|
||||||
static void smc_rx_update_consumer(struct smc_connection *conn,
|
static int smc_rx_update_consumer(struct smc_sock *smc,
|
||||||
union smc_host_cursor cons, size_t len)
|
union smc_host_cursor cons, size_t len)
|
||||||
{
|
{
|
||||||
|
struct smc_connection *conn = &smc->conn;
|
||||||
|
struct sock *sk = &smc->sk;
|
||||||
|
bool force = false;
|
||||||
|
int diff, rc = 0;
|
||||||
|
|
||||||
smc_curs_add(conn->rmb_desc->len, &cons, len);
|
smc_curs_add(conn->rmb_desc->len, &cons, len);
|
||||||
|
|
||||||
|
/* did we process urgent data? */
|
||||||
|
if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
|
||||||
|
diff = smc_curs_comp(conn->rmb_desc->len, &cons,
|
||||||
|
&conn->urg_curs);
|
||||||
|
if (sock_flag(sk, SOCK_URGINLINE)) {
|
||||||
|
if (diff == 0) {
|
||||||
|
force = true;
|
||||||
|
rc = 1;
|
||||||
|
conn->urg_state = SMC_URG_READ;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (diff == 1) {
|
||||||
|
/* skip urgent byte */
|
||||||
|
force = true;
|
||||||
|
smc_curs_add(conn->rmb_desc->len, &cons, 1);
|
||||||
|
conn->urg_rx_skip_pend = false;
|
||||||
|
} else if (diff < -1)
|
||||||
|
/* we read past urgent byte */
|
||||||
|
conn->urg_state = SMC_URG_READ;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn),
|
smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn),
|
||||||
conn);
|
conn);
|
||||||
|
|
||||||
/* send consumer cursor update if required */
|
/* send consumer cursor update if required */
|
||||||
/* similar to advertising new TCP rcv_wnd if required */
|
/* similar to advertising new TCP rcv_wnd if required */
|
||||||
smc_tx_consumer_update(conn);
|
smc_tx_consumer_update(conn, force);
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
|
||||||
|
{
|
||||||
|
struct smc_connection *conn = &smc->conn;
|
||||||
|
union smc_host_cursor cons;
|
||||||
|
|
||||||
|
smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
|
||||||
|
conn);
|
||||||
|
smc_rx_update_consumer(smc, cons, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
struct smc_spd_priv {
|
struct smc_spd_priv {
|
||||||
|
@ -70,7 +113,6 @@ static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
|
||||||
struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
|
struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
|
||||||
struct smc_sock *smc = priv->smc;
|
struct smc_sock *smc = priv->smc;
|
||||||
struct smc_connection *conn;
|
struct smc_connection *conn;
|
||||||
union smc_host_cursor cons;
|
|
||||||
struct sock *sk = &smc->sk;
|
struct sock *sk = &smc->sk;
|
||||||
|
|
||||||
if (sk->sk_state == SMC_CLOSED ||
|
if (sk->sk_state == SMC_CLOSED ||
|
||||||
|
@ -79,9 +121,7 @@ static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
|
||||||
goto out;
|
goto out;
|
||||||
conn = &smc->conn;
|
conn = &smc->conn;
|
||||||
lock_sock(sk);
|
lock_sock(sk);
|
||||||
smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
|
smc_rx_update_cons(smc, priv->len);
|
||||||
conn);
|
|
||||||
smc_rx_update_consumer(conn, cons, priv->len);
|
|
||||||
release_sock(sk);
|
release_sock(sk);
|
||||||
if (atomic_sub_and_test(priv->len, &conn->splice_pending))
|
if (atomic_sub_and_test(priv->len, &conn->splice_pending))
|
||||||
smc_rx_wake_up(sk);
|
smc_rx_wake_up(sk);
|
||||||
|
@ -184,6 +224,52 @@ int smc_rx_wait(struct smc_sock *smc, long *timeo,
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
|
||||||
|
int flags)
|
||||||
|
{
|
||||||
|
struct smc_connection *conn = &smc->conn;
|
||||||
|
union smc_host_cursor cons;
|
||||||
|
struct sock *sk = &smc->sk;
|
||||||
|
int rc = 0;
|
||||||
|
|
||||||
|
if (sock_flag(sk, SOCK_URGINLINE) ||
|
||||||
|
!(conn->urg_state == SMC_URG_VALID) ||
|
||||||
|
conn->urg_state == SMC_URG_READ)
|
||||||
|
return -EINVAL;
|
||||||
|
|
||||||
|
if (conn->urg_state == SMC_URG_VALID) {
|
||||||
|
if (!(flags & MSG_PEEK))
|
||||||
|
smc->conn.urg_state = SMC_URG_READ;
|
||||||
|
msg->msg_flags |= MSG_OOB;
|
||||||
|
if (len > 0) {
|
||||||
|
if (!(flags & MSG_TRUNC))
|
||||||
|
rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
|
||||||
|
len = 1;
|
||||||
|
smc_curs_write(&cons,
|
||||||
|
smc_curs_read(&conn->local_tx_ctrl.cons,
|
||||||
|
conn),
|
||||||
|
conn);
|
||||||
|
if (smc_curs_diff(conn->rmb_desc->len, &cons,
|
||||||
|
&conn->urg_curs) > 1)
|
||||||
|
conn->urg_rx_skip_pend = true;
|
||||||
|
/* Urgent Byte was already accounted for, but trigger
|
||||||
|
* skipping the urgent byte in non-inline case
|
||||||
|
*/
|
||||||
|
if (!(flags & MSG_PEEK))
|
||||||
|
smc_rx_update_consumer(smc, cons, 0);
|
||||||
|
} else {
|
||||||
|
msg->msg_flags |= MSG_TRUNC;
|
||||||
|
}
|
||||||
|
|
||||||
|
return rc ? -EFAULT : len;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
return -EAGAIN;
|
||||||
|
}
|
||||||
|
|
||||||
/* smc_rx_recvmsg - receive data from RMBE
|
/* smc_rx_recvmsg - receive data from RMBE
|
||||||
* @msg: copy data to receive buffer
|
* @msg: copy data to receive buffer
|
||||||
* @pipe: copy data to pipe if set - indicates splice() call
|
* @pipe: copy data to pipe if set - indicates splice() call
|
||||||
|
@ -209,12 +295,12 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
|
||||||
|
|
||||||
if (unlikely(flags & MSG_ERRQUEUE))
|
if (unlikely(flags & MSG_ERRQUEUE))
|
||||||
return -EINVAL; /* future work for sk.sk_family == AF_SMC */
|
return -EINVAL; /* future work for sk.sk_family == AF_SMC */
|
||||||
if (flags & MSG_OOB)
|
|
||||||
return -EINVAL; /* future work */
|
|
||||||
|
|
||||||
sk = &smc->sk;
|
sk = &smc->sk;
|
||||||
if (sk->sk_state == SMC_LISTEN)
|
if (sk->sk_state == SMC_LISTEN)
|
||||||
return -ENOTCONN;
|
return -ENOTCONN;
|
||||||
|
if (flags & MSG_OOB)
|
||||||
|
return smc_rx_recv_urg(smc, msg, len, flags);
|
||||||
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
|
timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
|
||||||
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
|
target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
|
||||||
|
|
||||||
|
@ -227,6 +313,9 @@ int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
|
||||||
|
|
||||||
if (atomic_read(&conn->bytes_to_rcv))
|
if (atomic_read(&conn->bytes_to_rcv))
|
||||||
goto copy;
|
goto copy;
|
||||||
|
else if (conn->urg_state == SMC_URG_VALID)
|
||||||
|
/* we received a single urgent Byte - skip */
|
||||||
|
smc_rx_update_cons(smc, 0);
|
||||||
|
|
||||||
if (sk->sk_shutdown & RCV_SHUTDOWN ||
|
if (sk->sk_shutdown & RCV_SHUTDOWN ||
|
||||||
smc_cdc_rxed_any_close_or_senddone(conn) ||
|
smc_cdc_rxed_any_close_or_senddone(conn) ||
|
||||||
|
@ -281,14 +370,18 @@ copy:
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* not more than what user space asked for */
|
|
||||||
copylen = min_t(size_t, read_remaining, readable);
|
|
||||||
smc_curs_write(&cons,
|
smc_curs_write(&cons,
|
||||||
smc_curs_read(&conn->local_tx_ctrl.cons, conn),
|
smc_curs_read(&conn->local_tx_ctrl.cons, conn),
|
||||||
conn);
|
conn);
|
||||||
/* subsequent splice() calls pick up where previous left */
|
/* subsequent splice() calls pick up where previous left */
|
||||||
if (splbytes)
|
if (splbytes)
|
||||||
smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
|
smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
|
||||||
|
if (conn->urg_state == SMC_URG_VALID &&
|
||||||
|
sock_flag(&smc->sk, SOCK_URGINLINE) &&
|
||||||
|
readable > 1)
|
||||||
|
readable--; /* always stop at urgent Byte */
|
||||||
|
/* not more than what user space asked for */
|
||||||
|
copylen = min_t(size_t, read_remaining, readable);
|
||||||
/* determine chunks where to read from rcvbuf */
|
/* determine chunks where to read from rcvbuf */
|
||||||
/* either unwrapped case, or 1st chunk of wrapped case */
|
/* either unwrapped case, or 1st chunk of wrapped case */
|
||||||
chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
|
chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
|
||||||
|
@ -333,8 +426,8 @@ copy:
|
||||||
atomic_sub(copylen, &conn->bytes_to_rcv);
|
atomic_sub(copylen, &conn->bytes_to_rcv);
|
||||||
/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
|
/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
|
||||||
smp_mb__after_atomic();
|
smp_mb__after_atomic();
|
||||||
if (msg)
|
if (msg && smc_rx_update_consumer(smc, cons, copylen))
|
||||||
smc_rx_update_consumer(conn, cons, copylen);
|
goto out;
|
||||||
}
|
}
|
||||||
} while (read_remaining);
|
} while (read_remaining);
|
||||||
out:
|
out:
|
||||||
|
@ -346,4 +439,5 @@ void smc_rx_init(struct smc_sock *smc)
|
||||||
{
|
{
|
||||||
smc->sk.sk_data_ready = smc_rx_wake_up;
|
smc->sk.sk_data_ready = smc_rx_wake_up;
|
||||||
atomic_set(&smc->conn.splice_pending, 0);
|
atomic_set(&smc->conn.splice_pending, 0);
|
||||||
|
smc->conn.urg_state = SMC_URG_READ;
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
/***************************** sndbuf producer *******************************/
|
/***************************** sndbuf producer *******************************/
|
||||||
|
|
||||||
/* callback implementation for sk.sk_write_space()
|
/* callback implementation for sk.sk_write_space()
|
||||||
* to wakeup sndbuf producers that blocked with smc_tx_wait_memory().
|
* to wakeup sndbuf producers that blocked with smc_tx_wait().
|
||||||
* called under sk_socket lock.
|
* called under sk_socket lock.
|
||||||
*/
|
*/
|
||||||
static void smc_tx_write_space(struct sock *sk)
|
static void smc_tx_write_space(struct sock *sk)
|
||||||
|
@ -56,7 +56,7 @@ static void smc_tx_write_space(struct sock *sk)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Wakeup sndbuf producers that blocked with smc_tx_wait_memory().
|
/* Wakeup sndbuf producers that blocked with smc_tx_wait().
|
||||||
* Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
|
* Cf. tcp_data_snd_check()=>tcp_check_space()=>tcp_new_space().
|
||||||
*/
|
*/
|
||||||
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
|
void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
|
||||||
|
@ -66,8 +66,10 @@ void smc_tx_sndbuf_nonfull(struct smc_sock *smc)
|
||||||
smc->sk.sk_write_space(&smc->sk);
|
smc->sk.sk_write_space(&smc->sk);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* blocks sndbuf producer until at least one byte of free space available */
|
/* blocks sndbuf producer until at least one byte of free space available
|
||||||
static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
|
* or urgent Byte was consumed
|
||||||
|
*/
|
||||||
|
static int smc_tx_wait(struct smc_sock *smc, int flags)
|
||||||
{
|
{
|
||||||
DEFINE_WAIT_FUNC(wait, woken_wake_function);
|
DEFINE_WAIT_FUNC(wait, woken_wake_function);
|
||||||
struct smc_connection *conn = &smc->conn;
|
struct smc_connection *conn = &smc->conn;
|
||||||
|
@ -103,14 +105,15 @@ static int smc_tx_wait_memory(struct smc_sock *smc, int flags)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
|
sk_clear_bit(SOCKWQ_ASYNC_NOSPACE, sk);
|
||||||
if (atomic_read(&conn->sndbuf_space))
|
if (atomic_read(&conn->sndbuf_space) && !conn->urg_tx_pend)
|
||||||
break; /* at least 1 byte of free space available */
|
break; /* at least 1 byte of free & no urgent data */
|
||||||
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
|
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
|
||||||
sk_wait_event(sk, &timeo,
|
sk_wait_event(sk, &timeo,
|
||||||
sk->sk_err ||
|
sk->sk_err ||
|
||||||
(sk->sk_shutdown & SEND_SHUTDOWN) ||
|
(sk->sk_shutdown & SEND_SHUTDOWN) ||
|
||||||
smc_cdc_rxed_any_close(conn) ||
|
smc_cdc_rxed_any_close(conn) ||
|
||||||
atomic_read(&conn->sndbuf_space),
|
(atomic_read(&conn->sndbuf_space) &&
|
||||||
|
!conn->urg_tx_pend),
|
||||||
&wait);
|
&wait);
|
||||||
}
|
}
|
||||||
remove_wait_queue(sk_sleep(sk), &wait);
|
remove_wait_queue(sk_sleep(sk), &wait);
|
||||||
|
@ -157,8 +160,11 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
|
||||||
if (smc_cdc_rxed_any_close(conn))
|
if (smc_cdc_rxed_any_close(conn))
|
||||||
return send_done ?: -ECONNRESET;
|
return send_done ?: -ECONNRESET;
|
||||||
|
|
||||||
if (!atomic_read(&conn->sndbuf_space)) {
|
if (msg->msg_flags & MSG_OOB)
|
||||||
rc = smc_tx_wait_memory(smc, msg->msg_flags);
|
conn->local_tx_ctrl.prod_flags.urg_data_pending = 1;
|
||||||
|
|
||||||
|
if (!atomic_read(&conn->sndbuf_space) || conn->urg_tx_pend) {
|
||||||
|
rc = smc_tx_wait(smc, msg->msg_flags);
|
||||||
if (rc) {
|
if (rc) {
|
||||||
if (send_done)
|
if (send_done)
|
||||||
return send_done;
|
return send_done;
|
||||||
|
@ -168,7 +174,7 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* initialize variables for 1st iteration of subsequent loop */
|
/* initialize variables for 1st iteration of subsequent loop */
|
||||||
/* could be just 1 byte, even after smc_tx_wait_memory above */
|
/* could be just 1 byte, even after smc_tx_wait above */
|
||||||
writespace = atomic_read(&conn->sndbuf_space);
|
writespace = atomic_read(&conn->sndbuf_space);
|
||||||
/* not more than what user space asked for */
|
/* not more than what user space asked for */
|
||||||
copylen = min_t(size_t, send_remaining, writespace);
|
copylen = min_t(size_t, send_remaining, writespace);
|
||||||
|
@ -218,6 +224,8 @@ int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len)
|
||||||
/* since we just produced more new data into sndbuf,
|
/* since we just produced more new data into sndbuf,
|
||||||
* trigger sndbuf consumer: RDMA write into peer RMBE and CDC
|
* trigger sndbuf consumer: RDMA write into peer RMBE and CDC
|
||||||
*/
|
*/
|
||||||
|
if ((msg->msg_flags & MSG_OOB) && !send_remaining)
|
||||||
|
conn->urg_tx_pend = true;
|
||||||
if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) &&
|
if ((msg->msg_flags & MSG_MORE || smc_tx_is_corked(smc)) &&
|
||||||
(atomic_read(&conn->sndbuf_space) >
|
(atomic_read(&conn->sndbuf_space) >
|
||||||
(conn->sndbuf_desc->len >> 1)))
|
(conn->sndbuf_desc->len >> 1)))
|
||||||
|
@ -299,6 +307,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
|
||||||
union smc_host_cursor sent, prep, prod, cons;
|
union smc_host_cursor sent, prep, prod, cons;
|
||||||
struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
|
struct ib_sge sges[SMC_IB_MAX_SEND_SGE];
|
||||||
struct smc_link_group *lgr = conn->lgr;
|
struct smc_link_group *lgr = conn->lgr;
|
||||||
|
struct smc_cdc_producer_flags *pflags;
|
||||||
int to_send, rmbespace;
|
int to_send, rmbespace;
|
||||||
struct smc_link *link;
|
struct smc_link *link;
|
||||||
dma_addr_t dma_addr;
|
dma_addr_t dma_addr;
|
||||||
|
@ -326,7 +335,8 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
|
||||||
conn);
|
conn);
|
||||||
|
|
||||||
/* if usable snd_wnd closes ask peer to advertise once it opens again */
|
/* if usable snd_wnd closes ask peer to advertise once it opens again */
|
||||||
conn->local_tx_ctrl.prod_flags.write_blocked = (to_send >= rmbespace);
|
pflags = &conn->local_tx_ctrl.prod_flags;
|
||||||
|
pflags->write_blocked = (to_send >= rmbespace);
|
||||||
/* cf. usable snd_wnd */
|
/* cf. usable snd_wnd */
|
||||||
len = min(to_send, rmbespace);
|
len = min(to_send, rmbespace);
|
||||||
|
|
||||||
|
@ -391,6 +401,8 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
|
||||||
src_len_sum = src_len;
|
src_len_sum = src_len;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (conn->urg_tx_pend && len == to_send)
|
||||||
|
pflags->urg_data_present = 1;
|
||||||
smc_tx_advance_cursors(conn, &prod, &sent, len);
|
smc_tx_advance_cursors(conn, &prod, &sent, len);
|
||||||
/* update connection's cursors with advanced local cursors */
|
/* update connection's cursors with advanced local cursors */
|
||||||
smc_curs_write(&conn->local_tx_ctrl.prod,
|
smc_curs_write(&conn->local_tx_ctrl.prod,
|
||||||
|
@ -410,6 +422,7 @@ static int smc_tx_rdma_writes(struct smc_connection *conn)
|
||||||
*/
|
*/
|
||||||
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
|
int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
|
||||||
{
|
{
|
||||||
|
struct smc_cdc_producer_flags *pflags;
|
||||||
struct smc_cdc_tx_pend *pend;
|
struct smc_cdc_tx_pend *pend;
|
||||||
struct smc_wr_buf *wr_buf;
|
struct smc_wr_buf *wr_buf;
|
||||||
int rc;
|
int rc;
|
||||||
|
@ -433,14 +446,21 @@ int smc_tx_sndbuf_nonempty(struct smc_connection *conn)
|
||||||
goto out_unlock;
|
goto out_unlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!conn->local_tx_ctrl.prod_flags.urg_data_present) {
|
||||||
rc = smc_tx_rdma_writes(conn);
|
rc = smc_tx_rdma_writes(conn);
|
||||||
if (rc) {
|
if (rc) {
|
||||||
smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
|
smc_wr_tx_put_slot(&conn->lgr->lnk[SMC_SINGLE_LINK],
|
||||||
(struct smc_wr_tx_pend_priv *)pend);
|
(struct smc_wr_tx_pend_priv *)pend);
|
||||||
goto out_unlock;
|
goto out_unlock;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rc = smc_cdc_msg_send(conn, wr_buf, pend);
|
rc = smc_cdc_msg_send(conn, wr_buf, pend);
|
||||||
|
pflags = &conn->local_tx_ctrl.prod_flags;
|
||||||
|
if (!rc && pflags->urg_data_present) {
|
||||||
|
pflags->urg_data_pending = 0;
|
||||||
|
pflags->urg_data_present = 0;
|
||||||
|
}
|
||||||
|
|
||||||
out_unlock:
|
out_unlock:
|
||||||
spin_unlock_bh(&conn->send_lock);
|
spin_unlock_bh(&conn->send_lock);
|
||||||
|
@ -473,7 +493,7 @@ out:
|
||||||
release_sock(&smc->sk);
|
release_sock(&smc->sk);
|
||||||
}
|
}
|
||||||
|
|
||||||
void smc_tx_consumer_update(struct smc_connection *conn)
|
void smc_tx_consumer_update(struct smc_connection *conn, bool force)
|
||||||
{
|
{
|
||||||
union smc_host_cursor cfed, cons;
|
union smc_host_cursor cfed, cons;
|
||||||
int to_confirm;
|
int to_confirm;
|
||||||
|
@ -487,6 +507,7 @@ void smc_tx_consumer_update(struct smc_connection *conn)
|
||||||
to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);
|
to_confirm = smc_curs_diff(conn->rmb_desc->len, &cfed, &cons);
|
||||||
|
|
||||||
if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
|
if (conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
|
||||||
|
force ||
|
||||||
((to_confirm > conn->rmbe_update_limit) &&
|
((to_confirm > conn->rmbe_update_limit) &&
|
||||||
((to_confirm > (conn->rmb_desc->len / 2)) ||
|
((to_confirm > (conn->rmb_desc->len / 2)) ||
|
||||||
conn->local_rx_ctrl.prod_flags.write_blocked))) {
|
conn->local_rx_ctrl.prod_flags.write_blocked))) {
|
||||||
|
|
|
@ -32,6 +32,6 @@ void smc_tx_init(struct smc_sock *smc);
|
||||||
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len);
|
int smc_tx_sendmsg(struct smc_sock *smc, struct msghdr *msg, size_t len);
|
||||||
int smc_tx_sndbuf_nonempty(struct smc_connection *conn);
|
int smc_tx_sndbuf_nonempty(struct smc_connection *conn);
|
||||||
void smc_tx_sndbuf_nonfull(struct smc_sock *smc);
|
void smc_tx_sndbuf_nonfull(struct smc_sock *smc);
|
||||||
void smc_tx_consumer_update(struct smc_connection *conn);
|
void smc_tx_consumer_update(struct smc_connection *conn, bool force);
|
||||||
|
|
||||||
#endif /* SMC_TX_H */
|
#endif /* SMC_TX_H */
|
||||||
|
|
Loading…
Add table
Reference in a new issue