Source code for runloop_api_client.lib.polling
import time
from typing import Any, TypeVar, Callable, Optional
from dataclasses import dataclass
T = TypeVar("T")
[docs]
@dataclass
class PollingConfig:
"""Configuration for polling behavior"""
interval_seconds: float = 1.0
max_attempts: int = 120
timeout_seconds: Optional[float] = None
class PollingTimeout(Exception):
"""Raised when polling exceeds max attempts or timeout"""
def __init__(self, message: str, last_value: Any):
self.last_value = last_value
super().__init__(f"{message}. Last retrieved value: {last_value}")
def poll_until(
retriever: Callable[[], T],
is_terminal: Callable[[T], bool],
config: Optional[PollingConfig] = None,
on_error: Optional[Callable[[Exception], T]] = None,
) -> T:
"""
Poll until a condition is met or timeout/max attempts are reached.
Args:
retriever: Callable that returns the object to check
is_terminal: Callable that returns True when polling should stop
config: Optional polling configuration
on_error: Optional error handler that can return a value to continue polling
or re-raise the exception to stop polling
Returns:
The final state of the polled object
Raises:
PollingTimeout: When max attempts or timeout is reached
"""
if config is None:
config = PollingConfig()
attempts = 0
start_time = time.time()
last_result = None
while True:
try:
last_result = retriever()
except Exception as e:
if on_error is not None:
last_result = on_error(e)
else:
raise
if is_terminal(last_result):
return last_result
attempts += 1
if attempts >= config.max_attempts:
raise PollingTimeout(f"Exceeded maximum attempts ({config.max_attempts})", last_result)
if config.timeout_seconds is not None:
elapsed = time.time() - start_time
if elapsed >= config.timeout_seconds:
raise PollingTimeout(f"Exceeded timeout of {config.timeout_seconds} seconds", last_result)
time.sleep(config.interval_seconds)