Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/cwltool/mutation.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/cwltool/mutation.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,84 @@ +from __future__ import absolute_import + +from collections import namedtuple +from typing import Any, Dict + +from typing_extensions import Text # pylint: disable=unused-import +# move to a regular typing import when Python 3.3-3.6 is no longer supported + +from .errors import WorkflowException + + +MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"]) + +_generation = "http://commonwl.org/cwltool#generation" + +class MutationManager(object): + """Lock manager for checking correctness of in-place update of files. + + Used to validate that in-place file updates happen sequentially, and that a + file which is registered for in-place update cannot be read or updated by + any other steps. + + """ + + def __init__(self): # type: () -> None + """Initialize.""" + self.generations = {} # type: Dict[Text, MutationState] + + def register_reader(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0, [], "")) + obj_generation = obj.get(_generation, 0) + + if obj_generation != current.generation: + raise WorkflowException( + "[job {}] wants to read {} from generation {} but current " + "generation is {}(last updated by {})".format( + stepname, loc, obj_generation, current.generation, current.stepname)) + + current.readers.append(stepname) + self.generations[loc] = current + + def release_reader(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0, [], "")) + obj_generation = obj.get(_generation, 0) + + if obj_generation != current.generation: + raise WorkflowException( + "[job {}] wants to release reader on {} from generation {}" + " but current generation is {} (last updated by {})".format( + stepname, loc, obj_generation, current.generation, + current.stepname)) + + self.generations[loc].readers.remove(stepname) + + def register_mutation(self, stepname, obj): + # type: (Text, Dict[Text, Any]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0, [], "")) + obj_generation = obj.get(_generation, 0) + + if len(current.readers) > 0: + raise WorkflowException( + "[job {}] wants to modify {} but has readers: {}".format( + stepname, loc, current.readers)) + + if obj_generation != current.generation: + raise WorkflowException( + "[job {}] wants to modify {} from generation {} but current " + "generation is {} (last updated by {})".format( + stepname, loc, obj_generation, current.generation, current.stepname)) + + self.generations[loc] = MutationState(current.generation+1, current.readers, stepname) + + def set_generation(self, obj): # type: (Dict[Text, Text]) -> None + loc = obj["location"] + current = self.generations.get(loc, MutationState(0, [], "")) + obj[_generation] = current.generation + + def unset_generation(self, obj): # type: (Dict[Text, Text]) -> None + obj.pop(_generation, None)