Mercurial > repos > xuebing > sharplabtool
comparison tools/filters/join.py @ 0:9071e359b9a3
Uploaded
author | xuebing |
---|---|
date | Fri, 09 Mar 2012 19:37:19 -0500 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9071e359b9a3 |
---|---|
1 #!/usr/bin/env python | |
2 #Dan Blankenberg | |
3 """ | |
4 Script to Join Two Files on specified columns. | |
5 | |
6 Takes two tab delimited files, two column numbers (base 1) and outputs a new tab delimited file with lines joined by tabs. | |
7 User can also opt to have non-joining rows of file1 echoed. | |
8 | |
9 """ | |
10 | |
11 import optparse, os, sys, tempfile, struct | |
12 import psyco_full | |
13 | |
# Optional Galaxy / simplejson support.
# On success simplejson is the imported module and simplejson_exception is
# None; on any failure simplejson is None and simplejson_exception holds the
# error so that main() can report it when a fill-options file is requested.
# NOTE: if the failure happens before the galaxy imports complete, Bunch and
# stringify_dictionary_keys are left undefined.
try:
    simplejson_exception = None
    from galaxy import eggs
    from galaxy.util.bunch import Bunch
    from galaxy.util import stringify_dictionary_keys
    import pkg_resources
    pkg_resources.require("simplejson")
    import simplejson
except Exception as e:
    simplejson_exception = e
    simplejson = None
25 | |
26 | |
class OffsetList:
    """List of unsigned integer offsets stored in a temporary file.

    Offsets are written back to back as fixed-width values packed with
    struct using the format character self.fmt, so the list is kept on disk
    rather than in memory.
    """
    def __init__( self, filesize = 0, fmt = None ):
        """Create an empty list.

        filesize -- size in bytes of the file the offsets refer to; when
                    given (and fmt is not), the narrowest of 'I' / 'Q' able
                    to hold that value is used.
        fmt      -- explicit struct format character; takes precedence.
        """
        self.file = tempfile.NamedTemporaryFile( 'w+b' )
        if fmt:
            self.fmt = fmt
        elif filesize and filesize <= ( 1 << ( 8 * struct.calcsize( 'I' ) ) ) - 1:
            #every offset into the file fits into an unsigned int
            self.fmt = 'I'
        else:
            #unknown or large file size: use unsigned long long
            self.fmt = 'Q'
        self.fmt_size = struct.calcsize( self.fmt )
    @property
    def size( self ):
        """Number of offsets currently stored."""
        #floor division keeps the count an integer
        return self.file_size // self.fmt_size
    @property
    def file_size( self ):
        """Size in bytes of the backing temporary file."""
        #flush first so that pending writes are visible to os.stat
        self.file.flush()
        return os.stat( self.file.name ).st_size
    def add_offset( self, offset ):
        """Append a single offset, or each offset of a list, to the end."""
        if not isinstance( offset, list ):
            offset = [offset]
        self.file.seek( self.file_size )
        for off in offset:
            self.file.write( struct.pack( self.fmt, off ) )
    def get_offsets( self, start = 0 ):
        """Yield the stored offsets in order, beginning at index start."""
        self.file.seek( start * self.fmt_size )
        while True:
            packed = self.file.read( self.fmt_size )
            if not packed:
                break
            yield struct.unpack( self.fmt, packed )[0]
    def get_offset_by_index( self, index ):
        """Return the offset stored at position index."""
        self.file.seek( index * self.fmt_size )
        return struct.unpack( self.fmt, self.file.read( self.fmt_size ) )[0]
    def set_offset_at_index( self, index, offset ):
        """Insert offset (or a list of offsets) in front of position index.

        Entries already stored at index and beyond are kept and end up after
        the inserted values. When index is not below the current size the
        values are simply appended.
        """
        if not isinstance( offset, list ):
            offset = [offset]
        if index >= self.size:
            self.add_offset( offset )
            return
        #rebuild the list in a fresh temporary file: head, new values, tail
        rebuilt = tempfile.NamedTemporaryFile( 'w+b' )
        self.file.seek( 0 )
        rebuilt.write( self.file.read( index * self.fmt_size ) )
        for off in offset:
            rebuilt.write( struct.pack( self.fmt, off ) )
        rebuilt.write( self.file.read() )
        replaced = self.file
        self.file = rebuilt
        #closing the replaced temporary file releases it immediately
        replaced.close()
