Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/boto/services/service.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
| author | shellac |
|---|---|
| date | Sat, 02 May 2020 07:14:21 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:26e78fe6e8c4 |
|---|---|
| 1 # Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ | |
| 2 # | |
| 3 # Permission is hereby granted, free of charge, to any person obtaining a | |
| 4 # copy of this software and associated documentation files (the | |
| 5 # "Software"), to deal in the Software without restriction, including | |
| 6 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
| 7 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
| 8 # persons to whom the Software is furnished to do so, subject to the fol- | |
| 9 # lowing conditions: | |
| 10 # | |
| 11 # The above copyright notice and this permission notice shall be included | |
| 12 # in all copies or substantial portions of the Software. | |
| 13 # | |
| 14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
| 15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
| 16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
| 17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
| 18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| 19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
| 20 # IN THE SOFTWARE. | |
| 21 | |
| 22 import boto | |
| 23 from boto.services.message import ServiceMessage | |
| 24 from boto.services.servicedef import ServiceDef | |
| 25 from boto.pyami.scriptbase import ScriptBase | |
| 26 from boto.utils import get_ts | |
| 27 import time | |
| 28 import os | |
| 29 import mimetypes | |
| 30 | |
| 31 | |
| 32 class Service(ScriptBase): | |
| 33 | |
| 34 # Time required to process a transaction | |
| 35 ProcessingTime = 60 | |
| 36 | |
| 37 def __init__(self, config_file=None, mimetype_files=None): | |
| 38 super(Service, self).__init__(config_file) | |
| 39 self.name = self.__class__.__name__ | |
| 40 self.working_dir = boto.config.get('Pyami', 'working_dir') | |
| 41 self.sd = ServiceDef(config_file) | |
| 42 self.retry_count = self.sd.getint('retry_count', 5) | |
| 43 self.loop_delay = self.sd.getint('loop_delay', 30) | |
| 44 self.processing_time = self.sd.getint('processing_time', 60) | |
| 45 self.input_queue = self.sd.get_obj('input_queue') | |
| 46 self.output_queue = self.sd.get_obj('output_queue') | |
| 47 self.output_domain = self.sd.get_obj('output_domain') | |
| 48 if mimetype_files: | |
| 49 mimetypes.init(mimetype_files) | |
| 50 | |
| 51 def split_key(key): | |
| 52 if key.find(';') < 0: | |
| 53 t = (key, '') | |
| 54 else: | |
| 55 key, type = key.split(';') | |
| 56 label, mtype = type.split('=') | |
| 57 t = (key, mtype) | |
| 58 return t | |
| 59 | |
| 60 def read_message(self): | |
| 61 boto.log.info('read_message') | |
| 62 message = self.input_queue.read(self.processing_time) | |
| 63 if message: | |
| 64 boto.log.info(message.get_body()) | |
| 65 key = 'Service-Read' | |
| 66 message[key] = get_ts() | |
| 67 return message | |
| 68 | |
| 69 # retrieve the source file from S3 | |
| 70 def get_file(self, message): | |
| 71 bucket_name = message['Bucket'] | |
| 72 key_name = message['InputKey'] | |
| 73 file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file')) | |
| 74 boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name)) | |
| 75 bucket = boto.lookup('s3', bucket_name) | |
| 76 key = bucket.new_key(key_name) | |
| 77 key.get_contents_to_filename(os.path.join(self.working_dir, file_name)) | |
| 78 return file_name | |
| 79 | |
| 80 # process source file, return list of output files | |
| 81 def process_file(self, in_file_name, msg): | |
| 82 return [] | |
| 83 | |
| 84 # store result file in S3 | |
| 85 def put_file(self, bucket_name, file_path, key_name=None): | |
| 86 boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name)) | |
| 87 bucket = boto.lookup('s3', bucket_name) | |
| 88 key = bucket.new_key(key_name) | |
| 89 key.set_contents_from_filename(file_path) | |
| 90 return key | |
| 91 | |
| 92 def save_results(self, results, input_message, output_message): | |
| 93 output_keys = [] | |
| 94 for file, type in results: | |
| 95 if 'OutputBucket' in input_message: | |
| 96 output_bucket = input_message['OutputBucket'] | |
| 97 else: | |
| 98 output_bucket = input_message['Bucket'] | |
| 99 key_name = os.path.split(file)[1] | |
| 100 key = self.put_file(output_bucket, file, key_name) | |
| 101 output_keys.append('%s;type=%s' % (key.name, type)) | |
| 102 output_message['OutputKey'] = ','.join(output_keys) | |
| 103 | |
| 104 # write message to each output queue | |
| 105 def write_message(self, message): | |
| 106 message['Service-Write'] = get_ts() | |
| 107 message['Server'] = self.name | |
| 108 if 'HOSTNAME' in os.environ: | |
| 109 message['Host'] = os.environ['HOSTNAME'] | |
| 110 else: | |
| 111 message['Host'] = 'unknown' | |
| 112 message['Instance-ID'] = self.instance_id | |
| 113 if self.output_queue: | |
| 114 boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id) | |
| 115 self.output_queue.write(message) | |
| 116 if self.output_domain: | |
| 117 boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name) | |
| 118 item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']]) | |
| 119 self.output_domain.put_attributes(item_name, message) | |
| 120 | |
| 121 # delete message from input queue | |
| 122 def delete_message(self, message): | |
| 123 boto.log.info('deleting message from %s' % self.input_queue.id) | |
| 124 self.input_queue.delete_message(message) | |
| 125 | |
| 126 # to clean up any files, etc. after each iteration | |
| 127 def cleanup(self): | |
| 128 pass | |
| 129 | |
| 130 def shutdown(self): | |
| 131 on_completion = self.sd.get('on_completion', 'shutdown') | |
| 132 if on_completion == 'shutdown': | |
| 133 if self.instance_id: | |
| 134 time.sleep(60) | |
| 135 c = boto.connect_ec2() | |
| 136 c.terminate_instances([self.instance_id]) | |
| 137 | |
| 138 def main(self, notify=False): | |
| 139 self.notify('Service: %s Starting' % self.name) | |
| 140 empty_reads = 0 | |
| 141 while self.retry_count < 0 or empty_reads < self.retry_count: | |
| 142 try: | |
| 143 input_message = self.read_message() | |
| 144 if input_message: | |
| 145 empty_reads = 0 | |
| 146 output_message = ServiceMessage(None, input_message.get_body()) | |
| 147 input_file = self.get_file(input_message) | |
| 148 results = self.process_file(input_file, output_message) | |
| 149 self.save_results(results, input_message, output_message) | |
| 150 self.write_message(output_message) | |
| 151 self.delete_message(input_message) | |
| 152 self.cleanup() | |
| 153 else: | |
| 154 empty_reads += 1 | |
| 155 time.sleep(self.loop_delay) | |
| 156 except Exception: | |
| 157 boto.log.exception('Service Failed') | |
| 158 empty_reads += 1 | |
| 159 self.notify('Service: %s Shutting Down' % self.name) | |
| 160 self.shutdown() | |
| 161 |
