Skip to content

hyperion.repository.asset_collection

hyperion.repository.asset_collection

An asset collection is a class that allows you to fetch and store data from the catalog in a type-safe manner.

FeatureAssetSpecification dataclass

FeatureAssetSpecification(feature, start_date=None, end_date=None)

Bases: Generic[CClass]

Specification for fetching feature assets from the catalog.

Parameters:

Name Type Description Default
feature type[CClass]

The feature model class to fetch.

required
start_date DateOrDelta | None

The start date or delta from now to fetch the data.

None
end_date DateOrDelta | None

The end date or delta from now to fetch the data.

None

resolve_start_date

resolve_start_date(the_now=None)

Resolve the start date for fetching the feature asset data.

Source code in hyperion/repository/asset_collection.py
def resolve_start_date(self, the_now: datetime.datetime | None = None) -> datetime.datetime:
    """Resolve the start date for fetching the feature asset data."""
    return self._resolve_date(self.start_date, datetime.datetime.min.replace(tzinfo=datetime.UTC), the_now)

resolve_end_date

resolve_end_date(the_now=None)

Resolve the end date for fetching the feature asset data.

Source code in hyperion/repository/asset_collection.py
def resolve_end_date(self, the_now: datetime.datetime | None = None) -> datetime.datetime:
    """Resolve the configured end date into a concrete datetime.

    Delegates to ``_resolve_date`` with ``self.end_date``, the value of
    ``utcnow()`` taken at call time, and ``the_now``.

    Args:
        the_now: Optional reference time, forwarded unchanged to ``_resolve_date``.

    Returns:
        Whatever ``_resolve_date`` returns for the end date.
    """
    # Presumably used as the fallback when no end date is configured - confirm in `_resolve_date`.
    current_time = utcnow()
    return self._resolve_date(self.end_date, current_time, the_now)

AssetCollection

A collection of feature assets that can be fetched from the catalog.

Attributes:

Name Type Description
catalog Catalog | None

The catalog to fetch the data from. If not set, it will be created from the config.

max_concurrency int

The maximum concurrency for fetching data. Default is 8.

reserved_fields ClassVar

The reserved field names for the collection.

_state _CollectionState

The internal state of the collection. It should under no circumstances be modified directly.

is_fetched classmethod

is_fetched()

Check if the collection has fetched all data.

Source code in hyperion/repository/asset_collection.py
@classmethod
def is_fetched(cls) -> bool:
    """Return the ``fetched`` flag stored on the collection state."""
    state = cls._get_state()
    return state.fetched

get_data classmethod

get_data(field)

Get the fetched data for the given field.

Source code in hyperion/repository/asset_collection.py
@classmethod
def get_data(cls, field: str) -> list[Any]:
    """Return the data previously stored for ``field``.

    Args:
        field: Name of the collection field whose data is requested.

    Returns:
        The entry stored under ``field`` in the collection state.

    Raises:
        ValueError: If no data is stored under ``field``.
    """
    state = cls._get_state()
    if field in state.data:
        return state.data[field]
    raise ValueError(f"Data for {field!r} has not been fetched yet. Did you call 'fetch_all()'?")

clear classmethod

clear()

Clear all fetched data from the collection.

Source code in hyperion/repository/asset_collection.py
@classmethod
def clear(cls) -> None:
    """Reset the collection state: empty the stored data and unset the fetched flag."""
    logger.info("Clearing all fetched data from the collection.", collection=cls.__name__)
    state = cls._get_state()
    # Replace the mapping rather than emptying it in place, as the state holds a new dict afterwards.
    state.data = {}
    state.fetched = False

register_specification classmethod

register_specification(field_name, specification, anchor_timestamp=None)

Register a fetch specification for a field in the collection.

This is normally only called by the FeatureFetchSpecifier descriptor and should not be called directly.

Parameters:

Name Type Description Default
field_name str

The name of the field to register the specification for.

required
specification FeatureAssetSpecification[CClass]

The fetch specification for the field.

required
anchor_timestamp datetime | None

The anchor timestamp for fetching the data.

