comparison env/lib/python3.7/site-packages/cwltool/utils.py @ 0:26e78fe6e8c4 draft

"planemo upload commit c699937486c35866861690329de38ec1a5d9f783"
author shellac
date Sat, 02 May 2020 07:14:21 -0400
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:26e78fe6e8c4
1 """Shared functions and other definitions."""
2 from __future__ import absolute_import
3
4 import collections
5 import os
6 import platform
7 import random
8 import shutil
9 import string
10 import sys
11 import tempfile
12 from functools import partial # pylint: disable=unused-import
13 from typing import (IO, Any, AnyStr, Callable, # pylint: disable=unused-import
14 Dict, Iterable, List, MutableMapping, MutableSequence,
15 Optional, Union)
16
17 import pkg_resources
18 from mypy_extensions import TypedDict
19 from schema_salad.utils import json_dump, json_dumps # pylint: disable=unused-import
20 from six.moves import urllib, zip_longest
21 from typing_extensions import Deque, Text # pylint: disable=unused-import
22 # move to a regular typing import when Python 3.3-3.6 is no longer supported
23
24 from pathlib2 import Path
25
26 # no imports from cwltool allowed
27 if os.name == 'posix':
28 if sys.version_info < (3, 5):
29 import subprocess32 as subprocess # nosec # pylint: disable=unused-import
30 else:
31 import subprocess # nosec # pylint: disable=unused-import
32 else:
33 import subprocess # type: ignore # nosec
34
# Identifier of the container image used as the default on MS Windows
# (purpose inferred from the name -- confirm against callers).
windows_default_container_id = "frolvlad/alpine-bash"

# Typed description of a "Directory" mapping: its class, listing and basename.
Directory = TypedDict(
    'Directory',
    {
        'class': Text,
        'listing': List[Dict[Text, Text]],
        'basename': Text,
    })

# System temporary directory with a trailing path separator appended.
DEFAULT_TMP_PREFIX = tempfile.gettempdir() + os.path.sep

# Queue of subprocess handles; presumably terminated at shutdown by the
# code that owns this queue -- verify against callers.
processes_to_kill = collections.deque()  # type: Deque[subprocess.Popen]
44
def versionstring():
    # type: () -> Text
    """Return the invoked program name followed by the installed cwltool version.

    Falls back to the text "unknown version" when ``pkg_resources`` reports
    no distribution for "cwltool".
    """
    distributions = pkg_resources.require("cwltool")
    version = distributions[0].version if distributions else "unknown version"
    return u"%s %s" % (sys.argv[0], version)
52
def aslist(l):  # type: (Any) -> MutableSequence[Any]
    """Return ``l`` unchanged if it is a MutableSequence, else wrap it in a new list."""
    return l if isinstance(l, MutableSequence) else [l]
58
def copytree_with_merge(src, dst):  # type: (Text, Text) -> None
    """Recursively copy the contents of ``src`` into ``dst``, merging with existing content.

    ``dst`` is created (and given the stat information of ``src``) only when
    it does not exist yet.  Files are copied with ``shutil.copy2``, so a file
    already present under the same name in ``dst`` is overwritten.
    """
    if not os.path.exists(dst):
        os.makedirs(dst)
        shutil.copystat(src, dst)
    for name in os.listdir(src):
        source_entry = os.path.join(src, name)
        target_entry = os.path.join(dst, name)
        if not os.path.isdir(source_entry):
            shutil.copy2(source_entry, target_entry)
            continue
        # Sub-directories are merged the same way, one level down.
        copytree_with_merge(source_entry, target_entry)
