Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/boto/services/service.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
line wrap: on
line diff
--- a/env/lib/python3.7/site-packages/boto/services/service.py Thu May 14 16:47:39 2020 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,161 +0,0 @@ -# Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/ -# -# Permission is hereby granted, free of charge, to any person obtaining a -# copy of this software and associated documentation files (the -# "Software"), to deal in the Software without restriction, including -# without limitation the rights to use, copy, modify, merge, publish, dis- -# tribute, sublicense, and/or sell copies of the Software, and to permit -# persons to whom the Software is furnished to do so, subject to the fol- -# lowing conditions: -# -# The above copyright notice and this permission notice shall be included -# in all copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- -# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT -# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, -# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS -# IN THE SOFTWARE. - -import boto -from boto.services.message import ServiceMessage -from boto.services.servicedef import ServiceDef -from boto.pyami.scriptbase import ScriptBase -from boto.utils import get_ts -import time -import os -import mimetypes - - -class Service(ScriptBase): - - # Time required to process a transaction - ProcessingTime = 60 - - def __init__(self, config_file=None, mimetype_files=None): - super(Service, self).__init__(config_file) - self.name = self.__class__.__name__ - self.working_dir = boto.config.get('Pyami', 'working_dir') - self.sd = ServiceDef(config_file) - self.retry_count = self.sd.getint('retry_count', 5) - self.loop_delay = self.sd.getint('loop_delay', 30) - self.processing_time = self.sd.getint('processing_time', 60) - self.input_queue = self.sd.get_obj('input_queue') - self.output_queue = self.sd.get_obj('output_queue') - self.output_domain = self.sd.get_obj('output_domain') - if mimetype_files: - mimetypes.init(mimetype_files) - - def split_key(key): - if key.find(';') < 0: - t = (key, '') - else: - key, type = key.split(';') - label, mtype = type.split('=') - t = (key, mtype) - return t - - def read_message(self): - boto.log.info('read_message') - message = self.input_queue.read(self.processing_time) - if message: - boto.log.info(message.get_body()) - key = 'Service-Read' - message[key] = get_ts() - return message - - # retrieve the source file from S3 - def get_file(self, message): - bucket_name = message['Bucket'] - key_name = message['InputKey'] - file_name = os.path.join(self.working_dir, message.get('OriginalFileName', 'in_file')) - boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name)) - bucket = boto.lookup('s3', bucket_name) - key = bucket.new_key(key_name) - key.get_contents_to_filename(os.path.join(self.working_dir, file_name)) - return file_name - - # process source file, return list of output files - def process_file(self, in_file_name, msg): - return [] - - # store result file in S3 - def put_file(self, bucket_name, file_path, key_name=None): - boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name)) - bucket = boto.lookup('s3', bucket_name) - key = bucket.new_key(key_name) - key.set_contents_from_filename(file_path) - return key - - def save_results(self, results, input_message, output_message): - output_keys = [] - for file, type in results: - if 'OutputBucket' in input_message: - output_bucket = input_message['OutputBucket'] - else: - output_bucket = input_message['Bucket'] - key_name = os.path.split(file)[1] - key = self.put_file(output_bucket, file, key_name) - output_keys.append('%s;type=%s' % (key.name, type)) - output_message['OutputKey'] = ','.join(output_keys) - - # write message to each output queue - def write_message(self, message): - message['Service-Write'] = get_ts() - message['Server'] = self.name - if 'HOSTNAME' in os.environ: - message['Host'] = os.environ['HOSTNAME'] - else: - message['Host'] = 'unknown' - message['Instance-ID'] = self.instance_id - if self.output_queue: - boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id) - self.output_queue.write(message) - if self.output_domain: - boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name) - item_name = '/'.join([message['Service-Write'], message['Bucket'], message['InputKey']]) - self.output_domain.put_attributes(item_name, message) - - # delete message from input queue - def delete_message(self, message): - boto.log.info('deleting message from %s' % self.input_queue.id) - self.input_queue.delete_message(message) - - # to clean up any files, etc. after each iteration - def cleanup(self): - pass - - def shutdown(self): - on_completion = self.sd.get('on_completion', 'shutdown') - if on_completion == 'shutdown': - if self.instance_id: - time.sleep(60) - c = boto.connect_ec2() - c.terminate_instances([self.instance_id]) - - def main(self, notify=False): - self.notify('Service: %s Starting' % self.name) - empty_reads = 0 - while self.retry_count < 0 or empty_reads < self.retry_count: - try: - input_message = self.read_message() - if input_message: - empty_reads = 0 - output_message = ServiceMessage(None, input_message.get_body()) - input_file = self.get_file(input_message) - results = self.process_file(input_file, output_message) - self.save_results(results, input_message, output_message) - self.write_message(output_message) - self.delete_message(input_message) - self.cleanup() - else: - empty_reads += 1 - time.sleep(self.loop_delay) - except Exception: - boto.log.exception('Service Failed') - empty_reads += 1 - self.notify('Service: %s Shutting Down' % self.name) - self.shutdown() -