Module frantic.frantic

Client for basic usage

Expand source code
"""Client for basic usage"""

import logging
import os
from typing import AsyncGenerator, Optional, Type, TypeVar, Union, cast

from google.cloud.firestore import AsyncClient
from google.oauth2.service_account import Credentials
from google.api_core.exceptions import AlreadyExists

from frantic.base import BaseModel
from frantic import exceptions


ModelType = TypeVar("ModelType", bound=BaseModel)  # pylint: disable=invalid-name

logger = logging.getLogger(__name__)


def client_factory() -> AsyncClient:
    """Create a default client instance"""
    gappcreds = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
    if gappcreds is None:
        raise ValueError("No GOOGLE_APPLICATION_CREDENTIALS env was set")
    credentials = Credentials.from_service_account_file(gappcreds)
    return AsyncClient(credentials=credentials)


class Frantic:
    """Firestore with Pydantic models integration"""

    def __init__(self, *, client: Optional[AsyncClient] = None, prefix: Optional[str] = None):
        self.client = client or client_factory()
        self.prefix = (prefix or "").rstrip("/").lstrip("/")

    def _get_path(self, model: Type[ModelType]) -> str:
        """Get collection path for a specified model"""
        colname = model.collection or model.__name__.lower()
        return f"{self.prefix}/{colname}" if self.prefix else colname

    async def get(self, model: Type[ModelType], did: str) -> Optional[ModelType]:
        """Get a single document of a type {model} by its document id {did}

        Returns None if none found
        """
        collection_path = self._get_path(model)
        collection = self.client.collection(collection_path)
        docref = collection.document(document_id=did)
        document = await docref.get()
        if not document.exists:
            return None
        return model(**document.to_dict(), id=document.id)

    # TODO: Pagination
    async def list(self, model: Type[ModelType]) -> AsyncGenerator[ModelType, None]:
        """Iterate through all documents of a type {model}"""
        collection_path = self._get_path(model)
        collection = self.client.collection(collection_path)
        async for docref in collection.list_documents():
            document = await docref.get()
            yield model(**document.to_dict(), id=docref.id)

    async def add(self, instance: ModelType) -> ModelType:
        """Add instance as a document within corresponding collection"""
        collection_path = self._get_path(instance.__class__)
        collection = self.client.collection(collection_path)
        try:
            _, docref = await collection.add(
                instance.dict(exclude={"id"}), document_id=instance.id if instance.id is not None else None
            )
        except AlreadyExists as error:
            raise exceptions.AlreadyExists(instance.id) from error
        instance.id = docref.id
        return instance

    async def delete(self, model_or_instance: Union[Type[ModelType], ModelType], did: Optional[str] = None):
        """Delete a document specified by model type and document id {did}"""
        if isinstance(model_or_instance, BaseModel):
            if did:
                logger.warning(
                    "Document ID was provided even though an instance was provided as well. Using ID of the instance."
                )
            did = model_or_instance.id
            model = type(model_or_instance)
        else:
            model = cast(Type[BaseModel], model_or_instance)
        if did is None:
            raise ValueError(f"Cannot delete '{model.__name__}' without an id")
        collection_path = self._get_path(model)
        collection = self.client.collection(collection_path)
        docref = collection.document(did)
        await docref.delete()

Functions

def client_factory() ‑> google.cloud.firestore_v1.async_client.AsyncClient

Create a default client instance

Expand source code
def client_factory() -> AsyncClient:
    """Create a default client instance"""
    gappcreds = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
    if gappcreds is None:
        raise ValueError("No GOOGLE_APPLICATION_CREDENTIALS env was set")
    credentials = Credentials.from_service_account_file(gappcreds)
    return AsyncClient(credentials=credentials)

Classes

class Frantic (*, client: Optional[google.cloud.firestore_v1.async_client.AsyncClient] = None, prefix: Optional[str] = None)

Firestore with Pydantic models integration

