Package mongorm

MongORM

Expand source code
"""MongORM"""

from mongorm.index import MongoIndex, MongoIndexType
from mongorm.mongorm import BaseModel, MongORM, ObjectId

# Public API of the package: the names re-exported by the two imports above.
__all__ = ["BaseModel", "MongORM", "MongoIndex", "MongoIndexType", "ObjectId"]

Sub-modules

mongorm.exceptions

MongORM exceptions

mongorm.index

MongORM index

mongorm.mongorm

This code defines the basic Mongo Object-Relational Model (MongORM) framework. It consists of a BaseModel class and MongORM class. MongORM provides …

mongorm.version

Package version

Classes

class BaseModel (**data: Any)

Base model for all models

Create a new model by parsing and validating input data from keyword arguments.

Raises ValidationError if the input data cannot be parsed to form a valid model.

Expand source code
class BaseModel(pydantic.BaseModel):
    """Common parent of all document models

    Declares the ``id`` field (aliased to ``_id``) and the ``created`` field and
    forwards persistence calls to the ``MongORM`` client stored in ``Meta.client``.
    """

    class Meta:
        """Client and collection binding; other attributes are read as indexes"""

        client: "MongORM | None" = None
        collection: str | None = None

    class Config:
        """Pydantic config: ObjectId JSON encoding, population by field name"""

        json_encoders = {bson.objectid.ObjectId: str}
        allow_population_by_field_name = True

    @classmethod
    def list_indexes(cls) -> list[MongoIndex]:
        """Collect the ``MongoIndex`` objects declared directly on this model's ``Meta``

        Dunder attributes and the reserved ``client`` / ``collection`` entries are
        ignored; any other attribute that is not a ``MongoIndex`` is skipped with
        a warning.
        """
        reserved = ("client", "collection")
        declared: list[MongoIndex] = []
        for attr_name, attr_value in vars(cls.Meta).items():
            if attr_name.startswith("__") or attr_name in reserved:
                continue
            if isinstance(attr_value, MongoIndex):
                declared.append(attr_value)
                continue
            logger.warning(
                "Skipping index specified by handle '%s' - is of type '%s', 'MongoIndex' was expected.",
                attr_name,
                type(attr_value),
            )
        return declared

    async def save(self):
        """Persist this instance through the model's client"""
        client = self._get_client()
        await client.save(self)

    @classmethod
    async def find_one(
        cls: Type[ModelType],
        oid: str | ObjectId | None = None,
        query: dict[str, Any] | None = None,
        **kwargs,
    ) -> ModelType | None:
        """Fetch a single instance by oid, by mongo filter query and/or by
        field values given as keyword arguments; delegates to the client"""
        client = cls._get_client()
        return await client.find_one(cls, oid=oid, query=query, **kwargs)

    @classmethod
    async def find(
        cls: Type[ModelType],
        query: dict[str, Any] | None = None,
        sort: Sequence[tuple[str, "SortDirection"]] | None = None,
        skip: int = 0,
        limit: int = 0,
        **kwargs,
    ) -> AsyncGenerator[ModelType, None]:
        """Yield instances found by the client (optionally filtered, sorted and paged)"""
        client = cls._get_client()
        matches = client.find(cls, query=query, sort=sort, skip=skip, limit=limit, **kwargs)
        async for match in matches:
            yield match

    @classmethod
    async def find_and_delete(cls: Type[ModelType], instance_or_oid: "BaseModel | str | ObjectId"):
        """Delete a document of this model, given either an instance or its oid"""
        # An instance contributes only its id; the client is called with the model class.
        oid: str | ObjectId = instance_or_oid.id if isinstance(instance_or_oid, BaseModel) else instance_or_oid
        await cls._get_client().delete(instance_or_model=cls, oid=oid)

    async def delete(self):
        """Delete this instance through the model's client"""
        client = self._get_client()
        await client.delete(self)

    async def exists(self) -> bool:
        """Return True when a document with this instance's id can be found"""
        stored = await self.find_one(oid=self.id)
        return stored is not None

    @classmethod
    def _get_client(cls) -> "MongORM":
        """Return ``Meta.client``; raise MissingClientException when it is absent or None"""
        client = getattr(cls.Meta, "client", None)
        if client is None:
            raise MissingClientException(cls)
        return client

    def json(
        self,
        *,
        include: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None,
        exclude: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None,
        by_alias: bool = False,
        skip_defaults: Optional[bool] = None,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = True,
        encoder: Optional[Callable[[Any], Any]] = None,
        models_as_dict: bool = True,
        **dumps_kwargs: Any,
    ) -> str:
        """Serialize via pydantic's ``json`` with two changed defaults:
        ``exclude_none`` is True and ``encoder`` falls back to ``str``"""
        options: dict[str, Any] = {
            "include": include,
            "exclude": exclude,
            "by_alias": by_alias,
            "skip_defaults": skip_defaults,
            "exclude_unset": exclude_unset,
            "exclude_defaults": exclude_defaults,
            "exclude_none": exclude_none,
            # Fall back to ``str`` whenever no encoder was supplied.
            "encoder": encoder if encoder else str,
            "models_as_dict": models_as_dict,
        }
        return super().json(**options, **dumps_kwargs)

    # Document identifier, populated from "_id"; a fresh ObjectId is generated by default.
    id: ObjectId = pydantic.Field(default_factory=bson.objectid.ObjectId, alias="_id")
    # Creation timestamp, defaulting to datetime.utcnow() at instantiation.
    created: datetime = pydantic.Field(default_factory=datetime.utcnow)