73 | |
class SortedOffsets( OffsetList ):
    """OffsetList of line offsets into indexed_filename.

    merge_with_dict() keeps the offsets ordered by the identifier (the value
    of the configured column) of the line each offset points to.
    """
    def __init__( self, indexed_filename, column, split = None ):
        """
        indexed_filename -- file whose lines the offsets point into
        column           -- zero-based index of the identifier column
        split            -- field separator handed to str.split
        """
        OffsetList.__init__( self, os.stat( indexed_filename ).st_size )
        self.indexed_filename = indexed_filename
        self.indexed_file = open( indexed_filename, 'rb' )
        self.column = column
        self.split = split
        self.last_identifier = None
        self.last_identifier_merged = None
        self.last_offset_merged = 0
    def merge_with_dict( self, new_offset_dict ):
        """Merge a dict of identifier -> list of offsets into this list.

        The stored offsets and the sorted dict keys are walked in parallel
        and written, in identifier order, to a new OffsetList whose file
        then replaces the current one.
        """
        if not new_offset_dict:
            return #no items to merge in
        #an iterator over the sorted keys avoids repeated list.pop( 0 ) calls
        pending_identifiers = iter( sorted( new_offset_dict ) )
        identifier2 = next( pending_identifiers )

        result_offsets = OffsetList( fmt = self.fmt )
        offsets1 = enumerate( self.get_offsets() )
        try:
            index1, offset1 = next( offsets1 )
            identifier1 = self.get_identifier_by_offset( offset1 )
        except StopIteration:
            offset1 = None
            identifier1 = None
            index1 = 0

        while identifier1 is not None or identifier2 is not None:
            if identifier1 is None or ( identifier2 is not None and identifier2 < identifier1 ):
                #the new identifier sorts first (or nothing stored remains)
                result_offsets.add_offset( new_offset_dict[identifier2] )
                identifier2 = next( pending_identifiers, None )
            elif identifier2 is None:
                #nothing new remains: copy the rest of the stored offsets in bulk
                result_offsets.file.seek( result_offsets.file_size )
                self.file.seek( index1 * self.fmt_size )
                result_offsets.file.write( self.file.read() )
                identifier1 = None
                offset1 = None
            else:
                #the stored identifier sorts first (or is equal)
                result_offsets.add_offset( offset1 )
                try:
                    index1, offset1 = next( offsets1 )
                    identifier1 = self.get_identifier_by_offset( offset1 )
                except StopIteration:
                    offset1 = None
                    identifier1 = None
                    index1 += 1
        #self is now the merged results; release the superseded temporary file
        superseded = self.file
        self.file = result_offsets.file
        superseded.close()
    #methods to help link offsets to lines, ids, etc
    def get_identifier_by_line( self, line ):
        """Return the identifier column of line, or None if it cannot be read."""
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def get_line_by_offset( self, offset ):
        """Return the line of the indexed file that starts at offset."""
        self.indexed_file.seek( offset )
        return self.indexed_file.readline()
    def get_identifier_by_offset( self, offset ):
        """Return the identifier of the line that starts at offset."""
        return self.get_identifier_by_line( self.get_line_by_offset( offset ) )
137 | |
138 #indexed set of offsets, index is built on demand | |
class OffsetIndex:
    """Offsets of a file's lines, grouped by the first character of each
    line's identifier and held in one SortedOffsets per character.

    A lookup index mapping identifier prefixes (index_depth characters) to
    the position of their first offset is built on demand and dropped
    whenever new offsets are merged in.
    """
    def __init__( self, filename, column, split = None, index_depth = 3 ):
        """
        filename    -- file being indexed
        column      -- zero-based index of the identifier column
        split       -- field separator handed to str.split
        index_depth -- number of leading identifier characters used as the
                       lookup key
        """
        self.filename = filename
        self.file = open( filename, 'rb' )
        self.column = column
        self.split = split
        self._offsets = {}
        self._index = None
        self.index_depth = index_depth
    def _build_index( self ):
        """Record, per first character, where each identifier prefix first occurs."""
        self._index = {}
        for start_char, sorted_offsets in self._offsets.items():
            first_positions = {}
            for i, offset in enumerate( sorted_offsets.get_offsets() ):
                prefix = sorted_offsets.get_identifier_by_offset( offset )[0:self.index_depth]
                if prefix not in first_positions:
                    first_positions[prefix] = i
            self._index[start_char] = first_positions
    def get_lines_by_identifier( self, identifier ):
        """Yield every indexed line whose identifier equals identifier."""
        if not identifier:
            return
        #if index doesn't exist, build it
        if self._index is None:
            self._build_index()

        first_char = identifier[0]
        prefix = identifier[0:self.index_depth]
        #identifier cannot exist
        if first_char not in self._index or prefix not in self._index[first_char]:
            return
        #identifier might exist, search for it
        sorted_offsets = self._offsets[first_char]
        offset_index = self._index[first_char][prefix]
        #size flushes and stats the backing file, so read it once
        offset_count = sorted_offsets.size
        while offset_index < offset_count:
            offset = sorted_offsets.get_offset_by_index( offset_index )
            identifier2 = sorted_offsets.get_identifier_by_offset( offset )
            if not identifier2 or identifier2 > identifier:
                #offsets are sorted by identifier: no further match is possible
                return
            if identifier2 == identifier:
                yield sorted_offsets.get_line_by_offset( offset )
            offset_index += 1
    def get_offsets( self ):
        """Yield all offsets, grouped by first character in sorted order."""
        for key in sorted( self._offsets ):
            for offset in self._offsets[key].get_offsets():
                yield offset
    def get_line_by_offset( self, offset ):
        """Return the line of the indexed file that starts at offset."""
        self.file.seek( offset )
        return self.file.readline()
    def get_identifiers_offsets( self ):
        """Yield ( identifier, offset ) pairs in the order of get_offsets()."""
        for key in sorted( self._offsets ):
            for offset in self._offsets[key].get_offsets():
                yield self._offsets[key].get_identifier_by_offset( offset ), offset
    def get_identifier_by_line( self, line ):
        """Return the identifier column of line, or None if it cannot be read."""
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def merge_with_dict( self, d ):
        """Merge a dict of identifier -> list of offsets into the index.

        Entries are grouped by the first character of the identifier and
        each group is merged into the SortedOffsets for that character,
        which is created when missing.
        """
        if not d:
            return #no data to merge
        self._index = None #lookup index is stale once offsets change
        grouped = {}
        for identifier, offsets in d.items():
            grouped.setdefault( identifier[0], {} )[identifier] = offsets
        for first_char in sorted( grouped ):
            if first_char not in self._offsets:
                self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
            self._offsets[first_char].merge_with_dict( grouped[first_char] )
