env/lib/python3.9/site-packages/cwltool/provenance.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author: shellac
date: Mon, 22 Mar 2021 18:12:50 +0000

1 """Stores Research Object including provenance.""" | |
2 | |
3 import copy | |
4 import datetime | |
5 import hashlib | |
6 import os | |
7 import re | |
8 import shutil | |
9 import tempfile | |
10 import uuid | |
11 from array import array | |
12 from collections import OrderedDict | |
13 from getpass import getuser | |
14 from io import FileIO, TextIOWrapper | |
15 from mmap import mmap | |
16 from pathlib import Path, PurePosixPath | |
17 from typing import ( | |
18 IO, | |
19 Any, | |
20 BinaryIO, | |
21 Callable, | |
22 Dict, | |
23 List, | |
24 MutableMapping, | |
25 MutableSequence, | |
26 Optional, | |
27 Set, | |
28 Tuple, | |
29 Union, | |
30 cast, | |
31 ) | |
32 | |
33 import prov.model as provM | |
34 from prov.model import PROV, ProvDocument | |
35 from schema_salad.utils import json_dumps | |
36 from typing_extensions import TYPE_CHECKING, TypedDict | |
37 | |
38 from .loghandler import _logger | |
39 from .provenance_constants import ( | |
40 ACCOUNT_UUID, | |
41 CWLPROV, | |
42 CWLPROV_VERSION, | |
43 DATA, | |
44 ENCODING, | |
45 FOAF, | |
46 LOGS, | |
47 METADATA, | |
48 ORCID, | |
49 PROVENANCE, | |
50 SHA1, | |
51 SHA256, | |
52 SHA512, | |
53 SNAPSHOT, | |
54 TEXT_PLAIN, | |
55 USER_UUID, | |
56 UUID, | |
57 WORKFLOW, | |
58 Hasher, | |
59 ) | |
60 from .stdfsaccess import StdFsAccess | |
61 from .utils import ( | |
62 CWLObjectType, | |
63 CWLOutputType, | |
64 create_tmp_dir, | |
65 local_path, | |
66 onWindows, | |
67 posix_path, | |
68 versionstring, | |
69 ) | |
70 | |
71 # imports needed for retrieving user data | |
72 if onWindows(): | |
73 import ctypes # pylint: disable=unused-import | |
74 else: | |
75 try: | |
76 import pwd # pylint: disable=unused-import | |
77 except ImportError: | |
78 pass | |
79 | |
80 if TYPE_CHECKING: | |
81 from .command_line_tool import ( # pylint: disable=unused-import | |
82 CommandLineTool, | |
83 ExpressionTool, | |
84 ) | |
85 from .workflow import Workflow # pylint: disable=unused-import | |
86 | |
87 | |
88 def _whoami() -> Tuple[str, str]: | |
89 """Return the current operating system account as (username, fullname).""" | |
90 username = getuser() | |
91 try: | |
92 if onWindows(): | |
93 get_user_name = ctypes.windll.secur32.GetUserNameExW # type: ignore | |
94 size = ctypes.pointer(ctypes.c_ulong(0)) | |
95 get_user_name(3, None, size) | |
96 | |
97 name_buffer = ctypes.create_unicode_buffer(size.contents.value) | |
98 get_user_name(3, name_buffer, size) | |
99 fullname = str(name_buffer.value) | |
100 else: | |
101 fullname = pwd.getpwuid(os.getuid())[4].split(",")[0] | |
102 except (KeyError, IndexError): | |
103 fullname = username | |
104 | |
105 return (username, fullname) | |
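
# A hypothetical session (account names are illustrative only):
#     >>> _whoami()
#     ('jane', 'Jane Doe')
# If the GECOS/full-name lookup fails, the username is returned twice.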


class WritableBagFile(FileIO):
    """Writes files in research object."""

    def __init__(self, research_object: "ResearchObject", rel_path: str) -> None:
        """Initialize a WritableBagFile."""
        self.research_object = research_object
        if Path(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)
        self.rel_path = rel_path
        self.hashes = {
            SHA1: hashlib.sha1(),  # nosec
            SHA256: hashlib.sha256(),
            SHA512: hashlib.sha512(),
        }
        # Open file in Research Object folder
        path = os.path.abspath(
            os.path.join(research_object.folder, local_path(rel_path))
        )
        if not path.startswith(os.path.abspath(research_object.folder)):
            raise ValueError("Path is outside Research Object: %s" % path)
        _logger.debug("[provenance] Creating WritableBagFile at %s.", path)
        super().__init__(path, mode="w")

    def write(self, b: Any) -> int:
        """Write some content to the Bag."""
        real_b = b if isinstance(b, (bytes, mmap, array)) else b.encode("utf-8")
        total = 0
        length = len(real_b)
        while total < length:
            ret = super().write(real_b)
            if ret:
                total += ret
        for val in self.hashes.values():
            # print("Updating hasher %s ", val)
            val.update(real_b)
        return total

    def close(self) -> None:
        # FIXME: Convert below block to a ResearchObject method?
        if self.rel_path.startswith("data/"):
            self.research_object.bagged_size[self.rel_path] = self.tell()
        else:
            self.research_object.tagfiles.add(self.rel_path)

        super().close()
        # { "sha1": "f572d396fae9206628714fb2ce00f72e94f2258f" }
        checksums = {}
        for name in self.hashes:
            checksums[name] = self.hashes[name].hexdigest().lower()
        self.research_object.add_to_manifest(self.rel_path, checksums)

    # To simplify our hash calculation we won't support
    # seeking, reading or truncating, as we can't do
    # similar seeks in the current hash.
    # TODO: Support these? At the expense of invalidating
    # the current hash, then having to recalculate at close()
    def seekable(self) -> bool:
        return False

    def readable(self) -> bool:
        return False

    def truncate(self, size: Optional[int] = None) -> int:
        # FIXME: This breaks the contract of IOBase,
        # as it means we would have to recalculate the hash
        if size is not None:
            raise OSError("WritableBagFile can't truncate")
        return self.tell()


def _check_mod_11_2(numeric_string: str) -> bool:
    """
    Validate numeric_string for its MOD-11-2 checksum.

    Any "-" in the numeric_string are ignored.

    The last digit of numeric_string is assumed to be the checksum, 0-9 or X.

    See ISO/IEC 7064:2003 and
    https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
    """
    # Strip -
    nums = numeric_string.replace("-", "")
    total = 0
    # skip last (check)digit
    for num in nums[:-1]:
        digit = int(num)
        total = (total + digit) * 2
    remainder = total % 11
    result = (12 - remainder) % 11
    if result == 10:
        checkdigit = "X"
    else:
        checkdigit = str(result)
    # Compare against last digit or X
    return nums[-1].upper() == checkdigit
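
# A worked example of the MOD-11-2 algorithm above, as a doctest-style sketch.
# For 0000-0002-1825-0097 the running total over the first fifteen digits is
# 1314; 1314 % 11 == 5 and (12 - 5) % 11 == 7, so the computed check digit
# "7" matches the final character:
#     >>> _check_mod_11_2("0000-0002-1825-0097")
#     True
#     >>> _check_mod_11_2("0000-0002-1825-0098")
#     False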


def _valid_orcid(orcid: Optional[str]) -> str:
    """
    Ensure orcid is a valid ORCID identifier.

    The string must be equivalent to one of these forms:

    0000-0002-1825-0097
    orcid.org/0000-0002-1825-0097
    http://orcid.org/0000-0002-1825-0097
    https://orcid.org/0000-0002-1825-0097

    If the ORCID number or prefix is invalid, a ValueError is raised.

    The returned ORCID string is always in the form of:
    https://orcid.org/0000-0002-1825-0097
    """
    if orcid is None or not orcid:
        raise ValueError("ORCID cannot be unspecified")
    # Liberal in what we consume, e.g. ORCID.org/0000-0002-1825-009x
    orcid = orcid.lower()
    match = re.match(
        # Note: concatenated r"" r"" below so we can add comments to pattern
        # Optional hostname, with or without protocol
        r"(http://orcid\.org/|https://orcid\.org/|orcid\.org/)?"
        # alternative pattern, but probably messier
        # r"^((https?://)?orcid.org/)?"
        # ORCID number is always 4x4 numerical digits,
        # but last digit (modulus 11 checksum)
        # can also be X (but we made it lowercase above).
        # e.g. 0000-0002-1825-0097
        # or 0000-0002-1694-233x
        r"(?P<orcid>(\d{4}-\d{4}-\d{4}-\d{3}[0-9x]))$",
        orcid,
    )

    help_url = (
        "https://support.orcid.org/knowledgebase/articles/"
        "116780-structure-of-the-orcid-identifier"
    )
    if not match:
        raise ValueError(f"Invalid ORCID: {orcid}\n{help_url}")

    # Conservative in what we produce:
    # a) Ensure any checksum digit is uppercase
    orcid_num = match.group("orcid").upper()
    # b) ..and correct
    if not _check_mod_11_2(orcid_num):
        raise ValueError(f"Invalid ORCID checksum: {orcid_num}\n{help_url}")

    # c) Re-add the official prefix https://orcid.org/
    return "https://orcid.org/%s" % orcid_num


Annotation = TypedDict(
    "Annotation",
    {
        "uri": str,
        "about": str,
        "content": Optional[Union[str, List[str]]],
        "oa:motivatedBy": Dict[str, str],
    },
)
Aggregate = TypedDict(
    "Aggregate",
    {
        "uri": Optional[str],
        "bundledAs": Optional[Dict[str, Any]],
        "mediatype": Optional[str],
        "conformsTo": Optional[Union[str, List[str]]],
        "createdOn": Optional[str],
        "createdBy": Optional[Dict[str, str]],
    },
    total=False,
)
# Aggregate.bundledAs is actually type Aggregate, but cyclic definitions are not supported
AuthoredBy = TypedDict(
    "AuthoredBy",
    {"orcid": Optional[str], "name": Optional[str], "uri": Optional[str]},
    total=False,
)
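
# A sketch of a typical payload Aggregate as produced by _ro_aggregates()
# below (the hash and UUID shown are illustrative, not from a real run):
#     {
#         "uri": "urn:hash::sha1:f572d396fae9206628714fb2ce00f72e94f2258f",
#         "bundledAs": {
#             "uri": "arcp://uuid,.../data/f5/f572d396fae9206628714fb2ce00f72e94f2258f",
#             "folder": "/data/f5/",
#             "filename": "f572d396fae9206628714fb2ce00f72e94f2258f",
#         },
#     }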


class ResearchObject:
    """CWLProv Research Object."""

    def __init__(
        self,
        fsaccess: StdFsAccess,
        temp_prefix_ro: str = "tmp",
        orcid: str = "",
        full_name: str = "",
    ) -> None:
        """Initialize the ResearchObject."""
        self.temp_prefix = temp_prefix_ro
        self.orcid = "" if not orcid else _valid_orcid(orcid)
        self.full_name = full_name
        self.folder = create_tmp_dir(temp_prefix_ro)
        self.closed = False
        # map of filename "data/de/alsdklkas": 12398123 bytes
        self.bagged_size = {}  # type: Dict[str, int]
        self.tagfiles = set()  # type: Set[str]
        self._file_provenance = {}  # type: Dict[str, Aggregate]
        self._external_aggregates = []  # type: List[Aggregate]
        self.annotations = []  # type: List[Annotation]
        self._content_types = {}  # type: Dict[str,str]
        self.fsaccess = fsaccess
        # These should be replaced by generate_prov_doc when workflow/run IDs are known:
        self.engine_uuid = "urn:uuid:%s" % uuid.uuid4()
        self.ro_uuid = uuid.uuid4()
        self.base_uri = "arcp://uuid,%s/" % self.ro_uuid
        self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
        ##
        self.relativised_input_object = {}  # type: CWLObjectType

        self._initialize()
        _logger.debug("[provenance] Temporary research object: %s", self.folder)

    def self_check(self) -> None:
        """Raise ValueError if this RO is closed."""
        if self.closed:
            raise ValueError(
                "This ResearchObject has already been closed and is not "
                "available for further manipulation."
            )

    def __str__(self) -> str:
        """Represent this RO as a string."""
        return f"ResearchObject <{self.ro_uuid}> in <{self.folder}>"

    def _initialize(self) -> None:
        for research_obj_folder in (
            METADATA,
            DATA,
            WORKFLOW,
            SNAPSHOT,
            PROVENANCE,
            LOGS,
        ):
            os.makedirs(os.path.join(self.folder, research_obj_folder))
        self._initialize_bagit()

    def _initialize_bagit(self) -> None:
        """Write fixed bagit header."""
        self.self_check()
        bagit = os.path.join(self.folder, "bagit.txt")
        # encoding: always UTF-8 (although ASCII would suffice here)
        # newline: ensure LF also on Windows
        with open(bagit, "w", encoding=ENCODING, newline="\n") as bag_it_file:
            # TODO: \n or \r\n ?
            bag_it_file.write("BagIt-Version: 0.97\n")
            bag_it_file.write("Tag-File-Character-Encoding: %s\n" % ENCODING)

    def open_log_file_for_activity(
        self, uuid_uri: str
    ) -> Union[TextIOWrapper, WritableBagFile]:
        self.self_check()
        # Ensure valid UUID for safe filenames
        activity_uuid = uuid.UUID(uuid_uri)
        if activity_uuid.urn == self.engine_uuid:
            # It's the engine aka cwltool!
            name = "engine"
        else:
            name = "activity"
        p = os.path.join(LOGS, f"{name}.{activity_uuid}.txt")
        _logger.debug(f"[provenance] Opening log file for {name}: {p}")
        self.add_annotation(activity_uuid.urn, [p], CWLPROV["log"].uri)
        return self.write_bag_file(p)

    def _finalize(self) -> None:
        self._write_ro_manifest()
        self._write_bag_info()

    def user_provenance(self, document: ProvDocument) -> None:
        """Add the user provenance."""
        self.self_check()
        (username, fullname) = _whoami()

        if not self.full_name:
            self.full_name = fullname

        document.add_namespace(UUID)
        document.add_namespace(ORCID)
        document.add_namespace(FOAF)
        account = document.agent(
            ACCOUNT_UUID,
            {
                provM.PROV_TYPE: FOAF["OnlineAccount"],
                "prov:label": username,
                FOAF["accountName"]: username,
            },
        )

        user = document.agent(
            self.orcid or USER_UUID,
            {
                provM.PROV_TYPE: PROV["Person"],
                "prov:label": self.full_name,
                FOAF["name"]: self.full_name,
                FOAF["account"]: account,
            },
        )
        # cwltool may be started on the shell (directly by user),
        # by shell script (indirectly by user)
        # or from a different program
        # (which again is launched by any of the above)
        #
        # We can't tell in which way, but ultimately we're still
        # acting on behalf of that user (even if we might
        # get their name wrong!)
        document.actedOnBehalfOf(account, user)

    def write_bag_file(
        self, path: str, encoding: Optional[str] = ENCODING
    ) -> Union[TextIOWrapper, WritableBagFile]:
        """Write the bag file into our research object."""
        self.self_check()
        # For some reason below throws BlockingIOError
        # fp = BufferedWriter(WritableBagFile(self, path))
        bag_file = WritableBagFile(self, path)
        if encoding is not None:
            # encoding: match Tag-File-Character-Encoding: UTF-8
            # newline: ensure LF also on Windows
            return TextIOWrapper(
                cast(BinaryIO, bag_file), encoding=encoding, newline="\n"
            )
        return bag_file

    def add_tagfile(
        self, path: str, timestamp: Optional[datetime.datetime] = None
    ) -> None:
        """Add tag files to our research object."""
        self.self_check()
        checksums = {}
        # Read file to calculate its checksum
        if os.path.isdir(path):
            return
        # FIXME: do the right thing for directories
        with open(path, "rb") as tag_file:
            # FIXME: Should have more efficient open_tagfile() that
            # does all checksums in one go while writing through,
            # adding checksums after closing.
            # Below probably OK for now as metadata files
            # are not too large?

            checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)

            tag_file.seek(0)
            checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)

            tag_file.seek(0)
            checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)

        rel_path = posix_path(os.path.relpath(path, self.folder))
        self.tagfiles.add(rel_path)
        self.add_to_manifest(rel_path, checksums)
        if timestamp is not None:
            self._file_provenance[rel_path] = {
                "createdOn": timestamp.isoformat(),
                "uri": None,
                "bundledAs": None,
                "mediatype": None,
                "conformsTo": None,
            }

    def _ro_aggregates(self) -> List[Aggregate]:
        """Gather dictionary of files to be added to the manifest."""

        def guess_mediatype(
            rel_path: str,
        ) -> Tuple[Optional[str], Optional[Union[str, List[str]]]]:
            """Return the mediatypes."""
            media_types = {
                # Adapted from
                # https://w3id.org/bundle/2014-11-05/#media-types
                "txt": TEXT_PLAIN,
                "ttl": 'text/turtle; charset="UTF-8"',
                "rdf": "application/rdf+xml",
                "json": "application/json",
                "jsonld": "application/ld+json",
                "xml": "application/xml",
                ##
                "cwl": 'text/x+yaml; charset="UTF-8"',
                "provn": 'text/provenance-notation; charset="UTF-8"',
                "nt": "application/n-triples",
            }  # type: Dict[str, str]
            conforms_to = {
                "provn": "http://www.w3.org/TR/2013/REC-prov-n-20130430/",
                "cwl": "https://w3id.org/cwl/",
            }  # type: Dict[str, str]

            prov_conforms_to = {
                "provn": "http://www.w3.org/TR/2013/REC-prov-n-20130430/",
                "rdf": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "ttl": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "nt": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "jsonld": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "xml": "http://www.w3.org/TR/2013/NOTE-prov-xml-20130430/",
                "json": "http://www.w3.org/Submission/2013/SUBM-prov-json-20130424/",
            }  # type: Dict[str, str]

            extension = rel_path.rsplit(".", 1)[-1].lower()  # type: Optional[str]
            if extension == rel_path:
                # No ".", no extension
                extension = None

            mediatype = None  # type: Optional[str]
            conformsTo = None  # type: Optional[Union[str, List[str]]]
            if extension in media_types:
                mediatype = media_types[extension]

            if extension in conforms_to:
                # TODO: Open CWL file to read its declared "cwlVersion", e.g.
                # cwlVersion = "v1.0"
                conformsTo = conforms_to[extension]

            if (
                rel_path.startswith(posix_path(PROVENANCE))
                and extension in prov_conforms_to
            ):
                if ".cwlprov" in rel_path:
                    # Our own!
                    conformsTo = [
                        prov_conforms_to[extension],
                        CWLPROV_VERSION,
                    ]
                else:
                    # Some other PROV
                    # TODO: Recognize ProvOne etc.
                    conformsTo = prov_conforms_to[extension]
            return (mediatype, conformsTo)

        aggregates = []  # type: List[Aggregate]
        for path in self.bagged_size.keys():

            temp_path = PurePosixPath(path)
            folder = temp_path.parent
            filename = temp_path.name

            # NOTE: Here we end up aggregating the abstract
            # data items by their sha1 hash, so that it matches
            # the entity() in the prov files.

            # TODO: Change to nih:sha-256; hashes
            # https://tools.ietf.org/html/rfc6920#section-7
            aggregate_dict = {
                "uri": "urn:hash::sha1:" + filename,
                "bundledAs": {
                    # The arcp URI is a suitable ORE proxy; local to this Research Object.
                    # (as long as we don't also aggregate it by relative path!)
                    "uri": self.base_uri + path,
                    # relate it to the data/ path
                    "folder": "/%s/" % folder,
                    "filename": filename,
                },
            }  # type: Aggregate
            if path in self._file_provenance:
                # Made by workflow run, merge captured provenance
                bundledAs = aggregate_dict["bundledAs"]
                if bundledAs:
                    bundledAs.update(self._file_provenance[path])
                else:
                    aggregate_dict["bundledAs"] = cast(
                        Optional[Dict[str, Any]], self._file_provenance[path]
                    )
            else:
                # Probably made outside wf run, part of job object?
                pass
            if path in self._content_types:
                aggregate_dict["mediatype"] = self._content_types[path]

            aggregates.append(aggregate_dict)

        for path in self.tagfiles:
            if not (
                path.startswith(METADATA)
                or path.startswith(WORKFLOW)
                or path.startswith(SNAPSHOT)
            ):
                # probably a bagit file
                continue
            if path == str(PurePosixPath(METADATA) / "manifest.json"):
                # Should not really be there yet! But anyway, we won't
                # aggregate it.
                continue

            # These are local paths like metadata/provenance - but
            # we need to relativize them against our current directory,
            # as we are saved in metadata/manifest.json
            mediatype, conformsTo = guess_mediatype(path)
            rel_aggregates = {
                "uri": str(Path(os.pardir) / path),
                "mediatype": mediatype,
                "conformsTo": conformsTo,
            }  # type: Aggregate

            if path in self._file_provenance:
                # Propagate file provenance (e.g. timestamp)
                rel_aggregates.update(self._file_provenance[path])
            elif not path.startswith(SNAPSHOT):
                # make new timestamp?
                (
                    rel_aggregates["createdOn"],
                    rel_aggregates["createdBy"],
                ) = self._self_made()
            aggregates.append(rel_aggregates)
        aggregates.extend(self._external_aggregates)
        return aggregates

    def add_uri(
        self, uri: str, timestamp: Optional[datetime.datetime] = None
    ) -> Aggregate:
        self.self_check()
        aggr = {"uri": uri}  # type: Aggregate
        aggr["createdOn"], aggr["createdBy"] = self._self_made(timestamp=timestamp)
        self._external_aggregates.append(aggr)
        return aggr

    def add_annotation(
        self, about: str, content: List[str], motivated_by: str = "oa:describing"
    ) -> str:
        """Cheap URI relativize for current directory and /."""
        self.self_check()
        curr = self.base_uri + METADATA + "/"
        content = [c.replace(curr, "").replace(self.base_uri, "../") for c in content]
        uri = uuid.uuid4().urn
        ann = {
            "uri": uri,
            "about": about,
            "content": content,
            "oa:motivatedBy": {"@id": motivated_by},
        }  # type: Annotation
        self.annotations.append(ann)
        return uri

    def _ro_annotations(self) -> List[Annotation]:
        annotations = []  # type: List[Annotation]
        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": self.ro_uuid.urn,
                "content": "/",
                # https://www.w3.org/TR/annotation-vocab/#named-individuals
                "oa:motivatedBy": {"@id": "oa:describing"},
            }
        )

        # How was it run?
        # FIXME: Only primary*
        prov_files = [
            str(PurePosixPath(p).relative_to(METADATA))
            for p in self.tagfiles
            if p.startswith(posix_path(PROVENANCE)) and "/primary." in p
        ]
        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": self.ro_uuid.urn,
                "content": prov_files,
                # Modulation of https://www.w3.org/TR/prov-aq/
                "oa:motivatedBy": {"@id": "http://www.w3.org/ns/prov#has_provenance"},
            }
        )

        # Where is the main workflow?
        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": str(PurePosixPath("..") / WORKFLOW / "packed.cwl"),
                "content": None,
                "oa:motivatedBy": {"@id": "oa:highlighting"},
            }
        )

        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": self.ro_uuid.urn,
                "content": [
                    str(PurePosixPath("..") / WORKFLOW / "packed.cwl"),
                    str(PurePosixPath("..") / WORKFLOW / "primary-job.json"),
                ],
                "oa:motivatedBy": {"@id": "oa:linking"},
            }
        )
        # Add user-added annotations at end
        annotations.extend(self.annotations)
        return annotations

    def _authored_by(self) -> Optional[AuthoredBy]:
        authored_by = {}  # type: AuthoredBy
        if self.orcid:
            authored_by["orcid"] = self.orcid
        if self.full_name:
            authored_by["name"] = self.full_name
            if not self.orcid:
                authored_by["uri"] = USER_UUID

        if authored_by:
            return authored_by
        return None

    def _write_ro_manifest(self) -> None:

        # Does not have to be this order, but it's nice to be consistent
        filename = "manifest.json"
        createdOn, createdBy = self._self_made()
        manifest = OrderedDict(
            {
                "@context": [
                    {"@base": "{}{}/".format(self.base_uri, posix_path(METADATA))},
                    "https://w3id.org/bundle/context",
                ],
                "id": "/",
                "conformsTo": CWLPROV_VERSION,
                "manifest": filename,
                "createdOn": createdOn,
                "createdBy": createdBy,
                "authoredBy": self._authored_by(),
                "aggregates": self._ro_aggregates(),
                "annotations": self._ro_annotations(),
            }
        )

        json_manifest = json_dumps(manifest, indent=4, ensure_ascii=False)
        rel_path = str(PurePosixPath(METADATA) / filename)
        json_manifest += "\n"
        with self.write_bag_file(rel_path) as manifest_file:
            manifest_file.write(json_manifest)

    def _write_bag_info(self) -> None:

        with self.write_bag_file("bag-info.txt") as info_file:
            info_file.write("Bag-Software-Agent: %s\n" % self.cwltool_version)
            # FIXME: require sha-512 of payload to comply with profile?
            # FIXME: Update profile
            info_file.write(
                "BagIt-Profile-Identifier: https://w3id.org/ro/bagit/profile\n"
            )
            info_file.write("Bagging-Date: %s\n" % datetime.date.today().isoformat())
            info_file.write(
                "External-Description: Research Object of CWL workflow run\n"
            )
            if self.full_name:
                info_file.write("Contact-Name: %s\n" % self.full_name)

            # NOTE: We can't use the urn:uuid:{UUID} of the workflow run (a prov:Activity)
            # as identifier for the RO/bagit (a prov:Entity). However the arcp base URI is good.
            info_file.write("External-Identifier: %s\n" % self.base_uri)

            # Calculate size of data/ (assuming no external fetch.txt files)
            total_size = sum(self.bagged_size.values())
            num_files = len(self.bagged_size)
            info_file.write("Payload-Oxum: %d.%d\n" % (total_size, num_files))
        _logger.debug("[provenance] Generated bagit metadata: %s", self.folder)
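
    # A sketch of the resulting bag-info.txt (values are illustrative):
    #     Bag-Software-Agent: cwltool 3.0.x
    #     BagIt-Profile-Identifier: https://w3id.org/ro/bagit/profile
    #     Bagging-Date: 2021-03-22
    #     External-Description: Research Object of CWL workflow run
    #     External-Identifier: arcp://uuid,<ro_uuid>/
    #     Payload-Oxum: 2048.3
    # Per the BagIt spec, Payload-Oxum is "<total octets>.<file count>",
    # so 2048.3 would mean three payload files totalling 2048 bytes.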

    def generate_snapshot(self, prov_dep: CWLObjectType) -> None:
        """Copy all of the CWL files to the snapshot/ directory."""
        self.self_check()
        for key, value in prov_dep.items():
            if key == "location" and cast(str, value).split("/")[-1]:
                location = cast(str, value)
                filename = location.split("/")[-1]
                path = os.path.join(self.folder, SNAPSHOT, filename)
                filepath = ""
                if "file://" in location:
                    filepath = location[7:]
                else:
                    filepath = location

                # FIXME: What if destination path already exists?
                if os.path.exists(filepath):
                    try:
                        if os.path.isdir(filepath):
                            shutil.copytree(filepath, path)
                        else:
                            shutil.copy(filepath, path)
                        timestamp = datetime.datetime.fromtimestamp(
                            os.path.getmtime(filepath)
                        )
                        self.add_tagfile(path, timestamp)
                    except PermissionError:
                        pass  # FIXME: avoids duplicate snapshotting; need better solution
            elif key in ("secondaryFiles", "listing"):
                for files in cast(MutableSequence[CWLObjectType], value):
                    if isinstance(files, MutableMapping):
                        self.generate_snapshot(files)
            else:
                pass

    def packed_workflow(self, packed: str) -> None:
        """Pack CWL description to generate re-runnable CWL object in RO."""
        self.self_check()
        rel_path = str(PurePosixPath(WORKFLOW) / "packed.cwl")
        # Write as binary
        with self.write_bag_file(rel_path, encoding=None) as write_pack:
            write_pack.write(packed)
        _logger.debug("[provenance] Added packed workflow: %s", rel_path)

    def has_data_file(self, sha1hash: str) -> bool:
        """Confirm the presence of the given file in the RO."""
        folder = os.path.join(self.folder, DATA, sha1hash[0:2])
        hash_path = os.path.join(folder, sha1hash)
        return os.path.isfile(hash_path)

    def add_data_file(
        self,
        from_fp: IO[Any],
        timestamp: Optional[datetime.datetime] = None,
        content_type: Optional[str] = None,
    ) -> str:
        """Copy inputs to data/ folder."""
        self.self_check()
        tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
        with tempfile.NamedTemporaryFile(
            prefix=tmp_prefix, dir=tmp_dir, delete=False
        ) as tmp:
            checksum = checksum_copy(from_fp, tmp)

        # Calculate hash-based file path
        folder = os.path.join(self.folder, DATA, checksum[0:2])
        path = os.path.join(folder, checksum)
        # os.rename assumed safe, as our temp file should
        # be in same file system as our temp folder
        if not os.path.isdir(folder):
            os.makedirs(folder)
        os.rename(tmp.name, path)

        # Relative posix path
        # (to avoid \ on Windows)
        rel_path = posix_path(os.path.relpath(path, self.folder))

        # Register in bagit checksum
        if Hasher == hashlib.sha1:
            self._add_to_bagit(rel_path, sha1=checksum)
        else:
            _logger.warning(
                "[provenance] Unknown hash method %s for bagit manifest", Hasher
            )
            # Inefficient, bagit support needs to checksum again
            self._add_to_bagit(rel_path)
        _logger.debug("[provenance] Added data file %s", path)
        if timestamp is not None:
            createdOn, createdBy = self._self_made(timestamp)
            self._file_provenance[rel_path] = cast(
                Aggregate, {"createdOn": createdOn, "createdBy": createdBy}
            )
        _logger.debug("[provenance] Relative path for data file %s", rel_path)

        if content_type is not None:
            self._content_types[rel_path] = content_type
        return rel_path
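
    # For example (illustrative): a payload file whose SHA1 is
    # "f572d396fae9206628714fb2ce00f72e94f2258f" ends up at
    #     data/f5/f572d396fae9206628714fb2ce00f72e94f2258f
    # and that relative POSIX path is what add_data_file() returns.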

    def _self_made(
        self, timestamp: Optional[datetime.datetime] = None
    ) -> Tuple[str, Dict[str, str]]:  # createdOn, createdBy
        if timestamp is None:
            timestamp = datetime.datetime.now()
        return (
            timestamp.isoformat(),
            {"uri": self.engine_uuid, "name": self.cwltool_version},
        )

    def add_to_manifest(self, rel_path: str, checksums: Dict[str, str]) -> None:
        """Add files to the research object manifest."""
        self.self_check()
        if PurePosixPath(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)

        if os.path.commonprefix(["data/", rel_path]) == "data/":
            # payload file, go to manifest
            manifest = "manifest"
        else:
            # metadata file, go to tag manifest
            manifest = "tagmanifest"

        # Add checksums to corresponding manifest files
        for (method, hash_value) in checksums.items():
            # File not in manifest because we bailed out on
            # existence in bagged_size above
            manifestpath = os.path.join(self.folder, f"{manifest}-{method.lower()}.txt")
            # encoding: match Tag-File-Character-Encoding: UTF-8
            # newline: ensure LF also on Windows
            with open(
                manifestpath, "a", encoding=ENCODING, newline="\n"
            ) as checksum_file:
                line = f"{hash_value}  {rel_path}\n"
                _logger.debug("[provenance] Added to %s: %s", manifestpath, line)
                checksum_file.write(line)

    def _add_to_bagit(self, rel_path: str, **checksums: str) -> None:
        if PurePosixPath(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)
        lpath = os.path.join(self.folder, local_path(rel_path))
        if not os.path.exists(lpath):
            raise OSError(f"File {rel_path} does not exist within RO: {lpath}")

        if rel_path in self.bagged_size:
            # Already added, assume checksum OK
            return
        self.bagged_size[rel_path] = os.path.getsize(lpath)

        if SHA1 not in checksums:
            # ensure we always have sha1
            checksums = dict(checksums)
            with open(lpath, "rb") as file_path:
                # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
                checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)

        self.add_to_manifest(rel_path, checksums)

    def create_job(
        self, builder_job: CWLObjectType, is_output: bool = False
    ) -> CWLObjectType:
        # TODO customise the file
        """Generate the new job object with RO specific relative paths."""
        copied = copy.deepcopy(builder_job)
        relativised_input_objecttemp = {}  # type: CWLObjectType
        self._relativise_files(copied)

        def jdefault(o: Any) -> Dict[Any, Any]:
            return dict(o)

        if is_output:
            rel_path = PurePosixPath(WORKFLOW) / "primary-output.json"
        else:
            rel_path = PurePosixPath(WORKFLOW) / "primary-job.json"
        j = json_dumps(copied, indent=4, ensure_ascii=False, default=jdefault)
        with self.write_bag_file(str(rel_path)) as file_path:
            file_path.write(j + "\n")
        _logger.debug("[provenance] Generated customised job file: %s", rel_path)
        # Generate dictionary with keys as workflow level input IDs and values
        # as
        # 1) for files the relativised location containing hash
        # 2) for other attributes, the actual value.
        for key, value in copied.items():
            if isinstance(value, MutableMapping):
                if value.get("class") in ("File", "Directory"):
                    relativised_input_objecttemp[key] = value
            else:
                relativised_input_objecttemp[key] = value
        self.relativised_input_object.update(
            {k: v for k, v in relativised_input_objecttemp.items() if v}
        )
        return self.relativised_input_object

    def _relativise_files(
        self,
        structure: Union[CWLObjectType, CWLOutputType, MutableSequence[CWLObjectType]],
    ) -> None:
        """Save any file objects into the RO and update the local paths."""
        # Base case - we found a File we need to update
        _logger.debug("[provenance] Relativising: %s", structure)

        if isinstance(structure, MutableMapping):
            if structure.get("class") == "File":
                relative_path = None  # type: Optional[Union[str, PurePosixPath]]
                if "checksum" in structure:
                    raw_checksum = cast(str, structure["checksum"])
                    alg, checksum = raw_checksum.split("$")
                    if alg != SHA1:
                        raise TypeError(
                            "Only SHA1 CWL checksums are currently supported: "
                            "{}".format(structure)
                        )
                    if self.has_data_file(checksum):
                        prefix = checksum[0:2]
                        relative_path = PurePosixPath("data") / prefix / checksum

                if not (relative_path is not None and "location" in structure):
                    # Register in RO; but why was this not picked
                    # up by used_artefacts?
                    _logger.info("[provenance] Adding to RO %s", structure["location"])
                    with self.fsaccess.open(
                        cast(str, structure["location"]), "rb"
                    ) as fp:
                        relative_path = self.add_data_file(fp)
                        checksum = PurePosixPath(relative_path).name
                        structure["checksum"] = f"{SHA1}${checksum}"
                if relative_path is not None:
                    # RO-relative path as new location
                    structure["location"] = str(PurePosixPath("..") / relative_path)
                else:
                    _logger.warning(
                        "Could not determine RO path for file %s", structure
                    )
                if "path" in structure:
                    del structure["path"]

            if structure.get("class") == "Directory":
                # TODO: Generate anonymous Directory with a "listing"
                # pointing to the hashed files
                del structure["location"]

            for val in structure.values():
                try:
                    self._relativise_files(cast(CWLOutputType, val))
                except OSError:
                    pass
            return

        if isinstance(structure, MutableSequence):
            for obj in structure:
                # Recurse and rewrite any nested File objects
                self._relativise_files(cast(CWLOutputType, obj))

    def close(self, save_to: Optional[str] = None) -> None:
        """Close the Research Object, optionally saving to specified folder.

        Closing will remove any temporary files used by this research object.
        After calling this method, this ResearchObject instance can no longer
        be used, except for no-op calls to .close().

        The 'save_to' folder should not exist - if it does, it will be deleted.

        It is safe to call this function multiple times without the
        'save_to' argument, e.g. within a try..finally block to
        ensure the temporary files of this Research Object are removed.
        """
        if save_to is None:
            if not self.closed:
                _logger.debug("[provenance] Deleting temporary %s", self.folder)
                shutil.rmtree(self.folder, ignore_errors=True)
        else:
            save_to = os.path.abspath(save_to)
            _logger.info("[provenance] Finalizing Research Object")
            self._finalize()  # write manifest etc.
            # TODO: Write as archive (.zip or .tar) based on extension?

            if os.path.isdir(save_to):
                _logger.info("[provenance] Deleting existing %s", save_to)
                shutil.rmtree(save_to)
            shutil.move(self.folder, save_to)
            _logger.info("[provenance] Research Object saved to %s", save_to)
            self.folder = save_to
        self.closed = True


def checksum_copy(
    src_file: IO[Any],
    dst_file: Optional[IO[Any]] = None,
    hasher=Hasher,  # type: Callable[[], hashlib._Hash]
    buffersize: int = 1024 * 1024,
) -> str:
    """Compute checksums while copying a file."""
    # TODO: Use hashlib.new(Hasher_str) instead?
    checksum = hasher()
    contents = src_file.read(buffersize)
    if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"):
        temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4()))
        try:
            os.rename(dst_file.name, temp_location)
            os.link(src_file.name, dst_file.name)
            dst_file = None
            os.unlink(temp_location)
        except OSError:
            pass
        if os.path.exists(temp_location):
            os.rename(temp_location, dst_file.name)  # type: ignore
    while contents != b"":
        if dst_file is not None:
            dst_file.write(contents)
        checksum.update(contents)
        contents = src_file.read(buffersize)
    if dst_file is not None:
        dst_file.flush()
    return checksum.hexdigest().lower()
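

# A usage sketch (hypothetical, not part of this module): hashing a file
# while copying it into another, using the module-default Hasher:
#     with open("input.dat", "rb") as src, open("copy.dat", "wb") as dst:
#         digest = checksum_copy(src, dst)
# With dst_file=None it only computes the checksum. The hard-link fast path
# above triggers only when both file objects expose a real .name on disk.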