Mercurial > repos > muon-spectroscopy-computational-project > larch_select_paths
comparison larch_select_paths.py @ 0:2e827836f0ad draft
planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_select_paths commit 5be486890442dedfb327289d597e1c8110240735
| author | muon-spectroscopy-computational-project |
|---|---|
| date | Tue, 14 Nov 2023 15:35:52 +0000 |
| parents | |
| children | 7fdca938d90c |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:2e827836f0ad |
|---|---|
| 1 import csv | |
| 2 import json | |
| 3 import os | |
| 4 import re | |
| 5 import sys | |
| 6 from zipfile import ZIP_DEFLATED, ZipFile | |
| 7 | |
| 8 | |
class GDSWriter:
    """Accumulate GDS variable definitions and write them to ``gds.csv``.

    Rows are kept as pre-formatted CSV lines in ``self.rows`` (the first
    entry is the header) and every variable name that has been emitted is
    tracked in ``self.names`` so that duplicates can be rejected.
    """

    def __init__(self, default_variables: "dict[str, dict]"):
        """Set up the header row and register the common default variables.

        Args:
            default_variables (dict[str, dict]): For each of the keys
                "s02", "e0", "deltar" and "sigma2", a dictionary holding
                "value", "vary" and "is_common" entries.
        """
        self.default_properties = {
            "s02": {"name": "s02"},
            "e0": {"name": "e0"},
            "deltar": {"name": "alpha*reff"},
            "sigma2": {"name": "sigma2"},
        }
        header = (
            f"{'id':>4s}, {'name':>24s}, {'value':>5s}, {'expr':>4s}, "
            f"{'vary':>4s}\n"
        )
        self.rows = [header]
        self.names = set()

        for key, settings in self.default_properties.items():
            # Read all three inputs before storing any of them.
            value = default_variables[key]["value"]
            vary = default_variables[key]["vary"]
            is_common = default_variables[key]["is_common"]
            settings.update(value=value, vary=vary, is_common=is_common)

            # A common variable is shared by every path, so it is defined
            # exactly once, here, rather than once per path.
            if is_common:
                self.append_gds(name=settings["name"], value=value, vary=vary)

    def append_gds(
        self,
        name: str,
        value: float = 0.,
        expr: str = None,
        vary: bool = True,
        label: str = "",
    ):
        """Format one GDS variable and queue it for writing.

        Args:
            name (str): Name of the GDS variable.
            value (float, optional): Starting value for the variable.
                Defaults to 0.
            expr (str, optional): Expression for setting the variable.
                Defaults to None, which is written as a blank field.
            vary (bool, optional): Whether the variable is optimised during
                the fit. Defaults to True.
            label (str, optional): Prefix keeping variables for different
                FEFF directories distinct. Defaults to "".

        Raises:
            ValueError: If the resulting variable name was already added.
        """
        if label is not None:
            name = label + name
        # "alpha*reff" is an expression, not a valid variable name: only the
        # part before "*reff" is declared as a variable.
        full_name = name.replace("*reff", "")
        expr_text = expr or " "

        if full_name in self.names:
            raise ValueError(f"{full_name} already used as variable name")
        self.names.add(full_name)

        # The header occupies index 0, so the current length is the next id.
        row_id = len(self.rows)
        self.rows.append(
            f"{row_id:4d}, {full_name:>24s}, {str(value):>5s}, "
            f"{expr_text:>4s}, {str(vary):>4s}\n"
        )

    def parse_gds(
        self,
        property_name: str,
        variable_name: str = None,
        path_variable: dict = None,
        directory_label: str = None,
        path_label: str = None,
    ) -> str:
        """Resolve, and if needed define, the variable used by one path.

        Args:
            property_name (str): The property to which the variable
                corresponds. Should be a key in `self.default_properties`.
            variable_name (str, optional): Custom name for this variable.
                Defaults to None.
            path_variable (dict, optional): Dictionary with "value", "expr"
                and "vary" entries for a custom variable. Only read when
                `variable_name` is given. Defaults to None.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to None.
            path_label (str, optional): Label indicating the atoms involved
                in this path. Defaults to None.

        Returns:
            str: `variable_name` if given; otherwise the shared default name
                when the property is common; otherwise a name generated from
                the default name and the directory and path labels.
        """
        if variable_name:
            self.append_gds(
                name=variable_name,
                value=path_variable["value"],
                expr=path_variable["expr"],
                vary=path_variable["vary"],
            )
            return variable_name

        defaults = self.default_properties[property_name]
        if defaults["is_common"]:
            # Already defined once in the constructor.
            return defaults["name"]

        name_parts = [defaults["name"]]
        if directory_label:
            name_parts.append(f"{directory_label}")
        if path_label:
            name_parts.append(path_label.lower().replace('.', ''))
        generated_name = "_".join(name_parts)

        self.append_gds(
            name=generated_name,
            value=defaults["value"],
            vary=defaults["vary"],
        )
        return generated_name

    def write(self):
        """Write all queued GDS rows to ``gds.csv`` in the working directory.
        """
        with open("gds.csv", "w") as out:
            out.writelines(self.rows)
