env/lib/python3.7/site-packages/planemo/galaxy/activity.py @ 0:26e78fe6e8c4 (draft)

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"

| field | value |
|---|---|
| author | shellac |
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | (none) |
| children | (none) |
Comparison: -1:000000000000 → 0:26e78fe6e8c4 (file added in this changeset)
| 1 """Module provides generic interface to running Galaxy tools and workflows.""" | |
| 2 | |
| 3 import json | |
| 4 import os | |
| 5 import tempfile | |
| 6 import time | |
| 7 | |
| 8 import bioblend | |
| 9 import requests | |
| 10 import yaml | |
| 11 from bioblend.galaxy.client import Client | |
| 12 from bioblend.util import attach_file | |
| 13 from galaxy.tool_util.cwl.util import ( | |
| 14 DirectoryUploadTarget, | |
| 15 FileUploadTarget, | |
| 16 galactic_job_json, | |
| 17 invocation_to_output, | |
| 18 output_properties, | |
| 19 output_to_cwl_json, | |
| 20 path_or_uri_to_uri, | |
| 21 tool_response_to_output, | |
| 22 ) | |
| 23 from galaxy.tool_util.parser import get_tool_source | |
| 24 from galaxy.util import ( | |
| 25 safe_makedirs, | |
| 26 unicodify, | |
| 27 ) | |
| 28 | |
| 29 from planemo.galaxy.api import summarize_history | |
| 30 from planemo.io import wait_on | |
| 31 from planemo.runnable import ( | |
| 32 ErrorRunResponse, | |
| 33 get_outputs, | |
| 34 RunnableType, | |
| 35 SuccessfulRunResponse, | |
| 36 ) | |
| 37 | |
| 38 DEFAULT_HISTORY_NAME = "CWL Target History" | |
| 39 ERR_NO_SUCH_TOOL = ("Failed to find tool with ID [%s] in Galaxy - cannot execute job. " | |
| 40 "You may need to enable verbose logging and determine why the tool did not load. [%s]") | |
| 41 | |
| 42 | |
| 43 def execute(ctx, config, runnable, job_path, **kwds): | |
| 44 """Execute a Galaxy activity.""" | |
| 45 try: | |
| 46 return _execute(ctx, config, runnable, job_path, **kwds) | |
| 47 except Exception as e: | |
| 48 return ErrorRunResponse(unicodify(e)) | |
| 49 | |
| 50 | |
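`execute` is the module's public entry point and never lets an exception escape: any failure is wrapped in an `ErrorRunResponse`. A hedged usage sketch, assuming planemo.runnable's response API (`was_successful`, `error_message`) and stand-in `ctx`, `config`, and `runnable` objects that planemo's engine layer normally constructs:

```python
# Hedged sketch; ctx/config/runnable are stand-ins normally built by planemo's
# engine layer, and was_successful/error_message are assumed from planemo.runnable.
response = execute(ctx, config, runnable, "job.yml", history_name="Smoke Test")
if response.was_successful:
    print(response.outputs_dict)  # populated by collect_outputs() below
else:
    print("run failed: %s" % response.error_message)
```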
```python
def _verified_tool_id(runnable, user_gi):
    tool_id = _tool_id(runnable.path)
    try:
        user_gi.tools.show_tool(tool_id)
    except Exception as e:
        raise Exception(ERR_NO_SUCH_TOOL % (tool_id, e))
    return tool_id


def _inputs_representation(runnable):
    if runnable.type == RunnableType.cwl_tool:
        inputs_representation = "cwl"
    else:
        inputs_representation = "galaxy"
    return inputs_representation


def log_contents_str(config):
    if hasattr(config, "log_contents"):
        return config.log_contents
    else:
        return "No log for this engine type."


def _execute(ctx, config, runnable, job_path, **kwds):
    user_gi = config.user_gi
    admin_gi = config.gi

    history_id = _history_id(user_gi, **kwds)

    job_dict, _ = stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds)

    if runnable.type in [RunnableType.galaxy_tool, RunnableType.cwl_tool]:
        response_class = GalaxyToolRunResponse
        tool_id = _verified_tool_id(runnable, user_gi)
        inputs_representation = _inputs_representation(runnable)
        run_tool_payload = dict(
            history_id=history_id,
            tool_id=tool_id,
            inputs=job_dict,
            inputs_representation=inputs_representation,
        )
        ctx.vlog("Post to Galaxy tool API with payload [%s]" % run_tool_payload)
        tool_run_response = user_gi.tools._post(run_tool_payload)

        job = tool_run_response["jobs"][0]
        job_id = job["id"]
        try:
            final_state = _wait_for_job(user_gi, job_id)
        except Exception:
            summarize_history(ctx, user_gi, history_id)
            raise
        if final_state != "ok":
            msg = "Failed to run CWL tool - final job state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            with open("errored_galaxy.log", "w") as f:
                f.write(log_contents_str(config))
            raise Exception(msg)

        ctx.vlog("Final job state was ok, fetching details for job [%s]" % job_id)
        job_info = admin_gi.jobs.show_job(job_id)
        response_kwds = {
            'job_info': job_info,
            'api_run_response': tool_run_response,
        }
        if ctx.verbose:
            summarize_history(ctx, user_gi, history_id)
    elif runnable.type in [RunnableType.galaxy_workflow, RunnableType.cwl_workflow]:
        response_class = GalaxyWorkflowRunResponse
        workflow_id = config.workflow_id(runnable.path)
        ctx.vlog("Found Galaxy workflow ID [%s] for path [%s]" % (workflow_id, runnable.path))
        # TODO: update bioblend to allow inputs_by.
        # invocation = user_gi.workflows.invoke_workflow(
        #     workflow_id,
        #     history_id=history_id,
        #     inputs=job_dict,
        # )
        payload = dict(
            workflow_id=workflow_id,
            history_id=history_id,
            inputs=job_dict,
            inputs_by="name",
            allow_tool_state_corrections=True,
        )
        invocations_url = "%s/%s/invocations" % (
            user_gi._make_url(user_gi.workflows),
            workflow_id,
        )
        invocation = Client._post(user_gi.workflows, payload, url=invocations_url)
        invocation_id = invocation["id"]
        ctx.vlog("Waiting for invocation [%s]" % invocation_id)
        polling_backoff = kwds.get("polling_backoff", 0)
        try:
            final_invocation_state = _wait_for_invocation(ctx, user_gi, history_id, workflow_id, invocation_id, polling_backoff)
        except Exception:
            ctx.vlog("Problem waiting on invocation...")
            summarize_history(ctx, user_gi, history_id)
            raise
        ctx.vlog("Final invocation state is [%s]" % final_invocation_state)
        final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
        if final_state != "ok":
            msg = "Failed to run workflow - final history state is [%s]." % final_state
            summarize_history(ctx, user_gi, history_id)
            with open("errored_galaxy.log", "w") as f:
                f.write(log_contents_str(config))
            raise Exception(msg)
        ctx.vlog("Final history state is 'ok'")
        response_kwds = {
            'workflow_id': workflow_id,
            'invocation_id': invocation_id,
        }
    else:
        raise NotImplementedError()

    run_response = response_class(
        ctx=ctx,
        runnable=runnable,
        user_gi=user_gi,
        history_id=history_id,
        log=log_contents_str(config),
        **response_kwds
    )
    output_directory = kwds.get("output_directory", None)
    ctx.vlog("collecting outputs from run...")
    run_response.collect_outputs(ctx, output_directory)
    ctx.vlog("collecting outputs complete")
    return run_response
```
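The two API payloads built above differ only in shape: tool runs post to the tool execution endpoint with an `inputs_representation`, while workflow runs post to the invocations URL with `inputs_by="name"`. Representative payloads, with made-up IDs:

```python
# Hypothetical payloads mirroring the dicts _execute builds; all IDs are made up.
run_tool_payload = {
    "history_id": "f2db41e1fa331b3e",
    "tool_id": "cat1",
    "inputs": {"input1": {"src": "hda", "id": "bbd44e69cb8906b5"}},
    "inputs_representation": "galaxy",  # "cwl" for CWL tools
}
workflow_payload = {
    "workflow_id": "df7a1f0c02a5b08e",
    "history_id": "f2db41e1fa331b3e",
    "inputs": {"input_dataset": {"src": "hda", "id": "bbd44e69cb8906b5"}},
    "inputs_by": "name",
    "allow_tool_state_corrections": True,
}
```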
```python
def stage_in(ctx, runnable, config, user_gi, history_id, job_path, **kwds):
    files_attached = [False]

    def upload_func(upload_target):

        def _attach_file(upload_payload, uri, index=0):
            uri = path_or_uri_to_uri(uri)
            is_path = uri.startswith("file://")
            if not is_path or config.use_path_paste:
                upload_payload["inputs"]["files_%d|url_paste" % index] = uri
            else:
                files_attached[0] = True
                path = uri[len("file://"):]
                upload_payload["files_%d|file_data" % index] = attach_file(path)

        if isinstance(upload_target, FileUploadTarget):
            file_path = upload_target.path
            upload_payload = user_gi.tools._upload_payload(
                history_id,
                file_type=upload_target.properties.get('filetype', None) or "auto",
            )
            name = os.path.basename(file_path)
            upload_payload["inputs"]["files_0|auto_decompress"] = False
            upload_payload["inputs"]["auto_decompress"] = False
            _attach_file(upload_payload, file_path)
            upload_payload["inputs"]["files_0|NAME"] = name
            if upload_target.secondary_files:
                _attach_file(upload_payload, upload_target.secondary_files, index=1)
                upload_payload["inputs"]["files_1|type"] = "upload_dataset"
                upload_payload["inputs"]["files_1|auto_decompress"] = True
                upload_payload["inputs"]["file_count"] = "2"
                upload_payload["inputs"]["force_composite"] = "True"

            ctx.vlog("upload_payload is %s" % upload_payload)
            return user_gi.tools._post(upload_payload, files_attached=files_attached[0])
        elif isinstance(upload_target, DirectoryUploadTarget):
            tar_path = upload_target.tar_path

            upload_payload = user_gi.tools._upload_payload(
                history_id,
                file_type="tar",
            )
            upload_payload["inputs"]["files_0|auto_decompress"] = False
            _attach_file(upload_payload, tar_path)
            tar_upload_response = user_gi.tools._post(upload_payload, files_attached=files_attached[0])
            convert_response = user_gi.tools.run_tool(
                tool_id="CONVERTER_tar_to_directory",
                tool_inputs={"input1": {"src": "hda", "id": tar_upload_response["outputs"][0]["id"]}},
                history_id=history_id,
            )
            assert "outputs" in convert_response, convert_response
            return convert_response
        else:
            content = json.dumps(upload_target.object)
            return user_gi.tools.paste_content(
                content,
                history_id,
                file_type="expression.json",
            )

    def create_collection_func(element_identifiers, collection_type):
        payload = {
            "name": "dataset collection",
            "instance_type": "history",
            "history_id": history_id,
            "element_identifiers": element_identifiers,
            "collection_type": collection_type,
            "fields": None if collection_type != "record" else "auto",
        }
        dataset_collections_url = user_gi.url + "/dataset_collections"
        dataset_collection = Client._post(user_gi.histories, payload, url=dataset_collections_url)
        return dataset_collection

    with open(job_path, "r") as f:
        job = yaml.safe_load(f)

    # TODO: figure out what "." should be here instead.
    job_dir = os.path.dirname(job_path)
    job_dict, datasets = galactic_job_json(
        job,
        job_dir,
        upload_func,
        create_collection_func,
        tool_or_workflow="tool" if runnable.type in [RunnableType.cwl_tool, RunnableType.galaxy_tool] else "workflow",
    )

    if datasets:
        final_state = _wait_for_history(ctx, user_gi, history_id)

        for (dataset, path) in datasets:
            dataset_details = user_gi.histories.show_dataset(
                history_id,
                dataset["id"],
            )
            ctx.vlog("Uploaded dataset for path [%s] with metadata [%s]" % (path, dataset_details))
    else:
        # Mark uploads as ok because there is nothing to do.
        final_state = "ok"

    ctx.vlog("final state is %s" % final_state)
    if final_state != "ok":
        msg = "Failed to run job - final job state is [%s]." % final_state
        summarize_history(ctx, user_gi, history_id)
        with open("errored_galaxy.log", "w") as f:
            f.write(log_contents_str(config))
        raise Exception(msg)

    return job_dict, datasets
```
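`stage_in` loads the job document with `yaml.safe_load`, so YAML or JSON job files both work. A hypothetical example of what a parsed CWL-style job looks like to `galactic_job_json`; the `class: File` entry is what becomes a `FileUploadTarget` handed to `upload_func`:

```python
# What yaml.safe_load might return for a hypothetical job.yml; the File entry
# is resolved relative to the job file's directory before upload.
job = {
    "input1": {"class": "File", "path": "input.fastq"},
    "threshold": 0.05,
}
```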
```python
class GalaxyBaseRunResponse(SuccessfulRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
    ):
        self._ctx = ctx
        self._runnable = runnable
        self._user_gi = user_gi
        self._history_id = history_id
        self._log = log

        self._job_info = None

        self._outputs_dict = None

    def to_galaxy_output(self, output):
        """Convert runnable output to a GalaxyOutput object.

        Subclasses for workflow and tool execution override this.
        """
        raise NotImplementedError()

    def _get_extra_files(self, dataset_details):
        extra_files_url = "%s/%s/contents/%s/extra_files" % (
            self._user_gi._make_url(self._user_gi.histories), self._history_id, dataset_details["id"]
        )
        extra_files = Client._get(self._user_gi.jobs, url=extra_files_url)
        return extra_files

    def _get_metadata(self, history_content_type, content_id):
        if history_content_type == "dataset":
            return self._user_gi.histories.show_dataset(
                self._history_id,
                content_id,
            )
        elif history_content_type == "dataset_collection":
            return self._user_gi.histories.show_dataset_collection(
                self._history_id,
                content_id,
            )
        else:
            raise Exception("Unknown history content type encountered [%s]" % history_content_type)

    def collect_outputs(self, ctx, output_directory):
        assert self._outputs_dict is None, "collect_outputs pre-condition violated"

        outputs_dict = {}
        if not output_directory:
            # TODO: rather than creating a directory just use
            # Galaxy paths if they are available in this
            # configuration.
            output_directory = tempfile.mkdtemp()

        def get_dataset(dataset_details, filename=None):
            parent_basename = dataset_details.get("cwl_file_name")
            if not parent_basename:
                parent_basename = dataset_details.get("name")
            file_ext = dataset_details["file_ext"]
            if file_ext == "directory":
                # TODO: rename output_directory to outputs_directory because we can
                # have output directories and this is confusing...
                the_output_directory = os.path.join(output_directory, parent_basename)
                safe_makedirs(the_output_directory)
                destination = self.download_output_to(dataset_details, the_output_directory, filename=filename)
            else:
                destination = self.download_output_to(dataset_details, output_directory, filename=filename)
            if filename is None:
                basename = parent_basename
            else:
                basename = os.path.basename(filename)

            return {"path": destination, "basename": basename}

        ctx.vlog("collecting outputs to directory %s" % output_directory)
        for runnable_output in get_outputs(self._runnable):
            output_id = runnable_output.get_id()
            if not output_id:
                ctx.vlog("Workflow output identified without an ID (label), skipping")
                continue
            output_dict_value = None
            if self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]:
                galaxy_output = self.to_galaxy_output(runnable_output)
                cwl_output = output_to_cwl_json(
                    galaxy_output,
                    self._get_metadata,
                    get_dataset,
                    self._get_extra_files,
                    pseduo_location=True,  # (sic) spelling matches the upstream parameter name
                )
                output_dict_value = cwl_output
            else:
                # TODO: deprecate this route for finding workflow outputs,
                # it is a brittle and bad approach...
                output_dataset_id = self.output_dataset_id(runnable_output)
                dataset = self._get_metadata("dataset", output_dataset_id)
                dataset_dict = get_dataset(dataset)
                ctx.vlog("populated destination [%s]" % dataset_dict["path"])

                if dataset["file_ext"] == "expression.json":
                    with open(dataset_dict["path"], "r") as f:
                        output_dict_value = json.load(f)
                else:
                    output_dict_value = output_properties(**dataset_dict)

            outputs_dict[output_id] = output_dict_value

        self._outputs_dict = outputs_dict
        ctx.vlog("collected outputs [%s]" % self._outputs_dict)

    @property
    def log(self):
        return self._log

    @property
    def job_info(self):
        if self._job_info is not None:
            return dict(
                stdout=self._job_info["stdout"],
                stderr=self._job_info["stderr"],
                command_line=self._job_info["command_line"],
            )
        return None

    @property
    def outputs_dict(self):
        return self._outputs_dict

    def download_output_to(self, dataset_details, output_directory, filename=None):
        if filename is None:
            local_filename = dataset_details.get("cwl_file_name") or dataset_details.get("name")
        else:
            local_filename = filename
        destination = os.path.join(output_directory, local_filename)
        self._history_content_download(
            self._history_id,
            dataset_details["id"],
            to_path=destination,
            filename=filename,
        )
        return destination

    def _history_content_download(self, history_id, dataset_id, to_path, filename=None):
        user_gi = self._user_gi
        url = user_gi.url + "/histories/%s/contents/%s/display" % (history_id, dataset_id)

        data = {}
        if filename:
            data["filename"] = filename

        r = requests.get(url, params=data, verify=user_gi.verify, stream=True, timeout=user_gi.timeout)
        r.raise_for_status()

        with open(to_path, 'wb') as fp:
            for chunk in r.iter_content(chunk_size=bioblend.CHUNK_SIZE):
                if chunk:
                    fp.write(chunk)
```
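For non-CWL runnables, each collected output value comes from `output_properties`. The exact keys are produced upstream in galaxy's CWL utilities and are assumed here to follow the CWL `File` object schema; all values below are made up:

```python
# A hypothetical entry in outputs_dict after collect_outputs(); keys assumed
# from the CWL File object schema, checksum/size values invented.
outputs_dict = {
    "out_file1": {
        "class": "File",
        "path": "/tmp/tmpabc123/out_file1.txt",
        "basename": "out_file1.txt",
        "checksum": "sha1$2ef7bde608ce5404e97d5f042f95f89f1c232871",
        "size": 12,
    }
}
```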
```python
class GalaxyToolRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        job_info,
        api_run_response,
    ):
        super(GalaxyToolRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._job_info = job_info
        self.api_run_response = api_run_response

    def is_collection(self, output):
        # TODO: Make this more rigorous - search both output and output
        # collections - throw an exception if not found in either place instead
        # of just assuming all non-datasets are collections.
        return self.output_dataset_id(output) is None

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        return tool_response_to_output(self.api_run_response, self._history_id, output_id)

    def output_dataset_id(self, output):
        outputs = self.api_run_response["outputs"]
        output_id = output.get_id()
        output_dataset_id = None
        self._ctx.vlog("Looking for id [%s] in outputs [%s]" % (output_id, outputs))
        for output in outputs:
            if output["output_name"] == output_id:
                output_dataset_id = output["id"]

        return output_dataset_id


class GalaxyWorkflowRunResponse(GalaxyBaseRunResponse):

    def __init__(
        self,
        ctx,
        runnable,
        user_gi,
        history_id,
        log,
        workflow_id,
        invocation_id,
    ):
        super(GalaxyWorkflowRunResponse, self).__init__(
            ctx=ctx,
            runnable=runnable,
            user_gi=user_gi,
            history_id=history_id,
            log=log,
        )
        self._workflow_id = workflow_id
        self._invocation_id = invocation_id

    def to_galaxy_output(self, runnable_output):
        output_id = runnable_output.get_id()
        self._ctx.vlog("checking for output in invocation [%s]" % self._invocation)
        return invocation_to_output(self._invocation, self._history_id, output_id)

    def output_dataset_id(self, output):
        invocation = self._invocation
        if "outputs" in invocation:
            # Use newer workflow outputs API.
            output_name = output.get_id()
            if output_name in invocation["outputs"]:
                return invocation["outputs"][output_name]["id"]
            else:
                raise Exception("Failed to find output [%s] in invocation outputs [%s]" % (output_name, invocation["outputs"]))
        else:
            # Assume the output knows its order_index and such - older line of
            # development not worth pursuing.
            workflow_output = output.workflow_output
            order_index = workflow_output.order_index

            invocation_steps = invocation["steps"]
            output_steps = [s for s in invocation_steps if s["order_index"] == order_index]
            assert len(output_steps) == 1, "More than one step matching outputs, behavior undefined."
            output_step = output_steps[0]
            job_id = output_step["job_id"]
            assert job_id, "Output doesn't define a job_id, behavior undefined."
            job_info = self._user_gi.jobs.show_job(job_id, full_details=True)
            job_outputs = job_info["outputs"]
            output_name = workflow_output.output_name
            assert output_name in job_outputs, "No output [%s] found for output job." % output_name
            job_output = job_outputs[output_name]
            assert "id" in job_output, "Job output [%s] does not contain 'id'." % job_output
            return job_output["id"]

    @property
    def _invocation(self):
        invocation = self._user_gi.workflows.show_invocation(
            self._workflow_id,
            self._invocation_id,
        )
        return invocation
```
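`output_dataset_id` prefers the newer invocation `outputs` map keyed by workflow output label, and falls back to matching `steps` by `order_index` and reading the dataset ID off the step's job. Sketches of the two invocation shapes it handles (all IDs hypothetical):

```python
# Newer API: outputs keyed directly by workflow output label.
invocation_new = {
    "id": "3f5830403180d620",
    "outputs": {"final_report": {"src": "hda", "id": "bbd44e69cb8906b5"}},
}
# Older shape: no "outputs" map, so the step is matched by order_index and the
# dataset id is pulled from that step's job outputs.
invocation_old = {
    "id": "3f5830403180d620",
    "steps": [{"order_index": 2, "job_id": "1cd8e2f6b131e891"}],
}
```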
```python
def _tool_id(tool_path):
    tool_source = get_tool_source(tool_path)
    return tool_source.parse_id()


def _history_id(gi, **kwds):
    history_id = kwds.get("history_id", None)
    if history_id is None:
        history_name = kwds.get("history_name", DEFAULT_HISTORY_NAME)
        history_id = gi.histories.create_history(history_name)["id"]
    return history_id


def _wait_for_invocation(ctx, gi, history_id, workflow_id, invocation_id, polling_backoff=0):

    def state_func():
        if _retry_on_timeouts(ctx, gi, lambda gi: has_jobs_in_states(gi, history_id, ["error", "deleted", "deleted_new"])):
            raise Exception("Problem running workflow, one or more jobs failed.")

        return _retry_on_timeouts(ctx, gi, lambda gi: gi.workflows.show_invocation(workflow_id, invocation_id))

    return _wait_on_state(state_func, polling_backoff)


def _retry_on_timeouts(ctx, gi, f):
    gi.timeout = 60
    try_count = 5
    try:
        for try_num in range(try_count):
            start_time = time.time()
            try:
                return f(gi)
            except Exception:
                end_time = time.time()
                if end_time - start_time > 45 and (try_num + 1) < try_count:
                    ctx.vlog("Galaxy seems to have timed out, retrying status fetch.")
                    continue
                else:
                    raise
    finally:
        gi.timeout = None
```
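`_retry_on_timeouts` pins the client timeout to 60 seconds and retries up to five times, but only when the failing call itself ran longer than 45 seconds - the heuristic for a server-side timeout rather than a genuine error. Any read-only Galaxy call can be wrapped the same way; `history_id` below is a stand-in value:

```python
# Hypothetical usage mirroring the calls elsewhere in this module.
history = _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))
```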
```python
def has_jobs_in_states(gi, history_id, states):
    params = {"history_id": history_id}
    jobs_url = gi._make_url(gi.jobs)
    jobs = Client._get(gi.jobs, params=params, url=jobs_url)

    target_jobs = [j for j in jobs if j["state"] in states]

    return len(target_jobs) > 0


def _wait_for_history(ctx, gi, history_id, polling_backoff=0):

    def has_active_jobs(gi):
        if has_jobs_in_states(gi, history_id, ["new", "upload", "waiting", "queued", "running"]):
            return True
        else:
            return None

    timeout = 60 * 60 * 24
    wait_on(lambda: _retry_on_timeouts(ctx, gi, has_active_jobs), "active jobs", timeout, polling_backoff)

    def state_func():
        return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id))

    return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id):
    def state_func():
        return gi.jobs.show_job(job_id, full_details=True)

    return _wait_on_state(state_func)


def _wait_on_state(state_func, polling_backoff=0):

    def get_state():
        response = state_func()
        state = response["state"]
        if str(state) not in ["running", "queued", "new", "ready"]:
            return state
        else:
            return None

    timeout = 60 * 60 * 24
    final_state = wait_on(get_state, "state", timeout, polling_backoff)
    return final_state
```
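All three waiters funnel into `planemo.io.wait_on` with a 24-hour timeout, and `polling_backoff` stretches the delay between successive polls. A minimal sketch of that polling contract, assuming linear backoff (the real backoff behavior lives in `wait_on` and may differ):

```python
import time


def wait_on_sketch(get_state, timeout=60 * 60 * 24, polling_backoff=0):
    # Poll until get_state() returns a non-None value or the timeout elapses;
    # each iteration waits a little longer when polling_backoff > 0.
    delay, waited = 1, 0.0
    while waited < timeout:
        state = get_state()
        if state is not None:
            return state
        time.sleep(delay)
        waited += delay
        delay += polling_backoff
    raise Exception("Timed out waiting on state.")
```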
```python
__all__ = (
    "execute",
)
```