view env/lib/python3.7/site-packages/boto/manage/task.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
line wrap: on
line source

# Copyright (c) 2006-2009 Mitch Garnaat http://garnaat.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
#

import boto
from boto.sdb.db.property import StringProperty, DateTimeProperty, IntegerProperty
from boto.sdb.db.model import Model
import datetime, subprocess, time
from boto.compat import StringIO

def check_hour(val):
    if val == '*':
        return
    if int(val) < 0 or int(val) > 23:
        raise ValueError

class Task(Model):

    """
    A scheduled, repeating task that can be executed by any participating servers.
    The scheduling is similar to cron jobs.  Each task has an hour attribute.
    The allowable values for hour are [0-23|*].

    To keep the operation reasonably efficient and not cause excessive polling,
    the minimum granularity of a Task is hourly.  Some examples:

         hour='*' - the task would be executed each hour
         hour='3' - the task would be executed at 3AM GMT each day.

    """
    name = StringProperty()
    hour = StringProperty(required=True, validator=check_hour, default='*')
    command = StringProperty(required=True)
    last_executed = DateTimeProperty()
    last_status = IntegerProperty()
    last_output = StringProperty()
    message_id = StringProperty()

    @classmethod
    def start_all(cls, queue_name):
        for task in cls.all():
            task.start(queue_name)

    def __init__(self, id=None, **kw):
        super(Task, self).__init__(id, **kw)
        self.hourly = self.hour == '*'
        self.daily = self.hour != '*'
        self.now = datetime.datetime.utcnow()

    def check(self):
        """
        Determine how long until the next scheduled time for a Task.
        Returns the number of seconds until the next scheduled time or zero
        if the task needs to be run immediately.
        If it's an hourly task and it's never been run, run it now.
        If it's a daily task and it's never been run and the hour is right, run it now.
        """
        boto.log.info('checking Task[%s]-now=%s, last=%s' % (self.name, self.now, self.last_executed))

        if self.hourly and not self.last_executed:
            return 0

        if self.daily and not self.last_executed:
            if int(self.hour) == self.now.hour:
                return 0
            else:
                return max( (int(self.hour)-self.now.hour), (self.now.hour-int(self.hour)) )*60*60

        delta = self.now - self.last_executed
        if self.hourly:
            if delta.seconds >= 60*60:
                return 0
            else:
                return 60*60 - delta.seconds
        else:
            if int(self.hour) == self.now.hour:
                if delta.days >= 1:
                    return 0
                else:
                    return 82800 # 23 hours, just to be safe
            else:
                return max( (int(self.hour)-self.now.hour), (self.now.hour-int(self.hour)) )*60*60

    def _run(self, msg, vtimeout):
        boto.log.info('Task[%s] - running:%s' % (self.name, self.command))
        log_fp = StringIO()
        process = subprocess.Popen(self.command, shell=True, stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        nsecs = 5
        current_timeout = vtimeout
        while process.poll() is None:
            boto.log.info('nsecs=%s, timeout=%s' % (nsecs, current_timeout))
            if nsecs >= current_timeout:
                current_timeout += vtimeout
                boto.log.info('Task[%s] - setting timeout to %d seconds' % (self.name, current_timeout))
                if msg:
                    msg.change_visibility(current_timeout)
            time.sleep(5)
            nsecs += 5
        t = process.communicate()
        log_fp.write(t[0])
        log_fp.write(t[1])
        boto.log.info('Task[%s] - output: %s' % (self.name, log_fp.getvalue()))
        self.last_executed = self.now
        self.last_status = process.returncode
        self.last_output = log_fp.getvalue()[0:1023]

    def run(self, msg, vtimeout=60):
        delay = self.check()
        boto.log.info('Task[%s] - delay=%s seconds' % (self.name, delay))
        if delay == 0:
            self._run(msg, vtimeout)
            queue = msg.queue
            new_msg = queue.new_message(self.id)
            new_msg = queue.write(new_msg)
            self.message_id = new_msg.id
            self.put()
            boto.log.info('Task[%s] - new message id=%s' % (self.name, new_msg.id))
            msg.delete()
            boto.log.info('Task[%s] - deleted message %s' % (self.name, msg.id))
        else:
            boto.log.info('new_vtimeout: %d' % delay)
            msg.change_visibility(delay)

    def start(self, queue_name):
        boto.log.info('Task[%s] - starting with queue: %s' % (self.name, queue_name))
        queue = boto.lookup('sqs', queue_name)
        msg = queue.new_message(self.id)
        msg = queue.write(msg)
        self.message_id = msg.id
        self.put()
        boto.log.info('Task[%s] - start successful' % self.name)

class TaskPoller(object):

    def __init__(self, queue_name):
        self.sqs = boto.connect_sqs()
        self.queue = self.sqs.lookup(queue_name)

    def poll(self, wait=60, vtimeout=60):
        while True:
            m = self.queue.read(vtimeout)
            if m:
                task = Task.get_by_id(m.get_body())
                if task:
                    if not task.message_id or m.id == task.message_id:
                        boto.log.info('Task[%s] - read message %s' % (task.name, m.id))
                        task.run(m, vtimeout)
                    else:
                        boto.log.info('Task[%s] - found extraneous message, ignoring' % task.name)
            else:
                time.sleep(wait)