Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/cwltool/task_queue.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 # Copyright (C) The Arvados Authors. All rights reserved. | |
2 # | |
3 # SPDX-License-Identifier: Apache-2.0 | |
4 """TaskQueue.""" | |
5 | |
6 import queue | |
7 import threading | |
8 from typing import Callable, Optional | |
9 | |
10 from .loghandler import _logger | |
11 | |
12 | |
13 class TaskQueue: | |
14 """A TaskQueue class. | |
15 | |
16 Uses a first-in, first-out queue of tasks executed on a fixed number of | |
17 threads. | |
18 | |
19 New tasks enter the queue and are started in the order received, | |
20 as worker threads become available. | |
21 | |
22 If thread_count == 0 then tasks will be synchronously executed | |
23 when add() is called (this makes the actual task queue behavior a | |
24 no-op, but may be a useful configuration knob). | |
25 | |
26 The thread_count is also used as the maximum size of the queue. | |
27 | |
28 The threads are created during TaskQueue initialization. Call | |
29 join() when you're done with the TaskQueue and want the threads to | |
30 stop. | |
31 | |
32 | |
33 Attributes | |
34 ---------- | |
35 in_flight | |
36 the number of tasks in the queue | |
37 | |
38 """ | |
39 | |
40 def __init__(self, lock: threading.Lock, thread_count: int): | |
41 """Create a new task queue using the specified lock and number of threads.""" | |
42 self.thread_count = thread_count | |
43 self.task_queue: queue.Queue[Optional[Callable[[], None]]] = queue.Queue( | |
44 maxsize=self.thread_count | |
45 ) | |
46 self.task_queue_threads = [] | |
47 self.lock = lock | |
48 self.in_flight = 0 | |
49 self.error: Optional[BaseException] = None | |
50 | |
51 for _r in range(0, self.thread_count): | |
52 t = threading.Thread(target=self._task_queue_func) | |
53 self.task_queue_threads.append(t) | |
54 t.start() | |
55 | |
56 def _task_queue_func(self) -> None: | |
57 while True: | |
58 task = self.task_queue.get() | |
59 if task is None: | |
60 return | |
61 try: | |
62 task() | |
63 except BaseException as e: | |
64 _logger.exception("Unhandled exception running task") | |
65 self.error = e | |
66 finally: | |
67 with self.lock: | |
68 self.in_flight -= 1 | |
69 | |
70 def add( | |
71 self, | |
72 task: Callable[[], None], | |
73 unlock: Optional[threading.Condition] = None, | |
74 check_done: Optional[threading.Event] = None, | |
75 ) -> None: | |
76 """ | |
77 Add your task to the queue. | |
78 | |
79 The optional unlock will be released prior to attempting to add the | |
80 task to the queue. | |
81 | |
82 If the optional "check_done" threading.Event's flag is set, then we | |
83 will skip adding this task to the queue. | |
84 | |
85 If the TaskQueue was created with thread_count == 0 then your task will | |
86 be synchronously executed. | |
87 | |
88 """ | |
89 if self.thread_count == 0: | |
90 task() | |
91 return | |
92 | |
93 with self.lock: | |
94 self.in_flight += 1 | |
95 | |
96 while True: | |
97 try: | |
98 if unlock is not None: | |
99 unlock.release() | |
100 if check_done is not None and check_done.is_set(): | |
101 with self.lock: | |
102 self.in_flight -= 1 | |
103 return | |
104 self.task_queue.put(task, block=True, timeout=3) | |
105 return | |
106 except queue.Full: | |
107 pass | |
108 finally: | |
109 if unlock is not None: | |
110 unlock.acquire() | |
111 | |
112 def drain(self) -> None: | |
113 """Drain the queue.""" | |
114 try: | |
115 while not self.task_queue.empty(): | |
116 self.task_queue.get(True, 0.1) | |
117 except queue.Empty: | |
118 pass | |
119 | |
120 def join(self) -> None: | |
121 """Wait for all threads to complete.""" | |
122 for _t in self.task_queue_threads: | |
123 self.task_queue.put(None) | |
124 for t in self.task_queue_threads: | |
125 t.join() |