about summary refs log tree commit diff
path: root/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py
diff options
context:
space:
mode:
Diffstat (limited to 'nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py')
-rw-r--r--nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py92
1 files changed, 92 insertions, 0 deletions
diff --git a/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py b/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py
new file mode 100644
index 000000000000..12cbad69e34e
--- /dev/null
+++ b/nixpkgs/nixos/lib/test-driver/test_driver/polling_condition.py
@@ -0,0 +1,92 @@
+import time
+from math import isfinite
+from typing import Callable, Optional
+
+from .logger import rootlog
+
+
+class PollingConditionError(Exception):
+    pass
+
+
+class PollingCondition:
+    condition: Callable[[], bool]
+    seconds_interval: float
+    description: Optional[str]
+
+    last_called: float
+    entry_count: int
+
+    def __init__(
+        self,
+        condition: Callable[[], Optional[bool]],
+        seconds_interval: float = 2.0,
+        description: Optional[str] = None,
+    ):
+        self.condition = condition  # type: ignore
+        self.seconds_interval = seconds_interval
+
+        if description is None:
+            if condition.__doc__:
+                self.description = condition.__doc__
+            else:
+                self.description = condition.__name__
+        else:
+            self.description = str(description)
+
+        self.last_called = float("-inf")
+        self.entry_count = 0
+
+    def check(self, force: bool = False) -> bool:
+        if (self.entered or not self.overdue) and not force:
+            return True
+
+        with self, rootlog.nested(self.nested_message):
+            time_since_last = time.monotonic() - self.last_called
+            last_message = (
+                f"Time since last: {time_since_last:.2f}s"
+                if isfinite(time_since_last)
+                else "(not called yet)"
+            )
+
+            rootlog.info(last_message)
+            try:
+                res = self.condition()  # type: ignore
+            except Exception:
+                res = False
+            res = res is None or res
+            rootlog.info(self.status_message(res))
+            return res
+
+    def maybe_raise(self) -> None:
+        if not self.check():
+            raise PollingConditionError(self.status_message(False))
+
+    def status_message(self, status: bool) -> str:
+        return f"Polling condition {'succeeded' if status else 'failed'}: {self.description}"
+
+    @property
+    def nested_message(self) -> str:
+        nested_message = ["Checking polling condition"]
+        if self.description is not None:
+            nested_message.append(repr(self.description))
+
+        return " ".join(nested_message)
+
+    @property
+    def overdue(self) -> bool:
+        return self.last_called + self.seconds_interval < time.monotonic()
+
+    @property
+    def entered(self) -> bool:
+        # entry_count should never dip *below* zero
+        assert self.entry_count >= 0
+        return self.entry_count > 0
+
+    def __enter__(self) -> None:
+        self.entry_count += 1
+
+    def __exit__(self, exc_type, exc_value, traceback) -> None:  # type: ignore
+        assert self.entered
+        self.entry_count -= 1
+        self.last_called = time.monotonic()