Ancestors

  • pydantic.main.BaseModel
  • pydantic.utils.Representation

Class variables

var Config

json encoders

var Meta

Indexes and meta definition

var created : datetime.datetime
var id : ObjectId

Static methods

async def find(query: dict[str, typing.Any] | None = None, sort: Optional[Sequence[tuple[str, 'SortDirection']]] = None, skip: int = 0, limit: int = 0, **kwargs) ‑> AsyncGenerator[~ModelType, None]

Find all instances (or specify query to search)

Expand source code
@classmethod
async def find(
    cls: Type[ModelType],
    query: dict[str, Any] | None = None,
    sort: Sequence[tuple[str, "SortDirection"]] | None = None,
    skip: int = 0,
    limit: int = 0,
    **kwargs,
) -> AsyncGenerator[ModelType, None]:
    """Yield instances found by the client (optionally filtered, sorted and paged)"""
    client = cls._get_client()
    matches = client.find(cls, query=query, sort=sort, skip=skip, limit=limit, **kwargs)
    async for match in matches:
        yield match
async def find_and_delete(instance_or_oid: BaseModel | str | ObjectId)

Delete instance from collection

Expand source code
@classmethod
async def find_and_delete(cls: Type[ModelType], instance_or_oid: "BaseModel | str | ObjectId"):
    """Delete a document of this model, given either an instance or its oid"""
    # An instance contributes only its id; the client is called with the model class.
    oid: str | ObjectId = instance_or_oid.id if isinstance(instance_or_oid, BaseModel) else instance_or_oid
    await cls._get_client().delete(instance_or_model=cls, oid=oid)
async def find_one(oid: str | ObjectId | None = None, query: dict[str, typing.Any] | None = None, **kwargs) ‑> Optional[~ModelType]

Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs

Expand source code
@classmethod
async def find_one(
    cls: Type[ModelType],
    oid: str | ObjectId | None = None,
    query: dict[str, Any] | None = None,
    **kwargs,
) -> ModelType | None:
    """Fetch a single instance by oid, by mongo filter query and/or by
    field values given as keyword arguments; delegates to the client"""
    client = cls._get_client()
    return await client.find_one(cls, oid=oid, query=query, **kwargs)
def list_indexes() ‑> list[MongoIndex]

List indexes associated with this model

Expand source code
@classmethod
def list_indexes(cls) -> list[MongoIndex]:
    """Collect the ``MongoIndex`` objects declared directly on this model's ``Meta``

    Dunder attributes and the reserved ``client`` / ``collection`` entries are
    ignored; any other attribute that is not a ``MongoIndex`` is skipped with
    a warning.
    """
    reserved = ("client", "collection")
    declared: list[MongoIndex] = []
    for attr_name, attr_value in vars(cls.Meta).items():
        if attr_name.startswith("__") or attr_name in reserved:
            continue
        if isinstance(attr_value, MongoIndex):
            declared.append(attr_value)
            continue
        logger.warning(
            "Skipping index specified by handle '%s' - is of type '%s', 'MongoIndex' was expected.",
            attr_name,
            type(attr_value),
        )
    return declared

Methods

