Coverage for portality / lib / thread_utils.py: 90%

10 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-04 09:41 +0100

1import time 

2from typing import Callable 

3 

4 

5def wait_until(cond_fn: Callable[[], bool], timeout=10, sleep_time=0.1, 

6 timeout_msg='fail to meet the condition within the timeout period.'): 

7 start_time = time.time() 

8 while (time.time() - start_time) < timeout: 

9 cond_result = cond_fn() 

10 if cond_result: 

11 return cond_result 

12 

13 time.sleep(sleep_time) 

14 

15 raise TimeoutError(timeout_msg)