from typing import *
import os
from pathlib import Path
import subprocess
[docs]class BashUtils:
"""
Utility functions for running Bash commands.
"""
PRINT_LIMIT = 1000
[docs] class RunResult(NamedTuple):
return_code: int
stdout: str
stderr: str
[docs] @classmethod
def run(cls, cmd: str,
expected_return_code: int = None,
is_update_env: bool = False,
timeout: Optional[float] = None,
) -> RunResult:
"""
Runs a Bash command and returns the stdout.
:param cmd: the command to run.
:param expected_return_code: if set to an int, will raise exception if the return code mismatch.
:param is_update_env: if true, the environment in *this python process (os.environ)* will be updated upon the successful execution of cmd (i.e., returns 0), to reflect the changes to the enrionment variables cmd may make. Note it can not change the environment of the process that invoked this python process. It is useful because the updated environment will be used for later BashUtils.run executions.
:param timeout: if not None, kill the process after timeout seconds and raise TimeoutExpire exception.
:return: the run result, which is a named tuple with field return_code, stdout, stderr.
"""
# If update env is requested, append an additional command to the cmd
if is_update_env:
tempfile_update_env = cls.get_temp_file()
cmd += f" && env > {tempfile_update_env}"
# end if
completed_process = subprocess.run(["bash", "-c", cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
#completed_process = subprocess.run(cmd, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return_code = completed_process.returncode
stdout = completed_process.stdout.decode("utf-8", errors="ignore")
stderr = completed_process.stderr.decode("utf-8", errors="ignore")
# Update env, if requested and return code is 0
if is_update_env and return_code == 0:
with open(str(tempfile_update_env), "r") as fp:
os.environ.clear()
for line in fp.read().splitlines(keepends=False):
env_key, env_value = line.split(sep="=", maxsplit=1)
os.environ[env_key] = env_value
# end for
# end with
# end if
if expected_return_code is not None:
if return_code != expected_return_code:
if len(stdout) > cls.PRINT_LIMIT:
tempfile_stdout = subprocess.run(["bash", "-c", "mktemp"], stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode("utf-8", errors="ignore").strip()
with open(tempfile_stdout, "w") as f:
f.write(stdout)
# end with
stdout = f"{stdout[:cls.PRINT_LIMIT]} //////////TOO LONG; dumped to {tempfile_stdout}//////////"
# end if
if len(stderr) > cls.PRINT_LIMIT:
tempfile_stderr = subprocess.run(["bash", "-c", "mktemp"], stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode("utf-8", errors="ignore").strip()
with open(tempfile_stderr, "w") as f:
f.write(stderr)
# end with
stderr = f"{stderr[:cls.PRINT_LIMIT]} //////////TOO LONG; dumped to {tempfile_stderr}//////////"
# end if
raise RuntimeError(f"Expected {expected_return_code} but returned {return_code} while executing bash command '{cmd}'.\nstdout: {stdout}\nstderr: {stderr}")
# end if, if
return cls.RunResult(return_code, stdout, stderr)
[docs] @classmethod
def get_temp_dir(cls) -> Path:
return Path(cls.run("mktemp -d", expected_return_code=0).stdout.strip())
[docs] @classmethod
def get_temp_file(cls) -> Path:
return Path(cls.run("mktemp", expected_return_code=0).stdout.strip())