From patchwork Fri Nov 21 17:42:03 2014 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "De Lara Guarch, Pablo" X-Patchwork-Id: 1446 Return-Path: X-Original-To: patchwork@dpdk.org Delivered-To: patchwork@dpdk.org Received: from [92.243.14.124] (localhost [IPv6:::1]) by dpdk.org (Postfix) with ESMTP id 6BEC07FC1; Fri, 21 Nov 2014 18:31:58 +0100 (CET) Received: from mga11.intel.com (mga11.intel.com [192.55.52.93]) by dpdk.org (Postfix) with ESMTP id 3BD457F74 for ; Fri, 21 Nov 2014 18:31:46 +0100 (CET) Received: from fmsmga001.fm.intel.com ([10.253.24.23]) by fmsmga102.fm.intel.com with ESMTP; 21 Nov 2014 09:42:19 -0800 X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.07,432,1413270000"; d="scan'208";a="626304564" Received: from irvmail001.ir.intel.com ([163.33.26.43]) by fmsmga001.fm.intel.com with ESMTP; 21 Nov 2014 09:42:16 -0800 Received: from sivswdev02.ir.intel.com (sivswdev02.ir.intel.com [10.237.217.46]) by irvmail001.ir.intel.com (8.14.3/8.13.6/MailSET/Hub) with ESMTP id sALHgF5r001841; Fri, 21 Nov 2014 17:42:15 GMT Received: from sivswdev02.ir.intel.com (localhost [127.0.0.1]) by sivswdev02.ir.intel.com with ESMTP id sALHgFoH012196; Fri, 21 Nov 2014 17:42:15 GMT Received: (from pdelarax@localhost) by sivswdev02.ir.intel.com with id sALHgF3T012192; Fri, 21 Nov 2014 17:42:15 GMT From: Pablo de Lara To: dev@dpdk.org Date: Fri, 21 Nov 2014 17:42:03 +0000 Message-Id: <1416591732-3735-2-git-send-email-pablo.de.lara.guarch@intel.com> X-Mailer: git-send-email 1.7.4.1 In-Reply-To: <1416591732-3735-1-git-send-email-pablo.de.lara.guarch@intel.com> References: <1413142571-23069-1-git-send-email-alan.carew@intel.com> <1416591732-3735-1-git-send-email-pablo.de.lara.guarch@intel.com> Subject: [dpdk-dev] [PATCH v5 01/10] Channel Manager and Monitor for VM Power Management(Host). X-BeenThere: dev@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: patches and discussions about DPDK List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dev-bounces@dpdk.org Sender: "dev" From: Alan Carew The manager is responsible for adding communications channels to the Monitor thread, tracking and reporting VM state and employs the libvirt API for synchronization with the KVM Hypervisor. The manager interacts with the Hypervisor to discover the mapping of virtual CPUS(vCPUs) to the host physical CPUS(pCPUs) and to inspect the VM running state. The manager provides the following functionality to the CLI: 1) Connect to a libvirtd instance, default: qemu:///system 2) Add a VM to an internal list, each VM is identified by a "name" which must correspond a valid libvirt Domain Name. 3) Add communication channels associated with a VM to the epoll based Monitor thread. The channels must exist and be in the form of: /tmp/powermonitor/.. Each channel is a Virtio-Serial endpoint configured as an AF_UNIX file socket and opened in non-blocking mode. Each VM can have a maximum of 64 channels associated with it. 4) Disable or re-enable VM communication channels, channels once added to the Monitor thread remain in that threads control, however acting on channel requests can be disabled and renabled via CLI. The monitor is an epoll based infinite loop running in a separate thread that waits on channel events from VMs and calls the corresponding functions. Channel definitions from the manager are registered via the epoll event opaque pointer when calling epoll_ctl(EPOLL_CTL_ADD), this allows for obtaining the channels file descriptor for reading EPOLLIN events and mapping the vCPU to pCPU(s) associated with a request from a particular VM. Signed-off-by: Alan Carew Signed-off-by: Pablo de Lara --- examples/vm_power_manager/channel_manager.c | 804 +++++++++++++++++++++++++++ examples/vm_power_manager/channel_manager.h | 314 +++++++++++ examples/vm_power_manager/channel_monitor.c | 231 ++++++++ examples/vm_power_manager/channel_monitor.h | 102 ++++ 4 files changed, 1451 insertions(+), 0 deletions(-) create mode 100644 examples/vm_power_manager/channel_manager.c create mode 100644 examples/vm_power_manager/channel_manager.h create mode 100644 examples/vm_power_manager/channel_monitor.c create mode 100644 examples/vm_power_manager/channel_monitor.h diff --git a/examples/vm_power_manager/channel_manager.c b/examples/vm_power_manager/channel_manager.c new file mode 100644 index 0000000..a14f191 --- /dev/null +++ b/examples/vm_power_manager/channel_manager.c @@ -0,0 +1,804 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "channel_manager.h" +#include "channel_commands.h" +#include "channel_monitor.h" + + +#define RTE_LOGTYPE_CHANNEL_MANAGER RTE_LOGTYPE_USER1 + +#define ITERATIVE_BITMASK_CHECK_64(mask_u64b, i) \ + for (i = 0; mask_u64b; mask_u64b &= ~(1ULL << i++)) \ + if ((mask_u64b >> i) & 1) \ + +/* Global pointer to libvirt connection */ +static virConnectPtr global_vir_conn_ptr; + +static unsigned char *global_cpumaps; +static virVcpuInfo *global_vircpuinfo; +static size_t global_maplen; + +static unsigned global_n_host_cpus; + +/* + * Represents a single Virtual Machine + */ +struct virtual_machine_info { + char name[CHANNEL_MGR_MAX_NAME_LEN]; + rte_atomic64_t pcpu_mask[CHANNEL_CMDS_MAX_CPUS]; + struct channel_info *channels[CHANNEL_CMDS_MAX_VM_CHANNELS]; + uint64_t channel_mask; + uint8_t num_channels; + enum vm_status status; + virDomainPtr domainPtr; + virDomainInfo info; + rte_spinlock_t config_spinlock; + LIST_ENTRY(virtual_machine_info) vms_info; +}; + +LIST_HEAD(, virtual_machine_info) vm_list_head; + +static struct virtual_machine_info * +find_domain_by_name(const char *name) +{ + struct virtual_machine_info *info; + LIST_FOREACH(info, &vm_list_head, vms_info) { + if (!strncmp(info->name, name, CHANNEL_MGR_MAX_NAME_LEN-1)) + return info; + } + return NULL; +} + +static int +update_pcpus_mask(struct virtual_machine_info *vm_info) +{ + virVcpuInfoPtr cpuinfo; + unsigned i, j; + int n_vcpus; + uint64_t mask; + + memset(global_cpumaps, 0, CHANNEL_CMDS_MAX_CPUS*global_maplen); + + if (!virDomainIsActive(vm_info->domainPtr)) { + n_vcpus = virDomainGetVcpuPinInfo(vm_info->domainPtr, + vm_info->info.nrVirtCpu, global_cpumaps, global_maplen, + VIR_DOMAIN_AFFECT_CONFIG); + if (n_vcpus < 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error getting vCPU info for " + "in-active VM '%s'\n", vm_info->name); + return -1; + } + goto update_pcpus; + } + + memset(global_vircpuinfo, 0, sizeof(*global_vircpuinfo)* + CHANNEL_CMDS_MAX_CPUS); + + cpuinfo = global_vircpuinfo; + + n_vcpus = virDomainGetVcpus(vm_info->domainPtr, cpuinfo, + CHANNEL_CMDS_MAX_CPUS, global_cpumaps, global_maplen); + if (n_vcpus < 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error getting vCPU info for " + "active VM '%s'\n", vm_info->name); + return -1; + } +update_pcpus: + if (n_vcpus >= CHANNEL_CMDS_MAX_CPUS) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Number of vCPUS(%u) is out of range " + "0...%d\n", n_vcpus, CHANNEL_CMDS_MAX_CPUS-1); + return -1; + } + if (n_vcpus != vm_info->info.nrVirtCpu) { + RTE_LOG(INFO, CHANNEL_MANAGER, "Updating the number of vCPUs for VM '%s" + " from %d -> %d\n", vm_info->name, vm_info->info.nrVirtCpu, + n_vcpus); + vm_info->info.nrVirtCpu = n_vcpus; + } + for (i = 0; i < vm_info->info.nrVirtCpu; i++) { + mask = 0; + for (j = 0; j < global_n_host_cpus; j++) { + if (VIR_CPU_USABLE(global_cpumaps, global_maplen, i, j) > 0) { + mask |= 1ULL << j; + } + } + rte_atomic64_set(&vm_info->pcpu_mask[i], mask); + } + return 0; +} + +int +set_pcpus_mask(char *vm_name, unsigned vcpu, uint64_t core_mask) +{ + unsigned i = 0; + int flags = VIR_DOMAIN_AFFECT_LIVE|VIR_DOMAIN_AFFECT_CONFIG; + struct virtual_machine_info *vm_info; + uint64_t mask = core_mask; + + if (vcpu >= CHANNEL_CMDS_MAX_CPUS) { + RTE_LOG(ERR, CHANNEL_MANAGER, "vCPU(%u) exceeds max allowable(%d)\n", + vcpu, CHANNEL_CMDS_MAX_CPUS-1); + return -1; + } + + vm_info = find_domain_by_name(vm_name); + if (vm_info == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "VM '%s' not found\n", vm_name); + return -1; + } + + if (!virDomainIsActive(vm_info->domainPtr)) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to set vCPU(%u) to pCPU " + "mask(0x%"PRIx64") for VM '%s', VM is not active\n", + vcpu, core_mask, vm_info->name); + return -1; + } + + if (vcpu >= vm_info->info.nrVirtCpu) { + RTE_LOG(ERR, CHANNEL_MANAGER, "vCPU(%u) exceeds the assigned number of " + "vCPUs(%u)\n", vcpu, vm_info->info.nrVirtCpu); + return -1; + } + memset(global_cpumaps, 0 , CHANNEL_CMDS_MAX_CPUS * global_maplen); + ITERATIVE_BITMASK_CHECK_64(mask, i) { + VIR_USE_CPU(global_cpumaps, i); + if (i >= global_n_host_cpus) { + RTE_LOG(ERR, CHANNEL_MANAGER, "CPU(%u) exceeds the available " + "number of CPUs(%u)\n", i, global_n_host_cpus); + return -1; + } + } + if (virDomainPinVcpuFlags(vm_info->domainPtr, vcpu, global_cpumaps, + global_maplen, flags) < 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to set vCPU(%u) to pCPU " + "mask(0x%"PRIx64") for VM '%s'\n", vcpu, core_mask, + vm_info->name); + return -1; + } + rte_atomic64_set(&vm_info->pcpu_mask[vcpu], core_mask); + return 0; + +} + +int +set_pcpu(char *vm_name, unsigned vcpu, unsigned core_num) +{ + uint64_t mask = 1ULL << core_num; + return set_pcpus_mask(vm_name, vcpu, mask); +} + +uint64_t +get_pcpus_mask(struct channel_info *chan_info, unsigned vcpu) +{ + struct virtual_machine_info *vm_info = + (struct virtual_machine_info *)chan_info->priv_info; + return rte_atomic64_read(&vm_info->pcpu_mask[vcpu]); +} + +static inline int +channel_exists(struct virtual_machine_info *vm_info, unsigned channel_num) +{ + rte_spinlock_lock(&(vm_info->config_spinlock)); + if (vm_info->channel_mask & (1ULL << channel_num)) { + rte_spinlock_unlock(&(vm_info->config_spinlock)); + return 1; + } + rte_spinlock_unlock(&(vm_info->config_spinlock)); + return 0; +} + + + +static int +open_non_blocking_channel(struct channel_info *info) +{ + int ret, flags; + struct sockaddr_un sock_addr; + fd_set soc_fd_set; + struct timeval tv; + + info->fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (info->fd == -1) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error(%s) creating socket for '%s'\n", + strerror(errno), + info->channel_path); + return -1; + } + sock_addr.sun_family = AF_UNIX; + memcpy(&sock_addr.sun_path, info->channel_path, + strlen(info->channel_path)+1); + + /* Get current flags */ + flags = fcntl(info->fd, F_GETFL, 0); + if (flags < 0) { + RTE_LOG(WARNING, CHANNEL_MANAGER, "Error(%s) fcntl get flags socket for" + "'%s'\n", strerror(errno), info->channel_path); + return 1; + } + /* Set to Non Blocking */ + flags |= O_NONBLOCK; + if (fcntl(info->fd, F_SETFL, flags) < 0) { + RTE_LOG(WARNING, CHANNEL_MANAGER, "Error(%s) setting non-blocking " + "socket for '%s'\n", strerror(errno), info->channel_path); + return -1; + } + ret = connect(info->fd, (struct sockaddr *)&sock_addr, + sizeof(sock_addr)); + if (ret < 0) { + /* ECONNREFUSED error is given when VM is not active */ + if (errno == ECONNREFUSED) { + RTE_LOG(WARNING, CHANNEL_MANAGER, "VM is not active or has not " + "activated its endpoint to channel %s\n", + info->channel_path); + return -1; + } + /* Wait for tv_sec if in progress */ + else if (errno == EINPROGRESS) { + tv.tv_sec = 2; + tv.tv_usec = 0; + FD_ZERO(&soc_fd_set); + FD_SET(info->fd, &soc_fd_set); + if (select(info->fd+1, NULL, &soc_fd_set, NULL, &tv) > 0) { + RTE_LOG(WARNING, CHANNEL_MANAGER, "Timeout or error on channel " + "'%s'\n", info->channel_path); + return -1; + } + } else { + /* Any other error */ + RTE_LOG(WARNING, CHANNEL_MANAGER, "Error(%s) connecting socket" + " for '%s'\n", strerror(errno), info->channel_path); + return -1; + } + } + return 0; +} + +static int +setup_channel_info(struct virtual_machine_info **vm_info_dptr, + struct channel_info **chan_info_dptr, unsigned channel_num) +{ + struct channel_info *chan_info = *chan_info_dptr; + struct virtual_machine_info *vm_info = *vm_info_dptr; + + chan_info->channel_num = channel_num; + chan_info->priv_info = (void *)vm_info; + chan_info->status = CHANNEL_MGR_CHANNEL_DISCONNECTED; + if (open_non_blocking_channel(chan_info) < 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Could not open channel: " + "'%s' for VM '%s'\n", + chan_info->channel_path, vm_info->name); + return -1; + } + if (add_channel_to_monitor(&chan_info) < 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Could add channel: " + "'%s' to epoll ctl for VM '%s'\n", + chan_info->channel_path, vm_info->name); + return -1; + + } + rte_spinlock_lock(&(vm_info->config_spinlock)); + vm_info->num_channels++; + vm_info->channel_mask |= 1ULL << channel_num; + vm_info->channels[channel_num] = chan_info; + chan_info->status = CHANNEL_MGR_CHANNEL_CONNECTED; + rte_spinlock_unlock(&(vm_info->config_spinlock)); + return 0; +} + +int +add_all_channels(const char *vm_name) +{ + DIR *d; + struct dirent *dir; + struct virtual_machine_info *vm_info; + struct channel_info *chan_info; + char *token, *remaining, *tail_ptr; + char socket_name[PATH_MAX]; + unsigned channel_num; + int num_channels_enabled = 0; + + /* verify VM exists */ + vm_info = find_domain_by_name(vm_name); + if (vm_info == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "VM: '%s' not found" + " during channel discovery\n", vm_name); + return 0; + } + if (!virDomainIsActive(vm_info->domainPtr)) { + RTE_LOG(ERR, CHANNEL_MANAGER, "VM: '%s' is not active\n", vm_name); + vm_info->status = CHANNEL_MGR_VM_INACTIVE; + return 0; + } + d = opendir(CHANNEL_MGR_SOCKET_PATH); + if (d == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error opening directory '%s': %s\n", + CHANNEL_MGR_SOCKET_PATH, strerror(errno)); + return -1; + } + while ((dir = readdir(d)) != NULL) { + if (!strncmp(dir->d_name, ".", 1) || + !strncmp(dir->d_name, "..", 2)) + continue; + + snprintf(socket_name, sizeof(socket_name), "%s", dir->d_name); + remaining = socket_name; + /* Extract vm_name from "." */ + token = strsep(&remaining, "."); + if (remaining == NULL) + continue; + if (strncmp(vm_name, token, CHANNEL_MGR_MAX_NAME_LEN)) + continue; + + /* remaining should contain only */ + errno = 0; + channel_num = (unsigned)strtol(remaining, &tail_ptr, 0); + if ((errno != 0) || (remaining[0] == '\0') || + (*tail_ptr != '\0') || tail_ptr == NULL) { + RTE_LOG(WARNING, CHANNEL_MANAGER, "Malformed channel name" + "'%s' found it should be in the form of " + "'.(decimal)'\n", + dir->d_name); + continue; + } + if (channel_num >= CHANNEL_CMDS_MAX_VM_CHANNELS) { + RTE_LOG(WARNING, CHANNEL_MANAGER, "Channel number(%u) is " + "greater than max allowable: %d, skipping '%s%s'\n", + channel_num, CHANNEL_CMDS_MAX_VM_CHANNELS-1, + CHANNEL_MGR_SOCKET_PATH, dir->d_name); + continue; + } + /* if channel has not been added previously */ + if (channel_exists(vm_info, channel_num)) + continue; + + chan_info = rte_malloc(NULL, sizeof(*chan_info), + CACHE_LINE_SIZE); + if (chan_info == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error allocating memory for " + "channel '%s%s'\n", CHANNEL_MGR_SOCKET_PATH, dir->d_name); + continue; + } + + snprintf(chan_info->channel_path, + sizeof(chan_info->channel_path), "%s%s", + CHANNEL_MGR_SOCKET_PATH, dir->d_name); + + if (setup_channel_info(&vm_info, &chan_info, channel_num) < 0) { + rte_free(chan_info); + continue; + } + + num_channels_enabled++; + } + closedir(d); + return num_channels_enabled; +} + +int +add_channels(const char *vm_name, unsigned *channel_list, + unsigned len_channel_list) +{ + struct virtual_machine_info *vm_info; + struct channel_info *chan_info; + char socket_path[PATH_MAX]; + unsigned i; + int num_channels_enabled = 0; + + vm_info = find_domain_by_name(vm_name); + if (vm_info == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to add channels: VM '%s' " + "not found\n", vm_name); + return 0; + } + + if (!virDomainIsActive(vm_info->domainPtr)) { + RTE_LOG(ERR, CHANNEL_MANAGER, "VM: '%s' is not active\n", vm_name); + vm_info->status = CHANNEL_MGR_VM_INACTIVE; + return 0; + } + + for (i = 0; i < len_channel_list; i++) { + + if (channel_list[i] >= CHANNEL_CMDS_MAX_VM_CHANNELS) { + RTE_LOG(INFO, CHANNEL_MANAGER, "Channel(%u) is out of range " + "0...%d\n", channel_list[i], + CHANNEL_CMDS_MAX_VM_CHANNELS-1); + continue; + } + if (channel_exists(vm_info, channel_list[i])) { + RTE_LOG(INFO, CHANNEL_MANAGER, "Channel already exists, skipping " + "'%s.%u'\n", vm_name, i); + continue; + } + + snprintf(socket_path, sizeof(socket_path), "%s%s.%u", + CHANNEL_MGR_SOCKET_PATH, vm_name, channel_list[i]); + errno = 0; + if (access(socket_path, F_OK) < 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Channel path '%s' error: " + "%s\n", socket_path, strerror(errno)); + continue; + } + chan_info = rte_malloc(NULL, sizeof(*chan_info), + CACHE_LINE_SIZE); + if (chan_info == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error allocating memory for " + "channel '%s'\n", socket_path); + continue; + } + snprintf(chan_info->channel_path, + sizeof(chan_info->channel_path), "%s%s.%u", + CHANNEL_MGR_SOCKET_PATH, vm_name, channel_list[i]); + if (setup_channel_info(&vm_info, &chan_info, channel_list[i]) < 0) { + rte_free(chan_info); + continue; + } + num_channels_enabled++; + + } + return num_channels_enabled; +} + +int +remove_channel(struct channel_info **chan_info_dptr) +{ + struct virtual_machine_info *vm_info; + struct channel_info *chan_info = *chan_info_dptr; + + close(chan_info->fd); + + vm_info = (struct virtual_machine_info *)chan_info->priv_info; + + rte_spinlock_lock(&(vm_info->config_spinlock)); + vm_info->channel_mask &= ~(1ULL << chan_info->channel_num); + vm_info->num_channels--; + rte_spinlock_unlock(&(vm_info->config_spinlock)); + + rte_free(chan_info); + return 0; +} + +int +set_channel_status_all(const char *vm_name, enum channel_status status) +{ + struct virtual_machine_info *vm_info; + unsigned i; + uint64_t mask; + int num_channels_changed = 0; + + if (!(status == CHANNEL_MGR_CHANNEL_CONNECTED || + status == CHANNEL_MGR_CHANNEL_DISABLED)) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Channels can only be enabled or " + "disabled: Unable to change status for VM '%s'\n", vm_name); + } + vm_info = find_domain_by_name(vm_name); + if (vm_info == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to disable channels: VM '%s' " + "not found\n", vm_name); + return 0; + } + + rte_spinlock_lock(&(vm_info->config_spinlock)); + mask = vm_info->channel_mask; + ITERATIVE_BITMASK_CHECK_64(mask, i) { + vm_info->channels[i]->status = status; + num_channels_changed++; + } + rte_spinlock_unlock(&(vm_info->config_spinlock)); + return num_channels_changed; + +} + +int +set_channel_status(const char *vm_name, unsigned *channel_list, + unsigned len_channel_list, enum channel_status status) +{ + struct virtual_machine_info *vm_info; + unsigned i; + int num_channels_changed = 0; + + if (!(status == CHANNEL_MGR_CHANNEL_CONNECTED || + status == CHANNEL_MGR_CHANNEL_DISABLED)) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Channels can only be enabled or " + "disabled: Unable to change status for VM '%s'\n", vm_name); + } + vm_info = find_domain_by_name(vm_name); + if (vm_info == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to add channels: VM '%s' " + "not found\n", vm_name); + return 0; + } + for (i = 0; i < len_channel_list; i++) { + if (channel_exists(vm_info, channel_list[i])) { + rte_spinlock_lock(&(vm_info->config_spinlock)); + vm_info->channels[channel_list[i]]->status = status; + rte_spinlock_unlock(&(vm_info->config_spinlock)); + num_channels_changed++; + } + } + return num_channels_changed; +} + +int +get_info_vm(const char *vm_name, struct vm_info *info) +{ + struct virtual_machine_info *vm_info; + unsigned i, channel_num = 0; + uint64_t mask; + vm_info = find_domain_by_name(vm_name); + if (vm_info == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "VM '%s' not found\n", vm_name); + return -1; + } + info->status = CHANNEL_MGR_VM_ACTIVE; + if (!virDomainIsActive(vm_info->domainPtr)) + info->status = CHANNEL_MGR_VM_INACTIVE; + + rte_spinlock_lock(&(vm_info->config_spinlock)); + + mask = vm_info->channel_mask; + ITERATIVE_BITMASK_CHECK_64(mask, i) { + info->channels[channel_num].channel_num = i; + memcpy(info->channels[channel_num].channel_path, + vm_info->channels[i]->channel_path, PATH_MAX); + info->channels[channel_num].status = vm_info->channels[i]->status; + info->channels[channel_num].fd = vm_info->channels[i]->fd; + channel_num++; + } + + info->num_channels = channel_num; + info->num_vcpus = vm_info->info.nrVirtCpu; + rte_spinlock_unlock(&(vm_info->config_spinlock)); + + memcpy(info->name, vm_info->name, sizeof(vm_info->name)); + for (i = 0; i < info->num_vcpus; i++) { + info->pcpu_mask[i] = rte_atomic64_read(&vm_info->pcpu_mask[i]); + } + return 0; +} + +int +add_vm(const char *vm_name) +{ + struct virtual_machine_info *new_domain; + virDomainPtr dom_ptr; + int i; + + if (find_domain_by_name(vm_name) != NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to add VM: VM '%s' " + "already exists\n", vm_name); + return -1; + } + + if (global_vir_conn_ptr == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "No connection to hypervisor exists\n"); + return -1; + } + dom_ptr = virDomainLookupByName(global_vir_conn_ptr, vm_name); + if (dom_ptr == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error on VM lookup with libvirt: " + "VM '%s' not found\n", vm_name); + return -1; + } + + new_domain = rte_malloc("virtual_machine_info", sizeof(*new_domain), + CACHE_LINE_SIZE); + if (new_domain == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to allocate memory for VM " + "info\n"); + return -1; + } + new_domain->domainPtr = dom_ptr; + if (virDomainGetInfo(new_domain->domainPtr, &new_domain->info) != 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to get libvirt VM info\n"); + rte_free(new_domain); + return -1; + } + if (new_domain->info.nrVirtCpu > CHANNEL_CMDS_MAX_CPUS) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error the number of virtual CPUs(%u) is " + "greater than allowable(%d)\n", new_domain->info.nrVirtCpu, + CHANNEL_CMDS_MAX_CPUS); + rte_free(new_domain); + return -1; + } + + for (i = 0; i < CHANNEL_CMDS_MAX_CPUS; i++) { + rte_atomic64_init(&new_domain->pcpu_mask[i]); + } + if (update_pcpus_mask(new_domain) < 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error getting physical CPU pinning\n"); + rte_free(new_domain); + return -1; + } + strncpy(new_domain->name, vm_name, sizeof(new_domain->name)); + new_domain->channel_mask = 0; + new_domain->num_channels = 0; + + if (!virDomainIsActive(dom_ptr)) + new_domain->status = CHANNEL_MGR_VM_INACTIVE; + else + new_domain->status = CHANNEL_MGR_VM_ACTIVE; + + rte_spinlock_init(&(new_domain->config_spinlock)); + LIST_INSERT_HEAD(&vm_list_head, new_domain, vms_info); + return 0; +} + +int +remove_vm(const char *vm_name) +{ + struct virtual_machine_info *vm_info = find_domain_by_name(vm_name); + if (vm_info == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to remove VM: VM '%s' " + "not found\n", vm_name); + return -1; + } + rte_spinlock_lock(&vm_info->config_spinlock); + if (vm_info->num_channels != 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to remove VM '%s', there are " + "%"PRId8" channels still active\n", + vm_name, vm_info->num_channels); + rte_spinlock_unlock(&vm_info->config_spinlock); + return -1; + } + LIST_REMOVE(vm_info, vms_info); + rte_spinlock_unlock(&vm_info->config_spinlock); + rte_free(vm_info); + return 0; +} + +static void +disconnect_hypervisor(void) +{ + if (global_vir_conn_ptr != NULL) { + virConnectClose(global_vir_conn_ptr); + global_vir_conn_ptr = NULL; + } +} + +static int +connect_hypervisor(const char *path) +{ + if (global_vir_conn_ptr != NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error connecting to %s, connection" + "already established\n", path); + return -1; + } + global_vir_conn_ptr = virConnectOpen(path); + if (global_vir_conn_ptr == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error failed to open connection to " + "Hypervisor '%s'\n", path); + return -1; + } + return 0; +} + +int +channel_manager_init(const char *path) +{ + int n_cpus; + LIST_INIT(&vm_list_head); + if (connect_hypervisor(path) < 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to initialize channel manager\n"); + return -1; + } + + global_maplen = VIR_CPU_MAPLEN(CHANNEL_CMDS_MAX_CPUS); + + global_vircpuinfo = rte_zmalloc(NULL, sizeof(*global_vircpuinfo) * + CHANNEL_CMDS_MAX_CPUS, CACHE_LINE_SIZE); + if (global_vircpuinfo == NULL) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Error allocating memory for CPU Info\n"); + goto error; + } + global_cpumaps = rte_zmalloc(NULL, CHANNEL_CMDS_MAX_CPUS * global_maplen, + CACHE_LINE_SIZE); + if (global_cpumaps == NULL) { + goto error; + } + + n_cpus = virNodeGetCPUMap(global_vir_conn_ptr, NULL, NULL, 0); + if (n_cpus <= 0) { + RTE_LOG(ERR, CHANNEL_MANAGER, "Unable to get the number of Host " + "CPUs\n"); + goto error; + } + global_n_host_cpus = (unsigned)n_cpus; + + if (global_n_host_cpus > CHANNEL_CMDS_MAX_CPUS) { + RTE_LOG(ERR, CHANNEL_MANAGER, "The number of host CPUs(%u) exceeds the " + "maximum of %u\n", global_n_host_cpus, CHANNEL_CMDS_MAX_CPUS); + goto error; + + } + + return 0; +error: + disconnect_hypervisor(); + return -1; +} + +void +channel_manager_exit(void) +{ + unsigned i; + uint64_t mask; + struct virtual_machine_info *vm_info; + + LIST_FOREACH(vm_info, &vm_list_head, vms_info) { + + rte_spinlock_lock(&(vm_info->config_spinlock)); + + mask = vm_info->channel_mask; + ITERATIVE_BITMASK_CHECK_64(mask, i) { + remove_channel_from_monitor(vm_info->channels[i]); + close(vm_info->channels[i]->fd); + rte_free(vm_info->channels[i]); + } + rte_spinlock_unlock(&(vm_info->config_spinlock)); + + LIST_REMOVE(vm_info, vms_info); + rte_free(vm_info); + } + + if (global_cpumaps != NULL) + rte_free(global_cpumaps); + if (global_vircpuinfo != NULL) + rte_free(global_vircpuinfo); + disconnect_hypervisor(); +} diff --git a/examples/vm_power_manager/channel_manager.h b/examples/vm_power_manager/channel_manager.h new file mode 100644 index 0000000..12c29c3 --- /dev/null +++ b/examples/vm_power_manager/channel_manager.h @@ -0,0 +1,314 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef CHANNEL_MANAGER_H_ +#define CHANNEL_MANAGER_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include "channel_commands.h" + +/* Maximum name length including '\0' terminator */ +#define CHANNEL_MGR_MAX_NAME_LEN 64 + +/* Maximum number of channels to each Virtual Machine */ +#define CHANNEL_MGR_MAX_CHANNELS 64 + +/* Hypervisor Path for libvirt(qemu/KVM) */ +#define CHANNEL_MGR_DEFAULT_HV_PATH "qemu:///system" + +/* File socket directory */ +#define CHANNEL_MGR_SOCKET_PATH "/tmp/powermonitor/" + +/* Communication Channel Status */ +enum channel_status { CHANNEL_MGR_CHANNEL_DISCONNECTED = 0, + CHANNEL_MGR_CHANNEL_CONNECTED, + CHANNEL_MGR_CHANNEL_DISABLED, + CHANNEL_MGR_CHANNEL_PROCESSING}; + +/* VM libvirt(qemu/KVM) connection status */ +enum vm_status { CHANNEL_MGR_VM_INACTIVE = 0, CHANNEL_MGR_VM_ACTIVE}; + +/* + * Represents a single and exclusive VM channel that exists between a guest and + * the host. + */ +struct channel_info { + char channel_path[PATH_MAX]; /**< Path to host socket */ + volatile uint32_t status; /**< Connection status(enum channel_status) */ + int fd; /**< AF_UNIX socket fd */ + unsigned channel_num; /**< CHANNEL_MGR_SOCKET_PATH/.channel_num */ + void *priv_info; /**< Pointer to private info, do not modify */ +}; + +/* Represents a single VM instance used to return internal information about + * a VM */ +struct vm_info { + char name[CHANNEL_MGR_MAX_NAME_LEN]; /**< VM name */ + enum vm_status status; /**< libvirt status */ + uint64_t pcpu_mask[CHANNEL_CMDS_MAX_CPUS]; /**< pCPU mask for each vCPU */ + unsigned num_vcpus; /**< number of vCPUS */ + struct channel_info channels[CHANNEL_MGR_MAX_CHANNELS]; /**< Array of channel_info */ + unsigned num_channels; /**< Number of channels */ +}; + +/** + * Initialize the Channel Manager resources and connect to the Hypervisor + * specified in path. + * This must be successfully called first before calling any other functions. + * It must only be call once; + * + * @param path + * Must be a local path, e.g. qemu:///system. + * + * @return + * - 0 on success. + * - Negative on error. + */ +int channel_manager_init(const char *path); + +/** + * Free resources associated with the Channel Manager. + * + * @param path + * Must be a local path, e.g. qemu:///system. + * + * @return + * None + */ +void channel_manager_exit(void); + +/** + * Get the Physical CPU mask for VM lcore channel(vcpu), result is assigned to + * core_mask. + * It is not thread-safe. + * + * @param chan_info + * Pointer to struct channel_info + * + * @param vcpu + * The virtual CPU to query. + * + * + * @return + * - 0 on error. + * - >0 on success. + */ +uint64_t get_pcpus_mask(struct channel_info *chan_info, unsigned vcpu); + +/** + * Set the Physical CPU mask for the specified vCPU. + * It is not thread-safe. + * + * @param name + * Virtual Machine name to lookup + * + * @param vcpu + * The virtual CPU to set. + * + * @param core_mask + * The core mask of the physical CPU(s) to bind the vCPU + * + * @return + * - 0 on success. + * - Negative on error. + */ +int set_pcpus_mask(char *vm_name, unsigned vcpu, uint64_t core_mask); + +/** + * Set the Physical CPU for the specified vCPU. + * It is not thread-safe. + * + * @param name + * Virtual Machine name to lookup + * + * @param vcpu + * The virtual CPU to set. + * + * @param core_num + * The core number of the physical CPU(s) to bind the vCPU + * + * @return + * - 0 on success. + * - Negative on error. + */ +int set_pcpu(char *vm_name, unsigned vcpu, unsigned core_num); +/** + * Add a VM as specified by name to the Channel Manager. The name must + * correspond to a valid libvirt domain name. + * This is required prior to adding channels. + * It is not thread-safe. + * + * @param name + * Virtual Machine name to lookup. + * + * @return + * - 0 on success. + * - Negative on error. + */ +int add_vm(const char *name); + +/** + * Remove a previously added Virtual Machine from the Channel Manager + * It is not thread-safe. + * + * @param name + * Virtual Machine name to lookup. + * + * @return + * - 0 on success. + * - Negative on error. + */ +int remove_vm(const char *name); + +/** + * Add all available channels to the VM as specified by name. + * Channels in the form of paths + * (CHANNEL_MGR_SOCKET_PATH/.) will only be parsed. + * It is not thread-safe. + * + * @param name + * Virtual Machine name to lookup. + * + * @return + * - N the number of channels added for the VM + */ +int add_all_channels(const char *vm_name); + +/** + * Add the channel numbers in channel_list to the domain specified by name. + * Channels in the form of paths + * (CHANNEL_MGR_SOCKET_PATH/.) will only be parsed. + * It is not thread-safe. + * + * @param name + * Virtual Machine name to add channels. + * + * @param channel_list + * Pointer to list of unsigned integers, representing the channel number to add + * It must be allocated outside of this function. + * + * @param num_channels + * The amount of channel numbers in channel_list + * + * @return + * - N the number of channels added for the VM + * - 0 for error + */ +int add_channels(const char *vm_name, unsigned *channel_list, + unsigned num_channels); + +/** + * Remove a channel definition from the channel manager. This must only be + * called from the channel monitor thread. + * + * @param chan_info + * Pointer to a valid struct channel_info. + * + * @return + * - 0 on success. + * - Negative on error. + */ +int remove_channel(struct channel_info **chan_info_dptr); + +/** + * For all channels associated with a Virtual Machine name, update the + * connection status. Valid states are CHANNEL_MGR_CHANNEL_CONNECTED or + * CHANNEL_MGR_CHANNEL_DISABLED only. + * + * + * @param name + * Virtual Machine name to modify all channels. + * + * @param status + * The status to set each channel + * + * @param num_channels + * The amount of channel numbers in channel_list + * + * @return + * - N the number of channels added for the VM + * - 0 for error + */ +int set_channel_status_all(const char *name, enum channel_status status); + +/** + * For all channels in channel_list associated with a Virtual Machine name + * update the connection status of each. + * Valid states are CHANNEL_MGR_CHANNEL_CONNECTED or + * CHANNEL_MGR_CHANNEL_DISABLED only. + * It is not thread-safe. + * + * @param name + * Virtual Machine name to add channels. + * + * @param channel_list + * Pointer to list of unsigned integers, representing the channel numbers to + * modify. + * It must be allocated outside of this function. + * + * @param num_channels + * The amount of channel numbers in channel_list + * + * @return + * - N the number of channels modified for the VM + * - 0 for error + */ +int set_channel_status(const char *vm_name, unsigned *channel_list, + unsigned len_channel_list, enum channel_status status); + +/** + * Populates a pointer to struct vm_info associated with vm_name. + * + * @param vm_name + * The name of the virtual machine to lookup. + * + * @param vm_info + * Pointer to a struct vm_info, this must be allocated prior to calling this + * function. + * + * @return + * - 0 on success. + * - Negative on error. + */ +int get_info_vm(const char *vm_name, struct vm_info *info); + +#ifdef __cplusplus +} +#endif + +#endif /* CHANNEL_MANAGER_H_ */ diff --git a/examples/vm_power_manager/channel_monitor.c b/examples/vm_power_manager/channel_monitor.c new file mode 100644 index 0000000..3674c7c --- /dev/null +++ b/examples/vm_power_manager/channel_monitor.c @@ -0,0 +1,231 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + + +#include "channel_monitor.h" +#include "channel_commands.h" +#include "channel_manager.h" +#include "power_manager.h" + +#define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1 + +#define MAX_EVENTS 256 + + +static volatile unsigned run_loop = 1; +static int global_event_fd; +static struct epoll_event *global_events_list; + +void channel_monitor_exit(void) +{ + run_loop = 0; + rte_free(global_events_list); +} + +static int +process_request(struct channel_packet *pkt, struct channel_info *chan_info) +{ + uint64_t core_mask; + + if (chan_info == NULL) + return -1; + + if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED, + CHANNEL_MGR_CHANNEL_PROCESSING) == 0) + return -1; + + if (pkt->command == CPU_POWER) { + core_mask = get_pcpus_mask(chan_info, pkt->resource_id); + if (core_mask == 0) { + RTE_LOG(ERR, CHANNEL_MONITOR, "Error get physical CPU mask for " + "channel '%s' using vCPU(%u)\n", chan_info->channel_path, + (unsigned)pkt->unit); + return -1; + } + if (__builtin_popcountll(core_mask) == 1) { + + unsigned core_num = __builtin_ffsll(core_mask) - 1; + + switch (pkt->unit) { + case(CPU_POWER_SCALE_MIN): + power_manager_scale_core_min(core_num); + break; + case(CPU_POWER_SCALE_MAX): + power_manager_scale_core_max(core_num); + break; + case(CPU_POWER_SCALE_DOWN): + power_manager_scale_core_down(core_num); + break; + case(CPU_POWER_SCALE_UP): + power_manager_scale_core_up(core_num); + break; + default: + break; + } + } else { + switch (pkt->unit) { + case(CPU_POWER_SCALE_MIN): + power_manager_scale_mask_min(core_mask); + break; + case(CPU_POWER_SCALE_MAX): + power_manager_scale_mask_max(core_mask); + break; + case(CPU_POWER_SCALE_DOWN): + power_manager_scale_mask_down(core_mask); + break; + case(CPU_POWER_SCALE_UP): + power_manager_scale_mask_up(core_mask); + break; + default: + break; + } + + } + } + /* Return is not checked as channel status may have been set to DISABLED + * from management thread + */ + rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING, + CHANNEL_MGR_CHANNEL_CONNECTED); + return 0; + +} + +int +add_channel_to_monitor(struct channel_info **chan_info) +{ + struct channel_info *info = *chan_info; + struct epoll_event event; + event.events = EPOLLIN; + event.data.ptr = info; + if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) { + RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' " + "to epoll\n", info->channel_path); + return -1; + } + return 0; +} + +int +remove_channel_from_monitor(struct channel_info *chan_info) +{ + if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL, chan_info->fd, NULL) < 0) { + RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' " + "from epoll\n", chan_info->channel_path); + return -1; + } + return 0; +} + +int +channel_monitor_init(void) +{ + global_event_fd = epoll_create1(0); + if (global_event_fd == 0) { + RTE_LOG(ERR, CHANNEL_MONITOR, "Error creating epoll context with " + "error %s\n", strerror(errno)); + return -1; + } + global_events_list = rte_malloc("epoll_events", sizeof(*global_events_list) + * MAX_EVENTS, CACHE_LINE_SIZE); + if (global_events_list == NULL) { + RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for" + "epoll events\n"); + return -1; + } + return 0; +} + +void +run_channel_monitor(void) +{ + while (run_loop) { + int n_events, i; + n_events = epoll_wait(global_event_fd, global_events_list, + MAX_EVENTS, 1); + if (!run_loop) + break; + for (i = 0; i < n_events; i++) { + struct channel_info *chan_info = (struct channel_info *) + global_events_list[i].data.ptr; + if ((global_events_list[i].events & EPOLLERR) || + (global_events_list[i].events & EPOLLHUP)) { + RTE_LOG(DEBUG, CHANNEL_MONITOR, "Remote closed connection for " + "channel '%s'\n", chan_info->channel_path); + remove_channel(&chan_info); + continue; + } + if (global_events_list[i].events & EPOLLIN) { + + int n_bytes, err = 0; + struct channel_packet pkt; + void *buffer = &pkt; + int buffer_len = sizeof(pkt); + while (buffer_len > 0) { + n_bytes = read(chan_info->fd, buffer, buffer_len); + if (n_bytes == buffer_len) + break; + if (n_bytes == -1) { + err = errno; + RTE_LOG(DEBUG, CHANNEL_MONITOR, "Received error on " + "channel '%s' read: %s\n", + chan_info->channel_path, strerror(err)); + remove_channel(&chan_info); + break; + } + buffer = (char *)buffer + n_bytes; + buffer_len -= n_bytes; + } + if (!err) + process_request(&pkt, chan_info); + } + } + } +} diff --git a/examples/vm_power_manager/channel_monitor.h b/examples/vm_power_manager/channel_monitor.h new file mode 100644 index 0000000..c138607 --- /dev/null +++ b/examples/vm_power_manager/channel_monitor.h @@ -0,0 +1,102 @@ +/*- + * BSD LICENSE + * + * Copyright(c) 2010-2014 Intel Corporation. All rights reserved. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Intel Corporation nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef CHANNEL_MONITOR_H_ +#define CHANNEL_MONITOR_H_ + +#include "channel_manager.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * Setup the Channel Monitor resources required to initialize epoll. + * Must be called first before calling other functions. + * + * @return + * - 0 on success. + * - Negative on error. + */ +int channel_monitor_init(void); + +/** + * Run the channel monitor, loops forever on on epoll_wait. + * + * + * @return + * None + */ +void run_channel_monitor(void); + +/** + * Exit the Channel Monitor, exiting the epoll_wait loop and events processing. + * + * @return + * - 0 on success. + * - Negative on error. + */ +void channel_monitor_exit(void); + +/** + * Add an open channel to monitor via epoll. A pointer to struct channel_info + * will be registered with epoll for event processing. + * It is thread-safe. + * + * @param chan_info + * Pointer to struct channel_info pointer. + * + * @return + * - 0 on success. + * - Negative on error. + */ +int add_channel_to_monitor(struct channel_info **chan_info); + +/** + * Remove a previously added channel from epoll control. + * + * @param chan_info + * Pointer to struct channel_info. + * + * @return + * - 0 on success. + * - Negative on error. + */ +int remove_channel_from_monitor(struct channel_info *chan_info); + +#ifdef __cplusplus +} +#endif + + +#endif /* CHANNEL_MONITOR_H_ */