comparison fill_ctd.py @ 0:f483ffdc7014 draft default tip

planemo upload for repository https://github.com/galaxyproteomics/tools-galaxyp/tree/master/tools/openms commit 5c080b1e2b99f1c88f4557e9fec8c45c9d23b906
author galaxyp
date Fri, 14 Jun 2024 21:32:42 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:f483ffdc7014
1 import collections
2 import json
3 import operator
4 import os
5 import re
6 import subprocess
7 import sys
8 from functools import reduce # forward compatibility for Python 3
9
10 from CTDopts.CTDopts import (
11 _Choices,
12 _InFile,
13 _Null,
14 _NumericRange,
15 CTDModel
16 )
17
18
def getFromDict(dataDict, mapList):
    """
    return the value found in the nested dict dataDict by following
    the keys in mapList one level at a time
    (an empty mapList yields dataDict itself)
    """
    node = dataDict
    for key in mapList:
        node = node[key]
    return node
21
22
def setInDict(dataDict, mapList, value):
    """
    store value in the nested dict dataDict at the position given by
    the keys in mapList; all levels except the last one must exist
    """
    *parent_keys, last_key = mapList
    parent = getFromDict(dataDict, parent_keys)
    parent[last_key] = value
25
26
def mergeDicts(d, e):
    """
    insert values from the dict e into dict d (d is modified in place)
    no values of d are overwritten:
    - keys that are missing in d are copied from e
    - keys present in both are merged recursively if the value in d is
      a dict and the value in e is a mapping
    - any other key present in both is a conflict: a message is written
      to stderr and the process exits with status 1
    """
    # imported explicitly: a plain `import collections` does not guarantee
    # that the collections.abc submodule has been loaded
    from collections.abc import Mapping

    for k, v in e.items():
        if k not in d:
            d[k] = v
        elif isinstance(d[k], dict) and isinstance(v, Mapping):
            mergeDicts(d[k], v)
        else:
            sys.stderr.write("fill_ctd.py: could not merge key %s for %s in %s" % (k, d, e))
            sys.exit(1)
40
41
def _json_object_hook_noenvlookup(d):
    """
    variant of _json_object_hook with envlookup switched off,
    usable as single argument object_hook for json.load
    """
    normalized = _json_object_hook(d, envlookup=False)
    return normalized
44
45
46 def _json_object_hook(d, envlookup=True):
47 """
48 wee helper to transform the json written by galaxy
49 while loading
50 - True/False (bool objects) -> "true"/"false" (lowercase string)
51 - data inputs with multiple and optional true give [None] if no file is given -> []
52 - None -> "" (empty string)
53 - replace bash expressions (if envlookup is True):
54 - environment variables (need to consist capital letters and _) by their value
55 - expressions
56 """
57 for k in d.keys():
58 # if type(d[k]) is bool:
59 # d[k] = str(d[k]).lower()
60 # else
61 if type(d[k]) is list and len(d[k]) == 1 and d[k][0] is None:
62 d[k] = []
63 elif d[k] is None:
64 d[k] = ""
65 elif envlookup and type(d[k]) is str and d[k].startswith("$"):
66 m = re.fullmatch(r"\$([A-Z_]+)", d[k])
67 if m:
68 d[k] = os.environ.get(m.group(1), "")
69 continue
70 m = re.fullmatch(r"\$(\{[A-Z_]+):-(.*)\}", d[k])
71 if m:
72 d[k] = os.environ.get(m.group(1), m.group(2))
73 continue
74
75 try:
76 p = subprocess.run("echo %s" % d[k], shell=True, check=True, stdout=subprocess.PIPE, encoding="utf8")
77 d[k] = p.stdout.strip()
78 except subprocess.CalledProcessError:
79 sys.stderr.write("fill_ctd error: Could not evaluate %s" % d[k])
80 continue
81 return d
82
83
def qstring2list(qs):
    """
    split a space separated string into a list of items where items
    containing spaces are enclosed in double quotes, e.g.
    'a "b c" d' -> ['a', 'b c', 'd']
    (the quotes are removed, empty pieces between spaces are dropped)
    """
    items = []
    inside_quotes = False
    for token in qs.split(" "):
        if not token:
            continue
        opens = token.startswith('"')
        closes = token.endswith('"')
        if opens and closes:
            # quoted item without spaces
            items.append(token[1:-1])
        elif opens:
            # first piece of a quoted item
            inside_quotes = True
            items.append(token[1:] + " ")
        elif closes:
            # last piece of a quoted item
            inside_quotes = False
            items[-1] += token[:-1]
        elif inside_quotes:
            # inner piece of a quoted item
            items[-1] += token + " "
        else:
            items.append(token)
    return items
