Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/task_queue.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:4f3585e2f14b |
|---|---|
| 1 # Copyright (C) The Arvados Authors. All rights reserved. | |
| 2 # | |
| 3 # SPDX-License-Identifier: Apache-2.0 | |
| 4 """TaskQueue.""" | |
| 5 | |
| 6 import queue | |
| 7 import threading | |
| 8 from typing import Callable, Optional | |
| 9 | |
| 10 from .loghandler import _logger | |
| 11 | |
| 12 | |
class TaskQueue:
    """A first-in, first-out queue of tasks executed on a fixed number of threads.

    New tasks enter the queue and are started in the order received,
    as worker threads become available.

    If thread_count == 0 then tasks will be synchronously executed
    when add() is called (this makes the actual task queue behavior a
    no-op, but may be a useful configuration knob).

    The thread_count is also used as the maximum size of the queue.

    The threads are created during TaskQueue initialization. Call
    join() when you're done with the TaskQueue and want the threads to
    stop.

    Attributes
    ----------
    in_flight
        the number of tasks that have been accepted by add() and have not
        yet finished (queued or currently running); always updated while
        holding ``lock``
    error
        the most recent exception raised by a task on a worker thread,
        or None if no task has failed

    """

    def __init__(self, lock: threading.Lock, thread_count: int):
        """Create a new task queue using the specified lock and number of threads.

        ``lock`` guards updates to ``in_flight``; ``thread_count`` worker
        threads are started immediately.
        """
        self.thread_count = thread_count
        self.task_queue: queue.Queue[Optional[Callable[[], None]]] = queue.Queue(
            maxsize=self.thread_count
        )
        self.task_queue_threads = []
        self.lock = lock
        self.in_flight = 0
        self.error: Optional[BaseException] = None

        for _ in range(self.thread_count):
            worker = threading.Thread(target=self._task_queue_func)
            self.task_queue_threads.append(worker)
            worker.start()

    def _task_queue_func(self) -> None:
        """Worker loop: run queued tasks until a ``None`` sentinel is received."""
        while True:
            task = self.task_queue.get()
            if task is None:
                # Sentinel posted by join(): this worker is done.
                return
            try:
                task()
            except BaseException as e:
                # Keep the worker alive; remember the failure for the owner.
                _logger.exception("Unhandled exception running task")
                self.error = e
            finally:
                with self.lock:
                    self.in_flight -= 1

    def add(
        self,
        task: Callable[[], None],
        unlock: Optional[threading.Condition] = None,
        check_done: Optional[threading.Event] = None,
    ) -> None:
        """
        Add your task to the queue.

        The optional unlock will be released prior to attempting to add the
        task to the queue, and re-acquired after each attempt.

        If the optional "check_done" threading.Event's flag is set, then we
        will skip adding this task to the queue.

        If the TaskQueue was created with thread_count == 0 then your task will
        be synchronously executed.

        """
        if self.thread_count == 0:
            task()
            return

        with self.lock:
            self.in_flight += 1

        while True:
            try:
                if unlock is not None:
                    unlock.release()
                if check_done is not None and check_done.is_set():
                    # The task is being abandoned, so undo the accounting.
                    with self.lock:
                        self.in_flight -= 1
                    return
                # Bounded wait so that check_done is re-examined periodically
                # while the queue is full.
                self.task_queue.put(task, block=True, timeout=3)
                return
            except queue.Full:
                pass
            finally:
                if unlock is not None:
                    unlock.acquire()

    def drain(self) -> None:
        """Discard every task still waiting in the queue.

        Tasks already picked up by a worker are not affected. Each discarded
        task is subtracted from ``in_flight`` so the counter keeps matching
        the work that can still complete. Must not be called while the
        caller holds ``lock``.
        """
        discarded = 0
        while True:
            try:
                # Non-blocking: a worker may empty the queue concurrently.
                item = self.task_queue.get_nowait()
            except queue.Empty:
                break
            if item is not None:
                # None is a shutdown sentinel, never counted in in_flight.
                discarded += 1
        if discarded:
            with self.lock:
                self.in_flight -= discarded

    def join(self) -> None:
        """Wait for all threads to complete."""
        # One sentinel per worker; each worker exits after consuming one.
        for _ in self.task_queue_threads:
            self.task_queue.put(None)
        for worker in self.task_queue_threads:
            worker.join()
