Snowflake ML

Feature store

  • consists of entities and feature views
  • entities are the underlying business objects (e.g. customer, order) that features are associated with
    • each entity specifies the join keys used to join features to training data
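As a sketch, an entity can be declared and registered through the feature store client. The database, feature store, and entity names (ML_DB, MY_FS, CUSTOMER, ML_WH) are illustrative; the import is deferred into the function so the sketch reads standalone:

```python
def register_customer_entity(session):
    """Sketch: register an entity; names (ML_DB, MY_FS, CUSTOMER) are illustrative."""
    # Deferred import: assumes snowflake-ml-python is installed at call time.
    from snowflake.ml.feature_store import CreationMode, Entity, FeatureStore

    fs = FeatureStore(
        session=session,
        database="ML_DB",
        name="MY_FS",  # the feature store maps to a schema in ML_DB
        default_warehouse="ML_WH",
        creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
    )
    # The entity carries the join keys used to join features back to the spine.
    entity = Entity(name="CUSTOMER", join_keys=["O_CUST_KEY"])
    fs.register_entity(entity)
    return fs
```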

Feature View

  • feature views are the features exposed for ML pipelines
  • feature views are versioned
  • feature views typically apply transformations to their data sources on an ongoing basis
    • the pipeline can be declarative (managed refresh, using dynamic tables under the covers) or a custom external pipeline (e.g. dbt)
  • can be created using snowpark dataframes:
    order_ts_fv = order_lineitem.analytics.time_series_agg(
        aggs={"ORDER_TOTAL": ["SUM", "COUNT"]},
        windows=["-3MM", "-6MM"],
        sliding_interval="1D",
        group_by=["O_CUST_KEY"],
        time_col="ORDER_DATE",
    )
    
  • multiple feature views are joined using ASOF JOIN, because each feature may have been updated at a different time
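The ASOF JOIN semantics (match each spine row to the most recent feature value at or before its timestamp) can be illustrated locally with pandas' merge_asof; this is an analogy for intuition, not the Feature Store API:

```python
import pandas as pd

# Spine: observation times per customer; feats: feature values updated at other times.
spine = pd.DataFrame({"ts": pd.to_datetime(["2024-01-10", "2024-01-20"]),
                      "cust": [1, 1]})
feats = pd.DataFrame({"ts": pd.to_datetime(["2024-01-05", "2024-01-15"]),
                      "cust": [1, 1],
                      "order_total": [100.0, 250.0]})

# merge_asof picks, for each spine row, the latest feature row at or before
# its timestamp -- the same idea as Snowflake's ASOF JOIN.
joined = pd.merge_asof(spine.sort_values("ts"), feats.sort_values("ts"),
                       on="ts", by="cust")
# joined["order_total"] -> [100.0, 250.0]
```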

Model Registry

  • is a collection of models and their versions.
  • maps to a specific database and schema
  • a model can run in a warehouse, in an SPCS (Snowpark Container Services) compute pool, or in both

Model

  • A first-class schema level object that is managed within Snowflake
    • sample_input_data helps Snowflake learn the function signature
  • It is versioned, that is, a single model can have multiple versions
    • a version is an identifier; it can optionally have one alias as an alternate name, or tags
    • only one version can be the default version
  • stored in the registry via the Registry.log_model method
  • models can be created within Snowflake or imported
  • models can be copied

Dataset

  • First-class schema-level, versioned, materialized snapshot of data
  • immutable, designed specifically for ML workloads
  • stored as Parquet files on an internal stage (Ref: Feature Store training 9/20/2024)
    from snowflake.ml import dataset
    ds = dataset.create_from_dataframe(sess, "my_dataset", "v1", input_dataframe=df)
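Reading a dataset version back for training can be sketched as follows; the dataset name and version are illustrative, and the import is deferred so the sketch reads standalone:

```python
def read_dataset_version(session, name="my_dataset", version="v1"):
    """Sketch: materialize a dataset version as pandas (names illustrative)."""
    # Deferred import: assumes snowflake-ml-python is installed at call time.
    from snowflake.ml import dataset

    ds = dataset.load_dataset(session, name, version)
    # The reader pulls the immutable Parquet snapshot from the internal stage.
    return ds.read.to_pandas()
```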
    

Data connectors

  • load data in parallel in containers
  • can load data from either Snowpark dataframes or datasets
  • Best practice: use dataframes during development and datasets in production
    from snowflake.ml.data.data_connector import DataConnector
    from snowflake.snowpark.context import get_active_session
    from snowflake.ml import dataset
    
    df = get_active_session().table("my_table")
    dc1 = DataConnector.from_dataframe(df)
    ds = dataset.Dataset.load(sess, "my_dataset").select_version("v1")
    dc2 = DataConnector.from_dataset(ds)
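Once constructed, a connector can hand the data to a training framework; a sketch of common readers (method availability depends on the snowflake-ml-python version installed):

```python
def to_training_inputs(dc):
    """Sketch: fan a DataConnector out to common framework inputs."""
    pdf = dc.to_pandas()                                  # pandas DataFrame
    torch_ds = dc.to_torch_dataset(batch_size=32, shuffle=True)  # PyTorch dataset
    tf_ds = dc.to_tf_dataset(batch_size=32)               # TensorFlow dataset
    return pdf, torch_ds, tf_ds
```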
    

Example

# Train a model
model = XGBRegressor(input_cols=..., label_cols=..., output_cols=...)
model.fit(training_df) # train
result = model.predict(test_df)  # eval
df = result.select("PRICE", "PREDICTED_PRICE").to_pandas()
mape = mean_absolute_percentage_error(df["PRICE"], df["PREDICTED_PRICE"])

# Create a registry to store the model
reg = registry.Registry(session=..., database_name=..., schema_name=...)

# store the trained model in registry as a new version
mv = reg.log_model(model, model_name="price_model", version_name='"1.0"', sample_input_data=test_df)
mv.set_metric(metric_name="mape", value=mape)

# retrieve a model and a version
reg.show_models()  # sql: show models
m = reg.get_model("price_model")

m.show_versions()  # sql: show versions in model
mv = m.version('"1.0"')
# mv = m.default

mv.show_functions()  # sql: show functions in model [version <ver>]

# run inference
predictions_df = mv.run(data_df, function_name="predict")

# manage
m.show_tags()
m.export("/tmp/model")
lineage_nodes = mv.lineage(session=..., direction="upstream")

dsv = dataset.Dataset.load_by_lineage_node(session, lineage_nodes[0])

-- manage versions
alter model price_model set default_version = "1.0";
alter model price_model modify version "1.0" set metadata = '{"metric": {"mape": 0.789, "QA": "passed"}}';

-- inference
select price_model!predict(0.73, 62.7, ...);
with mv as model price_model version "1.0"
    select mv!predict(0.73, 62.7, ...);

-- model files
ls 'snow://model/price_model/versions/1.0';

-- rbac
grant usage on model price_model to role ds_group;

-- list versions
select * from information_schema.model_versions;