Mercurial > repos > tduigou > create_assembly_picklists
comparison CreateAssemblyPicklists_script.py @ 0:0b38f1504205 draft default tip
planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/Plateo commit cc895a281863630e391c310322fbddfd31ac1f8f-dirty
| author | tduigou |
|---|---|
| date | Thu, 16 Oct 2025 14:27:06 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:0b38f1504205 |
|---|---|
| 1 #!/usr/bin/env python | |
| 2 # coding: utf-8 | |
| 3 # Code copied from CUBA backend tools.py and create_assembly_picklists/CreateAssemblyPicklistsView.py | |
| 4 # Code modified for running in a script in Galaxy. | |
| 5 ############################################################################## | |
| 6 ############################################################################## | |
| 7 # App code | |
| 8 ## EGF Galaxy Create assembly picklists -- script | |
| 9 | |
| 10 ############################################################################## | |
| 11 # IMPORTS | |
| 12 import argparse | |
| 13 import os | |
| 14 from io import StringIO, BytesIO | |
| 15 import re | |
| 16 from base64 import b64encode, b64decode | |
| 17 from copy import deepcopy | |
| 18 import sys | |
| 19 | |
| 20 from collections import OrderedDict | |
| 21 from fuzzywuzzy import process | |
| 22 import matplotlib.pyplot as plt | |
| 23 from matplotlib.backends.backend_pdf import PdfPages | |
| 24 import pandas | |
| 25 | |
| 26 from Bio import SeqIO | |
| 27 from Bio.SeqRecord import SeqRecord | |
| 28 from Bio.Seq import Seq | |
| 29 | |
| 30 import bandwagon as bw | |
| 31 import crazydoc | |
| 32 from dnachisel.biotools import sequence_to_biopython_record | |
| 33 import dnacauldron | |
| 34 import flametree | |
| 35 from plateo import AssemblyPlan | |
| 36 from plateo.parsers import plate_from_content_spreadsheet | |
| 37 from plateo.containers import Plate4ti0960 | |
| 38 from plateo.exporters import AssemblyPicklistGenerator, picklist_to_assembly_mix_report | |
| 39 from plateo.exporters import ( | |
| 40 picklist_to_labcyte_echo_picklist_file, | |
| 41 picklist_to_tecan_evo_picklist_file, | |
| 42 plate_to_platemap_spreadsheet, | |
| 43 PlateTextPlotter, | |
| 44 ) | |
| 45 from plateo.tools import human_volume | |
| 46 from snapgene_reader import snapgene_file_to_seqrecord | |
| 47 | |
| 48 | |
| 49 ############################################################################## | |
| 50 # FUNCTIONS | |
| 51 | |
def fix_and_rename_paths(paths):
    """Undo Galaxy's '__sq__' quote-escaping and rename the files on disk.

    Returns the list of (possibly renamed) paths, in the original order.
    """
    renamed = []
    for original in paths:
        unescaped = original.replace("__sq__", "'")
        if unescaped != original:
            os.rename(original, unescaped)
        renamed.append(unescaped)
    return renamed
| 60 | |
| 61 | |
def parse_optional_float(x):
    """Return None for an empty string, otherwise the value parsed as float."""
    return None if x == '' else float(x)
| 66 | |
| 67 | |
def did_you_mean(name, other_names, limit=5, min_score=50):
    """Return up to ``limit`` fuzzy matches for ``name`` scoring >= ``min_score``."""
    matches = process.extract(name, list(other_names), limit=limit)
    return [candidate for candidate, score in matches if score >= min_score]
| 71 | |
| 72 | |
def fix_ice_genbank(genbank_txt):
    """Pad the first (LOCUS) line to 80 characters so ICE Genbanks parse."""
    lines = genbank_txt.splitlines()
    padding = 80 - len(lines[0])
    if padding > 0:
        lines[0] = lines[0] + " " * padding
    return "\n".join(lines)
