Module mongorm.mongorm
This code defines the basic Mongo Object-Relational Model (MongORM) framework. It consists of a BaseModel class and MongORM class. MongORM provides implementation for basic create, read, update, and delete (CRUD) operations on MongoDB documents with a Pydantic schema. BaseModel contains the meta-model defining the schema as well as support for CRUD operations. It is a subclass of pydantic.BaseModel and defines a default id and created timestamp field. Methods from both classes are used to connect to and operate on MongoDB databases. Lastly, there is an ObjectId class that serves as an OID within Pydantic schemaized documents. The code also includes some helper classes and enumerations such as MongoIndex and SortDirection to assist with the querying of the database.
Expand source code
"""This code defines the basic Mongo Object-Relational Model (MongORM) framework.
It consists of a BaseModel class and MongORM class. MongORM provides implementation for basic create, read, update,
and delete (CRUD) operations on MongoDB documents with a Pydantic schema. BaseModel contains the meta-model
defining the schema as well as support for CRUD operations. It is a subclass of pydantic.BaseModel
and defines a default id and created timestamp field. Methods from both classes are used to connect to and operate
on MongoDB databases. Lastly, there is an ObjectId class that serves as an OID within Pydantic schemaized documents.
The code also includes some helper classes and enumerations such as MongoIndex and SortDirection to assist with
the querying of the database.
"""
import logging
from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING, Any, AsyncGenerator, Callable, Optional, Sequence, Type, TypeVar, Union
import bson
import pydantic
import pymongo
import pymongo.errors
import pymongo.results
from motor.motor_asyncio import AsyncIOMotorClient
from mongorm.exceptions import DocumentNotFound, MissingClientException
from mongorm.index import MongoIndex
if TYPE_CHECKING:
from motor.core import AgnosticClient, AgnosticCollection, AgnosticDatabase
from pydantic.main import AbstractSetIntStr, MappingIntStrAny
ModelType = TypeVar("ModelType", bound="BaseModel") # pylint: disable=invalid-name
logger = logging.getLogger(__name__)
class SortDirection(Enum):
"""Sort direction"""
ASCENDING = pymongo.ASCENDING
DESCENDING = pymongo.DESCENDING
class ObjectId(bson.objectid.ObjectId):
"""Validated ObjectId to be used with pydantic"""
@classmethod
def __get_validators__(cls):
yield cls.validate
@classmethod
def validate(cls, val) -> bson.objectid.ObjectId:
"""Validate ObjectId"""
if isinstance(val, str):
return bson.objectid.ObjectId(val)
if isinstance(val, bson.objectid.ObjectId):
return val
raise TypeError(f"Expected str or ObjectId, got '{type(val)}'")
class BaseModel(pydantic.BaseModel):
"""Base model for all models"""
class Meta:
"""Indexes and meta definition"""
client: "MongORM | None" = None
collection: str | None = None
class Config:
"""json encoders"""
json_encoders = {bson.objectid.ObjectId: str}
allow_population_by_field_name = True
@classmethod
def list_indexes(cls) -> list[MongoIndex]:
"""List indexes associated with this model"""
indexes: list[MongoIndex] = []
for handle, index in cls.Meta.__dict__.items():
if handle.startswith("__") or handle in ("client", "collection"):
continue
if not isinstance(index, MongoIndex):
logger.warning(
"Skipping index specified by handle '%s' - is of type '%s', 'MongoIndex' was expected.",
handle,
type(index),
)
continue
indexes.append(index)
return indexes
async def save(self):
"""Update document or insert a new one"""
await self._get_client().save(self)
@classmethod
async def find_one(
cls: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs
) -> ModelType | None:
"""Find one either by its oid, by mongo filter query or by specified
fields and their values using kwargs"""
return await cls._get_client().find_one(cls, oid=oid, query=query, **kwargs)
@classmethod
async def find(
cls: Type[ModelType],
query: dict[str, Any] | None = None,
sort: Sequence[tuple[str, "SortDirection"]] | None = None,
skip: int = 0,
limit: int = 0,
**kwargs,
) -> AsyncGenerator[ModelType, None]:
"""Find all instances (or specify query to search)"""
async for model in cls._get_client().find(cls, query=query, sort=sort, skip=skip, limit=limit, **kwargs):
yield model
@classmethod
async def find_and_delete(cls: Type[ModelType], instance_or_oid: "BaseModel | str | ObjectId"):
"""Delete instance from collection"""
oid: str | ObjectId
if isinstance(instance_or_oid, BaseModel):
oid = instance_or_oid.id
else:
oid = instance_or_oid
await cls._get_client().delete(instance_or_model=cls, oid=oid)
async def delete(self):
"""Delete current instance from collection"""
await self._get_client().delete(self)
async def exists(self) -> bool:
"""Check whether instance exists in the collection"""
return (await self.find_one(oid=self.id)) is not None
@classmethod
def _get_client(cls) -> "MongORM":
"""Get client from model's Meta"""
if not hasattr(cls.Meta, "client") or cls.Meta.client is None:
raise MissingClientException(cls)
return cls.Meta.client
def json(
self,
*,
include: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None,
exclude: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None,
by_alias: bool = False,
skip_defaults: Optional[bool] = None,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = True,
encoder: Optional[Callable[[Any], Any]] = None,
models_as_dict: bool = True,
**dumps_kwargs: Any,
) -> str:
"""Override pydantic's json method to use str as default for ObjectId and exclude none by default"""
encoder = encoder or str
return super().json(
include=include,
exclude=exclude,
by_alias=by_alias,
skip_defaults=skip_defaults,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
encoder=encoder,
models_as_dict=models_as_dict,
**dumps_kwargs,
)
id: ObjectId = pydantic.Field(alias="_id", default_factory=bson.objectid.ObjectId)
created: datetime = pydantic.Field(default_factory=datetime.utcnow)
class MongORM:
"""MongoDB abstraction to work with pydantic models"""
def __init__(self, url: str, database: str):
self.url = url
self.client: "AgnosticClient" = AsyncIOMotorClient(self.url)
self.database: "AgnosticDatabase" = self.client[database]
def _get_collection(self, instance_or_model: BaseModel | Type[BaseModel]) -> "AgnosticCollection":
"""Get collection associated with instance (or model)"""
if not hasattr(instance_or_model.Meta, "collection") or instance_or_model.Meta.collection is None:
raise ValueError("Instance must be a valid root model with '__collection__' ClassVar")
return self.database[instance_or_model.Meta.collection]
async def find_one(
self, model: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs
) -> ModelType | None:
"""Find one either by its oid, by mongo filter query or by specifying
fields and their values using kwargs"""
collection = self._get_collection(model)
query = query or {}
if oid:
if isinstance(oid, str):
oid = ObjectId(oid)
query.update({"_id": oid})
if kwargs:
query.update(kwargs)
found = await collection.find_one(query)
if found is None:
return None
return model(**found)
# pylint: disable=too-many-arguments
async def find(
self,
model: Type[ModelType],
query: dict[str, Any] | None = None,
sort: Sequence[tuple[str, SortDirection]] | None = None,
skip: int = 0,
limit: int = 0,
**kwargs,
) -> AsyncGenerator[ModelType, None]:
"""Find all instances (or specify query to search)"""
sort_by = [(field, direction.value) for field, direction in (sort or [("created", SortDirection.ASCENDING)])]
query = query or {}
query.update(kwargs)
collection = self._get_collection(model)
cursor = collection.find(
filter=query,
skip=skip,
limit=limit,
sort=sort_by,
)
async for item in cursor:
yield model(**item)
async def update(self, instance: ModelType):
"""Update existing document in the collection"""
collection = self._get_collection(instance)
query = {"_id": instance.id}
updated = await collection.replace_one(filter=query, replacement=instance.dict(by_alias=True))
if updated.matched_count == 0:
raise DocumentNotFound(collection.name, query)
async def save(self, instance: ModelType):
"""Update existing document or insert a new one"""
collection = self._get_collection(instance)
await collection.replace_one(filter={"_id": instance.id}, replacement=instance.dict(by_alias=True), upsert=True)
async def delete(
self,
instance_or_model: BaseModel | Type[BaseModel],
oid: str | ObjectId | None = None,
):
"""Delete document by its oid"""
if isinstance(instance_or_model, BaseModel):
oid = instance_or_model.id
else:
if not oid:
raise ValueError("oid must be specified if not providing model instance")
if isinstance(oid, str):
oid = ObjectId(oid)
collection = self._get_collection(instance_or_model)
result: pymongo.results.DeleteResult = await collection.delete_one(filter={"_id": oid})
if result.deleted_count == 0:
raise DocumentNotFound(collection=collection.name, query={"_id": oid})
async def create_schema(self, models: Sequence[BaseModel]):
"""Create collections and indexes for specified models"""
for model in models:
indexes = model.list_indexes()
if not indexes:
continue
collection = self._get_collection(model)
for ind in indexes:
await ind.create(collection)
Classes
class BaseModel (**data: Any)-
Base model for all models
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class BaseModel(pydantic.BaseModel): """Base model for all models""" class Meta: """Indexes and meta definition""" client: "MongORM | None" = None collection: str | None = None class Config: """json encoders""" json_encoders = {bson.objectid.ObjectId: str} allow_population_by_field_name = True @classmethod def list_indexes(cls) -> list[MongoIndex]: """List indexes associated with this model""" indexes: list[MongoIndex] = [] for handle, index in cls.Meta.__dict__.items(): if handle.startswith("__") or handle in ("client", "collection"): continue if not isinstance(index, MongoIndex): logger.warning( "Skipping index specified by handle '%s' - is of type '%s', 'MongoIndex' was expected.", handle, type(index), ) continue indexes.append(index) return indexes async def save(self): """Update document or insert a new one""" await self._get_client().save(self) @classmethod async def find_one( cls: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs""" return await cls._get_client().find_one(cls, oid=oid, query=query, **kwargs) @classmethod async def find( cls: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, "SortDirection"]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" async for model in cls._get_client().find(cls, query=query, sort=sort, skip=skip, limit=limit, **kwargs): yield model @classmethod async def find_and_delete(cls: Type[ModelType], instance_or_oid: "BaseModel | str | ObjectId"): """Delete instance from collection""" oid: str | ObjectId if isinstance(instance_or_oid, BaseModel): oid = instance_or_oid.id else: oid = instance_or_oid await cls._get_client().delete(instance_or_model=cls, oid=oid) async def delete(self): """Delete current instance from collection""" await self._get_client().delete(self) async def exists(self) -> bool: """Check whether instance exists in the collection""" return (await self.find_one(oid=self.id)) is not None @classmethod def _get_client(cls) -> "MongORM": """Get client from model's Meta""" if not hasattr(cls.Meta, "client") or cls.Meta.client is None: raise MissingClientException(cls) return cls.Meta.client def json( self, *, include: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, exclude: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any, ) -> str: """Override pydantic's json method to use str as default for ObjectId and exclude none by default""" encoder = encoder or str return super().json( include=include, exclude=exclude, by_alias=by_alias, skip_defaults=skip_defaults, exclude_unset=exclude_unset, exclude_defaults=exclude_defaults, exclude_none=exclude_none, encoder=encoder, models_as_dict=models_as_dict, **dumps_kwargs, ) id: ObjectId = pydantic.Field(alias="_id", default_factory=bson.objectid.ObjectId) created: datetime = pydantic.Field(default_factory=datetime.utcnow)Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var Config-
json encoders
var Meta-
Indexes and meta definition
var created : datetime.datetimevar id : ObjectId
Static methods
async def find(query: dict[str, typing.Any] | None = None, sort: Optional[Sequence[tuple[str, 'SortDirection']]] = None, skip: int = 0, limit: int = 0, **kwargs) ‑> AsyncGenerator[~ModelType, None]-
Find all instances (or specify query to search)
Expand source code
@classmethod async def find( cls: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, "SortDirection"]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" async for model in cls._get_client().find(cls, query=query, sort=sort, skip=skip, limit=limit, **kwargs): yield model async def find_and_delete(instance_or_oid: BaseModel | str | ObjectId)-
Delete instance from collection
Expand source code
@classmethod async def find_and_delete(cls: Type[ModelType], instance_or_oid: "BaseModel | str | ObjectId"): """Delete instance from collection""" oid: str | ObjectId if isinstance(instance_or_oid, BaseModel): oid = instance_or_oid.id else: oid = instance_or_oid await cls._get_client().delete(instance_or_model=cls, oid=oid) async def find_one(oid: str | ObjectId | None = None, query: dict[str, typing.Any] | None = None, **kwargs) ‑> Optional[~ModelType]-
Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs
Expand source code
@classmethod async def find_one( cls: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specified fields and their values using kwargs""" return await cls._get_client().find_one(cls, oid=oid, query=query, **kwargs) def list_indexes() ‑> list[MongoIndex]-
List indexes associated with this model
Expand source code
@classmethod def list_indexes(cls) -> list[MongoIndex]: """List indexes associated with this model""" indexes: list[MongoIndex] = [] for handle, index in cls.Meta.__dict__.items(): if handle.startswith("__") or handle in ("client", "collection"): continue if not isinstance(index, MongoIndex): logger.warning( "Skipping index specified by handle '%s' - is of type '%s', 'MongoIndex' was expected.", handle, type(index), ) continue indexes.append(index) return indexes
Methods
async def delete(self)-
Delete current instance from collection
Expand source code
async def delete(self): """Delete current instance from collection""" await self._get_client().delete(self) async def exists(self) ‑> bool-
Check whether instance exists in the collection
Expand source code
async def exists(self) -> bool: """Check whether instance exists in the collection""" return (await self.find_one(oid=self.id)) is not None def json(self, *, include: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), ForwardRef(None)] = None, exclude: Union[ForwardRef('AbstractSetIntStr'), ForwardRef('MappingIntStrAny'), ForwardRef(None)] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any) ‑> str-
Override pydantic's json method to use str as default for ObjectId and exclude none by default
Expand source code
def json( self, *, include: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, exclude: Optional[Union["AbstractSetIntStr", "MappingIntStrAny"]] = None, by_alias: bool = False, skip_defaults: Optional[bool] = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = True, encoder: Optional[Callable[[Any], Any]] = None, models_as_dict: bool = True, **dumps_kwargs: Any, ) -> str: """Override pydantic's json method to use str as default for ObjectId and exclude none by default""" encoder = encoder or str return super().json( include=include, exclude=exclude, by_alias=by_alias, skip_defaults=skip_defaults, exclude_unset=exclude_unset, exclude_defaults=exclude_defaults, exclude_none=exclude_none, encoder=encoder, models_as_dict=models_as_dict, **dumps_kwargs, ) async def save(self)-
Update document or insert a new one
Expand source code
async def save(self): """Update document or insert a new one""" await self._get_client().save(self)
class MongORM (url: str, database: str)-
MongoDB abstraction to work with pydantic models
Expand source code
class MongORM: """MongoDB abstraction to work with pydantic models""" def __init__(self, url: str, database: str): self.url = url self.client: "AgnosticClient" = AsyncIOMotorClient(self.url) self.database: "AgnosticDatabase" = self.client[database] def _get_collection(self, instance_or_model: BaseModel | Type[BaseModel]) -> "AgnosticCollection": """Get collection associated with instance (or model)""" if not hasattr(instance_or_model.Meta, "collection") or instance_or_model.Meta.collection is None: raise ValueError("Instance must be a valid root model with '__collection__' ClassVar") return self.database[instance_or_model.Meta.collection] async def find_one( self, model: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs""" collection = self._get_collection(model) query = query or {} if oid: if isinstance(oid, str): oid = ObjectId(oid) query.update({"_id": oid}) if kwargs: query.update(kwargs) found = await collection.find_one(query) if found is None: return None return model(**found) # pylint: disable=too-many-arguments async def find( self, model: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, SortDirection]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" sort_by = [(field, direction.value) for field, direction in (sort or [("created", SortDirection.ASCENDING)])] query = query or {} query.update(kwargs) collection = self._get_collection(model) cursor = collection.find( filter=query, skip=skip, limit=limit, sort=sort_by, ) async for item in cursor: yield model(**item) async def update(self, instance: ModelType): """Update existing document in the collection""" collection = self._get_collection(instance) query = {"_id": instance.id} updated = await collection.replace_one(filter=query, replacement=instance.dict(by_alias=True)) if updated.matched_count == 0: raise DocumentNotFound(collection.name, query) async def save(self, instance: ModelType): """Update existing document or insert a new one""" collection = self._get_collection(instance) await collection.replace_one(filter={"_id": instance.id}, replacement=instance.dict(by_alias=True), upsert=True) async def delete( self, instance_or_model: BaseModel | Type[BaseModel], oid: str | ObjectId | None = None, ): """Delete document by its oid""" if isinstance(instance_or_model, BaseModel): oid = instance_or_model.id else: if not oid: raise ValueError("oid must be specified if not providing model instance") if isinstance(oid, str): oid = ObjectId(oid) collection = self._get_collection(instance_or_model) result: pymongo.results.DeleteResult = await collection.delete_one(filter={"_id": oid}) if result.deleted_count == 0: raise DocumentNotFound(collection=collection.name, query={"_id": oid}) async def create_schema(self, models: Sequence[BaseModel]): """Create collections and indexes for specified models""" for model in models: indexes = model.list_indexes() if not indexes: continue collection = self._get_collection(model) for ind in indexes: await ind.create(collection)Methods
async def create_schema(self, models: Sequence[BaseModel])-
Create collections and indexes for specified models
Expand source code
async def create_schema(self, models: Sequence[BaseModel]): """Create collections and indexes for specified models""" for model in models: indexes = model.list_indexes() if not indexes: continue collection = self._get_collection(model) for ind in indexes: await ind.create(collection) async def delete(self, instance_or_model: Union[BaseModel, Type[BaseModel]], oid: str | ObjectId | None = None)-
Delete document by its oid
Expand source code
async def delete( self, instance_or_model: BaseModel | Type[BaseModel], oid: str | ObjectId | None = None, ): """Delete document by its oid""" if isinstance(instance_or_model, BaseModel): oid = instance_or_model.id else: if not oid: raise ValueError("oid must be specified if not providing model instance") if isinstance(oid, str): oid = ObjectId(oid) collection = self._get_collection(instance_or_model) result: pymongo.results.DeleteResult = await collection.delete_one(filter={"_id": oid}) if result.deleted_count == 0: raise DocumentNotFound(collection=collection.name, query={"_id": oid}) async def find(self, model: Type[~ModelType], query: dict[str, typing.Any] | None = None, sort: Optional[Sequence[tuple[str, SortDirection]]] = None, skip: int = 0, limit: int = 0, **kwargs) ‑> AsyncGenerator[~ModelType, None]-
Find all instances (or specify query to search)
Expand source code
async def find( self, model: Type[ModelType], query: dict[str, Any] | None = None, sort: Sequence[tuple[str, SortDirection]] | None = None, skip: int = 0, limit: int = 0, **kwargs, ) -> AsyncGenerator[ModelType, None]: """Find all instances (or specify query to search)""" sort_by = [(field, direction.value) for field, direction in (sort or [("created", SortDirection.ASCENDING)])] query = query or {} query.update(kwargs) collection = self._get_collection(model) cursor = collection.find( filter=query, skip=skip, limit=limit, sort=sort_by, ) async for item in cursor: yield model(**item) async def find_one(self, model: Type[~ModelType], oid: str | ObjectId | None = None, query: dict[str, typing.Any] | None = None, **kwargs) ‑> Optional[~ModelType]-
Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs
Expand source code
async def find_one( self, model: Type[ModelType], oid: str | ObjectId | None = None, query: dict[str, Any] | None = None, **kwargs ) -> ModelType | None: """Find one either by its oid, by mongo filter query or by specifying fields and their values using kwargs""" collection = self._get_collection(model) query = query or {} if oid: if isinstance(oid, str): oid = ObjectId(oid) query.update({"_id": oid}) if kwargs: query.update(kwargs) found = await collection.find_one(query) if found is None: return None return model(**found) async def save(self, instance: ~ModelType)-
Update existing document or insert a new one
Expand source code
async def save(self, instance: ModelType): """Update existing document or insert a new one""" collection = self._get_collection(instance) await collection.replace_one(filter={"_id": instance.id}, replacement=instance.dict(by_alias=True), upsert=True) async def update(self, instance: ~ModelType)-
Update existing document in the collection
Expand source code
async def update(self, instance: ModelType): """Update existing document in the collection""" collection = self._get_collection(instance) query = {"_id": instance.id} updated = await collection.replace_one(filter=query, replacement=instance.dict(by_alias=True)) if updated.matched_count == 0: raise DocumentNotFound(collection.name, query)
class ObjectId (oid: Union[str, ForwardRef('ObjectId'), bytes, ForwardRef(None)] = None)-
Validated ObjectId to be used with pydantic
Initialize a new ObjectId.
An ObjectId is a 12-byte unique identifier consisting of:
- a 4-byte value representing the seconds since the Unix epoch,
- a 5-byte random value,
- a 3-byte counter, starting with a random value.
By default,
ObjectIdcreates a new unique identifier. The optional parameteroidcan be an :class:ObjectId, or any 12 :class:bytes.For example, the 12 bytes b'foo-bar-quux' do not follow the ObjectId specification but they are acceptable input::
ObjectId(b'foo-bar-quux') ObjectId('666f6f2d6261722d71757578')
oidcan also be a :class:strof 24 hex digits::ObjectId('0123456789ab0123456789ab') ObjectId('0123456789ab0123456789ab')
Raises :class:
~bson.errors.InvalidIdifoidis not 12 bytes nor 24 hex digits, or :class:TypeErrorifoidis not an accepted type.:Parameters: -
oid(optional): a valid ObjectId.Seealso: The MongoDB documentation on
ObjectIds <http://dochub.mongodb.org/core/objectids>_.Changed in version: 3.8
:class:
~bson.objectid.ObjectIdnow implements theObjectID specification version 0.2 <https://github.com/mongodb/specifications/blob/master/source/ objectid.rst>_.Expand source code
class ObjectId(bson.objectid.ObjectId): """Validated ObjectId to be used with pydantic""" @classmethod def __get_validators__(cls): yield cls.validate @classmethod def validate(cls, val) -> bson.objectid.ObjectId: """Validate ObjectId""" if isinstance(val, str): return bson.objectid.ObjectId(val) if isinstance(val, bson.objectid.ObjectId): return val raise TypeError(f"Expected str or ObjectId, got '{type(val)}'")Ancestors
- bson.objectid.ObjectId
Static methods
def validate(val) ‑> bson.objectid.ObjectId-
Validate ObjectId
Expand source code
@classmethod def validate(cls, val) -> bson.objectid.ObjectId: """Validate ObjectId""" if isinstance(val, str): return bson.objectid.ObjectId(val) if isinstance(val, bson.objectid.ObjectId): return val raise TypeError(f"Expected str or ObjectId, got '{type(val)}'")
class SortDirection (value, names=None, *, module=None, qualname=None, type=None, start=1)-
Sort direction
Expand source code
class SortDirection(Enum): """Sort direction""" ASCENDING = pymongo.ASCENDING DESCENDING = pymongo.DESCENDINGAncestors
- enum.Enum
Class variables
var ASCENDINGvar DESCENDING