Mercurial > repos > artbio > small_rna_maps
comparison small_rna_maps.py.bak @ 2:507383cce5a8 draft
planemo upload for repository https://github.com/ARTbio/tools-artbio/tree/master/tools/small_rna_maps commit edbb53cb13b52bf8e71c562fa8acc2c3be2fb270
author | artbio |
---|---|
date | Mon, 14 Aug 2017 05:52:34 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
1:40972a8dfab9 | 2:507383cce5a8 |
---|---|
1 import argparse | |
2 from collections import defaultdict | |
3 | |
4 import numpy | |
5 | |
6 import pysam | |
7 | |
8 | |
9 def Parser(): | |
10 the_parser = argparse.ArgumentParser() | |
11 the_parser.add_argument('--input', dest='input', required=True, | |
12 nargs='+', help='input BAM files') | |
13 the_parser.add_argument('--sample_name', dest='sample_name', | |
14 required=True, nargs='+', help='sample name') | |
15 the_parser.add_argument('--output', action='store', | |
16 type=str, help='output tabular file') | |
17 the_parser.add_argument('-S', '--sizes', action='store', | |
18 help='use to output read sizes dataframe') | |
19 args = the_parser.parse_args() | |
20 return args | |
21 | |
22 | |
23 class Map: | |
24 | |
25 def __init__(self, bam_file, sample, computeSize=False): | |
26 self.sample_name = sample | |
27 self.bam_object = pysam.AlignmentFile(bam_file, 'rb') | |
28 self.chromosomes = dict(zip(self.bam_object.references, | |
29 self.bam_object.lengths)) | |
30 self.map_dict = self.create_map(self.bam_object) | |
31 self.max = self.compute_max(self.map_dict) | |
32 self.mean = self.compute_mean(self.map_dict) | |
33 self.median = self.compute_median(self.map_dict) | |
34 self.coverage = self.compute_coverage(self.map_dict) | |
35 if computeSize: | |
36 self.size = self.compute_size(self.map_dict) | |
37 | |
38 def create_map(self, bam_object): | |
39 ''' | |
40 Returns a map_dictionary {(chromosome,read_position,polarity): | |
41 [read_length, ...]} | |
42 ''' | |
43 map_dictionary = defaultdict(list) | |
44 # get empty value for start and end of each chromosome | |
45 for chrom in self.chromosomes: | |
46 map_dictionary[(chrom, 1, 'F')] = [] | |
47 map_dictionary[(chrom, self.chromosomes[chrom], 'F')] = [] | |
48 for chrom in self.chromosomes: | |
49 for read in bam_object.fetch(chrom): | |
50 positions = read.positions # a list of covered positions | |
51 for pos in positions: | |
52 if not map_dictionary[(chrom, pos+1, 'F')]: | |
53 map_dictionary[(chrom, pos+1, 'F')] = [] | |
54 if read.is_reverse: | |
55 map_dictionary[(chrom, positions[-1]+1, | |
56 'R')].append(read.query_alignment_length) | |
57 else: | |
58 map_dictionary[(chrom, positions[0]+1, | |
59 'F')].append(read.query_alignment_length) | |
60 return map_dictionary | |
61 | |
62 def compute_max(self, map_dictionary): | |
63 ''' | |
64 takes a map_dictionary as input and returns | |
65 a max_dictionary {(chromosome,read_position,polarity): | |
66 max_of_number_of_read_at_any_position} | |
67 ''' | |
68 merge_keylist = [(i[0], 0) for i in map_dictionary.keys()] | |
69 max_dictionary = dict(merge_keylist) | |
70 for key in map_dictionary: | |
71 if len(map_dictionary[key]) > max_dictionary[key[0]]: | |
72 max_dictionary[key[0]] = len(map_dictionary[key]) | |
73 return max_dictionary | |
74 | |
75 def compute_mean(self, map_dictionary): | |
76 ''' | |
77 takes a map_dictionary as input and returns | |
78 a mean_dictionary {(chromosome,read_position,polarity): | |
79 mean_value_of_reads} | |
80 ''' | |
81 mean_dictionary = dict() | |
82 for key in map_dictionary: | |
83 if len(map_dictionary[key]) == 0: | |
84 mean_dictionary[key] = 0 | |
85 else: | |
86 mean_dictionary[key] = round(numpy.mean(map_dictionary[key]), | |
87 1) | |
88 return mean_dictionary | |
89 | |
90 def compute_median(self, map_dictionary): | |
91 ''' | |
92 takes a map_dictionary as input and returns | |
93 a mean_dictionary {(chromosome,read_position,polarity): | |
94 mean_value_of_reads} | |
95 ''' | |
96 median_dictionary = dict() | |
97 for key in map_dictionary: | |
98 if len(map_dictionary[key]) == 0: | |
99 median_dictionary[key] = 0 | |
100 else: | |
101 median_dictionary[key] = numpy.median(map_dictionary[key]) | |
102 return median_dictionary | |
103 | |
104 def compute_coverage(self, map_dictionary, quality=10): | |
105 ''' | |
106 takes a map_dictionary as input and returns | |
107 a coverage_dictionary {(chromosome,read_position,polarity): | |
108 coverage} | |
109 ''' | |
110 coverage_dictionary = dict() | |
111 for chrom in self.chromosomes: | |
112 coverage_dictionary[(chrom, 1, 'F')] = 0 | |
113 coverage_dictionary[(chrom, self.chromosomes[chrom], 'F')] = 0 | |
114 for key in map_dictionary: | |
115 coverage = self.bam_object.count_coverage( | |
116 reference=key[0], | |
117 start=key[1]-1, | |
118 end=key[1], | |
119 quality_threshold=quality) | |
120 """ Add the 4 coverage values """ | |
121 coverage = [sum(x) for x in zip(*coverage)] | |
122 coverage_dictionary[key] = coverage[0] | |
123 # coverage_dictionary[(key[0], key[1], 'R')] = coverage | |
124 return coverage_dictionary | |
125 | |
126 def compute_size(self, map_dictionary): | |
127 ''' | |
128 Takes a map_dictionary and returns a dictionary of sizes: | |
129 {chrom: {polarity: {size: nbre of reads}}} | |
130 ''' | |
131 size_dictionary = defaultdict(lambda: defaultdict( | |
132 lambda: defaultdict(int))) | |
133 # to track empty chromosomes | |
134 for chrom in self.chromosomes: | |
135 if self.bam_object.count(chrom) == 0: | |
136 size_dictionary[chrom]['F'][10] = 0 | |
137 for key in map_dictionary: | |
138 for size in map_dictionary[key]: | |
139 size_dictionary[key[0]][key[2]][size] += 1 | |
140 return size_dictionary | |
141 | |
142 def write_size_table(self, out): | |
143 ''' | |
144 Dataset, Chromosome, Polarity, Size, Nbr_reads | |
145 out is an *open* file handler | |
146 ''' | |
147 for chrom in sorted(self.size): | |
148 sizes = self.size[chrom]['F'].keys() | |
149 sizes.extend(self.size[chrom]['R'].keys()) | |
150 for polarity in sorted(self.size[chrom]): | |
151 for size in range(min(sizes), max(sizes)+1): | |
152 try: | |
153 line = [self.sample_name, chrom, polarity, size, | |
154 self.size[chrom][polarity][size]] | |
155 except KeyError: | |
156 line = [self.sample_name, chrom, polarity, size, 0] | |
157 line = [str(i) for i in line] | |
158 out.write('\t'.join(line) + '\n') | |
159 | |
160 def write_table(self, out): | |
161 ''' | |
162 Dataset, Chromosome, Chrom_length, Coordinate, Nbr_reads | |
163 Polarity, Max, Mean, Median, Coverage | |
164 out is an *open* file handler | |
165 ''' | |
166 for key in sorted(self.map_dict): | |
167 line = [self.sample_name, key[0], self.chromosomes[key[0]], | |
168 key[1], len(self.map_dict[key]), key[2], self.max[key[0]], | |
169 self.mean[key], self.median[key], self.coverage[key]] | |
170 line = [str(i) for i in line] | |
171 out.write('\t'.join(line) + '\n') | |
172 | |
173 | |
174 def main(inputs, samples, file_out, size_file_out=''): | |
175 F = open(file_out, 'w') | |
176 header = ["Dataset", "Chromosome", "Chrom_length", "Coordinate", | |
177 "Nbr_reads", "Polarity", "Max", "Mean", "Median", "Coverage"] | |
178 F.write('\t'.join(header) + '\n') | |
179 if size_file_out: | |
180 Fs = open(size_file_out, 'w') | |
181 header = ["Dataset", "Chromosome", "Polarity", "Size", "Nbr_reads"] | |
182 Fs.write('\t'.join(header) + '\n') | |
183 for file, sample in zip(inputs, samples): | |
184 mapobj = Map(file, sample, computeSize=True) | |
185 mapobj.write_table(F) | |
186 mapobj.write_size_table(Fs) | |
187 Fs.close() | |
188 else: | |
189 for file, sample in zip(inputs, samples): | |
190 mapobj = Map(file, sample, computeSize=False) | |
191 mapobj.write_table(F) | |
192 F.close() | |
193 | |
194 | |
195 if __name__ == "__main__": | |
196 args = Parser() | |
197 # if identical sample names | |
198 if len(set(args.sample_name)) != len(args.sample_name): | |
199 args.sample_name = [name + '_' + str(i) for | |
200 i, name in enumerate(args.sample_name)] | |
201 main(args.input, args.sample_name, args.output, args.sizes) |