Mercurial > repos > tduigou > cloning_simulation
comparison cloning_simulation.py @ 0:3a3b0f7cb5c2 draft
planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/DnaCauldron/tree/master commit 3401816c949b538bd9c67e61cbe92badff6a4007-dirty
| author | tduigou |
|---|---|
| date | Wed, 11 Jun 2025 09:32:59 +0000 |
| parents | |
| children | 2655e08cd61a |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:3a3b0f7cb5c2 |
|---|---|
| 1 import argparse | |
| 2 import os | |
| 3 import zipfile | |
| 4 import pandas | |
| 5 import dnacauldron | |
| 6 | |
| 7 | |
| 8 def cloning_simulation(files_to_assembly, domesticated_list, | |
| 9 csv_file, assembly_type, topology, | |
| 10 file_name_mapping, file_name_mapping_dom, | |
| 11 use_file_names_as_id, | |
| 12 outdir_simulation, output_simulation, enzyme, outdir_gb): | |
| 13 | |
| 14 files_to_assembly = files_to_assembly.split(',') | |
| 15 | |
| 16 repository = dnacauldron.SequenceRepository() | |
| 17 repository.import_records(files=files_to_assembly, | |
| 18 use_file_names_as_ids=use_file_names_as_id, | |
| 19 topology=topology) | |
| 20 if domesticated_list: | |
| 21 domesticated_files = domesticated_list.split(',') | |
| 22 repository.import_records(files=domesticated_files, | |
| 23 use_file_names_as_ids=use_file_names_as_id, | |
| 24 topology=topology) | |
| 25 | |
| 26 # refine the real record name dict | |
| 27 if isinstance(file_name_mapping, str): | |
| 28 file_name_mapping = dict( | |
| 29 item.split(":") for item in file_name_mapping.split(",") | |
| 30 ) | |
| 31 real_names = { | |
| 32 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "") | |
| 33 for k, v in file_name_mapping.items() | |
| 34 } | |
| 35 | |
| 36 # refine the real record name dict_dom | |
| 37 if file_name_mapping_dom == "": | |
| 38 file_name_mapping_dom = {} | |
| 39 else: | |
| 40 if isinstance(file_name_mapping_dom, str): | |
| 41 file_name_mapping_dom = dict( | |
| 42 item.split(":") for item in file_name_mapping_dom.split(",") | |
| 43 ) | |
| 44 dom_real_names = { | |
| 45 os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "") | |
| 46 for k, v in file_name_mapping_dom.items() | |
| 47 } | |
| 48 real_names.update(dom_real_names) | |
| 49 | |
| 50 # update the records | |
| 51 | |
| 52 for key, record in list(repository.collections["parts"].items()): | |
| 53 current_id = record.id | |
| 54 if current_id in real_names: | |
| 55 new_id = real_names[current_id] | |
| 56 record.id = new_id | |
| 57 record.name = new_id | |
| 58 record.description = new_id | |
| 59 repository.collections["parts"][new_id] = repository.collections["parts"].pop(key) | |
| 60 ######################################################## | |
| 61 # print (f"repo: {vars(repository)}") | |
| 62 # any(pandas.read_csv(csv_file, index_col=0, header=None).duplicated()) | |
| 63 df = pandas.read_csv(csv_file, index_col=0, header=None) | |
| 64 if df.duplicated().any(): | |
| 65 raise ValueError("Duplicate rows found in the data!") | |
| 66 | |
| 67 if assembly_type == "Type2sRestrictionAssembly": | |
| 68 assembly_class = dnacauldron.Type2sRestrictionAssembly | |
| 69 elif assembly_type == "GibsonAssembly": | |
| 70 assembly_class = dnacauldron.GibsonAssembly | |
| 71 elif assembly_type == "BASICAssembly": | |
| 72 assembly_class = dnacauldron.BASICAssembly | |
| 73 elif assembly_type == "BioBrickStandardAssembly": | |
| 74 assembly_class = dnacauldron.BioBrickStandardAssembly | |
| 75 elif assembly_type == "OligoPairAnnealin": | |
| 76 assembly_class = dnacauldron.OligoPairAnnealin | |
| 77 elif assembly_type == "LigaseCyclingReactionAssembly": | |
| 78 assembly_class = dnacauldron.LigaseCyclingReactionAssembly | |
| 79 else: | |
| 80 raise ValueError(f"Unsupported assembly type: {assembly_type}") | |
| 81 | |
| 82 new_csvname = "assambly.csv" | |
| 83 os.rename(csv_file, new_csvname) | |
| 84 | |
| 85 assembly_plan = dnacauldron.AssemblyPlan.from_spreadsheet( | |
| 86 name="auto_from_filename", | |
| 87 path=new_csvname, | |
| 88 dataframe=None, | |
| 89 header=None, | |
| 90 assembly_class=assembly_class | |
| 91 ) | |
| 92 if enzyme != 'auto': | |
| 93 for assembly in assembly_plan.assemblies: | |
| 94 assembly.enzyme = enzyme | |
| 95 | |
| 96 simulation = assembly_plan.simulate(sequence_repository=repository) | |
| 97 stats = simulation.compute_stats() | |
| 98 print(stats) | |
| 99 | |
| 100 report_writer = dnacauldron.AssemblyReportWriter( | |
| 101 include_mix_graphs=True, | |
| 102 include_assembly_plots=True, | |
| 103 show_overhangs_in_graph=True, | |
| 104 annotate_parts_homologies=True, | |
| 105 include_pdf_report=True, | |
| 106 ) | |
| 107 simulation.write_report(outdir_simulation, assembly_report_writer=report_writer) | |
| 108 | |
| 109 # Append report files to .dat (ZIP) | |
| 110 with zipfile.ZipFile(output_simulation, mode='a', compression=zipfile.ZIP_DEFLATED) as zipf: | |
| 111 for root, dirs, files in os.walk(outdir_simulation): | |
| 112 for file in files: | |
| 113 full_path = os.path.join(root, file) | |
| 114 arcname = os.path.relpath(full_path, outdir_simulation) | |
| 115 zipf.write(full_path, arcname) | |
| 116 # print("Files in the zip archive:") | |
| 117 # for info in zipf.infolist(): | |
| 118 # print(info.filename) | |
| 119 for member in zipf.namelist(): | |
| 120 # Only extract actual files inside 'all_construct_records/' (not subfolders) | |
| 121 if member.startswith("assambly_simulation/all_construct_records/") and not member.endswith("/"): | |
| 122 # Get the file name only (strip folder path) | |
| 123 filename = os.path.basename(member) | |
| 124 if not filename: | |
| 125 continue # skip any edge cases | |
| 126 | |
| 127 # Destination path directly in outdir_dir | |
| 128 target_path = os.path.join(outdir_gb, filename) | |
| 129 | |
| 130 # Write the file content | |
| 131 with zipf.open(member) as source, open(target_path, "wb") as target: | |
| 132 target.write(source.read()) | |
| 133 | |
| 134 return output_simulation, outdir_gb | |
| 135 | |
| 136 | |
def parse_command_line_args():
    """Parse the command-line options of the cloning simulation script.

    Returns:
        argparse.Namespace with the attributes ``parts_files``,
        ``domesticated_seq``, ``assembly_csv``, ``assembly_plan_name``,
        ``topology``, ``file_name_mapping``, ``file_name_mapping_dom``,
        ``use_file_names_as_id``, ``outdir_simulation``,
        ``output_simulation``, ``enzyme`` and ``outdir_gb``.
    """
    # The description used to read "Domestication", copied from another tool.
    parser = argparse.ArgumentParser(description="Cloning simulation")

    parser.add_argument("--parts_files", required=True,
                        help="List of GenBank files (Comma-separated)")
    parser.add_argument("--domesticated_seq", required=True,
                        help="output of domestication (ganbank list)")
    parser.add_argument("--assembly_csv", required=True,
                        help="csv assembly")
    parser.add_argument('--assembly_plan_name', type=str,
                        help='type of assembly')
    parser.add_argument('--topology', type=str,
                        help='"circular" or "linear"')
    parser.add_argument('--file_name_mapping', type=str,
                        help='Mapping of Galaxy filenames to original filenames')
    # Default to "" because the simulation treats the empty string, not
    # None, as "no domestication mapping".
    parser.add_argument('--file_name_mapping_dom', type=str, default="",
                        help='Mapping of Galaxy filenames to original domestication filenames')
    # Only the literal "true" (any letter case) yields True.
    parser.add_argument("--use_file_names_as_id", type=lambda x: x.lower() == 'true', default=True,
                        help="Use file names as IDs (True/False)")
    parser.add_argument("--outdir_simulation", required=True,
                        help="dir output for cloning simulation results")
    parser.add_argument("--output_simulation", required=True,
                        help="zip output for cloning simulation results")
    # Default to 'auto': the simulation compares against 'auto', so an
    # omitted option (None) would otherwise be applied as the enzyme.
    parser.add_argument('--enzyme', type=str, default='auto',
                        help='enzyme to use')
    parser.add_argument("--outdir_gb", required=True,
                        help="dir output constructs gb files")

    return parser.parse_args()
| 166 | |
| 167 | |
if __name__ == "__main__":
    # Script entry point: forward every parsed option to the simulation,
    # spelling out which option feeds which parameter.
    cli = parse_command_line_args()

    cloning_simulation(
        files_to_assembly=cli.parts_files,
        domesticated_list=cli.domesticated_seq,
        csv_file=cli.assembly_csv,
        assembly_type=cli.assembly_plan_name,
        topology=cli.topology,
        file_name_mapping=cli.file_name_mapping,
        file_name_mapping_dom=cli.file_name_mapping_dom,
        use_file_names_as_id=cli.use_file_names_as_id,
        outdir_simulation=cli.outdir_simulation,
        output_simulation=cli.output_simulation,
        enzyme=cli.enzyme,
        outdir_gb=cli.outdir_gb,
    )