Expand source code
class Frantic:
    """Firestore with Pydantic models integration"""

    def __init__(self, *, client: Optional[AsyncClient] = None, prefix: Optional[str] = None):
        self.client = client or client_factory()
        self.prefix = (prefix or "").rstrip("/").lstrip("/")

    def _get_path(self, model: Type[ModelType]) -> str:
        """Get collection path for a specified model"""
        colname = model.collection or model.__name__.lower()
        return f"{self.prefix}/{colname}" if self.prefix else colname

    async def get(self, model: Type[ModelType], did: str) -> Optional[ModelType]:
        """Get a single document of a type {model} by its document id {did}

        Returns None if none found
        """
        collection_path = self._get_path(model)
        collection = self.client.collection(collection_path)
        docref = collection.document(document_id=did)
        document = await docref.get()
        if not document.exists:
            return None
        return model(**document.to_dict(), id=document.id)

    # TODO: Pagination
    async def list(self, model: Type[ModelType]) -> AsyncGenerator[ModelType, None]:
        """Iterate through all documents of a type {model}"""
        collection_path = self._get_path(model)
        collection = self.client.collection(collection_path)
        async for docref in collection.list_documents():
            document = await docref.get()
            yield model(**document.to_dict(), id=docref.id)

    async def add(self, instance: ModelType) -> ModelType:
        """Add instance as a document within corresponding collection"""
        collection_path = self._get_path(instance.__class__)
        collection = self.client.collection(collection_path)
        try:
            _, docref = await collection.add(
                instance.dict(exclude={"id"}), document_id=instance.id if instance.id is not None else None
            )
        except AlreadyExists as error:
            raise exceptions.AlreadyExists(instance.id) from error
        instance.id = docref.id
        return instance

    async def delete(self, model_or_instance: Union[Type[ModelType], ModelType], did: Optional[str] = None):
        """Delete a document specified by model type and document id {did}"""
        if isinstance(model_or_instance, BaseModel):
            if did:
                logger.warning(
                    "Document ID was provided even though an instance was provided as well. Using ID of the instance."
                )
            did = model_or_instance.id
            model = type(model_or_instance)
        else:
            model = cast(Type[BaseModel], model_or_instance)
        if did is None:
            raise ValueError(f"Cannot delete '{model.__name__}' without an id")
        collection_path = self._get_path(model)
        collection = self.client.collection(collection_path)
        docref = collection.document(did)
        await docref.delete()

Methods

async def add(self, instance: ~ModelType) ‑> ~ModelType

Add instance as a document within corresponding collection

Expand source code
async def add(self, instance: ModelType) -> ModelType:
    """Add instance as a document within corresponding collection"""
    collection_path = self._get_path(instance.__class__)
    collection = self.client.collection(collection_path)
    try:
        _, docref = await collection.add(
            instance.dict(exclude={"id"}), document_id=instance.id if instance.id is not None else None
        )
    except AlreadyExists as error:
        raise exceptions.AlreadyExists(instance.id) from error
    instance.id = docref.id
    return instance
async def delete(self, model_or_instance: Union[Type[~ModelType], ~ModelType], did: Optional[str] = None)

Delete a document specified by model type and document id {did}

Expand source code
async def delete(self, model_or_instance: Union[Type[ModelType], ModelType], did: Optional[str] = None):
    """Delete a document specified by model type and document id {did}"""
    if isinstance(model_or_instance, BaseModel):
        if did:
            logger.warning(
                "Document ID was provided even though an instance was provided as well. Using ID of the instance."
            )
        did = model_or_instance.id
        model = type(model_or_instance)
    else:
        model = cast(Type[BaseModel], model_or_instance)
    if did is None:
        raise ValueError(f"Cannot delete '{model.__name__}' without an id")
    collection_path = self._get_path(model)
    collection = self.client.collection(collection_path)
    docref = collection.document(did)
    await docref.delete()
async def get(self, model: Type[~ModelType], did: str) ‑> Optional[~ModelType]

Get a single document of a type {model} by its document id {did}

Returns None if none found

Expand source code
async def get(self, model: Type[ModelType], did: str) -> Optional[ModelType]:
    """Get a single document of a type {model} by its document id {did}

    Returns None if none found
    """
    collection_path = self._get_path(model)
    collection = self.client.collection(collection_path)
    docref = collection.document(document_id=did)
    document = await docref.get()
    if not document.exists:
        return None
    return model(**document.to_dict(), id=document.id)
async def list(self, model: Type[~ModelType]) ‑> AsyncGenerator[~ModelType, None]

Iterate through all documents of a type {model}

Expand source code
async def list(self, model: Type[ModelType]) -> AsyncGenerator[ModelType, None]:
    """Iterate through all documents of a type {model}"""
    collection_path = self._get_path(model)
    collection = self.client.collection(collection_path)
    async for docref in collection.list_documents():
        document = await docref.get()
        yield model(**document.to_dict(), id=docref.id)