Mercurial > repos > xuebing > sharplabtool
view tools/next_gen_conversion/fastq_gen_conv.py @ 0:9071e359b9a3
Uploaded
author | xuebing |
---|---|
date | Fri, 09 Mar 2012 19:37:19 -0500 |
parents | |
children |
line wrap: on
line source
""" Converts any type of FASTQ file to Sanger type and makes small adjustments if necessary. usage: %prog [options] -i, --input=i: Input FASTQ candidate file -r, --origType=r: Original type -a, --allOrNot=a: Whether or not to check all blocks -b, --blocks=b: Number of blocks to check -o, --output=o: Output file usage: %prog input_file oroutput_file """ import math, sys from galaxy import eggs import pkg_resources; pkg_resources.require( "bx-python" ) from bx.cookbook import doc_optparse def stop_err( msg ): sys.stderr.write( "%s\n" % msg ) sys.exit() def all_bases_valid(seq): """Confirm that the sequence contains only bases""" valid_bases = ['a', 'A', 'c', 'C', 'g', 'G', 't', 'T', 'N'] for base in seq: if base not in valid_bases: return False return True def __main__(): #Parse Command Line options, args = doc_optparse.parse( __doc__ ) orig_type = options.origType if orig_type == 'sanger' and options.allOrNot == 'not': max_blocks = int(options.blocks) else: max_blocks = -1 fin = file(options.input, 'r') fout = file(options.output, 'w') range_min = 1000 range_max = -5 block_num = 0 bad_blocks = 0 base_len = -1 line_count = 0 lines = [] line = fin.readline() while line: if line.strip() and max_blocks >= 0 and block_num > 0 and orig_type == 'sanger' and block_num >= max_blocks: fout.write(line) if line_count % 4 == 0: block_num += 1 line_count += 1 elif line.strip(): # the line that starts a block, with a name if line_count % 4 == 0 and line.startswith('@'): lines.append(line) else: # if we expect a sequence of bases if line_count % 4 == 1 and all_bases_valid(line.strip()): lines.append(line) base_len = len(line.strip()) # if we expect the second name line elif line_count % 4 == 2 and line.startswith('+'): lines.append(line) # if we expect a sequence of qualities and it's the expected length elif line_count % 4 == 3: split_line = line.strip().split() # decimal qualities if len(split_line) == base_len: # convert phred_list = [] for ch in split_line: int_ch = int(ch) if int_ch < range_min: range_min = int_ch if int_ch > range_max: range_max = int_ch if int_ch >= 0 and int_ch <= 93: phred_list.append(chr(int_ch + 33)) # make sure we haven't lost any quality values if len(phred_list) == base_len: # print first three lines for l in lines: fout.write(l) # print converted quality line fout.write(''.join(phred_list)) # reset lines = [] base_len = -1 # abort if so else: bad_blocks += 1 lines = [] base_len = -1 # ascii qualities elif len(split_line[0]) == base_len: qualities = [] # print converted quality line if orig_type == 'illumina': for c in line.strip(): if ord(c) - 64 < range_min: range_min = ord(c) - 64 if ord(c) - 64 > range_max: range_max = ord(c) - 64 if ord(c) < 64 or ord(c) > 126: bad_blocks += 1 base_len = -1 lines = [] break else: qualities.append( chr( ord(c) - 31 ) ) quals = ''.join(qualities) elif orig_type == 'solexa': for c in line.strip(): if ord(c) - 64 < range_min: range_min = ord(c) - 64 if ord(c) - 64 > range_max: range_max = ord(c) - 64 if ord(c) < 59 or ord(c) > 126: bad_blocks += 1 base_len = -1 lines = [] break else: p = 10.0**( ( ord(c) - 64 ) / -10.0 ) / ( 1 + 10.0**( ( ord(c) - 64 ) / -10.0 ) ) qualities.append( chr( int( -10.0*math.log10( p ) ) + 33 ) ) quals = ''.join(qualities) else: # 'sanger' for c in line.strip(): if ord(c) - 33 < range_min: range_min = ord(c) - 33 if ord(c) - 33 > range_max: range_max = ord(c) - 33 if ord(c) < 33 or ord(c) > 126: bad_blocks += 1 base_len = -1 lines = [] break else: qualities.append(c) quals = ''.join(qualities) # make sure we don't have bad qualities if len(quals) == base_len: # print first three lines for l in lines: fout.write(l) # print out quality line fout.write(quals+'\n') # reset lines = [] base_len = -1 else: bad_blocks += 1 base_len = -1 lines = [] # mark the successful end of a block block_num += 1 line_count += 1 line = fin.readline() fout.close() fin.close() if range_min != 1000 and range_min != -5: outmsg = 'The range of quality values found were: %s to %s' % (range_min, range_max) else: outmsg = '' if bad_blocks > 0: outmsg += '\nThere were %s bad blocks skipped' % (bad_blocks) sys.stdout.write(outmsg) if __name__=="__main__": __main__()