view env/lib/python3.7/site-packages/cwltool/workflow.py @ 4:79f47841a781 draft

"planemo upload commit 2a0fe2cc28b09e101d37293e53e82f61762262ec"
author shellac
date Thu, 14 May 2020 16:47:39 -0400
parents 26e78fe6e8c4
children
line wrap: on
line source

from __future__ import absolute_import

import copy
import datetime
import functools
import logging
import random
import tempfile
from collections import namedtuple
from typing import (Any, Callable, Dict, Generator, Iterable, List,
                    Mapping, MutableMapping, MutableSequence,
                    Optional, Sequence, Tuple, Union, cast)
from uuid import UUID  # pylint: disable=unused-import

import threading
from ruamel.yaml.comments import CommentedMap
from schema_salad import validate
from schema_salad.sourceline import SourceLine, indent
from six import string_types, iteritems
from six.moves import range
from future.utils import raise_from
from typing_extensions import Text  # pylint: disable=unused-import
# move to a regular typing import when Python 3.3-3.6 is no longer supported

from . import command_line_tool, context, expression, procgenerator
from .command_line_tool import CallbackJob, ExpressionTool
from .job import JobBase
from .builder import content_limit_respected_read
from .checker import can_assign_src_to_sink, static_checker
from .context import LoadingContext  # pylint: disable=unused-import
from .context import RuntimeContext, getdefault
from .errors import WorkflowException
from .load_tool import load_tool
from .loghandler import _logger
from .mutation import MutationManager  # pylint: disable=unused-import
from .pathmapper import adjustDirObjs, get_listing
from .process import Process, get_overrides, shortname, uniquename
from .provenance import ProvenanceProfile
from .software_requirements import (  # pylint: disable=unused-import
    DependenciesConfiguration)
from .stdfsaccess import StdFsAccess
from .utils import DEFAULT_TMP_PREFIX, aslist, json_dumps

WorkflowStateItem = namedtuple('WorkflowStateItem', ['parameter', 'value', 'success'])


def default_make_tool(toolpath_object,      # type: MutableMapping[Text, Any]
                      loadingContext        # type: LoadingContext
                     ):  # type: (...) -> Process
    if not isinstance(toolpath_object, MutableMapping):
        raise WorkflowException(u"Not a dict: '%s'" % toolpath_object)
    if "class" in toolpath_object:
        if toolpath_object["class"] == "CommandLineTool":
            return command_line_tool.CommandLineTool(toolpath_object, loadingContext)
        if toolpath_object["class"] == "ExpressionTool":
            return command_line_tool.ExpressionTool(toolpath_object, loadingContext)
        if toolpath_object["class"] == "Workflow":
            return Workflow(toolpath_object, loadingContext)
        if toolpath_object["class"] == "ProcessGenerator":
            return procgenerator.ProcessGenerator(toolpath_object, loadingContext)

    raise WorkflowException(
        u"Missing or invalid 'class' field in "
        "%s, expecting one of: CommandLineTool, ExpressionTool, Workflow" %
        toolpath_object["id"])


context.default_make_tool = default_make_tool

def findfiles(wo, fn=None):  # type: (Any, Optional[List[MutableMapping[Text, Any]]]) -> List[MutableMapping[Text, Any]]
    if fn is None:
        fn = []
    if isinstance(wo, MutableMapping):
        if wo.get("class") == "File":
            fn.append(wo)
            findfiles(wo.get("secondaryFiles", None), fn)
        else:
            for w in wo.values():
                findfiles(w, fn)
    elif isinstance(wo, MutableSequence):
        for w in wo:
            findfiles(w, fn)
    return fn


def match_types(sinktype,   # type: Union[List[Text], Text]
                src,        # type: WorkflowStateItem
                iid,        # type: Text
                inputobj,   # type: Dict[Text, Any]
                linkMerge,  # type: Text
                valueFrom   # type: Optional[Text]
               ):  # type: (...) -> bool
    if isinstance(sinktype, MutableSequence):
        # Sink is union type
        for st in sinktype:
            if match_types(st, src, iid, inputobj, linkMerge, valueFrom):
                return True
    elif isinstance(src.parameter["type"], MutableSequence):
        # Source is union type
        # Check that at least one source type is compatible with the sink.
        original_types = src.parameter["type"]
        for source_type in original_types:
            src.parameter["type"] = source_type
            match = match_types(
                sinktype, src, iid, inputobj, linkMerge, valueFrom)
            if match:
                src.parameter["type"] = original_types
                return True
        src.parameter["type"] = original_types
        return False
    elif linkMerge:
        if iid not in inputobj:
            inputobj[iid] = []
        if linkMerge == "merge_nested":
            inputobj[iid].append(src.value)
        elif linkMerge == "merge_flattened":
            if isinstance(src.value, MutableSequence):
                inputobj[iid].extend(src.value)
            else:
                inputobj[iid].append(src.value)
        else:
            raise WorkflowException(u"Unrecognized linkMerge enum '%s'" % linkMerge)
        return True
    elif valueFrom is not None \
            or can_assign_src_to_sink(src.parameter["type"], sinktype) \
            or sinktype == "Any":
        # simply assign the value from state to input
        inputobj[iid] = copy.deepcopy(src.value)
        return True
    return False


