| # Copyright 2014 The Chromium Authors. All rights reserved. |
| # Use of this source code is governed by a BSD-style license that can be |
| # found in the LICENSE file. |
| """ Wrapper that allows method execution in parallel. |
| |
| This class wraps a list of objects of the same type, emulates their |
| interface, and executes any functions called on the objects in parallel |
| in ReraiserThreads. |
| |
| This means that, given a list of objects: |
| |
| class Foo: |
| def __init__(self): |
| self.baz = Baz() |
| |
| def bar(self, my_param): |
| // do something |
| |
| list_of_foos = [Foo(1), Foo(2), Foo(3)] |
| |
| we can take a sequential operation on that list of objects: |
| |
| for f in list_of_foos: |
| f.bar('Hello') |
| |
| and run it in parallel across all of the objects: |
| |
| Parallelizer(list_of_foos).bar('Hello') |
| |
| It can also handle (non-method) attributes of objects, so that this: |
| |
| for f in list_of_foos: |
| f.baz.myBazMethod() |
| |
| can be run in parallel with: |
| |
| Parallelizer(list_of_foos).baz.myBazMethod() |
| |
| Because it emulates the interface of the wrapped objects, a Parallelizer |
| can be passed to a method or function that takes objects of that type: |
| |
| def DoesSomethingWithFoo(the_foo): |
| the_foo.bar('Hello') |
| the_foo.bar('world') |
| the_foo.baz.myBazMethod |
| |
| DoesSomethingWithFoo(Parallelizer(list_of_foos)) |
| |
| Note that this class spins up a thread for each object. Using this class |
| to parallelize operations that are already fast will incur a net performance |
| penalty. |
| |
| """ |
| # pylint: disable=protected-access |
| |
| from devil.utils import reraiser_thread |
| from devil.utils import watchdog_timer |
| |
| _DEFAULT_TIMEOUT = 30 |
| _DEFAULT_RETRIES = 3 |
| |
| |
| class Parallelizer(object): |
| """Allows parallel execution of method calls across a group of objects.""" |
| |
| def __init__(self, objs): |
| self._orig_objs = objs |
| self._objs = objs |
| |
| def __getattr__(self, name): |
| """Emulate getting the |name| attribute of |self|. |
| |
| Args: |
| name: The name of the attribute to retrieve. |
| Returns: |
| A Parallelizer emulating the |name| attribute of |self|. |
| """ |
| self.pGet(None) |
| |
| r = type(self)(self._orig_objs) |
| r._objs = [getattr(o, name) for o in self._objs] |
| return r |
| |
| def __getitem__(self, index): |
| """Emulate getting the value of |self| at |index|. |
| |
| Returns: |
| A Parallelizer emulating the value of |self| at |index|. |
| """ |
| self.pGet(None) |
| |
| r = type(self)(self._orig_objs) |
| r._objs = [o[index] for o in self._objs] |
| return r |
| |
| def __call__(self, *args, **kwargs): |
| """Emulate calling |self| with |args| and |kwargs|. |
| |
| Note that this call is asynchronous. Call pFinish on the return value to |
| block until the call finishes. |
| |
| Returns: |
| A Parallelizer wrapping the ReraiserThreadGroup running the call in |
| parallel. |
| Raises: |
| AttributeError if the wrapped objects aren't callable. |
| """ |
| self.pGet(None) |
| |
| for o in self._objs: |
| if not callable(o): |
| raise AttributeError("'%s' is not callable" % o.__name__) |
| |
| r = type(self)(self._orig_objs) |
| r._objs = reraiser_thread.ReraiserThreadGroup([ |
| reraiser_thread.ReraiserThread( |
| o, args=args, kwargs=kwargs, name='%s.%s' % (str(d), o.__name__)) |
| for d, o in zip(self._orig_objs, self._objs) |
| ]) |
| r._objs.StartAll() |
| return r |
| |
| def pFinish(self, timeout): |
| """Finish any outstanding asynchronous operations. |
| |
| Args: |
| timeout: The maximum number of seconds to wait for an individual |
| result to return, or None to wait forever. |
| Returns: |
| self, now emulating the return values. |
| """ |
| self._assertNoShadow('pFinish') |
| if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): |
| self._objs.JoinAll() |
| self._objs = self._objs.GetAllReturnValues( |
| watchdog_timer.WatchdogTimer(timeout)) |
| return self |
| |
| def pGet(self, timeout): |
| """Get the current wrapped objects. |
| |
| Args: |
| timeout: Same as |pFinish|. |
| Returns: |
| A list of the results, in order of the provided devices. |
| Raises: |
| Any exception raised by any of the called functions. |
| """ |
| self._assertNoShadow('pGet') |
| self.pFinish(timeout) |
| return self._objs |
| |
| def pMap(self, f, *args, **kwargs): |
| """Map a function across the current wrapped objects in parallel. |
| |
| This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. |
| |
| Note that this call is asynchronous. Call pFinish on the return value to |
| block until the call finishes. |
| |
| Args: |
| f: The function to call. |
| args: The positional args to pass to f. |
| kwargs: The keyword args to pass to f. |
| Returns: |
| A Parallelizer wrapping the ReraiserThreadGroup running the map in |
| parallel. |
| """ |
| self._assertNoShadow('pMap') |
| r = type(self)(self._orig_objs) |
| r._objs = reraiser_thread.ReraiserThreadGroup([ |
| reraiser_thread.ReraiserThread( |
| f, |
| args=tuple([o] + list(args)), |
| kwargs=kwargs, |
| name='%s(%s)' % (f.__name__, d)) |
| for d, o in zip(self._orig_objs, self._objs) |
| ]) |
| r._objs.StartAll() |
| return r |
| |
| def _assertNoShadow(self, attr_name): |
| """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts. |
| |
| If the wrapped objects _do_ have an |attr_name| attribute, it will be |
| inaccessible to clients. |
| |
| Args: |
| attr_name: The attribute to check. |
| Raises: |
| AssertionError if the wrapped objects have an attribute named 'attr_name' |
| or '_assertNoShadow'. |
| """ |
| if isinstance(self._objs, reraiser_thread.ReraiserThreadGroup): |
| assert not hasattr(self._objs, '_assertNoShadow') |
| assert not hasattr(self._objs, attr_name) |
| else: |
| assert not any(hasattr(o, '_assertNoShadow') for o in self._objs) |
| assert not any(hasattr(o, attr_name) for o in self._objs) |
| |
| |
| class SyncParallelizer(Parallelizer): |
| """A Parallelizer that blocks on function calls.""" |
| |
| def __enter__(self): |
| """Emulate entering the context of |self|. |
| |
| Note that this call is synchronous. |
| |
| Returns: |
| A Parallelizer emulating the value returned from entering into the |
| context of |self|. |
| """ |
| r = type(self)(self._orig_objs) |
| r._objs = [o.__enter__ for o in r._objs] |
| return r.__call__() |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| """Emulate exiting the context of |self|. |
| |
| Note that this call is synchronous. |
| |
| Args: |
| exc_type: the exception type. |
| exc_val: the exception value. |
| exc_tb: the exception traceback. |
| """ |
| r = type(self)(self._orig_objs) |
| r._objs = [o.__exit__ for o in r._objs] |
| r.__call__(exc_type, exc_val, exc_tb) |
| |
| # override |
| def __call__(self, *args, **kwargs): |
| """Emulate calling |self| with |args| and |kwargs|. |
| |
| Note that this call is synchronous. |
| |
| Returns: |
| A Parallelizer emulating the value returned from calling |self| with |
| |args| and |kwargs|. |
| Raises: |
| AttributeError if the wrapped objects aren't callable. |
| """ |
| r = super(SyncParallelizer, self).__call__(*args, **kwargs) |
| r.pFinish(None) |
| return r |
| |
| # override |
| def pMap(self, f, *args, **kwargs): |
| """Map a function across the current wrapped objects in parallel. |
| |
| This calls f(o, *args, **kwargs) for each o in the set of wrapped objects. |
| |
| Note that this call is synchronous. |
| |
| Args: |
| f: The function to call. |
| args: The positional args to pass to f. |
| kwargs: The keyword args to pass to f. |
| Returns: |
| A Parallelizer wrapping the ReraiserThreadGroup running the map in |
| parallel. |
| """ |
| r = super(SyncParallelizer, self).pMap(f, *args, **kwargs) |
| r.pFinish(None) |
| return r |