Package mongorm
MongORM
Expand source code
"""MongORM"""
from mongorm.index import MongoIndex, MongoIndexType
from mongorm.mongorm import BaseModel, MongORM, ObjectId
__all__ = ["BaseModel", "MongORM", "MongoIndex", "MongoIndexType", "ObjectId"]
Sub-modules
mongorm.exceptions-
MongORM exceptions
mongorm.index-
MongORM index
mongorm.mongorm-
This code defines the basic Mongo Object-Relational Model (MongORM) framework. It consists of a BaseModel class and MongORM class. MongORM provides …
mongorm.version-
Package version
Classes
class BaseModel (**data: Any)-
Base model for all models
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class BaseModel(pydantic.BaseModel): """Base model for all models""" class Meta: """Indexes and meta definition""" client: "MongORM | None" = None collection: str | None = None class Config: """json encoders""" json_encoders = {bson.objectid.ObjectId: str} allow_population_by_field_name = True @classmethod def list_indexes(cls) -> list[MongoIndex]: """List indexes associated with this model""" indexes: list[MongoIndex] = [] for handle, index in cls.Meta.__dict__.items(): if handle.startswith("__") or handle in ("client", "collection"): continue if not isinstance(index, MongoIndex): logger.warning( "Skipping index specified by handle '%s' - is of type '%s', 'MongoIndex' was expected.", handle, type(index), ) continue indexes.append(index) return indexes async def save(self): """Update document or insert a new one""" await self._get_client().save(self) @classmethod async def find_one( cls: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs""" return await cls._get_client().find_one(cls, oid=oid, query=query, **kwargs) @classmethod async def find( cls: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, "SortDirection"]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" async for model in cls._get_client().find(cls, query=query, sort=sort, skip=skip, limit=limit, **kwargs): yield model @classmethod async def find_and_delete(cls: Type[ModelType], instance_or_oid: "BaseModel | str | ObjectId"): """Delete instance from collection""" oid: str | ObjectId if isinstance(instance_or_oid, BaseModel): oid = instance_or_oid.id else: oid = instance_or_oid await cls._get_client().delete(instance_or_model=cls, oid=oid) async def delete(self): """Delete current instance from 
collection""" await self._get_client().delete(self) async def exists(self) -> bool: """Check whether instance exists in the collection""" return (await self.find_one(oid=self.id)) is not None @classmethod def _get_client(cls) -> "MongORM": """Get client from model's Meta""" if not hasattr(cls.Meta, "client") or cls.Meta.client is None: raise MissingClientException(cls) return cls.Meta.client def json( self, *, include: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, exclude: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any, ) -> str: """Override pydantic's json method to use str as default for ObjectId and exclude none by default""" encoder = encoder or str return super().json( include=include, exclude=exclude, by_alias=by_alias, skip_defaults=skip_defaults, exclude_unset=exclude_unset, exclude_defaults=exclude_defaults, exclude_none=exclude_none, encoder=encoder, models_as_dict=models_as_dict, **dumps_kwargs, ) id: ObjectId = pydantic.Field(alias="_id", default_factory=bson.objectid.ObjectId) created: datetime = pydantic.Field(default_factory=datetime.utcnow)Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Config-
json encoders
var Meta-
Indexes and meta definition
var created : datetime.datetime
var id : ObjectId
Static methods
async def find(query: dict[str, typing.Any] | None = None, sort: Optional[Sequence[tuple[str, 'SortDirection']]] = None, skip: int = 0, limit: int = 0, **kwargs) ‑> AsyncGenerator[~ModelType, None]-
Find all instances (or specify query to search)
Expand source code
@classmethod async def find( cls: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, "SortDirection"]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" async for model in cls._get_client().find(cls, query=query, sort=sort, skip=skip, limit=limit, **kwargs): yield model async def find_and_delete(instance_or_oid: BaseModel | str | ObjectId)-
Delete instance from collection
Expand source code
@classmethod async def find_and_delete(cls: Type[ModelType], instance_or_oid: "BaseModel | str | ObjectId"): """Delete instance from collection""" oid: str | ObjectId if isinstance(instance_or_oid, BaseModel): oid = instance_or_oid.id else: oid = instance_or_oid await cls._get_client().delete(instance_or_model=cls, oid=oid) async def find_one(oid: str | ObjectId | None = None, query: dict[str, typing.Any] | None = None, **kwargs) ‑> Optional[~ModelType]-
Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs
Expand source code
@classmethod async def find_one( cls: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs""" return await cls._get_client().find_one(cls, oid=oid, query=query, **kwargs) def list_indexes() ‑> list[MongoIndex]-
List indexes associated with this model
Expand source code
@classmethod
def list_indexes(cls) -> list[MongoIndex]:
    """Collect the ``MongoIndex`` objects declared on this model's ``Meta``.

    Every entry of ``cls.Meta.__dict__`` is inspected, except dunder names and
    the reserved ``client`` / ``collection`` settings. Entries that are not
    ``MongoIndex`` instances are skipped and reported with a warning.

    Returns:
        The ``MongoIndex`` instances found, in ``Meta.__dict__`` order.
    """
    reserved = ("client", "collection")
    collected: list[MongoIndex] = []
    for attr_name, attr_value in cls.Meta.__dict__.items():
        is_setting = attr_name.startswith("__") or attr_name in reserved
        if is_setting:
            continue
        if isinstance(attr_value, MongoIndex):
            collected.append(attr_value)
        else:
            logger.warning(
                "Skipping index specified by handle '%s' - is of type '%s', 'MongoIndex' was expected.",
                attr_name,
                type(attr_value),
            )
    return collected
Methods
async def delete(self)-
Delete current instance from collection
Expand source code
async def delete(self): """Delete current instance from collection""" await self._get_client().delete(self) async def exists(self) ‑> bool-
Check whether instance exists in the collection
Expand source code
async def exists(self) -> bool: """Check whether instance exists in the collection""" return (await self.find_one(oid=self.id)) is not None def json(self, *, include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), ForwardRef(None)] = None, exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), ForwardRef(None)] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any) ‑> str-
Override pydantic's json method to use str as default for ObjectId and exclude none by default
Expand source code
def json( self, *, include: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, exclude: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any, ) -> str: """Override pydantic's json method to use str as default for ObjectId and exclude none by default""" encoder = encoder or str return super().json( include=include, exclude=exclude, by_alias=by_alias, skip_defaults=skip_defaults, exclude_unset=exclude_unset, exclude_defaults=exclude_defaults, exclude_none=exclude_none, encoder=encoder, models_as_dict=models_as_dict, **dumps_kwargs, ) async def save(self)-
Update document or insert a new one
Expand source code
async def save(self):
    """Persist this instance through the model's client.

    Looks up the client with ``_get_client()`` and awaits its ``save`` method,
    passing this instance.
    """
    client = self._get_client()
    await client.save(self)
