comparison env/lib/python3.7/site-packages/cwltool/mutation.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 from __future__ import absolute_import
2
3 from collections import namedtuple
4 from typing import Any, Dict
5
6 from typing_extensions import Text # pylint: disable=unused-import
7 # move to a regular typing import when Python 3.3-3.6 is no longer supported
8
9 from .errors import WorkflowException
10
11
12 MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"])
13
14 _generation = "http://commonwl.org/cwltool#generation"
15
16 class MutationManager(object):
17 """Lock manager for checking correctness of in-place update of files.
18
19 Used to validate that in-place file updates happen sequentially, and that a
20 file which is registered for in-place update cannot be read or updated by
21 any other steps.
22
23 """
24
25 def __init__(self): # type: () -> None
26 """Initialize."""
27 self.generations = {} # type: Dict[Text, MutationState]
28
29 def register_reader(self, stepname, obj):
30 # type: (Text, Dict[Text, Any]) -> None
31 loc = obj["location"]
32 current = self.generations.get(loc, MutationState(0, [], ""))
33 obj_generation = obj.get(_generation, 0)
34
35 if obj_generation != current.generation:
36 raise WorkflowException(
37 "[job {}] wants to read {} from generation {} but current "
38 "generation is {}(last updated by {})".format(
39 stepname, loc, obj_generation, current.generation, current.stepname))
40
41 current.readers.append(stepname)
42 self.generations[loc] = current
43
44 def release_reader(self, stepname, obj):
45 # type: (Text, Dict[Text, Any]) -> None
46 loc = obj["location"]
47 current = self.generations.get(loc, MutationState(0, [], ""))
48 obj_generation = obj.get(_generation, 0)
49
50 if obj_generation != current.generation:
51 raise WorkflowException(
52 "[job {}] wants to release reader on {} from generation {}"
53 " but current generation is {} (last updated by {})".format(
54 stepname, loc, obj_generation, current.generation,
55 current.stepname))
56
57 self.generations[loc].readers.remove(stepname)
58
59 def register_mutation(self, stepname, obj):
60 # type: (Text, Dict[Text, Any]) -> None
61 loc = obj["location"]
62 current = self.generations.get(loc, MutationState(0, [], ""))
63 obj_generation = obj.get(_generation, 0)
64
65 if len(current.readers) > 0:
66 raise WorkflowException(
67 "[job {}] wants to modify {} but has readers: {}".format(
68 stepname, loc, current.readers))
69
70 if obj_generation != current.generation:
71 raise WorkflowException(
72 "[job {}] wants to modify {} from generation {} but current "
73 "generation is {} (last updated by {})".format(
74 stepname, loc, obj_generation, current.generation, current.stepname))
75
76 self.generations[loc] = MutationState(current.generation+1, current.readers, stepname)
77
78 def set_generation(self, obj): # type: (Dict[Text, Text]) -> None
79 loc = obj["location"]
80 current = self.generations.get(loc, MutationState(0, [], ""))
81 obj[_generation] = current.generation
82
83 def unset_generation(self, obj): # type: (Dict[Text, Text]) -> None
84 obj.pop(_generation, None)