Mercurial > repos > galaxyp > openms_openswathrtnormalizer
comparison fill_ctd.py @ 9:f25fbb8ff7cf draft
"planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/openms commit 020906fb54bde7fc143c356f41975c378a741315"
author | galaxyp |
---|---|
date | Wed, 09 Sep 2020 20:07:15 +0000 |
parents | |
children | 79dbeb3cbfb4 |
comparison
equal
deleted
inserted
replaced
8:18826fb8e5db | 9:f25fbb8ff7cf |
---|---|
1 import collections | |
2 import json | |
3 import operator | |
4 import os | |
5 import re | |
6 import subprocess | |
7 import sys | |
8 from functools import reduce # forward compatibility for Python 3 | |
9 | |
10 from CTDopts.CTDopts import ( | |
11 _Choices, | |
12 _InFile, | |
13 _Null, | |
14 _NumericRange, | |
15 CTDModel | |
16 ) | |
17 | |
18 | |
def getFromDict(dataDict, mapList):
    """
    get the value stored in the nested dict dataDict at the position
    given by the sequence of keys in mapList
    (an empty mapList gives dataDict itself)
    """
    node = dataDict
    for key in mapList:
        node = node[key]
    return node
21 | |
22 | |
def setInDict(dataDict, mapList, value):
    """
    store value in the nested dict dataDict at the position given by the
    sequence of keys in mapList
    all levels except the last one need to exist already
    """
    parent = getFromDict(dataDict, mapList[:-1])
    leaf_key = mapList[-1]
    parent[leaf_key] = value
25 | |
26 | |
def mergeDicts(d, e):
    """
    insert values from the dict e into dict d
    no values of d are overwritten

    - keys of e that are missing in d are copied if their value is not a
      mapping
    - keys present in both are merged recursively if d[k] is a dict and
      e[k] is a mapping
    - every other combination is treated as a conflict: a message is
      written to stderr and the process exits with status 1
    """
    # imported explicitly: `import collections` alone does not guarantee
    # that the `collections.abc` submodule has been loaded
    from collections.abc import Mapping

    for k, v in e.items():
        if k in d and isinstance(d[k], dict) and isinstance(v, Mapping):
            mergeDicts(d[k], v)
        elif k not in d and not isinstance(v, Mapping):
            d[k] = v
        else:
            sys.stderr.write("fill_ctd.py: could not merge key %s for %s in %s\n" % (k, d, e))
            sys.exit(1)
40 | |
41 | |
def _json_object_hook_noenvlookup(d):
    """
    variant of _json_object_hook that takes values starting with "$"
    literally, i.e. environment / bash lookup is switched off
    """
    converted = _json_object_hook(d, envlookup=False)
    return converted
44 | |
45 | |
def _json_object_hook(d, envlookup=True):
    """
    wee helper to transform the json written by galaxy
    while loading

    the values of d are rewritten in place and d is returned:
    - data inputs with multiple and optional true give [None] if no file is given -> []
    - None -> "" (empty string)
    - replace bash expressions, i.e. strings starting with "$"
      (only if envlookup is True):
      - $VAR (name needs to consist of capital letters and _) by the value
        of the environment variable ("" if it is not set)
      - ${VAR:-default} by the value of the environment variable, or by
        the default if the variable is unset or empty
      - other expressions by the output of the shell; if the evaluation
        fails a message is written to stderr and the value is kept

    all other values (including bool objects) are left unchanged
    """
    for k, v in d.items():
        if type(v) is list and len(v) == 1 and v[0] is None:
            d[k] = []
        elif v is None:
            d[k] = ""
        elif envlookup and type(v) is str and v.startswith("$"):
            # plain environment variable: $VAR
            m = re.fullmatch(r"\$([A-Z_]+)", v)
            if m:
                d[k] = os.environ.get(m.group(1), "")
                continue

            # ${VAR:-default}: the opening brace must stay outside of the
            # group, otherwise "{VAR" is looked up and never found
            m = re.fullmatch(r"\$\{([A-Z_]+):-(.*)\}", v)
            if m:
                d[k] = os.environ.get(m.group(1)) or m.group(2)
                continue

            # NOTE: the value is passed unescaped to a shell. In this file
            # envlookup is only enabled for the hardcoded parameters; do not
            # enable it for values that come from an untrusted source.
            try:
                p = subprocess.run("echo %s" % v, shell=True, check=True, stdout=subprocess.PIPE, encoding="utf8")
            except subprocess.CalledProcessError:
                sys.stderr.write("fill_ctd error: Could not evaluate %s\n" % v)
            else:
                d[k] = p.stdout.strip()
    return d
