Snowflake &
Data Engineering
Calculate total compute hours consumed by pairing each 'start' event with its matching 'end': LAG() carries the previous state forward for validation, LEAD() looks ahead from each start row to the next end timestamp, and DATEDIFF between the two gives run duration in seconds, which is summed and converted to hours.
Sample Data
-- your_table_name
-- | Date_timestamp      | progress |
-- |---------------------|----------|
-- | 2024-01-15 08:00:00 | start    |
-- | 2024-01-15 09:45:00 | end      |
-- | 2024-01-15 11:00:00 | start    |
-- | 2024-01-15 12:30:00 | end      |
Step 1 — Use LAG to carry forward the previous state and its timestamp
WITH lagged AS (
SELECT
Date_timestamp,
progress,
-- Pull the previous row's state into this row
LAG(progress) OVER (ORDER BY Date_timestamp) AS prev_state,
LAG(Date_timestamp) OVER (ORDER BY Date_timestamp) AS prev_timestamp
FROM your_table_name
)
-- At this point every 'end' row also knows its paired 'start' timestamp
SELECT * FROM lagged WHERE progress = 'end';
Step 2 — Use LEAD on the start rows to look ahead to the paired end timestamp
WITH windowed AS (
SELECT
Date_timestamp,
progress,
LAG(progress) OVER (ORDER BY Date_timestamp) AS prev_state,
LAG(Date_timestamp) OVER (ORDER BY Date_timestamp) AS prev_timestamp,
-- Look ahead from this row to the next row's timestamp
LEAD(Date_timestamp) OVER (ORDER BY Date_timestamp) AS next_timestamp
FROM your_table_name
)
-- Each 'start' row now carries its paired 'end' timestamp via LEAD
SELECT * FROM windowed WHERE progress = 'start';
Step 3 — DATEDIFF between start and its LEAD end timestamp, SUM & divide by 3600
WITH windowed AS (
SELECT
Date_timestamp,
progress,
LAG(progress) OVER (ORDER BY Date_timestamp) AS prev_state,
LAG(Date_timestamp) OVER (ORDER BY Date_timestamp) AS prev_timestamp,
LEAD(Date_timestamp) OVER (ORDER BY Date_timestamp) AS next_timestamp
FROM your_table_name
),
run_seconds AS (
SELECT
Date_timestamp AS run_start,
next_timestamp AS run_end,
TO_DATE(Date_timestamp) AS run_date,
-- DATEDIFF between this start row and the LEAD end timestamp
DATEDIFF(SECOND, Date_timestamp, next_timestamp) AS duration_seconds
FROM windowed
WHERE progress = 'start' -- anchor on start rows
AND next_timestamp IS NOT NULL -- guard against last row
)
SELECT
run_date,
SUM(duration_seconds) AS total_run_seconds,
SUM(duration_seconds) / 3600.0 AS total_run_hours -- divide by 3600 to get hours
FROM run_seconds
GROUP BY run_date
ORDER BY run_date;
Overall total across all days
-- Wrap the above to get a grand total
WITH windowed AS (
SELECT
Date_timestamp,
progress,
LEAD(Date_timestamp) OVER (ORDER BY Date_timestamp) AS next_timestamp
FROM your_table_name
),
run_seconds AS (
SELECT DATEDIFF(SECOND, Date_timestamp, next_timestamp) AS duration_seconds
FROM windowed
WHERE progress = 'start' AND next_timestamp IS NOT NULL
)
SELECT
SUM(duration_seconds) AS total_run_seconds,
SUM(duration_seconds) / 3600.0 AS total_compute_hours
FROM run_seconds;
How it works step by step
- LAG(progress) — brings the previous row's state alongside each row, so we can confirm the row before an `end` was a `start`
- LEAD(Date_timestamp) — from a `start` row, looks one row ahead to fetch the `end` timestamp — this is the key change from the naive LAG-only approach
- DATEDIFF(SECOND, start_ts, end_ts) — produces the raw run duration in seconds for each start→end pair
- SUM(...) / 3600.0 — aggregates all run durations and converts seconds → hours (3600 seconds per hour)
- Anchoring the filter on `progress = 'start'` rows keeps the logic clean — each start row directly carries its own paired end timestamp via LEAD
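If it helps to see the windowing logic outside SQL, here is a minimal Python simulation of the same pairing (timestamps taken from the sample table above): anchor on each 'start' event, peek at the next event exactly as LEAD() would, and sum the durations.

```python
from datetime import datetime

# (timestamp, progress) events, ordered by time, mirroring the sample table
events = [
    (datetime(2024, 1, 15, 8, 0), "start"),
    (datetime(2024, 1, 15, 9, 45), "end"),
    (datetime(2024, 1, 15, 11, 0), "start"),
    (datetime(2024, 1, 15, 12, 30), "end"),
]

total_seconds = 0
for i, (ts, progress) in enumerate(events):
    # anchor on 'start' rows; the LEAD() equivalent is simply events[i + 1]
    if progress == "start" and i + 1 < len(events):
        next_ts, next_progress = events[i + 1]
        if next_progress == "end":
            total_seconds += int((next_ts - ts).total_seconds())

total_hours = total_seconds / 3600.0
print(total_hours)  # 1.75h + 1.5h = 3.25
```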
Real-World Scenario: Your BI team runs 50 concurrent Tableau dashboards while your data engineering team runs nightly dbt transforms. On a traditional monolith both workloads starve each other. On Snowflake you give each team a dedicated virtual warehouse — they share the same data but use completely independent compute. No contention, no queuing.
The Three-Layer Architecture
┌─────────────────────────────────────────────────────┐
│ CLOUD SERVICES LAYER │
│ Authentication · Query Optimiser · Metadata · RBAC │
│ Transaction Manager · Infrastructure Management │
└───────────────────┬─────────────────────────────────┘
│ (orchestrates everything)
┌───────────┴────────────┐
│ │
┌───────▼────────┐ ┌──────────▼──────────┐
│ ANALYTICS_WH │ │ TRANSFORM_WH │ ← Virtual Warehouses (Compute)
│ (Tableau / BI)│ │ (dbt runs here) │ Independent, auto-suspend
└───────┬────────┘ └──────────┬──────────┘
│ │
└────────────┬───────────┘
│ (both read same data, zero contention)
┌─────────▼──────────┐
│ STORAGE LAYER │ ← S3 / Azure Blob / GCS
│ Micro-partitions │ Columnar, compressed, encrypted
│ (50–500 MB each) │ Decoupled from compute entirely
└────────────────────┘
1. Storage Layer — Immutable Micro-Partitions
- All data is automatically reorganised into micro-partitions (50–500 MB compressed) in columnar format
- Each micro-partition stores min/max metadata per column — Snowflake uses this to prune partitions at query time without scanning them
- Compression is typically 10–20x. A 1TB uncompressed table may occupy 60GB in Snowflake
- Stored in cloud object storage (S3 on AWS, Azure Blob, GCS). You do not manage disks or nodes
- Data is never modified in-place — mutations create new partitions (copy-on-write), enabling Time Travel and Fail-Safe
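A toy sketch of the pruning idea (the partition names and metadata layout here are hypothetical, purely for illustration): check each partition's min/max metadata against the filter and skip partitions that cannot match, before reading any data.

```python
# Toy model of micro-partition pruning: each partition carries min/max
# metadata per column; a WHERE filter is checked against the metadata first,
# so non-matching partitions are never scanned. (Illustrative only; real
# micro-partitions are managed entirely by Snowflake.)
partitions = [
    {"name": "p1", "order_date_min": "2024-01-01", "order_date_max": "2024-01-10"},
    {"name": "p2", "order_date_min": "2024-01-11", "order_date_max": "2024-01-20"},
    {"name": "p3", "order_date_min": "2024-01-21", "order_date_max": "2024-01-31"},
]

def prune(partitions, target_date):
    """Return only the partitions whose [min, max] range can contain target_date."""
    return [p["name"] for p in partitions
            if p["order_date_min"] <= target_date <= p["order_date_max"]]

print(prune(partitions, "2024-01-15"))  # ['p2'] (p1 and p3 are skipped)
```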
2. Compute Layer — Virtual Warehouses
-- Create separate warehouses per workload to eliminate contention
CREATE WAREHOUSE BI_WH WAREHOUSE_SIZE='LARGE' AUTO_SUSPEND=300;
CREATE WAREHOUSE TRANSFORM_WH WAREHOUSE_SIZE='MEDIUM' AUTO_SUSPEND=120;
CREATE WAREHOUSE INGEST_WH WAREHOUSE_SIZE='SMALL' AUTO_SUSPEND=60;
CREATE WAREHOUSE ADMIN_WH WAREHOUSE_SIZE='XSMALL' AUTO_SUSPEND=60;
-- Multi-cluster warehouse for BI concurrency spikes (Black Friday dashboards)
ALTER WAREHOUSE BI_WH SET
MAX_CLUSTER_COUNT = 4
MIN_CLUSTER_COUNT = 1
SCALING_POLICY = 'STANDARD';
- Warehouses are MPP clusters that spin up in seconds and auto-suspend when idle — you only pay while running
- Size doubles compute (and cost) each tier: X-SMALL → SMALL → MEDIUM → LARGE → X-LARGE → 2X-LARGE → 3X-LARGE → 4X-LARGE
- Multi-cluster warehouses scale out horizontally (add nodes) for concurrency. Scale up (bigger size) for complex single queries
- Separation of warehouses is the #1 cost and performance best practice in Snowflake
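The size doubling can be made concrete with a small sketch. Credits per hour follow Snowflake's standard schedule (X-Small = 1 credit/hour, doubling per tier); the $3-per-credit price is an assumed example, since the real rate depends on your contract.

```python
# Standard warehouse sizes: each tier doubles credits/hour, starting at 1 for X-SMALL
SIZES = ["XSMALL", "SMALL", "MEDIUM", "LARGE", "XLARGE", "2XLARGE", "3XLARGE", "4XLARGE"]
CREDITS_PER_HOUR = {size: 2 ** i for i, size in enumerate(SIZES)}

def run_cost(size: str, seconds: int, price_per_credit: float = 3.0) -> float:
    """Approximate cost of one run, billed per second.
    (Real billing also has a 60-second minimum each time a warehouse resumes.)"""
    return CREDITS_PER_HOUR[size] * (seconds / 3600) * price_per_credit

print(CREDITS_PER_HOUR["LARGE"])  # 8 credits/hour
print(run_cost("MEDIUM", 1800))   # 4 credits/hr * 0.5 hr * $3 = 6.0
```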
3. Cloud Services Layer — The Brain
- Query optimiser — rewrites your SQL, chooses join order, decides which micro-partitions to read, all before the warehouse starts
- Metadata store — tracks every partition, column stats, clustering depth, transaction history
- Transaction manager — ACID compliance via MVCC (multi-version concurrency control). Reads never block writes
- Result cache — identical queries return instantly for 24 hours at zero warehouse cost
- Billed as Cloud Services credits — typically under 10% of compute spend, and Snowflake only charges for daily cloud services usage exceeding 10% of that day's compute credits, so most accounts pay nothing extra
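The result cache behaves like a memoizer keyed on byte-identical query text with a 24-hour TTL. A toy Python sketch (the cache internals here are invented purely for illustration):

```python
CACHE_TTL_SECONDS = 24 * 3600  # Snowflake keeps query results for 24 hours
_result_cache = {}             # query text -> (result, cached_at)

def execute(query, now):
    """Serve a cached result for byte-identical query text, else 'run' it."""
    hit = _result_cache.get(query)
    if hit and now - hit[1] < CACHE_TTL_SECONDS:
        return hit[0], "cache"          # served at zero warehouse cost
    result = f"rows for: {query}"       # stand-in for real execution
    _result_cache[query] = (result, now)
    return result, "warehouse"

_, source1 = execute("SELECT COUNT(*) FROM orders", now=0)
_, source2 = execute("SELECT COUNT(*) FROM orders", now=60)         # within TTL
_, source3 = execute("SELECT COUNT(*) FROM orders", now=25 * 3600)  # expired
print(source1, source2, source3)  # warehouse cache warehouse
```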
Why This Architecture Wins Over Traditional MPP (Redshift, Teradata)
| Dimension | Traditional MPP | Snowflake |
|---|---|---|
| Scale storage | Must add nodes (costly) | Storage scales independently (pay-per-TB) |
| Scale compute | Resize whole cluster | New warehouse in 5 seconds |
| Concurrency | Shared queue | Separate warehouses per team |
| Idle cost | Pay 24/7 | Auto-suspend (pay per second) |
| Maintenance | Vacuum, reindex, patch | Zero maintenance |
Snowflake's Time Travel allows querying data as it existed at any point in the past (default 1 day, up to 90 days for Enterprise).
1. Using AT (OFFSET) - Relative Time
-- Query data from 5 minutes ago
SELECT * FROM my_table AT (OFFSET => -300);

-- Query data from 1 hour ago
SELECT * FROM my_table AT (OFFSET => -3600);
2. Using AT (TIMESTAMP) - Absolute Time
-- Query data as it existed at a specific timestamp
SELECT * FROM my_table AT (TIMESTAMP => '2025-06-25 10:30:00'::TIMESTAMP_LTZ);

-- Create a table from historical data
CREATE TABLE my_table_history AS
SELECT * FROM my_table AT (TIMESTAMP => '2025-06-24 00:00:00'::TIMESTAMP_LTZ);
3. Using BEFORE (STATEMENT) - Before a Specific Query
-- Recover data deleted by a specific query
CREATE TABLE recovered_data AS
SELECT * FROM my_table BEFORE (STATEMENT => 'query_id_123');
Real Use Cases:
- Mistakenly deleted rows? Recover them instantly without restore
- Audit trail of data changes
- Debugging failed transformations
- Point-in-time analysis
Five-step approach to optimize slow queries:
1. Profile the Query (Root Cause Analysis)
- Check query execution plan in Snowflake UI
- Identify bottleneck: scanning, joining, aggregating, or sorting?
- Look for spilling to disk (memory constraint)
- Evaluate partition/micro-partition pruning efficiency
2. Right-Size Compute
-- If the query shows high queue time, increase warehouse size
ALTER WAREHOUSE MY_WH SET WAREHOUSE_SIZE = 'LARGE';

-- If the query spills to disk, a larger size also brings more memory per node
-- (note: AUTO_SUSPEND only controls idle shutdown, in seconds — it does not affect memory)
3. Optimize SQL Logic
- SELECT specific columns: Avoid SELECT *, specify needed columns only
- Filter early: Apply WHERE clauses on clustered/partitioned columns first
- Join optimization: reduce join inputs early and prefer INNER JOIN when possible (Snowflake's optimizer chooses join order automatically, so manual left/right table ordering rarely matters)
- Avoid correlated subqueries: Use joins instead
- Eliminate unnecessary operations: Remove redundant DISTINCT or ORDER BY unless needed
4. Physical Data Organization
- Clustering Keys: Define on columns frequently used in WHERE or JOIN
- Materialized Views: Pre-compute complex, frequently-run logic
5. Leverage Caching
- Result Cache: Snowflake caches query results for 24 hours; run same queries for cache hits
- Warehouse Cache: Keep warehouse running to leverage SSD cache of recently scanned blocks
Example Before & After:
-- BEFORE: Slow query (20 seconds)
SELECT *
FROM huge_table
WHERE created_date = '2025-01-01'
ORDER BY customer_id;

-- AFTER: Optimized (2 seconds)
SELECT customer_id, order_id, amount        -- only needed columns
FROM huge_table
WHERE created_date = '2025-01-01'           -- filter early
QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY created_date DESC) = 1;
Partitions are logical divisions; Micro-partitions are Snowflake's intelligent physical chunks.
Traditional Partitions:
- Admin-defined logical divisions (e.g., one partition per month)
- Manual creation and management
- Typically large (GB-TB per partition)
- Used in databases like PostgreSQL, Oracle
Snowflake Micro-partitions:
- Automatic, immutable chunks (50-500 MB)
- Transparent to users - no manual management needed
- Columnar, compressed format
- Snowflake maintains rich metadata about each micro-partition (value ranges, distinct counts, NULL counts)
Key Advantages of Micro-partitions:
- Automatic Pruning: Metadata allows skipping micro-partitions that don't match WHERE clause
- No Manual Maintenance: Snowflake handles partitioning automatically
- Clustering Keys: Optional optimization to group similar values physically
- Efficient Compression: Columnar storage compresses repetitive data 10x+
| Feature | Traditional Partition | Micro-partition |
|---|---|---|
| Management | Manual | Automatic |
| Size | GB-TB | 50-500 MB |
| Format | Row or Column | Always Columnar |
| Pruning | Basic | Intelligent with Metadata |
Slowly Changing Dimensions (SCD) are methods to handle data that changes over time in dimension tables.
Type 0: Retain Original
No updates. Historical value never changes.
Customer ID 1 registered on 2020-01-01 as "John" → stays "John" forever
Type 1: Overwrite
Overwrite old value with new. No history preserved.
Customer 1 name: "John" (2020-01-01) → "John Smith" (2024-01-01)
Result: only the latest name is stored
Type 2: Add New Row (Full History)
Most common in analytics. New row added for each change with effective dates.
-- Initial row
ID=1, Name=John, Dept=Sales, Valid_From=2020-01-01, Valid_To=9999-12-31, Is_Current=TRUE

-- After promotion to Marketing (2024-01-01)
-- Old row gets an end date
ID=1, Name=John, Dept=Sales, Valid_From=2020-01-01, Valid_To=2023-12-31, Is_Current=FALSE
-- New row for current state
ID=1, Name=John, Dept=Marketing, Valid_From=2024-01-01, Valid_To=9999-12-31, Is_Current=TRUE
Type 3: Add Previous Attribute
New column stores previous value. Partial history.
Columns: Dept (current), Dept_Previous
When a change happens: move the current value into Dept_Previous, then update Dept
Type 4: Separate History Table
Main table holds current (Type 1), separate history table stores all versions (Type 2).
Which to Use?
- Type 2 is most common: Complete audit trail, easy to analyze "as of" dates
- Type 1 for: Attributes that don't need historical tracking
- Type 2 via dbt snapshots: Automated in modern workflows
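The Type 2 mechanics can be sketched as a small function over dict-based rows (a simplified, hypothetical model; real implementations use MERGE statements or dbt snapshots): close the current row with an end date, then append a new current row.

```python
from datetime import date, timedelta

OPEN_END = date(9999, 12, 31)  # conventional "still current" sentinel

def apply_scd2_change(rows, key, attr, new_value, change_date):
    """Close the current row for `key` and append a new current row (SCD Type 2).
    Only the changed attribute is carried in the new row, for brevity."""
    for row in rows:
        if row["id"] == key and row["is_current"]:
            if row.get(attr) == new_value:
                return rows  # no actual change, nothing to do
            row["valid_to"] = change_date - timedelta(days=1)
            row["is_current"] = False
            rows.append({"id": key, attr: new_value,
                         "valid_from": change_date, "valid_to": OPEN_END,
                         "is_current": True})
            return rows
    raise KeyError(f"no current row for id={key}")

dim = [{"id": 1, "dept": "Sales",
        "valid_from": date(2020, 1, 1), "valid_to": OPEN_END, "is_current": True}]
apply_scd2_change(dim, key=1, attr="dept", new_value="Marketing",
                  change_date=date(2024, 1, 1))
print(len(dim))           # 2 rows: closed history + new current
print(dim[0]["valid_to"]) # 2023-12-31
```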
Real-World Scenario: A logistics partner sends webhook events in JSON — each event has a nested shipment object with an array of items. You need to land the raw JSON, query nested fields, and flatten the array into relational rows for downstream dbt models.
Step 1 — Create File Format & Stage
CREATE FILE FORMAT json_fmt
TYPE = 'JSON'
STRIP_OUTER_ARRAY = TRUE -- unwrap a top-level array [{},{},{}] into rows
NULL_IF = ('', 'NULL', 'null');
CREATE STAGE raw.logistics_stage
URL = 's3://logistics-bucket/events/'
STORAGE_INTEGRATION = s3_logistics_int
FILE_FORMAT = json_fmt;
Step 2 — Land Raw JSON into a VARIANT Column
-- VARIANT stores any JSON structure; schema changes in the source don't break this table
CREATE TABLE raw.shipment_events (
    event_raw  VARIANT,
    _file_name VARCHAR,
    _loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- METADATA$FILENAME is only available when selecting from a stage,
-- so capture it in the COPY transformation rather than a column DEFAULT
COPY INTO raw.shipment_events (event_raw, _file_name)
FROM (SELECT $1, METADATA$FILENAME FROM @raw.logistics_stage)
FILE_FORMAT = (FORMAT_NAME = 'json_fmt')
ON_ERROR = 'CONTINUE'; -- log bad rows, don't fail the whole load
Step 3 — Query Nested Fields with : and :: operators
-- Sample JSON structure:
-- {
-- "shipment_id": "SHP-001",
-- "carrier": "DHL",
-- "status": "delivered",
-- "delivered_at": "2024-01-15T14:30:00Z",
-- "destination": { "city": "Hyderabad", "country": "IN" },
-- "items": [
-- {"sku": "SKU-A", "qty": 2, "weight_kg": 1.5},
-- {"sku": "SKU-B", "qty": 1, "weight_kg": 0.8}
-- ]
-- }
SELECT
event_raw:shipment_id::VARCHAR AS shipment_id,
event_raw:carrier::VARCHAR AS carrier,
event_raw:status::VARCHAR AS status,
event_raw:delivered_at::TIMESTAMP AS delivered_at,
event_raw:destination:city::VARCHAR AS city, -- nested object
event_raw:destination:country::VARCHAR AS country
FROM raw.shipment_events;
Step 4 — FLATTEN to Explode JSON Arrays into Rows
-- FLATTEN turns the items array into one row per item (lateral join)
SELECT
e.event_raw:shipment_id::VARCHAR AS shipment_id,
e.event_raw:carrier::VARCHAR AS carrier,
f.value:sku::VARCHAR AS sku,
f.value:qty::INT AS quantity,
f.value:weight_kg::FLOAT AS weight_kg,
f.index AS item_position -- 0-based array index
FROM raw.shipment_events e,
LATERAL FLATTEN(input => e.event_raw:items) f;
Step 5 — Promote to Staging with dbt (typed, named columns)
-- models/staging/stg_shipment_items.sql
-- Flatten and cast the raw VARIANT into a typed relational model
SELECT
event_raw:shipment_id::VARCHAR AS shipment_id,
event_raw:carrier::VARCHAR AS carrier,
UPPER(event_raw:status::VARCHAR) AS status,
event_raw:delivered_at::TIMESTAMP AS delivered_at,
event_raw:destination:country::VARCHAR AS destination_country,
f.value:sku::VARCHAR AS sku,
f.value:qty::INT AS quantity,
f.value:weight_kg::FLOAT AS weight_kg,
_loaded_at AS ingested_at
FROM {{ source('raw', 'shipment_events') }},
LATERAL FLATTEN(input => event_raw:items) f
Key Points
- VARIANT is schema-flexible — new fields added by the source don't break the landing table. Extract only what you need in the staging layer
- `:` path notation navigates nested keys. `::` casts to SQL types. Use `TRY_CAST` for fields that might be null or malformed
- LATERAL FLATTEN is a lateral join — it produces multiple rows per parent row, one per array element
- STRIP_OUTER_ARRAY = TRUE handles the common pattern where a file contains a single JSON array of objects
- Always store `_file_name` and `_loaded_at` for lineage and debugging
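What LATERAL FLATTEN does to the items array is easy to mirror in plain Python: one output row per array element, with the parent's fields repeated. A sketch using the sample event shape from Step 3:

```python
import json

event = json.loads("""{
  "shipment_id": "SHP-001", "carrier": "DHL",
  "items": [{"sku": "SKU-A", "qty": 2, "weight_kg": 1.5},
            {"sku": "SKU-B", "qty": 1, "weight_kg": 0.8}]
}""")

# One row per array element, parent fields repeated on every row:
# the same shape LATERAL FLATTEN(input => event_raw:items) produces
rows = [
    {"shipment_id": event["shipment_id"], "carrier": event["carrier"],
     "sku": item["sku"], "quantity": item["qty"],
     "weight_kg": item["weight_kg"], "item_position": idx}  # 0-based, like f.index
    for idx, item in enumerate(event["items"])
]

print(len(rows))       # 2
print(rows[1]["sku"])  # SKU-B
```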
Strategy for large file ingestion:
1. Use External Stage (Mandatory for 50GB)
- Deploy file to AWS S3, Azure Blob, or GCS before loading
- Much faster and more reliable than internal stage (PUT command)
- Enables parallel processing by Snowflake
2. Use Large Warehouse
-- Use at least X-Large or 2X-Large for parallel processing
ALTER WAREHOUSE etl_wh SET WAREHOUSE_SIZE = '2X-LARGE';
3. Execute COPY INTO
COPY INTO target_table
FROM @external_stage/50gb_file.parquet
FILE_FORMAT = (TYPE = PARQUET)
ON_ERROR = 'CONTINUE'
PURGE = TRUE;
Key Considerations:
- Split large files first: COPY parallelizes across files, not within a single file (aim for roughly 100-250 MB compressed chunks)
- Use columnar formats: Parquet or ORC instead of CSV
- Larger warehouse: more files loaded in parallel
- Network bandwidth: ensure good connectivity to cloud storage
- Monitor progress: check query history and the COPY_HISTORY view
Snowflake supports two types of stages for file management:
Internal Stages (Snowflake-Managed)
- User Stage: @~/ (default per user)
- Table Stage: @%table_name (auto-created for each table)
- Named Stage: CREATE STAGE my_stage
- Use: Smaller files, ad-hoc loads, testing
- Storage: Snowflake-managed cloud storage (included in account)
External Stages (User-Managed Cloud Storage)
- Points to AWS S3, Azure Blob Storage, or Google Cloud Storage
- Use: Large-scale production loads, data lakes, Snowpipe
- Cost: Cloud provider charges for storage and egress
- Requires storage integration and IAM permissions
| Feature | Internal | External |
|---|---|---|
| Storage | Snowflake-managed | AWS/Azure/GCP |
| File Upload | PUT / GET commands | Cloud provider tools (e.g., aws s3 cp) |
| Cost | Included with Snowflake | Billed by your cloud provider |
| Scale | Small files | Large scale/GB-TB |
| Use Case | Testing, ad-hoc | Production, Snowpipe |
Real-World Scenario: You have a daily pipeline: ingest from S3 at 6 AM → run dbt transforms → run data quality tests → refresh Tableau extracts. If ingestion fails at 6 AM, dbt should not run. You need dependency-aware scheduling with failure alerting — not just a cron job.
Option 1: Snowflake Native Tasks (best for Snowflake-only pipelines)
-- Root task: fires daily at 6 AM UTC, only if stream has new data
CREATE OR REPLACE TASK pipeline.ingest_root
WAREHOUSE = INGEST_WH
SCHEDULE = 'USING CRON 0 6 * * * UTC'
WHEN SYSTEM$STREAM_HAS_DATA('raw.orders_stream')
AS
COPY INTO raw.orders FROM @raw.s3_stage FILE_FORMAT = parquet_fmt;
-- Child task: runs after ingestion completes
CREATE OR REPLACE TASK pipeline.dbt_transform
WAREHOUSE = TRANSFORM_WH
AFTER pipeline.ingest_root
AS
CALL pipeline.sp_run_dbt_hourly(); -- calls a stored procedure (tasks cannot run shell commands directly)
-- Grandchild: quality check after transforms
CREATE OR REPLACE TASK pipeline.quality_check
WAREHOUSE = TRANSFORM_WH
AFTER pipeline.dbt_transform
AS
CALL pipeline.sp_run_dbt_tests();
-- Activate the whole DAG (children first, then root)
ALTER TASK pipeline.quality_check RESUME;
ALTER TASK pipeline.dbt_transform RESUME;
ALTER TASK pipeline.ingest_root RESUME;
-- Monitor task run history
SELECT name, state, error_message, scheduled_time, completed_time
FROM TABLE(information_schema.task_history(
scheduled_time_range_start => DATEADD('day', -1, CURRENT_TIMESTAMP)
))
ORDER BY scheduled_time DESC;
Option 2: Apache Airflow (best for multi-system, complex DAGs)
# dags/daily_pipeline.py — Airflow DAG with Snowflake + dbt
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-engineering',
'retries': 2,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email': ['vsurya@duck.com'],
}
with DAG(
dag_id = 'daily_snowflake_pipeline',
schedule_interval = '0 6 * * *', # 6 AM UTC daily
start_date = datetime(2024, 1, 1),
catchup = False,
default_args = default_args,
tags = ['snowflake', 'dbt', 'production'],
) as dag:
ingest = SnowflakeOperator(
task_id = 'copy_from_s3',
snowflake_conn_id = 'snowflake_prod',
sql = "COPY INTO raw.orders FROM @raw.s3_stage;",
warehouse = 'INGEST_WH',
)
transform = DbtCloudRunJobOperator(
task_id = 'dbt_run_daily_models',
dbt_cloud_conn_id = 'dbt_cloud',
job_id = 12345, # dbt Cloud job ID
check_interval = 30,
timeout = 3600,
additional_run_config = {
'steps_override': ['dbt run --select tag:daily']
},
)
test = DbtCloudRunJobOperator(
task_id = 'dbt_test',
dbt_cloud_conn_id = 'dbt_cloud',
job_id = 12346,
additional_run_config = {
'steps_override': ['dbt test --select tag:daily --store-failures']
},
)
ingest >> transform >> test # dependency chain: fail early, stop downstream
Comparison: When to Use Each
| Factor | Snowflake Tasks | Apache Airflow |
|---|---|---|
| Setup complexity | Zero (native SQL) | Requires infrastructure |
| Cross-system orchestration | Snowflake only | Any system (S3, Kafka, APIs, dbt, Spark) |
| DAG complexity | Linear / tree DAGs | Complex fan-out, dynamic DAGs, sensors |
| Monitoring UI | Task History in Snowflake | Full Airflow UI with logs, retries, SLAs |
| Cost | Warehouse credits + serverless | Compute for Airflow workers |
| Best for | Micro-batch Snowflake pipelines | Enterprise multi-system orchestration |
Real-World Scenario: Your CDC pipeline receives deletes from Qlik Replicate as soft-deletes (a flag). Every night you run a stored procedure that physically removes expired records, rebuilds affected aggregates, and logs the cleanup to an audit table — all as a single transactional unit.
JavaScript Stored Procedure (classic Snowflake approach)
CREATE OR REPLACE PROCEDURE raw.sp_purge_expired_records(retention_days FLOAT)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
try {
// Step 1: Archive records before deletion
var archive_sql = `
INSERT INTO audit.deleted_records_log
SELECT *, CURRENT_TIMESTAMP AS deleted_at
FROM staging.orders
WHERE is_deleted = TRUE
AND updated_at < DATEADD('day', -${RETENTION_DAYS}, CURRENT_DATE)`;
snowflake.execute({ sqlText: archive_sql });
    // Step 2: Delete from main table
    // (argument names are uppercased inside JavaScript SPs, hence RETENTION_DAYS)
    var delete_stmt = snowflake.createStatement({ sqlText: `
      DELETE FROM staging.orders
      WHERE is_deleted = TRUE
        AND updated_at < DATEADD('day', -${RETENTION_DAYS}, CURRENT_DATE)` });
    delete_stmt.execute();
    // Step 3: Report how many rows were purged
    var rows_deleted = delete_stmt.getNumRowsAffected();
    return `SUCCESS: purged ${rows_deleted} records older than ${RETENTION_DAYS} days`;
} catch(err) {
return `ERROR: ${err.message}`;
}
$$;
-- Call it
CALL raw.sp_purge_expired_records(90);
Python Stored Procedure (modern approach — richer libraries)
CREATE OR REPLACE PROCEDURE analytics.sp_rebuild_daily_aggregates(run_date DATE)
RETURNS TABLE (summary_date DATE, rows_rebuilt INT, status VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python', 'pandas')
HANDLER = 'run'
EXECUTE AS CALLER
AS
$$
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col
def run(session: Session, run_date):
# Use Snowpark DataFrame API — no SQL strings, type-safe
orders = session.table('staging.orders')
# Filter to the date being rebuilt
daily = (orders
.filter(col('order_date') == run_date)
.filter(col('is_deleted') == False)
.group_by('order_date', 'region', 'product_category')
.agg({'net_revenue': 'sum', 'order_id': 'count'})
.rename({'sum(net_revenue)': 'total_revenue',
'count(order_id)': 'order_count'}))
# Delete old aggregate for this date (idempotent)
session.sql(f"DELETE FROM marts.fact_daily_sales WHERE sale_date = '{run_date}'").collect()
# Write new aggregate
daily.write.mode('append').save_as_table('marts.fact_daily_sales')
row_count = daily.count()
result_df = pd.DataFrame([{'summary_date': run_date,
'rows_rebuilt': row_count,
'status': 'SUCCESS'}])
return session.create_dataframe(result_df)
$$;
CALL analytics.sp_rebuild_daily_aggregates('2024-01-15');
Key Design Principles
- EXECUTE AS CALLER vs OWNER — CALLER uses the calling user's privileges (safer for DML); OWNER uses the SP owner's privileges (useful for granting limited elevated access)
- Idempotency — always design SPs to be safely re-runnable. Delete before insert, not update-or-insert
- Error handling — wrap in try/catch and return structured error messages; log failures to an audit table
- Use Snowpark Python for complex logic — it compiles DataFrame operations to Snowflake-native SQL, running inside the warehouse rather than pulling data out
- Call from Tasks — SPs pair naturally with Snowflake Tasks for event-driven or scheduled execution
What is a UDF?
A custom-built function that encapsulates logic reusable in SQL queries, similar to built-in functions.
Types of UDFs:
- Scalar UDF: Returns single value per input row
- Table UDF (UDTF): Returns multiple rows per input row
Scalar UDF Example:
CREATE FUNCTION calculate_net_sales(gross DECIMAL, returns_amount DECIMAL)
RETURNS DECIMAL(10,2)
AS
$$
gross - COALESCE(returns_amount, 0)
$$;
-- Use in query
SELECT order_id,
calculate_net_sales(gross_amount, return_amount) AS net_sales
FROM orders;
Languages Supported:
- SQL (simple transformations)
- JavaScript (logic-heavy operations)
- Python (via Snowpark)
- Java (enterprise use)
When to Use:
- Repetitive calculations across multiple queries
- Complex business logic
- Masking sensitive data
Real-World Scenario: Three data engineers work on the same dbt project. Without Git, two engineers overwrite each other's models on the same day. With Git, every change lives on its own branch, CI runs tests before merge, and the main branch always reflects production-verified code.
Core Git Workflow for Data Teams
# Daily workflow for a data engineer
git checkout main && git pull origin main   # always start fresh from main
git checkout -b feat/add-revenue-mart       # new branch per feature
# ... edit models, macros, tests ...
git add models/marts/fact_revenue.sql
git commit -m "feat: add fact_revenue model with daily grain"
git push origin feat/add-revenue-mart       # push to remote
# Open Pull Request → CI runs dbt Slim CI → reviewer approves → merge
Branch Strategy for dbt Projects
# Branch types and their purpose:
# main      → production. Auto-deployed to Snowflake ANALYTICS schema on merge
# develop   → integration. Multiple features merged here for joint testing
# feat/*    → feature branches. One per Jira ticket / logical change
# fix/*     → hotfix branches. Cut from main, merged back to main + develop
# release/* → release candidates (optional for teams with staged rollouts)

# Environment mapping:
# main    → ANALYTICS schema (production Snowflake data)
# develop → ANALYTICS_DEV schema (team integration testing)
# feat/*  → ANALYTICS_CI_NNN schema (ephemeral, created per PR, auto-dropped)
Essential Git Commands for Data Engineers
# Undo a staged change before commit (safe)
git restore --staged models/staging/stg_orders.sql

# Amend last commit message (before pushing)
git commit --amend -m "fix: correct revenue calculation in fact_orders"

# Interactive rebase — clean up messy commits before PR
git rebase -i HEAD~3   # squash last 3 commits into one clean commit

# Cherry-pick a hotfix from another branch
git cherry-pick abc1234   # apply specific commit hash to current branch

# See what changed in a file across its history
git log --follow -p -- models/marts/fact_revenue.sql

# Find when a bug was introduced (binary search)
git bisect start
git bisect bad HEAD      # current commit is broken
git bisect good v1.2.0   # last known good tag
# Git checks out the midpoint; you test + mark good/bad until it finds the commit
Git + dbt: Slim CI in Practice
# .github/workflows/dbt_ci.yml (runs on every PR)
# Only builds and tests models changed in this PR
# --defer means unchanged upstream models resolve from production
name: dbt Slim CI
on: [pull_request]
jobs:
slim_ci:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- run: pip install dbt-snowflake
- run: aws s3 cp s3://dbt-state/manifest.json ./prod-artifacts/
- run: |
dbt build --select state:modified+ --defer --state ./prod-artifacts --target ci
Git Best Practices for Data Teams
- One model per commit — small, focused PRs are easier to review and roll back
- Never commit secrets — use `.env` files and add them to `.gitignore`. Rotate any key accidentally committed immediately
- Tag releases — `git tag v2.1.0` marks the manifest used for production deploys, enabling exact rollback
- Protect main branch — require PR reviews + passing CI before merge. Prevents direct pushes to production
- Conventional commits — prefix messages with `feat:`, `fix:`, `refactor:`, `docs:` for automated changelogs
What is QUALIFY?
A Snowflake clause that filters window function results without subqueries or CTEs. Syntactic sugar for cleaner SQL.
Problem QUALIFY Solves:
-- Without QUALIFY (needs CTE)
WITH ranked_employees AS (
SELECT employee_id, salary, department,
RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rnk
FROM employees
)
SELECT * FROM ranked_employees WHERE rnk <= 3;
-- With QUALIFY (simpler)
SELECT employee_id, salary, department
FROM employees
QUALIFY RANK() OVER (PARTITION BY department ORDER BY salary DESC) <= 3;
Real Examples:
-- Get top 3 earners per department
SELECT employee_id, salary, department
FROM employees
QUALIFY RANK() OVER (PARTITION BY department ORDER BY salary DESC) <= 3;

-- Get only increased salaries (previous < current)
SELECT employee_id, salary, prev_salary
FROM salary_history
QUALIFY salary > LAG(salary) OVER (PARTITION BY employee_id ORDER BY year);
Benefits:
- Cleaner, more readable SQL
- No CTE nesting required
- Direct filtering on window functions
- Performance improvements in some cases
Why Snowflake is the Modern Data Warehouse Choice:
1. Separation of Compute & Storage (Most Important)
Scale independently. Run light queries on an XS warehouse (1 credit/hour), heavy queries on a 3XL (64 credits/hour). Pay only while a warehouse runs. No unused capacity costs.
2. Elasticity & Concurrency
Instantly scale warehouses (XS to 3XL in seconds). Multiple teams work simultaneously: BI team on WH1, ETL team on WH2, Data Science on WH3 - zero contention.
3. Zero Management
Snowflake handles: patching, updates, hardware provisioning, backups, compression, encryption. No DBAs needed for infrastructure.
4. Semi-Structured Data (VARIANT Type)
Native JSON/Avro/Parquet support. Query nested data with dot notation without flattening.
5. Time Travel & Fail-safe
-- Query data as of 1 day ago (Time Travel supports 1-90 days)
SELECT * FROM my_table AT (OFFSET => -86400);

-- Accidentally dropped a table?
UNDROP TABLE my_table;
6. Data Sharing (Game-Changer)
Live data sharing with other Snowflake accounts. No copying, instant access, secure.
7. Performance (Auto-Optimized)
- Automatic micro-partitioning with intelligent pruning
- Multi-layer caching (result cache + warehouse cache)
- Optional clustering keys for fine-tuning
8. Enterprise Security
- AES-256 encryption at rest and in transit
- Row-level and column-level masking
- SOC2, HIPAA, GDPR, PCI-DSS certified
Cost Comparison Example:
Traditional DW: Pay $100K/year for hardware (month 1-12). Snowflake: Pay $20K for actual compute, scale on demand.
What is CDC?
A technique (not a tool) that identifies and captures INSERT/UPDATE/DELETE changes from source systems and delivers to target efficiently, instead of reprocessing entire datasets.
Why CDC Matters:
WITHOUT CDC:
- Load entire 100GB table daily (2 hours)
- Heavy impact on source system
- Only 1GB changed, but process all 100GB

WITH CDC:
- Load only the 1GB that changed (5 minutes)
- Minimal source system impact
- Near real-time data availability
Common CDC Mechanisms:
1. Timestamp-Based (Polling)
SELECT * FROM source_table WHERE last_updated_timestamp > previous_check_time;
- Simple but unreliable (misses hard deletes; vulnerable to timestamp skew)
2. Log-Based (Most Robust)
Read database transaction logs (Oracle redo logs, MySQL binlog, SQL Server transaction log).
- Captures ALL changes in real-time
- No impact on source DB
- Handles deletes properly
3. Trigger-Based
Database triggers fire on DML, write to change table.
- Real-time but adds overhead to source
Popular CDC Tools: Fivetran, Qlik Replicate, Debezium, AWS DMS, Oracle GoldenGate
Use Cases: Real-time dashboards, microservices sync, database migration with zero downtime
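A minimal sketch of timestamp-based polling (in-memory rows standing in for a hypothetical source table) makes its weakness concrete: an UPDATE is captured by the filter, but a hard DELETE leaves no row behind for the filter to find.

```python
from datetime import datetime

# Hypothetical source table as in-memory rows
source = [
    {"id": 1, "name": "alice", "last_updated": datetime(2024, 1, 1, 10, 0)},
    {"id": 2, "name": "bob",   "last_updated": datetime(2024, 1, 1, 10, 5)},
]

def poll_changes(rows, previous_check_time):
    """Timestamp-based CDC: SELECT * WHERE last_updated > previous_check_time."""
    return [r for r in rows if r["last_updated"] > previous_check_time]

checkpoint = datetime(2024, 1, 1, 10, 3)

# An UPDATE after the checkpoint is captured...
changed = poll_changes(source, checkpoint)
print([r["id"] for r in changed])  # [2]

# ...but a hard DELETE is invisible: the row is simply gone, which is
# why log-based CDC is preferred when deletes must be captured
source = [r for r in source if r["id"] != 2]
remaining = poll_changes(source, checkpoint)
print([r["id"] for r in remaining])  # [] — the delete went unnoticed
```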
List [] - Mutable (Changeable)
my_list = [10, "apple", 3.14, [1, 2]] # Can contain mixed types
# Modify
my_list[0] = 20
my_list.append("banana")
my_list[1:3] = ["orange", "grape"]
# Operations
for item in my_list:
print(item)
my_list.sort()
my_list.reverse()
my_list.pop() # Remove and return last
Tuple () - Immutable (Fixed)
my_tuple = (10, "apple", 3.14, (1, 2)) # Can contain mixed types
# Cannot modify
# my_tuple[0] = 20 # TypeError!
# Single element tuple REQUIRES comma
single = (5,) # Correct tuple
wrong = (5) # This is just int!
# Unpacking
x, y, z = (1, 2, 3)
# Can use as dict key (lists cannot)
cache = {(0, 0): "origin", (1, 1): "diagonal"}
| Feature | List | Tuple |
|---|---|---|
| Mutability | Changeable | Fixed |
| Performance | Slower | Faster |
| Memory | More overhead | Less overhead |
| As Dict Key | No (unhashable) | Yes (if elements hashable) |
| Best Use | Dynamic data | Immutable, fixed data |
Three major columnar/serialization formats and when to use each:
AVRO - Row-Oriented with Self-Describing Schema
- Best for: Real-time streaming with Kafka, CDC platforms
- Schema Evolution: Excellent (reader/writer schemas)
- Compression: Moderate
- Limitation: Row-based, slow for OLAP queries
- Tools: Kafka, Debezium, Confluent
Parquet - Columnar, Analytics-Optimized
- Best for: Data lakes (S3, Azure), analytics queries
- Compression: Excellent (10-100x on repetitive columns)
- Query Speed: 10-100x faster for column subset queries
- Use Case: AWS S3, Snowflake, BigQuery, Spark native
- Popularity: Most used in modern data stacks
ORC - Optimized Row Columnar (Hadoop-Specific)
- Best for: Hive, Hadoop ecosystems
- Compression: Best (smaller than Parquet)
- Predicate Pushdown: Filtering during read, not after
- Limitation: Less ecosystem support outside Hadoop
| Feature | AVRO | Parquet | ORC |
|---|---|---|---|
| Type | Row-oriented | Columnar | Columnar |
| Compression | Moderate | Very Good | Best |
| Schema Evolution | Excellent | Good | Good |
| Best For | Streaming | Data lakes | Hive/Hadoop |
Recommendation: Use Parquet for data lakes and Snowflake. Use AVRO for streaming. ORC only if using Hadoop.
CTE (WITH Clause) - Query Scope
WITH top_customers AS (
SELECT customer_id, SUM(amount) as total
FROM orders
GROUP BY customer_id
)
SELECT * FROM top_customers WHERE total > 1000;
Characteristics:
- Scope: Single query only
- Storage: Logical, not stored
- Reusability: Within same query
- Cost: None
- Materialization: Optimizer decides (often inlined)
Temporary Table - Session Scope
CREATE TEMPORARY TABLE top_customers AS
SELECT customer_id, SUM(amount) AS total
FROM orders
GROUP BY customer_id;
-- Use in Query 1
SELECT * FROM top_customers WHERE total > 1000;
-- Use in Query 2 (same session)
INSERT INTO summary SELECT COUNT(*) FROM top_customers;
Characteristics:
- Scope: Entire session
- Storage: Physical table in database
- Reusability: Across multiple queries
- Cost: Counts toward storage
- Materialization: Always materialized (fast lookups)
- Cleanup: Auto-drops at session end
| Aspect | CTE | Temp Table |
|---|---|---|
| Scope | Single query | Entire session |
| Persistence | Logical | Physical |
| When to Use | Breaking down complex queries | Expensive calc reused many times |
Permanent Table (Default)
- Persistence: Until explicitly dropped
- Time Travel: 1-90 days
- Fail-safe: 7 days recovery
- Cost: Highest
- Use: Core analytics tables, historical data
Transient Table (For Staging)
- Time Travel: 0 or 1 day (default 1; cannot exceed 1)
- Fail-safe: None
- Cost: ~50% lower than permanent
- Use: Daily staging tables, intermediate ETL results
Temporary Table (Session-Scoped)
- Persistence: Auto-drops at session end
- Time Travel: Session-bound
- Fail-safe: None
- Cost: Lowest
- Use: Ad-hoc analysis, session-specific work
| Type | Time Travel | Fail-safe | Cost | Use Case |
|---|---|---|---|---|
| Permanent | 1-90 days | 7 days | High | Analytics |
| Transient | 0-1 day | None | 50% lower | Staging |
| Temporary | Session | None | Lowest | Session work |
Seven strategies to handle source system schema changes gracefully:
1. Communication & Governance
- Establish change request process with cross-functional reviews
- Notify downstream teams before changes
- Document all schema changes in data catalog
2. Use VARIANT Columns
-- Semi-structured data resilient to schema changes
CREATE TABLE events (
event_id INT,
event_metadata VARIANT -- Flexible JSON, can handle additions
);
3. Additive Schema Changes Only
Add new columns (safe) instead of modifying/deleting (risky).
4. View Layer for Compatibility
-- Source table changes: email → primary_email
-- Create view maintaining backward compatibility
CREATE VIEW v_customers AS
SELECT
customer_id,
primary_email AS email -- Alias old name
FROM raw_customers;
5. Version Tables
-- Keep multiple versions:
-- raw_customers_v1 (old schema)
-- raw_customers_v2 (new schema, new columns)
-- Consumers migrate gradually to v2
6. Data Contracts
- Define expected schema formally using Avro or Protobuf
- Enforce via schema registry
7. Graceful Degradation
-- Handle missing columns safely
SELECT
id,
COALESCE(new_column, default_value) AS new_column
FROM raw_data;
Note: Ephemeral is a DBT concept, not a Snowflake feature
Ephemeral Models
{{ config(materialized='ephemeral') }}
SELECT
customer_id,
SUM(amount) as total_spent
FROM {{ ref('stg_orders') }}
GROUP BY customer_id
Characteristics:
- No physical object created (not a table or view)
- Compiled into downstream models as CTEs
- Not directly queryable
- Zero storage cost
- Perfect for reusable intermediate logic
When to Use Ephemeral:
- Simple cleaning (one-to-one mapping)
- Intermediate calculations used in multiple models
- When you don't need to query directly
Permanent Tables
- Full storage with time travel/fail-safe
- Queryable directly
- Best for final marts and dimensions
Two native approaches to implement Change Data Capture in Snowflake:
Method 1: Timestamp-Based CDC
CREATE TABLE staging (
id INT,
data VARCHAR,
last_modified TIMESTAMP
);
MERGE INTO target_table AS t
USING (
SELECT * FROM staging
WHERE last_modified > (SELECT COALESCE(MAX(last_modified), '1900-01-01') FROM target_table)
) AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.data = s.data
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.data);
Issues: Misses deletes, unreliable if timestamps aren't consistent
Method 2: Snowflake Streams (Recommended)
-- Create stream on source table
CREATE STREAM customer_stream ON TABLE raw_customers;
-- Query stream to see changes
SELECT * FROM customer_stream;
-- Metadata columns:
-- - METADATA$ACTION: 'INSERT' or 'DELETE'
-- - METADATA$ISUPDATE: TRUE if part of an UPDATE
-- - METADATA$ROW_ID: Unique row identifier
-- Consume changes atomically
MERGE INTO dim_customers AS t
USING customer_stream AS s
ON t.customer_id = s.customer_id
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN DELETE
WHEN MATCHED AND s.METADATA$ISUPDATE = TRUE THEN UPDATE ...
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN INSERT ...;
Benefits of Streams:
- Exactly-once processing (no duplicates)
- Captures all DML operations
- Low performance impact
- Automatic cleanup of consumed records
What is DBT?
An open-source framework for transforming data in your warehouse using SQL and software engineering best practices. It enables analysts to write, test, and document SQL transformations with version control.
Problems DBT Solves:
- Code Proliferation: Centralizes transformation logic (instead of scattered SQL scripts, BI tool calculations)
- Lack of Testing: Provides built-in data quality testing framework
- Poor Documentation: Auto-generates documentation from YAML definitions
- Dependency Hell: Automatically determines correct execution order via DAG
- Version Control: Integrates with Git for audit trails and collaboration
Core Workflow:
dbt run   → Execute models (create tables/views)
dbt test  → Run data quality tests
dbt docs  → Generate auto documentation
dbt debug → Diagnose issues
Typical Project Structure:
my_dbt_project/
├── models/
│   ├── staging/       # Light transformations (one-to-one mappings)
│   ├── intermediate/  # Complex business logic
│   └── marts/         # Final user-facing tables
├── tests/             # Data quality tests
├── seeds/             # Static reference data (CSVs)
├── snapshots/         # SCD Type 2 tracking
├── macros/            # Reusable SQL functions
└── dbt_project.yml    # Configuration
Key Concepts:
- Models: SQL SELECT statements → views or tables
- ref(): References other models safely (no hardcoded schema names)
- Jinja: Templating for dynamic SQL
- Materializations: View (logical), Table (physical), Incremental (processes only new/changed rows), Ephemeral (inlined as a CTE)
- Tests: not_null, unique, relationships, accepted_values, or custom
Continuous loading: Data flows as soon as available (not batch windows)
Why It's Critical:
- Near Real-time: Data available within minutes, not 12-24 hour batches
- Reduced Latency: Dashboards and decisions based on fresh data
- Better Resource Utilization: Spread compute throughout day instead of one big batch
- Data Quality Feedback: Issues detected quickly, not days later
- Eliminates Batch Window Constraints: No more "end of day" cutoffs
Real Use Cases:
- E-commerce: Real-time inventory → prevents overselling
- Fraud Detection: Transaction analysis in seconds, not after fact
- IoT/Sensors: Predictive maintenance on live machine data
- Clickstream: Personalize user experience instantly
- Financial: Risk analysis on latest market data
- Supply Chain: Real-time tracking and optimization
Continuous vs Batch Trade-offs:
| Aspect | Batch (Daily) | Continuous |
|---|---|---|
| Latency | 12-24 hours | Minutes |
| Implementation | Simple | Complex |
| Cost | Lower (one big job) | Higher (constant load) |
| Use Case | Historical analysis | Operational decisions |
Real-World Scenario: A B2B SaaS company needs a pipeline that ingests from 4 sources (Oracle ERP, Salesforce, Stripe, S3 event files), transforms into clean analytics models, and serves 3 BI tools with different freshness SLAs — finance needs T+1, operations needs near-real-time, executives need daily snapshots.
Full Architecture
SOURCES
Oracle ERP (orders, inventory) ──┐
Salesforce CRM (accounts, opps) ──┤ Qlik Replicate (CDC, log-based)
PostgreSQL App DB (events, users) ──┘
Stripe API (payments, invoices) ──── Fivetran (API connector)
S3 file drops (logistics partner) ──── Snowpipe AUTO_INGEST
│
┌───────────▼───────────┐
│ Snowflake RAW │ Schema per source
│ (VARIANT + typed) │ _loaded_at, _file_name
└───────────┬───────────┘
│ Streams + Tasks (micro-batch, 5 min)
┌───────────▼───────────┐
│ Snowflake STAGING │ dbt stg_* views
│ │ Rename, cast, dedupe
└───────────┬───────────┘
│ dbt hourly incremental models
┌───────────▼───────────┐
│ INTERMEDIATE │ dbt int_* (ephemeral/view)
│ │ Joins, enrichment
└───────────┬───────────┘
│ dbt daily table / incremental
┌───────────▼───────────┐
│ MARTS │ fact_*, dim_* tables
│ Finance / Ops / Exec │ Clustered, governed
└───────────┬───────────┘
│
┌──────────────────────────┼────────────────────────┐
Tableau Power BI (Executive) Looker
(Ops - near-RT) (Finance - T+1) (Self-service)
Ingestion Layer Configuration
-- Snowflake Streams trigger micro-batch merges every 5 minutes
-- Only fires when stream has unconsumed data (zero idle cost)
CREATE TASK pipeline.merge_orders
WAREHOUSE = INGEST_WH
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('raw.oracle_orders_stream')
AS CALL pipeline.sp_merge_orders();
-- Snowpipe for S3 file drops (sub-minute latency)
CREATE PIPE raw.logistics_pipe AUTO_INGEST = TRUE AS
COPY INTO raw.logistics_events FROM @raw.logistics_stage;
Transformation Layer — dbt Model Tags & Scheduling
# dbt_project.yml — tag models by run frequency
models:
my_project:
staging:
+tags: ['hourly'] # rebuild every hour (views, cheap)
+materialized: view
intermediate:
+tags: ['hourly']
+materialized: ephemeral
marts:
finance:
+tags: ['daily'] # heavy models run nightly
+materialized: table
operations:
+tags: ['hourly'] # ops needs freshness
+materialized: incremental
# dbt Cloud: separate jobs per frequency
# Job "Hourly": dbt run --select tag:hourly (runs :00 every hour)
# Job "Daily":  dbt run --select tag:daily  (runs 02:00 UTC nightly)
SLA Matrix by Consumer
| Consumer | Source | Freshness SLA | Strategy |
|---|---|---|---|
| Operations dashboard | Orders, Inventory | < 15 min | Streams + Tasks + hourly dbt |
| Finance reports | Stripe, Oracle | T+1 by 07:00 | Daily dbt + Fivetran nightly sync |
| Executive KPI | All sources | Daily snapshot | dbt snapshot + daily table rebuild |
| Self-service analysts | Marts | Daily | Read-only ANALYST_ROLE on marts |
Real-World Example: E-commerce Inventory System
Problem Without CDC (Nightly Full Load):
- Inventory is 12-24 hours old in dashboards
- Customers order items marked as in-stock (actually sold out)
- Returns processed, but system doesn't know until next load
- Warehouse team can't make real-time restocking decisions
- Server load spikes during night batch
Solution With CDC (Streaming Changes):
Inventory DB → CDC → Kafka → Snowflake (streaming)
↓
Real-time dashboard
(Website always knows actual stock)
↓
Automated reorder alerts
↓
Efficient warehouse operations
Results:
- Stock levels updated within seconds
- Customer experience: "Out of Stock" shown immediately
- Warehouse: Optimal restocking based on live data
- Server load: Distributed throughout day (no midnight spikes)
- ROI: Prevented overselling, reduced returns
Business Impact: 15-20% reduction in overselling, better inventory turnover, happier customers
Snowflake Tasks provide native orchestration with a DAG model.
Troubleshooting Failed Task Queries:
1. Find the Problem
SELECT
task_id, task_name, scheduled_time, query_id,
state, error_message, query_text
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE state = 'FAILED'
ORDER BY scheduled_time DESC;
2. Analyze Query Execution
- Click query_id to see execution plan
- Check for spilling, join issues, scan efficiency
Common Issues & Solutions:
- Warehouse Suspended: Task runs but waits for warehouse to resume (costs time!)
- Insufficient Memory: Increase warehouse size
- Table Locks: Previous task still running, new task blocked
- Missing Data: Upstream task failed silently
Performance Optimization:
- Right-size warehouse (xsmall for simple loads, large for complex transforms)
- Stagger task timing to avoid concurrent runs on same warehouse
- Monitor TASK_HISTORY for patterns
- Set SUSPEND_TASK_AFTER_NUM_FAILURES to prevent cascading failures
Real-World Scenario: Your production Snowflake environment has 2TB of data. A developer needs to test a dbt refactor that touches 40 models. Provisioning a traditional dev environment would cost thousands in storage and take hours of data copying. With Zero Copy Clone, you get a full production-sized dev environment in seconds — at near-zero storage cost.
How Zero Copy Clone Works
-- Clone points to the SAME micro-partitions as source
-- Only NEW writes to the clone create new partitions (copy-on-write)
-- Storage cost = only the delta between clone and source
CREATE DATABASE analytics_dev CLONE analytics_prod; -- seconds, not hours
CREATE SCHEMA staging_dev CLONE analytics_prod.staging;
CREATE TABLE orders_test CLONE analytics_prod.marts.fact_orders;
-- Snapshot at a specific point in time (Time Travel clone)
CREATE TABLE orders_before_migration
CLONE analytics_prod.marts.fact_orders
BEFORE (TIMESTAMP => '2024-01-15 08:00:00'::TIMESTAMP);
-- Clone a schema at end of day for audit/compliance
CREATE SCHEMA audit.daily_snapshot_20240115
CLONE analytics_prod.marts;
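Copy-on-write is the mechanism behind these clones. A toy Python model (the `CopyOnWriteClone` class is hypothetical, not Snowflake's implementation) shows why a clone costs nothing until it diverges:

```python
class CopyOnWriteClone:
    """Toy model of zero-copy cloning: reads fall through to the shared
    source 'micro-partitions'; only writes create private copies."""

    def __init__(self, source: dict):
        self._source = source      # shared, never mutated through the clone
        self._delta = {}           # private partitions created on write

    def read(self, key):
        # The clone's own writes shadow the shared source
        return self._delta.get(key, self._source.get(key))

    def write(self, key, value):
        self._delta[key] = value   # copy-on-write: storage grows only here

prod = {"2024-01-14": "partition_a", "2024-01-15": "partition_b"}
dev = CopyOnWriteClone(prod)       # like CREATE DATABASE dev CLONE prod

dev.write("2024-01-15", "partition_b_modified")
# Clone sees its own write; source is untouched; only one new partition stored
```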
Dev/Test Workflow with Clones
-- Create a full dev environment per developer (or per PR)
-- Script run in CI on PR open:
CREATE DATABASE analytics_pr_142
CLONE analytics_prod;
-- Developer works against analytics_pr_142 — full prod data, zero copy cost
-- dbt profiles.yml uses env var to point to the cloned database:
-- schema: "analytics_pr_{{ env_var('PR_NUMBER', 'dev') }}"
-- After PR merges / closes, drop the clone (CI cleanup step)
DROP DATABASE analytics_pr_142;
Data Migration Safety Net
-- Before running a risky migration, clone the target
-- If anything goes wrong, swap back instantly
CREATE TABLE marts.fact_orders_backup
CLONE marts.fact_orders;
-- Run the migration
ALTER TABLE marts.fact_orders
ADD COLUMN lifetime_value DECIMAL(12,2);
UPDATE marts.fact_orders fo
SET lifetime_value = (
SELECT SUM(net_revenue)
FROM marts.fact_orders x
WHERE x.customer_sk = fo.customer_sk
);
-- Validate. If wrong:
DROP TABLE marts.fact_orders;
ALTER TABLE marts.fact_orders_backup RENAME TO marts.fact_orders;
-- Instant rollback — no data movement
Cost Mechanics
| Scenario | Traditional Copy | Zero Copy Clone |
|---|---|---|
| 2TB production clone | ~$46/month storage + copy time | ~$0 storage until writes diverge |
| 10 developer environments | 10x storage cost | Near-zero until each dev writes data |
| CI per PR (200 PRs/month) | Impractical | Feasible — clone + drop per PR |
- Clone inherits Time Travel and Fail-Safe settings from the source object
- Clones are fully independent — DML on the clone does NOT affect the source
- You can clone databases, schemas, tables, streams, and stages
- Clones do NOT inherit future grants — you must re-grant access on the cloned object
Snowflake has three timestamp types. Convert between timezones easily.
Method 1: CONVERT_TIMEZONE (Recommended)
-- Convert UTC to EST
SELECT CONVERT_TIMEZONE('UTC', 'America/New_York', '2025-07-04 10:00:00'::TIMESTAMP_NTZ);
-- Result: 2025-07-04 06:00:00
-- With column
SELECT
event_id,
event_timestamp,
CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', event_timestamp) AS time_pacific
FROM events;
Method 2: Session Timezone
-- Snowflake does not support the AT TIME ZONE syntax; instead, set the
-- session timezone so TIMESTAMP_LTZ values display in the desired zone
ALTER SESSION SET TIMEZONE = 'America/New_York';
SELECT CURRENT_TIMESTAMP; -- now displayed in Eastern time
Common Timezones:
- UTC
- America/New_York (EST/EDT)
- America/Los_Angeles (PST/PDT)
- Europe/London
- Asia/Kolkata (IST)
Timestamp Types:
- TIMESTAMP_LTZ: Stores UTC, displays in session timezone
- TIMESTAMP_NTZ: No timezone, same everywhere
- TIMESTAMP_TZ: With timezone offset
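For comparison, the same UTC-to-Eastern conversion in Python's stdlib `zoneinfo` shows the identical DST behavior (July is UTC-4):

```python
from datetime import datetime
from zoneinfo import ZoneInfo

# Same conversion as the CONVERT_TIMEZONE example: UTC -> America/New_York
utc_time = datetime(2025, 7, 4, 10, 0, tzinfo=ZoneInfo("UTC"))
eastern = utc_time.astimezone(ZoneInfo("America/New_York"))
# July falls in daylight saving time, so the offset is UTC-4, not UTC-5
```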
Clustering Keys optimize data layout for faster queries on specific columns.
How They Work:
- Snowflake's automatic clustering service re-organizes micro-partitions
- Groups similar values together physically
- Improves query pruning and reduces data scanned
Syntax:
-- Define during creation
CREATE TABLE orders (
order_date DATE,
customer_id INT,
amount DECIMAL
) CLUSTER BY (order_date, customer_id);
-- Add after creation
ALTER TABLE orders CLUSTER BY (order_date);
-- Remove clustering
ALTER TABLE orders DROP CLUSTERING KEY;
When to Use:
- Very large tables (100GB+)
- Frequent filtering on specific columns (e.g., date, region)
- High cardinality columns with skewed distribution
Cost Consideration:
Automatic clustering consumes credits to reorganize data, so weigh that ongoing cost against the savings from reduced query time.
Three Layers of Monitoring:
1. Snowflake Web UI
- Query History: View all queries, duration, credits, status
- Query Profile: Execution plan, bottleneck operators
- Warehouses: Monitor status, current/historical usage
2. Account Usage Views (Query Historically)
-- Top 10 most expensive queries
SELECT query_id, user_name, warehouse_name, total_elapsed_time,
       credits_used_cloud_services
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= CURRENT_DATE - 7
-- Per-query warehouse credits aren't exposed; elapsed time is the usual proxy
ORDER BY total_elapsed_time DESC
LIMIT 10;
-- Daily warehouse costs
SELECT warehouse_name, DATE(start_time) AS usage_date,
       SUM(credits_used) AS daily_credits
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
GROUP BY warehouse_name, usage_date
ORDER BY daily_credits DESC;
Key Views:
- QUERY_HISTORY: All query execution details
- WAREHOUSE_METERING_HISTORY: Daily warehouse usage
- AUTOMATIC_CLUSTERING_HISTORY: Clustering costs
- PIPE_USAGE_HISTORY: Snowpipe credit consumption
- TASK_HISTORY: Task execution details
3. External Tools
- Datadog, New Relic: APM and monitoring
- CloudWatch, Azure Monitor: Cloud-native dashboards
- Custom BI dashboards: Pull from ACCOUNT_USAGE
Both are data governance platforms with different strengths.
Collibra - Business-Centric Governance
- Strength: Business Glossary, stewardship, workflow automation
- Focus: Business users, governance programs
- Best for: Strong business alignment, change management
- Unique: Powerful workflow engine for governance processes
Ataccama ONE - Data Quality & MDM Focus
- Strength: Data quality, Master Data Management, ML/AI automation
- Focus: Data engineers, quality teams
- Best for: Data quality frameworks, MDM solutions
- Unique: Strong data quality and deduplication engines
| Aspect | Collibra | Ataccama |
|---|---|---|
| Core Focus | Governance, Stewardship | Quality, MDM |
| Catalog Type | Business Glossary First | Technical First |
| Primary Users | Business, Stewards | Engineers, QA |
| Automation | Workflow/Process | ML/AI Quality Rules |
Choice: Use Collibra if governance/stewardship is priority. Use Ataccama if data quality is critical.
Snowpipe loads data as soon as files appear in cloud storage (AWS S3/Azure/GCS).
Architecture: S3 → SNS → SQS → Snowpipe → COPY INTO
Key Steps:
1. Create Storage Integration
CREATE STORAGE INTEGRATION s3_pipe_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::ACCOUNT:role/SnowflakeRole'
STORAGE_ALLOWED_LOCATIONS = ('s3://bucket/path/');
2. Create External Stage
CREATE STAGE raw_stage STORAGE_INTEGRATION = s3_pipe_int URL = 's3://bucket/data/' FILE_FORMAT = (TYPE = PARQUET);
3. Create Pipe
CREATE PIPE raw_data_pipe
  AUTO_INGEST = TRUE
  AS COPY INTO raw_data FROM @raw_stage;
-- Get the SQS queue ARN for the S3 event notification from
-- SHOW PIPES; (notification_channel column)
4. Optional: Post-Load Transformations
CREATE STREAM raw_stream ON TABLE raw_data;
CREATE TASK process_new_data
WHEN SYSTEM$STREAM_HAS_DATA('raw_stream')
AS
MERGE INTO fact_data AS t
USING raw_stream AS s ON t.id = s.id
WHEN NOT MATCHED THEN INSERT ...;
Benefits: Automatic, serverless, minimal configuration
Real-World Scenario: Your analytics team needs: (1) rank customers by LTV within each region, (2) calculate running revenue totals per day, (3) find the previous order date for each customer, (4) flag the top 3 products per category for a promotional report. All solvable with window functions — no self-joins needed.
Ranking Functions — RANK vs DENSE_RANK vs ROW_NUMBER
-- Find the 4th highest salary per department (classic interview question)
SELECT employee_id, employee_name, department, salary
FROM employees
QUALIFY DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) = 4;
-- ROW_NUMBER → always unique (1,2,3,4,5) — use for deduplication
-- RANK → gaps on ties (1,2,2,4,5) — use for competition ranking
-- DENSE_RANK → no gaps (1,2,2,3,4) — use for nth highest value
-- PERCENT_RANK → 0.0–1.0 percentile position
-- NTILE(4) → quartile buckets (1,2,3,4)
-- Deduplicate: keep latest record per customer (ROW_NUMBER pattern)
SELECT * FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY customer_id
ORDER BY updated_at DESC) AS rn
FROM staging.customers
)
WHERE rn = 1; -- or use QUALIFY rn = 1
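The keep-latest-per-key logic can be mirrored in plain Python, which is handy for unit-testing the expected output of the dedup model. A minimal sketch with made-up records:

```python
# Keep the latest record per customer -- the ROW_NUMBER dedup pattern
records = [
    {"customer_id": 1, "email": "old@x.com", "updated_at": "2024-01-01"},
    {"customer_id": 1, "email": "new@x.com", "updated_at": "2024-03-01"},
    {"customer_id": 2, "email": "b@x.com",  "updated_at": "2024-02-01"},
]

latest = {}
for rec in sorted(records, key=lambda r: r["updated_at"]):
    latest[rec["customer_id"]] = rec   # later rows overwrite earlier ones

deduped = list(latest.values())        # one row per customer, newest wins
```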
Aggregate Window Functions — Running Totals & Moving Averages
-- Running revenue total (cumulative sum resets per region per month)
SELECT
order_date,
region,
daily_revenue,
SUM(daily_revenue) OVER (
PARTITION BY region, DATE_TRUNC('month', order_date)
ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_monthly_revenue,
-- 7-day moving average
AVG(daily_revenue) OVER (
PARTITION BY region
ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS revenue_7d_moving_avg,
-- Running count of orders
COUNT(order_id) OVER (
PARTITION BY region
ORDER BY order_date
ROWS UNBOUNDED PRECEDING
) AS cumulative_order_count
FROM fact_daily_sales;
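The frame semantics are easy to sanity-check in Python: `itertools.accumulate` mirrors the running total and a bounded `deque` mirrors the 6-preceding-rows moving-average frame. A sketch with invented revenue figures:

```python
from itertools import accumulate
from collections import deque

daily_revenue = [100, 200, 150, 300, 250, 400, 350, 500]

# Running total -- ROWS UNBOUNDED PRECEDING equivalent
running_total = list(accumulate(daily_revenue))

# 7-day moving average -- ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
def moving_avg(values, window=7):
    buf, out = deque(maxlen=window), []
    for v in values:
        buf.append(v)              # the deque drops the oldest value itself
        out.append(sum(buf) / len(buf))
    return out

avg_7d = moving_avg(daily_revenue)
```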
LAG & LEAD — Compare Rows to Previous / Next
-- Customer order gap analysis: how many days since last order?
SELECT
customer_id,
order_date,
LAG(order_date) OVER (PARTITION BY customer_id ORDER BY order_date)
AS previous_order_date,
DATEDIFF('day',
LAG(order_date) OVER (PARTITION BY customer_id ORDER BY order_date),
order_date
) AS days_since_last_order,
-- Flag customers who haven't ordered in 90+ days (churn signal)
IFF(DATEDIFF('day',
LAG(order_date) OVER (PARTITION BY customer_id ORDER BY order_date),
order_date) > 90, TRUE, FALSE
) AS is_returning_after_gap
FROM fact_orders
ORDER BY customer_id, order_date;
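The LAG gap calculation can be reproduced in Python for spot-checking, using sorted groups per customer (invented orders; the 90-day threshold matches the SQL above):

```python
from datetime import date
from itertools import groupby

orders = [
    (1, date(2024, 1, 1)), (1, date(2024, 1, 20)), (1, date(2024, 5, 1)),
    (2, date(2024, 2, 1)),
]

gaps = []
for customer_id, group in groupby(sorted(orders), key=lambda o: o[0]):
    prev = None                     # LAG is NULL for each customer's first row
    for _, order_date in group:
        days = (order_date - prev).days if prev else None
        gaps.append((customer_id, order_date, days, bool(days and days > 90)))
        prev = order_date
# Each tuple: (customer, order_date, days_since_last_order, returning_after_gap)
```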
FIRST_VALUE / LAST_VALUE — Carry First/Last Values Across Partition
-- Attach the first and most recent order date to every order row
SELECT
customer_id,
order_date,
order_id,
net_revenue,
FIRST_VALUE(order_date) OVER (
PARTITION BY customer_id ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS first_order_date,
LAST_VALUE(order_date) OVER (
PARTITION BY customer_id ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) AS latest_order_date,
DATEDIFF('day', first_order_date, latest_order_date) AS customer_lifespan_days
FROM fact_orders;
RATIO_TO_REPORT — Percentage of Total (Snowflake-specific)
-- What % of region total revenue does each product contribute?
SELECT
region,
product_category,
SUM(net_revenue) AS category_revenue,
RATIO_TO_REPORT(SUM(net_revenue)) OVER (PARTITION BY region) * 100
AS pct_of_region_revenue
FROM fact_orders
GROUP BY region, product_category
ORDER BY region, pct_of_region_revenue DESC;
Window Frame Cheatsheet
| Frame Clause | Includes | Common Use |
|---|---|---|
| ROWS UNBOUNDED PRECEDING | All rows from partition start to current | Running total |
| ROWS 6 PRECEDING AND CURRENT ROW | Last 7 rows | 7-day moving average |
| ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | Entire partition | FIRST_VALUE / LAST_VALUE over whole group |
| RANGE BETWEEN INTERVAL '7 DAYS' PRECEDING AND CURRENT ROW | Rows within 7 calendar days | Rolling 7-day window on time series |
Simple SQL to swap Male ↔ Female values:
UPDATE users
SET gender = CASE
WHEN gender = 'Male' THEN 'Female'
WHEN gender = 'Female' THEN 'Male'
ELSE gender -- Keep other values (null, other, unknown)
END
WHERE gender IN ('Male', 'Female');
Explanation:
- CASE: Evaluates current gender value
- WHEN gender = 'Male': Changes to Female
- WHEN gender = 'Female': Changes to Male
- ELSE gender: Keeps any other values unchanged
- WHERE clause: Only updates relevant rows (optimization)
Alternative (Using IFF):
UPDATE users
SET gender = IFF(gender = 'Male', 'Female', IFF(gender = 'Female', 'Male', gender))
WHERE gender IN ('Male', 'Female');
Real-World Scenario: A 200-model dbt project with no naming conventions causes every new hire to ask "where does this model live?" and every PR to touch 10 unrelated files. A well-structured project makes every model's purpose, owner, and position in the DAG immediately obvious.
Standard Project Layout
my_dbt_project/
├── dbt_project.yml        # Core config, model materializations, vars
├── profiles.yml           # Connection config (usually in ~/.dbt/)
├── packages.yml           # dbt-utils, dbt-expectations, etc.
│
├── models/
│   ├── staging/           # stg_* — one model per source table
│   │   ├── salesforce/    # sub-folder per source system
│   │   │   ├── stg_sf_accounts.sql
│   │   │   ├── stg_sf_opportunities.sql
│   │   │   └── schema.yml # sources + column-level docs + tests
│   │   ├── oracle_erp/
│   │   │   ├── stg_orders.sql
│   │   │   └── schema.yml
│   │   └── stripe/
│   │       ├── stg_payments.sql
│   │       └── schema.yml
│   │
│   ├── intermediate/      # int_* — joins, business logic
│   │   ├── int_orders_enriched.sql    # order + customer + product joined
│   │   ├── int_revenue_by_region.sql
│   │   └── schema.yml
│   │
│   └── marts/             # fact_* dim_* — BI-ready, governed
│       ├── finance/
│       │   ├── fact_revenue.sql
│       │   ├── fact_invoices.sql
│       │   └── schema.yml
│       ├── sales/
│       │   ├── fact_orders.sql
│       │   ├── dim_customers.sql
│       │   └── schema.yml
│       └── operations/
│           ├── fact_inventory.sql
│           └── schema.yml
│
├── macros/                # Reusable Jinja: generate_surrogate_key, etc.
├── snapshots/             # SCD Type 2: snap_customers, snap_products
├── seeds/                 # dim_date.csv, region_codes.csv
├── tests/                 # Singular tests: reconciliation SQL
├── analyses/              # Exploratory SQL (not deployed)
├── models/exposures.yml   # Documents BI tools consuming dbt models
└── .github/workflows/dbt_ci.yml  # Slim CI config
Layer Responsibilities — The Contract
| Layer | Prefix | Materialization | Rule |
|---|---|---|---|
| Staging | stg_ | view | 1:1 with source. Rename, cast, light clean. NO joins. NO business logic. |
| Intermediate | int_ | ephemeral / view | Joins between staging models. Business logic. NOT directly queried by BI. |
| Marts | fact_ / dim_ | table / incremental | BI-ready. Governed. Documented. Wide tables acceptable. Conformed dims. |
dbt_project.yml — Materializations & Tags by Layer
name: 'my_dbt_project'
version: '1.0.0'
vars:
start_date: '2020-01-01'
data_environment: "{{ env_var('DBT_TARGET', 'dev') }}"
models:
my_dbt_project:
staging:
+materialized: view
+tags: ['hourly']
+schema: staging # → ANALYTICS_DEV.STAGING in dev
intermediate:
+materialized: ephemeral # inlined as CTE, zero storage cost
+tags: ['hourly']
marts:
+materialized: table
+tags: ['daily']
+schema: marts
+post_hook: "GRANT SELECT ON {{ this }} TO ROLE ANALYST_ROLE"
finance:
+tags: ['daily', 'finance']
operations:
+materialized: incremental # ops models are large — use incremental
+tags: ['hourly', 'operations']
Naming Conventions (enforce in PR review)
- stg_<source>_<entity> — e.g. stg_oracle_orders, stg_sf_accounts
- int_<verb>_<noun> — e.g. int_orders_enriched, int_revenue_attributed
- fact_<event_or_process> — e.g. fact_orders, fact_daily_revenue
- dim_<entity> — e.g. dim_customers, dim_products, dim_date
- snap_<entity> — e.g. snap_customers (SCD2 history)
Real-World Scenario: Qlik Replicate delivers CDC events into Snowflake — INSERTs, UPDATEs, and DELETEs mixed together. A naive INSERT OVERWRITE would blow away the table. You need MERGE to apply each operation correctly, handle duplicates from the CDC stream, and close old SCD2 records atomically.
Basic MERGE Pattern
-- Classic upsert: update if exists, insert if new, delete if flagged
MERGE INTO staging.customers AS tgt
USING (
-- Deduplicate CDC stream: take latest version per key
SELECT * FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY customer_id
ORDER BY header__change_seq DESC) AS rn
FROM raw.customer_stream
WHERE header__change_oper != 'B' -- exclude before-images
) WHERE rn = 1
) AS src
ON tgt.customer_id = src.customer_id
WHEN MATCHED AND src.header__change_oper = 'D'
THEN DELETE
WHEN MATCHED AND src.updated_at > tgt.updated_at
THEN UPDATE SET
tgt.email = src.email,
tgt.tier = src.tier,
tgt.updated_at = src.updated_at,
tgt._updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED AND src.header__change_oper != 'D'
THEN INSERT (customer_id, email, tier, updated_at, _loaded_at)
VALUES (src.customer_id, src.email, src.tier,
src.updated_at, CURRENT_TIMESTAMP);
SCD Type 2 MERGE — Close Old Records & Open New Ones
-- When a customer's tier changes, close the old record and open a new one
-- This is what dbt snapshots automate — but useful to know the raw SQL
MERGE INTO marts.dim_customers AS tgt
USING (
-- Source: latest version of each customer
SELECT customer_id, email, tier, updated_at
FROM staging.customers
WHERE updated_at > (SELECT MAX(valid_from) FROM marts.dim_customers)
) AS src
ON tgt.customer_id = src.customer_id
AND tgt.is_current = TRUE -- only match against current records
-- Row exists and tier changed → close current record
WHEN MATCHED AND tgt.tier != src.tier
THEN UPDATE SET
tgt.valid_to = CURRENT_DATE - 1,
tgt.is_current = FALSE;
-- Note: After the MERGE, INSERT new open records for changed customers
-- (Snowflake MERGE doesn't support WHEN MATCHED THEN INSERT in same statement)
INSERT INTO marts.dim_customers (customer_sk, customer_id, email, tier,
valid_from, valid_to, is_current)
SELECT
MD5(src.customer_id || '|' || CURRENT_DATE) AS customer_sk, -- include valid_from so each version gets a unique key
src.customer_id, src.email, src.tier,
CURRENT_DATE AS valid_from,
NULL AS valid_to,
TRUE AS is_current
FROM staging.customers src
WHERE EXISTS (
SELECT 1 FROM marts.dim_customers old
WHERE old.customer_id = src.customer_id
AND old.is_current = FALSE
AND old.valid_to = CURRENT_DATE - 1 -- just closed by the MERGE above
);
Idempotent MERGE — Safe to Re-Run
-- A MERGE is idempotent if running it twice produces the same result
-- Key: include the updated_at guard in WHEN MATCHED
-- BAD (not idempotent): running twice creates duplicate timestamps
WHEN MATCHED THEN UPDATE SET tgt.processed_at = CURRENT_TIMESTAMP;
-- GOOD (idempotent): only update if source is actually newer
WHEN MATCHED AND src.updated_at > tgt.updated_at
THEN UPDATE SET
tgt.status = src.status,
tgt.updated_at = src.updated_at;
-- Running this 100 times produces same result as running once
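The effect of the guard can be sketched outside SQL as well. This is an illustrative Python model of the upsert (a hypothetical `merge_rows` helper, with a dict keyed by id standing in for the target table), showing why the `updated_at` comparison makes re-runs a no-op:

```python
def merge_rows(target, source_rows):
    """Upsert source rows into target, guarded by updated_at (idempotent)."""
    for row in source_rows:
        existing = target.get(row["id"])
        # Only apply the update if the source row is strictly newer
        if existing is None or row["updated_at"] > existing["updated_at"]:
            target[row["id"]] = row
    return target

target = {1: {"id": 1, "status": "pending", "updated_at": "2024-01-01"}}
src = [{"id": 1, "status": "shipped", "updated_at": "2024-01-02"},
       {"id": 2, "status": "new",     "updated_at": "2024-01-02"}]

merge_rows(target, src)
first = dict(target)
merge_rows(target, src)   # re-run with the same batch
assert target == first    # same result: the guard makes re-runs harmless
```

Without the `updated_at` comparison, every re-run would rewrite rows (and any `processed_at`-style timestamps), which is exactly the non-idempotent behavior the BAD example above exhibits.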
MERGE vs INSERT OVERWRITE vs dbt Strategies
| Approach | Best For | Handles Deletes | Idempotent |
|---|---|---|---|
| MERGE (upsert) | Mutable records, CDC | Yes (WHEN MATCHED DELETE) | Yes (with updated_at guard) |
| DELETE + INSERT | Date partitions, aggregates | Yes (DELETE first) | Yes |
| INSERT OVERWRITE | Full partition replace | Yes (replace whole partition) | Yes |
| INSERT (append) | Immutable event logs | No | No (duplicates on re-run) |
Data Masking hides sensitive data while maintaining utility for dev/test/analytics.
Snowflake Masking Policy (Dynamic, at Query Time):
-- Create masking policy
CREATE MASKING POLICY email_mask AS (val VARCHAR) RETURNS VARCHAR ->
CASE
WHEN CURRENT_ROLE() IN ('ANALYST_ROLE') THEN '****'
WHEN CURRENT_ROLE() = 'DATA_OWNER_ROLE' THEN val
ELSE 'MASKED'
END;
-- Apply to column
ALTER TABLE customers ALTER COLUMN email SET MASKING POLICY email_mask;
How It Works:
- Underlying data NOT modified
- Masking applied at query time based on user role
- Different users see different data
- DATA_OWNER sees real emails, ANALYST sees masked
Common Masking Techniques:
- Substitution: Replace with fixed character (****)
- Redaction: Hide completely
- Truncation: Show only partial (first 2 chars of email)
- Hashing: Replace with hash
- Encryption: Reversible transformation
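The first four techniques can be sketched in a few lines of Python (illustrative helpers only, not Snowflake's implementation):

```python
import hashlib

def substitute(value):
    """Substitution: replace with fixed characters."""
    return "****"

def truncate_email(email):
    """Truncation: show only the first 2 chars of the local part."""
    local, _, domain = email.partition("@")
    return local[:2] + "***@" + domain

def hash_value(value):
    """Hashing: irreversible but deterministic, so joins still work."""
    return hashlib.sha256(value.encode()).hexdigest()

email = "jane.doe@example.com"
assert substitute(email) == "****"
assert truncate_email(email) == "ja***@example.com"
assert hash_value(email) == hash_value(email)  # deterministic: same input, same mask
```

Deterministic hashing is the usual choice when masked values still need to join across tables; substitution and redaction are for display-only columns.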
Two fundamental cryptographic concepts with different purposes:
Hashing - One-Way Function
- Purpose: Integrity verification, password storage
- Reversible: No - cannot reverse hash to get original
- Deterministic: Same input always → same hash output
- Examples: MD5 (32 chars), SHA256 (64 chars)
- Use: Check if file modified, store passwords safely
Encryption - Two-Way Function
- Purpose: Confidentiality, protect sensitive data
- Reversible: Yes - decrypt with correct key to get original
- Key-Dependent: Requires encryption and decryption key
- Examples: AES-256, RSA
- Use: Protect credit cards, SSN, medical records
| Feature | Hashing | Encryption |
|---|---|---|
| Reversible | No, one-way | Yes, two-way |
| Key Needed | No | Yes |
| Output Size | Fixed | Variable |
| Purpose | Integrity | Confidentiality |
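The contrast in the table can be demonstrated directly. Hashing uses the standard library; for the reversible side, a toy XOR cipher stands in for AES purely to show two-way, key-dependent behavior (never use XOR for real data):

```python
import hashlib

msg = "card=4111-1111"

# Hashing: deterministic, fixed-size output, one-way
h1 = hashlib.sha256(msg.encode()).hexdigest()
h2 = hashlib.sha256(msg.encode()).hexdigest()
assert h1 == h2                                        # deterministic
assert len(h1) == 64                                   # SHA256 hex: always 64 chars
assert len(hashlib.md5(msg.encode()).hexdigest()) == 32  # MD5 hex: always 32 chars

# "Encryption": reversible only with the key (toy XOR cipher for illustration)
def xor_cipher(data: bytes, key: bytes) -> bytes:
    return bytes(b ^ key[i % len(key)] for i, b in enumerate(data))

key = b"secret"
ciphertext = xor_cipher(msg.encode(), key)
assert ciphertext != msg.encode()                      # hidden without the key
assert xor_cipher(ciphertext, key).decode() == msg     # recovered with the key
```

Note the fixed-size property: the hash length depends only on the algorithm, while the ciphertext length tracks the input, matching the "Output Size" row above.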
Secure, credential-less access to AWS S3 from Snowflake using IAM roles.
Step 1: AWS Setup - Create IAM Policy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject",
"s3:ListBucket"
],
"Resource": "arn:aws:s3:::my-bucket/*"
}
]
}
Step 2: Create Snowflake Storage Integration
CREATE STORAGE INTEGRATION s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/SnowflakeRole'
STORAGE_ALLOWED_LOCATIONS = ('s3://my-bucket/data/');
Step 3: Get Snowflake's IAM User Details
DESCRIBE STORAGE INTEGRATION s3_int -- Returns: STORAGE_AWS_IAM_USER_ARN, STORAGE_AWS_EXTERNAL_ID
Step 4: Update AWS IAM Role Trust Relationship
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::SNOWFLAKE-ACCOUNT:user/SNOWFLAKE-USER"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "EXTERNAL-ID-FROM-STEP3"
}
}
}
]
}
Step 5: Create External Stage
CREATE STAGE s3_stage STORAGE_INTEGRATION = s3_int URL = 's3://my-bucket/data/' FILE_FORMAT = (TYPE = PARQUET);
Benefits: Secure (no exposed AWS keys), centralized control, easy to audit
Real-World Scenario: Your data team ingests Stripe payments, Salesforce CRM records, and warehouse inventory into Snowflake. You use DBT to transform raw data into trusted analytics tables. One bad deploy introduced NULL order IDs in production — custom tests would have caught it before it reached the BI layer.
1. Models — SQL Transformations as Code
-- models/staging/stg_orders.sql
-- Staging layer: thin cleaning only, no business logic
{{ config(materialized='view') }}
SELECT
order_id::INT AS order_id,
customer_id::INT AS customer_id,
UPPER(TRIM(status)) AS status,
amount::DECIMAL(10,2) AS amount,
order_date::DATE AS order_date,
_loaded_at AS ingested_at
FROM {{ source('stripe', 'raw_orders') }}
WHERE order_id IS NOT NULL
-- models/marts/fact_daily_revenue.sql
-- Mart layer: business logic, joined, aggregated
{{ config(materialized='table') }}
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}
WHERE status = 'COMPLETED'
),
customers AS (
SELECT * FROM {{ ref('stg_customers') }}
)
SELECT
o.order_date,
c.region,
c.customer_segment,
COUNT(o.order_id) AS total_orders,
SUM(o.amount) AS total_revenue,
AVG(o.amount) AS avg_order_value
FROM orders o
JOIN customers c USING (customer_id)
GROUP BY 1, 2, 3
2. Built-in Schema Tests
# models/staging/schema.yml
version: 2
models:
- name: stg_orders
description: "Cleaned Stripe orders from raw layer"
columns:
- name: order_id
description: "Primary key — must be unique and present"
tests:
- not_null
- unique
- name: status
tests:
- accepted_values:
values: ['COMPLETED', 'PENDING', 'CANCELLED', 'REFUNDED']
- name: customer_id
tests:
- not_null
- relationships:
to: ref('stg_customers')
field: customer_id
- name: amount
tests:
- not_null
3. Custom Singular Tests — SQL That Returns Failures
A singular test is a plain .sql file in tests/. DBT runs it and fails if any rows are returned. Think of it as: "show me the violations."
-- tests/assert_order_amount_positive.sql
-- Fails if any completed order has a zero or negative amount
SELECT
order_id,
amount,
status
FROM {{ ref('stg_orders') }}
WHERE status = 'COMPLETED'
AND amount <= 0
-- tests/assert_no_future_orders.sql
-- Fails if any order has a date beyond today (data quality anomaly)
SELECT
order_id,
order_date
FROM {{ ref('stg_orders') }}
WHERE order_date > CURRENT_DATE
-- tests/assert_revenue_matches_stripe.sql
-- Cross-source reconciliation: DBT total vs Stripe dashboard total
-- Fails if the variance exceeds 0.1%
WITH dbt_total AS (
SELECT SUM(amount) AS total FROM {{ ref('stg_orders') }}
WHERE status = 'COMPLETED'
AND order_date = CURRENT_DATE - 1
),
stripe_control AS (
-- Loaded from a separate control/audit table
SELECT expected_revenue AS total
FROM {{ ref('stripe_daily_control') }}
WHERE report_date = CURRENT_DATE - 1
)
SELECT
d.total AS dbt_total,
s.total AS stripe_total,
ABS(d.total - s.total) / NULLIF(s.total,0) AS pct_variance
FROM dbt_total d, stripe_control s
WHERE ABS(d.total - s.total) / NULLIF(s.total,0) > 0.001
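The variance rule in that reconciliation test is easy to mirror in plain code. A sketch (hypothetical `revenue_variance_exceeded` helper) where `True` means the singular test would return rows and fail:

```python
def revenue_variance_exceeded(dbt_total, stripe_total, threshold=0.001):
    """True when |dbt - stripe| / stripe exceeds the 0.1% threshold."""
    if stripe_total == 0:        # mirrors the NULLIF(s.total, 0) guard
        return False
    return abs(dbt_total - stripe_total) / stripe_total > threshold

assert revenue_variance_exceeded(100_500.0, 100_000.0) is True   # 0.5% off: fail
assert revenue_variance_exceeded(100_050.0, 100_000.0) is False  # 0.05%: pass
```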
4. Custom Generic Tests — Reusable Macros
Generic tests live in macros/ and can be applied to any column across any model via schema.yml.
-- macros/test_not_negative.sql
-- Reusable test: column must not contain negative values
{% test not_negative(model, column_name) %}
SELECT
{{ column_name }} AS failing_value,
COUNT(*) AS row_count
FROM {{ model }}
WHERE {{ column_name }} < 0
GROUP BY 1
{% endtest %}
-- macros/test_within_date_range.sql
-- Reusable test: date column must fall within a specified range
{% test within_date_range(model, column_name, min_date, max_date) %}
SELECT {{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} < '{{ min_date }}'
OR {{ column_name }} > '{{ max_date }}'
{% endtest %}
-- macros/test_row_count_above_threshold.sql
-- Reusable test: table must have at least N rows (catches empty-load bugs)
{% test row_count_above_threshold(model, column_name, threshold) %}
WITH row_count AS (
SELECT COUNT(*) AS cnt FROM {{ model }}
)
SELECT cnt
FROM row_count
WHERE cnt < {{ threshold }}
{% endtest %}
Apply generic tests in schema.yml:
# Apply custom generic tests across models
models:
- name: fact_daily_revenue
columns:
- name: total_revenue
tests:
- not_negative # custom generic macro
- not_null
- name: order_date
tests:
- within_date_range: # custom generic macro with params
min_date: '2020-01-01'
max_date: '{{ run_started_at.strftime("%Y-%m-%d") }}'
- name: stg_orders
tests:
- row_count_above_threshold: # model-level test (no column)
column_name: order_id
threshold: 1000 # alert if fewer than 1000 rows loaded
5. dbt-expectations — Great Expectations Integration
# Add statistical / distribution tests via dbt-expectations package
# packages.yml
packages:
- package: calogica/dbt_expectations
version: [">=0.9.0", "<1.0.0"]
# schema.yml — rich statistical tests
models:
- name: fact_daily_revenue
columns:
- name: avg_order_value
tests:
- dbt_expectations.expect_column_values_to_be_between:
min_value: 1
max_value: 50000
- dbt_expectations.expect_column_mean_to_be_between:
min_value: 80
max_value: 500
- name: total_orders
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: number
tests:
- dbt_expectations.expect_table_row_count_to_be_between:
min_value: 100
max_value: 1000000
6. Seeds — Version-Controlled Reference Data
-- data/region_mapping.csv
region_code,region_name,timezone,currency
US-EAST,US East,America/New_York,USD
US-WEST,US West,America/Los_Angeles,USD
EU-WEST,Europe West,Europe/London,GBP
APAC,Asia Pacific,Asia/Singapore,SGD
# seeds/properties.yml — add column-level docs and tests to seeds
seeds:
- name: region_mapping
description: "Static region reference loaded from Git"
columns:
- name: region_code
tests: [not_null, unique]
- name: currency
tests:
- accepted_values:
values: ['USD', 'GBP', 'EUR', 'SGD', 'AUD']
7. Running & Triaging Tests
# Run all tests
dbt test
# Run tests for a specific model only
dbt test --select stg_orders
# Run only singular tests
dbt test --select test_type:singular
# Run only generic (schema) tests
dbt test --select test_type:generic
# Run and store results for audit
dbt test --store-failures   # Failed rows saved to schema dbt_test__audit
# Compile a test to see the generated SQL before running
dbt compile --select test_type:generic
Test Severity — Warn vs Error
# schema.yml — set severity so non-critical issues warn but don't block
models:
- name: stg_orders
columns:
- name: customer_id
tests:
- relationships:
to: ref('stg_customers')
field: customer_id
severity: warn # Warn only — orphaned orders alert, don't fail CI
- not_null:
severity: error # Hard fail — pipeline stops
Real-World Scenario: Your e-commerce platform processes 2M orders/day into a Snowflake fact table. Running a full-refresh takes 45 minutes and costs $80 in credits. Incremental models reduce that to 3 minutes and $2 — but only if you choose the right strategy for your data pattern.
Incremental Model Fundamentals
On first run, DBT builds the full table. On subsequent runs, it only processes new/changed rows. The is_incremental() macro controls which rows to load:
-- models/marts/fact_orders.sql
{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
) }}
SELECT
order_id,
customer_id,
status,
amount,
updated_at
FROM {{ source('raw', 'orders') }}
{% if is_incremental() %}
-- Only process rows updated since our last run
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Strategy 1: merge (Default for Snowflake)
Performs a SQL MERGE — matched rows are updated, new rows are inserted. Requires a unique_key. Best for tables where existing records can change (orders updating status, inventory changing).
-- DBT generates this SQL under the hood
MERGE INTO analytics.fact_orders AS target
USING (
-- Your model SQL (new/updated rows)
SELECT order_id, customer_id, status, amount, updated_at
FROM raw.orders
WHERE updated_at > '2024-01-15 06:00:00'
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
target.status = source.status,
target.amount = source.amount,
target.updated_at = source.updated_at
WHEN NOT MATCHED THEN INSERT
(order_id, customer_id, status, amount, updated_at)
VALUES
(source.order_id, source.customer_id,
source.status, source.amount, source.updated_at);
-- Config in schema.yml or model config block
{{ config(
materialized='incremental',
unique_key='order_id', -- Required for merge
incremental_strategy='merge',
merge_update_columns=['status', 'amount', 'updated_at'] -- Only update these cols
) }}
When to use merge: Order management systems (status changes from pending → shipped → delivered), inventory tables, CRM contact updates, any mutable source where records evolve over time.
Strategy 2: append
Only inserts new rows — never updates existing ones. No unique_key needed. Fastest strategy, but only valid for truly immutable append-only sources.
-- models/marts/fact_clickstream.sql
-- Web click events: immutable once written, only new events arrive
{{ config(
materialized='incremental',
incremental_strategy='append' -- No unique_key needed
) }}
SELECT
event_id,
session_id,
user_id,
page_url,
event_type,
event_timestamp
FROM {{ source('segment', 'raw_events') }}
{% if is_incremental() %}
WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}
When to use append: Clickstream/event logs, IoT sensor readings, audit trails, financial transaction logs, server logs — any source where events are written once and never modified. Attempting append on mutable data creates duplicates.
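That duplicate pitfall is worth seeing concretely. A sketch contrasting blind append with a keyed merge when the same batch is delivered twice (a retry, or a mutable source re-emitting rows):

```python
# Append blindly inserts; a re-delivered batch becomes duplicate rows
table = []
batch = [{"event_id": "e1"}, {"event_id": "e2"}]

table.extend(batch)           # run 1
table.extend(batch)           # run 2 (retry / re-delivery)
ids = [r["event_id"] for r in table]
assert ids.count("e1") == 2   # duplicate!

# A merge keyed on event_id stays duplicate-free under the same re-delivery
merged = {}
for r in batch + batch:
    merged[r["event_id"]] = r
assert len(merged) == 2
```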
Strategy 3: delete+insert (Partition Replacement)
Deletes all rows for a given partition (usually a date), then re-inserts the full partition. Avoids the overhead of row-level matching while still correcting late-arriving data. Ideal for date-partitioned fact tables.
-- models/marts/fact_daily_sales.sql
{{ config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key='sale_date' -- Partition key to delete+replace
) }}
SELECT
sale_date,
product_id,
region,
SUM(revenue) AS total_revenue,
COUNT(order_id) AS order_count
FROM {{ source('raw', 'sales') }}
{% if is_incremental() %}
-- Reprocess last 3 days to handle late-arriving data
WHERE sale_date >= DATEADD('day', -3, CURRENT_DATE)
{% endif %}
GROUP BY 1, 2, 3
-- What DBT executes:
-- Step 1: Delete the affected partition
DELETE FROM analytics.fact_daily_sales
WHERE sale_date >= DATEADD('day', -3, CURRENT_DATE);
-- Step 2: Insert fresh data for that partition
INSERT INTO analytics.fact_daily_sales
SELECT sale_date, product_id, region,
SUM(revenue), COUNT(order_id)
FROM raw.sales
WHERE sale_date >= DATEADD('day', -3, CURRENT_DATE)
GROUP BY 1, 2, 3;
When to use delete+insert: Daily aggregated fact tables, reporting tables partitioned by date, any scenario where you want full partition correctness without a row-level unique key. Handles late-arriving data naturally by re-processing the window.
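The delete-then-insert mechanics can be modeled in a few lines. A sketch (hypothetical `replace_partitions` helper; a list of dicts stands in for the date-partitioned table) showing both the correction of a stale partition and why re-running is safe:

```python
def replace_partitions(table, fresh_rows):
    """delete+insert: drop every partition present in fresh_rows, re-insert."""
    dates = {r["sale_date"] for r in fresh_rows}
    kept = [r for r in table if r["sale_date"] not in dates]  # DELETE step
    return kept + fresh_rows                                  # INSERT step

table = [{"sale_date": "2024-01-09", "revenue": 100},
         {"sale_date": "2024-01-10", "revenue": 50}]   # stale: late rows missing
fresh = [{"sale_date": "2024-01-10", "revenue": 80}]   # recomputed partition

table = replace_partitions(table, fresh)
table = replace_partitions(table, fresh)   # idempotent: re-run changes nothing
assert sorted(r["revenue"] for r in table) == [80, 100]
```

Because the whole partition is replaced, late-arriving rows inside the lookback window are picked up automatically on the next run, with no row-level key needed.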
Strategy 4: insert_overwrite (BigQuery / Spark)
-- Note: insert_overwrite is for BigQuery/Spark, not Snowflake
-- Snowflake equivalent is delete+insert
{{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={
"field": "order_date",
"data_type": "date",
"granularity": "day"
}
) }}
Strategy Comparison Matrix
| Strategy | SQL Operation | Needs unique_key | Handles Updates | Speed | Best For |
|---|---|---|---|---|---|
| merge | MERGE (upsert) | Yes | Yes | Medium | Mutable records (orders, CRM) |
| append | INSERT only | No | No | Fastest | Immutable events (clicks, logs) |
| delete+insert | DELETE then INSERT | Partition key | Partition-level | Fast | Date-partitioned aggregates |
| insert_overwrite | Overwrite partition | No | Partition-level | Fast | BigQuery / Spark partitions |
DBT Snapshots — SCD Type 2 Automation
Snapshots track how a record changes over time. DBT automatically maintains dbt_valid_from, dbt_valid_to and dbt_scd_id columns — giving you a full audit trail without writing a single line of SCD logic.
-- snapshots/snap_customers.sql
-- Real-world: track customer tier changes for churn analysis
{% snapshot snap_customers %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp', -- Use updated_at to detect changes
updated_at='updated_at'
)
}}
SELECT
customer_id,
email,
subscription_tier, -- tracks: FREE -> PRO -> ENTERPRISE changes
monthly_spend,
account_manager,
updated_at
FROM {{ source('salesforce', 'accounts') }}
{% endsnapshot %}
-- snapshots/snap_product_pricing.sql
-- Real-world: track price changes for accurate historical revenue calc
{% snapshot snap_product_pricing %}
{{
config(
target_schema='snapshots',
unique_key='product_id',
strategy='check', -- Detect change in specific columns
check_cols=['unit_price', 'discount_pct', 'is_active']
)
}}
SELECT product_id, product_name, unit_price, discount_pct, is_active
FROM {{ source('erp', 'products') }}
{% endsnapshot %}
-- Querying snapshots for time-travel analysis
-- Q: What tier was customer 1001 in on Black Friday 2023?
SELECT
customer_id,
subscription_tier,
dbt_valid_from,
dbt_valid_to
FROM {{ ref('snap_customers') }}
WHERE customer_id = 1001
AND dbt_valid_from <= '2023-11-24'
AND (dbt_valid_to > '2023-11-24' OR dbt_valid_to IS NULL);
-- Q: Which customers upgraded their tier in Q4 2023?
SELECT
customer_id,
LAG(subscription_tier) OVER (PARTITION BY customer_id ORDER BY dbt_valid_from) AS previous_tier,
subscription_tier AS new_tier,
dbt_valid_from AS upgrade_date
FROM {{ ref('snap_customers') }}
WHERE dbt_valid_from BETWEEN '2023-10-01' AND '2023-12-31'
-- Window functions aren't allowed in WHERE; Snowflake's QUALIFY filters on them
QUALIFY subscription_tier != LAG(subscription_tier) OVER (PARTITION BY customer_id ORDER BY dbt_valid_from);
Advanced: Incremental + Snapshot Architecture Pattern
-- Layer 1: Staging (view) — thin cleaning
--   stg_orders, stg_customers, stg_products
-- Layer 2: Snapshots (SCD2) — history tracking
--   snap_customers (tier changes)
--   snap_product_pricing (price changes)
-- Layer 3: Intermediate (incremental/merge) — enrichment
--   int_orders_enriched: joins orders + current customer tier
-- Layer 4: Marts (table or incremental) — final analytics
--   fact_orders: incremental merge, unique_key=order_id
--   fact_daily_revenue: incremental delete+insert, partition=order_date
--   dim_customers: table, rebuilt nightly from snap_customers current view
Incremental Model Pitfalls & How to Avoid Them
- Schema changes break incremental models — use on_schema_change='append_new_columns' to auto-handle new source columns
- Backfilling silently skips old data — always use dbt run --full-refresh after schema changes or data corrections
- Duplicate rows with append — if the source is not truly immutable, switch to merge with a unique_key
- Incorrect watermark on first run — test is_incremental() carefully; on first run the table does not exist yet
- Clock skew — subtract a small buffer from the watermark (updated_at > MAX(updated_at) - INTERVAL '5 minutes') to avoid missing in-flight rows
The ref() function is the magic glue connecting DBT models.
What ref() Does:
-- In int_orders.sql
SELECT * FROM {{ ref('stg_orders') }}
WHERE total > 100
-- Compiles to:
SELECT * FROM "ANALYTICS"."INTERMEDIATE"."stg_orders"
WHERE total > 100
Why ref() is Powerful:
- No hardcoded schema names: Works in dev/test/prod
- Automatic dependencies: DBT knows stg_orders must run first
- DAG generation: Builds dependency graph automatically
- Parallelization: Run independent models concurrently
Automatic DAG (Directed Acyclic Graph):
stg_customers ──┐
                ├──► int_customer_orders ──► fct_sales
stg_orders ─────┘
(ref() creates edges)
Execution Order:
dbt run:
1. Run stg_customers (no dependencies)
2. Run stg_orders (no dependencies)
3. Run int_customer_orders (depends on both)
4. Run fct_sales (depends on all above)
Best Practices:
- Always use ref() instead of hardcoding table names
- Use source() for raw tables: {{ source('raw', 'users') }}
- Keep dependencies clear and linear when possible
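The DAG-and-execution-order behavior is essentially a topological sort over the edges that ref() creates. A sketch using Python's standard-library graphlib (model names taken from the diagram above; this is an analogy, not DBT's internals):

```python
from graphlib import TopologicalSorter

# Each ref() call adds an edge: model -> its upstream dependencies
deps = {
    "stg_customers": set(),
    "stg_orders": set(),
    "int_customer_orders": {"stg_customers", "stg_orders"},
    "fct_sales": {"int_customer_orders"},
}

order = list(TopologicalSorter(deps).static_order())
# Staging models (no dependencies) come first; fct_sales must run last
assert order.index("stg_orders") < order.index("int_customer_orders")
assert order[-1] == "fct_sales"
```

Models at the same depth (here the two staging models) have no edge between them, which is exactly what lets DBT run them in parallel.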
Jinja adds programming logic to SQL. Processed before warehouse execution.
Simple Variable Example:
{% set start_date = '2020-01-01' %}
SELECT * FROM {{ ref('raw_data') }}
WHERE created_at >= '{{ start_date }}'
Environment-Specific Logic:
{% if target.name == 'prod' %}
WHERE is_active = TRUE
{% elif target.name == 'dev' %}
WHERE created_at >= CURRENT_DATE - 7
{% endif %}
Loop Through Columns:
{% set columns = ['customer_id', 'order_id', 'amount'] %}
SELECT
{% for col in columns %}
COALESCE({{ col }}, 0) as {{ col }}
{% if not loop.last %},{% endif %}
{% endfor %}
FROM orders
Compiled Output (Production):
SELECT * FROM "PROD"."RAW"."raw_data" WHERE created_at >= '2020-01-01' AND is_active = TRUE
Benefits:
- DRY - Don't Repeat Yourself
- Environment-specific logic
- Dynamic SQL generation
- Macro reusability
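What the column loop compiles to can be reproduced with plain Python string building, which makes the loop.last trailing-comma logic visible (illustration only; DBT uses Jinja for this):

```python
# Generate the SELECT list the Jinja loop above would compile to
columns = ["customer_id", "order_id", "amount"]
select_list = ",\n".join(f"  COALESCE({col}, 0) as {col}" for col in columns)
sql = f"SELECT\n{select_list}\nFROM orders"

lines = select_list.splitlines()
assert lines[0].endswith(",")        # separators between items...
assert not lines[-1].endswith(",")   # ...but no trailing comma (loop.last)
assert "COALESCE(amount, 0) as amount" in sql
```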
Comprehensive approach to governing data assets:
1. Define Ownership & Stewardship
- Assign data owners for each domain
- Identify stewards (day-to-day managers)
- Document responsibilities
2. Build Data Dictionary
- Document ALL data elements
- Business and technical definitions
- Relationships between tables
- Tool: Collibra, DataHub, or dbt docs
3. Metadata Management
- Track technical metadata (schema, lineage)
- Track business metadata (definitions, owners)
- Implement data catalog
4. Quality Standards
- Define quality rules (not_null, uniqueness)
- Implement monitoring and alerts
- Document remediation processes
5. Access Control
- RBAC (Role-Based Access Control)
- Row-level security for sensitive data
- Column-level masking for PII
6. Data Lineage & Audit
- Track data flow source → consumer
- Maintain audit logs of access
- Enable compliance auditing
7. Version Control & Documentation
- All code (SQL, Python, DBT) in Git
- Auto-generate docs from code
- Change request process
Multi-Layer Approach to Data Quality:
1. Schema Tests (Column Constraints)
- not_null: Column has no NULLs
- unique: All values distinct
- relationships: Foreign key constraint
- accepted_values: Values from whitelist
2. Statistical Tests (Distribution)
- Range checks: value between min/max
- Outlier detection: flag extreme values
- Distribution analysis: compare to expected
3. Business Logic Tests
- Cross-table consistency: totals match
- Recency checks: data updated recently
- Completeness: all expected records present
4. Freshness Tests
-- Check last update time
SELECT MAX(updated_at) FROM orders;
-- Should be within last 24 hours
Implementation (DBT Test):
-- schema.yml
models:
- name: orders
tests:
- dbt_utils.recency:
datepart: day
interval: 1
columns:
- name: order_id
tests:
- not_null
- unique
- name: customer_id
tests:
- relationships:
to: ref('dim_customer')
field: customer_id
Tools: DBT built-in, Great Expectations, Soda SQL, Ataccama
Typical Tech Stack Across Layers:
1. Source Systems
- Operational databases (PostgreSQL, Oracle, MySQL)
- SaaS applications (Salesforce, Shopify, HubSpot)
- APIs and webhooks
- Streaming platforms (Kafka, Kinesis)
2. Data Ingestion
- Managed: Fivetran, Stitch
- Open-source: Airbyte
- AWS native: AWS DMS, AWS Glue
- CDC-focused: Qlik Replicate, Debezium
3. Cloud Data Warehouse
- Snowflake, BigQuery, Redshift
4. Transformation
- DBT (recommended)
- Spark for advanced processing
- Python for ML/custom logic
5. Orchestration
- Apache Airflow (most flexible)
- Prefect, Dagster (Python-native)
- Cloud native: Snowflake Tasks, Step Functions
6. Data Governance & Catalog
- Collibra, Ataccama, DataHub
7. Analytics & BI
- Tableau, Power BI, Looker
8. Reverse ETL (Optional)
- Sync insights back to source systems
- Tools: Hightouch, Census
Selection Criteria: Scalability, cost, ease of use, integrations, security, community support
Emerging Trends Shaping the Data Industry:
1. DataOps
Applying DevOps principles to data pipelines: CI/CD, monitoring, automation, version control
2. Real-Time Analytics
Move from batch to streaming: Kafka, streaming SQL, real-time dashboards
3. AI/ML Integration
ML models in data pipelines: feature engineering, model scoring, automated workflows
4. Data Mesh
Decentralized data ownership: domain-oriented teams managing their data products
5. Lakehouse Architecture
Combine data lake flexibility with warehouse performance: Delta Lake, Iceberg, Hudi
6. Serverless Data
Pay-per-query computing: Athena, BigQuery, Snowflake replacing servers
7. Data Quality as First-Class
Quality built into pipelines, not afterthought. Tools like Great Expectations, Soda
8. Self-Service Analytics
Business users directly accessing/analyzing data without technical middlemen
9. Reverse ETL
Syncing insights back to operational systems for immediate action
10. Privacy-Preserving Tech
Differential privacy, federated learning for sensitive data
Skills for Success: Cloud (AWS/Azure/GCP), DBT, Python, Spark, SQL, Docker, Git, ML basics
DBT Snapshots automate SCD Type 2 implementation without manual coding:
Timestamp Strategy (When source has updated_at):
{% snapshot dim_customer_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='timestamp',
updated_at='updated_at'
)
}}
SELECT customer_id, name, email, phone, updated_at
FROM {{ ref('stg_customers') }}
{% endsnapshot %}
Check Strategy (Track specific columns):
{% snapshot dim_product_snapshot %}
{{
config(
target_schema='snapshots',
unique_key='product_id',
strategy='check',
check_cols=['price', 'stock_status']
)
}}
SELECT product_id, name, price, stock_status
FROM {{ ref('stg_products') }}
{% endsnapshot %}
DBT Automatically Creates:
- dbt_valid_from — When this version became active
- dbt_valid_to — When superseded (NULL if current)
- dbt_scd_id — Unique ID for tracking row versions
Querying Snapshots (Temporal):
-- Get current customer state
SELECT * FROM dim_customer_snapshot
WHERE dbt_valid_to IS NULL;
-- Get historical state at a point in time
SELECT * FROM dim_customer_snapshot
WHERE dbt_valid_from <= '2024-01-15'
  AND (dbt_valid_to > '2024-01-15' OR dbt_valid_to IS NULL);
Schema drift occurs when the structure of source data changes unexpectedly — new columns appear, data types change, or columns are removed — breaking downstream pipelines.
Real-World Scenario:
Your e-commerce source system adds a new column loyalty_tier to the orders table overnight. Your Snowflake pipeline that does SELECT * into a fixed schema now fails because the target table does not have that column.
Types of Schema Drift
- Additive drift — New columns added (least destructive, easiest to handle)
- Destructive drift — Columns renamed, removed, or reordered
- Type drift — Column data type changes (e.g., INT to VARCHAR)
- Semantic drift — Column meaning changes without name change (hardest to detect)
Strategy 1: Use VARIANT for Flexible Ingestion
-- Land raw JSON as-is; schema changes won't break ingestion
CREATE TABLE raw_orders (
ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
source_system VARCHAR,
payload VARIANT -- Entire row stored as JSON
);
-- Query specific fields dynamically
SELECT
payload:order_id::INT AS order_id,
payload:customer_id::INT AS customer_id,
payload:loyalty_tier::VARCHAR AS loyalty_tier -- New field, no DDL needed
FROM raw_orders;
Strategy 2: Detect Drift with Information Schema
-- Compare source vs target columns to surface drift
WITH source_cols AS (
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'SOURCE_ORDERS'
),
target_cols AS (
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_name = 'TARGET_ORDERS'
)
SELECT
s.column_name,
s.data_type AS source_type,
t.data_type AS target_type,
CASE
WHEN t.column_name IS NULL THEN 'NEW COLUMN — ADD TO TARGET'
WHEN s.data_type != t.data_type THEN 'TYPE MISMATCH'
ELSE 'OK'
END AS drift_status
FROM source_cols s
LEFT JOIN target_cols t ON s.column_name = t.column_name
WHERE t.column_name IS NULL OR s.data_type != t.data_type;
Strategy 3: Auto-Evolve with Snowflake COPY INTO
-- Snowflake natively handles additive drift during COPY
COPY INTO orders
FROM @my_stage/orders/
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
ON_ERROR = 'CONTINUE';
-- Enable column evolution (Snowflake feature)
ALTER TABLE orders SET ENABLE_SCHEMA_EVOLUTION = TRUE;
Strategy 4: DBT Schema Contracts
# schema.yml — define expected schema contracts
models:
- name: stg_orders
columns:
- name: order_id
tests: [not_null, unique]
- name: amount
tests:
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: number
- name: status
tests:
- accepted_values:
values: ['pending', 'shipped', 'delivered', 'cancelled']
Strategy 5: Schema Registry for Kafka Pipelines
- Use Confluent Schema Registry with AVRO/Protobuf to enforce contracts at the producer level
- Configure BACKWARD compatibility — consumers can read old and new schemas
- Breaking changes are rejected before they ever reach your pipeline
Handling Matrix
| Drift Type | Recommended Approach | Risk |
|---|---|---|
| New column added | Auto-evolve or VARIANT landing zone | Low |
| Column removed | Alert + graceful NULL fallback | Medium |
| Type change | Fail fast + alert on-call engineer | High |
| Column rename | Schema registry + versioned contracts | High |
| Semantic drift | Data observability tools (Monte Carlo, Bigeye) | Very High |
Best Practices:
- Always land raw data in a schema-flexible zone (VARIANT / Parquet) before transforming
- Use a two-layer architecture: raw (flexible) then curated (strict schema)
- Instrument data observability alerts on column count, null rates, and type changes
- Treat schema contracts as code — version them in Git alongside your dbt models
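The detection logic behind the handling matrix can be sketched as a simple schema diff, the same comparison the Information Schema query above performs, done in Python (a hypothetical `classify_drift` helper over name-to-type dicts; semantic drift is excluded because a name/type diff cannot see it):

```python
def classify_drift(source_cols, target_cols):
    """Diff two schemas (column name -> type) and classify each finding."""
    findings = []
    for col, typ in source_cols.items():
        if col not in target_cols:
            findings.append((col, "NEW COLUMN", "Low"))       # additive drift
        elif typ != target_cols[col]:
            findings.append((col, "TYPE CHANGE", "High"))     # type drift
    for col in target_cols:
        if col not in source_cols:
            findings.append((col, "COLUMN REMOVED", "Medium"))  # destructive
    return findings

src = {"order_id": "INT", "amount": "VARCHAR", "loyalty_tier": "VARCHAR"}
tgt = {"order_id": "INT", "amount": "DECIMAL", "region": "VARCHAR"}
found = {c: kind for c, kind, _ in classify_drift(src, tgt)}
assert found == {"amount": "TYPE CHANGE",
                 "loyalty_tier": "NEW COLUMN",
                 "region": "COLUMN REMOVED"}
```

Wired into an orchestrator, the "High" findings would fail fast and page the on-call engineer, while "Low" findings could trigger auto-evolution.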
Late-arriving data occurs when records arrive in your pipeline after the time window they logically belong to has already been processed — causing incorrect aggregations, incomplete reports, and stale dashboards.
Real-World Scenario:
A mobile app records user click events locally and syncs when the device reconnects. Events from 2024-01-10 arrive in your pipeline on 2024-01-15. Your daily revenue report for Jan 10 was already published — and it is now wrong.
Root Causes
- Offline mobile/IoT devices syncing after reconnection
- Third-party API delays (payment gateways, logistics providers)
- Network partitions or upstream pipeline failures
- Timezone mismatches causing apparent late arrival
- Manual data corrections or backfills from source systems
Strategy 1: Separate Event Time vs Processing Time
-- Always store BOTH timestamps
CREATE TABLE user_events (
event_id VARCHAR,
user_id INT,
event_type VARCHAR,
event_time TIMESTAMP, -- When the event actually happened (source)
ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP -- When we received it
);
-- Identify late arrivals (arrived more than 1 day after event)
SELECT
event_id,
event_time,
ingested_at,
DATEDIFF('hour', event_time, ingested_at) AS hours_late
FROM user_events
WHERE DATEDIFF('day', event_time, ingested_at) > 1
ORDER BY hours_late DESC;
Strategy 2: Watermarking in Streaming Pipelines
-- Accept events up to 3 days late; skip older ones
INSERT INTO fact_events
SELECT
event_id,
user_id,
event_time,
ingested_at,
'LATE_ARRIVAL' AS load_type
FROM staging_events s
WHERE
event_time >= CURRENT_DATE - 3 -- Late acceptance window
AND NOT EXISTS (
SELECT 1 FROM fact_events f
WHERE f.event_id = s.event_id -- Deduplicate
);
Strategy 3: Reprocess Affected Partitions (Idempotent Backfill)
-- Step 1: Find which dates need reprocessing
SELECT
event_time::DATE AS affected_date,
COUNT(*) AS late_records
FROM staging_events
WHERE ingested_at::DATE = CURRENT_DATE
AND event_time::DATE < CURRENT_DATE - 1
GROUP BY 1;
-- Step 2: Remove stale aggregates for those dates
DELETE FROM daily_event_summary
WHERE summary_date IN (
SELECT DISTINCT event_time::DATE
FROM staging_events
WHERE ingested_at::DATE = CURRENT_DATE
AND event_time::DATE < CURRENT_DATE - 1
);
-- Step 3: Recompute cleanly (safe to re-run)
INSERT INTO daily_event_summary
SELECT
event_time::DATE AS summary_date,
event_type,
COUNT(*) AS event_count,
COUNT(DISTINCT user_id) AS unique_users
FROM fact_events
GROUP BY 1, 2;
Strategy 4: DBT Incremental with Lookback Window
-- models/fact_events.sql
{{ config(
materialized='incremental',
unique_key='event_id',
on_schema_change='append_new_columns'
) }}
SELECT event_id, user_id, event_type, event_time, ingested_at
FROM {{ source('raw', 'events') }}
{% if is_incremental() %}
-- Look back 3 days on every run to catch late arrivals
WHERE event_time >= DATEADD('day', -3, CURRENT_DATE)
{% endif %}
Strategy 5: Audit-Trail Fact Table (Never Delete, Only Append)
-- Keep all versions with a change reason
CREATE TABLE fact_orders_audit (
order_id INT,
order_date DATE,
amount DECIMAL(10,2),
record_version INT,
is_current BOOLEAN,
change_reason VARCHAR -- 'INITIAL_LOAD', 'LATE_ARRIVAL', 'CORRECTION'
);
-- Always query the current version
SELECT * FROM fact_orders_audit WHERE is_current = TRUE;
-- Audit trail: what changed for a given order
SELECT order_id, record_version, amount, change_reason
FROM fact_orders_audit
WHERE order_id = 12345
ORDER BY record_version;
Handling Matrix by Latency
| Latency | Pattern | Snowflake Tool |
|---|---|---|
| < 1 hour | Streaming watermark window | Snowpipe + deduplication |
| 1–24 hours | Rolling incremental with lookback | DBT incremental (-3 day window) |
| 1–7 days | Partition reprocessing | DELETE + re-INSERT on affected dates |
| 7+ days | Full historical backfill | DBT full-refresh on affected models |
Prevention & Monitoring:
- Alert when MAX(ingested_at) - MAX(event_time) exceeds a threshold — signals upstream delays
- Track ingestion lag as a first-class data quality metric in your observability stack
- Design all aggregation pipelines to be idempotent — safe to re-run for any date range
- Use Snowflake Time Travel to restore pre-correction snapshots if a backfill goes wrong
- Document your SLA — e.g. "Reports may reflect data up to 3 days late" — to set stakeholder expectations
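The first bullet reduces to a one-line comparison. A minimal sketch of that alert check, with an assumed 6-hour threshold and invented timestamps:

```python
from datetime import datetime, timedelta

def ingestion_lag_alert(max_event_time, max_ingested_at,
                        threshold=timedelta(hours=6)):
    """Return (should_alert, lag) comparing ingestion lag to a threshold."""
    lag = max_ingested_at - max_event_time
    return lag > threshold, lag

alerting, lag = ingestion_lag_alert(
    datetime(2024, 1, 15, 8, 0),   # newest event_time seen
    datetime(2024, 1, 15, 20, 0),  # newest ingested_at seen
)
print(alerting, lag)  # True 12:00:00
```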
Real-World Scenario: Your company supports 100+ business users across multiple teams. The source is a 500-table Oracle ERP with millions of rows mutating daily. Business needs sub-30-minute data freshness in Snowflake for operational dashboards. Design the full pipeline.
Architecture Overview
Oracle ERP / SQL Server / PostgreSQL
|
| (Log-based CDC — reads redo/transaction logs, zero load on source)
v
Qlik Replicate (CDC Engine)
|
| APPLY mode: continuous micro-batch MERGE into landing tables
| Full-load + ongoing replication, DDL change capture
v
Snowflake Landing Schema (raw.<table>)
|
| Snowflake Streams (track INSERT/UPDATE/DELETE offsets)
v
Snowflake Tasks (scheduled every 5 min)
|
| MERGE into staging schema — deduplicate, apply deletes
v
dbt Cloud (hourly incremental runs)
|
| stg_ → int_ → mart_ transformation layers
v
Snowflake Mart Schema — BI / Analytics consumers
Qlik Replicate Configuration
-- Qlik Replicate task settings (conceptual)
TASK MODE      : Full Load + CDC
APPLY MODE     : Transactional (row-level, ordered commits)
STAGING        : Direct Apply to Snowflake landing tables
ERROR HANDLING : Suspend table on DML error, continue others
DDL HANDLING   : Auto-create / alter target columns on source DDL

-- Metadata columns Qlik adds to every landing row
header__change_seq      VARCHAR    -- Unique monotonic CDC offset
header__change_oper     CHAR(1)    -- I=Insert, U=Update, D=Delete, B=Before-image
header__timestamp       TIMESTAMP  -- Source commit timestamp
header__stream_position VARCHAR    -- Log position for resume
Snowflake Landing → Staging via Streams + Tasks
-- Step 1: Stream on the landing table (tracks all Qlik-applied changes)
CREATE OR REPLACE STREAM raw.orders_stream
ON TABLE raw.orders
SHOW_INITIAL_ROWS = FALSE; -- Only delta since last consumption
-- Step 2: Task to merge stream into clean staging table (runs every 5 min)
CREATE OR REPLACE TASK etl.merge_orders_task
WAREHOUSE = 'TRANSFORM_WH'
SCHEDULE = '5 MINUTE'
WHEN system$stream_has_data('raw.orders_stream')
AS
MERGE INTO staging.orders AS tgt
USING (
    SELECT *
    FROM raw.orders_stream
    WHERE header__change_oper != 'B'  -- Exclude before-images
    -- Keep only the newest event per key BEFORE the merge; if rn were checked
    -- in the ON clause, older duplicates would fall through as NOT MATCHED
    -- and be inserted
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY order_id
        ORDER BY header__change_seq DESC
    ) = 1
) AS src
ON tgt.order_id = src.order_id
WHEN MATCHED AND src.header__change_oper = 'D'
THEN DELETE
WHEN MATCHED AND src.header__change_oper IN ('I','U')
THEN UPDATE SET
tgt.customer_id = src.customer_id,
tgt.status = src.status,
tgt.amount = src.amount,
tgt.updated_at = src.header__timestamp
WHEN NOT MATCHED AND src.header__change_oper != 'D'
THEN INSERT (order_id, customer_id, status, amount, updated_at)
VALUES (src.order_id, src.customer_id, src.status,
src.amount, src.header__timestamp);
ALTER TASK etl.merge_orders_task RESUME;
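The MERGE applies only the newest CDC event per key. The "last event wins" rule it relies on can be sketched in a few lines of Python (event payloads here are invented; in the pipeline the ordering key is header__change_seq):

```python
def latest_event_per_key(events):
    """Keep the event with the highest change_seq for each order_id."""
    latest = {}
    for ev in events:
        cur = latest.get(ev["order_id"])
        if cur is None or ev["change_seq"] > cur["change_seq"]:
            latest[ev["order_id"]] = ev
    return latest

events = [
    {"order_id": 1, "change_seq": "001", "oper": "I", "status": "NEW"},
    {"order_id": 1, "change_seq": "002", "oper": "U", "status": "PAID"},
    {"order_id": 2, "change_seq": "003", "oper": "D", "status": None},
]
final = latest_event_per_key(events)
print(final[1]["status"], final[2]["oper"])  # PAID D
```

Order 1 received an insert then an update; only the update survives, so the MERGE never applies a stale status.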
dbt Incremental Layer on Top
-- models/staging/stg_orders.sql (view — thin clean)
{{ config(materialized='view') }}
SELECT
order_id, customer_id, UPPER(status) AS status,
amount::DECIMAL(12,2) AS amount, updated_at
FROM {{ source('staging', 'orders') }}
WHERE order_id IS NOT NULL
-- models/marts/fact_orders.sql (incremental merge)
{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge',
on_schema_change='append_new_columns'
) }}
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
Failure Handling & Recovery Design
- Qlik Replicate checkpoint — stores log position; on restart it resumes from last committed offset, no data loss
- Snowflake Stream offset — streams are transactional; if the Task fails mid-run, the stream offset is NOT advanced, so the next run re-processes the same delta safely
- Duplicate protection — ROW_NUMBER() OVER (PARTITION BY pk ORDER BY change_seq DESC) ensures only the latest version of a key is applied even if Qlik delivers multiple events for one row
- Schema drift — Qlik auto-ALTERs the target; dbt uses on_schema_change='append_new_columns' to absorb new columns without breaking runs
- Monitoring — alert when SYSTEM$STREAM_HAS_DATA stays stale for >15 min; alert on Task failure via Snowflake email notifications
Latency Breakdown
| Hop | Typical Latency | Tuning Lever |
|---|---|---|
| Oracle log → Qlik Apply | 1–5 sec | Commit frequency, batch size |
| Qlik → Snowflake landing | 5–30 sec | Apply frequency, warehouse size |
| Stream + Task merge | 5 min (scheduled) | Reduce schedule interval |
| dbt incremental run | hourly | Increase run frequency in dbt Cloud |
| End-to-end | < 15 min | Well within 30-min SLA |
Real-World Scenario: You inherit a single monolithic SQL transformation running nightly on a 2XL warehouse, taking 4 hours, failing regularly, and blocking BI reports until morning. Design a production-grade ELT platform that is resilient, observable, and cost-efficient.
Full Platform Architecture
┌─────────────────────────────────────────────────────────────┐
│ SOURCE SYSTEMS │
│ Oracle ERP │ Salesforce CRM │ Stripe │ PostgreSQL App DB │
└──────┬──────────────┬─────────────┬────────────┬────────────┘
│ │ │ │
Qlik Replicate Fivetran Airbyte Custom Python
(CDC / near-RT) (SaaS APIs) (OSS) (S3 drops)
│ │ │ │
└──────────────┴─────────────┴────────────┘
│
┌─────────▼──────────┐
│ Snowflake RAW │ ← Schema per source
│ (landing zone) │ No transforms here
└─────────┬──────────┘
│ Streams + Tasks (micro-batch)
┌─────────▼──────────┐
│ Snowflake STAGING │ ← dbt stg_ models (views)
│ │ Rename, cast, dedupe
└─────────┬──────────┘
│ dbt incremental models
┌─────────▼──────────┐
│ Snowflake INTERMEDIATE│ ← dbt int_ models
│ │ Joins, enrichment
└─────────┬──────────┘
│ dbt table / incremental
┌─────────▼──────────┐
│ Snowflake MARTS │ ← fact_ dim_ models
│ │ BI-ready, governed
└─────────┬──────────┘
│
┌──────────────┼──────────────┐
Tableau Looker Power BI
(Secure Views / roles)
Warehouse Strategy — Right-Sizing per Workload
-- Never use one warehouse for everything
-- Isolate workloads to prevent contention and enable independent scaling
-- Ingestion warehouse (Qlik / Snowpipe writes)
CREATE WAREHOUSE INGEST_WH
WAREHOUSE_SIZE = 'SMALL'
AUTO_SUSPEND = 60
AUTO_RESUME = TRUE;
-- dbt transformation warehouse (scales up for heavy models)
CREATE WAREHOUSE TRANSFORM_WH
WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 120
AUTO_RESUME = TRUE
MAX_CLUSTER_COUNT = 3 -- Multi-cluster for parallel dbt runs
SCALING_POLICY = 'ECONOMY'; -- Scale conservatively
-- BI / analyst query warehouse (fast, responsive)
CREATE WAREHOUSE ANALYTICS_WH
WAREHOUSE_SIZE = 'LARGE'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE;
-- Admin / monitoring (tiny, always on)
CREATE WAREHOUSE ADMIN_WH
WAREHOUSE_SIZE = 'X-SMALL'
AUTO_SUSPEND = 60;
dbt Project Structure
dbt_project/
├── models/
│   ├── staging/          # stg_* — view, one-to-one with source tables
│   │   ├── salesforce/
│   │   ├── stripe/
│   │   └── oracle_erp/
│   ├── intermediate/     # int_* — joins, enrichment (ephemeral or view)
│   └── marts/
│       ├── finance/      # fact_revenue, fact_invoices
│       ├── sales/        # fact_orders, dim_customers
│       └── operations/   # fact_inventory, fact_shipments
├── macros/               # generate_surrogate_key, test_not_negative, etc.
├── snapshots/            # SCD Type 2 for slowly changing dims
├── seeds/                # region_codes, currency_map, etc.
├── tests/                # Singular tests — reconciliation, cross-source checks
├── analyses/             # Ad-hoc exploratory SQL (not deployed)
└── dbt_project.yml
dbt Cloud CI/CD Jobs
# Production job (dbt Cloud scheduler)
Job: "Production — Full Run"
Schedule: Every hour
Commands:
- dbt source freshness # Fail fast if sources are stale
- dbt run --select tag:hourly # Only hourly models
- dbt test --select tag:hourly # Test immediately after run
Notifications: Slack on failure
# CI job (triggered on every PR to main)
Job: "Slim CI — PR Validation"
Trigger: Pull Request opened / updated
Commands:
- dbt run --select state:modified+ --defer --state ./prod-artifacts
- dbt test --select state:modified+ --defer --state ./prod-artifacts
# Only runs models changed in the PR + their downstream dependents
# --defer uses production compiled state so unchanged upstream refs resolve
Observability Stack
-- 1. Source freshness checks in sources.yml
sources:
  - name: oracle_erp
    freshness:
      warn_after: {count: 1, period: hour}
      error_after: {count: 3, period: hour}
    loaded_at_field: _loaded_at
-- 2. Row count anomaly detection via custom test
-- tests/assert_fact_orders_row_count_not_dropped.sql
WITH today AS (
SELECT COUNT(*) AS cnt FROM {{ ref('fact_orders') }}
WHERE order_date = CURRENT_DATE
),
yesterday AS (
SELECT COUNT(*) AS cnt FROM {{ ref('fact_orders') }}
WHERE order_date = CURRENT_DATE - 1
)
SELECT today.cnt AS today_count, yesterday.cnt AS yesterday_count
FROM today, yesterday
WHERE today.cnt < yesterday.cnt * 0.7 -- Alert if >30% drop vs yesterday
-- 3. Snowflake Query History monitoring
SELECT query_text, total_elapsed_time/1000 AS seconds,
credits_used_cloud_services
FROM snowflake.account_usage.query_history
WHERE start_time >= CURRENT_DATE
AND total_elapsed_time > 300000 -- Queries over 5 min
ORDER BY total_elapsed_time DESC;
Cost Governance
- Tag every warehouse with a cost_center; attach a resource monitor that alerts at 80% of the monthly credit budget
- Use Zero-Copy Clone for dev/staging environments — no storage duplication cost
- Model materialization decisions: views for staging (zero compute on build), incremental for large facts (avoid full rebuild), ephemeral for simple CTEs
- Run dbt run --select tag:daily nightly for heavy models; tag lighter models tag:hourly
Real-World Scenario: Rather than scheduling dbt runs on a fixed clock, you want Snowflake to automatically trigger transformations the moment new data arrives from Qlik Replicate. Zero polling, zero wasted credits on empty runs.
Streams — Change Data Capture Inside Snowflake
-- A Stream is a change-tracking cursor on a table
-- It records every INSERT, UPDATE, DELETE since the stream was last consumed
CREATE OR REPLACE STREAM raw.customer_stream
ON TABLE raw.customers
SHOW_INITIAL_ROWS = FALSE; -- FALSE = only new changes going forward
-- Stream metadata columns automatically added:
-- METADATA$ACTION : INSERT | DELETE (UPDATE = DELETE + INSERT pair)
-- METADATA$ISUPDATE : TRUE on both rows of an UPDATE (its DELETE and INSERT halves)
-- METADATA$ROW_ID : Internal unique row identifier
-- Consume the stream — reconstruct latest state of each customer
SELECT
customer_id,
email,
tier,
METADATA$ACTION,
METADATA$ISUPDATE
FROM raw.customer_stream
WHERE METADATA$ACTION = 'INSERT' -- After dedup, INSERTs = final state
ORDER BY METADATA$ROW_ID;
-- IMPORTANT: A Stream offset advances ONLY when consumed inside a DML
-- transaction that commits successfully. If the Task fails, offset is preserved.
-- This gives you exactly-once-like semantics.
-- Check if a stream has unconsumed data (use in Task WHEN clause)
SELECT SYSTEM$STREAM_HAS_DATA('raw.customer_stream');
Tasks — Scheduled or Event-Driven Orchestration
-- Root Task: fires every 2 minutes ONLY when stream has data
CREATE OR REPLACE TASK etl.process_customers_root
WAREHOUSE = 'TRANSFORM_WH'
SCHEDULE = '2 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('raw.customer_stream')
AS
MERGE INTO staging.customers AS tgt
USING (
    SELECT customer_id, email, tier
    FROM raw.customer_stream
    WHERE METADATA$ACTION = 'INSERT'
    -- A standard stream returns the net change per row, so each key normally
    -- appears once; QUALIFY is a safety net — filtering here (not in the ON
    -- clause) prevents duplicate events being inserted as NOT MATCHED rows
    QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id
                               ORDER BY METADATA$ROW_ID DESC) = 1
) AS src ON tgt.customer_id = src.customer_id
WHEN MATCHED THEN UPDATE SET tgt.email = src.email, tgt.tier = src.tier
WHEN NOT MATCHED THEN INSERT (customer_id, email, tier)
VALUES (src.customer_id, src.email, src.tier);
-- Child Task: fires AFTER root task completes (DAG chaining)
CREATE OR REPLACE TASK etl.refresh_customer_dim_task
WAREHOUSE = 'TRANSFORM_WH'
AFTER etl.process_customers_root
AS
-- Refresh the dimension table by calling a stored procedure or dbt
CALL etl.sp_refresh_dim_customers();
-- Activate the DAG (tasks are SUSPENDED by default)
ALTER TASK etl.refresh_customer_dim_task RESUME;
ALTER TASK etl.process_customers_root RESUME;
Task DAG — Multi-Step Pipeline
-- Build a full task tree: ingest → stage → mart
--
-- process_customers_root (every 2 min, stream-gated)
-- └── stage_customers_task
-- └── refresh_dim_customers_task
-- └── refresh_fact_orders_task
--
-- Monitor the DAG
SELECT *
FROM TABLE(information_schema.task_history(
scheduled_time_range_start => DATEADD('hour', -1, CURRENT_TIMESTAMP),
task_name => 'PROCESS_CUSTOMERS_ROOT'
))
ORDER BY scheduled_time DESC;
Stream Types Comparison
| Stream Type | Works On | Tracks Deletes | Best For |
|---|---|---|---|
| Standard | Tables | Yes | CDC pipelines, mutable tables |
| Append-Only | Tables | No | Event logs, immutable inserts |
| Insert-Only | External / Directory Tables | No | S3 file ingestion triggers |
Gotchas & Best Practices
- Stale stream — streams have a staleness limit equal to the table's DATA_RETENTION_TIME. If a stream is not consumed within that window, it becomes invalid. Set retention >= 14 days on critical source tables.
- Multiple consumers — if two Tasks consume the same stream, they race. Create separate streams for separate consumers.
- Zero-credit idle Tasks — a Task with a WHEN SYSTEM$STREAM_HAS_DATA() guard does NOT start a warehouse if the condition is false. Crucial for cost efficiency.
- Serverless Tasks — use USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE instead of a named warehouse for Tasks that run <60 seconds; Snowflake auto-sizes and charges per second.
Real-World Scenario: Your dbt project has grown to 200+ models. Developers are copy-pasting the same surrogate key logic, date spine generation, and column renaming patterns across dozens of files. Macros eliminate the duplication and make the whole project consistent.
Macro 1 — Reusable Surrogate Key (used across all dimension tables)
-- macros/generate_surrogate_key.sql
{% macro generate_surrogate_key(field_list) %}
MD5(
CONCAT_WS('||',
{% for field in field_list %}
COALESCE(CAST({{ field }} AS VARCHAR), '~~NULL~~')
{% if not loop.last %}, {% endif %}
{% endfor %}
)
)
{% endmacro %}
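What the macro compiles to is easy to reproduce outside dbt: join the fields with '||', substitute a sentinel for NULLs, and MD5 the result. A Python sketch of that same logic (the input values are illustrative):

```python
import hashlib

def generate_surrogate_key(*fields):
    """Mirror of the dbt macro: CONCAT_WS('||', ...) with a NULL sentinel,
    then MD5 — deterministic for the same inputs."""
    joined = "||".join("~~NULL~~" if f is None else str(f) for f in fields)
    return hashlib.md5(joined.encode()).hexdigest()

a = generate_surrogate_key(42, "oracle_erp")
b = generate_surrogate_key(42, "oracle_erp")
c = generate_surrogate_key(None, "oracle_erp")
print(a == b)  # True  — same inputs, same key
print(a == c)  # False — NULL is distinct from any real value
```

The sentinel matters: without it, (NULL, 'x') and ('x', NULL) could collapse to the same concatenated string and collide.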
-- Usage in any dimension model
SELECT
{{ generate_surrogate_key(['customer_id', 'source_system']) }} AS customer_sk,
customer_id,
email,
tier
FROM {{ ref('stg_customers') }}
Macro 2 — Dynamic Column Renaming (staging standardisation)
-- macros/rename_columns.sql
-- Accepts a dict of {old_name: new_name} and generates SELECT list
{% macro rename_columns(relation, column_map) %}
SELECT
{% for old_col, new_col in column_map.items() %}
{{ old_col }} AS {{ new_col }}{% if not loop.last %},{% endif %}
{% endfor %}
FROM {{ relation }}
{% endmacro %}
-- Usage
{{ rename_columns(
source('oracle_erp', 'CUST_MAST'),
{
'CUST_ID': 'customer_id',
'CUST_EMAIL': 'email',
'CUST_TIER': 'subscription_tier',
'UPD_DT': 'updated_at'
}
) }}
Macro 3 — Generate Date Spine (for gap-filling time series)
-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}
WITH RECURSIVE date_series AS (
SELECT {{ start_date }}::DATE AS d
UNION ALL
SELECT DATEADD('day', 1, d)
FROM date_series
WHERE d < {{ end_date }}::DATE
)
SELECT d AS date_day FROM date_series
{% endmacro %}
-- Usage: fill gaps in daily sales (days with zero orders still appear)
SELECT
ds.date_day,
COALESCE(s.total_revenue, 0) AS total_revenue
FROM ( {{ date_spine("'2023-01-01'", "CURRENT_DATE") }} ) ds
LEFT JOIN {{ ref('fact_daily_sales') }} s ON s.sale_date = ds.date_day
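For intuition, the recursive CTE reduces to "emit one row per day, inclusive of both endpoints." A toy Python equivalent (dates chosen arbitrarily):

```python
from datetime import date, timedelta

def date_spine(start: date, end: date):
    """Yield every calendar day from start to end, inclusive —
    same contract as the recursive CTE in the macro."""
    d = start
    while d <= end:
        yield d
        d += timedelta(days=1)

spine = list(date_spine(date(2023, 1, 1), date(2023, 1, 5)))
print(len(spine))            # 5
print(spine[0], spine[-1])   # 2023-01-01 2023-01-05
```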
Macro 4 — Union All Sources (multi-region same schema)
-- macros/union_relations.sql
{% macro union_relations(relations) %}
{% for relation in relations %}
SELECT *, '{{ relation.identifier }}' AS source_table
FROM {{ relation }}
{% if not loop.last %} UNION ALL {% endif %}
{% endfor %}
{% endmacro %}
-- Combine orders from US, EU, APAC schemas into one model
{{ union_relations([
source('us_store', 'orders'),
source('eu_store', 'orders'),
source('apac_store', 'orders')
]) }}
Jinja Control Flow — Environment-Aware Models
-- Behave differently in dev vs prod using var() and target
{{ config(
materialized = 'table' if target.name == 'prod' else 'view',
post_hook = "GRANT SELECT ON {{ this }} TO ROLE ANALYST_ROLE"
if target.name == 'prod' else ""
) }}
-- Limit rows in dev to speed up iteration
SELECT * FROM {{ ref('stg_orders') }}
{% if target.name != 'prod' %}
LIMIT 10000
{% endif %}
Exposures — Document BI Consumption of dbt Models
# models/exposures.yml
# Exposures document WHERE dbt models are consumed (BI, APIs, etc.)
# Enables lineage: source → dbt model → Tableau dashboard
exposures:
  - name: executive_revenue_dashboard
    type: dashboard
    maturity: high            # high | medium | low
    url: https://tableau.company.com/views/ExecutiveRevenue
    description: "C-suite daily revenue KPIs sourced from dbt marts"
    owner:
      name: Analytics Engineering Team
      email: vsurya@duck.com
    depends_on:
      - ref('fact_daily_revenue')
      - ref('dim_customers')
      - ref('dim_products')
  - name: stripe_reconciliation_report
    type: analysis
    maturity: medium
    description: "Finance team daily Stripe vs warehouse reconciliation"
    owner:
      name: Finance Team
    depends_on:
      - ref('fact_orders')
      - ref('stg_stripe_charges')
Why Exposures Matter at Architect Level:
- Run dbt ls --select +exposure:executive_revenue_dashboard to see every model that feeds a dashboard — instant blast-radius analysis before a refactor
- Exposures appear in the dbt DAG UI — stakeholders can trace data from Tableau all the way back to the Oracle source table
- Used in Slim CI: only re-test models that feed changed exposures
Real-World Scenario: You are building the analytics data model for a B2B SaaS company with three subject areas — Sales, Finance, and Operations. BI teams in each area need to slice and dice independently, but the CEO needs a single cross-functional dashboard. You must design a model that works for both.
Star Schema Design — fact_orders
-- FACT TABLE: fact_orders (grain = one row per order line item)
CREATE TABLE marts.fact_orders (
-- Surrogate keys (join to dims)
order_sk VARCHAR NOT NULL, -- MD5 of order_id + source
customer_sk VARCHAR NOT NULL,
product_sk VARCHAR NOT NULL,
date_sk INT NOT NULL, -- YYYYMMDD integer for fast joins
-- Degenerate dimensions (no dim table needed)
order_id INT NOT NULL,
order_line_no INT NOT NULL,
-- Measures (always additive unless noted)
quantity INT,
unit_price DECIMAL(10,2),
gross_revenue DECIMAL(12,2), -- additive
discount_amount DECIMAL(10,2), -- additive
net_revenue DECIMAL(12,2), -- additive (gross - discount)
cogs DECIMAL(12,2), -- additive
gross_margin DECIMAL(12,2), -- additive (net_revenue - cogs)
-- Audit
_dbt_inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
CLUSTER BY (date_sk); -- Snowflake: prune by date on BI queries
-- DIMENSION: dim_customers (SCD Type 2 via dbt snapshot)
CREATE TABLE marts.dim_customers (
customer_sk VARCHAR PRIMARY KEY, -- surrogate key
customer_id INT NOT NULL, -- natural key (from source)
customer_name VARCHAR,
email VARCHAR,
region VARCHAR,
segment VARCHAR, -- SMB | MID_MARKET | ENTERPRISE
account_manager VARCHAR,
-- SCD2 columns
valid_from DATE NOT NULL,
valid_to DATE, -- NULL = current record
is_current BOOLEAN NOT NULL DEFAULT TRUE
);
-- Always join on both SK AND is_current=TRUE for current state
-- Join on SK alone for historical point-in-time analysis
Conformed Dimensions — Shared Across Subject Areas
-- A conformed dimension is defined ONCE and reused across all fact tables
-- dim_customers is conformed: used by fact_orders, fact_invoices, fact_support_tickets
--
-- Bus Matrix: shows which dims each fact uses (architect planning tool)
--
--                | dim_customers | dim_products | dim_date | dim_region | dim_sales_rep
-- ---------------|---------------|--------------|----------|------------|---------------
-- fact_orders    |       ✓       |      ✓       |    ✓     |     ✓      |       ✓
-- fact_invoices  |       ✓       |              |    ✓     |     ✓      |
-- fact_support   |       ✓       |              |    ✓     |            |
-- fact_inventory |               |      ✓       |    ✓     |     ✓      |
--
-- If dim_customers changes schema, ALL fact joins are affected → high blast radius
-- This is why conformed dims must be tightly governed and version-controlled in dbt
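The bus matrix doubles as a blast-radius lookup: given a dimension, list every fact that joins to it. A small sketch, with the matrix encoded as plain data (names mirror the planning matrix; this is a design aid, not deployed code):

```python
# Which dims each fact table joins to (rows of the bus matrix)
bus_matrix = {
    "fact_orders":    {"dim_customers", "dim_products", "dim_date",
                       "dim_region", "dim_sales_rep"},
    "fact_invoices":  {"dim_customers", "dim_date", "dim_region"},
    "fact_support":   {"dim_customers", "dim_date"},
    "fact_inventory": {"dim_products", "dim_date", "dim_region"},
}

def blast_radius(dim):
    """Every fact table affected by a schema change to one dimension."""
    return sorted(f for f, dims in bus_matrix.items() if dim in dims)

print(blast_radius("dim_customers"))
# ['fact_invoices', 'fact_orders', 'fact_support']
```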
Surrogate Key Generation in dbt
-- models/marts/dim_customers.sql
-- Generate a stable, source-agnostic surrogate key
SELECT
{{ generate_surrogate_key(['customer_id', 'source_system']) }} AS customer_sk,
customer_id,
source_system,
customer_name,
email,
region,
segment,
valid_from,
valid_to,
is_current
FROM {{ ref('snap_customers') }} -- dbt snapshot provides SCD2 history
Date Dimension — Critical for BI Performance
-- seeds/dim_date.csv approach OR generate with macro
-- dim_date has one row per calendar day with every attribute BI needs
-- Never compute DATEPART() in BI queries — pre-compute in dim_date
SELECT
TO_NUMBER(TO_CHAR(d, 'YYYYMMDD')) AS date_sk, -- INT join key
d AS full_date,
EXTRACT(YEAR FROM d) AS year,
EXTRACT(QUARTER FROM d) AS quarter,
EXTRACT(MONTH FROM d) AS month,
MONTHNAME(d) AS month_name,
EXTRACT(WEEK FROM d) AS week_of_year,
DAYOFWEEK(d) AS day_of_week,
DAYNAME(d) AS day_name,
IFF(DAYNAME(d) IN ('Sat','Sun'), FALSE, TRUE) AS is_weekday, -- DAYNAME avoids WEEK_START-dependent DAYOFWEEK numbering
IFF(d = LAST_DAY(d), TRUE, FALSE) AS is_month_end,
-- Fiscal year (example: FY starts April 1)
IFF(MONTH(d) >= 4, YEAR(d), YEAR(d)-1) AS fiscal_year,
IFF(MONTH(d) >= 4,
CEIL((MONTH(d)-3)/3.0),
CEIL((MONTH(d)+9)/3.0)) AS fiscal_quarter
FROM ( {{ date_spine("'2018-01-01'", "DATEADD('year',3,CURRENT_DATE)") }} )
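The fiscal-year IFF/CEIL expressions are easy to get wrong at the boundary months, so here is the same arithmetic ported to Python (same April-1 fiscal start as the SQL) with the edge cases spelled out:

```python
import math

def fiscal_year(year, month):
    """FY starts April 1: Apr-Dec belong to the current calendar year's FY."""
    return year if month >= 4 else year - 1

def fiscal_quarter(month):
    """Apr-Jun = Q1, Jul-Sep = Q2, Oct-Dec = Q3, Jan-Mar = Q4."""
    return math.ceil((month - 3) / 3) if month >= 4 else math.ceil((month + 9) / 3)

print(fiscal_year(2024, 4), fiscal_quarter(4))   # 2024 1 — April opens FY2024 Q1
print(fiscal_year(2024, 3), fiscal_quarter(3))   # 2023 4 — March closes FY2023 Q4
print(fiscal_quarter(12), fiscal_quarter(1))     # 3 4
```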
Common Modelling Anti-Patterns to Avoid
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Natural keys as FK in facts | Breaks historical joins when source key recycled | Always use surrogate keys |
| Snowflake schema (over-normalised) | Too many joins, slow BI queries | Denormalise into conformed dims |
| Semi-additive measures as additive | Wrong SUM on account balances | Document grain; use AVG or last-period for semi-additive |
| No SCD on dimensions | Customer region changes corrupt historical revenue by region | SCD2 via dbt snapshot |
| One giant fact table | Mixed grain — impossible to aggregate correctly | One fact per grain; separate fact tables for header vs line |
Real-World Scenario: A third-party logistics partner drops daily shipment files (Parquet) into an S3 bucket. You need these files auto-ingested into Snowflake within minutes of arrival — without polling, without a scheduler, and with full security via IAM roles (no hardcoded credentials).
Step 1: IAM Role-Based Trust (no credentials in Snowflake)
-- Step 1a: Create storage integration (Snowflake side)
CREATE OR REPLACE STORAGE INTEGRATION s3_logistics_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789:role/snowflake-s3-role'
STORAGE_ALLOWED_LOCATIONS = ('s3://logistics-drops/shipments/');
-- Retrieve the Snowflake-generated IAM principal to add to AWS trust policy
DESC INTEGRATION s3_logistics_int;
-- Copy: STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID
# Step 1b: AWS IAM Trust Policy (add Snowflake as trusted principal)
{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::SNOWFLAKE_ACCOUNT:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "SNOWFLAKE_EXTERNAL_ID_FROM_DESC"
}
}
}]
}
# IAM Role Permission Policy (attached to the role)
{
"Statement": [{
"Effect": "Allow",
"Action": ["s3:GetObject","s3:GetObjectVersion","s3:ListBucket"],
"Resource": [
"arn:aws:s3:::logistics-drops",
"arn:aws:s3:::logistics-drops/shipments/*"
]
}]
}
Step 2: External Stage + File Format
-- Create the external stage pointing to S3 via integration (no keys!)
CREATE OR REPLACE STAGE raw.logistics_stage
URL = 's3://logistics-drops/shipments/'
STORAGE_INTEGRATION = s3_logistics_int
FILE_FORMAT = (
  TYPE = 'PARQUET'
  COMPRESSION = SNAPPY             -- (SNAPPY_COMPRESSION is an unload-side option)
  NULL_IF = ('NULL', 'null', '')
);
-- Note: MATCH_BY_COLUMN_NAME is a COPY INTO option, not a file-format option
-- Inspect files in the stage
LIST @raw.logistics_stage;
-- Query Parquet directly without loading (schema-on-read)
SELECT $1:shipment_id::INT, $1:carrier::VARCHAR, $1:delivered_at::TIMESTAMP
FROM @raw.logistics_stage (FILE_FORMAT => 'PARQUET_FMT')  -- assumes a named file format PARQUET_FMT exists
LIMIT 100;
Step 3: Snowpipe — Auto-Ingest on S3 PUT Event
-- Create the target landing table
CREATE TABLE raw.shipments_landing (
shipment_id INT,
order_id INT,
carrier VARCHAR,
status VARCHAR,
delivered_at TIMESTAMP,
_file_name VARCHAR,  -- populate from METADATA$FILENAME during COPY (not valid as a column DEFAULT)
_loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Create Snowpipe with AUTO_INGEST
CREATE OR REPLACE PIPE raw.shipments_pipe
AUTO_INGEST = TRUE -- S3 SQS event triggers this pipe
AS
COPY INTO raw.shipments_landing
FROM @raw.logistics_stage
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE  -- COPY option; sits outside FILE_FORMAT
ON_ERROR = 'CONTINUE';
-- Snowpipe does not purge source files — they stay in S3 for audit / replay
-- (expire them with an S3 lifecycle rule if needed)
-- Get the SQS ARN to configure S3 Event Notification
SHOW PIPES LIKE 'shipments_pipe';
-- Copy: notification_channel → configure as S3 bucket notification
# S3 Event Notification (AWS Console or Terraform)
resource "aws_s3_bucket_notification" "snowpipe_trigger" {
bucket = "logistics-drops"
queue {
queue_arn = "<notification_channel from SHOW PIPES>"
events = ["s3:ObjectCreated:*"]
filter_prefix = "shipments/"
filter_suffix = ".parquet"
}
}
Step 4: Monitor & Validate the Pipeline
-- Check Snowpipe ingestion history
SELECT system$pipe_status('raw.shipments_pipe');
SELECT *
FROM TABLE(information_schema.copy_history(
TABLE_NAME => 'SHIPMENTS_LANDING',
START_TIME => DATEADD('hour', -6, CURRENT_TIMESTAMP)
))
ORDER BY last_load_time DESC;
-- Validate row counts after each file
SELECT _file_name, COUNT(*) AS rows_loaded, MAX(_loaded_at) AS loaded_at
FROM raw.shipments_landing
GROUP BY 1
ORDER BY 3 DESC;
End-to-End Latency: < 3 minutes from S3 PUT to queryable in Snowflake
- S3 PUT → SQS notification: ~1 sec
- SQS → Snowpipe trigger: ~10–30 sec (Snowpipe polls SQS every 30s)
- COPY INTO execution: 30–90 sec depending on file size and warehouse
- Stream + Task fires downstream transformation within next schedule window
Real-World Scenario: Your dbt project has 300 models. Every PR currently runs the full project in CI — taking 45 minutes and costing $15 in credits per PR. With Slim CI, you only run models that changed in the PR and their downstream dependents. CI drops to 5 minutes and $2.
How Slim CI Works
-- dbt generates a manifest.json after every production run
-- manifest.json = compiled graph of all models, their SQL hashes, dependencies
--
-- On a PR branch, dbt compares:
--     branch manifest (current PR state)
--   vs
--     production manifest (last successful prod run — "deferred state")
--
-- state:modified+ selects ONLY:
--   - Models whose SQL/config changed in this PR
--   - ALL downstream models of those changed models (+)
dbt Cloud CI Job Setup
# dbt Cloud CI Job configuration
Job Name: "Slim CI — PR Check"
Trigger: Pull Request (dbt Cloud native integration with GitHub/GitLab)
Environment: CI (separate Snowflake schema: dbt_ci_<PR_number>)
Deferral: Production environment (uses prod manifest.json)
Commands:
- dbt build --select state:modified+ --defer --state ./prod-artifacts --target ci
# dbt build = dbt run + dbt test in one command
# What --defer does:
# For any model NOT in state:modified+, dbt resolves {{ ref('dim_customers') }}
# to the PRODUCTION table instead of trying to build it in CI schema
# Result: CI only builds changed models; unchanged upstream deps resolve from prod
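The state:modified+ selector is just "changed nodes plus their downstream closure." A miniature sketch of that graph walk (model names and hashes are invented for illustration):

```python
def modified_plus(deps, prod_hashes, branch_hashes):
    """deps maps each model to the set of models directly downstream of it.
    Select models whose hash changed vs prod, plus everything downstream."""
    changed = {m for m, h in branch_hashes.items() if prod_hashes.get(m) != h}
    selected, stack = set(changed), list(changed)
    while stack:
        for child in deps.get(stack.pop(), set()):
            if child not in selected:
                selected.add(child)
                stack.append(child)
    return selected

deps = {"stg_orders": {"int_orders"}, "int_orders": {"fact_orders"},
        "stg_customers": {"dim_customers"}}
prod = {"stg_orders": "a1", "int_orders": "b1", "fact_orders": "c1",
        "stg_customers": "d1", "dim_customers": "e1"}
branch = dict(prod, stg_orders="a2")  # only stg_orders edited in the PR
print(sorted(modified_plus(deps, prod, branch)))
# ['fact_orders', 'int_orders', 'stg_orders']
```

Note that stg_customers and dim_customers are skipped entirely; under --defer, any ref() to them resolves to the production tables.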
GitHub Actions Pipeline (self-hosted dbt Core)
# .github/workflows/dbt_ci.yml
name: dbt Slim CI
on:
  pull_request:
    branches: [main]
    paths: ['models/**', 'macros/**', 'tests/**', 'dbt_project.yml']
jobs:
  slim_ci:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install dbt
        run: pip install dbt-snowflake==1.8.0
      - name: Download prod manifest
        env:
          AWS_ACCESS_KEY_ID: ${{ secrets.AWS_KEY }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET }}
        run: aws s3 cp s3://dbt-artifacts/prod/manifest.json ./prod-artifacts/manifest.json
      - name: dbt deps
        run: dbt deps
      - name: dbt Slim CI build
        env:
          SNOWFLAKE_ACCOUNT: ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER: ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD: ${{ secrets.SNOWFLAKE_PASSWORD }}
          SNOWFLAKE_ROLE: CI_ROLE
          DBT_TARGET_SCHEMA: ci_pr_${{ github.event.number }}
        run: |
          dbt build --select state:modified+ --defer --state ./prod-artifacts --target ci --profiles-dir .
      - name: Upload updated manifest as artifact
        # NOTE: on pull_request triggers github.ref is never refs/heads/main,
        # so in practice this step belongs in a separate push-to-main workflow
        if: github.ref == 'refs/heads/main'
        run: aws s3 cp ./target/manifest.json s3://dbt-artifacts/prod/manifest.json
Production Deployment Job
# dbt Cloud Production Job
Job Name: "Production — Hourly"
Schedule: Every hour (or triggered by Airflow / Orchestrator)
Environment: Production
Commands:
  - dbt source freshness --select source:oracle_erp source:stripe
  - dbt run --select tag:hourly --target prod
  - dbt test --select tag:hourly --store-failures
  - dbt run --select tag:daily --target prod   # if midnight run
  - dbt docs generate                          # refresh lineage docs
# Post-run: upload manifest.json to S3 for next CI run
# Failure notification: Slack #data-alerts channel via webhook
Branch Strategy
# Git branching for dbt:
# main → production (auto-deploy on merge)
# dev → development environment (team integration testing)
# feat/* → feature branches (CI runs on PR to main)
#
# Environments map to Snowflake schemas:
# main branch → ANALYTICS schema (prod)
# dev branch → ANALYTICS_DEV schema
# CI PR run → ANALYTICS_CI_123 schema (auto-dropped after PR closes)
#
# profiles.yml uses env vars so same code runs in all environments:
snowflake_profile:
  target: "{{ env_var('DBT_TARGET', 'dev') }}"
  outputs:
    prod:
      type: snowflake
      schema: "{{ env_var('DBT_SCHEMA', 'analytics') }}"
      role: "{{ env_var('SNOWFLAKE_ROLE', 'TRANSFORM_ROLE') }}"
Real-World Scenario: A Tableau dashboard querying fact_orders (800M rows) takes 45 seconds. The SLA is under 5 seconds. You have a $5,000/month Snowflake bill and need to cut it by 30% without impacting query performance.
Step 1: Profile the Slow Query
-- Find expensive queries in the last 7 days
SELECT
query_id,
query_text,
total_elapsed_time / 1000 AS elapsed_sec,
bytes_scanned / POWER(1024, 3) AS gb_scanned,
partitions_scanned,
partitions_total,
ROUND(partitions_scanned / NULLIF(partitions_total, 0) * 100, 1) AS pct_partitions_scanned,
credits_used_cloud_services
FROM snowflake.account_usage.query_history
WHERE start_time >= CURRENT_DATE - 7
AND execution_status = 'SUCCESS'
ORDER BY total_elapsed_time DESC
LIMIT 20;
-- If pct_partitions_scanned is near 100%: clustering will help
-- If bytes_scanned is high but rows_produced is low: pruning is failing
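The two diagnostic rules in the comments above can be folded into a small triage helper. A sketch only: the thresholds and field names are illustrative choices mirroring the `query_history` columns queried above, not Snowflake API.

```python
def triage_query(partitions_scanned: int, partitions_total: int,
                 gb_scanned: float, rows_produced: int) -> str:
    """Rough triage mirroring the two rules of thumb above (thresholds are illustrative)."""
    if partitions_total == 0:
        return "no partition metadata"
    pct = partitions_scanned / partitions_total * 100
    if pct > 90:
        return "poor pruning - consider a clustering key"
    if gb_scanned > 1 and rows_produced < 10_000:
        return "scanning far more than it returns - check filters/projections"
    return "scan profile looks reasonable"

print(triage_query(980, 1000, 55.0, 1200))  # near-100% of partitions scanned
print(triage_query(12, 1000, 0.4, 900))
```

Running this over the top-20 result set from the query above gives a quick shortlist of tables worth clustering.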
Step 2: Clustering Keys — Micro-Partition Pruning
-- Snowflake writes ingested data into immutable micro-partitions (~50–500MB of uncompressed data each)
-- But if queries always filter by ORDER_DATE and your data arrived randomly,
-- Snowflake scans ALL partitions instead of just the relevant date range.
-- Check current clustering quality
SELECT SYSTEM$CLUSTERING_INFORMATION('MARTS.FACT_ORDERS', '(order_date)');
-- Returns average_overlaps: high number = poor clustering = many partitions scanned
-- Add a clustering key on the most common BI filter column
ALTER TABLE marts.fact_orders
CLUSTER BY (order_date); -- Automatic Clustering then reclusters in the background
-- For compound keys: order columns from lowest to highest cardinality (Snowflake's recommendation)
ALTER TABLE marts.fact_orders
CLUSTER BY (order_date, region);
-- Monitor clustering progress (may take hours for large tables)
SELECT SYSTEM$CLUSTERING_INFORMATION('MARTS.FACT_ORDERS', '(ORDER_DATE, REGION)');
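Why clustering cuts scan time can be seen with a toy model of pruning: each micro-partition keeps min/max metadata per column, and a partition is skipped when its range cannot match the filter. The partition layouts below are invented for illustration.

```python
# Each micro-partition stores min/max per column; pruning skips partitions
# whose [min, max] range cannot overlap the filter range. Illustrative only.
def partitions_scanned(partitions, lo, hi):
    """Count partitions whose min/max range overlaps the filter [lo, hi]."""
    return sum(1 for p_min, p_max in partitions if p_max >= lo and p_min <= hi)

# Well-clustered: each partition covers one narrow, sorted date
clustered = [(d, d) for d in range(1, 101)]
# Unclustered: data arrived randomly, so every partition spans the whole range
unclustered = [(1, 100) for _ in range(100)]

print(partitions_scanned(clustered, 10, 12))    # prunes down to 3 partitions
print(partitions_scanned(unclustered, 10, 12))  # must scan all 100
```

This is exactly what a high `average_overlaps` value from SYSTEM$CLUSTERING_INFORMATION is telling you: partition ranges overlap heavily, so a narrow date filter still touches most of the table.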
Step 3: Understand Snowflake's 3-Layer Cache
-- Layer 1: Metadata Cache (FREE — no warehouse needed)
-- Stores table metadata, row counts, min/max per partition
-- Enables partition pruning WITHOUT scanning data
-- Always-on; no configuration needed
-- Layer 2: Result Cache (FREE — no warehouse needed)
-- Stores exact query results for 24 hours
-- If same query runs again with same data, returns instantly
-- Invalidated when underlying table changes
SELECT COUNT(*) FROM fact_orders WHERE order_date = '2024-01-15';
-- First run: 12 seconds. Second run: 4 milliseconds (result cache hit)
-- Layer 3: Local Disk Cache (per warehouse, virtual warehouse memory + SSD)
-- Caches raw micro-partition data on warehouse nodes
-- Subsequent queries scanning SAME data are much faster
-- Destroyed when warehouse suspends — key reason to NOT auto-suspend too aggressively
-- For frequently-queried marts: set AUTO_SUSPEND = 600 (10 min) not 60 sec
-- Check cache usage in query profile
SELECT query_id,
bytes_scanned,
bytes_written_to_result,
percentage_scanned_from_cache -- > 80% = excellent cache hit rate
FROM snowflake.account_usage.query_history
WHERE query_id = '<your_query_id>';
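The result cache's invalidation rule (Layer 2) is easy to internalise with a mock: key the cache on the exact query text plus a per-table version counter, and bump the counter on any table change. This is a conceptual sketch, not how Snowflake implements it internally.

```python
# Minimal mock of Layer 2: results keyed on (query text, table, table version),
# so any DML against the table invalidates previously cached results.
class ResultCache:
    def __init__(self):
        self.table_version = {}
        self.cache = {}

    def _key(self, sql, table):
        return (sql, table, self.table_version.get(table, 0))

    def run(self, sql, table, execute):
        key = self._key(sql, table)
        if key in self.cache:
            return self.cache[key], True   # cache hit: no warehouse needed
        result = execute()
        self.cache[key] = result
        return result, False               # cache miss: query executed

    def table_changed(self, table):
        self.table_version[table] = self.table_version.get(table, 0) + 1

rc = ResultCache()
sql = "SELECT COUNT(*) FROM fact_orders"
print(rc.run(sql, "fact_orders", lambda: 42))  # (42, False) - executed
print(rc.run(sql, "fact_orders", lambda: 42))  # (42, True)  - cache hit
rc.table_changed("fact_orders")                # DML invalidates the entry
print(rc.run(sql, "fact_orders", lambda: 43))  # (43, False) - re-executed
```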
Step 4: Search Optimization Service (for selective point lookups)
-- For queries like: WHERE customer_id = 12345 (selective, non-clustering-key columns)
-- Clustering won't help here; Search Optimization will
ALTER TABLE marts.fact_orders
ADD SEARCH OPTIMIZATION ON EQUALITY(customer_id, order_id);
-- Check cost/benefit before enabling (SOS has storage overhead)
SELECT SYSTEM$ESTIMATE_SEARCH_OPTIMIZATION_COSTS('MARTS.FACT_ORDERS',
'EQUALITY(customer_id)');
-- Returns estimated_query_credits_saved and search_access_cost
Step 5: Warehouse Right-Sizing
-- Find the optimal warehouse size by running the same query at different sizes
-- Cost doubles per size tier, but runtime often halves → sweet spot exists
-- Rule of thumb:
-- X-SMALL / SMALL : simple queries, <10M rows
-- MEDIUM : standard dbt transforms, 10M–500M rows
-- LARGE : complex joins, window functions on 500M+ rows
-- X-LARGE+ : data loads, heavy aggregations, initial full-refresh
-- For concurrent BI users — use Multi-Cluster warehouses
ALTER WAREHOUSE ANALYTICS_WH SET
MAX_CLUSTER_COUNT = 4 -- Scale out horizontally for concurrency
MIN_CLUSTER_COUNT = 1
SCALING_POLICY = 'STANDARD'; -- Aggressive scale-out for BI latency
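The "cost doubles, runtime often halves" rule means cost per run stays roughly flat while sizes scale linearly, then rises once runtime stops halving; the sweet spot is the cheapest size that still meets the SLA. A sketch with hypothetical measured runtimes (credits-per-hour ratios are the standard warehouse tiers; the runtimes are invented):

```python
# Credits/hour double per tier; runtimes below are hypothetical measurements
# of one query at each size. Note runtime stops halving at XLARGE.
SIZES = {"XSMALL": 1, "SMALL": 2, "MEDIUM": 4, "LARGE": 8, "XLARGE": 16}

def cost_credits(size, runtime_sec):
    """Credits consumed by one run at this size (ignoring the 60s minimum)."""
    return SIZES[size] * runtime_sec / 3600

def cheapest_meeting_sla(runtimes, sla_sec):
    """Cheapest size among those whose measured runtime meets the SLA."""
    ok = {s: t for s, t in runtimes.items() if t <= sla_sec}
    return min(ok, key=lambda s: cost_credits(s, ok[s])) if ok else None

runtimes = {"XSMALL": 45, "SMALL": 24, "MEDIUM": 12, "LARGE": 7, "XLARGE": 6}
print(cheapest_meeting_sla(runtimes, sla_sec=10))   # LARGE: XLARGE barely faster, 2x cost
```

With no SLA pressure the answer flips to the smallest size, which is why right-sizing always starts from the latency requirement, not the data volume.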
Step 6: Query Rewrite Patterns
-- BAD: Function on the filter column destroys pruning
SELECT * FROM fact_orders WHERE YEAR(order_date) = 2024;
-- GOOD: Range filter preserves micro-partition pruning
SELECT * FROM fact_orders WHERE order_date BETWEEN '2024-01-01' AND '2024-12-31';
-- BAD: SELECT * from a 200-column wide table in a BI tool
SELECT * FROM fact_orders WHERE order_date = '2024-01-15';
-- GOOD: Only fetch columns the BI tool needs (columnar storage wins here)
SELECT order_id, customer_id, net_revenue
FROM fact_orders WHERE order_date = '2024-01-15';
-- BAD: DISTINCT on a massive table (often a modelling symptom)
SELECT DISTINCT customer_id FROM fact_orders;
-- GOOD: Push deduplication upstream into the dbt staging layer
Real-World Scenario: After a silent data quality failure corrupted the monthly revenue report — which went undetected for 3 days — you are asked to design a comprehensive data quality framework that catches issues at every layer before they reach BI consumers.
Quality Framework Architecture — 4 Layers of Defence
Layer 1: Source Freshness Checks
↓ (fail fast if upstream is stale)
Layer 2: Schema Contract Tests (dbt built-in + custom generic)
↓ (fail if data shape changes)
Layer 3: Business Rule Validation (singular SQL tests + dbt-expectations)
↓ (fail if data violates domain logic)
Layer 4: Observability & Anomaly Detection (Monte Carlo / Bigeye / custom SQL)
↓ (alert on statistical drift — row counts, null rates, distribution shifts)
Layer 1: Source Freshness Monitoring
# sources.yml — enforce data arrival SLAs
sources:
  - name: oracle_erp
    database: RAW
    schema: ORACLE_LANDING
    freshness:
      warn_after: { count: 1, period: hour }
      error_after: { count: 4, period: hour }
    loaded_at_field: header__timestamp
    tables:
      - name: orders
        freshness:
          warn_after: { count: 30, period: minute }  # orders are critical
          error_after: { count: 90, period: minute }
      - name: inventory
        freshness:
          warn_after: { count: 4, period: hour }
# Run freshness check before any transformation
# dbt source freshness → returns WARN or ERROR before wasting compute
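The warn_after / error_after logic above is just an age comparison against two thresholds. A pure-Python mirror of the check, useful for reasoning about what `dbt source freshness` will return for a given load timestamp:

```python
from datetime import datetime, timedelta

def freshness_status(loaded_at, now, warn_after, error_after):
    """Mirror of dbt's warn_after / error_after freshness thresholds."""
    age = now - loaded_at
    if age > error_after:
        return "ERROR"   # blocks the run
    if age > warn_after:
        return "WARN"
    return "PASS"

now = datetime(2024, 1, 15, 12, 0)
# orders SLA: warn after 30 min, error after 90 min (as in the YAML above)
print(freshness_status(datetime(2024, 1, 15, 11, 45), now,
                       timedelta(minutes=30), timedelta(minutes=90)))  # PASS
print(freshness_status(datetime(2024, 1, 15, 10, 0), now,
                       timedelta(minutes=30), timedelta(minutes=90)))  # ERROR
```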
Layer 2: Schema & Integrity Tests
# Critical tests that must NEVER fail (severity: error → blocks pipeline)
models:
  - name: fact_orders
    tests:
      - dbt_utils.equal_rowcount:      # rowcount matches staging
          compare_model: ref('stg_orders')
      - dbt_utils.recency:             # table has rows from the last 2 hours
          datepart: hour
          field: order_date
          interval: 2
    columns:
      - name: order_sk
        tests: [not_null, unique]
      - name: net_revenue
        tests:
          - not_null
          - not_negative               # custom generic test macro
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1000000       # max plausible order value
      - name: customer_sk
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_sk
              severity: warn           # orphans warn but don't block
Layer 3: Business Rule Singular Tests
-- tests/assert_revenue_reconciliation.sql
-- Revenue in Snowflake must match Stripe control total within 0.01%
-- This runs daily as part of the production dbt job
WITH snowflake_total AS (
SELECT SUM(net_revenue) AS total
FROM {{ ref('fact_orders') }}
WHERE order_date = CURRENT_DATE - 1
AND payment_gateway = 'STRIPE'
),
stripe_control AS (
SELECT expected_revenue AS total
FROM {{ ref('stripe_daily_reconciliation') }}
WHERE report_date = CURRENT_DATE - 1
)
SELECT
s.total AS snowflake_revenue,
c.total AS stripe_control_total,
ABS(s.total - c.total) / NULLIF(c.total, 0) * 100 AS variance_pct
FROM snowflake_total s, stripe_control c
WHERE ABS(s.total - c.total) / NULLIF(c.total, 0) > 0.0001 -- Fail if >0.01% off
-- tests/assert_no_duplicate_orders.sql
-- Catch if Qlik double-delivers an event (CDC dedup failure)
SELECT order_id, COUNT(*) AS cnt
FROM {{ ref('fact_orders') }}
WHERE order_date >= CURRENT_DATE - 3
GROUP BY order_id
HAVING cnt > 1
Layer 4: Statistical Anomaly Detection (custom SQL)
-- Store daily metrics and alert on Z-score anomalies
-- models/monitoring/mon_daily_row_counts.sql
{{ config(materialized='incremental', unique_key=['model_name', 'check_date']) }}
SELECT
'{{ this.name }}' AS model_name,
CURRENT_DATE AS check_date,
COUNT(*) AS row_count,
SUM(net_revenue) AS total_revenue,
COUNT(DISTINCT customer_sk) AS unique_customers
FROM {{ ref('fact_orders') }}
WHERE order_date = CURRENT_DATE - 1
-- Anomaly alert: flag if today's row count deviates >3 sigma from 30-day avg
WITH stats AS (
SELECT
AVG(row_count) AS mean_rows,
STDDEV(row_count) AS stddev_rows
FROM mon_daily_row_counts
WHERE check_date BETWEEN CURRENT_DATE - 31 AND CURRENT_DATE - 1
)
SELECT
t.check_date,
t.row_count,
s.mean_rows,
(t.row_count - s.mean_rows) / NULLIF(s.stddev_rows, 0) AS z_score
FROM mon_daily_row_counts t, stats s
WHERE t.check_date = CURRENT_DATE - 1
AND ABS((t.row_count - s.mean_rows) / NULLIF(s.stddev_rows, 0)) > 3;
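The same Z-score logic is trivial to verify in plain Python, which is worth doing before trusting the SQL version with paging decisions. The sample history below is invented:

```python
from statistics import mean, stdev

def row_count_z(history, today):
    """Z-score of today's row count against a trailing window of daily counts."""
    mu, sigma = mean(history), stdev(history)
    return (today - mu) / sigma if sigma else 0.0

# 30-day history would go here; a short invented window for illustration
history = [100_000, 101_000, 99_500, 100_500, 100_200, 99_800]
z = row_count_z(history, 60_000)   # today's load came in 40% short
print(round(z, 1), "ANOMALY" if abs(z) > 3 else "ok")
```

The |z| > 3 cutoff matches the SQL above; with roughly normal daily volumes that is about a 1-in-370 false-alarm rate per metric per day, so it stays quiet enough to be actionable.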
Test Execution Strategy
| When | What Runs | Failure Action |
|---|---|---|
| Every PR (CI) | Tests on modified+ models only | Block merge |
| Every hourly run | Source freshness + tag:hourly tests | Slack alert, pause pipeline |
| Daily 06:00 | Full test suite + reconciliation tests | PagerDuty, incident ticket |
| Weekly | Statistical anomaly queries (Z-score) | Email data team lead |
Real-World Scenario: Your client is migrating from a Redshift-based monolith (3TB, 800 tables, 50 reports) to a modern Snowflake + dbt + S3 Lakehouse. As the lead data engineer, you must design the migration, preserve data history, and ensure zero BI downtime.
Migration Architecture
PHASE 1 — ASSESSMENT (Week 1–2)
- Inventory all 800 Redshift tables: size, row count, query frequency
- Classify: Hot (queried daily), Warm (weekly), Cold (archival only)
- Identify: Redshift-specific SQL (LISTAGG, DECODE, distribution keys)
- Map: which tables feed which reports (use Redshift STL_QUERY logs)

PHASE 2 — FOUNDATION (Week 2–4)
- Provision Snowflake: account, databases, warehouses, roles, RBAC
- Set up S3 as the neutral landing zone (both Redshift and Snowflake read it)
- Deploy dbt Cloud + connect to Snowflake CI/CD
- Establish: RAW → STAGING → MARTS layer structure

PHASE 3 — PARALLEL RUN (Week 4–8)
- Run BOTH Redshift and Snowflake in parallel
- Validate daily: row counts, SUM of key metrics, NULL rates match
- BI tools connected to BOTH; analysts validate dashboard numbers
- Fix discrepancies in dbt models before cutover

PHASE 4 — CUTOVER (Week 8–10)
- Freeze Redshift writes (maintenance window, e.g. Friday 10 PM)
- Final delta load: CDC all changes since the last full load
- Switch BI tool data source connections to Snowflake
- Monitor for 48 hours; rollback plan = flip BI connections back to Redshift

PHASE 5 — DECOMMISSION (Week 12+)
- Redshift cluster to read-only for 30 days (safety net)
- Migrate cold/archival data to S3 Parquet (Snowflake external tables)
- Terminate Redshift cluster
Data Migration: Redshift → S3 → Snowflake
-- Step 1: Unload hot tables from Redshift to S3 in Parquet
UNLOAD ('SELECT * FROM sales.fact_orders')
TO 's3://migration-bucket/fact_orders/'
IAM_ROLE 'arn:aws:iam::123456789:role/redshift-s3-role'
FORMAT AS PARQUET
PARTITION BY (order_date)
MAXFILESIZE 256 MB;
-- Step 2: Load from S3 into Snowflake (external stage → COPY INTO)
CREATE STAGE migration.redshift_stage
URL = 's3://migration-bucket/'
STORAGE_INTEGRATION = s3_migration_int;
COPY INTO raw.fact_orders_migration
FROM @migration.redshift_stage/fact_orders/
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE  -- copy option, not a file-format option
PATTERN = '.*\.parquet'
ON_ERROR = 'CONTINUE';
-- Step 3: Validate row counts match
SELECT
'REDSHIFT' AS source, COUNT(*) AS row_count FROM redshift_control_table
UNION ALL
SELECT
'SNOWFLAKE' AS source, COUNT(*) AS row_count FROM raw.fact_orders_migration;
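At 800 tables you cannot eyeball these comparisons; the parallel-run validation is worth scripting. A minimal sketch, assuming you have already pulled per-table counts from both systems into dicts (the table names and counts below are invented):

```python
def validate_counts(redshift_counts, snowflake_counts, tolerance=0):
    """Compare per-table row counts from both systems; return mismatches."""
    issues = []
    for table, rs in redshift_counts.items():
        sf = snowflake_counts.get(table)
        if sf is None:
            issues.append((table, "missing in Snowflake"))
        elif abs(sf - rs) > tolerance:
            issues.append((table, f"count drift: {rs} vs {sf}"))
    return issues

rs = {"fact_orders": 812_440_031, "dim_customers": 4_210_555}
sf = {"fact_orders": 812_440_031, "dim_customers": 4_210_000}
print(validate_counts(rs, sf))   # dim_customers drift surfaces immediately
```

In practice you would extend the same loop to compare SUMs of key metrics and NULL rates per column, as listed in Phase 3 above.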
SQL Compatibility: Redshift → Snowflake Translation Guide
| Redshift SQL | Snowflake Equivalent | Notes |
|---|---|---|
| LISTAGG(col, ',') | LISTAGG(col, ',') | Same syntax ✓ |
| DECODE(col, v1, r1, v2, r2) | DECODE(...) / CASE WHEN | Snowflake supports DECODE; CASE is more portable |
| GETDATE() | CURRENT_TIMESTAMP | GETDATE() also works in Snowflake as an alias |
| DATEDIFF('day', a, b) | DATEDIFF('day', a, b) | Same ✓ |
| DATEPART(month, d) | DATE_PART('month', d) | Quote the date part |
| NVL(a, b) | NVL(a, b) / COALESCE(a, b) | Both supported; COALESCE is ANSI |
| DISTKEY / SORTKEY | CLUSTER BY | Different mechanism |
| VARCHAR(MAX) | VARCHAR (up to 16MB) | Remove MAX |
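The mechanical rows of this table can be scripted for a first translation pass. This is a hedged sketch: regexes are fine for bulk find-and-replace triage, but a real migration should lean on a SQL parser or transpiler rather than patterns like these.

```python
import re

# Hypothetical helper covering the mechanical rows of the table above.
# Normalises to the ANSI forms even where Snowflake accepts both spellings.
RULES = [
    (r"\bGETDATE\(\)", "CURRENT_TIMESTAMP"),
    (r"\bNVL\(", "COALESCE("),
    (r"\bDATEPART\((\w+),", r"DATE_PART('\1',"),
    (r"\bVARCHAR\(MAX\)", "VARCHAR"),
]

def translate(redshift_sql: str) -> str:
    for pattern, repl in RULES:
        redshift_sql = re.sub(pattern, repl, redshift_sql, flags=re.IGNORECASE)
    return redshift_sql

print(translate("SELECT NVL(ship_date, GETDATE()), DATEPART(month, order_date) FROM o"))
```

DISTKEY/SORTKEY are deliberately absent: those are DDL-level design decisions that map to CLUSTER BY choices, not a textual rewrite.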
Modern Lakehouse Stack Post-Migration
┌─────────────────────────────────────────────────────────────────┐
│ INGESTION LAYER │
│ Qlik Replicate (CDC) │ Fivetran (SaaS) │ Snowpipe (Files) │
└───────────────────────────────┬─────────────────────────────────┘
│
┌───────────▼──────────┐
│ S3 Data Lake │ Raw Parquet / JSON
│ (source of truth, │ Partitioned by date
│ infinite retention) │ Lifecycle: 7yr Glacier
└───────────┬──────────┘
│ COPY INTO / External Tables
┌───────────▼──────────┐
│ Snowflake (Compute) │ Zero-copy over S3 data
│ RAW → STG → MARTS │ via Iceberg / Ext Tables
└───────────┬──────────┘
│ dbt Cloud (Transform)
┌───────────▼──────────┐
│ Semantic Layer │ dbt metrics / Looker LookML
└───────────┬──────────┘
│
┌───────────────┼──────────────────┐
Tableau Power BI Self-Service
(Operational) (Executive) (Mode / Hex)
RBAC Design (Role-Based Access Control)
-- Principle of least privilege — no direct user-to-table grants
-- All access via roles; roles assigned to users
CREATE ROLE ANALYST_ROLE; -- SELECT on marts only
CREATE ROLE ENGINEER_ROLE; -- SELECT + INSERT on staging + marts
CREATE ROLE TRANSFORM_ROLE; -- Used by dbt service account
CREATE ROLE INGEST_ROLE; -- Used by Qlik / Fivetran service accounts
CREATE ROLE ADMIN_ROLE; -- Full DDL + GRANT
-- dbt service account permissions
GRANT ROLE TRANSFORM_ROLE TO USER dbt_service_account;
GRANT USAGE ON WAREHOUSE TRANSFORM_WH TO ROLE TRANSFORM_ROLE;
GRANT USAGE, CREATE SCHEMA ON DATABASE ANALYTICS TO ROLE TRANSFORM_ROLE;
GRANT SELECT ON ALL TABLES IN SCHEMA RAW TO ROLE TRANSFORM_ROLE;
GRANT CREATE TABLE ON SCHEMA ANALYTICS.STAGING TO ROLE TRANSFORM_ROLE; -- schema-level privilege
GRANT SELECT, INSERT, UPDATE, DELETE
ON FUTURE TABLES IN SCHEMA ANALYTICS.STAGING TO ROLE TRANSFORM_ROLE;
-- Analysts get Secure Views over marts (never raw data)
GRANT SELECT ON ALL VIEWS IN SCHEMA ANALYTICS.MARTS TO ROLE ANALYST_ROLE;
-- Column-level masking for PII
CREATE MASKING POLICY email_mask AS (val VARCHAR) RETURNS VARCHAR ->
CASE WHEN CURRENT_ROLE() IN ('ADMIN_ROLE','ENGINEER_ROLE') THEN val
ELSE '***@***.***'
END;
ALTER TABLE marts.dim_customers
MODIFY COLUMN email SET MASKING POLICY email_mask;
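The masking policy body is a plain conditional, which makes it easy to unit-test the intended behaviour outside Snowflake before deploying. A pure-Python mirror of `email_mask` (illustrative only; real enforcement happens in Snowflake at query time):

```python
def mask_email(val: str, current_role: str) -> str:
    """Pure-Python mirror of the email_mask policy above."""
    if current_role in ("ADMIN_ROLE", "ENGINEER_ROLE"):
        return val              # privileged roles see the raw value
    return "***@***.***"        # everyone else sees the masked literal

print(mask_email("jane@corp.com", "ENGINEER_ROLE"))
print(mask_email("jane@corp.com", "ANALYST_ROLE"))
```

Because the policy is evaluated per query against CURRENT_ROLE(), the same table serves both audiences with no duplicated views.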
Real-World Scenario: Your daily pipeline must: wait for an S3 file from a partner to arrive (unknown time), then ingest it into Snowflake, run dbt, and only email the finance team when all models pass tests. If any step fails, retry twice and alert Slack. You need Airflow — not just a cron job.
Core Airflow Concepts
# DAG        = Directed Acyclic Graph of tasks
# Operator   = a single unit of work (SQL, Python, bash, sensor, etc.)
# Sensor     = special operator that polls until a condition is true
# XCom       = cross-task communication (pass small values between tasks)
# TaskGroup  = visual grouping of related tasks in the UI
# Connection = stored credential (Snowflake, AWS, dbt Cloud, etc.)
Production DAG: S3 Arrival → Ingest → dbt → Alert
# dags/daily_finance_pipeline.py
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta
SNOWFLAKE_CONN = 'snowflake_prod'
S3_BUCKET = 'finance-drops'
def check_row_count(**context):
    """ShortCircuit: only proceed if the COPY INTO loaded rows.
    SnowflakeOperator pushes its query results (a list of rows) to XCom."""
    result = context['ti'].xcom_pull(task_ids='ingest.copy_into_snowflake')
    rows_loaded = result[0][0] if result else 0
    return rows_loaded > 0  # False = skip all downstream tasks

with DAG(
    dag_id = 'daily_finance_pipeline',
    schedule_interval = '0 6 * * 1-5',  # 6 AM UTC, weekdays only
    start_date = datetime(2024, 1, 1),
    catchup = False,
    default_args = {
        'retries': 2,
        'retry_delay': timedelta(minutes=10),
        'on_failure_callback': lambda ctx: SlackWebhookOperator(
            task_id='slack_fail',
            slack_webhook_conn_id='slack_data_alerts',
            message=f":red_circle: DAG {ctx['dag'].dag_id} failed on {ctx['ds']}"
        ).execute(ctx),
    },
    tags = ['finance', 'snowflake', 'dbt'],
) as dag:

    # 1. Wait for partner file (polls every 60s, times out after 4 hours)
    wait_for_file = S3KeySensor(
        task_id = 'wait_for_s3_file',
        bucket_name = S3_BUCKET,
        bucket_key = 'finance/daily_report_{{ ds_nodash }}.parquet',
        aws_conn_id = 'aws_prod',
        poke_interval = 60,
        timeout = 14400,        # 4 hours
        mode = 'reschedule',    # releases the worker slot while waiting
    )

    with TaskGroup('ingest') as ingest_group:
        copy_into = SnowflakeOperator(
            task_id = 'copy_into_snowflake',
            snowflake_conn_id = SNOWFLAKE_CONN,
            sql = """
                COPY INTO raw.finance_report
                FROM @raw.finance_stage/finance/daily_report_{{ ds_nodash }}.parquet
                FILE_FORMAT = parquet_fmt
                ON_ERROR = 'ABORT_STATEMENT';
                -- COPY INTO's result set exposes a lowercase rows_loaded column
                SELECT "rows_loaded" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
            """,
            warehouse = 'INGEST_WH',
        )
        row_check = ShortCircuitOperator(
            task_id = 'check_rows_loaded',
            python_callable = check_row_count,
        )
        copy_into >> row_check

    # 2. dbt transform (Slim CI: only changed models)
    dbt_run = DbtCloudRunJobOperator(
        task_id = 'dbt_daily_models',
        dbt_cloud_conn_id = 'dbt_cloud',
        job_id = 12345,
        check_interval = 30,
        timeout = 3600,
    )

    # 3. Notify finance team on success
    notify = SlackWebhookOperator(
        task_id = 'notify_finance_team',
        slack_webhook_conn_id = 'slack_finance',
        message = ":white_check_mark: Finance pipeline complete for {{ ds }}. Models ready in Snowflake.",
    )

    wait_for_file >> ingest_group >> dbt_run >> notify
Sensor Modes — Critical for Production
# mode='poke'       — sensor holds the worker slot the whole time (bad for long waits)
# mode='reschedule' — sensor releases the worker slot between checks (cost-efficient)
# Use 'reschedule' for any sensor that may wait more than a few minutes
XCom — Passing Data Between Tasks
def extract_row_count(ti, **kwargs):
    # Push a value to XCom
    ti.xcom_push(key='rows_ingested', value=150000)

def validate_count(ti, **kwargs):
    # Pull the value from another task
    count = ti.xcom_pull(task_ids='count_task', key='rows_ingested')
    if count < 1000:
        raise ValueError(f"Only {count} rows ingested — possible data issue")
# XCom is for small values (IDs, counts, flags) — never pass DataFrames via XCom
Airflow Best Practices
- Idempotent tasks — every task must be safely re-runnable. Use MERGE not INSERT; use {{ ds }} templating for date partitions
- Atomic tasks — each task does one thing. Don't combine ingest + transform in one SnowflakeOperator call
- Use mode='reschedule' on all sensors to avoid starving the worker pool
- TaskGroups — group related tasks visually but don't over-nest; max 2 levels deep
- SLA alerts — set sla=timedelta(hours=2) on critical tasks; Airflow emails if a task misses its SLA
- Pool usage — create Airflow Pools for Snowflake connections to cap concurrency and prevent warehouse overload
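The idempotency bullet is the one that bites most often, so it is worth seeing concretely. A MERGE-style upsert keyed on the Airflow logical date means a retried task overwrites its own partition instead of duplicating it; this sketch models the table as a plain list of dicts:

```python
# Idempotency sketch: partition-replace keyed on the Airflow {{ ds }} date,
# so re-running a task for the same day overwrites instead of appending.
def merge_partition(table, ds, rows):
    """Replace the partition for `ds`; safe to re-run any number of times."""
    table = [r for r in table if r["ds"] != ds]        # drop the old partition
    table.extend({"ds": ds, **r} for r in rows)        # write the new one
    return table

table = []
table = merge_partition(table, "2024-01-15", [{"order_id": 1}, {"order_id": 2}])
table = merge_partition(table, "2024-01-15", [{"order_id": 1}, {"order_id": 2}])  # retry
print(len(table))   # still 2 rows: the retry replaced, not duplicated
```

A bare INSERT version of the same task would leave 4 rows after one retry, which is exactly the silent double-count that templated MERGEs prevent.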
Real-World Scenario: Your analytics team needs to run a customer churn prediction model. Traditionally this means pulling data out of Snowflake into a Jupyter notebook, running Python, and writing predictions back — with data leaving the secure environment. Snowpark lets you run the entire pipeline inside Snowflake — no data movement, no separate compute cluster.
Snowpark Session & DataFrame API
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_, avg, when, lit
from snowflake.snowpark.types import IntegerType, FloatType
# Connect (same creds as SQL — uses warehouse, role, schema from profile)
session = Session.builder.configs({
'account': 'orgname-acctname',
'user': 'dbt_service_account',
'password': 'xxx',
'role': 'TRANSFORM_ROLE',
'warehouse': 'TRANSFORM_WH',
'database': 'ANALYTICS',
'schema': 'MARTS',
}).create()
# DataFrame — lazy evaluation, pushes computation to Snowflake
orders = session.table('fact_orders')
# Filter + aggregate (no data leaves Snowflake)
daily_revenue = (orders
.filter(col('status') == 'COMPLETED')
.filter(col('order_date') >= '2024-01-01')
.group_by('order_date', 'region')
.agg(
sum_('net_revenue').alias('total_revenue'),
avg('net_revenue').alias('avg_order_value'),
)
.sort('order_date')
)
# Inspect without materialising
daily_revenue.show(10)
daily_revenue.explain()  # prints the SQL Snowpark will execute
# Write result back to Snowflake
daily_revenue.write.mode('overwrite').save_as_table('MARTS.daily_revenue_summary')
Vectorised Python UDF (batch processing, fast)
from snowflake.snowpark.functions import udf, col
from snowflake.snowpark.types import FloatType
import pandas as pd
# Vectorised UDF: receives a pd.Series, returns a pd.Series
# Much faster than row-by-row UDFs for large tables
@udf(name='calculate_ltv_score', is_permanent=True,
stage_location='@raw.udf_stage', replace=True,
input_types=[FloatType(), FloatType(), FloatType()],
return_type=FloatType(),
packages=['pandas'])
def calculate_ltv_score(total_spend: pd.Series,
                        order_count: pd.Series,
                        months_active: pd.Series) -> pd.Series:
    # LTV scoring: recency + frequency + monetary
    rfm_score = (
        (total_spend / total_spend.max()) * 0.5 +
        (order_count / order_count.max()) * 0.3 +
        (months_active / months_active.max()) * 0.2
    )
    return rfm_score.clip(0, 1)
# Use UDF in a DataFrame operation
customers = session.table('dim_customers')
scored = customers.with_column(
'ltv_score',
calculate_ltv_score(col('lifetime_spend'), col('order_count'), col('months_active'))
)
scored.write.mode('overwrite').save_as_table('MARTS.dim_customers_scored')
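Since the UDF body is plain arithmetic, the scoring logic can be unit-tested locally before registering anything in Snowflake. A pure-Python mirror for a single customer (the same weighted blend and clip, without pandas):

```python
def ltv_score(total_spend, order_count, months_active,
              max_spend, max_orders, max_months):
    """Single-customer mirror of the UDF's weighted RFM blend."""
    score = (total_spend / max_spend) * 0.5 \
          + (order_count / max_orders) * 0.3 \
          + (months_active / max_months) * 0.2
    return min(max(score, 0.0), 1.0)   # same clip(0, 1) as the UDF

# A customer at the maximum on every axis scores exactly 1.0
print(ltv_score(5000, 40, 24, max_spend=5000, max_orders=40, max_months=24))
# Half spend, quarter orders, half tenure
print(ltv_score(2500, 10, 12, max_spend=5000, max_orders=40, max_months=24))
```

Keeping the formula testable like this is the main argument for small, pure UDF bodies: the Snowflake registration step adds packaging, not logic.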
ML Pipeline inside Snowflake (Snowpark ML)
from snowflake.ml.modeling.preprocessing import StandardScaler
from snowflake.ml.modeling.ensemble import RandomForestClassifier
# Load features from Snowflake
features = session.table('ML.churn_features')
# Snowpark DataFrames split natively — no scikit-learn-style helper needed
train_df, test_df = features.random_split([0.8, 0.2], seed=42)
# Scale features (runs inside Snowflake)
scaler = StandardScaler(
input_cols = ['total_spend', 'days_since_last_order', 'order_count'],
output_cols = ['spend_scaled', 'recency_scaled', 'frequency_scaled']
)
train_scaled = scaler.fit(train_df).transform(train_df)
# Train model (runs on Snowflake compute)
clf = RandomForestClassifier(
input_cols = ['spend_scaled', 'recency_scaled', 'frequency_scaled'],
label_cols = ['is_churned'],
output_cols = ['predicted_churn'],
n_estimators = 100,
)
clf.fit(train_scaled)
# Score entire customer table — predictions written back to Snowflake
predictions = clf.predict(scaler.transform(session.table('dim_customers')))
predictions.write.mode('overwrite').save_as_table('ML.churn_predictions')
When to Use Snowpark vs dbt vs Stored Procedures
| Tool | Best For | Language |
|---|---|---|
| dbt | SQL transformations, testing, documentation, CI/CD | SQL + Jinja |
| Snowpark Python | Complex logic, ML, pandas-style operations, UDFs | Python |
| Stored Procedures (JS) | Admin tasks, dynamic SQL, metadata operations | JavaScript |
| Stored Procedures (Python) | Python logic that must run transactionally with SQL | Python |
Real-World Scenario: Your finance team calculates "Monthly Recurring Revenue" one way in Tableau, the sales team calculates it differently in Looker, and the exec team uses yet another formula in Excel. Three definitions of MRR means three different numbers in board meetings. The dbt Semantic Layer solves this — define MRR once in dbt, all BI tools query the same definition.
dbt Semantic Layer Architecture
# The Semantic Layer sits between dbt models and BI tools
# BI tools query the Semantic Layer API instead of tables directly
# This means: metric logic lives in code, version-controlled, tested
BI Tool (Tableau / Looker / Mode)
│ "Give me MRR by region for last quarter"
▼
dbt Semantic Layer API (MetricFlow)
│ Translates to optimised SQL against your Snowflake marts
▼
Snowflake MARTS (fact_subscriptions, dim_customers, dim_date)
Defining a Semantic Model (dbt Core 1.6+)
# models/semantic_models/sem_orders.yml
# A semantic model describes the grain and measures of a model
semantic_models:
  - name: orders
    description: "Order-level semantic model — grain is one row per order"
    model: ref('fact_orders')
    defaults:
      agg_time_dimension: order_date
    entities:
      - name: order
        type: primary
        expr: order_sk
      - name: customer
        type: foreign
        expr: customer_sk
    dimensions:
      - name: order_date
        type: time
        type_params:
          time_granularity: day
      - name: region
        type: categorical
        expr: region
      - name: status
        type: categorical
        expr: status
    measures:
      - name: order_count
        description: "Total number of orders"
        agg: count
        expr: order_sk
      - name: gross_revenue
        description: "Sum of gross revenue before discounts"
        agg: sum
        expr: gross_revenue
      - name: net_revenue
        description: "Sum of net revenue after discounts"
        agg: sum
        expr: net_revenue
Defining Metrics on Top of Semantic Models
# models/metrics/metrics.yml
metrics:
  - name: monthly_recurring_revenue
    label: "MRR"
    description: "Monthly recurring revenue from active subscriptions"
    type: simple
    type_params:
      measure: net_revenue
    filter: |
      {{ Dimension('order__status') }} = 'ACTIVE'
      AND {{ Dimension('order__subscription_type') }} = 'RECURRING'
  - name: average_order_value
    label: "AOV"
    type: ratio
    type_params:
      numerator: net_revenue
      denominator: order_count
  - name: revenue_growth_mom
    label: "MoM Revenue Growth %"
    type: derived
    type_params:
      expr: "(net_revenue - prior_period_revenue) / prior_period_revenue * 100"
      metrics:
        - name: net_revenue            # current period — no offset needed
        - name: net_revenue
          alias: prior_period_revenue
          offset_window: 1 month
  - name: customer_lifetime_value
    label: "LTV"
    type: cumulative
    type_params:
      measure: net_revenue             # no window = accumulate over all time
Querying Metrics via CLI or API
# Query a metric directly from dbt CLI (MetricFlow)
mf query --metrics monthly_recurring_revenue --group-by metric_time__month,region --where "region = 'US-EAST'" --order metric_time__month
# Output: generates and runs optimised Snowflake SQL
# SELECT
# DATE_TRUNC('month', order_date) AS metric_time__month,
# region,
# SUM(net_revenue) AS monthly_recurring_revenue
# FROM marts.fact_orders
# WHERE status = 'ACTIVE' AND subscription_type = 'RECURRING'
# AND region = 'US-EAST'
# GROUP BY 1, 2
# ORDER BY 1
Why the Semantic Layer Matters for Analytics Engineers
- Single source of truth — MRR is defined once in YAML, tested in CI, and used by every BI tool. No more metric drift between teams
- Version-controlled business logic — when the CFO changes the MRR definition, you open a PR, get approval, and deploy. Full audit trail
- Self-service with guardrails — analysts query metrics without writing SQL, but the SQL they get is governed and correct
- Reuse measures across metrics — define
net_revenueonce as a measure, build MRR, AOV, LTV, cohort revenue all from the same base without duplication
Real-World Scenario: The backend team adds a new field to the orders API response and renames customer_id to customerId (camelCase). Your Snowflake pipeline breaks silently — orders still load, but all customer joins now return NULL. You find out when the weekly revenue report is wrong. Data contracts would have caught this before the change was deployed.
What is a Data Contract?
# A data contract is a formal, versioned agreement between a data producer
# (backend team, source system) and a data consumer (data engineering, analytics),
# specifying:
# - Schema: column names, types, nullability
# - Semantics: what each field means
# - SLAs: freshness, availability guarantees
# - Ownership: who is responsible for breaking changes
# - Versioning: how breaking changes are communicated and rolled out
Contract as Code — YAML Schema Definition
# contracts/orders_v2.yml
# This file lives in the PRODUCER's repo (backend team)
# AND is referenced by the consumer's repo (data engineering)
# Breaking changes require a version bump and migration period
id: orders-contract
version: "2.1.0"
owner: vsurya@duck.com
consumers:
  - vsurya@duck.com
  - vsurya@duck.com
schema:
  fields:
    - name: order_id
      type: integer
      nullable: false
      description: "Unique order identifier. Never recycled."
    - name: customer_id
      type: integer
      nullable: false
      description: "FK to customers table. Always present for completed orders."
    - name: status
      type: string
      nullable: false
      enum: ["PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED", "REFUNDED"]
      description: "Order lifecycle status. Only terminal states: DELIVERED, CANCELLED, REFUNDED."
    - name: amount_usd
      type: decimal(12,2)
      nullable: false
      min: 0
      description: "Order total in USD. Always positive. Tax inclusive."
    - name: created_at
      type: timestamp_tz
      nullable: false
sla:
  freshness_minutes: 5     # data must be available within 5 min of creation
  availability_pct: 99.9   # 99.9% uptime guarantee
changelog:
  - version: "2.1.0"
    date: "2024-01-15"
    changes: "Added optional loyalty_tier field (nullable, non-breaking)"
  - version: "2.0.0"
    date: "2023-06-01"
    changes: "BREAKING: renamed customerId to customer_id. Migration period: 90 days."
Enforcing Contracts in dbt (Consumer Side)
# models/staging/schema.yml
# Reference the contract — dbt validates against it at compile time
models:
  - name: stg_orders
    config:
      contract:
        enforced: true   # dbt validates column names and types against the model
    columns:
      - name: order_id
        data_type: int
        constraints:
          - type: not_null
          - type: primary_key
      - name: customer_id
        data_type: int
        constraints:
          - type: not_null
          - type: foreign_key
            to: ref('stg_customers')
            to_columns: [customer_id]
      - name: status
        data_type: varchar
        constraints:
          - type: not_null
# When contract: enforced: true, dbt adds a CREATE OR REPLACE TABLE with
# explicit column definitions — if your SQL returns wrong types, it fails at compile
Detecting Contract Violations (Producer Side)
-- Run in CI on producer's repo before merging schema changes
-- Compares current schema against the contract YAML
WITH contract_columns AS (
-- What the contract says should exist
SELECT 'order_id' AS col, 'INTEGER' AS expected_type UNION ALL
SELECT 'customer_id', 'INTEGER' UNION ALL
SELECT 'status', 'VARCHAR' UNION ALL
SELECT 'amount_usd', 'DECIMAL'
),
actual_columns AS (
SELECT LOWER(column_name) AS col, data_type AS actual_type
FROM information_schema.columns
WHERE table_name = 'ORDERS' AND table_schema = 'APP'
)
SELECT
c.col,
c.expected_type,
a.actual_type,
CASE
WHEN a.col IS NULL THEN 'MISSING COLUMN — BREAKING CHANGE'
WHEN c.expected_type != a.actual_type THEN 'TYPE MISMATCH — BREAKING CHANGE'
ELSE 'OK'
END AS status
FROM contract_columns c
LEFT JOIN actual_columns a ON c.col = a.col
WHERE a.col IS NULL OR c.expected_type != a.actual_type;
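The same diff logic, expressed in Python, is handy as a CI step in the producer's repo when the warehouse is not reachable from CI. A sketch: contract and actual schemas are simple name-to-type dicts, and the camelCase rename from the scenario shows up as a missing column:

```python
def diff_schema(contract, actual):
    """Flag breaking changes: missing columns or type mismatches."""
    breaks = []
    for col, expected in contract.items():
        got = actual.get(col)
        if got is None:
            breaks.append((col, "MISSING COLUMN"))          # breaking change
        elif got != expected:
            breaks.append((col, f"TYPE MISMATCH: {expected} -> {got}"))
    return breaks

contract = {"order_id": "INTEGER", "customer_id": "INTEGER", "status": "VARCHAR"}
actual   = {"order_id": "INTEGER", "customerId": "INTEGER", "status": "VARCHAR"}
print(diff_schema(contract, actual))   # the customerId rename surfaces here
```

Extra columns in `actual` are deliberately ignored: adding a column is a non-breaking change under the lifecycle rules below, so only removals and type changes fail the build.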
Data Contract Lifecycle
- Non-breaking changes (add nullable column, add enum value) — allowed without version bump, but must notify consumers
- Breaking changes (rename column, change type, remove column, change enum values) — require major version bump, migration period (30–90 days), and dual-publishing the old + new schema simultaneously
- Contract registry — store contracts centrally (Git repo, data catalog like DataHub) so any team can discover what data is available and what is guaranteed
- Ownership — the producer is contractually responsible for notifying consumers before breaking changes and maintaining SLAs
Real-World Scenario: A customer moves from the "SMB" segment to "Enterprise". How you handle this change determines whether your historical revenue reports show correct segmentation — or silently backfill the customer's entire history as Enterprise, inflating enterprise metrics and deflating SMB metrics going back years.
SCD Type 0 — Never Changes (retain original)
-- Use when: the original value at acquisition time is the analytically correct one
-- Example: customer's first acquisition channel — should never change even if
-- the customer signs up for a new product via a different channel later
CREATE TABLE dim_customers_t0 (
customer_sk VARCHAR PRIMARY KEY,
customer_id INT,
first_channel VARCHAR, -- set at acquisition, immutable forever
signup_date DATE
);
-- dbt: use a simple table materialization — no SCD logic needed
SCD Type 1 — Overwrite (no history)
-- Use when: corrections (data was wrong) or attributes where history is irrelevant
-- Example: fixing a customer's email address typo, updating a phone number
-- WARNING: destroys historical context — all past joins use the new value
-- dbt: standard incremental merge
{{ config(materialized='incremental', unique_key='customer_sk',
incremental_strategy='merge') }}
SELECT
{{ generate_surrogate_key(['customer_id']) }} AS customer_sk,
customer_id, email, phone_number, region
FROM {{ ref('stg_customers') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
SCD Type 2 — Full History (most common, interview favourite)
-- Use when: dimension changes and history matters for analytics
-- Example: customer segment changes SMB → Enterprise
-- Each version gets its own row with validity dates
-- dim_customers with SCD2:
-- | customer_sk | customer_id | segment | valid_from | valid_to | is_current |
-- |-------------|-------------|------------|------------|------------|------------|
-- | abc123 | 1001 | SMB | 2020-01-01 | 2023-06-14 | FALSE |
-- | def456 | 1001 | ENTERPRISE | 2023-06-15 | NULL | TRUE |
-- dbt snapshot (automates SCD2)
{% snapshot snap_customers %}
{{
config(
target_schema='snapshots',
unique_key='customer_id',
strategy='check',
check_cols=['segment', 'account_manager', 'region']
)
}}
SELECT customer_id, name, email, segment, account_manager, region, updated_at
FROM {{ source('crm', 'customers') }}
{% endsnapshot %}
-- Point-in-time join: what segment was customer 1001 in on Black Friday 2022?
SELECT f.*, c.segment
FROM fact_orders f
JOIN dim_customers c
ON f.customer_sk = c.customer_sk
AND f.order_date BETWEEN c.valid_from AND COALESCE(c.valid_to, '9999-12-31');
-- Result: correctly shows SMB for pre-June 2023 orders, ENTERPRISE after
SCD Type 3 — Limited History (current + previous column)
-- Use when: only the most recent change matters and storage is constrained
-- Example: show both current and previous region for a customer
-- Limitation: only tracks ONE previous value — two changes ago is lost
CREATE TABLE dim_customers_t3 (
customer_sk VARCHAR,
customer_id INT,
current_region VARCHAR,
previous_region VARCHAR, -- only the immediately prior value
region_changed_at DATE
);
-- When region changes: shift current → previous, write new current
UPDATE dim_customers_t3 SET
previous_region = current_region,
current_region = :new_region,
region_changed_at = CURRENT_DATE
WHERE customer_id = :customer_id;
SCD Type 4 — History Table (rapid change handling)
-- Use when: an attribute changes very frequently (e.g., real-time status)
-- Keep a lean current-state table + separate history table
-- Avoids SCD2 tables bloating with millions of status change rows
-- Current state (small, fast joins)
CREATE TABLE dim_order_status_current (
order_id INT PRIMARY KEY,
status VARCHAR,
updated_at TIMESTAMP
);
-- Full history (append-only audit trail)
CREATE TABLE dim_order_status_history (
order_id INT,
status VARCHAR,
valid_from TIMESTAMP,
valid_to TIMESTAMP,
changed_by VARCHAR
);
-- BI joins to current table for dashboards (fast)
-- Data science queries history table for status transition analysis
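One way (a sketch, not the only pattern; order_id 123 and the status values are illustrative) to keep the current and history tables in sync inside a single transaction when a status changes:

```sql
BEGIN;
-- Close the open history row for this order
UPDATE dim_order_status_history
SET valid_to = CURRENT_TIMESTAMP()
WHERE order_id = 123 AND valid_to IS NULL;

-- Append the new history row
INSERT INTO dim_order_status_history (order_id, status, valid_from, valid_to, changed_by)
VALUES (123, 'DELIVERED', CURRENT_TIMESTAMP(), NULL, 'courier_service');

-- Overwrite current state (Type 1 style)
MERGE INTO dim_order_status_current c
USING (SELECT 123 AS order_id, 'DELIVERED' AS status) s
  ON c.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET status = s.status, updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (order_id, status, updated_at)
  VALUES (s.order_id, s.status, CURRENT_TIMESTAMP());
COMMIT;
```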
SCD Type 6 — Hybrid (Type 1 + 2 + 3 combined)
-- Use when: you need history AND easy access to current value on every row
-- Adds "current_" columns to a Type 2 table — denormalised but very convenient for BI
CREATE TABLE dim_customers_t6 (
customer_sk VARCHAR PRIMARY KEY,
customer_id INT,
segment VARCHAR, -- value at this VERSION's time
current_segment VARCHAR, -- always the LATEST value (Type 1 overwrite)
valid_from DATE,
valid_to DATE,
is_current BOOLEAN
);
-- BI can join without date logic for current-state reports (use current_segment)
-- Data science uses segment + valid_from/valid_to for historical accuracy
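The Type 1 part of Type 6 means every change must also rewrite current_segment on ALL historical rows for that customer. A sketch of the three-step update (surrogate key and values are illustrative):

```sql
-- 1. Close the open Type 2 row
UPDATE dim_customers_t6
SET valid_to = CURRENT_DATE - 1, is_current = FALSE
WHERE customer_id = 1001 AND is_current;

-- 2. Open the new version row
INSERT INTO dim_customers_t6
  (customer_sk, customer_id, segment, current_segment, valid_from, valid_to, is_current)
VALUES ('ghi789', 1001, 'ENTERPRISE', 'ENTERPRISE', CURRENT_DATE, NULL, TRUE);

-- 3. Type 1 overwrite: sync current_segment on every row for this customer
UPDATE dim_customers_t6
SET current_segment = 'ENTERPRISE'
WHERE customer_id = 1001;
```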
SCD Type Comparison
| Type | Rows per Change | History | Complexity | Best For |
|---|---|---|---|---|
| Type 0 | 0 | Original only | None | Acquisition channel, first touch |
| Type 1 | 0 (overwrite) | None | Low | Data corrections, non-analytic attrs |
| Type 2 | Close old + open new | Full | High | Segment, region, account manager changes |
| Type 3 | 0 (column update) | One previous value | Low | When only last change matters |
| Type 4 | 1 in history table | Full (separate table) | Medium | Rapidly changing attributes (status) |
| Type 6 | Close old + open new | Full + current cols | Very High | BI convenience + historical accuracy |
Real-World Scenario: Your ride-sharing app emits 50,000 trip events per second. You need trip completion events in Snowflake within 30 seconds for operational monitoring, and in a clean aggregated form within 5 minutes for the driver payout system. Design the streaming pipeline.
Architecture: Kafka → Kafka Connect → Snowflake
Mobile App / Backend Services
│ (publish events as Avro/JSON to Kafka topics)
▼
Apache Kafka Cluster (Confluent Cloud or self-hosted)
Topics:
- trip.completed (50K events/sec)
- trip.status_updated (200K events/sec)
- driver.location (500K events/sec — usually filtered, not all to SF)
│
│ Kafka Connect — Snowflake Sink Connector
│ (micro-batches events every 30 seconds into an S3 intermediate stage)
▼
Snowflake Snowpipe (auto-ingest from S3 buffer)
│
▼
raw.trip_events (two VARIANT columns, schema-flexible)
│
│ Snowflake Stream + Task (every 1 min)
▼
staging.trip_events (typed, deduplicated)
│
│ dbt incremental (every 5 min)
▼
marts.fact_trips (aggregated, clustered by trip_date)
Kafka Connect Snowflake Sink Configuration
# kafka-connect-snowflake-sink.json (inline # annotations are for explanation only; strict JSON does not allow comments)
{
"name": "snowflake-trip-sink",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max": "4",
"topics": "trip.completed",
"snowflake.url.name": "orgname-acctname.snowflakecomputing.com",
"snowflake.user.name": "kafka_service_account",
"snowflake.private.key": "${file:/secrets/snowflake.properties:private.key}",
"snowflake.database.name": "RAW",
"snowflake.schema.name": "KAFKA_LANDING",
"snowflake.topic2table.map": "trip.completed:trip_events",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "https://schema-registry:8081",
"buffer.count.records": "10000", # flush after 10K records...
"buffer.flush.time": "30", # ...or 30 seconds
"buffer.size.bytes": "67108864", # ...or 64MB — whichever first
"snowflake.metadata.createtable": "true"
}
}
Landing Table Schema + Stream
-- Kafka connector creates this automatically, but here's the structure:
CREATE TABLE raw.trip_events (
record_metadata VARIANT, -- Kafka offset, partition, topic
record_content VARIANT -- the actual event payload
);
-- Stream captures every new event landed by Kafka connector
CREATE STREAM raw.trip_events_stream ON TABLE raw.trip_events;
-- Task processes stream every 60 seconds
CREATE TASK raw.process_trips_task
WAREHOUSE = INGEST_WH
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('raw.trip_events_stream')
AS
INSERT INTO staging.trip_events
SELECT
record_content:trip_id::VARCHAR AS trip_id,
record_content:driver_id::INT AS driver_id,
record_content:rider_id::INT AS rider_id,
record_content:fare_amount::DECIMAL(10,2) AS fare_amount,
record_content:completed_at::TIMESTAMP AS completed_at,
record_metadata:CreateTime::TIMESTAMP AS kafka_timestamp,
-- Detect late arrivals (event time vs Kafka ingestion time)
DATEDIFF('second',
record_content:completed_at::TIMESTAMP,
record_metadata:CreateTime::TIMESTAMP) AS latency_seconds
FROM raw.trip_events_stream
WHERE record_content:event_type::VARCHAR = 'TRIP_COMPLETED'
-- Deduplicate: skip if already processed
AND record_content:trip_id::VARCHAR NOT IN (
SELECT trip_id FROM staging.trip_events
WHERE completed_at >= CURRENT_DATE - 1
);
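The NOT IN anti-join above gets slower as staging grows and silently matches nothing if the subquery ever returns a NULL trip_id. A common alternative is to deduplicate within the batch using QUALIFY; this is a sketch (column list abbreviated), with cross-batch duplicates left to a MERGE keyed on trip_id:

```sql
INSERT INTO staging.trip_events
SELECT
    record_content:trip_id::VARCHAR AS trip_id,
    record_content:fare_amount::DECIMAL(10,2) AS fare_amount,
    record_content:completed_at::TIMESTAMP AS completed_at
FROM raw.trip_events_stream
WHERE record_content:event_type::VARCHAR = 'TRIP_COMPLETED'
-- Keep only the latest Kafka record per trip_id within this batch
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY record_content:trip_id::VARCHAR
    ORDER BY record_metadata:offset::INT DESC
) = 1;
-- Duplicates that span batches are then absorbed by a MERGE on trip_id
```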
Handling Schema Evolution with Schema Registry
# Confluent Schema Registry enforces Avro schema compatibility
# BACKWARD compatibility: new schema can read data written with old schema
# Producers must register schema before publishing
# Breaking changes (rename/remove field) are REJECTED at the registry
# In practice:
# - Add new optional fields freely (backward compatible)
# - Never rename or remove fields without a deprecation period
# - Use nullable fields with defaults for all new additions
# Example Avro schema for trip.completed topic:
{
"type": "record",
"name": "TripCompleted",
"namespace": "com.rideshare.events",
"fields": [
{"name": "trip_id", "type": "string"},
{"name": "fare_amount", "type": "double"},
{"name": "completed_at", "type": "long", "logicalType": "timestamp-millis"},
{"name": "tip_amount", "type": ["null","double"], "default": null} -- v2: nullable
]
}
Latency Budget
| Hop | Latency | Tuning |
|---|---|---|
| App → Kafka publish | < 5ms | async producer, acks=1 |
| Kafka → Connector buffer flush | 30 sec | reduce buffer.flush.time |
| Snowpipe ingest | 30–60 sec | auto-ingest, can't reduce below ~30s |
| Stream + Task process | 1 min | reduce Task schedule |
| Total: raw landing | ~2 min | Misses the 30-sec monitoring goal; tighten buffer.flush.time and the Task schedule, or use Snowpipe Streaming for second-level landing |
| dbt incremental marts | 5 min | hourly dbt run job |
Real-World Scenario: Your Snowflake bill jumped from $8K to $22K in one month. No new pipelines were added. The culprit: a BI analyst accidentally left a LARGE warehouse running without auto-suspend, and a broken dbt model ran a full table scan on a 500M-row table 48 times due to a retry loop. Here's how to prevent and diagnose this.
Step 1: Find the Cost Drivers
-- Top credit consumers by warehouse (last 30 days)
SELECT
warehouse_name,
ROUND(SUM(credits_used), 2) AS total_credits,
ROUND(SUM(credits_used) * 3.0, 2) AS est_cost_usd, -- ~$3/credit on Enterprise
COUNT(DISTINCT query_id) AS query_count,
MAX(warehouse_size) AS max_size_used
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= CURRENT_DATE - 30
GROUP BY warehouse_name
ORDER BY total_credits DESC;
-- Most expensive queries (ranked by elapsed time as a cost proxy — query_history has no per-query warehouse credits)
SELECT
query_id,
LEFT(query_text, 80) AS query_preview,
warehouse_name,
total_elapsed_time / 1000 AS elapsed_sec,
credits_used_cloud_services,
ROUND(bytes_scanned / POWER(1024,3), 2) AS gb_scanned,
partitions_scanned,
partitions_total,
ROUND(partitions_scanned * 100.0 / NULLIF(partitions_total,0), 1) AS pct_scanned
FROM snowflake.account_usage.query_history
WHERE start_time >= CURRENT_DATE - 7
AND execution_status = 'SUCCESS'
ORDER BY total_elapsed_time DESC
LIMIT 20;
Step 2: Resource Monitors — Budget Caps Per Warehouse
-- Create a resource monitor that alerts at 80% and suspends at 100%
CREATE RESOURCE MONITOR bi_warehouse_monitor
WITH CREDIT_QUOTA = 500 -- 500 credits per month
TRIGGERS
ON 80 PERCENT DO NOTIFY -- email account admins
ON 100 PERCENT DO SUSPEND -- suspend warehouse mid-run if exceeded
ON 110 PERCENT DO SUSPEND_IMMEDIATE; -- hard stop, kill running queries
-- Assign to BI warehouse
ALTER WAREHOUSE BI_WH SET RESOURCE_MONITOR = bi_warehouse_monitor;
-- Account-level monitor (catch-all safety net)
CREATE RESOURCE MONITOR account_monthly_cap
WITH CREDIT_QUOTA = 5000 -- total monthly account budget
TRIGGERS
ON 75 PERCENT DO NOTIFY
ON 90 PERCENT DO NOTIFY
ON 100 PERCENT DO SUSPEND; -- suspends ALL warehouses
Step 3: Warehouse Right-Sizing & Auto-Suspend Tuning
-- Rule: auto-suspend should reflect the gap between queries
-- BI warehouse: users query in bursts → 5 min suspend (keep warm between clicks)
ALTER WAREHOUSE BI_WH SET AUTO_SUSPEND = 300;
-- dbt transform: runs in a scheduled job, idle between runs → 2 min suspend
ALTER WAREHOUSE TRANSFORM_WH SET AUTO_SUSPEND = 120;
-- Ingest: brief bursts → aggressive suspend
ALTER WAREHOUSE INGEST_WH SET AUTO_SUSPEND = 60;
-- Check how long warehouses actually run vs idle (find over-provisioned ones)
SELECT
warehouse_name,
ROUND(SUM(CASE WHEN credits_used > 0 THEN 1 ELSE 0 END) * 100.0
/ COUNT(*), 1) AS pct_time_active,
ROUND(SUM(credits_used), 2) AS credits_used
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= CURRENT_DATE - 30
GROUP BY warehouse_name
ORDER BY pct_time_active ASC;
-- Warehouses with <10% active time are candidates for downsizing or consolidation
Step 4: Query Governance — Prevent Runaway Queries
-- Set statement timeout on warehouses (kill queries running over 10 min)
ALTER WAREHOUSE TRANSFORM_WH SET
STATEMENT_TIMEOUT_IN_SECONDS = 600; -- kill at 10 min
ALTER WAREHOUSE BI_WH SET
STATEMENT_TIMEOUT_IN_SECONDS = 300; -- BI queries should be fast
-- Set max concurrency to prevent one user starving others
ALTER WAREHOUSE BI_WH SET
MAX_CONCURRENCY_LEVEL = 8; -- max 8 parallel queries per cluster
-- Query tag: tag every dbt model run for cost attribution
-- In dbt_project.yml:
-- pre-hook: "ALTER SESSION SET QUERY_TAG = 'dbt_run|{{ model.name }}|{{ target.name }}'"
-- Now you can attribute credits to specific dbt models:
SELECT
SPLIT_PART(query_tag, '|', 2) AS dbt_model, -- tag format: dbt_run|<model>|<target>
SUM(credits_used_cloud_services) AS credits
FROM snowflake.account_usage.query_history
WHERE query_tag LIKE '%dbt_run%'
AND start_time >= CURRENT_DATE - 7
GROUP BY 1
ORDER BY 2 DESC;
Step 5: Storage Cost Optimisation
-- Find large tables consuming expensive Active Storage
SELECT
table_schema,
table_name,
ROUND(active_bytes / POWER(1024,3), 2) AS active_gb,
ROUND(time_travel_bytes / POWER(1024,3), 2) AS time_travel_gb,
ROUND(failsafe_bytes / POWER(1024,3), 2) AS failsafe_gb,
ROUND((active_bytes + time_travel_bytes + failsafe_bytes) / POWER(1024,3), 2) AS total_gb
FROM snowflake.account_usage.table_storage_metrics
WHERE active_bytes > 1073741824 -- tables over 1GB
ORDER BY total_gb DESC;
-- Reduce Time Travel on non-critical tables to save storage
ALTER TABLE raw.clickstream_events SET DATA_RETENTION_TIME_IN_DAYS = 1; -- 1 day (default is 1)
ALTER TABLE marts.fact_orders SET DATA_RETENTION_TIME_IN_DAYS = 14; -- keep 14 days for prod marts
-- Use Transient tables for staging (no Fail-Safe = 50% storage cost reduction)
CREATE TRANSIENT TABLE staging.orders_temp AS SELECT * FROM raw.orders;
Cost Optimisation Checklist
| Action | Typical Savings | Risk |
|---|---|---|
| Auto-suspend all warehouses | 20–40% | Low |
| Right-size warehouses (downsize over-provisioned) | 15–30% | Low if tested |
| Use Transient tables for staging | 10–20% storage | Low (lose Fail-Safe) |
| Add clustering keys to high-scan tables | 20–50% on large tables | Low (background process) |
| Move cold data to External Tables on S3 | 30–60% storage | Medium (query performance) |
| Add STATEMENT_TIMEOUT to all warehouses | Prevents runaway bills | Low |
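The clustering-key row in the checklist can be sketched as follows; the table and key names are illustrative, and the clustering columns should be whatever your queries filter on most:

```sql
-- Cluster the high-scan fact table on its dominant filter columns
ALTER TABLE marts.fact_orders CLUSTER BY (order_date, region);

-- Check how well the data is clustered on those keys
SELECT SYSTEM$CLUSTERING_INFORMATION('marts.fact_orders', '(order_date, region)');
-- Look at average_depth: lower is better; high depth = poor pruning
-- Note: automatic reclustering consumes serverless credits, so weigh it against scan savings
```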
Real-World Scenario: A company hires both a Data Engineer and an Analytics Engineer. The Data Engineer builds and maintains the ingestion pipeline from Salesforce → Snowflake raw. The Analytics Engineer picks it up from there — they own the dbt models, define metrics, write documentation, and make sure the revenue number in Tableau matches the revenue number the CFO sees in the board pack.
Role Distinction
| Dimension | Data Engineer | Analytics Engineer |
|---|---|---|
| Primary focus | Data movement, reliability, infrastructure | Data transformation, modelling, trust |
| Primary tool | Python, Spark, Airflow, Kafka, Qlik | dbt, SQL, BI tools |
| Owns | Pipelines, ingestion, raw landing | Staging → marts, metrics, docs, tests |
| Interacts with | Backend engineers, DevOps, cloud | Business analysts, BI developers, product |
| Success metric | Pipeline uptime, data freshness, latency | Metric accuracy, self-service adoption, trust |
| Typical deliverable | "Raw orders table is in Snowflake within 5 min of creation" | "fact_orders is tested, documented, and all BI tools use it" |
The Analytics Engineer's Core Responsibilities
# 1. Transform raw data into trusted analytics models
#    Input: raw.oracle_orders (messy, source-schema-dependent)
#    Output: marts.fact_orders (clean, tested, documented, BI-ready)
# 2. Define and govern metrics
#    "Monthly Recurring Revenue" has ONE definition in dbt metrics
#    All BI tools query the semantic layer — no metric drift
# 3. Write and maintain dbt tests
#    Every mart has: not_null, unique, relationships, freshness, custom business rules
# 4. Document everything (for self-service analytics)
#    Every column has a description, every model has a business context note
#    dbt docs generate → searchable data catalog
# 5. Own the semantic layer
#    dbt metrics / MetricFlow definitions
#    Exposure YAML: documents which dashboards use which models
# 6. Enable stakeholder self-service
#    Build well-named, well-documented marts so analysts can write their own SQL
#    Replace ad-hoc Excel with governed dbt models
Analytics Engineering in Practice — A Day in the Life
# Morning standup: "fact_orders freshness test failed in production"
# ─────────────────────────────────────────────────────────────────
# 1. Check dbt Cloud job logs → source freshness check flagged RAW.ORACLE.ORDERS as stale
# 2. Check Qlik Replicate → task suspended due to source DB maintenance window
# 3. Notify stakeholders: "Revenue dashboard data is 3 hours behind, fixing now"
# 4. Resume Qlik task → stream catches up → dbt hourly job runs → dashboard fresh
# 5. Add monitoring: alert if source freshness > 2 hours (add to sources.yml)

# Afternoon: new business request
# ─────────────────────────────────────────────────────────────────
# Finance: "We need net revenue excluding marketplace fees, split by subscription type"
# AE response:
# 1. Check if source fields exist in stg_orders
# 2. Add marketplace_fee column to stg_orders if missing → PR → CI → deploy
# 3. Create int_orders_net_of_fees.sql → apply fee deduction logic
# 4. Add net_revenue_excl_fees to fact_orders → update schema.yml
# 5. Define metric: net_revenue_subscription in metrics.yml
# 6. Update exposure: finance_dashboard depends_on fact_orders
# 7. Run dbt test + dbt docs generate → share docs link with finance team
Key dbt Skills for an Analytics Engineer
# Modelling:     staging → intermediate → marts, surrogate keys, SCD2
# Testing:       schema tests, singular tests, dbt-expectations, freshness
# Documentation: column descriptions, model descriptions, exposures
# Macros:        generate_surrogate_key, date_spine, union_relations
# Metrics:       MetricFlow semantic models and metric definitions
# CI/CD:         Slim CI, state:modified+, --defer, PR-based workflow
# Performance:   incremental strategies, materialisation decisions
# Governance:    post-hooks for GRANT, masking policies, row-level security
The Trust Chain
Raw data (untrusted, messy)
│
│ Data Engineer: reliable delivery
▼
Snowflake RAW schema
│
│ Analytics Engineer: transform, test, document, govern
▼
Snowflake MARTS schema (trusted, tested, documented)
│
│ Business Analysts / BI Developers: self-service
▼
Tableau / Looker / Power BI dashboards
│
▼
Business decisions (the whole point)
Snowflake has three distinct caching layers. Understanding which one fires — and when — is critical both for performance tuning and for explaining query behaviour to stakeholders.
Cache 1 — Result Cache (Global, FREE)
The result cache stores the exact output of every query for 24 hours. If you run the same SQL again and the underlying data has not changed, Snowflake returns the cached result instantly — without spinning up a warehouse or consuming a single credit. This is the most powerful cache because it costs absolutely nothing to hit.
-- First run: hits the warehouse, takes 8 seconds
SELECT region, SUM(net_revenue)
FROM fact_orders
WHERE order_date = '2024-03-01'
GROUP BY 1;

-- Second run (within 24 hours, table unchanged): returns in < 100ms
-- Query profile shows "Results Reused" = 100%
SELECT region, SUM(net_revenue)
FROM fact_orders
WHERE order_date = '2024-03-01'
GROUP BY 1;
What invalidates the result cache?
- Any DML on the underlying table (INSERT, UPDATE, DELETE, MERGE, COPY INTO)
- The query SQL changes (even adding a space changes the hash)
- Session parameters that affect results change (e.g. timezone, date format)
- The 24-hour TTL expires
- Non-deterministic functions like CURRENT_TIMESTAMP(), RANDOM(), UUID_STRING() disable result caching entirely for that query
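A cache-friendly rewrite of a "today" dashboard query, sketched (table and column names follow the earlier examples):

```sql
-- Cache-hostile: CURRENT_TIMESTAMP() in the SELECT disables the result cache
SELECT region, SUM(net_revenue), CURRENT_TIMESTAMP() AS refreshed_at
FROM fact_orders
WHERE order_date = CURRENT_DATE
GROUP BY region;

-- Cache-friendly: deterministic SQL, so repeated dashboard loads can reuse the result
SELECT region, SUM(net_revenue)
FROM fact_orders
WHERE order_date = CURRENT_DATE
GROUP BY region;
```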
Cache 2 — Metadata Cache (Global, FREE — no warehouse needed)
The Cloud Services layer maintains a persistent metadata store containing row counts, min/max values per column per micro-partition, null counts, and distinct value counts. Simple metadata-only queries resolve entirely here — zero compute cost.
-- These queries answer from metadata cache ONLY — no warehouse needed
SELECT COUNT(*) FROM fact_orders;        -- row count from metadata
SELECT MIN(order_date), MAX(order_date)  -- min/max from partition stats
FROM fact_orders;

-- Check in Query Profile: "Cloud Services" time = 100%, no warehouse time
-- Credit attribution: metadata queries appear in cloud_services credits (usually <10%)
How metadata enables micro-partition pruning: Before a query even touches the warehouse, the query optimizer reads partition metadata to determine which micro-partitions cannot contain rows matching your WHERE clause — and skips them entirely. A query filtering WHERE order_date = '2024-03-01' on a 500-partition table may only read 3 partitions if the data is well-clustered.
Cache 3 — Warehouse (Local Disk) Cache — per warehouse, consumed credits
When a warehouse executes a query, it reads micro-partitions from S3 and caches them on the warehouse's local SSD. Subsequent queries that scan the same micro-partitions will read from local disk instead of S3 — significantly faster (SSD vs object storage network round-trip).
-- Query A: cold warehouse, scans 10 micro-partitions from S3 → 12 sec
SELECT * FROM fact_orders
WHERE region = 'US-EAST' AND order_date = '2024-03-01';

-- Query B (same warehouse, overlapping partitions, running 30 seconds later):
-- The cached micro-partitions are now on local SSD → 2.5 sec
SELECT * FROM fact_orders
WHERE region = 'US-EAST' AND order_date = '2024-02-29';

-- Check: in Query Profile, "Bytes Scanned from Cache" will be high
Critical: Warehouse cache is destroyed when the warehouse SUSPENDS. If your warehouse auto-suspends between runs, the next run starts cold again. This is why:
- BI warehouses serving interactive dashboards should have longer auto-suspend (e.g., 5–10 min) to keep the cache warm between user clicks
- ETL warehouses that run once per hour can suspend aggressively (60 sec) — they start cold anyway
Cache Summary Table
| Cache | Scope | Cost | Survives Suspend | Invalidated By |
|---|---|---|---|---|
| Result Cache | Global (all sessions) | FREE | Yes (24h TTL) | Table DML, SQL change, TTL expire |
| Metadata Cache | Global (Cloud Services) | FREE | Yes (persistent) | Table DML (updates partition stats) |
| Warehouse (Disk) Cache | Per warehouse cluster | Requires a running warehouse (credits) | No — lost on suspend | Warehouse suspend/resize |
Practical tuning tips:
- Maximise result cache hits — standardise BI queries (same SQL = cache hit). Avoid CURRENT_TIMESTAMP() in SELECT; use CURRENT_DATE and filter in WHERE instead
- Keep BI warehouse warm — set AUTO_SUSPEND = 300 (5 min) so the SSD cache survives between dashboard page loads
- Use clustering keys to maximise metadata pruning — fewer partitions read = less S3 I/O = warehouse cache more effective for what is read
- Multi-cluster warehouses do NOT share disk cache across clusters — each cluster has its own local cache
Problem: Two tables each with a single column. TableA has values: 1, 1, 0, NULL. TableB has values: 1, 1, 0, NULL. Find the output for different JOIN types.
Sample Tables Setup
-- TableA (1 column) -- val -- --- -- 1 -- 1 -- 0 -- NULL -- TableB (1 column) -- val -- --- -- 1 -- 1 -- 0 -- NULL CREATE TABLE tableA (val INT); INSERT INTO tableA VALUES (1), (1), (0), (NULL); CREATE TABLE tableB (val INT); INSERT INTO tableB VALUES (1), (1), (0), (NULL);
INNER JOIN ON tableA.val = tableB.val
SELECT A.val, B.val
FROM tableA A
INNER JOIN tableB B ON A.val = B.val;

-- Result:
-- val (A) | val (B)
-- --------|--------
-- 1       | 1        ← each A(1) matches both B(1)s → 2 × 2 = 4 rows (cartesian on duplicates)
-- 1       | 1
-- 1       | 1
-- 1       | 1
-- 0       | 0        ← A(0) × B(0) = 1 row
-- KEY: NULL = NULL evaluates to UNKNOWN (not TRUE) → no matches for NULL rows
-- Total: 5 rows
LEFT JOIN ON tableA.val = tableB.val
SELECT A.val, B.val
FROM tableA A
LEFT JOIN tableB B ON A.val = B.val;

-- Result:
-- val (A) | val (B)
-- --------|--------
-- 1       | 1      (A not NULL, matches B)
-- 1       | 1      (A not NULL, matches B)
-- 1       | 1      (A not NULL, matches B)
-- 1       | 1      (A not NULL, matches B)
-- 0       | 0      (A not NULL, matches B)
-- NULL    | NULL   (A is NULL, no match in ON clause → filled with NULL)
-- KEY: all rows from the left table are preserved
-- Total: 6 rows
RIGHT JOIN ON tableA.val = tableB.val
SELECT A.val, B.val
FROM tableA A
RIGHT JOIN tableB B ON A.val = B.val;

-- Result:
-- val (A) | val (B)
-- --------|--------
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 0       | 0      (matched)
-- NULL    | NULL   (B is NULL, no match from A → filled with NULL)
-- KEY: all rows from the right table are preserved
-- Total: 6 rows
FULL OUTER JOIN ON tableA.val = tableB.val
SELECT A.val, B.val
FROM tableA A
FULL OUTER JOIN tableB B ON A.val = B.val;

-- Result:
-- val (A) | val (B)
-- --------|--------
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 0       | 0      (matched)
-- NULL    | NULL   (A's NULL unmatched)
-- NULL    | NULL   (B's NULL unmatched)
-- KEY: union of LEFT + RIGHT semantics
-- Total: 7 rows
Key Takeaways
- NULL joining rule: NULL = NULL is UNKNOWN in SQL, so NULL rows never match in ON clauses
- Cartesian product: Duplicate values create multiple match combinations
- INNER JOIN: Only matching rows (NULLs excluded)
- LEFT JOIN: All left rows + matched right rows + NULL-filled for unmatched
- RIGHT JOIN: All right rows + matched left rows + NULL-filled for unmatched
- FULL OUTER: All rows from both tables, NULL-filled where no match
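If you actually want NULL keys to pair up, use NULL-safe equality — standard SQL's IS NOT DISTINCT FROM, which Snowflake supports (it also offers the EQUAL_NULL function). Applied to the same tables:

```sql
SELECT A.val, B.val
FROM tableA A
INNER JOIN tableB B
  ON A.val IS NOT DISTINCT FROM B.val;
-- The 1s and 0 match as before (4 + 1 rows), and now A's NULL pairs with B's NULL:
-- 1 extra row → 6 rows total
```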
CROSS JOIN — cartesian product (all combinations)
SELECT A.val AS a_val, B.val AS b_val
FROM tableA A
CROSS JOIN tableB B;
-- Returns 4 × 4 = 16 rows (every row in A combined with every row in B)
-- Use case: generating all combinations (e.g., all dates × all products)
Quick Reference Card
| JOIN Type | Rows Included | NULL key matched? | Duplicate key effect |
|---|---|---|---|
| INNER | Matching rows only | No | Cartesian on matching side |
| LEFT | All from left + matches from right | Left NULLs preserved (NULL fill right) | Cartesian on matching side |
| RIGHT | All from right + matches from left | Right NULLs preserved (NULL fill left) | Cartesian on matching side |
| FULL OUTER | All from both sides | All NULLs preserved, unmatched | Cartesian on matching side |
| CROSS | All combinations | Yes (no ON clause) | M × N rows always |
Salary aggregation questions are deceptively tricky — the common pitfall is returning which user holds the max/min salary, especially when multiple users tie. Let's cover the common patterns.
Sample Data
CREATE TABLE salaries (
user_id INT,
dept VARCHAR,
salary DECIMAL(10,2),
hire_date DATE
);
INSERT INTO salaries VALUES
(1, 'Engineering', 95000, '2021-01-10'),
(2, 'Engineering', 120000,'2019-06-15'),
(3, 'Marketing', 72000, '2022-03-01'),
(4, 'Marketing', 95000, '2020-07-20'),
(5, 'HR', 58000, '2023-02-14'),
(6, 'HR', 58000, '2021-11-30'), -- tie with user 5
(7, 'Engineering', 120000,'2020-01-05'); -- tie with user 2
Pattern 1 — Simple Global Aggregates
-- Min, Max, and Avg salary across all employees
SELECT
MIN(salary) AS min_salary,
MAX(salary) AS max_salary,
ROUND(AVG(salary), 2) AS avg_salary,
COUNT(*) AS employee_count
FROM salaries;
-- Result:
-- min_salary | max_salary | avg_salary | employee_count
-- 58000 | 120000 | 88285.71 | 7
Pattern 2 — Find the user_id with max salary (handles ties)
-- WRONG approach: this doesn't give you the user_id
SELECT MAX(salary) AS max_salary FROM salaries;

-- CORRECT: use a subquery or QUALIFY to get user(s) with max salary
-- If there are ties, BOTH user_ids are returned (correct behaviour)
SELECT user_id, dept, salary
FROM salaries
WHERE salary = (SELECT MAX(salary) FROM salaries);
-- Returns: user_id=2 AND user_id=7 (both earn 120000)

-- Alternative using QUALIFY + RANK (handles ties naturally)
SELECT user_id, dept, salary
FROM salaries
QUALIFY RANK() OVER (ORDER BY salary DESC) = 1;
-- Same result: both tied users returned

-- Alternative using DENSE_RANK (same for global max)
SELECT user_id, dept, salary
FROM salaries
QUALIFY DENSE_RANK() OVER (ORDER BY salary DESC) = 1;
Pattern 3 — Min, Max, Avg per department WITH user_id of min/max earner
-- This is the hard version: include user_id for min and max salary in each dept
-- Challenge: standard GROUP BY can't include user_id (not in aggregate)
WITH dept_stats AS (
SELECT
dept,
MIN(salary) AS min_salary,
MAX(salary) AS max_salary,
ROUND(AVG(salary), 2) AS avg_salary
FROM salaries
GROUP BY dept
),
min_earners AS (
-- Get user_id(s) with min salary per dept
SELECT s.dept, s.user_id AS min_user_id, s.salary AS min_salary
FROM salaries s
WHERE (s.dept, s.salary) IN (
SELECT dept, MIN(salary) FROM salaries GROUP BY dept
)
),
max_earners AS (
-- Get user_id(s) with max salary per dept
SELECT s.dept, s.user_id AS max_user_id, s.salary AS max_salary
FROM salaries s
WHERE (s.dept, s.salary) IN (
SELECT dept, MAX(salary) FROM salaries GROUP BY dept
)
)
SELECT
ds.dept,
ds.min_salary,
mi.min_user_id,
ds.max_salary,
ma.max_user_id,
ds.avg_salary
FROM dept_stats ds
JOIN min_earners mi ON ds.dept = mi.dept
JOIN max_earners ma ON ds.dept = ma.dept
ORDER BY ds.dept;
-- Result (ties fan out: min_earners × max_earners cross-multiply per dept):
-- dept        | min_salary | min_user_id | max_salary | max_user_id | avg_salary
-- Engineering | 95000      | 1           | 120000     | 2           | 111666.67
-- Engineering | 95000      | 1           | 120000     | 7           | 111666.67  ← max tie
-- HR          | 58000      | 5           | 58000      | 5           | 58000
-- HR          | 58000      | 5           | 58000      | 6           | 58000
-- HR          | 58000      | 6           | 58000      | 5           | 58000
-- HR          | 58000      | 6           | 58000      | 6           | 58000      ← 2×2 tie fan-out
-- Marketing   | 72000      | 3           | 95000      | 4           | 83500
Pattern 4 — Cleaner version using window functions + QUALIFY
-- Rank employees within each dept by salary (for min use ASC, for max use DESC)
-- Then filter using QUALIFY for clean, readable SQL
-- Top earner per department (handles ties)
SELECT user_id, dept, salary
FROM salaries
QUALIFY DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) = 1;
-- Returns ALL tied top earners per dept
-- Lowest earner per department
SELECT user_id, dept, salary
FROM salaries
QUALIFY DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary ASC) = 1;
-- Combined: attach min/max user_ids alongside dept averages
SELECT
user_id,
dept,
salary,
AVG(salary) OVER (PARTITION BY dept) AS dept_avg,
MIN(salary) OVER (PARTITION BY dept) AS dept_min,
MAX(salary) OVER (PARTITION BY dept) AS dept_max,
RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS salary_rank_in_dept
FROM salaries
ORDER BY dept, salary_rank_in_dept;
Pattern 5 — Employees earning above department average
SELECT user_id, dept, salary,
ROUND(AVG(salary) OVER (PARTITION BY dept), 2) AS dept_avg
FROM salaries
QUALIFY salary > AVG(salary) OVER (PARTITION BY dept);
-- Result:
-- user_id | dept | salary | dept_avg
-- 2 | Engineering | 120000 | 111666.67
-- 7 | Engineering | 120000 | 111666.67
-- 4 | Marketing | 95000 | 83500.00
Common pitfalls in salary aggregation:
- NULL salaries — AVG(salary) ignores NULLs; MIN/MAX also ignore NULLs. Use COALESCE(salary, 0) if you want NULLs treated as zero
- Ties in min/max — always ask: "if two employees have the same max salary, return one or both?" DENSE_RANK returns all tied rows; ROW_NUMBER returns exactly one (arbitrary choice)
- GROUP BY error — you cannot SELECT user_id in a GROUP BY query unless user_id is in the GROUP BY or aggregate. Use window functions or subqueries instead
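Snowflake also offers the MIN_BY / MAX_BY aggregates, which return the user_id attached to the extreme salary directly in a GROUP BY. Note they return ONE arbitrary user among ties, so prefer the RANK patterns above when all tied users matter. A sketch against the sample table:

```sql
SELECT
    dept,
    MIN(salary) AS min_salary,
    MIN_BY(user_id, salary) AS a_min_user_id,  -- ONE arbitrary user among ties
    MAX(salary) AS max_salary,
    MAX_BY(user_id, salary) AS a_max_user_id,
    ROUND(AVG(salary), 2) AS avg_salary
FROM salaries
GROUP BY dept;
```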
is_incremental() is a Jinja macro that evaluates to TRUE only when the model is running in incremental mode AND the target table already exists. Understanding exactly when it fires — and when it does not — prevents the most common incremental model bugs.
Core Behaviour — When is_incremental() is TRUE vs FALSE
-- is_incremental() = TRUE when ALL of these are true:
-- 1. The model's materialization is 'incremental'
-- 2. The target table already exists in the warehouse
-- 3. The run is NOT a full-refresh (dbt run --full-refresh = FALSE)

-- is_incremental() = FALSE when:
-- 1. First-ever run (table doesn't exist yet) → dbt does a full build
-- 2. dbt run --full-refresh is passed → forces complete rebuild
-- 3. materialized != 'incremental'
ref() OUTSIDE is_incremental() — always executes
-- models/marts/fact_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}
-- This part runs on EVERY execution (full build AND incremental):
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }} -- ← ref() OUTSIDE the block
),
customers AS (
SELECT * FROM {{ ref('stg_customers') }} -- ← also OUTSIDE
)
SELECT
o.order_id,
o.customer_id,
c.region,
o.amount,
o.updated_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id
{% if is_incremental() %}
-- This WHERE clause is ONLY added during incremental runs
WHERE o.updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
-- On FIRST RUN (is_incremental() = FALSE):
-- Compiles to: SELECT ... FROM stg_orders JOIN stg_customers
-- (no WHERE clause → loads everything)
-- On SUBSEQUENT RUNS (is_incremental() = TRUE):
-- Compiles to: SELECT ... FROM stg_orders JOIN stg_customers
-- WHERE o.updated_at > (SELECT MAX(updated_at) FROM fact_orders)
-- (only loads new/updated rows)
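The compile-time behaviour above can be sketched as plain string assembly. This is a simplified illustration, not real dbt/Jinja, and the table names are hypothetical:

```python
# Sketch of how dbt's compiled SQL differs between the first run and
# later incremental runs: the watermark WHERE clause is appended only
# when is_incremental() evaluates to True.
def compile_model(is_incremental, this_table="fact_orders"):
    sql = (
        "SELECT o.order_id, o.amount, o.updated_at "
        "FROM stg_orders o JOIN stg_customers c ON o.customer_id = c.customer_id"
    )
    if is_incremental:  # target exists, incremental materialization, no --full-refresh
        sql += f" WHERE o.updated_at > (SELECT MAX(updated_at) FROM {this_table})"
    return sql

print(compile_model(False))  # full build: no WHERE clause, loads everything
print(compile_model(True))   # incremental: watermark filter appended
```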
ref() INSIDE is_incremental() — only executes on incremental runs
-- A ref() placed inside an {% if is_incremental() %} block
-- creates a dependency that is ONLY active during incremental runs
-- This is an unusual but valid pattern for lookback/enrichment logic
{{ config(materialized='incremental', unique_key='order_id') }}
SELECT
o.order_id,
o.amount,
o.updated_at
FROM {{ ref('stg_orders') }} o -- ← OUTSIDE: always a dependency
{% if is_incremental() %}
-- ref() INSIDE: only joined during incremental runs
-- On first run this entire block is skipped
JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
WHERE o.updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
-- RISK: On first run, dim_customers join is skipped → you may get different
-- column counts or logic between first run and incremental runs.
-- BEST PRACTICE: Keep all refs OUTSIDE the is_incremental block.
-- Use the block ONLY for WHERE / LIMIT filtering.
The {{ this }} macro — references the model's own target table
-- {{ this }} resolves to the fully-qualified table name of THIS model
-- In dev: ANALYTICS_DEV.MARTS.FACT_ORDERS
-- In prod: ANALYTICS.MARTS.FACT_ORDERS
-- Common pattern: watermark via MAX of existing table
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
-- Safety: handle the case where the watermark column has all NULLs
{% if is_incremental() %}
WHERE updated_at > (
SELECT COALESCE(MAX(updated_at), '1900-01-01'::TIMESTAMP)
FROM {{ this }}
)
{% endif %}
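The COALESCE fallback amounts to: take the max if any non-NULL watermark exists, otherwise use a floor date. A minimal Python sketch of that logic:

```python
from datetime import datetime

def watermark(existing_timestamps):
    # MAX(updated_at) over a table with no rows (or all NULLs) is NULL;
    # fall back to a floor date so the WHERE clause still matches everything.
    non_null = [t for t in existing_timestamps if t is not None]
    return max(non_null) if non_null else datetime(1900, 1, 1)

print(watermark([]))                             # 1900-01-01 00:00:00
print(watermark([None, datetime(2024, 1, 15)]))  # 2024-01-15 00:00:00
```

Without the fallback, `updated_at > NULL` evaluates to NULL and silently filters out every row, which is exactly FAILURE 2 described below.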
When does a dbt incremental job FAIL? — Common causes
-- FAILURE 1: Schema mismatch — new column in source not in target table
-- Symptom: "Column 'loyalty_tier' does not exist in target table"
-- Cause: stg_orders got a new column; fact_orders still has old schema
-- Fix: Add on_schema_change config
{{ config(
materialized='incremental',
unique_key='order_id',
on_schema_change='append_new_columns' -- auto-add new columns to target
-- Options: 'ignore' | 'fail' | 'append_new_columns' | 'sync_all_columns'
) }}
-- FAILURE 2: Watermark subquery returns NULL on first incremental run
-- Symptom: WHERE clause evaluates to "WHERE updated_at > NULL" → zero rows loaded
-- Cause: First run succeeded but loaded 0 rows (empty source); on 2nd run
-- MAX(updated_at) is NULL → WHERE filters everything out
-- Fix: Use COALESCE as shown above
-- FAILURE 3: unique_key has duplicates in the incoming batch
-- Symptom: MERGE key column uniqueness violation
-- Cause: Source table has duplicate PKs; dbt's generated MERGE fails
-- Fix: Deduplicate in the model before the MERGE executes:
SELECT *,
ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) AS rn
FROM {{ ref('stg_orders') }}
QUALIFY rn = 1 -- keep only the latest version of each order_id
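The dedup-before-MERGE step (keep the latest row per key) can be sanity-checked in plain Python; the batch below is hypothetical:

```python
# Pure-Python analog of ROW_NUMBER() OVER (PARTITION BY order_id
# ORDER BY updated_at DESC) ... QUALIFY rn = 1.
rows = [  # (order_id, updated_at, amount) -- hypothetical incoming batch
    (1, "2024-01-01", 100),
    (1, "2024-01-03", 120),  # latest version of order 1 -> kept
    (2, "2024-01-02", 80),
]

latest = {}
for order_id, updated_at, amount in rows:
    # keep only the row with the greatest updated_at per order_id
    if order_id not in latest or updated_at > latest[order_id][0]:
        latest[order_id] = (updated_at, amount)

deduped = [(oid, ts, amt) for oid, (ts, amt) in sorted(latest.items())]
print(deduped)  # [(1, '2024-01-03', 120), (2, '2024-01-02', 80)]
```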
-- FAILURE 4: {{ this }} doesn't exist (table was manually dropped)
-- Symptom: "Object 'ANALYTICS.MARTS.FACT_ORDERS' does not exist"
-- Fix: Run with --full-refresh to rebuild the table from scratch:
-- dbt run --select fact_orders --full-refresh
-- FAILURE 5: Upstream model changed materialization type
-- Symptom: ref() resolves to a view instead of a table; performance degrades
-- or the view itself fails due to changed upstream schema
-- Fix: Run dbt run --select +fact_orders to rebuild from root
Scenario: Initial run succeeds, all subsequent runs fail
-- This is the "succeeds once, breaks forever" pattern — very common in practice
-- Example: fact_daily_revenue with delete+insert strategy
{{ config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key='sale_date'
) }}
SELECT
sale_date,
SUM(revenue) AS total_revenue
FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE sale_date >= DATEADD('day', -3, CURRENT_DATE)
{% endif %}
GROUP BY sale_date
-- PROBLEM: On first run (no WHERE), loads all history. Works great.
-- On incremental runs: WHERE filters to last 3 days.
-- But stg_orders was truncated/rebuilt and ONLY has last 7 days of data.
-- SYMPTOM: Runs succeed but older dates silently disappear from the mart.
-- This is a data correctness failure, not a SQL error.
-- REAL FAILURE PATTERN:
-- First run: 10M rows, takes 45 min, succeeds
-- Second run: 50K rows (incremental), takes 2 min, succeeds
-- Third run: ERROR — stg_orders added a NOT NULL column "region"
-- but fact_orders was built without it
-- Fix: on_schema_change='append_new_columns' + dbt run --full-refresh
Troubleshooting checklist for incremental model failures
- Check compiled SQL — run dbt compile --select <model> and inspect target/compiled/ to see what SQL was actually generated
- Check is_incremental() evaluation — if the table doesn't exist, is_incremental() is FALSE; run dbt run --full-refresh to reset
- Check watermark logic — run the MAX subquery manually in Snowflake to verify it returns the expected timestamp
- Check schema drift — compare SHOW COLUMNS IN TABLE <target> vs the model's SELECT list
- Add clock skew buffer — filter on updated_at > DATEADD('minute', -5, (SELECT MAX(updated_at) FROM {{ this }})) to avoid missing in-flight rows
- Use --store-failures — dbt test --store-failures saves failing rows to an audit schema for inspection
What is a Winning Streak?
A winning streak means consecutive DAYS where the player won at least ONE match, regardless of losses on the same day. Key points:
- Streak is day-based, not match-based: What matters is whether the player won AT LEAST ONE match on a day, not the total number of wins
- Mixed results on same day: Win + Lose on same day = STREAK CONTINUES. The presence of at least one win counts
- Complete loss day: If a day has ONLY losses or no matches, the streak BREAKS immediately
- Consecutive days rule: Day1 has a win → Streak = 1. Day2 has a win → Streak = 2. Day3 has zero wins → Streak = 0 (reset). Day4 has a win → Streak = 1 (fresh count)
- Key insight: Streak measures consecutive winning DAYS, not consecutive winning matches. One loss per day doesn't break the streak if there's at least one win
Example Timeline:
Day 1: Win, Lose, Win → Streak = 1 (at least one win on Day 1; losses don't break it)
Day 2: Lose → Streak = 0 (no wins on Day 2; streak broken)
Day 3: Win → Streak = 1 (fresh count; at least one win on Day 3)
Day 4: Win, Lose, Lose → Streak = 2 (at least one win on Day 4; continues from Day 3)
Day 5: Lose, Lose → Streak = 0 (no wins on Day 5; streak broken)
Day 6: Win → Streak = 1 (fresh count begins)
Maximum winning streak across the timeline = 2 (Day 3 through Day 4)
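The day-based rule is easy to get wrong, so a small reference implementation helps before writing the SQL. This pure-Python sketch computes the max streak for the timeline above:

```python
from datetime import date, timedelta

def max_winning_streak(events):
    """events: list of (event_date, won_bool). A day counts toward a streak
    if it has at least one win; any gap day or all-loss day resets it."""
    winning_days = sorted({d for d, won in events if won})
    best = run = 0
    prev = None
    for d in winning_days:
        # extend the run only if this winning day directly follows the previous one
        run = run + 1 if prev is not None and d - prev == timedelta(days=1) else 1
        best = max(best, run)
        prev = d
    return best

timeline = [
    (date(2024, 1, 1), True), (date(2024, 1, 1), False), (date(2024, 1, 1), True),   # Day 1
    (date(2024, 1, 2), False),                                                        # Day 2
    (date(2024, 1, 3), True),                                                         # Day 3
    (date(2024, 1, 4), True), (date(2024, 1, 4), False), (date(2024, 1, 4), False),   # Day 4
    (date(2024, 1, 5), False), (date(2024, 1, 5), False),                             # Day 5
    (date(2024, 1, 6), True),                                                         # Day 6
]
print(max_winning_streak(timeline))  # 2 (Day 3 through Day 4)
```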
Streak problems are about consecutive "winning DAYS" — not consecutive individual wins. The key technique is the "gaps and islands" pattern, but applied at the DAY level: identify which days have at least one win, then find consecutive sequences of such days.
Sample Data
CREATE TABLE events (
player_id INT,
event_id INT, -- global unique event sequence number
winning_score INT, -- 1 = player won this event, 0 = lost, NULL = did not participate
event_date DATE
);
INSERT INTO events VALUES
-- Player 1: Day1 (W+L) → Day2 (L only) → Day3+ (W → W+L+L)
(1, 1, 1, '2024-01-01'), -- Day 1: Win
(1, 2, 0, '2024-01-01'), -- Day 1: Lose (but Day 1 has at least 1 win → streak continues)
(1, 3, 0, '2024-01-02'), -- Day 2: Lose only (no wins → streak breaks)
(1, 4, 1, '2024-01-03'), -- Day 3: Win (fresh streak starts = 1)
(1, 5, 1, '2024-01-04'), -- Day 4: Win (streak continues = 2)
(1, 6, 0, '2024-01-04'), -- Day 4: Lose (but Day 4 has at least 1 win → streak still continues = 2)
(1, 7, 0, '2024-01-04'), -- Day 4: Lose (still at least 1 win on Day 4)
-- Player 2: L → W → W+L → W+L+L
(2, 8, 0, '2024-01-01'), -- Day 1: Lose only (no wins → no streak)
(2, 9, 1, '2024-01-02'), -- Day 2: Win (fresh streak starts = 1)
(2, 10, 1, '2024-01-03'), -- Day 3: Win (streak continues = 2)
(2, 11, 0, '2024-01-03'), -- Day 3: Lose (but Day 3 has at least 1 win → streak still continues = 2)
(2, 12, 1, '2024-01-04'), -- Day 4: Win (streak continues = 3)
(2, 13, 0, '2024-01-04'), -- Day 4: Lose (but Day 4 has at least 1 win)
(2, 14, 0, '2024-01-04'); -- Day 4: Lose (still at least 1 win on Day 4 → streak = 3)
Step 1 — Identify days with at least one win per player
-- Group by player and date; if SUM(winning_score) > 0, that day has wins
WITH winning_days AS (
SELECT
player_id,
event_date,
SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) AS wins_on_day
FROM events
GROUP BY player_id, event_date
HAVING SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) > 0
)
SELECT * FROM winning_days ORDER BY player_id, event_date;
Step 2 — Gaps and islands on DAYS (not individual matches)
-- For consecutive winning days, (event_date - day_row_number) stays constant
-- A day with NO wins creates a date gap, so the constant changes → new island
WITH winning_days AS (
SELECT
player_id,
event_date,
SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) AS wins_on_day
FROM events
GROUP BY player_id, event_date
HAVING SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) > 0
),
ordered_days AS (
SELECT
player_id,
event_date,
ROW_NUMBER() OVER (PARTITION BY player_id ORDER BY event_date) AS day_seq,
-- Subtracting the day sequence from the date gives a constant per streak
DATEADD(day,
-ROW_NUMBER() OVER (PARTITION BY player_id ORDER BY event_date),
event_date) AS streak_group
FROM winning_days
)
SELECT player_id, event_date, day_seq, streak_group FROM ordered_days ORDER BY player_id, event_date;
Step 3 — Count consecutive winning days and find maximum streak
WITH winning_days AS (
SELECT
player_id,
event_date,
SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) AS wins_on_day
FROM events
GROUP BY player_id, event_date
HAVING SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) > 0
),
-- Tag each winning day with a "day gap" marker
-- If the previous day (for this player) was also a winning day, gap = 0; otherwise gap = 1
day_gaps AS (
SELECT
player_id,
event_date,
wins_on_day,
CASE
WHEN LAG(event_date) OVER (PARTITION BY player_id ORDER BY event_date)
= DATEADD(day, -1, event_date)
THEN 0 -- consecutive day (previous day was yesterday)
ELSE 1 -- new streak (gap or first day)
END AS streak_start
FROM winning_days
),
-- Assign streak ID by summing up all streak starts
streak_ids AS (
SELECT
player_id,
event_date,
SUM(streak_start) OVER (PARTITION BY player_id ORDER BY event_date ROWS UNBOUNDED PRECEDING) AS streak_id
FROM day_gaps
),
-- Count the number of consecutive winning days per streak
streak_lengths AS (
SELECT
player_id,
streak_id,
COUNT(*) AS streak_length
FROM streak_ids
GROUP BY player_id, streak_id
)
SELECT
player_id,
MAX(streak_length) AS max_winning_streak
FROM streak_lengths
GROUP BY player_id
ORDER BY player_id;
-- Result:
-- player_id | max_winning_streak
-- 1 | 2 ← Day 3 (Jan 3) + Day 4 (Jan 4)
-- 2 | 3 ← Day 2 (Jan 2) + Day 3 (Jan 3) + Day 4 (Jan 4)
Alternative — Simpler approach: Check if each day has at least one win
-- Simplified: group by date, check SUM(wins), then detect consecutive date gaps
WITH daily_wins AS (
SELECT
player_id,
event_date,
SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) AS total_wins
FROM events
GROUP BY player_id, event_date
),
winning_days_only AS (
SELECT
player_id,
event_date,
total_wins,
ROW_NUMBER() OVER (PARTITION BY player_id ORDER BY event_date) AS day_seq
FROM daily_wins
WHERE total_wins > 0 -- only days with at least 1 win
),
-- Detect day gaps: if consecutive days are not 1 apart, a gap occurred
streak_markers AS (
SELECT
player_id,
event_date,
day_seq,
DATEDIFF(day, LAG(event_date) OVER (PARTITION BY player_id ORDER BY event_date), event_date) AS day_gap
FROM winning_days_only
),
-- Assign streak IDs based on gaps
streak_ids AS (
SELECT
player_id,
event_date,
SUM(CASE WHEN day_gap > 1 OR day_gap IS NULL THEN 1 ELSE 0 END)
OVER (PARTITION BY player_id ORDER BY event_date ROWS UNBOUNDED PRECEDING) AS streak_id
FROM streak_markers
),
streak_counts AS (
SELECT
player_id,
streak_id,
COUNT(*) AS winning_days_in_streak
FROM streak_ids
GROUP BY player_id, streak_id
)
SELECT
player_id,
MAX(winning_days_in_streak) AS max_winning_streak
FROM streak_counts
GROUP BY player_id;
-- Result: Same as above
-- player_id | max_winning_streak
-- 1 | 2 ← Day 3 + Day 4
-- 2 | 3 ← Day 2 + Day 3 + Day 4
Key concepts to explain in interviews:
- Day-level vs match-level: This problem counts consecutive DAYS with wins, not consecutive individual wins. One loss mixed with wins on the same day does NOT break the streak
- Gaps and islands on dates: Use ROW_NUMBER() to assign day sequences, then detect gaps. A gap > 1 day marks the start of a new streak
- DATEDIFF for date gaps: DATEDIFF(day, previous_date, current_date) tells you if days are consecutive. If > 1, there's a gap
- NULL handling: A day with no events at all is not counted. Only days with at least one event are relevant. A day with all losses has zero wins — it breaks the streak
- Business definition: Always clarify with the interviewer: does "streak" mean consecutive individual wins, or consecutive days with at least one win? This fundamentally changes the solution
Snowflake supports four tiers of pattern matching: case-sensitive wildcards (LIKE), case-insensitive wildcards (ILIKE), POSIX regex predicates (RLIKE/REGEXP), and the full REGEXP_* function family. Knowing when to use each — and their performance implications — is essential for data cleaning and filtering tasks.
LIKE — case-sensitive wildcard matching
-- Wildcards: % = any sequence of characters, _ = exactly one character
-- Emails ending in @gmail.com
SELECT email FROM users WHERE email LIKE '%@gmail.com';
-- Names starting with 'Sur'
SELECT name FROM employees WHERE name LIKE 'Sur%';
-- Exactly 5-character product codes
SELECT code FROM products WHERE code LIKE '_____'; -- 5 underscores
-- Contains 'cloud' anywhere in the string
SELECT description FROM docs WHERE description LIKE '%cloud%';
-- LIKE is case-sensitive in Snowflake by default!
SELECT * FROM users WHERE name LIKE 'surya%'; -- will NOT match 'Surya'
ILIKE — case-INsensitive wildcard matching (Snowflake extension)
-- Same as LIKE but ignores case — very useful for user-entered data
-- Match 'Surya', 'surya', 'SURYA' all at once
SELECT * FROM users WHERE name ILIKE 'surya%';
-- Find any cloud-related tag regardless of capitalisation
SELECT tag FROM labels WHERE tag ILIKE '%cloud%';
-- Standard SQL alternative (works everywhere):
SELECT * FROM users WHERE LOWER(name) LIKE 'surya%';
-- ILIKE is cleaner and often faster (no LOWER() function overhead)
RLIKE — regular expression matching (alias: REGEXP, REGEXP_LIKE)
-- RLIKE uses POSIX extended regex syntax
-- Returns TRUE/FALSE — use in WHERE clause
-- Valid email format check
SELECT email
FROM users
WHERE email RLIKE '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$';
-- Phone numbers in formats: 555-1234, (555) 123-4567, +1-555-123-4567
SELECT phone FROM contacts
WHERE phone RLIKE '^(\\+1-)?\\(?[0-9]{3}\\)?[-. ]?[0-9]{3}[-. ]?[0-9]{4}$';
-- Strings containing only digits
SELECT value FROM raw_data WHERE value RLIKE '^[0-9]+$';
-- Match Indian PAN card format: ABCDE1234F (5 letters, 4 digits, 1 letter)
SELECT pan FROM kyc_data WHERE pan RLIKE '^[A-Z]{5}[0-9]{4}[A-Z]$';
-- RLIKE is case-sensitive by default in Snowflake
-- For case-insensitive regex, use (?i) flag or REGEXP_LIKE with 'i' parameter:
SELECT name FROM employees WHERE name RLIKE '(?i)^surya';
REGEXP_LIKE, REGEXP_SUBSTR, REGEXP_REPLACE, REGEXP_COUNT — full family
-- REGEXP_LIKE: explicit function form of RLIKE
SELECT REGEXP_LIKE('Surya@gmail.com', '^[a-zA-Z]+@[a-zA-Z]+\\.com$') AS is_valid_email;
-- Returns: TRUE
-- REGEXP_LIKE with case-insensitive flag 'i'
SELECT * FROM users WHERE REGEXP_LIKE(email, '^surya.*\\.com$', 'i');
-- REGEXP_SUBSTR: extract the FIRST match from a string
-- Extract domain name from email
SELECT
email,
REGEXP_SUBSTR(email, '@(.+)', 1, 1, 'e', 1) AS domain
-- params: (string, pattern, position, occurrence, flag, group)
FROM users;
-- 'suryateja@gmail.com' → 'gmail.com'
-- Extract all digits from a mixed string
SELECT REGEXP_SUBSTR('Order #12345 placed', '[0-9]+') AS order_number;
-- Returns: '12345'
-- REGEXP_REPLACE: replace matches with substitution
-- Mask credit card number (keep last 4 digits)
SELECT REGEXP_REPLACE('4111-1111-1111-1234', '[0-9]{4}-[0-9]{4}-[0-9]{4}-', '****-****-****-')
AS masked_card;
-- Returns: '****-****-****-1234'
-- Remove all non-alphanumeric characters
SELECT REGEXP_REPLACE(phone_raw, '[^a-zA-Z0-9]', '') AS phone_clean
FROM contacts;
-- REGEXP_COUNT: count occurrences of a pattern
SELECT REGEXP_COUNT('the cat sat on the mat', 'at') AS count_at;
-- Returns: 3 (c-at, s-at, m-at)
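Python's re module is a convenient scratchpad for these patterns before porting them to Snowflake (Python's syntax is close to, but not identical to, Snowflake's POSIX regex):

```python
import re

# REGEXP_LIKE-style validation: Indian PAN format (5 letters, 4 digits, 1 letter)
assert re.fullmatch(r"[A-Z]{5}[0-9]{4}[A-Z]", "ABCDE1234F")

# REGEXP_SUBSTR-style extraction: first run of digits
print(re.search(r"[0-9]+", "Order #12345 placed").group())  # 12345

# REGEXP_REPLACE-style masking: keep only the last 4 card digits visible
print(re.sub(r"[0-9]{4}-[0-9]{4}-[0-9]{4}-", "****-****-****-",
             "4111-1111-1111-1234"))  # ****-****-****-1234

# REGEXP_COUNT-style counting of non-overlapping matches
print(len(re.findall("at", "the cat sat on the mat")))  # 3
```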
Comparison Table
| Function | Case Sensitive? | Pattern Type | Returns | Best For |
|---|---|---|---|---|
| LIKE | Yes | Wildcards (%, _) | TRUE/FALSE | Simple prefix/suffix/contains |
| ILIKE | No | Wildcards (%, _) | TRUE/FALSE | Case-insensitive simple matching |
| RLIKE / REGEXP | Yes (prefix (?i) to override) | Full POSIX regex | TRUE/FALSE | Complex validation, WHERE clause |
| REGEXP_LIKE | Configurable ('i' flag) | Full POSIX regex | TRUE/FALSE | Explicit regex WHERE filtering |
| REGEXP_SUBSTR | Configurable | Full POSIX regex | Matched substring | Extraction (email domain, order IDs) |
| REGEXP_REPLACE | Configurable | Full POSIX regex | Modified string | Masking, cleaning, normalisation |
| REGEXP_COUNT | Configurable | Full POSIX regex | Integer count | Counting pattern occurrences |
Performance notes:
- LIKE with a leading wildcard ('%value') cannot use partition metadata for pruning — it must scan all rows. Prefer trailing wildcards ('value%') where possible
- ILIKE is generally faster than LOWER(col) LIKE 'val%' because it avoids the function transformation
- RLIKE/REGEXP functions are significantly more expensive than LIKE — use them only when simple wildcards are insufficient
- For high-volume filtering on regex patterns, consider pre-extracting the relevant part into a separate column and clustering on it instead
Real-world JSON payloads rarely have simple arrays — they have arrays of objects that themselves contain arrays. Chaining multiple LATERAL FLATTEN calls handles arbitrary nesting depth, one level at a time.
Sample JSON Structure — deeply nested
-- A single order payload with nested line items and nested discount codes per item:
{
"order_id": "ORD-001",
"customer_id": 1001,
"items": [
{
"sku": "SHOE-RED",
"quantity": 2,
"price": 49.99,
"discounts": [
{"code": "SAVE10", "amount": 5.00},
{"code": "LOYALTY", "amount": 2.50}
]
},
{
"sku": "BAG-BLUE",
"quantity": 1,
"price": 89.00,
"discounts": [
{"code": "WELCOME", "amount": 10.00}
]
}
]
}
Level 1 — Flatten the items array
-- First LATERAL FLATTEN unpacks the outer 'items' array
-- Each item becomes its own row, with the parent order_id preserved
SELECT
o.value:order_id::VARCHAR AS order_id,
o.value:customer_id::INT AS customer_id,
item.value:sku::VARCHAR AS sku,
item.value:quantity::INT AS quantity,
item.value:price::DECIMAL(10,2) AS unit_price,
item.index AS item_position, -- 0-based position in array
item.value AS item_raw -- full item object (for next level)
FROM raw.orders_landing o,
LATERAL FLATTEN(input => o.value:items) AS item;
-- Result: 2 rows for ORD-001 (one per item)
-- order_id | sku | quantity | unit_price | item_position
-- ORD-001 | SHOE-RED | 2 | 49.99 | 0
-- ORD-001 | BAG-BLUE | 1 | 89.00 | 1
Level 2 — Chain a second FLATTEN to unpack discounts within each item
-- Each LATERAL FLATTEN can reference the output of the previous one
-- item.value is passed as input to the second FLATTEN
SELECT
o.value:order_id::VARCHAR AS order_id,
o.value:customer_id::INT AS customer_id,
item.value:sku::VARCHAR AS sku,
item.value:quantity::INT AS quantity,
item.value:price::DECIMAL(10,2) AS unit_price,
item.index AS item_position,
disc.value:code::VARCHAR AS discount_code,
disc.value:amount::DECIMAL(10,2) AS discount_amount,
disc.index AS discount_position
FROM raw.orders_landing o,
LATERAL FLATTEN(input => o.value:items) AS item, -- Level 1
LATERAL FLATTEN(input => item.value:discounts) AS disc; -- Level 2 (chained)
-- Result: 3 rows for ORD-001 (one per discount across all items)
-- order_id | sku | unit_price | discount_code | discount_amount
-- ORD-001 | SHOE-RED | 49.99 | SAVE10 | 5.00
-- ORD-001 | SHOE-RED | 49.99 | LOYALTY | 2.50
-- ORD-001 | BAG-BLUE | 89.00 | WELCOME | 10.00
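The two chained FLATTENs behave like nested loops. A pure-Python analog over the same payload produces the same three rows:

```python
import json

# Each discount of each item becomes one output row, with the parent
# order_id and sku carried along -- exactly what the chained FLATTENs do.
payload = json.loads("""{
  "order_id": "ORD-001",
  "items": [
    {"sku": "SHOE-RED", "price": 49.99,
     "discounts": [{"code": "SAVE10", "amount": 5.0},
                   {"code": "LOYALTY", "amount": 2.5}]},
    {"sku": "BAG-BLUE", "price": 89.0,
     "discounts": [{"code": "WELCOME", "amount": 10.0}]}
  ]
}""")

rows = [
    (payload["order_id"], item["sku"], disc["code"], disc["amount"])
    for item in payload["items"]    # level 1: flatten items
    for disc in item["discounts"]   # level 2: flatten discounts
]
print(rows)
# [('ORD-001', 'SHOE-RED', 'SAVE10', 5.0), ('ORD-001', 'SHOE-RED', 'LOYALTY', 2.5),
#  ('ORD-001', 'BAG-BLUE', 'WELCOME', 10.0)]
```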
Handling optional/missing nested arrays — OUTER FLATTEN
-- Problem: if an item has NO discounts, the chained FLATTEN produces zero rows
-- for that item → the item disappears from results entirely
-- Solution: use OUTER => TRUE on the inner FLATTEN
-- OUTER = TRUE means "if the array is empty or null, still produce one row with NULL values"
SELECT
o.value:order_id::VARCHAR AS order_id,
item.value:sku::VARCHAR AS sku,
item.value:price::DECIMAL(10,2) AS unit_price,
disc.value:code::VARCHAR AS discount_code, -- NULL if no discounts
disc.value:amount::DECIMAL(10,2) AS discount_amount -- NULL if no discounts
FROM raw.orders_landing o,
LATERAL FLATTEN(input => o.value:items) AS item,
LATERAL FLATTEN(input => item.value:discounts, OUTER => TRUE) AS disc;
-- Items with no discounts appear with NULL discount_code/amount (not dropped)
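The OUTER => TRUE behaviour is essentially a left join against the array. A Python analog (hypothetical items) shows the empty-array case surviving as a NULL-valued row:

```python
# With an inner flatten, an item whose discounts array is empty yields
# zero rows and vanishes; OUTER => TRUE keeps it with NULL discount fields.
items = [
    {"sku": "SHOE-RED", "discounts": [{"code": "SAVE10"}]},
    {"sku": "HAT-GREEN", "discounts": []},  # no discounts
]

rows = []
for item in items:
    discs = item["discounts"] or [None]  # empty array -> one NULL-like element
    for disc in discs:
        rows.append((item["sku"], disc["code"] if disc else None))

print(rows)  # [('SHOE-RED', 'SAVE10'), ('HAT-GREEN', None)]
```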
Level 3 — Three levels of nesting (e.g., order → items → attributes → values)
-- If each discount itself has an array of "applicable_dates":
SELECT
o.value:order_id::VARCHAR AS order_id,
item.value:sku::VARCHAR AS sku,
disc.value:code::VARCHAR AS discount_code,
apply_date.value::DATE AS applicable_date
FROM raw.orders_landing o,
LATERAL FLATTEN(input => o.value:items) AS item,
LATERAL FLATTEN(input => item.value:discounts, OUTER => TRUE) AS disc,
LATERAL FLATTEN(input => disc.value:applicable_dates, OUTER => TRUE) AS apply_date;
-- Three chained FLATTENs — each level adds one more join via LATERAL
FLATTEN in a dbt model
-- models/staging/stg_order_discounts.sql
-- Unpack orders → items → discounts into a clean relational table
{{ config(materialized='view') }}
SELECT
src.value:order_id::VARCHAR AS order_id,
src.value:customer_id::INT AS customer_id,
item.value:sku::VARCHAR AS sku,
item.value:quantity::INT AS quantity,
item.value:price::DECIMAL(10,2) AS unit_price,
disc.value:code::VARCHAR AS discount_code,
disc.value:amount::DECIMAL(10,2) AS discount_amount,
src._loaded_at AS ingested_at
FROM {{ source('raw', 'orders_landing') }} src,
LATERAL FLATTEN(input => src.value:items) item,
LATERAL FLATTEN(input => item.value:discounts, OUTER => TRUE) disc
Key FLATTEN parameters:
| Parameter | Default | Description |
|---|---|---|
| input | required | The VARIANT / ARRAY / OBJECT to flatten |
| OUTER | FALSE | TRUE = produce a row with NULLs when input is empty/null (like LEFT JOIN) |
| RECURSIVE | FALSE | TRUE = flatten all levels recursively (use with caution on deeply nested data) |
| MODE | BOTH | ARRAY / OBJECT / BOTH — what types to expand |
| f.value | — | The element value at this position |
| f.index | — | 0-based position in the array (NULL for objects) |
| f.key | — | Key name (for object flattening) |
| f.path | — | Full dot-notation path to this element |
Cost optimisation is a core skill for senior data engineers — interviewers want specific, quantified examples. Here is a framework for identifying, measuring, and implementing savings across compute, storage, ingestion, and pipeline design.
1. Compute Cost Savings — Warehouse Right-Sizing & Auto-Suspend
-- Identify warehouses that are over-provisioned (low activity %)
SELECT
warehouse_name,
SUM(credits_used) AS total_credits,
ROUND(SUM(credits_used) * 3.0, 2) AS est_cost_usd,
ROUND(AVG(CASE WHEN credits_used > 0 THEN 100 ELSE 0 END), 1) AS pct_active
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= CURRENT_DATE - 30
GROUP BY 1 ORDER BY 3 DESC;
-- Action: downsize warehouses with <20% active time
ALTER WAREHOUSE ANALYTICS_WH SET WAREHOUSE_SIZE = 'MEDIUM'; -- was LARGE
-- Immediate 50% compute cost reduction on this warehouse
-- Action: aggressively set auto-suspend
ALTER WAREHOUSE DEV_WH SET AUTO_SUSPEND = 60; -- 1 min for dev
ALTER WAREHOUSE ETL_WH SET AUTO_SUSPEND = 120; -- 2 min for batch ETL
-- Typical savings: 20-40% on idle warehouse costs
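The "downsizing halves cost" claim follows from credits-per-hour doubling with each size step. A back-of-envelope model, assuming roughly $3/credit list price as in the query above (actual rates vary by edition and region):

```python
# Snowflake warehouse credits/hour double with each size step.
CREDITS_PER_HOUR = {"SMALL": 2, "MEDIUM": 4, "LARGE": 8, "XLARGE": 16}

def monthly_cost(size, active_hours_per_day, price_per_credit=3.0, days=30):
    return CREDITS_PER_HOUR[size] * active_hours_per_day * days * price_per_credit

large = monthly_cost("LARGE", 4)    # 8 cr/hr * 4 h/day * 30 d * $3
medium = monthly_cost("MEDIUM", 4)  # same usage, half the credits
print(large, medium)  # 2880.0 1440.0 -- one size down halves the bill
```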
2. Storage Cost Savings — Transient Tables & Retention Tuning
-- Staging tables don't need Time Travel or Fail-Safe
-- Transient tables have no Fail-Safe and 0-1 day Time Travel → ~50% lower storage cost
-- In dbt: set staging layer to transient
{{ config(materialized='table', transient=true) }} -- staging models
-- For permanent tables: reduce retention on large, low-risk tables
ALTER TABLE raw.clickstream_events SET DATA_RETENTION_TIME_IN_DAYS = 1; -- was 14
ALTER TABLE raw.server_logs SET DATA_RETENTION_TIME_IN_DAYS = 1;
-- Find Time Travel storage waste (high time_travel_bytes)
SELECT table_name,
ROUND(time_travel_bytes / POWER(1024,3), 2) AS time_travel_gb,
ROUND(failsafe_bytes / POWER(1024,3), 2) AS failsafe_gb
FROM snowflake.account_usage.table_storage_metrics
ORDER BY (time_travel_bytes + failsafe_bytes) DESC LIMIT 20;
-- Typical savings: 10-30% on storage for large event/log tables
3. Query Efficiency — Eliminate Full Table Scans
-- Find queries scanning >80% of a large table's partitions (no pruning)
SELECT
LEFT(query_text, 100) AS query_preview,
ROUND(partitions_scanned * 100.0 / NULLIF(partitions_total, 0), 1) AS pct_scanned,
total_elapsed_time / 1000 AS elapsed_sec,
warehouse_name
FROM snowflake.account_usage.query_history
WHERE start_time >= CURRENT_DATE - 7
AND partitions_total > 100
AND partitions_scanned / NULLIF(partitions_total, 0) > 0.8
ORDER BY total_elapsed_time DESC;
-- Fix: add clustering key so BI queries prune by date
ALTER TABLE marts.fact_orders CLUSTER BY (order_date);
-- Typical savings: 40-70% query time reduction → cheaper compute needed
4. Pipeline Architecture — Incremental vs Full Refresh
-- Scenario: dbt model doing full rebuild of 500M row table nightly
-- Takes 45 min on X-LARGE warehouse ($80/run × 30 days = $2,400/month)
-- Fix: convert to incremental model
{{ config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='merge'
) }}
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
-- Result: 3 min on MEDIUM warehouse ($2/run × 30 days = $60/month)
-- Savings: $2,340/month on ONE model alone
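The savings figure is straightforward arithmetic on the run costs quoted above:

```python
# Nightly runs, 30 days/month, using the per-run costs from the scenario.
full_refresh_cost = 80 * 30  # $80/run on X-LARGE before conversion
incremental_cost = 2 * 30    # $2/run on MEDIUM after conversion
savings = full_refresh_cost - incremental_cost
print(full_refresh_cost, incremental_cost, savings)  # 2400 60 2340
```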
5. Smart Scheduling — Only Run When Data Exists
-- Use Snowflake Streams to gate Tasks (zero credits if no new data)
CREATE TASK etl.process_orders_task
WAREHOUSE = TRANSFORM_WH
SCHEDULE = '5 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('raw.orders_stream') -- only runs if stream has data
AS CALL etl.sp_merge_orders();
-- Without WHEN guard: task starts warehouse every 5 min, 288 times/day
-- With WHEN guard: warehouse starts only when data arrives → 80-95% fewer cold starts
6. Data Architecture — Move Cold Data to External Tables
-- Data older than 2 years: store in S3 Parquet, query via external table
-- S3 cost: ~$0.023/GB/month vs Snowflake storage: ~$40/TB/month
-- 10TB of cold data: $400/month in Snowflake vs $230/month in S3 (~$170/month saved)
-- Note: external-table partition columns must be derived from the file path
-- (metadata$filename), not from data columns. The path layout assumed below
-- (.../orders/<year>/...) is illustrative — adjust the SPLIT_PART position
-- to match your actual S3 prefix structure.
CREATE EXTERNAL TABLE archive.orders_historical (
order_id INT AS (value:order_id::INT),
order_date DATE AS (value:order_date::DATE),
net_revenue FLOAT AS (value:net_revenue::FLOAT),
order_year INT AS (SPLIT_PART(metadata$filename, '/', 2)::INT)
)
PARTITION BY (order_year)
LOCATION = @archive.s3_archive_stage/orders/
FILE_FORMAT = (TYPE = 'PARQUET')
AUTO_REFRESH = TRUE;
7. Tooling Consolidation — Rationalise Ingestion Tools
- Replace Fivetran (per-MAR pricing) with Airbyte (open source) for internal source connectors where Qlik Replicate already handles CDC — avoid double-paying for the same source
- Consolidate overlapping dbt Cloud and Airflow orchestration — standardise on one orchestrator to reduce licensing + maintenance cost
- Use Snowflake Tasks + Streams for simple Snowflake-internal pipelines instead of triggering Airflow for a single SQL statement
Cost Savings Summary Framework
| Category | Action | Typical Savings |
|---|---|---|
| Compute | Right-size + auto-suspend warehouses | 20–40% |
| Compute | Full refresh → incremental models | 50–90% per model |
| Compute | Stream-gated tasks (no empty runs) | 80–95% on idle tasks |
| Storage | Transient staging tables | ~50% on staging storage |
| Storage | Reduce Time Travel retention on raw tables | 10–30% |
| Storage | Move cold data to S3 external tables | 30–60% on archive storage |
| Query | Clustering keys on large fact tables | 40–70% query time |
| Licensing | Consolidate ingestion tools | Varies by contract |
These four techniques all protect sensitive data but work at different layers, with different performance characteristics and governance implications. Interviewers often conflate them — here is a precise, side-by-side breakdown.
1. Data Redaction — Data is physically removed or replaced at source
Redaction means the sensitive value is permanently removed or overwritten in the storage layer — it does not exist in the table at all. There is no dynamic evaluation — the data is gone. Common in ETL pipelines before loading to analytics environments.
-- Static redaction during ETL: replace PII before loading to Snowflake
INSERT INTO staging.customers (customer_id, email, phone)
SELECT
customer_id,
'***@redacted.com' AS email, -- permanently replaced, original gone
'XXX-XXX-XXXX' AS phone
FROM source.raw_customers;
-- Use case: dev/test environments — load redacted data so developers
-- can work with realistic data shapes without ever seeing real PII
-- Unlike masking, there is NO way to recover the original — it never landed
2. Static (Format-Preserving) Masking — Data replaced consistently, value-preserving
Static masking replaces the value with a fake but realistic one — format is preserved so downstream logic still works. Unlike redaction, the substitution is often consistent (same input → same fake output), enabling joins across masked tables.
-- Static masking: replace with fake but format-preserving values
-- Used when: test/dev DB needs referential integrity but not real values
-- Mask credit card: keep format, replace digits
SELECT
card_number,
REGEXP_REPLACE(card_number, '[0-9]', 'X') AS masked_card
-- 4111-1111-1111-1234 → XXXX-XXXX-XXXX-XXXX
-- Format-preserving: keep last 4 digits (realistic for display)
SELECT
'****-****-****-' || RIGHT(card_number, 4) AS display_card
-- 4111-1111-1111-1234 → ****-****-****-1234
-- Consistent substitution using hash (same input → same fake output)
SELECT MD5(email) || '@fake.com' AS masked_email
FROM customers;
-- 'john@real.com' → always maps to same fake email → joins work
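Both static-masking patterns (format-preserving display and consistent hash substitution) are trivial to express outside SQL; a Python sketch:

```python
import hashlib

def display_card(card):
    # format-preserving display mask: keep the familiar shape, show last 4
    return "****-****-****-" + card[-4:]

def masked_email(email):
    # consistent substitution: same real email -> same fake email every time,
    # so joins across masked tables still line up
    return hashlib.md5(email.encode()).hexdigest()[:12] + "@fake.com"

print(display_card("4111-1111-1111-1234"))  # ****-****-****-1234
assert masked_email("john@real.com") == masked_email("john@real.com")
```

Note that hash-based substitution is deterministic but not reversible; if re-identification is a concern, add a secret salt before hashing.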
3. Dynamic Data Masking (DDM) — Real-time masking at query time, role-based
DDM is the most powerful technique for production analytics environments. The real data is stored unmasked in Snowflake. When a query runs, the masking policy function is evaluated on the fly and the result depends on the querying user's role. Different roles see different values from the same physical table — with zero data duplication.
-- Step 1: Create masking policy with role-based logic
CREATE OR REPLACE MASKING POLICY pii.email_mask
AS (val VARCHAR) RETURNS VARCHAR ->
CASE
-- DBA and data owners see the real email
WHEN CURRENT_ROLE() IN ('SYSADMIN', 'DATA_OWNER_ROLE')
THEN val
-- Analysts see first letter + masked domain (useful for support)
WHEN CURRENT_ROLE() = 'ANALYST_ROLE'
THEN LEFT(val, 1) || '***@***.***'
-- All other roles see fully masked value
ELSE '***MASKED***'
END;
-- Step 2: Apply policy to the column (one-time DDL)
ALTER TABLE staging.customers
MODIFY COLUMN email SET MASKING POLICY pii.email_mask;
-- Step 3: Observe different results for different roles
USE ROLE DATA_OWNER_ROLE;
SELECT email FROM staging.customers;
-- Output: john.doe@gmail.com (real value)
USE ROLE ANALYST_ROLE;
SELECT email FROM staging.customers;
-- Output: j***@***.*** (partially masked)
USE ROLE DEVELOPER_ROLE;
SELECT email FROM staging.customers;
-- Output: ***MASKED***
-- KEY: the data in Snowflake storage is ALWAYS the real email.
-- Masking is applied at query time — zero storage of duplicate masked data.
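The CASE logic inside a masking policy is just a pure function of (value, role), so it can be unit-tested outside Snowflake before the DDL is deployed. A sketch mirroring pii.email_mask above (the Python function and role names follow the SQL example; this is not a Snowflake API):

```python
def email_mask(val: str, current_role: str) -> str:
    """Mirror the CASE logic of the pii.email_mask policy
    so the rules can be unit-tested before deploying DDL."""
    if current_role in ("SYSADMIN", "DATA_OWNER_ROLE"):
        return val                      # owners see the real value
    if current_role == "ANALYST_ROLE":
        return val[:1] + "***@***.***"  # partial mask for support work
    return "***MASKED***"               # everyone else: full mask

assert email_mask("john.doe@gmail.com", "DATA_OWNER_ROLE") == "john.doe@gmail.com"
assert email_mask("john.doe@gmail.com", "ANALYST_ROLE") == "j***@***.***"
assert email_mask("john.doe@gmail.com", "DEVELOPER_ROLE") == "***MASKED***"
```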
4. Row-Level Security (Row Access Policies) — Control WHICH ROWS a user can see
Where DDM controls WHAT VALUE is shown in a column, Row-Level Security controls WHICH ROWS are returned at all. A user simply does not see rows they are not permitted to — the rows are filtered out as if they don't exist.
-- Use case: regional sales managers should only see their own region's data
-- Without RLS: all 10M rows returned to every user — must rely on WHERE clause
-- With RLS: Snowflake enforces the filter automatically, user can't bypass it
-- Step 1: Create a mapping table (user → allowed region)
CREATE TABLE access.region_access (
user_name VARCHAR,
region VARCHAR
);
INSERT INTO access.region_access VALUES
('analyst_apac', 'APAC'),
('analyst_emea', 'EMEA'),
('analyst_us', 'US-EAST'),
('analyst_us', 'US-WEST'); -- user can see multiple regions
-- Step 2: Create row access policy
CREATE OR REPLACE ROW ACCESS POLICY security.region_filter
AS (region_col VARCHAR) RETURNS BOOLEAN ->
-- ADMIN sees all rows
CURRENT_ROLE() = 'ADMIN_ROLE'
-- Other users see only their allowed regions
OR EXISTS (
SELECT 1 FROM access.region_access
WHERE user_name = CURRENT_USER()
AND region = region_col
);
-- Step 3: Apply to table
ALTER TABLE marts.fact_orders
ADD ROW ACCESS POLICY security.region_filter ON (region);
-- Now analyst_apac runs:
SELECT * FROM marts.fact_orders;
-- Only returns rows WHERE region = 'APAC' — automatically, silently, enforced
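The semantics of the row access policy (ADMIN bypass OR an EXISTS lookup in the mapping table) can be sketched in plain Python for clarity (the `visible_rows` helper is illustrative only; Snowflake evaluates the policy per row at query time):

```python
def visible_rows(rows, user, role, region_access):
    """Emulate security.region_filter: ADMIN_ROLE sees everything,
    other users only rows whose region appears in their mapping."""
    if role == "ADMIN_ROLE":
        return rows
    allowed = {region for u, region in region_access if u == user}
    return [row for row in rows if row["region"] in allowed]

access = [("analyst_apac", "APAC"), ("analyst_us", "US-EAST"), ("analyst_us", "US-WEST")]
orders = [{"id": 1, "region": "APAC"}, {"id": 2, "region": "EMEA"}, {"id": 3, "region": "US-EAST"}]

assert [r["id"] for r in visible_rows(orders, "analyst_apac", "ANALYST_ROLE", access)] == [1]
assert len(visible_rows(orders, "admin_user", "ADMIN_ROLE", access)) == 3
```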
Combining DDM + Row-Level Security
-- Most enterprise setups combine both:
-- Row Access Policy: analyst sees only their region's rows
-- Masking Policy: within those rows, email is masked unless they have DATA_OWNER_ROLE
-- This gives two independent dimensions of access control
Comparison Table
| Technique | Where Data is Protected | Reversible? | Role-Based? | Best For |
|---|---|---|---|---|
| Redaction | ETL pipeline (data never lands) | No — original gone | No | Dev/test environments, GDPR deletion |
| Static Masking | Physical table (fake values stored) | No — original replaced | No | Test DB clones, consistent anonymisation |
| Dynamic Masking | Query time (real data stored, masked at read) | Yes — real data preserved | Yes (CURRENT_ROLE) | Production analytics, PII column-level control |
| Row-Level Security | Query time (filter on rows) | Yes — real data preserved | Yes (CURRENT_USER/ROLE) | Multi-tenant, regional access control |
MongoDB for Data Engineers
MongoDB is a document-oriented NoSQL database that stores data as JSON-like BSON documents. As a data engineer, you encounter MongoDB as a source system — understanding its data model and query language is essential for designing ingestion pipelines.
Core Concepts — SQL vs MongoDB Terminology
| SQL Concept | MongoDB Equivalent | Description |
|---|---|---|
| Database | Database | Same concept |
| Table | Collection | Group of documents |
| Row | Document (BSON) | JSON-like key-value object |
| Column | Field | Key in a document |
| Primary Key | _id | Auto-generated ObjectId or custom |
| JOIN | $lookup (aggregation) | Expensive — denormalise instead |
| INDEX | Index | Same concept, different syntax |
| SELECT | find() / aggregate() | Query documents |
Document Structure — schemaless, nested, flexible
// A MongoDB order document (BSON stored as JSON)
{
"_id": ObjectId("64f2a1b3c4e5f6789a0b1c2d"),
"order_id": "ORD-001",
"customer": {
"id": 1001,
"name": "Suryateja",
"email": "surya@example.com"
},
"items": [
{ "sku": "SHOE-RED", "qty": 2, "price": 49.99 },
{ "sku": "BAG-BLUE", "qty": 1, "price": 89.00 }
],
"status": "DELIVERED",
"created_at": ISODate("2024-01-15T10:30:00Z"),
"total": 188.98
}
// Key insight: nested objects and arrays are first-class — no JOIN needed
// But this makes querying and flattening for analytics harder
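The flattening problem mentioned above is concrete: to land this document in a warehouse you typically explode the items array into one row per line item. A minimal Python sketch (the `flatten_order` helper is hypothetical, shown only to make the target relational shape explicit):

```python
def flatten_order(doc):
    """Explode one nested order document into flat line-item rows,
    the shape an analytics warehouse expects."""
    return [
        {
            "order_id": doc["order_id"],
            "customer_id": doc["customer"]["id"],
            "sku": item["sku"],
            "qty": item["qty"],
            "line_total": round(item["qty"] * item["price"], 2),
        }
        for item in doc["items"]
    ]

order = {
    "order_id": "ORD-001",
    "customer": {"id": 1001, "name": "Suryateja"},
    "items": [
        {"sku": "SHOE-RED", "qty": 2, "price": 49.99},
        {"sku": "BAG-BLUE", "qty": 1, "price": 89.00},
    ],
}
rows = flatten_order(order)
assert len(rows) == 2          # one row per array element
assert rows[0]["line_total"] == 99.98
```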
Basic CRUD Operations
// INSERT one document
db.orders.insertOne({
order_id: "ORD-002",
customer: { id: 1002, name: "Ravi" },
status: "PENDING",
total: 75.00
});
// FIND all delivered orders (equivalent to SELECT WHERE)
db.orders.find({ status: "DELIVERED" });
// FIND with projection (equivalent to SELECT specific columns)
db.orders.find(
{ status: "DELIVERED" }, // filter
{ order_id: 1, total: 1, _id: 0 } // projection: 1=include, 0=exclude
);
// Nested field query: find orders where customer.id = 1001
db.orders.find({ "customer.id": 1001 });
// Array query: find orders containing SKU 'SHOE-RED' in items
db.orders.find({ "items.sku": "SHOE-RED" });
// UPDATE one document
db.orders.updateOne(
{ order_id: "ORD-001" },
{ $set: { status: "RETURNED" } }
);
// DELETE documents older than 1 year
db.orders.deleteMany({
created_at: { $lt: new Date("2023-01-01") }
});
// Common query operators:
// $eq, $ne, $gt, $gte, $lt, $lte → comparison
// $in, $nin → value in/not in array
// $and, $or, $not → logical
// $exists → field exists check
// $regex → regex match
Aggregation Pipeline — the MongoDB equivalent of GROUP BY + HAVING + JOIN
// Aggregation pipeline: each stage transforms the document stream
// Equivalent to: SELECT status, COUNT(*), SUM(total) FROM orders GROUP BY status
db.orders.aggregate([
// Stage 1: Filter (WHERE)
{ $match: { status: { $in: ["DELIVERED", "RETURNED"] } } },
// Stage 2: Unwind array (like LATERAL FLATTEN — one row per item)
{ $unwind: "$items" },
// Stage 3: Project computed fields
{ $project: {
order_id: 1,
status: 1,
sku: "$items.sku",
line_total: { $multiply: ["$items.qty", "$items.price"] }
}},
// Stage 4: Group by (GROUP BY + aggregate)
{ $group: {
_id: "$status",
order_count: { $sum: 1 },
total_revenue: { $sum: "$line_total" },
avg_line_value: { $avg: "$line_total" }
}},
// Stage 5: Sort (ORDER BY)
{ $sort: { total_revenue: -1 } },
// Stage 6: Limit (LIMIT)
{ $limit: 10 }
]);
// $lookup: LEFT JOIN equivalent (use sparingly — expensive)
db.orders.aggregate([
{ $lookup: {
from: "customers", // right collection
localField: "customer.id", // field in orders
foreignField: "_id", // field in customers
as: "customer_details" // output array field
}},
{ $unwind: "$customer_details" } // flatten the joined array
]);
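To see exactly what the $match → $unwind → $group stages do to the document stream, here is the same logic in plain Python (illustrative only — the server-side pipeline is what you would actually run; note that after $unwind, the $sum: 1 counter counts line items, not orders):

```python
from collections import defaultdict

def unwind_and_group(orders):
    """Reimplement the $match + $unwind + $group stages in Python:
    filter by status, emit one record per line item, total per status."""
    totals = defaultdict(lambda: {"order_count": 0, "total_revenue": 0.0})
    for order in orders:
        if order["status"] not in ("DELIVERED", "RETURNED"):  # $match
            continue
        for item in order["items"]:                           # $unwind
            g = totals[order["status"]]                       # $group by status
            g["order_count"] += 1                             # $sum: 1 (counts unwound items)
            g["total_revenue"] += item["qty"] * item["price"] # $sum line_total
    return dict(totals)

orders = [
    {"status": "DELIVERED", "items": [{"qty": 2, "price": 10.0}, {"qty": 1, "price": 5.0}]},
    {"status": "PENDING", "items": [{"qty": 3, "price": 1.0}]},  # filtered out by $match
]
result = unwind_and_group(orders)
assert result["DELIVERED"]["order_count"] == 2
assert result["DELIVERED"]["total_revenue"] == 25.0
assert "PENDING" not in result
```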
Indexes — critical for query performance
// Without index: MongoDB does a full collection scan (like Snowflake without pruning)
// Single field index
db.orders.createIndex({ status: 1 }); // 1 = ascending, -1 = descending
// Compound index (order matters — most selective field first)
db.orders.createIndex({ status: 1, created_at: -1 });
// Index on nested field
db.orders.createIndex({ "customer.id": 1 });
// Check which indexes exist
db.orders.getIndexes();
// Explain query plan (check if index is used)
db.orders.find({ status: "DELIVERED" }).explain("executionStats");
// Look for: winningPlan.stage = "IXSCAN" (index scan) vs "COLLSCAN" (full scan)
MongoDB as a Data Engineering Source — CDC & Ingestion
// MongoDB Change Streams — native CDC for downstream pipelines
// Similar to Oracle redo logs or SQL Server transaction logs but for MongoDB
const changeStream = db.orders.watch([
{ $match: { operationType: { $in: ['insert', 'update', 'delete'] } } }
]);
changeStream.on('change', (event) => {
// event.operationType: 'insert' | 'update' | 'delete' | 'replace'
// event.fullDocument: the document after change (for inserts/updates)
// event.documentKey._id: the _id of the changed document
// event.updateDescription: fields changed + removed (for updates)
console.log(event);
// Ship to Kafka topic → Snowflake via Kafka Connect
});
// Tools for MongoDB → Snowflake ingestion:
// Debezium MongoDB connector → Kafka → Snowflake Kafka Connector
// Fivetran MongoDB connector (managed, no-code)
// MongoDB Atlas Data Federation (if using Atlas cloud)
When to use MongoDB vs Snowflake
| Factor | Use MongoDB | Use Snowflake |
|---|---|---|
| Data shape | Flexible, nested, schema evolves often | Tabular, defined schema |
| Access pattern | Operational reads/writes (OLTP) | Analytical queries (OLAP) |
| Scale | High write throughput, low latency | Large analytical scans |
| JOINs | Avoid — denormalise into documents | Excellent multi-table joins |
| Aggregations | Limited, slower than SQL | Highly optimised |
| Typical use | App backend, product catalogue, CMS | Data warehouse, analytics, BI |
JSON Flattening in Snowflake — VARIANT & LATERAL FLATTEN
JSON flattening is the process of converting nested JSON structures into relational rows and columns. In Snowflake, the VARIANT type stores arbitrary JSON and a family of operators/functions lets you query and flatten it without pre-defining a schema.
VARIANT — Snowflake's JSON storage type
-- VARIANT stores any valid JSON: object, array, string, number, boolean, null
CREATE TABLE raw.events (
event_id INT,
payload VARIANT, -- entire JSON document stored here
loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Insert raw JSON (common during Snowpipe/COPY INTO loads)
INSERT INTO raw.events (event_id, payload) SELECT
SEQ4() AS event_id,
PARSE_JSON('{
"user_id": 42,
"event_type": "purchase",
"amount": 149.99,
"metadata": {
"device": "mobile",
"os": "iOS",
"version": "17.2"
},
"tags": ["sale", "premium", "new_user"]
}') AS payload;
Accessing values — colon (:) path notation + double-colon (::) casting
-- Colon notation navigates into the JSON structure
SELECT
payload:user_id::INT AS user_id, -- top-level field
payload:event_type::VARCHAR AS event_type,
payload:amount::DECIMAL(10,2) AS amount,
payload:metadata:device::VARCHAR AS device, -- nested object field
payload:metadata:os::VARCHAR AS os,
payload:metadata.version::VARCHAR AS version -- dot notation also works
-- Alternative: GET() function (useful for dynamic field names)
-- GET(payload, 'user_id')::INT AS user_id_alt,
-- GET_PATH() for deep nesting
-- GET_PATH(payload, 'metadata.os')::VARCHAR AS os_alt
FROM raw.events;
-- Handling missing fields — returns NULL if path doesn't exist
SELECT payload:nonexistent_field::VARCHAR FROM raw.events; -- returns NULL, no error
TRY_CAST and safe type coercion
-- Problem: source JSON may have type inconsistencies
-- payload:amount could be "149.99" (string) in some events, 149.99 (number) in others
-- TRY_CAST: returns NULL instead of ERROR on conversion failure
SELECT
TRY_CAST(payload:amount::VARCHAR AS DECIMAL(10,2)) AS amount_safe,
TRY_CAST(payload:user_id::VARCHAR AS INT) AS user_id_safe
FROM raw.events;
-- TRY_ prefix functions (Snowflake family):
-- TRY_CAST, TRY_TO_DATE, TRY_TO_TIMESTAMP, TRY_TO_NUMBER, TRY_TO_DECIMAL
-- All return NULL on failure instead of raising an error
-- Essential for production ETL where source data quality is variable
Flattening arrays — LATERAL FLATTEN (review + examples)
-- Unpack the "tags" array into individual rows
SELECT
payload:user_id::INT AS user_id,
payload:event_type::VARCHAR AS event_type,
tag.value::VARCHAR AS tag,
tag.index AS tag_position
FROM raw.events,
LATERAL FLATTEN(input => payload:tags) AS tag;
-- Result:
-- user_id | event_type | tag | tag_position
-- 42 | purchase | sale | 0
-- 42 | purchase | premium | 1
-- 42 | purchase | new_user | 2
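Conceptually, LATERAL FLATTEN on an array is a cross join between the parent row and its array elements. The same transformation in Python terms (the `flatten_tags` helper is hypothetical and shown only to make the row-multiplication explicit — note the empty-array case, which motivates OUTER => TRUE later):

```python
def flatten_tags(event):
    """What LATERAL FLATTEN(input => payload:tags) produces:
    one output row per array element, carrying its index."""
    return [
        {
            "user_id": event["user_id"],
            "event_type": event["event_type"],
            "tag": tag,
            "tag_position": i,
        }
        for i, tag in enumerate(event.get("tags", []))
    ]

event = {"user_id": 42, "event_type": "purchase", "tags": ["sale", "premium", "new_user"]}
rows = flatten_tags(event)
assert [r["tag"] for r in rows] == ["sale", "premium", "new_user"]
assert rows[1]["tag_position"] == 1
# Empty array → parent row vanishes entirely (the OUTER => TRUE pitfall)
assert flatten_tags({"user_id": 1, "event_type": "x", "tags": []}) == []
```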
Flattening JSON objects — LATERAL FLATTEN on an object
-- Flatten an object into key-value pairs (useful for EAV-style metadata)
SELECT
payload:user_id::INT AS user_id,
meta.key AS metadata_key,
meta.value::VARCHAR AS metadata_value
FROM raw.events,
LATERAL FLATTEN(input => payload:metadata) AS meta;
-- Result:
-- user_id | metadata_key | metadata_value
-- 42 | device | mobile
-- 42 | os | iOS
-- 42 | version | 17.2
RECURSIVE FLATTEN — flatten all levels at once
-- RECURSIVE = TRUE traverses all nesting levels
-- Useful for exploration — not recommended for production (output can be unpredictable)
SELECT
f.key,
f.value,
f.path,
f.index
FROM raw.events,
LATERAL FLATTEN(input => payload, RECURSIVE => TRUE) AS f;
-- Returns every key-value pair at every nesting level
-- path shows the dot-notation path: e.g., 'metadata.device', 'tags[0]'
Complete flattening pipeline — raw VARIANT → relational staging table
-- models/staging/stg_events.sql (dbt model)
-- Full pipeline: VARIANT → typed, flat, relational rows
{{ config(materialized='view') }}
WITH flattened_tags AS (
SELECT
e.event_id,
e.payload:user_id::INT AS user_id,
e.payload:event_type::VARCHAR AS event_type,
TRY_CAST(e.payload:amount::VARCHAR AS DECIMAL(10,2)) AS amount,
e.payload:metadata:device::VARCHAR AS device,
e.payload:metadata:os::VARCHAR AS os,
TRY_TO_TIMESTAMP(e.payload:timestamp::VARCHAR) AS event_timestamp,
tag.value::VARCHAR AS tag,
tag.index AS tag_position,
e.loaded_at
FROM {{ source('raw', 'events') }} e,
LATERAL FLATTEN(input => e.payload:tags, OUTER => TRUE) AS tag
-- OUTER => TRUE: events with empty tags[] still appear (with NULL tag)
)
SELECT * FROM flattened_tags
WHERE user_id IS NOT NULL -- filter malformed records
PARSE_JSON vs OBJECT_CONSTRUCT vs TO_VARIANT
-- PARSE_JSON: converts a JSON string into VARIANT
SELECT PARSE_JSON('{"key": "value", "num": 42}');
-- OBJECT_CONSTRUCT: build a VARIANT object from key-value pairs
SELECT OBJECT_CONSTRUCT('user_id', 42, 'event', 'click', 'score', 9.5);
-- Useful for creating JSON payloads from relational columns
-- ARRAY_CONSTRUCT: build a VARIANT array
SELECT ARRAY_CONSTRUCT('tag1', 'tag2', 'tag3');
-- Combine them:
SELECT OBJECT_CONSTRUCT(
'user_id', 42,
'tags', ARRAY_CONSTRUCT('sale', 'premium'),
'meta', OBJECT_CONSTRUCT('device', 'mobile', 'os', 'iOS')
) AS rebuilt_payload;
Common JSON flattening mistakes:
- Casting without TRY — payload:amount::INT will error if amount is "N/A". Always use TRY_CAST for user-provided or third-party JSON
- Missing OUTER on FLATTEN — if any record has an empty array, it silently disappears. Add OUTER => TRUE to preserve parent rows
- Forgetting STRIP_OUTER_ARRAY in file format — if your JSON file is [{...},{...}] (array at root), add STRIP_OUTER_ARRAY = TRUE to your file format or it loads as one row
- NULL vs missing field — payload:field::VARCHAR returns NULL for both a missing field and a JSON null value. Use IS_NULL_VALUE(payload:field) to distinguish
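The NULL-vs-missing distinction is easy to see in plain Python: both cases surface as NULL after a ::VARCHAR cast, yet they are different facts in the raw JSON. A sketch using a sentinel (the `classify_field` helper is hypothetical):

```python
import json

_MISSING = object()  # sentinel so "absent" and "null" are distinguishable

def classify_field(payload_json: str, field: str) -> str:
    """Classify a top-level JSON field as missing, explicit null,
    or present — the distinction IS_NULL_VALUE makes in Snowflake."""
    value = json.loads(payload_json).get(field, _MISSING)
    if value is _MISSING:
        return "missing"
    if value is None:
        return "json_null"
    return "present"

assert classify_field('{"a": 1}', "b") == "missing"
assert classify_field('{"a": null}', "a") == "json_null"
assert classify_field('{"a": 1}', "a") == "present"
```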
Ingesting Data from REST APIs
Ingesting data from APIs is one of the most common tasks in data engineering — Stripe, Salesforce, HubSpot, GitHub, and hundreds of SaaS tools expose their data via REST APIs. Understanding every layer of API interaction prevents the silent data gaps that plague naive implementations.
1. Core API Elements
| Element | Description | Example |
|---|---|---|
| Base URL | Root endpoint of the API | https://api.stripe.com/v1 |
| Endpoint | Specific resource path | /charges, /customers/{id} |
| HTTP Method | Action on the resource | GET (read), POST (create), PUT/PATCH (update), DELETE |
| Headers | Metadata sent with request | Authorization, Content-Type, Accept |
| Query Parameters | Filter/sort/paginate in URL | ?limit=100&created_after=2024-01-01 |
| Request Body | Data sent (POST/PUT) | JSON payload |
| Response Body | Data returned by API | JSON, XML, CSV |
| Status Code | HTTP result code | 200 OK, 201 Created, 400 Bad Request, 401 Unauthorized, 429 Rate Limited, 500 Server Error |
2. Authentication Methods
import requests
# Method 1: API Key in Header (most common for data APIs)
headers = {
"Authorization": "Bearer sk_live_abc123xyz", # Stripe pattern
"Content-Type": "application/json"
}
response = requests.get("https://api.stripe.com/v1/charges", headers=headers)
# Method 2: Basic Auth (username:password base64 encoded)
import base64
creds = base64.b64encode(b"api_key:").decode()
headers = {"Authorization": f"Basic {creds}"}
# Method 3: OAuth 2.0 (Salesforce, Google, HubSpot)
# Step 1: Get access token
token_response = requests.post("https://login.salesforce.com/services/oauth2/token", data={
"grant_type": "password",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"username": SF_USERNAME,
"password": SF_PASSWORD + SF_SECURITY_TOKEN
})
access_token = token_response.json()["access_token"]
instance_url = token_response.json()["instance_url"]
# Step 2: Use access token for API calls
headers = {"Authorization": f"Bearer {access_token}"}
records = requests.get(f"{instance_url}/services/data/v58.0/query",
params={"q": "SELECT Id, Name FROM Account"},
headers=headers).json()
# Method 4: API Key in Query Parameter (less secure, some older APIs)
response = requests.get("https://api.example.com/data?api_key=MY_KEY")
3. HTTP Status Codes — what to do with each
STATUS_HANDLING = {
200: "Success — process the response body",
201: "Created — resource was created (POST)",
204: "No Content — success but empty body (DELETE)",
400: "Bad Request — your request is malformed, fix the query/body",
401: "Unauthorized — invalid/expired credentials, refresh token",
403: "Forbidden — valid credentials but insufficient permissions",
404: "Not Found — resource doesn't exist (log + skip, don't retry)",
429: "Too Many Requests — rate limited, back off and retry",
500: "Internal Server Error — API side issue, retry with backoff",
502: "Bad Gateway — transient, retry",
503: "Service Unavailable — API down, retry with exponential backoff",
}
4. Pagination Strategies
import time
# Strategy 1: Cursor-based pagination (most reliable — Stripe, GitHub)
# API returns a cursor pointing to next page; immune to data changes mid-fetch
def fetch_all_charges_cursor(base_url, headers):
all_records = []
params = {"limit": 100}
while True:
resp = requests.get(base_url + "/charges", headers=headers, params=params).json()
all_records.extend(resp["data"])
if not resp.get("has_more"): # Stripe: has_more = False means last page
break
params["starting_after"] = resp["data"][-1]["id"] # cursor = last record ID
return all_records
# Strategy 2: Offset/Limit pagination (simple, common in REST APIs)
# WARNING: if records are inserted mid-fetch, you may miss records or get duplicates
def fetch_all_offset(base_url, headers):
all_records, page, limit = [], 0, 100
while True:
resp = requests.get(base_url + "/orders",
headers=headers,
params={"limit": limit, "offset": page * limit}).json()
if not resp["items"]:
break
all_records.extend(resp["items"])
page += 1
return all_records
# Strategy 3: Page-number pagination
def fetch_all_pages(base_url, headers):
all_records, page = [], 1
while True:
resp = requests.get(base_url + "/records",
params={"page": page, "per_page": 100}).json()
all_records.extend(resp["results"])
if page >= resp["total_pages"]:
break
page += 1
return all_records
# Strategy 4: Link header pagination (GitHub REST API)
def fetch_all_link_header(url, headers):
all_records = []
while url:
resp = requests.get(url, headers=headers)
all_records.extend(resp.json())
url = resp.links.get("next", {}).get("url") # follow next link
return all_records
5. Push vs Pull — Two Fundamental Acquisition Strategies
# PULL (polling) — your pipeline fetches data on a schedule
# ─────────────────────────────────────────────────────────
# - You control the frequency
# - Simple to implement (cron job + API call)
# - Always a latency gap between real-time and your data
# - Can hit rate limits if polling too frequently
# - Example: fetch Stripe charges every 15 minutes
while True:
charges = fetch_charges_since(last_watermark)
load_to_snowflake(charges)
last_watermark = charges[-1]["created"] if charges else last_watermark
time.sleep(900) # 15-minute poll interval
# PUSH (webhooks/streaming) — the API sends data to YOU as it happens
# ─────────────────────────────────────────────────────────────────────
# - Real-time (data arrives seconds after the event)
# - No polling cost
# - You need a receiver endpoint (REST endpoint or message queue)
# - Source must support webhooks
# - Example: Stripe sends a webhook POST to your endpoint on charge.created
# Webhook receiver (Flask example)
from flask import Flask, request
import stripe  # the Stripe SDK provides Webhook.construct_event for signature verification
app = Flask(__name__)
@app.route("/stripe-webhook", methods=["POST"])
def stripe_webhook():
payload = request.get_data()
sig_header = request.headers.get("Stripe-Signature")
secret = "whsec_your_secret"
# Verify webhook authenticity (CRITICAL — never skip this)
try:
event = stripe.Webhook.construct_event(payload, sig_header, secret)
except stripe.error.SignatureVerificationError:
return "Invalid signature", 400
if event["type"] == "charge.succeeded":
charge = event["data"]["object"]
load_charge_to_snowflake(charge)
return "", 200
6. Rate Limiting — Handling 429 with Exponential Backoff
import time, random
def api_request_with_retry(url, headers, params=None, max_retries=5):
for attempt in range(max_retries):
resp = requests.get(url, headers=headers, params=params)
if resp.status_code == 200:
return resp.json()
elif resp.status_code == 429:
# Check Retry-After header (some APIs tell you exactly how long to wait)
retry_after = int(resp.headers.get("Retry-After", 60))
wait = retry_after + random.uniform(0, 10) # jitter to avoid thundering herd
print(f"Rate limited. Waiting {wait:.1f}s before retry {attempt+1}/{max_retries}")
time.sleep(wait)
elif resp.status_code in (500, 502, 503):
# Exponential backoff: 1s, 2s, 4s, 8s, 16s...
wait = (2 ** attempt) + random.uniform(0, 1)
print(f"Server error {resp.status_code}. Waiting {wait:.1f}s...")
time.sleep(wait)
elif resp.status_code in (400, 401, 403, 404):
# Non-retryable errors: fix the request or skip
raise ValueError(f"Non-retryable error: {resp.status_code} {resp.text}")
raise RuntimeError(f"Max retries exceeded for {url}")
7. Complete API Ingestion Pipeline — Steps
# STEP 1: Authentication setup
# - Obtain API key / OAuth token / service account
# - Store credentials in secrets manager (AWS Secrets Manager, not hardcoded)
# - Set up credential rotation if tokens expire
# STEP 2: Watermarking / Incremental state management
# - Store last successful extraction timestamp in a control table
# - Use this to fetch ONLY new/changed data (avoid re-fetching everything)
last_run = get_last_run_timestamp('stripe_charges') # from control table
# STEP 3: Fetch data with pagination and rate limiting
all_data = []
for page_data in paginate_api(endpoint, since=last_run):
all_data.extend(page_data)
time.sleep(0.1) # respect rate limits proactively
# STEP 4: Validate and transform
for record in all_data:
assert 'id' in record, f"Missing id: {record}"
record['_ingested_at'] = datetime.utcnow().isoformat()
# STEP 5: Load to Snowflake (land raw, then transform)
# Land as VARIANT to handle schema evolution
write_to_snowflake_stage(all_data, stage_path)
execute_snowflake_copy_into(target_table)
# STEP 6: Update watermark (only after successful load)
update_last_run_timestamp('stripe_charges', max(r['created'] for r in all_data))
# STEP 7: Monitor and alert
# - Log row counts, API response times, error rates
# - Alert on: zero rows (potential API issue), high error rate, stale watermark
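The watermark helpers referenced in STEP 2 and STEP 6 can be sketched as follows. This is a minimal in-memory stand-in (class and method names are hypothetical; in production the state would live in a Snowflake control table), but it captures the two invariants that matter: default to epoch on first run, and never move the watermark backwards:

```python
class WatermarkStore:
    """Minimal in-memory stand-in for the incremental-state control table."""
    def __init__(self):
        self._state = {}

    def get_last_run(self, pipeline: str) -> str:
        # Default to epoch so the first run fetches full history
        return self._state.get(pipeline, "1970-01-01T00:00:00+00:00")

    def update(self, pipeline: str, new_watermark: str) -> None:
        # Only advance — a stale or replayed value must never regress state
        # (ISO-8601 strings compare correctly lexicographically)
        if new_watermark > self.get_last_run(pipeline):
            self._state[pipeline] = new_watermark

store = WatermarkStore()
assert store.get_last_run("stripe_charges").startswith("1970")
store.update("stripe_charges", "2024-01-15T10:00:00+00:00")
store.update("stripe_charges", "2023-01-01T00:00:00+00:00")  # stale value ignored
assert store.get_last_run("stripe_charges") == "2024-01-15T10:00:00+00:00"
```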
Summary — Pull vs Push trade-offs
| Dimension | Pull (Polling) | Push (Webhooks/Streaming) |
|---|---|---|
| Latency | Poll interval (minutes–hours) | Near real-time (seconds) |
| Complexity | Low (schedule + GET) | High (receiver + queue + dedup) |
| Reliability | You control retries | Must ACK or webhook resends |
| Volume handling | Batch — good for large historical loads | Event-by-event — good for low-latency |
| Rate limits | Must manage carefully | Not applicable (API pushes to you) |
| Best for | Historical backfill, less critical freshness | Payment events, real-time triggers, notifications |