From patchwork Thu Sep 10 18:58:54 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: "Ma, LihongX" X-Patchwork-Id: 77302 Return-Path: X-Original-To: patchwork@inbox.dpdk.org Delivered-To: patchwork@inbox.dpdk.org Received: from dpdk.org (dpdk.org [92.243.14.124]) by inbox.dpdk.org (Postfix) with ESMTP id E154FA04B5; Fri, 11 Sep 2020 04:32:24 +0200 (CEST) Received: from [92.243.14.124] (localhost [127.0.0.1]) by dpdk.org (Postfix) with ESMTP id CFE8F1C0CA; Fri, 11 Sep 2020 04:32:24 +0200 (CEST) Received: from mga03.intel.com (mga03.intel.com [134.134.136.65]) by dpdk.org (Postfix) with ESMTP id AAA47DE0 for ; Fri, 11 Sep 2020 04:32:22 +0200 (CEST) IronPort-SDR: JSTWF0NoZKgUNn3onsjmE5PlHLSmBWtreFJGWVvJQHd9Rg8cA7PNkHbKSevuGGpQ35NwaOJviU SnyTlQqcrtRQ== X-IronPort-AV: E=McAfee;i="6000,8403,9740"; a="158714509" X-IronPort-AV: E=Sophos;i="5.76,413,1592895600"; d="scan'208";a="158714509" X-Amp-Result: SKIPPED(no attachment in message) X-Amp-File-Uploaded: False Received: from fmsmga004.fm.intel.com ([10.253.24.48]) by orsmga103.jf.intel.com with ESMTP/TLS/ECDHE-RSA-AES256-GCM-SHA384; 10 Sep 2020 19:32:22 -0700 IronPort-SDR: ES5u+ZM/tsDC7CN8QZ5UWD6+2UsUG1dEPw4mF8fts43IycnS8sHWdEtz2msFVDdL0QwV19MATO 867G6HRjBD5g== X-ExtLoop1: 1 X-IronPort-AV: E=Sophos;i="5.76,413,1592895600"; d="scan'208";a="329592596" Received: from dpdk-lihong-ub1604.sh.intel.com ([10.67.118.174]) by fmsmga004.fm.intel.com with ESMTP; 10 Sep 2020 19:32:20 -0700 From: LihongX Ma To: dts@dpdk.org Cc: LihongX Ma Date: Fri, 11 Sep 2020 02:58:54 +0800 Message-Id: <1599764335-1164-2-git-send-email-lihongx.ma@intel.com> X-Mailer: git-send-email 2.7.4 In-Reply-To: <1599764335-1164-1-git-send-email-lihongx.ma@intel.com> References: <1599764335-1164-1-git-send-email-lihongx.ma@intel.com> Subject: [dts] [PATCH V1 1/2] framework: use the default python lib threadpool to maintain thread X-BeenThere: dts@dpdk.org X-Mailman-Version: 2.1.15 Precedence: list List-Id: test suite reviews and discussions List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Errors-To: dts-bounces@dpdk.org Sender: "dts" the function in extra_libs/threadpool.py is similar to default python lib, so use the default lib instead of it. Signed-off-by: LihongX Ma --- extra_libs/threadpool.py | 426 ----------------------------------------------- framework/dts.py | 5 - 2 files changed, 431 deletions(-) delete mode 100644 extra_libs/threadpool.py diff --git a/extra_libs/threadpool.py b/extra_libs/threadpool.py deleted file mode 100644 index 3839f26..0000000 --- a/extra_libs/threadpool.py +++ /dev/null @@ -1,426 +0,0 @@ -# -*- coding: UTF-8 -*- -"""Easy to use object-oriented thread pool framework. - -A thread pool is an object that maintains a pool of worker threads to perform -time consuming operations in parallel. It assigns jobs to the threads -by putting them in a work request queue, where they are picked up by the -next available thread. This then performs the requested operation in the -background and puts the results in another queue. - -The thread pool object can then collect the results from all threads from -this queue as soon as they become available or after all threads have -finished their work. It's also possible, to define callbacks to handle -each result as it comes in. - -The basic concept and some code was taken from the book "Python in a Nutshell, -2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section -14.5 "Threaded Program Architecture". I wrapped the main program logic in the -ThreadPool class, added the WorkRequest class and the callback system and -tweaked the code here and there. Kudos also to Florent Aide for the exception -handling mechanism. - -Basic usage:: - - >>> pool = ThreadPool(poolsize) - >>> requests = makeRequests(some_callable, list_of_args, callback) - >>> [pool.putRequest(req) for req in requests] - >>> pool.wait() - -See the end of the module code for a brief, annotated usage example. - -Website : http://chrisarndt.de/projects/threadpool/ - -""" -__docformat__ = "restructuredtext en" - -__all__ = [ - 'makeRequests', - 'NoResultsPending', - 'NoWorkersAvailable', - 'ThreadPool', - 'WorkRequest', - 'WorkerThread' -] - -__author__ = "Christopher Arndt" -__version__ = '1.3.2' -__license__ = "MIT license" - - -# standard library modules -import sys -import threading -import traceback - -try: - import Queue # Python 2 -except ImportError: - import queue as Queue # Python 3 - - -# exceptions -class NoResultsPending(Exception): - """All work requests have been processed.""" - pass - -class NoWorkersAvailable(Exception): - """No worker threads available to process remaining requests.""" - pass - - -# internal module helper functions -def _handle_thread_exception(request, exc_info): - """Default exception handler callback function. - - This just prints the exception info via ``traceback.print_exception``. - - """ - traceback.print_exception(*exc_info) - - -# utility functions -def makeRequests(callable_, args_list, callback=None, - exc_callback=_handle_thread_exception): - """Create several work requests for same callable with different arguments. - - Convenience function for creating several work requests for the same - callable where each invocation of the callable receives different values - for its arguments. - - ``args_list`` contains the parameters for each invocation of callable. - Each item in ``args_list`` should be either a 2-item tuple of the list of - positional arguments and a dictionary of keyword arguments or a single, - non-tuple argument. - - See docstring for ``WorkRequest`` for info on ``callback`` and - ``exc_callback``. - - """ - requests = [] - for item in args_list: - if isinstance(item, tuple): - requests.append( - WorkRequest(callable_, item[0], item[1], callback=callback, - exc_callback=exc_callback) - ) - else: - requests.append( - WorkRequest(callable_, [item], None, callback=callback, - exc_callback=exc_callback) - ) - return requests - - -# classes -class WorkerThread(threading.Thread): - """Background thread connected to the requests/results queues. - - A worker thread sits in the background and picks up work requests from - one queue and puts the results in another until it is dismissed. - - """ - - def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds): - """Set up thread in daemonic mode and start it immediatedly. - - ``requests_queue`` and ``results_queue`` are instances of - ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a - new worker thread. - - """ - threading.Thread.__init__(self, **kwds) - self.setDaemon(1) - self._requests_queue = requests_queue - self._results_queue = results_queue - self._poll_timeout = poll_timeout - self._dismissed = threading.Event() - self.start() - - def run(self): - """Repeatedly process the job queue until told to exit.""" - while True: - if self._dismissed.isSet(): - # we are dismissed, break out of loop - break - # get next work request. If we don't get a new request from the - # queue after self._poll_timout seconds, we jump to the start of - # the while loop again, to give the thread a chance to exit. - try: - request = self._requests_queue.get(True, self._poll_timeout) - except Queue.Empty: - continue - else: - if self._dismissed.isSet(): - # we are dismissed, put back request in queue and exit loop - self._requests_queue.put(request) - break - try: - result = request.callable(*request.args, **request.kwds) - self._results_queue.put((request, result)) - except: - request.exception = True - self._results_queue.put((request, sys.exc_info())) - - def dismiss(self): - """Sets a flag to tell the thread to exit when done with current job. - """ - self._dismissed.set() - - -class WorkRequest: - """A request to execute a callable for putting in the request queue later. - - See the module function ``makeRequests`` for the common case - where you want to build several ``WorkRequest`` objects for the same - callable but with different arguments for each call. - - """ - - def __init__(self, callable_, args=None, kwds=None, requestID=None, - callback=None, exc_callback=_handle_thread_exception): - """Create a work request for a callable and attach callbacks. - - A work request consists of the a callable to be executed by a - worker thread, a list of positional arguments, a dictionary - of keyword arguments. - - A ``callback`` function can be specified, that is called when the - results of the request are picked up from the result queue. It must - accept two anonymous arguments, the ``WorkRequest`` object and the - results of the callable, in that order. If you want to pass additional - information to the callback, just stick it on the request object. - - You can also give custom callback for when an exception occurs with - the ``exc_callback`` keyword parameter. It should also accept two - anonymous arguments, the ``WorkRequest`` and a tuple with the exception - details as returned by ``sys.exc_info()``. The default implementation - of this callback just prints the exception info via - ``traceback.print_exception``. If you want no exception handler - callback, just pass in ``None``. - - ``requestID``, if given, must be hashable since it is used by - ``ThreadPool`` object to store the results of that work request in a - dictionary. It defaults to the return value of ``id(self)``. - - """ - if requestID is None: - self.requestID = id(self) - else: - try: - self.requestID = hash(requestID) - except TypeError: - raise TypeError("requestID must be hashable.") - self.exception = False - self.callback = callback - self.exc_callback = exc_callback - self.callable = callable_ - self.args = args or [] - self.kwds = kwds or {} - - def __str__(self): - return "" % \ - (self.requestID, self.args, self.kwds, self.exception) - -class ThreadPool: - """A thread pool, distributing work requests and collecting results. - - See the module docstring for more information. - - """ - - def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5): - """Set up the thread pool and start num_workers worker threads. - - ``num_workers`` is the number of worker threads to start initially. - - If ``q_size > 0`` the size of the work *request queue* is limited and - the thread pool blocks when the queue is full and it tries to put - more work requests in it (see ``putRequest`` method), unless you also - use a positive ``timeout`` value for ``putRequest``. - - If ``resq_size > 0`` the size of the *results queue* is limited and the - worker threads will block when the queue is full and they try to put - new results in it. - - .. warning: - If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is - the possibilty of a deadlock, when the results queue is not pulled - regularly and too many jobs are put in the work requests queue. - To prevent this, always set ``timeout > 0`` when calling - ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions. - - """ - self._requests_queue = Queue.Queue(q_size) - self._results_queue = Queue.Queue(resq_size) - self.workers = [] - self.dismissedWorkers = [] - self.workRequests = {} - self.createWorkers(num_workers, poll_timeout) - - def createWorkers(self, num_workers, poll_timeout=5): - """Add num_workers worker threads to the pool. - - ``poll_timout`` sets the interval in seconds (int or float) for how - ofte threads should check whether they are dismissed, while waiting for - requests. - - """ - for i in range(num_workers): - self.workers.append(WorkerThread(self._requests_queue, - self._results_queue, poll_timeout=poll_timeout)) - - def dismissWorkers(self, num_workers, do_join=False): - """Tell num_workers worker threads to quit after their current task.""" - dismiss_list = [] - for i in range(min(num_workers, len(self.workers))): - worker = self.workers.pop() - worker.dismiss() - dismiss_list.append(worker) - - if do_join: - for worker in dismiss_list: - worker.join() - else: - self.dismissedWorkers.extend(dismiss_list) - - def joinAllDismissedWorkers(self): - """Perform Thread.join() on all worker threads that have been dismissed. - """ - for worker in self.dismissedWorkers: - worker.join() - self.dismissedWorkers = [] - - def putRequest(self, request, block=True, timeout=None): - """Put work request into work queue and save its id for later.""" - assert isinstance(request, WorkRequest) - # don't reuse old work requests - assert not getattr(request, 'exception', None) - self._requests_queue.put(request, block, timeout) - self.workRequests[request.requestID] = request - - def poll(self, block=False): - """Process any new results in the queue.""" - while True: - # still results pending? - if not self.workRequests: - raise NoResultsPending - # are there still workers to process remaining requests? - elif block and not self.workers: - raise NoWorkersAvailable - try: - # get back next results - request, result = self._results_queue.get(block=block) - # has an exception occured? - if request.exception and request.exc_callback: - request.exc_callback(request, result) - # hand results to callback, if any - if request.callback and not \ - (request.exception and request.exc_callback): - request.callback(request, result) - del self.workRequests[request.requestID] - except Queue.Empty: - break - except Exception as e: - traceback.print_exception(*sys.exc_info()) - # unexpected thing happened, need further dedbugging - import pdb - pdb.set_trace() - - def wait(self): - """Wait for results, blocking until all have arrived.""" - while 1: - try: - self.poll(True) - except NoResultsPending: - break - - -################ -# USAGE EXAMPLE -################ - -if __name__ == '__main__': - import random - import time - - # the work the threads will have to do (rather trivial in our example) - def do_something(data): - time.sleep(random.randint(1,5)) - result = round(random.random() * data, 5) - # just to show off, we throw an exception once in a while - if result > 5: - raise RuntimeError("Something extraordinary happened!") - return result - - # this will be called each time a result is available - def print_result(request, result): - print("**** Result from request #%s: %r" % (request.requestID, result)) - - # this will be called when an exception occurs within a thread - # this example exception handler does little more than the default handler - def handle_exception(request, exc_info): - if not isinstance(exc_info, tuple): - # Something is seriously wrong... - print(request) - print(exc_info) - raise SystemExit - print("**** Exception occured in request #%s: %s" % \ - (request.requestID, exc_info)) - - # assemble the arguments for each job to a list... - data = [random.randint(1,10) for i in range(20)] - # ... and build a WorkRequest object for each item in data - requests = makeRequests(do_something, data, print_result, handle_exception) - # to use the default exception handler, uncomment next line and comment out - # the preceding one. - #requests = makeRequests(do_something, data, print_result) - - # or the other form of args_lists accepted by makeRequests: ((,), {}) - data = [((random.randint(1,10),), {}) for i in range(20)] - requests.extend( - makeRequests(do_something, data, print_result, handle_exception) - #makeRequests(do_something, data, print_result) - # to use the default exception handler, uncomment next line and comment - # out the preceding one. - ) - - # we create a pool of 3 worker threads - print("Creating thread pool with 3 worker threads.") - main = ThreadPool(3) - - # then we put the work requests in the queue... - for req in requests: - main.putRequest(req) - print("Work request #%s added." % req.requestID) - # or shorter: - # [main.putRequest(req) for req in requests] - - # ...and wait for the results to arrive in the result queue - # by using ThreadPool.wait(). This would block until results for - # all work requests have arrived: - # main.wait() - - # instead we can poll for results while doing something else: - i = 0 - while True: - try: - time.sleep(0.5) - main.poll() - print("Main thread working...") - print("(active worker threads: %i)" % (threading.activeCount()-1, )) - if i == 10: - print("**** Adding 3 more worker threads...") - main.createWorkers(3) - if i == 20: - print("**** Dismissing 2 worker threads...") - main.dismissWorkers(2) - i += 1 - except KeyboardInterrupt: - print("**** Interrupted!") - break - except NoResultsPending: - print("**** No pending results.") - break - if main.dismissedWorkers: - print("Joining all dismissed worker threads...") - main.joinAllDismissedWorkers() diff --git a/framework/dts.py b/framework/dts.py index 2d86bc3..1a08d6d 100644 --- a/framework/dts.py +++ b/framework/dts.py @@ -514,11 +514,6 @@ def run_all(config_file, pkgName, git, patch, skip_setup, if not os.path.exists(output_dir): os.mkdir(output_dir) - # add external library - exec_file = os.path.realpath(__file__) - extra_libs_path = exec_file.replace('framework/dts.py', '') + 'extra_libs' - sys.path.insert(1, extra_libs_path) - # add python module search path sys.path.append(suite_dir)