[dpdk-dev,v3,3/5] eal: use locks to determine if secondary process is active

Message ID eec1a7d6c2afc3813cbc334e887887fecb57fcb6.1519740527.git.anatoly.burakov@intel.com (mailing list archive)
State Superseded, archived
Headers

Checks

Context Check Description
ci/checkpatch success coding style OK
ci/Intel-compilation fail apply patch file failure

Commit Message

Burakov, Anatoly Feb. 27, 2018, 2:35 p.m. UTC
  Previously, IPC would remove sockets it considers to be "inactive"
based on whether they have responded. Change this to create lock
files in addition to socket files, so that we can determine if
secondary process is active before attempting to communicate with
it. That way, we can distinguish secondaries that are alive but
are not responding, from those that have already died.

Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
---

Notes:
    v3: no changes
    
    v2: no changes

 lib/librte_eal/common/eal_common_proc.c | 204 +++++++++++++++++++++++++++-----
 1 file changed, 175 insertions(+), 29 deletions(-)
  

Comments

Jianfeng Tan Feb. 28, 2018, 1:26 a.m. UTC | #1
> -----Original Message-----
> From: Burakov, Anatoly
> Sent: Tuesday, February 27, 2018 10:36 PM
> To: dev@dpdk.org
> Cc: Tan, Jianfeng
> Subject: [PATCH v3 3/5] eal: use locks to determine if secondary process is
> active
> 
> Previously, IPC would remove sockets it considers to be "inactive"
> based on whether they have responded.

To be more precise, it was not depending on if the other side responses or not; it was depending on sendmsg return error, ECONNREFUSED.

> Change this to create lock
> files in addition to socket files, so that we can determine if
> secondary process is active before attempting to communicate with
> it. That way, we can distinguish secondaries that are alive but
> are not responding, from those that have already died.

I think, by the old way, we can also "distinguish secondaries that are alive but are not responding, from those that have already died", can't we?

Thanks,
Jianfeng

