From patchwork Wed Jun 19 15:14:27 2019 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Nikos Dragazis X-Patchwork-Id: 54956 X-Patchwork-Delegate: maxime.coquelin@redhat.com Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id 33CF51C394; Wed, 19 Jun 2019 17:15:42 +0200 (CEST) Received: from mx0.arrikto.com (mx0.arrikto.com [212.71.252.59]) by dpdk.org (Postfix) with ESMTP id 86CA21C386 for ; Wed, 19 Jun 2019 17:15:37 +0200 (CEST) Received: from troi.prod.arr (mail.arr [10.99.0.5]) by mx0.arrikto.com (Postfix) with ESMTP id 12940182006; Wed, 19 Jun 2019 18:15:37 +0300 (EEST) Received: from localhost.localdomain (unknown [10.89.50.133]) by troi.prod.arr (Postfix) with ESMTPSA id 8CB73394; Wed, 19 Jun 2019 18:15:35 +0300 (EEST) From: Nikos Dragazis To: dev@dpdk.org Cc: Maxime Coquelin , Tiwei Bie , Zhihong Wang , Stefan Hajnoczi , Wei Wang , Stojaczyk Dariusz , Vangelis Koukis Date: Wed, 19 Jun 2019 18:14:27 +0300 Message-Id: <1560957293-17294-3-git-send-email-ndragazis@arrikto.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1560957293-17294-1-git-send-email-ndragazis@arrikto.com> References: <1560957293-17294-1-git-send-email-ndragazis@arrikto.com> Subject: [dpdk-dev] [PATCH 02/28] vhost: move socket management code X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" The socket.c file serves two purposes: 1. librte_vhost public API entry points, e.g. rte_vhost_driver_register(). 2. AF_UNIX socket management. Move AF_UNIX socket code into trans_af_unix.c so that socket.c only handles the librte_vhost public API entry points. This will make it possible to support other transports besides AF_UNIX. This patch is a preparatory step that simply moves code from socket.c to trans_af_unix.c unmodified, besides dropping 'static' qualifiers where necessary because socket.c now calls into trans_af_unix.c. A lot of socket.c state is exposed in vhost.h but this is a temporary measure and will be cleaned up in later patches. By simply moving code unmodified in this patch it will be easier to review the actual refactoring that follows. Signed-off-by: Nikos Dragazis Signed-off-by: Stefan Hajnoczi --- lib/librte_vhost/socket.c | 549 +-------------------------------------- lib/librte_vhost/trans_af_unix.c | 485 ++++++++++++++++++++++++++++++++++ lib/librte_vhost/vhost.h | 76 ++++++ 3 files changed, 562 insertions(+), 548 deletions(-) diff --git a/lib/librte_vhost/socket.c b/lib/librte_vhost/socket.c index 274988c..a993b67 100644 --- a/lib/librte_vhost/socket.c +++ b/lib/librte_vhost/socket.c @@ -4,16 +4,10 @@ #include #include -#include #include #include #include -#include -#include -#include #include -#include -#include #include #include @@ -22,71 +16,7 @@ #include "vhost.h" #include "vhost_user.h" - -TAILQ_HEAD(vhost_user_connection_list, vhost_user_connection); - -/* - * Every time rte_vhost_driver_register() is invoked, an associated - * vhost_user_socket struct will be created. - */ -struct vhost_user_socket { - struct vhost_user_connection_list conn_list; - pthread_mutex_t conn_mutex; - char *path; - int socket_fd; - struct sockaddr_un un; - bool is_server; - bool reconnect; - bool dequeue_zero_copy; - bool iommu_support; - bool use_builtin_virtio_net; - - /* - * The "supported_features" indicates the feature bits the - * vhost driver supports. The "features" indicates the feature - * bits after the rte_vhost_driver_features_disable/enable(). - * It is also the final feature bits used for vhost-user - * features negotiation. - */ - uint64_t supported_features; - uint64_t features; - - uint64_t protocol_features; - - /* - * Device id to identify a specific backend device. - * It's set to -1 for the default software implementation. - * If valid, one socket can have 1 connection only. - */ - int vdpa_dev_id; - - struct vhost_device_ops const *notify_ops; -}; - -struct vhost_user_connection { - struct vhost_user_socket *vsocket; - int connfd; - int vid; - - TAILQ_ENTRY(vhost_user_connection) next; -}; - -#define MAX_VHOST_SOCKET 1024 -struct vhost_user { - struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET]; - struct fdset fdset; - int vsocket_cnt; - pthread_mutex_t mutex; -}; - -#define MAX_VIRTIO_BACKLOG 128 - -static void vhost_user_server_new_connection(int fd, void *data, int *remove); -static void vhost_user_read_cb(int fd, void *dat, int *remove); -static int create_unix_socket(struct vhost_user_socket *vsocket); -static int vhost_user_start_client(struct vhost_user_socket *vsocket); - -static struct vhost_user vhost_user = { +struct vhost_user vhost_user = { .fdset = { .fd = { [0 ... MAX_FDS - 1] = {-1, NULL, NULL, NULL, 0} }, .fd_mutex = PTHREAD_MUTEX_INITIALIZER, @@ -97,459 +27,6 @@ static struct vhost_user vhost_user = { .mutex = PTHREAD_MUTEX_INITIALIZER, }; -/* - * return bytes# of read on success or negative val on failure. Update fdnum - * with number of fds read. - */ -int -read_fd_message(int sockfd, char *buf, int buflen, int *fds, int max_fds, - int *fd_num) -{ - struct iovec iov; - struct msghdr msgh; - char control[CMSG_SPACE(max_fds * sizeof(int))]; - struct cmsghdr *cmsg; - int got_fds = 0; - int ret; - - *fd_num = 0; - - memset(&msgh, 0, sizeof(msgh)); - iov.iov_base = buf; - iov.iov_len = buflen; - - msgh.msg_iov = &iov; - msgh.msg_iovlen = 1; - msgh.msg_control = control; - msgh.msg_controllen = sizeof(control); - - ret = recvmsg(sockfd, &msgh, 0); - if (ret <= 0) { - RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n"); - return ret; - } - - if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) { - RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n"); - return -1; - } - - for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; - cmsg = CMSG_NXTHDR(&msgh, cmsg)) { - if ((cmsg->cmsg_level == SOL_SOCKET) && - (cmsg->cmsg_type == SCM_RIGHTS)) { - got_fds = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int); - *fd_num = got_fds; - memcpy(fds, CMSG_DATA(cmsg), got_fds * sizeof(int)); - break; - } - } - - /* Clear out unused file descriptors */ - while (got_fds < max_fds) - fds[got_fds++] = -1; - - return ret; -} - -int -send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num) -{ - - struct iovec iov; - struct msghdr msgh; - size_t fdsize = fd_num * sizeof(int); - char control[CMSG_SPACE(fdsize)]; - struct cmsghdr *cmsg; - int ret; - - memset(&msgh, 0, sizeof(msgh)); - iov.iov_base = buf; - iov.iov_len = buflen; - - msgh.msg_iov = &iov; - msgh.msg_iovlen = 1; - - if (fds && fd_num > 0) { - msgh.msg_control = control; - msgh.msg_controllen = sizeof(control); - cmsg = CMSG_FIRSTHDR(&msgh); - if (cmsg == NULL) { - RTE_LOG(ERR, VHOST_CONFIG, "cmsg == NULL\n"); - errno = EINVAL; - return -1; - } - cmsg->cmsg_len = CMSG_LEN(fdsize); - cmsg->cmsg_level = SOL_SOCKET; - cmsg->cmsg_type = SCM_RIGHTS; - memcpy(CMSG_DATA(cmsg), fds, fdsize); - } else { - msgh.msg_control = NULL; - msgh.msg_controllen = 0; - } - - do { - ret = sendmsg(sockfd, &msgh, MSG_NOSIGNAL); - } while (ret < 0 && errno == EINTR); - - if (ret < 0) { - RTE_LOG(ERR, VHOST_CONFIG, "sendmsg error\n"); - return ret; - } - - return ret; -} - -static void -vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket) -{ - int vid; - size_t size; - struct vhost_user_connection *conn; - int ret; - - if (vsocket == NULL) - return; - - conn = malloc(sizeof(*conn)); - if (conn == NULL) { - close(fd); - return; - } - - vid = vhost_new_device(); - if (vid == -1) { - goto err; - } - - size = strnlen(vsocket->path, PATH_MAX); - vhost_set_ifname(vid, vsocket->path, size); - - vhost_set_builtin_virtio_net(vid, vsocket->use_builtin_virtio_net); - - vhost_attach_vdpa_device(vid, vsocket->vdpa_dev_id); - - if (vsocket->dequeue_zero_copy) - vhost_enable_dequeue_zero_copy(vid); - - RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid); - - if (vsocket->notify_ops->new_connection) { - ret = vsocket->notify_ops->new_connection(vid); - if (ret < 0) { - RTE_LOG(ERR, VHOST_CONFIG, - "failed to add vhost user connection with fd %d\n", - fd); - goto err_cleanup; - } - } - - conn->connfd = fd; - conn->vsocket = vsocket; - conn->vid = vid; - ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb, - NULL, conn); - if (ret < 0) { - RTE_LOG(ERR, VHOST_CONFIG, - "failed to add fd %d into vhost server fdset\n", - fd); - - if (vsocket->notify_ops->destroy_connection) - vsocket->notify_ops->destroy_connection(conn->vid); - - goto err_cleanup; - } - - pthread_mutex_lock(&vsocket->conn_mutex); - TAILQ_INSERT_TAIL(&vsocket->conn_list, conn, next); - pthread_mutex_unlock(&vsocket->conn_mutex); - - fdset_pipe_notify(&vhost_user.fdset); - return; - -err_cleanup: - vhost_destroy_device(vid); -err: - free(conn); - close(fd); -} - -/* call back when there is new vhost-user connection from client */ -static void -vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused) -{ - struct vhost_user_socket *vsocket = dat; - - fd = accept(fd, NULL, NULL); - if (fd < 0) - return; - - RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd); - vhost_user_add_connection(fd, vsocket); -} - -static void -vhost_user_read_cb(int connfd, void *dat, int *remove) -{ - struct vhost_user_connection *conn = dat; - struct vhost_user_socket *vsocket = conn->vsocket; - int ret; - - ret = vhost_user_msg_handler(conn->vid, connfd); - if (ret < 0) { - struct virtio_net *dev = get_device(conn->vid); - - close(connfd); - *remove = 1; - - if (dev) - vhost_destroy_device_notify(dev); - - if (vsocket->notify_ops->destroy_connection) - vsocket->notify_ops->destroy_connection(conn->vid); - - vhost_destroy_device(conn->vid); - - pthread_mutex_lock(&vsocket->conn_mutex); - TAILQ_REMOVE(&vsocket->conn_list, conn, next); - pthread_mutex_unlock(&vsocket->conn_mutex); - - free(conn); - - if (vsocket->reconnect) { - create_unix_socket(vsocket); - vhost_user_start_client(vsocket); - } - } -} - -static int -create_unix_socket(struct vhost_user_socket *vsocket) -{ - int fd; - struct sockaddr_un *un = &vsocket->un; - - fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (fd < 0) - return -1; - RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n", - vsocket->is_server ? "server" : "client", fd); - - if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) { - RTE_LOG(ERR, VHOST_CONFIG, - "vhost-user: can't set nonblocking mode for socket, fd: " - "%d (%s)\n", fd, strerror(errno)); - close(fd); - return -1; - } - - memset(un, 0, sizeof(*un)); - un->sun_family = AF_UNIX; - strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path)); - un->sun_path[sizeof(un->sun_path) - 1] = '\0'; - - vsocket->socket_fd = fd; - return 0; -} - -static int -vhost_user_start_server(struct vhost_user_socket *vsocket) -{ - int ret; - int fd = vsocket->socket_fd; - const char *path = vsocket->path; - - /* - * bind () may fail if the socket file with the same name already - * exists. But the library obviously should not delete the file - * provided by the user, since we can not be sure that it is not - * being used by other applications. Moreover, many applications form - * socket names based on user input, which is prone to errors. - * - * The user must ensure that the socket does not exist before - * registering the vhost driver in server mode. - */ - ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un)); - if (ret < 0) { - RTE_LOG(ERR, VHOST_CONFIG, - "failed to bind to %s: %s; remove it and try again\n", - path, strerror(errno)); - goto err; - } - RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path); - - ret = listen(fd, MAX_VIRTIO_BACKLOG); - if (ret < 0) - goto err; - - ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection, - NULL, vsocket); - if (ret < 0) { - RTE_LOG(ERR, VHOST_CONFIG, - "failed to add listen fd %d to vhost server fdset\n", - fd); - goto err; - } - - return 0; - -err: - close(fd); - return -1; -} - -struct vhost_user_reconnect { - struct sockaddr_un un; - int fd; - struct vhost_user_socket *vsocket; - - TAILQ_ENTRY(vhost_user_reconnect) next; -}; - -TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect); -struct vhost_user_reconnect_list { - struct vhost_user_reconnect_tailq_list head; - pthread_mutex_t mutex; -}; - -static struct vhost_user_reconnect_list reconn_list; -static pthread_t reconn_tid; - -static int -vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz) -{ - int ret, flags; - - ret = connect(fd, un, sz); - if (ret < 0 && errno != EISCONN) - return -1; - - flags = fcntl(fd, F_GETFL, 0); - if (flags < 0) { - RTE_LOG(ERR, VHOST_CONFIG, - "can't get flags for connfd %d\n", fd); - return -2; - } - if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) { - RTE_LOG(ERR, VHOST_CONFIG, - "can't disable nonblocking on fd %d\n", fd); - return -2; - } - return 0; -} - -static void * -vhost_user_client_reconnect(void *arg __rte_unused) -{ - int ret; - struct vhost_user_reconnect *reconn, *next; - - while (1) { - pthread_mutex_lock(&reconn_list.mutex); - - /* - * An equal implementation of TAILQ_FOREACH_SAFE, - * which does not exist on all platforms. - */ - for (reconn = TAILQ_FIRST(&reconn_list.head); - reconn != NULL; reconn = next) { - next = TAILQ_NEXT(reconn, next); - - ret = vhost_user_connect_nonblock(reconn->fd, - (struct sockaddr *)&reconn->un, - sizeof(reconn->un)); - if (ret == -2) { - close(reconn->fd); - RTE_LOG(ERR, VHOST_CONFIG, - "reconnection for fd %d failed\n", - reconn->fd); - goto remove_fd; - } - if (ret == -1) - continue; - - RTE_LOG(INFO, VHOST_CONFIG, - "%s: connected\n", reconn->vsocket->path); - vhost_user_add_connection(reconn->fd, reconn->vsocket); -remove_fd: - TAILQ_REMOVE(&reconn_list.head, reconn, next); - free(reconn); - } - - pthread_mutex_unlock(&reconn_list.mutex); - sleep(1); - } - - return NULL; -} - -static int -vhost_user_reconnect_init(void) -{ - int ret; - - ret = pthread_mutex_init(&reconn_list.mutex, NULL); - if (ret < 0) { - RTE_LOG(ERR, VHOST_CONFIG, "failed to initialize mutex"); - return ret; - } - TAILQ_INIT(&reconn_list.head); - - ret = rte_ctrl_thread_create(&reconn_tid, "vhost_reconn", NULL, - vhost_user_client_reconnect, NULL); - if (ret != 0) { - RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread"); - if (pthread_mutex_destroy(&reconn_list.mutex)) { - RTE_LOG(ERR, VHOST_CONFIG, - "failed to destroy reconnect mutex"); - } - } - - return ret; -} - -static int -vhost_user_start_client(struct vhost_user_socket *vsocket) -{ - int ret; - int fd = vsocket->socket_fd; - const char *path = vsocket->path; - struct vhost_user_reconnect *reconn; - - ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un, - sizeof(vsocket->un)); - if (ret == 0) { - vhost_user_add_connection(fd, vsocket); - return 0; - } - - RTE_LOG(WARNING, VHOST_CONFIG, - "failed to connect to %s: %s\n", - path, strerror(errno)); - - if (ret == -2 || !vsocket->reconnect) { - close(fd); - return -1; - } - - RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path); - reconn = malloc(sizeof(*reconn)); - if (reconn == NULL) { - RTE_LOG(ERR, VHOST_CONFIG, - "failed to allocate memory for reconnect\n"); - close(fd); - return -1; - } - reconn->un = vsocket->un; - reconn->fd = fd; - reconn->vsocket = vsocket; - pthread_mutex_lock(&reconn_list.mutex); - TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next); - pthread_mutex_unlock(&reconn_list.mutex); - - return 0; -} - static struct vhost_user_socket * find_vhost_user_socket(const char *path) { @@ -952,30 +429,6 @@ rte_vhost_driver_register(const char *path, uint64_t flags) return ret; } -static bool -vhost_user_remove_reconnect(struct vhost_user_socket *vsocket) -{ - int found = false; - struct vhost_user_reconnect *reconn, *next; - - pthread_mutex_lock(&reconn_list.mutex); - - for (reconn = TAILQ_FIRST(&reconn_list.head); - reconn != NULL; reconn = next) { - next = TAILQ_NEXT(reconn, next); - - if (reconn->vsocket == vsocket) { - TAILQ_REMOVE(&reconn_list.head, reconn, next); - close(reconn->fd); - free(reconn); - found = true; - break; - } - } - pthread_mutex_unlock(&reconn_list.mutex); - return found; -} - /** * Unregister the specified vhost socket */ diff --git a/lib/librte_vhost/trans_af_unix.c b/lib/librte_vhost/trans_af_unix.c index 3f0c308..89a5b7d 100644 --- a/lib/librte_vhost/trans_af_unix.c +++ b/lib/librte_vhost/trans_af_unix.c @@ -4,7 +4,492 @@ * Copyright(c) 2019 Arrikto Inc. */ +#include + +#include + #include "vhost.h" +#include "vhost_user.h" + +#define MAX_VIRTIO_BACKLOG 128 + +static void vhost_user_read_cb(int connfd, void *dat, int *remove); + +/* + * return bytes# of read on success or negative val on failure. Update fdnum + * with number of fds read. + */ +int +read_fd_message(int sockfd, char *buf, int buflen, int *fds, int max_fds, + int *fd_num) +{ + struct iovec iov; + struct msghdr msgh; + char control[CMSG_SPACE(max_fds * sizeof(int))]; + struct cmsghdr *cmsg; + int got_fds = 0; + int ret; + + *fd_num = 0; + + memset(&msgh, 0, sizeof(msgh)); + iov.iov_base = buf; + iov.iov_len = buflen; + + msgh.msg_iov = &iov; + msgh.msg_iovlen = 1; + msgh.msg_control = control; + msgh.msg_controllen = sizeof(control); + + ret = recvmsg(sockfd, &msgh, 0); + if (ret <= 0) { + RTE_LOG(ERR, VHOST_CONFIG, "recvmsg failed\n"); + return ret; + } + + if (msgh.msg_flags & (MSG_TRUNC | MSG_CTRUNC)) { + RTE_LOG(ERR, VHOST_CONFIG, "truncted msg\n"); + return -1; + } + + for (cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; + cmsg = CMSG_NXTHDR(&msgh, cmsg)) { + if ((cmsg->cmsg_level == SOL_SOCKET) && + (cmsg->cmsg_type == SCM_RIGHTS)) { + got_fds = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof(int); + *fd_num = got_fds; + memcpy(fds, CMSG_DATA(cmsg), got_fds * sizeof(int)); + break; + } + } + + /* Clear out unused file descriptors */ + while (got_fds < max_fds) + fds[got_fds++] = -1; + + return ret; +} + +int +send_fd_message(int sockfd, char *buf, int buflen, int *fds, int fd_num) +{ + struct iovec iov; + struct msghdr msgh; + size_t fdsize = fd_num * sizeof(int); + char control[CMSG_SPACE(fdsize)]; + struct cmsghdr *cmsg; + int ret; + + memset(&msgh, 0, sizeof(msgh)); + iov.iov_base = buf; + iov.iov_len = buflen; + + msgh.msg_iov = &iov; + msgh.msg_iovlen = 1; + + if (fds && fd_num > 0) { + msgh.msg_control = control; + msgh.msg_controllen = sizeof(control); + cmsg = CMSG_FIRSTHDR(&msgh); + if (cmsg == NULL) { + RTE_LOG(ERR, VHOST_CONFIG, "cmsg == NULL\n"); + errno = EINVAL; + return -1; + } + cmsg->cmsg_len = CMSG_LEN(fdsize); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + memcpy(CMSG_DATA(cmsg), fds, fdsize); + } else { + msgh.msg_control = NULL; + msgh.msg_controllen = 0; + } + + do { + ret = sendmsg(sockfd, &msgh, MSG_NOSIGNAL); + } while (ret < 0 && errno == EINTR); + + if (ret < 0) { + RTE_LOG(ERR, VHOST_CONFIG, "sendmsg error\n"); + return ret; + } + + return ret; +} + +static void +vhost_user_add_connection(int fd, struct vhost_user_socket *vsocket) +{ + int vid; + size_t size; + struct vhost_user_connection *conn; + int ret; + + if (vsocket == NULL) + return; + + conn = malloc(sizeof(*conn)); + if (conn == NULL) { + close(fd); + return; + } + + vid = vhost_new_device(); + if (vid == -1) { + goto err; + } + + size = strnlen(vsocket->path, PATH_MAX); + vhost_set_ifname(vid, vsocket->path, size); + + vhost_set_builtin_virtio_net(vid, vsocket->use_builtin_virtio_net); + + vhost_attach_vdpa_device(vid, vsocket->vdpa_dev_id); + + if (vsocket->dequeue_zero_copy) + vhost_enable_dequeue_zero_copy(vid); + + RTE_LOG(INFO, VHOST_CONFIG, "new device, handle is %d\n", vid); + + if (vsocket->notify_ops->new_connection) { + ret = vsocket->notify_ops->new_connection(vid); + if (ret < 0) { + RTE_LOG(ERR, VHOST_CONFIG, + "failed to add vhost user connection with fd %d\n", + fd); + goto err_cleanup; + } + } + + conn->connfd = fd; + conn->vsocket = vsocket; + conn->vid = vid; + ret = fdset_add(&vhost_user.fdset, fd, vhost_user_read_cb, + NULL, conn); + if (ret < 0) { + RTE_LOG(ERR, VHOST_CONFIG, + "failed to add fd %d into vhost server fdset\n", + fd); + + if (vsocket->notify_ops->destroy_connection) + vsocket->notify_ops->destroy_connection(conn->vid); + + goto err_cleanup; + } + + pthread_mutex_lock(&vsocket->conn_mutex); + TAILQ_INSERT_TAIL(&vsocket->conn_list, conn, next); + pthread_mutex_unlock(&vsocket->conn_mutex); + + fdset_pipe_notify(&vhost_user.fdset); + return; + +err_cleanup: + vhost_destroy_device(vid); +err: + free(conn); + close(fd); +} + +/* call back when there is new vhost-user connection from client */ +static void +vhost_user_server_new_connection(int fd, void *dat, int *remove __rte_unused) +{ + struct vhost_user_socket *vsocket = dat; + + fd = accept(fd, NULL, NULL); + if (fd < 0) + return; + + RTE_LOG(INFO, VHOST_CONFIG, "new vhost user connection is %d\n", fd); + vhost_user_add_connection(fd, vsocket); +} + +static void +vhost_user_read_cb(int connfd, void *dat, int *remove) +{ + struct vhost_user_connection *conn = dat; + struct vhost_user_socket *vsocket = conn->vsocket; + int ret; + + ret = vhost_user_msg_handler(conn->vid, connfd); + if (ret < 0) { + struct virtio_net *dev = get_device(conn->vid); + + close(connfd); + *remove = 1; + + if (dev) + vhost_destroy_device_notify(dev); + + if (vsocket->notify_ops->destroy_connection) + vsocket->notify_ops->destroy_connection(conn->vid); + + vhost_destroy_device(conn->vid); + + pthread_mutex_lock(&vsocket->conn_mutex); + TAILQ_REMOVE(&vsocket->conn_list, conn, next); + pthread_mutex_unlock(&vsocket->conn_mutex); + + free(conn); + + if (vsocket->reconnect) { + create_unix_socket(vsocket); + vhost_user_start_client(vsocket); + } + } +} + +int +create_unix_socket(struct vhost_user_socket *vsocket) +{ + int fd; + struct sockaddr_un *un = &vsocket->un; + + fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (fd < 0) + return -1; + RTE_LOG(INFO, VHOST_CONFIG, "vhost-user %s: socket created, fd: %d\n", + vsocket->is_server ? "server" : "client", fd); + + if (!vsocket->is_server && fcntl(fd, F_SETFL, O_NONBLOCK)) { + RTE_LOG(ERR, VHOST_CONFIG, + "vhost-user: can't set nonblocking mode for socket, fd: " + "%d (%s)\n", fd, strerror(errno)); + close(fd); + return -1; + } + + memset(un, 0, sizeof(*un)); + un->sun_family = AF_UNIX; + strncpy(un->sun_path, vsocket->path, sizeof(un->sun_path)); + un->sun_path[sizeof(un->sun_path) - 1] = '\0'; + + vsocket->socket_fd = fd; + return 0; +} + +int +vhost_user_start_server(struct vhost_user_socket *vsocket) +{ + int ret; + int fd = vsocket->socket_fd; + const char *path = vsocket->path; + + /* + * bind () may fail if the socket file with the same name already + * exists. But the library obviously should not delete the file + * provided by the user, since we can not be sure that it is not + * being used by other applications. Moreover, many applications form + * socket names based on user input, which is prone to errors. + * + * The user must ensure that the socket does not exist before + * registering the vhost driver in server mode. + */ + ret = bind(fd, (struct sockaddr *)&vsocket->un, sizeof(vsocket->un)); + if (ret < 0) { + RTE_LOG(ERR, VHOST_CONFIG, + "failed to bind to %s: %s; remove it and try again\n", + path, strerror(errno)); + goto err; + } + RTE_LOG(INFO, VHOST_CONFIG, "bind to %s\n", path); + + ret = listen(fd, MAX_VIRTIO_BACKLOG); + if (ret < 0) + goto err; + + ret = fdset_add(&vhost_user.fdset, fd, vhost_user_server_new_connection, + NULL, vsocket); + if (ret < 0) { + RTE_LOG(ERR, VHOST_CONFIG, + "failed to add listen fd %d to vhost server fdset\n", + fd); + goto err; + } + + return 0; + +err: + close(fd); + return -1; +} + +struct vhost_user_reconnect { + struct sockaddr_un un; + int fd; + struct vhost_user_socket *vsocket; + + TAILQ_ENTRY(vhost_user_reconnect) next; +}; + +TAILQ_HEAD(vhost_user_reconnect_tailq_list, vhost_user_reconnect); +struct vhost_user_reconnect_list { + struct vhost_user_reconnect_tailq_list head; + pthread_mutex_t mutex; +}; + +static struct vhost_user_reconnect_list reconn_list; +pthread_t reconn_tid; + +static int +vhost_user_connect_nonblock(int fd, struct sockaddr *un, size_t sz) +{ + int ret, flags; + + ret = connect(fd, un, sz); + if (ret < 0 && errno != EISCONN) + return -1; + + flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) { + RTE_LOG(ERR, VHOST_CONFIG, + "can't get flags for connfd %d\n", fd); + return -2; + } + if ((flags & O_NONBLOCK) && fcntl(fd, F_SETFL, flags & ~O_NONBLOCK)) { + RTE_LOG(ERR, VHOST_CONFIG, + "can't disable nonblocking on fd %d\n", fd); + return -2; + } + return 0; +} + +static void * +vhost_user_client_reconnect(void *arg __rte_unused) +{ + int ret; + struct vhost_user_reconnect *reconn, *next; + + while (1) { + pthread_mutex_lock(&reconn_list.mutex); + + /* + * An equal implementation of TAILQ_FOREACH_SAFE, + * which does not exist on all platforms. + */ + for (reconn = TAILQ_FIRST(&reconn_list.head); + reconn != NULL; reconn = next) { + next = TAILQ_NEXT(reconn, next); + + ret = vhost_user_connect_nonblock(reconn->fd, + (struct sockaddr *)&reconn->un, + sizeof(reconn->un)); + if (ret == -2) { + close(reconn->fd); + RTE_LOG(ERR, VHOST_CONFIG, + "reconnection for fd %d failed\n", + reconn->fd); + goto remove_fd; + } + if (ret == -1) + continue; + + RTE_LOG(INFO, VHOST_CONFIG, + "%s: connected\n", reconn->vsocket->path); + vhost_user_add_connection(reconn->fd, reconn->vsocket); +remove_fd: + TAILQ_REMOVE(&reconn_list.head, reconn, next); + free(reconn); + } + + pthread_mutex_unlock(&reconn_list.mutex); + sleep(1); + } + + return NULL; +} + +int +vhost_user_reconnect_init(void) +{ + int ret; + + ret = pthread_mutex_init(&reconn_list.mutex, NULL); + if (ret < 0) { + RTE_LOG(ERR, VHOST_CONFIG, "failed to initialize mutex"); + return ret; + } + TAILQ_INIT(&reconn_list.head); + + ret = rte_ctrl_thread_create(&reconn_tid, "vhost_reconn", NULL, + vhost_user_client_reconnect, NULL); + if (ret != 0) { + RTE_LOG(ERR, VHOST_CONFIG, "failed to create reconnect thread"); + if (pthread_mutex_destroy(&reconn_list.mutex)) { + RTE_LOG(ERR, VHOST_CONFIG, + "failed to destroy reconnect mutex"); + } + } + + return ret; +} + +int +vhost_user_start_client(struct vhost_user_socket *vsocket) +{ + int ret; + int fd = vsocket->socket_fd; + const char *path = vsocket->path; + struct vhost_user_reconnect *reconn; + + ret = vhost_user_connect_nonblock(fd, (struct sockaddr *)&vsocket->un, + sizeof(vsocket->un)); + if (ret == 0) { + vhost_user_add_connection(fd, vsocket); + return 0; + } + + RTE_LOG(WARNING, VHOST_CONFIG, + "failed to connect to %s: %s\n", + path, strerror(errno)); + + if (ret == -2 || !vsocket->reconnect) { + close(fd); + return -1; + } + + RTE_LOG(INFO, VHOST_CONFIG, "%s: reconnecting...\n", path); + reconn = malloc(sizeof(*reconn)); + if (reconn == NULL) { + RTE_LOG(ERR, VHOST_CONFIG, + "failed to allocate memory for reconnect\n"); + close(fd); + return -1; + } + reconn->un = vsocket->un; + reconn->fd = fd; + reconn->vsocket = vsocket; + pthread_mutex_lock(&reconn_list.mutex); + TAILQ_INSERT_TAIL(&reconn_list.head, reconn, next); + pthread_mutex_unlock(&reconn_list.mutex); + + return 0; +} + +bool +vhost_user_remove_reconnect(struct vhost_user_socket *vsocket) +{ + int found = false; + struct vhost_user_reconnect *reconn, *next; + + pthread_mutex_lock(&reconn_list.mutex); + + for (reconn = TAILQ_FIRST(&reconn_list.head); + reconn != NULL; reconn = next) { + next = TAILQ_NEXT(reconn, next); + + if (reconn->vsocket == vsocket) { + TAILQ_REMOVE(&reconn_list.head, reconn, next); + close(reconn->fd); + free(reconn); + found = true; + break; + } + } + pthread_mutex_unlock(&reconn_list.mutex); + return found; +} static int af_unix_vring_call(struct virtio_net *dev __rte_unused, diff --git a/lib/librte_vhost/vhost.h b/lib/librte_vhost/vhost.h index 077f213..c363369 100644 --- a/lib/librte_vhost/vhost.h +++ b/lib/librte_vhost/vhost.h @@ -5,6 +5,7 @@ #ifndef _VHOST_NET_CDEV_H_ #define _VHOST_NET_CDEV_H_ #include +#include #include #include #include @@ -13,13 +14,16 @@ #include #include #include +#include /* TODO remove when trans_af_unix.c refactoring is done */ #include +#include #include #include #include #include +#include "fd_man.h" #include "rte_vhost.h" #include "rte_vdpa.h" @@ -360,6 +364,78 @@ struct virtio_net { struct rte_vhost_user_extern_ops extern_ops; } __rte_cache_aligned; +/* The vhost_user, vhost_user_socket, vhost_user_connection, and reconnect + * declarations are temporary measures for moving AF_UNIX code into + * trans_af_unix.c. They will be cleaned up as socket.c is untangled from + * trans_af_unix.c. + */ +TAILQ_HEAD(vhost_user_connection_list, vhost_user_connection); + +/* + * Every time rte_vhost_driver_register() is invoked, an associated + * vhost_user_socket struct will be created. + */ +struct vhost_user_socket { + struct vhost_user_connection_list conn_list; + pthread_mutex_t conn_mutex; + char *path; + int socket_fd; + struct sockaddr_un un; + bool is_server; + bool reconnect; + bool dequeue_zero_copy; + bool iommu_support; + bool use_builtin_virtio_net; + + /* + * The "supported_features" indicates the feature bits the + * vhost driver supports. The "features" indicates the feature + * bits after the rte_vhost_driver_features_disable/enable(). + * It is also the final feature bits used for vhost-user + * features negotiation. + */ + uint64_t supported_features; + uint64_t features; + + uint64_t protocol_features; + + /* + * Device id to identify a specific backend device. + * It's set to -1 for the default software implementation. + * If valid, one socket can have 1 connection only. + */ + int vdpa_dev_id; + + struct vhost_device_ops const *notify_ops; +}; + +struct vhost_user_connection { + struct vhost_user_socket *vsocket; + int connfd; + int vid; + + TAILQ_ENTRY(vhost_user_connection) next; +}; + +#define MAX_VHOST_SOCKET 1024 +struct vhost_user { + struct vhost_user_socket *vsockets[MAX_VHOST_SOCKET]; + struct fdset fdset; + int vsocket_cnt; + pthread_mutex_t mutex; +}; + +extern struct vhost_user vhost_user; + +int create_unix_socket(struct vhost_user_socket *vsocket); +int vhost_user_start_server(struct vhost_user_socket *vsocket); +int vhost_user_start_client(struct vhost_user_socket *vsocket); + +extern pthread_t reconn_tid; + +int vhost_user_reconnect_init(void); +bool vhost_user_remove_reconnect(struct vhost_user_socket *vsocket); + static __rte_always_inline bool vq_is_packed(struct virtio_net *dev) {