Mercurial > repos > yufei-luo > s_mart
view SMART/DiffExpAnal/fastq_groomer_parallel_unSQL.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:20:15 -0400 |
parents | |
children |
line wrap: on
line source
import sys, os, optparse,shutil, random from commons.core.launcher.Launcher import Launcher from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory from commons.core.utils.FileUtils import FileUtils def _map(iLauncher, cmd, cmdStart, cmdFinish ): lCmds = [] lCmds.extend(cmd) lCmdStart = [] lCmdStart.extend(cmdStart) lCmdFinish = [] lCmdFinish.extend(cmdFinish) return(iLauncher.prepareCommands_withoutIndentation(lCmds, lCmdStart, lCmdFinish)) def splitFastQ(fileName, nbOfSeqPerBatch): nbOfLinesPerFile = nbOfSeqPerBatch * 4 lOutput = [] filePrefix, fileExt = os.path.splitext(os.path.basename(fileName)) resDir = os.path.dirname(fileName) with open(fileName) as inF: fileNb = 1 line = inF.readline() if not line or nbOfLinesPerFile == 0: outFileName = "%s/%s-%s%s" %(resDir, filePrefix, fileNb, fileExt) lOutput.append(outFileName) f = open(outFileName, "wb") shutil.copyfileobj(open(fileName, "rb"), f) f.close() else: while line: outFileName = "%s/%s-%s%s" %(resDir, filePrefix, fileNb, fileExt) lOutput.append(outFileName) with open(outFileName, "w") as outF: lineNb = 1 while lineNb <= nbOfLinesPerFile and line: outF.write(line) line = inF.readline() lineNb += 1 fileNb += 1 return lOutput def joinFastQ(dCutOut2Out): for key in dCutOut2Out.keys(): FileUtils.catFilesFromList(dCutOut2Out[key],key, False) def _createFastqGroomerCode(outGroomerNames, inputFileNames, input_type, output_type, force_quality_encoding, summarize_input): cmd2Launch = [] cmd2Launch.append("log = 0") cmd2Launch.append("from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter") cmd2Launch.append("aggregator = fastqAggregator()") cmd2Launch.append("out = fastqWriter( open( '%s', 'wb' ), format = '%s', force_quality_encoding = '%s')" % (outGroomerNames,output_type,force_quality_encoding)) cmd2Launch.append("read_count = None") if summarize_input: cmd2Launch.append("reader = fastqVerboseErrorReader") else: cmd2Launch.append("reader = fastqReader") cmd2Launch.append("for read_count, fastq_read in enumerate( reader( open( '%s' ), format = '%s', apply_galaxy_conventions = True ) ):" % (inputFileNames, input_type)) if summarize_input: cmd2Launch.append("\taggregator.consume_read( fastq_read )") cmd2Launch.append("\tout.write( fastq_read )") cmd2Launch.append("out.close()") cmd2Launch.append("if read_count is not None:") #cmd2Launch.append("\tprint 'Groomed %s %s reads into %s reads.' % ( read_count + 1, %s, %s )" % ('%i', '%s', '%s', input_type,output_type)) cmd2Launch.append("\tif '%s' != '%s' and 'solexa' in [ '%s', '%s' ]:" % (input_type, output_type, input_type, output_type)) cmd2Launch.append("\t\tprint 'Converted between Solexa and PHRED scores.'") if summarize_input: cmd2Launch.append("\tprint 'Based upon quality and sequence, the input data is valid for: %s' % ( ', '.join( aggregator.get_valid_formats() ) or 'None' )") cmd2Launch.append("\tascii_range = aggregator.get_ascii_range()") cmd2Launch.append("\tdecimal_range = aggregator.get_decimal_range()") cmd2Launch.append("\tprint 'Input ASCII range: %s(%i) - %s(%i)' % ( repr( ascii_range[0] ), ord( ascii_range[0] ), repr( ascii_range[1] ), ord( ascii_range[1] ) )") cmd2Launch.append("\tprint 'Input decimal range: %i - %i' % ( decimal_range[0], decimal_range[1] ) ") cmd2Launch.append("else:") cmd2Launch.append("\tprint 'No valid FASTQ reads were provided.'") cmd2Launch.append("\tlog = 255") return cmd2Launch def stop_err(msg): sys.stderr.write("%s\n" % msg) sys.exit() def main(): input_filename = sys.argv[1] #a txt file input_type = sys.argv[2] output_filename = sys.argv[3] #a txt file output_type = sys.argv[4] force_quality_encoding = sys.argv[5] summarize_input = sys.argv[6] == 'summarize_input' pairedEnd_input = sys.argv[7] if pairedEnd_input == 'None': pairedEnd_input = None else: output_pairedEndFileName = sys.argv[8] if force_quality_encoding == 'None': force_quality_encoding = None #Parse the input txt file and read a list of fastq files file = open(input_filename, "r") lines = file.readlines() inputFileNames = [] outGroomerNames = [] resDirName = os.path.dirname(output_filename) + "/" #Write output txt file and define all output groomer file names outFile = open(output_filename, "w") for line in lines: tab = line.split() inputFileNames.append(tab[1]) outGroomerName = resDirName + tab[0] + '_outGroomer_%s.fastq' % random.randrange(0, 10000) outGroomerNames.append(outGroomerName) outFile.write(tab[0] + '\t' + outGroomerName + '\n') outFile.close() file.close() if pairedEnd_input != None: inPairedFile = open(pairedEnd_input, "r") lines = inPairedFile.readlines() inputPairedEndFileNames = [] outGroomerPairedEndNames = [] outPairedEndFile = open(output_pairedEndFileName, "w") for line in lines: tab = line.split() inputPairedEndFileNames.append(tab[1]) outGroomerPairedEndName = resDirName + tab[0] + '_outGroomer_pairedEnd_%s.fastq' % random.randrange(0, 10000) outGroomerPairedEndNames.append(outGroomerPairedEndName) outPairedEndFile.write(tab[0] + '\t' + outGroomerPairedEndName + '\n') outPairedEndFile.close() inPairedFile.close() acronym = "fastqGroomer" jobdb = TableJobAdaptatorFactory.createJobInstance() iLauncher = Launcher(jobdb, os.getcwd(), "", "", os.getcwd(), os.getcwd(), "jobs", "", acronym, acronym, False, True) lCmdsTuples = [] dCutOut2Out = {} lAllFile2remove = [] # Write output file for i in range(len(outGroomerNames)): lCutInputFile = splitFastQ(inputFileNames[i], 20000) lAllFile2remove.extend(lCutInputFile) lCutOutput = [] for cutInput in lCutInputFile: cutOutput = "%s_out" % cutInput lCutOutput.append(cutOutput) lAllFile2remove.extend(lCutOutput) cmd2Launch = _createFastqGroomerCode(cutOutput, cutInput, input_type, output_type, force_quality_encoding, summarize_input) cmdStart = [] cmdFinish = [] lCmdsTuples.append(_map(iLauncher, cmd2Launch, cmdStart, cmdFinish)) dCutOut2Out[outGroomerNames[i]] = lCutOutput if pairedEnd_input != None: lCutInputFile = splitFastQ(inputPairedEndFileNames[i], 20000) lAllFile2remove.extend(lCutInputFile) lCutOutput = [] for cutInput in lCutInputFile: cutOutput = "%s_out" % cutInput lCutOutput.append(cutOutput) lAllFile2remove.extend(lCutOutput) cmd2Launch = _createFastqGroomerCode(cutOutput, cutInput, input_type, output_type, force_quality_encoding, summarize_input) cmdStart = [] cmdFinish = [] lCmdsTuples.append(_map(iLauncher, cmd2Launch, cmdStart, cmdFinish)) dCutOut2Out[outGroomerPairedEndNames[i]] = lCutOutput iLauncher.runLauncherForMultipleJobs(acronym, lCmdsTuples, False) joinFastQ(dCutOut2Out) FileUtils.removeFilesFromListIfExist(lAllFile2remove) if __name__ == "__main__": main()