Mercurial > repos > tduigou > parameters_maystro_workflow_0
comparison maystro.py @ 0:9ab8b3673816 draft default tip
planemo upload for repository https://github.com/brsynth/galaxytools/tree/main/tools/parameters_maystro commit db4ac861e1d03fcdfe94321d858839124e493930-dirty
author | tduigou |
---|---|
date | Wed, 23 Jul 2025 09:39:39 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:9ab8b3673816 |
---|---|
1 import argparse | |
2 import tempfile | |
3 import os | |
4 import json | |
5 import shutil | |
6 | |
7 | |
8 def parse_command_line_args(): | |
9 parser = argparse.ArgumentParser(description="Maystro JSON Handler") | |
10 | |
11 parser.add_argument("--distribute_json", required=True, help="true or false") | |
12 parser.add_argument("--json_from_workflow", required=False, nargs='+', help="JSON files from tools", default=[]) | |
13 parser.add_argument("--json_from_user", required=False, help="User-provided JSON") | |
14 parser.add_argument("--json_name_mapping", required=True, help="map the real json name") | |
15 parser.add_argument("--output_workflow", required=True, help="JSON output for next workflow steps") | |
16 parser.add_argument("--output_user", required=True, help="Final JSON output to user") | |
17 | |
18 return parser.parse_args() | |
19 | |
20 | |
21 def parse_file_name_mapping(mapping_str): | |
22 mapping = {} | |
23 if mapping_str: | |
24 for pair in mapping_str.split(','): | |
25 stored, original = pair.strip().split(':', 1) | |
26 # Strip .json from original | |
27 real_name = os.path.splitext(original)[0] | |
28 mapping[os.path.basename(stored)] = real_name | |
29 return mapping | |
30 | |
31 | |
32 def handle_distribute_json_false(args): | |
33 temp_dir = tempfile.mkdtemp(prefix="maystro_merge_") | |
34 print(f"[INFO] Watching temp dir for new JSONs: {temp_dir}") | |
35 | |
36 try: | |
37 # Collect JSONs from json_from_workflow | |
38 initial_jsons = list(filter(os.path.isfile, args.json_from_workflow)) | |
39 print(f"[INFO] Initial JSONs from workflow: {initial_jsons}") | |
40 | |
41 # Parse filename mapping if provided | |
42 filename_mapping = parse_file_name_mapping(getattr(args, 'json_name_mapping', '')) | |
43 | |
44 # Merge all together | |
45 merged = {} | |
46 for file_path in initial_jsons: | |
47 try: | |
48 with open(file_path, 'r') as f: | |
49 data = json.load(f) | |
50 basename = os.path.basename(file_path) | |
51 real_name = filename_mapping.get(basename, basename) # fallback if not in mapping | |
52 merged[real_name] = data | |
53 print(f"[INFO] Added data under key: {real_name}") | |
54 except json.JSONDecodeError as e: | |
55 print(f"[WARN] Skipping invalid JSON file {file_path}: {e}") | |
56 | |
57 with open(args.output_user, "w") as f: | |
58 json.dump(merged, f, indent=2) | |
59 print(f"[INFO] Merged JSON written to: {args.output_user}") | |
60 | |
61 finally: | |
62 print(f"[INFO] Cleaning up: {temp_dir}") | |
63 shutil.rmtree(temp_dir) | |
64 | |
65 | |
66 def merge_json_files(paths): | |
67 merged = {} | |
68 for path in paths: | |
69 try: | |
70 with open(path, "r") as f: | |
71 data = json.load(f) | |
72 merged.update(data) | |
73 except Exception as e: | |
74 print(f"[WARN] Skipping {path}: {e}") | |
75 return merged | |
76 | |
77 | |
78 def handle_distribute_json_true(args): | |
79 if not args.json_from_user: | |
80 raise ValueError("json_from_user is required when distribute_json is true") | |
81 | |
82 with open(args.json_from_user, 'r') as in_f: | |
83 user_data = json.load(in_f) | |
84 | |
85 with open(args.output_workflow, 'w') as out_f: | |
86 json.dump(user_data, out_f, indent=2) | |
87 | |
88 | |
89 def main(): | |
90 args = parse_command_line_args() | |
91 | |
92 if args.distribute_json.lower() == 'false': | |
93 handle_distribute_json_false(args) | |
94 else: | |
95 handle_distribute_json_true(args) | |
96 | |
97 if __name__ == "__main__": | |
98 main() |