comparison larch_athena.py @ 2:a1e26990131c draft

planemo upload for repository https://github.com/MaterialsGalaxy/larch-tools/tree/main/larch_athena commit 0f66842e802430e887d1c6cb7be1cc5436408fd2
author muon-spectroscopy-computational-project
date Mon, 04 Mar 2024 11:43:19 +0000
parents 2b3115342fef
children 82e9dd980916
comparison
--- larch_athena.py (1:2b3115342fef)
+++ larch_athena.py (2:a1e26990131c)
@@ -61,10 +61,12 @@
         if is_zipped:
             all_groups = list(self.load_zipped_files().values())
         else:
             all_groups = []
             for filepath in dat_files.split(","):
-                group = self.load_single_file(filepath)["out"]
-                all_groups.append(group)
+                for group in self.load_single_file(filepath).values():
+                    all_groups.append(group)
 
-        return merge_groups(all_groups, xarray="energy", yarray="mu")
+        merged_group = merge_groups(all_groups, xarray="energy", yarray="mu")
+        pre_edge_with_defaults(merged_group)
+        return merged_group
 
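The merge step now normalises the combined spectrum before returning it. Below is a minimal standalone sketch of the same idea using plain xraylarch calls; the scan data are invented, the import path for merge_groups is assumed (larch.io in the xraylarch versions I am aware of), and pre_edge_with_defaults is assumed to wrap a pre_edge call with this tool's default parameters.

import numpy as np
from larch import Group
from larch.io import merge_groups  # assumed import location
from larch.xafs import pre_edge

# Two made-up scans of the same edge on slightly offset energy grids.
energy = np.linspace(8233.0, 8433.0, 401)
scan_1 = Group(energy=energy, mu=np.arctan((energy - 8333.0) / 2.0))
scan_2 = Group(energy=energy + 0.5, mu=np.arctan((energy + 0.5 - 8333.0) / 2.0))

# Interpolate onto a common energy grid and average the mu arrays.
merged_group = merge_groups([scan_1, scan_2], xarray="energy", yarray="mu")

# Normalise the merged spectrum; pre_edge_with_defaults presumably does the
# equivalent with the tool's default pre-edge settings.
pre_edge(merged_group.energy, merged_group.mu, group=merged_group)
print(merged_group.e0, merged_group.edge_step)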
@@ -71,10 +73,10 @@
     def load_single_file(
         self,
         filepath: str,
         is_zipped: bool = False,
-    ) -> "tuple[dict, bool]":
+    ) -> dict:
         if is_zipped:
             return self.load_zipped_files()
 
         print(f"Attempting to read from {filepath}")
         if self.data_format == "athena":
@@ -83,10 +85,11 @@
                 return {"out": group}
             elif self.extract_group["extract_group"] == "multiple":
                 groups = {}
                 for repeat in self.extract_group["multiple"]:
                     name = repeat["group_name"]
+                    print(f"\nExtracting group {name}")
                     groups[name] = read_group(filepath, name)
                 return groups
             else:
                 return read_all_groups(filepath)
 
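With load_single_file now always returning a dict of name to Group, both call sites in this diff consume it the same way. The stub below is a hypothetical illustration of that contract, not the tool's code; the filename and attributes are invented.

from larch import Group

# Hypothetical stand-in for the tool's loader: always a dict of name -> Group,
# matching the new return annotation of load_single_file.
def load_single_file_stub(filepath: str) -> dict:
    return {"out": Group(filename=filepath)}

groups = load_single_file_stub("scan_0001.dat")

# Merge path (first hunk above): every group is used, whatever its name.
all_groups = list(groups.values())

# Keyed path (the directory walk later in this diff): keep the dict keys, so a
# multi-group Athena project stays keyed by group name while a plain column
# file sits under the default "out" key.
for name, group in groups.items():
    print(name, group.filename)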
@@ -140,11 +143,9 @@
         all_paths = list(os.walk("dat_files"))
         all_paths.sort(key=lambda x: x[0])
         file_total = sum([len(f) for _, _, f in all_paths])
         print(f"{file_total} files found")
-        key_length = len(str(file_total))
-        i = 0
         keyed_data = {}
         for dirpath, _, filenames in all_paths:
             try:
                 filenames.sort(key=sorting_key)
             except IndexError as e:
@@ -153,15 +154,17 @@
                     f"defaulting to sorting alphabetically:\n{e}"
                 )
                 filenames.sort()
 
             for filename in filenames:
-                key = str(i).zfill(key_length)
+                if len(all_paths) > 1:
+                    key = f"{dirpath}_{filename}"
+                else:
+                    key = filename
                 filepath = os.path.join(dirpath, filename)
                 xas_data = self.load_single_file(filepath)
                 keyed_data[key] = xas_data["out"]
-                i += 1
 
         return keyed_data
 
     def rename_cols(self, xafs_group: Group) -> Group:
         labels = [label.lower() for label in xafs_group.array_labels]
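The zero-padded counter keys are replaced by keys built from the input names themselves, prefixed with the directory whenever os.walk finds more than one directory. A small standalone sketch of that rule follows; the dat_files layout is invented.

import os

# Invented layout: dat_files/run1/scan.dat and dat_files/run2/scan.dat
all_paths = list(os.walk("dat_files"))
all_paths.sort(key=lambda x: x[0])

keys = []
for dirpath, _, filenames in all_paths:
    for filename in sorted(filenames):
        if len(all_paths) > 1:
            # More than one directory: prefix with the path so identical
            # filenames from different runs stay distinct.
            keys.append(f"{dirpath}_{filename}")
        else:
            # Single flat directory: the filename alone is already unique.
            keys.append(filename)

# Expected keys for the invented layout:
#   dat_files/run1_scan.dat
#   dat_files/run2_scan.dat
print(keys)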