def object_from_state(state,                  # type: Dict[Text, Optional[WorkflowStateItem]]
                      parms,                  # type: List[Dict[Text, Any]]
                      frag_only,              # type: bool
                      supportsMultipleInput,  # type: bool
                      sourceField,            # type: Text
                      incomplete=False        # type: bool
                     ):  # type: (...) -> Optional[Dict[Text, Any]]
    inputobj = {}  # type: Dict[Text, Any]
    for inp in parms:
        iid = inp["id"]
        if frag_only:
            iid = shortname(iid)
        if sourceField in inp:
            connections = aslist(inp[sourceField])
            if (len(connections) > 1 and
                    not supportsMultipleInput):
                raise WorkflowException(
                    "Workflow contains multiple inbound links to a single "
                    "parameter but MultipleInputFeatureRequirement is not "
                    "declared.")
            for src in connections:
                a_state = state.get(src, None)
                if a_state is not None and (a_state.success == "success" or incomplete):
                    if not match_types(
                            inp["type"], a_state, iid, inputobj,
                            inp.get("linkMerge", ("merge_nested"
                                                  if len(connections) > 1 else None)),
                            valueFrom=inp.get("valueFrom")):
                        raise WorkflowException(
                            u"Type mismatch between source '%s' (%s) and "
                            "sink '%s' (%s)" % (src,
                                                a_state.parameter["type"], inp["id"],
                                                inp["type"]))
                elif src not in state:
                    raise WorkflowException(
                        u"Connect source '%s' on parameter '%s' does not "
                        "exist" % (src, inp["id"]))
                elif not incomplete:
                    return None

        if inputobj.get(iid) is None and "default" in inp:
            inputobj[iid] = inp["default"]

        if iid not in inputobj and ("valueFrom" in inp or incomplete):
            inputobj[iid] = None

        if iid not in inputobj:
            raise WorkflowException(u"Value for %s not specified" % (inp["id"]))
    return inputobj


class WorkflowJobStep(object):
    def __init__(self, step):
        # type: (WorkflowStep) -> None
        """Initialize this WorkflowJobStep."""
        self.step = step
        self.tool = step.tool
        self.id = step.id
        self.submitted = False
        self.completed = False
        self.iterable = None  # type: Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]
        self.name = uniquename(u"step %s" % shortname(self.id))
        self.prov_obj = step.prov_obj
        self.parent_wf = step.parent_wf

    def job(self,
            joborder,         # type: Mapping[Text, Text]
            output_callback,  # type: functools.partial[None]
            runtimeContext    # type: RuntimeContext
           ):
        # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob], None, None]
        runtimeContext = runtimeContext.copy()
        runtimeContext.part_of = self.name
        runtimeContext.name = shortname(self.id)

        _logger.info(u"[%s] start", self.name)

        for j in self.step.job(joborder, output_callback, runtimeContext):
            yield j

