[dpdk-dev,v2,11/11] lib/librte_vhost: support dynamically registering vhost server
Commit Message
* support calling rte_vhost_driver_register after rte_vhost_driver_session_start
* add mutext to protect fdset from concurrent access
* add busy flag in fdentry. this flag is set before cb and cleared after cb is finished.
mutex lock scenario in vhost:
* event_dispatch(in rte_vhost_driver_session_start) runs in a seperate thread, infinitely
processing vhost messages through cb(callback).
* event_dispatch acquires the lock, get the cb and its context, mark the busy flag,
and releases the mutex.
* vserver_new_vq_conn cb calls fdset_add, which acquires the mutex and add new fd into fdset.
* vserver_message_handler cb frees data context, marks remove flag to request to delete
connfd(connection fd) from fdset.
* after cb returns, event_dispatch
1. clears busy flag.
2. if there is remove request, call fdset_del, which acquires mutex, checks busy flag, and
removes connfd from fdset.
* rte_vhost_driver_unregister(not implemented) runs in another thread, acquires the mutex,
calls fdset_del to remove fd(listenerfd) from fdset. Then it could free data context.
The above steps ensures fd data context isn't freed when cb is using.
VM(s) should have been shutdown before rte_vhost_driver_unregister.
Signed-off-by: Huawei Xie <huawei.xie@intel.com>
---
lib/librte_vhost/vhost_user/fd_man.c | 63 +++++++++++++++++++++++++---
lib/librte_vhost/vhost_user/fd_man.h | 5 ++-
lib/librte_vhost/vhost_user/vhost-net-user.c | 34 +++++++++------
3 files changed, 82 insertions(+), 20 deletions(-)
Comments
On 2015/02/12 14:07, Huawei Xie wrote:
> * support calling rte_vhost_driver_register after rte_vhost_driver_session_start
> * add mutext to protect fdset from concurrent access
> * add busy flag in fdentry. this flag is set before cb and cleared after cb is finished.
>
> mutex lock scenario in vhost:
>
> * event_dispatch(in rte_vhost_driver_session_start) runs in a seperate thread, infinitely
> processing vhost messages through cb(callback).
> * event_dispatch acquires the lock, get the cb and its context, mark the busy flag,
> and releases the mutex.
> * vserver_new_vq_conn cb calls fdset_add, which acquires the mutex and add new fd into fdset.
> * vserver_message_handler cb frees data context, marks remove flag to request to delete
> connfd(connection fd) from fdset.
> * after cb returns, event_dispatch
> 1. clears busy flag.
> 2. if there is remove request, call fdset_del, which acquires mutex, checks busy flag, and
> removes connfd from fdset.
> * rte_vhost_driver_unregister(not implemented) runs in another thread, acquires the mutex,
> calls fdset_del to remove fd(listenerfd) from fdset. Then it could free data context.
>
> The above steps ensures fd data context isn't freed when cb is using.
>
> VM(s) should have been shutdown before rte_vhost_driver_unregister.
>
> Signed-off-by: Huawei Xie <huawei.xie@intel.com>
> ---
> lib/librte_vhost/vhost_user/fd_man.c | 63 +++++++++++++++++++++++++---
> lib/librte_vhost/vhost_user/fd_man.h | 5 ++-
> lib/librte_vhost/vhost_user/vhost-net-user.c | 34 +++++++++------
> 3 files changed, 82 insertions(+), 20 deletions(-)
>
> diff --git a/lib/librte_vhost/vhost_user/fd_man.c b/lib/librte_vhost/vhost_user/fd_man.c
> index 929fbc3..63ac4df 100644
> --- a/lib/librte_vhost/vhost_user/fd_man.c
> +++ b/lib/librte_vhost/vhost_user/fd_man.c
> @@ -40,6 +40,7 @@
> #include <sys/types.h>
> #include <unistd.h>
>
> +#include <rte_common.h>
> #include <rte_log.h>
>
> #include "fd_man.h"
> @@ -145,6 +146,8 @@ fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
> if (pfdset == NULL || fd == -1)
> return -1;
>
> + pthread_mutex_lock(&pfdset->fd_mutex);
> +
> /* Find a free slot in the list. */
> i = fdset_find_free_slot(pfdset);
> if (i == -1)
> @@ -153,6 +156,8 @@ fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
> fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
> pfdset->num++;
>
> + pthread_mutex_unlock(&pfdset->fd_mutex);
> +
> return 0;
> }
>
> @@ -164,17 +169,36 @@ fdset_del(struct fdset *pfdset, int fd)
> {
> int i;
>
> + if (pfdset == NULL || fd == -1)
> + return;
> +
> +again:
> + pthread_mutex_lock(&pfdset->fd_mutex);
> +
> i = fdset_find_fd(pfdset, fd);
> if (i != -1 && fd != -1) {
> + /* busy indicates r/wcb is executing! */
> + if (pfdset->fd[i].busy == 1) {
> + pthread_mutex_unlock(&pfdset->fd_mutex);
> + goto again;
> + }
> +
> pfdset->fd[i].fd = -1;
> pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL;
> pfdset->num--;
> }
> +
> + pthread_mutex_unlock(&pfdset->fd_mutex);
> }
>
> /**
> * This functions runs in infinite blocking loop until there is no fd in
> * pfdset. It calls corresponding r/w handler if there is event on the fd.
> + *
> + * Before the callback is called, we set the flag to busy status; If other
> + * thread(now rte_vhost_driver_unregister) calls fdset_del concurrently, it
> + * will wait until the flag is reset to zero(which indicates the callback is
> + * finished), then it could free the context after fdset_del.
> */
> void
> fdset_event_dispatch(struct fdset *pfdset)
> @@ -183,6 +207,10 @@ fdset_event_dispatch(struct fdset *pfdset)
> int i, maxfds;
> struct fdentry *pfdentry;
> int num = MAX_FDS;
> + fd_cb rcb, wcb;
> + void *dat;
> + int fd;
> + int remove1, remove2;
>
> if (pfdset == NULL)
> return;
> @@ -190,18 +218,41 @@ fdset_event_dispatch(struct fdset *pfdset)
> while (1) {
> FD_ZERO(&rfds);
> FD_ZERO(&wfds);
> + pthread_mutex_lock(&pfdset->fd_mutex);
> +
> maxfds = fdset_fill(&rfds, &wfds, pfdset);
> - if (maxfds == -1)
> - return;
> + if (maxfds == -1) {
> + pthread_mutex_unlock(&pfdset->fd_mutex);
> + sleep(1);
> + continue;
> + }
> +
> + pthread_mutex_unlock(&pfdset->fd_mutex);
>
> select(maxfds + 1, &rfds, &wfds, NULL, NULL);
>
> for (i = 0; i < num; i++) {
> + remove1 = remove2 = 0;
> + pthread_mutex_lock(&pfdset->fd_mutex);
> pfdentry = &pfdset->fd[i];
> - if (pfdentry->fd >= 0 && FD_ISSET(pfdentry->fd, &rfds) && pfdentry->rcb)
> - pfdentry->rcb(pfdentry->fd, pfdentry->dat);
> - if (pfdentry->fd >= 0 && FD_ISSET(pfdentry->fd, &wfds) && pfdentry->wcb)
> - pfdentry->wcb(pfdentry->fd, pfdentry->dat);
> + fd = pfdentry->fd;
> + rcb = pfdentry->rcb;
> + wcb = pfdentry->wcb;
> + dat = pfdentry->dat;
> + pfdentry->busy = 1;
> + pthread_mutex_unlock(&pfdset->fd_mutex);
> + if (fd >= 0 && FD_ISSET(fd, &rfds) && rcb)
> + rcb(fd, dat, &remove1);
> + if (fd >= 0 && FD_ISSET(fd, &wfds) && wcb)
> + wcb(fd, dat, &remove2);
Hi Xie,
Should we add pthread_mutex_lock() before accessing pfdentry->busy?
> + pfdentry->busy = 0;
Should we add pthread_mutex_unlock()?
Thanks,
Tetsuya
> + /*
> + * fdset_del needs to check busy flag.
> + * We don't allow fdset_del to be called in callback
> + * directly.
> + */
> + if (remove1 || remove2)
> + fdset_del(pfdset, fd);
> }
> }
> }
> diff --git a/lib/librte_vhost/vhost_user/fd_man.h b/lib/librte_vhost/vhost_user/fd_man.h
> index 26b4619..74ecde2 100644
> --- a/lib/librte_vhost/vhost_user/fd_man.h
> +++ b/lib/librte_vhost/vhost_user/fd_man.h
> @@ -34,20 +34,23 @@
> #ifndef _FD_MAN_H_
> #define _FD_MAN_H_
> #include <stdint.h>
> +#include <pthread.h>
>
> #define MAX_FDS 1024
>
> -typedef void (*fd_cb)(int fd, void *dat);
> +typedef void (*fd_cb)(int fd, void *dat, int *remove);
>
> struct fdentry {
> int fd; /* -1 indicates this entry is empty */
> fd_cb rcb; /* callback when this fd is readable. */
> fd_cb wcb; /* callback when this fd is writeable.*/
> void *dat; /* fd context */
> + int busy; /* whether this entry is being used in cb. */
> };
>
> struct fdset {
> struct fdentry fd[MAX_FDS];
> + pthread_mutex_t fd_mutex;
> int num; /* current fd number of this fdset */
> };
>
> diff --git a/lib/librte_vhost/vhost_user/vhost-net-user.c b/lib/librte_vhost/vhost_user/vhost-net-user.c
> index 634a498..3aa9436 100644
> --- a/lib/librte_vhost/vhost_user/vhost-net-user.c
> +++ b/lib/librte_vhost/vhost_user/vhost-net-user.c
> @@ -41,6 +41,7 @@
> #include <sys/socket.h>
> #include <sys/un.h>
> #include <errno.h>
> +#include <pthread.h>
>
> #include <rte_log.h>
> #include <rte_virtio_net.h>
> @@ -51,8 +52,9 @@
> #include "virtio-net-user.h"
>
> #define MAX_VIRTIO_BACKLOG 128
> -static void vserver_new_vq_conn(int fd, void *data);
> -static void vserver_message_handler(int fd, void *dat);
> +
> +static void vserver_new_vq_conn(int fd, void *data, int *remove);
> +static void vserver_message_handler(int fd, void *dat, int *remove);
> struct vhost_net_device_ops const *ops;
>
> struct connfd_ctx {
> @@ -61,10 +63,18 @@ struct connfd_ctx {
> };
>
> #define MAX_VHOST_SERVER 1024
> -static struct {
> +struct _vhost_server {
> struct vhost_server *server[MAX_VHOST_SERVER];
> - struct fdset fdset; /**< The fd list this vhost server manages. */
> -} g_vhost_server;
> + struct fdset fdset;
> +};
> +
> +static struct _vhost_server g_vhost_server = {
> + .fdset = {
> + .fd = { [0 ... MAX_FDS - 1] = {-1, NULL, NULL, NULL, 0} },
> + .fd_mutex = PTHREAD_MUTEX_INITIALIZER,
> + .num = 0
> + },
> +};
>
> static int vserver_idx;
>
> @@ -261,7 +271,7 @@ send_vhost_message(int sockfd, struct VhostUserMsg *msg)
>
> /* call back when there is new virtio connection. */
> static void
> -vserver_new_vq_conn(int fd, void *dat)
> +vserver_new_vq_conn(int fd, void *dat, __rte_unused int *remove)
> {
> struct vhost_server *vserver = (struct vhost_server *)dat;
> int conn_fd;
> @@ -304,7 +314,7 @@ vserver_new_vq_conn(int fd, void *dat)
>
> /* callback when there is message on the connfd */
> static void
> -vserver_message_handler(int connfd, void *dat)
> +vserver_message_handler(int connfd, void *dat, int *remove)
> {
> struct vhost_device_ctx ctx;
> struct connfd_ctx *cfd_ctx = (struct connfd_ctx *)dat;
> @@ -319,7 +329,7 @@ vserver_message_handler(int connfd, void *dat)
> "vhost read message failed\n");
>
> close(connfd);
> - fdset_del(&g_vhost_server.fdset, connfd);
> + *remove = 1;
> free(cfd_ctx);
> user_destroy_device(ctx);
> ops->destroy_device(ctx);
> @@ -330,7 +340,7 @@ vserver_message_handler(int connfd, void *dat)
> "vhost peer closed\n");
>
> close(connfd);
> - fdset_del(&g_vhost_server.fdset, connfd);
> + *remove = 1;
> free(cfd_ctx);
> user_destroy_device(ctx);
> ops->destroy_device(ctx);
> @@ -342,7 +352,7 @@ vserver_message_handler(int connfd, void *dat)
> "vhost read incorrect message\n");
>
> close(connfd);
> - fdset_del(&g_vhost_server.fdset, connfd);
> + *remove = 1;
> free(cfd_ctx);
> user_destroy_device(ctx);
> ops->destroy_device(ctx);
> @@ -426,10 +436,8 @@ rte_vhost_driver_register(const char *path)
> {
> struct vhost_server *vserver;
>
> - if (vserver_idx == 0) {
> - fdset_init(&g_vhost_server.fdset);
> + if (vserver_idx == 0)
> ops = get_virtio_net_callbacks();
> - }
> if (vserver_idx == MAX_VHOST_SERVER)
> return -1;
>
> -----Original Message-----
> From: dev [mailto:dev-bounces@dpdk.org] On Behalf Of Huawei Xie
> Sent: Thursday, February 12, 2015 5:07 AM
> To: dev@dpdk.org
> Subject: [dpdk-dev] [PATCH v2 11/11] lib/librte_vhost: support dynamically registering vhost server
>
> * support calling rte_vhost_driver_register after rte_vhost_driver_session_start
> * add mutext to protect fdset from concurrent access
> * add busy flag in fdentry. this flag is set before cb and cleared after cb is finished.
>
> mutex lock scenario in vhost:
>
> * event_dispatch(in rte_vhost_driver_session_start) runs in a seperate thread, infinitely
> processing vhost messages through cb(callback).
> * event_dispatch acquires the lock, get the cb and its context, mark the busy flag,
> and releases the mutex.
> * vserver_new_vq_conn cb calls fdset_add, which acquires the mutex and add new fd into fdset.
> * vserver_message_handler cb frees data context, marks remove flag to request to delete
> connfd(connection fd) from fdset.
> * after cb returns, event_dispatch
> 1. clears busy flag.
> 2. if there is remove request, call fdset_del, which acquires mutex, checks busy flag, and
> removes connfd from fdset.
> * rte_vhost_driver_unregister(not implemented) runs in another thread, acquires the mutex,
> calls fdset_del to remove fd(listenerfd) from fdset. Then it could free data context.
>
> The above steps ensures fd data context isn't freed when cb is using.
>
> VM(s) should have been shutdown before rte_vhost_driver_unregister.
>
> Signed-off-by: Huawei Xie <huawei.xie@intel.com>
Acked-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
> ---
> lib/librte_vhost/vhost_user/fd_man.c | 63 +++++++++++++++++++++++++---
> lib/librte_vhost/vhost_user/fd_man.h | 5 ++-
> lib/librte_vhost/vhost_user/vhost-net-user.c | 34 +++++++++------
> 3 files changed, 82 insertions(+), 20 deletions(-)
>
@@ -40,6 +40,7 @@
#include <sys/types.h>
#include <unistd.h>
+#include <rte_common.h>
#include <rte_log.h>
#include "fd_man.h"
@@ -145,6 +146,8 @@ fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
if (pfdset == NULL || fd == -1)
return -1;
+ pthread_mutex_lock(&pfdset->fd_mutex);
+
/* Find a free slot in the list. */
i = fdset_find_free_slot(pfdset);
if (i == -1)
@@ -153,6 +156,8 @@ fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
pfdset->num++;
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+
return 0;
}
@@ -164,17 +169,36 @@ fdset_del(struct fdset *pfdset, int fd)
{
int i;
+ if (pfdset == NULL || fd == -1)
+ return;
+
+again:
+ pthread_mutex_lock(&pfdset->fd_mutex);
+
i = fdset_find_fd(pfdset, fd);
if (i != -1 && fd != -1) {
+ /* busy indicates r/wcb is executing! */
+ if (pfdset->fd[i].busy == 1) {
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+ goto again;
+ }
+
pfdset->fd[i].fd = -1;
pfdset->fd[i].rcb = pfdset->fd[i].wcb = NULL;
pfdset->num--;
}
+
+ pthread_mutex_unlock(&pfdset->fd_mutex);
}
/**
* This functions runs in infinite blocking loop until there is no fd in
* pfdset. It calls corresponding r/w handler if there is event on the fd.
+ *
+ * Before the callback is called, we set the flag to busy status; If other
+ * thread(now rte_vhost_driver_unregister) calls fdset_del concurrently, it
+ * will wait until the flag is reset to zero(which indicates the callback is
+ * finished), then it could free the context after fdset_del.
*/
void
fdset_event_dispatch(struct fdset *pfdset)
@@ -183,6 +207,10 @@ fdset_event_dispatch(struct fdset *pfdset)
int i, maxfds;
struct fdentry *pfdentry;
int num = MAX_FDS;
+ fd_cb rcb, wcb;
+ void *dat;
+ int fd;
+ int remove1, remove2;
if (pfdset == NULL)
return;
@@ -190,18 +218,41 @@ fdset_event_dispatch(struct fdset *pfdset)
while (1) {
FD_ZERO(&rfds);
FD_ZERO(&wfds);
+ pthread_mutex_lock(&pfdset->fd_mutex);
+
maxfds = fdset_fill(&rfds, &wfds, pfdset);
- if (maxfds == -1)
- return;
+ if (maxfds == -1) {
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+ sleep(1);
+ continue;
+ }
+
+ pthread_mutex_unlock(&pfdset->fd_mutex);
select(maxfds + 1, &rfds, &wfds, NULL, NULL);
for (i = 0; i < num; i++) {
+ remove1 = remove2 = 0;
+ pthread_mutex_lock(&pfdset->fd_mutex);
pfdentry = &pfdset->fd[i];
- if (pfdentry->fd >= 0 && FD_ISSET(pfdentry->fd, &rfds) && pfdentry->rcb)
- pfdentry->rcb(pfdentry->fd, pfdentry->dat);
- if (pfdentry->fd >= 0 && FD_ISSET(pfdentry->fd, &wfds) && pfdentry->wcb)
- pfdentry->wcb(pfdentry->fd, pfdentry->dat);
+ fd = pfdentry->fd;
+ rcb = pfdentry->rcb;
+ wcb = pfdentry->wcb;
+ dat = pfdentry->dat;
+ pfdentry->busy = 1;
+ pthread_mutex_unlock(&pfdset->fd_mutex);
+ if (fd >= 0 && FD_ISSET(fd, &rfds) && rcb)
+ rcb(fd, dat, &remove1);
+ if (fd >= 0 && FD_ISSET(fd, &wfds) && wcb)
+ wcb(fd, dat, &remove2);
+ pfdentry->busy = 0;
+ /*
+ * fdset_del needs to check busy flag.
+ * We don't allow fdset_del to be called in callback
+ * directly.
+ */
+ if (remove1 || remove2)
+ fdset_del(pfdset, fd);
}
}
}
@@ -34,20 +34,23 @@
#ifndef _FD_MAN_H_
#define _FD_MAN_H_
#include <stdint.h>
+#include <pthread.h>
#define MAX_FDS 1024
-typedef void (*fd_cb)(int fd, void *dat);
+typedef void (*fd_cb)(int fd, void *dat, int *remove);
struct fdentry {
int fd; /* -1 indicates this entry is empty */
fd_cb rcb; /* callback when this fd is readable. */
fd_cb wcb; /* callback when this fd is writeable.*/
void *dat; /* fd context */
+ int busy; /* whether this entry is being used in cb. */
};
struct fdset {
struct fdentry fd[MAX_FDS];
+ pthread_mutex_t fd_mutex;
int num; /* current fd number of this fdset */
};
@@ -41,6 +41,7 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <errno.h>
+#include <pthread.h>
#include <rte_log.h>
#include <rte_virtio_net.h>
@@ -51,8 +52,9 @@
#include "virtio-net-user.h"
#define MAX_VIRTIO_BACKLOG 128
-static void vserver_new_vq_conn(int fd, void *data);
-static void vserver_message_handler(int fd, void *dat);
+
+static void vserver_new_vq_conn(int fd, void *data, int *remove);
+static void vserver_message_handler(int fd, void *dat, int *remove);
struct vhost_net_device_ops const *ops;
struct connfd_ctx {
@@ -61,10 +63,18 @@ struct connfd_ctx {
};
#define MAX_VHOST_SERVER 1024
-static struct {
+struct _vhost_server {
struct vhost_server *server[MAX_VHOST_SERVER];
- struct fdset fdset; /**< The fd list this vhost server manages. */
-} g_vhost_server;
+ struct fdset fdset;
+};
+
+static struct _vhost_server g_vhost_server = {
+ .fdset = {
+ .fd = { [0 ... MAX_FDS - 1] = {-1, NULL, NULL, NULL, 0} },
+ .fd_mutex = PTHREAD_MUTEX_INITIALIZER,
+ .num = 0
+ },
+};
static int vserver_idx;
@@ -261,7 +271,7 @@ send_vhost_message(int sockfd, struct VhostUserMsg *msg)
/* call back when there is new virtio connection. */
static void
-vserver_new_vq_conn(int fd, void *dat)
+vserver_new_vq_conn(int fd, void *dat, __rte_unused int *remove)
{
struct vhost_server *vserver = (struct vhost_server *)dat;
int conn_fd;
@@ -304,7 +314,7 @@ vserver_new_vq_conn(int fd, void *dat)
/* callback when there is message on the connfd */
static void
-vserver_message_handler(int connfd, void *dat)
+vserver_message_handler(int connfd, void *dat, int *remove)
{
struct vhost_device_ctx ctx;
struct connfd_ctx *cfd_ctx = (struct connfd_ctx *)dat;
@@ -319,7 +329,7 @@ vserver_message_handler(int connfd, void *dat)
"vhost read message failed\n");
close(connfd);
- fdset_del(&g_vhost_server.fdset, connfd);
+ *remove = 1;
free(cfd_ctx);
user_destroy_device(ctx);
ops->destroy_device(ctx);
@@ -330,7 +340,7 @@ vserver_message_handler(int connfd, void *dat)
"vhost peer closed\n");
close(connfd);
- fdset_del(&g_vhost_server.fdset, connfd);
+ *remove = 1;
free(cfd_ctx);
user_destroy_device(ctx);
ops->destroy_device(ctx);
@@ -342,7 +352,7 @@ vserver_message_handler(int connfd, void *dat)
"vhost read incorrect message\n");
close(connfd);
- fdset_del(&g_vhost_server.fdset, connfd);
+ *remove = 1;
free(cfd_ctx);
user_destroy_device(ctx);
ops->destroy_device(ctx);
@@ -426,10 +436,8 @@ rte_vhost_driver_register(const char *path)
{
struct vhost_server *vserver;
- if (vserver_idx == 0) {
- fdset_init(&g_vhost_server.fdset);
+ if (vserver_idx == 0)
ops = get_virtio_net_callbacks();
- }
if (vserver_idx == MAX_VHOST_SERVER)
return -1;