Source code for runloop_api_client.sdk.async_execution

"""Async execution management for async commands."""

from __future__ import annotations

import asyncio
import logging
from typing import Optional, Awaitable, cast
from typing_extensions import Unpack, override

from ._types import BaseRequestOptions, LongRequestOptions
from .._client import AsyncRunloop
from .async_execution_result import AsyncExecutionResult
from ..types.devbox_async_execution_detail_view import DevboxAsyncExecutionDetailView


class _AsyncStreamingGroup:
    """Internal helper to manage background streaming tasks."""

    def __init__(self, tasks: list[asyncio.Task[None]]) -> None:
        self._tasks = tasks
        self._logger = logging.getLogger(__name__)

    async def wait(self) -> None:
        results = await asyncio.gather(*self._tasks, return_exceptions=True)
        self._log_results(tuple(results))

    async def cancel(self) -> None:
        for task in self._tasks:
            task.cancel()
        results = await asyncio.gather(*self._tasks, return_exceptions=True)
        self._log_results(tuple(results))

    def _log_results(self, results: tuple[object | BaseException | None, ...]) -> None:
        for result in results:
            if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
                self._logger.debug("stream task error: %s", result)


class AsyncExecution:
    """Manages an asynchronous command execution on a devbox.

    Provides coroutines to poll execution state, wait for completion, and
    terminate the running process. Created by ``await devbox.cmd.exec_async()``.

    Example:
        >>> execution = await devbox.cmd.exec_async(command="python train.py")
        >>> state = await execution.get_state()
        >>> if state.status == "running":
        ...     await execution.kill()
        >>> result = await execution.result()  # Wait for completion
        >>> print(await result.stdout())
    """

    def __init__(
        self,
        client: AsyncRunloop,
        devbox_id: str,
        execution: DevboxAsyncExecutionDetailView,
        streaming_group: Optional[_AsyncStreamingGroup] = None,
    ) -> None:
        """Bind the execution to its client, devbox and optional streaming tasks.

        :param client: API client used for all follow-up requests
        :param devbox_id: Devbox on which the command was started
        :param execution: Detail view returned when the command was started
        :param streaming_group: Background streaming tasks to await in
            :meth:`result`, if any
        """
        self._client = client
        self._devbox_id = devbox_id
        self._execution_id = execution.execution_id
        self._initial_result = execution
        self._streaming_group = streaming_group

    @override
    def __repr__(self) -> str:
        return f"<AsyncExecution id={self._execution_id!r}>"

    @property
    def execution_id(self) -> str:
        """Return the execution identifier.

        :return: Unique execution ID
        :rtype: str
        """
        return self._execution_id

    @property
    def devbox_id(self) -> str:
        """Return the devbox identifier.

        :return: Devbox ID where the command is running
        :rtype: str
        """
        return self._devbox_id

    async def result(self, **options: Unpack[LongRequestOptions]) -> AsyncExecutionResult:
        """Wait for completion and return an :class:`AsyncExecutionResult`.

        The command wait and the streaming group (when present) are awaited
        concurrently. A failure of the command wait is re-raised unchanged;
        the outcome of the streaming wait is ignored here.

        :param options: Optional long-running request configuration
        :return: Wrapper with exit status and output helpers
        :rtype: AsyncExecutionResult
        """
        # Wait for both command completion and streaming to finish
        awaitables: list[Awaitable[DevboxAsyncExecutionDetailView | None]] = [
            self._client.devboxes.wait_for_command(
                self._execution_id,
                devbox_id=self._devbox_id,
                statuses=["completed"],
                **options,
            )
        ]
        if self._streaming_group is not None:
            awaitables.append(self._streaming_group.wait())

        # return_exceptions=True keeps a streaming failure from masking the
        # command outcome; the command outcome is always the first entry.
        results = await asyncio.gather(*awaitables, return_exceptions=True)
        command_result = results[0]

        # Check BaseException, not Exception: asyncio.CancelledError derives
        # from BaseException on Python 3.8+, and gather() hands it back as a
        # value here. It must be re-raised rather than wrapped as a result.
        if isinstance(command_result, BaseException):
            raise command_result

        # Streaming has been awaited; drop the group so it is not awaited again.
        # Streaming errors are already logged in _AsyncStreamingGroup._log_results()
        self._streaming_group = None

        final = cast(DevboxAsyncExecutionDetailView, command_result)
        return AsyncExecutionResult(self._client, self._devbox_id, final)

    async def get_state(self, **options: Unpack[BaseRequestOptions]) -> DevboxAsyncExecutionDetailView:
        """Fetch the latest execution state.

        :param options: Optional request configuration
        :return: Current execution metadata
        :rtype: DevboxAsyncExecutionDetailView
        """
        return await self._client.devboxes.executions.retrieve(
            self._execution_id,
            devbox_id=self._devbox_id,
            **options,
        )

    async def kill(self, **options: Unpack[LongRequestOptions]) -> None:
        """Request termination of the running execution.

        :param options: Optional long-running request configuration
        """
        await self._client.devboxes.executions.kill(
            self._execution_id,
            devbox_id=self._devbox_id,
            **options,
        )