async def delete(self)

Delete current instance from collection

Expand source code
async def delete(self):
    """Delete this instance through the model's client"""
    client = self._get_client()
    await client.delete(self)
async def exists(self) ‑> bool

Check whether instance exists in the collection

Expand source code
async def exists(self) -> bool:
    """Return True when a document with this instance's id can be found"""
    stored = await self.find_one(oid=self.id)
    return stored is not None
def json(self, *, include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), ForwardRef(None)] = None, exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), ForwardRef(None)] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any) ‑> str

Override pydantic's json method to use str as default for ObjectId and exclude none by default

Expand source code
def json(
    self,
    *,
    include: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None,
    exclude: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = True,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any,
) -> str:
    """Serialize via pydantic's ``json`` with two changed defaults:
    ``exclude_none`` is True and ``encoder`` falls back to ``str``"""
    options: dict[str, Any] = {
        "include": include,
        "exclude": exclude,
        "by_alias": by_alias,
        "skip_defaults": skip_defaults,
        "exclude_unset": exclude_unset,
        "exclude_defaults": exclude_defaults,
        "exclude_none": exclude_none,
        # Fall back to ``str`` whenever no encoder was supplied.
        "encoder": encoder if encoder else str,
        "models_as_dict": models_as_dict,
    }
    return super().json(**options, **dumps_kwargs)
async def save(self)

Update document or insert a new one

Expand source code
async def save(self):
    """Persist this instance through the model's client"""
    client = self._get_client()
    await client.save(self)
class MongORM (url: str, database: str)

MongoDB abstraction to work with pydantic models

Expand source code
class MongORM:
    """MongoDB abstraction to work with pydantic models

    Holds an ``AsyncIOMotorClient`` for ``url`` and the selected ``database``;
    every operation resolves its collection from the model's ``Meta.collection``.
    """

    def __init__(self, url: str, database: str):
        self.url = url
        self.client: "AgnosticClient" = AsyncIOMotorClient(self.url)
        self.database: "AgnosticDatabase" = self.client[database]

    def _get_collection(self, instance_or_model: BaseModel | Type[BaseModel]) -> "AgnosticCollection":
        """Get collection associated with instance (or model)

        Raises ValueError when ``Meta.collection`` is missing or None.
        """
        collection_name = getattr(instance_or_model.Meta, "collection", None)
        if collection_name is None:
            # The collection name lives on the nested Meta class, so say so.
            raise ValueError("Model must define its collection name in 'Meta.collection'")
        return self.database[collection_name]

    async def find_one(
        self, model: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs
    ) -> ModelType | None:
        """Find one either by its oid, by mongo filter query or by specifying
        fields and their values using kwargs

        ``oid`` overrides any ``_id`` in ``query`` and ``kwargs`` override both.
        Returns None when no document matches. The caller's ``query`` dict is
        left untouched.
        """
        collection = self._get_collection(model)
        # Work on a copy so the updates below never leak into the caller's dict.
        query = dict(query) if query else {}
        if oid:
            if isinstance(oid, str):
                oid = ObjectId(oid)
            query["_id"] = oid
        query.update(kwargs)
        found = await collection.find_one(query)
        if found is None:
            return None
        return model(**found)

    # pylint: disable=too-many-arguments
    async def find(
        self,
        model: Type[ModelType],
        query: dict[str, Any] | None = None,
        sort: Sequence[tuple[str, SortDirection]] | None = None,
        skip: int = 0,
        limit: int = 0,
        **kwargs,
    ) -> AsyncGenerator[ModelType, None]:
        """Find all instances (or specify query to search)

        Results are sorted by ``created`` ascending unless ``sort`` is given;
        ``kwargs`` are merged into the filter. The caller's ``query`` dict is
        left untouched.
        """
        sort_by = [(field, direction.value) for field, direction in (sort or [("created", SortDirection.ASCENDING)])]
        # Work on a copy so merging kwargs never leaks into the caller's dict.
        query = dict(query) if query else {}
        query.update(kwargs)
        collection = self._get_collection(model)
        cursor = collection.find(
            filter=query,
            skip=skip,
            limit=limit,
            sort=sort_by,
        )
        async for item in cursor:
            yield model(**item)

    async def update(self, instance: ModelType):
        """Update existing document in the collection

        Raises DocumentNotFound when no document has the instance's ``_id``.
        """
        collection = self._get_collection(instance)
        query = {"_id": instance.id}
        updated = await collection.replace_one(filter=query, replacement=instance.dict(by_alias=True))
        if updated.matched_count == 0:
            raise DocumentNotFound(collection.name, query)

    async def save(self, instance: ModelType):
        """Update existing document or insert a new one (replace with upsert)"""
        collection = self._get_collection(instance)
        await collection.replace_one(filter={"_id": instance.id}, replacement=instance.dict(by_alias=True), upsert=True)

    async def delete(
        self,
        instance_or_model: BaseModel | Type[BaseModel],
        oid: str | ObjectId | None = None,
    ):
        """Delete document by its oid

        For an instance its own ``id`` is used and ``oid`` is ignored; for a
        model class ``oid`` is required (ValueError otherwise). Raises
        DocumentNotFound when nothing was deleted.
        """
        if isinstance(instance_or_model, BaseModel):
            oid = instance_or_model.id
        else:
            if not oid:
                raise ValueError("oid must be specified if not providing model instance")
            if isinstance(oid, str):
                oid = ObjectId(oid)
        collection = self._get_collection(instance_or_model)
        result: pymongo.results.DeleteResult = await collection.delete_one(filter={"_id": oid})
        if result.deleted_count == 0:
            raise DocumentNotFound(collection=collection.name, query={"_id": oid})

    async def create_schema(self, models: Sequence[BaseModel | Type[BaseModel]]):
        """Create the indexes declared by the specified models

        Accepts model classes (or instances); models without indexes are skipped.
        """
        for model in models:
            indexes = model.list_indexes()
            if not indexes:
                continue
            collection = self._get_collection(model)
            for ind in indexes:
                await ind.create(collection)