71
def docker_windows_path_adjust(path):
    # type: (Text) -> Text
    r"""
    Rewrite a Windows path into the unix-style form used with Docker.

    Only has an effect on Windows; elsewhere the path is returned unchanged.
    The docker run command treats the paths as unix paths.

    Example: C:\Users\foo becomes /C/Users/foo (Docker for Windows) or
    /c/Users/foo (Docker toolbox).
    """
    if not onWindows():
        return path
    parts = path.split(':')
    if len(parts) == 2:
        drive = parts[0]
        if platform.win32_ver()[0] in ('7', '8'):  # type: ignore
            # Docker toolbox uses lowercase Windows drive letters
            drive = drive.lower()
        else:
            # Docker for Windows uses uppercase Windows drive letters
            drive = drive.capitalize()
        path = drive + ':' + parts[1]
    unixified = path.replace(':', '').replace('\\', '/')
    if unixified[0] == '/':
        return unixified
    return '/' + unixified
94
95
def docker_windows_reverse_path_adjust(path):
    # type: (Text) -> (Text)
    r"""
    Turn a docker-style path back into a Windows path (only on Windows).

    Example: /C/Users/foo becomes C:\Users\foo

    Raises ValueError when, on Windows, the path does not start with '/'.
    A ``None`` path, or any path on a non-Windows OS, is returned unchanged.
    """
    if path is None or not onWindows():
        return path
    if path[0] != '/':
        raise ValueError("not a docker path")
    segments = path[1:].split('/')
    # The first segment is the drive letter; restore its colon.
    segments[0] += ':'
    return '\\'.join(segments)
112
113
def docker_windows_reverse_fileuri_adjust(fileuri):
    # type: (Text) -> (Text)
    r"""
    Make a file URI MS Windows compatible, if needed.

    With docker on Windows the path part of a file URI carries no ':' after
    the drive letter.  This adds it back, so file:///E/var becomes
    file:///E:/var.  A URI whose drive segment already ends in ':' is
    returned as-is.

    Raises ValueError when, on Windows, the URI scheme is not "file".
    A ``None`` URI, or any URI on a non-Windows OS, is returned unchanged.
    """
    if fileuri is None or not onWindows():
        return fileuri
    if urllib.parse.urlsplit(fileuri).scheme != "file":
        raise ValueError("not a file URI")
    pieces = fileuri.split("/")
    # For "file:///E/var" index 3 is the drive segment ("E").
    drive = pieces[3]
    if drive[-1] == ':':
        return fileuri
    pieces[3] = drive + ':'
    return '/'.join(pieces)
132
133
def onWindows():
    # type: () -> (bool)
    """Report whether ``os.name`` identifies the platform as Windows ('nt')."""
    running_on_nt = (os.name == 'nt')
    return running_on_nt
138
139
def convert_pathsep_to_unix(path):  # type: (Text) -> (Text)
    """
    Convert path separators to unix style.

    On Windows os.path.join joins path components with a backslash; since
    these paths are used with Docker they are rewritten to use forward
    slashes.  A ``None`` path, or any path on a non-Windows OS, is returned
    unchanged.
    """
    if path is None or not onWindows():
        return path
    return path.replace('\\', '/')
150
def cmp_like_py2(dict1, dict2):  # type: (Dict[Text, Any], Dict[Text, Any]) -> int
    """
    Compare the "position" lists of two dicts, tolerating mixed int/str items.

    Comparison function intended for sorting: Python 3 refuses to order
    values of different types such as str and int, so whenever either item
    of a differing pair is a str, both are compared by their str() form.

    Returns -1, 0 or 1.  When one list is a prefix of the other, the
    shorter list sorts first.
    """
    positions_a = dict1["position"]
    positions_b = dict2["position"]
    # zip_longest pads the shorter list with None
    for left, right in zip_longest(positions_a, positions_b):
        if left == right:
            continue
        if left is None:
            # first list ran out: it sorts earlier
            return -1
        if right is None:
            # second list ran out: the first one sorts later
            return 1
        if isinstance(left, str) or isinstance(right, str):
            # a str on either side forces a textual comparison
            left, right = str(left), str(right)
        return 1 if left > right else -1
    # no differing pair found: the lists are equal
    return 0
