Mercurial > repos > shellac > sam_consensus_v3
comparison env/lib/python3.9/site-packages/planemo/lint.py @ 0:4f3585e2f14b draft default tip
"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author | shellac |
---|---|
date | Mon, 22 Mar 2021 18:12:50 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4f3585e2f14b |
---|---|
1 """Utilities to help linting various targets.""" | |
2 from __future__ import absolute_import | |
3 | |
4 import os | |
5 | |
6 import requests | |
7 from galaxy.tool_util.lint import LintContext | |
8 from six.moves.urllib.request import urlopen | |
9 | |
10 from planemo.io import error | |
11 from planemo.shed import find_urls_for_xml | |
12 from planemo.xml import validation | |
13 | |
14 | |
def build_lint_args(ctx, **kwds):
    """Handle common report, error, and skip linting arguments.

    :param ctx: object exposing a ``global_config`` mapping; it is only
        consulted (key ``lint_skip``) when no ``skip`` keyword is supplied.
    :param kwds: recognised keys are ``report_level`` (default ``"all"``),
        ``fail_level`` (default ``"warn"``) and ``skip`` (a comma separated
        string, or a list/tuple of names).
    :returns: dict with the keys ``level``, ``fail_level`` and
        ``skip_types`` (a list of stripped, non-empty names).
    """
    report_level = kwds.get("report_level", "all")
    fail_level = kwds.get("fail_level", "warn")
    skip = kwds.get("skip", None)
    if skip is None:
        skip = ctx.global_config.get("lint_skip", "")
    if isinstance(skip, (list, tuple)):
        skip = ",".join(skip)
    # A config key that is present but empty may come back as None.
    skip = skip or ""

    # Drop blank entries so that an empty setting yields [] instead of [""].
    skip_types = [s.strip() for s in skip.split(",") if s.strip()]
    lint_args = dict(
        level=report_level,
        fail_level=fail_level,
        skip_types=skip_types,
    )
    return lint_args
32 | |
33 | |
def setup_lint(ctx, **kwds):
    """Return the ``(lint_args, lint_ctx)`` pair needed to lint a target.

    A truthy ``lint_args`` keyword is used as-is; otherwise the arguments
    are derived from ``ctx`` and ``kwds`` via ``build_lint_args``. The lint
    context is created with the report level found in ``lint_args``.
    """
    lint_args = kwds.get("lint_args", None)
    if not lint_args:
        lint_args = build_lint_args(ctx, **kwds)
    return lint_args, LintContext(lint_args["level"])
39 | |
40 | |
def handle_lint_complete(lint_ctx, lint_args, failed=False):
    """Finish linting a target and return the exit code (0 ok, 1 failed).

    When the caller has not already flagged a failure, the lint context is
    asked whether anything reached ``lint_args["fail_level"]``.
    """
    has_failed = failed or lint_ctx.failed(lint_args["fail_level"])
    if not has_failed:
        return 0
    error("Failed linting")
    return 1
48 | |
49 | |
def lint_dois(tool_xml, lint_ctx):
    """Find referenced DOIs and check that they are valid with https://doi.org."""
    for doi in find_dois_for_xml(tool_xml):
        is_doi(doi, lint_ctx)
55 | |
56 | |
def find_dois_for_xml(tool_xml):
    """Return the text of every DOI citation in ``tool_xml``.

    Looks at each ``citations`` element directly under the root and keeps
    the text of its ``citation`` children whose ``type`` attribute is
    ``doi``. The text is returned untouched, so an empty element
    contributes ``None``.
    """
    return [
        citation.text
        for citations in tool_xml.getroot().findall("citations")
        for citation in citations
        if citation.tag == 'citation' and citation.attrib.get('type', '') == 'doi'
    ]
