comparison env/lib/python3.7/site-packages/boto/services/service.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 # Copyright (c) 2006,2007 Mitch Garnaat http://garnaat.org/
2 #
3 # Permission is hereby granted, free of charge, to any person obtaining a
4 # copy of this software and associated documentation files (the
5 # "Software"), to deal in the Software without restriction, including
6 # without limitation the rights to use, copy, modify, merge, publish, dis-
7 # tribute, sublicense, and/or sell copies of the Software, and to permit
8 # persons to whom the Software is furnished to do so, subject to the fol-
9 # lowing conditions:
10 #
11 # The above copyright notice and this permission notice shall be included
12 # in all copies or substantial portions of the Software.
13 #
14 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20 # IN THE SOFTWARE.
21
22 import boto
23 from boto.services.message import ServiceMessage
24 from boto.services.servicedef import ServiceDef
25 from boto.pyami.scriptbase import ScriptBase
26 from boto.utils import get_ts
27 import time
28 import os
29 import mimetypes
30
31
class Service(ScriptBase):
    """Queue-driven worker that fetches a file, processes it and stores results.

    Each iteration of :meth:`main` reads one message from the input queue,
    downloads the S3 object it names, hands the local file to
    :meth:`process_file` (a no-op here; subclasses override it), uploads the
    resulting files, writes an output message and deletes the input message.
    """

    # Time required to process a transaction
    ProcessingTime = 60

    def __init__(self, config_file=None, mimetype_files=None):
        """Load the service definition and resolve its queues and domain.

        :param config_file: passed to ``ScriptBase`` and ``ServiceDef``.
        :param mimetype_files: optional list of files handed to
            ``mimetypes.init``; skipped when falsy.
        """
        super(Service, self).__init__(config_file)
        self.name = self.__class__.__name__
        self.working_dir = boto.config.get('Pyami', 'working_dir')
        self.sd = ServiceDef(config_file)
        self.retry_count = self.sd.getint('retry_count', 5)
        self.loop_delay = self.sd.getint('loop_delay', 30)
        # Fall back to the class-level ProcessingTime (60 unless a subclass
        # overrides it) rather than a second hard-coded copy of the constant.
        self.processing_time = self.sd.getint('processing_time',
                                              self.ProcessingTime)
        self.input_queue = self.sd.get_obj('input_queue')
        self.output_queue = self.sd.get_obj('output_queue')
        self.output_domain = self.sd.get_obj('output_domain')
        if mimetype_files:
            mimetypes.init(mimetype_files)

    @staticmethod
    def split_key(key):
        """Split ``'name;type=mime'`` into ``(name, mime)``.

        A key without ``';'`` yields ``(key, '')``.  Only the first ``';'``
        and the first ``'='`` after it act as separators, so extra separator
        characters in the value no longer raise ``ValueError``.

        Declared static because it takes no ``self``; this keeps
        ``Service.split_key(key)`` working and additionally allows calling it
        on an instance.
        """
        name, _, params = key.partition(';')
        _, _, mtype = params.partition('=')
        return (name, mtype)

    def read_message(self):
        """Read one message from the input queue and stamp the read time.

        ``self.processing_time`` is passed to the queue's ``read()``
        (presumably as the visibility timeout -- confirm against the queue
        implementation).

        :return: the message, with ``'Service-Read'`` set to ``get_ts()``,
            or whatever falsy value ``read()`` returned when nothing was
            available.
        """
        boto.log.info('read_message')
        message = self.input_queue.read(self.processing_time)
        if message:
            boto.log.info(message.get_body())
            key = 'Service-Read'
            message[key] = get_ts()
        return message

    # retrieve the source file from S3
    def get_file(self, message):
        """Download the object named by *message* into the working directory.

        The bucket and key come from ``message['Bucket']`` and
        ``message['InputKey']``; the local name is
        ``message['OriginalFileName']`` or ``'in_file'`` when absent.

        :return: path of the downloaded file.
        """
        bucket_name = message['Bucket']
        key_name = message['InputKey']
        file_name = os.path.join(self.working_dir,
                                 message.get('OriginalFileName', 'in_file'))
        boto.log.info('get_file: %s/%s to %s' % (bucket_name, key_name, file_name))
        bucket = boto.lookup('s3', bucket_name)
        key = bucket.new_key(key_name)
        # file_name already includes working_dir; joining it a second time
        # wrote to working_dir/working_dir/... whenever working_dir was
        # relative, so the returned path did not match the file on disk.
        key.get_contents_to_filename(file_name)
        return file_name

    # process source file, return list of output files
    def process_file(self, in_file_name, msg):
        """Process *in_file_name*; the base implementation produces nothing.

        Subclasses override this.  :meth:`save_results` unpacks each returned
        element as a ``(file_path, mime_type)`` pair.
        """
        return []

    # store result file in S3
    def put_file(self, bucket_name, file_path, key_name=None):
        """Upload *file_path* to *bucket_name* and return the new key.

        :param key_name: name of the S3 key; defaults to the base name of
            *file_path* when omitted.
        """
        if key_name is None:
            # The signature advertises key_name as optional, so give it a
            # usable default instead of creating a key named None.
            key_name = os.path.basename(file_path)
        boto.log.info('putting file %s as %s.%s' % (file_path, bucket_name, key_name))
        bucket = boto.lookup('s3', bucket_name)
        key = bucket.new_key(key_name)
        key.set_contents_from_filename(file_path)
        return key

    def save_results(self, results, input_message, output_message):
        """Upload every result file and record the keys on *output_message*.

        Files go to ``input_message['OutputBucket']`` when present, otherwise
        to ``input_message['Bucket']``.  ``output_message['OutputKey']`` is
        set to a comma-separated list of ``'<key name>;type=<mime type>'``
        entries (empty string when there are no results).
        """
        output_keys = []
        for file_path, mime_type in results:
            # Resolved inside the loop so an empty result list never touches
            # the bucket fields of the input message.
            if 'OutputBucket' in input_message:
                output_bucket = input_message['OutputBucket']
            else:
                output_bucket = input_message['Bucket']
            key_name = os.path.split(file_path)[1]
            key = self.put_file(output_bucket, file_path, key_name)
            output_keys.append('%s;type=%s' % (key.name, mime_type))
        output_message['OutputKey'] = ','.join(output_keys)

    # write message to each output queue
    def write_message(self, message):
        """Stamp *message* with service metadata and publish it.

        Sets ``'Service-Write'``, ``'Server'``, ``'Host'`` (from the
        ``HOSTNAME`` environment variable, ``'unknown'`` if unset) and
        ``'Instance-ID'``, then writes to the output queue and/or output
        domain, whichever are configured.
        """
        message['Service-Write'] = get_ts()
        message['Server'] = self.name
        message['Host'] = os.environ.get('HOSTNAME', 'unknown')
        message['Instance-ID'] = self.instance_id
        if self.output_queue:
            boto.log.info('Writing message to SQS queue: %s' % self.output_queue.id)
            self.output_queue.write(message)
        if self.output_domain:
            boto.log.info('Writing message to SDB domain: %s' % self.output_domain.name)
            item_name = '/'.join([message['Service-Write'],
                                  message['Bucket'],
                                  message['InputKey']])
            self.output_domain.put_attributes(item_name, message)

    # delete message from input queue
    def delete_message(self, message):
        """Remove *message* from the input queue."""
        boto.log.info('deleting message from %s' % self.input_queue.id)
        self.input_queue.delete_message(message)

    # to clean up any files, etc. after each iteration
    def cleanup(self):
        """Hook called after each successfully handled message; no-op here."""
        pass

    def shutdown(self):
        """Terminate this EC2 instance when the service is configured to.

        Acts only when the ``on_completion`` setting is ``'shutdown'`` (the
        default) and ``self.instance_id`` is truthy; it then sleeps 60
        seconds before issuing the terminate request.
        """
        on_completion = self.sd.get('on_completion', 'shutdown')
        if on_completion == 'shutdown':
            if self.instance_id:
                time.sleep(60)
                c = boto.connect_ec2()
                c.terminate_instances([self.instance_id])

    def main(self, notify=False):
        """Run the read/process/write loop, then shut down.

        The loop runs forever when ``retry_count`` is negative; otherwise it
        ends once ``retry_count`` consecutive iterations found no message or
        failed.  A successfully read message resets the counter.

        :param notify: unused by this implementation; kept for interface
            compatibility.
        """
        self.notify('Service: %s Starting' % self.name)
        empty_reads = 0
        while self.retry_count < 0 or empty_reads < self.retry_count:
            try:
                input_message = self.read_message()
                if input_message:
                    empty_reads = 0
                    output_message = ServiceMessage(None, input_message.get_body())
                    input_file = self.get_file(input_message)
                    results = self.process_file(input_file, output_message)
                    self.save_results(results, input_message, output_message)
                    self.write_message(output_message)
                    self.delete_message(input_message)
                    self.cleanup()
                else:
                    empty_reads += 1
                    time.sleep(self.loop_delay)
            except Exception:
                # Top-level boundary: log the traceback and keep serving.
                # Failures count toward the retry limit so a persistently
                # failing service eventually stops instead of spinning.
                # NOTE(review): there is no delay on this path, so with a
                # negative retry_count a persistent failure retries
                # immediately -- confirm whether a back-off is wanted.
                boto.log.exception('Service Failed')
                empty_reads += 1
        self.notify('Service: %s Shutting Down' % self.name)
        self.shutdown()
161