| 129 | |
| 130 | |
class PathsWriter:
    """Collect selected FEFF paths and write them to ``sp.csv``.

    Each selected path becomes one pre-formatted CSV line in ``self.rows``
    (the first entry is the header). The GDS variables referenced by those
    paths are delegated to a `GDSWriter`, which writes ``gds.csv``.
    """

    def __init__(self, default_variables: "dict[str, dict]"):
        """Set up the header row and the GDS writer.

        Args:
            default_variables (dict[str, dict]): Default settings for each
                property, passed unchanged to `GDSWriter`.
        """
        self.rows = [
            f"{'id':>4s}, {'filename':>24s}, {'label':>24s}, {'s02':>3s}, "
            f"{'e0':>4s}, {'sigma2':>24s}, {'deltar':>10s}\n"
        ]
        self.gds_writer = GDSWriter(default_variables=default_variables)

    def _define_variables(
        self,
        directory_label: str,
        path_label: str,
        path_value: dict = None,
    ) -> "dict[str, str]":
        """Return the variable name for every property of a single path.

        When `path_value` is given, its per-property "name" and settings are
        passed on; otherwise only the defaults held by the GDS writer apply.
        """
        variables = {}
        for property_name in self.gds_writer.default_properties:
            if path_value is None:
                custom = {}
            else:
                custom = {
                    "variable_name": path_value[property_name]["name"],
                    "path_variable": path_value[property_name],
                }
            variables[property_name] = self.gds_writer.parse_gds(
                property_name=property_name,
                directory_label=directory_label,
                path_label=path_label,
                **custom,
            )
        return variables

    def parse_feff_output(
        self,
        paths_file: str,
        selection: "dict[str, str|list]",
        directory_label: str = "",
    ):
        """Parse selected paths from CSV summary and define GDS variables.

        A row is a path row when its first column contains a number; that
        number is the path id. The second-to-last column is used as the path
        label and the last column as an integer selection flag. Rows without
        a number in the first column (e.g. the header) and blank lines are
        ignored.

        Args:
            paths_file (str): CSV summary filename.
            selection (dict[str, str|list]): Dictionary indicating which
                paths to select, and how to define their variables. Its
                "paths" entry lists per-path settings keyed by "id"; its
                "selection" entry set to "all" selects every path row.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. Defaults to "".
        """
        # Index the custom path settings by id. setdefault keeps the first
        # entry when an id is repeated.
        custom_paths = {}
        for path_value in selection["paths"]:
            custom_paths.setdefault(path_value["id"], path_value)

        with open(paths_file) as file:
            for row in csv.reader(file):
                if not row:
                    # csv.reader yields [] for a blank line; nothing to parse.
                    continue
                id_match = re.search(r"\d+", row[0])
                if not id_match:
                    continue

                path_id = int(id_match.group())
                filename = row[0].strip()
                path_label = row[-2].strip()

                if path_id in custom_paths:
                    path_value = custom_paths[path_id]
                elif selection["selection"] == "all" or int(row[-1]):
                    path_value = None
                else:
                    continue

                variables = self._define_variables(
                    directory_label=directory_label,
                    path_label=path_label,
                    path_value=path_value,
                )
                self.parse_selected_path(
                    filename=filename,
                    path_label=path_label,
                    directory_label=directory_label,
                    **variables,
                )

    def parse_selected_path(
        self,
        filename: str,
        path_label: str,
        directory_label: str = "",
        s02: str = "s02",
        e0: str = "e0",
        sigma2: str = "sigma2",
        deltar: str = "alpha*reff",
    ):
        """Format and append row representing a selected FEFF path.

        Args:
            filename (str): Name of the underlying FEFF path file, without
                parent directory.
            path_label (str): Label indicating the atoms involved in this
                path.
            directory_label (str, optional): Label to indicate paths from a
                separate directory. When set it is used as the parent
                directory and as a prefix of the row label; otherwise the
                parent directory is "feff". Defaults to "".
            s02 (str, optional): Electron screening factor variable name.
                Defaults to "s02".
            e0 (str, optional): Energy shift variable name. Defaults to "e0".
            sigma2 (str, optional): Mean squared displacement variable name.
                Defaults to "sigma2".
            deltar (str, optional): Change in path length variable.
                Defaults to "alpha*reff".
        """
        if directory_label:
            filename = os.path.join(directory_label, filename)
            label = f"{directory_label}.{path_label}"
        else:
            filename = os.path.join("feff", filename)
            label = path_label

        # The header occupies index 0, so the current length is the next id.
        self.rows.append(
            f"{len(self.rows):>4d}, {filename:>24s}, {label:>24s}, "
            f"{s02:>3s}, {e0:>4s}, {sigma2:>24s}, {deltar:>10s}\n"
        )

    def write(self):
        """Write selected path and GDS rows to file.

        The GDS rows go to the GDS writer's own file; the path rows go to
        ``sp.csv`` in the working directory.
        """
        self.gds_writer.write()
        with open("sp.csv", "w") as out:
            out.writelines(self.rows)
