From patchwork Thu Nov 30 18:44:09 2017 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Jianfeng Tan X-Patchwork-Id: 31839 X-Patchwork-Delegate: thomas@monjalon.net Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id BF66B1AEF3; Thu, 30 Nov 2017 19:42:14 +0100 (CET) Received: from mga02.intel.com (mga02.intel.com [134.134.136.20]) by dpdk.org (Postfix) with ESMTP id CDCCA7D04 for ; Thu, 30 Nov 2017 19:42:09 +0100 (CET) Received: from fmsmga005.fm.intel.com ([10.253.24.32]) by orsmga101.jf.intel.com with ESMTP/TLS/DHE-RSA-AES256-GCM-SHA384; 30 Nov 2017 10:42:09 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.45,341,1508828400"; d="scan'208";a="182109663" Received: from dpdk06.sh.intel.com ([10.67.110.196]) by fmsmga005.fm.intel.com with ESMTP; 30 Nov 2017 10:42:08 -0800 From: Jianfeng Tan To: dev@dpdk.org Cc: anatoly.burakov@intel.com, bruce.richardson@intel.com, konstantin.ananyev@intel.com, thomas@monjalon.net, Jianfeng Tan Date: Thu, 30 Nov 2017 18:44:09 +0000 Message-Id: <1512067450-59203-3-git-send-email-jianfeng.tan@intel.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1512067450-59203-1-git-send-email-jianfeng.tan@intel.com> References: <1512067450-59203-1-git-send-email-jianfeng.tan@intel.com> Subject: [dpdk-dev] [PATCH 2/3] eal: add synchronous multi-process communication X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: DPDK patches and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" We need the synchronous way for multi-process communication, that is to say we need an immediate response after we send a message to the other side. We will stop the mp_handler thread, and after sending message, the send thread will wait there for reponse and process the respond. Suggested-by: Anatoly Burakov Signed-off-by: Jianfeng Tan --- lib/librte_eal/common/eal_common_proc.c | 53 +++++++++++++++++++++++++++++++-- lib/librte_eal/common/include/rte_eal.h | 5 +++- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c index 5d0a095..65ebaf2 100644 --- a/lib/librte_eal/common/eal_common_proc.c +++ b/lib/librte_eal/common/eal_common_proc.c @@ -30,6 +30,8 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#define _GNU_SOURCE + #include #include #include @@ -41,6 +43,8 @@ #include #include #include +#include +#include #include #include @@ -134,6 +138,7 @@ rte_eal_mp_action_unregister(const char *name) struct mp_fds { int efd; + int evfd; /* eventfd used for pausing mp_handler thread */ union { /* fds for primary process */ @@ -331,6 +336,13 @@ mp_handler(void *arg __rte_unused) exit(EXIT_FAILURE); } + ev.data.fd = mp_fds.evfd; + if (epoll_ctl(mp_fds.efd, EPOLL_CTL_ADD, ev.data.fd, &ev) < 0) { + RTE_LOG(ERR, EAL, "epoll_ctl failed: %s\n", + strerror(errno)); + exit(EXIT_FAILURE); + } + events = calloc(20, sizeof ev); while (1) { @@ -348,6 +360,14 @@ mp_handler(void *arg __rte_unused) continue; } + if (events[i].data.fd == mp_fds.evfd) { + RTE_LOG(INFO, EAL, "mp_handler thread will pause\n"); + pause(); + RTE_LOG(INFO, EAL, "mp_handler thread stops pausing\n"); + + continue; + } + fd = events[i].data.fd; if ((events[i].events & EPOLLIN)) { @@ -377,13 +397,14 @@ mp_handler(void *arg __rte_unused) return NULL; } +static pthread_t tid; + int rte_eal_mp_channel_init(void) { int i, fd, ret; const char *path; struct sockaddr_un un; - pthread_t tid; char thread_name[RTE_MAX_THREAD_NAME_LEN]; mp_fds.efd = epoll_create1(0); @@ -462,6 +483,8 @@ rte_eal_mp_channel_init(void) return -1; } + mp_fds.evfd = eventfd(0, 0); + return 0; } @@ -485,7 +508,8 @@ rte_eal_mp_sendmsg(const char *action_name, const void *params, int len_params, int fds[], - int fds_num) + int fds_num, + int need_ack) { int i; int ret = 0; @@ -511,6 +535,11 @@ rte_eal_mp_sendmsg(const char *action_name, RTE_LOG(INFO, EAL, "send msg: %s, %d\n", action_name, len_msg); + if (need_ack) { + // stop mp_handler thread. + eventfd_write(mp_fds.evfd, (eventfd_t)1); + } + msg = malloc(len_msg); if (!msg) { RTE_LOG(ERR, EAL, "Cannot alloc memory for msg\n"); @@ -547,12 +576,32 @@ rte_eal_mp_sendmsg(const char *action_name, ret = send_msg(mp_fds.secondaries[i], &msgh); if (ret < 0) break; + + if (need_ack) { + /* We will hang there until the other side + * responses and what if other side is sending + * msg at the same time? + */ + process_msg(mp_fds.secondaries[i]); + } } } else { ret = send_msg(mp_fds.primary, &msgh); + + if (ret > 0 && need_ack) { + // We will hang there until the other side responses + ret = process_msg(mp_fds.primary); + } } free(msg); + if (need_ack) { + // start mp_handler thread. + union sigval value; + + pthread_sigqueue(tid, 0, value); + } + return ret; } diff --git a/lib/librte_eal/common/include/rte_eal.h b/lib/librte_eal/common/include/rte_eal.h index 8776bcf..9875cae 100644 --- a/lib/librte_eal/common/include/rte_eal.h +++ b/lib/librte_eal/common/include/rte_eal.h @@ -274,13 +274,16 @@ void rte_eal_mp_action_unregister(const char *name); * @param fds_num * The fds_num argument is number of fds to be sent with sendmsg. * + * @param need_ack + * The fds_num argument is number of fds to be sent with sendmsg. + * * @return * - (>=0) on success. * - (<0) on failure. */ int rte_eal_mp_sendmsg(const char *action_name, const void *params, - int len_params, int fds[], int fds_num); + int len_params, int fds[], int fds_num, int need_ack); /** * Usage function typedef used by the application usage function.