Methods

async def create_schema(self, models: Sequence[BaseModel])

Create collections and indexes for specified models

Expand source code
async def create_schema(self, models: Sequence[BaseModel | Type[BaseModel]]):
    """Create the indexes declared by the specified models

    Accepts model classes (or instances); models without indexes are skipped.
    """
    for model in models:
        indexes = model.list_indexes()
        if not indexes:
            continue
        collection = self._get_collection(model)
        for ind in indexes:
            await ind.create(collection)
async def delete(self, instance_or_model: Union[BaseModel, Type[BaseModel]], oid: str | ObjectId | None = None)

Delete document by its oid

Expand source code
async def delete(
    self,
    instance_or_model: BaseModel | Type[BaseModel],
    oid: str | ObjectId | None = None,
):
    """Remove one document, identified by an instance or by model class plus oid

    Raises ValueError when a model class is given without an oid and
    DocumentNotFound when nothing was deleted.
    """
    if isinstance(instance_or_model, BaseModel):
        # An instance always wins over an explicitly passed oid.
        oid = instance_or_model.id
    elif not oid:
        raise ValueError("oid must be specified if not providing model instance")
    elif isinstance(oid, str):
        oid = ObjectId(oid)
    target = self._get_collection(instance_or_model)
    outcome: pymongo.results.DeleteResult = await target.delete_one(filter={"_id": oid})
    if outcome.deleted_count == 0:
        raise DocumentNotFound(collection=target.name, query={"_id": oid})
async def find(self, model: Type[~ModelType], query: dict[str, typing.Any] | None = None, sort: Optional[Sequence[tuple[str, SortDirection]]] = None, skip: int = 0, limit: int = 0, **kwargs) ‑> AsyncGenerator[~ModelType, None]

Find all instances (or specify query to search)

Expand source code
async def find(
    self,
    model: Type[ModelType],
    query: dict[str, Any] | None = None,
    sort: Sequence[tuple[str, SortDirection]] | None = None,
    skip: int = 0,
    limit: int = 0,
    **kwargs,
) -> AsyncGenerator[ModelType, None]:
    """Find all instances (or specify query to search)

    Results are sorted by ``created`` ascending unless ``sort`` is given;
    ``kwargs`` are merged into the filter. The caller's ``query`` dict is
    left untouched.
    """
    sort_by = [(field, direction.value) for field, direction in (sort or [("created", SortDirection.ASCENDING)])]
    # Work on a copy so merging kwargs never leaks into the caller's dict.
    query = dict(query) if query else {}
    query.update(kwargs)
    collection = self._get_collection(model)
    cursor = collection.find(
        filter=query,
        skip=skip,
        limit=limit,
        sort=sort_by,
    )
    async for item in cursor:
        yield model(**item)
