Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/boto/swf/layer2.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/boto/swf/layer2.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,347 @@ +"""Object-oriented interface to SWF wrapping boto.swf.layer1.Layer1""" + +import time +from functools import wraps +from boto.swf.layer1 import Layer1 +from boto.swf.layer1_decisions import Layer1Decisions + +DEFAULT_CREDENTIALS = { + 'aws_access_key_id': None, + 'aws_secret_access_key': None +} + +def set_default_credentials(aws_access_key_id, aws_secret_access_key): + """Set default credentials.""" + DEFAULT_CREDENTIALS.update({ + 'aws_access_key_id': aws_access_key_id, + 'aws_secret_access_key': aws_secret_access_key, + }) + +class SWFBase(object): + + name = None + domain = None + aws_access_key_id = None + aws_secret_access_key = None + region = None + + def __init__(self, **kwargs): + # Set default credentials. + for credkey in ('aws_access_key_id', 'aws_secret_access_key'): + if DEFAULT_CREDENTIALS.get(credkey): + setattr(self, credkey, DEFAULT_CREDENTIALS[credkey]) + # Override attributes with keyword args. + for kwarg in kwargs: + setattr(self, kwarg, kwargs[kwarg]) + + self._swf = Layer1(self.aws_access_key_id, + self.aws_secret_access_key, + region=self.region) + + def __repr__(self): + rep_str = str(self.name) + if hasattr(self, 'version'): + rep_str += '-' + str(getattr(self, 'version')) + return '<%s %r at 0x%x>' % (self.__class__.__name__, rep_str, id(self)) + +class Domain(SWFBase): + + """Simple Workflow Domain.""" + + description = None + retention = 30 + @wraps(Layer1.describe_domain) + def describe(self): + """DescribeDomain.""" + return self._swf.describe_domain(self.name) + + @wraps(Layer1.deprecate_domain) + def deprecate(self): + """DeprecateDomain""" + self._swf.deprecate_domain(self.name) + + @wraps(Layer1.register_domain) + def register(self): + """RegisterDomain.""" + self._swf.register_domain(self.name, str(self.retention), + self.description) + + @wraps(Layer1.list_activity_types) + def activities(self, status='REGISTERED', **kwargs): + """ListActivityTypes.""" + act_types = self._swf.list_activity_types(self.name, status, **kwargs) + act_objects = [] + for act_args in act_types['typeInfos']: + act_ident = act_args['activityType'] + del act_args['activityType'] + act_args.update(act_ident) + act_args.update({ + 'aws_access_key_id': self.aws_access_key_id, + 'aws_secret_access_key': self.aws_secret_access_key, + 'domain': self.name, + 'region': self.region, + }) + act_objects.append(ActivityType(**act_args)) + return act_objects + + @wraps(Layer1.list_workflow_types) + def workflows(self, status='REGISTERED', **kwargs): + """ListWorkflowTypes.""" + wf_types = self._swf.list_workflow_types(self.name, status, **kwargs) + wf_objects = [] + for wf_args in wf_types['typeInfos']: + wf_ident = wf_args['workflowType'] + del wf_args['workflowType'] + wf_args.update(wf_ident) + wf_args.update({ + 'aws_access_key_id': self.aws_access_key_id, + 'aws_secret_access_key': self.aws_secret_access_key, + 'domain': self.name, + 'region': self.region, + }) + + wf_objects.append(WorkflowType(**wf_args)) + return wf_objects + + def executions(self, closed=False, **kwargs): + """List list open/closed executions. + + For a full list of available parameters refer to + :py:func:`boto.swf.layer1.Layer1.list_closed_workflow_executions` and + :py:func:`boto.swf.layer1.Layer1.list_open_workflow_executions` + """ + if closed: + executions = self._swf.list_closed_workflow_executions(self.name, + **kwargs) + else: + if 'oldest_date' not in kwargs: + # Last 24 hours. + kwargs['oldest_date'] = time.time() - (3600 * 24) + executions = self._swf.list_open_workflow_executions(self.name, + **kwargs) + exe_objects = [] + for exe_args in executions['executionInfos']: + for nested_key in ('execution', 'workflowType'): + nested_dict = exe_args[nested_key] + del exe_args[nested_key] + exe_args.update(nested_dict) + + exe_args.update({ + 'aws_access_key_id': self.aws_access_key_id, + 'aws_secret_access_key': self.aws_secret_access_key, + 'domain': self.name, + 'region': self.region, + }) + + exe_objects.append(WorkflowExecution(**exe_args)) + return exe_objects + + @wraps(Layer1.count_pending_activity_tasks) + def count_pending_activity_tasks(self, task_list): + """CountPendingActivityTasks.""" + return self._swf.count_pending_activity_tasks(self.name, task_list) + + @wraps(Layer1.count_pending_decision_tasks) + def count_pending_decision_tasks(self, task_list): + """CountPendingDecisionTasks.""" + return self._swf.count_pending_decision_tasks(self.name, task_list) + + +class Actor(SWFBase): + + task_list = None + last_tasktoken = None + domain = None + + def run(self): + """To be overloaded by subclasses.""" + raise NotImplementedError() + +class ActivityWorker(Actor): + + """Base class for SimpleWorkflow activity workers.""" + + @wraps(Layer1.respond_activity_task_canceled) + def cancel(self, task_token=None, details=None): + """RespondActivityTaskCanceled.""" + if task_token is None: + task_token = self.last_tasktoken + return self._swf.respond_activity_task_canceled(task_token, details) + + @wraps(Layer1.respond_activity_task_completed) + def complete(self, task_token=None, result=None): + """RespondActivityTaskCompleted.""" + if task_token is None: + task_token = self.last_tasktoken + return self._swf.respond_activity_task_completed(task_token, result) + + @wraps(Layer1.respond_activity_task_failed) + def fail(self, task_token=None, details=None, reason=None): + """RespondActivityTaskFailed.""" + if task_token is None: + task_token = self.last_tasktoken + return self._swf.respond_activity_task_failed(task_token, details, + reason) + + @wraps(Layer1.record_activity_task_heartbeat) + def heartbeat(self, task_token=None, details=None): + """RecordActivityTaskHeartbeat.""" + if task_token is None: + task_token = self.last_tasktoken + return self._swf.record_activity_task_heartbeat(task_token, details) + + @wraps(Layer1.poll_for_activity_task) + def poll(self, **kwargs): + """PollForActivityTask.""" + task_list = self.task_list + if 'task_list' in kwargs: + task_list = kwargs.get('task_list') + del kwargs['task_list'] + task = self._swf.poll_for_activity_task(self.domain, task_list, + **kwargs) + self.last_tasktoken = task.get('taskToken') + return task + +class Decider(Actor): + + """Base class for SimpleWorkflow deciders.""" + + @wraps(Layer1.respond_decision_task_completed) + def complete(self, task_token=None, decisions=None, **kwargs): + """RespondDecisionTaskCompleted.""" + if isinstance(decisions, Layer1Decisions): + # Extract decision list from a Layer1Decisions instance. + decisions = decisions._data + if task_token is None: + task_token = self.last_tasktoken + return self._swf.respond_decision_task_completed(task_token, decisions, + **kwargs) + + @wraps(Layer1.poll_for_decision_task) + def poll(self, **kwargs): + """PollForDecisionTask.""" + task_list = self.task_list + if 'task_list' in kwargs: + task_list = kwargs.get('task_list') + del kwargs['task_list'] + decision_task = self._swf.poll_for_decision_task(self.domain, task_list, + **kwargs) + self.last_tasktoken = decision_task.get('taskToken') + return decision_task + +class WorkflowType(SWFBase): + + """A versioned workflow type.""" + + version = None + task_list = None + child_policy = 'TERMINATE' + + @wraps(Layer1.describe_workflow_type) + def describe(self): + """DescribeWorkflowType.""" + return self._swf.describe_workflow_type(self.domain, self.name, + self.version) + @wraps(Layer1.register_workflow_type) + def register(self, **kwargs): + """RegisterWorkflowType.""" + args = { + 'default_execution_start_to_close_timeout': '3600', + 'default_task_start_to_close_timeout': '300', + 'default_child_policy': 'TERMINATE', + } + args.update(kwargs) + self._swf.register_workflow_type(self.domain, self.name, self.version, + **args) + + @wraps(Layer1.deprecate_workflow_type) + def deprecate(self): + """DeprecateWorkflowType.""" + self._swf.deprecate_workflow_type(self.domain, self.name, self.version) + + @wraps(Layer1.start_workflow_execution) + def start(self, **kwargs): + """StartWorkflowExecution.""" + if 'workflow_id' in kwargs: + workflow_id = kwargs['workflow_id'] + del kwargs['workflow_id'] + else: + workflow_id = '%s-%s-%i' % (self.name, self.version, time.time()) + + for def_attr in ('task_list', 'child_policy'): + kwargs[def_attr] = kwargs.get(def_attr, getattr(self, def_attr)) + run_id = self._swf.start_workflow_execution(self.domain, workflow_id, + self.name, self.version, **kwargs)['runId'] + return WorkflowExecution(name=self.name, version=self.version, + runId=run_id, domain=self.domain, workflowId=workflow_id, + aws_access_key_id=self.aws_access_key_id, + aws_secret_access_key=self.aws_secret_access_key) + +class WorkflowExecution(SWFBase): + + """An instance of a workflow.""" + + workflowId = None + runId = None + + @wraps(Layer1.signal_workflow_execution) + def signal(self, signame, **kwargs): + """SignalWorkflowExecution.""" + self._swf.signal_workflow_execution(self.domain, signame, + self.workflowId, **kwargs) + + @wraps(Layer1.terminate_workflow_execution) + def terminate(self, **kwargs): + """TerminateWorkflowExecution (p. 103).""" + return self._swf.terminate_workflow_execution(self.domain, + self.workflowId, **kwargs) + + @wraps(Layer1.get_workflow_execution_history) + def history(self, **kwargs): + """GetWorkflowExecutionHistory.""" + return self._swf.get_workflow_execution_history(self.domain, self.runId, + self.workflowId, **kwargs)['events'] + + @wraps(Layer1.describe_workflow_execution) + def describe(self): + """DescribeWorkflowExecution.""" + return self._swf.describe_workflow_execution(self.domain, self.runId, + self.workflowId) + + @wraps(Layer1.request_cancel_workflow_execution) + def request_cancel(self): + """RequestCancelWorkflowExecution.""" + return self._swf.request_cancel_workflow_execution(self.domain, + self.workflowId, self.runId) + + +class ActivityType(SWFBase): + + """A versioned activity type.""" + + version = None + + @wraps(Layer1.deprecate_activity_type) + def deprecate(self): + """DeprecateActivityType.""" + return self._swf.deprecate_activity_type(self.domain, self.name, + self.version) + + @wraps(Layer1.describe_activity_type) + def describe(self): + """DescribeActivityType.""" + return self._swf.describe_activity_type(self.domain, self.name, + self.version) + + @wraps(Layer1.register_activity_type) + def register(self, **kwargs): + """RegisterActivityType.""" + args = { + 'default_task_heartbeat_timeout': '600', + 'default_task_schedule_to_close_timeout': '3900', + 'default_task_schedule_to_start_timeout': '300', + 'default_task_start_to_close_timeout': '3600', + } + args.update(kwargs) + self._swf.register_activity_type(self.domain, self.name, self.version, + **args)