e3.testsuite.multiprocess_scheduler =================================== .. py:module:: e3.testsuite.multiprocess_scheduler Attributes ---------- .. autoapisummary:: e3.testsuite.multiprocess_scheduler.logger e3.testsuite.multiprocess_scheduler.WorkData e3.testsuite.multiprocess_scheduler.SomeWorker e3.testsuite.multiprocess_scheduler.JobFactoryCallback e3.testsuite.multiprocess_scheduler.CollectResultCallback Classes ------- .. autoapisummary:: e3.testsuite.multiprocess_scheduler.Worker e3.testsuite.multiprocess_scheduler.MultiprocessScheduler Functions --------- .. autoapisummary:: e3.testsuite.multiprocess_scheduler.compute_next_dyn_poll Module Contents --------------- .. py:data:: logger .. py:class:: Worker(uid: str, driver: e3.testsuite.driver.TestDriver, callback_name: str, slot: int, env: e3.env.Env) Abstract class to represent units of work for the scheduler. .. py:attribute:: index_generator Generate unique indexes for each worker. .. py:attribute:: uid .. py:attribute:: driver .. py:attribute:: callback_name .. py:attribute:: slot .. py:attribute:: env .. py:attribute:: index .. py:attribute:: process Process that executes this test fragment. .. py:method:: start() -> e3.os.process.Run :abstractmethod: Create and return the subprocess to do the work. All subclasses must override this. .. py:method:: poll(scheduler: MultiprocessScheduler) -> bool Return whether the subprocess is still running. If it is, the caller should invoke `MultiprocessScheduler.collect_result` on it. .. py:data:: WorkData Type that contains all the information needed to do some unit of work. .. py:data:: SomeWorker Worker subclass to start some unit of work. .. py:data:: JobFactoryCallback Callback to create a Worker instance from work data. Arguments are: * UID for this unit of work; * data for the work to do; * slot ID for the new worker. Returned value is the Worker instance. .. py:data:: CollectResultCallback Callback to extract work result from a worker. .. py:class:: MultiprocessScheduler(dag: e3.collection.dag.DAG, job_factory: JobFactoryCallback, collect_result: CollectResultCallback, jobs: int = 0, dyn_poll_interval: bool = True) Bases: :py:obj:`Generic`\ [\ :py:obj:`WorkData`\ , :py:obj:`SomeWorker`\ ] Scheduler to dispatch units of work to subprocesses. .. py:attribute:: parallelism .. py:attribute:: dag .. py:attribute:: workers :type: List[Optional[SomeWorker]] List of active workers. Indexes in this list correspond to slot IDs passed to workers: `self.workers[N].slot == N` for all present wor,kers. When the worker is done, we just replace it with None, and when a slot is None we can create a new worker for it. .. py:attribute:: iterator Iterator to get ready-to-run units of work. .. py:attribute:: job_factory .. py:attribute:: collect_result .. py:attribute:: active_workers :value: 0 Equivalent to the number of non-None slots in ``self.workers``. .. py:attribute:: poll_interval :value: 0.1 Time (in seconds) to wait between each round of worker polling. .. py:attribute:: dyn_poll_interval :value: True .. py:attribute:: no_free_item :value: False True if there is work waiting to be executed, False if all work to be scheduled depends on work that hasn't completed. .. py:attribute:: no_work_left :value: False True if we processed all items from ``self.iterator`` (i.e. we got a ``StopIteration`` exception from it). .. py:property:: has_free_slots :type: bool Return whether there is a free slot to spawn a worker. .. py:method:: spawn_worker(uid: str, data: WorkData, slot: int) -> None Create a worker and assign it to the given slot. .. py:method:: release_worker(slot: int) -> None Release a worker, freeing the corresponding slot. .. py:method:: run() -> None Run the loop to execute all units of work. .. py:method:: poll() -> None .. py:function:: compute_next_dyn_poll(poll_counter: int, poll_interval: float) -> float Adjust the polling interval.