None
Source code in hyperion/repository/asset_collection.py
@classmethod
def register_specification(
    cls,
    field_name: str,
    specification: FeatureAssetSpecification[CClass],
    anchor_timestamp: datetime.datetime | None = None,
) -> None:
    """Store the fetch specification and anchor timestamp for a collection field.

    Intended to be invoked by the `FeatureFetchSpecifier` descriptor rather than
    by user code. Registering a field name that already has a specification logs
    a warning and overwrites the previous entry.

    Args:
        field_name: The name of the field to register the specification for.
        specification: The fetch specification for the field.
        anchor_timestamp: The anchor timestamp for fetching the data.
    """
    state = cls._get_state()
    already_registered = field_name in state.fetch_specifications
    if already_registered:
        logger.warning(
            "Registering duplicate fetch specification, existing will be discarded.",
            field=field_name,
            asset_name=specification.feature_name,
        )
    logger.debug("Registering field into an asset collection.", collection=cls.__name__, field=field_name)
    # Both mappings are keyed by field name, so the later registration replaces the earlier one.
    state.fetch_specifications[field_name] = specification
    state.anchor_timestamps[field_name] = anchor_timestamp

fetch_all async classmethod

fetch_all()

Fetch all data for the collection.

Source code in hyperion/repository/asset_collection.py
@classmethod
async def fetch_all(cls) -> None:
    """Fetch the data for every registered field and store it on the collection state.

    When the collection is already flagged as fetched, this only logs a message
    and returns. Otherwise one coroutine is created per registered specification,
    all of them are awaited together with ``asyncio.gather``, each result is
    stored under its field name, and the collection is flagged as fetched.

    Raises:
        UnsupportedFeatureTypeError: If a registered feature class is neither a
            ``FeatureModel`` nor a ``PolarsFeatureModel`` subclass.
    """
    if cls.is_fetched():
        logger.info(
            "Collection already fetched all data, if you want to start over, call .clear()", collection=cls.__name__
        )
        return

    async def _fetch_field(name: str, specs: FeatureAssetSpecification[CClass]) -> tuple[str, list[CClass]]:
        # A field without a registered anchor timestamp falls back to `utcnow()`.
        anchor = cls._get_state().anchor_timestamps.get(name) or utcnow()
        model = specs.feature
        # The casts below are for the type checker only; the feature's base class selects the loader.
        if issubclass(model, FeatureModel):
            pydantic_specs = cast(FeatureAssetSpecification[FeatureModel], specs)
            rows = await cls._gather_pydantic_asset_range(pydantic_specs, anchor)
            return name, cast(list[CClass], rows)
        if issubclass(model, PolarsFeatureModel):
            polars_specs = cast(FeatureAssetSpecification[PolarsFeatureModel], specs)
            frames = await cls._gather_polars_asset_range(polars_specs, anchor)
            return name, cast(list[CClass], frames)
        raise UnsupportedFeatureTypeError(model)

    logger.info("Gather all data within the collection.", collection=cls.__name__)
    pending: list[Coroutine[None, None, tuple[str, list[Any]]]] = [
        _fetch_field(prop, spec)
        async for prop, spec in iter_async(cls._get_state().fetch_specifications.items())
    ]

    # The state is looked up again after the await instead of being cached across it.
    for field_name, payload in await asyncio.gather(*pending):
        logger.info("Finished receiving feature data.", field=field_name)
        cls._get_state().data[field_name] = payload

    cls._get_state().fetched = True

FeatureFetchSpecifier

FeatureFetchSpecifier(feature, start_date=None, end_date=None)

Create a feature fetch specifier for the given feature model class.

Parameters:

Name Type Description Default
feature type[PydanticFeature]

The feature model class to fetch.

required
start_date DateOrDelta | None

The start date or delta from now to fetch the data.

None
end_date DateOrDelta | None

The end date or delta from now to fetch the data.

None
Source code in hyperion/repository/asset_collection.py
def FeatureFetchSpecifier(  # noqa: N802, a fake class factory
    feature: type[PydanticFeature], start_date: DateOrDelta | None = None, end_date: DateOrDelta | None = None
) -> list[PydanticFeature]:
    """Build a `_FeatureFetchSpecifier` and expose it to type checkers as a list of features.

    Args:
        feature: The feature model class to fetch.
        start_date: The start date or delta from now to fetch the data.
        end_date: The end date or delta from now to fetch the data.

    Returns:
        The `_FeatureFetchSpecifier` instance, statically typed as ``list[PydanticFeature]``.
    """
    specifier = _FeatureFetchSpecifier(feature, start_date, end_date)
    # `cast` changes only the static type; the object returned is still the specifier itself.
    return cast(list[PydanticFeature], specifier)