Source code for runloop_api_client.sdk.async_

"""Asynchronous SDK entry points and management interfaces."""

from __future__ import annotations

import io
import asyncio
import tarfile
from typing import Dict, Mapping, Optional
from pathlib import Path
from datetime import timedelta
from typing_extensions import Unpack

import httpx

from ._types import (
    LongRequestOptions,
    SDKDevboxListParams,
    SDKObjectListParams,
    SDKDevboxCreateParams,
    SDKObjectCreateParams,
    SDKBlueprintListParams,
    SDKBlueprintCreateParams,
    SDKDiskSnapshotListParams,
    SDKDevboxCreateFromImageParams,
)
from .._types import Timeout, NotGiven, not_given
from .._client import DEFAULT_MAX_RETRIES, AsyncRunloop
from ._helpers import detect_content_type
from .async_devbox import AsyncDevbox
from .async_snapshot import AsyncSnapshot
from .async_blueprint import AsyncBlueprint
from .async_storage_object import AsyncStorageObject
from ..types.object_create_params import ContentType


[docs] class AsyncDevboxOps: """High-level async manager for creating and managing AsyncDevbox instances. Accessed via ``runloop.devbox`` from :class:`AsyncRunloopSDK`, provides coroutines to create devboxes from scratch, blueprints, or snapshots, and to list existing devboxes. Example: >>> runloop = AsyncRunloopSDK() >>> devbox = await runloop.devbox.create(name="my-devbox") >>> devboxes = await runloop.devbox.list(limit=10) """ def __init__(self, client: AsyncRunloop) -> None: """Initialize the manager. :param client: Generated AsyncRunloop client to wrap :type client: AsyncRunloop """ self._client = client
[docs] async def create( self, **params: Unpack[SDKDevboxCreateParams], ) -> AsyncDevbox: """Provision a new devbox and wait until it reaches ``running`` state. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKDevboxCreateParams` for available parameters :return: Wrapper bound to the newly created devbox :rtype: AsyncDevbox """ devbox_view = await self._client.devboxes.create_and_await_running( **params, ) return AsyncDevbox(self._client, devbox_view.id)
[docs] async def create_from_blueprint_id( self, blueprint_id: str, **params: Unpack[SDKDevboxCreateFromImageParams], ) -> AsyncDevbox: """Create a devbox from an existing blueprint by identifier. :param blueprint_id: Blueprint ID to create from :type blueprint_id: str :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKDevboxCreateFromImageParams` for available parameters :return: Wrapper bound to the newly created devbox :rtype: AsyncDevbox """ devbox_view = await self._client.devboxes.create_and_await_running( blueprint_id=blueprint_id, **params, ) return AsyncDevbox(self._client, devbox_view.id)
[docs] async def create_from_blueprint_name( self, blueprint_name: str, **params: Unpack[SDKDevboxCreateFromImageParams], ) -> AsyncDevbox: """Create a devbox from the latest blueprint with the given name. :param blueprint_name: Blueprint name to create from :type blueprint_name: str :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKDevboxCreateFromImageParams` for available parameters :return: Wrapper bound to the newly created devbox :rtype: AsyncDevbox """ devbox_view = await self._client.devboxes.create_and_await_running( blueprint_name=blueprint_name, **params, ) return AsyncDevbox(self._client, devbox_view.id)
[docs] async def create_from_snapshot( self, snapshot_id: str, **params: Unpack[SDKDevboxCreateFromImageParams], ) -> AsyncDevbox: """Create a devbox initialized from a snapshot. :param snapshot_id: Snapshot ID to create from :type snapshot_id: str :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKDevboxCreateFromImageParams` for available parameters :return: Wrapper bound to the newly created devbox :rtype: AsyncDevbox """ devbox_view = await self._client.devboxes.create_and_await_running( snapshot_id=snapshot_id, **params, ) return AsyncDevbox(self._client, devbox_view.id)
[docs] def from_id(self, devbox_id: str) -> AsyncDevbox: """Attach to an existing devbox by ID. Returns immediately without waiting for the devbox to reach ``running`` state. Call ``await_running()`` on the returned :class:`AsyncDevbox` if you need to wait for readiness (contrast with the synchronous SDK, which blocks). :param devbox_id: Existing devbox ID :type devbox_id: str :return: Wrapper bound to the requested devbox :rtype: AsyncDevbox """ return AsyncDevbox(self._client, devbox_id)
[docs] async def list( self, **params: Unpack[SDKDevboxListParams], ) -> list[AsyncDevbox]: """List devboxes accessible to the caller. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKDevboxListParams` for available parameters :return: Collection of devbox wrappers :rtype: list[AsyncDevbox] """ page = await self._client.devboxes.list( **params, ) return [AsyncDevbox(self._client, item.id) for item in page.devboxes]
[docs] class AsyncSnapshotOps: """High-level async manager for working with disk snapshots. Accessed via ``runloop.snapshot`` from :class:`AsyncRunloopSDK`, provides coroutines to list snapshots and access snapshot details. Example: >>> runloop = AsyncRunloopSDK() >>> snapshots = await runloop.snapshot.list(devbox_id="dev-123") >>> snapshot = await runloop.snapshot.from_id("snap-123") """ def __init__(self, client: AsyncRunloop) -> None: """Initialize the manager. :param client: Generated AsyncRunloop client to wrap :type client: AsyncRunloop """ self._client = client
[docs] async def list( self, **params: Unpack[SDKDiskSnapshotListParams], ) -> list[AsyncSnapshot]: """List snapshots created from devboxes. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKDiskSnapshotListParams` for available parameters :return: Snapshot wrappers for each record :rtype: list[AsyncSnapshot] """ page = await self._client.devboxes.disk_snapshots.list( **params, ) return [AsyncSnapshot(self._client, item.id) for item in page.snapshots]
[docs] def from_id(self, snapshot_id: str) -> AsyncSnapshot: """Return a snapshot wrapper for the given ID. :param snapshot_id: Snapshot ID to wrap :type snapshot_id: str :return: Wrapper for the snapshot resource :rtype: AsyncSnapshot """ return AsyncSnapshot(self._client, snapshot_id)
[docs] class AsyncBlueprintOps: """High-level async manager for creating and managing blueprints. Accessed via ``runloop.blueprint`` from :class:`AsyncRunloopSDK`, provides coroutines to create Dockerfile-based blueprints, inspect build logs, and list existing blueprints. Example: >>> runloop = AsyncRunloopSDK() >>> blueprint = await runloop.blueprint.create( ... name="my-blueprint", ... dockerfile="FROM ubuntu:22.04\\nRUN apt-get update", ... ) >>> blueprints = await runloop.blueprint.list() """ def __init__(self, client: AsyncRunloop) -> None: """Initialize the manager. :param client: Generated AsyncRunloop client to wrap :type client: AsyncRunloop """ self._client = client
[docs] async def create( self, **params: Unpack[SDKBlueprintCreateParams], ) -> AsyncBlueprint: """Create a blueprint and wait for the build to finish. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKBlueprintCreateParams` for available parameters :return: Wrapper bound to the finished blueprint :rtype: AsyncBlueprint """ blueprint = await self._client.blueprints.create_and_await_build_complete( **params, ) return AsyncBlueprint(self._client, blueprint.id)
[docs] def from_id(self, blueprint_id: str) -> AsyncBlueprint: """Return a blueprint wrapper for the given ID. :param blueprint_id: Blueprint ID to wrap :type blueprint_id: str :return: Wrapper for the blueprint resource :rtype: AsyncBlueprint """ return AsyncBlueprint(self._client, blueprint_id)
[docs] async def list( self, **params: Unpack[SDKBlueprintListParams], ) -> list[AsyncBlueprint]: """List available blueprints. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKBlueprintListParams` for available parameters :return: Blueprint wrappers for each record :rtype: list[AsyncBlueprint] """ page = await self._client.blueprints.list( **params, ) return [AsyncBlueprint(self._client, item.id) for item in page.blueprints]
[docs] class AsyncStorageObjectOps: """High-level async manager for creating and managing storage objects. Accessed via ``runloop.storage_object`` from :class:`AsyncRunloopSDK`, provides coroutines to create, upload, download, and list storage objects with convenient helpers for file and text uploads. Example: >>> runloop = AsyncRunloopSDK() >>> obj = await runloop.storage_object.upload_from_text("Hello!", "greeting.txt") >>> content = await obj.download_as_text() >>> objects = await runloop.storage_object.list() """ def __init__(self, client: AsyncRunloop) -> None: """Initialize the manager. :param client: Generated AsyncRunloop client to wrap :type client: AsyncRunloop """ self._client = client
[docs] async def create( self, **params: Unpack[SDKObjectCreateParams], ) -> AsyncStorageObject: """Create a storage object and obtain an upload URL. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKObjectCreateParams` for available parameters :return: Wrapper with upload URL set for immediate uploads :rtype: AsyncStorageObject """ obj = await self._client.objects.create(**params) return AsyncStorageObject(self._client, obj.id, upload_url=obj.upload_url)
[docs] def from_id(self, object_id: str) -> AsyncStorageObject: """Return a storage object wrapper by identifier. :param object_id: Storage object identifier to wrap :type object_id: str :return: Wrapper for the storage object resource :rtype: AsyncStorageObject """ return AsyncStorageObject(self._client, object_id, upload_url=None)
[docs] async def list( self, **params: Unpack[SDKObjectListParams], ) -> list[AsyncStorageObject]: """List storage objects owned by the caller. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKObjectListParams` for available parameters :return: Storage object wrappers for each record :rtype: list[AsyncStorageObject] """ page = await self._client.objects.list( **params, ) return [AsyncStorageObject(self._client, item.id, upload_url=item.upload_url) for item in page.objects]
[docs] async def upload_from_file( self, file_path: str | Path, *, name: Optional[str] = None, content_type: Optional[ContentType] = None, metadata: Optional[Dict[str, str]] = None, ttl: Optional[timedelta] = None, **options: Unpack[LongRequestOptions], ) -> AsyncStorageObject: """Create and upload an object from a local file path. :param file_path: Local filesystem path to read :type file_path: str | Path :param name: Optional object name; defaults to the file name :type name: Optional[str] :param content_type: Optional MIME type to apply to the object :type content_type: Optional[ContentType] :param metadata: Optional key-value metadata :type metadata: Optional[Dict[str, str]] :param ttl: Optional Time-To-Live, after which the object is automatically deleted :type ttl: Optional[timedelta] :param options: See :typeddict:`~runloop_api_client.sdk._types.LongRequestOptions` for available options :return: Wrapper for the uploaded object :rtype: AsyncStorageObject :raises OSError: If the local file cannot be read """ path = Path(file_path) try: content = await asyncio.to_thread(lambda: path.read_bytes()) except OSError as error: raise OSError(f"Failed to read file {path}: {error}") from error name = name or path.name content_type = content_type or detect_content_type(str(file_path)) ttl_ms = int(ttl.total_seconds()) * 1000 if ttl else None obj = await self.create(name=name, content_type=content_type, metadata=metadata, ttl_ms=ttl_ms, **options) await obj.upload_content(content) await obj.complete() return obj
[docs] async def upload_from_dir( self, dir_path: str | Path, *, name: Optional[str] = None, metadata: Optional[Dict[str, str]] = None, ttl: Optional[timedelta] = None, **options: Unpack[LongRequestOptions], ) -> AsyncStorageObject: """Create and upload an object from a local directory. The resulting object will be uploaded as a compressed tarball. :param dir_path: Local filesystem directory path to tar :type dir_path: str | Path :param name: Optional object name; defaults to the directory name + '.tar.gz' :type name: Optional[str] :param metadata: Optional key-value metadata :type metadata: Optional[Dict[str, str]] :param ttl: Optional Time-To-Live, after which the object is automatically deleted :type ttl: Optional[timedelta] :param options: See :typeddict:`~runloop_api_client.sdk._types.LongRequestOptions` for available options :return: Wrapper for the uploaded object :rtype: AsyncStorageObject :raises OSError: If the local file cannot be read """ path = Path(dir_path) name = name or f"{path.name}.tar.gz" ttl_ms = int(ttl.total_seconds()) * 1000 if ttl else None def synchronous_io() -> bytes: with io.BytesIO() as tar_buffer: with tarfile.open(fileobj=tar_buffer, mode="w:gz") as tar: tar.add(path, arcname=".", recursive=True) tar_buffer.seek(0) return tar_buffer.read() tar_bytes = await asyncio.to_thread(synchronous_io) obj = await self.create(name=name, content_type="tgz", metadata=metadata, ttl_ms=ttl_ms, **options) await obj.upload_content(tar_bytes) await obj.complete() return obj
[docs] async def upload_from_text( self, text: str, *, name: str, metadata: Optional[Dict[str, str]] = None, ttl: Optional[timedelta] = None, **options: Unpack[LongRequestOptions], ) -> AsyncStorageObject: """Create and upload an object from a text payload. :param text: Text content to upload :type text: str :param name: Object display name :type name: str :param metadata: Optional key-value metadata :type metadata: Optional[Dict[str, str]] :param ttl: Optional Time-To-Live, after which the object is automatically deleted :type ttl: Optional[timedelta] :param options: See :typeddict:`~runloop_api_client.sdk._types.LongRequestOptions` for available options :return: Wrapper for the uploaded object :rtype: AsyncStorageObject """ ttl_ms = int(ttl.total_seconds()) * 1000 if ttl else None obj = await self.create(name=name, content_type="text", metadata=metadata, ttl_ms=ttl_ms, **options) await obj.upload_content(text) await obj.complete() return obj
[docs] async def upload_from_bytes( self, data: bytes, *, name: str, content_type: ContentType, metadata: Optional[Dict[str, str]] = None, ttl: Optional[timedelta] = None, **options: Unpack[LongRequestOptions], ) -> AsyncStorageObject: """Create and upload an object from a bytes payload. :param data: Binary payload to upload :type data: bytes :param name: Object display name :type name: str :param content_type: MIME type describing the payload :type content_type: ContentType :param metadata: Optional key-value metadata :type metadata: Optional[Dict[str, str]] :param ttl: Optional Time-To-Live, after which the object is automatically deleted :type ttl: Optional[timedelta] :param options: See :typeddict:`~runloop_api_client.sdk._types.LongRequestOptions` for available options :return: Wrapper for the uploaded object :rtype: AsyncStorageObject """ ttl_ms = int(ttl.total_seconds()) * 1000 if ttl else None obj = await self.create(name=name, content_type=content_type, metadata=metadata, ttl_ms=ttl_ms, **options) await obj.upload_content(data) await obj.complete() return obj
class AsyncRunloopSDK: """High-level asynchronous entry point for the Runloop SDK. Provides a Pythonic, object-oriented interface for managing devboxes, blueprints, snapshots, and storage objects. Exposes the generated async REST client via the ``api`` attribute for advanced use cases. :ivar api: Direct access to the generated async REST API client :vartype api: AsyncRunloop :ivar devbox: High-level async interface for devbox management :vartype devbox: AsyncDevboxOps :ivar blueprint: High-level async interface for blueprint management :vartype blueprint: AsyncBlueprintOps :ivar snapshot: High-level async interface for snapshot management :vartype snapshot: AsyncSnapshotOps :ivar storage_object: High-level async interface for storage object management :vartype storage_object: AsyncStorageObjectOps Example: >>> runloop = AsyncRunloopSDK() # Uses RUNLOOP_API_KEY env var >>> devbox = await runloop.devbox.create(name="my-devbox") >>> result = await devbox.cmd.exec(command="echo 'hello'") >>> print(await result.stdout()) >>> await devbox.shutdown() """ api: AsyncRunloop devbox: AsyncDevboxOps blueprint: AsyncBlueprintOps snapshot: AsyncSnapshotOps storage_object: AsyncStorageObjectOps def __init__( self, *, bearer_token: str | None = None, base_url: str | httpx.URL | None = None, timeout: float | Timeout | None | NotGiven = not_given, max_retries: int = DEFAULT_MAX_RETRIES, default_headers: Mapping[str, str] | None = None, default_query: Mapping[str, object] | None = None, http_client: httpx.AsyncClient | None = None, ) -> None: """Configure the asynchronous SDK wrapper. :param bearer_token: API token; falls back to ``RUNLOOP_API_KEY`` env var, defaults to None :type bearer_token: str | None, optional :param base_url: Override the API base URL, defaults to None :type base_url: str | httpx.URL | None, optional :param timeout: Request timeout (seconds) or ``Timeout`` object, defaults to not_given :type timeout: float | Timeout | None | NotGiven, optional :param max_retries: Maximum automatic retry attempts, defaults to DEFAULT_MAX_RETRIES :type max_retries: int, optional :param default_headers: Headers merged into every request, defaults to None :type default_headers: Mapping[str, str] | None, optional :param default_query: Default query parameters merged into every request, defaults to None :type default_query: Mapping[str, object] | None, optional :param http_client: Custom ``httpx.AsyncClient`` instance to reuse, defaults to None :type http_client: httpx.AsyncClient | None, optional """ self.api = AsyncRunloop( bearer_token=bearer_token, base_url=base_url, timeout=timeout, max_retries=max_retries, default_headers=default_headers, default_query=default_query, http_client=http_client, ) self.devbox = AsyncDevboxOps(self.api) self.blueprint = AsyncBlueprintOps(self.api) self.snapshot = AsyncSnapshotOps(self.api) self.storage_object = AsyncStorageObjectOps(self.api)
[docs] async def aclose(self) -> None: """Close the underlying HTTP client and release resources.""" await self.api.close()
async def __aenter__(self) -> "AsyncRunloopSDK": """Allow ``async with AsyncRunloopSDK() as runloop`` usage. :return: The active SDK instance :rtype: AsyncRunloopSDK """ return self async def __aexit__(self, *_exc_info: object) -> None: """Ensure the API client closes when leaving the context manager.""" await self.aclose()