183
184
def bytes2str_in_dicts(inp  # type: Union[MutableMapping[Text, Any], MutableSequence[Any], Any]
                       ):
    # type: (...) -> Union[Text, MutableSequence[Any], MutableMapping[Text, Any]]
    """
    Decode every byte string found in a nested structure, in place.

    Mappings and sequences are walked recursively and their entries are
    replaced with the converted values; the same container object is
    returned.  A bytes value is returned decoded as UTF-8.  Any other value
    is returned untouched.
    """
    # leaf: a byte string becomes text
    if isinstance(inp, bytes):
        return inp.decode('utf-8')

    if isinstance(inp, MutableMapping):
        # rewrite each value of the mapping
        for key in inp:
            inp[key] = bytes2str_in_dicts(inp[key])
    elif isinstance(inp, MutableSequence):
        # rewrite each element of the sequence
        for position, element in enumerate(inp):
            inp[position] = bytes2str_in_dicts(element)

    # containers (now converted) and all other values come back as they are
    return inp
212
213
def visit_class(rec, cls, op):
    # type: (Any, Iterable[Any], Union[Callable[..., Any], partial[Any]]) -> None
    """Apply ``op`` to every nested mapping whose "class" entry is in ``cls``.

    ``rec`` is walked recursively through mappings and sequences.  A matching
    mapping is passed to ``op`` before its own values are visited.
    """
    if isinstance(rec, MutableMapping):
        matches = "class" in rec and rec.get("class") in cls
        if matches:
            op(rec)
        for child in rec.values():
            visit_class(child, cls, op)
    if isinstance(rec, MutableSequence):
        for element in rec:
            visit_class(element, cls, op)
225
def visit_field(rec, field, op):
    # type: (Any, Text, Union[Callable[..., Any], partial[Any]]) -> None
    """Replace ``field`` with ``op(value)`` in every nested mapping that has it.

    ``rec`` is walked recursively through mappings and sequences.  The
    replacement happens before the mapping's values are visited, so the
    value returned by ``op`` is itself traversed afterwards.
    """
    if isinstance(rec, MutableMapping):
        if field in rec:
            rec[field] = op(rec[field])
        for child in rec.values():
            visit_field(child, field, op)
    if isinstance(rec, MutableSequence):
        for element in rec:
            visit_field(element, field, op)
237
238
def random_outdir():  # type: () -> Text
    """Return the random directory name chosen to use for tool / workflow output.

    The name is '/' followed by six random ASCII letters.  It is generated
    on the first call and cached as an attribute of this function, so every
    later call returns the same value.
    """
    try:
        return random_outdir.outdir  # type: ignore
    except AttributeError:
        # first call: nothing cached yet
        letters = (random.choice(string.ascii_letters) for _ in range(6))  # nosec
        random_outdir.outdir = '/' + ''.join(letters)  # type: ignore
        return random_outdir.outdir  # type: ignore
245
246 #
247 # Simple multi-platform (fcntl/msvcrt) file locking wrapper
248 #
try:
    import fcntl
except ImportError:
    # No fcntl available: fall back to the msvcrt locking primitive.
    import msvcrt

    def shared_file_lock(fd):  # type: (IO[Any]) -> None
        """Lock 1024 bytes of the open file ``fd`` via ``msvcrt.locking`` (LK_LOCK)."""
        msvcrt.locking(fd.fileno(), msvcrt.LK_LOCK, 1024)  # type: ignore

    def upgrade_lock(fd):  # type: (IO[Any]) -> None
        """Do nothing; the msvcrt fallback performs no lock upgrade."""
        pass
else:
    def shared_file_lock(fd):  # type: (IO[Any]) -> None
        """Take a shared ``flock`` (LOCK_SH) on the open file ``fd``."""
        fcntl.flock(fd.fileno(), fcntl.LOCK_SH)

    def upgrade_lock(fd):  # type: (IO[Any]) -> None
        """Take an exclusive ``flock`` (LOCK_EX) on the open file ``fd``."""
        fcntl.flock(fd.fileno(), fcntl.LOCK_EX)