| 77 | |
| 78 | |
def write_record(record, target, fmt="genbank"):
    """Write a record (or list of records) via Biopython, with fixes.

    For genbank output, record names are truncated to 20 characters on a deep
    copy so the caller's objects are untouched. ``target`` may be a flametree
    file node (anything with ``.open``) or an already-open handle.
    """
    record = deepcopy(record)
    if fmt == "genbank":
        as_list = record if isinstance(record, (list, tuple)) else [record]
        for rec in as_list:
            rec.name = rec.name[:20]
    if hasattr(target, "open"):
        target = target.open("w")
    SeqIO.write(record, target, fmt)
| 91 | |
| 92 | |
def autoname_genbank_file(record):
    """Build a .gb file name from the record id, with dots made underscores."""
    return "%s.gb" % record.id.replace(".", "_")
| 95 | |
| 96 | |
def string_to_records(string):
    """Parse a string (plain ATGC, fasta, genbank or snapgene) into records.

    Returns a ``(records, format)`` tuple where ``format`` is one of "ATGC",
    "fasta", "genbank" or "snapgene". Can also be used to detect a format.
    Raises ValueError when no parser succeeds.
    """
    matches = re.match("([ATGC][ATGC]*)", string)
    if (matches is not None) and (matches.groups()[0] == string):
        # The whole string is a raw sequence: wrap it in a record directly.
        return [SeqRecord(Seq(string))], "ATGC"

    for fmt in ("fasta", "genbank"):
        if fmt == "genbank":
            # ICE-exported Genbanks have a short LOCUS line; pad it first.
            string = fix_ice_genbank(string)
        try:
            stringio = StringIO(string)
            records = list(SeqIO.parse(stringio, fmt))
            if len(records) > 0:
                return (records, fmt)
        except Exception:
            pass
    try:
        record = snapgene_file_to_seqrecord(filecontent=StringIO(string))
        # Bug fix: return the same (records, format) shape as every other
        # branch, so callers that unpack ``records, fmt`` don't break.
        return [record], "snapgene"
    except Exception:
        pass
    raise ValueError("Invalid sequence format")
| 124 | |
| 125 | |
def file_to_filelike_object(file_, type="byte"):
    """Return the base64-decoded content of an uploaded file as a file-like.

    ``file_.content`` must look like "data:...;base64,XXX". With type="byte"
    (the default) a BytesIO of the raw bytes is returned; any other value
    returns a StringIO of the content decoded as UTF-8.
    """
    content = b64decode(file_.content.split("base64,")[1])
    if type == "byte":
        return BytesIO(content)
    # Bug fix: StringIO requires text, so decode the raw bytes first.
    return StringIO(content.decode("utf-8"))
| 130 | |
| 131 | |
def spreadsheet_file_to_dataframe(filedict, header="infer"):
    """Load an uploaded CSV or Excel file into a pandas DataFrame."""
    filelike = file_to_filelike_object(filedict)
    reader = pandas.read_csv if filedict.name.endswith(".csv") else pandas.read_excel
    return reader(filelike, header=header)
