@@ -3,12 +3,16 @@
*/
#include <errno.h>
+#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <rte_common.h>
#include <rte_log.h>
+#include <rte_malloc.h>
+#include <rte_string_fns.h>
+#include <rte_thread.h>
#include "fd_man.h"
@@ -19,6 +23,79 @@ RTE_LOG_REGISTER_SUFFIX(vhost_fdset_logtype, fdset, INFO);
#define FDPOLLERR (POLLERR | POLLHUP | POLLNVAL)
+struct fdentry {
+ int fd; /* -1 indicates this entry is empty */
+ fd_cb rcb; /* callback when this fd is readable. */
+ fd_cb wcb; /* callback when this fd is writeable.*/
+ void *dat; /* fd context */
+ int busy; /* whether this entry is being used in cb. */
+};
+
+struct fdset {
+ char name[RTE_THREAD_NAME_SIZE];
+ struct pollfd rwfds[MAX_FDS];
+ struct fdentry fd[MAX_FDS];
+ rte_thread_t tid;
+ pthread_mutex_t fd_mutex;
+ pthread_mutex_t fd_polling_mutex;
+ int num; /* current fd number of this fdset */
+
+ union pipefds {
+ struct {
+ int pipefd[2];
+ };
+ struct {
+ int readfd;
+ int writefd;
+ };
+ } u;
+
+ pthread_mutex_t sync_mutex;
+ pthread_cond_t sync_cond;
+ bool sync;
+ bool destroy;
+};
+
+static int fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat);
+static uint32_t fdset_event_dispatch(void *arg);
+
+#define MAX_FDSETS 8
+
+static struct fdset *fdsets[MAX_FDSETS];
+static pthread_mutex_t fdsets_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+static struct fdset *
+fdset_lookup(const char *name)
+{
+ int i;
+
+ for (i = 0; i < MAX_FDSETS; i++) {
+ struct fdset *fdset = fdsets[i];
+ if (fdset == NULL)
+ continue;
+
+ if (!strncmp(fdset->name, name, RTE_THREAD_NAME_SIZE))
+ return fdset;
+ }
+
+ return NULL;
+}
+
+static int
+fdset_insert(struct fdset *fdset)
+{
+ int i;
+
+ for (i = 0; i < MAX_FDSETS; i++) {
+ if (fdsets[i] == NULL) {
+ fdsets[i] = fdset;
+ return 0;
+ }
+ }
+
+ return -1;
+}
+
static void
fdset_pipe_read_cb(int readfd, void *dat,
int *remove __rte_unused)
@@ -63,7 +140,7 @@ fdset_pipe_init(struct fdset *fdset)
return -1;
}
- ret = fdset_add(fdset, fdset->u.readfd,
+ ret = fdset_add_no_sync(fdset, fdset->u.readfd,
fdset_pipe_read_cb, NULL, fdset);
if (ret < 0) {
VHOST_FDMAN_LOG(ERR,
@@ -179,34 +256,77 @@ fdset_add_fd(struct fdset *pfdset, int idx, int fd,
pfd->revents = 0;
}
-void
-fdset_uninit(struct fdset *pfdset)
-{
- fdset_pipe_uninit(pfdset);
-}
-
-int
-fdset_init(struct fdset *pfdset)
+struct fdset *
+fdset_init(const char *name)
{
+ struct fdset *fdset;
+ uint32_t val;
int i;
- pthread_mutex_init(&pfdset->fd_mutex, NULL);
- pthread_mutex_init(&pfdset->fd_polling_mutex, NULL);
+ pthread_mutex_lock(&fdsets_mutex);
+ fdset = fdset_lookup(name);
+ if (fdset) {
+ pthread_mutex_unlock(&fdsets_mutex);
+ return fdset;
+ }
+
+ fdset = rte_zmalloc(NULL, sizeof(*fdset), 0);
+ if (!fdset) {
+ VHOST_FDMAN_LOG(ERR, "Failed to alloc fdset %s", name);
+ goto err_unlock;
+ }
+
+ rte_strscpy(fdset->name, name, RTE_THREAD_NAME_SIZE);
+
+ pthread_mutex_init(&fdset->fd_mutex, NULL);
+ pthread_mutex_init(&fdset->fd_polling_mutex, NULL);
for (i = 0; i < MAX_FDS; i++) {
- pfdset->fd[i].fd = -1;
- pfdset->fd[i].dat = NULL;
+ fdset->fd[i].fd = -1;
+ fdset->fd[i].dat = NULL;
+ }
+ fdset->num = 0;
+
+ if (fdset_pipe_init(fdset)) {
+ VHOST_FDMAN_LOG(ERR, "Failed to init pipe for %s", name);
+ goto err_free;
+ }
+
+ if (rte_thread_create_internal_control(&fdset->tid, fdset->name,
+ fdset_event_dispatch, fdset)) {
+ VHOST_FDMAN_LOG(ERR, "Failed to create %s event dispatch thread",
+ fdset->name);
+ goto err_pipe;
}
- pfdset->num = 0;
- return fdset_pipe_init(pfdset);
+ if (fdset_insert(fdset)) {
+ VHOST_FDMAN_LOG(ERR, "Failed to insert fdset %s", name);
+ goto err_thread;
+ }
+
+ pthread_mutex_unlock(&fdsets_mutex);
+
+ return fdset;
+
+err_thread:
+ fdset->destroy = true;
+ fdset_sync(fdset);
+ rte_thread_join(fdset->tid, &val);
+err_pipe:
+ fdset_pipe_uninit(fdset);
+err_free:
+ rte_free(fdset);
+err_unlock:
+ pthread_mutex_unlock(&fdsets_mutex);
+
+ return NULL;
}
/**
* Register the fd in the fdset with read/write handler and context.
*/
-int
-fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
+static int
+fdset_add_no_sync(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
{
int i;
@@ -229,6 +349,18 @@ fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
fdset_add_fd(pfdset, i, fd, rcb, wcb, dat);
pthread_mutex_unlock(&pfdset->fd_mutex);
+ return 0;
+}
+
+int
+fdset_add(struct fdset *pfdset, int fd, fd_cb rcb, fd_cb wcb, void *dat)
+{
+ int ret;
+
+ ret = fdset_add_no_sync(pfdset, fd, rcb, wcb, dat);
+ if (ret < 0)
+ return ret;
+
fdset_sync(pfdset);
return 0;
@@ -312,7 +444,7 @@ fdset_try_del(struct fdset *pfdset, int fd)
* will wait until the flag is reset to zero(which indicates the callback is
* finished), then it could free the context after fdset_del.
*/
-uint32_t
+static uint32_t
fdset_event_dispatch(void *arg)
{
int i;
@@ -401,6 +533,9 @@ fdset_event_dispatch(void *arg)
if (need_shrink)
fdset_shrink(pfdset);
+
+ if (pfdset->destroy)
+ break;
}
return 0;
@@ -8,50 +8,19 @@
#include <poll.h>
#include <stdbool.h>
+struct fdset;
+
#define MAX_FDS 1024
typedef void (*fd_cb)(int fd, void *dat, int *remove);
-struct fdentry {
- int fd; /* -1 indicates this entry is empty */
- fd_cb rcb; /* callback when this fd is readable. */
- fd_cb wcb; /* callback when this fd is writeable.*/
- void *dat; /* fd context */
- int busy; /* whether this entry is being used in cb. */
-};
-
-struct fdset {
- struct pollfd rwfds[MAX_FDS];
- struct fdentry fd[MAX_FDS];
- pthread_mutex_t fd_mutex;
- pthread_mutex_t fd_polling_mutex;
- int num; /* current fd number of this fdset */
-
- union pipefds {
- struct {
- int pipefd[2];
- };
- struct {
- int readfd;
- int writefd;
- };
- } u;
-
- pthread_mutex_t sync_mutex;
- pthread_cond_t sync_cond;
- bool sync;
-};
-
-void fdset_uninit(struct fdset *pfdset);
-
-int fdset_init(struct fdset *pfdset);
+struct fdset *fdset_init(const char *name);
int fdset_add(struct fdset *pfdset, int fd,
fd_cb rcb, fd_cb wcb, void *dat);
void *fdset_del(struct fdset *pfdset, int fd);
-int fdset_try_del(struct fdset *pfdset, int fd);
-uint32_t fdset_event_dispatch(void *arg);
+int fdset_try_del(struct fdset *pfdset, int fd);
#endif
@@ -77,7 +77,7 @@ struct vhost_user_connection {
#define MAX_VHOST_SOCKET 1024
struct vhost_user {
struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET];
- struct fdset fdset;
+ struct fdset *fdset;
int vsocket_cnt;
pthread_mutex_t mutex;
};
@@ -262,7 +262,7 @@ vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket)
conn->connfd = fd;
conn->vsocket = vsocket;
conn->vid = vid;
- ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb,
+ ret = fdset_add(vhost_user.fdset, fd, vhost_user_read_cb,
NULL, conn);
if (ret < 0) {
VHOST_CONFIG_LOG(vsocket->path, ERR,
@@ -395,7 +395,7 @@ vhost_user_start_server(struct vhost_user_socket *vsocket)
if (ret < 0)
goto err;
- ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection,
+ ret = fdset_add(vhost_user.fdset, fd, vhost_user_server_new_connection,
NULL, vsocket);
if (ret < 0) {
VHOST_CONFIG_LOG(path, ERR, "failed to add listen fd %d to vhost server fdset",
@@ -1083,7 +1083,7 @@ rte_vhost_driver_unregister(const char *path)
* mutex lock, and try again since the r/wcb
* may use the mutex lock.
*/
- if (fdset_try_del(&vhost_user.fdset, vsocket->socket_fd) == -1) {
+ if (fdset_try_del(vhost_user.fdset, vsocket->socket_fd) == -1) {
pthread_mutex_unlock(&vhost_user.mutex);
goto again;
}
@@ -1103,7 +1103,7 @@ rte_vhost_driver_unregister(const char *path)
* try again since the r/wcb may use the
* conn_mutex and mutex locks.
*/
- if (fdset_try_del(&vhost_user.fdset,
+ if (fdset_try_del(vhost_user.fdset,
conn->connfd) == -1) {
pthread_mutex_unlock(&vsocket->conn_mutex);
pthread_mutex_unlock(&vhost_user.mutex);
@@ -1171,7 +1171,6 @@ int
rte_vhost_driver_start(const char *path)
{
struct vhost_user_socket *vsocket;
- static rte_thread_t fdset_tid;
pthread_mutex_lock(&vhost_user.mutex);
vsocket = find_vhost_user_socket(path);
@@ -1183,19 +1182,12 @@ rte_vhost_driver_start(const char *path)
if (vsocket->is_vduse)
return vduse_device_create(path, vsocket->net_compliant_ol_flags);
- if (fdset_tid.opaque_id == 0) {
- if (fdset_init(&vhost_user.fdset) < 0) {
+ if (vhost_user.fdset == NULL) {
+ vhost_user.fdset = fdset_init("vhost-evt");
+ if (vhost_user.fdset == NULL) {
VHOST_CONFIG_LOG(path, ERR, "failed to init Vhost-user fdset");
return -1;
}
-
- int ret = rte_thread_create_internal_control(&fdset_tid,
- "vhost-evt", fdset_event_dispatch, &vhost_user.fdset);
- if (ret != 0) {
- VHOST_CONFIG_LOG(path, ERR, "failed to create fdset handling thread");
- fdset_uninit(&vhost_user.fdset);
- return -1;
- }
}
if (vsocket->is_server)
@@ -28,13 +28,11 @@
#define VDUSE_CTRL_PATH "/dev/vduse/control"
struct vduse {
- struct fdset fdset;
+ struct fdset *fdset;
};
static struct vduse vduse;
-static bool vduse_events_thread;
-
static const char * const vduse_reqs_str[] = {
"VDUSE_GET_VQ_STATE",
"VDUSE_SET_STATUS",
@@ -215,7 +213,7 @@ vduse_vring_setup(struct virtio_net *dev, unsigned int index)
}
if (vq == dev->cvq) {
- ret = fdset_add(&vduse.fdset, vq->kickfd, vduse_control_queue_event, NULL, dev);
+ ret = fdset_add(vduse.fdset, vq->kickfd, vduse_control_queue_event, NULL, dev);
if (ret) {
VHOST_CONFIG_LOG(dev->ifname, ERR,
"Failed to setup kickfd handler for VQ %u: %s",
@@ -238,7 +236,7 @@ vduse_vring_cleanup(struct virtio_net *dev, unsigned int index)
int ret;
if (vq == dev->cvq && vq->kickfd >= 0)
- fdset_del(&vduse.fdset, vq->kickfd);
+ fdset_del(vduse.fdset, vq->kickfd);
vq_efd.index = index;
vq_efd.fd = VDUSE_EVENTFD_DEASSIGN;
@@ -413,7 +411,6 @@ int
vduse_device_create(const char *path, bool compliant_ol_flags)
{
int control_fd, dev_fd, vid, ret;
- rte_thread_t fdset_tid;
uint32_t i, max_queue_pairs, total_queues;
struct virtio_net *dev;
struct virtio_net_config vnet_config = {{ 0 }};
@@ -422,22 +419,12 @@ vduse_device_create(const char *path, bool compliant_ol_flags)
struct vduse_dev_config *dev_config = NULL;
const char *name = path + strlen("/dev/vduse/");
- /* If first device, create events dispatcher thread */
- if (vduse_events_thread == false) {
- if (fdset_init(&vduse.fdset) < 0) {
+ if (vduse.fdset == NULL) {
+ vduse.fdset = fdset_init("vduse-evt");
+ if (vduse.fdset == NULL) {
VHOST_CONFIG_LOG(path, ERR, "failed to init VDUSE fdset");
return -1;
}
-
- ret = rte_thread_create_internal_control(&fdset_tid, "vduse-evt",
- fdset_event_dispatch, &vduse.fdset);
- if (ret != 0) {
- VHOST_CONFIG_LOG(path, ERR, "failed to create vduse fdset handling thread");
- fdset_uninit(&vduse.fdset);
- return -1;
- }
-
- vduse_events_thread = true;
}
control_fd = open(VDUSE_CTRL_PATH, O_RDWR);
@@ -555,7 +542,7 @@ vduse_device_create(const char *path, bool compliant_ol_flags)
dev->cvq = dev->virtqueue[max_queue_pairs * 2];
- ret = fdset_add(&vduse.fdset, dev->vduse_dev_fd, vduse_events_handler, NULL, dev);
+ ret = fdset_add(vduse.fdset, dev->vduse_dev_fd, vduse_events_handler, NULL, dev);
if (ret) {
VHOST_CONFIG_LOG(name, ERR, "Failed to add fd %d to vduse fdset",
dev->vduse_dev_fd);
@@ -602,7 +589,7 @@ vduse_device_destroy(const char *path)
vduse_device_stop(dev);
- fdset_del(&vduse.fdset, dev->vduse_dev_fd);
+ fdset_del(vduse.fdset, dev->vduse_dev_fd);
if (dev->vduse_dev_fd >= 0) {
close(dev->vduse_dev_fd);