Coverage for portality / lib / thread_utils.py: 90%
10 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-05 00:09 +0100
1import time
2from typing import Callable
def wait_until(cond_fn: Callable[[], bool], timeout=10, sleep_time=0.1,
               timeout_msg='fail to meet the condition within the timeout period.'):
    """Poll ``cond_fn`` until it returns a truthy value or the timeout expires.

    The condition is always evaluated at least once (even when ``timeout``
    is zero or negative) and is re-evaluated one final time once the
    deadline has been reached, so a condition that became true during the
    last sleep is not reported as a timeout.

    Args:
        cond_fn: Zero-argument callable; polling stops as soon as it
            returns a truthy value.
        timeout: Maximum number of seconds to keep polling.
        sleep_time: Seconds to sleep between two evaluations of ``cond_fn``.
        timeout_msg: Message carried by the ``TimeoutError``.

    Returns:
        The first truthy value returned by ``cond_fn``.

    Raises:
        TimeoutError: If ``cond_fn`` has not returned a truthy value by the
            time ``timeout`` seconds have elapsed.
    """
    # Monotonic clock: unlike time.time(), it is unaffected by system clock
    # adjustments, so the measured wait cannot shrink or stretch.
    deadline = time.monotonic() + timeout
    while True:
        cond_result = cond_fn()
        if cond_result:
            return cond_result

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(timeout_msg)

        # Never sleep past the deadline; the loop then performs one last
        # check of the condition right at the deadline before giving up.
        time.sleep(min(sleep_time, remaining))