diff tools/filters/join.py @ 0:9071e359b9a3
Uploaded
author: xuebing
date: Fri, 09 Mar 2012 19:37:19 -0500
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tools/filters/join.py	Fri Mar 09 19:37:19 2012 -0500
@@ -0,0 +1,370 @@
#!/usr/bin/env python
#Dan Blankenberg
"""
Script to join two files on specified columns.

Takes two tab-delimited files and two column numbers (base 1), and outputs a new
tab-delimited file with the matching lines joined by tabs.
The user can also opt to have non-joining rows of file1 echoed.
"""
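#Usage sketch (the file names are hypothetical; the positional columns are
#base 1, so this joins on column 2 of inputA.tab and column 1 of inputB.tab):
#    python join.py inputA.tab inputB.tab 2 1 joined.tab \
#        --keep_unmatched --fill_options_file=fill_options.json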
import optparse, os, sys, tempfile, struct
import psyco_full #Galaxy helper module that enables the psyco JIT when available

try:
    simplejson_exception = None
    from galaxy import eggs
    from galaxy.util.bunch import Bunch
    from galaxy.util import stringify_dictionary_keys
    import pkg_resources
    pkg_resources.require("simplejson")
    import simplejson
except Exception, e:
    simplejson_exception = e
    simplejson = None


#disk-backed list of fixed-width file offsets, packed into a temporary file
class OffsetList:
    def __init__( self, filesize = 0, fmt = None ):
        self.file = tempfile.NamedTemporaryFile( 'w+b' )
        if fmt:
            self.fmt = fmt
        elif filesize and filesize <= sys.maxint * 2:
            self.fmt = 'I' #4-byte unsigned offsets are enough for this file size
        else:
            self.fmt = 'Q' #8-byte unsigned offsets
        self.fmt_size = struct.calcsize( self.fmt )
    @property
    def size( self ):
        self.file.flush()
        return self.file_size / self.fmt_size
    @property
    def file_size( self ):
        self.file.flush()
        return os.stat( self.file.name ).st_size
    def add_offset( self, offset ):
        if not isinstance( offset, list ):
            offset = [offset]
        self.file.seek( self.file_size )
        for off in offset:
            self.file.write( struct.pack( self.fmt, off ) )
    def get_offsets( self, start = 0 ):
        self.file.seek( start * self.fmt_size )
        while True:
            packed = self.file.read( self.fmt_size )
            if not packed: break
            yield struct.unpack( self.fmt, packed )[0]
    def get_offset_by_index( self, index ):
        self.file.seek( index * self.fmt_size )
        return struct.unpack( self.fmt, self.file.read( self.fmt_size ) )[0]
    def set_offset_at_index( self, index, offset ):
        if not isinstance( offset, list ):
            offset = [offset]
        if index >= self.size:
            self.add_offset( offset )
        else:
            #rewrite the backing file with the new offsets spliced in at index
            temp_file = tempfile.NamedTemporaryFile( 'w+b' )
            self.file.seek( 0 )
            temp_file.write( self.file.read( index * self.fmt_size ) )
            for off in offset:
                temp_file.write( struct.pack( self.fmt, off ) )
            temp_file.write( self.file.read() )
            self.file = temp_file

#offsets into a single indexed file, kept sorted by the identifier column
class SortedOffsets( OffsetList ):
    def __init__( self, indexed_filename, column, split = None ):
        OffsetList.__init__( self, os.stat( indexed_filename ).st_size )
        self.indexed_filename = indexed_filename
        self.indexed_file = open( indexed_filename, 'rb' )
        self.column = column
        self.split = split
        self.last_identifier = None
        self.last_identifier_merged = None
        self.last_offset_merged = 0
    def merge_with_dict( self, new_offset_dict ):
        #merge-sort an in-memory { identifier: offsets } dict into the on-disk sorted offsets
        if not new_offset_dict: return #no items to merge in
        keys = new_offset_dict.keys()
        keys.sort()
        identifier2 = keys.pop( 0 )

        result_offsets = OffsetList( fmt = self.fmt )
        offsets1 = enumerate( self.get_offsets() )
        try:
            index1, offset1 = offsets1.next()
            identifier1 = self.get_identifier_by_offset( offset1 )
        except StopIteration:
            offset1 = None
            identifier1 = None
            index1 = 0

        while True:
            if identifier1 is None and identifier2 is None:
                self.file = result_offsets.file #self is now merged results
                return
            elif identifier1 is None or ( identifier2 and identifier2 < identifier1 ):
                result_offsets.add_offset( new_offset_dict[identifier2] )
                if keys:
                    identifier2 = keys.pop( 0 )
                else:
                    identifier2 = None
            elif identifier2 is None:
                #dict is exhausted; bulk-copy the remaining on-disk offsets
                result_offsets.file.seek( result_offsets.file_size )
                self.file.seek( index1 * self.fmt_size )
                result_offsets.file.write( self.file.read() )
                identifier1 = None
                offset1 = None
            else:
                result_offsets.add_offset( offset1 )
                try:
                    index1, offset1 = offsets1.next()
                    identifier1 = self.get_identifier_by_offset( offset1 )
                except StopIteration:
                    offset1 = None
                    identifier1 = None
                    index1 += 1
    #methods to help link offsets to lines, ids, etc.
    def get_identifier_by_line( self, line ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def get_line_by_offset( self, offset ):
        self.indexed_file.seek( offset )
        return self.indexed_file.readline()
    def get_identifier_by_offset( self, offset ):
        return self.get_identifier_by_line( self.get_line_by_offset( offset ) )

#indexed set of offsets, index is built on demand
class OffsetIndex:
    def __init__( self, filename, column, split = None, index_depth = 3 ):
        self.filename = filename
        self.file = open( filename, 'rb' )
        self.column = column
        self.split = split
        self._offsets = {} #SortedOffsets, keyed by the identifier's first character
        self._index = None
        self.index_depth = index_depth
    def _build_index( self ):
        #map each identifier prefix (up to index_depth chars) to its first position
        self._index = {}
        for start_char, sorted_offsets in self._offsets.items():
            self._index[start_char] = {}
            for i, offset in enumerate( sorted_offsets.get_offsets() ):
                identifier = sorted_offsets.get_identifier_by_offset( offset )
                if identifier[0:self.index_depth] not in self._index[start_char]:
                    self._index[start_char][identifier[0:self.index_depth]] = i
    def get_lines_by_identifier( self, identifier ):
        if not identifier: return
        #if index doesn't exist, build it
        if self._index is None: self._build_index()

        #identifier cannot exist
        if identifier[0] not in self._index or identifier[0:self.index_depth] not in self._index[identifier[0]]:
            return
        #identifier might exist, search for it
        offset_index = self._index[identifier[0]][identifier[0:self.index_depth]]
        while True:
            if offset_index >= self._offsets[identifier[0]].size:
                return
            offset = self._offsets[identifier[0]].get_offset_by_index( offset_index )
            identifier2 = self._offsets[identifier[0]].get_identifier_by_offset( offset )
            if not identifier2 or identifier2 > identifier:
                return
            if identifier2 == identifier:
                yield self._offsets[identifier[0]].get_line_by_offset( offset )
            offset_index += 1
    def get_offsets( self ):
        keys = self._offsets.keys()
        keys.sort()
        for key in keys:
            for offset in self._offsets[key].get_offsets():
                yield offset
    def get_line_by_offset( self, offset ):
        self.file.seek( offset )
        return self.file.readline()
    def get_identifiers_offsets( self ):
        keys = self._offsets.keys()
        keys.sort()
        for key in keys:
            for offset in self._offsets[key].get_offsets():
                yield self._offsets[key].get_identifier_by_offset( offset ), offset
    def get_identifier_by_line( self, line ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( self.split )
            if self.column < len( fields ):
                return fields[self.column]
        return None
    def merge_with_dict( self, d ):
        if not d: return #no data to merge
        self._index = None
        keys = d.keys()
        keys.sort()
        identifier = keys.pop( 0 )
        first_char = identifier[0]
        temp = { identifier: d[identifier] }
        while True:
            if not keys:
                if first_char not in self._offsets:
                    self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
                self._offsets[first_char].merge_with_dict( temp )
                return
            identifier = keys.pop( 0 )
            if identifier[0] == first_char:
                temp[identifier] = d[identifier]
            else:
                if first_char not in self._offsets:
                    self._offsets[first_char] = SortedOffsets( self.filename, self.column, self.split )
                self._offsets[first_char].merge_with_dict( temp )
                temp = { identifier: d[identifier] }
                first_char = identifier[0]

#buffers offsets in memory and merges them into the OffsetIndex in batches
class BufferedIndex:
    def __init__( self, filename, column, split = None, buffer = 1000000, index_depth = 3 ):
        self.index = OffsetIndex( filename, column, split, index_depth )
        self.buffered_offsets = {}
        f = open( filename, 'rb' )
        offset = f.tell()
        identified_offset_count = 1
        while True:
            offset = f.tell()
            line = f.readline()
            if not line: break #EOF
            identifier = self.index.get_identifier_by_line( line )
            if identifier:
                #flush buffered offsets, if buffer size reached
                if buffer and identified_offset_count % buffer == 0:
                    self.index.merge_with_dict( self.buffered_offsets )
                    self.buffered_offsets = {}
                if identifier not in self.buffered_offsets:
                    self.buffered_offsets[identifier] = []
                self.buffered_offsets[identifier].append( offset )
                identified_offset_count += 1
        f.close()

    def get_lines_by_identifier( self, identifier ):
        for line in self.index.get_lines_by_identifier( identifier ):
            yield line
        if identifier in self.buffered_offsets:
            for offset in self.buffered_offsets[identifier]:
                yield self.index.get_line_by_offset( offset )
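#A minimal usage sketch of the index machinery above (file name and
#identifier are hypothetical): index a file's join column once, then stream
#every line that carries a given identifier.
#    index = BufferedIndex( 'inputB.tab', column = 0, split = '\t' )
#    for line in index.get_lines_by_identifier( 'chr1' ):
#        print line.rstrip( '\r\n' ) #each inputB.tab line whose column 1 is 'chr1'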
def fill_empty_columns( line, split, fill_values ):
    #replace empty fields with the corresponding fill value and pad short rows
    if not fill_values:
        return line
    filled_columns = []
    for i, field in enumerate( line.split( split ) ):
        if field or i >= len( fill_values ):
            filled_columns.append( field )
        else:
            filled_columns.append( fill_values[i] )
    if len( fill_values ) > len( filled_columns ):
        filled_columns.extend( fill_values[ len( filled_columns ) : ] )
    return split.join( filled_columns )


def join_files( filename1, column1, filename2, column2, out_filename, split = None, buffer = 1000000, keep_unmatched = False, keep_partial = False, index_depth = 3, fill_options = None ):
    #return identifier based upon line
    def get_identifier_by_line( line, column, split = None ):
        if isinstance( line, str ):
            fields = line.rstrip( '\r\n' ).split( split )
            if column < len( fields ):
                return fields[column]
        return None
    if fill_options is None:
        fill_options = Bunch( fill_unjoined_only = True, file1_columns = None, file2_columns = None )
    out = open( out_filename, 'w+b' )
    index = BufferedIndex( filename2, column2, split, buffer, index_depth )
    for line1 in open( filename1, 'rb' ):
        identifier = get_identifier_by_line( line1, column1, split )
        if identifier:
            written = False
            for line2 in index.get_lines_by_identifier( identifier ):
                if not fill_options.fill_unjoined_only:
                    out.write( "%s%s%s\n" % ( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ), split, fill_empty_columns( line2.rstrip( '\r\n' ), split, fill_options.file2_columns ) ) )
                else:
                    out.write( "%s%s%s\n" % ( line1.rstrip( '\r\n' ), split, line2.rstrip( '\r\n' ) ) )
                written = True
            if not written and keep_unmatched:
                out.write( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ) )
                if fill_options:
                    if fill_options.file2_columns:
                        out.write( "%s%s" % ( split, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
                out.write( "\n" )
        elif keep_partial:
            out.write( fill_empty_columns( line1.rstrip( '\r\n' ), split, fill_options.file1_columns ) )
            if fill_options:
                if fill_options.file2_columns:
                    out.write( "%s%s" % ( split, fill_empty_columns( "", split, fill_options.file2_columns ) ) )
            out.write( "\n" )
    out.close()
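#Example fill-options file for --fill_options_file, as consumed by main()
#below (the values shown are hypothetical). "file1_columns"/"file2_columns"
#list per-column fill values; "fill_unjoined_only" restricts filling to rows
#that found no join partner:
#    { "fill_unjoined_only": true, "file1_columns": null, "file2_columns": [ ".", "." ] }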
def main():
    parser = optparse.OptionParser()
    parser.add_option(
        '-b', '--buffer',
        dest='buffer',
        type='int', default=1000000,
        help='Number of lines to buffer at a time. Default: 1,000,000 lines. A buffer of 0 will attempt to use memory only.'
    )
    parser.add_option(
        '-d', '--index_depth',
        dest='index_depth',
        type='int', default=3,
        help='Depth to use on file-based offset indexing. Default: 3.'
    )
    parser.add_option(
        '-p', '--keep_partial',
        action='store_true',
        dest='keep_partial',
        default=False,
        help='Keep rows in first input which are missing identifiers.' )
    parser.add_option(
        '-u', '--keep_unmatched',
        action='store_true',
        dest='keep_unmatched',
        default=False,
        help='Keep rows in first input which are not joined with the second input.' )
    parser.add_option(
        '-f', '--fill_options_file',
        dest='fill_options_file',
        type='str', default=None,
        help='Fill empty columns with values from a JSONified file.' )

    options, args = parser.parse_args()

    fill_options = None
    if options.fill_options_file is not None:
        try:
            if simplejson is None:
                raise simplejson_exception
            fill_options = Bunch( **stringify_dictionary_keys( simplejson.load( open( options.fill_options_file ) ) ) )
        except Exception, e:
            print "Warning: Ignoring fill options due to simplejson error (%s)." % e
    if fill_options is None:
        fill_options = Bunch()
    if 'fill_unjoined_only' not in fill_options:
        fill_options.fill_unjoined_only = True
    if 'file1_columns' not in fill_options:
        fill_options.file1_columns = None
    if 'file2_columns' not in fill_options:
        fill_options.file2_columns = None

    try:
        filename1 = args[0]
        filename2 = args[1]
        column1 = int( args[2] ) - 1 #convert base-1 command-line columns to base-0
        column2 = int( args[3] ) - 1
        out_filename = args[4]
    except:
        print >> sys.stderr, "Error parsing command line."
        sys.exit()

    #character for splitting fields and joining lines
    split = "\t"

    return join_files( filename1, column1, filename2, column2, out_filename, split, options.buffer, options.keep_unmatched, options.keep_partial, options.index_depth, fill_options = fill_options )

if __name__ == "__main__": main()
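#Worked example of the join semantics (hypothetical data), joining on column 1
#of both files with --keep_unmatched and the fill-options file sketched above:
#    inputA.tab:          inputB.tab:          joined.tab:
#      chr1<TAB>10          chr1<TAB>gene1       chr1<TAB>10<TAB>chr1<TAB>gene1
#      chr2<TAB>20          chr3<TAB>gene2       chr2<TAB>20<TAB>.<TAB>.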