orangefs: hopefully saner op refcounting and locking

* create with refcount 1
* make op_release() decrement and free if zero (i.e. old put_op()
  has become that).
* mark when submitter has given up waiting; from that point nobody
  else can move between the lists, change state, etc.
* have daemon read/write_iter grab a reference when picking op
  and *always* give it up in the end
* don't put into hash until we know it's been successfully passed to
  daemon

* move op->lock _lower_ than htab_in_progress_lock (and make sure
  to take it in purge_inprogress_ops())

Signed-off-by: Al Viro <viro@zeniv.linux.org.uk>
Signed-off-by: Mike Marshall <hubcap@omnibond.com>
This commit is contained in:
Al Viro 2016-01-22 19:47:47 -05:00 committed by Mike Marshall
parent fee25ce125
commit ed42fe0593
8 changed files with 107 additions and 143 deletions

View file

@ -43,9 +43,7 @@ static void orangefs_devreq_add_op(struct orangefs_kernel_op_s *op)
{
int index = hash_func(op->tag, hash_table_size);
spin_lock(&htable_ops_in_progress_lock);
list_add_tail(&op->list, &htable_ops_in_progress[index]);
spin_unlock(&htable_ops_in_progress_lock);
}
static struct orangefs_kernel_op_s *orangefs_devreq_remove_op(__u64 tag)
@ -60,8 +58,9 @@ static struct orangefs_kernel_op_s *orangefs_devreq_remove_op(__u64 tag)
next,
&htable_ops_in_progress[index],
list) {
if (op->tag == tag) {
list_del(&op->list);
if (op->tag == tag && !op_state_purged(op)) {
list_del_init(&op->list);
get_op(op); /* increase ref count. */
spin_unlock(&htable_ops_in_progress_lock);
return op;
}
@ -127,12 +126,17 @@ static ssize_t orangefs_devreq_read(struct file *file,
return -EINVAL;
}
restart:
/* Get next op (if any) from top of list. */
spin_lock(&orangefs_request_list_lock);
list_for_each_entry_safe(op, temp, &orangefs_request_list, list) {
__s32 fsid;
/* This lock is held past the end of the loop when we break. */
spin_lock(&op->lock);
if (unlikely(op_state_purged(op))) {
spin_unlock(&op->lock);
continue;
}
fsid = fsid_of_op(op);
if (fsid != ORANGEFS_FS_ID_NULL) {
@ -197,16 +201,10 @@ static ssize_t orangefs_devreq_read(struct file *file,
spin_unlock(&orangefs_request_list_lock);
return -EAGAIN;
}
/*
* Set the operation to be in progress and move it between lists since
* it has been sent to the client.
*/
set_op_state_inprogress(cur_op);
list_del(&cur_op->list);
list_del_init(&cur_op->list);
get_op(op);
spin_unlock(&orangefs_request_list_lock);
orangefs_devreq_add_op(cur_op);
spin_unlock(&cur_op->lock);
/* Push the upcall out. */
@ -224,6 +222,25 @@ static ssize_t orangefs_devreq_read(struct file *file,
if (ret != 0)
goto error;
spin_lock(&htable_ops_in_progress_lock);
spin_lock(&cur_op->lock);
if (unlikely(op_state_given_up(cur_op))) {
spin_unlock(&cur_op->lock);
spin_unlock(&htable_ops_in_progress_lock);
op_release(cur_op);
goto restart;
}
/*
* Set the operation to be in progress and move it between lists since
* it has been sent to the client.
*/
set_op_state_inprogress(cur_op);
orangefs_devreq_add_op(cur_op);
spin_unlock(&cur_op->lock);
spin_unlock(&htable_ops_in_progress_lock);
op_release(cur_op);
/* The client only asks to read one size buffer. */
return MAX_DEV_REQ_UPSIZE;
error:
@ -235,11 +252,13 @@ error:
gossip_err("orangefs: Failed to copy data to user space\n");
spin_lock(&orangefs_request_list_lock);
spin_lock(&cur_op->lock);
set_op_state_waiting(cur_op);
orangefs_devreq_remove_op(cur_op->tag);
list_add(&cur_op->list, &orangefs_request_list);
if (likely(!op_state_given_up(cur_op))) {
set_op_state_waiting(cur_op);
list_add(&cur_op->list, &orangefs_request_list);
}
spin_unlock(&cur_op->lock);
spin_unlock(&orangefs_request_list_lock);
op_release(cur_op);
return -EFAULT;
}
@ -278,15 +297,13 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
__func__,
total,
(unsigned int) MAX_DEV_REQ_DOWNSIZE);
ret = -EFAULT;
goto out;
return -EFAULT;
}
n = copy_from_iter(&head, head_size, iter);
if (n < head_size) {
gossip_err("%s: failed to copy head.\n", __func__);
ret = -EFAULT;
goto out;
return -EFAULT;
}
if (head.version < ORANGEFS_MINIMUM_USERSPACE_VERSION) {
@ -295,31 +312,26 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
__func__,
head.version,
ORANGEFS_MINIMUM_USERSPACE_VERSION);
ret = -EPROTO;
goto out;
return -EPROTO;
}
if (head.magic != ORANGEFS_DEVREQ_MAGIC) {
gossip_err("Error: Device magic number does not match.\n");
ret = -EPROTO;
goto out;
return -EPROTO;
}
op = orangefs_devreq_remove_op(head.tag);
if (!op) {
gossip_err("WARNING: No one's waiting for tag %llu\n",
llu(head.tag));
goto out;
return ret;
}
get_op(op); /* increase ref count. */
n = copy_from_iter(&op->downcall, downcall_size, iter);
if (n != downcall_size) {
gossip_err("%s: failed to copy downcall.\n", __func__);
put_op(op);
ret = -EFAULT;
goto out;
goto Broken;
}
if (op->downcall.status)
@ -339,9 +351,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
downcall_size,
op->downcall.trailer_size,
total);
put_op(op);
ret = -EFAULT;
goto out;
goto Broken;
}
/* Only READDIR operations should have trailers. */
@ -350,9 +361,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
gossip_err("%s: %x operation with trailer.",
__func__,
op->downcall.type);
put_op(op);
ret = -EFAULT;
goto out;
goto Broken;
}
/* READDIR operations should always have trailers. */
@ -361,9 +371,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
gossip_err("%s: %x operation with no trailer.",
__func__,
op->downcall.type);
put_op(op);
ret = -EFAULT;
goto out;
goto Broken;
}
if (op->downcall.type != ORANGEFS_VFS_OP_READDIR)
@ -374,9 +383,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
if (op->downcall.trailer_buf == NULL) {
gossip_err("%s: failed trailer vmalloc.\n",
__func__);
put_op(op);
ret = -ENOMEM;
goto out;
goto Broken;
}
memset(op->downcall.trailer_buf, 0, op->downcall.trailer_size);
n = copy_from_iter(op->downcall.trailer_buf,
@ -385,9 +393,8 @@ static ssize_t orangefs_devreq_write_iter(struct kiocb *iocb,
if (n != op->downcall.trailer_size) {
gossip_err("%s: failed to copy trailer.\n", __func__);
vfree(op->downcall.trailer_buf);
put_op(op);
ret = -EFAULT;
goto out;
goto Broken;
}
wakeup:
@ -404,7 +411,6 @@ wakeup:
* the buffers are done being used.
*/
if (op->downcall.type == ORANGEFS_VFS_OP_FILE_IO) {
int timed_out = 0;
DEFINE_WAIT(wait_entry);
/*
@ -412,7 +418,8 @@ wakeup:
* that this op is done
*/
spin_lock(&op->lock);
set_op_state_serviced(op);
if (!op_state_given_up(op))
set_op_state_serviced(op);
spin_unlock(&op->lock);
while (1) {
@ -435,7 +442,6 @@ wakeup:
gossip_debug(GOSSIP_DEV_DEBUG,
"%s: timed out.\n",
__func__);
timed_out = 1;
break;
}
continue;
@ -450,15 +456,6 @@ wakeup:
spin_lock(&op->lock);
finish_wait(&op->io_completion_waitq, &wait_entry);
spin_unlock(&op->lock);
/* NOTE: for I/O operations we handle releasing the op
* object except in the case of timeout. the reason we
* can't free the op in timeout cases is that the op
* service logic in the vfs retries operations using
* the same op ptr, thus it can't be freed.
*/
if (!timed_out)
op_release(op);
} else {
/*
* tell the vfs op waiting on a waitqueue that
@ -468,11 +465,22 @@ wakeup:
* notification
*/
spin_lock(&op->lock);
set_op_state_serviced(op);
if (!op_state_given_up(op))
set_op_state_serviced(op);
spin_unlock(&op->lock);
}
out:
op_release(op);
return ret;
Broken:
spin_lock(&op->lock);
if (!op_state_given_up(op)) {
op->downcall.status = ret;
set_op_state_serviced(op);
}
spin_unlock(&op->lock);
goto out;
}
/* Returns whether any FS are still pending remounted */