class MongORM (url: str, database: str)-
MongoDB abstraction to work with pydantic models
Expand source code
class MongORM:
    """MongoDB abstraction to work with pydantic models.

    Holds an ``AsyncIOMotorClient`` created from ``url`` and the database named
    ``database``. The collection used by each operation is read from the
    model's ``Meta.collection``.
    """

    def __init__(self, url: str, database: str):
        self.url = url
        self.client: "AgnosticClient" = AsyncIOMotorClient(self.url)
        self.database: "AgnosticDatabase" = self.client[database]

    def _get_collection(self, instance_or_model: BaseModel | Type[BaseModel]) -> "AgnosticCollection":
        """Get collection associated with instance (or model).

        Raises:
            ValueError: if ``Meta`` has no ``collection`` attribute or it is ``None``.
        """
        if getattr(instance_or_model.Meta, "collection", None) is None:
            raise ValueError("Instance must be a valid root model with 'Meta.collection' set")
        return self.database[instance_or_model.Meta.collection]

    async def find_one(
        self,
        model: Type[ModelType],
        oid: str | ObjectId | None = None,
        query: dict[str, Any] | None = None,
        **kwargs,
    ) -> ModelType | None:
        """Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs.

        The filter is built from ``query``, then ``_id`` is set from ``oid``
        (when given), then ``kwargs`` are merged in; later sources override
        earlier ones. The caller's ``query`` dict is left unmodified.

        Returns:
            A ``model`` instance built from the found document, or ``None``
            when nothing matches.
        """
        collection = self._get_collection(model)
        # Work on a copy so merging oid/kwargs never mutates the caller's dict.
        criteria: dict[str, Any] = dict(query or {})
        if oid:
            criteria["_id"] = ObjectId(oid) if isinstance(oid, str) else oid
        criteria.update(kwargs)
        found = await collection.find_one(criteria)
        if found is None:
            return None
        return model(**found)

    # pylint: disable=too-many-arguments
    async def find(
        self,
        model: Type[ModelType],
        query: dict[str, Any] | None = None,
        sort: Sequence[tuple[str, SortDirection]] | None = None,
        skip: int = 0,
        limit: int = 0,
        **kwargs,
    ) -> AsyncGenerator[ModelType, None]:
        """Find all instances (or specify query to search).

        ``kwargs`` are merged into a copy of ``query``; the caller's dict is
        left unmodified. When ``sort`` is empty or ``None`` the results are
        sorted by ``created`` ascending.

        Yields:
            One ``model`` instance per document returned by the cursor.
        """
        sort_by = [(field, direction.value) for field, direction in (sort or [("created", SortDirection.ASCENDING)])]
        # Copy first: updating the argument in place would leak kwargs back to the caller.
        criteria: dict[str, Any] = dict(query or {})
        criteria.update(kwargs)
        collection = self._get_collection(model)
        cursor = collection.find(
            filter=criteria,
            skip=skip,
            limit=limit,
            sort=sort_by,
        )
        async for item in cursor:
            yield model(**item)

    async def update(self, instance: ModelType):
        """Update existing document in the collection.

        Raises:
            DocumentNotFound: if no document matches ``instance.id``.
        """
        collection = self._get_collection(instance)
        query = {"_id": instance.id}
        updated = await collection.replace_one(filter=query, replacement=instance.dict(by_alias=True))
        if updated.matched_count == 0:
            # Keyword form, matching how delete() raises the same exception.
            raise DocumentNotFound(collection=collection.name, query=query)

    async def save(self, instance: ModelType):
        """Update existing document or insert a new one (``replace_one`` with ``upsert=True``)."""
        collection = self._get_collection(instance)
        await collection.replace_one(filter={"_id": instance.id}, replacement=instance.dict(by_alias=True), upsert=True)

    async def delete(
        self,
        instance_or_model: BaseModel | Type[BaseModel],
        oid: str | ObjectId | None = None,
    ):
        """Delete document by its oid.

        When a model instance is passed its ``id`` is used and ``oid`` is
        ignored; when a model class is passed ``oid`` is required.

        Raises:
            ValueError: if a model class is passed without ``oid``.
            DocumentNotFound: if nothing was deleted.
        """
        if isinstance(instance_or_model, BaseModel):
            oid = instance_or_model.id
        else:
            if not oid:
                raise ValueError("oid must be specified if not providing model instance")
            if isinstance(oid, str):
                oid = ObjectId(oid)

        collection = self._get_collection(instance_or_model)
        result: pymongo.results.DeleteResult = await collection.delete_one(filter={"_id": oid})
        if result.deleted_count == 0:
            raise DocumentNotFound(collection=collection.name, query={"_id": oid})

    async def create_schema(self, models: Sequence[BaseModel | Type[BaseModel]]):
        """Create collections and indexes for specified models.

        Models whose ``list_indexes()`` returns nothing are skipped.
        """
        for model in models:
            indexes = model.list_indexes()
            if not indexes:
                continue
            collection = self._get_collection(model)
            for ind in indexes:
                await ind.create(collection)
