Source code for runloop_api_client.sdk.async_storage_object

"""Storage object resource class for asynchronous operations."""

from __future__ import annotations

from typing import Iterable
from typing_extensions import Unpack, override

from ._types import BaseRequestOptions, LongRequestOptions, SDKObjectDownloadParams
from .._client import AsyncRunloop
from ..types.object_view import ObjectView
from ..types.object_download_url_view import ObjectDownloadURLView


[docs] class AsyncStorageObject: """Async wrapper around storage object operations, including uploads and downloads.""" def __init__(self, client: AsyncRunloop, object_id: str, upload_url: str | None) -> None: """Initialize the wrapper. :param client: Generated AsyncRunloop client :type client: AsyncRunloop :param object_id: Storage object identifier returned by the API :type object_id: str :param upload_url: Optional pre-signed upload URL if the object is still open, defaults to None :type upload_url: str | None, optional """ self._client = client self._id = object_id self._upload_url = upload_url @override def __repr__(self) -> str: return f"<AsyncStorageObject id={self._id!r}>" @property def id(self) -> str: """Return the storage object identifier. :return: Unique object ID :rtype: str """ return self._id @property def upload_url(self) -> str | None: """Return the pre-signed upload URL, if available. :return: Upload URL when the object is pending completion :rtype: str | None """ return self._upload_url
[docs] async def refresh( self, **options: Unpack[BaseRequestOptions], ) -> ObjectView: """Fetch the latest metadata for the object. :param options: Optional request configuration :return: Updated object metadata :rtype: ObjectView """ return await self._client.objects.retrieve( self._id, **options, )
[docs] async def complete( self, **options: Unpack[LongRequestOptions], ) -> ObjectView: """Mark the object as fully uploaded. :param options: Optional long-running request configuration :return: Finalized object metadata :rtype: ObjectView """ result = await self._client.objects.complete( self._id, **options, ) self._upload_url = None return result
[docs] async def get_download_url( self, **params: Unpack[SDKObjectDownloadParams], ) -> ObjectDownloadURLView: """Request a signed download URL for the object. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKObjectDownloadParams` for available parameters :return: URL + metadata describing the download :rtype: ObjectDownloadURLView """ return await self._client.objects.download( self._id, **params, )
[docs] async def download_as_bytes( self, **params: Unpack[SDKObjectDownloadParams], ) -> bytes: """Download the object contents as bytes. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKObjectDownloadParams` for available parameters :return: Entire object payload :rtype: bytes """ url_view = await self.get_download_url( **params, ) response = await self._client._client.get(url_view.download_url) response.raise_for_status() return response.content
[docs] async def download_as_text( self, **params: Unpack[SDKObjectDownloadParams], ) -> str: """Download the object contents as UTF-8 text. :param params: See :typeddict:`~runloop_api_client.sdk._types.SDKObjectDownloadParams` for available parameters :return: Entire object payload decoded as UTF-8 :rtype: str """ url_view = await self.get_download_url( **params, ) response = await self._client._client.get(url_view.download_url) response.raise_for_status() response.encoding = "utf-8" return response.text
[docs] async def delete( self, **options: Unpack[LongRequestOptions], ) -> ObjectView: """Delete the storage object. :param options: Optional long-running request configuration :return: API response for the deleted object :rtype: ObjectView """ return await self._client.objects.delete( self._id, **options, )
[docs] async def upload_content(self, content: str | bytes | Iterable[bytes]) -> None: """Upload content to the object's pre-signed URL. :param content: Bytes or text payload to upload :type content: str | bytes :raises RuntimeError: If no upload URL is available :raises httpx.HTTPStatusError: Propagated from the underlying ``httpx`` client when the upload fails """ url = self._ensure_upload_url() response = await self._client._client.put(url, content=content) response.raise_for_status()
def _ensure_upload_url(self) -> str: """Return the upload URL, ensuring it exists. :return: Upload URL ready for use :rtype: str :raises RuntimeError: If no upload URL is available """ if not self._upload_url: raise RuntimeError("No upload URL available. Create a new object before uploading content.") return self._upload_url