comparison pyramid_upgrade.py @ 3:ef68bc2a4dbc draft default tip

planemo upload for repository https://github.com/ohsu-comp-bio/ashlar commit 69f200fcfa0b1d17de50466c51d8c5468fdeb54c
author goeckslab
date Fri, 09 Feb 2024 22:48:46 +0000
parents 33ab2058c6d9
children
comparison
equal deleted inserted replaced
2:33ab2058c6d9 3:ef68bc2a4dbc
1 import argparse
2 import dataclasses
3 import fractions
4 import io
5 import os
6 import re
7 import reprlib
8 import struct
9 import sys
10 import xml.etree.ElementTree
11 from typing import Any, List
12
13
14 datatype_formats = {
15 1: "B", # BYTE
16 2: "s", # ASCII
17 3: "H", # SHORT
18 4: "I", # LONG
19 5: "I", # RATIONAL (pairs)
20 6: "b", # SBYTE
21 7: "B", # UNDEFINED
22 8: "h", # SSHORT
23 9: "i", # SLONG
24 10: "i", # SRATIONAL (pairs)
25 11: "f", # FLOAT
26 12: "d", # DOUBLE
27 13: "I", # IFD
28 16: "Q", # LONG8
29 17: "q", # SLONG8
30 18: "Q", # IFD8
31 }
32 rational_datatypes = {5, 10}
33
34
35 class TiffSurgeon:
36 """Read, manipulate and write IFDs in BigTIFF files."""
37
38 def __init__(self, path, *, writeable=False, encoding=None):
39 self.path = path
40 self.writeable = writeable
41 self.encoding = encoding
42 self.endian = ""
43 self.ifds = None
44 self.file = open(self.path, "r+b" if self.writeable else "rb")
45 self._validate()
46
47 def _validate(self):
48 signature = self.read("2s")
49 signature = signature.decode("ascii", errors="ignore")
50 if signature == "II":
51 self.endian = "<"
52 elif signature == "MM":
53 self.endian = ">"
54 else:
55 raise FormatError(f"Not a TIFF file (signature is '{signature}').")
56 version = self.read("H")
57 if version == 42:
58 raise FormatError("Cannot process classic TIFF, only BigTIFF.")
59 offset_size, reserved, first_ifd_offset = self.read("H H Q")
60 if version != 43 or offset_size != 8 or reserved != 0:
61 raise FormatError("Malformed TIFF, giving up!")
62 self.first_ifd_offset = first_ifd_offset
63
64 def read(self, fmt, *, file=None):
65 if file is None:
66 file = self.file
67 endian = self.endian or "="
68 size = struct.calcsize(endian + fmt)
69 raw = file.read(size)
70 value = self.unpack(fmt, raw)
71 return value
72
73 def write(self, fmt, *values):
74 if not self.writeable:
75 raise ValueError("File is opened as read-only.")
76 raw = self.pack(fmt, *values)
77 self.file.write(raw)
78
79 def unpack(self, fmt, raw):
80 assert self.endian or re.match(r"\d+s", fmt), \
81 "can't unpack non-string before endianness is detected"
82 fmt = self.endian + fmt
83 size = struct.calcsize(fmt)
84 values = struct.unpack(fmt, raw[:size])
85 if len(values) == 1:
86 return values[0]
87 else:
88 return values
89
90 def pack(self, fmt, *values):
91 assert self.endian, "can't pack without endian set"
92 fmt = self.endian + fmt
93 raw = struct.pack(fmt, *values)
94 return raw
95
96 def read_ifds(self):
97 ifds = [self.read_ifd(self.first_ifd_offset)]
98 while ifds[-1].offset_next:
99 ifds.append(self.read_ifd(ifds[-1].offset_next))
100 self.ifds = ifds
101
102 def read_ifd(self, offset):
103 self.file.seek(offset)
104 num_tags = self.read("Q")
105 buf = io.BytesIO(self.file.read(num_tags * 20))
106 offset_next = self.read("Q")
107 try:
108 tags = TagSet([self.read_tag(buf) for i in range(num_tags)])
109 except FormatError as e:
110 raise FormatError(f"IFD at offset {offset}, {e}") from None
111 ifd = Ifd(tags, offset, offset_next)
112 return ifd
113
114 def read_tag(self, buf):
115 tag = Tag(*self.read("H H Q 8s", file=buf))
116 value, offset_range = self.tag_value(tag)
117 tag = dataclasses.replace(tag, value=value, offset_range=offset_range)
118 return tag
119
120 def append_ifd_sequence(self, ifds):
121 """Write list of IFDs as a chained sequence at the end of the file.
122
123 Returns a list of new Ifd objects with updated offsets.
124
125 """
126 self.file.seek(0, os.SEEK_END)
127 new_ifds = []
128 for ifd in ifds:
129 offset = self.file.tell()
130 self.write("Q", len(ifd.tags))
131 for tag in ifd.tags:
132 self.write_tag(tag)
133 offset_next = self.file.tell() + 8 if ifd is not ifds[-1] else 0
134 self.write("Q", offset_next)
135 new_ifd = dataclasses.replace(
136 ifd, offset=offset, offset_next=offset_next
137 )
138 new_ifds.append(new_ifd)
139 return new_ifds
140
141 def append_tag_data(self, code, datatype, value):
142 """Build new tag and write data to the end of the file if necessary.
143
144 Returns a Tag object corresponding to the passed parameters. This
145 function only writes any "overflow" data and not the IFD entry itself,
146 so the returned Tag must still be written to an IFD.
147
148 If the value is small enough to fit in the data field within an IFD, no
149 data will actually be written to the file and the returned Tag object
150 will have the value encoded in its data attribute. Otherwise the data
151 will be appended to the file and the returned Tag's data attribute will
152 encode the corresponding offset.
153
154 """
155 fmt = datatype_formats[datatype]
156 # FIXME Should we perform our own check that values match datatype?
157 # struct.pack will do it but the exception won't be as understandable.
158 original_value = value
159 if isinstance(value, str):
160 if not self.encoding:
161 raise ValueError(
162 "ASCII tag values must be bytes if encoding is not set"
163 )
164 value = [value.encode(self.encoding) + b"\x00"]
165 count = len(value[0])
166 elif isinstance(value, bytes):
167 value = [value + b"\x00"]
168 count = len(value[0])
169 else:
170 try:
171 len(value)
172 except TypeError:
173 value = [value]
174 count = len(value)
175 struct_count = count
176 if datatype in rational_datatypes:
177 value = [i for v in value for i in v.as_integer_ratio()]
178 count //= 2
179 byte_count = struct_count * struct.calcsize(fmt)
180 if byte_count <= 8:
181 data = self.pack(str(struct_count) + fmt, *value)
182 data += bytes(8 - byte_count)
183 else:
184 self.file.seek(0, os.SEEK_END)
185 data = self.pack("Q", self.file.tell())
186 self.write(str(count) + fmt, *value)
187 # TODO Compute and set offset_range.
188 tag = Tag(code, datatype, count, data, original_value)
189 return tag
190
191 def write_first_ifd_offset(self, offset):
192 self.file.seek(8)
193 self.write("Q", offset)
194
195 def write_tag(self, tag):
196 self.write("H H Q 8s", tag.code, tag.datatype, tag.count, tag.data)
197
198 def tag_value(self, tag):
199 """Return decoded tag data and the file offset range."""
200 fmt = datatype_formats[tag.datatype]
201 count = tag.count
202 if tag.datatype in rational_datatypes:
203 count *= 2
204 byte_count = count * struct.calcsize(fmt)
205 if byte_count <= 8:
206 value = self.unpack(str(count) + fmt, tag.data)
207 offset_range = range(0, 0)
208 else:
209 offset = self.unpack("Q", tag.data)
210 self.file.seek(offset)
211 value = self.read(str(count) + fmt)
212 offset_range = range(offset, offset + byte_count)
213 if tag.datatype == 2:
214 value = value.rstrip(b"\x00")
215 if self.encoding:
216 try:
217 value = value.decode(self.encoding)
218 except UnicodeDecodeError as e:
219 raise FormatError(f"tag {tag.code}: {e}") from None
220 elif tag.datatype in rational_datatypes:
221 value = [
222 fractions.Fraction(*v) for v in zip(value[::2], value[1::2])
223 ]
224 if len(value) == 1:
225 value = value[0]
226 return value, offset_range
227
228 def close(self):
229 self.file.close()
230
231
232 @dataclasses.dataclass(frozen=True)
233 class Tag:
234 code: int
235 datatype: int
236 count: int
237 data: bytes
238 value: Any = None
239 offset_range: range = None
240
241 _vrepr = reprlib.Repr()
242 _vrepr.maxstring = 60
243 _vrepr.maxother = 60
244 vrepr = _vrepr.repr
245
246 def __repr__(self):
247 return (
248 self.__class__.__qualname__ + "("
249 + f"code={self.code!r}, datatype={self.datatype!r}, "
250 + f"count={self.count!r}, data={self.data!r}, "
251 + f"value={self.vrepr(self.value)}"
252 + ")"
253 )
254
255
256 @dataclasses.dataclass(frozen=True)
257 class TagSet:
258 """Container for Tag objects as stored in a TIFF IFD.
259
260 Tag objects are maintained in a list that's always sorted in ascending order
261 by the tag code. Only one tag for a given code may be present, which is where
262 the "set" name comes from.
263
264 """
265
266 tags: List[Tag] = dataclasses.field(default_factory=list)
267
268 def __post_init__(self):
269 if len(self.codes) != len(set(self.codes)):
270 raise ValueError("Duplicate tag codes are not allowed.")
271
272 def __repr__(self):
273 ret = type(self).__name__ + "(["
274 if self.tags:
275 ret += "\n"
276 ret += "".join([f" {t},\n" for t in self.tags])
277 ret += "])"
278 return ret
279
280 @property
281 def codes(self):
282 return [t.code for t in self.tags]
283
284 def __getitem__(self, code):
285 for t in self.tags:
286 if code == t.code:
287 return t
288 else:
289 raise KeyError(code)
290
291 def __delitem__(self, code):
292 try:
293 i = self.codes.index(code)
294 except ValueError:
295 raise KeyError(code) from None
296 self.tags[:] = self.tags[:i] + self.tags[i + 1:]
297
298 def __contains__(self, code):
299 return code in self.codes
300
301 def __len__(self):
302 return len(self.tags)
303
304 def __iter__(self):
305 return iter(self.tags)
306
307 def get(self, code, default=None):
308 try:
309 return self[code]
310 except KeyError:
311 return default
312
313 def get_value(self, code, default=None):
314 tag = self.get(code)
315 if tag:
316 return tag.value
317 else:
318 return default
319
320 def insert(self, tag):
321 """Add a new tag or replace an existing one."""
322 for i, t in enumerate(self.tags):
323 if tag.code == t.code:
324 self.tags[i] = tag
325 return
326 elif tag.code < t.code:
327 break
328 else:
329 i = len(self.tags)
330 n = len(self.tags)
331 self.tags[i:n + 1] = [tag] + self.tags[i:n]
332
333
334 @dataclasses.dataclass(frozen=True)
335 class Ifd:
336 tags: TagSet
337 offset: int
338 offset_next: int
339
340 @property
341 def nbytes(self):
342 return len(self.tags) * 20 + 16
343
344 @property
345 def offset_range(self):
346 return range(self.offset, self.offset + self.nbytes)
347
348
349 class FormatError(Exception):
350 pass
351
352
353 def fix_attrib_namespace(elt):
354 """Prefix un-namespaced XML attributes with the tag's namespace."""
355 # This fixes ElementTree's inability to round-trip XML with a default
356 # namespace ("cannot use non-qualified names with default_namespace option"
357 # error). 7-year-old BPO issue here: https://bugs.python.org/issue17088
358 # Code inspired by https://gist.github.com/provegard/1381912 .
359 if elt.tag[0] == "{":
360 uri, _ = elt.tag[1:].rsplit("}", 1)
361 new_attrib = {}
362 for name, value in elt.attrib.items():
363 if name[0] != "{":
364 # For un-namespaced attributes, copy namespace from element.
365 name = f"{{{uri}}}{name}"
366 new_attrib[name] = value
367 elt.attrib = new_attrib
368 for child in elt:
369 fix_attrib_namespace(child)
370
371
372 def parse_args():
373 parser = argparse.ArgumentParser(
374 description="Convert an OME-TIFF legacy pyramid to the BioFormats 6"
375 " OME-TIFF pyramid format in-place.",
376 )
377 parser.add_argument("image", help="OME-TIFF file to convert")
378 parser.add_argument(
379 "-n",
380 dest="channel_names",
381 nargs="+",
382 default=[],
383 metavar="NAME",
384 help="Channel names to be inserted into OME metadata. Number of names"
385 " must match number of channels in image. Be sure to put quotes"
386 " around names containing spaces or other special shell characters."
387 )
388 args = parser.parse_args()
389 return args
390
391
392 def main():
393
394 args = parse_args()
395
396 image_path = sys.argv[1]
397 try:
398 tiff = TiffSurgeon(image_path, encoding="utf-8", writeable=True)
399 except FormatError as e:
400 print(f"TIFF format error: {e}")
401 sys.exit(1)
402
403 tiff.read_ifds()
404
405 # ElementTree doesn't parse xml declarations so we'll just run some sanity
406 # checks that we do have UTF-8 and give it a decoded string instead of raw
407 # bytes. We need to both ensure that the raw tag bytes decode properly and
408 # that the declaration encoding is UTF-8 if present.
409 try:
410 omexml = tiff.ifds[0].tags.get_value(270, "")
411 except FormatError:
412 print("ImageDescription tag is not a valid UTF-8 string (not an OME-TIFF?)")
413 sys.exit(1)
414 if re.match(r'<\?xml [^>]*encoding="(?!UTF-8)[^"]*"', omexml):
415 print("OME-XML is encoded with something other than UTF-8.")
416 sys.exit(1)
417
418 xml_ns = {"ome": "http://www.openmicroscopy.org/Schemas/OME/2016-06"}
419
420 if xml_ns["ome"] not in omexml:
421 print("Not an OME-TIFF.")
422 sys.exit(1)
423 if (
424 "Faas" not in tiff.ifds[0].tags.get_value(305, "")
425 or 330 in tiff.ifds[0].tags
426 ):
427 print("Not a legacy OME-TIFF pyramid.")
428 sys.exit(1)
429
430 # All XML manipulation assumes the document is valid OME-XML!
431 root = xml.etree.ElementTree.fromstring(omexml)
432 image = root.find("ome:Image", xml_ns)
433 pixels = image.find("ome:Pixels", xml_ns)
434 size_x = int(pixels.get("SizeX"))
435 size_y = int(pixels.get("SizeY"))
436 size_c = int(pixels.get("SizeC"))
437 size_z = int(pixels.get("SizeZ"))
438 size_t = int(pixels.get("SizeT"))
439 num_levels = len(root.findall("ome:Image", xml_ns))
440 page_dims = [(ifd.tags[256].value, ifd.tags[257].value) for ifd in tiff.ifds]
441
442 if len(root) != num_levels:
443 print("Top-level OME-XML elements other than Image are not supported.")
444 if size_z != 1 or size_t != 1:
445 print("Z-stacks and multiple timepoints are not supported.")
446 sys.exit(1)
447 if size_c * num_levels != len(tiff.ifds):
448 print("TIFF page count does not match OME-XML Image elements.")
449 sys.exit(1)
450 if any(dims != (size_x, size_y) for dims in page_dims[:size_c]):
451 print(f"TIFF does not begin with SizeC={size_c} full-size pages.")
452 sys.exit(1)
453 for level in range(1, num_levels):
454 level_dims = page_dims[level * size_c: (level + 1) * size_c]
455 if len(set(level_dims)) != 1:
456 print(
457 f"Pyramid level {level + 1} out of {num_levels} has inconsistent"
458 f" sizes:\n{level_dims}"
459 )
460 sys.exit(1)
461 if args.channel_names and len(args.channel_names) != size_c:
462 print(
463 f"Wrong number of channel names -- image has {size_c} channels but"
464 f" {len(args.channel_names)} names were specified:"
465 )
466 for i, n in enumerate(args.channel_names, 1):
467 print(f"{i:4}: {n}")
468 sys.exit(1)
469
470 print("Input image summary")
471 print("===================")
472 print(f"Dimensions: {size_x} x {size_y}")
473 print(f"Number of channels: {size_c}")
474 print(f"Pyramid sub-resolutions ({num_levels - 1} total):")
475 for dim_x, dim_y in page_dims[size_c::size_c]:
476 print(f" {dim_x} x {dim_y}")
477 software = tiff.ifds[0].tags.get_value(305, "<not set>")
478 print(f"Software: {software}")
479 print()
480
481 print("Updating OME-XML metadata...")
482 # We already verified there is nothing but Image elements under the root.
483 for other_image in root[1:]:
484 root.remove(other_image)
485 for tiffdata in pixels.findall("ome:TiffData", xml_ns):
486 pixels.remove(tiffdata)
487 new_tiffdata = xml.etree.ElementTree.Element(
488 f"{{{xml_ns['ome']}}}TiffData",
489 attrib={"IFD": "0", "PlaneCount": str(size_c)},
490 )
491 # A valid OME-XML Pixels begins with size_c Channels; then comes TiffData.
492 pixels.insert(size_c, new_tiffdata)
493
494 if args.channel_names:
495 print("Renaming channels...")
496 channels = pixels.findall("ome:Channel", xml_ns)
497 for channel, name in zip(channels, args.channel_names):
498 channel.attrib["Name"] = name
499
500 fix_attrib_namespace(root)
501 # ElementTree.tostring would have been simpler but it only supports
502 # xml_declaration and default_namespace starting with Python 3.8.
503 xml_file = io.BytesIO()
504 tree = xml.etree.ElementTree.ElementTree(root)
505 tree.write(
506 xml_file,
507 encoding="utf-8",
508 xml_declaration=True,
509 default_namespace=xml_ns["ome"],
510 )
511 new_omexml = xml_file.getvalue()
512
513 print("Writing new TIFF headers...")
514 stale_ranges = [ifd.offset_range for ifd in tiff.ifds]
515 main_ifds = tiff.ifds[:size_c]
516 channel_sub_ifds = [tiff.ifds[c + size_c::size_c] for c in range(size_c)]
517 for i, (main_ifd, sub_ifds) in enumerate(zip(main_ifds, channel_sub_ifds)):
518 for ifd in sub_ifds:
519 if 305 in ifd.tags:
520 stale_ranges.append(ifd.tags[305].offset_range)
521 del ifd.tags[305]
522 ifd.tags.insert(tiff.append_tag_data(254, 3, 1))
523 if i == 0:
524 stale_ranges.append(main_ifd.tags[305].offset_range)
525 stale_ranges.append(main_ifd.tags[270].offset_range)
526 old_software = main_ifd.tags[305].value.replace("Faas", "F*a*a*s")
527 new_software = f"pyramid_upgrade.py (was {old_software})"
528 main_ifd.tags.insert(tiff.append_tag_data(305, 2, new_software))
529 main_ifd.tags.insert(tiff.append_tag_data(270, 2, new_omexml))
530 else:
531 if 305 in main_ifd.tags:
532 stale_ranges.append(main_ifd.tags[305].offset_range)
533 del main_ifd.tags[305]
534 sub_ifds[:] = tiff.append_ifd_sequence(sub_ifds)
535 offsets = [ifd.offset for ifd in sub_ifds]
536 main_ifd.tags.insert(tiff.append_tag_data(330, 16, offsets))
537 main_ifds = tiff.append_ifd_sequence(main_ifds)
538 tiff.write_first_ifd_offset(main_ifds[0].offset)
539
540 print("Clearing old headers and tag values...")
541 # We overwrite all the old IFDs and referenced data values with obvious
542 # "filler" as a courtesy to anyone who might need to poke around in the TIFF
543 # structure down the road. A real TIFF parser wouldn't see the stale data,
544 # but a human might just scan for the first thing that looks like a run of
545 # OME-XML and not realize it's been replaced with something else. The filler
546 # content is the repeated string "unused " with square brackets at the
547 # beginning and end of each filled IFD or data value.
548 filler = b"unused "
549 f_len = len(filler)
550 for r in stale_ranges:
551 tiff.file.seek(r.start)
552 tiff.file.write(b"[")
553 f_total = len(r) - 2
554 for i in range(f_total // f_len):
555 tiff.file.write(filler)
556 tiff.file.write(b" " * (f_total % f_len))
557 tiff.file.write(b"]")
558
559 tiff.close()
560
561 print()
562 print("Success!")
563
564
565 if __name__ == "__main__":
566 main()