diff env/lib/python3.7/site-packages/cwltool/workflow.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/env/lib/python3.7/site-packages/cwltool/workflow.py	Sat May 02 07:14:21 2020 -0400
@@ -0,0 +1,1020 @@
+from __future__ import absolute_import
+
+import copy
+import datetime
+import functools
+import logging
+import random
+import tempfile
+from collections import namedtuple
+from typing import (Any, Callable, Dict, Generator, Iterable, List,
+                    Mapping, MutableMapping, MutableSequence,
+                    Optional, Sequence, Tuple, Union, cast)
+from uuid import UUID  # pylint: disable=unused-import
+
+import threading
+from ruamel.yaml.comments import CommentedMap
+from schema_salad import validate
+from schema_salad.sourceline import SourceLine, indent
+from six import string_types, iteritems
+from six.moves import range
+from future.utils import raise_from
+from typing_extensions import Text  # pylint: disable=unused-import
+# move to a regular typing import when Python 3.3-3.6 is no longer supported
+
+from . import command_line_tool, context, expression, procgenerator
+from .command_line_tool import CallbackJob, ExpressionTool
+from .job import JobBase
+from .builder import content_limit_respected_read
+from .checker import can_assign_src_to_sink, static_checker
+from .context import LoadingContext  # pylint: disable=unused-import
+from .context import RuntimeContext, getdefault
+from .errors import WorkflowException
+from .load_tool import load_tool
+from .loghandler import _logger
+from .mutation import MutationManager  # pylint: disable=unused-import
+from .pathmapper import adjustDirObjs, get_listing
+from .process import Process, get_overrides, shortname, uniquename
+from .provenance import ProvenanceProfile
+from .software_requirements import (  # pylint: disable=unused-import
+    DependenciesConfiguration)
+from .stdfsaccess import StdFsAccess
+from .utils import DEFAULT_TMP_PREFIX, aslist, json_dumps
+
+WorkflowStateItem = namedtuple('WorkflowStateItem', ['parameter', 'value', 'success'])
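+# A WorkflowStateItem records, for a single workflow parameter: the parameter
+# schema dict, the value produced for it, and the process status reported by
+# the step that produced it ("success", "permanentFail", ...).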
+
+
+def default_make_tool(toolpath_object,      # type: MutableMapping[Text, Any]
+                      loadingContext        # type: LoadingContext
+                     ):  # type: (...) -> Process
+    if not isinstance(toolpath_object, MutableMapping):
+        raise WorkflowException(u"Not a dict: '%s'" % toolpath_object)
+    if "class" in toolpath_object:
+        if toolpath_object["class"] == "CommandLineTool":
+            return command_line_tool.CommandLineTool(toolpath_object, loadingContext)
+        if toolpath_object["class"] == "ExpressionTool":
+            return command_line_tool.ExpressionTool(toolpath_object, loadingContext)
+        if toolpath_object["class"] == "Workflow":
+            return Workflow(toolpath_object, loadingContext)
+        if toolpath_object["class"] == "ProcessGenerator":
+            return procgenerator.ProcessGenerator(toolpath_object, loadingContext)
+
+    raise WorkflowException(
+        u"Missing or invalid 'class' field in "
+        "%s, expecting one of: CommandLineTool, ExpressionTool, Workflow" %
+        toolpath_object["id"])
+
+
+context.default_make_tool = default_make_tool
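+# The factory is installed on the context module here, rather than imported by
+# it, presumably to avoid a circular import between cwltool.context and this
+# module.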
+
+
+def findfiles(wo, fn=None):  # type: (Any, Optional[List[MutableMapping[Text, Any]]]) -> List[MutableMapping[Text, Any]]
+    if fn is None:
+        fn = []
+    if isinstance(wo, MutableMapping):
+        if wo.get("class") == "File":
+            fn.append(wo)
+            findfiles(wo.get("secondaryFiles", None), fn)
+        else:
+            for w in wo.values():
+                findfiles(w, fn)
+    elif isinstance(wo, MutableSequence):
+        for w in wo:
+            findfiles(w, fn)
+    return fn
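+# Illustration of the traversal above (example values only): given
+#   {"f": {"class": "File", "location": "a.txt",
+#          "secondaryFiles": [{"class": "File", "location": "a.idx"}]}}
+# findfiles returns a list holding both File dicts, the primary one first.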
+
+
+def match_types(sinktype,   # type: Union[List[Text], Text]
+                src,        # type: WorkflowStateItem
+                iid,        # type: Text
+                inputobj,   # type: Dict[Text, Any]
+                linkMerge,  # type: Text
+                valueFrom   # type: Optional[Text]
+               ):  # type: (...) -> bool
+    if isinstance(sinktype, MutableSequence):
+        # Sink is union type
+        for st in sinktype:
+            if match_types(st, src, iid, inputobj, linkMerge, valueFrom):
+                return True
+    elif isinstance(src.parameter["type"], MutableSequence):
+        # Source is union type
+        # Check that at least one source type is compatible with the sink.
+        original_types = src.parameter["type"]
+        for source_type in original_types:
+            src.parameter["type"] = source_type
+            match = match_types(
+                sinktype, src, iid, inputobj, linkMerge, valueFrom)
+            if match:
+                src.parameter["type"] = original_types
+                return True
+        src.parameter["type"] = original_types
+        return False
+    elif linkMerge:
+        if iid not in inputobj:
+            inputobj[iid] = []
+        if linkMerge == "merge_nested":
+            inputobj[iid].append(src.value)
+        elif linkMerge == "merge_flattened":
+            if isinstance(src.value, MutableSequence):
+                inputobj[iid].extend(src.value)
+            else:
+                inputobj[iid].append(src.value)
+        else:
+            raise WorkflowException(u"Unrecognized linkMerge enum '%s'" % linkMerge)
+        return True
+    elif valueFrom is not None \
+            or can_assign_src_to_sink(src.parameter["type"], sinktype) \
+            or sinktype == "Any":
+        # simply assign the value from state to input
+        inputobj[iid] = copy.deepcopy(src.value)
+        return True
+    return False
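+# Illustration of the linkMerge branch above (example values only): with two
+# sources delivering ["a"] and "b" into the same sink,
+#   merge_nested    -> inputobj[iid] == [["a"], "b"]
+#   merge_flattened -> inputobj[iid] == ["a", "b"]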
+
+
+def object_from_state(state,                  # type: Dict[Text, Optional[WorkflowStateItem]]
+                      parms,                  # type: List[Dict[Text, Any]]
+                      frag_only,              # type: bool
+                      supportsMultipleInput,  # type: bool
+                      sourceField,            # type: Text
+                      incomplete=False        # type: bool
+                     ):  # type: (...) -> Optional[Dict[Text, Any]]
+    inputobj = {}  # type: Dict[Text, Any]
+    for inp in parms:
+        iid = inp["id"]
+        if frag_only:
+            iid = shortname(iid)
+        if sourceField in inp:
+            connections = aslist(inp[sourceField])
+            if (len(connections) > 1 and
+                    not supportsMultipleInput):
+                raise WorkflowException(
+                    "Workflow contains multiple inbound links to a single "
+                    "parameter but MultipleInputFeatureRequirement is not "
+                    "declared.")
+            for src in connections:
+                a_state = state.get(src, None)
+                if a_state is not None and (a_state.success == "success" or incomplete):
+                    if not match_types(
+                            inp["type"], a_state, iid, inputobj,
+                            inp.get("linkMerge", ("merge_nested"
+                                                  if len(connections) > 1 else None)),
+                            valueFrom=inp.get("valueFrom")):
+                        raise WorkflowException(
+                            u"Type mismatch between source '%s' (%s) and "
+                            "sink '%s' (%s)" % (src,
+                                                a_state.parameter["type"], inp["id"],
+                                                inp["type"]))
+                elif src not in state:
+                    raise WorkflowException(
+                        u"Connect source '%s' on parameter '%s' does not "
+                        "exist" % (src, inp["id"]))
+                elif not incomplete:
+                    return None
+
+        if inputobj.get(iid) is None and "default" in inp:
+            inputobj[iid] = inp["default"]
+
+        if iid not in inputobj and ("valueFrom" in inp or incomplete):
+            inputobj[iid] = None
+
+        if iid not in inputobj:
+            raise WorkflowException(u"Value for %s not specified" % (inp["id"]))
+    return inputobj
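+# Contract note: a None return means some connected source has not yet
+# produced a successful value, i.e. the consuming step is not ready to run;
+# otherwise the fully assembled input (or output) object is returned.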
+
+
+class WorkflowJobStep(object):
+    def __init__(self, step):
+        # type: (WorkflowStep) -> None
+        """Initialize this WorkflowJobStep."""
+        self.step = step
+        self.tool = step.tool
+        self.id = step.id
+        self.submitted = False
+        self.completed = False
+        self.iterable = None  # type: Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]
+        self.name = uniquename(u"step %s" % shortname(self.id))
+        self.prov_obj = step.prov_obj
+        self.parent_wf = step.parent_wf
+
+    def job(self,
+            joborder,         # type: Mapping[Text, Text]
+            output_callback,  # type: functools.partial[None]
+            runtimeContext    # type: RuntimeContext
+           ):
+        # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob], None, None]
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.part_of = self.name
+        runtimeContext.name = shortname(self.id)
+
+        _logger.info(u"[%s] start", self.name)
+
+        for j in self.step.job(joborder, output_callback, runtimeContext):
+            yield j
+
+
+class WorkflowJob(object):
+    def __init__(self, workflow, runtimeContext):
+        # type: (Workflow, RuntimeContext) -> None
+        """Initialize this WorkflowJob."""
+        self.workflow = workflow
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        self.parent_wf = None  # type: Optional[ProvenanceProfile]
+        self.tool = workflow.tool
+        if runtimeContext.research_obj is not None:
+            self.prov_obj = workflow.provenance_object
+            self.parent_wf = workflow.parent_wf
+        self.steps = [WorkflowJobStep(s) for s in workflow.steps]
+        self.state = {}  # type: Dict[Text, Optional[WorkflowStateItem]]
+        self.processStatus = u""
+        self.did_callback = False
+        self.made_progress = None  # type: Optional[bool]
+
+        if runtimeContext.outdir is not None:
+            self.outdir = runtimeContext.outdir
+        else:
+            self.outdir = tempfile.mkdtemp(
+                prefix=getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))
+
+        self.name = uniquename(u"workflow {}".format(
+            getdefault(runtimeContext.name,
+                       shortname(self.workflow.tool.get("id", "embedded")))))
+
+        _logger.debug(
+            u"[%s] initialized from %s", self.name,
+            self.tool.get("id", "workflow embedded in %s" % runtimeContext.part_of))
+
+    def do_output_callback(self, final_output_callback):
+        # type: (Callable[[Any, Any], Any]) -> None
+
+        supportsMultipleInput = bool(self.workflow.get_requirement("MultipleInputFeatureRequirement")[0])
+
+        wo = None  # type: Optional[Dict[Text, Text]]
+        try:
+            wo = object_from_state(
+                self.state, self.tool["outputs"], True, supportsMultipleInput,
+                "outputSource", incomplete=True)
+        except WorkflowException as err:
+            _logger.error(
+                u"[%s] Cannot collect workflow output: %s", self.name, Text(err))
+            self.processStatus = "permanentFail"
+        if self.prov_obj and self.parent_wf \
+                and self.prov_obj.workflow_run_uri != self.parent_wf.workflow_run_uri:
+            process_run_id = None
+            self.prov_obj.generate_output_prov(wo or {}, process_run_id, self.name)
+            self.prov_obj.document.wasEndedBy(
+                self.prov_obj.workflow_run_uri, None, self.prov_obj.engine_uuid,
+                datetime.datetime.now())
+            prov_ids = self.prov_obj.finalize_prov_profile(self.name)
+            # Tell parent to associate our provenance files with our wf run
+            self.parent_wf.activity_has_provenance(self.prov_obj.workflow_run_uri, prov_ids)
+
+        _logger.info(u"[%s] completed %s", self.name, self.processStatus)
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(u"[%s] %s", self.name, json_dumps(wo, indent=4))
+
+        self.did_callback = True
+
+        final_output_callback(wo, self.processStatus)
+
+    def receive_output(self, step, outputparms, final_output_callback, jobout, processStatus):
+        # type: (WorkflowJobStep, List[Dict[Text,Text]], Callable[[Any, Any], Any], Dict[Text,Text], Text) -> None
+
+        for i in outputparms:
+            if "id" in i:
+                if i["id"] in jobout:
+                    self.state[i["id"]] = WorkflowStateItem(i, jobout[i["id"]], processStatus)
+                else:
+                    _logger.error(u"[%s] Output is missing expected field %s", step.name, i["id"])
+                    processStatus = "permanentFail"
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(u"[%s] produced output %s", step.name,
+                          json_dumps(jobout, indent=4))
+
+        if processStatus != "success":
+            if self.processStatus != "permanentFail":
+                self.processStatus = processStatus
+
+            _logger.warning(u"[%s] completed %s", step.name, processStatus)
+        else:
+            _logger.info(u"[%s] completed %s", step.name, processStatus)
+
+        step.completed = True
+        # Release the iterable related to this step to
+        # reclaim memory.
+        step.iterable = None
+        self.made_progress = True
+
+        completed = sum(1 for s in self.steps if s.completed)
+        if completed == len(self.steps):
+            self.do_output_callback(final_output_callback)
+
+    def try_make_job(self,
+                     step,                   # type: WorkflowJobStep
+                     final_output_callback,  # type: Callable[[Any, Any], Any]
+                     runtimeContext          # type: RuntimeContext
+                    ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
+
+        inputparms = step.tool["inputs"]
+        outputparms = step.tool["outputs"]
+
+        supportsMultipleInput = bool(self.workflow.get_requirement(
+            "MultipleInputFeatureRequirement")[0])
+
+        try:
+            inputobj = object_from_state(
+                self.state, inputparms, False, supportsMultipleInput, "source")
+            if inputobj is None:
+                _logger.debug(u"[%s] job step %s not ready", self.name, step.id)
+                return
+
+            if step.submitted:
+                return
+            _logger.info(u"[%s] starting %s", self.name, step.name)
+
+            callback = functools.partial(self.receive_output, step, outputparms, final_output_callback)
+
+            valueFrom = {
+                i["id"]: i["valueFrom"] for i in step.tool["inputs"]
+                if "valueFrom" in i}
+
+            loadContents = set(i["id"] for i in step.tool["inputs"]
+                               if i.get("loadContents"))
+
+            if len(valueFrom) > 0 and not bool(self.workflow.get_requirement("StepInputExpressionRequirement")[0]):
+                raise WorkflowException(
+                    "Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements")
+
+            vfinputs = {shortname(k): v for k, v in iteritems(inputobj)}
+
+            def postScatterEval(io):
+                # type: (MutableMapping[Text, Any]) -> Dict[Text, Any]
+                shortio = {shortname(k): v for k, v in iteritems(io)}
+
+                fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)("")
+                for k, v in io.items():
+                    if k in loadContents and v.get("contents") is None:
+                        with fs_access.open(v["location"], "rb") as f:
+                            v["contents"] = content_limit_respected_read(f)
+
+                def valueFromFunc(k, v):  # type: (Any, Any) -> Any
+                    if k in valueFrom:
+                        adjustDirObjs(v, functools.partial(get_listing,
+                            fs_access, recursive=True))
+                        return expression.do_eval(
+                            valueFrom[k], shortio, self.workflow.requirements,
+                            None, None, {}, context=v,
+                            debug=runtimeContext.debug,
+                            js_console=runtimeContext.js_console,
+                            timeout=runtimeContext.eval_timeout)
+                    return v
+
+                return {k: valueFromFunc(k, v) for k, v in io.items()}
+
+            if "scatter" in step.tool:
+                scatter = aslist(step.tool["scatter"])
+                method = step.tool.get("scatterMethod")
+                if method is None and len(scatter) != 1:
+                    raise WorkflowException("Must specify scatterMethod when scattering over multiple inputs")
+                runtimeContext = runtimeContext.copy()
+                runtimeContext.postScatterEval = postScatterEval
+
+                emptyscatter = [shortname(s) for s in scatter if len(inputobj[s]) == 0]
+                if emptyscatter:
+                    _logger.warning(
+                        "[job %s] Notice: scattering over empty input in "
+                        "'%s'.  All outputs will be empty.", step.name,
+                        "', '".join(emptyscatter))
+
+                if method == "dotproduct" or method is None:
+                    jobs = dotproduct_scatter(
+                        step, inputobj, scatter, callback, runtimeContext)
+                elif method == "nested_crossproduct":
+                    jobs = nested_crossproduct_scatter(
+                        step, inputobj, scatter, callback, runtimeContext)
+                elif method == "flat_crossproduct":
+                    jobs = flat_crossproduct_scatter(
+                        step, inputobj, scatter, callback, runtimeContext)
+            else:
+                if _logger.isEnabledFor(logging.DEBUG):
+                    _logger.debug(u"[job %s] job input %s", step.name,
+                                  json_dumps(inputobj, indent=4))
+
+                inputobj = postScatterEval(inputobj)
+
+                if _logger.isEnabledFor(logging.DEBUG):
+                    _logger.debug(u"[job %s] evaluated job input to %s",
+                                  step.name, json_dumps(inputobj, indent=4))
+                jobs = step.job(inputobj, callback, runtimeContext)
+
+            step.submitted = True
+
+            for j in jobs:
+                yield j
+        except WorkflowException:
+            raise
+        except Exception:
+            _logger.exception("Unhandled exception")
+            self.processStatus = "permanentFail"
+            step.completed = True
+
+
+    def run(self,
+            runtimeContext,   # type: RuntimeContext
+            tmpdir_lock=None  # type: Optional[threading.Lock]
+            ):  # type: (...) -> None
+        """Log the start of each workflow."""
+        _logger.info(u"[%s] start", self.name)
+
+    def job(self,
+            joborder,         # type: Mapping[Text, Any]
+            output_callback,  # type: Callable[[Any, Any], Any]
+            runtimeContext    # type: RuntimeContext
+           ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
+        self.state = {}
+        self.processStatus = "success"
+
+        if _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(u"[%s] %s", self.name, json_dumps(joborder, indent=4))
+
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.outdir = None
+
+        for index, inp in enumerate(self.tool["inputs"]):
+            with SourceLine(self.tool["inputs"], index, WorkflowException,
+                            _logger.isEnabledFor(logging.DEBUG)):
+                inp_id = shortname(inp["id"])
+                if inp_id in joborder:
+                    self.state[inp["id"]] = WorkflowStateItem(
+                        inp, joborder[inp_id], "success")
+                elif "default" in inp:
+                    self.state[inp["id"]] = WorkflowStateItem(
+                        inp, inp["default"], "success")
+                else:
+                    raise WorkflowException(
+                        u"Input '%s' not in input object and does not have a "
+                        " default value." % (inp["id"]))
+
+        for step in self.steps:
+            for out in step.tool["outputs"]:
+                self.state[out["id"]] = None
+
+        completed = 0
+        while completed < len(self.steps):
+            self.made_progress = False
+
+            for step in self.steps:
+                if getdefault(runtimeContext.on_error, "stop") == "stop" and self.processStatus != "success":
+                    break
+
+                if not step.submitted:
+                    try:
+                        step.iterable = self.try_make_job(
+                            step, output_callback, runtimeContext)
+                    except WorkflowException as exc:
+                        _logger.error(u"[%s] Cannot make job: %s", step.name, Text(exc))
+                        _logger.debug("", exc_info=True)
+                        self.processStatus = "permanentFail"
+
+                if step.iterable is not None:
+                    try:
+                        for newjob in step.iterable:
+                            if getdefault(runtimeContext.on_error, "stop") == "stop" \
+                                    and self.processStatus != "success":
+                                break
+                            if newjob is not None:
+                                self.made_progress = True
+                                yield newjob
+                            else:
+                                break
+                    except WorkflowException as exc:
+                        _logger.error(u"[%s] Cannot make job: %s", step.name, Text(exc))
+                        _logger.debug("", exc_info=True)
+                        self.processStatus = "permanentFail"
+
+            completed = sum(1 for s in self.steps if s.completed)
+
+            if not self.made_progress and completed < len(self.steps):
+                if self.processStatus != "success":
+                    break
+                else:
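+                    # No step could be made ready on this pass; yield None so
+                    # the caller can wait for an outstanding job to finish (or
+                    # fail) before resuming this generator.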
+                    yield None
+
+        if not self.did_callback:
+            # do_output_callback may instead have fired from receive_output(),
+            # whichever came first: all steps completing or all outputs having
+            # been produced.
+            self.do_output_callback(output_callback)
+
+
+class Workflow(Process):
+    def __init__(self,
+                 toolpath_object,      # type: MutableMapping[Text, Any]
+                 loadingContext        # type: LoadingContext
+                ):  # type: (...) -> None
+        """Initializet this Workflow."""
+        super(Workflow, self).__init__(
+            toolpath_object, loadingContext)
+        self.provenance_object = None  # type: Optional[ProvenanceProfile]
+        if loadingContext.research_obj is not None:
+            run_uuid = None  # type: Optional[UUID]
+            is_master = not loadingContext.prov_obj  # Not yet set
+            if is_master:
+                run_uuid = loadingContext.research_obj.ro_uuid
+
+            self.provenance_object = ProvenanceProfile(
+                loadingContext.research_obj,
+                full_name=loadingContext.cwl_full_name,
+                host_provenance=loadingContext.host_provenance,
+                user_provenance=loadingContext.user_provenance,
+                orcid=loadingContext.orcid,
+                run_uuid=run_uuid,
+                fsaccess=loadingContext.research_obj.fsaccess)  # inherit RO UUID for master wf run
+            # TODO: Is Workflow(..) only called when we are the master workflow?
+            self.parent_wf = self.provenance_object
+
+        # FIXME: Won't this overwrite prov_obj for nested workflows?
+        loadingContext.prov_obj = self.provenance_object
+        loadingContext = loadingContext.copy()
+        loadingContext.requirements = self.requirements
+        loadingContext.hints = self.hints
+
+        self.steps = []  # type: List[WorkflowStep]
+        validation_errors = []
+        for index, step in enumerate(self.tool.get("steps", [])):
+            try:
+                self.steps.append(self.make_workflow_step(step, index, loadingContext,
+                                                          loadingContext.prov_obj))
+            except validate.ValidationException as vexc:
+                if _logger.isEnabledFor(logging.DEBUG):
+                    _logger.exception("Validation failed at")
+                validation_errors.append(vexc)
+
+        if validation_errors:
+            raise validate.ValidationException("\n".join(str(v) for v in validation_errors))
+
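+        # Shuffling here presumably guards against accidental reliance on the
+        # textual order of the steps: at run time, readiness is determined by
+        # data links alone.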
+        random.shuffle(self.steps)
+
+        # Statically validate data links instead of checking them at runtime.
+        workflow_inputs = self.tool["inputs"]
+        workflow_outputs = self.tool["outputs"]
+
+        step_inputs = []  # type: List[Any]
+        step_outputs = []  # type: List[Any]
+        param_to_step = {}  # type: Dict[Text, Dict[Text, Any]]
+        for step in self.steps:
+            step_inputs.extend(step.tool["inputs"])
+            step_outputs.extend(step.tool["outputs"])
+            for s in step.tool["inputs"]:
+                param_to_step[s["id"]] = step.tool
+
+        if getdefault(loadingContext.do_validate, True):
+            static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs, param_to_step)
+
+    def make_workflow_step(self,
+                           toolpath_object,      # type: Dict[Text, Any]
+                           pos,                  # type: int
+                           loadingContext,       # type: LoadingContext
+                           parentworkflowProv=None  # type: Optional[ProvenanceProfile]
+                          ): # type: (...) -> WorkflowStep
+        return WorkflowStep(toolpath_object, pos, loadingContext, parentworkflowProv)
+
+    def job(self,
+            job_order,         # type: Mapping[Text, Any]
+            output_callbacks,  # type: Callable[[Any, Any], Any]
+            runtimeContext     # type: RuntimeContext
+           ):  # type: (...) -> Generator[Union[WorkflowJob, ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
+        builder = self._init_job(job_order, runtimeContext)
+
+        if runtimeContext.research_obj is not None:
+            if runtimeContext.toplevel:
+                # Record primary-job.json
+                runtimeContext.research_obj.fsaccess = runtimeContext.make_fs_access('')
+                runtimeContext.research_obj.create_job(builder.job, self.job)
+
+        job = WorkflowJob(self, runtimeContext)
+        yield job
+
+        runtimeContext = runtimeContext.copy()
+        runtimeContext.part_of = u"workflow %s" % job.name
+        runtimeContext.toplevel = False
+
+        for wjob in job.job(builder.job, output_callbacks, runtimeContext):
+            yield wjob
+
+    def visit(self, op):  # type: (Callable[[MutableMapping[Text, Any]], Any]) -> None
+        op(self.tool)
+        for step in self.steps:
+            step.visit(op)
+
+
+class WorkflowStep(Process):
+    def __init__(self,
+                 toolpath_object,      # type: Dict[Text, Any]
+                 pos,                  # type: int
+                 loadingContext,       # type: LoadingContext
+                 parentworkflowProv=None  # type: Optional[ProvenanceProfile]
+                ):  # type: (...) -> None
+        """Initialize this WorkflowStep."""
+        if "id" in toolpath_object:
+            self.id = toolpath_object["id"]
+        else:
+            self.id = "#step" + Text(pos)
+
+        loadingContext = loadingContext.copy()
+
+        loadingContext.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
+        assert loadingContext.requirements is not None  # nosec
+        loadingContext.requirements.extend(toolpath_object.get("requirements", []))
+        loadingContext.requirements.extend(get_overrides(getdefault(loadingContext.overrides_list, []),
+                                                self.id).get("requirements", []))
+
+        hints = copy.deepcopy(getdefault(loadingContext.hints, []))
+        hints.extend(toolpath_object.get("hints", []))
+        loadingContext.hints = hints
+
+        try:
+            if isinstance(toolpath_object["run"], MutableMapping):
+                self.embedded_tool = loadingContext.construct_tool_object(
+                    toolpath_object["run"], loadingContext)  # type: Process
+            else:
+                loadingContext.metadata = {}
+                self.embedded_tool = load_tool(
+                    toolpath_object["run"], loadingContext)
+        except validate.ValidationException as vexc:
+            if loadingContext.debug:
+                _logger.exception("Validation exception")
+            raise_from(WorkflowException(
+                u"Tool definition %s failed validation:\n%s" %
+                (toolpath_object["run"], indent(str(vexc)))), vexc)
+
+        validation_errors = []
+        self.tool = toolpath_object = copy.deepcopy(toolpath_object)
+        bound = set()
+        for stepfield, toolfield in (("in", "inputs"), ("out", "outputs")):
+            toolpath_object[toolfield] = []
+            for index, step_entry in enumerate(toolpath_object[stepfield]):
+                if isinstance(step_entry, string_types):
+                    param = CommentedMap()  # type: CommentedMap
+                    inputid = step_entry
+                else:
+                    param = CommentedMap(iteritems(step_entry))
+                    inputid = step_entry["id"]
+
+                shortinputid = shortname(inputid)
+                found = False
+                for tool_entry in self.embedded_tool.tool[toolfield]:
+                    frag = shortname(tool_entry["id"])
+                    if frag == shortinputid:
+                        # If the step itself supplies a default for this
+                        # parameter, do not let the tool's default override it.
+                        step_default = None
+                        if "default" in param and "default" in tool_entry:
+                            step_default = param["default"]
+                        param.update(tool_entry)
+                        param["_tool_entry"] = tool_entry
+                        if step_default is not None:
+                            param["default"] = step_default
+                        found = True
+                        bound.add(frag)
+                        break
+                if not found:
+                    if stepfield == "in":
+                        param["type"] = "Any"
+                        param["not_connected"] = True
+                    else:
+                        if isinstance(step_entry, Mapping):
+                            step_entry_name = step_entry['id']
+                        else:
+                            step_entry_name = step_entry
+                        validation_errors.append(
+                            SourceLine(self.tool["out"], index).makeError(
+                                "Workflow step output '%s' does not correspond to"
+                                % shortname(step_entry_name))
+                            + "\n" + SourceLine(self.embedded_tool.tool, "outputs").makeError(
+                                "  tool output (expected '%s')" % (
+                                    "', '".join(
+                                        [shortname(tool_entry["id"]) for tool_entry in
+                                         self.embedded_tool.tool['outputs']]))))
+                param["id"] = inputid
+                param.lc.line = toolpath_object[stepfield].lc.data[index][0]
+                param.lc.col = toolpath_object[stepfield].lc.data[index][1]
+                param.lc.filename = toolpath_object[stepfield].lc.filename
+                toolpath_object[toolfield].append(param)
+
+        missing_values = []
+        for _, tool_entry in enumerate(self.embedded_tool.tool["inputs"]):
+            if shortname(tool_entry["id"]) not in bound:
+                if "null" not in tool_entry["type"] and "default" not in tool_entry:
+                    missing_values.append(shortname(tool_entry["id"]))
+
+        if missing_values:
+            validation_errors.append(SourceLine(self.tool, "in").makeError(
+                "Step is missing required parameter%s '%s'" %
+                ("s" if len(missing_values) > 1 else "", "', '".join(missing_values))))
+
+        if validation_errors:
+            raise validate.ValidationException("\n".join(validation_errors))
+
+        super(WorkflowStep, self).__init__(toolpath_object, loadingContext)
+
+        if self.embedded_tool.tool["class"] == "Workflow":
+            (feature, _) = self.get_requirement("SubworkflowFeatureRequirement")
+            if not feature:
+                raise WorkflowException(
+                    "Workflow contains embedded workflow but "
+                    "SubworkflowFeatureRequirement not in requirements")
+
+        if "scatter" in self.tool:
+            (feature, _) = self.get_requirement("ScatterFeatureRequirement")
+            if not feature:
+                raise WorkflowException(
+                    "Workflow contains scatter but ScatterFeatureRequirement "
+                    "not in requirements")
+
+            inputparms = copy.deepcopy(self.tool["inputs"])
+            outputparms = copy.deepcopy(self.tool["outputs"])
+            scatter = aslist(self.tool["scatter"])
+
+            method = self.tool.get("scatterMethod")
+            if method is None and len(scatter) != 1:
+                raise validate.ValidationException(
+                    "Must specify scatterMethod when scattering over multiple inputs")
+
+            inp_map = {i["id"]: i for i in inputparms}
+            for inp in scatter:
+                if inp not in inp_map:
+                    raise validate.ValidationException(
+                        SourceLine(self.tool, "scatter").makeError(
+                            "Scatter parameter '%s' does not correspond to "
+                            "an input parameter of this step, expecting '%s'"
+                            % (shortname(inp), "', '".join(
+                                shortname(k) for k in inp_map.keys()))))
+
+                inp_map[inp]["type"] = {"type": "array", "items": inp_map[inp]["type"]}
+
+            if self.tool.get("scatterMethod") == "nested_crossproduct":
+                nesting = len(scatter)
+            else:
+                nesting = 1
+
+            for _ in range(0, nesting):
+                for oparam in outputparms:
+                    oparam["type"] = {"type": "array", "items": oparam["type"]}
+            self.tool["inputs"] = inputparms
+            self.tool["outputs"] = outputparms
+        self.prov_obj = None  # type: Optional[ProvenanceProfile]
+        if loadingContext.research_obj is not None:
+            self.prov_obj = parentworkflowProv
+            if self.embedded_tool.tool["class"] == "Workflow":
+                self.parent_wf = self.embedded_tool.parent_wf
+            else:
+                self.parent_wf = self.prov_obj
+
+    def receive_output(self, output_callback, jobout, processStatus):
+        # type: (Callable[...,Any], Dict[Text, Text], Text) -> None
+        output = {}
+        for i in self.tool["outputs"]:
+            field = shortname(i["id"])
+            if field in jobout:
+                output[i["id"]] = jobout[field]
+            else:
+                processStatus = "permanentFail"
+        output_callback(output, processStatus)
+
+    def job(self,
+            job_order,         # type: Mapping[Text, Text]
+            output_callbacks,  # type: Callable[[Any, Any], Any]
+            runtimeContext,    # type: RuntimeContext
+           ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob], None, None]
+        # Initialize the sub-workflow as a step in the parent provenance profile.
+
+        if self.embedded_tool.tool["class"] == "Workflow" \
+                and runtimeContext.research_obj and self.prov_obj \
+                and self.embedded_tool.provenance_object:
+            self.embedded_tool.parent_wf = self.prov_obj
+            process_name = self.tool["id"].split("#")[1]
+            self.prov_obj.start_process(
+                process_name, datetime.datetime.now(),
+                self.embedded_tool.provenance_object.workflow_run_uri)
+
+        step_input = {}
+        for inp in self.tool["inputs"]:
+            field = shortname(inp["id"])
+            if not inp.get("not_connected"):
+                step_input[field] = job_order[inp["id"]]
+
+        try:
+            for tool in self.embedded_tool.job(
+                    step_input,
+                    functools.partial(self.receive_output, output_callbacks),
+                    runtimeContext):
+                yield tool
+        except WorkflowException:
+            _logger.error(u"Exception on step '%s'", runtimeContext.name)
+            raise
+        except Exception as exc:
+            _logger.exception("Unexpected exception")
+            raise_from(WorkflowException(Text(exc)), exc)
+
+    def visit(self, op):  # type: (Callable[[MutableMapping[Text, Any]], Any]) -> None
+        self.embedded_tool.visit(op)
+
+
+class ReceiveScatterOutput(object):
+    def __init__(self,
+                 output_callback,  # type: Callable[..., Any]
+                 dest,             # type: Dict[Text, List[Optional[Text]]]
+                 total             # type: int
+                ):  # type: (...) -> None
+        """Initialize."""
+        self.dest = dest
+        self.completed = 0
+        self.processStatus = u"success"
+        self.total = total
+        self.output_callback = output_callback
+        self.steps = []  # type: List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]
+
+    def receive_scatter_output(self, index, jobout, processStatus):
+        # type: (int, Dict[Text, Text], Text) -> None
+        for key, val in jobout.items():
+            self.dest[key][index] = val
+
+        # Release the iterable related to this step to
+        # reclaim memory.
+        if self.steps:
+            self.steps[index] = None
+
+        if processStatus != "success":
+            if self.processStatus != "permanentFail":
+                self.processStatus = processStatus
+
+        self.completed += 1
+
+        if self.completed == self.total:
+            self.output_callback(self.dest, self.processStatus)
+
+    def setTotal(self, total, steps):  # type: (int, List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]) -> None
+        self.total = total
+        self.steps = steps
+        if self.completed == self.total:
+            self.output_callback(self.dest, self.processStatus)
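+    # setTotal is called once all shard generators exist, since the true total
+    # is only known after they are built; if every shard has already completed
+    # (e.g. a scatter over empty arrays), the callback fires here immediately.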
+
+
+def parallel_steps(steps, rc, runtimeContext):
+    # type: (List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]], ReceiveScatterOutput, RuntimeContext) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
+    while rc.completed < rc.total:
+        made_progress = False
+        for index, step in enumerate(steps):
+            if getdefault(runtimeContext.on_error, "stop") == "stop" and rc.processStatus != "success":
+                break
+            if step is None:
+                continue
+            try:
+                for j in step:
+                    if getdefault(runtimeContext.on_error, "stop") == "stop" and rc.processStatus != "success":
+                        break
+                    if j is not None:
+                        made_progress = True
+                        yield j
+                    else:
+                        break
+                if made_progress:
+                    break
+            except WorkflowException as exc:
+                _logger.error(u"Cannot make scatter job: %s", Text(exc))
+                _logger.debug("", exc_info=True)
+                rc.receive_scatter_output(index, {}, "permanentFail")
+        if not made_progress and rc.completed < rc.total:
+            yield None
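+# Note: parallel_steps round-robins over the per-shard job generators, yielding
+# runnable jobs as they appear and yielding None (the same convention as
+# WorkflowJob.job) when no pending shard can currently make progress.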
+
+
+def dotproduct_scatter(process,           # type: WorkflowJobStep
+                       joborder,          # type: MutableMapping[Text, Any]
+                       scatter_keys,      # type: MutableSequence[Text]
+                       output_callback,   # type: Callable[..., Any]
+                       runtimeContext     # type: RuntimeContext
+                      ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
+    jobl = None  # type: Optional[int]
+    for key in scatter_keys:
+        if jobl is None:
+            jobl = len(joborder[key])
+        elif jobl != len(joborder[key]):
+            raise WorkflowException(
+                "Length of input arrays must be equal when performing "
+                "dotproduct scatter.")
+    if jobl is None:
+        raise Exception("Impossible codepath")
+
+    output = {}  # type: Dict[Text,List[Optional[Text]]]
+    for i in process.tool["outputs"]:
+        output[i["id"]] = [None] * jobl
+
+    rc = ReceiveScatterOutput(output_callback, output, jobl)
+
+    steps = []  # type: List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]
+    for index in range(0, jobl):
+        sjobo = copy.copy(joborder)
+        for key in scatter_keys:
+            sjobo[key] = joborder[key][index]
+
+        if runtimeContext.postScatterEval is not None:
+            sjobo = runtimeContext.postScatterEval(sjobo)
+
+        steps.append(process.job(
+            sjobo, functools.partial(rc.receive_scatter_output, index),
+            runtimeContext))
+
+    rc.setTotal(jobl, steps)
+    return parallel_steps(steps, rc, runtimeContext)
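+# Shape example for dotproduct_scatter (values chosen for illustration):
+# scattering over x=[1, 2] and y=["a", "b"] runs two jobs, (x=1, y="a") and
+# (x=2, y="b"), and each output parameter of the step becomes a 2-element list.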
+
+
+def nested_crossproduct_scatter(process,          # type: WorkflowJobStep
+                                joborder,         # type: MutableMapping[Text, Any]
+                                scatter_keys,     # type: MutableSequence[Text]
+                                output_callback,  # type: Callable[..., Any]
+                                runtimeContext    # type: RuntimeContext
+                               ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
+    scatter_key = scatter_keys[0]
+    jobl = len(joborder[scatter_key])
+    output = {}  # type: Dict[Text, List[Optional[Text]]]
+    for i in process.tool["outputs"]:
+        output[i["id"]] = [None] * jobl
+
+    rc = ReceiveScatterOutput(output_callback, output, jobl)
+
+    steps = []  # type: List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]
+    for index in range(0, jobl):
+        sjob = copy.copy(joborder)
+        sjob[scatter_key] = joborder[scatter_key][index]
+
+        if len(scatter_keys) == 1:
+            if runtimeContext.postScatterEval is not None:
+                sjob = runtimeContext.postScatterEval(sjob)
+            steps.append(process.job(
+                sjob, functools.partial(rc.receive_scatter_output, index),
+                runtimeContext))
+        else:
+            steps.append(nested_crossproduct_scatter(
+                process, sjob, scatter_keys[1:],
+                functools.partial(rc.receive_scatter_output, index),
+                runtimeContext))
+
+    rc.setTotal(jobl, steps)
+    return parallel_steps(steps, rc, runtimeContext)
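+# Shape example for nested_crossproduct_scatter: x=[1, 2] and y=["a", "b", "c"]
+# yields 6 jobs, and each output is nested as a 2x3 list of lists, the outer
+# index following x and the inner index following y.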
+
+
+def crossproduct_size(joborder, scatter_keys):
+    # type: (MutableMapping[Text, Any], MutableSequence[Text]) -> int
+    scatter_key = scatter_keys[0]
+    if len(scatter_keys) == 1:
+        ssum = len(joborder[scatter_key])
+    else:
+        ssum = 0
+        for _ in range(0, len(joborder[scatter_key])):
+            ssum += crossproduct_size(joborder, scatter_keys[1:])
+    return ssum
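+# Worked example: with len(x) == 2 and len(y) == 3,
+# crossproduct_size(joborder, ["x", "y"]) == 2 * crossproduct_size(joborder,
+# ["y"]) == 2 * 3 == 6, i.e. the product of the scattered array lengths.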
+
+
+def flat_crossproduct_scatter(process,          # type: WorkflowJobStep
+                              joborder,         # type: MutableMapping[Text, Any]
+                              scatter_keys,     # type: MutableSequence[Text]
+                              output_callback,  # type: Callable[..., Any]
+                              runtimeContext    # type: RuntimeContext
+                             ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
+    output = {}  # type: Dict[Text, List[Optional[Text]]]
+    for i in process.tool["outputs"]:
+        output[i["id"]] = [None] * crossproduct_size(joborder, scatter_keys)
+    callback = ReceiveScatterOutput(output_callback, output, 0)
+    (steps, total) = _flat_crossproduct_scatter(
+        process, joborder, scatter_keys, callback, 0, runtimeContext)
+    callback.setTotal(total, steps)
+    return parallel_steps(steps, callback, runtimeContext)
+
+
+def _flat_crossproduct_scatter(process,        # type: WorkflowJobStep
+                               joborder,       # type: MutableMapping[Text, Any]
+                               scatter_keys,   # type: MutableSequence[Text]
+                               callback,       # type: ReceiveScatterOutput
+                               startindex,     # type: int
+                               runtimeContext  # type: RuntimeContext
+                              ):  # type: (...) -> Tuple[List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]], int]
+    """Inner loop."""
+    scatter_key = scatter_keys[0]
+    jobl = len(joborder[scatter_key])
+    steps = []  # type: List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]
+    put = startindex
+    for index in range(0, jobl):
+        sjob = copy.copy(joborder)
+        sjob[scatter_key] = joborder[scatter_key][index]
+
+        if len(scatter_keys) == 1:
+            if runtimeContext.postScatterEval is not None:
+                sjob = runtimeContext.postScatterEval(sjob)
+            steps.append(process.job(
+                sjob, functools.partial(callback.receive_scatter_output, put),
+                runtimeContext))
+            put += 1
+        else:
+            (add, _) = _flat_crossproduct_scatter(
+                process, sjob, scatter_keys[1:], callback, put, runtimeContext)
+            put += len(add)
+            steps.extend(add)
+
+    return (steps, put)
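+# Shape example for flat_crossproduct_scatter: x=[1, 2] and y=["a", "b", "c"]
+# runs the same 6 jobs as the nested variant but collects each output into a
+# single flat 6-element list, with 'put'/'startindex' assigning row-major
+# positions.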