Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/mutation.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
comparison
equal
deleted
inserted
replaced
4:79f47841a781 | 5:9b1c78e6ba9c |
---|---|
1 from __future__ import absolute_import | |
2 | |
3 from collections import namedtuple | |
4 from typing import Any, Dict | |
5 | |
6 from typing_extensions import Text # pylint: disable=unused-import | |
7 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
8 | |
9 from .errors import WorkflowException | |
10 | |
11 | |
12 MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"]) | |
13 | |
14 _generation = "http://commonwl.org/cwltool#generation" | |
15 | |
16 class MutationManager(object): | |
17 """Lock manager for checking correctness of in-place update of files. | |
18 | |
19 Used to validate that in-place file updates happen sequentially, and that a | |
20 file which is registered for in-place update cannot be read or updated by | |
21 any other steps. | |
22 | |
23 """ | |
24 | |
25 def __init__(self): # type: () -> None | |
26 """Initialize.""" | |
27 self.generations = {} # type: Dict[Text, MutationState] | |
28 | |
29 def register_reader(self, stepname, obj): | |
30 # type: (Text, Dict[Text, Any]) -> None | |
31 loc = obj["location"] | |
32 current = self.generations.get(loc, MutationState(0, [], "")) | |
33 obj_generation = obj.get(_generation, 0) | |
34 | |
35 if obj_generation != current.generation: | |
36 raise WorkflowException( | |
37 "[job {}] wants to read {} from generation {} but current " | |
38 "generation is {}(last updated by {})".format( | |
39 stepname, loc, obj_generation, current.generation, current.stepname)) | |
40 | |
41 current.readers.append(stepname) | |
42 self.generations[loc] = current | |
43 | |
44 def release_reader(self, stepname, obj): | |
45 # type: (Text, Dict[Text, Any]) -> None | |
46 loc = obj["location"] | |
47 current = self.generations.get(loc, MutationState(0, [], "")) | |
48 obj_generation = obj.get(_generation, 0) | |
49 | |
50 if obj_generation != current.generation: | |
51 raise WorkflowException( | |
52 "[job {}] wants to release reader on {} from generation {}" | |
53 " but current generation is {} (last updated by {})".format( | |
54 stepname, loc, obj_generation, current.generation, | |
55 current.stepname)) | |
56 | |
57 self.generations[loc].readers.remove(stepname) | |
58 | |
59 def register_mutation(self, stepname, obj): | |
60 # type: (Text, Dict[Text, Any]) -> None | |
61 loc = obj["location"] | |
62 current = self.generations.get(loc, MutationState(0, [], "")) | |
63 obj_generation = obj.get(_generation, 0) | |
64 | |
65 if len(current.readers) > 0: | |
66 raise WorkflowException( | |
67 "[job {}] wants to modify {} but has readers: {}".format( | |
68 stepname, loc, current.readers)) | |
69 | |
70 if obj_generation != current.generation: | |
71 raise WorkflowException( | |
72 "[job {}] wants to modify {} from generation {} but current " | |
73 "generation is {} (last updated by {})".format( | |
74 stepname, loc, obj_generation, current.generation, current.stepname)) | |
75 | |
76 self.generations[loc] = MutationState(current.generation+1, current.readers, stepname) | |
77 | |
78 def set_generation(self, obj): # type: (Dict[Text, Text]) -> None | |
79 loc = obj["location"] | |
80 current = self.generations.get(loc, MutationState(0, [], "")) | |
81 obj[_generation] = current.generation | |
82 | |
83 def unset_generation(self, obj): # type: (Dict[Text, Text]) -> None | |
84 obj.pop(_generation, None) |