| 138 | |
| 139 | |
def records_from_zip_file(zip_file, use_file_names_as_ids=False):
    """Extract Biopython records from the sequence files inside a zip archive.

    Each .gb/.gbk/.fa/.dna member is first parsed as text (plain ATGC, fasta
    or genbank); on failure the raw bytes are tried as a SnapGene file, then
    as a crazydoc-highlighted document. Raises ValueError for a member that
    no parser accepts.

    Parameters
    ----------
    zip_file: uploaded file object with ``.name`` and base64 ``.content``.
    use_file_names_as_ids: when True and a member holds exactly one record,
        the member's base name (minus extension) becomes the record id.
    """
    zip_name = zip_file.name
    zip_file = flametree.file_tree(file_to_filelike_object(zip_file))
    records = []
    for f in zip_file._all_files:
        ext = f._extension.lower()
        if ext in ["gb", "gbk", "fa", "dna"]:
            # First attempt: parse as text (plain ATGC, fasta or genbank).
            try:
                new_records, fmt = string_to_records(f.read())
                if not isinstance(new_records, list):
                    new_records = [new_records]
            except:
                # Fall back to binary parsers (SnapGene, then .docx).
                content_stream = BytesIO(f.read("rb"))
                try:
                    record = snapgene_file_to_seqrecord(fileobject=content_stream)
                    new_records, fmt = [record], "snapgene"
                except:
                    try:
                        parser = crazydoc.CrazydocParser(
                            ["highlight_color", "bold", "underline"]
                        )
                        new_records = parser.parse_doc_file(content_stream)
                        fmt = "doc"
                    except:
                        raise ValueError("Format not recognized for file " + f._path)

            single_record = len(new_records) == 1
            for i, record in enumerate(new_records):
                name = record.id
                # Replace exporter placeholder ids with a name derived from
                # the containing file (suffixed %04d when multiple records).
                if name in [
                    None,
                    "",
                    "<unknown id>",
                    ".",
                    " ",
                    "<unknown name>",
                ]:
                    number = "" if single_record else ("%04d" % i)
                    name = f._name_no_extension.replace(" ", "_") + number
                record.id = name
                record.name = name
                record.file_name = f._name_no_extension
                record.zip_file_name = zip_name
                if use_file_names_as_ids and single_record:
                    basename = os.path.basename(record.file_name)
                    basename_no_extension = os.path.splitext(basename)[0]
                    record.id = basename_no_extension
            records += new_records
    return records
| 189 | |
| 190 | |
def records_from_data_file(data_file):
    """Parse one uploaded file into records, trying several formats in turn.

    Order tried: text (plain ATGC / fasta / genbank), SnapGene, crazydoc
    document, then a two-column (name, sequence) spreadsheet. Returns
    ``(records, fmt)`` where ``fmt`` names the detected format. Raises
    ValueError when nothing matches.
    """
    content = b64decode(data_file.content.split("base64,")[1])
    try:
        records, fmt = string_to_records(content.decode("utf-8"))
    except:
        try:
            record = snapgene_file_to_seqrecord(fileobject=BytesIO(content))
            records, fmt = [record], "snapgene"
        except:
            try:
                parser = crazydoc.CrazydocParser(
                    ["highlight_color", "bold", "underline"]
                )
                records = parser.parse_doc_file(BytesIO(content))
                fmt = "doc"
            except:
                try:
                    # Spreadsheet fallback: one (name, sequence) pair per row.
                    df = spreadsheet_file_to_dataframe(data_file, header=None)
                    records = [
                        sequence_to_biopython_record(sequence=seq, id=name, name=name)
                        for name, seq in df.values
                    ]
                    fmt = "spreadsheet"
                except:
                    raise ValueError("Format not recognized for file " + data_file.name)
    if not isinstance(records, list):
        records = [records]
    return records, fmt
| 219 | |
| 220 | |
def record_to_formated_string(record, fmt="genbank", remove_descr=False):
    """Serialize a record (or list of records) to bytes in the given format.

    With ``remove_descr`` True, descriptions are blanked on a deep copy so
    the caller's records are left untouched.
    """
    if remove_descr:
        record = deepcopy(record)
        targets = record if isinstance(record, (list, tuple)) else [record]
        for rec in targets:
            rec.description = ""
    buffer = StringIO()
    write_record(record, buffer, fmt)
    return buffer.getvalue().encode("utf-8")
