diff options
Diffstat (limited to 'nixpkgs/nixos/lib/test-driver/test_driver')
-rwxr-xr-x | nixpkgs/nixos/lib/test-driver/test_driver/__init__.py | 140 | ||||
-rw-r--r-- | nixpkgs/nixos/lib/test-driver/test_driver/driver.py | 260 | ||||
-rw-r--r-- | nixpkgs/nixos/lib/test-driver/test_driver/logger.py | 107 | ||||
-rw-r--r-- | nixpkgs/nixos/lib/test-driver/test_driver/machine.py | 1324 | ||||
-rw-r--r-- | nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py | 92 | ||||
-rw-r--r-- | nixpkgs/nixos/lib/test-driver/test_driver/py.typed | 0 | ||||
-rw-r--r-- | nixpkgs/nixos/lib/test-driver/test_driver/qmp.py | 98 | ||||
-rw-r--r-- | nixpkgs/nixos/lib/test-driver/test_driver/vlan.py | 62 |
8 files changed, 2083 insertions, 0 deletions
diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/__init__.py b/nixpkgs/nixos/lib/test-driver/test_driver/__init__.py new file mode 100755 index 000000000000..9daae1e941a6 --- /dev/null +++ b/nixpkgs/nixos/lib/test-driver/test_driver/__init__.py @@ -0,0 +1,140 @@ +import argparse +import os +import time +from pathlib import Path + +import ptpython.repl + +from test_driver.driver import Driver +from test_driver.logger import rootlog + + +class EnvDefault(argparse.Action): + """An argpars Action that takes values from the specified + environment variable as the flags default value. + """ + + def __init__(self, envvar, required=False, default=None, nargs=None, **kwargs): # type: ignore + if not default and envvar: + if envvar in os.environ: + if nargs is not None and (nargs.isdigit() or nargs in ["*", "+"]): + default = os.environ[envvar].split() + else: + default = os.environ[envvar] + kwargs["help"] = ( + kwargs["help"] + f" (default from environment: {default})" + ) + if required and default: + required = False + super().__init__(default=default, required=required, nargs=nargs, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): # type: ignore + setattr(namespace, self.dest, values) + + +def writeable_dir(arg: str) -> Path: + """Raises an ArgumentTypeError if the given argument isn't a writeable directory + Note: We want to fail as early as possible if a directory isn't writeable, + since an executed nixos-test could fail (very late) because of the test-driver + writing in a directory without proper permissions. + """ + path = Path(arg) + if not path.is_dir(): + raise argparse.ArgumentTypeError(f"{path} is not a directory") + if not os.access(path, os.W_OK): + raise argparse.ArgumentTypeError(f"{path} is not a writeable directory") + return path + + +def main() -> None: + arg_parser = argparse.ArgumentParser(prog="nixos-test-driver") + arg_parser.add_argument( + "-K", + "--keep-vm-state", + help="re-use a VM state coming from a previous run", + action="store_true", + ) + arg_parser.add_argument( + "-I", + "--interactive", + help="drop into a python repl and run the tests interactively", + action=argparse.BooleanOptionalAction, + ) + arg_parser.add_argument( + "--start-scripts", + metavar="START-SCRIPT", + action=EnvDefault, + envvar="startScripts", + nargs="*", + help="start scripts for participating virtual machines", + ) + arg_parser.add_argument( + "--vlans", + metavar="VLAN", + action=EnvDefault, + envvar="vlans", + nargs="*", + help="vlans to span by the driver", + ) + arg_parser.add_argument( + "--global-timeout", + type=int, + metavar="GLOBAL_TIMEOUT", + action=EnvDefault, + envvar="globalTimeout", + help="Timeout in seconds for the whole test", + ) + arg_parser.add_argument( + "-o", + "--output_directory", + help="""The path to the directory where outputs copied from the VM will be placed. + By e.g. Machine.copy_from_vm or Machine.screenshot""", + default=Path.cwd(), + type=writeable_dir, + ) + arg_parser.add_argument( + "testscript", + action=EnvDefault, + envvar="testScript", + help="the test script to run", + type=Path, + ) + + args = arg_parser.parse_args() + + if not args.keep_vm_state: + rootlog.info("Machine state will be reset. To keep it, pass --keep-vm-state") + + with Driver( + args.start_scripts, + args.vlans, + args.testscript.read_text(), + args.output_directory.resolve(), + args.keep_vm_state, + args.global_timeout, + ) as driver: + if args.interactive: + history_dir = os.getcwd() + history_path = os.path.join(history_dir, ".nixos-test-history") + ptpython.repl.embed( + driver.test_symbols(), + {}, + history_filename=history_path, + ) + else: + tic = time.time() + driver.run_tests() + toc = time.time() + rootlog.info(f"test script finished in {(toc-tic):.2f}s") + + +def generate_driver_symbols() -> None: + """ + This generates a file with symbols of the test-driver code that can be used + in user's test scripts. That list is then used by pyflakes to lint those + scripts. + """ + d = Driver([], [], "", Path()) + test_symbols = d.test_symbols() + with open("driver-symbols", "w") as fp: + fp.write(",".join(test_symbols.keys())) diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/driver.py b/nixpkgs/nixos/lib/test-driver/test_driver/driver.py new file mode 100644 index 000000000000..786821b0cc0d --- /dev/null +++ b/nixpkgs/nixos/lib/test-driver/test_driver/driver.py @@ -0,0 +1,260 @@ +import os +import re +import signal +import tempfile +import threading +from contextlib import contextmanager +from pathlib import Path +from typing import Any, Callable, ContextManager, Dict, Iterator, List, Optional, Union + +from test_driver.logger import rootlog +from test_driver.machine import Machine, NixStartScript, retry +from test_driver.polling_condition import PollingCondition +from test_driver.vlan import VLan + + +def get_tmp_dir() -> Path: + """Returns a temporary directory that is defined by TMPDIR, TEMP, TMP or CWD + Raises an exception in case the retrieved temporary directory is not writeable + See https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir + """ + tmp_dir = Path(tempfile.gettempdir()) + tmp_dir.mkdir(mode=0o700, exist_ok=True) + if not tmp_dir.is_dir(): + raise NotADirectoryError( + f"The directory defined by TMPDIR, TEMP, TMP or CWD: {tmp_dir} is not a directory" + ) + if not os.access(tmp_dir, os.W_OK): + raise PermissionError( + f"The directory defined by TMPDIR, TEMP, TMP, or CWD: {tmp_dir} is not writeable" + ) + return tmp_dir + + +def pythonize_name(name: str) -> str: + return re.sub(r"^[^A-z_]|[^A-z0-9_]", "_", name) + + +class Driver: + """A handle to the driver that sets up the environment + and runs the tests""" + + tests: str + vlans: List[VLan] + machines: List[Machine] + polling_conditions: List[PollingCondition] + global_timeout: int + race_timer: threading.Timer + + def __init__( + self, + start_scripts: List[str], + vlans: List[int], + tests: str, + out_dir: Path, + keep_vm_state: bool = False, + global_timeout: int = 24 * 60 * 60 * 7, + ): + self.tests = tests + self.out_dir = out_dir + self.global_timeout = global_timeout + self.race_timer = threading.Timer(global_timeout, self.terminate_test) + + tmp_dir = get_tmp_dir() + + with rootlog.nested("start all VLans"): + vlans = list(set(vlans)) + self.vlans = [VLan(nr, tmp_dir) for nr in vlans] + + def cmd(scripts: List[str]) -> Iterator[NixStartScript]: + for s in scripts: + yield NixStartScript(s) + + self.polling_conditions = [] + + self.machines = [ + Machine( + start_command=cmd, + keep_vm_state=keep_vm_state, + name=cmd.machine_name, + tmp_dir=tmp_dir, + callbacks=[self.check_polling_conditions], + out_dir=self.out_dir, + ) + for cmd in cmd(start_scripts) + ] + + def __enter__(self) -> "Driver": + return self + + def __exit__(self, *_: Any) -> None: + with rootlog.nested("cleanup"): + self.race_timer.cancel() + for machine in self.machines: + machine.release() + + def subtest(self, name: str) -> Iterator[None]: + """Group logs under a given test name""" + with rootlog.nested("subtest: " + name): + try: + yield + return True + except Exception as e: + rootlog.error(f'Test "{name}" failed with error: "{e}"') + raise e + + def test_symbols(self) -> Dict[str, Any]: + @contextmanager + def subtest(name: str) -> Iterator[None]: + return self.subtest(name) + + general_symbols = dict( + start_all=self.start_all, + test_script=self.test_script, + machines=self.machines, + vlans=self.vlans, + driver=self, + log=rootlog, + os=os, + create_machine=self.create_machine, + subtest=subtest, + run_tests=self.run_tests, + join_all=self.join_all, + retry=retry, + serial_stdout_off=self.serial_stdout_off, + serial_stdout_on=self.serial_stdout_on, + polling_condition=self.polling_condition, + Machine=Machine, # for typing + ) + machine_symbols = {pythonize_name(m.name): m for m in self.machines} + # If there's exactly one machine, make it available under the name + # "machine", even if it's not called that. + if len(self.machines) == 1: + (machine_symbols["machine"],) = self.machines + vlan_symbols = { + f"vlan{v.nr}": self.vlans[idx] for idx, v in enumerate(self.vlans) + } + print( + "additionally exposed symbols:\n " + + ", ".join(map(lambda m: m.name, self.machines)) + + ",\n " + + ", ".join(map(lambda v: f"vlan{v.nr}", self.vlans)) + + ",\n " + + ", ".join(list(general_symbols.keys())) + ) + return {**general_symbols, **machine_symbols, **vlan_symbols} + + def test_script(self) -> None: + """Run the test script""" + with rootlog.nested("run the VM test script"): + symbols = self.test_symbols() # call eagerly + exec(self.tests, symbols, None) + + def run_tests(self) -> None: + """Run the test script (for non-interactive test runs)""" + rootlog.info( + f"Test will time out and terminate in {self.global_timeout} seconds" + ) + self.race_timer.start() + self.test_script() + # TODO: Collect coverage data + for machine in self.machines: + if machine.is_up(): + machine.execute("sync") + + def start_all(self) -> None: + """Start all machines""" + with rootlog.nested("start all VMs"): + for machine in self.machines: + machine.start() + + def join_all(self) -> None: + """Wait for all machines to shut down""" + with rootlog.nested("wait for all VMs to finish"): + for machine in self.machines: + machine.wait_for_shutdown() + self.race_timer.cancel() + + def terminate_test(self) -> None: + # This will be usually running in another thread than + # the thread actually executing the test script. + with rootlog.nested("timeout reached; test terminating..."): + for machine in self.machines: + machine.release() + # As we cannot `sys.exit` from another thread + # We can at least force the main thread to get SIGTERM'ed. + # This will prevent any user who caught all the exceptions + # to swallow them and prevent itself from terminating. + os.kill(os.getpid(), signal.SIGTERM) + + def create_machine(self, args: Dict[str, Any]) -> Machine: + tmp_dir = get_tmp_dir() + + if args.get("startCommand"): + start_command: str = args.get("startCommand", "") + cmd = NixStartScript(start_command) + name = args.get("name", cmd.machine_name) + else: + cmd = Machine.create_startcommand(args) # type: ignore + name = args.get("name", "machine") + + return Machine( + tmp_dir=tmp_dir, + out_dir=self.out_dir, + start_command=cmd, + name=name, + keep_vm_state=args.get("keep_vm_state", False), + ) + + def serial_stdout_on(self) -> None: + rootlog._print_serial_logs = True + + def serial_stdout_off(self) -> None: + rootlog._print_serial_logs = False + + def check_polling_conditions(self) -> None: + for condition in self.polling_conditions: + condition.maybe_raise() + + def polling_condition( + self, + fun_: Optional[Callable] = None, + *, + seconds_interval: float = 2.0, + description: Optional[str] = None, + ) -> Union[Callable[[Callable], ContextManager], ContextManager]: + driver = self + + class Poll: + def __init__(self, fun: Callable): + self.condition = PollingCondition( + fun, + seconds_interval, + description, + ) + + def __enter__(self) -> None: + driver.polling_conditions.append(self.condition) + + def __exit__(self, a, b, c) -> None: # type: ignore + res = driver.polling_conditions.pop() + assert res is self.condition + + def wait(self, timeout: int = 900) -> None: + def condition(last: bool) -> bool: + if last: + rootlog.info(f"Last chance for {self.condition.description}") + ret = self.condition.check(force=True) + if not ret and not last: + rootlog.info( + f"({self.condition.description} failure not fatal yet)" + ) + return ret + + with rootlog.nested(f"waiting for {self.condition.description}"): + retry(condition, timeout=timeout) + + if fun_ is None: + return Poll + else: + return Poll(fun_) diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/logger.py b/nixpkgs/nixos/lib/test-driver/test_driver/logger.py new file mode 100644 index 000000000000..116244b5e4ae --- /dev/null +++ b/nixpkgs/nixos/lib/test-driver/test_driver/logger.py @@ -0,0 +1,107 @@ +# mypy: disable-error-code="no-untyped-call" +# drop the above line when mypy is upgraded to include +# https://github.com/python/typeshed/commit/49b717ca52bf0781a538b04c0d76a5513f7119b8 +import codecs +import os +import sys +import time +import unicodedata +from contextlib import contextmanager +from queue import Empty, Queue +from typing import Any, Dict, Iterator +from xml.sax.saxutils import XMLGenerator + +from colorama import Fore, Style + + +class Logger: + def __init__(self) -> None: + self.logfile = os.environ.get("LOGFILE", "/dev/null") + self.logfile_handle = codecs.open(self.logfile, "wb") + self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8") + self.queue: "Queue[Dict[str, str]]" = Queue() + + self.xml.startDocument() + self.xml.startElement("logfile", attrs={}) + + self._print_serial_logs = True + + @staticmethod + def _eprint(*args: object, **kwargs: Any) -> None: + print(*args, file=sys.stderr, **kwargs) + + def close(self) -> None: + self.xml.endElement("logfile") + self.xml.endDocument() + self.logfile_handle.close() + + def sanitise(self, message: str) -> str: + return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C") + + def maybe_prefix(self, message: str, attributes: Dict[str, str]) -> str: + if "machine" in attributes: + return f"{attributes['machine']}: {message}" + return message + + def log_line(self, message: str, attributes: Dict[str, str]) -> None: + self.xml.startElement("line", attributes) + self.xml.characters(message) + self.xml.endElement("line") + + def info(self, *args, **kwargs) -> None: # type: ignore + self.log(*args, **kwargs) + + def warning(self, *args, **kwargs) -> None: # type: ignore + self.log(*args, **kwargs) + + def error(self, *args, **kwargs) -> None: # type: ignore + self.log(*args, **kwargs) + sys.exit(1) + + def log(self, message: str, attributes: Dict[str, str] = {}) -> None: + self._eprint(self.maybe_prefix(message, attributes)) + self.drain_log_queue() + self.log_line(message, attributes) + + def log_serial(self, message: str, machine: str) -> None: + self.enqueue({"msg": message, "machine": machine, "type": "serial"}) + if self._print_serial_logs: + self._eprint(Style.DIM + f"{machine} # {message}" + Style.RESET_ALL) + + def enqueue(self, item: Dict[str, str]) -> None: + self.queue.put(item) + + def drain_log_queue(self) -> None: + try: + while True: + item = self.queue.get_nowait() + msg = self.sanitise(item["msg"]) + del item["msg"] + self.log_line(msg, item) + except Empty: + pass + + @contextmanager + def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]: + self._eprint( + self.maybe_prefix( + Style.BRIGHT + Fore.GREEN + message + Style.RESET_ALL, attributes + ) + ) + + self.xml.startElement("nest", attrs={}) + self.xml.startElement("head", attributes) + self.xml.characters(message) + self.xml.endElement("head") + + tic = time.time() + self.drain_log_queue() + yield + self.drain_log_queue() + toc = time.time() + self.log(f"(finished: {message}, in {toc - tic:.2f} seconds)") + + self.xml.endElement("nest") + + +rootlog = Logger() diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/machine.py b/nixpkgs/nixos/lib/test-driver/test_driver/machine.py new file mode 100644 index 000000000000..da60b669fa27 --- /dev/null +++ b/nixpkgs/nixos/lib/test-driver/test_driver/machine.py @@ -0,0 +1,1324 @@ +import base64 +import io +import os +import queue +import re +import select +import shlex +import shutil +import socket +import subprocess +import sys +import tempfile +import threading +import time +from contextlib import _GeneratorContextManager, nullcontext +from pathlib import Path +from queue import Queue +from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple + +from test_driver.logger import rootlog + +from .qmp import QMPSession + +CHAR_TO_KEY = { + "A": "shift-a", + "N": "shift-n", + "-": "0x0C", + "_": "shift-0x0C", + "B": "shift-b", + "O": "shift-o", + "=": "0x0D", + "+": "shift-0x0D", + "C": "shift-c", + "P": "shift-p", + "[": "0x1A", + "{": "shift-0x1A", + "D": "shift-d", + "Q": "shift-q", + "]": "0x1B", + "}": "shift-0x1B", + "E": "shift-e", + "R": "shift-r", + ";": "0x27", + ":": "shift-0x27", + "F": "shift-f", + "S": "shift-s", + "'": "0x28", + '"': "shift-0x28", + "G": "shift-g", + "T": "shift-t", + "`": "0x29", + "~": "shift-0x29", + "H": "shift-h", + "U": "shift-u", + "\\": "0x2B", + "|": "shift-0x2B", + "I": "shift-i", + "V": "shift-v", + ",": "0x33", + "<": "shift-0x33", + "J": "shift-j", + "W": "shift-w", + ".": "0x34", + ">": "shift-0x34", + "K": "shift-k", + "X": "shift-x", + "/": "0x35", + "?": "shift-0x35", + "L": "shift-l", + "Y": "shift-y", + " ": "spc", + "M": "shift-m", + "Z": "shift-z", + "\n": "ret", + "!": "shift-0x02", + "@": "shift-0x03", + "#": "shift-0x04", + "$": "shift-0x05", + "%": "shift-0x06", + "^": "shift-0x07", + "&": "shift-0x08", + "*": "shift-0x09", + "(": "shift-0x0A", + ")": "shift-0x0B", +} + + +def make_command(args: list) -> str: + return " ".join(map(shlex.quote, (map(str, args)))) + + +def _perform_ocr_on_screenshot( + screenshot_path: str, model_ids: Iterable[int] +) -> List[str]: + if shutil.which("tesseract") is None: + raise Exception("OCR requested but enableOCR is false") + + magick_args = ( + "-filter Catrom -density 72 -resample 300 " + + "-contrast -normalize -despeckle -type grayscale " + + "-sharpen 1 -posterize 3 -negate -gamma 100 " + + "-blur 1x65535" + ) + + tess_args = "-c debug_file=/dev/null --psm 11" + + cmd = f"convert {magick_args} '{screenshot_path}' 'tiff:{screenshot_path}.tiff'" + ret = subprocess.run(cmd, shell=True, capture_output=True) + if ret.returncode != 0: + raise Exception(f"TIFF conversion failed with exit code {ret.returncode}") + + model_results = [] + for model_id in model_ids: + cmd = f"tesseract '{screenshot_path}.tiff' - {tess_args} --oem '{model_id}'" + ret = subprocess.run(cmd, shell=True, capture_output=True) + if ret.returncode != 0: + raise Exception(f"OCR failed with exit code {ret.returncode}") + model_results.append(ret.stdout.decode("utf-8")) + + return model_results + + +def retry(fn: Callable, timeout: int = 900) -> None: + """Call the given function repeatedly, with 1 second intervals, + until it returns True or a timeout is reached. + """ + + for _ in range(timeout): + if fn(False): + return + time.sleep(1) + + if not fn(True): + raise Exception(f"action timed out after {timeout} seconds") + + +class StartCommand: + """The Base Start Command knows how to append the necessary + runtime qemu options as determined by a particular test driver + run. Any such start command is expected to happily receive and + append additional qemu args. + """ + + _cmd: str + + def cmd( + self, + monitor_socket_path: Path, + qmp_socket_path: Path, + shell_socket_path: Path, + allow_reboot: bool = False, + ) -> str: + display_opts = "" + display_available = any(x in os.environ for x in ["DISPLAY", "WAYLAND_DISPLAY"]) + if not display_available: + display_opts += " -nographic" + + # qemu options + qemu_opts = ( + " -device virtio-serial" + # Note: virtconsole will map to /dev/hvc0 in Linux guests + " -device virtconsole,chardev=shell" + " -device virtio-rng-pci" + " -serial stdio" + ) + if not allow_reboot: + qemu_opts += " -no-reboot" + # TODO: qemu script already catpures this env variable, legacy? + qemu_opts += " " + os.environ.get("QEMU_OPTS", "") + + return ( + f"{self._cmd}" + f" -qmp unix:{qmp_socket_path},server=on,wait=off" + f" -monitor unix:{monitor_socket_path}" + f" -chardev socket,id=shell,path={shell_socket_path}" + f"{qemu_opts}" + f"{display_opts}" + ) + + @staticmethod + def build_environment( + state_dir: Path, + shared_dir: Path, + ) -> dict: + # We make a copy to not update the current environment + env = dict(os.environ) + env.update( + { + "TMPDIR": str(state_dir), + "SHARED_DIR": str(shared_dir), + "USE_TMPDIR": "1", + } + ) + return env + + def run( + self, + state_dir: Path, + shared_dir: Path, + monitor_socket_path: Path, + qmp_socket_path: Path, + shell_socket_path: Path, + allow_reboot: bool, + ) -> subprocess.Popen: + return subprocess.Popen( + self.cmd( + monitor_socket_path, qmp_socket_path, shell_socket_path, allow_reboot + ), + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=True, + cwd=state_dir, + env=self.build_environment(state_dir, shared_dir), + ) + + +class NixStartScript(StartCommand): + """A start script from nixos/modules/virtualiation/qemu-vm.nix + that also satisfies the requirement of the BaseStartCommand. + These Nix commands have the particular characteristic that the + machine name can be extracted out of them via a regex match. + (Admittedly a _very_ implicit contract, evtl. TODO fix) + """ + + def __init__(self, script: str): + self._cmd = script + + @property + def machine_name(self) -> str: + match = re.search("run-(.+)-vm$", self._cmd) + name = "machine" + if match: + name = match.group(1) + return name + + +class LegacyStartCommand(StartCommand): + """Used in some places to create an ad-hoc machine instead of + using nix test instrumentation + module system for that purpose. + Legacy. + """ + + def __init__( + self, + netBackendArgs: Optional[str] = None, # noqa: N803 + netFrontendArgs: Optional[str] = None, # noqa: N803 + hda: Optional[Tuple[Path, str]] = None, + cdrom: Optional[str] = None, + usb: Optional[str] = None, + bios: Optional[str] = None, + qemuBinary: Optional[str] = None, # noqa: N803 + qemuFlags: Optional[str] = None, # noqa: N803 + ): + if qemuBinary is not None: + self._cmd = qemuBinary + else: + self._cmd = "qemu-kvm" + + self._cmd += " -m 384" + + # networking + net_backend = "-netdev user,id=net0" + net_frontend = "-device virtio-net-pci,netdev=net0" + if netBackendArgs is not None: + net_backend += "," + netBackendArgs + if netFrontendArgs is not None: + net_frontend += "," + netFrontendArgs + self._cmd += f" {net_backend} {net_frontend}" + + # hda + hda_cmd = "" + if hda is not None: + hda_path = hda[0].resolve() + hda_interface = hda[1] + if hda_interface == "scsi": + hda_cmd += ( + f" -drive id=hda,file={hda_path},werror=report,if=none" + " -device scsi-hd,drive=hda" + ) + else: + hda_cmd += f" -drive file={hda_path},if={hda_interface},werror=report" + self._cmd += hda_cmd + + # cdrom + if cdrom is not None: + self._cmd += f" -cdrom {cdrom}" + + # usb + usb_cmd = "" + if usb is not None: + # https://github.com/qemu/qemu/blob/master/docs/usb2.txt + usb_cmd += ( + " -device usb-ehci" + f" -drive id=usbdisk,file={usb},if=none,readonly" + " -device usb-storage,drive=usbdisk " + ) + self._cmd += usb_cmd + + # bios + if bios is not None: + self._cmd += f" -bios {bios}" + + # qemu flags + if qemuFlags is not None: + self._cmd += f" {qemuFlags}" + + +class Machine: + """A handle to the machine with this name, that also knows how to manage + the machine lifecycle with the help of a start script / command.""" + + name: str + out_dir: Path + tmp_dir: Path + shared_dir: Path + state_dir: Path + monitor_path: Path + qmp_path: Path + shell_path: Path + + start_command: StartCommand + keep_vm_state: bool + + process: Optional[subprocess.Popen] + pid: Optional[int] + monitor: Optional[socket.socket] + qmp_client: Optional[QMPSession] + shell: Optional[socket.socket] + serial_thread: Optional[threading.Thread] + + booted: bool + connected: bool + # Store last serial console lines for use + # of wait_for_console_text + last_lines: Queue = Queue() + callbacks: List[Callable] + + def __repr__(self) -> str: + return f"<Machine '{self.name}'>" + + def __init__( + self, + out_dir: Path, + tmp_dir: Path, + start_command: StartCommand, + name: str = "machine", + keep_vm_state: bool = False, + callbacks: Optional[List[Callable]] = None, + ) -> None: + self.out_dir = out_dir + self.tmp_dir = tmp_dir + self.keep_vm_state = keep_vm_state + self.name = name + self.start_command = start_command + self.callbacks = callbacks if callbacks is not None else [] + + # set up directories + self.shared_dir = self.tmp_dir / "shared-xchg" + self.shared_dir.mkdir(mode=0o700, exist_ok=True) + + self.state_dir = self.tmp_dir / f"vm-state-{self.name}" + self.monitor_path = self.state_dir / "monitor" + self.qmp_path = self.state_dir / "qmp" + self.shell_path = self.state_dir / "shell" + if (not self.keep_vm_state) and self.state_dir.exists(): + self.cleanup_statedir() + self.state_dir.mkdir(mode=0o700, exist_ok=True) + + self.process = None + self.pid = None + self.monitor = None + self.qmp_client = None + self.shell = None + self.serial_thread = None + + self.booted = False + self.connected = False + + @staticmethod + def create_startcommand(args: Dict[str, str]) -> StartCommand: + rootlog.warning( + "Using legacy create_startcommand(), " + "please use proper nix test vm instrumentation, instead " + "to generate the appropriate nixos test vm qemu startup script" + ) + hda = None + if args.get("hda"): + hda_arg: str = args.get("hda", "") + hda_arg_path: Path = Path(hda_arg) + hda = (hda_arg_path, args.get("hdaInterface", "")) + return LegacyStartCommand( + netBackendArgs=args.get("netBackendArgs"), + netFrontendArgs=args.get("netFrontendArgs"), + hda=hda, + cdrom=args.get("cdrom"), + usb=args.get("usb"), + bios=args.get("bios"), + qemuBinary=args.get("qemuBinary"), + qemuFlags=args.get("qemuFlags"), + ) + + def is_up(self) -> bool: + return self.booted and self.connected + + def log(self, msg: str) -> None: + rootlog.log(msg, {"machine": self.name}) + + def log_serial(self, msg: str) -> None: + rootlog.log_serial(msg, self.name) + + def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager: + my_attrs = {"machine": self.name} + my_attrs.update(attrs) + return rootlog.nested(msg, my_attrs) + + def wait_for_monitor_prompt(self) -> str: + assert self.monitor is not None + answer = "" + while True: + undecoded_answer = self.monitor.recv(1024) + if not undecoded_answer: + break + answer += undecoded_answer.decode() + if answer.endswith("(qemu) "): + break + return answer + + def send_monitor_command(self, command: str) -> str: + """ + Send a command to the QEMU monitor. This allows attaching + virtual USB disks to a running machine, among other things. + """ + self.run_callbacks() + message = f"{command}\n".encode() + assert self.monitor is not None + self.monitor.send(message) + return self.wait_for_monitor_prompt() + + def wait_for_unit( + self, unit: str, user: Optional[str] = None, timeout: int = 900 + ) -> None: + """ + Wait for a systemd unit to get into "active" state. + Throws exceptions on "failed" and "inactive" states as well as after + timing out. + """ + + def check_active(_: Any) -> bool: + state = self.get_unit_property(unit, "ActiveState", user) + if state == "failed": + raise Exception(f'unit "{unit}" reached state "{state}"') + + if state == "inactive": + status, jobs = self.systemctl("list-jobs --full 2>&1", user) + if "No jobs" in jobs: + info = self.get_unit_info(unit, user) + if info["ActiveState"] == state: + raise Exception( + f'unit "{unit}" is inactive and there are no pending jobs' + ) + + return state == "active" + + with self.nested( + f"waiting for unit {unit}" + + (f" with user {user}" if user is not None else "") + ): + retry(check_active, timeout) + + def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]: + status, lines = self.systemctl(f'--no-pager show "{unit}"', user) + if status != 0: + raise Exception( + f'retrieving systemctl info for unit "{unit}"' + + ("" if user is None else f' under user "{user}"') + + f" failed with exit code {status}" + ) + + line_pattern = re.compile(r"^([^=]+)=(.*)$") + + def tuple_from_line(line: str) -> Tuple[str, str]: + match = line_pattern.match(line) + assert match is not None + return match[1], match[2] + + return dict( + tuple_from_line(line) + for line in lines.split("\n") + if line_pattern.match(line) + ) + + def get_unit_property( + self, + unit: str, + property: str, + user: Optional[str] = None, + ) -> str: + status, lines = self.systemctl( + f'--no-pager show "{unit}" --property="{property}"', + user, + ) + if status != 0: + raise Exception( + f'retrieving systemctl property "{property}" for unit "{unit}"' + + ("" if user is None else f' under user "{user}"') + + f" failed with exit code {status}" + ) + + invalid_output_message = ( + f'systemctl show --property "{property}" "{unit}"' + f"produced invalid output: {lines}" + ) + + line_pattern = re.compile(r"^([^=]+)=(.*)$") + match = line_pattern.match(lines) + assert match is not None, invalid_output_message + + assert match[1] == property, invalid_output_message + return match[2] + + def systemctl(self, q: str, user: Optional[str] = None) -> Tuple[int, str]: + """ + Runs `systemctl` commands with optional support for + `systemctl --user` + + ```py + # run `systemctl list-jobs --no-pager` + machine.systemctl("list-jobs --no-pager") + + # spawn a shell for `any-user` and run + # `systemctl --user list-jobs --no-pager` + machine.systemctl("list-jobs --no-pager", "any-user") + ``` + """ + if user is not None: + q = q.replace("'", "\\'") + return self.execute( + f"su -l {user} --shell /bin/sh -c " + "$'XDG_RUNTIME_DIR=/run/user/`id -u` " + f"systemctl --user {q}'" + ) + return self.execute(f"systemctl {q}") + + def require_unit_state(self, unit: str, require_state: str = "active") -> None: + with self.nested( + f"checking if unit '{unit}' has reached state '{require_state}'" + ): + info = self.get_unit_info(unit) + state = info["ActiveState"] + if state != require_state: + raise Exception( + f"Expected unit '{unit}' to to be in state " + f"'{require_state}' but it is in state '{state}'" + ) + + def _next_newline_closed_block_from_shell(self) -> str: + assert self.shell + output_buffer = [] + while True: + # This receives up to 4096 bytes from the socket + chunk = self.shell.recv(4096) + if not chunk: + # Probably a broken pipe, return the output we have + break + + decoded = chunk.decode() + output_buffer += [decoded] + if decoded[-1] == "\n": + break + return "".join(output_buffer) + + def execute( + self, + command: str, + check_return: bool = True, + check_output: bool = True, + timeout: Optional[int] = 900, + ) -> Tuple[int, str]: + """ + Execute a shell command, returning a list `(status, stdout)`. + + Commands are run with `set -euo pipefail` set: + + - If several commands are separated by `;` and one fails, the + command as a whole will fail. + + - For pipelines, the last non-zero exit status will be returned + (if there is one; otherwise zero will be returned). + + - Dereferencing unset variables fails the command. + + - It will wait for stdout to be closed. + + If the command detaches, it must close stdout, as `execute` will wait + for this to consume all output reliably. This can be achieved by + redirecting stdout to stderr `>&2`, to `/dev/console`, `/dev/null` or + a file. Examples of detaching commands are `sleep 365d &`, where the + shell forks a new process that can write to stdout and `xclip -i`, where + the `xclip` command itself forks without closing stdout. + + Takes an optional parameter `check_return` that defaults to `True`. + Setting this parameter to `False` will not check for the return code + and return -1 instead. This can be used for commands that shut down + the VM and would therefore break the pipe that would be used for + retrieving the return code. + + A timeout for the command can be specified (in seconds) using the optional + `timeout` parameter, e.g., `execute(cmd, timeout=10)` or + `execute(cmd, timeout=None)`. The default is 900 seconds. + """ + self.run_callbacks() + self.connect() + + # Always run command with shell opts + command = f"set -euo pipefail; {command}" + + timeout_str = "" + if timeout is not None: + timeout_str = f"timeout {timeout}" + + # While sh is bash on NixOS, this is not the case for every distro. + # We explicitly call bash here to allow for the driver to boot other distros as well. + out_command = ( + f"{timeout_str} bash -c {shlex.quote(command)} | (base64 -w 0; echo)\n" + ) + + assert self.shell + self.shell.send(out_command.encode()) + + if not check_output: + return (-2, "") + + # Get the output + output = base64.b64decode(self._next_newline_closed_block_from_shell()) + + if not check_return: + return (-1, output.decode()) + + # Get the return code + self.shell.send(b"echo ${PIPESTATUS[0]}\n") + rc = int(self._next_newline_closed_block_from_shell().strip()) + + return (rc, output.decode(errors="replace")) + + def shell_interact(self, address: Optional[str] = None) -> None: + """ + Allows you to directly interact with the guest shell. This should + only be used during test development, not in production tests. + Killing the interactive session with `Ctrl-d` or `Ctrl-c` also ends + the guest session. + """ + self.connect() + + if address is None: + address = "READLINE,prompt=$ " + self.log("Terminal is ready (there is no initial prompt):") + + assert self.shell + try: + subprocess.run( + ["socat", address, f"FD:{self.shell.fileno()}"], + pass_fds=[self.shell.fileno()], + ) + # allow users to cancel this command without breaking the test + except KeyboardInterrupt: + pass + + def console_interact(self) -> None: + """ + Allows you to directly interact with QEMU's stdin, by forwarding + terminal input to the QEMU process. + This is for use with the interactive test driver, not for production + tests, which run unattended. + Output from QEMU is only read line-wise. `Ctrl-c` kills QEMU and + `Ctrl-d` closes console and returns to the test runner. + """ + self.log("Terminal is ready (there is no prompt):") + + assert self.process + assert self.process.stdin + + while True: + try: + char = sys.stdin.buffer.read(1) + except KeyboardInterrupt: + break + if char == b"": # ctrl+d + self.log("Closing connection to the console") + break + self.send_console(char.decode()) + + def succeed(self, *commands: str, timeout: Optional[int] = None) -> str: + """ + Execute a shell command, raising an exception if the exit status is + not zero, otherwise returning the standard output. Similar to `execute`, + except that the timeout is `None` by default. See `execute` for details on + command execution. + """ + output = "" + for command in commands: + with self.nested(f"must succeed: {command}"): + (status, out) = self.execute(command, timeout=timeout) + if status != 0: + self.log(f"output: {out}") + raise Exception(f"command `{command}` failed (exit code {status})") + output += out + return output + + def fail(self, *commands: str, timeout: Optional[int] = None) -> str: + """ + Like `succeed`, but raising an exception if the command returns a zero + status. + """ + output = "" + for command in commands: + with self.nested(f"must fail: {command}"): + (status, out) = self.execute(command, timeout=timeout) + if status == 0: + raise Exception(f"command `{command}` unexpectedly succeeded") + output += out + return output + + def wait_until_succeeds(self, command: str, timeout: int = 900) -> str: + """ + Repeat a shell command with 1-second intervals until it succeeds. + Has a default timeout of 900 seconds which can be modified, e.g. + `wait_until_succeeds(cmd, timeout=10)`. See `execute` for details on + command execution. + Throws an exception on timeout. + """ + output = "" + + def check_success(_: Any) -> bool: + nonlocal output + status, output = self.execute(command, timeout=timeout) + return status == 0 + + with self.nested(f"waiting for success: {command}"): + retry(check_success, timeout) + return output + + def wait_until_fails(self, command: str, timeout: int = 900) -> str: + """ + Like `wait_until_succeeds`, but repeating the command until it fails. + """ + output = "" + + def check_failure(_: Any) -> bool: + nonlocal output + status, output = self.execute(command, timeout=timeout) + return status != 0 + + with self.nested(f"waiting for failure: {command}"): + retry(check_failure, timeout) + return output + + def wait_for_shutdown(self) -> None: + if not self.booted: + return + + with self.nested("waiting for the VM to power off"): + sys.stdout.flush() + assert self.process + self.process.wait() + + self.pid = None + self.booted = False + self.connected = False + + def get_tty_text(self, tty: str) -> str: + status, output = self.execute( + f"fold -w$(stty -F /dev/tty{tty} size | " + f"awk '{{print $2}}') /dev/vcs{tty}" + ) + return output + + def wait_until_tty_matches(self, tty: str, regexp: str, timeout: int = 900) -> None: + """Wait until the visible output on the chosen TTY matches regular + expression. Throws an exception on timeout. + """ + matcher = re.compile(regexp) + + def tty_matches(last: bool) -> bool: + text = self.get_tty_text(tty) + if last: + self.log( + f"Last chance to match /{regexp}/ on TTY{tty}, " + f"which currently contains: {text}" + ) + return len(matcher.findall(text)) > 0 + + with self.nested(f"waiting for {regexp} to appear on tty {tty}"): + retry(tty_matches, timeout) + + def send_chars(self, chars: str, delay: Optional[float] = 0.01) -> None: + """ + Simulate typing a sequence of characters on the virtual keyboard, + e.g., `send_chars("foobar\n")` will type the string `foobar` + followed by the Enter key. + """ + with self.nested(f"sending keys {repr(chars)}"): + for char in chars: + self.send_key(char, delay, log=False) + + def wait_for_file(self, filename: str, timeout: int = 900) -> None: + """ + Waits until the file exists in the machine's file system. + """ + + def check_file(_: Any) -> bool: + status, _ = self.execute(f"test -e {filename}") + return status == 0 + + with self.nested(f"waiting for file '{filename}'"): + retry(check_file, timeout) + + def wait_for_open_port( + self, port: int, addr: str = "localhost", timeout: int = 900 + ) -> None: + """ + Wait until a process is listening on the given TCP port and IP address + (default `localhost`). + """ + + def port_is_open(_: Any) -> bool: + status, _ = self.execute(f"nc -z {addr} {port}") + return status == 0 + + with self.nested(f"waiting for TCP port {port} on {addr}"): + retry(port_is_open, timeout) + + def wait_for_open_unix_socket( + self, addr: str, is_datagram: bool = False, timeout: int = 900 + ) -> None: + """ + Wait until a process is listening on the given UNIX-domain socket + (default to a UNIX-domain stream socket). + """ + + nc_flags = [ + "-z", + "-uU" if is_datagram else "-U", + ] + + def socket_is_open(_: Any) -> bool: + status, _ = self.execute(f"nc {' '.join(nc_flags)} {addr}") + return status == 0 + + with self.nested( + f"waiting for UNIX-domain {'datagram' if is_datagram else 'stream'} on '{addr}'" + ): + retry(socket_is_open, timeout) + + def wait_for_closed_port( + self, port: int, addr: str = "localhost", timeout: int = 900 + ) -> None: + """ + Wait until nobody is listening on the given TCP port and IP address + (default `localhost`). + """ + + def port_is_closed(_: Any) -> bool: + status, _ = self.execute(f"nc -z {addr} {port}") + return status != 0 + + with self.nested(f"waiting for TCP port {port} on {addr} to be closed"): + retry(port_is_closed, timeout) + + def start_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: + return self.systemctl(f"start {jobname}", user) + + def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]: + return self.systemctl(f"stop {jobname}", user) + + def wait_for_job(self, jobname: str) -> None: + self.wait_for_unit(jobname) + + def connect(self) -> None: + def shell_ready(timeout_secs: int) -> bool: + """We sent some data from the backdoor service running on the guest + to indicate that the backdoor shell is ready. + As soon as we read some data from the socket here, we assume that + our root shell is operational. + """ + (ready, _, _) = select.select([self.shell], [], [], timeout_secs) + return bool(ready) + + if self.connected: + return + + with self.nested("waiting for the VM to finish booting"): + self.start() + + assert self.shell + + tic = time.time() + # TODO: do we want to bail after a set number of attempts? + while not shell_ready(timeout_secs=30): + self.log("Guest root shell did not produce any data yet...") + self.log( + " To debug, enter the VM and run 'systemctl status backdoor.service'." + ) + + while True: + chunk = self.shell.recv(1024) + # No need to print empty strings, it means we are waiting. + if len(chunk) == 0: + continue + self.log(f"Guest shell says: {chunk!r}") + # NOTE: for this to work, nothing must be printed after this line! + if b"Spawning backdoor root shell..." in chunk: + break + + toc = time.time() + + self.log("connected to guest root shell") + self.log(f"(connecting took {toc - tic:.2f} seconds)") + self.connected = True + + def screenshot(self, filename: str) -> None: + """ + Take a picture of the display of the virtual machine, in PNG format. + The screenshot will be available in the derivation output. + """ + if "." not in filename: + filename += ".png" + if "/" not in filename: + filename = os.path.join(self.out_dir, filename) + tmp = f"{filename}.ppm" + + with self.nested( + f"making screenshot {filename}", + {"image": os.path.basename(filename)}, + ): + self.send_monitor_command(f"screendump {tmp}") + ret = subprocess.run(f"pnmtopng '{tmp}' > '{filename}'", shell=True) + os.unlink(tmp) + if ret.returncode != 0: + raise Exception("Cannot convert screenshot") + + def copy_from_host_via_shell(self, source: str, target: str) -> None: + """Copy a file from the host into the guest by piping it over the + shell into the destination file. Works without host-guest shared folder. + Prefer copy_from_host for whenever possible. + """ + with open(source, "rb") as fh: + content_b64 = base64.b64encode(fh.read()).decode() + self.succeed( + f"mkdir -p $(dirname {target})", + f"echo -n {content_b64} | base64 -d > {target}", + ) + + def copy_from_host(self, source: str, target: str) -> None: + """ + Copies a file from host to machine, e.g., + `copy_from_host("myfile", "/etc/my/important/file")`. + + The first argument is the file on the host. Note that the "host" refers + to the environment in which the test driver runs, which is typically the + Nix build sandbox. + + The second argument is the location of the file on the machine that will + be written to. + + The file is copied via the `shared_dir` directory which is shared among + all the VMs (using a temporary directory). + The access rights bits will mimic the ones from the host file and + user:group will be root:root. + """ + host_src = Path(source) + vm_target = Path(target) + with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: + shared_temp = Path(shared_td) + host_intermediate = shared_temp / host_src.name + vm_shared_temp = Path("/tmp/shared") / shared_temp.name + vm_intermediate = vm_shared_temp / host_src.name + + self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) + if host_src.is_dir(): + shutil.copytree(host_src, host_intermediate) + else: + shutil.copy(host_src, host_intermediate) + self.succeed(make_command(["mkdir", "-p", vm_target.parent])) + self.succeed(make_command(["cp", "-r", vm_intermediate, vm_target])) + + def copy_from_vm(self, source: str, target_dir: str = "") -> None: + """Copy a file from the VM (specified by an in-VM source path) to a path + relative to `$out`. The file is copied via the `shared_dir` shared among + all the VMs (using a temporary directory). + """ + # Compute the source, target, and intermediate shared file names + vm_src = Path(source) + with tempfile.TemporaryDirectory(dir=self.shared_dir) as shared_td: + shared_temp = Path(shared_td) + vm_shared_temp = Path("/tmp/shared") / shared_temp.name + vm_intermediate = vm_shared_temp / vm_src.name + intermediate = shared_temp / vm_src.name + # Copy the file to the shared directory inside VM + self.succeed(make_command(["mkdir", "-p", vm_shared_temp])) + self.succeed(make_command(["cp", "-r", vm_src, vm_intermediate])) + abs_target = self.out_dir / target_dir / vm_src.name + abs_target.parent.mkdir(exist_ok=True, parents=True) + # Copy the file from the shared directory outside VM + if intermediate.is_dir(): + shutil.copytree(intermediate, abs_target) + else: + shutil.copy(intermediate, abs_target) + + def dump_tty_contents(self, tty: str) -> None: + """Debugging: Dump the contents of the TTY<n>""" + self.execute(f"fold -w 80 /dev/vcs{tty} | systemd-cat") + + def _get_screen_text_variants(self, model_ids: Iterable[int]) -> List[str]: + with tempfile.TemporaryDirectory() as tmpdir: + screenshot_path = os.path.join(tmpdir, "ppm") + self.send_monitor_command(f"screendump {screenshot_path}") + return _perform_ocr_on_screenshot(screenshot_path, model_ids) + + def get_screen_text_variants(self) -> List[str]: + """ + Return a list of different interpretations of what is currently + visible on the machine's screen using optical character + recognition. The number and order of the interpretations is not + specified and is subject to change, but if no exception is raised at + least one will be returned. + + ::: {.note} + This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. + ::: + """ + return self._get_screen_text_variants([0, 1, 2]) + + def get_screen_text(self) -> str: + """ + Return a textual representation of what is currently visible on the + machine's screen using optical character recognition. + + ::: {.note} + This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. + ::: + """ + return self._get_screen_text_variants([2])[0] + + def wait_for_text(self, regex: str, timeout: int = 900) -> None: + """ + Wait until the supplied regular expressions matches the textual + contents of the screen by using optical character recognition (see + `get_screen_text` and `get_screen_text_variants`). + + ::: {.note} + This requires [`enableOCR`](#test-opt-enableOCR) to be set to `true`. + ::: + """ + + def screen_matches(last: bool) -> bool: + variants = self.get_screen_text_variants() + for text in variants: + if re.search(regex, text) is not None: + return True + + if last: + self.log(f"Last OCR attempt failed. Text was: {variants}") + + return False + + with self.nested(f"waiting for {regex} to appear on screen"): + retry(screen_matches, timeout) + + def wait_for_console_text(self, regex: str, timeout: int | None = None) -> None: + """ + Wait until the supplied regular expressions match a line of the + serial console output. + This method is useful when OCR is not possible or inaccurate. + """ + # Buffer the console output, this is needed + # to match multiline regexes. + console = io.StringIO() + + def console_matches(_: Any) -> bool: + nonlocal console + try: + # This will return as soon as possible and + # sleep 1 second. + console.write(self.last_lines.get(block=False)) + except queue.Empty: + pass + console.seek(0) + matches = re.search(regex, console.read()) + return matches is not None + + with self.nested(f"waiting for {regex} to appear on console"): + if timeout is not None: + retry(console_matches, timeout) + else: + while not console_matches(False): + pass + + def send_key( + self, key: str, delay: Optional[float] = 0.01, log: Optional[bool] = True + ) -> None: + """ + Simulate pressing keys on the virtual keyboard, e.g., + `send_key("ctrl-alt-delete")`. + + Please also refer to the QEMU documentation for more information on the + input syntax: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys + """ + key = CHAR_TO_KEY.get(key, key) + context = self.nested(f"sending key {repr(key)}") if log else nullcontext() + with context: + self.send_monitor_command(f"sendkey {key}") + if delay is not None: + time.sleep(delay) + + def send_console(self, chars: str) -> None: + r""" + Send keys to the kernel console. This allows interaction with the systemd + emergency mode, for example. Takes a string that is sent, e.g., + `send_console("\n\nsystemctl default\n")`. + """ + assert self.process + assert self.process.stdin + self.process.stdin.write(chars.encode()) + self.process.stdin.flush() + + def start(self, allow_reboot: bool = False) -> None: + """ + Start the virtual machine. This method is asynchronous --- it does + not wait for the machine to finish booting. + """ + if self.booted: + return + + self.log("starting vm") + + def clear(path: Path) -> Path: + if path.exists(): + path.unlink() + return path + + def create_socket(path: Path) -> socket.socket: + s = socket.socket(family=socket.AF_UNIX, type=socket.SOCK_STREAM) + s.bind(str(path)) + s.listen(1) + return s + + monitor_socket = create_socket(clear(self.monitor_path)) + shell_socket = create_socket(clear(self.shell_path)) + self.process = self.start_command.run( + self.state_dir, + self.shared_dir, + self.monitor_path, + self.qmp_path, + self.shell_path, + allow_reboot, + ) + self.monitor, _ = monitor_socket.accept() + self.shell, _ = shell_socket.accept() + self.qmp_client = QMPSession.from_path(self.qmp_path) + + # Store last serial console lines for use + # of wait_for_console_text + self.last_lines: Queue = Queue() + + def process_serial_output() -> None: + assert self.process + assert self.process.stdout + for _line in self.process.stdout: + # Ignore undecodable bytes that may occur in boot menus + line = _line.decode(errors="ignore").replace("\r", "").rstrip() + self.last_lines.put(line) + self.log_serial(line) + + self.serial_thread = threading.Thread(target=process_serial_output) + self.serial_thread.start() + + self.wait_for_monitor_prompt() + + self.pid = self.process.pid + self.booted = True + + self.log(f"QEMU running (pid {self.pid})") + + def cleanup_statedir(self) -> None: + shutil.rmtree(self.state_dir) + rootlog.log(f"deleting VM state directory {self.state_dir}") + rootlog.log("if you want to keep the VM state, pass --keep-vm-state") + + def shutdown(self) -> None: + """ + Shut down the machine, waiting for the VM to exit. + """ + if not self.booted: + return + + assert self.shell + self.shell.send(b"poweroff\n") + self.wait_for_shutdown() + + def crash(self) -> None: + """ + Simulate a sudden power failure, by telling the VM to exit immediately. + """ + if not self.booted: + return + + self.log("forced crash") + self.send_monitor_command("quit") + self.wait_for_shutdown() + + def reboot(self) -> None: + """Press Ctrl+Alt+Delete in the guest. + + Prepares the machine to be reconnected which is useful if the + machine was started with `allow_reboot = True` + """ + self.send_key("ctrl-alt-delete") + self.connected = False + + def wait_for_x(self, timeout: int = 900) -> None: + """ + Wait until it is possible to connect to the X server. + """ + + def check_x(_: Any) -> bool: + cmd = ( + "journalctl -b SYSLOG_IDENTIFIER=systemd | " + + 'grep "Reached target Current graphical"' + ) + status, _ = self.execute(cmd) + if status != 0: + return False + status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]") + return status == 0 + + with self.nested("waiting for the X11 server"): + retry(check_x, timeout) + + def get_window_names(self) -> List[str]: + return self.succeed( + r"xwininfo -root -tree | sed 's/.*0x[0-9a-f]* \"\([^\"]*\)\".*/\1/; t; d'" + ).splitlines() + + def wait_for_window(self, regexp: str, timeout: int = 900) -> None: + """ + Wait until an X11 window has appeared whose name matches the given + regular expression, e.g., `wait_for_window("Terminal")`. + """ + pattern = re.compile(regexp) + + def window_is_visible(last_try: bool) -> bool: + names = self.get_window_names() + if last_try: + self.log( + f"Last chance to match {regexp} on the window list," + + " which currently contains: " + + ", ".join(names) + ) + return any(pattern.search(name) for name in names) + + with self.nested("waiting for a window to appear"): + retry(window_is_visible, timeout) + + def sleep(self, secs: int) -> None: + # We want to sleep in *guest* time, not *host* time. + self.succeed(f"sleep {secs}") + + def forward_port(self, host_port: int = 8080, guest_port: int = 80) -> None: + """ + Forward a TCP port on the host to a TCP port on the guest. + Useful during interactive testing. + """ + self.send_monitor_command(f"hostfwd_add tcp::{host_port}-:{guest_port}") + + def block(self) -> None: + """ + Simulate unplugging the Ethernet cable that connects the machine to + the other machines. + This happens by shutting down eth1 (the multicast interface used to talk + to the other VMs). eth0 is kept online to still enable the test driver + to communicate with the machine. + """ + self.send_monitor_command("set_link virtio-net-pci.1 off") + + def unblock(self) -> None: + """ + Undo the effect of `block`. + """ + self.send_monitor_command("set_link virtio-net-pci.1 on") + + def release(self) -> None: + if self.pid is None: + return + rootlog.info(f"kill machine (pid {self.pid})") + assert self.process + assert self.shell + assert self.monitor + assert self.serial_thread + + self.process.terminate() + self.shell.close() + self.monitor.close() + self.serial_thread.join() + + def run_callbacks(self) -> None: + for callback in self.callbacks: + callback() + + def switch_root(self) -> None: + """ + Transition from stage 1 to stage 2. This requires the + machine to be configured with `testing.initrdBackdoor = true` + and `boot.initrd.systemd.enable = true`. + """ + self.wait_for_unit("initrd.target") + self.execute( + "systemctl isolate --no-block initrd-switch-root.target 2>/dev/null >/dev/null", + check_return=False, + check_output=False, + ) + self.wait_for_console_text(r"systemd\[1\]:.*Switching root\.") + self.connected = False + self.connect() diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py b/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py new file mode 100644 index 000000000000..12cbad69e34e --- /dev/null +++ b/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py @@ -0,0 +1,92 @@ +import time +from math import isfinite +from typing import Callable, Optional + +from .logger import rootlog + + +class PollingConditionError(Exception): + pass + + +class PollingCondition: + condition: Callable[[], bool] + seconds_interval: float + description: Optional[str] + + last_called: float + entry_count: int + + def __init__( + self, + condition: Callable[[], Optional[bool]], + seconds_interval: float = 2.0, + description: Optional[str] = None, + ): + self.condition = condition # type: ignore + self.seconds_interval = seconds_interval + + if description is None: + if condition.__doc__: + self.description = condition.__doc__ + else: + self.description = condition.__name__ + else: + self.description = str(description) + + self.last_called = float("-inf") + self.entry_count = 0 + + def check(self, force: bool = False) -> bool: + if (self.entered or not self.overdue) and not force: + return True + + with self, rootlog.nested(self.nested_message): + time_since_last = time.monotonic() - self.last_called + last_message = ( + f"Time since last: {time_since_last:.2f}s" + if isfinite(time_since_last) + else "(not called yet)" + ) + + rootlog.info(last_message) + try: + res = self.condition() # type: ignore + except Exception: + res = False + res = res is None or res + rootlog.info(self.status_message(res)) + return res + + def maybe_raise(self) -> None: + if not self.check(): + raise PollingConditionError(self.status_message(False)) + + def status_message(self, status: bool) -> str: + return f"Polling condition {'succeeded' if status else 'failed'}: {self.description}" + + @property + def nested_message(self) -> str: + nested_message = ["Checking polling condition"] + if self.description is not None: + nested_message.append(repr(self.description)) + + return " ".join(nested_message) + + @property + def overdue(self) -> bool: + return self.last_called + self.seconds_interval < time.monotonic() + + @property + def entered(self) -> bool: + # entry_count should never dip *below* zero + assert self.entry_count >= 0 + return self.entry_count > 0 + + def __enter__(self) -> None: + self.entry_count += 1 + + def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore + assert self.entered + self.entry_count -= 1 + self.last_called = time.monotonic() diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/py.typed b/nixpkgs/nixos/lib/test-driver/test_driver/py.typed new file mode 100644 index 000000000000..e69de29bb2d1 --- /dev/null +++ b/nixpkgs/nixos/lib/test-driver/test_driver/py.typed diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/qmp.py b/nixpkgs/nixos/lib/test-driver/test_driver/qmp.py new file mode 100644 index 000000000000..62ca6d7d5b80 --- /dev/null +++ b/nixpkgs/nixos/lib/test-driver/test_driver/qmp.py @@ -0,0 +1,98 @@ +import json +import logging +import os +import socket +from collections.abc import Iterator +from pathlib import Path +from queue import Queue +from typing import Any + +logger = logging.getLogger(__name__) + + +class QMPAPIError(RuntimeError): + def __init__(self, message: dict[str, Any]): + assert "error" in message, "Not an error message!" + try: + self.class_name = message["class"] + self.description = message["desc"] + # NOTE: Some errors can occur before the Server is able to read the + # id member; in these cases the id member will not be part of the + # error response, even if provided by the client. + self.transaction_id = message.get("id") + except KeyError: + raise RuntimeError("Malformed QMP API error response") + + def __str__(self) -> str: + return f"<QMP API error related to transaction {self.transaction_id} [{self.class_name}]: {self.description}>" + + +class QMPSession: + def __init__(self, sock: socket.socket) -> None: + self.sock = sock + self.results: Queue[dict[str, str]] = Queue() + self.pending_events: Queue[dict[str, Any]] = Queue() + self.reader = sock.makefile("r") + self.writer = sock.makefile("w") + # Make the reader non-blocking so we can kind of select on it. + os.set_blocking(self.reader.fileno(), False) + hello = self._wait_for_new_result() + logger.debug(f"Got greeting from QMP API: {hello}") + # The greeting message format is: + # { "QMP": { "version": json-object, "capabilities": json-array } } + assert "QMP" in hello, f"Unexpected result: {hello}" + self.send("qmp_capabilities") + + @classmethod + def from_path(cls, path: Path) -> "QMPSession": + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.connect(str(path)) + return cls(sock) + + def __del__(self) -> None: + self.sock.close() + + def _wait_for_new_result(self) -> dict[str, str]: + assert self.results.empty(), "Results set is not empty, missed results!" + while self.results.empty(): + self.read_pending_messages() + return self.results.get() + + def read_pending_messages(self) -> None: + line = self.reader.readline() + if not line: + return + evt_or_result = json.loads(line) + logger.debug(f"Received a message: {evt_or_result}") + + # It's a result + if "return" in evt_or_result or "QMP" in evt_or_result: + self.results.put(evt_or_result) + # It's an event + elif "event" in evt_or_result: + self.pending_events.put(evt_or_result) + else: + raise QMPAPIError(evt_or_result) + + def wait_for_event(self, timeout: int = 10) -> dict[str, Any]: + while self.pending_events.empty(): + self.read_pending_messages() + + return self.pending_events.get(timeout=timeout) + + def events(self, timeout: int = 10) -> Iterator[dict[str, Any]]: + while not self.pending_events.empty(): + yield self.pending_events.get(timeout=timeout) + + def send(self, cmd: str, args: dict[str, str] = {}) -> dict[str, str]: + self.read_pending_messages() + assert self.results.empty(), "Results set is not empty, missed results!" + data: dict[str, Any] = dict(execute=cmd) + if args != {}: + data["arguments"] = args + + logger.debug(f"Sending {data} to QMP...") + json.dump(data, self.writer) + self.writer.write("\n") + self.writer.flush() + return self._wait_for_new_result() diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/vlan.py b/nixpkgs/nixos/lib/test-driver/test_driver/vlan.py new file mode 100644 index 000000000000..ec9679108e58 --- /dev/null +++ b/nixpkgs/nixos/lib/test-driver/test_driver/vlan.py @@ -0,0 +1,62 @@ +import io +import os +import pty +import subprocess +from pathlib import Path + +from test_driver.logger import rootlog + + +class VLan: + """This class handles a VLAN that the run-vm scripts identify via its + number handles. The network's lifetime equals the object's lifetime. + """ + + nr: int + socket_dir: Path + + process: subprocess.Popen + pid: int + fd: io.TextIOBase + + def __repr__(self) -> str: + return f"<Vlan Nr. {self.nr}>" + + def __init__(self, nr: int, tmp_dir: Path): + self.nr = nr + self.socket_dir = tmp_dir / f"vde{self.nr}.ctl" + + # TODO: don't side-effect environment here + os.environ[f"QEMU_VDE_SOCKET_{self.nr}"] = str(self.socket_dir) + + rootlog.info("start vlan") + pty_master, pty_slave = pty.openpty() + + # The --hub is required for the scenario determined by + # nixos/tests/networking.nix vlan-ping. + # VLAN Tagged traffic (802.1Q) seams to be blocked if a vde_switch is + # used without the hub mode (flood packets to all ports). + self.process = subprocess.Popen( + ["vde_switch", "-s", self.socket_dir, "--dirmode", "0700", "--hub"], + stdin=pty_slave, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + shell=False, + ) + self.pid = self.process.pid + self.fd = os.fdopen(pty_master, "w") + self.fd.write("version\n") + + # TODO: perl version checks if this can be read from + # an if not, dies. we could hang here forever. Fix it. + assert self.process.stdout is not None + self.process.stdout.readline() + if not (self.socket_dir / "ctl").exists(): + rootlog.error("cannot start vde_switch") + + rootlog.info(f"running vlan (pid {self.pid}; ctl {self.socket_dir})") + + def __del__(self) -> None: + rootlog.info(f"kill vlan (pid {self.pid})") + self.fd.close() + self.process.terminate() |