comparison CreateAssemblyPicklists_script.py @ 0:0b38f1504205 draft default tip

planemo upload for repository https://github.com/Edinburgh-Genome-Foundry/Plateo commit cc895a281863630e391c310322fbddfd31ac1f8f-dirty
author tduigou
date Thu, 16 Oct 2025 14:27:06 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:0b38f1504205
1 #!/usr/bin/env python
2 # coding: utf-8
3 # Code copied from CUBA backend tools.py and create_assembly_picklists/CreateAssemblyPicklistsView.py
4 # Code modified for running in a script in Galaxy.
5 ##############################################################################
6 ##############################################################################
7 # App code
8 ## EGF Galaxy Create assembly picklists -- script
9
10 ##############################################################################
11 # IMPORTS
12 import argparse
13 import os
14 from io import StringIO, BytesIO
15 import re
16 from base64 import b64encode, b64decode
17 from copy import deepcopy
18 import sys
19
20 from collections import OrderedDict
21 from fuzzywuzzy import process
22 import matplotlib.pyplot as plt
23 from matplotlib.backends.backend_pdf import PdfPages
24 import pandas
25
26 from Bio import SeqIO
27 from Bio.SeqRecord import SeqRecord
28 from Bio.Seq import Seq
29
30 import bandwagon as bw
31 import crazydoc
32 from dnachisel.biotools import sequence_to_biopython_record
33 import dnacauldron
34 import flametree
35 from plateo import AssemblyPlan
36 from plateo.parsers import plate_from_content_spreadsheet
37 from plateo.containers import Plate4ti0960
38 from plateo.exporters import AssemblyPicklistGenerator, picklist_to_assembly_mix_report
39 from plateo.exporters import (
40 picklist_to_labcyte_echo_picklist_file,
41 picklist_to_tecan_evo_picklist_file,
42 plate_to_platemap_spreadsheet,
43 PlateTextPlotter,
44 )
45 from plateo.tools import human_volume
46 from snapgene_reader import snapgene_file_to_seqrecord
47
48
49 ##############################################################################
50 # FUNCTIONS
51
52 def fix_and_rename_paths(paths):
53 fixed_paths = []
54 for path in paths:
55 new_path = path.replace("__sq__", "'")
56 if new_path != path:
57 os.rename(path, new_path)
58 fixed_paths.append(new_path)
59 return fixed_paths
60
61
62 def parse_optional_float(x):
63 if x == '':
64 return None
65 return float(x)
66
67
68 def did_you_mean(name, other_names, limit=5, min_score=50): # test
69 results = process.extract(name, list(other_names), limit=limit)
70 return [e for (e, score) in results if score >= min_score]
71
72
73 def fix_ice_genbank(genbank_txt):
74 lines = genbank_txt.splitlines()
75 lines[0] += max(0, 80 - len(lines[0])) * " "
76 return "\n".join(lines)
77
78
79 def write_record(record, target, fmt="genbank"):
80 """Write a record as genbank, fasta, etc. via Biopython, with fixes"""
81 record = deepcopy(record)
82 if fmt == "genbank":
83 if isinstance(record, (list, tuple)):
84 for r in record:
85 r.name = r.name[:20]
86 else:
87 record.name = record.name[:20]
88 if hasattr(target, "open"):
89 target = target.open("w")
90 SeqIO.write(record, target, fmt)
91
92
93 def autoname_genbank_file(record):
94 return record.id.replace(".", "_") + ".gb"
95
96
97 def string_to_records(string):
98 """Convert a string of a fasta, genbank... into a simple ATGC string.
99
100 Can also be used to detect a format.
101 """
102 matches = re.match("([ATGC][ATGC]*)", string)
103 # print("============", len(matches.groups()[0]), len(string))
104 # print (matches.groups()[0] == string)
105 if (matches is not None) and (matches.groups()[0] == string):
106 return [SeqRecord(Seq(string))], "ATGC"
107
108 for fmt in ("fasta", "genbank"):
109 if fmt == "genbank":
110 string = fix_ice_genbank(string)
111 try:
112 stringio = StringIO(string)
113 records = list(SeqIO.parse(stringio, fmt))
114 if len(records) > 0:
115 return (records, fmt)
116 except:
117 pass
118 try:
119 record = snapgene_file_to_seqrecord(filecontent=StringIO(string))
120 return [record]
121 except:
122 pass
123 raise ValueError("Invalid sequence format")
124
125
126 def file_to_filelike_object(file_, type="byte"):
127 content = file_.content.split("base64,")[1]
128 filelike = BytesIO if (type == "byte") else StringIO
129 return filelike(b64decode(content))
130
131
132 def spreadsheet_file_to_dataframe(filedict, header="infer"):
133 filelike = file_to_filelike_object(filedict)
134 if filedict.name.endswith(".csv"):
135 return pandas.read_csv(filelike, header=header)
136 else:
137 return pandas.read_excel(filelike, header=header)
138
139
140 def records_from_zip_file(zip_file, use_file_names_as_ids=False):
141 zip_name = zip_file.name
142 zip_file = flametree.file_tree(file_to_filelike_object(zip_file))
143 records = []
144 for f in zip_file._all_files:
145 ext = f._extension.lower()
146 if ext in ["gb", "gbk", "fa", "dna"]:
147 try:
148 new_records, fmt = string_to_records(f.read())
149 if not isinstance(new_records, list):
150 new_records = [new_records]
151 except:
152 content_stream = BytesIO(f.read("rb"))
153 try:
154 record = snapgene_file_to_seqrecord(fileobject=content_stream)
155 new_records, fmt = [record], "snapgene"
156 except:
157 try:
158 parser = crazydoc.CrazydocParser(
159 ["highlight_color", "bold", "underline"]
160 )
161 new_records = parser.parse_doc_file(content_stream)
162 fmt = "doc"
163 except:
164 raise ValueError("Format not recognized for file " + f._path)
165
166 single_record = len(new_records) == 1
167 for i, record in enumerate(new_records):
168 name = record.id
169 if name in [
170 None,
171 "",
172 "<unknown id>",
173 ".",
174 " ",
175 "<unknown name>",
176 ]:
177 number = "" if single_record else ("%04d" % i)
178 name = f._name_no_extension.replace(" ", "_") + number
179 record.id = name
180 record.name = name
181 record.file_name = f._name_no_extension
182 record.zip_file_name = zip_name
183 if use_file_names_as_ids and single_record:
184 basename = os.path.basename(record.file_name)
185 basename_no_extension = os.path.splitext(basename)[0]
186 record.id = basename_no_extension
187 records += new_records
188 return records
189
190
191 def records_from_data_file(data_file):
192 content = b64decode(data_file.content.split("base64,")[1])
193 try:
194 records, fmt = string_to_records(content.decode("utf-8"))
195 except:
196 try:
197 record = snapgene_file_to_seqrecord(fileobject=BytesIO(content))
198 records, fmt = [record], "snapgene"
199 except:
200 try:
201 parser = crazydoc.CrazydocParser(
202 ["highlight_color", "bold", "underline"]
203 )
204 records = parser.parse_doc_file(BytesIO(content))
205 fmt = "doc"
206 except:
207 try:
208 df = spreadsheet_file_to_dataframe(data_file, header=None)
209 records = [
210 sequence_to_biopython_record(sequence=seq, id=name, name=name)
211 for name, seq in df.values
212 ]
213 fmt = "spreadsheet"
214 except:
215 raise ValueError("Format not recognized for file " + data_file.name)
216 if not isinstance(records, list):
217 records = [records]
218 return records, fmt
219
220
221 def record_to_formated_string(record, fmt="genbank", remove_descr=False):
222 if remove_descr:
223 record = deepcopy(record)
224 if isinstance(record, (list, tuple)):
225 for r in record:
226 r.description = ""
227 else:
228 record.description = ""
229 fileobject = StringIO()
230 write_record(record, fileobject, fmt)
231 return fileobject.getvalue().encode("utf-8")
232
233
234 def records_from_data_files(data_files, use_file_names_as_ids=False):
235 records = []
236 for file_ in data_files:
237 circular = ("circular" not in file_) or file_.circular
238 if file_.name.lower().endswith("zip"):
239 records += records_from_zip_file(
240 file_, use_file_names_as_ids=use_file_names_as_ids
241 )
242 continue
243 recs, fmt = records_from_data_file(file_)
244 single_record = len(recs) == 1
245 for i, record in enumerate(recs):
246 record.circular = circular
247 record.linear = not circular
248 name_no_extension = "".join(file_.name.split(".")[:-1])
249 name = name_no_extension + ("" if single_record else ("%04d" % i))
250 name = name.replace(" ", "_")
251 UNKNOWN_IDS = [
252 "None",
253 "",
254 "<unknown id>",
255 ".",
256 "EXPORTED",
257 "<unknown name>",
258 "Exported",
259 ]
260 # Sorry for this parts, it took a lot of "whatever works".
261 # keep your part names under 20c and pointless, and everything
262 # will be good
263 if str(record.id).strip() in UNKNOWN_IDS:
264 record.id = name
265 if str(record.name).strip() in UNKNOWN_IDS:
266 record.name = name
267 record.file_name = name_no_extension
268 if use_file_names_as_ids and single_record:
269 basename = os.path.basename(record.source_file)
270 basename_no_extension = os.path.splitext(basename)[0]
271 record.id = basename_no_extension
272 records += recs
273 return records
274
275
276 def data_to_html_data(data, datatype, filename=None):
277 """Data types: zip, genbank, fasta, pdf"""
278 datatype = {
279 "zip": "application/zip",
280 "genbank": "application/genbank",
281 "fasta": "application/fasta",
282 "pdf": "application/pdf",
283 "xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
284 }.get(datatype, datatype)
285 datatype = "data:%s;" % datatype
286 data64 = "base64,%s" % b64encode(data).decode("utf-8")
287 headers = ""
288 if filename is not None:
289 headers += "headers=filename%3D" + filename + ";"
290 return datatype + headers + data64
291
292
293 def zip_data_to_html_data(data):
294 return data_to_html_data(data, "application/zip")
295
296
297 LADDERS = {"100_to_4k": bw.ladders.LADDER_100_to_4k}
298
299
300 def matplotlib_figure_to_svg_base64_data(fig, **kwargs):
301 """Return a string of the form 'data:image/svg+xml;base64,XXX' where XXX
302 is the base64-encoded svg version of the figure."""
303 output = BytesIO()
304 fig.savefig(output, format="svg", **kwargs)
305 svg_txt = output.getvalue().decode("utf-8")
306 svg_txt = "\n".join(svg_txt.split("\n")[4:])
307 svg_txt = "".join(svg_txt.split("\n"))
308
309 content = b64encode(svg_txt.encode("utf-8"))
310 result = (b"data:image/svg+xml;base64," + content).decode("utf-8")
311
312 return result
313
314
315 def matplotlib_figure_to_bitmap_base64_data(fig, fmt="png", **kwargs):
316 """Return a string of the form 'data:image/png;base64,XXX' where XXX
317 is the base64-encoded svg version of the figure."""
318 output = BytesIO()
319 fig.savefig(output, format=fmt, **kwargs)
320 bitmap = output.getvalue()
321 content = b64encode(bitmap)
322 result = (b"data:image/%s;base64,%s" % (fmt.encode("utf-8"), content)).decode(
323 "utf-8"
324 )
325 return result
326
327
328 def figures_to_pdf_report_data(figures, filename="report.pdf"):
329 pdf_io = BytesIO()
330 with PdfPages(pdf_io) as pdf:
331 for fig in figures:
332 pdf.savefig(fig, bbox_inches="tight")
333 return {
334 "data": (
335 "data:application/pdf;base64,"
336 + b64encode(pdf_io.getvalue()).decode("utf-8")
337 ),
338 "name": filename,
339 "mimetype": "application/pdf",
340 }
341
342
343 def csv_to_list(csv_string, sep=","):
344 return [
345 element.strip()
346 for line in csv_string.split("\n")
347 for element in line.split(sep)
348 if len(element.strip())
349 ]
350
351
352 def set_record_topology(record, topology):
353 """Set the Biopython record's topology, possibly passing if already set.
354
355 This actually sets the ``record.annotations['topology']``.The ``topology``
356 parameter can be "circular", "linear", "default_to_circular" (will default
357 to circular if ``annotations['topology']`` is not already set) or
358 "default_to_linear".
359 """
360 valid_topologies = [
361 "circular",
362 "linear",
363 "default_to_circular",
364 "default_to_linear",
365 ]
366 if topology not in valid_topologies:
367 raise ValueError(
368 "topology (%s) should be one of %s."
369 % (topology, ", ".join(valid_topologies))
370 )
371 annotations = record.annotations
372 default_prefix = "default_to_"
373 if topology.startswith(default_prefix):
374 if "topology" not in annotations:
375 annotations["topology"] = topology[len(default_prefix) :]
376 else:
377 annotations["topology"] = topology
378
379
380 ##############################################################################
381 def main():
382
383 parser = argparse.ArgumentParser(description="Generate picklist for DNA assembly.")
384 parser.add_argument("--parts_files", help="Directory with parts data or file with part sizes")
385 parser.add_argument("--picklist", type=str, help="Path to the assembly plan CSV or Excel file")
386 parser.add_argument("--source_plate", help="Source plate file (CSV or Excel)")
387 parser.add_argument("--backbone_name", required=False, help="Name of the backbone")
388 parser.add_argument("--result_zip", help="Name of the output zip file")
389 parser.add_argument("--part_backbone_ratio", type=parse_optional_float, required=False, help="Part to backbone molar ratio")
390 parser.add_argument("--quantity_unit", choices=["fmol", "nM", "ng"], help="Quantity unit")
391 parser.add_argument("--part_quantity", type=float, help="Quantity of each part")
392 parser.add_argument("--buffer_volume", type=float, help="Buffer volume in µL")
393 parser.add_argument("--total_volume", type=float, help="Total reaction volume in µL")
394 parser.add_argument("--dispenser", choices=["labcyte_echo", "tecan_evo"], help="Dispenser machine")
395
396 args = parser.parse_args()
397
398 # Parameters:
399 picklist = args.picklist # assembly plan
400 # directory or can be a csv/Excel with part sizes
401 if isinstance(args.parts_files, str):
402 args.parts_files = args.parts_files.split(",")
403 parts_dir = fix_and_rename_paths(args.parts_files)
404 source_plate_path = args.source_plate
405 backbone_name = args.backbone_name
406 part_backbone_ratio = args.part_backbone_ratio
407 result_zip_file = args.result_zip # output file name "picklist.zip"
408 ##############################################################################
409 # Defaults:
410 destination_plate = None
411 destination_type = "new" # this parameter is not actually used
412 destination_size = 96 # this parameter is not actually used
413 fill_by = "column" # this parameter is not actually used
414 quantity_unit = args.quantity_unit
415 part_quantity = args.part_quantity # 1.3
416 buffer_volume = args.buffer_volume # 0.3 # (µL)
417 total_volume = args.total_volume # 1 # (µL)
418 dispenser_machine = args.dispenser
419 dispenser_min_volume = 0.5 # (nL), this parameter is not actually used
420 dispenser_max_volume = 5 # (µL), this parameter is not actually used
421 dispenser_resolution = 2.5 # (nL), this parameter is not actually used
422 dispenser_dead_volume = 8 # (µL), this parameter is not actually used
423 use_file_names_as_ids = True
424
425 # CODE
426 if picklist.endswith(".csv"):
427 csv = picklist.read().decode()
428 rows = [line.split(",") for line in csv.split("\n") if len(line)]
429 else:
430 dataframe = pandas.read_excel(picklist)
431 rows = [row for i, row in dataframe.iterrows()]
432
433 assembly_plan = AssemblyPlan(
434 OrderedDict(
435 [
436 (
437 row[0],
438 [
439 str(e).strip()
440 for e in row[1:]
441 if str(e).strip() not in ["-", "nan", ""]
442 ],
443 )
444 for row in rows
445 if row[0] not in ["nan", "Construct name", "constructs", "construct"]
446 ]
447 )
448 )
449 for assembly, parts in assembly_plan.assemblies.items():
450 assembly_plan.assemblies[assembly] = [part.replace(" ", "_") for part in parts]
451
452 # Reading part infos
453 if not isinstance(parts_dir, list):
454 if parts_dir.endswith((".csv", ".xls", ".xlsx")): # part sizes specified in table
455 if parts_dir.endswith(".csv"):
456 dataframe = pandas.read_csv(parts_dir)
457 else:
458 dataframe = pandas.read_excel(parts_dir)
459 parts_data = {row.part: {"size": row["size"]} for i, row in dataframe.iterrows()}
460 else: # input records
461 records = dnacauldron.biotools.load_records_from_files(
462 files=parts_dir, use_file_names_as_ids=use_file_names_as_ids
463 )
464 parts_data = {rec.id.replace(" ", "_").lower(): {"record": rec} for rec in records}
465 #parts_data = process_parts_with_mapping(records, args.file_name_mapping)
466 assembly_plan.parts_data = parts_data
467 parts_without_data = assembly_plan.parts_without_data()
468 if len(parts_without_data):
469 print("success: False")
470 print("message: Some parts have no provided record or data.")
471 print("missing_parts: ", parts_without_data)
472 sys.exit()
473 # Reading protocol
474 if quantity_unit == "fmol":
475 part_mol = part_quantity * 1e-15
476 part_g = None
477 if quantity_unit == "nM":
478 part_mol = part_quantity * total_volume * 1e-15
479 part_g = None
480 if quantity_unit == "ng":
481 part_mol = None
482 part_g = part_quantity * 1e-9
483 # Backbone:part molar ratio calculation is not performed in this case.
484 # This ensures no change regardless of form input:
485 part_backbone_ratio = 1
486 print("Generating picklist")
487 picklist_generator = AssemblyPicklistGenerator(
488 part_mol=part_mol,
489 part_g=part_g,
490 complement_to=total_volume * 1e-6, # convert uL to L
491 buffer_volume=buffer_volume * 1e-6,
492 volume_rounding=2.5e-9, # not using parameter from form
493 minimal_dispense_volume=5e-9, # Echo machine's minimum dispense -
494 )
495 if backbone_name != '' and backbone_name != 'Non':
496 backbone_name_list = backbone_name.split(",")
497 source_plate = plate_from_content_spreadsheet(source_plate_path)
498
499 for well in source_plate.iter_wells():
500 if well.is_empty:
501 continue
502 quantities = well.content.quantities
503 part, quantity = list(quantities.items())[0]
504 quantities.pop(part)
505 quantities[part.replace(" ", "_")] = quantity
506
507 if backbone_name != '' and backbone_name != 'Non':
508 if part in backbone_name_list:
509 # This section multiplies the backbone concentration with the
510 # part:backbone molar ratio. This tricks the calculator into making
511 # a picklist with the desired ratio.
512 # For example, a part:backbone = 2:1 will multiply the
513 # backbone concentration by 2, therefore half as much of it will be
514 # added to the well.
515 quantities[part.replace(" ", "_")] = quantity * part_backbone_ratio
516 else:
517 quantities[part.replace(" ", "_")] = quantity
518
519 source_plate.name = "Source"
520 if destination_plate:
521 dest_filelike = file_to_filelike_object(destination_plate)
522 destination_plate = plate_from_content_spreadsheet(destination_plate)
523 else:
524 destination_plate = Plate4ti0960("Mixplate")
525 destination_wells = (
526 well for well in destination_plate.iter_wells(direction="column") if well.is_empty
527 )
528 picklist, picklist_data = picklist_generator.make_picklist(
529 assembly_plan,
530 source_wells=source_plate.iter_wells(),
531 destination_wells=destination_wells,
532 )
533 if picklist is None:
534 print("success: False")
535 print("message: Some parts in the assembly plan have no corresponding well.")
536 print("picklist_data: ", picklist_data)
537 print("missing_parts:", picklist_data.get("missing_parts", None))
538 sys.exit()
539
540 future_plates = picklist.simulate(inplace=False)
541
542
543 def text(w):
544 txt = human_volume(w.content.volume)
545 if "construct" in w.data:
546 txt = "\n".join([w.data["construct"], txt])
547 return txt
548
549
550 plotter = PlateTextPlotter(text)
551 ax, _ = plotter.plot_plate(future_plates[destination_plate], figsize=(20, 8))
552
553 ziproot = flametree.file_tree(result_zip_file, replace=True)
554
555 # MIXPLATE MAP PLOT
556 ax.figure.savefig(
557 ziproot._file("final_mixplate.pdf").open("wb"),
558 format="pdf",
559 bbox_inches="tight",
560 )
561 plt.close(ax.figure)
562 plate_to_platemap_spreadsheet(
563 future_plates[destination_plate],
564 lambda w: w.data.get("construct", ""),
565 filepath=ziproot._file("final_mixplate.xls").open("wb"),
566 )
567
568 # ASSEMBLY REPORT
569 print("Writing report...")
570 picklist_to_assembly_mix_report(
571 picklist,
572 ziproot._file("assembly_mix_picklist_report.pdf").open("wb"),
573 data=picklist_data,
574 )
575 assembly_plan.write_report(ziproot._file("assembly_plan_summary.pdf").open("wb"))
576
577 # MACHINE PICKLIST
578
579 if dispenser_machine == "labcyte_echo":
580 picklist_to_labcyte_echo_picklist_file(
581 picklist, ziproot._file("ECHO_picklist.csv").open("w")
582 )
583 else:
584 picklist_to_tecan_evo_picklist_file(
585 picklist, ziproot._file("EVO_picklist.gwl").open("w")
586 )
587 # We'll not write the input source plate.
588 # raw = file_to_filelike_object(source_plate_path).read()
589 # f = ziproot.copy(source_plate_path)
590 # f.write(raw, mode="wb")
591 ziproot._close()
592 print("success: True")
593
594
595 if __name__ == "__main__":
596 main()