class WorkflowJob(object):
    def __init__(self, workflow, runtimeContext):
        # type: (Workflow, RuntimeContext) -> None
        """Initialize this WorkflowJob."""
        self.workflow = workflow
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        self.parent_wf = None  # type: Optional[ProvenanceProfile]
        self.tool = workflow.tool
        if runtimeContext.research_obj is not None:
            self.prov_obj = workflow.provenance_object
            self.parent_wf = workflow.parent_wf
        self.steps = [WorkflowJobStep(s) for s in workflow.steps]
        self.state = {}  # type: Dict[Text, Optional[WorkflowStateItem]]
        self.processStatus = u""
        self.did_callback = False
        self.made_progress = None  # type: Optional[bool]

        if runtimeContext.outdir is not None:
            self.outdir = runtimeContext.outdir
        else:
            self.outdir = tempfile.mkdtemp(
                prefix=getdefault(runtimeContext.tmp_outdir_prefix, DEFAULT_TMP_PREFIX))

        self.name = uniquename(u"workflow {}".format(
            getdefault(runtimeContext.name,
                       shortname(self.workflow.tool.get("id", "embedded")))))

        _logger.debug(
            u"[%s] initialized from %s", self.name,
            self.tool.get("id", "workflow embedded in %s" % runtimeContext.part_of))

    def do_output_callback(self, final_output_callback):
        # type: (Callable[[Any, Any], Any]) -> None

        supportsMultipleInput = bool(self.workflow.get_requirement("MultipleInputFeatureRequirement")[0])

        wo = None  # type: Optional[Dict[Text, Text]]
        try:
            wo = object_from_state(
                self.state, self.tool["outputs"], True, supportsMultipleInput,
                "outputSource", incomplete=True)
        except WorkflowException as err:
            _logger.error(
                u"[%s] Cannot collect workflow output: %s", self.name, Text(err))
            self.processStatus = "permanentFail"
        if self.prov_obj and self.parent_wf \
                and self.prov_obj.workflow_run_uri != self.parent_wf.workflow_run_uri:
            process_run_id = None
            self.prov_obj.generate_output_prov(wo or {}, process_run_id, self.name)
            self.prov_obj.document.wasEndedBy(
                self.prov_obj.workflow_run_uri, None, self.prov_obj.engine_uuid,
                datetime.datetime.now())
            prov_ids = self.prov_obj.finalize_prov_profile(self.name)
            # Tell parent to associate our provenance files with our wf run
            self.parent_wf.activity_has_provenance(self.prov_obj.workflow_run_uri, prov_ids)

        _logger.info(u"[%s] completed %s", self.name, self.processStatus)
        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(u"[%s] %s", self.name, json_dumps(wo, indent=4))

        self.did_callback = True

        final_output_callback(wo, self.processStatus)

    def receive_output(self, step, outputparms, final_output_callback, jobout, processStatus):
        # type: (WorkflowJobStep, List[Dict[Text,Text]], Callable[[Any, Any], Any], Dict[Text,Text], Text) -> None

        for i in outputparms:
            if "id" in i:
                if i["id"] in jobout:
                    self.state[i["id"]] = WorkflowStateItem(i, jobout[i["id"]], processStatus)
                else:
                    _logger.error(u"[%s] Output is missing expected field %s", step.name, i["id"])
                    processStatus = "permanentFail"
        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(u"[%s] produced output %s", step.name,
                          json_dumps(jobout, indent=4))

        if processStatus != "success":
            if self.processStatus != "permanentFail":
                self.processStatus = processStatus

            _logger.warning(u"[%s] completed %s", step.name, processStatus)
        else:
            _logger.info(u"[%s] completed %s", step.name, processStatus)

        step.completed = True
        # Release the iterable related to this step to
        # reclaim memory.
        step.iterable = None
        self.made_progress = True

        completed = sum(1 for s in self.steps if s.completed)
        if completed == len(self.steps):
            self.do_output_callback(final_output_callback)

    def try_make_job(self,
                     step,                   # type: WorkflowJobStep
                     final_output_callback,  # type: Callable[[Any, Any], Any]
                     runtimeContext          # type: RuntimeContext
                    ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]

        inputparms = step.tool["inputs"]
        outputparms = step.tool["outputs"]

        supportsMultipleInput = bool(self.workflow.get_requirement(
            "MultipleInputFeatureRequirement")[0])

        try:
            inputobj = object_from_state(
                self.state, inputparms, False, supportsMultipleInput, "source")
            if inputobj is None:
                _logger.debug(u"[%s] job step %s not ready", self.name, step.id)
                return

            if step.submitted:
                return
            _logger.info(u"[%s] starting %s", self.name, step.name)

            callback = functools.partial(self.receive_output, step, outputparms, final_output_callback)


            valueFrom = {
                i["id"]: i["valueFrom"] for i in step.tool["inputs"]
                if "valueFrom" in i}

            loadContents = set(i["id"] for i in step.tool["inputs"]
                               if i.get("loadContents"))

            if len(valueFrom) > 0 and not bool(self.workflow.get_requirement("StepInputExpressionRequirement")[0]):
                raise WorkflowException(
                    "Workflow step contains valueFrom but StepInputExpressionRequirement not in requirements")

            vfinputs = {shortname(k): v for k, v in iteritems(inputobj)}

            def postScatterEval(io):
                # type: (MutableMapping[Text, Any]) -> Dict[Text, Any]
                shortio = {shortname(k): v for k, v in iteritems(io)}

                fs_access = getdefault(runtimeContext.make_fs_access, StdFsAccess)("")
                for k, v in io.items():
                    if k in loadContents and v.get("contents") is None:
                        with fs_access.open(v["location"], "rb") as f:
                            v["contents"] = content_limit_respected_read(f)

                def valueFromFunc(k, v):  # type: (Any, Any) -> Any
                    if k in valueFrom:
                        adjustDirObjs(v, functools.partial(get_listing,
                            fs_access, recursive=True))
                        return expression.do_eval(
                            valueFrom[k], shortio, self.workflow.requirements,
                            None, None, {}, context=v,
                            debug=runtimeContext.debug,
                            js_console=runtimeContext.js_console,
                            timeout=runtimeContext.eval_timeout)
                    return v

                return {k: valueFromFunc(k, v) for k, v in io.items()}

            if "scatter" in step.tool:
                scatter = aslist(step.tool["scatter"])
                method = step.tool.get("scatterMethod")
                if method is None and len(scatter) != 1:
                    raise WorkflowException("Must specify scatterMethod when scattering over multiple inputs")
                runtimeContext = runtimeContext.copy()
                runtimeContext.postScatterEval = postScatterEval

                emptyscatter = [shortname(s) for s in scatter if len(inputobj[s]) == 0]
                if emptyscatter:
                    _logger.warning(
                        "[job %s] Notice: scattering over empty input in "
                        "'%s'.  All outputs will be empty.", step.name,
                        "', '".join(emptyscatter))

                if method == "dotproduct" or method is None:
                    jobs = dotproduct_scatter(
                        step, inputobj, scatter, callback, runtimeContext)
                elif method == "nested_crossproduct":
                    jobs = nested_crossproduct_scatter(
                        step, inputobj, scatter, callback, runtimeContext)
                elif method == "flat_crossproduct":
                    jobs = flat_crossproduct_scatter(
                        step, inputobj, scatter, callback, runtimeContext)
            else:
                if _logger.isEnabledFor(logging.DEBUG):
                    _logger.debug(u"[job %s] job input %s", step.name,
                                  json_dumps(inputobj, indent=4))

                inputobj = postScatterEval(inputobj)

                if _logger.isEnabledFor(logging.DEBUG):
                    _logger.debug(u"[job %s] evaluated job input to %s",
                                  step.name, json_dumps(inputobj, indent=4))
                jobs = step.job(inputobj, callback, runtimeContext)

            step.submitted = True

            for j in jobs:
                yield j
        except WorkflowException:
            raise
        except Exception:
            _logger.exception("Unhandled exception")
            self.processStatus = "permanentFail"
            step.completed = True


    def run(self,
            runtimeContext,   # type: RuntimeContext
            tmpdir_lock=None  # type: Optional[threading.Lock]
            ):  # type: (...) -> None
        """Log the start of each workflow."""
        _logger.info(u"[%s] start", self.name)

    def job(self,
            joborder,         # type: Mapping[Text, Any]
            output_callback,  # type: Callable[[Any, Any], Any]
            runtimeContext    # type: RuntimeContext
           ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
        self.state = {}
        self.processStatus = "success"

        if _logger.isEnabledFor(logging.DEBUG):
            _logger.debug(u"[%s] %s", self.name, json_dumps(joborder, indent=4))

        runtimeContext = runtimeContext.copy()
        runtimeContext.outdir = None

        for index, inp in enumerate(self.tool["inputs"]):
            with SourceLine(self.tool["inputs"], index, WorkflowException,
                            _logger.isEnabledFor(logging.DEBUG)):
                inp_id = shortname(inp["id"])
                if inp_id in joborder:
                    self.state[inp["id"]] = WorkflowStateItem(
                        inp, joborder[inp_id], "success")
                elif "default" in inp:
                    self.state[inp["id"]] = WorkflowStateItem(
                        inp, inp["default"], "success")
                else:
                    raise WorkflowException(
                        u"Input '%s' not in input object and does not have a "
                        " default value." % (inp["id"]))

        for step in self.steps:
            for out in step.tool["outputs"]:
                self.state[out["id"]] = None

        completed = 0
        while completed < len(self.steps):
            self.made_progress = False

            for step in self.steps:
                if getdefault(runtimeContext.on_error, "stop") == "stop" and self.processStatus != "success":
                    break

                if not step.submitted:
                    try:
                        step.iterable = self.try_make_job(
                            step, output_callback, runtimeContext)
                    except WorkflowException as exc:
                        _logger.error(u"[%s] Cannot make job: %s", step.name, Text(exc))
                        _logger.debug("", exc_info=True)
                        self.processStatus = "permanentFail"

                if step.iterable is not None:
                    try:
                        for newjob in step.iterable:
                            if getdefault(runtimeContext.on_error, "stop") == "stop" \
                                    and self.processStatus != "success":
                                break
                            if newjob is not None:
                                self.made_progress = True
                                yield newjob
                            else:
                                break
                    except WorkflowException as exc:
                        _logger.error(u"[%s] Cannot make job: %s", step.name, Text(exc))
                        _logger.debug("", exc_info=True)
                        self.processStatus = "permanentFail"

            completed = sum(1 for s in self.steps if s.completed)

            if not self.made_progress and completed < len(self.steps):
                if self.processStatus != "success":
                    break
                else:
                    yield None

        if not self.did_callback:
            self.do_output_callback(output_callback)  # could have called earlier on line 336;
            #depends which one comes first. All steps are completed
            #or all outputs have been produced.

class Workflow(Process):
    def __init__(self,
                 toolpath_object,      # type: MutableMapping[Text, Any]
                 loadingContext        # type: LoadingContext
                ):  # type: (...) -> None
        """Initializet this Workflow."""
        super(Workflow, self).__init__(
            toolpath_object, loadingContext)
        self.provenance_object = None  # type: Optional[ProvenanceProfile]
        if loadingContext.research_obj is not None:
            run_uuid = None  # type: Optional[UUID]
            is_master = not loadingContext.prov_obj  # Not yet set
            if is_master:
                run_uuid = loadingContext.research_obj.ro_uuid

            self.provenance_object = ProvenanceProfile(
                loadingContext.research_obj,
                full_name=loadingContext.cwl_full_name,
                host_provenance=loadingContext.host_provenance,
                user_provenance=loadingContext.user_provenance,
                orcid=loadingContext.orcid,
                run_uuid=run_uuid,
                fsaccess=loadingContext.research_obj.fsaccess)  # inherit RO UUID for master wf run
            # TODO: Is Workflow(..) only called when we are the master workflow?
            self.parent_wf = self.provenance_object

        # FIXME: Won't this overwrite prov_obj for nested workflows?
        loadingContext.prov_obj = self.provenance_object
        loadingContext = loadingContext.copy()
        loadingContext.requirements = self.requirements
        loadingContext.hints = self.hints

        self.steps = []  # type: List[WorkflowStep]
        validation_errors = []
        for index, step in enumerate(self.tool.get("steps", [])):
            try:
                self.steps.append(self.make_workflow_step(step, index, loadingContext,
                                                          loadingContext.prov_obj))
            except validate.ValidationException as vexc:
                if _logger.isEnabledFor(logging.DEBUG):
                    _logger.exception("Validation failed at")
                validation_errors.append(vexc)

        if validation_errors:
            raise validate.ValidationException("\n".join(str(v) for v in validation_errors))

        random.shuffle(self.steps)

        # statically validate data links instead of doing it at runtime.
        workflow_inputs = self.tool["inputs"]
        workflow_outputs = self.tool["outputs"]

        step_inputs = []  # type: List[Any]
        step_outputs = []  # type: List[Any]
        param_to_step = {}  # type: Dict[Text, Dict[Text, Any]]
        for step in self.steps:
            step_inputs.extend(step.tool["inputs"])
            step_outputs.extend(step.tool["outputs"])
            for s in step.tool["inputs"]:
                param_to_step[s["id"]] = step.tool

        if getdefault(loadingContext.do_validate, True):
            static_checker(workflow_inputs, workflow_outputs, step_inputs, step_outputs, param_to_step)

    def make_workflow_step(self,
                           toolpath_object,      # type: Dict[Text, Any]
                           pos,                  # type: int
                           loadingContext,       # type: LoadingContext
                           parentworkflowProv=None  # type: Optional[ProvenanceProfile]
                          ): # type: (...) -> WorkflowStep
        return WorkflowStep(toolpath_object, pos, loadingContext, parentworkflowProv)

    def job(self,
            job_order,         # type: Mapping[Text, Any]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext     # type: RuntimeContext
           ):  # type: (...) -> Generator[Union[WorkflowJob, ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
        builder = self._init_job(job_order, runtimeContext)

        if runtimeContext.research_obj is not None:
            if runtimeContext.toplevel:
                # Record primary-job.json
                runtimeContext.research_obj.fsaccess = runtimeContext.make_fs_access('')
                runtimeContext.research_obj.create_job(builder.job, self.job)

        job = WorkflowJob(self, runtimeContext)
        yield job

        runtimeContext = runtimeContext.copy()
        runtimeContext.part_of = u"workflow %s" % job.name
        runtimeContext.toplevel = False

        for wjob in job.job(builder.job, output_callbacks, runtimeContext):
            yield wjob

    def visit(self, op):  # type: (Callable[[MutableMapping[Text, Any]], Any]) -> None
        op(self.tool)
        for step in self.steps:
            step.visit(op)



class WorkflowStep(Process):
    def __init__(self,
                 toolpath_object,      # type: Dict[Text, Any]
                 pos,                  # type: int
                 loadingContext,       # type: LoadingContext
                 parentworkflowProv=None  # type: Optional[ProvenanceProfile]
                ):  # type: (...) -> None
        """Initialize this WorkflowStep."""
        if "id" in toolpath_object:
            self.id = toolpath_object["id"]
        else:
            self.id = "#step" + Text(pos)

        loadingContext = loadingContext.copy()

        loadingContext.requirements = copy.deepcopy(getdefault(loadingContext.requirements, []))
        assert loadingContext.requirements is not None  # nosec
        loadingContext.requirements.extend(toolpath_object.get("requirements", []))
        loadingContext.requirements.extend(get_overrides(getdefault(loadingContext.overrides_list, []),
                                                self.id).get("requirements", []))

        hints = copy.deepcopy(getdefault(loadingContext.hints, []))
        hints.extend(toolpath_object.get("hints", []))
        loadingContext.hints = hints


        try:
            if isinstance(toolpath_object["run"], MutableMapping):
                self.embedded_tool = loadingContext.construct_tool_object(
                    toolpath_object["run"], loadingContext)  # type: Process
            else:
                loadingContext.metadata = {}
                self.embedded_tool = load_tool(
                    toolpath_object["run"], loadingContext)
        except validate.ValidationException as vexc:
            if loadingContext.debug:
                _logger.exception("Validation exception")
            raise_from(WorkflowException(
                u"Tool definition %s failed validation:\n%s" %
                (toolpath_object["run"], indent(str(vexc)))), vexc)

        validation_errors = []
        self.tool = toolpath_object = copy.deepcopy(toolpath_object)
        bound = set()
        for stepfield, toolfield in (("in", "inputs"), ("out", "outputs")):
            toolpath_object[toolfield] = []
            for index, step_entry in enumerate(toolpath_object[stepfield]):
                if isinstance(step_entry, string_types):
                    param = CommentedMap()  # type: CommentedMap
                    inputid = step_entry
                else:
                    param = CommentedMap(iteritems(step_entry))
                    inputid = step_entry["id"]

                shortinputid = shortname(inputid)
                found = False
                for tool_entry in self.embedded_tool.tool[toolfield]:
                    frag = shortname(tool_entry["id"])
                    if frag == shortinputid:
                        #if the case that the step has a default for a parameter,
                        #we do not want the default of the tool to override it
                        step_default = None
                        if "default" in param and "default" in tool_entry:
                            step_default = param["default"]
                        param.update(tool_entry)
                        param["_tool_entry"] = tool_entry
                        if step_default is not None:
                            param["default"] = step_default
                        found = True
                        bound.add(frag)
                        break
                if not found:
                    if stepfield == "in":
                        param["type"] = "Any"
                        param["not_connected"] = True
                    else:
                        if isinstance(step_entry, Mapping):
                            step_entry_name = step_entry['id']
                        else:
                            step_entry_name = step_entry
                        validation_errors.append(
                            SourceLine(self.tool["out"], index).makeError(
                                "Workflow step output '%s' does not correspond to"
                                % shortname(step_entry_name))
                            + "\n" + SourceLine(self.embedded_tool.tool, "outputs").makeError(
                                "  tool output (expected '%s')" % (
                                    "', '".join(
                                        [shortname(tool_entry["id"]) for tool_entry in
                                         self.embedded_tool.tool['outputs']]))))
                param["id"] = inputid
                param.lc.line = toolpath_object[stepfield].lc.data[index][0]
                param.lc.col = toolpath_object[stepfield].lc.data[index][1]
                param.lc.filename = toolpath_object[stepfield].lc.filename
                toolpath_object[toolfield].append(param)

        missing_values = []
        for _, tool_entry in enumerate(self.embedded_tool.tool["inputs"]):
            if shortname(tool_entry["id"]) not in bound:
                if "null" not in tool_entry["type"] and "default" not in tool_entry:
                    missing_values.append(shortname(tool_entry["id"]))

        if missing_values:
            validation_errors.append(SourceLine(self.tool, "in").makeError(
                "Step is missing required parameter%s '%s'" %
                ("s" if len(missing_values) > 1 else "", "', '".join(missing_values))))

        if validation_errors:
            raise validate.ValidationException("\n".join(validation_errors))

        super(WorkflowStep, self).__init__(toolpath_object, loadingContext)

        if self.embedded_tool.tool["class"] == "Workflow":
            (feature, _) = self.get_requirement("SubworkflowFeatureRequirement")
            if not feature:
                raise WorkflowException(
                    "Workflow contains embedded workflow but "
                    "SubworkflowFeatureRequirement not in requirements")

        if "scatter" in self.tool:
            (feature, _) = self.get_requirement("ScatterFeatureRequirement")
            if not feature:
                raise WorkflowException(
                    "Workflow contains scatter but ScatterFeatureRequirement "
                    "not in requirements")

            inputparms = copy.deepcopy(self.tool["inputs"])
            outputparms = copy.deepcopy(self.tool["outputs"])
            scatter = aslist(self.tool["scatter"])

            method = self.tool.get("scatterMethod")
            if method is None and len(scatter) != 1:
                raise validate.ValidationException(
                    "Must specify scatterMethod when scattering over multiple inputs")

            inp_map = {i["id"]: i for i in inputparms}
            for inp in scatter:
                if inp not in inp_map:
                    raise validate.ValidationException(
                        SourceLine(self.tool, "scatter").makeError(
                            "Scatter parameter '%s' does not correspond to "
                            "an input parameter of this step, expecting '%s'"
                            % (shortname(inp), "', '".join(
                                shortname(k) for k in inp_map.keys()))))

                inp_map[inp]["type"] = {"type": "array", "items": inp_map[inp]["type"]}

            if self.tool.get("scatterMethod") == "nested_crossproduct":
                nesting = len(scatter)
            else:
                nesting = 1

            for _ in range(0, nesting):
                for oparam in outputparms:
                    oparam["type"] = {"type": "array", "items": oparam["type"]}
            self.tool["inputs"] = inputparms
            self.tool["outputs"] = outputparms
        self.prov_obj = None  # type: Optional[ProvenanceProfile]
        if loadingContext.research_obj is not None:
            self.prov_obj = parentworkflowProv
            if self.embedded_tool.tool["class"] == "Workflow":
                self.parent_wf = self.embedded_tool.parent_wf
            else:
                self.parent_wf = self.prov_obj

    def receive_output(self, output_callback, jobout, processStatus):
        # type: (Callable[...,Any], Dict[Text, Text], Text) -> None
        output = {}
        for i in self.tool["outputs"]:
            field = shortname(i["id"])
            if field in jobout:
                output[i["id"]] = jobout[field]
            else:
                processStatus = "permanentFail"
        output_callback(output, processStatus)

    def job(self,
            job_order,         # type: Mapping[Text, Text]
            output_callbacks,  # type: Callable[[Any, Any], Any]
            runtimeContext,    # type: RuntimeContext
           ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob], None, None]
        #initialize sub-workflow as a step in the parent profile

        if self.embedded_tool.tool["class"] == "Workflow" \
                and runtimeContext.research_obj and self.prov_obj \
                and self.embedded_tool.provenance_object:
            self.embedded_tool.parent_wf = self.prov_obj
            process_name = self.tool["id"].split("#")[1]
            self.prov_obj.start_process(
                process_name, datetime.datetime.now(),
                self.embedded_tool.provenance_object.workflow_run_uri)

        step_input = {}
        for inp in self.tool["inputs"]:
            field = shortname(inp["id"])
            if not inp.get("not_connected"):
                step_input[field] = job_order[inp["id"]]

        try:
            for tool in self.embedded_tool.job(
                    step_input,
                    functools.partial(self.receive_output, output_callbacks),
                    runtimeContext):
                yield tool
        except WorkflowException:
            _logger.error(u"Exception on step '%s'", runtimeContext.name)
            raise
        except Exception as exc:
            _logger.exception("Unexpected exception")
            raise_from(WorkflowException(Text(exc)), exc)

    def visit(self, op):  # type: (Callable[[MutableMapping[Text, Any]], Any]) -> None
        self.embedded_tool.visit(op)


