Package mongorm
MongORM
Expand source code
"""MongORM"""
from mongorm.index import MongoIndex, MongoIndexType
from mongorm.mongorm import BaseModel, MongORM, ObjectId
__all__ = ["BaseModel", "MongORM", "MongoIndex", "MongoIndexType", "ObjectId"]
Sub-modules
mongorm.exceptions
-
MongORM exceptions
mongorm.index
-
MongORM index
mongorm.mongorm
-
This code defines the basic Mongo Object-Relational Model (MongORM) framework. It consists of a BaseModel class and MongORM class. MongORM provides …
mongorm.version
-
Package version
Classes
class BaseModel (**data: Any)
-
Base model for all models
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class BaseModel(pydantic.BaseModel): """Base model for all models""" class Meta: """Indexes and meta definition""" client: "MongORM | None" = None collection: str | None = None class Config: """json encoders""" json_encoders = {bson.objectid.ObjectId: str} allow_population_by_field_name = True @classmethod def list_indexes(cls) -> list[MongoIndex]: """List indexes associated with this model""" indexes: list[MongoIndex] = [] for handle, index in cls.Meta.__dict__.items(): if handle.startswith("__") or handle in ("client", "collection"): continue if not isinstance(index, MongoIndex): logger.warning( "Skipping index specified by handle '%s' - is of type '%s', 'MongoIndex' was expected.", handle, type(index), ) continue indexes.append(index) return indexes async def save(self): """Update document or insert a new one""" await self._get_client().save(self) @classmethod async def find_one( cls: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs""" return await cls._get_client().find_one(cls, oid=oid, query=query, **kwargs) @classmethod async def find( cls: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, "SortDirection"]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" async for model in cls._get_client().find(cls, query=query, sort=sort, skip=skip, limit=limit, **kwargs): yield model @classmethod async def find_and_delete(cls: Type[ModelType], instance_or_oid: "BaseModel | str | ObjectId"): """Delete instance from collection""" oid: str | ObjectId if isinstance(instance_or_oid, BaseModel): oid = instance_or_oid.id else: oid = instance_or_oid await cls._get_client().delete(instance_or_model=cls, oid=oid) async def delete(self): """Delete current instance from collection""" await self._get_client().delete(self) async def exists(self) -> bool: """Check whether instance exists in the collection""" return (await self.find_one(oid=self.id)) is not None @classmethod def _get_client(cls) -> "MongORM": """Get client from model's Meta""" if not hasattr(cls.Meta, "client") or cls.Meta.client is None: raise MissingClientException(cls) return cls.Meta.client def json( self, *, include: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, exclude: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any, ) -> str: """Override pydantic's json method to use str as default for ObjectId and exclude none by default""" encoder = encoder or str return super().json( include=include, exclude=exclude, by_alias=by_alias, skip_defaults=skip_defaults, exclude_unset=exclude_unset, exclude_defaults=exclude_defaults, exclude_none=exclude_none, encoder=encoder, models_as_dict=models_as_dict, **dumps_kwargs, ) id: ObjectId = pydantic.Field(alias="_id", default_factory=bson.objectid.ObjectId) created: datetime = pydantic.Field(default_factory=datetime.utcnow)
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Config
-
json encoders
var Meta
-
Indexes and meta definition
var created : datetime.datetime
var id : ObjectId
Static methods
async def find(query: dict[str, typing.Any] | None = None, sort: Optional[Sequence[tuple[str, 'SortDirection']]] = None, skip: int = 0, limit: int = 0, **kwargs) ‑> AsyncGenerator[~ModelType, None]
-
Find all instances (or specify query to search)
Expand source code
@classmethod async def find( cls: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, "SortDirection"]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" async for model in cls._get_client().find(cls, query=query, sort=sort, skip=skip, limit=limit, **kwargs): yield model
async def find_and_delete(instance_or_oid: BaseModel | str | ObjectId)
-
Delete instance from collection
Expand source code
@classmethod async def find_and_delete(cls: Type[ModelType], instance_or_oid: "BaseModel | str | ObjectId"): """Delete instance from collection""" oid: str | ObjectId if isinstance(instance_or_oid, BaseModel): oid = instance_or_oid.id else: oid = instance_or_oid await cls._get_client().delete(instance_or_model=cls, oid=oid)
async def find_one(oid: str | ObjectId | None = None, query: dict[str, typing.Any] | None = None, **kwargs) ‑> Optional[~ModelType]
-
Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs
Expand source code
@classmethod async def find_one( cls: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs""" return await cls._get_client().find_one(cls, oid=oid, query=query, **kwargs)
def list_indexes() ‑> list[MongoIndex]
-
List indexes associated with this model
Expand source code
@classmethod def list_indexes(cls) -> list[MongoIndex]: """List indexes associated with this model""" indexes: list[MongoIndex] = [] for handle, index in cls.Meta.__dict__.items(): if handle.startswith("__") or handle in ("client", "collection"): continue if not isinstance(index, MongoIndex): logger.warning( "Skipping index specified by handle '%s' - is of type '%s', 'MongoIndex' was expected.", handle, type(index), ) continue indexes.append(index) return indexes
Methods
async def delete(self)
-
Delete current instance from collection
Expand source code
async def delete(self): """Delete current instance from collection""" await self._get_client().delete(self)
async def exists(self) ‑> bool
-
Check whether instance exists in the collection
Expand source code
async def exists(self) -> bool: """Check whether instance exists in the collection""" return (await self.find_one(oid=self.id)) is not None
def json(self, *, include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), ForwardRef(None)] = None, exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), ForwardRef(None)] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any) ‑> str
-
Override pydantic's json method to use str as default for ObjectId and exclude none by default
Expand source code
def json( self, *, include: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, exclude: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any, ) -> str: """Override pydantic's json method to use str as default for ObjectId and exclude none by default""" encoder = encoder or str return super().json( include=include, exclude=exclude, by_alias=by_alias, skip_defaults=skip_defaults, exclude_unset=exclude_unset, exclude_defaults=exclude_defaults, exclude_none=exclude_none, encoder=encoder, models_as_dict=models_as_dict, **dumps_kwargs, )
async def save(self)
-
Update document or insert a new one
Expand source code
async def save(self): """Update document or insert a new one""" await self._get_client().save(self)
class MongORM (url: str, database: str)
-
MongoDB abstraction to work with pydantic models
Expand source code
class MongORM: """MongoDB abstraction to work with pydantic models""" def __init__(self, url: str, database: str): self.url = url self.client: "AgnosticClient" = AsyncIOMotorClient(self.url) self.database: "AgnosticDatabase" = self.client[database] def _get_collection(self, instance_or_model: BaseModel | Type[BaseModel]) -> "AgnosticCollection": """Get collection associated with instance (or model)""" if not hasattr(instance_or_model.Meta, "collection") or instance_or_model.Meta.collection is None: raise ValueError("Instance must be a valid root model with '__collection__' ClassVar") return self.database[instance_or_model.Meta.collection] async def find_one( self, model: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs""" collection = self._get_collection(model) query = query or {} if oid: if isinstance(oid, str): oid = ObjectId(oid) query.update({"_id": oid}) if kwargs: query.update(kwargs) found = await collection.find_one(query) if found is None: return None return model(**found) # pylint: disable=too-many-arguments async def find( self, model: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, SortDirection]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" sort_by = [(field, direction.value) for field, direction in (sort or [("created", SortDirection.ASCENDING)])] query = query or {} query.update(kwargs) collection = self._get_collection(model) cursor = collection.find( filter=query, skip=skip, limit=limit, sort=sort_by, ) async for item in cursor: yield model(**item) async def update(self, instance: ModelType): """Update existing document in the collection""" collection = self._get_collection(instance) query = {"_id": instance.id} updated = await collection.replace_one(filter=query, replacement=instance.dict(by_alias=True)) if updated.matched_count == 0: raise DocumentNotFound(collection.name, query) async def save(self, instance: ModelType): """Update existing document or insert a new one""" collection = self._get_collection(instance) await collection.replace_one(filter={"_id": instance.id}, replacement=instance.dict(by_alias=True), upsert=True) async def delete( self, instance_or_model: BaseModel | Type[BaseModel], oid: str | ObjectId | None = None, ): """Delete document by its oid""" if isinstance(instance_or_model, BaseModel): oid = instance_or_model.id else: if not oid: raise ValueError("oid must be specified if not providing model instance") if isinstance(oid, str): oid = ObjectId(oid) collection = self._get_collection(instance_or_model) result: pymongo.results.DeleteResult = await collection.delete_one(filter={"_id": oid}) if result.deleted_count == 0: raise DocumentNotFound(collection=collection.name, query={"_id": oid}) async def create_schema(self, models: Sequence[BaseModel]): """Create collections and indexes for specified models""" for model in models: indexes = model.list_indexes() if not indexes: continue collection = self._get_collection(model) for ind in indexes: await ind.create(collection)
Methods
async def create_schema(self, models: Sequence[BaseModel])
-
Create collections and indexes for specified models
Expand source code
async def create_schema(self, models: Sequence[BaseModel]): """Create collections and indexes for specified models""" for model in models: indexes = model.list_indexes() if not indexes: continue collection = self._get_collection(model) for ind in indexes: await ind.create(collection)
async def delete(self, instance_or_model: Union[BaseModel, Type[BaseModel]], oid: str | ObjectId | None = None)
-
Delete document by its oid
Expand source code
async def delete( self, instance_or_model: BaseModel | Type[BaseModel], oid: str | ObjectId | None = None, ): """Delete document by its oid""" if isinstance(instance_or_model, BaseModel): oid = instance_or_model.id else: if not oid: raise ValueError("oid must be specified if not providing model instance") if isinstance(oid, str): oid = ObjectId(oid) collection = self._get_collection(instance_or_model) result: pymongo.results.DeleteResult = await collection.delete_one(filter={"_id": oid}) if result.deleted_count == 0: raise DocumentNotFound(collection=collection.name, query={"_id": oid})
async def find(self, model: Type[~ModelType], query: dict[str, typing.Any] | None = None, sort: Optional[Sequence[tuple[str, SortDirection]]] = None, skip: int = 0, limit: int = 0, **kwargs) ‑> AsyncGenerator[~ModelType, None]
-
Find all instances (or specify query to search)
Expand source code
async def find( self, model: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, SortDirection]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" sort_by = [(field, direction.value) for field, direction in (sort or [("created", SortDirection.ASCENDING)])] query = query or {} query.update(kwargs) collection = self._get_collection(model) cursor = collection.find( filter=query, skip=skip, limit=limit, sort=sort_by, ) async for item in cursor: yield model(**item)
async def find_one(self, model: Type[~ModelType], oid: str | ObjectId | None = None, query: dict[str, typing.Any] | None = None, **kwargs) ‑> Optional[~ModelType]
-
Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs
Expand source code
async def find_one( self, model: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs""" collection = self._get_collection(model) query = query or {} if oid: if isinstance(oid, str): oid = ObjectId(oid) query.update({"_id": oid}) if kwargs: query.update(kwargs) found = await collection.find_one(query) if found is None: return None return model(**found)
async def save(self, instance: ~ModelType)
-
Update existing document or insert a new one
Expand source code
async def save(self, instance: ModelType): """Update existing document or insert a new one""" collection = self._get_collection(instance) await collection.replace_one(filter={"_id": instance.id}, replacement=instance.dict(by_alias=True), upsert=True)
async def update(self, instance: ~ModelType)
-
Update existing document in the collection
Expand source code
async def update(self, instance: ModelType): """Update existing document in the collection""" collection = self._get_collection(instance) query = {"_id": instance.id} updated = await collection.replace_one(filter=query, replacement=instance.dict(by_alias=True)) if updated.matched_count == 0: raise DocumentNotFound(collection.name, query)
class MongoIndex (field: str, ftype: MongoIndexType = MongoIndexType.ASCENDING, *, unique: bool = False, sparse: bool = True, expire_seconds: int | None = None, compound_with: dict[str, MongoIndexType] | None = None, name: str | None = None, comment: str | None = None)
-
A mongo index
Expand source code
class MongoIndex: """A mongo index""" def __init__( self, field: str, ftype: MongoIndexType = MongoIndexType.ASCENDING, *, unique: bool = False, sparse: bool = True, expire_seconds: int | None = None, compound_with: dict[str, MongoIndexType] | None = None, name: str | None = None, comment: str | None = None ): self.field = field self.type = ftype self.unique = unique self.sparse = sparse self.compound_with = compound_with or {} self.expire_seconds = expire_seconds self.name = name self.comment = comment async def create(self, collection: "AgnosticCollection", **kwargs): """Create the index in the specified collection""" keys = [(self.field, self.type.value)] if self.compound_with: keys.extend(((field, ftype.value)) for field, ftype in self.compound_with.items()) if self.name: kwargs.setdefault("name", self.name) if self.comment: kwargs.setdefault("comment", self.comment) if self.expire_seconds is not None: kwargs.setdefault("expireAfterSeconds", self.expire_seconds) kwargs.setdefault("unique", self.unique) kwargs.setdefault("sparse", self.sparse) return await collection.create_index(keys, **kwargs)
Methods
async def create(self, collection: AgnosticCollection, **kwargs)
-
Create the index in the specified collection
Expand source code
async def create(self, collection: "AgnosticCollection", **kwargs): """Create the index in the specified collection""" keys = [(self.field, self.type.value)] if self.compound_with: keys.extend(((field, ftype.value)) for field, ftype in self.compound_with.items()) if self.name: kwargs.setdefault("name", self.name) if self.comment: kwargs.setdefault("comment", self.comment) if self.expire_seconds is not None: kwargs.setdefault("expireAfterSeconds", self.expire_seconds) kwargs.setdefault("unique", self.unique) kwargs.setdefault("sparse", self.sparse) return await collection.create_index(keys, **kwargs)
class MongoIndexType (value, names=None, *, module=None, qualname=None, type=None, start=1)
-
Mongo Index Type
Expand source code
class MongoIndexType(Enum): """Mongo Index Type""" ASCENDING = pymongo.ASCENDING DESCENDING = pymongo.DESCENDING GEO2D = pymongo.GEO2D GEOSPHERE = pymongo.GEOSPHERE HASHED = pymongo.HASHED TEXT = pymongo.TEXT
Ancestors
- enum.Enum
Class variables
var ASCENDING
var DESCENDING
var GEO2D
var GEOSPHERE
var HASHED
var TEXT
class ObjectId (oid: Union[str, ForwardRef('ObjectId'), bytes, ForwardRef(None)] = None)
-
Validated ObjectId to be used with pydantic
Initialize a new ObjectId.
An ObjectId is a 12-byte unique identifier consisting of:
- a 4-byte value representing the seconds since the Unix epoch,
- a 5-byte random value,
- a 3-byte counter, starting with a random value.
By default,
ObjectId
creates a new unique identifier. The optional parameteroid
can be an :class:ObjectId
, or any 12 :class:bytes
.For example, the 12 bytes b'foo-bar-quux' do not follow the ObjectId specification but they are acceptable input::
ObjectId(b'foo-bar-quux') ObjectId('666f6f2d6261722d71757578')
oid
can also be a :class:str
of 24 hex digits::ObjectId('0123456789ab0123456789ab') ObjectId('0123456789ab0123456789ab')
Raises :class:
~bson.errors.InvalidId
ifoid
is not 12 bytes nor 24 hex digits, or :class:TypeError
ifoid
is not an accepted type.:Parameters: -
oid
(optional): a valid ObjectId.Seealso: The MongoDB documentation on
ObjectIds <http://dochub.mongodb.org/core/objectids>
_.Changed in version: 3.8
:class:
~bson.objectid.ObjectId
now implements theObjectID specification version 0.2 <https://github.com/mongodb/specifications/blob/master/source/ objectid.rst>
_.Expand source code
class ObjectId(bson.objectid.ObjectId): """Validated ObjectId to be used with pydantic""" @classmethod def __get_validators__(cls): yield cls.validate @classmethod def validate(cls, val) -> bson.objectid.ObjectId: """Validate ObjectId""" if isinstance(val, str): return bson.objectid.ObjectId(val) if isinstance(val, bson.objectid.ObjectId): return val raise TypeError(f"Expected str or ObjectId, got '{type(val)}'")
Ancestors
- bson.objectid.ObjectId
Static methods
def validate(val) ‑> bson.objectid.ObjectId
-
Validate ObjectId
Expand source code
@classmethod def validate(cls, val) -> bson.objectid.ObjectId: """Validate ObjectId""" if isinstance(val, str): return bson.objectid.ObjectId(val) if isinstance(val, bson.objectid.ObjectId): return val raise TypeError(f"Expected str or ObjectId, got '{type(val)}'")