diff env/bin/bagit.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac
---|---
date | Mon, 01 Jun 2020 08:59:25 -0400
parents | 79f47841a781
children |
line diff
--- a/env/bin/bagit.py Thu May 14 16:47:39 2020 -0400
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1612 +0,0 @@
-#!/Users/pldms/Development/Projects/2020/david-matthews-galaxy/guppy_basecaller/env/bin/python3
-# encoding: utf-8
-
-from __future__ import absolute_import, division, print_function, unicode_literals
-
-import argparse
-import codecs
-import gettext
-import hashlib
-import logging
-import multiprocessing
-import os
-import re
-import signal
-import sys
-import tempfile
-import unicodedata
-import warnings
-from collections import defaultdict
-from datetime import date
-from functools import partial
-from os.path import abspath, isdir, isfile, join
-
-from pkg_resources import DistributionNotFound, get_distribution
-
-try:
-    from urllib.parse import urlparse
-except ImportError:
-    from urlparse import urlparse
-
-
-def find_locale_dir():
-    for prefix in (os.path.dirname(__file__), sys.prefix):
-        locale_dir = os.path.join(prefix, "locale")
-        if os.path.isdir(locale_dir):
-            return locale_dir
-
-
-TRANSLATION_CATALOG = gettext.translation(
-    "bagit-python", localedir=find_locale_dir(), fallback=True
-)
-if sys.version_info < (3,):
-    _ = TRANSLATION_CATALOG.ugettext
-else:
-    _ = TRANSLATION_CATALOG.gettext
-
-MODULE_NAME = "bagit" if __name__ == "__main__" else __name__
-
-LOGGER = logging.getLogger(MODULE_NAME)
-
-try:
-    VERSION = get_distribution(MODULE_NAME).version
-except DistributionNotFound:
-    VERSION = "0.0.dev0"
-
-PROJECT_URL = "https://github.com/LibraryOfCongress/bagit-python"
-
-__doc__ = (
-    _(
-        """
-BagIt is a directory, filename convention for bundling an arbitrary set of
-files with a manifest, checksums, and additional metadata. More about BagIt
-can be found at:
-
-    http://purl.org/net/bagit
-
-bagit.py is a pure python drop in library and command line tool for creating,
-and working with BagIt directories.
-
-
-Command-Line Usage:
-
-Basic usage is to give bagit.py a directory to bag up:
-
-    $ bagit.py my_directory
-
-This does a bag-in-place operation where the current contents will be moved
-into the appropriate BagIt structure and the metadata files will be created.
-
-You can bag multiple directories if you wish:
-
-    $ bagit.py directory1 directory2
-
-Optionally you can provide metadata which will be stored in bag-info.txt:
-
-    $ bagit.py --source-organization "Library of Congress" directory
-
-You can also select which manifest algorithms will be used:
-
-    $ bagit.py --sha1 --md5 --sha256 --sha512 directory
-
-
-Using BagIt from your Python code:
-
-    import bagit
-    bag = bagit.make_bag('example-directory', {'Contact-Name': 'Ed Summers'})
-    print(bag.entries)
-
-For more information or to contribute to bagit-python's development, please
-visit %(PROJECT_URL)s
-"""
-    )
-    % globals()
-)
-
-# standard bag-info.txt metadata
-STANDARD_BAG_INFO_HEADERS = [
-    "Source-Organization",
-    "Organization-Address",
-    "Contact-Name",
-    "Contact-Phone",
-    "Contact-Email",
-    "External-Description",
-    "External-Identifier",
-    "Bag-Size",
-    "Bag-Group-Identifier",
-    "Bag-Count",
-    "Internal-Sender-Identifier",
-    "Internal-Sender-Description",
-    "BagIt-Profile-Identifier",
-    # Bagging-Date is autogenerated
-    # Payload-Oxum is autogenerated
-]
-
-CHECKSUM_ALGOS = hashlib.algorithms_guaranteed
-DEFAULT_CHECKSUMS = ["sha256", "sha512"]
-
-#: Block size used when reading files for hashing:
-HASH_BLOCK_SIZE = 512 * 1024
-
-#: Convenience function used everywhere we want to open a file to read text
-#: rather than undecoded bytes:
-open_text_file = partial(codecs.open, encoding="utf-8", errors="strict")
-
-# This is the same as decoding the byte values in codecs.BOM:
-UNICODE_BYTE_ORDER_MARK = "\uFEFF"
-
-
-def make_bag(
-    bag_dir, bag_info=None, processes=1, checksums=None, checksum=None, encoding="utf-8"
-):
-    """
-    Convert a given directory into a bag. You can pass in arbitrary
-    key/value pairs to put into the bag-info.txt metadata file as
-    the bag_info dictionary.
-    """
-
-    if checksum is not None:
-        warnings.warn(
-            _(
-                "The `checksum` argument for `make_bag` should be replaced with `checksums`"
-            ),
-            DeprecationWarning,
-        )
-        checksums = checksum
-
-    if checksums is None:
-        checksums = DEFAULT_CHECKSUMS
-
-    bag_dir = os.path.abspath(bag_dir)
-    cwd = os.path.abspath(os.path.curdir)
-
-    if cwd.startswith(bag_dir) and cwd != bag_dir:
-        raise RuntimeError(
-            _("Bagging a parent of the current directory is not supported")
-        )
-
-    LOGGER.info(_("Creating bag for directory %s"), bag_dir)
-
-    if not os.path.isdir(bag_dir):
-        LOGGER.error(_("Bag directory %s does not exist"), bag_dir)
-        raise RuntimeError(_("Bag directory %s does not exist") % bag_dir)
-
-    # FIXME: we should do the permissions checks before changing directories
-    old_dir = os.path.abspath(os.path.curdir)
-
-    try:
-        # TODO: These two checks are currently redundant since an unreadable directory will also
-        #       often be unwritable, and this code will require review when we add the option to
-        #       bag to a destination other than the source. It would be nice if we could avoid
-        #       walking the directory tree more than once even if most filesystems will cache it
-
-        unbaggable = _can_bag(bag_dir)
-
-        if unbaggable:
-            LOGGER.error(
-                _("Unable to write to the following directories and files:\n%s"),
-                unbaggable,
-            )
-            raise BagError(_("Missing permissions to move all files and directories"))
-
-        unreadable_dirs, unreadable_files = _can_read(bag_dir)
-
-        if unreadable_dirs or unreadable_files:
-            if unreadable_dirs:
-                LOGGER.error(
-                    _("The following directories do not have read permissions:\n%s"),
-                    unreadable_dirs,
-                )
-            if unreadable_files:
-                LOGGER.error(
-                    _("The following files do not have read permissions:\n%s"),
-                    unreadable_files,
-                )
-            raise BagError(
-                _("Read permissions are required to calculate file fixities")
-            )
-        else:
-            LOGGER.info(_("Creating data directory"))
-
-            # FIXME: if we calculate full paths we won't need to deal with changing directories
-            os.chdir(bag_dir)
-            cwd = os.getcwd()
-            temp_data = tempfile.mkdtemp(dir=cwd)
-
-            for f in os.listdir("."):
-                if os.path.abspath(f) == temp_data:
-                    continue
-                new_f = os.path.join(temp_data, f)
-                LOGGER.info(
-                    _("Moving %(source)s to %(destination)s"),
-                    {"source": f, "destination": new_f},
-                )
-                os.rename(f, new_f)
-
-            LOGGER.info(
-                _("Moving %(source)s to %(destination)s"),
-                {"source": temp_data, "destination": "data"},
-            )
-            os.rename(temp_data, "data")
-
-            # permissions for the payload directory should match those of the
-            # original directory
-            os.chmod("data", os.stat(cwd).st_mode)
-
-            total_bytes, total_files = make_manifests(
-                "data", processes, algorithms=checksums, encoding=encoding
-            )
-
-            LOGGER.info(_("Creating bagit.txt"))
-            txt = """BagIt-Version: 0.97\nTag-File-Character-Encoding: UTF-8\n"""
-            with open_text_file("bagit.txt", "w") as bagit_file:
-                bagit_file.write(txt)
-
-            LOGGER.info(_("Creating bag-info.txt"))
-            if bag_info is None:
-                bag_info = {}
-
-            # allow 'Bagging-Date' and 'Bag-Software-Agent' to be overidden
-            if "Bagging-Date" not in bag_info:
-                bag_info["Bagging-Date"] = date.strftime(date.today(), "%Y-%m-%d")
-            if "Bag-Software-Agent" not in bag_info:
-                bag_info["Bag-Software-Agent"] = "bagit.py v%s <%s>" % (
-                    VERSION,
-                    PROJECT_URL,
-                )
-
-            bag_info["Payload-Oxum"] = "%s.%s" % (total_bytes, total_files)
-            _make_tag_file("bag-info.txt", bag_info)
-
-            for c in checksums:
-                _make_tagmanifest_file(c, bag_dir, encoding="utf-8")
-    except Exception:
-        LOGGER.exception(_("An error occurred creating a bag in %s"), bag_dir)
-        raise
-    finally:
-        os.chdir(old_dir)
-
-    return Bag(bag_dir)
-
-
-class Bag(object):
-    """A representation of a bag."""
-
-    valid_files = ["bagit.txt", "fetch.txt"]
-    valid_directories = ["data"]
-
-    def __init__(self, path=None):
-        super(Bag, self).__init__()
-        self.tags = {}
-        self.info = {}
-        #: Dictionary of manifest entries and the checksum values for each
-        #: algorithm:
-        self.entries = {}
-
-        # To reliably handle Unicode normalization differences, we maintain
-        # lookup dictionaries in both directions for the filenames read from
-        # the filesystem and the manifests so we can handle cases where the
-        # normalization form changed between the bag being created and read.
-        # See https://github.com/LibraryOfCongress/bagit-python/issues/51.
-
-        #: maps Unicode-normalized values to the raw value from the filesystem
-        self.normalized_filesystem_names = {}
-
-        #: maps Unicode-normalized values to the raw value in the manifest
-        self.normalized_manifest_names = {}
-
-        self.algorithms = []
-        self.tag_file_name = None
-        self.path = abspath(path)
-        if path:
-            # if path ends in a path separator, strip it off
-            if path[-1] == os.sep:
-                self.path = path[:-1]
-            self._open()
-
-    def __str__(self):
-        # FIXME: develop a more informative string representation for a Bag
-        return self.path
-
-    @property
-    def algs(self):
-        warnings.warn(_("Use Bag.algorithms instead of Bag.algs"), DeprecationWarning)
-        return self.algorithms
-
-    @property
-    def version(self):
-        warnings.warn(
-            _("Use the Bag.version_info tuple instead of Bag.version"),
-            DeprecationWarning,
-        )
-        return self._version
-
-    def _open(self):
-        # Open the bagit.txt file, and load any tags from it, including
-        # the required version and encoding.
-        bagit_file_path = os.path.join(self.path, "bagit.txt")
-
-        if not isfile(bagit_file_path):
-            raise BagError(_("Expected bagit.txt does not exist: %s") % bagit_file_path)
-
-        self.tags = tags = _load_tag_file(bagit_file_path)
-
-        required_tags = ("BagIt-Version", "Tag-File-Character-Encoding")
-        missing_tags = [i for i in required_tags if i not in tags]
-        if missing_tags:
-            raise BagError(
-                _("Missing required tag in bagit.txt: %s") % ", ".join(missing_tags)
-            )
-
-        # To avoid breaking existing code we'll leave self.version as the string
-        # and parse it into a numeric version_info tuple. In version 2.0 we can
-        # break that.
-
-        self._version = tags["BagIt-Version"]
-
-        try:
-            self.version_info = tuple(int(i) for i in self._version.split(".", 1))
-        except ValueError:
-            raise BagError(
-                _("Bag version numbers must be MAJOR.MINOR numbers, not %s")
-                % self._version
-            )
-
-        if (0, 93) <= self.version_info <= (0, 95):
-            self.tag_file_name = "package-info.txt"
-        elif (0, 96) <= self.version_info < (2,):
-            self.tag_file_name = "bag-info.txt"
-        else:
-            raise BagError(_("Unsupported bag version: %s") % self._version)
-
-        self.encoding = tags["Tag-File-Character-Encoding"]
-
-        try:
-            codecs.lookup(self.encoding)
-        except LookupError:
-            raise BagValidationError(_("Unsupported encoding: %s") % self.encoding)
-
-        info_file_path = os.path.join(self.path, self.tag_file_name)
-        if os.path.exists(info_file_path):
-            self.info = _load_tag_file(info_file_path, encoding=self.encoding)
-
-        self._load_manifests()
-
-    def manifest_files(self):
-        for filename in ["manifest-%s.txt" % a for a in CHECKSUM_ALGOS]:
-            f = os.path.join(self.path, filename)
-            if isfile(f):
-                yield f
-
-    def tagmanifest_files(self):
-        for filename in ["tagmanifest-%s.txt" % a for a in CHECKSUM_ALGOS]:
-            f = os.path.join(self.path, filename)
-            if isfile(f):
-                yield f
-
-    def compare_manifests_with_fs(self):
-        """
-        Compare the filenames in the manifests to the filenames present on the
-        local filesystem and returns two lists of the files which are only
-        present in the manifests and the files which are only present on the
-        local filesystem, respectively.
-        """
-
-        # We compare the filenames after Unicode normalization so we can
-        # reliably detect normalization changes after bag creation:
-        files_on_fs = set(normalize_unicode(i) for i in self.payload_files())
-        files_in_manifest = set(
-            normalize_unicode(i) for i in self.payload_entries().keys()
-        )
-
-        if self.version_info >= (0, 97):
-            files_in_manifest.update(self.missing_optional_tagfiles())
-
-        only_on_fs = list()
-        only_in_manifest = list()
-
-        for i in files_on_fs.difference(files_in_manifest):
-            only_on_fs.append(self.normalized_filesystem_names[i])
-
-        for i in files_in_manifest.difference(files_on_fs):
-            only_in_manifest.append(self.normalized_manifest_names[i])
-
-        return only_in_manifest, only_on_fs
-
-    def compare_fetch_with_fs(self):
-        """Compares the fetch entries with the files actually
-        in the payload, and returns a list of all the files
-        that still need to be fetched.
-        """
-
-        files_on_fs = set(self.payload_files())
-        files_in_fetch = set(self.files_to_be_fetched())
-
-        return list(files_in_fetch - files_on_fs)
-
-    def payload_files(self):
-        """Returns a list of filenames which are present on the local filesystem"""
-        payload_dir = os.path.join(self.path, "data")
-
-        for dirpath, _, filenames in os.walk(payload_dir):
-            for f in filenames:
-                # Jump through some hoops here to make the payload files are
-                # returned with the directory structure relative to the base
-                # directory rather than the
-                normalized_f = os.path.normpath(f)
-                rel_path = os.path.relpath(
-                    os.path.join(dirpath, normalized_f), start=self.path
-                )
-
-                self.normalized_filesystem_names[normalize_unicode(rel_path)] = rel_path
-                yield rel_path
-
-    def payload_entries(self):
-        """Return a dictionary of items """
-        # Don't use dict comprehension (compatibility with Python < 2.7)
-        return dict(
-            (key, value)
-            for (key, value) in self.entries.items()
-            if key.startswith("data" + os.sep)
-        )
-
-    def save(self, processes=1, manifests=False):
-        """
-        save will persist any changes that have been made to the bag
-        metadata (self.info).
-
-        If you have modified the payload of the bag (added, modified,
-        removed files in the data directory) and want to regenerate manifests
-        set the manifests parameter to True. The default is False since you
-        wouldn't want a save to accidentally create a new manifest for
-        a corrupted bag.
-
-        If you want to control the number of processes that are used when
-        recalculating checksums use the processes parameter.
-        """
-        # Error checking
-        if not self.path:
-            raise BagError(_("Bag.save() called before setting the path!"))
-
-        if not os.access(self.path, os.R_OK | os.W_OK | os.X_OK):
-            raise BagError(
-                _("Cannot save bag to non-existent or inaccessible directory %s")
-                % self.path
-            )
-
-        unbaggable = _can_bag(self.path)
-        if unbaggable:
-            LOGGER.error(
-                _(
-                    "Missing write permissions for the following directories and files:\n%s"
-                ),
-                unbaggable,
-            )
-            raise BagError(_("Missing permissions to move all files and directories"))
-
-        unreadable_dirs, unreadable_files = _can_read(self.path)
-        if unreadable_dirs or unreadable_files:
-            if unreadable_dirs:
-                LOGGER.error(
-                    _("The following directories do not have read permissions:\n%s"),
-                    unreadable_dirs,
-                )
-            if unreadable_files:
-                LOGGER.error(
-                    _("The following files do not have read permissions:\n%s"),
-                    unreadable_files,
-                )
-            raise BagError(
-                _("Read permissions are required to calculate file fixities")
-            )
-
-        # Change working directory to bag directory so helper functions work
-        old_dir = os.path.abspath(os.path.curdir)
-        os.chdir(self.path)
-
-        # Generate new manifest files
-        if manifests:
-            total_bytes, total_files = make_manifests(
-                "data", processes, algorithms=self.algorithms, encoding=self.encoding
-            )
-
-            # Update Payload-Oxum
-            LOGGER.info(_("Updating Payload-Oxum in %s"), self.tag_file_name)
-            self.info["Payload-Oxum"] = "%s.%s" % (total_bytes, total_files)
-
-        _make_tag_file(self.tag_file_name, self.info)
-
-        # Update tag-manifest for changes to manifest & bag-info files
-        for alg in self.algorithms:
-            _make_tagmanifest_file(alg, self.path, encoding=self.encoding)
-
-        # Reload the manifests
-        self._load_manifests()
-
-        os.chdir(old_dir)
-
-    def tagfile_entries(self):
-        return dict(
-            (key, value)
-            for (key, value) in self.entries.items()
-            if not key.startswith("data" + os.sep)
-        )
-
-    def missing_optional_tagfiles(self):
-        """
-        From v0.97 we need to validate any tagfiles listed
-        in the optional tagmanifest(s). As there is no mandatory
-        directory structure for additional tagfiles we can
-        only check for entries with missing files (not missing
-        entries for existing files).
-        """
-        for tagfilepath in self.tagfile_entries().keys():
-            if not os.path.isfile(os.path.join(self.path, tagfilepath)):
-                yield tagfilepath
-
-    def fetch_entries(self):
-        """Load fetch.txt if present and iterate over its contents
-
-        yields (url, size, filename) tuples
-
-        raises BagError for errors such as an unsafe filename referencing
-        data outside of the bag directory
-        """
-
-        fetch_file_path = os.path.join(self.path, "fetch.txt")
-
-        if isfile(fetch_file_path):
-            with open_text_file(
-                fetch_file_path, "r", encoding=self.encoding
-            ) as fetch_file:
-                for line in fetch_file:
-                    url, file_size, filename = line.strip().split(None, 2)
-
-                    if self._path_is_dangerous(filename):
-                        raise BagError(
-                            _('Path "%(payload_file)s" in "%(source_file)s" is unsafe')
-                            % {
-                                "payload_file": filename,
-                                "source_file": os.path.join(self.path, "fetch.txt"),
-                            }
-                        )
-
-                    yield url, file_size, filename
-
-    def files_to_be_fetched(self):
-        """
-        Convenience wrapper for fetch_entries which returns only the
-        local filename
-        """
-
-        for url, file_size, filename in self.fetch_entries():
-            yield filename
-
-    def has_oxum(self):
-        return "Payload-Oxum" in self.info
-
-    def validate(self, processes=1, fast=False, completeness_only=False):
-        """Checks the structure and contents are valid.
-
-        If you supply the parameter fast=True the Payload-Oxum (if present) will
-        be used to check that the payload files are present and accounted for,
-        instead of re-calculating fixities and comparing them against the
-        manifest. By default validate() will re-calculate fixities (fast=False).
-        """
-
-        self._validate_structure()
-        self._validate_bagittxt()
-
-        self.validate_fetch()
-
-        self._validate_contents(
-            processes=processes, fast=fast, completeness_only=completeness_only
-        )
-
-        return True
-
-    def is_valid(self, fast=False, completeness_only=False):
-        """Returns validation success or failure as boolean.
-        Optional fast parameter passed directly to validate().
-        """
-
-        try:
-            self.validate(fast=fast, completeness_only=completeness_only)
-        except BagError:
-            return False
-
-        return True
-
-    def _load_manifests(self):
-        self.entries = {}
-        manifests = list(self.manifest_files())
-
-        if self.version_info >= (0, 97):
-            # v0.97+ requires that optional tagfiles are verified.
-            manifests += list(self.tagmanifest_files())
-
-        for manifest_filename in manifests:
-            if not manifest_filename.find("tagmanifest-") is -1:
-                search = "tagmanifest-"
-            else:
-                search = "manifest-"
-            alg = (
-                os.path.basename(manifest_filename)
-                .replace(search, "")
-                .replace(".txt", "")
-            )
-            if alg not in self.algorithms:
-                self.algorithms.append(alg)
-
-            with open_text_file(
-                manifest_filename, "r", encoding=self.encoding
-            ) as manifest_file:
-                if manifest_file.encoding.startswith("UTF"):
-                    # We'll check the first character to see if it's a BOM:
-                    if manifest_file.read(1) == UNICODE_BYTE_ORDER_MARK:
-                        # We'll skip it either way by letting line decoding
-                        # happen at the new offset but we will issue a warning
-                        # for UTF-8 since the presence of a BOM is contrary to
-                        # the BagIt specification:
-                        if manifest_file.encoding == "UTF-8":
-                            LOGGER.warning(
-                                _(
-                                    "%s is encoded using UTF-8 but contains an unnecessary"
-                                    " byte-order mark, which is not in compliance with the"
-                                    " BagIt RFC"
-                                ),
-                                manifest_file.name,
-                            )
-                    else:
-                        manifest_file.seek(0)  # Pretend the first read never happened
-
-                for line in manifest_file:
-                    line = line.strip()
-
-                    # Ignore blank lines and comments.
-                    if line == "" or line.startswith("#"):
-                        continue
-
-                    entry = line.split(None, 1)
-
-                    # Format is FILENAME *CHECKSUM
-                    if len(entry) != 2:
-                        LOGGER.error(
-                            _(
-                                "%(bag)s: Invalid %(algorithm)s manifest entry: %(line)s"
-                            ),
-                            {"bag": self, "algorithm": alg, "line": line},
-                        )
-                        continue
-
-                    entry_hash = entry[0]
-                    entry_path = os.path.normpath(entry[1].lstrip("*"))
-                    entry_path = _decode_filename(entry_path)
-
-                    if self._path_is_dangerous(entry_path):
-                        raise BagError(
-                            _(
-                                'Path "%(payload_file)s" in manifest "%(manifest_file)s" is unsafe'
-                            )
-                            % {
-                                "payload_file": entry_path,
-                                "manifest_file": manifest_file.name,
-                            }
-                        )
-
-                    entry_hashes = self.entries.setdefault(entry_path, {})
-
-                    if alg in entry_hashes:
-                        warning_ctx = {
-                            "bag": self,
-                            "algorithm": alg,
-                            "filename": entry_path,
-                        }
-                        if entry_hashes[alg] == entry_hash:
-                            msg = _(
-                                "%(bag)s: %(algorithm)s manifest lists %(filename)s"
-                                " multiple times with the same value"
-                            )
-                            if self.version_info >= (1,):
-                                raise BagError(msg % warning_ctx)
-                            else:
-                                LOGGER.warning(msg, warning_ctx)
-                        else:
-                            raise BagError(
-                                _(
-                                    "%(bag)s: %(algorithm)s manifest lists %(filename)s"
-                                    " multiple times with conflicting values"
-                                )
-                                % warning_ctx
-                            )
-
-                    entry_hashes[alg] = entry_hash
-
-        self.normalized_manifest_names.update(
-            (normalize_unicode(i), i) for i in self.entries.keys()
-        )
-
-    def _validate_structure(self):
-        """
-        Checks the structure of the bag to determine whether it conforms to the
-        BagIt spec. Returns true on success, otherwise it will raise a
-        BagValidationError exception.
-        """
-
-        self._validate_structure_payload_directory()
-        self._validate_structure_tag_files()
-
-    def _validate_structure_payload_directory(self):
-        data_dir_path = os.path.join(self.path, "data")
-
-        if not isdir(data_dir_path):
-            raise BagValidationError(
-                _("Expected data directory %s does not exist") % data_dir_path
-            )
-
-    def _validate_structure_tag_files(self):
-        # Note: we deviate somewhat from v0.96 of the spec in that it allows
-        # other files and directories to be present in the base directory
-
-        if not list(self.manifest_files()):
-            raise BagValidationError(_("No manifest files found"))
-        if "bagit.txt" not in os.listdir(self.path):
-            raise BagValidationError(
-                _('Expected %s to contain "bagit.txt"') % self.path
-            )
-
-    def validate_fetch(self):
-        """Validate the fetch.txt file
-
-        Raises `BagError` for errors and otherwise returns no value
-        """
-
-        for url, file_size, filename in self.fetch_entries():
-            # fetch_entries will raise a BagError for unsafe filenames
-            # so at this point we will check only that the URL is minimally
-            # well formed:
-            parsed_url = urlparse(url)
-
-            if not all((parsed_url.scheme, parsed_url.netloc)):
-                raise BagError(_("Malformed URL in fetch.txt: %s") % url)
-
-    def _validate_contents(self, processes=1, fast=False, completeness_only=False):
-        if fast and not self.has_oxum():
-            raise BagValidationError(
-                _("Fast validation requires bag-info.txt to include Payload-Oxum")
-            )
-
-        # Perform the fast file count + size check so we can fail early:
-        self._validate_oxum()
-
-        if fast:
-            return
-
-        self._validate_completeness()
-
-        if completeness_only:
-            return
-
-        self._validate_entries(processes)
-
-    def _validate_oxum(self):
-        oxum = self.info.get("Payload-Oxum")
-
-        if oxum is None:
-            return
-
-        # If multiple Payload-Oxum tags (bad idea)
-        # use the first listed in bag-info.txt
-        if isinstance(oxum, list):
-            LOGGER.warning(_("bag-info.txt defines multiple Payload-Oxum values!"))
-            oxum = oxum[0]
-
-        oxum_byte_count, oxum_file_count = oxum.split(".", 1)
-
-        if not oxum_byte_count.isdigit() or not oxum_file_count.isdigit():
-            raise BagError(_("Malformed Payload-Oxum value: %s") % oxum)
-
-        oxum_byte_count = int(oxum_byte_count)
-        oxum_file_count = int(oxum_file_count)
-        total_bytes = 0
-        total_files = 0
-
-        for payload_file in self.payload_files():
-            payload_file = os.path.join(self.path, payload_file)
-            total_bytes += os.stat(payload_file).st_size
-            total_files += 1
-
-        if oxum_file_count != total_files or oxum_byte_count != total_bytes:
-            raise BagValidationError(
-                _(
-                    "Payload-Oxum validation failed."
-                    " Expected %(oxum_file_count)d files and %(oxum_byte_count)d bytes"
-                    " but found %(found_file_count)d files and %(found_byte_count)d bytes"
-                )
-                % {
-                    "found_file_count": total_files,
-                    "found_byte_count": total_bytes,
-                    "oxum_file_count": oxum_file_count,
-                    "oxum_byte_count": oxum_byte_count,
-                }
-            )
-
-    def _validate_completeness(self):
-        """
-        Verify that the actual file manifests match the files in the data directory
-        """
-        errors = list()
-
-        # First we'll make sure there's no mismatch between the filesystem
-        # and the list of files in the manifest(s)
-        only_in_manifests, only_on_fs = self.compare_manifests_with_fs()
-        for path in only_in_manifests:
-            e = FileMissing(path)
-            LOGGER.warning(force_unicode(e))
-            errors.append(e)
-        for path in only_on_fs:
-            e = UnexpectedFile(path)
-            LOGGER.warning(force_unicode(e))
-            errors.append(e)
-
-        if errors:
-            raise BagValidationError(_("Bag validation failed"), errors)
-
-    def _validate_entries(self, processes):
-        """
-        Verify that the actual file contents match the recorded hashes stored in the manifest files
-        """
-        errors = list()
-
-        if os.name == "posix":
-            worker_init = posix_multiprocessing_worker_initializer
-        else:
-            worker_init = None
-
-        args = (
-            (
-                self.path,
-                self.normalized_filesystem_names.get(rel_path, rel_path),
-                hashes,
-                self.algorithms,
-            )
-            for rel_path, hashes in self.entries.items()
-        )
-
-        try:
-            if processes == 1:
-                hash_results = [_calc_hashes(i) for i in args]
-            else:
-                try:
-                    pool = multiprocessing.Pool(
-                        processes if processes else None, initializer=worker_init
-                    )
-                    hash_results = pool.map(_calc_hashes, args)
-                finally:
-                    pool.terminate()
-
-        # Any unhandled exceptions are probably fatal
-        except:
-            LOGGER.exception(_("Unable to calculate file hashes for %s"), self)
-            raise
-
-        for rel_path, f_hashes, hashes in hash_results:
-            for alg, computed_hash in f_hashes.items():
-                stored_hash = hashes[alg]
-                if stored_hash.lower() != computed_hash:
-                    e = ChecksumMismatch(
-                        rel_path, alg, stored_hash.lower(), computed_hash
-                    )
-                    LOGGER.warning(force_unicode(e))
-                    errors.append(e)
-
-        if errors:
-            raise BagValidationError(_("Bag validation failed"), errors)
-
-    def _validate_bagittxt(self):
-        """
-        Verify that bagit.txt conforms to specification
-        """
-        bagit_file_path = os.path.join(self.path, "bagit.txt")
-
-        # Note that we are intentionally opening this file in binary mode so we can confirm
-        # that it does not start with the UTF-8 byte-order-mark
-        with open(bagit_file_path, "rb") as bagit_file:
-            first_line = bagit_file.read(4)
-            if first_line.startswith(codecs.BOM_UTF8):
-                raise BagValidationError(
-                    _("bagit.txt must not contain a byte-order mark")
-                )
-
-    def _path_is_dangerous(self, path):
-        """
-        Return true if path looks dangerous, i.e. potentially operates
-        outside the bagging directory structure, e.g. ~/.bashrc, ../../../secrets.json,
-        \\?\c:\, D:\sys32\cmd.exe
-        """
-        if os.path.isabs(path):
-            return True
-        if os.path.expanduser(path) != path:
-            return True
-        if os.path.expandvars(path) != path:
-            return True
-        real_path = os.path.realpath(os.path.join(self.path, path))
-        real_path = os.path.normpath(real_path)
-        bag_path = os.path.realpath(self.path)
-        bag_path = os.path.normpath(bag_path)
-        common = os.path.commonprefix((bag_path, real_path))
-        return not (common == bag_path)
-
-
-class BagError(Exception):
-    pass
-
-
-class BagValidationError(BagError):
-    def __init__(self, message, details=None):
-        super(BagValidationError, self).__init__()
-
-        if details is None:
-            details = []
-
-        self.message = message
-        self.details = details
-
-    def __str__(self):
-        if len(self.details) > 0:
-            details = "; ".join([force_unicode(e) for e in self.details])
-            return "%s: %s" % (self.message, details)
-        return self.message
-
-
-class ManifestErrorDetail(BagError):
-    def __init__(self, path):
-        super(ManifestErrorDetail, self).__init__()
-
-        self.path = path
-
-
-class ChecksumMismatch(ManifestErrorDetail):
-    def __init__(self, path, algorithm=None, expected=None, found=None):
-        super(ChecksumMismatch, self).__init__(path)
-
-        self.path = path
-        self.algorithm = algorithm
-        self.expected = expected
-        self.found = found
-
-    def __str__(self):
-        return _(
-            '%(path)s %(algorithm)s validation failed: expected="%(expected)s" found="%(found)s"'
-        ) % {
-            "path": force_unicode(self.path),
-            "algorithm": self.algorithm,
-            "expected": self.expected,
-            "found": self.found,
-        }
-
-
-class FileMissing(ManifestErrorDetail):
-    def __str__(self):
-        return _(
-            "%s exists in manifest but was not found on filesystem"
-        ) % force_unicode(self.path)
-
-
-class UnexpectedFile(ManifestErrorDetail):
-    def __str__(self):
-        return _("%s exists on filesystem but is not in the manifest") % self.path
-
-
-class FileNormalizationConflict(BagError):
-    """
-    Exception raised when two files differ only in normalization and thus
-    are not safely portable
-    """
-
-    def __init__(self, file_a, file_b):
-        super(FileNormalizationConflict, self).__init__()
-
-        self.file_a = file_a
-        self.file_b = file_b
-
-    def __str__(self):
-        return _(
-            'Unicode normalization conflict for file "%(file_a)s" and "%(file_b)s"'
-        ) % {"file_a": self.file_a, "file_b": self.file_b}
-
-
-def posix_multiprocessing_worker_initializer():
-    """Ignore SIGINT in multiprocessing workers on POSIX systems"""
-    signal.signal(signal.SIGINT, signal.SIG_IGN)
-
-
-# The Unicode normalization form used here doesn't matter – all we care about
-# is consistency since the input value will be preserved:
-
-
-def normalize_unicode_py3(s):
-    return unicodedata.normalize("NFC", s)
-
-
-def normalize_unicode_py2(s):
-    if isinstance(s, str):
-        s = s.decode("utf-8")
-    return unicodedata.normalize("NFC", s)
-
-
-if sys.version_info > (3, 0):
-    normalize_unicode = normalize_unicode_py3
-else:
-    normalize_unicode = normalize_unicode_py2
-
-
-def build_unicode_normalized_lookup_dict(filenames):
-    """
-    Return a dictionary mapping unicode-normalized filenames to as-encoded
-    values to efficiently detect conflicts between the filesystem and manifests.
-
-    This is necessary because some filesystems and utilities may automatically
-    apply a different Unicode normalization form to filenames than was applied
-    when the bag was originally created.
-
-    The best known example of this is when a bag is created using a
-    normalization form other than NFD and then transferred to a Mac where the
-    HFS+ filesystem will transparently normalize filenames to a variant of NFD
-    for every call:
-
-    https://developer.apple.com/legacy/library/technotes/tn/tn1150.html#UnicodeSubtleties
-
-    Windows is documented as storing filenames exactly as provided:
-
-    https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247%28v=vs.85%29.aspx
-
-    Linux performs no normalization in the kernel but it is technically
-    valid for a filesystem to perform normalization, such as when an HFS+
-    volume is mounted.
-
-    See http://www.unicode.org/reports/tr15/ for a full discussion of
-    equivalence and normalization in Unicode.
-    """
-
-    output = dict()
-
-    for filename in filenames:
-        normalized_filename = normalize_unicode(filename)
-        if normalized_filename in output:
-            raise FileNormalizationConflict(filename, output[normalized_filename])
-        else:
-            output[normalized_filename] = filename
-
-    return output
-
-
-def get_hashers(algorithms):
-    """
-    Given a list of algorithm names, return a dictionary of hasher instances
-
-    This avoids redundant code between the creation and validation code where in
-    both cases we want to avoid reading the same file more than once. The
-    intended use is a simple for loop:
-
-        for block in file:
-            for hasher in hashers.values():
-                hasher.update(block)
-    """
-
-    hashers = {}
-
-    for alg in algorithms:
-        try:
-            hasher = hashlib.new(alg)
-        except ValueError:
-            LOGGER.warning(
-                _("Disabling requested hash algorithm %s: hashlib does not support it"),
-                alg,
-            )
-            continue
-
-        hashers[alg] = hasher
-
-    if not hashers:
-        raise ValueError(
-            _(
-                "Unable to continue: hashlib does not support any of the requested algorithms!"
-            )
-        )
-
-    return hashers
-
-
-def _calc_hashes(args):
-    # auto unpacking of sequences illegal in Python3
-    (base_path, rel_path, hashes, algorithms) = args
-    full_path = os.path.join(base_path, rel_path)
-
-    # Create a clone of the default empty hash objects:
-    f_hashers = dict((alg, hashlib.new(alg)) for alg in hashes if alg in algorithms)
-
-    try:
-        f_hashes = _calculate_file_hashes(full_path, f_hashers)
-    except BagValidationError as e:
-        f_hashes = dict((alg, force_unicode(e)) for alg in f_hashers.keys())
-
-    return rel_path, f_hashes, hashes
-
-
-def _calculate_file_hashes(full_path, f_hashers):
-    """
-    Returns a dictionary of (algorithm, hexdigest) values for the provided
-    filename
-    """
-    LOGGER.info(_("Verifying checksum for file %s"), full_path)
-
-    try:
-        with open(full_path, "rb") as f:
-            while True:
-                block = f.read(HASH_BLOCK_SIZE)
-                if not block:
-                    break
-                for i in f_hashers.values():
-                    i.update(block)
-    except (OSError, IOError) as e:
-        raise BagValidationError(
-            _("Could not read %(filename)s: %(error)s")
-            % {"filename": full_path, "error": force_unicode(e)}
-        )
-
-    return dict((alg, h.hexdigest()) for alg, h in f_hashers.items())
-
-
-def _load_tag_file(tag_file_name, encoding="utf-8-sig"):
-    with open_text_file(tag_file_name, "r", encoding=encoding) as tag_file:
-        # Store duplicate tags as list of vals
-        # in order of parsing under the same key.
-        tags = {}
-        for name, value in _parse_tags(tag_file):
-            if name not in tags:
-                tags[name] = value
-                continue
-
-            if not isinstance(tags[name], list):
-                tags[name] = [tags[name], value]
-            else:
-                tags[name].append(value)
-
-        return tags
-
-
-def _parse_tags(tag_file):
-    """Parses a tag file, according to RFC 2822. This
-    includes line folding, permitting extra-long
-    field values.
-
-    See http://www.faqs.org/rfcs/rfc2822.html for
-    more information.
-    """
-
-    tag_name = None
-    tag_value = None
-
-    # Line folding is handled by yielding values only after we encounter
-    # the start of a new tag, or if we pass the EOF.
-    for num, line in enumerate(tag_file):
-        # Skip over any empty or blank lines.
-        if len(line) == 0 or line.isspace():
-            continue
-        elif line[0].isspace() and tag_value is not None:  # folded line
-            tag_value += line
-        else:
-            # Starting a new tag; yield the last one.
-            if tag_name:
-                yield (tag_name, tag_value.strip())
-
-            if ":" not in line:
-                raise BagValidationError(
-                    _("%(filename)s contains invalid tag: %(line)s")
-                    % {
-                        "line": line.strip(),
-                        "filename": os.path.basename(tag_file.name),
-                    }
-                )
-
-            parts = line.strip().split(":", 1)
-            tag_name = parts[0].strip()
-            tag_value = parts[1]
-
-    # Passed the EOF. All done after this.
-    if tag_name:
-        yield (tag_name, tag_value.strip())
-
-
-def _make_tag_file(bag_info_path, bag_info):
-    headers = sorted(bag_info.keys())
-    with open_text_file(bag_info_path, "w") as f:
-        for h in headers:
-            values = bag_info[h]
-            if not isinstance(values, list):
-                values = [values]
-            for txt in values:
-                # strip CR, LF and CRLF so they don't mess up the tag file
-                txt = re.sub(r"\n|\r|(\r\n)", "", force_unicode(txt))
-                f.write("%s: %s\n" % (h, txt))
-
-
-def make_manifests(data_dir, processes, algorithms=DEFAULT_CHECKSUMS, encoding="utf-8"):
-    LOGGER.info(
-        _("Using %(process_count)d processes to generate manifests: %(algorithms)s"),
-        {"process_count": processes, "algorithms": ", ".join(algorithms)},
-    )
-
-    manifest_line_generator = partial(generate_manifest_lines, algorithms=algorithms)
-
-    if processes > 1:
-        pool = multiprocessing.Pool(processes=processes)
-        checksums = pool.map(manifest_line_generator, _walk(data_dir))
-        pool.close()
-        pool.join()
-    else:
-        checksums = [manifest_line_generator(i) for i in _walk(data_dir)]
-
-    # At this point we have a list of tuples which start with the algorithm name:
-    manifest_data = {}
-    for batch in checksums:
-        for entry in batch:
-            manifest_data.setdefault(entry[0], []).append(entry[1:])
-
-    # These will be keyed on the algorithm name so we can perform sanity checks
-    # below to catch failures in the hashing process:
-    num_files = defaultdict(lambda: 0)
-    total_bytes = defaultdict(lambda: 0)
-
-    for algorithm, values in manifest_data.items():
-        manifest_filename = "manifest-%s.txt" % algorithm
-
-        with open_text_file(manifest_filename, "w", encoding=encoding) as manifest:
-            for digest, filename, byte_count in values:
-                manifest.write("%s  %s\n" % (digest, _encode_filename(filename)))
-                num_files[algorithm] += 1
-                total_bytes[algorithm] += byte_count
-
-    # We'll use sets of the values for the error checks and eventually return the payload oxum values:
-    byte_value_set = set(total_bytes.values())
-    file_count_set = set(num_files.values())
-
-    # allow a bag with an empty payload
-    if not byte_value_set and not file_count_set:
-        return 0, 0
-
-    if len(file_count_set) != 1:
-        raise RuntimeError(_("Expected the same number of files for each checksum"))
-
-    if len(byte_value_set) != 1:
-        raise RuntimeError(_("Expected the same number of bytes for each checksums"))
-
-    return byte_value_set.pop(), file_count_set.pop()
-
-
-def _make_tagmanifest_file(alg, bag_dir, encoding="utf-8"):
-    tagmanifest_file = join(bag_dir, "tagmanifest-%s.txt" % alg)
-    LOGGER.info(_("Creating %s"), tagmanifest_file)
-
-    checksums = []
-    for f in _find_tag_files(bag_dir):
-        if re.match(r"^tagmanifest-.+\.txt$", f):
-            continue
-        with open(join(bag_dir, f), "rb") as fh:
-            m = hashlib.new(alg)
-            while True:
-                block = fh.read(HASH_BLOCK_SIZE)
-                if not block:
-                    break
-                m.update(block)
-            checksums.append((m.hexdigest(), f))
-
-    with open_text_file(
-        join(bag_dir, tagmanifest_file), mode="w", encoding=encoding
-    ) as tagmanifest:
-        for digest, filename in checksums:
-            tagmanifest.write("%s %s\n" % (digest, filename))
-
-
-def _find_tag_files(bag_dir):
-    for dir in os.listdir(bag_dir):
-        if dir != "data":
-            if os.path.isfile(dir) and not dir.startswith("tagmanifest-"):
-                yield dir
-            for dir_name, _, filenames in os.walk(dir):
-                for filename in filenames:
-                    if filename.startswith("tagmanifest-"):
-                        continue
-                    # remove everything up to the bag_dir directory
-                    p = join(dir_name, filename)
-                    yield os.path.relpath(p, bag_dir)
-
-
-def _walk(data_dir):
-    for dirpath, dirnames, filenames in os.walk(data_dir):
-        # if we don't sort here the order of entries is non-deterministic
-        # which makes it hard to test the fixity of tagmanifest-md5.txt
-        filenames.sort()
-        dirnames.sort()
-        for fn in filenames:
-            path = os.path.join(dirpath, fn)
-            # BagIt spec requires manifest to always use '/' as path separator
-            if os.path.sep != "/":
-                parts = path.split(os.path.sep)
-                path = "/".join(parts)
-            yield path
-
-
-def _can_bag(test_dir):
-    """Scan the provided directory for files which cannot be bagged due to insufficient permissions"""
-    unbaggable = []
-
-    if not os.access(test_dir, os.R_OK):
-        # We cannot continue without permission to read the source directory
-        unbaggable.append(test_dir)
-        return unbaggable
-
-    if not os.access(test_dir, os.W_OK):
-        unbaggable.append(test_dir)
-
-    for dirpath, dirnames, filenames in os.walk(test_dir):
-        for directory in dirnames:
-            full_path = os.path.join(dirpath, directory)
-            if not os.access(full_path, os.W_OK):
-                unbaggable.append(full_path)
-
-    return unbaggable
-
-
-def _can_read(test_dir):
-    """
-    returns ((unreadable_dirs), (unreadable_files))
-    """
-    unreadable_dirs = []
-    unreadable_files = []
-
-    if not os.access(test_dir, os.R_OK):
-        unreadable_dirs.append(test_dir)
-    else:
-        for dirpath, dirnames, filenames in os.walk(test_dir):
-            for dn in dirnames:
-                full_path = os.path.join(dirpath, dn)
-                if not os.access(full_path, os.R_OK):
-                    unreadable_dirs.append(full_path)
-            for fn in filenames:
-                full_path = os.path.join(dirpath, fn)
-                if not os.access(full_path, os.R_OK):
-                    unreadable_files.append(full_path)
-    return (tuple(unreadable_dirs), tuple(unreadable_files))
-
-
-def generate_manifest_lines(filename, algorithms=DEFAULT_CHECKSUMS):
-    LOGGER.info(_("Generating manifest lines for file %s"), filename)
-
-    # For performance we'll read the file only once and pass it block
-    # by block to every requested hash algorithm:
-    hashers = get_hashers(algorithms)
-
-    total_bytes = 0
-
-    with open(filename, "rb") as f:
-        while True:
-            block = f.read(HASH_BLOCK_SIZE)
-
-            if not block:
-                break
-
-            total_bytes += len(block)
-            for hasher in hashers.values():
-                hasher.update(block)
-
-    decoded_filename = _decode_filename(filename)
-
-    # We'll generate a list of results in roughly manifest format but prefixed with the algorithm:
-    results = [
-        (alg, hasher.hexdigest(), decoded_filename, total_bytes)
-        for alg, hasher in hashers.items()
-    ]
-
-    return results
-
-
-def _encode_filename(s):
-    s = s.replace("\r", "%0D")
-    s = s.replace("\n", "%0A")
-    return s
-
-
-def _decode_filename(s):
-    s = re.sub(r"%0D", "\r", s, re.IGNORECASE)
-    s = re.sub(r"%0A", "\n", s, re.IGNORECASE)
-    return s
-
-
-def force_unicode_py2(s):
-    """Reliably return a Unicode string given a possible unicode or byte string"""
-    if isinstance(s, str):
-        return s.decode("utf-8")
-    else:
-        return unicode(s)
-
-
-if sys.version_info > (3, 0):
-    force_unicode = str
-else:
-    force_unicode = force_unicode_py2
-
-# following code is used for command line program
-
-
-class BagArgumentParser(argparse.ArgumentParser):
-    def __init__(self, *args, **kwargs):
-        self.bag_info = {}
-        argparse.ArgumentParser.__init__(self, *args, **kwargs)
-
-
-class BagHeaderAction(argparse.Action):
-    def __call__(self, parser, _, values, option_string=None):
-        opt = option_string.lstrip("--")
-        opt_caps = "-".join([o.capitalize() for o in opt.split("-")])
-        parser.bag_info[opt_caps] = values
-
-
-def _make_parser():
-    parser = BagArgumentParser(
-        formatter_class=argparse.RawDescriptionHelpFormatter,
-        description="bagit-python version %s\n\n%s\n" % (VERSION, __doc__.strip()),
-    )
-    parser.add_argument(
-        "--processes",
-        type=int,
-        dest="processes",
-        default=1,
-        help=_(
-            "Use multiple processes to calculate checksums faster (default: %(default)s)"
-        ),
-    )
-    parser.add_argument("--log", help=_("The name of the log file (default: stdout)"))
-    parser.add_argument(
-        "--quiet",
-        action="store_true",
-        help=_("Suppress all progress information other than errors"),
-    )
-    parser.add_argument(
-        "--validate",
-        action="store_true",
-        help=_(
-            "Validate existing bags in the provided directories instead of"
-            " creating new ones"
-        ),
-    )
-    parser.add_argument(
-        "--fast",
-        action="store_true",
-        help=_(
-            "Modify --validate behaviour to only test whether the bag directory"
-            " has the number of files and total size specified in Payload-Oxum"
-            " without performing checksum validation to detect corruption."
-        ),
-    )
-    parser.add_argument(
-        "--completeness-only",
-        action="store_true",
-        help=_(
-            "Modify --validate behaviour to test whether the bag directory"
-            " has the expected payload specified in the checksum manifests"
-            " without performing checksum validation to detect corruption."
-        ),
-    )
-
-    checksum_args = parser.add_argument_group(
-        _("Checksum Algorithms"),
-        _(
-            "Select the manifest algorithms to be used when creating bags"
-            " (default=%s)"
-        )
-        % ", ".join(DEFAULT_CHECKSUMS),
-    )
-
-    for i in CHECKSUM_ALGOS:
-        alg_name = re.sub(r"^([A-Z]+)(\d+)$", r"\1-\2", i.upper())
-        checksum_args.add_argument(
-            "--%s" % i,
-            action="append_const",
-            dest="checksums",
-            const=i,
-            help=_("Generate %s manifest when creating a bag") % alg_name,
-        )
-
-    metadata_args = parser.add_argument_group(_("Optional Bag Metadata"))
-    for header in STANDARD_BAG_INFO_HEADERS:
-        metadata_args.add_argument(
-            "--%s" % header.lower(), type=str, action=BagHeaderAction
-        )
-
-    parser.add_argument(
-        "directory",
-        nargs="+",
-        help=_(
-            "Directory which will be converted into a bag in place"
-            " by moving any existing files into the BagIt structure"
-            " and creating the manifests and other metadata."
-        ),
-    )
-
-    return parser
-
-
-def _configure_logging(opts):
-    log_format = "%(asctime)s - %(levelname)s - %(message)s"
-    if opts.quiet:
-        level = logging.ERROR
-    else:
-        level = logging.INFO
-    if opts.log:
-        logging.basicConfig(filename=opts.log, level=level, format=log_format)
-    else:
-        logging.basicConfig(level=level, format=log_format)
-
-
-def main():
-    if "--version" in sys.argv:
-        print(_("bagit-python version %s") % VERSION)
-        sys.exit(0)
-
-    parser = _make_parser()
-    args = parser.parse_args()
-
-    if args.processes < 0:
-        parser.error(_("The number of processes must be 0 or greater"))
-
-    if args.fast and not args.validate:
-        parser.error(_("--fast is only allowed as an option for --validate!"))
-
-    _configure_logging(args)
-
-    rc = 0
-    for bag_dir in args.directory:
-        # validate the bag
-        if args.validate:
-            try:
-                bag = Bag(bag_dir)
-                # validate throws a BagError or BagValidationError
-                bag.validate(
-                    processes=args.processes,
-                    fast=args.fast,
-                    completeness_only=args.completeness_only,
-                )
-                if args.fast:
-                    LOGGER.info(_("%s valid according to Payload-Oxum"), bag_dir)
-                else:
-                    LOGGER.info(_("%s is valid"), bag_dir)
-            except BagError as e:
-                LOGGER.error(
-                    _("%(bag)s is invalid: %(error)s"), {"bag": bag_dir, "error": e}
-                )
-                rc = 1
-
-        # make the bag
-        else:
-            try:
-                make_bag(
-                    bag_dir,
-                    bag_info=parser.bag_info,
-                    processes=args.processes,
-                    checksums=args.checksums,
-                )
-            except Exception as exc:
-                LOGGER.error(
-                    _("Failed to create bag in %(bag_directory)s: %(error)s"),
-                    {"bag_directory": bag_dir, "error": exc},
-                    exc_info=True,
-                )
-                rc = 1
-
-    sys.exit(rc)
-
-
-if __name__ == "__main__":
-    main()
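
The module deleted above is a vendored copy of the bagit-python library, and the API it defines (make_bag, Bag, Bag.validate, and the BagValidationError hierarchy) is the same one the upstream project exposes. For orientation, a minimal usage sketch, assuming the bagit package is installed from PyPI rather than taken from this deleted vendored copy; the directory name and metadata values are illustrative:

    import bagit

    # Bag a directory in place; bag_info entries land in bag-info.txt and
    # checksums selects the manifest algorithms (default: sha256, sha512).
    bag = bagit.make_bag(
        "example-directory",
        bag_info={"Contact-Name": "Ed Summers"},
        checksums=["sha256", "sha512"],
    )
    print(bag.entries)

    # Re-open and validate later. validate() re-hashes every payload file;
    # fast=True instead checks only the Payload-Oxum file/byte counts
    # recorded in bag-info.txt.
    bag = bagit.Bag("example-directory")
    try:
        bag.validate(processes=4)
    except bagit.BagValidationError as e:
        # e.details holds ChecksumMismatch / FileMissing / UnexpectedFile items
        for detail in e.details:
            print(detail)

Passing processes greater than 1 fans the re-hashing out over a multiprocessing.Pool, as _validate_entries in the source above shows.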