64 | |
65 | |
def is_doi(publication_id, lint_ctx, timeout=30):
    """Check if dx.doi knows about the ``publication_id``.

    :param publication_id: DOI text taken from a citation; may be ``None``.
        A leading ``doi:`` prefix is tolerated for the lookup but reported
        as an error, since Galaxy expects the bare DOI.
    :param lint_ctx: lint context receiving ``info``/``warn``/``error``
        messages.
    :param timeout: seconds to wait for https://doi.org before giving up.
    :returns: ``None``; all results are reported through ``lint_ctx``.
    """
    base_url = "https://doi.org"
    if publication_id is None:
        lint_ctx.error('Empty DOI citation')
        return
    publication_id = publication_id.strip()
    doiless_publication_id = publication_id.split("doi:", 1)[-1]
    if not doiless_publication_id:
        lint_ctx.error('Empty DOI citation')
        return
    url = "%s/%s" % (base_url, doiless_publication_id)
    try:
        # Without a timeout a stalled server would hang the linter forever.
        r = requests.get(url, timeout=timeout)
    except requests.RequestException as e:
        # A transport failure says nothing about the DOI itself, so warn
        # instead of aborting the whole lint run with a traceback.
        lint_ctx.warn("Error '%s' accessing %s" % (e, url))
        return
    if r.status_code == 200:
        if publication_id != doiless_publication_id:
            lint_ctx.error("%s is valid, but Galaxy expects DOI without 'doi:' prefix" % publication_id)
        else:
            lint_ctx.info("%s is a valid DOI" % publication_id)
    elif r.status_code == 404:
        lint_ctx.error("%s is not a valid DOI" % publication_id)
    else:
        lint_ctx.warn("dx.doi returned unexpected status code %d" % r.status_code)
88 | |
89 | |
def lint_xsd(lint_ctx, schema_path, path):
    """Validate the XML file at ``path`` against the schema at ``schema_path``.

    Success is reported as an info message; failure as an error naming the
    linted object (or the file's base name when the context has no object
    name) together with the validator output.
    """
    name = lint_ctx.object_name or os.path.basename(path)
    result = validation.get_validator(require=True).validate(schema_path, path)
    if result.passed:
        lint_ctx.info("File validates against XML schema.")
        return
    lint_ctx.error("Invalid XML found in file: %s. Errors [%s]" % (name, result.output))
101 | |
102 | |
def lint_urls(root, lint_ctx, timeout=30):
    """Find referenced URLs and verify they are valid.

    :param root: XML root passed to ``find_urls_for_xml``, which yields the
        plain URLs and the documentation URLs to check.
    :param lint_ctx: lint context receiving ``info``/``error`` messages.
    :param timeout: seconds to wait on each request before treating the URL
        as unreachable.
    """
    urls, docs = find_urls_for_xml(root)

    # This is from Google Chrome on macOS, current at time of writing:
    BROWSER_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 11_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.141 Safari/537.36"

    def validate_url(url, lint_ctx, user_agent=None):
        is_valid = True
        if url.startswith('http://') or url.startswith('https://'):
            if user_agent:
                headers = {"User-Agent": user_agent, 'Accept': '*/*'}
            else:
                headers = None
            r = None
            try:
                r = requests.get(url, headers=headers, stream=True, timeout=timeout)
                r.raise_for_status()
                # Pull the first chunk only, to prove the download starts.
                next(r.iter_content(1000))
            except Exception as e:
                if r is not None and r.status_code == 429:
                    # too many requests
                    pass
                elif r is not None and r.status_code == 403 and 'cloudflare' in r.text:
                    # CloudFlare protection block
                    pass
                else:
                    is_valid = False
                    lint_ctx.error("Error '%s' accessing %s" % (e, url))
            finally:
                # A streamed response keeps its connection open until closed.
                if r is not None:
                    r.close()
        else:
            try:
                with urlopen(url, timeout=timeout) as handle:
                    handle.read(100)
            except Exception as e:
                is_valid = False
                lint_ctx.error("Error '%s' accessing %s" % (e, url))
        if is_valid:
            lint_ctx.info("URL OK %s" % url)

    for url in urls:
        validate_url(url, lint_ctx)
    # Documentation links are fetched with a browser-like User-Agent.
    for url in docs:
        validate_url(url, lint_ctx, BROWSER_USER_AGENT)
146 | |
147 | |
# Names exported by ``from planemo.lint import *``.
__all__ = (
    "build_lint_args", "handle_lint_complete",
    "lint_dois", "lint_urls", "lint_xsd",
)