> 
> Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
> ---
> 
> Notes:
>     v3: no changes
> 
>     v2: no changes
> 
>  lib/librte_eal/common/eal_common_proc.c | 204
> +++++++++++++++++++++++++++-----
>  1 file changed, 175 insertions(+), 29 deletions(-)
> 
> diff --git a/lib/librte_eal/common/eal_common_proc.c
> b/lib/librte_eal/common/eal_common_proc.c
> index a6e24e6..7c87971 100644
> --- a/lib/librte_eal/common/eal_common_proc.c
> +++ b/lib/librte_eal/common/eal_common_proc.c
> @@ -13,6 +13,7 @@
>  #include <stdio.h>
>  #include <stdlib.h>
>  #include <string.h>
> +#include <sys/file.h>
>  #include <sys/time.h>
>  #include <sys/types.h>
>  #include <sys/socket.h>
> @@ -32,6 +33,7 @@
>  #include "eal_internal_cfg.h"
> 
>  static int mp_fd = -1;
> +static int lock_fd = -1;
>  static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
>  static char mp_dir_path[PATH_MAX]; /* The directory path for all mp
> sockets */
>  static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
> @@ -104,6 +106,46 @@ find_sync_request(const char *dst, const char
> *act_name)
>  	return r;
>  }
> 
> +static void
> +create_socket_path(const char *name, char *buf, int len)
> +{
> +	const char *prefix = eal_mp_socket_path();
> +	if (strlen(name) > 0)
> +		snprintf(buf, len, "%s_%s", prefix, name);
> +	else
> +		snprintf(buf, len, "%s", prefix);
> +}
> +
> +static void
> +create_lockfile_path(const char *name, char *buf, int len)
> +{
> +	const char *prefix = eal_mp_socket_path();
> +	if (strlen(name) > 1)
> +		snprintf(buf, len, "%slock_%s", prefix, name);
> +	else
> +		snprintf(buf, len, "%slock", prefix);
> +}
> +
> +static const char *
> +get_peer_name(const char *socket_full_path)
> +{
> +	char buf[PATH_MAX] = {0};
> +	int len;
> +
> +	/* primary process has no peer name */
> +	if (strcmp(socket_full_path, eal_mp_socket_path()) == 0)
> +		return NULL;
> +
> +	/* construct dummy socket file name - make it one character long so
> that
> +	 * we hit the code path where underscores are added
> +	 */
> +	create_socket_path("a", buf, sizeof(buf));
> +
> +	/* we want to get everything after /path/.rte_unix_, so discard 'a' */
> +	len = strlen(buf) - 1;
> +	return &socket_full_path[len];
> +}
> +
>  int
>  rte_eal_primary_proc_alive(const char *config_file_path)
>  {
> @@ -332,8 +374,29 @@ mp_handle(void *arg __rte_unused)
>  static int
>  open_socket_fd(void)
>  {
> +	char peer_name[PATH_MAX] = {0};
> +	char lockfile[PATH_MAX] = {0};
>  	struct sockaddr_un un;
> -	const char *prefix = eal_mp_socket_path();
> +
> +	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
> +		snprintf(peer_name, sizeof(peer_name), "%d_%"PRIx64,
> +			 getpid(), rte_rdtsc());
> +
> +	/* try to create lockfile */
> +	create_lockfile_path(peer_name, lockfile, sizeof(lockfile));
> +
> +	lock_fd = open(lockfile, O_CREAT | O_RDWR);
> +	if (lock_fd < 0) {
> +		RTE_LOG(ERR, EAL, "failed to open '%s': %s\n", lockfile,
> +			strerror(errno));
> +		return -1;
> +	}
> +	if (flock(lock_fd, LOCK_EX | LOCK_NB)) {
> +		RTE_LOG(ERR, EAL, "failed to lock '%s': %s\n", lockfile,
> +			strerror(errno));
> +		return -1;
> +	}
> +	/* no need to downgrade to shared lock */
> 
>  	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
>  	if (mp_fd < 0) {
> @@ -343,13 +406,11 @@ open_socket_fd(void)
> 
>  	memset(&un, 0, sizeof(un));
>  	un.sun_family = AF_UNIX;
> -	if (rte_eal_process_type() == RTE_PROC_PRIMARY)
> -		snprintf(un.sun_path, sizeof(un.sun_path), "%s", prefix);
> -	else {
> -		snprintf(un.sun_path, sizeof(un.sun_path),
> "%s_%d_%"PRIx64,
> -			 prefix, getpid(), rte_rdtsc());
> -	}
> +
> +	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
> +
>  	unlink(un.sun_path); /* May still exist since last run */
> +
>  	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
>  		RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
>  			un.sun_path, strerror(errno));
> @@ -361,6 +422,44 @@ open_socket_fd(void)
>  	return mp_fd;
>  }
> 
> +/* find corresponding lock file and try to lock it */
> +static int
> +socket_is_active(const char *peer_name)
> +{
> +	char lockfile[PATH_MAX] = {0};
> +	int fd, ret = -1;
> +
> +	/* construct lockfile filename */
> +	create_lockfile_path(peer_name, lockfile, sizeof(lockfile));
> +
> +	/* try to lock it */
> +	fd = open(lockfile, O_CREAT | O_RDWR);
> +	if (fd < 0) {
> +		RTE_LOG(ERR, EAL, "Cannot open '%s': %s\n", lockfile,
> +			strerror(errno));
> +		return -1;
> +	}
> +	ret = flock(fd, LOCK_EX | LOCK_NB);
> +	if (ret < 0) {
> +		if (errno == EWOULDBLOCK) {
> +			/* file is locked */
> +			ret = 1;
> +		} else {
> +			RTE_LOG(ERR, EAL, "Cannot lock '%s': %s\n", lockfile,
> +				strerror(errno));
> +			ret = -1;
> +		}
> +	} else {
> +		ret = 0;
> +		/* unlink lockfile automatically */
> +		unlink(lockfile);
> +		flock(fd, LOCK_UN);
> +	}
> +	close(fd);
> +
> +	return ret;
> +}
> +
>  static int
>  unlink_sockets(const char *filter)
>  {
> @@ -376,28 +475,33 @@ unlink_sockets(const char *filter)
>  	dir_fd = dirfd(mp_dir);
> 
>  	while ((ent = readdir(mp_dir))) {
> -		if (fnmatch(filter, ent->d_name, 0) == 0)
> +		if (fnmatch(filter, ent->d_name, 0) == 0) {
> +			const char *peer_name;
> +			char path[PATH_MAX];
> +			int ret;
> +
> +			snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
> +				 ent->d_name);
> +			peer_name = get_peer_name(path);
> +
> +			ret = socket_is_active(peer_name);
> +			if (ret < 0) {
> +				RTE_LOG(ERR, EAL, "Error getting socket
> active status\n");
> +				return -1;
> +			} else if (ret == 1) {
> +				RTE_LOG(ERR, EAL, "Socket is active (old
> secondary process still running?)\n");
> +				return -1;
> +			}
> +			RTE_LOG(DEBUG, EAL, "Removing stale socket file
> '%s'\n",
> +					ent->d_name);
>  			unlinkat(dir_fd, ent->d_name, 0);
> +		}
>  	}
> 
>  	closedir(mp_dir);
>  	return 0;
>  }
> 
> -static void
> -unlink_socket_by_path(const char *path)
> -{
> -	char *filename;
> -	char *fullpath = strdup(path);
> -
> -	if (!fullpath)
> -		return;
> -	filename = basename(fullpath);
> -	unlink_sockets(filename);
> -	free(fullpath);
> -	RTE_LOG(INFO, EAL, "Remove socket %s\n", path);
> -}
> -
>  int
>  rte_mp_channel_init(void)
>  {
> @@ -487,10 +591,25 @@ send_msg(const char *dst_path, struct
> rte_mp_msg *msg, int type)
>  		rte_errno = errno;
>  		/* Check if it caused by peer process exits */
>  		if (errno == ECONNREFUSED) {
> -			/* We don't unlink the primary's socket here */
> -			if (rte_eal_process_type() == RTE_PROC_PRIMARY)
> -				unlink_socket_by_path(dst_path);
> -			return 0;
> +			const char *peer_name = get_peer_name(dst_path);
> +			int active, ret = 0;
> +
> +			active = rte_eal_process_type() ==
> RTE_PROC_PRIMARY ?
> +					socket_is_active(peer_name) :
> +					rte_eal_primary_proc_alive(NULL);
> +
> +			if (active > 0) {
> +				RTE_LOG(ERR, EAL, "Couldn't communicate
> with active peer\n");
> +			} else if (active < 0) {
> +				RTE_LOG(ERR, EAL, "Couldn't get peer
> status\n");
> +				ret = -1;
> +			} else if (rte_eal_process_type() ==
> RTE_PROC_PRIMARY) {
> +				/* peer isn't active anymore, so unlink its
> +				 * socket.
> +				 */
> +				unlink(dst_path);
> +			}
> +			return ret;
>  		}
>  		if (errno == ENOBUFS) {
>  			RTE_LOG(ERR, EAL, "Peer cannot receive
> message %s\n",
> @@ -508,7 +627,7 @@ send_msg(const char *dst_path, struct rte_mp_msg
> *msg, int type)
>  static int
>  mp_send(struct rte_mp_msg *msg, const char *peer, int type)
>  {
> -	int ret = 0;
> +	int dir_fd, ret = 0;
>  	DIR *mp_dir;
>  	struct dirent *ent;
> 
> @@ -530,15 +649,28 @@ mp_send(struct rte_mp_msg *msg, const char
> *peer, int type)
>  		rte_errno = errno;
>  		return -1;
>  	}
> +	dir_fd = dirfd(mp_dir);
>  	while ((ent = readdir(mp_dir))) {
>  		char path[PATH_MAX];
> +		const char *peer_name;
> +		int active;
> 
>  		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
>  			continue;
> 
>  		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
>  			 ent->d_name);
> -		if (send_msg(path, msg, type) < 0)
> +		peer_name = get_peer_name(path);
> +
> +		/* only send if we can expect to receive a reply, otherwise
> +		 * remove the socket.
> +		 */
> +		active = socket_is_active(peer_name);
> +		if (active < 0)
> +			ret = -1;
> +		else if (active == 0)
> +			unlinkat(dir_fd, ent->d_name, 0);
> +		else if (active > 0 && send_msg(path, msg, type) < 0)
>  			ret = -1;
>  	}
> 
> @@ -663,7 +795,7 @@ int __rte_experimental
>  rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
>  		const struct timespec *ts)
>  {
> -	int ret = 0;
> +	int dir_fd, ret = 0;
>  	DIR *mp_dir;
>  	struct dirent *ent;
>  	struct timeval now;
> @@ -698,15 +830,29 @@ rte_mp_request(struct rte_mp_msg *req, struct
> rte_mp_reply *reply,
>  		rte_errno = errno;
>  		return -1;
>  	}
> +	dir_fd = dirfd(mp_dir);
> 
>  	while ((ent = readdir(mp_dir))) {
> +		const char *peer_name;
>  		char path[PATH_MAX];
> +		int active;
> 
>  		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
>  			continue;
> 
>  		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
>  			 ent->d_name);
> +		peer_name = get_peer_name(path);
> +
> +		active = socket_is_active(peer_name);
> +
> +		if (active < 0) {
> +			ret = -1;
> +			break;
> +		} else if (active == 0) {
> +			unlinkat(dir_fd, ent->d_name, 0);
> +			continue;
> +		}
> 
>  		if (mp_request_one(path, req, reply, &end))
>  			ret = -1;
> --
> 2.7.4
  
Wiles, Keith Feb. 28, 2018, 4:17 a.m. UTC | #2
> On Feb 27, 2018, at 8:35 AM, Anatoly Burakov <anatoly.burakov@intel.com> wrote:

> 

> Previously, IPC would remove sockets it considers to be "inactive"

> based on whether they have responded. Change this to create lock

> files in addition to socket files, so that we can determine if

> secondary process is active before attempting to communicate with

> it. That way, we can distinguish secondaries that are alive but

> are not responding, from those that have already died.

> 

> Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>

> ---

> 

> Notes:

>    v3: no changes

> 

>    v2: no changes

> 

> lib/librte_eal/common/eal_common_proc.c | 204 +++++++++++++++++++++++++++-----

> 1 file changed, 175 insertions(+), 29 deletions(-)

> 

> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c

> index a6e24e6..7c87971 100644

> --- a/lib/librte_eal/common/eal_common_proc.c

> +++ b/lib/librte_eal/common/eal_common_proc.c

> @@ -13,6 +13,7 @@

> #include <stdio.h>

> #include <stdlib.h>

> #include <string.h>

> +#include <sys/file.h>

> #include <sys/time.h>

> #include <sys/types.h>

> #include <sys/socket.h>

> @@ -32,6 +33,7 @@

> #include "eal_internal_cfg.h"

> 

> static int mp_fd = -1;

> +static int lock_fd = -1;


I did not find where lock_fd is closed in this code, should it be closed at some point?

> static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */

> static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */

> static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;

> @@ -104,6 +106,46 @@ find_sync_request(const char *dst, const char *act_name)

> 	return r;

> }

> 

> +static void

> +create_socket_path(const char *name, char *buf, int len)

> +{

> +	const char *prefix = eal_mp_socket_path();


I thought we needed a space here after the variables, no?

> +	if (strlen(name) > 0)

> +		snprintf(buf, len, "%s_%s", prefix, name);

> +	else

> +		snprintf(buf, len, "%s", prefix);

> +}

> +

> +static void

> +create_lockfile_path(const char *name, char *buf, int len)

> +{

> +	const char *prefix = eal_mp_socket_path();


Same here
> +	if (strlen(name) > 1)

> +		snprintf(buf, len, "%slock_%s", prefix, name);

> +	else

> +		snprintf(buf, len, "%slock", prefix);

> +}

> +

> +static const char *

> +get_peer_name(const char *socket_full_path)

> +{

> +	char buf[PATH_MAX] = {0};

> +	int len;

> +

> +	/* primary process has no peer name */

> +	if (strcmp(socket_full_path, eal_mp_socket_path()) == 0)

> +		return NULL;

> +

> +	/* construct dummy socket file name - make it one character long so that

> +	 * we hit the code path where underscores are added

> +	 */

> +	create_socket_path("a", buf, sizeof(buf));

> +

> +	/* we want to get everything after /path/.rte_unix_, so discard 'a' */

> +	len = strlen(buf) - 1;

> +	return &socket_full_path[len];

> +}

> +

> int

> rte_eal_primary_proc_alive(const char *config_file_path)

> {

> @@ -332,8 +374,29 @@ mp_handle(void *arg __rte_unused)

> static int

> open_socket_fd(void)

> {

> +	char peer_name[PATH_MAX] = {0};

> +	char lockfile[PATH_MAX] = {0};

> 	struct sockaddr_un un;

> -	const char *prefix = eal_mp_socket_path();

> +

> +	if (rte_eal_process_type() == RTE_PROC_SECONDARY)

> +		snprintf(peer_name, sizeof(peer_name), "%d_%"PRIx64,

> +			 getpid(), rte_rdtsc());

> +

> +	/* try to create lockfile */

> +	create_lockfile_path(peer_name, lockfile, sizeof(lockfile));

> +

> +	lock_fd = open(lockfile, O_CREAT | O_RDWR);

> +	if (lock_fd < 0) {

> +		RTE_LOG(ERR, EAL, "failed to open '%s': %s\n", lockfile,

> +			strerror(errno));

> +		return -1;

> +	}

> +	if (flock(lock_fd, LOCK_EX | LOCK_NB)) {

> +		RTE_LOG(ERR, EAL, "failed to lock '%s': %s\n", lockfile,

> +			strerror(errno));


Did not close the lock_fd here.

> +		return -1;

> +	}

> +	/* no need to downgrade to shared lock */

> 

> 	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);

> 	if (mp_fd < 0) {

> @@ -343,13 +406,11 @@ open_socket_fd(void)

> 

> 	memset(&un, 0, sizeof(un));

> 	un.sun_family = AF_UNIX;

> -	if (rte_eal_process_type() == RTE_PROC_PRIMARY)

> -		snprintf(un.sun_path, sizeof(un.sun_path), "%s", prefix);

> -	else {

> -		snprintf(un.sun_path, sizeof(un.sun_path), "%s_%d_%"PRIx64,

> -			 prefix, getpid(), rte_rdtsc());

> -	}

> +

> +	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));

> +

> 	unlink(un.sun_path); /* May still exist since last run */

> +

> 	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {

> 		RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",

> 			un.sun_path, strerror(errno));

> @@ -361,6 +422,44 @@ open_socket_fd(void)

> 	return mp_fd;

> }

> 

> +/* find corresponding lock file and try to lock it */

> +static int

> +socket_is_active(const char *peer_name)

> +{

> +	char lockfile[PATH_MAX] = {0};

> +	int fd, ret = -1;

> +

> +	/* construct lockfile filename */

> +	create_lockfile_path(peer_name, lockfile, sizeof(lockfile));

> +

> +	/* try to lock it */

> +	fd = open(lockfile, O_CREAT | O_RDWR);

> +	if (fd < 0) {

> +		RTE_LOG(ERR, EAL, "Cannot open '%s': %s\n", lockfile,

> +			strerror(errno));

> +		return -1;

> +	}

> +	ret = flock(fd, LOCK_EX | LOCK_NB);

> +	if (ret < 0) {

> +		if (errno == EWOULDBLOCK) {

> +			/* file is locked */

> +			ret = 1;

> +		} else {

> +			RTE_LOG(ERR, EAL, "Cannot lock '%s': %s\n", lockfile,

> +				strerror(errno));

> +			ret = -1;

> +		}

> +	} else {

> +		ret = 0;

> +		/* unlink lockfile automatically */

> +		unlink(lockfile);

> +		flock(fd, LOCK_UN);

> +	}

> +	close(fd);

> +

> +	return ret;

> +}

> +

> static int

> unlink_sockets(const char *filter)

> {

> @@ -376,28 +475,33 @@ unlink_sockets(const char *filter)

> 	dir_fd = dirfd(mp_dir);

> 

> 	while ((ent = readdir(mp_dir))) {

> -		if (fnmatch(filter, ent->d_name, 0) == 0)

> +		if (fnmatch(filter, ent->d_name, 0) == 0) {

> +			const char *peer_name;

> +			char path[PATH_MAX];

> +			int ret;

> +

> +			snprintf(path, sizeof(path), "%s/%s", mp_dir_path,

> +				 ent->d_name);

> +			peer_name = get_peer_name(path);

> +

> +			ret = socket_is_active(peer_name);

> +			if (ret < 0) {

> +				RTE_LOG(ERR, EAL, "Error getting socket active status\n”);


No close for mp_dir?
> +				return -1;

> +			} else if (ret == 1) {

> +				RTE_LOG(ERR, EAL, "Socket is active (old secondary process still running?)\n”);


And here
> +				return -1;

> +			}

> +			RTE_LOG(DEBUG, EAL, "Removing stale socket file '%s'\n",

> +					ent->d_name);

> 			unlinkat(dir_fd, ent->d_name, 0);


Does unlinkat() close dir_fd?

> +		}

> 	}

> 

> 	closedir(mp_dir);

> 	return 0;

> }

> 

> -static void

> -unlink_socket_by_path(const char *path)

> -{

> -	char *filename;

> -	char *fullpath = strdup(path);

> -

> -	if (!fullpath)

> -		return;

> -	filename = basename(fullpath);

> -	unlink_sockets(filename);

> -	free(fullpath);

> -	RTE_LOG(INFO, EAL, "Remove socket %s\n", path);

> -}

> -

> int

> rte_mp_channel_init(void)

> {

> @@ -487,10 +591,25 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)

> 		rte_errno = errno;

> 		/* Check if it caused by peer process exits */

> 		if (errno == ECONNREFUSED) {

> -			/* We don't unlink the primary's socket here */

> -			if (rte_eal_process_type() == RTE_PROC_PRIMARY)

> -				unlink_socket_by_path(dst_path);

> -			return 0;

> +			const char *peer_name = get_peer_name(dst_path);

> +			int active, ret = 0;


Here we have variables in the middle of a block again.

> +

> +			active = rte_eal_process_type() == RTE_PROC_PRIMARY ?

> +					socket_is_active(peer_name) :

> +					rte_eal_primary_proc_alive(NULL);

> +

> +			if (active > 0) {

> +				RTE_LOG(ERR, EAL, "Couldn't communicate with active peer\n");

> +			} else if (active < 0) {

> +				RTE_LOG(ERR, EAL, "Couldn't get peer status\n");

> +				ret = -1;

> +			} else if (rte_eal_process_type() == RTE_PROC_PRIMARY) {

> +				/* peer isn't active anymore, so unlink its

> +				 * socket.

> +				 */

> +				unlink(dst_path);

> +			}

> +			return ret;

> 		}

> 		if (errno == ENOBUFS) {

> 			RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",

> @@ -508,7 +627,7 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)

> static int

> mp_send(struct rte_mp_msg *msg, const char *peer, int type)

> {

> -	int ret = 0;

> +	int dir_fd, ret = 0;

> 	DIR *mp_dir;

> 	struct dirent *ent;

> 

> @@ -530,15 +649,28 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type)

> 		rte_errno = errno;

> 		return -1;

> 	}

> +	dir_fd = dirfd(mp_dir);


when is dir_fd closed?

> 	while ((ent = readdir(mp_dir))) {

> 		char path[PATH_MAX];

> +		const char *peer_name;

> +		int active;

> 

> 		if (fnmatch(mp_filter, ent->d_name, 0) != 0)

> 			continue;

> 

> 		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,

> 			 ent->d_name);

> -		if (send_msg(path, msg, type) < 0)

> +		peer_name = get_peer_name(path);

> +

> +		/* only send if we can expect to receive a reply, otherwise

> +		 * remove the socket.

> +		 */

> +		active = socket_is_active(peer_name);

> +		if (active < 0)

> +			ret = -1;

> +		else if (active == 0)

> +			unlinkat(dir_fd, ent->d_name, 0);

> +		else if (active > 0 && send_msg(path, msg, type) < 0)

> 			ret = -1;

> 	}

> 

> @@ -663,7 +795,7 @@ int __rte_experimental

> rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,

> 		const struct timespec *ts)

> {

> -	int ret = 0;

> +	int dir_fd, ret = 0;

> 	DIR *mp_dir;

> 	struct dirent *ent;

> 	struct timeval now;

> @@ -698,15 +830,29 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,

> 		rte_errno = errno;

> 		return -1;

> 	}

> +	dir_fd = dirfd(mp_dir);


When is dir_fd closed?

> 

> 	while ((ent = readdir(mp_dir))) {

> +		const char *peer_name;

> 		char path[PATH_MAX];

> +		int active;

> 

> 		if (fnmatch(mp_filter, ent->d_name, 0) != 0)

> 			continue;

> 

> 		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,

> 			 ent->d_name);

> +		peer_name = get_peer_name(path);

> +

> +		active = socket_is_active(peer_name);

> +

> +		if (active < 0) {

> +			ret = -1;

> +			break;

> +		} else if (active == 0) {

> +			unlinkat(dir_fd, ent->d_name, 0);

> +			continue;

> +		}

> 

> 		if (mp_request_one(path, req, reply, &end))

> 			ret = -1;

> -- 

> 2.7.4


Regards,
Keith
  
Burakov, Anatoly Feb. 28, 2018, 10:15 a.m. UTC | #3
On 28-Feb-18 1:26 AM, Tan, Jianfeng wrote:
> 
> 
>> -----Original Message-----
>> From: Burakov, Anatoly
>> Sent: Tuesday, February 27, 2018 10:36 PM
>> To: dev@dpdk.org
>> Cc: Tan, Jianfeng
>> Subject: [PATCH v3 3/5] eal: use locks to determine if secondary process is
>> active
>>
>> Previously, IPC would remove sockets it considers to be "inactive"
>> based on whether they have responded.
> 
> To be more precise, it was not depending on if the other side responses or not; it was depending on sendmsg return error, ECONNREFUSED.
> 
>> Change this to create lock
>> files in addition to socket files, so that we can determine if
>> secondary process is active before attempting to communicate with
>> it. That way, we can distinguish secondaries that are alive but
>> are not responding, from those that have already died.
> 
> I think, by the old way, we can also "distinguish secondaries that are alive but are not responding, from those that have already died", can't we?
> 
> Thanks,
> Jianfeng
> 

I rechecked, and you're right. For some reason i thought that nb_sent 
gets incremented even if there was ECONNREFUSED error. It doesn't, so 
the effect is the same. I'll drop this patch so (well, i'll keep the 
naming stuff, as it makes things a bit easier).
  
Burakov, Anatoly Feb. 28, 2018, 10:17 a.m. UTC | #4
On 28-Feb-18 4:17 AM, Wiles, Keith wrote:
> 
> 
>> On Feb 27, 2018, at 8:35 AM, Anatoly Burakov <anatoly.burakov@intel.com> wrote:
>>
>> Previously, IPC would remove sockets it considers to be "inactive"
>> based on whether they have responded. Change this to create lock
>> files in addition to socket files, so that we can determine if
>> secondary process is active before attempting to communicate with
>> it. That way, we can distinguish secondaries that are alive but
>> are not responding, from those that have already died.
>>
>> Signed-off-by: Anatoly Burakov <anatoly.burakov@intel.com>
>> ---
>>
>> Notes:
>>     v3: no changes
>>
>>     v2: no changes
>>
>> lib/librte_eal/common/eal_common_proc.c | 204 +++++++++++++++++++++++++++-----
>> 1 file changed, 175 insertions(+), 29 deletions(-)
>>
>> diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
>> index a6e24e6..7c87971 100644
>> --- a/lib/librte_eal/common/eal_common_proc.c
>> +++ b/lib/librte_eal/common/eal_common_proc.c
>> @@ -13,6 +13,7 @@
>> #include <stdio.h>
>> #include <stdlib.h>
>> #include <string.h>
>> +#include <sys/file.h>
>> #include <sys/time.h>
>> #include <sys/types.h>
>> #include <sys/socket.h>
>> @@ -32,6 +33,7 @@
>> #include "eal_internal_cfg.h"
>>
>> static int mp_fd = -1;
>> +static int lock_fd = -1;
> 
> I did not find where lock_fd is closed in this code, should it be closed at some point?
> 
>> static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
>> static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
>> static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
>> @@ -104,6 +106,46 @@ find_sync_request(const char *dst, const char *act_name)
>> 	return r;
>> }
>>
>> +static void
>> +create_socket_path(const char *name, char *buf, int len)
>> +{
>> +	const char *prefix = eal_mp_socket_path();
> 
> I thought we needed a space here after the variables, no?
> 
>> +	if (strlen(name) > 0)
>> +		snprintf(buf, len, "%s_%s", prefix, name);
>> +	else
>> +		snprintf(buf, len, "%s", prefix);
>> +}
>> +
>> +static void
>> +create_lockfile_path(const char *name, char *buf, int len)
>> +{
>> +	const char *prefix = eal_mp_socket_path();
> 
> Same here
>> +	if (strlen(name) > 1)
>> +		snprintf(buf, len, "%slock_%s", prefix, name);
>> +	else
>> +		snprintf(buf, len, "%slock", prefix);
>> +}
>> +
>> +static const char *
>> +get_peer_name(const char *socket_full_path)
>> +{
>> +	char buf[PATH_MAX] = {0};
>> +	int len;
>> +
>> +	/* primary process has no peer name */
>> +	if (strcmp(socket_full_path, eal_mp_socket_path()) == 0)
>> +		return NULL;
>> +
>> +	/* construct dummy socket file name - make it one character long so that
>> +	 * we hit the code path where underscores are added
>> +	 */
>> +	create_socket_path("a", buf, sizeof(buf));
>> +
>> +	/* we want to get everything after /path/.rte_unix_, so discard 'a' */
>> +	len = strlen(buf) - 1;
>> +	return &socket_full_path[len];
>> +}
>> +
>> int
>> rte_eal_primary_proc_alive(const char *config_file_path)
>> {
>> @@ -332,8 +374,29 @@ mp_handle(void *arg __rte_unused)
>> static int
>> open_socket_fd(void)
>> {
>> +	char peer_name[PATH_MAX] = {0};
>> +	char lockfile[PATH_MAX] = {0};
>> 	struct sockaddr_un un;
>> -	const char *prefix = eal_mp_socket_path();
>> +
>> +	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
>> +		snprintf(peer_name, sizeof(peer_name), "%d_%"PRIx64,
>> +			 getpid(), rte_rdtsc());
>> +
>> +	/* try to create lockfile */
>> +	create_lockfile_path(peer_name, lockfile, sizeof(lockfile));
>> +
>> +	lock_fd = open(lockfile, O_CREAT | O_RDWR);
>> +	if (lock_fd < 0) {
>> +		RTE_LOG(ERR, EAL, "failed to open '%s': %s\n", lockfile,
>> +			strerror(errno));
>> +		return -1;
>> +	}
>> +	if (flock(lock_fd, LOCK_EX | LOCK_NB)) {
>> +		RTE_LOG(ERR, EAL, "failed to lock '%s': %s\n", lockfile,
>> +			strerror(errno));
> 
> Did not close the lock_fd here.
> 
>> +		return -1;
>> +	}
>> +	/* no need to downgrade to shared lock */
>>
>> 	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
>> 	if (mp_fd < 0) {
>> @@ -343,13 +406,11 @@ open_socket_fd(void)
>>
>> 	memset(&un, 0, sizeof(un));
>> 	un.sun_family = AF_UNIX;
>> -	if (rte_eal_process_type() == RTE_PROC_PRIMARY)
>> -		snprintf(un.sun_path, sizeof(un.sun_path), "%s", prefix);
>> -	else {
>> -		snprintf(un.sun_path, sizeof(un.sun_path), "%s_%d_%"PRIx64,
>> -			 prefix, getpid(), rte_rdtsc());
>> -	}
>> +
>> +	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
>> +
>> 	unlink(un.sun_path); /* May still exist since last run */
>> +
>> 	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
>> 		RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
>> 			un.sun_path, strerror(errno));
>> @@ -361,6 +422,44 @@ open_socket_fd(void)
>> 	return mp_fd;
>> }
>>
>> +/* find corresponding lock file and try to lock it */
>> +static int
>> +socket_is_active(const char *peer_name)
>> +{
>> +	char lockfile[PATH_MAX] = {0};
>> +	int fd, ret = -1;
>> +
>> +	/* construct lockfile filename */
>> +	create_lockfile_path(peer_name, lockfile, sizeof(lockfile));
>> +
>> +	/* try to lock it */
>> +	fd = open(lockfile, O_CREAT | O_RDWR);
>> +	if (fd < 0) {
>> +		RTE_LOG(ERR, EAL, "Cannot open '%s': %s\n", lockfile,
>> +			strerror(errno));
>> +		return -1;
>> +	}
>> +	ret = flock(fd, LOCK_EX | LOCK_NB);
>> +	if (ret < 0) {
>> +		if (errno == EWOULDBLOCK) {
>> +			/* file is locked */
>> +			ret = 1;
>> +		} else {
>> +			RTE_LOG(ERR, EAL, "Cannot lock '%s': %s\n", lockfile,
>> +				strerror(errno));
>> +			ret = -1;
>> +		}
>> +	} else {
>> +		ret = 0;
>> +		/* unlink lockfile automatically */
>> +		unlink(lockfile);
>> +		flock(fd, LOCK_UN);
>> +	}
>> +	close(fd);
>> +
>> +	return ret;
>> +}
>> +
>> static int
>> unlink_sockets(const char *filter)
>> {
>> @@ -376,28 +475,33 @@ unlink_sockets(const char *filter)
>> 	dir_fd = dirfd(mp_dir);
>>
>> 	while ((ent = readdir(mp_dir))) {
>> -		if (fnmatch(filter, ent->d_name, 0) == 0)
>> +		if (fnmatch(filter, ent->d_name, 0) == 0) {
>> +			const char *peer_name;
>> +			char path[PATH_MAX];
>> +			int ret;
>> +
>> +			snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
>> +				 ent->d_name);
>> +			peer_name = get_peer_name(path);
>> +
>> +			ret = socket_is_active(peer_name);
>> +			if (ret < 0) {
>> +				RTE_LOG(ERR, EAL, "Error getting socket active status\n”);
> 
> No close for mp_dir?
>> +				return -1;
>> +			} else if (ret == 1) {
>> +				RTE_LOG(ERR, EAL, "Socket is active (old secondary process still running?)\n”);
> 
> And here
>> +				return -1;
>> +			}
>> +			RTE_LOG(DEBUG, EAL, "Removing stale socket file '%s'\n",
>> +					ent->d_name);
>> 			unlinkat(dir_fd, ent->d_name, 0);
> 
> Does unlinkat() close dir_fd?
> 
>> +		}
>> 	}
>>
>> 	closedir(mp_dir);
>> 	return 0;
>> }
>>
>> -static void
>> -unlink_socket_by_path(const char *path)
>> -{
>> -	char *filename;
>> -	char *fullpath = strdup(path);
>> -
>> -	if (!fullpath)
>> -		return;
>> -	filename = basename(fullpath);
>> -	unlink_sockets(filename);
>> -	free(fullpath);
>> -	RTE_LOG(INFO, EAL, "Remove socket %s\n", path);
>> -}
>> -
>> int
>> rte_mp_channel_init(void)
>> {
>> @@ -487,10 +591,25 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
>> 		rte_errno = errno;
>> 		/* Check if it caused by peer process exits */
>> 		if (errno == ECONNREFUSED) {
>> -			/* We don't unlink the primary's socket here */
>> -			if (rte_eal_process_type() == RTE_PROC_PRIMARY)
>> -				unlink_socket_by_path(dst_path);
>> -			return 0;
>> +			const char *peer_name = get_peer_name(dst_path);
>> +			int active, ret = 0;
> 
> Here we have variables in the middle of a block again.
> 
>> +
>> +			active = rte_eal_process_type() == RTE_PROC_PRIMARY ?
>> +					socket_is_active(peer_name) :
>> +					rte_eal_primary_proc_alive(NULL);
>> +
>> +			if (active > 0) {
>> +				RTE_LOG(ERR, EAL, "Couldn't communicate with active peer\n");
>> +			} else if (active < 0) {
>> +				RTE_LOG(ERR, EAL, "Couldn't get peer status\n");
>> +				ret = -1;
>> +			} else if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
>> +				/* peer isn't active anymore, so unlink its
>> +				 * socket.
>> +				 */
>> +				unlink(dst_path);
>> +			}
>> +			return ret;
>> 		}
>> 		if (errno == ENOBUFS) {
>> 			RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
>> @@ -508,7 +627,7 @@ send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
>> static int
>> mp_send(struct rte_mp_msg *msg, const char *peer, int type)
>> {
>> -	int ret = 0;
>> +	int dir_fd, ret = 0;
>> 	DIR *mp_dir;
>> 	struct dirent *ent;
>>
>> @@ -530,15 +649,28 @@ mp_send(struct rte_mp_msg *msg, const char *peer, int type)
>> 		rte_errno = errno;
>> 		return -1;
>> 	}
>> +	dir_fd = dirfd(mp_dir);
> 
> when is dir_fd closed?
> 
>> 	while ((ent = readdir(mp_dir))) {
>> 		char path[PATH_MAX];
>> +		const char *peer_name;
>> +		int active;
>>
>> 		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
>> 			continue;
>>
>> 		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
>> 			 ent->d_name);
>> -		if (send_msg(path, msg, type) < 0)
>> +		peer_name = get_peer_name(path);
>> +
>> +		/* only send if we can expect to receive a reply, otherwise
>> +		 * remove the socket.
>> +		 */
>> +		active = socket_is_active(peer_name);
>> +		if (active < 0)
>> +			ret = -1;
>> +		else if (active == 0)
>> +			unlinkat(dir_fd, ent->d_name, 0);
>> +		else if (active > 0 && send_msg(path, msg, type) < 0)
>> 			ret = -1;
>> 	}
>>
>> @@ -663,7 +795,7 @@ int __rte_experimental
>> rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
>> 		const struct timespec *ts)
>> {
>> -	int ret = 0;
>> +	int dir_fd, ret = 0;
>> 	DIR *mp_dir;
>> 	struct dirent *ent;
>> 	struct timeval now;
>> @@ -698,15 +830,29 @@ rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
>> 		rte_errno = errno;
>> 		return -1;
>> 	}
>> +	dir_fd = dirfd(mp_dir);
> 
> When is dir_fd closed?
> 
>>
>> 	while ((ent = readdir(mp_dir))) {
>> +		const char *peer_name;
>> 		char path[PATH_MAX];
>> +		int active;
>>
>> 		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
>> 			continue;
>>
>> 		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
>> 			 ent->d_name);
>> +		peer_name = get_peer_name(path);
>> +
>> +		active = socket_is_active(peer_name);
>> +
>> +		if (active < 0) {
>> +			ret = -1;
>> +			break;
>> +		} else if (active == 0) {
>> +			unlinkat(dir_fd, ent->d_name, 0);
>> +			continue;
>> +		}
>>
>> 		if (mp_request_one(path, req, reply, &end))
>> 			ret = -1;
>> -- 
>> 2.7.4
> 
> Regards,
> Keith
> 

Thanks for your comments, but as per Jianfeng's comments, most of this 
patch will be dropped.

Some commends though: dir_fd will be closed automatically because we 
call closedir() on the mp_dir, as per closedir manpage [1]. The lock_fd 
wasn't closed because the process was meant to hold onto the lock for 
the lifetime of the process, but since this "feature" will be dropped, 
it doesn't matter.

I'll make sure we close all handles everywhere for v4.
  

Patch

diff --git a/lib/librte_eal/common/eal_common_proc.c b/lib/librte_eal/common/eal_common_proc.c
index a6e24e6..7c87971 100644
--- a/lib/librte_eal/common/eal_common_proc.c
+++ b/lib/librte_eal/common/eal_common_proc.c
@@ -13,6 +13,7 @@ 
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <sys/file.h>
 #include <sys/time.h>
 #include <sys/types.h>
 #include <sys/socket.h>
@@ -32,6 +33,7 @@ 
 #include "eal_internal_cfg.h"
 
 static int mp_fd = -1;
+static int lock_fd = -1;
 static char mp_filter[PATH_MAX];   /* Filter for secondary process sockets */
 static char mp_dir_path[PATH_MAX]; /* The directory path for all mp sockets */
 static pthread_mutex_t mp_mutex_action = PTHREAD_MUTEX_INITIALIZER;
@@ -104,6 +106,46 @@  find_sync_request(const char *dst, const char *act_name)
 	return r;
 }
 
