Source code for runloop_api_client.sdk.execution
"""Execution management for async commands."""
from __future__ import annotations
import logging
import threading
from typing import Optional
from typing_extensions import Unpack, override
from ._types import BaseRequestOptions, LongRequestOptions
from .._client import Runloop
from .execution_result import ExecutionResult
from ..types.devbox_async_execution_detail_view import DevboxAsyncExecutionDetailView
class _StreamingGroup:
"""Internal helper used to coordinate stdout/stderr streaming threads."""
def __init__(self, threads: list[threading.Thread], stop_event: threading.Event) -> None:
self._threads = threads
self._stop_event = stop_event
self._logger = logging.getLogger(__name__)
def stop(self) -> None:
self._stop_event.set()
def join(self, timeout: float = 5.0) -> None:
for thread in self._threads:
thread.join(timeout)
if thread.is_alive():
self._logger.debug("streaming thread %s still running after join timeout", thread.name)
@property
def active(self) -> bool:
return any(thread.is_alive() for thread in self._threads)
[docs]
class Execution:
"""Manages an asynchronous command execution on a devbox.
Provides methods to poll execution state, wait for completion, and terminate
the running process. Created by ``devbox.cmd.exec_async()``.
Example:
>>> execution = devbox.cmd.exec_async(command="python train.py")
>>> state = execution.get_state()
>>> if state.status == "running":
... execution.kill()
>>> result = execution.result() # Wait for completion
>>> print(result.stdout())
"""
def __init__(
self,
client: Runloop,
devbox_id: str,
execution: DevboxAsyncExecutionDetailView,
streaming_group: Optional[_StreamingGroup] = None,
) -> None:
self._client = client
self._devbox_id = devbox_id
self._execution_id = execution.execution_id
self._initial_result = execution
self._streaming_group = streaming_group
@override
def __repr__(self) -> str:
return f"<Execution id={self._execution_id!r}>"
@property
def execution_id(self) -> str:
"""Return the execution identifier.
:return: Unique execution ID
:rtype: str
"""
return self._execution_id
@property
def devbox_id(self) -> str:
"""Return the devbox identifier.
:return: Devbox ID where the command is running
:rtype: str
"""
return self._devbox_id
[docs]
def result(self, **options: Unpack[LongRequestOptions]) -> ExecutionResult:
"""Wait for completion and return an :class:`ExecutionResult`.
:param options: Optional long-running request configuration
:return: Wrapper with exit status and output helpers
:rtype: ExecutionResult
"""
# Wait for command completion
final = self._client.devboxes.wait_for_command(
self._execution_id,
devbox_id=self._devbox_id,
statuses=["completed"],
**options,
)
# Wait for streaming to complete naturally (log but don't throw streaming errors)
if self._streaming_group is not None:
self._streaming_group.join()
self._streaming_group = None
return ExecutionResult(self._client, self._devbox_id, final)
[docs]
def get_state(self, **options: Unpack[BaseRequestOptions]) -> DevboxAsyncExecutionDetailView:
"""Fetch the latest execution state.
:param options: Optional request configuration
:return: Current execution metadata
:rtype: DevboxAsyncExecutionDetailView
"""
return self._client.devboxes.executions.retrieve(
self._execution_id,
devbox_id=self._devbox_id,
**options,
)
[docs]
def kill(self, **options: Unpack[LongRequestOptions]) -> None:
"""Request termination of the running execution.
:param options: Optional long-running request configuration
"""
self._client.devboxes.executions.kill(
self._execution_id,
devbox_id=self._devbox_id,
**options,
)