Fetch data with Asset Collections¶
Asset Collections provide a type-safe, declarative way to fetch and work with data assets from the Hyperion catalog.
Overview¶
The AssetCollection system makes it easy to:
- Define what feature data you need with proper type definitions
- Fetch all required data in a single call
- Access typed data with full IDE support
- Control concurrency during fetching
- Work with either Pydantic models or Polars DataFrames
Basic usage¶
1. Define a feature model¶
You can define feature models using either Pydantic (for object-oriented data) or Pandera with Polars (for large datasets).
Option A: Pydantic model¶
Create a model class that extends both FeatureModel and pydantic.BaseModel.
Feature models live in hyperion.data, so this requires the [data] extra
(pip install 'hyperion-sdk[data]'):
import datetime
from typing import ClassVar

from pydantic import BaseModel

from hyperion.data.asset_schemas import FeatureModel
from hyperion.dateutils import TimeResolution


class WeatherFeature(FeatureModel, BaseModel):
    asset_name: ClassVar = "weather_data"
    resolution: ClassVar = TimeResolution(1, "d")

    timestamp: datetime.datetime
    temperature: float
    humidity: float
Option B: Polars model with Pandera¶
For large datasets or analytics workloads, create a model using
PolarsFeatureModel:
from typing import Annotated, ClassVar

import pandera.typing.polars as pt
import polars as pl
from pandera.engines.polars_engine import DateTime, Float64

from hyperion.data.asset_schemas import PolarsFeatureModel
from hyperion.dateutils import TimeResolution


class WeatherPolarsFeature(PolarsFeatureModel):
    _asset_name: ClassVar = "weather_data"
    _resolution: ClassVar = TimeResolution(1, "d")
    _schema_version: ClassVar = 1

    timestamp: pt.Series[Annotated[DateTime, False, "UTC", "us"]]
    temperature: pt.Series[Float64]
    humidity: pt.Series[Float64]
2. Create an asset collection¶
Define a collection class that declares what feature data you need:
import datetime

from hyperion.repository.asset_collection import (
    AssetCollection,
    FeatureFetchSpecifier,
    PolarsFeatureFetchSpecifier,
)


class WeatherDataCollection(AssetCollection):
    # Fetch last 7 days of pydantic weather data
    weather = FeatureFetchSpecifier(
        WeatherFeature,
        start_date=datetime.timedelta(days=-7),
    )

    # Fetch historical data using Polars for efficient processing
    historical_weather = PolarsFeatureFetchSpecifier(
        WeatherPolarsFeature,
        start_date=datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc),
        end_date=datetime.datetime(2024, 2, 1, tzinfo=datetime.timezone.utc),
    )
3. Fetch and use the data¶
# Fetch all data asynchronously
await WeatherDataCollection.fetch_all()

# Access the Pydantic data as objects
for record in WeatherDataCollection.weather:
    print(f"Temperature: {record.temperature}°C at {record.timestamp}")

# Work with Polars data using DataFrame operations
avg_temp = WeatherDataCollection.historical_weather.select(
    pl.col("temperature").mean().alias("avg_temp")
).collect()

# Or collect the Polars data if you need the full DataFrame
historical_df = WeatherDataCollection.historical_weather.collect()
print(f"Records: {len(historical_df)}")
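The snippet above uses a bare `await`, which works in a notebook or an async REPL but not at the top level of a plain script. Below is a minimal sketch of wrapping the calls in a coroutine. `DemoCollection` is a hypothetical stand-in so the example runs without a catalog; with the SDK you would await `WeatherDataCollection.fetch_all()` in the same place.

```python
import asyncio


class DemoCollection:
    """Hypothetical stand-in for an AssetCollection subclass (not the SDK class)."""

    weather: list[dict] = []

    @classmethod
    async def fetch_all(cls) -> None:
        # Simulate an I/O-bound fetch; the SDK would query the catalog here.
        await asyncio.sleep(0)
        cls.weather = [{"temperature": 21.5}, {"temperature": 19.0}]


async def main() -> list[float]:
    # Await the fetch inside a coroutine, then read the class-level data.
    await DemoCollection.fetch_all()
    return [record["temperature"] for record in DemoCollection.weather]


# asyncio.run() starts an event loop, runs main() to completion, and closes the loop.
temperatures = asyncio.run(main())
print(temperatures)
```

In an application that already runs an event loop (for example a web framework handler), you would await the fetch directly rather than calling `asyncio.run()`.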
Advanced features¶
Custom catalog¶
You can specify a custom catalog for your collection:
# my_custom_catalog is a placeholder for a catalog object defined elsewhere in your code
class CustomCollection(AssetCollection):
    catalog: ClassVar = my_custom_catalog

    weather = FeatureFetchSpecifier(WeatherFeature)
    weather_polars = PolarsFeatureFetchSpecifier(WeatherPolarsFeature)
Concurrency control¶
Control how many concurrent fetches are allowed:
class LimitedConcurrencyCollection(AssetCollection):
    max_concurrency: ClassVar = 4  # limit to 4 concurrent requests

    weather = FeatureFetchSpecifier(WeatherFeature)
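To illustrate what a concurrency limit does in general, here is a sketch built on `asyncio.Semaphore`. It is not the SDK's implementation: `fetch_with_limit` and `fetch_one` are hypothetical names, and the short sleep stands in for a network request.

```python
import asyncio


async def fetch_with_limit(names: list[str], max_concurrency: int) -> tuple[list[str], int]:
    """Run one simulated fetch per name, with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)
    running = 0  # fetches currently in flight
    peak = 0     # highest value of `running` observed

    async def fetch_one(name: str) -> str:
        nonlocal running, peak
        async with semaphore:  # waits while max_concurrency fetches are in flight
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # placeholder for the real request
            running -= 1
            return f"{name}: done"

    # gather() returns results in the same order as the input names.
    results = await asyncio.gather(*(fetch_one(name) for name in names))
    return list(results), peak


results, peak = asyncio.run(
    fetch_with_limit([f"asset_{i}" for i in range(10)], max_concurrency=4)
)
print(len(results), peak)  # peak never exceeds max_concurrency
```

A lower limit reduces pressure on the storage backend at the cost of a longer total fetch time; the right value depends on your environment.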
Reset data¶
Clear fetched data to fetch again:
Working with Polars data¶
When using PolarsFeatureFetchSpecifier, you get a LazyFrame with the
following benefits:
- Lazy evaluation: operations execute only when you call .collect()
- Query optimization: Polars optimises the execution plan
- Memory efficiency: great for working with millions of rows
- Type safety: full schema validation through Pandera
Example operations:
# Filter data
hot_days = WeatherDataCollection.historical_weather.filter(
    pl.col("temperature") > 30
).collect()

# Aggregations
monthly_avg = WeatherDataCollection.historical_weather.group_by(
    pl.col("timestamp").dt.month()
).agg(
    pl.col("temperature").mean().alias("avg_temp"),
    pl.col("humidity").mean().alias("avg_humidity"),
).collect()
Important notes¶
- Shared data: all instances of a collection class share the same data.
- Lazy fetching: data is not fetched until fetch_all() is called.
- Class-level API: most methods are class methods, not instance methods.
- Requirements:
  - Pydantic models must have asset_name and resolution class variables
  - Polars models must have _asset_name and _resolution class variables
- Data size considerations:
  - Use Pydantic models for smaller datasets and object-oriented manipulation
  - Use Polars models for large datasets (millions of rows) and analytics
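The "shared data" and "class-level API" notes follow from ordinary Python class attributes. The sketch below shows that behaviour with a hypothetical stand-in class (`DemoCollection` and `load` are illustrative names, not part of the SDK).

```python
class DemoCollection:
    """Hypothetical stand-in; not the SDK's AssetCollection."""

    data: list[int] = []

    @classmethod
    def load(cls) -> None:
        # Assigning on the class makes the data visible through every instance.
        cls.data = [1, 2, 3]


first = DemoCollection()
second = DemoCollection()
DemoCollection.load()

# Both instances read the same class-level list object.
print(first.data is second.data)
```

In practice this means that creating a second instance does not give you an independent copy of the fetched data.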
Date specifications¶
You can specify date ranges in multiple ways:
- Absolute dates: use datetime objects
- Relative dates: use timedelta objects (negative for past, positive for future)
- Mixed: combine absolute and relative dates
A date specification of None means:
- For start_date: use the minimum date (fetch all historical data)
- For end_date: use the current time (fetch up to now)
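The rules above can be sketched as a small resolver. This is an illustration only: `resolve_date` is a hypothetical helper, not an SDK function, and it assumes that relative offsets are applied to the current time and that "minimum date" means the earliest representable datetime.

```python
import datetime
from typing import Optional, Union

UTC = datetime.timezone.utc
DateSpec = Optional[Union[datetime.datetime, datetime.timedelta]]


def resolve_date(
    spec: DateSpec, now: datetime.datetime, default: datetime.datetime
) -> datetime.datetime:
    """Turn an absolute, relative, or missing date spec into an absolute datetime."""
    if spec is None:
        return default
    if isinstance(spec, datetime.timedelta):
        return now + spec  # negative offsets point into the past
    return spec


now = datetime.datetime(2024, 3, 10, tzinfo=UTC)
earliest = datetime.datetime.min.replace(tzinfo=UTC)  # assumed "minimum date"

# Mixed specification: relative start, missing end.
start = resolve_date(datetime.timedelta(days=-7), now, default=earliest)
end = resolve_date(None, now, default=now)
print(start.isoformat(), end.isoformat())
```

Passing `now` in explicitly keeps the helper deterministic, which makes the resolution rules easy to test.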
Coming soon¶
Future versions will include:
- Enhanced Polars streaming for extremely large datasets
- Column projection to reduce I/O for large datasets
- Support for DataLake and PersistentStore assets
- Advanced caching strategies for feature data
See also¶
- Work with FeatureAssets for the lower-level API.
- The hyperion.repository.asset_collection API reference.