82 | |
83 | |
def qstring2list(qs):
    """
    split a space separated string into a list of its items
    items that contain spaces need to be quoted by "
    (the quotes are removed from the items)
    """
    items = []
    in_quotes = False
    for token in qs.split(" "):
        # consecutive spaces give empty tokens
        if not token:
            continue
        opens = token.startswith('"')
        closes = token.endswith('"')
        if opens and closes:
            # quoted item without spaces
            items.append(token[1:-1])
        elif opens:
            # first word of a quoted item
            in_quotes = True
            items.append(token[1:] + " ")
        elif closes:
            # last word of a quoted item
            in_quotes = False
            items[-1] += token[:-1]
        elif in_quotes:
            # inner word of a quoted item
            items[-1] += token + " "
        else:
            items.append(token)
    return items
108 | |
109 | |
def fix_underscores(args):
    """
    remove the leading underscore from all keys of args (in place)

    dicts that are values of a dict or elements of a list are processed
    recursively, lists that are values of a dict are not descended into
    """
    if type(args) is dict:
        # iterate over a copy of the keys since the dict is modified
        for key in list(args):
            value = args[key]
            if type(value) is dict:
                fix_underscores(value)
            if key.startswith("_"):
                args[key[1:]] = args.pop(key)
    elif type(args) is list:
        for item in args:
            if type(item) is dict:
                fix_underscores(item)
123 | |
124 | |
# the CTD file given as first argument is used as template and is
# overwritten with the filled in version at the end
input_ctd = sys.argv[1]

# load user specified parameters from json
# (values are taken literally, no environment / bash lookup)
with open(sys.argv[2]) as fh:
    args = json.load(fh, object_hook=_json_object_hook_noenvlookup)

# load hardcoded parameters from json
# (values starting with "$" are replaced)
with open(sys.argv[3]) as fh:
    hc_args = json.load(fh, object_hook=_json_object_hook)

# insert the hc_args into the args
mergeDicts(args, hc_args)

# move the content of the advanced options conditional to the top level
if "adv_opts_cond" in args:
    advanced = args["adv_opts_cond"]
    args.update(advanced)
    del args["adv_opts_cond"]

# IDMapper has in and spectra:in params, in is used in out as format_source,
# which does not work in Galaxy: https://github.com/galaxyproject/galaxy/pull/9493
# therefore hardcoded params change the name of spectra:in to spectra:_in
# which is corrected here again
# TODO remove once PR is in and adapt profile accordingly
fix_underscores(args)

model = CTDModel(from_file=input_ctd)

# transform values from json that correspond to
# - old style booleans (string + restrictions) -> transformed to a str
# - unrestricted ITEMLIST which are represented as strings
#   ("=quoted and space separated) in Galaxy -> transform to lists
# - optional data input parameters that have defaults and for which no
#   value is given -> overwritten with the default
for param in model.get_parameters():
    path = param.get_lineage(name_only=True)

    # check if the parameter is in the arguments from the galaxy tool
    # (from the json file(s)), since advanced parameters are absent
    # if the conditional is set to basic parameters
    try:
        getFromDict(args, path)
    except KeyError:
        # few tools use dashes in parameters which are automatically replaced
        # by underscores by Galaxy. in these cases the dictionary needs to be
        # updated
        # TODO might be removed later https://github.com/OpenMS/OpenMS/pull/4529
        underscored = [name.replace("-", "_") for name in path]
        try:
            found = getFromDict(args, underscored)
        except KeyError:
            continue
        # NOTE(review): the value is written back to the underscore lineage
        # it was just read from, i.e. args is not changed and the lineage
        # with dashes is still absent (the lookups below would raise a
        # KeyError for such a parameter) -- confirm if the lineage with
        # dashes was intended here
        setInDict(args, underscored, found)

    if param.type is str and type(param.restrictions) is _Choices and set(param.restrictions.choices) == {"true", "false"}:
        # old style boolean
        current = getFromDict(args, path)
        setInDict(args, path, str(current).lower())
    elif param.is_list and (param.restrictions is None or type(param.restrictions) is _NumericRange):
        # unrestricted list given as string
        current = getFromDict(args, path)
        if type(current) is str:
            setInDict(args, path, qstring2list(current))
    elif param.type is _InFile and param.default is not None and type(param.default) is not _Null:
        # input file with default for which no value is given
        current = getFromDict(args, path)
        if current in ([], ""):
            setInDict(args, path, param.default)

model.write_ctd(input_ctd, arg_dict=args)