diff cloning_simulation.py @ 0:3a3b0f7cb5c2 draft

planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/DnaCauldron/tree/master commit 3401816c949b538bd9c67e61cbe92badff6a4007-dirty
author tduigou
date Wed, 11 Jun 2025 09:32:59 +0000
parents
children 2655e08cd61a
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/cloning_simulation.py	Wed Jun 11 09:32:59 2025 +0000
@@ -0,0 +1,177 @@
+import argparse
+import os
+import zipfile
+import pandas
+import dnacauldron
+
+
+def cloning_simulation(files_to_assembly, domesticated_list,
+                       csv_file, assembly_type, topology,
+                       file_name_mapping, file_name_mapping_dom,
+                       use_file_names_as_id,
+                       outdir_simulation, output_simulation, enzyme, outdir_gb):
+
+    files_to_assembly = files_to_assembly.split(',')
+
+    repository = dnacauldron.SequenceRepository()
+    repository.import_records(files=files_to_assembly,
+                              use_file_names_as_ids=use_file_names_as_id,
+                              topology=topology)
+    if domesticated_list:
+       domesticated_files = domesticated_list.split(',')
+       repository.import_records(files=domesticated_files,
+                                 use_file_names_as_ids=use_file_names_as_id,
+                                 topology=topology)
+
+    # refine the real record name dict
+    if isinstance(file_name_mapping, str):
+        file_name_mapping = dict(
+            item.split(":") for item in file_name_mapping.split(",")
+        )
+    real_names = {
+        os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "")
+        for k, v in file_name_mapping.items()
+    }
+
+    # refine the real record name dict_dom
+    if file_name_mapping_dom == "":
+        file_name_mapping_dom = {}
+    else:
+        if isinstance(file_name_mapping_dom, str):
+            file_name_mapping_dom = dict(
+                item.split(":") for item in file_name_mapping_dom.split(",")
+            )
+        dom_real_names = {
+            os.path.splitext(os.path.basename(k))[0]: v.replace(".gb", "")
+            for k, v in file_name_mapping_dom.items()
+        }
+        real_names.update(dom_real_names)
+
+    # update the records
+
+    for key, record in list(repository.collections["parts"].items()):
+        current_id = record.id
+        if current_id in real_names:
+            new_id = real_names[current_id]
+            record.id = new_id
+            record.name = new_id
+            record.description = new_id
+            repository.collections["parts"][new_id] = repository.collections["parts"].pop(key)
+    ########################################################
+    # print (f"repo: {vars(repository)}")
+    # any(pandas.read_csv(csv_file, index_col=0, header=None).duplicated())
+    df = pandas.read_csv(csv_file, index_col=0, header=None)
+    if df.duplicated().any():
+        raise ValueError("Duplicate rows found in the data!")
+
+    if assembly_type == "Type2sRestrictionAssembly":
+        assembly_class = dnacauldron.Type2sRestrictionAssembly
+    elif assembly_type == "GibsonAssembly":
+        assembly_class = dnacauldron.GibsonAssembly
+    elif assembly_type == "BASICAssembly":
+        assembly_class = dnacauldron.BASICAssembly
+    elif assembly_type == "BioBrickStandardAssembly":
+        assembly_class = dnacauldron.BioBrickStandardAssembly
+    elif assembly_type == "OligoPairAnnealin":
+        assembly_class = dnacauldron.OligoPairAnnealin
+    elif assembly_type == "LigaseCyclingReactionAssembly":
+        assembly_class = dnacauldron.LigaseCyclingReactionAssembly
+    else:
+        raise ValueError(f"Unsupported assembly type: {assembly_type}")
+
+    new_csvname = "assambly.csv"
+    os.rename(csv_file, new_csvname)
+
+    assembly_plan = dnacauldron.AssemblyPlan.from_spreadsheet(
+        name="auto_from_filename",
+        path=new_csvname,
+        dataframe=None,
+        header=None,
+        assembly_class=assembly_class
+    )
+    if enzyme != 'auto':
+        for assembly in assembly_plan.assemblies:
+            assembly.enzyme = enzyme
+
+    simulation = assembly_plan.simulate(sequence_repository=repository)
+    stats = simulation.compute_stats()
+    print(stats)
+
+    report_writer = dnacauldron.AssemblyReportWriter(
+        include_mix_graphs=True,
+        include_assembly_plots=True,
+        show_overhangs_in_graph=True,
+        annotate_parts_homologies=True,
+        include_pdf_report=True,
+    )
+    simulation.write_report(outdir_simulation, assembly_report_writer=report_writer)
+
+    # Append report files to .dat (ZIP)
+    with zipfile.ZipFile(output_simulation, mode='a', compression=zipfile.ZIP_DEFLATED) as zipf:
+        for root, dirs, files in os.walk(outdir_simulation):
+            for file in files:
+                full_path = os.path.join(root, file)
+                arcname = os.path.relpath(full_path, outdir_simulation)
+                zipf.write(full_path, arcname)
+ #       print("Files in the zip archive:")
+ #       for info in zipf.infolist():
+ #           print(info.filename)
+        for member in zipf.namelist():
+            # Only extract actual files inside 'all_construct_records/' (not subfolders)
+            if member.startswith("assambly_simulation/all_construct_records/") and not member.endswith("/"):
+                # Get the file name only (strip folder path)
+                filename = os.path.basename(member)
+                if not filename:
+                    continue  # skip any edge cases
+
+                # Destination path directly in outdir_dir
+                target_path = os.path.join(outdir_gb, filename)
+
+                # Write the file content
+                with zipf.open(member) as source, open(target_path, "wb") as target:
+                    target.write(source.read())
+
+    return output_simulation, outdir_gb
+
+
+def parse_command_line_args():
+    parser = argparse.ArgumentParser(description="Domestication")
+
+    parser.add_argument("--parts_files", required=True,
+                        help="List of GenBank files (Comma-separated)")
+    parser.add_argument("--domesticated_seq", required=True,
+                        help="output of domestication (ganbank list)")
+    parser.add_argument("--assembly_csv", required=True,
+                        help="csv assembly")
+    parser.add_argument('--assembly_plan_name', type=str,
+                        help='type of assembly')
+    parser.add_argument('--topology', type=str,
+                        help='"circular" or "linear"')
+    parser.add_argument('--file_name_mapping', type=str,
+                        help='Mapping of Galaxy filenames to original filenames')
+    parser.add_argument('--file_name_mapping_dom', type=str,
+                        help='Mapping of Galaxy filenames to original domestication filenames')
+    parser.add_argument("--use_file_names_as_id", type=lambda x: x.lower() == 'true', default=True,
+                        help="Use file names as IDs (True/False)")
+    parser.add_argument("--outdir_simulation", required=True,
+                        help="dir output for cloning simulation results")
+    parser.add_argument("--output_simulation", required=True,
+                        help="zip output for cloning simulation results")
+    parser.add_argument('--enzyme', type=str,
+                        help='enzyme to use')
+    parser.add_argument("--outdir_gb", required=True,
+                        help="dir output constructs gb files")
+ 
+    return parser.parse_args()
+
+
+if __name__ == "__main__":
+    args = parse_command_line_args()
+
+    cloning_simulation(
+        args.parts_files, args.domesticated_seq,
+        args.assembly_csv, args.assembly_plan_name, args.topology,
+        args.file_name_mapping, args.file_name_mapping_dom,
+        args.use_file_names_as_id, args.outdir_simulation,
+        args.output_simulation, args.enzyme, args.outdir_gb
+    )