
Kafka connector

  • limited to loading data into Snowflake (no unloading); comes in two flavors: the open-source (OSS) package and the Confluent package
  • fault tolerant: provides exactly-once delivery semantics, except in rare cases
  • accepted formats are JSON, Avro, and (for connector version 1.5.0 and later) Protobuf
  • one table and one internal stage per topic; one pipe per partition of each topic
    • a Kafka topic can be mapped to an existing Snowflake table in the config; otherwise the connector creates a new table named after the topic
    • if the table already exists, it must contain RECORD_CONTENT and RECORD_METADATA columns
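A pre-created target table can be sketched as follows (table name hypothetical; both columns are VARIANT, with RECORD_METADATA holding topic/partition/offset details):

```sql
-- Sketch of a minimal pre-created target table for the Kafka connector
CREATE TABLE my_topic_table (
    RECORD_CONTENT  VARIANT,  -- the Kafka message body
    RECORD_METADATA VARIANT   -- topic, partition, offset, CreateTime, key, headers
);
```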
  • Workflow:
    1. the Kafka connector subscribes to one or more topics,
    2. writes messages to an internal stage, one file per partition,
    3. triggers one or more Snowpipes to ingest the staged files,
    4. deletes each file from the stage after confirming the data has been loaded; on failure, the file is moved to the table stage
      • the connector polls the insertReport API for 1 hour before moving a failed file to the table stage
  • possible to run multiple connector instances, but BP is to have each topic consumed by only one instance
  • configuration can be supplied via a Kafka Connect configuration file, the command line, or the Confluent Control Center (Confluent package only)
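A minimal connector config might look like the sketch below (all values are placeholders; property keys follow the connector's documented naming, but verify against your connector version):

```properties
# Sketch of a Snowflake sink connector config (values illustrative)
name=my_snowflake_sink
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=2
topics=orders,customers
# map a topic to an existing table; unmapped topics get auto-created tables
snowflake.topic2table.map=orders:ORDERS_TABLE
snowflake.url.name=myaccount.snowflakecomputing.com:443
snowflake.user.name=kafka_loader
snowflake.private.key=<private-key>
snowflake.database.name=MY_DB
snowflake.schema.name=PUBLIC
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
```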
  • Metadata includes:
    • topic, partition, offset, CreateTime,
    • key (for topics with keys); key.converter must be set to org.apache.kafka.connect.storage.StringConverter
    • schema_id (for Avro when used with a schema registry)
    • headers: user-defined key-value pairs
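Put together, a RECORD_METADATA value might look like this sketch (all values illustrative):

```json
{
  "topic": "orders",
  "partition": 2,
  "offset": 42,
  "CreateTime": 1693212345678,
  "key": "order-1001",
  "schema_id": 7,
  "headers": { "trace_id": "abc123" }
}
```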
  • SMTs (Single Message Transforms) are supported unless:
    1. key.converter or value.converter is set to SnowflakeJsonConverter or SnowflakeAvroConverter, or
    2. the SMT is regex.router
  • BP: the Kafka connector settings that affect file size are buffer.flush.time, buffer.size.bytes, and buffer.count.records
    • Snowpipe files are generally quite small because the configured size refers to the buffer's Java heap usage; buffer contents are compressed into files that are typically 5%-10% of the original size. Snowpipe is cost-inefficient for files < 10 MB, which corresponds to roughly 400 MB of buffer.
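The buffer-to-file arithmetic above can be sketched numerically (the helper and figures are illustrative, using the 5%-10% compression ratio from the note):

```python
def estimated_file_mb(buffer_mb: float, compression_ratio: float) -> float:
    """Estimate the staged file size produced from an uncompressed buffer."""
    return buffer_mb * compression_ratio

# Buffer contents compress to roughly 5%-10% of their original size,
# so a ~400 MB buffer comfortably clears the ~10 MB file threshold.
for ratio in (0.05, 0.10):
    print(f"400 MB buffer at {ratio:.0%} -> ~{estimated_file_mb(400, ratio):.0f} MB file")
```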

Kafka Connector (Snowpipe Streaming)

  • when using Snowpipe Streaming, a topic corresponds to a table, and each partition of a topic corresponds to one channel into the table
  • error handling: failed records can be sent to a dead-letter queue (DLQ), which is a Kafka topic
  • schematization: unlike Snowpipe, which supports only the two VARIANT columns, Snowpipe Streaming supports table schema detection and evolution
  • config options:
    • with version 2.0.0, specify snowflake.ingestion.method=SNOWPIPE_STREAMING
    • snowflake.enable.schematization: enables schema detection and evolution
    • schema.registry.url: required for schema evolution with Avro and Protobuf; optional for JSON
    • with version 2.1.2, when enable.streaming.client.optimization=true (the default), only one streaming client is created for all partitions; disable it for high-throughput scenarios
    • buffer.flush.time: default 10 seconds
    • buffer.count.records: default 10,000
    • buffer.size.bytes: default 20,000,000 (20 MB)
    • MAX_CLIENT_LAG (Snowpipe Streaming flush lag): default 1 second for Snowflake tables, 30 seconds for Iceberg tables
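The streaming-related options above might be combined like this sketch (values illustrative; keys follow the connector's documented naming, but verify against your connector version):

```properties
# Sketch: switching an existing sink connector to Snowpipe Streaming
snowflake.ingestion.method=SNOWPIPE_STREAMING
snowflake.enable.schematization=true
enable.streaming.client.optimization=true
buffer.flush.time=10
buffer.count.records=10000
buffer.size.bytes=20000000
```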
  • pricing: 1 credit per compute-hour, plus cloud services charged at 0.01 credits per hour per client instance (metered per second while the client is actively queueing data)
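The client-instance charge can be sketched numerically (hypothetical helper; the 0.01 credits/hour/instance rate is from the pricing note above):

```python
def client_service_credits(instances: int, active_hours: float) -> float:
    """Cloud-services credits for streaming clients at 0.01 credits/hour/instance."""
    RATE = 0.01  # credits per hour per client instance (from the pricing note)
    return instances * active_hours * RATE

# e.g. 3 connector clients actively queueing for a full 24-hour day:
print(client_service_credits(3, 24))
```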