+static void
+create_socket_path(const char *name, char *buf, int len)
+{
+	const char *prefix = eal_mp_socket_path();
+	if (strlen(name) > 0)
+		snprintf(buf, len, "%s_%s", prefix, name);
+	else
+		snprintf(buf, len, "%s", prefix);
+}
+
+static void
+create_lockfile_path(const char *name, char *buf, int len)
+{
+	const char *prefix = eal_mp_socket_path();
+	if (strlen(name) > 1)
+		snprintf(buf, len, "%slock_%s", prefix, name);
+	else
+		snprintf(buf, len, "%slock", prefix);
+}
+
+static const char *
+get_peer_name(const char *socket_full_path)
+{
+	char buf[PATH_MAX] = {0};
+	int len;
+
+	/* primary process has no peer name */
+	if (strcmp(socket_full_path, eal_mp_socket_path()) == 0)
+		return NULL;
+
+	/* construct dummy socket file name - make it one character long so that
+	 * we hit the code path where underscores are added
+	 */
+	create_socket_path("a", buf, sizeof(buf));
+
+	/* we want to get everything after /path/.rte_unix_, so discard 'a' */
+	len = strlen(buf) - 1;
+	return &socket_full_path[len];
+}
+
 int
 rte_eal_primary_proc_alive(const char *config_file_path)
 {
@@ -332,8 +374,29 @@  mp_handle(void *arg __rte_unused)
 static int
 open_socket_fd(void)
 {
+	char peer_name[PATH_MAX] = {0};
+	char lockfile[PATH_MAX] = {0};
 	struct sockaddr_un un;
-	const char *prefix = eal_mp_socket_path();
+
+	if (rte_eal_process_type() == RTE_PROC_SECONDARY)
+		snprintf(peer_name, sizeof(peer_name), "%d_%"PRIx64,
+			 getpid(), rte_rdtsc());
+
+	/* try to create lockfile */
+	create_lockfile_path(peer_name, lockfile, sizeof(lockfile));
+
+	lock_fd = open(lockfile, O_CREAT | O_RDWR);
+	if (lock_fd < 0) {
+		RTE_LOG(ERR, EAL, "failed to open '%s': %s\n", lockfile,
+			strerror(errno));
+		return -1;
+	}
+	if (flock(lock_fd, LOCK_EX | LOCK_NB)) {
+		RTE_LOG(ERR, EAL, "failed to lock '%s': %s\n", lockfile,
+			strerror(errno));
+		return -1;
+	}
+	/* no need to downgrade to shared lock */
 
 	mp_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
 	if (mp_fd < 0) {
@@ -343,13 +406,11 @@  open_socket_fd(void)
 
 	memset(&un, 0, sizeof(un));
 	un.sun_family = AF_UNIX;
-	if (rte_eal_process_type() == RTE_PROC_PRIMARY)
-		snprintf(un.sun_path, sizeof(un.sun_path), "%s", prefix);
-	else {
-		snprintf(un.sun_path, sizeof(un.sun_path), "%s_%d_%"PRIx64,
-			 prefix, getpid(), rte_rdtsc());
-	}
+
+	create_socket_path(peer_name, un.sun_path, sizeof(un.sun_path));
+
 	unlink(un.sun_path); /* May still exist since last run */
+
 	if (bind(mp_fd, (struct sockaddr *)&un, sizeof(un)) < 0) {
 		RTE_LOG(ERR, EAL, "failed to bind %s: %s\n",
 			un.sun_path, strerror(errno));
@@ -361,6 +422,44 @@  open_socket_fd(void)
 	return mp_fd;
 }
 
+/* find corresponding lock file and try to lock it */
+static int
+socket_is_active(const char *peer_name)
+{
+	char lockfile[PATH_MAX] = {0};
+	int fd, ret = -1;
+
+	/* construct lockfile filename */
+	create_lockfile_path(peer_name, lockfile, sizeof(lockfile));
+
+	/* try to lock it */
+	fd = open(lockfile, O_CREAT | O_RDWR);
+	if (fd < 0) {
+		RTE_LOG(ERR, EAL, "Cannot open '%s': %s\n", lockfile,
+			strerror(errno));
+		return -1;
+	}
+	ret = flock(fd, LOCK_EX | LOCK_NB);
+	if (ret < 0) {
+		if (errno == EWOULDBLOCK) {
+			/* file is locked */
+			ret = 1;
+		} else {
+			RTE_LOG(ERR, EAL, "Cannot lock '%s': %s\n", lockfile,
+				strerror(errno));
+			ret = -1;
+		}
+	} else {
+		ret = 0;
+		/* unlink lockfile automatically */
+		unlink(lockfile);
+		flock(fd, LOCK_UN);
+	}
+	close(fd);
+
+	return ret;
+}
+
 static int
 unlink_sockets(const char *filter)
 {
@@ -376,28 +475,33 @@  unlink_sockets(const char *filter)
 	dir_fd = dirfd(mp_dir);
 
 	while ((ent = readdir(mp_dir))) {
-		if (fnmatch(filter, ent->d_name, 0) == 0)
+		if (fnmatch(filter, ent->d_name, 0) == 0) {
+			const char *peer_name;
+			char path[PATH_MAX];
+			int ret;
+
+			snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
+				 ent->d_name);
+			peer_name = get_peer_name(path);
+
+			ret = socket_is_active(peer_name);
+			if (ret < 0) {
+				RTE_LOG(ERR, EAL, "Error getting socket active status\n");
+				return -1;
+			} else if (ret == 1) {
+				RTE_LOG(ERR, EAL, "Socket is active (old secondary process still running?)\n");
+				return -1;
+			}
+			RTE_LOG(DEBUG, EAL, "Removing stale socket file '%s'\n",
+					ent->d_name);
 			unlinkat(dir_fd, ent->d_name, 0);
+		}
 	}
 
 	closedir(mp_dir);
 	return 0;
 }
 
-static void
-unlink_socket_by_path(const char *path)
-{
-	char *filename;
-	char *fullpath = strdup(path);
-
-	if (!fullpath)
-		return;
-	filename = basename(fullpath);
-	unlink_sockets(filename);
-	free(fullpath);
-	RTE_LOG(INFO, EAL, "Remove socket %s\n", path);
-}
-
 int
 rte_mp_channel_init(void)
 {
@@ -487,10 +591,25 @@  send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
 		rte_errno = errno;
 		/* Check if it caused by peer process exits */
 		if (errno == ECONNREFUSED) {
-			/* We don't unlink the primary's socket here */
-			if (rte_eal_process_type() == RTE_PROC_PRIMARY)
-				unlink_socket_by_path(dst_path);
-			return 0;
+			const char *peer_name = get_peer_name(dst_path);
+			int active, ret = 0;
+
+			active = rte_eal_process_type() == RTE_PROC_PRIMARY ?
+					socket_is_active(peer_name) :
+					rte_eal_primary_proc_alive(NULL);
+
+			if (active > 0) {
+				RTE_LOG(ERR, EAL, "Couldn't communicate with active peer\n");
+			} else if (active < 0) {
+				RTE_LOG(ERR, EAL, "Couldn't get peer status\n");
+				ret = -1;
+			} else if (rte_eal_process_type() == RTE_PROC_PRIMARY) {
+				/* peer isn't active anymore, so unlink its
+				 * socket.
+				 */
+				unlink(dst_path);
+			}
+			return ret;
 		}
 		if (errno == ENOBUFS) {
 			RTE_LOG(ERR, EAL, "Peer cannot receive message %s\n",
@@ -508,7 +627,7 @@  send_msg(const char *dst_path, struct rte_mp_msg *msg, int type)
 static int
 mp_send(struct rte_mp_msg *msg, const char *peer, int type)
 {
-	int ret = 0;
+	int dir_fd, ret = 0;
 	DIR *mp_dir;
 	struct dirent *ent;
 
@@ -530,15 +649,28 @@  mp_send(struct rte_mp_msg *msg, const char *peer, int type)
 		rte_errno = errno;
 		return -1;
 	}
+	dir_fd = dirfd(mp_dir);
 	while ((ent = readdir(mp_dir))) {
 		char path[PATH_MAX];
+		const char *peer_name;
+		int active;
 
 		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
 			continue;
 
 		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
 			 ent->d_name);
-		if (send_msg(path, msg, type) < 0)
+		peer_name = get_peer_name(path);
+
+		/* only send if we can expect to receive a reply, otherwise
+		 * remove the socket.
+		 */
+		active = socket_is_active(peer_name);
+		if (active < 0)
+			ret = -1;
+		else if (active == 0)
+			unlinkat(dir_fd, ent->d_name, 0);
+		else if (active > 0 && send_msg(path, msg, type) < 0)
 			ret = -1;
 	}
 
@@ -663,7 +795,7 @@  int __rte_experimental
 rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
 		const struct timespec *ts)
 {
-	int ret = 0;
+	int dir_fd, ret = 0;
 	DIR *mp_dir;
 	struct dirent *ent;
 	struct timeval now;
@@ -698,15 +830,29 @@  rte_mp_request(struct rte_mp_msg *req, struct rte_mp_reply *reply,
 		rte_errno = errno;
 		return -1;
 	}
+	dir_fd = dirfd(mp_dir);
 
 	while ((ent = readdir(mp_dir))) {
+		const char *peer_name;
 		char path[PATH_MAX];
+		int active;
 
 		if (fnmatch(mp_filter, ent->d_name, 0) != 0)
 			continue;
 
 		snprintf(path, sizeof(path), "%s/%s", mp_dir_path,
 			 ent->d_name);
+		peer_name = get_peer_name(path);
+
+		active = socket_is_active(peer_name);
+
+		if (active < 0) {
+			ret = -1;
+			break;
+		} else if (active == 0) {
+			unlinkat(dir_fd, ent->d_name, 0);
+			continue;
+		}
 
 		if (mp_request_one(path, req, reply, &end))
 			ret = -1;