# HG changeset patch
# User greg
# Date 1626381592 0
# Node ID 0882b7bb3dfcdae80755fcbe84efcdecc1df2c35
# Parent 560dcf3f9f3d7f9a24c942249a4cadd5d6ea525a
Uploaded

diff -r 560dcf3f9f3d -r 0882b7bb3dfc queue_genotype_workflow.py
--- a/queue_genotype_workflow.py	Wed Nov 11 18:21:36 2020 +0000
+++ b/queue_genotype_workflow.py	Thu Jul 15 20:39:52 2021 +0000
@@ -283,10 +283,12 @@
 ags_storage_dir = get_value_from_config(config_defaults, 'ALL_GENOTYPED_SAMPLES_STORAGE_DIR')
 coralsnp_workflow_name = get_value_from_config(config_defaults, 'CORALSNP_WORKFLOW_NAME')
 es_workflow_name = get_value_from_config(config_defaults, 'ENSURE_SYNCED_WORKFLOW_NAME')
+fags_workflow_name = get_value_from_config(config_defaults, 'FILTER_ALL_GENOTYPED_SAMPLES_WORKFLOW_NAME')
 vam_workflow_name = get_value_from_config(config_defaults, 'VALIDATE_AFFY_METADATA_WORKFLOW_NAME')
 affy_metadata_is_valid = False
 datasets_have_queued = False
+filtered = False
 stag_database_updated = False
 synced = False
 lock = threading.Lock()
@@ -393,6 +395,7 @@
 while True:
     history_status_dict = get_history_status(gi, args.history_id)
     sd_dict = history_status_dict['state_details']
+    outputfh.write("\ndatasets_have_queued: %s\n" % str(datasets_have_queued))
     outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
     # The queue_genotype_workflow tool will continue to be in a
     # "running" state while inside this for loop, so we know that
@@ -415,30 +418,69 @@
             break
         outputfh.write("\nSleeping for 5 seconds...\n")
         time.sleep(5)
+outputfh.write("\nstag_database_updated: %s\n" % str(stag_database_updated))
 if stag_database_updated:
     # Get the id of the "bcftools merge" dataset in the current history.
-    bcftools_merge_dataset_id = get_history_dataset_id_by_name(gi, args.history_id, "bcftools merge", outputfh)
+    bcftools_merge = get_history_dataset_id_by_name(gi, args.history_id, "bcftools merge", outputfh)
+    # Get the FilterAllGenotypedSamples workflow.
+    fags_workflow_id, fags_workflow_dict = get_workflow(gi, fags_workflow_name, outputfh)
+    outputfh.write("\nFilterAllGenotypedSamples workflow id: %s\n" % str(fags_workflow_id))
+    # Map the history datasets to the input datasets for
+    # the FilterAllGenotypedSamples workflow.
+    fags_workflow_input_datasets = get_workflow_input_datasets(gi, history_datasets, fags_workflow_name, fags_workflow_dict, outputfh)
+    # Start the FilterAllGenotypedSamples workflow.
+    start_workflow(gi, fags_workflow_id, fags_workflow_name, fags_workflow_input_datasets, None, args.history_id, outputfh)
+    outputfh.write("\nSleeping for 15 seconds...\n")
+    time.sleep(15)
+    # Poll the history datasets, checking the statuses, and wait until
+    # the workflow is finished. The workflow itself simply schedules
+    # all of the jobs, so it cannot be checked for a state.
+    while True:
+        history_status_dict = get_history_status(gi, args.history_id)
+        sd_dict = history_status_dict['state_details']
+        outputfh.write("\nsd_dict: %s\n" % str(sd_dict))
+        # The queue_genotype_workflow tool will continue to be in a
+        # "running" state while inside this for loop, so we know that
+        # the workflow has completed if only 1 dataset has this state.
+        if sd_dict['running'] <= 1:
+            if sd_dict['error'] == 0:
+                # The all_genotyped_samples.vcf file is filtered.
+                filtered = True
+            break
+        outputfh.write("\nSleeping for 5 seconds...\n")
+        time.sleep(5)
+    outputfh.write("\nfiltered: %s\n" % str(filtered))
+    if filtered:
+        # Get the id of the "bcftools view" dataset in the current history.
+        bcftools_view = get_history_dataset_id_by_name(gi, args.history_id, "bcftools view", outputfh)
     # Create a new dataset in the All Genotyped Samples data library by
-    # importing the "bcftools merge" dataset from the current history.
+    # importing the "bcftools view" dataset from the current history.
     # We'll do this as the coraldmin user.
     admin_gi = galaxy.GalaxyInstance(url=galaxy_base_url, key=admin_api_key)
-    new_ags_dataset_dict = copy_history_dataset_to_library(admin_gi, ags_library_id, bcftools_merge_dataset_id, outputfh)
+    new_ags_dataset_dict = copy_history_dataset_to_library(admin_gi, ags_library_id, bcftools_view, outputfh)
+    outputfh.write("\nnew_ags_dataset_dict: %s\n" % str(new_ags_dataset_dict))
     # Rename the ldda to be all_genotyped_samples.vcf.
     new_ags_ldda_id = new_ags_dataset_dict['id']
+    outputfh.write("\nnew_ags_ldda_id: %s\n" % str(new_ags_ldda_id))
     renamed_ags_dataset_dict = rename_library_dataset(admin_gi, new_ags_ldda_id, ags_dataset_name, outputfh)
+    outputfh.write("\nrenamed_ags_dataset_dict: %s\n" % str(renamed_ags_dataset_dict))
     # Get the full path of the all_genotyped_samples.vcf library dataset.
     ags_ldda_file_path = get_library_dataset_file_path(gi, ags_library_id, ags_ldda_id, outputfh)
+    outputfh.write("\nags_ldda_file_path: %s\n" % str(ags_ldda_file_path))
     # Copy the all_genotyped_samples.vcf dataset to storage. We
     # will only keep a single copy of this file since this tool
     # will end in an error before the CoralSNP workflow is started
     # if the all_genotyped_samples.vcf file is not sync'd with the
     # stag database.
     copy_dataset_to_storage(ags_ldda_file_path, ags_storage_dir, ags_dataset_name, outputfh)
+    outputfh.write("\nCopied ags_ldda_file_path: %s to ags_storage_dir %s\n" % (str(ags_ldda_file_path), str(ags_storage_dir)))
     # Delete the original all_genotyped_samples library dataset.
     deleted_dataset_dict = delete_library_dataset(admin_gi, ags_library_id, ags_ldda_id, outputfh)
+    outputfh.write("\ndeleted_dataset_dict: %s\n" % str(deleted_dataset_dict))
     # To save disk space, delete the all_genotyped_samples hda
     # in the current history to enable later purging by an admin.
     ags_hda_id = get_history_dataset_id_by_name(gi, args.history_id, "all_genotyped_samples", outputfh)
+    outputfh.write("\nags_hda_id: %s\n" % str(ags_hda_id))
     delete_history_dataset(gi, args.history_id, ags_hda_id, outputfh)
 else:
     outputfh.write("\nProcessing ended in error...\n")