108
109
def fix_underscores(args):
    """
    remove one leading underscore from the keys of a dict (in place)
    - dict: values that are dicts are processed recursively, then each
      key starting with "_" is replaced by the key without it
    - list: elements that are dicts are processed
    - anything else is left untouched
    """
    if type(args) is list:
        for element in args:
            if type(element) is dict:
                fix_underscores(element)
        return
    if type(args) is not dict:
        return

    # iterate over a snapshot of the keys since the dict is modified
    for key in tuple(args):
        value = args[key]
        if type(value) is dict:
            fix_underscores(value)
        if key.startswith("_"):
            args[key[1:]] = value
            del args[key]
123
124
# path of the CTD file (read below and written again at the end)
input_ctd = sys.argv[1]

# parameters specified by the user (json), loaded without
# replacement of "$..." values
with open(sys.argv[2]) as user_json:
    args = json.load(user_json, object_hook=_json_object_hook_noenvlookup)

# hardcoded parameters (json), loaded with replacement of "$..." values
with open(sys.argv[3]) as hardcoded_json:
    hc_args = json.load(hardcoded_json, object_hook=_json_object_hook)

# add the hardcoded parameters to the user specified ones
mergeDicts(args, hc_args)

# move the entries of the advanced options section to the top level
if "adv_opts" in args:
    advanced = args["adv_opts"]
    args.update(advanced)
    del args["adv_opts"]

# IDMapper has the parameters in and spectra:in; in is used as
# format_source of out, which does not work in Galaxy:
# https://github.com/galaxyproject/galaxy/pull/9493
# Therefore the hardcoded parameters rename spectra:in to spectra:_in,
# which is reverted here.
# TODO remove once PR is in and adapt profile accordingly
fix_underscores(args)
149
model = CTDModel(from_file=input_ctd)

# adapt the values from the json to the parameter definitions of the CTD
# - old style booleans (string + restrictions) -> lower case string
# - new style booleans that got a string (happens for hidden
#   parameters [-test]) -> bool
# - unrestricted ITEMLIST which are represented as strings
#   (double quoted and space separated) in Galaxy -> list
# - optional data input parameters that have a default and for which
#   no value is given -> overwritten with the default
for p in model.get_parameters():
    # NOTE(review): computed once per parameter, assumes that
    # get_lineage has no side effects
    path = p.get_lineage(name_only=True)

    # advanced parameters are absent from the json file(s) if the
    # conditional is set to basic parameters, hence the lookup may fail
    try:
        getFromDict(args, path)
    except KeyError:
        # few tools use dashes in parameter names which Galaxy replaces
        # automatically by underscores; if the underscore variant is
        # present its value is stored under the dashed name as well
        # TODO might be removed later https://github.com/OpenMS/OpenMS/pull/4529
        underscored = [name.replace("-", "_") for name in path]
        try:
            val = getFromDict(args, underscored)
        except KeyError:
            # parameter is not given at all
            continue
        setInDict(args, path, val)

    if p.type is str and type(p.restrictions) is _Choices and set(p.restrictions.choices) == {"true", "false"}:
        setInDict(args, path, str(getFromDict(args, path)).lower())
    elif p.type is bool:
        v = getFromDict(args, path)
        if isinstance(v, str):
            v = v.lower() == "true"
        setInDict(args, path, v)
    elif p.is_list and (p.restrictions is None or type(p.restrictions) is _NumericRange):
        v = getFromDict(args, path)
        if type(v) is str:
            setInDict(args, path, qstring2list(v))
    elif p.type is _InFile and p.default is not None and type(p.default) is not _Null:
        v = getFromDict(args, path)
        if v in ([], ""):
            setInDict(args, path, p.default)

# write the CTD file back with the adapted values
model.write_ctd(input_ctd, arg_dict=args)