220 | |
class BufferedIndex:
    """Index of a file's lines by identifier, built in one pass.

    Offsets are collected in an in-memory dict and merged into an
    OffsetIndex each time the count of identified lines reaches a multiple
    of buffer; whatever was collected after the last merge stays in
    self.buffered_offsets.
    """
    def __init__( self, filename, column, split = None, buffer = 1000000, index_depth = 3 ):
        """
        filename    -- file to index
        column      -- zero-based index of the identifier column
        split       -- field separator handed to str.split
        buffer      -- number of identified lines between merges; a false
                       value disables merging
        index_depth -- passed through to OffsetIndex
        """
        self.index = OffsetIndex( filename, column, split, index_depth )
        self.buffered_offsets = {}
        f = open( filename, 'rb' )
        try:
            identified_offset_count = 1
            while True:
                #remember where the line starts before reading it
                offset = f.tell()
                line = f.readline()
                if not line:
                    break #EOF
                identifier = self.index.get_identifier_by_line( line )
                if not identifier:
                    continue
                #flush buffered offsets, if buffer size reached
                if buffer and identified_offset_count % buffer == 0:
                    self.index.merge_with_dict( self.buffered_offsets )
                    self.buffered_offsets = {}
                self.buffered_offsets.setdefault( identifier, [] ).append( offset )
                identified_offset_count += 1
        finally:
            #close the scan handle even when indexing fails
            f.close()

    def get_lines_by_identifier( self, identifier ):
        """Yield matching lines from the merged index, then from the buffer."""
        for line in self.index.get_lines_by_identifier( identifier ):
            yield line
        for offset in self.buffered_offsets.get( identifier, () ):
            yield self.index.get_line_by_offset( offset )
250 | |
251 | |
def fill_empty_columns( line, split, fill_values ):
    """Replace empty fields of line with the fill value for their position.

    line        -- text to process, without its line ending
    split       -- field separator handed to str.split; None splits on
                   whitespace and the result is then joined with a tab
    fill_values -- list of replacement strings, indexed by column; a false
                   value returns line unchanged

    Fields beyond the end of fill_values are left as they are. When line has
    fewer fields than fill_values, the remaining fill values are appended.
    Returns the fields joined back into one string.
    """
    if not fill_values:
        return line
    filled_columns = []
    for i, field in enumerate( line.split( split ) ):
        if not field and i < len( fill_values ):
            field = fill_values[i]
        filled_columns.append( field )
    #pad short lines with the fill values of the missing columns
    filled_columns.extend( fill_values[ len( filled_columns ) : ] )
    #None is a valid str.split argument but cannot join fields; use a tab
    delimiter = split
    if delimiter is None:
        delimiter = "\t"
    return delimiter.join( filled_columns )
