comparison: lastz_wrapper.py @ 0:0801f8207d30 (draft)
Uploaded tarball
| author | devteam |
|---|---|
| date | Mon, 26 Nov 2012 09:47:51 -0500 |
| parents | |
| children | |
comparing -1:000000000000 (the null revision) with 0:0801f8207d30
```python
#!/usr/bin/env python

"""
Runs Lastz
Written for Lastz v. 1.01.88.

usage: lastz_wrapper.py [options]
    --ref_name: The reference name to change all output matches to
    --ref_source: Whether the reference is cached or from the history
    --source_select: Whether to use a pre-set or cached reference file
    --input1: The name of the reference file if using history or reference base name if using cached
    --input2: The reads file to align
    --ref_sequences: The number of sequences in the reference file if using one from history
    --pre_set_options: Which of the pre-set options to use, if using pre-sets
    --strand: Which strand of the read to search, if specifying all parameters
    --seed: Seeding settings, if specifying all parameters
    --gfextend: Whether to perform gap-free extension of seed hits to HSPs (high scoring segment pairs), if specifying all parameters
    --chain: Whether to perform chaining of HSPs, if specifying all parameters
    --transition: Number of transitions to allow in each seed hit, if specifying all parameters
    --O: Gap opening penalty, if specifying all parameters
    --E: Gap extension penalty, if specifying all parameters
    --X: X-drop threshold, if specifying all parameters
    --Y: Y-drop threshold, if specifying all parameters
    --K: Threshold for HSPs, if specifying all parameters
    --L: Threshold for gapped alignments, if specifying all parameters
    --entropy: Whether to involve entropy when filtering HSPs, if specifying all parameters
    --identity_min: Minimum identity (don't report matches under this identity)
    --identity_max: Maximum identity (don't report matches above this identity)
    --coverage: The minimum coverage value (don't report matches covering less than this)
    --unmask: Whether to convert lowercase bases to uppercase
    --out_format: The format of the output file (sam, diffs, or tabular (general))
    --output: The name of the output file
    --lastzSeqsFileDir: Directory of local lastz_seqs.loc file
"""
import optparse, os, subprocess, sys, tempfile, threading, time
from Queue import Queue

from galaxy import eggs
import pkg_resources
pkg_resources.require( 'bx-python' )
from bx.seq.twobit import TwoBitFile
from bx.seq.fasta import FastaReader
from galaxy.util.bunch import Bunch

STOP_SIGNAL = object()  # sentinel placed on a queue to tell its worker threads to exit
WORKERS = 4             # number of lastz processes run concurrently
SLOTS = 128             # job queue capacity; put() blocks once this many jobs are pending

def stop_err( msg ):
    sys.stderr.write( "%s" % msg )
    sys.exit( 1 )

def stop_queues( lastz, combine_data ):
    # This method should only be called if an error has been encountered.
    # Send STOP_SIGNAL to all worker threads.
    for t in lastz.threads:
        lastz.put( STOP_SIGNAL, True )
    combine_data.put( STOP_SIGNAL, True )

class BaseQueue( object ):
    def __init__( self, num_threads, slots=-1 ):
        # Initialize the queue and worker threads
        self.queue = Queue( slots )
        self.threads = []
        for i in range( num_threads ):
            worker = threading.Thread( target=self.run_next )
            worker.start()
            self.threads.append( worker )
    def run_next( self ):
        # Run the next job, waiting until one is available if necessary
        while True:
            job = self.queue.get()
            if job is STOP_SIGNAL:
                return self.shutdown()
            self.run_job( job )
            time.sleep( 1 )
    def run_job( self, job ):
        stop_err( 'Not Implemented' )
    def put( self, job, block=False ):
        # Add a job to the queue
        self.queue.put( job, block )
    def shutdown( self ):
        return

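# Design note: jobs flow through two queues.  The LastzJobQueue runs WORKERS
# lastz processes in parallel, and each finished job is handed off to a
# single-threaded CombineDataQueue, whose lone worker appends that job's
# output file to the final output.  A single writer thread avoids interleaved
# writes; records appear in the combined output in job completion order, not
# submission order.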
class LastzJobQueue( BaseQueue ):
    """
    A queue that runs commands in parallel.  Blocking is done so the queue will
    not consume much memory.
    """
    def run_job( self, job ):
        # Execute the job's command.  Read stderr via communicate() so that a
        # filled pipe buffer cannot deadlock the child process.
        proc = subprocess.Popen( args=job.command, shell=True, stderr=subprocess.PIPE )
        stderr = proc.communicate()[1]
        if stderr:
            stop_queues( self, job.combine_data_queue )
            stop_err( stderr )
        job.combine_data_queue.put( job )

class CombineDataQueue( BaseQueue ):
    """
    A queue that concatenates files in serial.  Blocking is not done since this
    queue is not expected to grow larger than the command queue.
    """
    def __init__( self, output_filename, num_threads=1 ):
        BaseQueue.__init__( self, num_threads )
        self.CHUNK_SIZE = 2**20  # 1 MB
        self.output_file = open( output_filename, 'wb' )
    def run_job( self, job ):
        in_file = open( job.output, 'rb' )
        while True:
            chunk = in_file.read( self.CHUNK_SIZE )
            if not chunk:
                in_file.close()
                break
            self.output_file.write( chunk )
        for file_name in job.cleanup:
            os.remove( file_name )
    def shutdown( self ):
        self.output_file.close()
        return

def __main__():
    # Parse the command line.
    parser = optparse.OptionParser()
    parser.add_option( '--ref_name', dest='ref_name', help='The reference name to change all output matches to' )
    parser.add_option( '--ref_source', dest='ref_source', help='Whether the reference is cached or from the history' )
    parser.add_option( '--ref_sequences', dest='ref_sequences', help='Number of sequences in the reference dataset' )
    parser.add_option( '--source_select', dest='source_select', help='Whether to use a pre-set or cached reference file' )
    parser.add_option( '--input1', dest='input1', help='The name of the reference file if using history or reference base name if using cached' )
    parser.add_option( '--input2', dest='input2', help='The reads file to align' )
    parser.add_option( '--pre_set_options', dest='pre_set_options', help='Which of the pre-set options to use, if using pre-sets' )
    parser.add_option( '--strand', dest='strand', help='Which strand of the read to search, if specifying all parameters' )
    parser.add_option( '--seed', dest='seed', help='Seeding settings, if specifying all parameters' )
    parser.add_option( '--transition', dest='transition', help='Number of transitions to allow in each seed hit, if specifying all parameters' )
    parser.add_option( '--gfextend', dest='gfextend', help='Whether to perform gap-free extension of seed hits to HSPs (high scoring segment pairs), if specifying all parameters' )
    parser.add_option( '--chain', dest='chain', help='Whether to perform chaining of HSPs, if specifying all parameters' )
    parser.add_option( '--O', dest='O', help='Gap opening penalty, if specifying all parameters' )
    parser.add_option( '--E', dest='E', help='Gap extension penalty, if specifying all parameters' )
    parser.add_option( '--X', dest='X', help='X-drop threshold, if specifying all parameters' )
    parser.add_option( '--Y', dest='Y', help='Y-drop threshold, if specifying all parameters' )
    parser.add_option( '--K', dest='K', help='Threshold for HSPs, if specifying all parameters' )
    parser.add_option( '--L', dest='L', help='Threshold for gapped alignments, if specifying all parameters' )
    parser.add_option( '--entropy', dest='entropy', help='Whether to involve entropy when filtering HSPs, if specifying all parameters' )
    parser.add_option( '--identity_min', dest='identity_min', help="Minimum identity (don't report matches under this identity)" )
    parser.add_option( '--identity_max', dest='identity_max', help="Maximum identity (don't report matches above this identity)" )
    parser.add_option( '--coverage', dest='coverage', help="The minimum coverage value (don't report matches covering less than this)" )
    parser.add_option( '--unmask', dest='unmask', help='Whether to convert lowercase bases to uppercase' )
    parser.add_option( '--out_format', dest='format', help='The format of the output file (sam, diffs, or tabular (general))' )
    parser.add_option( '--output', dest='output', help='The output file' )
    parser.add_option( '--lastzSeqsFileDir', dest='lastzSeqsFileDir', help='Directory of local lastz_seqs.loc file' )
    ( options, args ) = parser.parse_args()

    # Output the version of the tool.
    try:
        tmp_fd, tmp_name = tempfile.mkstemp()
        tmp_stdout = os.fdopen( tmp_fd, 'wb' )
        proc = subprocess.Popen( args='lastz -v', shell=True, stdout=tmp_stdout )
        proc.wait()
        tmp_stdout.close()
        stdout = None
        for line in open( tmp_name, 'rb' ):
            if line.lower().find( 'version' ) >= 0:
                stdout = line.strip()
                break
        os.remove( tmp_name )
        if stdout:
            sys.stdout.write( '%s\n' % stdout )
        else:
            raise Exception
    except:
        sys.stdout.write( 'Could not determine Lastz version\n' )

    if options.unmask == 'yes':
        unmask = '[unmask]'
    else:
        unmask = ''
    if options.ref_name:
        ref_name = '[nickname=%s]' % options.ref_name
    else:
        ref_name = ''
    # Prepare for commonly-used pre-set options
    if options.source_select == 'pre_set':
        set_options = '--%s' % options.pre_set_options
    # Prepare for user-specified options
    else:
        set_options = '--%s --%s --gapped --strand=%s --seed=%s --%s O=%s E=%s X=%s Y=%s K=%s L=%s --%s' % \
                      ( options.gfextend, options.chain, options.strand, options.seed, options.transition,
                        options.O, options.E, options.X, options.Y, options.K, options.L, options.entropy )
    # Specify input2 and add the [fullnames] modifier if the output format is diffs
    if options.format == 'diffs':
        input2 = '%s[fullnames]' % options.input2
    else:
        input2 = options.input2
    if options.format == 'tabular':
        # Change output format to general if it's tabular and add field names for tabular output
        format = 'general-'
        tabular_fields = ':score,name1,strand1,size1,start1,zstart1,end1,length1,text1,name2,strand2,size2,start2,zstart2,end2,start2+,zstart2+,end2+,length2,text2,diff,cigar,identity,coverage,gaprate,diagonal,shingle'
    elif options.format == 'sam':
        # We currently ALWAYS suppress SAM headers.
        format = 'sam-'
        tabular_fields = ''
    else:
        format = options.format
        tabular_fields = ''

    # Set up our queues
    lastz_job_queue = LastzJobQueue( WORKERS, slots=SLOTS )
    combine_data_queue = CombineDataQueue( options.output )

    if options.ref_source == 'history':
        # Reference is a fasta dataset from the history, so split the job across
        # the number of sequences in the dataset ( this could be a HUGE number ).
        try:
            # Ensure there is at least 1 sequence in the dataset ( this may not be necessary ).
            error_msg = "The reference dataset is missing metadata; click the pencil icon in the history item and 'auto-detect' the metadata attributes."
            ref_sequences = int( options.ref_sequences )
            if ref_sequences < 1:
                stop_queues( lastz_job_queue, combine_data_queue )
                stop_err( error_msg )
        except:
            stop_queues( lastz_job_queue, combine_data_queue )
            stop_err( error_msg )
        seqs = 0
        fasta_reader = FastaReader( open( options.input1 ) )
        while True:
            # Read the next sequence from the reference dataset
            seq = fasta_reader.next()
            if not seq:
                break
            seqs += 1
            # Create a temporary file to contain the current sequence as input to lastz
            tmp_in_fd, tmp_in_name = tempfile.mkstemp( suffix='.in' )
            tmp_in = os.fdopen( tmp_in_fd, 'wb' )
            # Write the current sequence to the temporary input file
            tmp_in.write( '>%s\n%s\n' % ( seq.name, seq.text ) )
            tmp_in.close()
            # Create a 2nd temporary file to contain the output from lastz execution on the current sequence
            tmp_out_fd, tmp_out_name = tempfile.mkstemp( suffix='.out' )
            os.close( tmp_out_fd )
            # Generate the command line for calling lastz on the current sequence
            command = 'lastz %s%s%s %s %s --ambiguousn --nolaj --identity=%s..%s --coverage=%s --format=%s%s > %s' % \
                      ( tmp_in_name, unmask, ref_name, input2, set_options, options.identity_min,
                        options.identity_max, options.coverage, format, tabular_fields, tmp_out_name )
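            # With illustrative values plugged in (the temp paths and parameter
            # values are hypothetical), the command built above might expand to
            # something like:
            #   lastz /tmp/tmpXX.in[unmask][nickname=chr1] reads.fasta
            #       --gfextend --chain --gapped --strand=both --seed=12of19
            #       --transition O=400 E=30 X=910 Y=9370 K=3000 L=3000 --entropy
            #       --ambiguousn --nolaj --identity=0..100 --coverage=0
            #       --format=general-:score,name1,... > /tmp/tmpXX.out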
            # Create a job object
            job = Bunch()
            job.command = command
            job.output = tmp_out_name
            job.cleanup = [ tmp_in_name, tmp_out_name ]
            job.combine_data_queue = combine_data_queue
            # Add another job to the lastz_job_queue.  Execution
            # will wait at this point if the queue is full.
            lastz_job_queue.put( job, block=True )
        # Make sure the value of sequences in the metadata is the same as the
        # number of sequences read from the dataset ( this may not be necessary ).
        if ref_sequences != seqs:
            stop_queues( lastz_job_queue, combine_data_queue )
            stop_err( "The value of metadata.sequences (%d) differs from the number of sequences read from the reference (%d)." % ( ref_sequences, seqs ) )
    else:
        # Reference is a locally cached 2bit file, so split the job across the number of chroms in the 2bit file
        tbf = TwoBitFile( open( options.input1, 'r' ) )
        for chrom in tbf.keys():
            # Create a temporary file to contain the output from lastz execution on the current chrom
            tmp_out_fd, tmp_out_name = tempfile.mkstemp( suffix='.out' )
            os.close( tmp_out_fd )
            command = 'lastz %s/%s%s%s %s %s --ambiguousn --nolaj --identity=%s..%s --coverage=%s --format=%s%s >> %s' % \
                      ( options.input1, chrom, unmask, ref_name, input2, set_options, options.identity_min,
                        options.identity_max, options.coverage, format, tabular_fields, tmp_out_name )
            # Create a job object
            job = Bunch()
            job.command = command
            job.output = tmp_out_name
            job.cleanup = [ tmp_out_name ]
            job.combine_data_queue = combine_data_queue
            # Add another job to the lastz_job_queue.  Execution
            # will wait at this point if the queue is full.
            lastz_job_queue.put( job, block=True )

    # Stop the lastz_job_queue
    for t in lastz_job_queue.threads:
        lastz_job_queue.put( STOP_SIGNAL, True )
    # Although all jobs have been submitted to the lastz_job_queue, we can't shut
    # down the combine_data_queue until every one of those jobs has also been handed
    # to it.  We know that has happened once all of the lastz_job_queue's worker
    # threads have terminated, leaving only the main thread and the
    # combine_data_queue's single worker running.
    while threading.activeCount() > 2:
        time.sleep( 1 )
    # Now it's safe to stop the combine_data_queue
    combine_data_queue.put( STOP_SIGNAL )

if __name__ == "__main__":
    __main__()
```
