comparison env/lib/python3.9/site-packages/galaxy/util/commands.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """Generic I/O and shell processing code used by Galaxy tool dependencies."""
2 import logging
3 import os
4 import shlex
5 import subprocess
6 import sys as _sys
7
8 from galaxy.util import (
9 unicodify,
10 which
11 )
12
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Marker meaning "write to standard output"; it is the default ``to`` value of
# ``download_command`` and is passed as the ``-O`` target in its wget branch.
STDOUT_INDICATOR = "-"
16
17
def redirecting_io(sys=_sys):
    """Report whether stdout is being redirected inside this process.

    The check is whether ``sys.stdout.fileno()`` can be called without
    raising.

    :param sys: object exposing a ``stdout`` attribute; defaults to the real
        ``sys`` module.
    :returns: ``True`` if calling ``sys.stdout.fileno()`` raises any
        ``Exception``, ``False`` if the call succeeds.
    """
    assert sys is not None
    redirected = False
    try:
        # fileno() has to be invoked for real: a replacement stream such as
        # io.StringIO does have the method, but calling it raises
        # io.UnsupportedOperation.
        sys.stdout.fileno()
    except Exception:
        redirected = True
    return redirected
30
31
def redirect_aware_commmunicate(p, sys=_sys):
    """Call ``p.communicate()`` and honour in-process I/O redirection.

    When ``redirecting_io`` reports that stdout is redirected, any non-empty
    captured output is passed through ``unicodify`` and written to
    ``sys.stdout`` / ``sys.stderr``, and the corresponding return slot is set
    to ``None``. Otherwise the result of ``p.communicate()`` is returned
    untouched.

    :param p: process object providing ``communicate()``.
    :param sys: object exposing ``stdout`` and ``stderr``; defaults to the
        real ``sys`` module.
    :returns: an ``(out, err)`` tuple.
    """
    assert sys is not None
    out, err = p.communicate()
    if not redirecting_io(sys=sys):
        return out, err
    if out:
        # unicodify comes from galaxy.util; presumably it turns the captured
        # output into text suitable for the replacement stream.
        sys.stdout.write(unicodify(out))
        out = None
    if err:
        sys.stderr.write(unicodify(err))
        err = None
    return out, err
48
49
def shell(cmds, env=None, **kwds):
    """Start ``cmds`` through ``shell_process`` and block until it finishes.

    :param cmds: command to run, forwarded to ``shell_process``.
    :param env: optional environment additions, forwarded to
        ``shell_process``.
    :param kwds: extra keyword arguments forwarded to ``shell_process``; an
        optional ``sys`` entry replaces the ``sys`` module used for the
        redirection check.
    :returns: the exit code of the process.
    """
    sys = kwds.get("sys", _sys)
    assert sys is not None
    process = shell_process(cmds, env, **kwds)
    if not redirecting_io(sys=sys):
        return process.wait()
    # With redirected I/O the output has to be relayed to the replacement
    # streams; communicate() also waits for the process to exit.
    redirect_aware_commmunicate(process, sys=sys)
    return process.returncode
61
62
def shell_process(cmds, env=None, **kwds):
    """A high-level method wrapping subprocess.Popen.

    Handles details such as environment extension and in process I/O
    redirection.

    :param cmds: the command to run. A ``str`` is executed with
        ``shell=True`` (and a warning is logged); any other value is handed
        to ``subprocess.Popen`` unchanged.
    :param env: optional mapping of extra environment variables. When truthy
        it is layered on top of a copy of ``os.environ`` and passed as the
        child's environment.
    :param kwds: extra keyword arguments for ``subprocess.Popen``. The
        special key ``sys`` is consumed here (it selects the object used for
        the redirection check) and is *not* forwarded to ``Popen``.
    :returns: the started ``subprocess.Popen`` instance.
    """
    # ``sys`` is an option of this wrapper, not of Popen: it must be removed
    # from kwds, otherwise Popen raises TypeError on the unknown keyword.
    sys = kwds.pop("sys", _sys)
    popen_kwds = dict()
    if isinstance(cmds, str):
        log.warning("Passing program arguments as a string may be a security hazard if combined with untrusted input")
        popen_kwds['shell'] = True
    # Capture the child's output through pipes when the caller did not choose
    # a destination and this process's stdout is redirected.
    if kwds.get("stdout", None) is None and redirecting_io(sys=sys):
        popen_kwds["stdout"] = subprocess.PIPE
    if kwds.get("stderr", None) is None and redirecting_io(sys=sys):
        popen_kwds["stderr"] = subprocess.PIPE

    # Caller-supplied options take precedence over the defaults chosen above.
    popen_kwds.update(**kwds)
    if env:
        new_env = os.environ.copy()
        new_env.update(env)
        popen_kwds["env"] = new_env
    p = subprocess.Popen(cmds, **popen_kwds)
    return p
