env/lib/python3.9/site-packages/cwltool/provenance.py @ 0:4f3585e2f14b (draft, default, tip)

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"

| author | shellac |
|---|---|
| date | Mon, 22 Mar 2021 18:12:50 +0000 |

| 1 """Stores Research Object including provenance.""" | |
| 2 | |
| 3 import copy | |
| 4 import datetime | |
| 5 import hashlib | |
| 6 import os | |
| 7 import re | |
| 8 import shutil | |
| 9 import tempfile | |
| 10 import uuid | |
| 11 from array import array | |
| 12 from collections import OrderedDict | |
| 13 from getpass import getuser | |
| 14 from io import FileIO, TextIOWrapper | |
| 15 from mmap import mmap | |
| 16 from pathlib import Path, PurePosixPath | |
| 17 from typing import ( | |
| 18 IO, | |
| 19 Any, | |
| 20 BinaryIO, | |
| 21 Callable, | |
| 22 Dict, | |
| 23 List, | |
| 24 MutableMapping, | |
| 25 MutableSequence, | |
| 26 Optional, | |
| 27 Set, | |
| 28 Tuple, | |
| 29 Union, | |
| 30 cast, | |
| 31 ) | |
| 32 | |
| 33 import prov.model as provM | |
| 34 from prov.model import PROV, ProvDocument | |
| 35 from schema_salad.utils import json_dumps | |
| 36 from typing_extensions import TYPE_CHECKING, TypedDict | |
| 37 | |
| 38 from .loghandler import _logger | |
| 39 from .provenance_constants import ( | |
| 40 ACCOUNT_UUID, | |
| 41 CWLPROV, | |
| 42 CWLPROV_VERSION, | |
| 43 DATA, | |
| 44 ENCODING, | |
| 45 FOAF, | |
| 46 LOGS, | |
| 47 METADATA, | |
| 48 ORCID, | |
| 49 PROVENANCE, | |
| 50 SHA1, | |
| 51 SHA256, | |
| 52 SHA512, | |
| 53 SNAPSHOT, | |
| 54 TEXT_PLAIN, | |
| 55 USER_UUID, | |
| 56 UUID, | |
| 57 WORKFLOW, | |
| 58 Hasher, | |
| 59 ) | |
| 60 from .stdfsaccess import StdFsAccess | |
| 61 from .utils import ( | |
| 62 CWLObjectType, | |
| 63 CWLOutputType, | |
| 64 create_tmp_dir, | |
| 65 local_path, | |
| 66 onWindows, | |
| 67 posix_path, | |
| 68 versionstring, | |
| 69 ) | |
| 70 | |
| 71 # imports needed for retrieving user data | |
| 72 if onWindows(): | |
| 73 import ctypes # pylint: disable=unused-import | |
| 74 else: | |
| 75 try: | |
| 76 import pwd # pylint: disable=unused-import | |
| 77 except ImportError: | |
| 78 pass | |
| 79 | |
| 80 if TYPE_CHECKING: | |
| 81 from .command_line_tool import ( # pylint: disable=unused-import | |
| 82 CommandLineTool, | |
| 83 ExpressionTool, | |
| 84 ) | |
| 85 from .workflow import Workflow # pylint: disable=unused-import | |
| 86 | |

def _whoami() -> Tuple[str, str]:
    """Return the current operating system account as (username, fullname)."""
    username = getuser()
    try:
        if onWindows():
            get_user_name = ctypes.windll.secur32.GetUserNameExW  # type: ignore
            size = ctypes.pointer(ctypes.c_ulong(0))
            get_user_name(3, None, size)

            name_buffer = ctypes.create_unicode_buffer(size.contents.value)
            get_user_name(3, name_buffer, size)
            fullname = str(name_buffer.value)
        else:
            fullname = pwd.getpwuid(os.getuid())[4].split(",")[0]
    except (KeyError, IndexError):
        fullname = username

    return (username, fullname)


class WritableBagFile(FileIO):
    """Writes files in research object."""

    def __init__(self, research_object: "ResearchObject", rel_path: str) -> None:
        """Initialize an ROBagIt."""
        self.research_object = research_object
        if Path(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)
        self.rel_path = rel_path
        self.hashes = {
            SHA1: hashlib.sha1(),  # nosec
            SHA256: hashlib.sha256(),
            SHA512: hashlib.sha512(),
        }
        # Open file in Research Object folder
        path = os.path.abspath(
            os.path.join(research_object.folder, local_path(rel_path))
        )
        if not path.startswith(os.path.abspath(research_object.folder)):
            raise ValueError("Path is outside Research Object: %s" % path)
        _logger.debug("[provenance] Creating WritableBagFile at %s.", path)
        super().__init__(path, mode="w")

    def write(self, b: Any) -> int:
        """Write some content to the Bag."""
        real_b = b if isinstance(b, (bytes, mmap, array)) else b.encode("utf-8")
        total = 0
        length = len(real_b)
        while total < length:
            # Write only the not-yet-written tail, so a partial write
            # does not duplicate earlier bytes.
            ret = super().write(real_b[total:])
            if ret:
                total += ret
        for val in self.hashes.values():
            # print("Updating hasher %s ", val)
            val.update(real_b)
        return total

    def close(self) -> None:
        # FIXME: Convert below block to a ResearchObject method?
        if self.rel_path.startswith("data/"):
            self.research_object.bagged_size[self.rel_path] = self.tell()
        else:
            self.research_object.tagfiles.add(self.rel_path)

        super().close()
        # { "sha1": "f572d396fae9206628714fb2ce00f72e94f2258f" }
        checksums = {}
        for name in self.hashes:
            checksums[name] = self.hashes[name].hexdigest().lower()
        self.research_object.add_to_manifest(self.rel_path, checksums)

    # To simplify our hash calculation we won't support
    # seeking, reading or truncating, as we can't do
    # similar seeks in the current hash.
    # TODO: Support these? At the expense of invalidating
    # the current hash, then having to recalculate at close()
    def seekable(self) -> bool:
        return False

    def readable(self) -> bool:
        return False

    def truncate(self, size: Optional[int] = None) -> int:
        # FIXME: This breaks the contract of IOBase,
        # as it means we would have to recalculate the hash
        if size is not None:
            raise OSError("WritableBagFile can't truncate")
        return self.tell()

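# Example (hypothetical sketch): the hashes are updated as bytes are written,
# and close() registers the final digests in the bag's manifests, assuming an
# open ResearchObject `ro`:
#
#     bag_file = WritableBagFile(ro, "metadata/example.txt")
#     bag_file.write("hello\n")   # updates sha1/sha256/sha512 incrementally
#     bag_file.close()            # records checksums via ro.add_to_manifest()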

def _check_mod_11_2(numeric_string: str) -> bool:
    """
    Validate numeric_string for its MOD-11-2 checksum.

    Any "-" characters in numeric_string are ignored.

    The last digit of numeric_string is assumed to be the checksum, 0-9 or X.

    See ISO/IEC 7064:2003 and
    https://support.orcid.org/knowledgebase/articles/116780-structure-of-the-orcid-identifier
    """
    # Strip -
    nums = numeric_string.replace("-", "")
    total = 0
    # skip last (check)digit
    for num in nums[:-1]:
        digit = int(num)
        total = (total + digit) * 2
    remainder = total % 11
    result = (12 - remainder) % 11
    if result == 10:
        checkdigit = "X"
    else:
        checkdigit = str(result)
    # Compare against last digit or X
    return nums[-1].upper() == checkdigit

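# Worked example (comments only): for the canonical ORCID test number
# 0000-0002-1825-0097 the running total over the first 15 digits is 1314,
# 1314 % 11 == 5, and (12 - 5) % 11 == 7, matching the final check digit:
#
#     >>> _check_mod_11_2("0000-0002-1825-0097")
#     True
#     >>> _check_mod_11_2("0000-0002-1694-233X")
#     True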

def _valid_orcid(orcid: Optional[str]) -> str:
    """
    Ensure orcid is a valid ORCID identifier.

    The string must be equivalent to one of these forms:

    0000-0002-1825-0097
    orcid.org/0000-0002-1825-0097
    http://orcid.org/0000-0002-1825-0097
    https://orcid.org/0000-0002-1825-0097

    If the ORCID number or prefix is invalid, a ValueError is raised.

    The returned ORCID string is always in the form of:
    https://orcid.org/0000-0002-1825-0097
    """
    if orcid is None or not orcid:
        raise ValueError("ORCID cannot be unspecified")
    # Liberal in what we consume, e.g. ORCID.org/0000-0002-1825-009x
    orcid = orcid.lower()
    match = re.match(
        # Note: concatenated r"" r"" below so we can add comments to pattern
        # Optional hostname, with or without protocol
        r"(http://orcid\.org/|https://orcid\.org/|orcid\.org/)?"
        # alternative pattern, but probably messier
        # r"^((https?://)?orcid.org/)?"
        # ORCID number is always 4x4 numerical digits,
        # but last digit (modulus 11 checksum)
        # can also be X (but we made it lowercase above).
        # e.g. 0000-0002-1825-0097
        # or 0000-0002-1694-233x
        r"(?P<orcid>(\d{4}-\d{4}-\d{4}-\d{3}[0-9x]))$",
        orcid,
    )

    help_url = (
        "https://support.orcid.org/knowledgebase/articles/"
        "116780-structure-of-the-orcid-identifier"
    )
    if not match:
        raise ValueError(f"Invalid ORCID: {orcid}\n{help_url}")

    # Conservative in what we produce:
    # a) Ensure any checksum digit is uppercase
    orcid_num = match.group("orcid").upper()
    # b) ..and correct
    if not _check_mod_11_2(orcid_num):
        raise ValueError(f"Invalid ORCID checksum: {orcid_num}\n{help_url}")

    # c) Re-add the official prefix https://orcid.org/
    return "https://orcid.org/%s" % orcid_num

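# Example (comments only): all accepted input forms normalize to the same URI.
#
#     >>> _valid_orcid("ORCID.org/0000-0002-1694-233x")
#     'https://orcid.org/0000-0002-1694-233X'
#     >>> _valid_orcid("0000-0002-1825-0097")
#     'https://orcid.org/0000-0002-1825-0097'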

Annotation = TypedDict(
    "Annotation",
    {
        "uri": str,
        "about": str,
        "content": Optional[Union[str, List[str]]],
        "oa:motivatedBy": Dict[str, str],
    },
)
Aggregate = TypedDict(
    "Aggregate",
    {
        "uri": Optional[str],
        "bundledAs": Optional[Dict[str, Any]],
        "mediatype": Optional[str],
        "conformsTo": Optional[Union[str, List[str]]],
        "createdOn": Optional[str],
        "createdBy": Optional[Dict[str, str]],
    },
    total=False,
)
# Aggregate.bundledAs is actually type Aggregate, but cyclic definitions are not supported
AuthoredBy = TypedDict(
    "AuthoredBy",
    {"orcid": Optional[str], "name": Optional[str], "uri": Optional[str]},
    total=False,
)


class ResearchObject:
    """CWLProv Research Object."""

    def __init__(
        self,
        fsaccess: StdFsAccess,
        temp_prefix_ro: str = "tmp",
        orcid: str = "",
        full_name: str = "",
    ) -> None:
        """Initialize the ResearchObject."""
        self.temp_prefix = temp_prefix_ro
        self.orcid = "" if not orcid else _valid_orcid(orcid)
        self.full_name = full_name
        self.folder = create_tmp_dir(temp_prefix_ro)
        self.closed = False
        # map of filename "data/de/alsdklkas": 12398123 bytes
        self.bagged_size = {}  # type: Dict[str, int]
        self.tagfiles = set()  # type: Set[str]
        self._file_provenance = {}  # type: Dict[str, Aggregate]
        self._external_aggregates = []  # type: List[Aggregate]
        self.annotations = []  # type: List[Annotation]
        self._content_types = {}  # type: Dict[str, str]
        self.fsaccess = fsaccess
        # These should be replaced by generate_prov_doc when workflow/run IDs are known:
        self.engine_uuid = "urn:uuid:%s" % uuid.uuid4()
        self.ro_uuid = uuid.uuid4()
        self.base_uri = "arcp://uuid,%s/" % self.ro_uuid
        self.cwltool_version = "cwltool %s" % versionstring().split()[-1]
        ##
        self.relativised_input_object = {}  # type: CWLObjectType

        self._initialize()
        _logger.debug("[provenance] Temporary research object: %s", self.folder)

    def self_check(self) -> None:
        """Raise ValueError if this RO is closed."""
        if self.closed:
            raise ValueError(
                "This ResearchObject has already been closed and is not "
                "available for further manipulation."
            )

    def __str__(self) -> str:
        """Represent this RO as a string."""
        return f"ResearchObject <{self.ro_uuid}> in <{self.folder}>"

    def _initialize(self) -> None:
        for research_obj_folder in (
            METADATA,
            DATA,
            WORKFLOW,
            SNAPSHOT,
            PROVENANCE,
            LOGS,
        ):
            os.makedirs(os.path.join(self.folder, research_obj_folder))
        self._initialize_bagit()

    def _initialize_bagit(self) -> None:
        """Write fixed bagit header."""
        self.self_check()
        bagit = os.path.join(self.folder, "bagit.txt")
        # encoding: always UTF-8 (although ASCII would suffice here)
        # newline: ensure LF also on Windows
        with open(bagit, "w", encoding=ENCODING, newline="\n") as bag_it_file:
            # TODO: \n or \r\n ?
            bag_it_file.write("BagIt-Version: 0.97\n")
            bag_it_file.write("Tag-File-Character-Encoding: %s\n" % ENCODING)

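    # The resulting bagit.txt holds exactly the two required fixed lines
    # (ENCODING is UTF-8):
    #
    #     BagIt-Version: 0.97
    #     Tag-File-Character-Encoding: UTF-8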
    def open_log_file_for_activity(
        self, uuid_uri: str
    ) -> Union[TextIOWrapper, WritableBagFile]:
        self.self_check()
        # Ensure valid UUID for safe filenames
        activity_uuid = uuid.UUID(uuid_uri)
        if activity_uuid.urn == self.engine_uuid:
            # It's the engine aka cwltool!
            name = "engine"
        else:
            name = "activity"
        p = os.path.join(LOGS, f"{name}.{activity_uuid}.txt")
        _logger.debug(f"[provenance] Opening log file for {name}: {p}")
        self.add_annotation(activity_uuid.urn, [p], CWLPROV["log"].uri)
        return self.write_bag_file(p)

    def _finalize(self) -> None:
        self._write_ro_manifest()
        self._write_bag_info()

    def user_provenance(self, document: ProvDocument) -> None:
        """Add the user provenance."""
        self.self_check()
        (username, fullname) = _whoami()

        if not self.full_name:
            self.full_name = fullname

        document.add_namespace(UUID)
        document.add_namespace(ORCID)
        document.add_namespace(FOAF)
        account = document.agent(
            ACCOUNT_UUID,
            {
                provM.PROV_TYPE: FOAF["OnlineAccount"],
                "prov:label": username,
                FOAF["accountName"]: username,
            },
        )

        user = document.agent(
            self.orcid or USER_UUID,
            {
                provM.PROV_TYPE: PROV["Person"],
                "prov:label": self.full_name,
                FOAF["name"]: self.full_name,
                FOAF["account"]: account,
            },
        )
        # cwltool may be started on the shell (directly by user),
        # by shell script (indirectly by user)
        # or from a different program
        # (which again is launched by any of the above)
        #
        # We can't tell in which way, but ultimately we're still
        # acting on behalf of that user (even if we might
        # get their name wrong!)
        document.actedOnBehalfOf(account, user)

    def write_bag_file(
        self, path: str, encoding: Optional[str] = ENCODING
    ) -> Union[TextIOWrapper, WritableBagFile]:
        """Write the bag file into our research object."""
        self.self_check()
        # For some reason below throws BlockingIOError
        # fp = BufferedWriter(WritableBagFile(self, path))
        bag_file = WritableBagFile(self, path)
        if encoding is not None:
            # encoding: match Tag-File-Character-Encoding: UTF-8
            # newline: ensure LF also on Windows
            return TextIOWrapper(
                cast(BinaryIO, bag_file), encoding=encoding, newline="\n"
            )
        return bag_file

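    # Example (hypothetical sketch): with the default encoding the caller gets
    # a TextIOWrapper; pass encoding=None for raw binary writes.
    #
    #     with ro.write_bag_file("metadata/notes.txt") as fh:
    #         fh.write("created by example\n")       # text, hashed as UTF-8
    #     with ro.write_bag_file("metadata/blob.bin", encoding=None) as fh:
    #         fh.write(b"\x00\x01")                  # raw WritableBagFile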
    def add_tagfile(
        self, path: str, timestamp: Optional[datetime.datetime] = None
    ) -> None:
        """Add tag files to our research object."""
        self.self_check()
        checksums = {}
        # Read file to calculate its checksum
        if os.path.isdir(path):
            return
        # FIXME: do the right thing for directories
        with open(path, "rb") as tag_file:
            # FIXME: Should have more efficient open_tagfile() that
            # does all checksums in one go while writing through,
            # adding checksums after closing.
            # Below probably OK for now as metadata files
            # are not too large..?

            checksums[SHA1] = checksum_copy(tag_file, hasher=hashlib.sha1)

            tag_file.seek(0)
            checksums[SHA256] = checksum_copy(tag_file, hasher=hashlib.sha256)

            tag_file.seek(0)
            checksums[SHA512] = checksum_copy(tag_file, hasher=hashlib.sha512)

        rel_path = posix_path(os.path.relpath(path, self.folder))
        self.tagfiles.add(rel_path)
        self.add_to_manifest(rel_path, checksums)
        if timestamp is not None:
            self._file_provenance[rel_path] = {
                "createdOn": timestamp.isoformat(),
                "uri": None,
                "bundledAs": None,
                "mediatype": None,
                "conformsTo": None,
            }

    def _ro_aggregates(self) -> List[Aggregate]:
        """Gather dictionary of files to be added to the manifest."""

        def guess_mediatype(
            rel_path: str,
        ) -> Tuple[Optional[str], Optional[Union[str, List[str]]]]:
            """Return the mediatypes."""
            media_types = {
                # Adapted from
                # https://w3id.org/bundle/2014-11-05/#media-types
                "txt": TEXT_PLAIN,
                "ttl": 'text/turtle; charset="UTF-8"',
                "rdf": "application/rdf+xml",
                "json": "application/json",
                "jsonld": "application/ld+json",
                "xml": "application/xml",
                ##
                "cwl": 'text/x+yaml; charset="UTF-8"',
                "provn": 'text/provenance-notation; charset="UTF-8"',
                "nt": "application/n-triples",
            }  # type: Dict[str, str]
            conforms_to = {
                "provn": "http://www.w3.org/TR/2013/REC-prov-n-20130430/",
                "cwl": "https://w3id.org/cwl/",
            }  # type: Dict[str, str]

            prov_conforms_to = {
                "provn": "http://www.w3.org/TR/2013/REC-prov-n-20130430/",
                "rdf": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "ttl": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "nt": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "jsonld": "http://www.w3.org/TR/2013/REC-prov-o-20130430/",
                "xml": "http://www.w3.org/TR/2013/NOTE-prov-xml-20130430/",
                "json": "http://www.w3.org/Submission/2013/SUBM-prov-json-20130424/",
            }  # type: Dict[str, str]

            extension = rel_path.rsplit(".", 1)[-1].lower()  # type: Optional[str]
            if extension == rel_path:
                # No ".", no extension
                extension = None

            mediatype = None  # type: Optional[str]
            conformsTo = None  # type: Optional[Union[str, List[str]]]
            if extension in media_types:
                mediatype = media_types[extension]

            if extension in conforms_to:
                # TODO: Open CWL file to read its declared "cwlVersion", e.g.
                # cwlVersion = "v1.0"
                conformsTo = conforms_to[extension]

            if (
                rel_path.startswith(posix_path(PROVENANCE))
                and extension in prov_conforms_to
            ):
                if ".cwlprov" in rel_path:
                    # Our own!
                    conformsTo = [
                        prov_conforms_to[extension],
                        CWLPROV_VERSION,
                    ]
                else:
                    # Some other PROV
                    # TODO: Recognize ProvOne etc.
                    conformsTo = prov_conforms_to[extension]
            return (mediatype, conformsTo)

        aggregates = []  # type: List[Aggregate]
        for path in self.bagged_size.keys():

            temp_path = PurePosixPath(path)
            folder = temp_path.parent
            filename = temp_path.name

            # NOTE: Here we end up aggregating the abstract
            # data items by their sha1 hash, so that it matches
            # the entity() in the prov files.

            # TODO: Change to nih:sha-256; hashes
            # https://tools.ietf.org/html/rfc6920#section-7
            aggregate_dict = {
                "uri": "urn:hash::sha1:" + filename,
                "bundledAs": {
                    # The arcp URI is a suitable ORE proxy; local to this Research Object.
                    # (as long as we don't also aggregate it by relative path!)
                    "uri": self.base_uri + path,
                    # relate it to the data/ path
                    "folder": "/%s/" % folder,
                    "filename": filename,
                },
            }  # type: Aggregate
            if path in self._file_provenance:
                # Made by workflow run, merge captured provenance
                bundledAs = aggregate_dict["bundledAs"]
                if bundledAs:
                    bundledAs.update(self._file_provenance[path])
                else:
                    aggregate_dict["bundledAs"] = cast(
                        Optional[Dict[str, Any]], self._file_provenance[path]
                    )
            else:
                # Probably made outside wf run, part of job object?
                pass
            if path in self._content_types:
                aggregate_dict["mediatype"] = self._content_types[path]

            aggregates.append(aggregate_dict)

        for path in self.tagfiles:
            if not (
                path.startswith(METADATA)
                or path.startswith(WORKFLOW)
                or path.startswith(SNAPSHOT)
            ):
                # probably a bagit file
                continue
            if path == str(PurePosixPath(METADATA) / "manifest.json"):
                # Should not really be there yet! But anyway, we won't
                # aggregate it.
                continue

            # These are local paths like metadata/provenance - but
            # we need to relativize them to our current directory,
            # as we are saved in metadata/manifest.json
            mediatype, conformsTo = guess_mediatype(path)
            rel_aggregates = {
                "uri": str(Path(os.pardir) / path),
                "mediatype": mediatype,
                "conformsTo": conformsTo,
            }  # type: Aggregate

            if path in self._file_provenance:
                # Propagate file provenance (e.g. timestamp)
                rel_aggregates.update(self._file_provenance[path])
            elif not path.startswith(SNAPSHOT):
                # make new timestamp?
                (
                    rel_aggregates["createdOn"],
                    rel_aggregates["createdBy"],
                ) = self._self_made()
            aggregates.append(rel_aggregates)
        aggregates.extend(self._external_aggregates)
        return aggregates

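    # Example payload aggregate (hypothetical values): a data file is
    # aggregated by its sha1 URN, with the arcp URI as its ORE proxy:
    #
    #     {"uri": "urn:hash::sha1:f572d396fae9206628714fb2ce00f72e94f2258f",
    #      "bundledAs": {
    #          "uri": "arcp://uuid,<ro_uuid>/data/f5/f572d396fae9206628714fb2ce00f72e94f2258f",
    #          "folder": "/data/f5/",
    #          "filename": "f572d396fae9206628714fb2ce00f72e94f2258f"}}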
    def add_uri(
        self, uri: str, timestamp: Optional[datetime.datetime] = None
    ) -> Aggregate:
        self.self_check()
        aggr = {"uri": uri}  # type: Aggregate
        aggr["createdOn"], aggr["createdBy"] = self._self_made(timestamp=timestamp)
        self._external_aggregates.append(aggr)
        return aggr

    def add_annotation(
        self, about: str, content: List[str], motivated_by: str = "oa:describing"
    ) -> str:
        """Cheap URI relativize for current directory and /."""
        self.self_check()
        curr = self.base_uri + METADATA + "/"
        content = [c.replace(curr, "").replace(self.base_uri, "../") for c in content]
        uri = uuid.uuid4().urn
        ann = {
            "uri": uri,
            "about": about,
            "content": content,
            "oa:motivatedBy": {"@id": motivated_by},
        }  # type: Annotation
        self.annotations.append(ann)
        return uri

    def _ro_annotations(self) -> List[Annotation]:
        annotations = []  # type: List[Annotation]
        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": self.ro_uuid.urn,
                "content": "/",
                # https://www.w3.org/TR/annotation-vocab/#named-individuals
                "oa:motivatedBy": {"@id": "oa:describing"},
            }
        )

        # How was it run?
        # FIXME: Only primary*
        prov_files = [
            str(PurePosixPath(p).relative_to(METADATA))
            for p in self.tagfiles
            if p.startswith(posix_path(PROVENANCE)) and "/primary." in p
        ]
        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": self.ro_uuid.urn,
                "content": prov_files,
                # Modulation of https://www.w3.org/TR/prov-aq/
                "oa:motivatedBy": {"@id": "http://www.w3.org/ns/prov#has_provenance"},
            }
        )

        # Where is the main workflow?
        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": str(PurePosixPath("..") / WORKFLOW / "packed.cwl"),
                "content": None,
                "oa:motivatedBy": {"@id": "oa:highlighting"},
            }
        )

        annotations.append(
            {
                "uri": uuid.uuid4().urn,
                "about": self.ro_uuid.urn,
                "content": [
                    str(PurePosixPath("..") / WORKFLOW / "packed.cwl"),
                    str(PurePosixPath("..") / WORKFLOW / "primary-job.json"),
                ],
                "oa:motivatedBy": {"@id": "oa:linking"},
            }
        )
        # Add user-added annotations at end
        annotations.extend(self.annotations)
        return annotations

    def _authored_by(self) -> Optional[AuthoredBy]:
        authored_by = {}  # type: AuthoredBy
        if self.orcid:
            authored_by["orcid"] = self.orcid
        if self.full_name:
            authored_by["name"] = self.full_name
            if not self.orcid:
                authored_by["uri"] = USER_UUID

        if authored_by:
            return authored_by
        return None

    def _write_ro_manifest(self) -> None:

        # Does not have to be this order, but it's nice to be consistent
        filename = "manifest.json"
        createdOn, createdBy = self._self_made()
        manifest = OrderedDict(
            {
                "@context": [
                    {"@base": "{}{}/".format(self.base_uri, posix_path(METADATA))},
                    "https://w3id.org/bundle/context",
                ],
                "id": "/",
                "conformsTo": CWLPROV_VERSION,
                "manifest": filename,
                "createdOn": createdOn,
                "createdBy": createdBy,
                "authoredBy": self._authored_by(),
                "aggregates": self._ro_aggregates(),
                "annotations": self._ro_annotations(),
            }
        )

        json_manifest = json_dumps(manifest, indent=4, ensure_ascii=False)
        rel_path = str(PurePosixPath(METADATA) / filename)
        json_manifest += "\n"
        with self.write_bag_file(rel_path) as manifest_file:
            manifest_file.write(json_manifest)

    def _write_bag_info(self) -> None:

        with self.write_bag_file("bag-info.txt") as info_file:
            info_file.write("Bag-Software-Agent: %s\n" % self.cwltool_version)
            # FIXME: require sha-512 of payload to comply with profile?
            # FIXME: Update profile
            info_file.write(
                "BagIt-Profile-Identifier: https://w3id.org/ro/bagit/profile\n"
            )
            info_file.write("Bagging-Date: %s\n" % datetime.date.today().isoformat())
            info_file.write(
                "External-Description: Research Object of CWL workflow run\n"
            )
            if self.full_name:
                info_file.write("Contact-Name: %s\n" % self.full_name)

            # NOTE: We can't use the urn:uuid:{UUID} of the workflow run (a prov:Activity)
            # as identifier for the RO/bagit (a prov:Entity). However the arcp base URI is good.
            info_file.write("External-Identifier: %s\n" % self.base_uri)

            # Calculate size of data/ (assuming no external fetch.txt files)
            total_size = sum(self.bagged_size.values())
            num_files = len(self.bagged_size)
            info_file.write("Payload-Oxum: %d.%d\n" % (total_size, num_files))
        _logger.debug("[provenance] Generated bagit metadata: %s", self.folder)

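    # Per the BagIt spec, Payload-Oxum is "<octet count>.<stream count>" over
    # data/ - e.g. two payload files of 6 and 10 bytes give "Payload-Oxum: 16.2".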
    def generate_snapshot(self, prov_dep: CWLObjectType) -> None:
        """Copy all of the CWL files to the snapshot/ directory."""
        self.self_check()
        for key, value in prov_dep.items():
            if key == "location" and cast(str, value).split("/")[-1]:
                location = cast(str, value)
                filename = location.split("/")[-1]
                path = os.path.join(self.folder, SNAPSHOT, filename)
                filepath = ""
                if "file://" in location:
                    filepath = location[7:]
                else:
                    filepath = location

                # FIXME: What if destination path already exists?
                if os.path.exists(filepath):
                    try:
                        if os.path.isdir(filepath):
                            shutil.copytree(filepath, path)
                        else:
                            shutil.copy(filepath, path)
                        timestamp = datetime.datetime.fromtimestamp(
                            os.path.getmtime(filepath)
                        )
                        self.add_tagfile(path, timestamp)
                    except PermissionError:
                        pass  # FIXME: avoids duplicate snapshotting; need better solution
            elif key in ("secondaryFiles", "listing"):
                for files in cast(MutableSequence[CWLObjectType], value):
                    if isinstance(files, MutableMapping):
                        self.generate_snapshot(files)
            else:
                pass

    def packed_workflow(self, packed: str) -> None:
        """Pack CWL description to generate re-runnable CWL object in RO."""
        self.self_check()
        rel_path = str(PurePosixPath(WORKFLOW) / "packed.cwl")
        # Write as binary
        with self.write_bag_file(rel_path, encoding=None) as write_pack:
            write_pack.write(packed)
        _logger.debug("[provenance] Added packed workflow: %s", rel_path)

    def has_data_file(self, sha1hash: str) -> bool:
        """Confirm the presence of the given file in the RO."""
        folder = os.path.join(self.folder, DATA, sha1hash[0:2])
        hash_path = os.path.join(folder, sha1hash)
        return os.path.isfile(hash_path)

    def add_data_file(
        self,
        from_fp: IO[Any],
        timestamp: Optional[datetime.datetime] = None,
        content_type: Optional[str] = None,
    ) -> str:
        """Copy inputs to data/ folder."""
        self.self_check()
        tmp_dir, tmp_prefix = os.path.split(self.temp_prefix)
        with tempfile.NamedTemporaryFile(
            prefix=tmp_prefix, dir=tmp_dir, delete=False
        ) as tmp:
            checksum = checksum_copy(from_fp, tmp)

        # Calculate hash-based file path
        folder = os.path.join(self.folder, DATA, checksum[0:2])
        path = os.path.join(folder, checksum)
        # os.rename assumed safe, as our temp file should
        # be in same file system as our temp folder
        if not os.path.isdir(folder):
            os.makedirs(folder)
        os.rename(tmp.name, path)

        # Relative posix path
        # (to avoid \ on Windows)
        rel_path = posix_path(os.path.relpath(path, self.folder))

        # Register in bagit checksum
        if Hasher == hashlib.sha1:
            self._add_to_bagit(rel_path, sha1=checksum)
        else:
            _logger.warning(
                "[provenance] Unknown hash method %s for bagit manifest", Hasher
            )
            # Inefficient, bagit support needs to checksum again
            self._add_to_bagit(rel_path)
        _logger.debug("[provenance] Added data file %s", path)
        if timestamp is not None:
            createdOn, createdBy = self._self_made(timestamp)
            self._file_provenance[rel_path] = cast(
                Aggregate, {"createdOn": createdOn, "createdBy": createdBy}
            )
        _logger.debug("[provenance] Relative path for data file %s", rel_path)

        if content_type is not None:
            self._content_types[rel_path] = content_type
        return rel_path

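    # Example (hypothetical sketch): payload files are content-addressed as
    # data/<first two hex digits>/<sha1>; for the bytes b"hello\n":
    #
    #     with open("input.txt", "rb") as fp:
    #         rel = ro.add_data_file(fp)
    #     # rel == "data/f5/f572d396fae9206628714fb2ce00f72e94f2258f"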
    def _self_made(
        self, timestamp: Optional[datetime.datetime] = None
    ) -> Tuple[str, Dict[str, str]]:  # createdOn, createdBy
        if timestamp is None:
            timestamp = datetime.datetime.now()
        return (
            timestamp.isoformat(),
            {"uri": self.engine_uuid, "name": self.cwltool_version},
        )

    def add_to_manifest(self, rel_path: str, checksums: Dict[str, str]) -> None:
        """Add files to the research object manifest."""
        self.self_check()
        if PurePosixPath(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)

        if os.path.commonprefix(["data/", rel_path]) == "data/":
            # payload file, go to manifest
            manifest = "manifest"
        else:
            # metadata file, go to tag manifest
            manifest = "tagmanifest"

        # Add checksums to corresponding manifest files
        for (method, hash_value) in checksums.items():
            # File not in manifest because we bailed out on
            # existence in bagged_size above
            manifestpath = os.path.join(self.folder, f"{manifest}-{method.lower()}.txt")
            # encoding: match Tag-File-Character-Encoding: UTF-8
            # newline: ensure LF also on Windows
            with open(
                manifestpath, "a", encoding=ENCODING, newline="\n"
            ) as checksum_file:
                line = f"{hash_value} {rel_path}\n"
                _logger.debug("[provenance] Added to %s: %s", manifestpath, line)
                checksum_file.write(line)

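    # Each checksum becomes one line in manifest-<alg>.txt (payload files) or
    # tagmanifest-<alg>.txt (metadata files), in BagIt "<hash> <path>" form,
    # e.g. (hypothetical):
    #
    #     f572d396fae9206628714fb2ce00f72e94f2258f data/f5/f572d396fae9206628714fb2ce00f72e94f2258f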
    def _add_to_bagit(self, rel_path: str, **checksums: str) -> None:
        if PurePosixPath(rel_path).is_absolute():
            raise ValueError("rel_path must be relative: %s" % rel_path)
        lpath = os.path.join(self.folder, local_path(rel_path))
        if not os.path.exists(lpath):
            raise OSError(f"File {rel_path} does not exist within RO: {lpath}")

        if rel_path in self.bagged_size:
            # Already added, assume checksum OK
            return
        self.bagged_size[rel_path] = os.path.getsize(lpath)

        if SHA1 not in checksums:
            # ensure we always have sha1
            checksums = dict(checksums)
            with open(lpath, "rb") as file_path:
                # FIXME: Need sha-256 / sha-512 as well for Research Object BagIt profile?
                checksums[SHA1] = checksum_copy(file_path, hasher=hashlib.sha1)

        self.add_to_manifest(rel_path, checksums)

    def create_job(
        self, builder_job: CWLObjectType, is_output: bool = False
    ) -> CWLObjectType:
        # TODO customise the file
        """Generate the new job object with RO specific relative paths."""
        copied = copy.deepcopy(builder_job)
        relativised_input_objecttemp = {}  # type: CWLObjectType
        self._relativise_files(copied)

        def jdefault(o: Any) -> Dict[Any, Any]:
            return dict(o)

        if is_output:
            rel_path = PurePosixPath(WORKFLOW) / "primary-output.json"
        else:
            rel_path = PurePosixPath(WORKFLOW) / "primary-job.json"
        j = json_dumps(copied, indent=4, ensure_ascii=False, default=jdefault)
        with self.write_bag_file(str(rel_path)) as file_path:
            file_path.write(j + "\n")
        _logger.debug("[provenance] Generated customised job file: %s", rel_path)
        # Generate dictionary with keys as workflow level input IDs and values
        # as
        # 1) for files the relativised location containing hash
        # 2) for other attributes, the actual value.
        for key, value in copied.items():
            if isinstance(value, MutableMapping):
                if value.get("class") in ("File", "Directory"):
                    relativised_input_objecttemp[key] = value
            else:
                relativised_input_objecttemp[key] = value
        self.relativised_input_object.update(
            {k: v for k, v in relativised_input_objecttemp.items() if v}
        )
        return self.relativised_input_object

    def _relativise_files(
        self,
        structure: Union[CWLObjectType, CWLOutputType, MutableSequence[CWLObjectType]],
    ) -> None:
        """Save any file objects into the RO and update the local paths."""
        # Base case - we found a File we need to update
        _logger.debug("[provenance] Relativising: %s", structure)

        if isinstance(structure, MutableMapping):
            if structure.get("class") == "File":
                relative_path = None  # type: Optional[Union[str, PurePosixPath]]
                if "checksum" in structure:
                    raw_checksum = cast(str, structure["checksum"])
                    alg, checksum = raw_checksum.split("$")
                    if alg != SHA1:
                        raise TypeError(
                            "Only SHA1 CWL checksums are currently supported: "
                            "{}".format(structure)
                        )
                    if self.has_data_file(checksum):
                        prefix = checksum[0:2]
                        relative_path = PurePosixPath("data") / prefix / checksum

                # Only (re)add the file when we don't already hold its data and
                # we know where to read it from; this also avoids a KeyError
                # when "location" is missing.
                if relative_path is None and "location" in structure:
                    # Register in RO; but why was this not picked
                    # up by used_artefacts?
                    _logger.info("[provenance] Adding to RO %s", structure["location"])
                    with self.fsaccess.open(
                        cast(str, structure["location"]), "rb"
                    ) as fp:
                        relative_path = self.add_data_file(fp)
                        checksum = PurePosixPath(relative_path).name
                        structure["checksum"] = f"{SHA1}${checksum}"
                if relative_path is not None:
                    # RO-relative path as new location
                    structure["location"] = str(PurePosixPath("..") / relative_path)
                else:
                    _logger.warning(
                        "Could not determine RO path for file %s", structure
                    )
                if "path" in structure:
                    del structure["path"]

            if structure.get("class") == "Directory":
                # TODO: Generate anonymous Directory with a "listing"
                # pointing to the hashed files
                del structure["location"]

            for val in structure.values():
                try:
                    self._relativise_files(cast(CWLOutputType, val))
                except OSError:
                    pass
            return

        if isinstance(structure, MutableSequence):
            for obj in structure:
                # Recurse and rewrite any nested File objects
                self._relativise_files(cast(CWLOutputType, obj))

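    # Example rewrite (hypothetical values): an input File like
    #
    #     {"class": "File", "location": "file:///tmp/in.txt", "path": "/tmp/in.txt"}
    #
    # is saved into data/ and becomes
    #
    #     {"class": "File",
    #      "location": "../data/f5/f572d396fae9206628714fb2ce00f72e94f2258f",
    #      "checksum": "sha1$f572d396fae9206628714fb2ce00f72e94f2258f"}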
    def close(self, save_to: Optional[str] = None) -> None:
        """Close the Research Object, optionally saving to specified folder.

        Closing will remove any temporary files used by this research object.
        After calling this method, this ResearchObject instance can no longer
        be used, except for no-op calls to .close().

        The 'save_to' folder should not exist - if it does, it will be deleted.

        It is safe to call this function multiple times without the
        'save_to' argument, e.g. within a try..finally block to
        ensure the temporary files of this Research Object are removed.
        """
        if save_to is None:
            if not self.closed:
                _logger.debug("[provenance] Deleting temporary %s", self.folder)
                shutil.rmtree(self.folder, ignore_errors=True)
        else:
            save_to = os.path.abspath(save_to)
            _logger.info("[provenance] Finalizing Research Object")
            self._finalize()  # write manifest etc.
            # TODO: Write as archive (.zip or .tar) based on extension?

            if os.path.isdir(save_to):
                _logger.info("[provenance] Deleting existing %s", save_to)
                shutil.rmtree(save_to)
            shutil.move(self.folder, save_to)
            _logger.info("[provenance] Research Object saved to %s", save_to)
            self.folder = save_to
        self.closed = True

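    # Typical lifecycle (hypothetical sketch), as the docstring suggests:
    #
    #     ro = ResearchObject(StdFsAccess(""))
    #     try:
    #         ...  # populate the RO
    #         ro.close(save_to="/tmp/my-ro")  # finalize and move into place
    #     finally:
    #         ro.close()                      # no-op if already closed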

def checksum_copy(
    src_file: IO[Any],
    dst_file: Optional[IO[Any]] = None,
    hasher=Hasher,  # type: Callable[[], hashlib._Hash]
    buffersize: int = 1024 * 1024,
) -> str:
    """Compute checksums while copying a file."""
    # TODO: Use hashlib.new(Hasher_str) instead?
    checksum = hasher()
    contents = src_file.read(buffersize)
    if dst_file and hasattr(dst_file, "name") and hasattr(src_file, "name"):
        temp_location = os.path.join(os.path.dirname(dst_file.name), str(uuid.uuid4()))
        try:
            os.rename(dst_file.name, temp_location)
            os.link(src_file.name, dst_file.name)
            dst_file = None
            os.unlink(temp_location)
        except OSError:
            pass
        if os.path.exists(temp_location):
            os.rename(temp_location, dst_file.name)  # type: ignore
    while contents != b"":
        if dst_file is not None:
            dst_file.write(contents)
        checksum.update(contents)
        contents = src_file.read(buffersize)
    if dst_file is not None:
        dst_file.flush()
    return checksum.hexdigest().lower()
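
# Example (hypothetical): compute a file's SHA1 without copying by omitting
# dst_file; when both streams are named files, the copy is attempted via a
# hard link instead of byte-by-byte writing.
#
#     with open("input.txt", "rb") as src:
#         digest = checksum_copy(src, hasher=hashlib.sha1)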