async def create_schema(self, models: Sequence[BaseModel])-
Create collections and indexes for specified models
Expand source code
async def create_schema(self, models: Sequence[BaseModel]): """Create collections and indexes for specified models""" for model in models: indexes = model.list_indexes() if not indexes: continue collection = self._get_collection(model) for ind in indexes: await ind.create(collection) async def delete(self, instance_or_model: Union[BaseModel, Type[BaseModel]], oid: str | ObjectId | None = None)-
Delete document by its oid
Expand source code
async def delete( self, instance_or_model: BaseModel | Type[BaseModel], oid: str | ObjectId | None = None, ): """Delete document by its oid""" if isinstance(instance_or_model, BaseModel): oid = instance_or_model.id else: if not oid: raise ValueError("oid must be specified if not providing model instance") if isinstance(oid, str): oid = ObjectId(oid) collection = self._get_collection(instance_or_model) result: pymongo.results.DeleteResult = await collection.delete_one(filter={"_id": oid}) if result.deleted_count == 0: raise DocumentNotFound(collection=collection.name, query={"_id": oid}) async def find(self, model: Type[~ModelType], query: dict[str, typing.Any] | None = None, sort: Optional[Sequence[tuple[str, SortDirection]]] = None, skip: int = 0, limit: int = 0, **kwargs) ‑> AsyncGenerator[~ModelType, None]-
Find all instances (or specify query to search)
Expand source code
async def find( self, model: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, SortDirection]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" sort_by = [(field, direction.value) for field, direction in (sort or [("created", SortDirection.ASCENDING)])] query = query or {} query.update(kwargs) collection = self._get_collection(model) cursor = collection.find( filter=query, skip=skip, limit=limit, sort=sort_by, ) async for item in cursor: yield model(**item) async def find_one(self, model: Type[~ModelType], oid: str | ObjectId | None = None, query: dict[str, typing.Any] | None = None, **kwargs) ‑> Optional[~ModelType]-
Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs
Expand source code
async def find_one( self, model: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs""" collection = self._get_collection(model) query = query or {} if oid: if isinstance(oid, str): oid = ObjectId(oid) query.update({"_id": oid}) if kwargs: query.update(kwargs) found = await collection.find_one(query) if found is None: return None return model(**found) async def save(self, instance: ~ModelType)-
Update existing document or insert a new one
Expand source code
async def save(self, instance: ModelType): """Update existing document or insert a new one""" collection = self._get_collection(instance) await collection.replace_one(filter={"_id": instance.id}, replacement=instance.dict(by_alias=True), upsert=True) async def update(self, instance: ~ModelType)-
Update existing document in the collection
Expand source code
async def update(self, instance: ModelType):
    """Replace the stored document for ``instance``, failing if it is absent.

    The document is matched on ``_id`` equal to ``instance.id`` and replaced
    with ``instance.dict(by_alias=True)``; no upsert is requested.

    Raises:
        DocumentNotFound: when ``replace_one`` reports ``matched_count == 0``.
    """
    target = self._get_collection(instance)
    id_filter = {"_id": instance.id}
    payload = instance.dict(by_alias=True)
    outcome = await target.replace_one(filter=id_filter, replacement=payload)
    if outcome.matched_count != 0:
        return
    raise DocumentNotFound(target.name, id_filter)
class MongoIndex (field: str, ftype: MongoIndexType = MongoIndexType.ASCENDING, *, unique: bool = False, sparse: bool = True, expire_seconds: int | None = None, compound_with: dict[str, MongoIndexType] | None = None, name: str | None = None, comment: str | None = None)-
A mongo index
Expand source code
class MongoIndex: """A mongo index""" def __init__( self, field: str, ftype: MongoIndexType = MongoIndexType.ASCENDING, *, unique: bool = False, sparse: bool = True, expire_seconds: int | None = None, compound_with: dict[str, MongoIndexType] | None = None, name: str | None = None, comment: str | None = None ): self.field = field self.type = ftype self.unique = unique self.sparse = sparse self.compound_with = compound_with or {} self.expire_seconds = expire_seconds self.name = name self.comment = comment async def create(self, collection: "AgnosticCollection", **kwargs): """Create the index in the specified collection""" keys = [(self.field, self.type.value)] if self.compound_with: keys.extend(((field, ftype.value)) for field, ftype in self.compound_with.items()) if self.name: kwargs.setdefault("name", self.name) if self.comment: kwargs.setdefault("comment", self.comment) if self.expire_seconds is not None: kwargs.setdefault("expireAfterSeconds", self.expire_seconds) kwargs.setdefault("unique", self.unique) kwargs.setdefault("sparse", self.sparse) return await collection.create_index(keys, **kwargs)Methods
async def create(self, collection: AgnosticCollection, **kwargs)-
Create the index in the specified collection
Expand source code
async def create(self, collection: "AgnosticCollection", **kwargs):
    """Create this index on ``collection`` and return ``create_index``'s result.

    The key list starts with ``(field, type.value)`` and is followed by one
    pair per ``compound_with`` entry. Index options are filled in with
    ``setdefault``, so anything the caller passes in ``kwargs`` wins over the
    values stored on this object.
    """
    key_spec = [(self.field, self.type.value)]
    if self.compound_with:
        for extra_field, extra_type in self.compound_with.items():
            key_spec.append((extra_field, extra_type.value))

    # Optional settings are only forwarded when they were actually configured.
    defaults = []
    if self.name:
        defaults.append(("name", self.name))
    if self.comment:
        defaults.append(("comment", self.comment))
    if self.expire_seconds is not None:
        defaults.append(("expireAfterSeconds", self.expire_seconds))
    defaults.append(("unique", self.unique))
    defaults.append(("sparse", self.sparse))

    for option, value in defaults:
        kwargs.setdefault(option, value)

    return await collection.create_index(key_spec, **kwargs)
class MongoIndexType (value, names=None, *, module=None, qualname=None, type=None, start=1)-
Mongo Index Type
Expand source code
class MongoIndexType(Enum): """Mongo Index Type""" ASCENDING = pymongo.ASCENDING DESCENDING = pymongo.DESCENDING GEO2D = pymongo.GEO2D GEOSPHERE = pymongo.GEOSPHERE HASHED = pymongo.HASHED TEXT = pymongo.TEXTAncestors
- enum.Enum
Class variables
var ASCENDING
var DESCENDING
var GEO2D
var GEOSPHERE
var HASHED
var TEXT
class ObjectId (oid: Union[str, ForwardRef('ObjectId'), bytes, ForwardRef(None)] = None)-
Validated ObjectId to be used with pydantic
Initialize a new ObjectId.
An ObjectId is a 12-byte unique identifier consisting of:
- a 4-byte value representing the seconds since the Unix epoch,
- a 5-byte random value,
- a 3-byte counter, starting with a random value.
By default, ObjectId() creates a new unique identifier. The optional parameter oid can be an ObjectId, or any 12 bytes.

For example, the 12 bytes b'foo-bar-quux' do not follow the ObjectId specification but they are acceptable input:

    >>> ObjectId(b'foo-bar-quux')
    ObjectId('666f6f2d6261722d71757578')

oid can also be a str of 24 hex digits:

    >>> ObjectId('0123456789ab0123456789ab')
    ObjectId('0123456789ab0123456789ab')

Raises bson.errors.InvalidId if oid is not 12 bytes nor 24 hex digits, or TypeError if oid is not an accepted type.

Parameters:
- oid (optional): a valid ObjectId.

See also: the MongoDB documentation on ObjectIds <http://dochub.mongodb.org/core/objectids>.

Changed in version 3.8: bson.objectid.ObjectId now implements the ObjectID specification version 0.2 <https://github.com/mongodb/specifications/blob/master/source/objectid.rst>.
Expand source code
class ObjectId(bson.objectid.ObjectId): """Validated ObjectId to be used with pydantic""" @classmethod def __get_validators__(cls): yield cls.validate @classmethod def validate(cls, val) -> bson.objectid.ObjectId: """Validate ObjectId""" if isinstance(val, str): return bson.objectid.ObjectId(val) if isinstance(val, bson.objectid.ObjectId): return val raise TypeError(f"Expected str or ObjectId, got '{type(val)}'")Ancestors
- bson.objectid.ObjectId
Static methods
def validate(val) ‑> bson.objectid.ObjectId-
Validate ObjectId
Expand source code
@classmethod
def validate(cls, val) -> bson.objectid.ObjectId:
    """Validate ``val`` and return it as a ``bson.objectid.ObjectId``.

    Existing ``ObjectId`` instances are returned unchanged; strings are passed
    to the ``bson.objectid.ObjectId`` constructor.

    Raises:
        ValueError: if ``val`` is a string the ``ObjectId`` constructor rejects.
        TypeError: if ``val`` is neither ``str`` nor ``ObjectId``.
    """
    if isinstance(val, bson.objectid.ObjectId):
        return val
    if isinstance(val, str):
        try:
            return bson.objectid.ObjectId(val)
        except bson.errors.InvalidId as err:
            # InvalidId is neither ValueError nor TypeError, so a validation
            # layer that only traps those would let it escape as a raw error.
            raise ValueError(str(err)) from err
    raise TypeError(f"Expected str or ObjectId, got '{type(val)}'")