86
87
def execute(cmds, input=None):
    """Run ``cmds`` without a shell and return its standard output.

    stdin, stdout and stderr are all connected to pipes. If ``input`` is not
    None it is sent to the process' stdin.

    :param cmds: the command to run, passed on to ``_wait``.
    :param input: optional data for the process' stdin.
    :returns: the standard output if the command exits with status zero.
    :raises CommandLineException: on a non-zero exit status.
    """
    piped_streams = dict(
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return _wait(cmds, input=input, shell=False, **piped_streams)
95
96
def argv_to_str(command_argv, quote=True):
    """Join an argv-style list into a single space-separated command string.

    ``None`` entries are dropped. With ``quote`` true (the default) every
    remaining argument goes through ``shlex.quote()``; otherwise arguments
    are joined as they are.

    This helper is not meant for security-critical code paths and must not
    be relied upon to sanitize input.

    :param command_argv: iterable of arguments, possibly containing ``None``.
    :param quote: whether to shell-quote each argument.
    :returns: the resulting command string.
    """
    present_args = (arg for arg in command_argv if arg is not None)
    if quote:
        present_args = map(shlex.quote, present_args)
    return " ".join(present_args)
108
109
def _wait(cmds, input=None, **popen_kwds):
    """Run ``cmds`` to completion and return its stdout.

    :param cmds: the command handed to ``subprocess.Popen``.
    :param input: optional data passed to ``communicate()``.
    :param popen_kwds: keyword arguments forwarded to ``subprocess.Popen``.
    :returns: the process' stdout after ``unicodify``.
    :raises CommandLineException: when the exit status is non-zero; carries
        the command string, both output streams and the return code.
    """
    process = subprocess.Popen(cmds, **popen_kwds)
    raw_stdout, raw_stderr = process.communicate(input)
    stdout = unicodify(raw_stdout)
    stderr = unicodify(raw_stderr)
    if process.returncode == 0:
        return stdout
    raise CommandLineException(argv_to_str(cmds), stdout, stderr, process.returncode)
117
118
def download_command(url, to=STDOUT_INDICATOR):
    """Return an argv list that downloads ``url``.

    ``wget`` is used when ``which("wget")`` finds it, ``curl`` otherwise.
    The download goes to standard output unless ``to`` names a file.

    :param url: the URL to fetch.
    :param to: destination path, or ``STDOUT_INDICATOR`` (the default) for
        standard output.
    :returns: the command as a list of arguments.
    """
    if which("wget"):
        if to == STDOUT_INDICATOR:
            target_args = ["-O", STDOUT_INDICATOR]
        else:
            target_args = ["--recursive", "-O", to]
        return ["wget", "-q", *target_args, url]
    # curl writes to stdout by default, so -o is only needed for a file.
    target_args = ["-o", to] if to != STDOUT_INDICATOR else []
    return ["curl", "-L", url, *target_args]
136
137
class CommandLineException(Exception):
    """Raised when a command line finishes with a non-zero exit status."""

    def __init__(self, command, stdout, stderr, returncode):
        """Store the command, its captured streams and its exit status.

        :param command: the command line that was run.
        :param stdout: captured standard output.
        :param stderr: captured standard error.
        :param returncode: the exit status of the command.
        """
        self.command = command
        self.stdout = stdout
        self.stderr = stderr
        self.returncode = returncode
        # The report shows stderr first, then stdout.
        report_template = (
            "Failed to execute command-line %s, stderr was:\n"
            "-------->>begin stderr<<--------\n"
            "%s\n"
            "-------->>end stderr<<--------\n"
            "-------->>begin stdout<<--------\n"
            "%s\n"
            "-------->>end stdout<<--------\n"
        )
        self.message = report_template % (command, stderr, stdout)

    def __str__(self):
        """Return the verbose report built at construction time."""
        return self.message
159
160
# Public names of this module. ``which`` is not defined here: it is imported
# from galaxy.util at the top of the file and re-exported.
__all__ = (
    'argv_to_str',
    'CommandLineException',
    'download_command',
    'execute',
    'redirect_aware_commmunicate',
    'redirecting_io',
    'shell',
    'shell_process',
    'which',
)