0
|
1 #!/usr/bin/env python3
|
|
2
|
|
3 import argparse
|
|
4 import time
|
|
5 import os
|
10
|
6 import textwrap
|
0
|
7 from collections import defaultdict
|
|
8 from Bio import SeqIO
|
10
|
9 import configuration
|
17
|
10 from dante_gff_output_filtering import parse_gff_line
|
0
|
11 t_nt_seqs_extraction = time.time()
|
|
12
|
|
13
|
|
14 def str2bool(v):
|
|
15 if v.lower() in ('yes', 'true', 't', 'y', '1'):
|
|
16 return True
|
|
17 elif v.lower() in ('no', 'false', 'f', 'n', '0'):
|
|
18 return False
|
|
19 else:
|
|
20 raise argparse.ArgumentTypeError('Boolean value expected')
|
|
21
|
|
22
|
|
23 def check_file_start(gff_file):
|
|
24 count_comment = 0
|
|
25 with open(gff_file, "r") as gff_all:
|
|
26 line = gff_all.readline()
|
|
27 while line.startswith("#"):
|
|
28 line = gff_all.readline()
|
|
29 count_comment += 1
|
|
30 return count_comment, line
|
|
31
|
|
32
|
|
33 def extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL, EXTENDED):
|
|
34 ''' Extract nucleotide sequences of protein domains found by DANTE from input DNA seq.
|
|
35 Sequences are saved in fasta files separately for each transposon lineage.
|
|
36 Sequences extraction is based on position of Best_Hit alignment reported by LASTAL.
|
17
|
37 The positions can be extended (optional) based on what part of database domain was aligned
|
|
38 (Best_Hit_DB_Pos attribute).
|
0
|
39 The strand orientation needs to be considered in extending and extracting the sequence itself
|
|
40 '''
|
|
41 [count_comment, first_line] = check_file_start(DOM_GFF)
|
|
42 unique_classes = get_unique_classes(CLASS_TBL)
|
|
43 files_dict = defaultdict(str)
|
|
44 domains_counts_dict = defaultdict(int)
|
|
45 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta'))
|
|
46 with open(DOM_GFF, "r") as domains:
|
|
47 for comment_idx in range(count_comment):
|
|
48 next(domains)
|
|
49 seq_id_stored = first_line.split("\t")[0]
|
|
50 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta'))
|
|
51 seq_nt = allSeqs[seq_id_stored]
|
|
52 for line in domains:
|
17
|
53 gff_line = parse_gff_line(line)
|
|
54 elem_type = gff_line['attributes']['Final_Classification']
|
|
55 if elem_type == configuration.AMBIGUOUS_TAG:
|
|
56 continue # skip ambiguous classification
|
|
57 seq_id = gff_line['seqid']
|
|
58 dom_type = gff_line['attributes']['Name']
|
|
59 strand = gff_line['strand']
|
|
60 align_nt_start = int(gff_line['attributes']['Best_Hit'].split(":")[
|
0
|
61 -1].split("-")[0])
|
17
|
62 align_nt_end = int(gff_line['attributes']['Best_Hit'].split(":")[
|
0
|
63 -1].split("-")[1].split("[")[0])
|
|
64 if seq_id != seq_id_stored:
|
|
65 seq_id_stored = seq_id
|
|
66 seq_nt = allSeqs[seq_id_stored]
|
|
67 if EXTENDED:
|
|
68 ## which part of database sequence was aligned
|
17
|
69 db_part = gff_line['attributes']['Best_Hit_DB_Pos']
|
|
70 ## db_part = line.split("\t")[8].split(";")[4].split("=")[1]
|
0
|
71 ## datatabse seq length
|
|
72 dom_len = int(db_part.split("of")[1])
|
|
73 ## start of alignment on database seq
|
|
74 db_start = int(db_part.split("of")[0].split(":")[0])
|
|
75 ## end of alignment on database seq
|
|
76 db_end = int(db_part.split("of")[0].split(":")[1])
|
|
77 ## number of nucleotides missing in the beginning
|
|
78 dom_nt_prefix = (db_start - 1) * 3
|
|
79 ## number of nucleotides missing in the end
|
|
80 dom_nt_suffix = (dom_len - db_end) * 3
|
|
81 if strand == "+":
|
|
82 dom_nt_start = align_nt_start - dom_nt_prefix
|
|
83 dom_nt_end = align_nt_end + dom_nt_suffix
|
|
84 ## reverse extending for - strand
|
|
85 else:
|
|
86 dom_nt_start = align_nt_start - dom_nt_suffix
|
|
87 dom_nt_end = align_nt_end + dom_nt_prefix
|
|
88 ## correction for domain after extending having negative starting positon
|
|
89 dom_nt_start = max(1, dom_nt_start)
|
|
90 else:
|
|
91 dom_nt_start = align_nt_start
|
|
92 dom_nt_end = align_nt_end
|
|
93 full_dom_nt = seq_nt.seq[dom_nt_start - 1:dom_nt_end]
|
|
94 ## for - strand take reverse complement of the extracted sequence
|
|
95 if strand == "-":
|
|
96 full_dom_nt = full_dom_nt.reverse_complement()
|
|
97 full_dom_nt = str(full_dom_nt)
|
|
98 ## report when domain classified to the last level and no Ns in extracted seq
|
|
99 if elem_type in unique_classes and "N" not in full_dom_nt:
|
|
100 # lineages reported in separate fasta files
|
|
101 if not elem_type in files_dict:
|
|
102 files_dict[elem_type] = os.path.join(
|
|
103 OUT_DIR, "{}.fasta".format(elem_type.split("|")[
|
|
104 -1].replace("/", "_")))
|
|
105 with open(files_dict[elem_type], "a") as out_nt_seq:
|
|
106 out_nt_seq.write(">{}:{}-{}|{}[{}]\n{}\n".format(
|
|
107 seq_nt.id, dom_nt_start, dom_nt_end, dom_type,
|
|
108 elem_type, textwrap.fill(full_dom_nt,
|
|
109 configuration.FASTA_LINE)))
|
|
110 domains_counts_dict[elem_type] += 1
|
|
111 return domains_counts_dict
|
|
112
|
|
113
|
|
114 def get_unique_classes(CLASS_TBL):
|
|
115 ''' Get all the lineages of current domains classification table to check if domains are classified to the last level.
|
|
116 Only the sequences of unambiguous and completely classified domains will be extracted.
|
|
117 '''
|
|
118 unique_classes = []
|
|
119 with open(CLASS_TBL, "r") as class_tbl:
|
|
120 for line in class_tbl:
|
|
121 line_class = "|".join(line.rstrip().split("\t")[1:])
|
|
122 if line_class not in unique_classes:
|
|
123 unique_classes.append(line_class)
|
|
124 return unique_classes
|
|
125
|
|
126
|
|
127 def write_domains_stat(domains_counts_dict, OUT_DIR):
|
|
128 ''' Report counts of domains for individual lineages
|
|
129 '''
|
|
130 total = 0
|
|
131 with open(
|
|
132 os.path.join(OUT_DIR,
|
|
133 configuration.EXTRACT_DOM_STAT), "w") as dom_stat:
|
|
134 for domain, count in domains_counts_dict.items():
|
|
135 dom_stat.write(";{}:{}\n".format(domain, count))
|
|
136 total += count
|
|
137 dom_stat.write(";TOTAL:{}\n".format(total))
|
|
138
|
|
139
|
|
140 def main(args):
|
|
141
|
|
142 DNA_SEQ = args.input_dna
|
|
143 DOM_GFF = args.domains_gff
|
|
144 OUT_DIR = args.out_dir
|
|
145 CLASS_TBL = args.classification
|
|
146 EXTENDED = args.extended
|
|
147
|
|
148 if not os.path.exists(OUT_DIR):
|
|
149 os.makedirs(OUT_DIR)
|
|
150
|
|
151 domains_counts_dict = extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL,
|
|
152 EXTENDED)
|
|
153 write_domains_stat(domains_counts_dict, OUT_DIR)
|
|
154
|
|
155 print("ELAPSED_TIME_EXTRACTION = {} s\n".format(time.time() -
|
|
156 t_nt_seqs_extraction))
|
|
157
|
|
158
|
|
159 if __name__ == "__main__":
|
|
160
|
|
161 # Command line arguments
|
|
162 parser = argparse.ArgumentParser()
|
|
163 parser.add_argument('-i',
|
|
164 '--input_dna',
|
|
165 type=str,
|
|
166 required=True,
|
|
167 help='path to input DNA sequence')
|
|
168 parser.add_argument('-d',
|
|
169 '--domains_gff',
|
|
170 type=str,
|
|
171 required=True,
|
|
172 help='GFF file of protein domains')
|
|
173 parser.add_argument('-cs',
|
|
174 '--classification',
|
|
175 type=str,
|
|
176 required=True,
|
|
177 help='protein domains classification file')
|
|
178 parser.add_argument('-out',
|
|
179 '--out_dir',
|
|
180 type=str,
|
|
181 default=configuration.EXTRACT_OUT_DIR,
|
|
182 help='output directory')
|
|
183 parser.add_argument(
|
|
184 '-ex',
|
|
185 '--extended',
|
|
186 type=str2bool,
|
|
187 default=True,
|
|
188 help=
|
|
189 'extend the domains edges if not the whole datatabase sequence was aligned')
|
|
190 args = parser.parse_args()
|
|
191 main(args)
|