Mercurial > repos > petr-novak > dante
comparison dante_gff_to_dna.py @ 0:77d9f2ecb28a draft
Uploaded
author | petr-novak |
---|---|
date | Wed, 03 Jul 2019 02:45:00 -0400 |
parents | |
children | d0431a839606 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:77d9f2ecb28a |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 import argparse | |
4 import configuration | |
5 import time | |
6 import os | |
7 from collections import defaultdict | |
8 from Bio import SeqIO | |
9 import textwrap | |
10 | |
11 t_nt_seqs_extraction = time.time() | |
12 | |
13 | |
14 def str2bool(v): | |
15 if v.lower() in ('yes', 'true', 't', 'y', '1'): | |
16 return True | |
17 elif v.lower() in ('no', 'false', 'f', 'n', '0'): | |
18 return False | |
19 else: | |
20 raise argparse.ArgumentTypeError('Boolean value expected') | |
21 | |
22 | |
23 def check_file_start(gff_file): | |
24 count_comment = 0 | |
25 with open(gff_file, "r") as gff_all: | |
26 line = gff_all.readline() | |
27 while line.startswith("#"): | |
28 line = gff_all.readline() | |
29 count_comment += 1 | |
30 return count_comment, line | |
31 | |
32 | |
33 def extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL, EXTENDED): | |
34 ''' Extract nucleotide sequences of protein domains found by DANTE from input DNA seq. | |
35 Sequences are saved in fasta files separately for each transposon lineage. | |
36 Sequences extraction is based on position of Best_Hit alignment reported by LASTAL. | |
37 The positions can be extended (optional) based on what part of database domain was aligned (Best_Hit_DB_Pos attribute). | |
38 The strand orientation needs to be considered in extending and extracting the sequence itself | |
39 ''' | |
40 [count_comment, first_line] = check_file_start(DOM_GFF) | |
41 unique_classes = get_unique_classes(CLASS_TBL) | |
42 files_dict = defaultdict(str) | |
43 domains_counts_dict = defaultdict(int) | |
44 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta')) | |
45 with open(DOM_GFF, "r") as domains: | |
46 for comment_idx in range(count_comment): | |
47 next(domains) | |
48 seq_id_stored = first_line.split("\t")[0] | |
49 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta')) | |
50 seq_nt = allSeqs[seq_id_stored] | |
51 for line in domains: | |
52 seq_id = line.split("\t")[0] | |
53 dom_type = line.split("\t")[8].split(";")[0].split("=")[1] | |
54 elem_type = line.split("\t")[8].split(";")[1].split("=")[1] | |
55 strand = line.split("\t")[6] | |
56 align_nt_start = int(line.split("\t")[8].split(";")[3].split(":")[ | |
57 -1].split("-")[0]) | |
58 align_nt_end = int(line.split("\t")[8].split(";")[3].split(":")[ | |
59 -1].split("-")[1].split("[")[0]) | |
60 if seq_id != seq_id_stored: | |
61 seq_id_stored = seq_id | |
62 seq_nt = allSeqs[seq_id_stored] | |
63 if EXTENDED: | |
64 ## which part of database sequence was aligned | |
65 db_part = line.split("\t")[8].split(";")[4].split("=")[1] | |
66 ## datatabse seq length | |
67 dom_len = int(db_part.split("of")[1]) | |
68 ## start of alignment on database seq | |
69 db_start = int(db_part.split("of")[0].split(":")[0]) | |
70 ## end of alignment on database seq | |
71 db_end = int(db_part.split("of")[0].split(":")[1]) | |
72 ## number of nucleotides missing in the beginning | |
73 dom_nt_prefix = (db_start - 1) * 3 | |
74 ## number of nucleotides missing in the end | |
75 dom_nt_suffix = (dom_len - db_end) * 3 | |
76 if strand == "+": | |
77 dom_nt_start = align_nt_start - dom_nt_prefix | |
78 dom_nt_end = align_nt_end + dom_nt_suffix | |
79 ## reverse extending for - strand | |
80 else: | |
81 dom_nt_start = align_nt_start - dom_nt_suffix | |
82 dom_nt_end = align_nt_end + dom_nt_prefix | |
83 ## correction for domain after extending having negative starting positon | |
84 dom_nt_start = max(1, dom_nt_start) | |
85 else: | |
86 dom_nt_start = align_nt_start | |
87 dom_nt_end = align_nt_end | |
88 full_dom_nt = seq_nt.seq[dom_nt_start - 1:dom_nt_end] | |
89 ## for - strand take reverse complement of the extracted sequence | |
90 if strand == "-": | |
91 full_dom_nt = full_dom_nt.reverse_complement() | |
92 full_dom_nt = str(full_dom_nt) | |
93 ## report when domain classified to the last level and no Ns in extracted seq | |
94 if elem_type in unique_classes and "N" not in full_dom_nt: | |
95 # lineages reported in separate fasta files | |
96 if not elem_type in files_dict: | |
97 files_dict[elem_type] = os.path.join( | |
98 OUT_DIR, "{}.fasta".format(elem_type.split("|")[ | |
99 -1].replace("/", "_"))) | |
100 with open(files_dict[elem_type], "a") as out_nt_seq: | |
101 out_nt_seq.write(">{}:{}-{}|{}[{}]\n{}\n".format( | |
102 seq_nt.id, dom_nt_start, dom_nt_end, dom_type, | |
103 elem_type, textwrap.fill(full_dom_nt, | |
104 configuration.FASTA_LINE))) | |
105 domains_counts_dict[elem_type] += 1 | |
106 return domains_counts_dict | |
107 | |
108 | |
109 def get_unique_classes(CLASS_TBL): | |
110 ''' Get all the lineages of current domains classification table to check if domains are classified to the last level. | |
111 Only the sequences of unambiguous and completely classified domains will be extracted. | |
112 ''' | |
113 unique_classes = [] | |
114 with open(CLASS_TBL, "r") as class_tbl: | |
115 for line in class_tbl: | |
116 line_class = "|".join(line.rstrip().split("\t")[1:]) | |
117 if line_class not in unique_classes: | |
118 unique_classes.append(line_class) | |
119 return unique_classes | |
120 | |
121 | |
122 def write_domains_stat(domains_counts_dict, OUT_DIR): | |
123 ''' Report counts of domains for individual lineages | |
124 ''' | |
125 total = 0 | |
126 with open( | |
127 os.path.join(OUT_DIR, | |
128 configuration.EXTRACT_DOM_STAT), "w") as dom_stat: | |
129 for domain, count in domains_counts_dict.items(): | |
130 dom_stat.write(";{}:{}\n".format(domain, count)) | |
131 total += count | |
132 dom_stat.write(";TOTAL:{}\n".format(total)) | |
133 | |
134 | |
135 def main(args): | |
136 | |
137 DNA_SEQ = args.input_dna | |
138 DOM_GFF = args.domains_gff | |
139 OUT_DIR = args.out_dir | |
140 CLASS_TBL = args.classification | |
141 EXTENDED = args.extended | |
142 | |
143 if not os.path.exists(OUT_DIR): | |
144 os.makedirs(OUT_DIR) | |
145 | |
146 domains_counts_dict = extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL, | |
147 EXTENDED) | |
148 write_domains_stat(domains_counts_dict, OUT_DIR) | |
149 | |
150 print("ELAPSED_TIME_EXTRACTION = {} s\n".format(time.time() - | |
151 t_nt_seqs_extraction)) | |
152 | |
153 | |
154 if __name__ == "__main__": | |
155 | |
156 # Command line arguments | |
157 parser = argparse.ArgumentParser() | |
158 parser.add_argument('-i', | |
159 '--input_dna', | |
160 type=str, | |
161 required=True, | |
162 help='path to input DNA sequence') | |
163 parser.add_argument('-d', | |
164 '--domains_gff', | |
165 type=str, | |
166 required=True, | |
167 help='GFF file of protein domains') | |
168 parser.add_argument('-cs', | |
169 '--classification', | |
170 type=str, | |
171 required=True, | |
172 help='protein domains classification file') | |
173 parser.add_argument('-out', | |
174 '--out_dir', | |
175 type=str, | |
176 default=configuration.EXTRACT_OUT_DIR, | |
177 help='output directory') | |
178 parser.add_argument( | |
179 '-ex', | |
180 '--extended', | |
181 type=str2bool, | |
182 default=True, | |
183 help= | |
184 'extend the domains edges if not the whole datatabase sequence was aligned') | |
185 args = parser.parse_args() | |
186 main(args) |