264 | |
265 | |
def join_files( filename1, column1, filename2, column2, out_filename, split = None, buffer = 1000000, keep_unmatched = False, keep_partial = False, index_depth = 3, fill_options = None ):
    """Join the lines of two delimited files on one column of each.

    filename1, column1 -- first input and its zero-based identifier column
    filename2, column2 -- second input (indexed) and its identifier column
    out_filename       -- file the joined lines are written to
    split              -- field separator; None splits on whitespace and
                          output fields are then separated by a tab
    buffer, index_depth -- passed through to BufferedIndex
    keep_unmatched     -- also write lines of the first input that have an
                          identifier but no match in the second input
    keep_partial       -- also write lines of the first input that have no
                          identifier
    fill_options       -- object with fill_unjoined_only, file1_columns and
                          file2_columns attributes; a default is created
                          when None
    """
    #return identifier based upon line
    def get_identifier_by_line( line, column, split = None ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( split )
            if column < len( fields ):
                return fields[column]
        return None
    #write a line of the first input that was not joined with anything
    def write_unjoined( stripped_line ):
        out.write( fill_empty_columns( stripped_line, split, fill_options.file1_columns ) )
        if fill_options and fill_options.file2_columns:
            out.write( "%s%s" % ( delimiter, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
        out.write( "\n" )
    if fill_options is None:
        fill_options = Bunch( fill_unjoined_only = True, file1_columns = None, file2_columns = None )
    #None is usable for splitting only; never write it out as the separator
    delimiter = split
    if delimiter is None:
        delimiter = "\t"
    out = open( out_filename, 'w+b' )
    try:
        index = BufferedIndex( filename2, column2, split, buffer, index_depth )
        input1 = open( filename1, 'rb' )
        try:
            for line1 in input1:
                stripped1 = line1.rstrip( '\r\n' )
                identifier = get_identifier_by_line( line1, column1, split )
                if identifier:
                    written = False
                    for line2 in index.get_lines_by_identifier( identifier ):
                        stripped2 = line2.rstrip( '\r\n' )
                        if not fill_options.fill_unjoined_only:
                            out.write( "%s%s%s\n" % ( fill_empty_columns( stripped1, split, fill_options.file1_columns ), delimiter, fill_empty_columns( stripped2, split, fill_options.file2_columns ) ) )
                        else:
                            out.write( "%s%s%s\n" % ( stripped1, delimiter, stripped2 ) )
                        written = True
                    if not written and keep_unmatched:
                        write_unjoined( stripped1 )
                elif keep_partial:
                    write_unjoined( stripped1 )
        finally:
            input1.close()
    finally:
        #close the output even when indexing or joining fails
        out.close()
301 | |
def main():
    """Command line entry point.

    Positional arguments: file1 file2 column1 column2 out_file, with column
    numbers starting at 1. Fill options are read from the JSON file given
    with -f; if that fails a warning is printed and defaults are used.
    Exits with status 1 when the positional arguments cannot be parsed.
    """
    parser = optparse.OptionParser()
    parser.add_option(
        '-b','--buffer',
        dest='buffer',
        type='int',default=1000000,
        help='Number of lines to buffer at a time. Default: 1,000,000 lines. A buffer of 0 will attempt to use memory only.'
    )
    parser.add_option(
        '-d','--index_depth',
        dest='index_depth',
        type='int',default=3,
        help='Depth to use on filebased offset indexing. Default: 3.'
    )
    parser.add_option(
        '-p','--keep_partial',
        action='store_true',
        dest='keep_partial',
        default=False,
        help='Keep rows in first input which are missing identifiers.')
    parser.add_option(
        '-u','--keep_unmatched',
        action='store_true',
        dest='keep_unmatched',
        default=False,
        help='Keep rows in first input which are not joined with the second input.')
    parser.add_option(
        '-f','--fill_options_file',
        dest='fill_options_file',
        type='str',default=None,
        help='Fill empty columns with a values from a JSONified file.')

    options, args = parser.parse_args()

    fill_options = None
    if options.fill_options_file is not None:
        try:
            if simplejson is None:
                raise simplejson_exception
            fill_file = open( options.fill_options_file )
            try:
                fill_options = Bunch( **stringify_dictionary_keys( simplejson.load( fill_file ) ) )
            finally:
                fill_file.close()
        except Exception as e:
            #best effort: carry on with default fill options
            print( "Warning: Ignoring fill options due to simplejson error (%s)." % e )
    if fill_options is None:
        fill_options = Bunch()
    if 'fill_unjoined_only' not in fill_options:
        fill_options.fill_unjoined_only = True
    if 'file1_columns' not in fill_options:
        fill_options.file1_columns = None
    if 'file2_columns' not in fill_options:
        fill_options.file2_columns = None

    try:
        filename1 = args[0]
        filename2 = args[1]
        #column numbers are given base 1 on the command line
        column1 = int( args[2] ) - 1
        column2 = int( args[3] ) - 1
        out_filename = args[4]
    except ( IndexError, ValueError ):
        #too few arguments or a non-numeric column
        sys.stderr.write( "Error parsing command line.\n" )
        sys.exit( 1 )

    #Character for splitting fields and joining lines
    split = "\t"

    return join_files( filename1, column1, filename2, column2, out_filename, split, options.buffer, options.keep_unmatched, options.keep_partial, options.index_depth, fill_options = fill_options )

if __name__ == "__main__": main()