comparison dante_gff_to_dna.py @ 0:77d9f2ecb28a draft

Uploaded
author petr-novak
date Wed, 03 Jul 2019 02:45:00 -0400
parents
children d0431a839606
comparison
equal deleted inserted replaced
-1:000000000000 0:77d9f2ecb28a
1 #!/usr/bin/env python3
2
3 import argparse
4 import configuration
5 import time
6 import os
7 from collections import defaultdict
8 from Bio import SeqIO
9 import textwrap
10
11 t_nt_seqs_extraction = time.time()
12
13
14 def str2bool(v):
15 if v.lower() in ('yes', 'true', 't', 'y', '1'):
16 return True
17 elif v.lower() in ('no', 'false', 'f', 'n', '0'):
18 return False
19 else:
20 raise argparse.ArgumentTypeError('Boolean value expected')
21
22
23 def check_file_start(gff_file):
24 count_comment = 0
25 with open(gff_file, "r") as gff_all:
26 line = gff_all.readline()
27 while line.startswith("#"):
28 line = gff_all.readline()
29 count_comment += 1
30 return count_comment, line
31
32
33 def extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL, EXTENDED):
34 ''' Extract nucleotide sequences of protein domains found by DANTE from input DNA seq.
35 Sequences are saved in fasta files separately for each transposon lineage.
36 Sequences extraction is based on position of Best_Hit alignment reported by LASTAL.
37 The positions can be extended (optional) based on what part of database domain was aligned (Best_Hit_DB_Pos attribute).
38 The strand orientation needs to be considered in extending and extracting the sequence itself
39 '''
40 [count_comment, first_line] = check_file_start(DOM_GFF)
41 unique_classes = get_unique_classes(CLASS_TBL)
42 files_dict = defaultdict(str)
43 domains_counts_dict = defaultdict(int)
44 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta'))
45 with open(DOM_GFF, "r") as domains:
46 for comment_idx in range(count_comment):
47 next(domains)
48 seq_id_stored = first_line.split("\t")[0]
49 allSeqs = SeqIO.to_dict(SeqIO.parse(DNA_SEQ, 'fasta'))
50 seq_nt = allSeqs[seq_id_stored]
51 for line in domains:
52 seq_id = line.split("\t")[0]
53 dom_type = line.split("\t")[8].split(";")[0].split("=")[1]
54 elem_type = line.split("\t")[8].split(";")[1].split("=")[1]
55 strand = line.split("\t")[6]
56 align_nt_start = int(line.split("\t")[8].split(";")[3].split(":")[
57 -1].split("-")[0])
58 align_nt_end = int(line.split("\t")[8].split(";")[3].split(":")[
59 -1].split("-")[1].split("[")[0])
60 if seq_id != seq_id_stored:
61 seq_id_stored = seq_id
62 seq_nt = allSeqs[seq_id_stored]
63 if EXTENDED:
64 ## which part of database sequence was aligned
65 db_part = line.split("\t")[8].split(";")[4].split("=")[1]
66 ## datatabse seq length
67 dom_len = int(db_part.split("of")[1])
68 ## start of alignment on database seq
69 db_start = int(db_part.split("of")[0].split(":")[0])
70 ## end of alignment on database seq
71 db_end = int(db_part.split("of")[0].split(":")[1])
72 ## number of nucleotides missing in the beginning
73 dom_nt_prefix = (db_start - 1) * 3
74 ## number of nucleotides missing in the end
75 dom_nt_suffix = (dom_len - db_end) * 3
76 if strand == "+":
77 dom_nt_start = align_nt_start - dom_nt_prefix
78 dom_nt_end = align_nt_end + dom_nt_suffix
79 ## reverse extending for - strand
80 else:
81 dom_nt_start = align_nt_start - dom_nt_suffix
82 dom_nt_end = align_nt_end + dom_nt_prefix
83 ## correction for domain after extending having negative starting positon
84 dom_nt_start = max(1, dom_nt_start)
85 else:
86 dom_nt_start = align_nt_start
87 dom_nt_end = align_nt_end
88 full_dom_nt = seq_nt.seq[dom_nt_start - 1:dom_nt_end]
89 ## for - strand take reverse complement of the extracted sequence
90 if strand == "-":
91 full_dom_nt = full_dom_nt.reverse_complement()
92 full_dom_nt = str(full_dom_nt)
93 ## report when domain classified to the last level and no Ns in extracted seq
94 if elem_type in unique_classes and "N" not in full_dom_nt:
95 # lineages reported in separate fasta files
96 if not elem_type in files_dict:
97 files_dict[elem_type] = os.path.join(
98 OUT_DIR, "{}.fasta".format(elem_type.split("|")[
99 -1].replace("/", "_")))
100 with open(files_dict[elem_type], "a") as out_nt_seq:
101 out_nt_seq.write(">{}:{}-{}|{}[{}]\n{}\n".format(
102 seq_nt.id, dom_nt_start, dom_nt_end, dom_type,
103 elem_type, textwrap.fill(full_dom_nt,
104 configuration.FASTA_LINE)))
105 domains_counts_dict[elem_type] += 1
106 return domains_counts_dict
107
108
109 def get_unique_classes(CLASS_TBL):
110 ''' Get all the lineages of current domains classification table to check if domains are classified to the last level.
111 Only the sequences of unambiguous and completely classified domains will be extracted.
112 '''
113 unique_classes = []
114 with open(CLASS_TBL, "r") as class_tbl:
115 for line in class_tbl:
116 line_class = "|".join(line.rstrip().split("\t")[1:])
117 if line_class not in unique_classes:
118 unique_classes.append(line_class)
119 return unique_classes
120
121
122 def write_domains_stat(domains_counts_dict, OUT_DIR):
123 ''' Report counts of domains for individual lineages
124 '''
125 total = 0
126 with open(
127 os.path.join(OUT_DIR,
128 configuration.EXTRACT_DOM_STAT), "w") as dom_stat:
129 for domain, count in domains_counts_dict.items():
130 dom_stat.write(";{}:{}\n".format(domain, count))
131 total += count
132 dom_stat.write(";TOTAL:{}\n".format(total))
133
134
135 def main(args):
136
137 DNA_SEQ = args.input_dna
138 DOM_GFF = args.domains_gff
139 OUT_DIR = args.out_dir
140 CLASS_TBL = args.classification
141 EXTENDED = args.extended
142
143 if not os.path.exists(OUT_DIR):
144 os.makedirs(OUT_DIR)
145
146 domains_counts_dict = extract_nt_seqs(DNA_SEQ, DOM_GFF, OUT_DIR, CLASS_TBL,
147 EXTENDED)
148 write_domains_stat(domains_counts_dict, OUT_DIR)
149
150 print("ELAPSED_TIME_EXTRACTION = {} s\n".format(time.time() -
151 t_nt_seqs_extraction))
152
153
154 if __name__ == "__main__":
155
156 # Command line arguments
157 parser = argparse.ArgumentParser()
158 parser.add_argument('-i',
159 '--input_dna',
160 type=str,
161 required=True,
162 help='path to input DNA sequence')
163 parser.add_argument('-d',
164 '--domains_gff',
165 type=str,
166 required=True,
167 help='GFF file of protein domains')
168 parser.add_argument('-cs',
169 '--classification',
170 type=str,
171 required=True,
172 help='protein domains classification file')
173 parser.add_argument('-out',
174 '--out_dir',
175 type=str,
176 default=configuration.EXTRACT_OUT_DIR,
177 help='output directory')
178 parser.add_argument(
179 '-ex',
180 '--extended',
181 type=str2bool,
182 default=True,
183 help=
184 'extend the domains edges if not the whole datatabase sequence was aligned')
185 args = parser.parse_args()
186 main(args)