async def find_one(self, model: Type[~ModelType], oid: str | ObjectId | None = None, query: dict[str, typing.Any] | None = None, **kwargs) ‑> Optional[~ModelType]

Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs

Expand source code
async def find_one(
    self, model: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs
) -> ModelType | None:
    """Find one either by its oid, by mongo filter query or by specifying
    fields and their values using kwargs

    ``oid`` overrides any ``_id`` in ``query`` and ``kwargs`` override both.
    Returns None when no document matches. The caller's ``query`` dict is
    left untouched.
    """
    collection = self._get_collection(model)
    # Work on a copy so the updates below never leak into the caller's dict.
    query = dict(query) if query else {}
    if oid:
        if isinstance(oid, str):
            oid = ObjectId(oid)
        query["_id"] = oid
    query.update(kwargs)
    found = await collection.find_one(query)
    if found is None:
        return None
    return model(**found)
async def save(self, instance: ~ModelType)

Update existing document or insert a new one

Expand source code
async def save(self, instance: ModelType):
    """Replace the document with the instance's ``_id``, inserting it when absent"""
    target = self._get_collection(instance)
    await target.replace_one(
        filter={"_id": instance.id},
        replacement=instance.dict(by_alias=True),
        upsert=True,
    )
async def update(self, instance: ~ModelType)

Update existing document in the collection

Expand source code
async def update(self, instance: ModelType):
    """Replace the stored document that has the instance's ``_id``

    Raises DocumentNotFound when no document matched.
    """
    target = self._get_collection(instance)
    selector = {"_id": instance.id}
    outcome = await target.replace_one(filter=selector, replacement=instance.dict(by_alias=True))
    if outcome.matched_count == 0:
        raise DocumentNotFound(target.name, selector)
class MongoIndex (field: str, ftype: MongoIndexType = MongoIndexType.ASCENDING, *, unique: bool = False, sparse: bool = True, expire_seconds: int | None = None, compound_with: dict[str, MongoIndexType] | None = None, name: str | None = None, comment: str | None = None)

A mongo index

Expand source code
class MongoIndex:
    """Declarative description of a single (optionally compound) mongo index"""

    def __init__(
        self,
        field: str,
        ftype: MongoIndexType = MongoIndexType.ASCENDING,
        *,
        unique: bool = False,
        sparse: bool = True,
        expire_seconds: int | None = None,
        compound_with: dict[str, MongoIndexType] | None = None,
        name: str | None = None,
        comment: str | None = None,
    ):
        # Leading key of the index.
        self.field = field
        self.type = ftype
        # Additional keys; a falsy value is replaced by a fresh empty dict.
        self.compound_with = compound_with if compound_with else {}
        # Index options handed over to ``create_index`` by ``create``.
        self.unique = unique
        self.sparse = sparse
        self.expire_seconds = expire_seconds
        self.name = name
        self.comment = comment

    async def create(self, collection: "AgnosticCollection", **kwargs):
        """Build the key list and options, then call ``collection.create_index``

        Options already present in ``kwargs`` take precedence over the values
        stored on this index.
        """
        keys = [(self.field, self.type.value)]
        for extra_field, extra_type in self.compound_with.items():
            keys.append((extra_field, extra_type.value))

        # Gather this index's own options in the order they are applied.
        own_options = {}
        if self.name:
            own_options["name"] = self.name
        if self.comment:
            own_options["comment"] = self.comment
        if self.expire_seconds is not None:
            own_options["expireAfterSeconds"] = self.expire_seconds
        own_options["unique"] = self.unique
        own_options["sparse"] = self.sparse

        # setdefault keeps any value the caller passed explicitly.
        for option, value in own_options.items():
            kwargs.setdefault(option, value)

        return await collection.create_index(keys, **kwargs)

Methods

async def create(self, collection: AgnosticCollection, **kwargs)

Create the index in the specified collection

