comparison planemo/lib/python3.7/site-packages/galaxy/util/properties.py @ 1:56ad4e20f292 draft

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
author guerler
date Fri, 31 Jul 2020 00:32:28 -0400
parents
children
comparison
equal deleted inserted replaced
0:d30785e31577 1:56ad4e20f292
1 """ Module used to blend ini, environment, and explicit dictionary properties
2 to determine application configuration. Some hard coded defaults for Galaxy but
3 this should be reusable by tool shed and pulsar as well.
4 """
5 import os
6 import os.path
7 import sys
8 from functools import partial
9 from itertools import product, starmap
10
11 import yaml
12 from six import iteritems, string_types
13 from six.moves.configparser import ConfigParser
14
15 from galaxy.exceptions import InvalidFileFormatError
16 from galaxy.util.path import extensions, has_ext, joinext
17
18
19 def find_config_file(names, exts=None, dirs=None, include_samples=False):
20 """Locate a config file in multiple directories, with multiple extensions.
21
22 >>> from shutil import rmtree
23 >>> from tempfile import mkdtemp
24 >>> def touch(d, f):
25 ... open(os.path.join(d, f), 'w').close()
26 >>> def _find_config_file(*args, **kwargs):
27 ... return find_config_file(*args, **kwargs).replace(d, '')
28 >>> d = mkdtemp()
29 >>> d1 = os.path.join(d, 'd1')
30 >>> d2 = os.path.join(d, 'd2')
31 >>> os.makedirs(d1)
32 >>> os.makedirs(d2)
33 >>> touch(d1, 'foo.ini')
34 >>> touch(d1, 'foo.bar')
35 >>> touch(d1, 'baz.ini.sample')
36 >>> touch(d2, 'foo.yaml')
37 >>> touch(d2, 'baz.yml')
38 >>> _find_config_file('foo', dirs=(d1, d2))
39 '/d1/foo.ini'
40 >>> _find_config_file('baz', dirs=(d1, d2))
41 '/d2/baz.yml'
42 >>> _find_config_file('baz', dirs=(d1, d2), include_samples=True)
43 '/d2/baz.yml'
44 >>> _find_config_file('baz', dirs=(d1,), include_samples=True)
45 '/d1/baz.ini.sample'
46 >>> _find_config_file('foo', dirs=(d2, d1))
47 '/d2/foo.yaml'
48 >>> find_config_file('quux', dirs=(d,))
49 >>> _find_config_file('foo', exts=('bar', 'ini'), dirs=(d1,))
50 '/d1/foo.bar'
51 >>> rmtree(d)
52 """
53 found = __find_config_files(
54 names,
55 exts=exts or extensions['yaml'] + extensions['ini'],
56 dirs=dirs or [os.getcwd(), os.path.join(os.getcwd(), 'config')],
57 include_samples=include_samples,
58 )
59 if not found:
60 return None
61 # doesn't really make sense to log here but we should probably generate a warning of some kind if more than one
62 # config is found.
63 return found[0]
64
65
66 def load_app_properties(
67 kwds=None,
68 ini_file=None,
69 ini_section=None,
70 config_file=None,
71 config_section=None,
72 config_prefix="GALAXY_CONFIG_"
73 ):
74 if config_file is None:
75 config_file = ini_file
76 config_section = ini_section
77
78 # read from file or init w/no file
79 if config_file:
80 properties = read_properties_from_file(config_file, config_section)
81 else:
82 properties = {'__file__': None}
83
84 # update from kwds
85 if kwds:
86 properties.update(kwds)
87
88 # update from env
89 override_prefix = "%sOVERRIDE_" % config_prefix
90 for key in os.environ:
91 if key.startswith(override_prefix):
92 config_key = key[len(override_prefix):].lower()
93 properties[config_key] = os.environ[key]
94 elif key.startswith(config_prefix):
95 config_key = key[len(config_prefix):].lower()
96 if config_key not in properties:
97 properties[config_key] = os.environ[key]
98
99 return properties
100
101
102 def read_properties_from_file(config_file, config_section=None):
103 properties = {}
104 if has_ext(config_file, 'yaml', aliases=True, ignore='sample'):
105 if config_section is None:
106 config_section = "galaxy"
107 properties.update(__default_properties(config_file))
108 raw_properties = _read_from_yaml_file(config_file)
109 if raw_properties:
110 properties.update(raw_properties.get(config_section) or {})
111 elif has_ext(config_file, 'ini', aliases=True, ignore='sample'):
112 if config_section is None:
113 config_section = "app:main"
114 parser = nice_config_parser(config_file) # default properties loaded w/parser
115 if parser.has_section(config_section):
116 properties.update(dict(parser.items(config_section)))
117 else:
118 properties.update(parser.defaults())
119 else:
120 raise InvalidFileFormatError("File '%s' doesn't have a supported extension" % config_file)
121 return properties
122
123
124 def _read_from_yaml_file(path):
125 with open(path, "r") as f:
126 return yaml.safe_load(f)
127
128
129 def nice_config_parser(path):
130 parser = NicerConfigParser(path, defaults=__default_properties(path))
131 parser.optionxform = str # Don't lower-case keys
132 with open(path) as f:
133 parser.read_file(f)
134 return parser
135
136
137 class NicerConfigParser(ConfigParser):
138
139 def __init__(self, filename, *args, **kw):
140 ConfigParser.__init__(self, *args, **kw)
141 self.filename = filename
142 if hasattr(self, '_interpolation'):
143 self._interpolation = self.InterpolateWrapper(self._interpolation)
144
145 read_file = getattr(ConfigParser, 'read_file', ConfigParser.readfp)
146
147 def defaults(self):
148 """Return the defaults, with their values interpolated (with the
149 defaults dict itself)
150
151 Mainly to support defaults using values such as %(here)s
152 """
153 defaults = ConfigParser.defaults(self).copy()
154 for key, val in iteritems(defaults):
155 defaults[key] = self.get('DEFAULT', key) or val
156 return defaults
157
158 def _interpolate(self, section, option, rawval, vars):
159 # Python < 3.2
160 try:
161 return ConfigParser._interpolate(
162 self, section, option, rawval, vars)
163 except Exception:
164 e = sys.exc_info()[1]
165 args = list(e.args)
166 args[0] = 'Error in file %s: %s' % (self.filename, e)
167 e.args = tuple(args)
168 e.message = args[0]
169 raise
170
171 class InterpolateWrapper(object):
172 # Python >= 3.2
173 def __init__(self, original):
174 self._original = original
175
176 def __getattr__(self, name):
177 return getattr(self._original, name)
178
179 def before_get(self, parser, section, option, value, defaults):
180 try:
181 return self._original.before_get(parser, section, option,
182 value, defaults)
183 except Exception:
184 e = sys.exc_info()[1]
185 args = list(e.args)
186 args[0] = 'Error in file %s: %s' % (parser.filename, e)
187 e.args = tuple(args)
188 e.message = args[0]
189 raise
190
191
192 def _running_from_source():
193 paths = ['run.sh', 'lib/galaxy/__init__.py', 'scripts/common_startup.sh']
194 return all(map(os.path.exists, paths))
195
196
197 running_from_source = _running_from_source()
198
199
200 def get_data_dir(properties):
201 data_dir = properties.get('data_dir', None)
202 if data_dir is None:
203 if running_from_source:
204 data_dir = './database'
205 else:
206 config_dir = properties.get('config_dir', os.path.dirname(properties['__file__']))
207 data_dir = os.path.join(config_dir, 'data')
208 return data_dir
209
210
211 def __get_all_configs(dirs, names):
212 return list(filter(os.path.exists, starmap(os.path.join, product(dirs, names))))
213
214
215 def __find_config_files(names, exts=None, dirs=None, include_samples=False):
216 sample_names = []
217 if isinstance(names, string_types):
218 names = [names]
219 if not dirs:
220 dirs = [os.getcwd()]
221 if exts:
222 # add exts to names, converts back into a list because it's going to be small and we might consume names twice
223 names = list(starmap(joinext, product(names, exts)))
224 if include_samples:
225 sample_names = map(partial(joinext, ext='sample'), names)
226 # check for all names in each dir before moving to the next dir. could do it the other way around but that makes
227 # less sense to me.
228 return __get_all_configs(dirs, names) or __get_all_configs(dirs, sample_names)
229
230
231 def __default_properties(path):
232 return {
233 'here': os.path.dirname(os.path.abspath(path)),
234 '__file__': os.path.abspath(path)
235 }
236
237
238 __all__ = ('find_config_file', 'get_data_dir', 'load_app_properties', 'NicerConfigParser', 'running_from_source')