Mercurial > repos > shellac > guppy_basecaller
comparison env/lib/python3.7/site-packages/cwltool/utils.py @ 5:9b1c78e6ba9c draft default tip
"planemo upload commit 6c0a8142489327ece472c84e558c47da711a9142"
author | shellac |
---|---|
date | Mon, 01 Jun 2020 08:59:25 -0400 |
parents | 79f47841a781 |
children |
comparison
equal
deleted
inserted
replaced
4:79f47841a781 | 5:9b1c78e6ba9c |
---|---|
1 """Shared functions and other definitions.""" | |
2 from __future__ import absolute_import | |
3 | |
4 import collections | |
5 import os | |
6 import platform | |
7 import random | |
8 import shutil | |
9 import string | |
10 import sys | |
11 import tempfile | |
12 from functools import partial # pylint: disable=unused-import | |
13 from typing import (IO, Any, AnyStr, Callable, # pylint: disable=unused-import | |
14 Dict, Iterable, List, MutableMapping, MutableSequence, | |
15 Optional, Union) | |
16 | |
17 import pkg_resources | |
18 from mypy_extensions import TypedDict | |
19 from schema_salad.utils import json_dump, json_dumps # pylint: disable=unused-import | |
20 from six.moves import urllib, zip_longest | |
21 from typing_extensions import Deque, Text # pylint: disable=unused-import | |
22 # move to a regular typing import when Python 3.3-3.6 is no longer supported | |
23 | |
24 from pathlib2 import Path | |
25 | |
26 # no imports from cwltool allowed | |
27 if os.name == 'posix': | |
28 if sys.version_info < (3, 5): | |
29 import subprocess32 as subprocess # nosec # pylint: disable=unused-import | |
30 else: | |
31 import subprocess # nosec # pylint: disable=unused-import | |
32 else: | |
33 import subprocess # type: ignore # nosec | |
34 | |
# NOTE(review): presumably the container image used by default when running
# on Windows hosts; the consumer is not visible in this file - confirm.
windows_default_container_id = "frolvlad/alpine-bash"

# Typed mapping with 'class', 'listing' and 'basename' keys.
Directory = TypedDict('Directory',
                      {'class': Text, 'listing': List[Dict[Text, Text]],
                       'basename': Text})

# The system temporary directory followed by the platform path separator.
DEFAULT_TMP_PREFIX = tempfile.gettempdir() + os.path.sep

# Double-ended queue of subprocess.Popen objects (per the type comment).
# NOTE(review): the name suggests these are terminated at shutdown; the code
# doing so is not in this file - confirm against callers.
processes_to_kill = collections.deque()  # type: Deque[subprocess.Popen]
44 | |
def versionstring():
    # type: () -> Text
    """Return the invoked program name followed by the cwltool version.

    The version is read from the first distribution that pkg_resources
    reports for "cwltool"; if the lookup yields nothing the text
    "unknown version" is used in its place.
    """
    distributions = pkg_resources.require("cwltool")
    version = distributions[0].version if distributions else "unknown version"
    return u"%s %s" % (sys.argv[0], version)
52 | |
def aslist(l):  # type: (Any) -> MutableSequence[Any]
    """Return *l* unchanged if it is a MutableSequence, else wrap it in a list."""
    return l if isinstance(l, MutableSequence) else [l]
58 | |
def copytree_with_merge(src, dst):  # type: (Text, Text) -> None
    """Recursively copy the directory *src* into *dst*, merging with existing content.

    When *dst* does not exist it is created and receives the stat metadata
    of *src*. Sub-directories are merged recursively; every other entry is
    copied with shutil.copy2.
    """
    if not os.path.exists(dst):
        os.makedirs(dst)
        shutil.copystat(src, dst)
    for entry in os.listdir(src):
        source_entry = os.path.join(src, entry)
        target_entry = os.path.join(dst, entry)
        # Directories recurse so existing target content is kept; files are copied.
        copier = copytree_with_merge if os.path.isdir(source_entry) else shutil.copy2
        copier(source_entry, target_entry)
