Mercurial > repos > shellac > guppy_basecaller
view env/lib/python3.7/site-packages/cwltool/mutation.py @ 4:79f47841a781 draft
"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author | shellac |
---|---|
date | Thu, 14 May 2020 16:47:39 -0400 |
parents | 26e78fe6e8c4 |
children |
line wrap: on
line source
from __future__ import absolute_import from collections import namedtuple from typing import Any, Dict from typing_extensions import Text # pylint: disable=unused-import # move to a regular typing import when Python 3.3-3.6 is no longer supported from .errors import WorkflowException MutationState = namedtuple("MutationTracker", ["generation", "readers", "stepname"]) _generation = "http://commonwl.org/cwltool#generation" class MutationManager(object): """Lock manager for checking correctness of in-place update of files. Used to validate that in-place file updates happen sequentially, and that a file which is registered for in-place update cannot be read or updated by any other steps. """ def __init__(self): # type: () -> None """Initialize.""" self.generations = {} # type: Dict[Text, MutationState] def register_reader(self, stepname, obj): # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] current = self.generations.get(loc, MutationState(0, [], "")) obj_generation = obj.get(_generation, 0) if obj_generation != current.generation: raise WorkflowException( "[job {}] wants to read {} from generation {} but current " "generation is {}(last updated by {})".format( stepname, loc, obj_generation, current.generation, current.stepname)) current.readers.append(stepname) self.generations[loc] = current def release_reader(self, stepname, obj): # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] current = self.generations.get(loc, MutationState(0, [], "")) obj_generation = obj.get(_generation, 0) if obj_generation != current.generation: raise WorkflowException( "[job {}] wants to release reader on {} from generation {}" " but current generation is {} (last updated by {})".format( stepname, loc, obj_generation, current.generation, current.stepname)) self.generations[loc].readers.remove(stepname) def register_mutation(self, stepname, obj): # type: (Text, Dict[Text, Any]) -> None loc = obj["location"] current = self.generations.get(loc, MutationState(0, [], "")) obj_generation = obj.get(_generation, 0) if len(current.readers) > 0: raise WorkflowException( "[job {}] wants to modify {} but has readers: {}".format( stepname, loc, current.readers)) if obj_generation != current.generation: raise WorkflowException( "[job {}] wants to modify {} from generation {} but current " "generation is {} (last updated by {})".format( stepname, loc, obj_generation, current.generation, current.stepname)) self.generations[loc] = MutationState(current.generation+1, current.readers, stepname) def set_generation(self, obj): # type: (Dict[Text, Text]) -> None loc = obj["location"] current = self.generations.get(loc, MutationState(0, [], "")) obj[_generation] = current.generation def unset_generation(self, obj): # type: (Dict[Text, Text]) -> None obj.pop(_generation, None)