diff fasta-stats.py @ 4:0dbb995c7d35 draft default tip

"planemo upload for repository https://github.com/galaxyproject/tools-iuc/tree/master/tools/fasta_stats/ commit 50f5cce5a8c11001e2c59600a2b99a4243b6d06f"
author iuc
date Thu, 18 Nov 2021 20:56:57 +0000
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/fasta-stats.py	Thu Nov 18 20:56:57 2021 +0000
@@ -0,0 +1,201 @@
+#!/usr/bin/env python
+
+
+# python version of fasta-stats with some extra features
+# written by anmol.kiran@gmail.com
+# git: @codemeleon
+# date: 10/11/2021
+
+import argparse
+import re
+from os import path
+
+import numpy as np
+from Bio import SeqIO
+
+
+def calculate_NG50(estimated_genome, total_length, sequence_lengths):
+    temp = 0
+    teoretical_NG50 = estimated_genome / 2.0
+    NG50 = 0
+    for seq in sequence_lengths:
+        temp += seq
+        if teoretical_NG50 < temp:
+            NG50 = seq
+            break
+    return NG50
+
+
+def run(fasta, stats_output, gaps_output, genome_size):
+    """Generates scaffold statistics."""
+    if not fasta:
+        exit("Input file not given.")
+    if not path.isfile(fasta):
+        exit(f"{fasta} path does not exist.")
+
+    seq_len = {}
+    bases_global = {"A": 0, "N": 0, "T": 0, "C": 0, "G": 0}
+    bases_seq = {}
+    seq_id_Ngaprange = {}
+    nstart = 0
+    contigs_len = []
+    gap_count = 0
+    for seq_record in SeqIO.parse(fasta, "fasta"):
+        seq = str(seq_record.seq).upper()
+        # print(len(seq))
+        seq_len[seq_record.id] = len(seq)
+
+        # NOTE: Nucleotide count
+        bases_seq[seq_record.id] = {
+            "A": seq.count("A"),
+            "N": seq.count("N"),
+            "T": seq.count("T"),
+            "C": seq.count("C"),
+            "G": seq.count("G"),
+        }
+        bases_global["A"] += bases_seq[seq_record.id]["A"]
+        bases_global["N"] += bases_seq[seq_record.id]["N"]
+        bases_global["T"] += bases_seq[seq_record.id]["T"]
+        bases_global["C"] += bases_seq[seq_record.id]["C"]
+        bases_global["G"] += bases_seq[seq_record.id]["G"]
+
+        # NOTE: Gap count and their range
+        range_gen = re.finditer("N+", seq)
+        n_range = [match.span() for match in range_gen]
+        for n_rng in n_range:
+            if n_rng[0] == 0 or n_rng[1] == seq_len[seq_record.id]:
+                continue
+            else:
+                gap_count += 1
+
+        # NOTE: Contigs, their lenths from scaffold and their N gap range
+        seq_id_Ngaprange[seq_record.id] = n_range
+        n_range_len = len(n_range)
+        if n_range_len > 0:
+            n_range = (
+                [(0, 0)] + n_range + [(seq_len[seq_record.id], seq_len[seq_record.id])]
+            )
+            for idx in range(n_range_len + 1):
+                nstart = n_range[idx][1]
+                nend = n_range[idx + 1][0]
+                con_len = nend - nstart
+                if con_len:
+                    contigs_len.append(con_len)
+        else:
+            contigs_len.append(len(seq))
+
+    # NOTE: Scaffold statistics
+    SEQ_LEN_LIST = sorted(seq_len.values(), reverse=True)
+    scaffold_lens = np.array(SEQ_LEN_LIST)
+    scaffold_lens_sum = np.cumsum(scaffold_lens)
+    N50_len = scaffold_lens_sum[-1] * 0.5
+    N50_idx = np.where(scaffold_lens_sum > N50_len)[0][0]
+    N90_len = scaffold_lens_sum[-1] * 0.9
+    N90_idx = np.where(scaffold_lens_sum > N90_len)[0][0]
+    NG50 = calculate_NG50(genome_size, scaffold_lens_sum[-1], scaffold_lens)
+
+    # NOTE: Contig statistics
+    seq_len_list = sorted(contigs_len, reverse=True)
+    contigs_len = np.array(seq_len_list)
+    contigs_len_sum = np.cumsum(contigs_len)
+    n50_len = contigs_len_sum[-1] * 0.5
+    n50_idx = np.where(contigs_len_sum > n50_len)[0][0]
+    n90_len = contigs_len_sum[-1] * 0.9
+    n90_idx = np.where(contigs_len_sum > n90_len)[0][0]
+    ng50 = calculate_NG50(genome_size, contigs_len_sum[-1], contigs_len)
+
+    with open(stats_output, "w") as soutput:
+        soutput.write("{}\t{}\n".format("Scaffold L50", N50_idx + 1))
+        soutput.write("{}\t{}\n".format("Scaffold N50", SEQ_LEN_LIST[N50_idx]))
+        soutput.write("{}\t{}\n".format("Scaffold L90", N90_idx + 1))
+        soutput.write("{}\t{}\n".format("Scaffold N90", SEQ_LEN_LIST[N90_idx]))
+        if genome_size != 0:
+            soutput.write("{}\t{}\n".format("Scaffold NG50", NG50))
+        soutput.write("{}\t{}\n".format("Scaffold len_max", SEQ_LEN_LIST[0]))
+        soutput.write("{}\t{}\n".format("Scaffold len_min", SEQ_LEN_LIST[-1]))
+        soutput.write(
+            "{}\t{}\n".format("Scaffold len_mean", int(np.mean(SEQ_LEN_LIST)))
+        )
+        soutput.write(
+            "{}\t{}\n".format("Scaffold len_median", int(np.median(SEQ_LEN_LIST)))
+        )
+        soutput.write("{}\t{}\n".format("Scaffold len_std", int(np.std(SEQ_LEN_LIST))))
+        soutput.write("{}\t{}\n".format("Scaffold num_A", bases_global["A"]))
+        soutput.write("{}\t{}\n".format("Scaffold num_T", bases_global["T"]))
+        soutput.write("{}\t{}\n".format("Scaffold num_C", bases_global["C"]))
+        soutput.write("{}\t{}\n".format("Scaffold num_G", bases_global["G"]))
+        soutput.write("{}\t{}\n".format("Scaffold num_N", bases_global["N"]))
+        soutput.write("{}\t{}\n".format("Scaffold num_bp", scaffold_lens_sum[-1]))
+        soutput.write(
+            "{}\t{}\n".format(
+                "Scaffold num_bp_not_N", scaffold_lens_sum[-1] - bases_global["N"]
+            )
+        )
+        soutput.write("{}\t{}\n".format("Scaffold num_seq", len(SEQ_LEN_LIST)))
+        soutput.write(
+            "{}\t{:.2f}\n".format(
+                "Scaffold GC content overall",
+                (
+                    (bases_global["G"] + bases_global["C"])
+                    * 100.0
+                    / scaffold_lens_sum[-1]
+                ),
+            )
+        )
+
+        soutput.write("{}\t{}\n".format("Contig L50", n50_idx + 1))
+        soutput.write("{}\t{}\n".format("Contig N50", seq_len_list[n50_idx]))
+        soutput.write("{}\t{}\n".format("Contig L90", n90_idx + 1))
+        soutput.write("{}\t{}\n".format("Contig N90", seq_len_list[n90_idx]))
+        if genome_size != 0:
+            soutput.write("{}\t{}\n".format("Contig NG50", ng50))
+        soutput.write("{}\t{}\n".format("Contig len_max", seq_len_list[0]))
+        soutput.write("{}\t{}\n".format("Contig len_min", seq_len_list[-1]))
+        soutput.write("{}\t{}\n".format("Contig len_mean", int(np.mean(seq_len_list))))
+        soutput.write(
+            "{}\t{}\n".format("Contig len_median", int(np.median(seq_len_list)))
+        )
+        soutput.write("{}\t{}\n".format("Contig len_std", int(np.std(seq_len_list))))
+        soutput.write("{}\t{}\n".format("Contig num_bp", contigs_len_sum[-1]))
+        soutput.write("{}\t{}\n".format("Contig num_seq", len(contigs_len_sum)))
+        soutput.write("{}\t{}\n".format("Number of gaps", gap_count))
+        if gaps_output is not None:
+            # NOTE: generate gaps statistics file
+            with open(gaps_output, "w") as goutput:
+                for key in seq_id_Ngaprange:
+                    for rng in seq_id_Ngaprange[key]:
+                        goutput.write("{}\t{}\t{}\n".format(key, rng[0], rng[1]))
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-f", "--fasta", required=True, help="FASTA file")
+    parser.add_argument(
+        "-z",
+        "--genome_size",
+        required=False,
+        type=int,
+        help="If provided, the NG50 statistic will be computed",
+        default=0,
+    )
+    parser.add_argument(
+        "-s",
+        "--stats_output",
+        required=True,
+        help="File to store the general statistics",
+    )
+    parser.add_argument(
+        "-r",
+        "--gaps_output",
+        required=False,
+        help="File to store the gaps statistics",
+        default=None,
+    )
+    args = parser.parse_args()
+
+    run(
+        args.fasta,
+        args.stats_output,
+        args.gaps_output,
+        args.genome_size,
+    )