| 232 | |
| 233 | |
def records_from_data_files(data_files, use_file_names_as_ids=False):
    """Load records from a list of uploaded files (zip archives included).

    Sets ``circular``/``linear`` flags from each file's ``circular``
    attribute (defaulting to circular when absent) and replaces
    missing/placeholder record ids and names with a name derived from the
    file name.
    """
    # Placeholder ids/names that various exporters leave behind.
    UNKNOWN_IDS = [
        "None",
        "",
        "<unknown id>",
        ".",
        "EXPORTED",
        "<unknown name>",
        "Exported",
    ]
    records = []
    for file_ in data_files:
        # ``file_`` supports membership tests; default to circular.
        circular = ("circular" not in file_) or file_.circular
        if file_.name.lower().endswith("zip"):
            records += records_from_zip_file(
                file_, use_file_names_as_ids=use_file_names_as_ids
            )
            continue
        recs, fmt = records_from_data_file(file_)
        single_record = len(recs) == 1
        for i, record in enumerate(recs):
            record.circular = circular
            record.linear = not circular
            name_no_extension = "".join(file_.name.split(".")[:-1])
            name = name_no_extension + ("" if single_record else ("%04d" % i))
            name = name.replace(" ", "_")
            # Sorry for this parts, it took a lot of "whatever works".
            # keep your part names under 20c and pointless, and everything
            # will be good
            if str(record.id).strip() in UNKNOWN_IDS:
                record.id = name
            if str(record.name).strip() in UNKNOWN_IDS:
                record.name = name
            record.file_name = name_no_extension
            if use_file_names_as_ids and single_record:
                # Bug fix: records have no ``source_file`` attribute at this
                # point; use ``file_name`` (assigned just above), matching
                # records_from_zip_file.
                basename = os.path.basename(record.file_name)
                basename_no_extension = os.path.splitext(basename)[0]
                record.id = basename_no_extension
        records += recs
    return records
