comparison env/lib/python3.9/site-packages/cwltool/provenance.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
1 """Stores Research Object including provenance."""
2
3 import copy
4 import datetime
5 import hashlib
6 import os
7 import re
8 import shutil
9 import tempfile
10 import uuid
11 from array import array
12 from collections import OrderedDict
13 from getpass import getuser
14 from io import FileIO, TextIOWrapper
15 from mmap import mmap
16 from pathlib import Path, PurePosixPath
17 from typing import (
18 IO,
19 Any,
20 BinaryIO,
21 Callable,
22 Dict,
23 List,
24 MutableMapping,
25 MutableSequence,
26 Optional,
27 Set,
28 Tuple,
29 Union,
30 cast,
31 )
32
33 import prov.model as provM
34 from prov.model import PROV, ProvDocument
35 from schema_salad.utils import json_dumps
36 from typing_extensions import TYPE_CHECKING, TypedDict
37
38 from .loghandler import _logger
39 from .provenance_constants import (
40 ACCOUNT_UUID,
41 CWLPROV,
42 CWLPROV_VERSION,
43 DATA,
44 ENCODING,
45 FOAF,
46 LOGS,
47 METADATA,
48 ORCID,
49 PROVENANCE,
50 SHA1,
51 SHA256,
52 SHA512,
53 SNAPSHOT,
54 TEXT_PLAIN,
55 USER_UUID,
56 UUID,
57 WORKFLOW,
58 Hasher,
59 )
60 from .stdfsaccess import StdFsAccess
61 from .utils import (
62 CWLObjectType,
63 CWLOutputType,
64 create_tmp_dir,
65 local_path,
66 onWindows,
67 posix_path,
68 versionstring,
69 )
70
71 # imports needed for retrieving user data
72 if onWindows():
73 import ctypes # pylint: disable=unused-import
74 else:
75 try:
76 import pwd # pylint: disable=unused-import
77 except ImportError:
78 pass
79
80 if TYPE_CHECKING:
81 from .command_line_tool import ( # pylint: disable=unused-import
82 CommandLineTool,
83 ExpressionTool,
84 )
85 from .workflow import Workflow # pylint: disable=unused-import
86
87
88 def _whoami() -> Tuple[str, str]:
89 """Return the current operating system account as (username, fullname)."""
90 username = getuser()
91 try:
92 if onWindows():
93 get_user_name = ctypes.windll.secur32.GetUserNameExW # type: ignore
94 size = ctypes.pointer(ctypes.c_ulong(0))
95 get_user_name(3, None, size)
96
97 name_buffer = ctypes.create_unicode_buffer(size.contents.value)
98 get_user_name(3, name_buffer, size)
99 fullname = str(name_buffer.value)
100 else:
101 fullname = pwd.getpwuid(os.getuid())[4].split(",")[0]
102 except (KeyError, IndexError):
103 fullname = username
104
105 return (username, fullname)


class WritableBagFile(FileIO):
    """Writes files in research object."""

    def __init__(self, research_object: "ResearchObject", rel_path: str) -> None:
        """Initialize an ROBagIt."""
        self.research_object = research_object
        if Path(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)
        self.rel_path = rel_path
        self.hashes = {
            SHA1: hashlib.sha1(),  # nosec
            SHA256: hashlib.sha256(),
            SHA512: hashlib.sha512(),
        }
        # Open file in Research Object folder
        path = os.path.abspath(
            os.path.join(research_object.folder, local_path(rel_path))
        )
        if not path.startswith(os.path.abspath(research_object.folder)):
            raise ValueError("Path is outside Research Object: %s" % path)
        _logger.debug("[provenance] Creating WritableBagFile at %s.", path)
        super().__init__(path, mode="w")

    def write(self, b: Any) -> int:
        """Write some content to the Bag."""
        real_b = b if isinstance(b, (bytes, mmap, array)) else b.encode("utf-8")
        total = 0
        length = len(real_b)
        while total < length:
            # Write only the remainder, so a partial write does not
            # duplicate already-written bytes.
            ret = super().write(real_b[total:])
            if ret:
                total += ret
        for val in self.hashes.values():
            val.update(real_b)
        return total

    def close(self) -> None:
        # FIXME: Convert below block to a ResearchObject method?
        if self.rel_path.startswith("data/"):
            self.research_object.bagged_size[self.rel_path] = self.tell()
        else:
            self.research_object.tagfiles.add(self.rel_path)

        super().close()
        # { "sha1": "f572d396fae9206628714fb2ce00f72e94f2258f" }
        checksums = {}
        for name in self.hashes:
            checksums[name] = self.hashes[name].hexdigest().lower()
        self.research_object.add_to_manifest(self.rel_path, checksums)

    # To simplify our hash calculation we won't support
    # seeking, reading or truncating, as we can't do
    # similar seeks in the current hash.
    # TODO: Support these? At the expense of invalidating
    # the current hash, then having to recalculate at close()
    def seekable(self) -> bool:
        return False

    def readable(self) -> bool:
        return False

    def truncate(self, size: Optional[int] = None) -> int:
        # FIXME: This breaks the IOBase contract,
        # as it means we would have to recalculate the hash
        if size is not None:
            raise OSError("WritableBagFile can't truncate")
        return self.tell()
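
    # Illustrative usage sketch (via ResearchObject.write_bag_file below):
    # every byte written through a WritableBagFile also feeds the
    # SHA1/SHA256/SHA512 hashers, so close() can register checksums in the
    # manifest without re-reading the file. Assuming "ro" is an open
    # ResearchObject:
    #
    #     with ro.write_bag_file("metadata/example.txt") as fh:
    #         fh.write("hello\n")
    #     # the tagmanifest-*.txt files now list checksums for
    #     # metadata/example.txt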


def _check_mod_11_2(numeric_string: str) -> bool:
    """
    Validate numeric_string for its MOD-11-2 checksum.

    Any "-" characters in the numeric_string are ignored.

    The last digit of numeric_string is assumed to be the checksum, 0-9 or X.

    See ISO/IEC 7064:2003 and
    https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
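
    Worked example (illustrative): for 0000-0002-1825-0097 the running
    total over the first fifteen digits ends at 1314; 1314 % 11 == 5 and
    (12 - 5) % 11 == 7, which matches the final digit.

    >>> _check_mod_11_2("0000-0002-1825-0097")
    True
    >>> _check_mod_11_2("0000-0002-1825-0098")
    False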
188 """
189 # Strip -
190 nums = numeric_string.replace("-", "")
191 total = 0
192 # skip last (check)digit
193 for num in nums[:-1]:
194 digit = int(num)
195 total = (total + digit) * 2
196 remainder = total % 11
197 result = (12 - remainder) % 11
198 if result == 10:
199 checkdigit = "X"
200 else:
201 checkdigit = str(result)
202 # Compare against last digit or X
203 return nums[-1].upper() == checkdigit


def _valid_orcid(orcid: Optional[str]) -> str:
    """
    Ensure orcid is a valid ORCID identifier.

    The string must be equivalent to one of these forms:

    0000-0002-1825-0097
    orcid.org/0000-0002-1825-0097
    http://orcid.org/0000-0002-1825-0097
    https://orcid.org/0000-0002-1825-0097

    If the ORCID number or prefix is invalid, a ValueError is raised.

    The returned ORCID string is always in the form of:
    https://orcid.org/0000-0002-1825-0097
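
    Example (illustrative):

    >>> _valid_orcid("ORCID.org/0000-0002-1825-0097")
    'https://orcid.org/0000-0002-1825-0097'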
221 """
222 if orcid is None or not orcid:
223 raise ValueError("ORCID cannot be unspecified")
224 # Liberal in what we consume, e.g. ORCID.org/0000-0002-1825-009x
225 orcid = orcid.lower()
226 match = re.match(
227 # Note: concatinated r"" r"" below so we can add comments to pattern
228 # Optional hostname, with or without protocol
229 r"(http://orcid\.org/|https://orcid\.org/|orcid\.org/)?"
230 # alternative pattern, but probably messier
231 # r"^((https?://)?orcid.org/)?"
232 # ORCID number is always 4x4 numerical digits,
233 # but last digit (modulus 11 checksum)
234 # can also be X (but we made it lowercase above).
235 # e.g. 0000-0002-1825-0097
236 # or 0000-0002-1694-233x
237 r"(?P<orcid>(\d{4}-\d{4}-\d{4}-\d{3}[0-9x]))$",
238 orcid,
239 )
240
241 help_url = (
242 "https://support.orcid.org/knowledgebase/articles/"
243 "116780-structure-of-the-orcid-identifier"
244 )
245 if not match:
246 raise ValueError(f"Invalid ORCID: {orcid}\n{help_url}")
247
248 # Conservative in what we produce:
249 # a) Ensure any checksum digit is uppercase
250 orcid_num = match.group("orcid").upper()
251 # b) ..and correct
252 if not _check_mod_11_2(orcid_num):
253 raise ValueError(f"Invalid ORCID checksum: {orcid_num}\n{help_url}")
254
255 # c) Re-add the official prefix https://orcid.org/
256 return "https://orcid.org/%s" % orcid_num


Annotation = TypedDict(
    "Annotation",
    {
        "uri": str,
        "about": str,
        "content": Optional[Union[str, List[str]]],
        "oa:motivatedBy": Dict[str, str],
    },
)
Aggregate = TypedDict(
    "Aggregate",
    {
        "uri": Optional[str],
        "bundledAs": Optional[Dict[str, Any]],
        "mediatype": Optional[str],
        "conformsTo": Optional[Union[str, List[str]]],
        "createdOn": Optional[str],
        "createdBy": Optional[Dict[str, str]],
    },
    total=False,
)
# Aggregate.bundledAs is actually of type Aggregate, but cyclic definitions are not supported
AuthoredBy = TypedDict(
    "AuthoredBy",
    {"orcid": Optional[str], "name": Optional[str], "uri": Optional[str]},
    total=False,
)
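# For example (illustrative values only), a payload file aggregated by its
# SHA1 appears in the manifest roughly as:
#
#     {
#         "uri": "urn:hash::sha1:f572d396fae9206628714fb2ce00f72e94f2258f",
#         "bundledAs": {
#             "uri": "arcp://uuid,<ro-uuid>/data/f5/f572d396fae9206628714fb2ce00f72e94f2258f",
#             "folder": "/data/f5/",
#             "filename": "f572d396fae9206628714fb2ce00f72e94f2258f",
#         },
#     }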


class ResearchObject:
    """CWLProv Research Object."""

    def __init__(
        self,
        fsaccess: StdFsAccess,
        temp_prefix_ro: str = "tmp",
        orcid: str = "",
        full_name: str = "",
    ) -> None:
        """Initialize the ResearchObject."""
        self.temp_prefix = temp_prefix_ro
        self.orcid = "" if not orcid else _valid_orcid(orcid)
        self.full_name = full_name
        self.folder = create_tmp_dir(temp_prefix_ro)
        self.closed = False
        # map of filename "data/de/alsdklkas": 12398123 bytes
        self.bagged_size = {}  # type: Dict[str, int]
        self.tagfiles = set()  # type: Set[str]
        self._file_provenance = {}  # type: Dict[str, Aggregate]
        self._external_aggregates = []  # type: List[Aggregate]
        self.annotations = []  # type: List[Annotation]
        self._content_types = {}  # type: Dict[str, str]
        self.fsaccess = fsaccess
        # These should be replaced by generate_prov_doc when workflow/run IDs are known:
        self.engine_uuid = "urn:uuid:%s" % uuid.uuid4()
        self.ro_uuid = uuid.uuid4()
        self.base_uri = "arcp://uuid,%s/" % self.ro_uuid
        self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
        ##
        self.relativised_input_object = {}  # type: CWLObjectType

        self._initialize()
        _logger.debug("[provenance] Temporary research object: %s", self.folder)

    def self_check(self) -> None:
        """Raise ValueError if this RO is closed."""
        if self.closed:
            raise ValueError(
                "This ResearchObject has already been closed and is not "
                "available for further manipulation."
            )

    def __str__(self) -> str:
        """Represent this RO as a string."""
        return f"ResearchObject <{self.ro_uuid}> in <{self.folder}>"

    def _initialize(self) -> None:
        for research_obj_folder in (
            METADATA,
            DATA,
            WORKFLOW,
            SNAPSHOT,
            PROVENANCE,
            LOGS,
        ):
            os.makedirs(os.path.join(self.folder, research_obj_folder))
        self._initialize_bagit()

    def _initialize_bagit(self) -> None:
        """Write fixed bagit header."""
        self.self_check()
        bagit = os.path.join(self.folder, "bagit.txt")
        # encoding: always UTF-8 (although ASCII would suffice here)
        # newline: ensure LF also on Windows
        with open(bagit, "w", encoding=ENCODING, newline="\n") as bag_it_file:
            # TODO: \n or \r\n ?
            bag_it_file.write("BagIt-Version: 0.97\n")
            bag_it_file.write("Tag-File-Character-Encoding: %s\n" % ENCODING)
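
        # The resulting bagit.txt is exactly two lines (assuming ENCODING is
        # "UTF-8", as the comments elsewhere in this module indicate):
        #
        #     BagIt-Version: 0.97
        #     Tag-File-Character-Encoding: UTF-8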

    def open_log_file_for_activity(
        self, uuid_uri: str
    ) -> Union[TextIOWrapper, WritableBagFile]:
        self.self_check()
        # Ensure a valid UUID for safe filenames
        activity_uuid = uuid.UUID(uuid_uri)
        if activity_uuid.urn == self.engine_uuid:
            # It's the engine aka cwltool!
            name = "engine"
        else:
            name = "activity"
        p = os.path.join(LOGS, f"{name}.{activity_uuid}.txt")
        _logger.debug(f"[provenance] Opening log file for {name}: {p}")
        self.add_annotation(activity_uuid.urn, [p], CWLPROV["log"].uri)
        return self.write_bag_file(p)

    def _finalize(self) -> None:
        self._write_ro_manifest()
        self._write_bag_info()

    def user_provenance(self, document: ProvDocument) -> None:
        """Add the user provenance."""
        self.self_check()
        (username, fullname) = _whoami()

        if not self.full_name:
            self.full_name = fullname

        document.add_namespace(UUID)
        document.add_namespace(ORCID)
        document.add_namespace(FOAF)
        account = document.agent(
            ACCOUNT_UUID,
            {
                provM.PROV_TYPE: FOAF["OnlineAccount"],
                "prov:label": username,
                FOAF["accountName"]: username,
            },
        )

        user = document.agent(
            self.orcid or USER_UUID,
            {
                provM.PROV_TYPE: PROV["Person"],
                "prov:label": self.full_name,
                FOAF["name"]: self.full_name,
                FOAF["account"]: account,
            },
        )
        # cwltool may be started on the shell (directly by the user),
        # by a shell script (indirectly by the user),
        # or from a different program
        # (which again is launched by any of the above).
        #
        # We can't tell in which way, but ultimately we're still
        # acting on behalf of that user (even if we might
        # get their name wrong!)
        document.actedOnBehalfOf(account, user)

    def write_bag_file(
        self, path: str, encoding: Optional[str] = ENCODING
    ) -> Union[TextIOWrapper, WritableBagFile]:
        """Write the bag file into our research object."""
        self.self_check()
        # For some reason the below throws BlockingIOError
        # fp = BufferedWriter(WritableBagFile(self, path))
        bag_file = WritableBagFile(self, path)
        if encoding is not None:
            # encoding: match Tag-File-Character-Encoding: UTF-8
            # newline: ensure LF also on Windows
            return TextIOWrapper(
                cast(BinaryIO, bag_file), encoding=encoding, newline="\n"
            )
        return bag_file

    def add_tagfile(
        self, path: str, timestamp: Optional[datetime.datetime] = None
    ) -> None:
        """Add tag files to our research object."""
        self.self_check()
        checksums = {}
        # Read the file to calculate its checksum
        if os.path.isdir(path):
            return
        # FIXME: do the right thing for directories
        with open(path, "rb") as tag_file:
            # FIXME: Should have a more efficient open_tagfile() that
            # does all checksums in one go while writing through,
            # adding checksums after closing.
            # Below is probably OK for now, as metadata files
            # are not too large..?

            checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)

            tag_file.seek(0)
            checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)

            tag_file.seek(0)
            checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)

        rel_path = posix_path(os.path.relpath(path, self.folder))
        self.tagfiles.add(rel_path)
        self.add_to_manifest(rel_path, checksums)
        if timestamp is not None:
            self._file_provenance[rel_path] = {
                "createdOn": timestamp.isoformat(),
                "uri": None,
                "bundledAs": None,
                "mediatype": None,
                "conformsTo": None,
            }

    def _ro_aggregates(self) -> List[Aggregate]:
        """Gather dictionary of files to be added to the manifest."""

        def guess_mediatype(
            rel_path: str,
        ) -> Tuple[Optional[str], Optional[Union[str, List[str]]]]:
            """Return the mediatypes."""
            media_types = {
                # Adapted from
                # https://w3id.org/bundle/2014-11-05/#media-types
                "txt": TEXT_PLAIN,
                "ttl": 'text/turtle; charset="UTF-8"',
                "rdf": "application/rdf+xml",
                "json": "application/json",
                "jsonld": "application/ld+json",
                "xml": "application/xml",
                ##
                "cwl": 'text/x+yaml; charset="UTF-8"',
                "provn": 'text/provenance-notation; charset="UTF-8"',
                "nt": "application/n-triples",
            }  # type: Dict[str, str]
            conforms_to = {
                "provn": "http://www.w3.org/TR/2013/REC-prov-n-20130430/",
                "cwl": "https://w3id.org/cwl/",
            }  # type: Dict[str, str]

            prov_conforms_to = {
                "provn": "http://www.w3.org/TR/2013/REC-prov-n-20130430/",
                "rdf": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "ttl": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "nt": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "jsonld": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "xml": "http://www.w3.org/TR/2013/NOTE-prov-xml-20130430/",
                "json": "http://www.w3.org/Submission/2013/SUBM-prov-json-20130424/",
            }  # type: Dict[str, str]

            extension = rel_path.rsplit(".", 1)[-1].lower()  # type: Optional[str]
            if extension == rel_path:
                # No ".", so no extension
                extension = None

            mediatype = None  # type: Optional[str]
            conformsTo = None  # type: Optional[Union[str, List[str]]]
            if extension in media_types:
                mediatype = media_types[extension]

            if extension in conforms_to:
                # TODO: Open the CWL file to read its declared "cwlVersion", e.g.
                # cwlVersion = "v1.0"
                conformsTo = conforms_to[extension]

            if (
                rel_path.startswith(posix_path(PROVENANCE))
                and extension in prov_conforms_to
            ):
                if ".cwlprov" in rel_path:
                    # Our own!
                    conformsTo = [
                        prov_conforms_to[extension],
                        CWLPROV_VERSION,
                    ]
                else:
                    # Some other PROV
                    # TODO: Recognize ProvOne etc.
                    conformsTo = prov_conforms_to[extension]
            return (mediatype, conformsTo)
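
        # Illustrative example (assuming PROVENANCE resolves to
        # "metadata/provenance", per provenance_constants):
        # guess_mediatype("metadata/provenance/primary.cwlprov.provn")
        # returns the PROV-N mediatype together with a conformsTo of
        # [REC-prov-n-20130430, CWLPROV_VERSION], because ".cwlprov"
        # marks our own provenance files.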

        aggregates = []  # type: List[Aggregate]
        for path in self.bagged_size.keys():

            temp_path = PurePosixPath(path)
            folder = temp_path.parent
            filename = temp_path.name

            # NOTE: Here we end up aggregating the abstract
            # data items by their sha1 hash, so that it matches
            # the entity() in the prov files.

            # TODO: Change to nih:sha-256; hashes
            # https://tools.ietf.org/html/rfc6920#section-7
            aggregate_dict = {
                "uri": "urn:hash::sha1:" + filename,
                "bundledAs": {
                    # The arcp URI is a suitable ORE proxy; local to this Research Object.
                    # (as long as we don't also aggregate it by relative path!)
                    "uri": self.base_uri + path,
                    # relate it to the data/ path
                    "folder": "/%s/" % folder,
                    "filename": filename,
                },
            }  # type: Aggregate
            if path in self._file_provenance:
                # Made by the workflow run; merge captured provenance
                bundledAs = aggregate_dict["bundledAs"]
                if bundledAs:
                    bundledAs.update(self._file_provenance[path])
                else:
                    aggregate_dict["bundledAs"] = cast(
                        Optional[Dict[str, Any]], self._file_provenance[path]
                    )
            else:
                # Probably made outside the workflow run, part of the job object?
                pass
            if path in self._content_types:
                aggregate_dict["mediatype"] = self._content_types[path]

            aggregates.append(aggregate_dict)

        for path in self.tagfiles:
            if not (
                path.startswith(METADATA)
                or path.startswith(WORKFLOW)
                or path.startswith(SNAPSHOT)
            ):
                # probably a bagit file
                continue
            if path == str(PurePosixPath(METADATA) / "manifest.json"):
                # Should not really be there yet! But anyway, we won't
                # aggregate it.
                continue

            # These are local paths like metadata/provenance - but
            # we need to relativize them to our current directory,
            # as we are saved in metadata/manifest.json
            mediatype, conformsTo = guess_mediatype(path)
            rel_aggregates = {
                "uri": str(Path(os.pardir) / path),
                "mediatype": mediatype,
                "conformsTo": conformsTo,
            }  # type: Aggregate

            if path in self._file_provenance:
                # Propagate file provenance (e.g. timestamp)
                rel_aggregates.update(self._file_provenance[path])
            elif not path.startswith(SNAPSHOT):
                # make a new timestamp?
                (
                    rel_aggregates["createdOn"],
                    rel_aggregates["createdBy"],
                ) = self._self_made()
            aggregates.append(rel_aggregates)
        aggregates.extend(self._external_aggregates)
        return aggregates

    def add_uri(
        self, uri: str, timestamp: Optional[datetime.datetime] = None
    ) -> Aggregate:
        self.self_check()
        aggr = {"uri": uri}  # type: Aggregate
        aggr["createdOn"], aggr["createdBy"] = self._self_made(timestamp=timestamp)
        self._external_aggregates.append(aggr)
        return aggr

    def add_annotation(
        self, about: str, content: List[str], motivated_by: str = "oa:describing"
    ) -> str:
        """Cheap URI relativize for current directory and /."""
        self.self_check()
        curr = self.base_uri + METADATA + "/"
        content = [c.replace(curr, "").replace(self.base_uri, "../") for c in content]
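        # Illustrative (hypothetical values): a content URI of
        # "<base_uri>metadata/logs/a.txt" becomes "logs/a.txt", while
        # "<base_uri>workflow/packed.cwl" becomes "../workflow/packed.cwl",
        # since annotations are stored under metadata/.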
        uri = uuid.uuid4().urn
        ann = {
            "uri": uri,
            "about": about,
            "content": content,
            "oa:motivatedBy": {"@id": motivated_by},
        }  # type: Annotation
        self.annotations.append(ann)
        return uri

    def _ro_annotations(self) -> List[Annotation]:
        annotations = []  # type: List[Annotation]
        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": self.ro_uuid.urn,
                "content": "/",
                # https://www.w3.org/TR/annotation-vocab/#named-individuals
                "oa:motivatedBy": {"@id": "oa:describing"},
            }
        )

        # How was it run?
        # FIXME: Only primary*
        prov_files = [
            str(PurePosixPath(p).relative_to(METADATA))
            for p in self.tagfiles
            if p.startswith(posix_path(PROVENANCE)) and "/primary." in p
        ]
        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": self.ro_uuid.urn,
                "content": prov_files,
                # Modulation of https://www.w3.org/TR/prov-aq/
                "oa:motivatedBy": {"@id": "http://www.w3.org/ns/prov#has_provenance"},
            }
        )

        # Where is the main workflow?
        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": str(PurePosixPath("..") / WORKFLOW / "packed.cwl"),
                "content": None,
                "oa:motivatedBy": {"@id": "oa:highlighting"},
            }
        )

        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": self.ro_uuid.urn,
                "content": [
                    str(PurePosixPath("..") / WORKFLOW / "packed.cwl"),
                    str(PurePosixPath("..") / WORKFLOW / "primary-job.json"),
                ],
                "oa:motivatedBy": {"@id": "oa:linking"},
            }
        )
        # Add user-added annotations at the end
        annotations.extend(self.annotations)
        return annotations

    def _authored_by(self) -> Optional[AuthoredBy]:
        authored_by = {}  # type: AuthoredBy
        if self.orcid:
            authored_by["orcid"] = self.orcid
        if self.full_name:
            authored_by["name"] = self.full_name
            if not self.orcid:
                authored_by["uri"] = USER_UUID

        if authored_by:
            return authored_by
        return None

    def _write_ro_manifest(self) -> None:

        # Does not have to be this order, but it's nice to be consistent
        filename = "manifest.json"
        createdOn, createdBy = self._self_made()
        manifest = OrderedDict(
            {
                "@context": [
                    {"@base": "{}{}/".format(self.base_uri, posix_path(METADATA))},
                    "https://w3id.org/bundle/context",
                ],
                "id": "/",
                "conformsTo": CWLPROV_VERSION,
                "manifest": filename,
                "createdOn": createdOn,
                "createdBy": createdBy,
                "authoredBy": self._authored_by(),
                "aggregates": self._ro_aggregates(),
                "annotations": self._ro_annotations(),
            }
        )

        json_manifest = json_dumps(manifest, indent=4, ensure_ascii=False)
        rel_path = str(PurePosixPath(METADATA) / filename)
        json_manifest += "\n"
        with self.write_bag_file(rel_path) as manifest_file:
            manifest_file.write(json_manifest)

    def _write_bag_info(self) -> None:

        with self.write_bag_file("bag-info.txt") as info_file:
            info_file.write("Bag-Software-Agent: %s\n" % self.cwltool_version)
            # FIXME: require sha-512 of payload to comply with profile?
            # FIXME: Update profile
            info_file.write(
                "BagIt-Profile-Identifier: https://w3id.org/ro/bagit/profile\n"
            )
            info_file.write("Bagging-Date: %s\n" % datetime.date.today().isoformat())
            info_file.write(
                "External-Description: Research Object of CWL workflow run\n"
            )
            if self.full_name:
                info_file.write("Contact-Name: %s\n" % self.full_name)

            # NOTE: We can't use the urn:uuid:{UUID} of the workflow run (a prov:Activity)
            # as the identifier for the RO/bagit (a prov:Entity). However the arcp base URI is good.
            info_file.write("External-Identifier: %s\n" % self.base_uri)

            # Calculate the size of data/ (assuming no external fetch.txt files)
            total_size = sum(self.bagged_size.values())
            num_files = len(self.bagged_size)
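            # Per the BagIt spec, Payload-Oxum is "OctetCount.StreamCount";
            # e.g. three payload files of 10, 20 and 30 bytes yield "60.3".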
            info_file.write("Payload-Oxum: %d.%d\n" % (total_size, num_files))
        _logger.debug("[provenance] Generated bagit metadata: %s", self.folder)

    def generate_snapshot(self, prov_dep: CWLObjectType) -> None:
        """Copy all of the CWL files to the snapshot/ directory."""
        self.self_check()
        for key, value in prov_dep.items():
            if key == "location" and cast(str, value).split("/")[-1]:
                location = cast(str, value)
                filename = location.split("/")[-1]
                path = os.path.join(self.folder, SNAPSHOT, filename)
                filepath = ""
                if "file://" in location:
                    filepath = location[7:]
                else:
                    filepath = location

                # FIXME: What if the destination path already exists?
                if os.path.exists(filepath):
                    try:
                        if os.path.isdir(filepath):
                            shutil.copytree(filepath, path)
                        else:
                            shutil.copy(filepath, path)
                        timestamp = datetime.datetime.fromtimestamp(
                            os.path.getmtime(filepath)
                        )
                        self.add_tagfile(path, timestamp)
                    except PermissionError:
                        pass  # FIXME: avoids duplicate snapshotting; need a better solution
            elif key in ("secondaryFiles", "listing"):
                for files in cast(MutableSequence[CWLObjectType], value):
                    if isinstance(files, MutableMapping):
                        self.generate_snapshot(files)
            else:
                pass

    def packed_workflow(self, packed: str) -> None:
        """Pack the CWL description to generate a re-runnable CWL object in the RO."""
        self.self_check()
        rel_path = str(PurePosixPath(WORKFLOW) / "packed.cwl")
        # Write as binary
        with self.write_bag_file(rel_path, encoding=None) as write_pack:
            write_pack.write(packed)
        _logger.debug("[provenance] Added packed workflow: %s", rel_path)

    def has_data_file(self, sha1hash: str) -> bool:
        """Confirm the presence of the given file in the RO."""
        folder = os.path.join(self.folder, DATA, sha1hash[0:2])
        hash_path = os.path.join(folder, sha1hash)
        return os.path.isfile(hash_path)

    def add_data_file(
        self,
        from_fp: IO[Any],
        timestamp: Optional[datetime.datetime] = None,
        content_type: Optional[str] = None,
    ) -> str:
        """Copy inputs to the data/ folder."""
        self.self_check()
        tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
        with tempfile.NamedTemporaryFile(
            prefix=tmp_prefix, dir=tmp_dir, delete=False
        ) as tmp:
            checksum = checksum_copy(from_fp, tmp)

        # Calculate the hash-based file path
        folder = os.path.join(self.folder, DATA, checksum[0:2])
        path = os.path.join(folder, checksum)
        # os.rename is assumed safe, as our temp file should
        # be in the same file system as our temp folder
        if not os.path.isdir(folder):
            os.makedirs(folder)
        os.rename(tmp.name, path)

        # Relative posix path
        # (to avoid \ on Windows)
        rel_path = posix_path(os.path.relpath(path, self.folder))
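        # Illustrative (assuming SHA1 as the Hasher): an empty input hashes
        # to da39a3ee5e6b4b0d3255bfef95601890afd80709 and is stored as
        # data/da/da39a3ee5e6b4b0d3255bfef95601890afd80709, which is also
        # the rel_path returned below.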

        # Register in the bagit checksum
        if Hasher == hashlib.sha1:
            self._add_to_bagit(rel_path, sha1=checksum)
        else:
            _logger.warning(
                "[provenance] Unknown hash method %s for bagit manifest", Hasher
            )
            # Inefficient; the bagit support will need to checksum again
            self._add_to_bagit(rel_path)
        _logger.debug("[provenance] Added data file %s", path)
        if timestamp is not None:
            createdOn, createdBy = self._self_made(timestamp)
            self._file_provenance[rel_path] = cast(
                Aggregate, {"createdOn": createdOn, "createdBy": createdBy}
            )
        _logger.debug("[provenance] Relative path for data file %s", rel_path)

        if content_type is not None:
            self._content_types[rel_path] = content_type
        return rel_path

    def _self_made(
        self, timestamp: Optional[datetime.datetime] = None
    ) -> Tuple[str, Dict[str, str]]:  # createdOn, createdBy
        if timestamp is None:
            timestamp = datetime.datetime.now()
        return (
            timestamp.isoformat(),
            {"uri": self.engine_uuid, "name": self.cwltool_version},
        )

    def add_to_manifest(self, rel_path: str, checksums: Dict[str, str]) -> None:
        """Add files to the research object manifest."""
        self.self_check()
        if PurePosixPath(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)

        if os.path.commonprefix(["data/", rel_path]) == "data/":
            # payload file, goes to the manifest
            manifest = "manifest"
        else:
            # metadata file, goes to the tag manifest
            manifest = "tagmanifest"

        # Add checksums to the corresponding manifest files
        for method, hash_value in checksums.items():
            # File not in manifest because we bailed out on
            # existence in bagged_size above
            manifestpath = os.path.join(self.folder, f"{manifest}-{method.lower()}.txt")
            # encoding: match Tag-File-Character-Encoding: UTF-8
            # newline: ensure LF also on Windows
            with open(
                manifestpath, "a", encoding=ENCODING, newline="\n"
            ) as checksum_file:
                line = f"{hash_value} {rel_path}\n"
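                # e.g. a line in manifest-sha1.txt (illustrative):
                # "f572d396fae9206628714fb2ce00f72e94f2258f data/f5/f572d396fae9206628714fb2ce00f72e94f2258f"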
                _logger.debug("[provenance] Added to %s: %s", manifestpath, line)
                checksum_file.write(line)

    def _add_to_bagit(self, rel_path: str, **checksums: str) -> None:
        if PurePosixPath(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)
        lpath = os.path.join(self.folder, local_path(rel_path))
        if not os.path.exists(lpath):
            raise OSError(f"File {rel_path} does not exist within RO: {lpath}")

        if rel_path in self.bagged_size:
            # Already added, assume checksum OK
            return
        self.bagged_size[rel_path] = os.path.getsize(lpath)

        if SHA1 not in checksums:
            # ensure we always have sha1
            checksums = dict(checksums)
            with open(lpath, "rb") as file_path:
                # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
                checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)

        self.add_to_manifest(rel_path, checksums)

    def create_job(
        self, builder_job: CWLObjectType, is_output: bool = False
    ) -> CWLObjectType:
        """Generate the new job object with RO specific relative paths."""
        # TODO: customise the file
        copied = copy.deepcopy(builder_job)
        relativised_input_objecttemp = {}  # type: CWLObjectType
        self._relativise_files(copied)

        def jdefault(o: Any) -> Dict[Any, Any]:
            return dict(o)

        if is_output:
            rel_path = PurePosixPath(WORKFLOW) / "primary-output.json"
        else:
            rel_path = PurePosixPath(WORKFLOW) / "primary-job.json"
        j = json_dumps(copied, indent=4, ensure_ascii=False, default=jdefault)
        with self.write_bag_file(str(rel_path)) as file_path:
            file_path.write(j + "\n")
        _logger.debug("[provenance] Generated customised job file: %s", rel_path)
        # Generate a dictionary with workflow-level input IDs as keys, and as
        # values
        # 1) for files, the relativised location containing the hash
        # 2) for other attributes, the actual value
        for key, value in copied.items():
            if isinstance(value, MutableMapping):
                if value.get("class") in ("File", "Directory"):
                    relativised_input_objecttemp[key] = value
            else:
                relativised_input_objecttemp[key] = value
        self.relativised_input_object.update(
            {k: v for k, v in relativised_input_objecttemp.items() if v}
        )
        return self.relativised_input_object

    def _relativise_files(
        self,
        structure: Union[CWLObjectType, CWLOutputType, MutableSequence[CWLObjectType]],
    ) -> None:
        """Save any file objects into the RO and update the local paths."""
        # Base case - we found a File we need to update
        _logger.debug("[provenance] Relativising: %s", structure)

        if isinstance(structure, MutableMapping):
            if structure.get("class") == "File":
                relative_path = None  # type: Optional[Union[str, PurePosixPath]]
                if "checksum" in structure:
                    raw_checksum = cast(str, structure["checksum"])
                    alg, checksum = raw_checksum.split("$")
                    if alg != SHA1:
                        raise TypeError(
                            "Only SHA1 CWL checksums are currently supported: "
                            "{}".format(structure)
                        )
                    if self.has_data_file(checksum):
                        prefix = checksum[0:2]
                        relative_path = PurePosixPath("data") / prefix / checksum

                if not (relative_path is not None and "location" in structure):
                    # Register in RO; but why was this not picked
                    # up by used_artefacts?
                    _logger.info("[provenance] Adding to RO %s", structure["location"])
                    with self.fsaccess.open(
                        cast(str, structure["location"]), "rb"
                    ) as fp:
                        relative_path = self.add_data_file(fp)
                        checksum = PurePosixPath(relative_path).name
                        structure["checksum"] = f"{SHA1}${checksum}"
                if relative_path is not None:
                    # RO-relative path as the new location
                    structure["location"] = str(PurePosixPath("..") / relative_path)
                else:
                    _logger.warning(
                        "Could not determine RO path for file %s", structure
                    )
                if "path" in structure:
                    del structure["path"]

            if structure.get("class") == "Directory":
                # TODO: Generate an anonymous Directory with a "listing"
                # pointing to the hashed files
                del structure["location"]

            for val in structure.values():
                try:
                    self._relativise_files(cast(CWLOutputType, val))
                except OSError:
                    pass
            return

        if isinstance(structure, MutableSequence):
            for obj in structure:
                # Recurse and rewrite any nested File objects
                self._relativise_files(cast(CWLOutputType, obj))

    def close(self, save_to: Optional[str] = None) -> None:
        """Close the Research Object, optionally saving to the specified folder.

        Closing will remove any temporary files used by this research object.
        After calling this method, this ResearchObject instance can no longer
        be used, except for no-op calls to .close().

        The 'save_to' folder should not exist - if it does, it will be deleted.

        It is safe to call this function multiple times without the
        'save_to' argument, e.g. within a try..finally block to
        ensure the temporary files of this Research Object are removed.
        """
        if save_to is None:
            if not self.closed:
                _logger.debug("[provenance] Deleting temporary %s", self.folder)
                shutil.rmtree(self.folder, ignore_errors=True)
        else:
            save_to = os.path.abspath(save_to)
            _logger.info("[provenance] Finalizing Research Object")
            self._finalize()  # write manifest etc.
            # TODO: Write as archive (.zip or .tar) based on extension?

            if os.path.isdir(save_to):
                _logger.info("[provenance] Deleting existing %s", save_to)
                shutil.rmtree(save_to)
            shutil.move(self.folder, save_to)
            _logger.info("[provenance] Research Object saved to %s", save_to)
            self.folder = save_to
        self.closed = True


def checksum_copy(
    src_file: IO[Any],
    dst_file: Optional[IO[Any]] = None,
    hasher=Hasher,  # type: Callable[[], hashlib._Hash]
    buffersize: int = 1024 * 1024,
) -> str:
    """Compute checksums while copying a file."""
    # TODO: Use hashlib.new(Hasher_str) instead?
    checksum = hasher()
    contents = src_file.read(buffersize)
    if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"):
        # If both files are named on-disk files, try to replace the copy
        # with a hard link to the source, so only the checksum pass below
        # is needed; on failure, fall back to copying block by block.
        temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4()))
        try:
            os.rename(dst_file.name, temp_location)
            os.link(src_file.name, dst_file.name)
            dst_file = None
            os.unlink(temp_location)
        except OSError:
            pass
        if os.path.exists(temp_location):
            os.rename(temp_location, dst_file.name)  # type: ignore
    while contents != b"":
        if dst_file is not None:
            dst_file.write(contents)
        checksum.update(contents)
        contents = src_file.read(buffersize)
    if dst_file is not None:
        dst_file.flush()
    return checksum.hexdigest().lower()
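
# Illustrative use of checksum_copy (hypothetical file names):
#
#     with open("input.dat", "rb") as src, open("copy.dat", "wb") as dst:
#         sha1 = checksum_copy(src, dst, hasher=hashlib.sha1)
#     # "copy.dat" now mirrors "input.dat" (possibly as a hard link) and
#     # sha1 holds the lowercase hex digest.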