diff nf/subworkflows/ncbi/gnomon/chainer_wnode/main.nf @ 0:d9c5c5b87fec draft

planemo upload for repository https://github.com/ncbi/egapx commit 8173d01b08d9a91c9ec5f6cb50af346edc8020c4
author fubar
date Sat, 03 Aug 2024 11:16:53 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/nf/subworkflows/ncbi/gnomon/chainer_wnode/main.nf	Sat Aug 03 11:16:53 2024 +0000
@@ -0,0 +1,167 @@
+#!/usr/bin/env nextflow
+nextflow.enable.dsl=2
+
+
+include { merge_params } from '../../utilities'
+include { run_align_sort }  from '../../default/align_sort_sa/main.nf'
+
+split_count=16
+
+
+workflow chainer_wnode {
+    take:
+        alignments
+        hmm_params
+        evidence_denylist
+        gap_fill_allowlist
+        scaffolds
+        trusted_genes    
+        genome
+        proteins
+        parameters  // Map : extra parameter and parameter update
+    main:
+        String input_sorting = parameters.get('input_aligns_sort', '')
+        def sort_aligns = alignments
+        if (!input_sorting.contains("presorted")) {
+            String align_sort_params = ""
+            if (input_sorting.contains("merge_only")) {
+                align_sort_params = "-merge"
+            }
+            align_sort_params += " -ifmt seq-align -compression none -k subject,subject_start,-subject_end "
+            // print(align_sort_params)
+            sort_aligns = run_align_sort([], [], alignments, align_sort_params).collect()
+            //sort_aligns = align_sort(alignments, align_sort_params)
+        }
+        String submit_chainer_params =  merge_params("-minimum-abut-margin 20 -separate-within-introns", parameters, 'submit_chainer')
+        String chainer_wnode_params = merge_params("", parameters, 'chainer_wnode')
+        String gpx_make_outputs_params =  merge_params("-default-output-name chains -slices-for affinity -sort-by affinity", parameters, 'gpx_make_outputs')
+        
+        def (jobs, lines_per_file) = generate_jobs(sort_aligns, submit_chainer_params)
+        def collected = run_chainer(jobs.flatten(), sort_aligns, hmm_params, evidence_denylist, gap_fill_allowlist, scaffolds, trusted_genes, genome, proteins, lines_per_file, chainer_wnode_params) | collect
+
+        run_gpx_make_outputs(collected, gpx_make_outputs_params)
+    emit:
+        chains = run_gpx_make_outputs.out.chains
+        chains_slices = run_gpx_make_outputs.out.chains_slices
+        evidence = run_gpx_make_outputs.out.evidence
+        evidence_slices = run_gpx_make_outputs.out.evidence_slices
+}
+
+
+process generate_jobs {
+    input:
+        path sort_aligns
+        val params
+    output:
+        path "job.*"
+        env lines_per_file
+    script:
+        njobs=split_count
+    """
+    #!/usr/bin/env bash
+    # generate_jobs $sort_aligns $params -output chains -output-slices chains_slices -output-evidence evidence -output-evidence-slices evidence_slices
+    submit_chainer $params -asn $sort_aligns -o jobs
+    total_lines=\$(wc -l <jobs)
+    (( lines_per_file = (total_lines + ${njobs} - 1) / ${njobs} ))
+    echo total_lines=\$total_lines, lines_per_file=\$lines_per_file
+    ####split -l\$lines_per_file jobs job. -da 3
+    # Use round robin to distribute jobs across nodes more evenly
+    if [ \$total_lines -lt $njobs ]; then
+        effective_njobs=\$total_lines
+    else
+        effective_njobs=$njobs
+    fi
+    split -nr/\$effective_njobs jobs job. -da 3
+    """
+    stub:
+    """
+    for i in {1..$split_count}; do
+        echo "<job query =\\\"lcl|${sort_aligns}:\${i}-\${i}\\\"></job>" >> jobs
+    done
+    split -nr/$split_count jobs job. -da 3
+    lines_per_file=10
+    """
+}
+
+
+process run_chainer {
+    input:
+        path job
+        path alignments
+        path hmm_params
+        path evidence_denylist
+        path gap_fill_allowlist
+        path scaffolds
+        path trusted_genes
+        path genome, stageAs: 'indexed/*'
+        path proteins_asn, stageAs: 'indexed/*'
+        val lines_per_file
+        val params
+    output:
+        path "output/*"
+    script:
+        job_num = job.toString().tokenize('.').last().toInteger()
+    """
+    echo "${evidence_denylist.join('\n')}" > evidence_denylist.mft
+    echo "${gap_fill_allowlist.join('\n')}" > gap_fill_allowlist.mft
+    echo "${scaffolds.join('\n')}" > scaffolds.mft
+    echo "${trusted_genes.join('\n')}" > trusted_genes.mft
+    # HACK: derive start_job_id from job file extension
+    filename=\$(basename -- "$job")
+    extension="\${filename##*.}"
+    (( start_job_id = ((10#\$extension) * $lines_per_file) + 1 ))
+    
+    # make the local LDS of the genomic and protein (if present) sequences
+    lds2_indexer -source indexed -db LDS2
+
+    # When running multiple jobs on the cluster there is a chance that
+    # several jobs will run on the same node and thus generate files
+    # with the same filename. We need to avoid that to be able to stage
+    # the output files for gpx_make_outputs. We add the job file numeric
+    # extension as a prefix to the filename.
+    mkdir interim
+    chainer_wnode $params -start-job-id \$start_job_id  -workers 32 -input-jobs ${job} -O interim -nogenbank -lds2 LDS2 -evidence-denylist-manifest evidence_denylist.mft -gap-fill-allowlist-manifest gap_fill_allowlist.mft -param ${hmm_params} -scaffolds-manifest scaffolds.mft -trusted-genes-manifest trusted_genes.mft
+    mkdir output
+    for f in interim/*; do
+        if [ -f \$f ]; then
+            mv \$f output/\${extension}_\$(basename \$f)
+        fi
+    done
+    """
+
+    stub:
+        job_num = job.toString().tokenize('.').last().toInteger()
+    """
+    mkdir -p output
+    touch output/sample_chainer_wnode.${job_num}.out
+    """
+}
+
+
+process run_gpx_make_outputs {
+    input:
+        path files, stageAs: "gpx_inputs/*"
+        val params
+    output:
+        path "output/chains.*.out.gz", emit: 'chains'
+        path "output/chains.*.out.gz.slices", emit: 'chains_slices'
+        path "output/evidence.*.out.gz", emit: 'evidence', optional: true
+        path "output/evidence.*.out.gz.slices", emit: 'evidence_slices', optional: true
+    script:
+    """
+    ls -1 gpx_inputs/* > gpx_inputs.mft
+    mkdir -p output
+    gpx_make_outputs $params -input-manifest gpx_inputs.mft -output output/@.#.out.gz -output-manifest output/@.mft -slices-manifest output/@_slices.mft -num-partitions $split_count
+    """
+    stub:
+    """
+    mkdir -p output
+    echo ${files}
+    for i in {1..$split_count}; do
+        touch output/chains.\$i.out.gz
+        touch output/chains.\$i.out.gz.slices
+        touch output/evidence.\$i.out.gz
+        touch output/evidence.\$i.out.gz.slices
+    done
+    """
+}