| 274 | |
| 275 | |
def data_to_html_data(data, datatype, filename=None):
    """Wrap raw bytes as a base64 data URL.

    ``datatype`` may be a shorthand (zip, genbank, fasta, pdf, xlsx) or a
    literal MIME type; an optional filename header is embedded.
    """
    mime_by_shorthand = {
        "zip": "application/zip",
        "genbank": "application/genbank",
        "fasta": "application/fasta",
        "pdf": "application/pdf",
        "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    }
    mime = mime_by_shorthand.get(datatype, datatype)
    prefix = "data:%s;" % mime
    header_part = "" if filename is None else "headers=filename%3D" + filename + ";"
    payload = "base64,%s" % b64encode(data).decode("utf-8")
    return prefix + header_part + payload
| 291 | |
| 292 | |
def zip_data_to_html_data(data):
    """Shortcut: wrap raw zip bytes as a base64 data URL."""
    return data_to_html_data(data, "application/zip")
| 295 | |
| 296 | |
| 297 LADDERS = {"100_to_4k": bw.ladders.LADDER_100_to_4k} | |
| 298 | |
| 299 | |
def matplotlib_figure_to_svg_base64_data(fig, **kwargs):
    """Return 'data:image/svg+xml;base64,XXX' where XXX is the
    base64-encoded SVG rendering of the figure."""
    buffer = BytesIO()
    fig.savefig(buffer, format="svg", **kwargs)
    svg_text = buffer.getvalue().decode("utf-8")
    # Drop the four-line XML/doctype preamble, then collapse to one line.
    svg_text = "\n".join(svg_text.split("\n")[4:])
    svg_text = "".join(svg_text.split("\n"))
    encoded = b64encode(svg_text.encode("utf-8"))
    return (b"data:image/svg+xml;base64," + encoded).decode("utf-8")
| 313 | |
| 314 | |
def matplotlib_figure_to_bitmap_base64_data(fig, fmt="png", **kwargs):
    """Return 'data:image/<fmt>;base64,XXX' where XXX is the base64-encoded
    bitmap rendering (png by default) of the figure."""
    buffer = BytesIO()
    fig.savefig(buffer, format=fmt, **kwargs)
    encoded = b64encode(buffer.getvalue())
    data_url = b"data:image/%s;base64,%s" % (fmt.encode("utf-8"), encoded)
    return data_url.decode("utf-8")
| 326 | |
| 327 | |
def figures_to_pdf_report_data(figures, filename="report.pdf"):
    """Render the figures into one PDF and return it as a data-URL dict
    (keys: data, name, mimetype)."""
    pdf_io = BytesIO()
    with PdfPages(pdf_io) as pdf:
        for figure in figures:
            pdf.savefig(figure, bbox_inches="tight")
    payload = b64encode(pdf_io.getvalue()).decode("utf-8")
    return {
        "data": "data:application/pdf;base64," + payload,
        "name": filename,
        "mimetype": "application/pdf",
    }
| 341 | |
| 342 | |
def csv_to_list(csv_string, sep=","):
    """Flatten a (possibly multi-line) separated string into stripped,
    non-empty tokens."""
    tokens = []
    for line in csv_string.split("\n"):
        for element in line.split(sep):
            element = element.strip()
            if element:
                tokens.append(element)
    return tokens
| 350 | |
| 351 | |
def set_record_topology(record, topology):
    """Set the Biopython record's topology, possibly passing if already set.

    Writes ``record.annotations['topology']``. ``topology`` is "circular",
    "linear", "default_to_circular" (only applied when no topology is set
    yet) or "default_to_linear". Raises ValueError for any other value.
    """
    valid_topologies = [
        "circular",
        "linear",
        "default_to_circular",
        "default_to_linear",
    ]
    if topology not in valid_topologies:
        raise ValueError(
            "topology (%s) should be one of %s."
            % (topology, ", ".join(valid_topologies))
        )
    annotations = record.annotations
    default_prefix = "default_to_"
    if not topology.startswith(default_prefix):
        annotations["topology"] = topology
    elif "topology" not in annotations:
        annotations["topology"] = topology[len(default_prefix):]
| 378 | |
| 379 | |
| 380 ############################################################################## | |
def main():
    """Galaxy entry point.

    Reads an assembly plan, part data and a source plate; computes an
    assembly picklist with Plateo and writes a zip with plate maps, PDF
    reports and the dispenser (Echo/EVO) picklist. Prints
    "success: True/False" lines for the Galaxy wrapper to parse.
    """
    parser = argparse.ArgumentParser(description="Generate picklist for DNA assembly.")
    parser.add_argument("--parts_files", help="Directory with parts data or file with part sizes")
    parser.add_argument("--picklist", type=str, help="Path to the assembly plan CSV or Excel file")
    parser.add_argument("--source_plate", help="Source plate file (CSV or Excel)")
    parser.add_argument("--backbone_name", required=False, help="Name of the backbone")
    parser.add_argument("--result_zip", help="Name of the output zip file")
    parser.add_argument("--part_backbone_ratio", type=parse_optional_float, required=False, help="Part to backbone molar ratio")
    parser.add_argument("--quantity_unit", choices=["fmol", "nM", "ng"], help="Quantity unit")
    parser.add_argument("--part_quantity", type=float, help="Quantity of each part")
    parser.add_argument("--buffer_volume", type=float, help="Buffer volume in µL")
    parser.add_argument("--total_volume", type=float, help="Total reaction volume in µL")
    parser.add_argument("--dispenser", choices=["labcyte_echo", "tecan_evo"], help="Dispenser machine")

    args = parser.parse_args()

    # Parameters:
    picklist = args.picklist  # assembly plan
    # directory or can be a csv/Excel with part sizes
    if isinstance(args.parts_files, str):
        args.parts_files = args.parts_files.split(",")
    parts_dir = fix_and_rename_paths(args.parts_files)
    source_plate_path = args.source_plate
    backbone_name = args.backbone_name
    part_backbone_ratio = args.part_backbone_ratio
    result_zip_file = args.result_zip  # output file name "picklist.zip"
    ##########################################################################
    # Defaults:
    destination_plate = None
    destination_type = "new"  # this parameter is not actually used
    destination_size = 96  # this parameter is not actually used
    fill_by = "column"  # this parameter is not actually used
    quantity_unit = args.quantity_unit
    part_quantity = args.part_quantity  # 1.3
    buffer_volume = args.buffer_volume  # 0.3 # (µL)
    total_volume = args.total_volume  # 1 # (µL)
    dispenser_machine = args.dispenser
    dispenser_min_volume = 0.5  # (nL), this parameter is not actually used
    dispenser_max_volume = 5  # (µL), this parameter is not actually used
    dispenser_resolution = 2.5  # (nL), this parameter is not actually used
    dispenser_dead_volume = 8  # (µL), this parameter is not actually used
    use_file_names_as_ids = True

    # CODE
    # NOTE(review): ``picklist`` is a str path here, so ``.read()`` will raise
    # AttributeError on CSV input — presumably the file should be opened first
    # (e.g. ``open(picklist).read()``); confirm against the Galaxy wrapper.
    if picklist.endswith(".csv"):
        csv = picklist.read().decode()
        rows = [line.split(",") for line in csv.split("\n") if len(line)]
    else:
        dataframe = pandas.read_excel(picklist)
        rows = [row for i, row in dataframe.iterrows()]

    # Build the plan: column 0 is the construct name, remaining columns are
    # part names; header-like rows and empty/placeholder cells are skipped.
    assembly_plan = AssemblyPlan(
        OrderedDict(
            [
                (
                    row[0],
                    [
                        str(e).strip()
                        for e in row[1:]
                        if str(e).strip() not in ["-", "nan", ""]
                    ],
                )
                for row in rows
                if row[0] not in ["nan", "Construct name", "constructs", "construct"]
            ]
        )
    )
    # Normalize part names: spaces become underscores everywhere.
    for assembly, parts in assembly_plan.assemblies.items():
        assembly_plan.assemblies[assembly] = [part.replace(" ", "_") for part in parts]

    # Reading part infos
    if not isinstance(parts_dir, list):
        if parts_dir.endswith((".csv", ".xls", ".xlsx")):  # part sizes specified in table
            if parts_dir.endswith(".csv"):
                dataframe = pandas.read_csv(parts_dir)
            else:
                dataframe = pandas.read_excel(parts_dir)
            parts_data = {row.part: {"size": row["size"]} for i, row in dataframe.iterrows()}
    else:  # input records
        records = dnacauldron.biotools.load_records_from_files(
            files=parts_dir, use_file_names_as_ids=use_file_names_as_ids
        )
        parts_data = {rec.id.replace(" ", "_").lower(): {"record": rec} for rec in records}
        # parts_data = process_parts_with_mapping(records, args.file_name_mapping)
    assembly_plan.parts_data = parts_data
    parts_without_data = assembly_plan.parts_without_data()
    if len(parts_without_data):
        # Abort early with a wrapper-parseable failure message.
        print("success: False")
        print("message: Some parts have no provided record or data.")
        print("missing_parts: ", parts_without_data)
        sys.exit()
    # Reading protocol
    if quantity_unit == "fmol":
        part_mol = part_quantity * 1e-15
        part_g = None
    if quantity_unit == "nM":
        part_mol = part_quantity * total_volume * 1e-15
        part_g = None
    if quantity_unit == "ng":
        part_mol = None
        part_g = part_quantity * 1e-9
        # Backbone:part molar ratio calculation is not performed in this case.
        # This ensures no change regardless of form input:
        part_backbone_ratio = 1
    print("Generating picklist")
    picklist_generator = AssemblyPicklistGenerator(
        part_mol=part_mol,
        part_g=part_g,
        complement_to=total_volume * 1e-6,  # convert uL to L
        buffer_volume=buffer_volume * 1e-6,
        volume_rounding=2.5e-9,  # not using parameter from form
        minimal_dispense_volume=5e-9,  # Echo machine's minimum dispense -
    )
    # NOTE(review): 'Non' looks like a truncated 'None' sentinel from the
    # Galaxy form — confirm the wrapper's empty-value marker.
    if backbone_name != '' and backbone_name != 'Non':
        backbone_name_list = backbone_name.split(",")
    source_plate = plate_from_content_spreadsheet(source_plate_path)

    # Normalize part names in the source plate wells to match the plan.
    for well in source_plate.iter_wells():
        if well.is_empty:
            continue
        quantities = well.content.quantities
        part, quantity = list(quantities.items())[0]
        quantities.pop(part)
        quantities[part.replace(" ", "_")] = quantity

        if backbone_name != '' and backbone_name != 'Non':
            if part in backbone_name_list:
                # This section multiplies the backbone concentration with the
                # part:backbone molar ratio. This tricks the calculator into making
                # a picklist with the desired ratio.
                # For example, a part:backbone = 2:1 will multiply the
                # backbone concentration by 2, therefore half as much of it will be
                # added to the well.
                quantities[part.replace(" ", "_")] = quantity * part_backbone_ratio
            else:
                quantities[part.replace(" ", "_")] = quantity

    source_plate.name = "Source"
    if destination_plate:
        # Dead branch with the current defaults: destination_plate is None.
        dest_filelike = file_to_filelike_object(destination_plate)
        destination_plate = plate_from_content_spreadsheet(destination_plate)
    else:
        destination_plate = Plate4ti0960("Mixplate")
    destination_wells = (
        well for well in destination_plate.iter_wells(direction="column") if well.is_empty
    )
    picklist, picklist_data = picklist_generator.make_picklist(
        assembly_plan,
        source_wells=source_plate.iter_wells(),
        destination_wells=destination_wells,
    )
    if picklist is None:
        print("success: False")
        print("message: Some parts in the assembly plan have no corresponding well.")
        print("picklist_data: ", picklist_data)
        print("missing_parts:", picklist_data.get("missing_parts", None))
        sys.exit()

    future_plates = picklist.simulate(inplace=False)

    def text(w):
        # Well label: construct name (when known) above the dispensed volume.
        txt = human_volume(w.content.volume)
        if "construct" in w.data:
            txt = "\n".join([w.data["construct"], txt])
        return txt

    plotter = PlateTextPlotter(text)
    ax, _ = plotter.plot_plate(future_plates[destination_plate], figsize=(20, 8))

    ziproot = flametree.file_tree(result_zip_file, replace=True)

    # MIXPLATE MAP PLOT
    ax.figure.savefig(
        ziproot._file("final_mixplate.pdf").open("wb"),
        format="pdf",
        bbox_inches="tight",
    )
    plt.close(ax.figure)
    plate_to_platemap_spreadsheet(
        future_plates[destination_plate],
        lambda w: w.data.get("construct", ""),
        filepath=ziproot._file("final_mixplate.xls").open("wb"),
    )

    # ASSEMBLY REPORT
    print("Writing report...")
    picklist_to_assembly_mix_report(
        picklist,
        ziproot._file("assembly_mix_picklist_report.pdf").open("wb"),
        data=picklist_data,
    )
    assembly_plan.write_report(ziproot._file("assembly_plan_summary.pdf").open("wb"))

    # MACHINE PICKLIST
    if dispenser_machine == "labcyte_echo":
        picklist_to_labcyte_echo_picklist_file(
            picklist, ziproot._file("ECHO_picklist.csv").open("w")
        )
    else:
        picklist_to_tecan_evo_picklist_file(
            picklist, ziproot._file("EVO_picklist.gwl").open("w")
        )
    # We'll not write the input source plate.
    # raw = file_to_filelike_object(source_plate_path).read()
    # f = ziproot.copy(source_plate_path)
    # f.write(raw, mode="wb")
    ziproot._close()
    print("success: True")
| 593 | |
| 594 | |
# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
