From 8829b1a71ba3dc2d0d5d3b6d494b798cc35f3a7e Mon Sep 17 00:00:00 2001 From: Simon Gaiser Date: Tue, 30 Jan 2018 07:38:25 +0100 Subject: [PATCH 4/4] Revert "xen: optimize xenbus driver for multiple concurrent xenstore accesses" This reverts commit fd8aa9095a95c02dcc35540a263267c29b8fda9d. --- drivers/xen/xenbus/xenbus.h | 48 +-- drivers/xen/xenbus/xenbus_comms.c | 307 ++---------------- drivers/xen/xenbus/xenbus_dev_frontend.c | 188 ++++------- drivers/xen/xenbus/xenbus_xs.c | 520 +++++++++++++++++-------------- 4 files changed, 391 insertions(+), 672 deletions(-) diff --git a/drivers/xen/xenbus/xenbus.h b/drivers/xen/xenbus/xenbus.h index 149c5e7efc89..51995276f549 100644 --- a/drivers/xen/xenbus/xenbus.h +++ b/drivers/xen/xenbus/xenbus.h @@ -32,10 +32,6 @@ #ifndef _XENBUS_XENBUS_H #define _XENBUS_XENBUS_H -#include -#include -#include - #define XEN_BUS_ID_SIZE 20 struct xen_bus_type { @@ -56,49 +52,16 @@ enum xenstore_init { XS_LOCAL, }; -struct xs_watch_event { - struct list_head list; - unsigned int len; - struct xenbus_watch *handle; - const char *path; - const char *token; - char body[]; -}; - -enum xb_req_state { - xb_req_state_queued, - xb_req_state_wait_reply, - xb_req_state_got_reply, - xb_req_state_aborted -}; - -struct xb_req_data { - struct list_head list; - wait_queue_head_t wq; - struct xsd_sockmsg msg; - enum xsd_sockmsg_type type; - char *body; - const struct kvec *vec; - int num_vecs; - int err; - enum xb_req_state state; - void (*cb)(struct xb_req_data *); - void *par; -}; - extern enum xenstore_init xen_store_domain_type; extern const struct attribute_group *xenbus_dev_groups[]; -extern struct mutex xs_response_mutex; -extern struct list_head xs_reply_list; -extern struct list_head xb_write_list; -extern wait_queue_head_t xb_waitq; -extern struct mutex xb_write_mutex; int xs_init(void); int xb_init_comms(void); void xb_deinit_comms(void); -int xs_watch_msg(struct xs_watch_event *event); -void xs_request_exit(struct xb_req_data *req); +int xb_write(const void *data, unsigned int len); +int xb_read(void *data, unsigned int len); +int xb_data_to_read(void); +int xb_wait_for_data_to_read(void); int xenbus_match(struct device *_dev, struct device_driver *_drv); int xenbus_dev_probe(struct device *_dev); @@ -129,7 +92,6 @@ int xenbus_read_otherend_details(struct xenbus_device *xendev, void xenbus_ring_ops_init(void); -int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par); -void xenbus_dev_queue_reply(struct xb_req_data *req); +void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg); #endif diff --git a/drivers/xen/xenbus/xenbus_comms.c b/drivers/xen/xenbus/xenbus_comms.c index 856ada5d39c9..c21ec02643e1 100644 --- a/drivers/xen/xenbus/xenbus_comms.c +++ b/drivers/xen/xenbus/xenbus_comms.c @@ -34,7 +34,6 @@ #include #include -#include #include #include #include @@ -43,22 +42,11 @@ #include #include "xenbus.h" -/* A list of replies. Currently only one will ever be outstanding. */ -LIST_HEAD(xs_reply_list); - -/* A list of write requests. */ -LIST_HEAD(xb_write_list); -DECLARE_WAIT_QUEUE_HEAD(xb_waitq); -DEFINE_MUTEX(xb_write_mutex); - -/* Protect xenbus reader thread against save/restore. */ -DEFINE_MUTEX(xs_response_mutex); - static int xenbus_irq; -static struct task_struct *xenbus_task; static DECLARE_WORK(probe_work, xenbus_probe); +static DECLARE_WAIT_QUEUE_HEAD(xb_waitq); static irqreturn_t wake_waiting(int irq, void *unused) { @@ -96,31 +84,30 @@ static const void *get_input_chunk(XENSTORE_RING_IDX cons, return buf + MASK_XENSTORE_IDX(cons); } -static int xb_data_to_write(void) -{ - struct xenstore_domain_interface *intf = xen_store_interface; - - return (intf->req_prod - intf->req_cons) != XENSTORE_RING_SIZE && - !list_empty(&xb_write_list); -} - /** * xb_write - low level write * @data: buffer to send * @len: length of buffer * - * Returns number of bytes written or -err. + * Returns 0 on success, error otherwise. */ -static int xb_write(const void *data, unsigned int len) +int xb_write(const void *data, unsigned len) { struct xenstore_domain_interface *intf = xen_store_interface; XENSTORE_RING_IDX cons, prod; - unsigned int bytes = 0; + int rc; while (len != 0) { void *dst; unsigned int avail; + rc = wait_event_interruptible( + xb_waitq, + (intf->req_prod - intf->req_cons) != + XENSTORE_RING_SIZE); + if (rc < 0) + return rc; + /* Read indexes, then verify. */ cons = intf->req_cons; prod = intf->req_prod; @@ -128,11 +115,6 @@ static int xb_write(const void *data, unsigned int len) intf->req_cons = intf->req_prod = 0; return -EIO; } - if (!xb_data_to_write()) - return bytes; - - /* Must write data /after/ reading the consumer index. */ - virt_mb(); dst = get_output_chunk(cons, prod, intf->req, &avail); if (avail == 0) @@ -140,45 +122,52 @@ static int xb_write(const void *data, unsigned int len) if (avail > len) avail = len; + /* Must write data /after/ reading the consumer index. */ + virt_mb(); + memcpy(dst, data, avail); data += avail; len -= avail; - bytes += avail; /* Other side must not see new producer until data is there. */ virt_wmb(); intf->req_prod += avail; /* Implies mb(): other side will see the updated producer. */ - if (prod <= intf->req_cons) - notify_remote_via_evtchn(xen_store_evtchn); + notify_remote_via_evtchn(xen_store_evtchn); } - return bytes; + return 0; } -static int xb_data_to_read(void) +int xb_data_to_read(void) { struct xenstore_domain_interface *intf = xen_store_interface; return (intf->rsp_cons != intf->rsp_prod); } -static int xb_read(void *data, unsigned int len) +int xb_wait_for_data_to_read(void) +{ + return wait_event_interruptible(xb_waitq, xb_data_to_read()); +} + +int xb_read(void *data, unsigned len) { struct xenstore_domain_interface *intf = xen_store_interface; XENSTORE_RING_IDX cons, prod; - unsigned int bytes = 0; + int rc; while (len != 0) { unsigned int avail; const char *src; + rc = xb_wait_for_data_to_read(); + if (rc < 0) + return rc; + /* Read indexes, then verify. */ cons = intf->rsp_cons; prod = intf->rsp_prod; - if (cons == prod) - return bytes; - if (!check_indexes(cons, prod)) { intf->rsp_cons = intf->rsp_prod = 0; return -EIO; @@ -196,243 +185,17 @@ static int xb_read(void *data, unsigned int len) memcpy(data, src, avail); data += avail; len -= avail; - bytes += avail; /* Other side must not see free space until we've copied out */ virt_mb(); intf->rsp_cons += avail; - /* Implies mb(): other side will see the updated consumer. */ - if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE) - notify_remote_via_evtchn(xen_store_evtchn); - } - - return bytes; -} - -static int process_msg(void) -{ - static struct { - struct xsd_sockmsg msg; - char *body; - union { - void *alloc; - struct xs_watch_event *watch; - }; - bool in_msg; - bool in_hdr; - unsigned int read; - } state; - struct xb_req_data *req; - int err; - unsigned int len; - - if (!state.in_msg) { - state.in_msg = true; - state.in_hdr = true; - state.read = 0; - - /* - * We must disallow save/restore while reading a message. - * A partial read across s/r leaves us out of sync with - * xenstored. - * xs_response_mutex is locked as long as we are processing one - * message. state.in_msg will be true as long as we are holding - * the lock here. - */ - mutex_lock(&xs_response_mutex); - - if (!xb_data_to_read()) { - /* We raced with save/restore: pending data 'gone'. */ - mutex_unlock(&xs_response_mutex); - state.in_msg = false; - return 0; - } - } - - if (state.in_hdr) { - if (state.read != sizeof(state.msg)) { - err = xb_read((void *)&state.msg + state.read, - sizeof(state.msg) - state.read); - if (err < 0) - goto out; - state.read += err; - if (state.read != sizeof(state.msg)) - return 0; - if (state.msg.len > XENSTORE_PAYLOAD_MAX) { - err = -EINVAL; - goto out; - } - } - - len = state.msg.len + 1; - if (state.msg.type == XS_WATCH_EVENT) - len += sizeof(*state.watch); - - state.alloc = kmalloc(len, GFP_NOIO | __GFP_HIGH); - if (!state.alloc) - return -ENOMEM; - - if (state.msg.type == XS_WATCH_EVENT) - state.body = state.watch->body; - else - state.body = state.alloc; - state.in_hdr = false; - state.read = 0; - } - - err = xb_read(state.body + state.read, state.msg.len - state.read); - if (err < 0) - goto out; - - state.read += err; - if (state.read != state.msg.len) - return 0; + pr_debug("Finished read of %i bytes (%i to go)\n", avail, len); - state.body[state.msg.len] = '\0'; - - if (state.msg.type == XS_WATCH_EVENT) { - state.watch->len = state.msg.len; - err = xs_watch_msg(state.watch); - } else { - err = -ENOENT; - mutex_lock(&xb_write_mutex); - list_for_each_entry(req, &xs_reply_list, list) { - if (req->msg.req_id == state.msg.req_id) { - if (req->state == xb_req_state_wait_reply) { - req->msg.type = state.msg.type; - req->msg.len = state.msg.len; - req->body = state.body; - req->state = xb_req_state_got_reply; - list_del(&req->list); - req->cb(req); - } else { - list_del(&req->list); - kfree(req); - } - err = 0; - break; - } - } - mutex_unlock(&xb_write_mutex); - if (err) - goto out; - } - - mutex_unlock(&xs_response_mutex); - - state.in_msg = false; - state.alloc = NULL; - return err; - - out: - mutex_unlock(&xs_response_mutex); - state.in_msg = false; - kfree(state.alloc); - state.alloc = NULL; - return err; -} - -static int process_writes(void) -{ - static struct { - struct xb_req_data *req; - int idx; - unsigned int written; - } state; - void *base; - unsigned int len; - int err = 0; - - if (!xb_data_to_write()) - return 0; - - mutex_lock(&xb_write_mutex); - - if (!state.req) { - state.req = list_first_entry(&xb_write_list, - struct xb_req_data, list); - state.idx = -1; - state.written = 0; - } - - if (state.req->state == xb_req_state_aborted) - goto out_err; - - while (state.idx < state.req->num_vecs) { - if (state.idx < 0) { - base = &state.req->msg; - len = sizeof(state.req->msg); - } else { - base = state.req->vec[state.idx].iov_base; - len = state.req->vec[state.idx].iov_len; - } - err = xb_write(base + state.written, len - state.written); - if (err < 0) - goto out_err; - state.written += err; - if (state.written != len) - goto out; - - state.idx++; - state.written = 0; - } - - list_del(&state.req->list); - state.req->state = xb_req_state_wait_reply; - list_add_tail(&state.req->list, &xs_reply_list); - state.req = NULL; - - out: - mutex_unlock(&xb_write_mutex); - - return 0; - - out_err: - state.req->msg.type = XS_ERROR; - state.req->err = err; - list_del(&state.req->list); - if (state.req->state == xb_req_state_aborted) - kfree(state.req); - else { - state.req->state = xb_req_state_got_reply; - wake_up(&state.req->wq); - } - - mutex_unlock(&xb_write_mutex); - - state.req = NULL; - - return err; -} - -static int xb_thread_work(void) -{ - return xb_data_to_read() || xb_data_to_write(); -} - -static int xenbus_thread(void *unused) -{ - int err; - - while (!kthread_should_stop()) { - if (wait_event_interruptible(xb_waitq, xb_thread_work())) - continue; - - err = process_msg(); - if (err == -ENOMEM) - schedule(); - else if (err) - pr_warn_ratelimited("error %d while reading message\n", - err); - - err = process_writes(); - if (err) - pr_warn_ratelimited("error %d while writing message\n", - err); + /* Implies mb(): other side will see the updated consumer. */ + notify_remote_via_evtchn(xen_store_evtchn); } - xenbus_task = NULL; return 0; } @@ -460,7 +223,6 @@ int xb_init_comms(void) rebind_evtchn_irq(xen_store_evtchn, xenbus_irq); } else { int err; - err = bind_evtchn_to_irqhandler(xen_store_evtchn, wake_waiting, 0, "xenbus", &xb_waitq); if (err < 0) { @@ -469,13 +231,6 @@ int xb_init_comms(void) } xenbus_irq = err; - - if (!xenbus_task) { - xenbus_task = kthread_run(xenbus_thread, NULL, - "xenbus"); - if (IS_ERR(xenbus_task)) - return PTR_ERR(xenbus_task); - } } return 0; diff --git a/drivers/xen/xenbus/xenbus_dev_frontend.c b/drivers/xen/xenbus/xenbus_dev_frontend.c index 1f4733b80c87..19f823b378a0 100644 --- a/drivers/xen/xenbus/xenbus_dev_frontend.c +++ b/drivers/xen/xenbus/xenbus_dev_frontend.c @@ -112,7 +112,6 @@ struct xenbus_file_priv { struct list_head read_buffers; wait_queue_head_t read_waitq; - struct kref kref; }; /* Read out any raw xenbus messages queued up. */ @@ -297,107 +296,6 @@ static void watch_fired(struct xenbus_watch *watch, mutex_unlock(&adap->dev_data->reply_mutex); } -static void xenbus_file_free(struct kref *kref) -{ - struct xenbus_file_priv *u; - struct xenbus_transaction_holder *trans, *tmp; - struct watch_adapter *watch, *tmp_watch; - struct read_buffer *rb, *tmp_rb; - - u = container_of(kref, struct xenbus_file_priv, kref); - - /* - * No need for locking here because there are no other users, - * by definition. - */ - - list_for_each_entry_safe(trans, tmp, &u->transactions, list) { - xenbus_transaction_end(trans->handle, 1); - list_del(&trans->list); - kfree(trans); - } - - list_for_each_entry_safe(watch, tmp_watch, &u->watches, list) { - unregister_xenbus_watch(&watch->watch); - list_del(&watch->list); - free_watch_adapter(watch); - } - - list_for_each_entry_safe(rb, tmp_rb, &u->read_buffers, list) { - list_del(&rb->list); - kfree(rb); - } - kfree(u); -} - -static struct xenbus_transaction_holder *xenbus_get_transaction( - struct xenbus_file_priv *u, uint32_t tx_id) -{ - struct xenbus_transaction_holder *trans; - - list_for_each_entry(trans, &u->transactions, list) - if (trans->handle.id == tx_id) - return trans; - - return NULL; -} - -void xenbus_dev_queue_reply(struct xb_req_data *req) -{ - struct xenbus_file_priv *u = req->par; - struct xenbus_transaction_holder *trans = NULL; - int rc; - LIST_HEAD(staging_q); - - xs_request_exit(req); - - mutex_lock(&u->msgbuffer_mutex); - - if (req->type == XS_TRANSACTION_START) { - trans = xenbus_get_transaction(u, 0); - if (WARN_ON(!trans)) - goto out; - if (req->msg.type == XS_ERROR) { - list_del(&trans->list); - kfree(trans); - } else { - rc = kstrtou32(req->body, 10, &trans->handle.id); - if (WARN_ON(rc)) - goto out; - } - } else if (req->msg.type == XS_TRANSACTION_END) { - trans = xenbus_get_transaction(u, req->msg.tx_id); - if (WARN_ON(!trans)) - goto out; - list_del(&trans->list); - kfree(trans); - } - - mutex_unlock(&u->msgbuffer_mutex); - - mutex_lock(&u->reply_mutex); - rc = queue_reply(&staging_q, &req->msg, sizeof(req->msg)); - if (!rc) - rc = queue_reply(&staging_q, req->body, req->msg.len); - if (!rc) { - list_splice_tail(&staging_q, &u->read_buffers); - wake_up(&u->read_waitq); - } else { - queue_cleanup(&staging_q); - } - mutex_unlock(&u->reply_mutex); - - kfree(req->body); - kfree(req); - - kref_put(&u->kref, xenbus_file_free); - - return; - - out: - mutex_unlock(&u->msgbuffer_mutex); -} - static int xenbus_command_reply(struct xenbus_file_priv *u, unsigned int msg_type, const char *reply) { @@ -418,9 +316,6 @@ static int xenbus_command_reply(struct xenbus_file_priv *u, wake_up(&u->read_waitq); mutex_unlock(&u->reply_mutex); - if (!rc) - kref_put(&u->kref, xenbus_file_free); - return rc; } @@ -428,22 +323,57 @@ static int xenbus_write_transaction(unsigned msg_type, struct xenbus_file_priv *u) { int rc; + void *reply; struct xenbus_transaction_holder *trans = NULL; + LIST_HEAD(staging_q); if (msg_type == XS_TRANSACTION_START) { - trans = kzalloc(sizeof(*trans), GFP_KERNEL); + trans = kmalloc(sizeof(*trans), GFP_KERNEL); if (!trans) { rc = -ENOMEM; goto out; } - list_add(&trans->list, &u->transactions); - } else if (u->u.msg.tx_id != 0 && - !xenbus_get_transaction(u, u->u.msg.tx_id)) - return xenbus_command_reply(u, XS_ERROR, "ENOENT"); + } else if (u->u.msg.tx_id != 0) { + list_for_each_entry(trans, &u->transactions, list) + if (trans->handle.id == u->u.msg.tx_id) + break; + if (&trans->list == &u->transactions) + return xenbus_command_reply(u, XS_ERROR, "ENOENT"); + } + + reply = xenbus_dev_request_and_reply(&u->u.msg); + if (IS_ERR(reply)) { + if (msg_type == XS_TRANSACTION_START) + kfree(trans); + rc = PTR_ERR(reply); + goto out; + } - rc = xenbus_dev_request_and_reply(&u->u.msg, u); - if (rc) + if (msg_type == XS_TRANSACTION_START) { + if (u->u.msg.type == XS_ERROR) + kfree(trans); + else { + trans->handle.id = simple_strtoul(reply, NULL, 0); + list_add(&trans->list, &u->transactions); + } + } else if (u->u.msg.type == XS_TRANSACTION_END) { + list_del(&trans->list); kfree(trans); + } + + mutex_lock(&u->reply_mutex); + rc = queue_reply(&staging_q, &u->u.msg, sizeof(u->u.msg)); + if (!rc) + rc = queue_reply(&staging_q, reply, u->u.msg.len); + if (!rc) { + list_splice_tail(&staging_q, &u->read_buffers); + wake_up(&u->read_waitq); + } else { + queue_cleanup(&staging_q); + } + mutex_unlock(&u->reply_mutex); + + kfree(reply); out: return rc; @@ -575,8 +505,6 @@ static ssize_t xenbus_file_write(struct file *filp, * OK, now we have a complete message. Do something with it. */ - kref_get(&u->kref); - msg_type = u->u.msg.type; switch (msg_type) { @@ -591,10 +519,8 @@ static ssize_t xenbus_file_write(struct file *filp, ret = xenbus_write_transaction(msg_type, u); break; } - if (ret != 0) { + if (ret != 0) rc = ret; - kref_put(&u->kref, xenbus_file_free); - } /* Buffered message consumed */ u->len = 0; @@ -619,8 +545,6 @@ static int xenbus_file_open(struct inode *inode, struct file *filp) if (u == NULL) return -ENOMEM; - kref_init(&u->kref); - INIT_LIST_HEAD(&u->transactions); INIT_LIST_HEAD(&u->watches); INIT_LIST_HEAD(&u->read_buffers); @@ -637,8 +561,32 @@ static int xenbus_file_open(struct inode *inode, struct file *filp) static int xenbus_file_release(struct inode *inode, struct file *filp) { struct xenbus_file_priv *u = filp->private_data; + struct xenbus_transaction_holder *trans, *tmp; + struct watch_adapter *watch, *tmp_watch; + struct read_buffer *rb, *tmp_rb; + + /* + * No need for locking here because there are no other users, + * by definition. + */ - kref_put(&u->kref, xenbus_file_free); + list_for_each_entry_safe(trans, tmp, &u->transactions, list) { + xenbus_transaction_end(trans->handle, 1); + list_del(&trans->list); + kfree(trans); + } + + list_for_each_entry_safe(watch, tmp_watch, &u->watches, list) { + unregister_xenbus_watch(&watch->watch); + list_del(&watch->list); + free_watch_adapter(watch); + } + + list_for_each_entry_safe(rb, tmp_rb, &u->read_buffers, list) { + list_del(&rb->list); + kfree(rb); + } + kfree(u); return 0; } diff --git a/drivers/xen/xenbus/xenbus_xs.c b/drivers/xen/xenbus/xenbus_xs.c index e46080214955..ebc768f44abe 100644 --- a/drivers/xen/xenbus/xenbus_xs.c +++ b/drivers/xen/xenbus/xenbus_xs.c @@ -43,7 +43,6 @@ #include #include #include -#include #include #include #include @@ -51,28 +50,61 @@ #include #include "xenbus.h" -/* - * Framework to protect suspend/resume handling against normal Xenstore - * message handling: - * During suspend/resume there must be no open transaction and no pending - * Xenstore request. - * New watch events happening in this time can be ignored by firing all watches - * after resume. - */ +struct xs_stored_msg { + struct list_head list; -/* Lock protecting enter/exit critical region. */ -static DEFINE_SPINLOCK(xs_state_lock); -/* Number of users in critical region (protected by xs_state_lock). */ -static unsigned int xs_state_users; -/* Suspend handler waiting or already active (protected by xs_state_lock)? */ -static int xs_suspend_active; -/* Unique Xenstore request id (protected by xs_state_lock). */ -static uint32_t xs_request_id; + struct xsd_sockmsg hdr; -/* Wait queue for all callers waiting for critical region to become usable. */ -static DECLARE_WAIT_QUEUE_HEAD(xs_state_enter_wq); -/* Wait queue for suspend handling waiting for critical region being empty. */ -static DECLARE_WAIT_QUEUE_HEAD(xs_state_exit_wq); + union { + /* Queued replies. */ + struct { + char *body; + } reply; + + /* Queued watch events. */ + struct { + struct xenbus_watch *handle; + const char *path; + const char *token; + } watch; + } u; +}; + +struct xs_handle { + /* A list of replies. Currently only one will ever be outstanding. */ + struct list_head reply_list; + spinlock_t reply_lock; + wait_queue_head_t reply_waitq; + + /* + * Mutex ordering: transaction_mutex -> watch_mutex -> request_mutex. + * response_mutex is never taken simultaneously with the other three. + * + * transaction_mutex must be held before incrementing + * transaction_count. The mutex is held when a suspend is in + * progress to prevent new transactions starting. + * + * When decrementing transaction_count to zero the wait queue + * should be woken up, the suspend code waits for count to + * reach zero. + */ + + /* One request at a time. */ + struct mutex request_mutex; + + /* Protect xenbus reader thread against save/restore. */ + struct mutex response_mutex; + + /* Protect transactions against save/restore. */ + struct mutex transaction_mutex; + atomic_t transaction_count; + wait_queue_head_t transaction_wq; + + /* Protect watch (de)register against save/restore. */ + struct rw_semaphore watch_mutex; +}; + +static struct xs_handle xs_state; /* List of registered watches, and a lock to protect it. */ static LIST_HEAD(watches); @@ -82,9 +114,6 @@ static DEFINE_SPINLOCK(watches_lock); static LIST_HEAD(watch_events); static DEFINE_SPINLOCK(watch_events_lock); -/* Protect watch (de)register against save/restore. */ -static DECLARE_RWSEM(xs_watch_rwsem); - /* * Details of the xenwatch callback kernel thread. The thread waits on the * watch_events_waitq for work to do (queued on watch_events list). When it @@ -95,59 +124,6 @@ static pid_t xenwatch_pid; static DEFINE_MUTEX(xenwatch_mutex); static DECLARE_WAIT_QUEUE_HEAD(watch_events_waitq); -static void xs_suspend_enter(void) -{ - spin_lock(&xs_state_lock); - xs_suspend_active++; - spin_unlock(&xs_state_lock); - wait_event(xs_state_exit_wq, xs_state_users == 0); -} - -static void xs_suspend_exit(void) -{ - spin_lock(&xs_state_lock); - xs_suspend_active--; - spin_unlock(&xs_state_lock); - wake_up_all(&xs_state_enter_wq); -} - -static uint32_t xs_request_enter(struct xb_req_data *req) -{ - uint32_t rq_id; - - req->type = req->msg.type; - - spin_lock(&xs_state_lock); - - while (!xs_state_users && xs_suspend_active) { - spin_unlock(&xs_state_lock); - wait_event(xs_state_enter_wq, xs_suspend_active == 0); - spin_lock(&xs_state_lock); - } - - if (req->type == XS_TRANSACTION_START) - xs_state_users++; - xs_state_users++; - rq_id = xs_request_id++; - - spin_unlock(&xs_state_lock); - - return rq_id; -} - -void xs_request_exit(struct xb_req_data *req) -{ - spin_lock(&xs_state_lock); - xs_state_users--; - if ((req->type == XS_TRANSACTION_START && req->msg.type == XS_ERROR) || - req->type == XS_TRANSACTION_END) - xs_state_users--; - spin_unlock(&xs_state_lock); - - if (xs_suspend_active && !xs_state_users) - wake_up(&xs_state_exit_wq); -} - static int get_error(const char *errorstring) { unsigned int i; @@ -185,24 +161,21 @@ static bool xenbus_ok(void) } return false; } - -static bool test_reply(struct xb_req_data *req) +static void *read_reply(enum xsd_sockmsg_type *type, unsigned int *len) { - if (req->state == xb_req_state_got_reply || !xenbus_ok()) - return true; + struct xs_stored_msg *msg; + char *body; - /* Make sure to reread req->state each time. */ - barrier(); + spin_lock(&xs_state.reply_lock); - return false; -} - -static void *read_reply(struct xb_req_data *req) -{ - while (req->state != xb_req_state_got_reply) { - wait_event(req->wq, test_reply(req)); - - if (!xenbus_ok()) + while (list_empty(&xs_state.reply_list)) { + spin_unlock(&xs_state.reply_lock); + if (xenbus_ok()) + /* XXX FIXME: Avoid synchronous wait for response here. */ + wait_event_timeout(xs_state.reply_waitq, + !list_empty(&xs_state.reply_list), + msecs_to_jiffies(500)); + else { /* * If we are in the process of being shut-down there is * no point of trying to contact XenBus - it is either @@ -210,82 +183,76 @@ static void *read_reply(struct xb_req_data *req) * has been killed or is unreachable. */ return ERR_PTR(-EIO); - if (req->err) - return ERR_PTR(req->err); - + } + spin_lock(&xs_state.reply_lock); } - return req->body; -} + msg = list_entry(xs_state.reply_list.next, + struct xs_stored_msg, list); + list_del(&msg->list); -static void xs_send(struct xb_req_data *req, struct xsd_sockmsg *msg) -{ - bool notify; + spin_unlock(&xs_state.reply_lock); - req->msg = *msg; - req->err = 0; - req->state = xb_req_state_queued; - init_waitqueue_head(&req->wq); - - req->msg.req_id = xs_request_enter(req); + *type = msg->hdr.type; + if (len) + *len = msg->hdr.len; + body = msg->u.reply.body; - mutex_lock(&xb_write_mutex); - list_add_tail(&req->list, &xb_write_list); - notify = list_is_singular(&xb_write_list); - mutex_unlock(&xb_write_mutex); + kfree(msg); - if (notify) - wake_up(&xb_waitq); + return body; } -static void *xs_wait_for_reply(struct xb_req_data *req, struct xsd_sockmsg *msg) +static void transaction_start(void) { - void *ret; - - ret = read_reply(req); - - xs_request_exit(req); - - msg->type = req->msg.type; - msg->len = req->msg.len; + mutex_lock(&xs_state.transaction_mutex); + atomic_inc(&xs_state.transaction_count); + mutex_unlock(&xs_state.transaction_mutex); +} - mutex_lock(&xb_write_mutex); - if (req->state == xb_req_state_queued || - req->state == xb_req_state_wait_reply) - req->state = xb_req_state_aborted; - else - kfree(req); - mutex_unlock(&xb_write_mutex); +static void transaction_end(void) +{ + if (atomic_dec_and_test(&xs_state.transaction_count)) + wake_up(&xs_state.transaction_wq); +} - return ret; +static void transaction_suspend(void) +{ + mutex_lock(&xs_state.transaction_mutex); + wait_event(xs_state.transaction_wq, + atomic_read(&xs_state.transaction_count) == 0); } -static void xs_wake_up(struct xb_req_data *req) +static void transaction_resume(void) { - wake_up(&req->wq); + mutex_unlock(&xs_state.transaction_mutex); } -int xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void *par) +void *xenbus_dev_request_and_reply(struct xsd_sockmsg *msg) { - struct xb_req_data *req; - struct kvec *vec; + void *ret; + enum xsd_sockmsg_type type = msg->type; + int err; - req = kmalloc(sizeof(*req) + sizeof(*vec), GFP_KERNEL); - if (!req) - return -ENOMEM; + if (type == XS_TRANSACTION_START) + transaction_start(); + + mutex_lock(&xs_state.request_mutex); - vec = (struct kvec *)(req + 1); - vec->iov_len = msg->len; - vec->iov_base = msg + 1; + err = xb_write(msg, sizeof(*msg) + msg->len); + if (err) { + msg->type = XS_ERROR; + ret = ERR_PTR(err); + } else + ret = read_reply(&msg->type, &msg->len); - req->vec = vec; - req->num_vecs = 1; - req->cb = xenbus_dev_queue_reply; - req->par = par; + mutex_unlock(&xs_state.request_mutex); - xs_send(req, msg); + if ((msg->type == XS_TRANSACTION_END) || + ((type == XS_TRANSACTION_START) && (msg->type == XS_ERROR))) + transaction_end(); - return 0; + return ret; } EXPORT_SYMBOL(xenbus_dev_request_and_reply); @@ -296,31 +263,37 @@ static void *xs_talkv(struct xenbus_transaction t, unsigned int num_vecs, unsigned int *len) { - struct xb_req_data *req; struct xsd_sockmsg msg; void *ret = NULL; unsigned int i; int err; - req = kmalloc(sizeof(*req), GFP_NOIO | __GFP_HIGH); - if (!req) - return ERR_PTR(-ENOMEM); - - req->vec = iovec; - req->num_vecs = num_vecs; - req->cb = xs_wake_up; - msg.tx_id = t.id; + msg.req_id = 0; msg.type = type; msg.len = 0; for (i = 0; i < num_vecs; i++) msg.len += iovec[i].iov_len; - xs_send(req, &msg); + mutex_lock(&xs_state.request_mutex); - ret = xs_wait_for_reply(req, &msg); - if (len) - *len = msg.len; + err = xb_write(&msg, sizeof(msg)); + if (err) { + mutex_unlock(&xs_state.request_mutex); + return ERR_PTR(err); + } + + for (i = 0; i < num_vecs; i++) { + err = xb_write(iovec[i].iov_base, iovec[i].iov_len); + if (err) { + mutex_unlock(&xs_state.request_mutex); + return ERR_PTR(err); + } + } + + ret = read_reply(&msg.type, len); + + mutex_unlock(&xs_state.request_mutex); if (IS_ERR(ret)) return ret; @@ -527,9 +500,13 @@ int xenbus_transaction_start(struct xenbus_transaction *t) { char *id_str; + transaction_start(); + id_str = xs_single(XBT_NIL, XS_TRANSACTION_START, "", NULL); - if (IS_ERR(id_str)) + if (IS_ERR(id_str)) { + transaction_end(); return PTR_ERR(id_str); + } t->id = simple_strtoul(id_str, NULL, 0); kfree(id_str); @@ -543,13 +520,18 @@ EXPORT_SYMBOL_GPL(xenbus_transaction_start); int xenbus_transaction_end(struct xenbus_transaction t, int abort) { char abortstr[2]; + int err; if (abort) strcpy(abortstr, "F"); else strcpy(abortstr, "T"); - return xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL)); + err = xs_error(xs_single(t, XS_TRANSACTION_END, abortstr, NULL)); + + transaction_end(); + + return err; } EXPORT_SYMBOL_GPL(xenbus_transaction_end); @@ -682,30 +664,6 @@ static struct xenbus_watch *find_watch(const char *token) return NULL; } - -int xs_watch_msg(struct xs_watch_event *event) -{ - if (count_strings(event->body, event->len) != 2) { - kfree(event); - return -EINVAL; - } - event->path = (const char *)event->body; - event->token = (const char *)strchr(event->body, '\0') + 1; - - spin_lock(&watches_lock); - event->handle = find_watch(event->token); - if (event->handle != NULL) { - spin_lock(&watch_events_lock); - list_add_tail(&event->list, &watch_events); - wake_up(&watch_events_waitq); - spin_unlock(&watch_events_lock); - } else - kfree(event); - spin_unlock(&watches_lock); - - return 0; -} - /* * Certain older XenBus toolstack cannot handle reading values that are * not populated. Some Xen 3.4 installation are incapable of doing this @@ -754,7 +712,7 @@ int register_xenbus_watch(struct xenbus_watch *watch) sprintf(token, "%lX", (long)watch); - down_read(&xs_watch_rwsem); + down_read(&xs_state.watch_mutex); spin_lock(&watches_lock); BUG_ON(find_watch(token)); @@ -769,7 +727,7 @@ int register_xenbus_watch(struct xenbus_watch *watch) spin_unlock(&watches_lock); } - up_read(&xs_watch_rwsem); + up_read(&xs_state.watch_mutex); return err; } @@ -777,13 +735,13 @@ EXPORT_SYMBOL_GPL(register_xenbus_watch); void unregister_xenbus_watch(struct xenbus_watch *watch) { - struct xs_watch_event *event, *tmp; + struct xs_stored_msg *msg, *tmp; char token[sizeof(watch) * 2 + 1]; int err; sprintf(token, "%lX", (long)watch); - down_read(&xs_watch_rwsem); + down_read(&xs_state.watch_mutex); spin_lock(&watches_lock); BUG_ON(!find_watch(token)); @@ -794,7 +752,7 @@ void unregister_xenbus_watch(struct xenbus_watch *watch) if (err) pr_warn("Failed to release watch %s: %i\n", watch->node, err); - up_read(&xs_watch_rwsem); + up_read(&xs_state.watch_mutex); /* Make sure there are no callbacks running currently (unless its us) */ @@ -803,11 +761,12 @@ void unregister_xenbus_watch(struct xenbus_watch *watch) /* Cancel pending watch events. */ spin_lock(&watch_events_lock); - list_for_each_entry_safe(event, tmp, &watch_events, list) { - if (event->handle != watch) + list_for_each_entry_safe(msg, tmp, &watch_events, list) { + if (msg->u.watch.handle != watch) continue; - list_del(&event->list); - kfree(event); + list_del(&msg->list); + kfree(msg->u.watch.path); + kfree(msg); } spin_unlock(&watch_events_lock); @@ -818,10 +777,10 @@ EXPORT_SYMBOL_GPL(unregister_xenbus_watch); void xs_suspend(void) { - xs_suspend_enter(); - - down_write(&xs_watch_rwsem); - mutex_lock(&xs_response_mutex); + transaction_suspend(); + down_write(&xs_state.watch_mutex); + mutex_lock(&xs_state.request_mutex); + mutex_lock(&xs_state.response_mutex); } void xs_resume(void) @@ -831,31 +790,31 @@ void xs_resume(void) xb_init_comms(); - mutex_unlock(&xs_response_mutex); - - xs_suspend_exit(); + mutex_unlock(&xs_state.response_mutex); + mutex_unlock(&xs_state.request_mutex); + transaction_resume(); - /* No need for watches_lock: the xs_watch_rwsem is sufficient. */ + /* No need for watches_lock: the watch_mutex is sufficient. */ list_for_each_entry(watch, &watches, list) { sprintf(token, "%lX", (long)watch); xs_watch(watch->node, token); } - up_write(&xs_watch_rwsem); + up_write(&xs_state.watch_mutex); } void xs_suspend_cancel(void) { - mutex_unlock(&xs_response_mutex); - up_write(&xs_watch_rwsem); - - xs_suspend_exit(); + mutex_unlock(&xs_state.response_mutex); + mutex_unlock(&xs_state.request_mutex); + up_write(&xs_state.watch_mutex); + mutex_unlock(&xs_state.transaction_mutex); } static int xenwatch_thread(void *unused) { struct list_head *ent; - struct xs_watch_event *event; + struct xs_stored_msg *msg; for (;;) { wait_event_interruptible(watch_events_waitq, @@ -873,10 +832,12 @@ static int xenwatch_thread(void *unused) spin_unlock(&watch_events_lock); if (ent != &watch_events) { - event = list_entry(ent, struct xs_watch_event, list); - event->handle->callback(event->handle, event->path, - event->token); - kfree(event); + msg = list_entry(ent, struct xs_stored_msg, list); + msg->u.watch.handle->callback(msg->u.watch.handle, + msg->u.watch.path, + msg->u.watch.token); + kfree(msg->u.watch.path); + kfree(msg); } mutex_unlock(&xenwatch_mutex); @@ -885,37 +846,126 @@ static int xenwatch_thread(void *unused) return 0; } -/* - * Wake up all threads waiting for a xenstore reply. In case of shutdown all - * pending replies will be marked as "aborted" in order to let the waiters - * return in spite of xenstore possibly no longer being able to reply. This - * will avoid blocking shutdown by a thread waiting for xenstore but being - * necessary for shutdown processing to proceed. - */ -static int xs_reboot_notify(struct notifier_block *nb, - unsigned long code, void *unused) +static int process_msg(void) { - struct xb_req_data *req; + struct xs_stored_msg *msg; + char *body; + int err; + + /* + * We must disallow save/restore while reading a xenstore message. + * A partial read across s/r leaves us out of sync with xenstored. + */ + for (;;) { + err = xb_wait_for_data_to_read(); + if (err) + return err; + mutex_lock(&xs_state.response_mutex); + if (xb_data_to_read()) + break; + /* We raced with save/restore: pending data 'disappeared'. */ + mutex_unlock(&xs_state.response_mutex); + } - mutex_lock(&xb_write_mutex); - list_for_each_entry(req, &xs_reply_list, list) - wake_up(&req->wq); - list_for_each_entry(req, &xb_write_list, list) - wake_up(&req->wq); - mutex_unlock(&xb_write_mutex); - return NOTIFY_DONE; + + msg = kmalloc(sizeof(*msg), GFP_NOIO | __GFP_HIGH); + if (msg == NULL) { + err = -ENOMEM; + goto out; + } + + err = xb_read(&msg->hdr, sizeof(msg->hdr)); + if (err) { + kfree(msg); + goto out; + } + + if (msg->hdr.len > XENSTORE_PAYLOAD_MAX) { + kfree(msg); + err = -EINVAL; + goto out; + } + + body = kmalloc(msg->hdr.len + 1, GFP_NOIO | __GFP_HIGH); + if (body == NULL) { + kfree(msg); + err = -ENOMEM; + goto out; + } + + err = xb_read(body, msg->hdr.len); + if (err) { + kfree(body); + kfree(msg); + goto out; + } + body[msg->hdr.len] = '\0'; + + if (msg->hdr.type == XS_WATCH_EVENT) { + if (count_strings(body, msg->hdr.len) != 2) { + err = -EINVAL; + kfree(msg); + kfree(body); + goto out; + } + msg->u.watch.path = (const char *)body; + msg->u.watch.token = (const char *)strchr(body, '\0') + 1; + + spin_lock(&watches_lock); + msg->u.watch.handle = find_watch(msg->u.watch.token); + if (msg->u.watch.handle != NULL) { + spin_lock(&watch_events_lock); + list_add_tail(&msg->list, &watch_events); + wake_up(&watch_events_waitq); + spin_unlock(&watch_events_lock); + } else { + kfree(body); + kfree(msg); + } + spin_unlock(&watches_lock); + } else { + msg->u.reply.body = body; + spin_lock(&xs_state.reply_lock); + list_add_tail(&msg->list, &xs_state.reply_list); + spin_unlock(&xs_state.reply_lock); + wake_up(&xs_state.reply_waitq); + } + + out: + mutex_unlock(&xs_state.response_mutex); + return err; } -static struct notifier_block xs_reboot_nb = { - .notifier_call = xs_reboot_notify, -}; +static int xenbus_thread(void *unused) +{ + int err; + + for (;;) { + err = process_msg(); + if (err) + pr_warn("error %d while reading message\n", err); + if (kthread_should_stop()) + break; + } + + return 0; +} int xs_init(void) { int err; struct task_struct *task; - register_reboot_notifier(&xs_reboot_nb); + INIT_LIST_HEAD(&xs_state.reply_list); + spin_lock_init(&xs_state.reply_lock); + init_waitqueue_head(&xs_state.reply_waitq); + + mutex_init(&xs_state.request_mutex); + mutex_init(&xs_state.response_mutex); + mutex_init(&xs_state.transaction_mutex); + init_rwsem(&xs_state.watch_mutex); + atomic_set(&xs_state.transaction_count, 0); + init_waitqueue_head(&xs_state.transaction_wq); /* Initialize the shared memory rings to talk to xenstored */ err = xb_init_comms(); @@ -927,6 +977,10 @@ int xs_init(void) return PTR_ERR(task); xenwatch_pid = task->pid; + task = kthread_run(xenbus_thread, NULL, "xenbus"); + if (IS_ERR(task)) + return PTR_ERR(task); + /* shutdown watches for kexec boot */ xs_reset_watches(); -- 2.15.1