Mercurial > repos > shellac > guppy_basecaller
diff env/lib/python3.7/site-packages/galaxy/util/watcher.py @ 0:26e78fe6e8c4 draft
"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author | shellac |
---|---|
date | Sat, 02 May 2020 07:14:21 -0400 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/env/lib/python3.7/site-packages/galaxy/util/watcher.py Sat May 02 07:14:21 2020 -0400 @@ -0,0 +1,190 @@ +# TODO: this is largely copied from galaxy.tools.toolbox.galaxy and generalized, the tool-oriented watchers in that +# module should probably be updated to use this where possible + +from __future__ import absolute_import + +import logging +import os.path +import time + +from six.moves import filter + +try: + from watchdog.events import FileSystemEventHandler + from watchdog.observers import Observer + from watchdog.observers.polling import PollingObserver + can_watch = True +except ImportError: + Observer = None + FileSystemEventHandler = object + PollingObserver = None + can_watch = False + +from galaxy.util.hash_util import md5_hash_file + +log = logging.getLogger(__name__) + + +def get_observer_class(config_name, config_value, default, monitor_what_str): + """ + """ + config_value = config_value or default + config_value = str(config_value).lower() + if config_value in ("true", "yes", "on", "auto"): + expect_observer = True + observer_class = Observer + elif config_value == "polling": + expect_observer = True + observer_class = PollingObserver + elif config_value in ('false', 'no', 'off'): + expect_observer = False + observer_class = None + else: + message = "Unrecognized value for %s config option: %s" % (config_name, config_value) + raise Exception(message) + + if expect_observer and observer_class is None: + message = "Watchdog library unavailable, cannot monitor %s." % monitor_what_str + if config_value == "auto": + log.info(message) + else: + raise Exception(message) + + return observer_class + + +def get_watcher(config, config_name, default="False", monitor_what_str=None, watcher_class=None, + event_handler_class=None, **kwargs): + config_value = getattr(config, config_name, None) + observer_class = get_observer_class(config_name, config_value, default=default, monitor_what_str=monitor_what_str) + if observer_class is not None: + watcher_class = watcher_class or Watcher + event_handler_class = event_handler_class or EventHandler + return watcher_class(observer_class, event_handler_class, **kwargs) + else: + return NullWatcher() + + +class BaseWatcher(object): + + def __init__(self, observer_class, even_handler_class, **kwargs): + self.observer = None + self.observer_class = observer_class + self.event_handler = even_handler_class(self) + self.monitored_dirs = {} + + def start(self): + if self.observer is None: + self.observer = self.observer_class() + self.observer.start() + self.resume_watching() + + def monitor(self, dir_path, recursive=False): + self.monitored_dirs[dir_path] = recursive + if self.observer is not None: + self.observer.schedule(self.event_handler, dir_path, recursive=recursive) + + def resume_watching(self): + for dir_path, recursive in self.monitored_dirs.items(): + self.monitor(dir_path, recursive) + + def shutdown(self): + if self.observer is not None: + self.observer.stop() + self.observer.join() + self.observer = None + + +class Watcher(BaseWatcher): + + def __init__(self, observer_class, event_handler_class, **kwargs): + super(Watcher, self).__init__(observer_class, event_handler_class, **kwargs) + self.path_hash = {} + self.file_callbacks = {} + self.dir_callbacks = {} + self.ignore_extensions = {} + self.require_extensions = {} + self.event_handler = event_handler_class(self) + + def watch_file(self, file_path, callback=None): + file_path = os.path.abspath(file_path) + dir_path = os.path.dirname(file_path) + if dir_path not in self.monitored_dirs: + if callback is not None: + self.file_callbacks[file_path] = callback + self.monitor(dir_path) + log.debug("Watching for changes to file: %s", file_path) + + def watch_directory(self, dir_path, callback=None, recursive=False, ignore_extensions=None, require_extensions=None): + dir_path = os.path.abspath(dir_path) + if dir_path not in self.monitored_dirs: + if callback is not None: + self.dir_callbacks[dir_path] = callback + if ignore_extensions: + self.ignore_extensions[dir_path] = ignore_extensions + if require_extensions: + self.require_extensions[dir_path] = require_extensions + self.monitor(dir_path, recursive=recursive) + log.debug("Watching for changes in directory%s: %s", ' (recursively)' if recursive else '', dir_path) + + +class EventHandler(FileSystemEventHandler): + + def __init__(self, watcher): + self.watcher = watcher + + def on_any_event(self, event): + self._handle(event) + + def _extension_check(self, key, path): + required_extensions = self.watcher.require_extensions.get(key) + if required_extensions: + return any(filter(path.endswith, required_extensions)) + return not any(filter(path.endswith, self.watcher.ignore_extensions.get(key, []))) + + def _handle(self, event): + # modified events will only have src path, move events will + # have dest_path and src_path but we only care about dest. So + # look at dest if it exists else use src. + path = getattr(event, 'dest_path', None) or event.src_path + path = os.path.abspath(path) + callback = self.watcher.file_callbacks.get(path) + if os.path.basename(path).startswith('.'): + return + if callback: + ext_ok = self._extension_check(path, path) + else: + # reversed sort for getting the most specific dir first + for key in reversed(sorted(self.watcher.dir_callbacks.keys())): + if os.path.commonprefix([path, key]) == key: + callback = self.watcher.dir_callbacks[key] + ext_ok = self._extension_check(key, path) + break + if not callback or not ext_ok: + return + cur_hash = md5_hash_file(path) + if cur_hash: + if self.watcher.path_hash.get(path) == cur_hash: + return + else: + time.sleep(0.5) + if cur_hash != md5_hash_file(path): + # We're still modifying the file, it'll be picked up later + return + self.watcher.path_hash[path] = cur_hash + callback(path=path) + + +class NullWatcher(object): + + def start(self): + pass + + def shutdown(self): + pass + + def watch_file(self, *args, **kwargs): + pass + + def watch_directory(self, *args, **kwargs): + pass