Snowflake ML
Feature store
- consists of entities and feature views
- entities represent the underlying business objects that features are associated with; they define the join keys used to look up features
Feature View
- feature views are the features exposed for ML pipelines
- feature views are versioned
- feature views typically apply transformations to one or more data sources, refreshed on a schedule
- can be declarative (uses dynamic tables under-the-covers), or custom pipeline (e.g. dbt)
- can be created using snowpark dataframes:
order_ts_fv = order_lineitem.analytics.time_series_agg(
    aggs={"ORDER_TOTAL": ["SUM", "COUNT"]},
    windows=["-3MM", "-6MM"],
    sliding_interval="1D",
    group_by=["O_CUST_KEY"],
    time_col="ORDER_DATE",
)
- multiple feature views are joined using ASOF JOIN, because each feature may have been updated at a different time
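The ASOF-join behavior can be sketched in plain Python (a toy illustration of the semantics, not the Snowflake implementation): for each left-side row, pick the most recent right-side row at or before its timestamp.

```python
import bisect

def asof_join(left, right):
    """For each (ts, key) row on the left, attach the latest right-side
    value whose timestamp is <= the left timestamp, per key."""
    # index the right side: key -> time-sorted list of (ts, value)
    by_key = {}
    for ts, key, value in sorted(right):
        by_key.setdefault(key, []).append((ts, value))
    out = []
    for ts, key in left:
        rows = by_key.get(key, [])
        # rightmost right-side row with timestamp <= ts
        i = bisect.bisect_right(rows, (ts, float("inf"))) - 1
        out.append((ts, key, rows[i][1] if i >= 0 else None))
    return out

orders = [(5, "c1"), (9, "c1")]              # (order_ts, cust_key)
features = [(1, "c1", 100), (7, "c1", 250)]  # (feature_ts, cust_key, value)
print(asof_join(orders, features))
# -> [(5, 'c1', 100), (9, 'c1', 250)]
```

Each order picks up the feature value as it stood at order time, which is exactly why stitching feature views together needs ASOF JOIN rather than an equality join on timestamps.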
Model Registry
- is a collection of models and their versions.
- maps to a specific database and schema
- a model can run in a warehouse, in an SPCS compute pool, or both
Model
- A first-class schema level object that is managed within Snowflake
- sample_input_data helps Snowflake infer the model's function signature
- It is versioned, that is, a single model can have multiple versions
- a version is an identifier; it can optionally have one alias as an alternate name, or tags
- only one version can be the default version
- Can be stored using
Registry.log_model method
- models can be created within Snowflake or imported
- models can be copied
Dataset
- First-class schema-level, versioned, materialized snapshot of data
- immutable, designed specifically for ML workloads
- stored as parquet files on internal stage (Ref: Feature Store training 9/20/2024)
from snowflake.ml import dataset
ds = dataset.create_from_dataframe(sess, "my_dataset", "v1", input_dataframe=df)
Data connectors
- load data in parallel into ML containers (e.g. pandas, PyTorch, TensorFlow)
- can load data from either Snowpark dataframes or datasets
- best practice: use dataframes during dev and datasets in production
from snowflake.ml.data.data_connector import DataConnector
from snowflake.snowpark.context import get_active_session
from snowflake.ml import dataset
df = get_active_session().table("my_table")
dc1 = DataConnector.from_dataframe(df)
ds = dataset.Dataset.load(sess, "my_dataset").select_version("v1")
dc2 = DataConnector.from_dataset(ds)
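The parallel-loading idea behind data connectors can be illustrated in plain Python (a toy sketch of the concept only, not DataConnector internals): split the data into chunks and fetch them concurrently, then reassemble in order.

```python
from concurrent.futures import ThreadPoolExecutor

def load_chunk(chunk_id):
    # stand-in for fetching one result batch / parquet file
    return [chunk_id * 10 + i for i in range(3)]

def load_parallel(num_chunks, workers=4):
    # fetch all chunks concurrently, then concatenate in order
    with ThreadPoolExecutor(max_workers=workers) as pool:
        batches = list(pool.map(load_chunk, range(num_chunks)))
    return [row for batch in batches for row in batch]

print(load_parallel(3))  # -> [0, 1, 2, 10, 11, 12, 20, 21, 22]
```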
Example
# Train a model
model = XGBRegressor(input_cols=..., label_cols=..., output_cols=...)
model.fit(training_df) # train
result = model.predict(test_df) # eval
df = result[["PRICE", "PREDICTED_PRICE"]].to_pandas()
mape = mean_absolute_percentage_error(df["PRICE"], df["PREDICTED_PRICE"])
# Create a registry to store the model
reg = registry.Registry(session=..., database_name=..., schema_name=...)
# store the trained model in registry as a new version
mv = reg.log_model(model, model_name="price_model", version_name='"1.0"', sample_input_data=test_df)
mv.set_metric(metric_name="mape", value=mape)
# retrieve a model and a version
reg.show_models() # sql: show models
m = reg.get_model("price_model")
m.show_versions()  # sql: show versions in model
mv = m.version('"1.0"')
# mv = m.default
mv.show_functions() # sql: show functions in model [version <ver>]
# run inference
predictions_df = mv.run(data_df, function_name="predict")
# manage
m.show_tags()
m.export("/tmp/model")
lineage_nodes = mv.lineage(session=..., direction="upstream")
dsv = dataset.Dataset.load_by_lineage_node(session, lineage_nodes[0])
alter model price_model set default_version = "1.0";
alter model price_model modify version "1.0" set metadata = '{"metric": {"mape": 0.789, "QA": "passed"}}';
-- inference
select price_model!predict(0.73, 62.7, ...);
with mv as model price_model version "1.0"
select mv!predict(0.73, 62.7, ...);
-- model files
ls 'snow://model/price_model/versions/1.0';
-- rbac
grant usage on model price_model to role ds_group;
-- list versions
select * from information_schema.model_versions;
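The mape metric logged above can be sanity-checked by hand; mean absolute percentage error is the mean of |actual - predicted| / |actual|:

```python
def mape(actual, predicted):
    # mean absolute percentage error: mean of |a - p| / |a|
    return sum(abs(a - p) / abs(a) for a, p in zip(actual, predicted)) / len(actual)

print(mape([100.0, 200.0], [110.0, 180.0]))  # -> 0.1  (off by 10% and 10%)
```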