71 | |
def docker_windows_path_adjust(path):
    # type: (Text) -> Text
    r"""
    Rewrite a Windows path into the unix-like form given to docker run.

    On any other operating system the path is returned untouched.

    Example: 'C:\Users\foo becomes /C/Users/foo (Docker for Windows) or
    /c/Users/foo (Docker toolbox).
    """
    if not onWindows():
        return path
    pieces = path.split(':')
    if len(pieces) == 2:
        drive = pieces[0]
        if platform.win32_ver()[0] in ('7', '8'):  # type: ignore
            # Docker toolbox uses lowercase Windows drive letters
            drive = drive.lower()
        else:
            # Docker for Windows uses uppercase Windows drive letters
            drive = drive.capitalize()
        path = drive + ':' + pieces[1]
    path = path.replace(':', '').replace('\\', '/')
    if path[0] == '/':
        return path
    return '/' + path
94 | |
95 | |
def docker_windows_reverse_path_adjust(path):
    # type: (Text) -> (Text)
    r"""
    Turn a docker-style path back into a Windows path (Windows hosts only).

    None, and any path on a non-Windows host, is returned unchanged.
    Raises ValueError when the path does not start with a forward slash.

    Example: /C/Users/foo becomes C:\Users\foo
    """
    if path is None or not onWindows():
        return path
    if path[0] != '/':
        raise ValueError("not a docker path")
    components = path[1:].split('/')
    # The first component is the drive letter; give it its colon back.
    components[0] += ':'
    return '\\'.join(components)
112 | |
113 | |
def docker_windows_reverse_fileuri_adjust(fileuri):
    # type: (Text) -> (Text)
    r"""
    Make a file URI MS Windows compatible, when running on Windows.

    File URIs coming from docker on Windows carry no ':' after the drive
    letter; this puts it back, so file:///E/var becomes file:///E:/var.
    None, and any URI on a non-Windows host, is returned unchanged.
    Raises ValueError when the URI scheme is not "file".
    """
    if fileuri is None or not onWindows():
        return fileuri
    if urllib.parse.urlsplit(fileuri).scheme != "file":
        raise ValueError("not a file URI")
    pieces = fileuri.split("/")
    # pieces[3] is the first path segment after "file:///", i.e. the drive.
    if pieces[3][-1] == ':':
        return fileuri
    pieces[3] += ':'
    return '/'.join(pieces)
132 | |
133 | |
def onWindows():
    # type: () -> (bool)
    """Tell whether os.name reports the 'nt' (Windows) platform."""
    running_on_nt = os.name == 'nt'
    return running_on_nt
138 | |
139 | |
def convert_pathsep_to_unix(path):  # type: (Text) -> (Text)
    """
    Replace backslash path separators with forward slashes on Windows.

    os.path.join produces backslashes on Windows, but these paths are
    handed to Docker, which needs forward slashes. None, and any path on
    a non-Windows host, is returned unchanged.
    """
    if path is None or not onWindows():
        return path
    return path.replace('\\', '/')
150 | |
def cmp_like_py2(dict1, dict2):  # type: (Dict[Text, Any], Dict[Text, Any]) -> int
    """
    Order two records by their "position" lists the way Python 2 would.

    Python 3 refuses to order values of different types such as str and
    int, so this comparison function reproduces the Python 2 ordering of
    heterogeneous lists of `int` and `str`. Returns -1, 0 or 1.
    """
    left_positions = dict1["position"]
    right_positions = dict2["position"]
    # Walk both lists in lockstep; the shorter one is padded with None.
    for left, right in zip_longest(left_positions, right_positions):
        if left == right:
            continue
        # The first list ran out first, so it sorts earlier
        if left is None:
            return -1
        # The second list ran out first, so the first sorts later
        if right is None:
            return 1
        # As soon as either side is a str, compare both as str;
        # otherwise the values are compared as they are.
        if isinstance(left, str) or isinstance(right, str):
            left, right = str(left), str(right)
        return 1 if left > right else -1
    # No differing element was found
    return 0
