comparison GCMS/library_lookup.py @ 6:4393f982d18f

reorganized sources
author pieter.lukasse@wur.nl
date Thu, 19 Mar 2015 12:22:23 +0100
parents
children f70b2c169e3a
comparison
equal deleted inserted replaced
5:41f122255d14 6:4393f982d18f
1 '''
2 Logic for searching a Retention Index database file given output from NIST
3 '''
4 import match_library
5 import re
6 import sys
7 import csv
8
9 __author__ = "Marcel Kempenaar"
10 __contact__ = "brs@nbic.nl"
11 __copyright__ = "Copyright, 2012, Netherlands Bioinformatics Centre"
12 __license__ = "MIT"
13
14 def create_lookup_table(library_file, column_type_name, statphase):
15 '''
16 Creates a dictionary holding the contents of the library to be searched
17 @param library_file: library to read
18 @param column_type_name: the columns type name
19 @param statphase: the columns stationary phase
20 '''
21 (data, header) = match_library.read_library(library_file)
22 # Test for presence of required columns
23 if ('columntype' not in header or
24 'columnphasetype' not in header or
25 'cas' not in header):
26 raise IOError('Missing columns in ', library_file)
27
28 column_type_column = header.index("columntype")
29 statphase_column = header.index("columnphasetype")
30 cas_column = header.index("cas")
31
32 filtered_library = [line for line in data if line[column_type_column] == column_type_name
33 and line[statphase_column] == statphase]
34 lookup_dict = {}
35 for element in filtered_library:
36 # Here the cas_number is set to the numeric part of the cas_column value, so if the
37 # cas_column value is 'C1433' then cas_number will be '1433'
38 cas_number = str(re.findall(r'\d+', (element[cas_column]).strip())[0])
39 try:
40 lookup_dict[cas_number].append(element)
41 except KeyError:
42 lookup_dict[cas_number] = [element]
43 return lookup_dict
44
45
46 def _preferred(hits, pref, ctype, polar, model, method):
47 '''
48 Returns all entries in the lookup_dict that have the same column name, type and polarity
49 as given by the user, uses regression if selected given the model and method to use. The
50 regression is applied on the column with the best R-squared value in the model
51 @param hits: all entries in the lookup_dict for the given CAS number
52 @param pref: preferred GC-column, can be one or more names
53 @param ctype: column type (capillary etc.)
54 @param polar: polarity (polar / non-polar etc.)
55 @param model: data loaded from file containing regression models
56 @param method: supported regression method (i.e. poly(nomial) or linear)
57 '''
58 match = []
59 for column in pref:
60 for hit in hits:
61 if hit[4] == ctype and hit[5] == polar and hit[6] == column:
62 # Create copy of found hit since it will be altered downstream
63 match.extend(hit)
64 return match, False
65
66 # No hit found for current CAS number, return if not performing regression
67 if not model:
68 return False, False
69
70 # Perform regression
71 for column in pref:
72 if column not in model:
73 break
74 # Order regression candidates by R-squared value (last element)
75 order = sorted(model[column].items(), key=lambda col: col[1][-1])
76 # Create list of regression candidate column names
77 regress_columns = list(reversed([column for (column, _) in order]))
78 # Names of available columns
79 available = [hit[6] for hit in hits]
80
81 # TODO: combine Rsquared and number of datapoints to get the best regression match
82 '''
83 # Iterate regression columns (in order) and retrieve their models
84 models = {}
85 for col in regress_columns:
86 if col in available:
87 hit = list(hits[available.index(col)])
88 if hit[4] == ctype:
89 # models contains all model data including residuals [-2] and rsquared [-1]
90 models[pref[0]] = model[pref[0]][hit[6]]
91 # Get the combined maximum for residuals and rsquared
92 best_match = models[]
93 # Apply regression
94 if method == 'poly':
95 regressed = _apply_poly_regression(best_match, hit[6], float(hit[3]), model)
96 if regressed:
97 hit[3] = regressed
98 else:
99 return False, False
100 else:
101 hit[3] = _apply_linear_regression(best_match, hit[6], float(hit[3]), model)
102 match.extend(hit)
103 return match, hit[6]
104 '''
105
106 for col in regress_columns:
107 if col in available:
108 hit = list(hits[available.index(col)])
109 if hit[4] == ctype:
110 # Perform regression using a column for which regression is possible
111 if method == 'poly':
112 # Polynomial is only possible within a set border, if the RI falls outside
113 # of this border, skip this lookup
114 regressed = _apply_poly_regression(pref[0], hit[6], float(hit[3]), model)
115 if regressed:
116 hit[3] = regressed
117 else:
118 return False, False
119 else:
120 hit[3] = _apply_linear_regression(pref[0], hit[6], float(hit[3]), model)
121 match.extend(hit)
122 return match, hit[6]
123
124 return False, False
125
126
127
128 def default_hit(row, cas_nr, compound_id):
129 '''
130 This method will return a "default"/empty hit for cases where the
131 method _preferred() returns False (i.e. a RI could not be found
132 for the given cas nr, also not via regression.
133 '''
134 return [
135 #'CAS',
136 'C' + cas_nr,
137 #'NAME',
138 '',
139 #'FORMULA',
140 '',
141 #'RI',
142 '0.0',
143 #'Column.type',
144 '',
145 #'Column.phase.type',
146 '',
147 #'Column.name',
148 '',
149 #'phase.coding',
150 ' ',
151 #'CAS_column.Name',
152 '',
153 #'Centrotype', -> NOTE THAT compound_id is not ALWAYS centrotype...depends on MsClust algorithm used...for now only one MsClust algorithm is used so it is not an issue, but this should be updated/corrected once that changes
154 compound_id,
155 #'Regression.Column.Name',
156 '',
157 #'min',
158 '',
159 #'max',
160 '',
161 #'nr.duplicates',
162 '']
163
164
165 def format_result(lookup_dict, nist_tabular_filename, pref, ctype, polar, model, method):
166 '''
167 Looks up the compounds in the library lookup table and formats the results
168 @param lookup_dict: dictionary containing the library to be searched
169 @param nist_tabular_filename: NIST output file to be matched
170 @param pref: (list of) column-name(s) to look for
171 @param ctype: column type of interest
172 @param polar: polarity of the used column
173 @param model: data loaded from file containing regression models
174 @param method: supported regression method (i.e. poly(nomial) or linear)
175 '''
176 (nist_tabular_list, header_clean) = match_library.read_library(nist_tabular_filename)
177 # Retrieve indices of the CAS and compound_id columns (exit if not present)
178 try:
179 casi = header_clean.index("cas")
180 idi = header_clean.index("id")
181 except:
182 raise IOError("'CAS' or 'compound_id' not found in header of library file")
183
184 data = []
185 for row in nist_tabular_list:
186 casf = str(row[casi].replace('-', '').strip())
187 compound_id = str(row[idi].split('-')[0])
188 if casf in lookup_dict:
189 found_hit, regress = _preferred(lookup_dict[casf], pref, ctype, polar, model, method)
190 if found_hit:
191 # Keep cas nr as 'C'+ numeric part:
192 found_hit[0] = 'C' + casf
193 # Add compound id
194 found_hit.insert(9, compound_id)
195 # Add information on regression process
196 found_hit.insert(10, regress if regress else 'None')
197 # Replace column index references with actual number of duplicates
198 dups = len(found_hit[-1].split(','))
199 if dups > 1:
200 found_hit[-1] = str(dups + 1)
201 else:
202 found_hit[-1] = '0'
203 data.append(found_hit)
204 found_hit = ''
205 else:
206 data.append(default_hit(row, casf, compound_id))
207 else:
208 data.append(default_hit(row, casf, compound_id))
209
210 casf = ''
211 compound_id = ''
212 found_hit = []
213 dups = []
214 return data
215
216
217 def _save_data(content, outfile):
218 '''
219 Write to output file
220 @param content: content to write
221 @param outfile: file to write to
222 '''
223 # header
224 header = ['CAS',
225 'NAME',
226 'FORMULA',
227 'RI',
228 'Column.type',
229 'Column.phase.type',
230 'Column.name',
231 'phase.coding',
232 'CAS_column.Name',
233 'Centrotype',
234 'Regression.Column.Name',
235 'min',
236 'max',
237 'nr.duplicates']
238 output_handle = csv.writer(open(outfile, 'wb'), delimiter="\t")
239 output_handle.writerow(header)
240 for entry in content:
241 output_handle.writerow(entry)
242
243
244 def _read_model(model_file):
245 '''
246 Creates an easy to search dictionary for getting the regression parameters
247 for each valid combination of GC-columns
248 @param model_file: filename containing the regression models
249 '''
250 regress = list(csv.reader(open(model_file, 'rU'), delimiter='\t'))
251 if len(regress.pop(0)) > 9:
252 method = 'poly'
253 else:
254 method = 'linear'
255
256 model = {}
257 # Create new dictionary for each GC-column
258 for line in regress:
259 model[line[0]] = {}
260
261 # Add data
262 for line in regress:
263 if method == 'poly':
264 model[line[0]][line[1]] = [float(col) for col in line[2:11]]
265 else: # linear
266 model[line[0]][line[1]] = [float(col) for col in line[2:9]]
267
268 return model, method
269
270
271 def _apply_poly_regression(column1, column2, retention_index, model):
272 '''
273 Calculates a new retention index (RI) value using a given 3rd-degree polynomial
274 model based on data from GC columns 1 and 2
275 @param column1: name of the selected GC-column
276 @param column2: name of the GC-column to use for regression
277 @param retention_index: RI to convert
278 @param model: dictionary containing model information for all GC-columns
279 '''
280 coeff = model[column1][column2]
281 # If the retention index to convert is within range of the data the model is based on, perform regression
282 if coeff[4] < retention_index < coeff[5]:
283 return (coeff[3] * (retention_index ** 3) + coeff[2] * (retention_index ** 2) +
284 (retention_index * coeff[1]) + coeff[0])
285 else:
286 return False
287
288
289 def _apply_linear_regression(column1, column2, retention_index, model):
290 '''
291 Calculates a new retention index (RI) value using a given linear model based on data
292 from GC columns 1 and 2
293 @param column1: name of the selected GC-column
294 @param column2: name of the GC-column to use for regression
295 @param retention_index: RI to convert
296 @param model: dictionary containing model information for all GC-columns
297 '''
298 # TODO: No use of limits
299 coeff = model[column1][column2]
300 return coeff[1] * retention_index + coeff[0]
301
302
303 def main():
304 '''
305 Library Lookup main function
306 '''
307 library_file = sys.argv[1]
308 nist_tabular_filename = sys.argv[2]
309 ctype = sys.argv[3]
310 polar = sys.argv[4]
311 outfile = sys.argv[5]
312 pref = sys.argv[6:-1]
313 regress = sys.argv[-1]
314
315 if regress != 'False':
316 model, method = _read_model(regress)
317 else:
318 model, method = False, None
319
320 lookup_dict = create_lookup_table(library_file, ctype, polar)
321 data = format_result(lookup_dict, nist_tabular_filename, pref, ctype, polar, model, method)
322
323 _save_data(data, outfile)
324
325
326 if __name__ == "__main__":
327 main()