| 242 | |
| 243 | |
def main(input_values: dict):
    """Select paths and define GDS parameters.

    With exactly one FEFF output its paths are parsed without a directory
    label. Otherwise every output is parsed under its own label, and the
    contents of each output's paths zip are extracted into a directory named
    after that label and added to a new ``merged.zip``. The selected paths
    and GDS variables are written out at the end in both cases.

    Args:
        input_values (dict): All input values from the Galaxy tool UI. Reads
            the "variables" and "feff_outputs" entries.

    Raises:
        ValueError: If a FEFF label is not unique.
    """
    writer = PathsWriter(default_variables=input_values["variables"])
    feff_outputs = input_values["feff_outputs"]

    if len(feff_outputs) == 1:
        only_output = feff_outputs[0]
        writer.parse_feff_output(
            paths_file=only_output["paths_file"],
            selection=only_output["selection"],
        )
        writer.write()
        return

    # Unlabelled outputs are numbered 1..N, zero-padded to a common width.
    pad_width = len(str(len(feff_outputs)))
    seen_labels = set()
    # Mode "x" creates the archive and fails if merged.zip already exists.
    with ZipFile("merged.zip", "x", ZIP_DEFLATED) as merged:
        for number, feff_output in enumerate(feff_outputs, start=1):
            label = feff_output.pop("label")
            if not label:
                label = str(number).zfill(pad_width)
            if label in seen_labels:
                raise ValueError(f"Label '{label}' is not unique")
            seen_labels.add(label)

            writer.parse_feff_output(
                directory_label=label,
                paths_file=feff_output["paths_file"],
                selection=feff_output["selection"],
            )

            with ZipFile(feff_output["paths_zip"]) as source_zip:
                for member in source_zip.infolist():
                    if member.filename == "feff/":
                        continue
                    # Drop the first five characters of the member name
                    # (presumably the "feff/" prefix) so the file lands
                    # directly under the label directory.
                    member.filename = member.filename[5:]
                    source_zip.extract(member=member, path=label)
                    merged.write(os.path.join(label, member.filename))

    writer.write()
| 291 | |
| 292 | |
if __name__ == "__main__":
    # The first command-line argument is the path to a JSON file holding the
    # tool inputs. Use a context manager so the handle is closed as soon as
    # the JSON has been read, instead of being left open.
    with open(sys.argv[1], "r", encoding="utf-8") as inputs_file:
        input_values = json.load(inputs_file)
    main(input_values)