183 | |
184 | |
def bytes2str_in_dicts(inp  # type: Union[MutableMapping[Text, Any], MutableSequence[Any], Any]
                       ):
    # type: (...) -> Union[Text, MutableSequence[Any], MutableMapping[Text, Any]]
    """
    Decode every byte string found in a nested structure, in place.

    Mutable mappings and mutable sequences are walked recursively and
    updated in place; a bytes value is returned decoded as UTF-8; any
    other value is returned as it is.
    """
    # leaf case: a byte string becomes text
    if isinstance(inp, bytes):
        return inp.decode('utf-8')

    # mapping: convert each value under its existing key
    if isinstance(inp, MutableMapping):
        for key in inp:
            inp[key] = bytes2str_in_dicts(inp[key])
        return inp

    # sequence: convert each element at its existing index
    if isinstance(inp, MutableSequence):
        for position, element in enumerate(inp):
            inp[position] = bytes2str_in_dicts(element)
        return inp

    # anything else passes through untouched
    return inp
212 | |
213 | |
def visit_class(rec, cls, op):
    # type: (Any, Iterable[Any], Union[Callable[..., Any], partial[Any]]) -> None
    """Call *op* on every nested mapping whose "class" value is in *cls*.

    Mutable mappings and mutable sequences are traversed recursively; a
    matching mapping is passed to *op* before its own values are visited.
    """
    if isinstance(rec, MutableMapping):
        matches = "class" in rec and rec.get("class") in cls
        if matches:
            op(rec)
        for key in rec:
            visit_class(rec[key], cls, op)
    if isinstance(rec, MutableSequence):
        for element in rec:
            visit_class(element, cls, op)
225 | |
def visit_field(rec, field, op):
    # type: (Any, Text, Union[Callable[..., Any], partial[Any]]) -> None
    """Replace the value under *field* with op(value) in every nested mapping.

    Mutable mappings and mutable sequences are traversed recursively; a
    mapping's own field is rewritten before its values are visited.
    """
    if isinstance(rec, MutableMapping):
        if field in rec:
            current = rec[field]
            rec[field] = op(current)
        for key in rec:
            visit_field(rec[key], field, op)
    if isinstance(rec, MutableSequence):
        for element in rec:
            visit_field(element, field, op)
237 | |
238 | |
def random_outdir():  # type: () -> Text
    """Return the random directory name chosen to use for tool / workflow output.

    The name is a '/' followed by six random ASCII letters. It is generated
    on the first call and cached as an attribute of this function, so every
    later call returns the same value.
    """
    try:
        return random_outdir.outdir  # type: ignore
    except AttributeError:
        letters = (random.choice(string.ascii_letters) for _ in range(6))  # nosec
        random_outdir.outdir = '/' + ''.join(letters)  # type: ignore
        return random_outdir.outdir  # type: ignore
245 | |
#
# Minimal cross-platform file locking shim: fcntl when it can be imported,
# msvcrt otherwise
#
try:
    import fcntl

    def shared_file_lock(fd):  # type: (IO[Any]) -> None
        """Take a shared (LOCK_SH) flock on the descriptor behind *fd*."""
        descriptor = fd.fileno()
        fcntl.flock(descriptor, fcntl.LOCK_SH)

    def upgrade_lock(fd):  # type: (IO[Any]) -> None
        """Take an exclusive (LOCK_EX) flock on the descriptor behind *fd*."""
        descriptor = fd.fileno()
        fcntl.flock(descriptor, fcntl.LOCK_EX)

except ImportError:
    import msvcrt

    def shared_file_lock(fd):  # type: (IO[Any]) -> None
        """Lock the first 1024 bytes of *fd* with msvcrt.locking (LK_LOCK)."""
        descriptor = fd.fileno()
        msvcrt.locking(descriptor, msvcrt.LK_LOCK, 1024)  # type: ignore

    def upgrade_lock(fd):  # type: (IO[Any]) -> None
        """Do nothing; no upgrade step is performed in the msvcrt variant."""
        return None