diff options
Diffstat (limited to 'nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py')
-rw-r--r-- | nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py | 92 |
1 files changed, 92 insertions, 0 deletions
diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py b/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py new file mode 100644 index 000000000000..12cbad69e34e --- /dev/null +++ b/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py @@ -0,0 +1,92 @@ +import time +from math import isfinite +from typing import Callable, Optional + +from .logger import rootlog + + +class PollingConditionError(Exception): + pass + + +class PollingCondition: + condition: Callable[[], bool] + seconds_interval: float + description: Optional[str] + + last_called: float + entry_count: int + + def __init__( + self, + condition: Callable[[], Optional[bool]], + seconds_interval: float = 2.0, + description: Optional[str] = None, + ): + self.condition = condition # type: ignore + self.seconds_interval = seconds_interval + + if description is None: + if condition.__doc__: + self.description = condition.__doc__ + else: + self.description = condition.__name__ + else: + self.description = str(description) + + self.last_called = float("-inf") + self.entry_count = 0 + + def check(self, force: bool = False) -> bool: + if (self.entered or not self.overdue) and not force: + return True + + with self, rootlog.nested(self.nested_message): + time_since_last = time.monotonic() - self.last_called + last_message = ( + f"Time since last: {time_since_last:.2f}s" + if isfinite(time_since_last) + else "(not called yet)" + ) + + rootlog.info(last_message) + try: + res = self.condition() # type: ignore + except Exception: + res = False + res = res is None or res + rootlog.info(self.status_message(res)) + return res + + def maybe_raise(self) -> None: + if not self.check(): + raise PollingConditionError(self.status_message(False)) + + def status_message(self, status: bool) -> str: + return f"Polling condition {'succeeded' if status else 'failed'}: {self.description}" + + @property + def nested_message(self) -> str: + nested_message = ["Checking polling condition"] + if self.description is not None: + nested_message.append(repr(self.description)) + + return " ".join(nested_message) + + @property + def overdue(self) -> bool: + return self.last_called + self.seconds_interval < time.monotonic() + + @property + def entered(self) -> bool: + # entry_count should never dip *below* zero + assert self.entry_count >= 0 + return self.entry_count > 0 + + def __enter__(self) -> None: + self.entry_count += 1 + + def __exit__(self, exc_type, exc_value, traceback) -> None: # type: ignore + assert self.entered + self.entry_count -= 1 + self.last_called = time.monotonic() |