class ReceiveScatterOutput(object):
    def __init__(self,
                 output_callback,  # type: Callable[..., Any]
                 dest,             # type: Dict[Text, List[Optional[Text]]]
                 total             # type: int
                ):  # type: (...) -> None
        """Initialize."""
        self.dest = dest
        self.completed = 0
        self.processStatus = u"success"
        self.total = total
        self.output_callback = output_callback
        self.steps = []  # type: List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]

    def receive_scatter_output(self, index, jobout, processStatus):
        # type: (int, Dict[Text, Text], Text) -> None
        for key, val in jobout.items():
            self.dest[key][index] = val

        # Release the iterable related to this step to
        # reclaim memory.
        if self.steps:
            self.steps[index] = None

        if processStatus != "success":
            if self.processStatus != "permanentFail":
                self.processStatus = processStatus

        self.completed += 1

        if self.completed == self.total:
            self.output_callback(self.dest, self.processStatus)

    def setTotal(self, total, steps):  # type: (int, List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]) -> None
        self.total = total
        self.steps = steps
        if self.completed == self.total:
            self.output_callback(self.dest, self.processStatus)


def parallel_steps(steps, rc, runtimeContext):
    # type: (List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]], ReceiveScatterOutput, RuntimeContext) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
    while rc.completed < rc.total:
        made_progress = False
        for index, step in enumerate(steps):
            if getdefault(runtimeContext.on_error, "stop") == "stop" and rc.processStatus != "success":
                break
            if step is None:
                continue
            try:
                for j in step:
                    if getdefault(runtimeContext.on_error, "stop") == "stop" and rc.processStatus != "success":
                        break
                    if j is not None:
                        made_progress = True
                        yield j
                    else:
                        break
                if made_progress:
                    break
            except WorkflowException as exc:
                _logger.error(u"Cannot make scatter job: %s", Text(exc))
                _logger.debug("", exc_info=True)
                rc.receive_scatter_output(index, {}, "permanentFail")
        if not made_progress and rc.completed < rc.total:
            yield None


