Mercurial > repos > xuebing > sharplabtool
diff tools/next_gen_conversion/fastq_gen_conv.py @ 0:9071e359b9a3
Uploaded
author | xuebing |
---|---|
date | Fri, 09 Mar 2012 19:37:19 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tools/next_gen_conversion/fastq_gen_conv.py Fri Mar 09 19:37:19 2012 -0500 @@ -0,0 +1,172 @@ +""" +Converts any type of FASTQ file to Sanger type and makes small adjustments if necessary. + +usage: %prog [options] + -i, --input=i: Input FASTQ candidate file + -r, --origType=r: Original type + -a, --allOrNot=a: Whether or not to check all blocks + -b, --blocks=b: Number of blocks to check + -o, --output=o: Output file + +usage: %prog input_file oroutput_file +""" + +import math, sys +from galaxy import eggs +import pkg_resources; pkg_resources.require( "bx-python" ) +from bx.cookbook import doc_optparse + +def stop_err( msg ): + sys.stderr.write( "%s\n" % msg ) + sys.exit() + +def all_bases_valid(seq): + """Confirm that the sequence contains only bases""" + valid_bases = ['a', 'A', 'c', 'C', 'g', 'G', 't', 'T', 'N'] + for base in seq: + if base not in valid_bases: + return False + return True + +def __main__(): + #Parse Command Line + options, args = doc_optparse.parse( __doc__ ) + orig_type = options.origType + if orig_type == 'sanger' and options.allOrNot == 'not': + max_blocks = int(options.blocks) + else: + max_blocks = -1 + fin = file(options.input, 'r') + fout = file(options.output, 'w') + range_min = 1000 + range_max = -5 + block_num = 0 + bad_blocks = 0 + base_len = -1 + line_count = 0 + lines = [] + line = fin.readline() + while line: + if line.strip() and max_blocks >= 0 and block_num > 0 and orig_type == 'sanger' and block_num >= max_blocks: + fout.write(line) + if line_count % 4 == 0: + block_num += 1 + line_count += 1 + elif line.strip(): + # the line that starts a block, with a name + if line_count % 4 == 0 and line.startswith('@'): + lines.append(line) + else: + # if we expect a sequence of bases + if line_count % 4 == 1 and all_bases_valid(line.strip()): + lines.append(line) + base_len = len(line.strip()) + # if we expect the second name line + elif line_count % 4 == 2 and line.startswith('+'): + lines.append(line) + # if we expect a sequence of qualities and it's the expected length + elif line_count % 4 == 3: + split_line = line.strip().split() + # decimal qualities + if len(split_line) == base_len: + # convert + phred_list = [] + for ch in split_line: + int_ch = int(ch) + if int_ch < range_min: + range_min = int_ch + if int_ch > range_max: + range_max = int_ch + if int_ch >= 0 and int_ch <= 93: + phred_list.append(chr(int_ch + 33)) + # make sure we haven't lost any quality values + if len(phred_list) == base_len: + # print first three lines + for l in lines: + fout.write(l) + # print converted quality line + fout.write(''.join(phred_list)) + # reset + lines = [] + base_len = -1 + # abort if so + else: + bad_blocks += 1 + lines = [] + base_len = -1 + # ascii qualities + elif len(split_line[0]) == base_len: + qualities = [] + # print converted quality line + if orig_type == 'illumina': + for c in line.strip(): + if ord(c) - 64 < range_min: + range_min = ord(c) - 64 + if ord(c) - 64 > range_max: + range_max = ord(c) - 64 + if ord(c) < 64 or ord(c) > 126: + bad_blocks += 1 + base_len = -1 + lines = [] + break + else: + qualities.append( chr( ord(c) - 31 ) ) + quals = ''.join(qualities) + elif orig_type == 'solexa': + for c in line.strip(): + if ord(c) - 64 < range_min: + range_min = ord(c) - 64 + if ord(c) - 64 > range_max: + range_max = ord(c) - 64 + if ord(c) < 59 or ord(c) > 126: + bad_blocks += 1 + base_len = -1 + lines = [] + break + else: + p = 10.0**( ( ord(c) - 64 ) / -10.0 ) / ( 1 + 10.0**( ( ord(c) - 64 ) / -10.0 ) ) + qualities.append( chr( int( -10.0*math.log10( p ) ) + 33 ) ) + quals = ''.join(qualities) + else: # 'sanger' + for c in line.strip(): + if ord(c) - 33 < range_min: + range_min = ord(c) - 33 + if ord(c) - 33 > range_max: + range_max = ord(c) - 33 + if ord(c) < 33 or ord(c) > 126: + bad_blocks += 1 + base_len = -1 + lines = [] + break + else: + qualities.append(c) + quals = ''.join(qualities) + # make sure we don't have bad qualities + if len(quals) == base_len: + # print first three lines + for l in lines: + fout.write(l) + # print out quality line + fout.write(quals+'\n') + # reset + lines = [] + base_len = -1 + else: + bad_blocks += 1 + base_len = -1 + lines = [] + # mark the successful end of a block + block_num += 1 + line_count += 1 + line = fin.readline() + fout.close() + fin.close() + if range_min != 1000 and range_min != -5: + outmsg = 'The range of quality values found were: %s to %s' % (range_min, range_max) + else: + outmsg = '' + if bad_blocks > 0: + outmsg += '\nThere were %s bad blocks skipped' % (bad_blocks) + sys.stdout.write(outmsg) + +if __name__=="__main__": __main__() \ No newline at end of file