Expand source code
async def create(self, collection: "AgnosticCollection", **kwargs):
    """Build the key list and options, then call ``collection.create_index``

    Options already present in ``kwargs`` take precedence over the values
    stored on this index.
    """
    keys = [(self.field, self.type.value)]
    for extra_field, extra_type in self.compound_with.items():
        keys.append((extra_field, extra_type.value))

    # Gather this index's own options in the order they are applied.
    own_options = {}
    if self.name:
        own_options["name"] = self.name
    if self.comment:
        own_options["comment"] = self.comment
    if self.expire_seconds is not None:
        own_options["expireAfterSeconds"] = self.expire_seconds
    own_options["unique"] = self.unique
    own_options["sparse"] = self.sparse

    # setdefault keeps any value the caller passed explicitly.
    for option, value in own_options.items():
        kwargs.setdefault(option, value)

    return await collection.create_index(keys, **kwargs)
class MongoIndexType (value, names=None, *, module=None, qualname=None, type=None, start=1)

Mongo Index Type

Expand source code
class MongoIndexType(Enum):
    """Mongo Index Type

    Each member's value is the pymongo constant of the same name.
    """

    # Sort-order index kinds.
    ASCENDING = pymongo.ASCENDING
    DESCENDING = pymongo.DESCENDING
    # Geospatial index kinds.
    GEO2D = pymongo.GEO2D
    GEOSPHERE = pymongo.GEOSPHERE
    # Hashed and text index kinds.
    HASHED = pymongo.HASHED
    TEXT = pymongo.TEXT

Ancestors

  • enum.Enum

Class variables

var ASCENDING
var DESCENDING
var GEO2D
var GEOSPHERE
var HASHED
var TEXT
class ObjectId (oid: Union[str, ForwardRef('ObjectId'), bytes, ForwardRef(None)] = None)

Validated ObjectId to be used with pydantic

Initialize a new ObjectId.

An ObjectId is a 12-byte unique identifier consisting of:

  • a 4-byte value representing the seconds since the Unix epoch,
  • a 5-byte random value,
  • a 3-byte counter, starting with a random value.

By default, ObjectId creates a new unique identifier. The optional parameter oid can be an :class:ObjectId, or any 12 :class:bytes.

For example, the 12 bytes b'foo-bar-quux' do not follow the ObjectId specification but they are acceptable input::

>>> ObjectId(b'foo-bar-quux')
ObjectId('666f6f2d6261722d71757578')

oid can also be a :class:str of 24 hex digits::

>>> ObjectId('0123456789ab0123456789ab')
ObjectId('0123456789ab0123456789ab')

Raises :class:~bson.errors.InvalidId if oid is not 12 bytes nor 24 hex digits, or :class:TypeError if oid is not an accepted type.

:Parameters: - oid (optional): a valid ObjectId.

Seealso: The MongoDB documentation on ObjectIds <http://dochub.mongodb.org/core/objectids>_.

Changed in version: 3.8

:class:~bson.objectid.ObjectId now implements the ObjectID specification version 0.2 <https://github.com/mongodb/specifications/blob/master/source/objectid.rst>_.

Expand source code
class ObjectId(bson.objectid.ObjectId):
    """bson ObjectId subclass that plugs into pydantic validation"""

    @classmethod
    def __get_validators__(cls):
        """Yield the validators pydantic should run for this type"""
        yield cls.validate

    @classmethod
    def validate(cls, val) -> bson.objectid.ObjectId:
        """Return ``val`` as a bson ObjectId

        Existing ObjectId instances are returned as-is and strings are parsed;
        any other type raises TypeError.
        """
        if isinstance(val, bson.objectid.ObjectId):
            return val
        if isinstance(val, str):
            # NOTE(review): a malformed string raises bson's InvalidId here, not
            # ValueError/TypeError - confirm pydantic reports it as intended.
            return bson.objectid.ObjectId(val)
        raise TypeError(f"Expected str or ObjectId, got '{type(val)}'")

Ancestors

  • bson.objectid.ObjectId

Static methods

def validate(val) ‑> bson.objectid.ObjectId

Validate ObjectId

Expand source code
@classmethod
def validate(cls, val) -> bson.objectid.ObjectId:
    """Return ``val`` as a bson ObjectId

    Existing ObjectId instances are returned as-is and strings are parsed;
    any other type raises TypeError.
    """
    if isinstance(val, bson.objectid.ObjectId):
        return val
    if isinstance(val, str):
        # NOTE(review): a malformed string raises bson's InvalidId here, not
        # ValueError/TypeError - confirm pydantic reports it as intended.
        return bson.objectid.ObjectId(val)
    raise TypeError(f"Expected str or ObjectId, got '{type(val)}'")