def dotproduct_scatter(process,           # type: WorkflowJobStep
                       joborder,          # type: MutableMapping[Text, Any]
                       scatter_keys,      # type: MutableSequence[Text]
                       output_callback,   # type: Callable[..., Any]
                       runtimeContext     # type: RuntimeContext
                      ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
    jobl = None  # type: Optional[int]
    for key in scatter_keys:
        if jobl is None:
            jobl = len(joborder[key])
        elif jobl != len(joborder[key]):
            raise WorkflowException(
                "Length of input arrays must be equal when performing "
                "dotproduct scatter.")
    if jobl is None:
        raise Exception("Impossible codepath")

    output = {}  # type: Dict[Text,List[Optional[Text]]]
    for i in process.tool["outputs"]:
        output[i["id"]] = [None] * jobl

    rc = ReceiveScatterOutput(output_callback, output, jobl)

    steps = []  # type: List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]
    for index in range(0, jobl):
        sjobo = copy.copy(joborder)
        for key in scatter_keys:
            sjobo[key] = joborder[key][index]

        if runtimeContext.postScatterEval is not None:
            sjobo = runtimeContext.postScatterEval(sjobo)

        steps.append(process.job(
            sjobo, functools.partial(rc.receive_scatter_output, index),
            runtimeContext))

    rc.setTotal(jobl, steps)
    return parallel_steps(steps, rc, runtimeContext)


def nested_crossproduct_scatter(process,          # type: WorkflowJobStep
                                joborder,         # type: MutableMapping[Text, Any]
                                scatter_keys,     # type: MutableSequence[Text]
                                output_callback,  # type: Callable[..., Any]
                                runtimeContext    # type: RuntimeContext
                               ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
    scatter_key = scatter_keys[0]
    jobl = len(joborder[scatter_key])
    output = {}  # type: Dict[Text, List[Optional[Text]]]
    for i in process.tool["outputs"]:
        output[i["id"]] = [None] * jobl

    rc = ReceiveScatterOutput(output_callback, output, jobl)

    steps = []  # type: List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]
    for index in range(0, jobl):
        sjob = copy.copy(joborder)
        sjob[scatter_key] = joborder[scatter_key][index]

        if len(scatter_keys) == 1:
            if runtimeContext.postScatterEval is not None:
                sjob = runtimeContext.postScatterEval(sjob)
            steps.append(process.job(
                sjob, functools.partial(rc.receive_scatter_output, index),
                runtimeContext))
        else:
            steps.append(nested_crossproduct_scatter(
                process, sjob, scatter_keys[1:],
                functools.partial(rc.receive_scatter_output, index),
                runtimeContext))

    rc.setTotal(jobl, steps)
    return parallel_steps(steps, rc, runtimeContext)


def crossproduct_size(joborder, scatter_keys):
    # type: (MutableMapping[Text, Any], MutableSequence[Text]) -> int
    scatter_key = scatter_keys[0]
    if len(scatter_keys) == 1:
        ssum = len(joborder[scatter_key])
    else:
        ssum = 0
        for _ in range(0, len(joborder[scatter_key])):
            ssum += crossproduct_size(joborder, scatter_keys[1:])
    return ssum

def flat_crossproduct_scatter(process,          # type: WorkflowJobStep
                              joborder,         # type: MutableMapping[Text, Any]
                              scatter_keys,     # type: MutableSequence[Text]
                              output_callback,  # type: Callable[..., Any]
                              runtimeContext    # type: RuntimeContext
                             ):  # type: (...) -> Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]
    output = {}  # type: Dict[Text, List[Optional[Text]]]
    for i in process.tool["outputs"]:
        output[i["id"]] = [None] * crossproduct_size(joborder, scatter_keys)
    callback = ReceiveScatterOutput(output_callback, output, 0)
    (steps, total) = _flat_crossproduct_scatter(
        process, joborder, scatter_keys, callback, 0, runtimeContext)
    callback.setTotal(total, steps)
    return parallel_steps(steps, callback, runtimeContext)

def _flat_crossproduct_scatter(process,        # type: WorkflowJobStep
                               joborder,       # type: MutableMapping[Text, Any]
                               scatter_keys,   # type: MutableSequence[Text]
                               callback,       # type: ReceiveScatterOutput
                               startindex,     # type: int
                               runtimeContext  # type: RuntimeContext
                              ):  # type: (...) -> Tuple[List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]], int]
    """Inner loop."""
    scatter_key = scatter_keys[0]
    jobl = len(joborder[scatter_key])
    steps = []  # type: List[Optional[Generator[Union[ExpressionTool.ExpressionJob, JobBase, CallbackJob, None], None, None]]]
    put = startindex
    for index in range(0, jobl):
        sjob = copy.copy(joborder)
        sjob[scatter_key] = joborder[scatter_key][index]

        if len(scatter_keys) == 1:
            if runtimeContext.postScatterEval is not None:
                sjob = runtimeContext.postScatterEval(sjob)
            steps.append(process.job(
                sjob, functools.partial(callback.receive_scatter_output, put),
                runtimeContext))
            put += 1
        else:
            (add, _) = _flat_crossproduct_scatter(
                process, sjob, scatter_keys[1:], callback, put, runtimeContext)
            put += len(add)
            steps.extend(add)

    return (steps, put)