© Vemula Suryateja (vsurya@duck.com)
❄ Interview Prep

Snowflake & Data Engineering
01
Question 1
Write a Query to Get Compute Hours Consumed

Calculate total compute hours by pairing each 'start' event with its 'end' event: LAG() carries the previous state and timestamp forward (useful for validating pairs), LEAD() looks ahead from each start row to the next timestamp, and DATEDIFF between them gives the run duration in seconds, which is summed and divided by 3600 to get hours.

Sample Data

-- your_table_name
-- | Date_timestamp          | progress |
-- |-------------------------|----------|
-- | 2024-01-15 08:00:00     | start    |
-- | 2024-01-15 09:45:00     | end      |
-- | 2024-01-15 11:00:00     | start    |
-- | 2024-01-15 12:30:00     | end      |

Step 1 — Use LAG to carry forward the previous state and its timestamp

WITH lagged AS (
    SELECT
        Date_timestamp,
        progress,
        -- Pull the previous row's state into this row
        LAG(progress)       OVER (ORDER BY Date_timestamp) AS prev_state,
        LAG(Date_timestamp) OVER (ORDER BY Date_timestamp) AS prev_timestamp
    FROM your_table_name
)
-- At this point every 'end' row also knows its paired 'start' timestamp
SELECT * FROM lagged WHERE progress = 'end';

Step 2 — Use LEAD on the start rows to look ahead to the paired end timestamp

WITH windowed AS (
    SELECT
        Date_timestamp,
        progress,
        LAG(progress)        OVER (ORDER BY Date_timestamp) AS prev_state,
        LAG(Date_timestamp)  OVER (ORDER BY Date_timestamp) AS prev_timestamp,
        -- Look ahead from this row to the next row's timestamp
        LEAD(Date_timestamp) OVER (ORDER BY Date_timestamp) AS next_timestamp
    FROM your_table_name
)
-- Each 'start' row now carries its paired 'end' timestamp in next_timestamp
SELECT * FROM windowed WHERE progress = 'start';

Step 3 — DATEDIFF between start and its LEAD end timestamp, SUM & divide by 3600

WITH windowed AS (
    SELECT
        Date_timestamp,
        progress,
        LAG(progress)        OVER (ORDER BY Date_timestamp) AS prev_state,
        LAG(Date_timestamp)  OVER (ORDER BY Date_timestamp) AS prev_timestamp,
        LEAD(Date_timestamp) OVER (ORDER BY Date_timestamp) AS next_timestamp
    FROM your_table_name
),
run_seconds AS (
    SELECT
        Date_timestamp                              AS run_start,
        next_timestamp                              AS run_end,
        TO_DATE(Date_timestamp)                     AS run_date,
        -- DATEDIFF between this start row and the LEAD end timestamp
        DATEDIFF(SECOND, Date_timestamp, next_timestamp) AS duration_seconds
    FROM windowed
    WHERE progress = 'start'       -- anchor on start rows
      AND next_timestamp IS NOT NULL -- guard against last row
)
SELECT
    run_date,
    SUM(duration_seconds)          AS total_run_seconds,
    SUM(duration_seconds) / 3600.0 AS total_run_hours   -- divide by 3600 to get hours
FROM run_seconds
GROUP BY run_date
ORDER BY run_date;

Overall total across all days

-- Wrap the above to get a grand total
WITH windowed AS (
    SELECT
        Date_timestamp,
        progress,
        LEAD(Date_timestamp) OVER (ORDER BY Date_timestamp) AS next_timestamp
    FROM your_table_name
),
run_seconds AS (
    SELECT DATEDIFF(SECOND, Date_timestamp, next_timestamp) AS duration_seconds
    FROM windowed
    WHERE progress = 'start' AND next_timestamp IS NOT NULL
)
SELECT
    SUM(duration_seconds)          AS total_run_seconds,
    SUM(duration_seconds) / 3600.0 AS total_compute_hours
FROM run_seconds;

How it works step by step

  • LAG(progress) — brings the previous row's state alongside each row, so we can confirm the row before an end was a start
  • LEAD(Date_timestamp) — from a start row, looks one row ahead to fetch the end timestamp — this is the key change from the naive LAG-only approach
  • DATEDIFF(SECOND, start_ts, end_ts) — produces the raw run duration in seconds for each start→end pair
  • SUM(...) / 3600.0 — aggregates all run durations and converts seconds → hours (3600 seconds per hour)
  • Anchoring the filter on progress = 'start' rows keeps the logic clean — each start row directly carries its own paired end timestamp via LEAD
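The window logic above can be sanity-checked outside Snowflake. A minimal Python sketch of the same LEAD pairing, using the sample rows from the table at the top of this question:

```python
from datetime import datetime

# Event log mirroring the sample table above
events = [
    ("2024-01-15 08:00:00", "start"),
    ("2024-01-15 09:45:00", "end"),
    ("2024-01-15 11:00:00", "start"),
    ("2024-01-15 12:30:00", "end"),
]

def compute_hours(rows):
    """Pair each 'start' with the next row's timestamp (LEAD) and sum durations."""
    ts = [datetime.strptime(t, "%Y-%m-%d %H:%M:%S") for t, _ in rows]
    seconds = sum(
        (ts[i + 1] - ts[i]).total_seconds()        # DATEDIFF(SECOND, start, next)
        for i, (_, state) in enumerate(rows[:-1])  # last row has no LEAD
        if state == "start"                        # anchor on start rows
    )
    return seconds / 3600.0                        # seconds to hours

print(compute_hours(events))  # 1.75 + 1.5 = 3.25
```

Running it on the sample data gives 3.25 hours, matching what the SQL above would return.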
02
Question 2
Snowflake Architecture Explained

Real-World Scenario: Your BI team runs 50 concurrent Tableau dashboards while your data engineering team runs nightly dbt transforms. On a traditional monolith both workloads starve each other. On Snowflake you give each team a dedicated virtual warehouse — they share the same data but use completely independent compute. No contention, no queuing.

The Three-Layer Architecture

┌─────────────────────────────────────────────────────┐
│               CLOUD SERVICES LAYER                  │
│  Authentication · Query Optimiser · Metadata · RBAC │
│  Transaction Manager · Infrastructure Management    │
└───────────────────┬─────────────────────────────────┘
                    │  (orchestrates everything)
        ┌───────────┴────────────┐
        │                        │
┌───────▼────────┐    ┌──────────▼──────────┐
│  ANALYTICS_WH  │    │   TRANSFORM_WH      │  ← Virtual Warehouses (Compute)
│  (Tableau / BI)│    │   (dbt runs here)   │    Independent, auto-suspend
└───────┬────────┘    └──────────┬──────────┘
        │                        │
        └────────────┬───────────┘
                     │  (both read same data, zero contention)
           ┌─────────▼──────────┐
           │   STORAGE LAYER    │  ← S3 / Azure Blob / GCS
           │  Micro-partitions  │    Columnar, compressed, encrypted
           │  (50–500 MB each)  │    Decoupled from compute entirely
           └────────────────────┘

1. Storage Layer — Immutable Micro-Partitions

  • All data is automatically reorganised into micro-partitions (50–500 MB compressed) in columnar format
  • Each micro-partition stores min/max metadata per column — Snowflake uses this to prune partitions at query time without scanning them
  • Compression is typically 10–20x. A 1TB uncompressed table may occupy 60GB in Snowflake
  • Stored in cloud object storage (S3 on AWS, Azure Blob, GCS). You do not manage disks or nodes
  • Data is never modified in-place — mutations create new partitions (copy-on-write), enabling Time Travel and Fail-Safe

2. Compute Layer — Virtual Warehouses

-- Create separate warehouses per workload to eliminate contention
CREATE WAREHOUSE BI_WH        WAREHOUSE_SIZE='LARGE'  AUTO_SUSPEND=300;
CREATE WAREHOUSE TRANSFORM_WH WAREHOUSE_SIZE='MEDIUM' AUTO_SUSPEND=120;
CREATE WAREHOUSE INGEST_WH    WAREHOUSE_SIZE='SMALL'  AUTO_SUSPEND=60;
CREATE WAREHOUSE ADMIN_WH     WAREHOUSE_SIZE='XSMALL' AUTO_SUSPEND=60;

-- Multi-cluster warehouse for BI concurrency spikes (Black Friday dashboards)
ALTER WAREHOUSE BI_WH SET
    MAX_CLUSTER_COUNT = 4
    MIN_CLUSTER_COUNT = 1
    SCALING_POLICY    = 'STANDARD';
  • Warehouses are MPP clusters that spin up in seconds and auto-suspend when idle — you only pay while running
  • Size doubles compute (and cost) each tier: X-SMALL → SMALL → MEDIUM → LARGE → X-LARGE → 2X-LARGE → 3X-LARGE → 4X-LARGE
  • Multi-cluster warehouses scale out horizontally (add nodes) for concurrency. Scale up (bigger size) for complex single queries
  • Separation of warehouses is the #1 cost and performance best practice in Snowflake

3. Cloud Services Layer — The Brain

  • Query optimiser — rewrites your SQL, chooses join order, decides which micro-partitions to read, all before the warehouse starts
  • Metadata store — tracks every partition, column stats, clustering depth, transaction history
  • Transaction manager — ACID compliance via MVCC (multi-version concurrency control). Reads never block writes
  • Result cache — identical queries return instantly for 24 hours at zero warehouse cost
  • Billed as Cloud Services credits, typically under 10% of compute spend; Snowflake only charges for cloud services usage that exceeds 10% of daily compute credit consumption

Why This Architecture Wins Over Traditional MPP (Redshift, Teradata)

| Dimension     | Traditional MPP          | Snowflake                                  |
|---------------|--------------------------|--------------------------------------------|
| Scale storage | Must add nodes (costly)  | Storage scales independently (pay-per-TB)  |
| Scale compute | Resize whole cluster     | New warehouse in 5 seconds                 |
| Concurrency   | Shared queue             | Separate warehouses per team               |
| Idle cost     | Pay 24/7                 | Auto-suspend (pay per second)              |
| Maintenance   | Vacuum, reindex, patch   | Zero maintenance                           |
03
Question 3
Time-Travel Features: OFFSET, TIMESTAMP, STATEMENT

Snowflake's Time Travel allows querying data as it existed at any point in the past (default 1 day, up to 90 days for Enterprise).

1. Using AT (OFFSET) - Relative Time

-- Query data from 5 minutes ago
SELECT * FROM my_table AT (OFFSET => -300);

-- Query data from 1 hour ago
SELECT * FROM my_table AT (OFFSET => -3600);

2. Using AT (TIMESTAMP) - Absolute Time

-- Query data as it existed at specific timestamp
SELECT * FROM my_table AT (TIMESTAMP => '2025-06-25 10:30:00'::TIMESTAMP_LTZ);

-- Create table with historical data
CREATE TABLE my_table_history AS
SELECT * FROM my_table AT (TIMESTAMP => '2025-06-24 00:00:00'::TIMESTAMP_LTZ);

3. Using BEFORE (STATEMENT) - Before a Specific Query

-- Recover data deleted by a specific query
CREATE TABLE recovered_data AS
SELECT * FROM my_table BEFORE (STATEMENT => 'query_id_123');

Real Use Cases:

  • Mistakenly deleted rows? Recover them instantly without restore
  • Audit trail of data changes
  • Debugging failed transformations
  • Point-in-time analysis
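Since OFFSET takes a negative number of seconds relative to now, a tiny helper for generating the relative-offset form can be handy; the function name and query text below are illustrative, not a Snowflake API:

```python
def time_travel_offset(minutes_ago: int) -> int:
    """AT (OFFSET => ...) expects negative seconds relative to the current time."""
    return -minutes_ago * 60

# Build the query text for "90 minutes ago"
sql = f"SELECT * FROM my_table AT (OFFSET => {time_travel_offset(90)});"
print(sql)  # SELECT * FROM my_table AT (OFFSET => -5400);
```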
04
Question 4
Query Performance Optimization Strategy

Five-step approach to optimize slow queries:

1. Profile the Query (Root Cause Analysis)

  • Check query execution plan in Snowflake UI
  • Identify bottleneck: scanning, joining, aggregating, or sorting?
  • Look for spilling to disk (memory constraint)
  • Evaluate partition/micro-partition pruning efficiency

2. Right-Size Compute

-- If queries queue behind each other, scale out for concurrency (multi-cluster)
ALTER WAREHOUSE MY_WH SET MAX_CLUSTER_COUNT = 3;

-- If the query spills to disk, scale up: a larger size gives more memory per node
-- (AUTO_SUSPEND only controls idle shutdown, in seconds; it does not add memory)
ALTER WAREHOUSE MY_WH SET WAREHOUSE_SIZE = 'XLARGE';

3. Optimize SQL Logic

  • SELECT specific columns: Avoid SELECT *, specify needed columns only
  • Filter early: Apply WHERE clauses on clustered/partitioned columns first
  • Join optimization: Smaller tables on left, larger on right; use INNER JOIN when possible
  • Avoid correlated subqueries: Use joins instead
  • Eliminate unnecessary operations: Remove redundant DISTINCT or ORDER BY unless needed

4. Physical Data Organization

  • Clustering Keys: Define on columns frequently used in WHERE or JOIN
  • Materialized Views: Pre-compute complex, frequently-run logic

5. Leverage Caching

  • Result Cache: Snowflake caches query results for 24 hours; run same queries for cache hits
  • Warehouse Cache: Keep warehouse running to leverage SSD cache of recently scanned blocks

Example Before & After:

-- BEFORE: Slow query (20 seconds)
SELECT * FROM huge_table WHERE created_date = '2025-01-01'
ORDER BY customer_id;

-- AFTER: Optimized (2 seconds)
SELECT customer_id, order_id, amount FROM huge_table 
WHERE created_date = '2025-01-01'  -- Filter early
QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY created_date DESC) = 1;
05
Question 5
Partitions vs Micro-partitions: Key Differences

Partitions are logical divisions; Micro-partitions are Snowflake's intelligent physical chunks.

Traditional Partitions:

  • Admin-defined logical divisions (e.g., one partition per month)
  • Manual creation and management
  • Typically large (GB-TB per partition)
  • Used in databases like PostgreSQL, Oracle

Snowflake Micro-partitions:

  • Automatic, immutable chunks (50-500 MB)
  • Transparent to users - no manual management needed
  • Columnar, compressed format
  • Snowflake maintains rich metadata about each micro-partition (value ranges, distinct counts, NULL counts)

Key Advantages of Micro-partitions:

  • Automatic Pruning: Metadata allows skipping micro-partitions that don't match WHERE clause
  • No Manual Maintenance: Snowflake handles partitioning automatically
  • Clustering Keys: Optional optimization to group similar values physically
  • Efficient Compression: Columnar storage compresses repetitive data 10x+
| Feature    | Traditional Partition | Micro-partition           |
|------------|-----------------------|---------------------------|
| Management | Manual                | Automatic                 |
| Size       | GB-TB                 | 50-500 MB                 |
| Format     | Row or Column         | Always Columnar           |
| Pruning    | Basic                 | Intelligent with Metadata |
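The pruning idea can be sketched in a few lines of Python; the partition names and date ranges below are invented toy metadata, not real Snowflake internals:

```python
# Each micro-partition keeps min/max metadata per column; a WHERE clause prunes
# any partition whose value range cannot contain matching rows.
partitions = [
    {"name": "p1", "min_date": "2024-01-01", "max_date": "2024-01-31"},
    {"name": "p2", "min_date": "2024-02-01", "max_date": "2024-02-29"},
    {"name": "p3", "min_date": "2024-03-01", "max_date": "2024-03-31"},
]

def prune(parts, predicate_date):
    """Return only partitions whose [min, max] range can contain the date."""
    return [p["name"] for p in parts
            if p["min_date"] <= predicate_date <= p["max_date"]]

print(prune(partitions, "2024-02-15"))  # ['p2'], p1 and p3 skipped via metadata
```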
06
Question 6
SCD Types 0-4: Change Data Handling

Slowly Changing Dimensions (SCD) are methods to handle data that changes over time in dimension tables.

Type 0: Retain Original

No updates. Historical value never changes.

Customer ID 1 registered on 2020-01-01 as "John" → stays "John" forever

Type 1: Overwrite

Overwrite old value with new. No history preserved.

Customer 1 name: "John" (2020-01-01) → "John Smith" (2024-01-01)
Result: Only latest name is stored

Type 2: Add New Row (Full History)

Most common in analytics. New row added for each change with effective dates.

-- Initial row
ID=1, Name=John, Dept=Sales, Valid_From=2020-01-01, Valid_To=9999-12-31, Is_Current=TRUE

-- After promotion to Marketing (2024-01-01)
-- Old row gets end date
ID=1, Name=John, Dept=Sales, Valid_From=2020-01-01, Valid_To=2023-12-31, Is_Current=FALSE
-- New row for current state
ID=1, Name=John, Dept=Marketing, Valid_From=2024-01-01, Valid_To=9999-12-31, Is_Current=TRUE

Type 3: Add Previous Attribute

New column stores previous value. Partial history.

Dept (current), Dept_Previous columns
When change happens: move current to previous, update current value

Type 4: Separate History Table

Main table holds current (Type 1), separate history table stores all versions (Type 2).

Which to Use?

  • Type 2 is most common: Complete audit trail, easy to analyze "as of" dates
  • Type 1 for: Attributes that don't need historical tracking
  • Type 2 via dbt snapshots: Automated in modern workflows
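A minimal Python sketch of the Type 2 update flow; the row shape is hypothetical, and closing valid_to at the change date is one common convention (the example above uses day-before instead):

```python
from datetime import date

def scd2_apply(dim_rows, change, change_date):
    """Apply one change as SCD Type 2: close the current row, append a new one."""
    out = []
    for row in dim_rows:
        if row["id"] == change["id"] and row["is_current"]:
            # Close out the old version
            out.append({**row, "valid_to": change_date, "is_current": False})
        else:
            out.append(row)
    # Append the new current version
    out.append({"id": change["id"], "dept": change["dept"],
                "valid_from": change_date, "valid_to": date(9999, 12, 31),
                "is_current": True})
    return out

dim = [{"id": 1, "dept": "Sales", "valid_from": date(2020, 1, 1),
        "valid_to": date(9999, 12, 31), "is_current": True}]
dim = scd2_apply(dim, {"id": 1, "dept": "Marketing"}, date(2024, 1, 1))
print([r["dept"] for r in dim if r["is_current"]])  # ['Marketing']
```

After the change there are two rows for ID 1: the closed Sales row and the current Marketing row, exactly the shape dbt snapshots produce.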
07
Question 7
How to Ingest JSON Data into Snowflake

Real-World Scenario: A logistics partner sends webhook events in JSON — each event has a nested shipment object with an array of items. You need to land the raw JSON, query nested fields, and flatten the array into relational rows for downstream dbt models.

Step 1 — Create File Format & Stage

CREATE FILE FORMAT json_fmt
    TYPE             = 'JSON'
    STRIP_OUTER_ARRAY = TRUE    -- unwrap top-level array [{}{}{}] → rows
    NULL_IF           = ('', 'NULL', 'null');

CREATE STAGE raw.logistics_stage
    URL                  = 's3://logistics-bucket/events/'
    STORAGE_INTEGRATION  = s3_logistics_int
    FILE_FORMAT          = json_fmt;

Step 2 — Land Raw JSON into a VARIANT Column

-- VARIANT stores any JSON structure; schema changes in the source don't break this table
CREATE TABLE raw.shipment_events (
    event_raw   VARIANT,
    _file_name  VARCHAR,
    _loaded_at  TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- METADATA$FILENAME is only available during the load, so capture it in the COPY
COPY INTO raw.shipment_events (event_raw, _file_name)
FROM (SELECT $1, METADATA$FILENAME FROM @raw.logistics_stage)
FILE_FORMAT = (FORMAT_NAME = 'json_fmt')
ON_ERROR    = 'CONTINUE';   -- log bad rows, don't fail the whole load

Step 3 — Query Nested Fields with : and :: operators

-- Sample JSON structure:
-- {
--   "shipment_id": "SHP-001",
--   "carrier": "DHL",
--   "status": "delivered",
--   "delivered_at": "2024-01-15T14:30:00Z",
--   "destination": { "city": "Hyderabad", "country": "IN" },
--   "items": [
--     {"sku": "SKU-A", "qty": 2, "weight_kg": 1.5},
--     {"sku": "SKU-B", "qty": 1, "weight_kg": 0.8}
--   ]
-- }

SELECT
    event_raw:shipment_id::VARCHAR        AS shipment_id,
    event_raw:carrier::VARCHAR            AS carrier,
    event_raw:status::VARCHAR             AS status,
    event_raw:delivered_at::TIMESTAMP     AS delivered_at,
    event_raw:destination:city::VARCHAR   AS city,       -- nested object
    event_raw:destination:country::VARCHAR AS country
FROM raw.shipment_events;

Step 4 — FLATTEN to Explode JSON Arrays into Rows

-- FLATTEN turns the items array into one row per item (lateral join)
SELECT
    e.event_raw:shipment_id::VARCHAR   AS shipment_id,
    e.event_raw:carrier::VARCHAR       AS carrier,
    f.value:sku::VARCHAR               AS sku,
    f.value:qty::INT                   AS quantity,
    f.value:weight_kg::FLOAT           AS weight_kg,
    f.index                            AS item_position   -- 0-based array index
FROM raw.shipment_events e,
LATERAL FLATTEN(input => e.event_raw:items) f;

Step 5 — Promote to Staging with dbt (typed, named columns)

-- models/staging/stg_shipment_items.sql
-- Flatten and cast the raw VARIANT into a typed relational model
SELECT
    event_raw:shipment_id::VARCHAR     AS shipment_id,
    event_raw:carrier::VARCHAR         AS carrier,
    UPPER(event_raw:status::VARCHAR)   AS status,
    event_raw:delivered_at::TIMESTAMP  AS delivered_at,
    event_raw:destination:country::VARCHAR AS destination_country,
    f.value:sku::VARCHAR               AS sku,
    f.value:qty::INT                   AS quantity,
    f.value:weight_kg::FLOAT           AS weight_kg,
    _loaded_at                         AS ingested_at
FROM {{ source('raw', 'shipment_events') }},
LATERAL FLATTEN(input => event_raw:items) f

Key Points

  • VARIANT is schema-flexible — new fields added by the source don't break the landing table. Extract only what you need in the staging layer
  • : path notation navigates nested keys. :: casts to SQL types. Use TRY_CAST for fields that might be null or malformed
  • LATERAL FLATTEN is a lateral join — it produces multiple rows per parent row, one per array element
  • STRIP_OUTER_ARRAY = TRUE handles the common pattern where a file contains a single JSON array of objects
  • Always store _file_name and _loaded_at for lineage and debugging
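Conceptually, LATERAL FLATTEN produces what this small Python sketch produces: one output row per array element, with parent fields repeated (values taken from the sample JSON above, helper name invented):

```python
# Webhook event matching the sample JSON structure above
event = {
    "shipment_id": "SHP-001",
    "carrier": "DHL",
    "items": [{"sku": "SKU-A", "qty": 2}, {"sku": "SKU-B", "qty": 1}],
}

def flatten_items(ev):
    """One row per array element, parent fields repeated (index is 0-based,
    like f.index in the SQL above)."""
    return [{"shipment_id": ev["shipment_id"], "carrier": ev["carrier"],
             "sku": item["sku"], "qty": item["qty"], "index": i}
            for i, item in enumerate(ev["items"])]

rows = flatten_items(event)
print(len(rows), rows[0]["sku"])  # 2 SKU-A
```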
08
Question 8
How to Ingest 50GB File into Snowflake

Strategy for large file ingestion:

1. Use External Stage (Mandatory for 50GB)

  • Deploy file to AWS S3, Azure Blob, or GCS before loading
  • Much faster and more reliable than internal stage (PUT command)
  • Enables parallel processing by Snowflake

2. Use Large Warehouse

-- Use at least X-Large or 2X-Large for parallel processing
ALTER WAREHOUSE etl_wh SET WAREHOUSE_SIZE = '2X-LARGE';

3. Execute COPY INTO

COPY INTO target_table
FROM @external_stage/50gb_file.parquet
FILE_FORMAT = (TYPE = PARQUET)
ON_ERROR = 'CONTINUE'
PURGE = TRUE;

Key Considerations:

  • Split the file first: COPY parallelizes across files, not within one; Snowflake recommends roughly 100-250 MB compressed per file, so split 50GB into a few hundred chunks
  • Use columnar formats: Parquet or ORC instead of CSV
  • Parallel warehouse: a larger warehouse loads more files concurrently
  • Network bandwidth: Ensure good connectivity to cloud storage
  • Monitor progress: Check query history and the COPY_HISTORY view
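Because COPY parallelizes per file, splitting is the practical first step; a quick estimate helper (hypothetical, assuming Snowflake's ~100-250 MB-per-file guidance):

```python
import math

def plan_chunks(file_size_gb: float, target_mb: int = 250) -> int:
    """Estimate how many chunks to split a file into so COPY can parallelize."""
    return math.ceil(file_size_gb * 1024 / target_mb)

print(plan_chunks(50))  # 205 chunks of ~250 MB each
```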
09
Question 9
Internal vs External Stages

Snowflake supports two types of stages for file management:

Internal Stages (Snowflake-Managed)

  • User Stage: @~/ (default per user)
  • Table Stage: @%table_name (auto-created for each table)
  • Named Stage: CREATE STAGE my_stage
  • Use: Smaller files, ad-hoc loads, testing
  • Storage: Snowflake-managed cloud storage (included in account)

External Stages (User-Managed Cloud Storage)

  • Points to AWS S3, Azure Blob Storage, or Google Cloud Storage
  • Use: Large-scale production loads, data lakes, Snowpipe
  • Cost: Cloud provider charges for storage and egress
  • Requires storage integration and IAM permissions
| Feature     | Internal            | External                    |
|-------------|---------------------|-----------------------------|
| Storage     | Snowflake-managed   | AWS / Azure / GCP           |
| File upload | PUT / GET commands  | Cloud-native tools (no PUT) |
| Cost        | Included in account | You pay the cloud provider  |
| Scale       | Small files         | Large scale (GB-TB)         |
| Use Case    | Testing, ad-hoc     | Production, Snowpipe        |
10
Question 10
Scheduling Data Pipelines in Snowflake

Real-World Scenario: You have a daily pipeline: ingest from S3 at 6 AM → run dbt transforms → run data quality tests → refresh Tableau extracts. If ingestion fails at 6 AM, dbt should not run. You need dependency-aware scheduling with failure alerting — not just a cron job.

Option 1: Snowflake Native Tasks (best for Snowflake-only pipelines)

-- Root task: fires daily at 6 AM UTC, only if stream has new data
CREATE OR REPLACE TASK pipeline.ingest_root
    WAREHOUSE = INGEST_WH
    SCHEDULE  = 'USING CRON 0 6 * * * UTC'
    WHEN SYSTEM$STREAM_HAS_DATA('raw.orders_stream')
AS
COPY INTO raw.orders FROM @raw.s3_stage FILE_FORMAT = (FORMAT_NAME = 'parquet_fmt');

-- Child task: runs after ingestion completes
CREATE OR REPLACE TASK pipeline.dbt_transform
    WAREHOUSE = TRANSFORM_WH
    AFTER pipeline.ingest_root
AS
CALL pipeline.sp_run_dbt_hourly();   -- tasks run SQL or call a stored procedure

-- Grandchild: quality check after transforms
CREATE OR REPLACE TASK pipeline.quality_check
    WAREHOUSE = TRANSFORM_WH
    AFTER pipeline.dbt_transform
AS
CALL pipeline.sp_run_dbt_tests();

-- Activate the whole DAG (children first, then root)
ALTER TASK pipeline.quality_check  RESUME;
ALTER TASK pipeline.dbt_transform  RESUME;
ALTER TASK pipeline.ingest_root    RESUME;

-- Monitor task run history
SELECT name, state, error_message, scheduled_time, completed_time
FROM TABLE(information_schema.task_history(
    scheduled_time_range_start => DATEADD('day', -1, CURRENT_TIMESTAMP)
))
ORDER BY scheduled_time DESC;

Option 2: Apache Airflow (best for multi-system, complex DAGs)

# dags/daily_pipeline.py  — Airflow DAG with Snowflake + dbt
from airflow import DAG
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner':            'data-engineering',
    'retries':          2,
    'retry_delay':      timedelta(minutes=5),
    'email_on_failure': True,
    'email':            ['vsurya@duck.com'],
}

with DAG(
    dag_id          = 'daily_snowflake_pipeline',
    schedule_interval = '0 6 * * *',    # 6 AM UTC daily
    start_date      = datetime(2024, 1, 1),
    catchup         = False,
    default_args    = default_args,
    tags            = ['snowflake', 'dbt', 'production'],
) as dag:

    ingest = SnowflakeOperator(
        task_id     = 'copy_from_s3',
        snowflake_conn_id = 'snowflake_prod',
        sql         = "COPY INTO raw.orders FROM @raw.s3_stage;",
        warehouse   = 'INGEST_WH',
    )

    transform = DbtCloudRunJobOperator(
        task_id         = 'dbt_run_daily_models',
        dbt_cloud_conn_id = 'dbt_cloud',
        job_id          = 12345,       # dbt Cloud job ID
        check_interval  = 30,
        timeout         = 3600,
        additional_run_config = {
            'steps_override': ['dbt run --select tag:daily']
        },
    )

    test = DbtCloudRunJobOperator(
        task_id         = 'dbt_test',
        dbt_cloud_conn_id = 'dbt_cloud',
        job_id          = 12346,
        additional_run_config = {
            'steps_override': ['dbt test --select tag:daily --store-failures']
        },
    )

    ingest >> transform >> test   # dependency chain: fail early, stop downstream

Comparison: When to Use Each

| Factor                     | Snowflake Tasks                 | Apache Airflow                            |
|----------------------------|---------------------------------|-------------------------------------------|
| Setup complexity           | Zero (native SQL)               | Requires infrastructure                   |
| Cross-system orchestration | Snowflake only                  | Any system (S3, Kafka, APIs, dbt, Spark)  |
| DAG complexity             | Linear / tree DAGs              | Complex fan-out, dynamic DAGs, sensors    |
| Monitoring UI              | Task History in Snowflake       | Full Airflow UI with logs, retries, SLAs  |
| Cost                       | Warehouse credits + serverless  | Compute for Airflow workers               |
| Best for                   | Micro-batch Snowflake pipelines | Enterprise multi-system orchestration     |
11
Question 11
Stored Procedures Fundamentals

Real-World Scenario: Your CDC pipeline receives deletes from Qlik Replicate as soft-deletes (a flag). Every night you run a stored procedure that physically removes expired records, rebuilds affected aggregates, and logs the cleanup to an audit table — all as a single transactional unit.

JavaScript Stored Procedure (classic Snowflake approach)

CREATE OR REPLACE PROCEDURE raw.sp_purge_expired_records(retention_days FLOAT)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
try {
    // Step 1: Archive records before deletion
    var archive_sql = `
        INSERT INTO audit.deleted_records_log
        SELECT *, CURRENT_TIMESTAMP AS deleted_at
        FROM staging.orders
        WHERE is_deleted = TRUE
          AND updated_at < DATEADD('day', -${RETENTION_DAYS}, CURRENT_DATE)`;
    snowflake.execute({ sqlText: archive_sql });

    // Step 2: Delete from main table and capture the affected row count
    var delete_sql = `
        DELETE FROM staging.orders
        WHERE is_deleted = TRUE
          AND updated_at < DATEADD('day', -${RETENTION_DAYS}, CURRENT_DATE)`;
    var delete_stmt = snowflake.createStatement({ sqlText: delete_sql });
    delete_stmt.execute();

    // Step 3: Report the run (getNumRowsAffected returns the deleted row count)
    var rows_deleted = delete_stmt.getNumRowsAffected();
    return `SUCCESS: purged ${rows_deleted} records older than ${RETENTION_DAYS} days`;

} catch(err) {
    return `ERROR: ${err.message}`;
}
$$;

-- Call it
CALL raw.sp_purge_expired_records(90);

Python Stored Procedure (modern approach — richer libraries)

CREATE OR REPLACE PROCEDURE analytics.sp_rebuild_daily_aggregates(run_date DATE)
RETURNS TABLE (summary_date DATE, rows_rebuilt INT, status VARCHAR)
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
PACKAGES = ('snowflake-snowpark-python', 'pandas')
HANDLER = 'run'
EXECUTE AS CALLER
AS
$$
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

def run(session: Session, run_date):
    # Use Snowpark DataFrame API — no SQL strings, type-safe
    orders = session.table('staging.orders')

    # Filter to the date being rebuilt
    daily = (orders
        .filter(col('order_date') == run_date)
        .filter(col('is_deleted') == False)
        .group_by('order_date', 'region', 'product_category')
        .agg({'net_revenue': 'sum', 'order_id': 'count'})
        .rename({'sum(net_revenue)': 'total_revenue',
                 'count(order_id)': 'order_count'}))

    # Delete old aggregate for this date (idempotent)
    session.sql(f"DELETE FROM marts.fact_daily_sales WHERE sale_date = '{run_date}'").collect()

    # Write new aggregate
    daily.write.mode('append').save_as_table('marts.fact_daily_sales')

    row_count = daily.count()
    result_df = pd.DataFrame([{'summary_date': run_date,
                                'rows_rebuilt': row_count,
                                'status': 'SUCCESS'}])
    return session.create_dataframe(result_df)
$$;

CALL analytics.sp_rebuild_daily_aggregates('2024-01-15');

Key Design Principles

  • EXECUTE AS CALLER vs OWNER — CALLER uses the calling user's privileges (safer for DML); OWNER uses the SP owner's privileges (useful for granting limited elevated access)
  • Idempotency — always design SPs to be safely re-runnable. Delete before insert, not update-or-insert
  • Error handling — wrap in try/catch and return structured error messages; log failures to an audit table
  • Use Snowpark Python for complex logic — it compiles DataFrame operations to Snowflake-native SQL, running inside the warehouse rather than pulling data out
  • Call from Tasks — SPs pair naturally with Snowflake Tasks for event-driven or scheduled execution
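The delete-before-insert idempotency principle can be illustrated in plain Python; row shapes, dates, and values below are hypothetical:

```python
def rebuild_partition(rows, run_date, fresh_rows):
    """Idempotent rebuild: DELETE the date slice, then INSERT fresh rows."""
    kept = [r for r in rows if r["sale_date"] != run_date]  # DELETE old slice
    return kept + fresh_rows                                # INSERT new slice

table = [{"sale_date": "2024-01-14", "revenue": 10},
         {"sale_date": "2024-01-15", "revenue": 99}]   # stale aggregate
fresh = [{"sale_date": "2024-01-15", "revenue": 120}]

once  = rebuild_partition(table, "2024-01-15", fresh)
twice = rebuild_partition(once,  "2024-01-15", fresh)
print(once == twice)  # True: re-running produces the same table
```

An update-or-insert version would double-count if the procedure ran twice; the delete-first version converges to the same state no matter how many times it runs.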
12
Question 12
User Defined Functions (UDFs)

What is a UDF?

A custom-built function that encapsulates logic reusable in SQL queries, similar to built-in functions.

Types of UDFs:

  • Scalar UDF: Returns single value per input row
  • Table UDF (UDTF): Returns multiple rows per input row

Scalar UDF Example:

CREATE FUNCTION calculate_net_sales(gross DECIMAL, returns DECIMAL)
RETURNS DECIMAL(10,2)
AS
$$
    gross - COALESCE(returns, 0)
$$;

-- Use in query
SELECT order_id, 
       calculate_net_sales(gross_amount, return_amount) AS net_sales
FROM orders;

Languages Supported:

  • SQL (simple transformations)
  • JavaScript (logic-heavy operations)
  • Python (via Snowpark)
  • Java (enterprise use)

When to Use:

  • Repetitive calculations across multiple queries
  • Complex business logic
  • Masking sensitive data
13
Question 13
Git Fundamentals for Data Teams

Real-World Scenario: Three data engineers work on the same dbt project. Without Git, two engineers overwrite each other's models on the same day. With Git, every change lives on its own branch, CI runs tests before merge, and the main branch always reflects production-verified code.

Core Git Workflow for Data Teams

# Daily workflow for a data engineer
git checkout main && git pull origin main    # always start fresh from main

git checkout -b feat/add-revenue-mart        # new branch per feature
# ... edit models, macros, tests ...
git add models/marts/fact_revenue.sql
git commit -m "feat: add fact_revenue model with daily grain"
git push origin feat/add-revenue-mart        # push to remote
# Open Pull Request → CI runs dbt Slim CI → reviewer approves → merge

Branch Strategy for dbt Projects

# Branch types and their purpose:
# main      → production. Auto-deployed to Snowflake ANALYTICS schema on merge
# develop   → integration. Multiple features merged here for joint testing
# feat/*    → feature branches. One per Jira ticket / logical change
# fix/*     → hotfix branches. Cut from main, merged back to main + develop
# release/* → release candidates (optional for teams with staged rollouts)

# Environment mapping:
# main     → ANALYTICS       schema (production Snowflake data)
# develop  → ANALYTICS_DEV   schema (team integration testing)
# feat/*   → ANALYTICS_CI_NNN schema (ephemeral, created per PR, auto-dropped)

Essential Git Commands for Data Engineers

# Undo a staged change before commit (safe)
git restore --staged models/staging/stg_orders.sql

# Amend last commit message (before pushing)
git commit --amend -m "fix: correct revenue calculation in fact_orders"

# Interactive rebase — clean up messy commits before PR
git rebase -i HEAD~3    # squash last 3 commits into one clean commit

# Cherry-pick a hotfix from another branch
git cherry-pick abc1234  # apply specific commit hash to current branch

# See what changed in a file across the last 5 commits
git log --follow -p -- models/marts/fact_revenue.sql

# Find when a bug was introduced (binary search)
git bisect start
git bisect bad HEAD            # current commit is broken
git bisect good v1.2.0         # last known good tag
# Git checks out midpoint; you test + mark good/bad until it finds the commit

Git + dbt: Slim CI in Practice

# .github/workflows/dbt_ci.yml (runs on every PR)
# Only builds and tests models changed in this PR
# --defer means unchanged upstream models resolve from production

name: dbt Slim CI
on: [pull_request]
jobs:
  slim_ci:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: pip install dbt-snowflake
      - run: aws s3 cp s3://dbt-state/manifest.json ./prod-artifacts/
      - run: |
          dbt build \
            --select state:modified+ \
            --defer \
            --state ./prod-artifacts \
            --target ci

Git Best Practices for Data Teams

  • One model per commit — small, focused PRs are easier to review and roll back
  • Never commit secrets — use .env files and add to .gitignore. Rotate any key accidentally committed immediately
  • Tag releases — git tag v2.1.0 marks the manifest used for production deploys, enabling exact rollback
  • Protect main branch — require PR reviews + passing CI before merge. Prevents direct pushes to production
  • Conventional commits — prefix messages with feat:, fix:, refactor:, docs: for automated changelogs
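git bisect is binary search over commit history; a Python sketch of the idea (commit labels and the breakage predicate are hypothetical):

```python
def first_bad(commits, is_bad):
    """git-bisect-style search; assumes commits[0] is good, commits[-1] is bad."""
    lo, hi = 0, len(commits) - 1          # lo = known good, hi = known bad
    while hi - lo > 1:
        mid = (lo + hi) // 2
        if is_bad(commits[mid]):          # "you test + mark good/bad"
            hi = mid
        else:
            lo = mid
    return commits[hi]                    # first commit that fails the test

commits = ["c0", "c1", "c2", "c3", "c4", "c5"]
bad = lambda c: int(c[1:]) >= 4           # pretend the bug landed at c4
print(first_bad(commits, bad))  # c4
```

This is why bisecting even hundreds of commits takes only a handful of test runs: the search space halves at each step.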
14
Question 14
QUALIFY Clause in Snowflake

What is QUALIFY?

A Snowflake clause (originating in Teradata, also supported by BigQuery and DuckDB) that filters window function results without subqueries or CTEs. Syntactic sugar for cleaner SQL.

Problem QUALIFY Solves:

-- Without QUALIFY (needs CTE)
WITH ranked_employees AS (
    SELECT employee_id, salary, department,
           RANK() OVER (PARTITION BY department ORDER BY salary DESC) as rnk
    FROM employees
)
SELECT * FROM ranked_employees WHERE rnk <= 3;

-- With QUALIFY (simpler)
SELECT employee_id, salary, department
FROM employees
QUALIFY RANK() OVER (PARTITION BY department ORDER BY salary DESC) <= 3;

Real Examples:

-- Get top 3 earners per department
SELECT employee_id, salary, department
FROM employees
QUALIFY RANK() OVER (PARTITION BY department ORDER BY salary DESC) <= 3;

-- Get only increased salaries (previous < current)
SELECT employee_id, salary, prev_salary
FROM salary_history
QUALIFY salary > LAG(salary) OVER (PARTITION BY employee_id ORDER BY year);

Benefits:

  • Cleaner, more readable SQL
  • No CTE nesting required
  • Direct filtering on window functions
  • Performance improvements in some cases
↑ Back to top
15
Question 15
Key Benefits of Snowflake

Why Snowflake is the Modern Data Warehouse Choice:

1. Separation of Compute & Storage (Most Important)

Scale independently. Run light queries on an XS warehouse (1 credit/hour), heavy queries on a 3XL (64 credits/hour). Pay only while a warehouse runs — no unused capacity costs.

2. Elasticity & Concurrency

Instantly scale warehouses (XS to 3XL in seconds). Multiple teams work simultaneously: BI team on WH1, ETL team on WH2, Data Science on WH3 - zero contention.

3. Zero Management

Snowflake handles: patching, updates, hardware provisioning, backups, compression, encryption. No DBAs needed for infrastructure.

4. Semi-Structured Data (VARIANT Type)

Native JSON/Avro/Parquet support. Query nested data with dot notation without flattening.

5. Time Travel & Fail-safe

-- Query data as it was 24 hours ago (OFFSET is in seconds;
-- retention: 1 day by default, up to 90 days on Enterprise edition)
SELECT * FROM my_table AT (OFFSET => -86400);

-- Accidentally dropped table?
UNDROP TABLE my_table;

6. Data Sharing (Game-Changer)

Live data sharing with other Snowflake accounts. No copying, instant access, secure.

7. Performance (Auto-Optimized)

  • Automatic micro-partitioning with intelligent pruning
  • Multi-layer caching (result cache + warehouse cache)
  • Optional clustering keys for fine-tuning

8. Enterprise Security

  • AES-256 encryption at rest and in transit
  • Row-level and column-level masking
  • SOC2, HIPAA, GDPR, PCI-DSS certified

Cost Comparison Example:

Traditional DW: pay for fixed hardware capacity (say, $100K/year) whether it is used or not. Snowflake: pay only for compute actually consumed (say, $20K), scaling up and down on demand.

↑ Back to top
16
Question 16
CDC (Change Data Capture): Concept & Mechanisms

What is CDC?

A technique (not a tool) that identifies and captures INSERT/UPDATE/DELETE changes from source systems and delivers to target efficiently, instead of reprocessing entire datasets.

Why CDC Matters:

WITHOUT CDC:
- Load entire 100GB table daily (2 hours)
- Heavy impact on source system
- Only 1GB changed, but process all 100GB

WITH CDC:
- Load only 1GB changed (5 minutes)
- Minimal source system impact
- Near real-time data availability

Common CDC Mechanisms:

1. Timestamp-Based (Polling)

SELECT * FROM source_table 
WHERE last_updated_timestamp > previous_check_time;
  • Simple to implement but unreliable: misses hard deletes and breaks on timestamp skew
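The delete-blindness of timestamp polling is easy to demonstrate. A minimal Python sketch (toy in-memory "table", hypothetical names) of mechanism 1:

```python
from datetime import datetime, timedelta

# Source table as a dict: id -> (payload, last_updated)
now = datetime(2024, 1, 15, 12, 0, 0)
source = {
    1: ("alice", now - timedelta(hours=3)),
    2: ("bob",   now - timedelta(minutes=10)),   # updated since last poll
}

def poll_changes(table, since):
    """Timestamp-based CDC: return rows modified after `since`."""
    return {k: v for k, (v, ts) in table.items() if ts > since}

last_poll = now - timedelta(hours=1)

# Row 2 was updated after the last poll, so it is captured
changes = poll_changes(source, last_poll)
assert changes == {2: "bob"}

# A hard DELETE removes the row entirely — the next poll cannot see it
del source[1]
assert poll_changes(source, last_poll) == {2: "bob"}  # delete is invisible
```

This is exactly why log-based CDC (next section) is preferred: the transaction log records the delete itself, not just the surviving rows.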

2. Log-Based (Most Robust)

Read database transaction logs (Oracle redo logs, MySQL binlog, SQL Server transaction log).

  • Captures ALL changes in real-time
  • No impact on source DB
  • Handles deletes properly

3. Trigger-Based

Database triggers fire on DML, write to change table.

  • Real-time but adds overhead to source

Popular CDC Tools: Fivetran, Qlik Replicate, Debezium, AWS DMS, Oracle GoldenGate

Use Cases: Real-time dashboards, microservices sync, database migration with zero downtime

↑ Back to top
17
Question 17
Python: Lists vs Tuples

List [] - Mutable (Changeable)

my_list = [10, "apple", 3.14, [1, 2]]  # Can contain mixed types

# Modify
my_list[0] = 20
my_list.append("banana")
my_list[1:3] = ["orange", "grape"]

# Operations
for item in my_list:
    print(item)
my_list.sort()
my_list.reverse()
my_list.pop()  # Remove and return last

Tuple () - Immutable (Fixed)

my_tuple = (10, "apple", 3.14, (1, 2))  # Can contain mixed types

# Cannot modify
# my_tuple[0] = 20  # TypeError!

# Single element tuple REQUIRES comma
single = (5,)  # Correct tuple
wrong = (5)    # This is just int!

# Unpacking
x, y, z = (1, 2, 3)

# Can use as dict key (lists cannot)
cache = {(0, 0): "origin", (1, 1): "diagonal"}
Feature     | List           | Tuple
Mutability  | Changeable     | Fixed
Performance | Slower         | Faster
Memory      | More overhead  | Less overhead
As Dict Key | No (unhashable)| Yes (if elements hashable)
Best Use    | Dynamic data   | Immutable, fixed data
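The table's claims about hashability and memory can be checked directly in a REPL; a short sketch:

```python
import sys

point = (1, 1)
path = [1, 1]

# Tuples are hashable, so they work as dict keys; lists raise TypeError
cache = {point: "diagonal"}
try:
    cache[path] = "boom"
    raised = False
except TypeError:
    raised = True
assert raised

# A tuple of n items carries less overhead than a list of the same items
assert sys.getsizeof((1, 2, 3)) < sys.getsizeof([1, 2, 3])

# The single-element gotcha from above: the comma makes the tuple
assert isinstance((5,), tuple) and isinstance((5), int)
```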
↑ Back to top
18
Question 18
File Formats: AVRO, Parquet, ORC

Three major big-data file formats — one row-oriented, two columnar — and when to use each:

AVRO - Row-Oriented with Self-Describing Schema

  • Best for: Real-time streaming with Kafka, CDC platforms
  • Schema Evolution: Excellent (reader/writer schemas)
  • Compression: Moderate
  • Limitation: Row-based, slow for OLAP queries
  • Tools: Kafka, Debezium, Confluent

Parquet - Columnar, Analytics-Optimized

  • Best for: Data lakes (S3, Azure), analytics queries
  • Compression: Excellent (10-100x on repetitive columns)
  • Query Speed: 10-100x faster for column subset queries
  • Use Case: AWS S3, Snowflake, BigQuery, Spark native
  • Popularity: Most used in modern data stacks

ORC - Optimized Row Columnar (Hadoop-Specific)

  • Best for: Hive, Hadoop ecosystems
  • Compression: Best (smaller than Parquet)
  • Predicate Pushdown: Filtering during read, not after
  • Limitation: Less ecosystem support outside Hadoop
Feature          | AVRO         | Parquet    | ORC
Type             | Row-oriented | Columnar   | Columnar
Compression      | Moderate     | Very Good  | Best
Schema Evolution | Excellent    | Good       | Good
Best For         | Streaming    | Data lakes | Hive/Hadoop

Recommendation: Use Parquet for data lakes and Snowflake. Use AVRO for streaming. ORC only if using Hadoop.

↑ Back to top
19
Question 19
CTE vs Temporary Tables

CTE (WITH Clause) - Query Scope

WITH top_customers AS (
    SELECT customer_id, SUM(amount) as total
    FROM orders
    GROUP BY customer_id
)
SELECT * FROM top_customers WHERE total > 1000;

Characteristics:

  • Scope: Single query only
  • Storage: Logical, not stored
  • Reusability: Within same query
  • Cost: None
  • Materialization: Optimizer decides (often inlined)

Temporary Table - Session Scope

CREATE TEMPORARY TABLE top_customers AS
SELECT customer_id, SUM(amount) as total
FROM orders
GROUP BY customer_id;

-- Use in Query 1
SELECT * FROM top_customers WHERE total > 1000;

-- Use in Query 2 (same session)
INSERT INTO summary SELECT COUNT(*) FROM top_customers;

Characteristics:

  • Scope: Entire session
  • Storage: Physical table in database
  • Reusability: Across multiple queries
  • Cost: Counts toward storage
  • Materialization: Always materialized (fast lookups)
  • Cleanup: Auto-drops at session end
Aspect      | CTE                           | Temp Table
Scope       | Single query                  | Entire session
Persistence | Logical                       | Physical
When to Use | Breaking down complex queries | Expensive calc reused many times
↑ Back to top
20
Question 20
Snowflake Table Types

Permanent Table (Default)

  • Persistence: Until explicitly dropped
  • Time Travel: 1-90 days
  • Fail-safe: 7 days recovery
  • Cost: Highest
  • Use: Core analytics tables, historical data

Transient Table (For Staging)

  • Time Travel: 0 or 1 day (maximum 1)
  • Fail-safe: None
  • Cost: ~50% lower than permanent
  • Use: Daily staging tables, intermediate ETL results

Temporary Table (Session-Scoped)

  • Persistence: Auto-drops at session end
  • Time Travel: Session-bound
  • Fail-safe: None
  • Cost: Lowest
  • Use: Ad-hoc analysis, session-specific work
Type      | Time Travel | Fail-safe | Cost      | Use Case
Permanent | 1-90 days   | 7 days    | High      | Analytics
Transient | 0-1 day     | None      | 50% lower | Staging
Temporary | Session     | None      | Lowest    | Session work
↑ Back to top
21
Question 21
Handling Schema Changes in Downstream Systems

Seven strategies to handle source system schema changes gracefully:

1. Communication & Governance

  • Establish change request process with cross-functional reviews
  • Notify downstream teams before changes
  • Document all schema changes in data catalog

2. Use VARIANT Columns

-- Semi-structured data resilient to schema changes
CREATE TABLE events (
    event_id INT,
    event_metadata VARIANT  -- Flexible JSON, can handle additions
);

3. Additive Schema Changes Only

Add new columns (safe) instead of modifying/deleting (risky).

4. View Layer for Compatibility

-- Source table changes: email → primary_email
-- Create view maintaining backward compatibility
CREATE VIEW v_customers AS 
SELECT 
    customer_id, 
    primary_email AS email  -- Alias old name
FROM raw_customers;

5. Version Tables

-- Keep multiple versions
raw_customers_v1  (old schema)
raw_customers_v2  (new schema, new columns)

-- Consumers migrate gradually to v2

6. Data Contracts

  • Define expected schema formally using Avro or Protobuf
  • Enforce via schema registry

7. Graceful Degradation

-- Handle missing columns safely
SELECT 
    id,
    COALESCE(new_column, default_value) AS new_column
FROM raw_data;
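Strategy 7 has a direct analogue in ingestion code: read records defensively so a column missing upstream degrades to a default instead of breaking the pipeline. A small sketch (hypothetical field names):

```python
# Records from a source whose schema drifted: row 2 predates the
# loyalty_tier column
rows = [
    {"id": 1, "email": "a@x.com", "loyalty_tier": "gold"},
    {"id": 2, "email": "b@x.com"},            # new column absent upstream
]

def normalize(row, defaults):
    """Merge defaults under the row — COALESCE-style fallback per field."""
    return {**defaults, **row}

DEFAULTS = {"loyalty_tier": "unknown"}
clean = [normalize(r, DEFAULTS) for r in rows]

assert clean[0]["loyalty_tier"] == "gold"      # real value wins
assert clean[1]["loyalty_tier"] == "unknown"   # missing column degrades
```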
↑ Back to top
22
Question 22
Ephemeral vs Permanent Tables in DBT

Note: Ephemeral is a DBT concept, not a Snowflake feature

Ephemeral Models

{{ config(materialized='ephemeral') }}

SELECT 
    customer_id,
    SUM(amount) as total_spent
FROM {{ ref('stg_orders') }}
GROUP BY customer_id

Characteristics:

  • No physical object created (not a table or view)
  • Compiled into downstream models as CTEs
  • Not directly queryable
  • Zero storage cost
  • Perfect for reusable intermediate logic

When to Use Ephemeral:

  • Simple cleaning (one-to-one mapping)
  • Intermediate calculations used in multiple models
  • When you don't need to query directly

Permanent Tables

  • Full storage with time travel/fail-safe
  • Queryable directly
  • Best for final marts and dimensions
↑ Back to top
23
Question 23
Implementing CDC in Snowflake (Without Tools)

Two native approaches to implement Change Data Capture in Snowflake:

Method 1: Timestamp-Based CDC

CREATE TABLE staging (
    id INT, 
    data VARCHAR, 
    last_modified TIMESTAMP
);

MERGE INTO target_table AS t
USING (
    SELECT * FROM staging 
    WHERE last_modified > (SELECT COALESCE(MAX(last_modified), '1900-01-01') FROM target_table)
) AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.data = s.data
WHEN NOT MATCHED THEN INSERT VALUES (s.id, s.data);

Issues: Misses deletes, unreliable if timestamps aren't consistent

Method 2: Snowflake Streams (Recommended)

-- Create stream on source table
CREATE STREAM customer_stream ON TABLE raw_customers;

-- Query stream to see changes
SELECT * FROM customer_stream;
-- Columns:
-- - METADATA$ACTION: 'INSERT' or 'DELETE'
-- - METADATA$ISUPDATE: TRUE if part of UPDATE
-- - METADATA$ROW_ID: Unique identifier

-- Consume changes atomically
MERGE INTO dim_customers AS t
USING customer_stream AS s
ON t.customer_id = s.customer_id
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN DELETE
WHEN MATCHED AND s.METADATA$ISUPDATE = TRUE THEN UPDATE ...
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN INSERT ...;

Benefits of Streams:

  • Exactly-once processing (no duplicates)
  • Captures all DML operations
  • Low performance impact
  • Automatic cleanup of consumed records
↑ Back to top
24
Question 24
DBT (Data Build Tool) - Complete Overview

What is DBT?

An open-source framework for transforming data in your warehouse using SQL and software engineering best practices. It enables analysts to write, test, and document SQL transformations with version control.

Problems DBT Solves:

  • Code Proliferation: Centralizes transformation logic (instead of scattered SQL scripts, BI tool calculations)
  • Lack of Testing: Provides built-in data quality testing framework
  • Poor Documentation: Auto-generates documentation from YAML definitions
  • Dependency Hell: Automatically determines correct execution order via DAG
  • Version Control: Integrates with Git for audit trails and collaboration

Core Workflow:

dbt run      → Execute models (create tables/views)
dbt test     → Run data quality tests
dbt docs     → Generate auto documentation
dbt debug    → Diagnose issues

Typical Project Structure:

my_dbt_project/
├── models/
│   ├── staging/        # Light transformations (one-to-one mappings)
│   ├── intermediate/   # Complex business logic
│   └── marts/          # Final user-facing tables
├── tests/              # Data quality tests
├── seeds/              # Static reference data (CSVs)
├── snapshots/          # SCD Type 2 tracking
├── macros/             # Reusable SQL functions
└── dbt_project.yml     # Configuration

Key Concepts:

  • Models: SQL SELECT statements → views or tables
  • ref(): References other models safely (no hardcoded schema names)
  • Jinja: Templating for dynamic SQL
  • Materializations: View (logical), Table (physical), Incremental (append-only), Ephemeral (CTE)
  • Tests: not_null, unique, relationships, accepted_values, or custom
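The "Dependency Hell" point deserves a concrete illustration: ref() calls form a DAG, and dbt topologically sorts it to find a valid run order. A sketch of that idea with the stdlib graphlib (hypothetical model names, not dbt's actual implementation):

```python
from graphlib import TopologicalSorter

# Each model maps to the set of models it ref()s (its upstream deps)
deps = {
    "stg_orders": set(),
    "stg_customers": set(),
    "int_order_enriched": {"stg_orders", "stg_customers"},
    "fact_orders": {"int_order_enriched"},
}

# static_order() yields every node after all of its dependencies
order = list(TopologicalSorter(deps).static_order())

assert order.index("stg_orders") < order.index("int_order_enriched")
assert order.index("int_order_enriched") < order.index("fact_orders")
```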
↑ Back to top
25
Question 25
Why Use Continuous Data Loading?

Continuous loading: Data flows as soon as available (not batch windows)

Why It's Critical:

  • Near Real-time: Data available within minutes, not 12-24 hour batches
  • Reduced Latency: Dashboards and decisions based on fresh data
  • Better Resource Utilization: Spread compute throughout day instead of one big batch
  • Data Quality Feedback: Issues detected quickly, not days later
  • Eliminates Batch Window Constraints: No more "end of day" cutoffs

Real Use Cases:

  • E-commerce: Real-time inventory → prevents overselling
  • Fraud Detection: Transaction analysis in seconds, not after fact
  • IoT/Sensors: Predictive maintenance on live machine data
  • Clickstream: Personalize user experience instantly
  • Financial: Risk analysis on latest market data
  • Supply Chain: Real-time tracking and optimization

Continuous vs Batch Trade-offs:

Aspect         | Batch (Daily)       | Continuous
Latency        | 12-24 hours         | Minutes
Implementation | Simple              | Complex
Cost           | Lower (one big job) | Higher (constant load)
Use Case       | Historical analysis | Operational decisions
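The latency gap in the table can be made concrete with simple arithmetic (window lengths are illustrative): an event landing at a random moment inside a load window waits, on average, half the window before the next load picks it up.

```python
def avg_wait_minutes(window_minutes: float) -> float:
    """Average time an event sits unloaded, given a fixed load interval."""
    return window_minutes / 2

daily_batch = avg_wait_minutes(24 * 60)   # one load per day
micro_batch = avg_wait_minutes(5)         # continuous 5-minute micro-batches

assert daily_batch == 720.0               # ~12 hours average staleness
assert micro_batch == 2.5                 # ~2.5 minutes average staleness
assert daily_batch / micro_batch == 288.0 # ~300x fresher data
```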
↑ Back to top
26
Question 26
End-to-End Data Pipeline: Design & Tools

Real-World Scenario: A B2B SaaS company needs a pipeline that ingests from 4 sources (Oracle ERP, Salesforce, Stripe, S3 event files), transforms into clean analytics models, and serves 3 BI tools with different freshness SLAs — finance needs T+1, operations needs near-real-time, executives need daily snapshots.

Full Architecture

SOURCES
  Oracle ERP (orders, inventory)    ──┐
  Salesforce CRM (accounts, opps)   ──┤  Qlik Replicate (CDC, log-based)
  PostgreSQL App DB (events, users) ──┘
  Stripe API (payments, invoices)   ──── Fivetran (API connector)
  S3 file drops (logistics partner) ──── Snowpipe AUTO_INGEST
                                          │
                              ┌───────────▼───────────┐
                              │   Snowflake RAW        │  Schema per source
                              │   (VARIANT + typed)    │  _loaded_at, _file_name
                              └───────────┬───────────┘
                                          │  Streams + Tasks (micro-batch, 5 min)
                              ┌───────────▼───────────┐
                              │  Snowflake STAGING     │  dbt stg_* views
                              │                        │  Rename, cast, dedupe
                              └───────────┬───────────┘
                                          │  dbt hourly incremental models
                              ┌───────────▼───────────┐
                              │  INTERMEDIATE          │  dbt int_* (ephemeral/view)
                              │                        │  Joins, enrichment
                              └───────────┬───────────┘
                                          │  dbt daily table / incremental
                              ┌───────────▼───────────┐
                              │  MARTS                 │  fact_*, dim_* tables
                              │  Finance / Ops / Exec  │  Clustered, governed
                              └───────────┬───────────┘
                                          │
               ┌──────────────────────────┼────────────────────────┐
           Tableau               Power BI (Executive)         Looker
        (Ops - near-RT)           (Finance - T+1)          (Self-service)

Ingestion Layer Configuration

-- Snowflake Streams trigger micro-batch merges every 5 minutes
-- Only fires when stream has unconsumed data (zero idle cost)
CREATE TASK pipeline.merge_orders
    WAREHOUSE = INGEST_WH
    SCHEDULE  = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('raw.oracle_orders_stream')
AS CALL pipeline.sp_merge_orders();

-- Snowpipe for S3 file drops (sub-minute latency)
CREATE PIPE raw.logistics_pipe AUTO_INGEST = TRUE AS
    COPY INTO raw.logistics_events FROM @raw.logistics_stage;

Transformation Layer — dbt Model Tags & Scheduling

-- dbt_project.yml — tag models by run frequency
models:
  my_project:
    staging:
      +tags: ['hourly']          # rebuild every hour (views, cheap)
      +materialized: view
    intermediate:
      +tags: ['hourly']
      +materialized: ephemeral
    marts:
      finance:
        +tags: ['daily']         # heavy models run nightly
        +materialized: table
      operations:
        +tags: ['hourly']        # ops needs freshness
        +materialized: incremental

-- dbt Cloud: separate jobs per frequency
-- Job "Hourly": dbt run --select tag:hourly  (runs :00 every hour)
-- Job "Daily":  dbt run --select tag:daily   (runs 02:00 UTC nightly)

SLA Matrix by Consumer

Consumer              | Source            | Freshness SLA  | Strategy
Operations dashboard  | Orders, Inventory | < 15 min       | Streams + Tasks + hourly dbt
Finance reports       | Stripe, Oracle    | T+1 by 07:00   | Daily dbt + Fivetran nightly sync
Executive KPI         | All sources       | Daily snapshot | dbt snapshot + daily table rebuild
Self-service analysts | Marts             | Daily          | Read-only ANALYST_ROLE on marts
↑ Back to top
27
Question 27
Why CDC Over Full Loads - Real Use Case

Real-World Example: E-commerce Inventory System

Problem Without CDC (Nightly Full Load):

  • Inventory is 12-24 hours old in dashboards
  • Customers order items marked as in-stock (actually sold out)
  • Returns processed, but system doesn't know until next load
  • Warehouse team can't make real-time restocking decisions
  • Server load spikes during night batch

Solution With CDC (Streaming Changes):

Inventory DB → CDC → Kafka → Snowflake (streaming)
               ↓
        Real-time dashboard
        (Website always knows actual stock)
        ↓
        Automated reorder alerts
        ↓
        Efficient warehouse operations

Results:

  • Stock levels updated within seconds
  • Customer experience: "Out of Stock" shown immediately
  • Warehouse: Optimal restocking based on live data
  • Server load: Distributed throughout day (no midnight spikes)
  • ROI: Prevented overselling, reduced returns

Business Impact: 15-20% reduction in overselling, better inventory turnover, happier customers

↑ Back to top
28
Question 28
Snowflake Tasks: Troubleshooting & Optimization

Snowflake Tasks provide native orchestration with a DAG model.

Troubleshooting Failed Task Queries:

1. Find the Problem

SELECT
    name, scheduled_time, query_id,
    state, error_message, query_text
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE state = 'FAILED'
ORDER BY scheduled_time DESC;

2. Analyze Query Execution

  • Click query_id to see execution plan
  • Check for spilling, join issues, scan efficiency

Common Issues & Solutions:

  • Warehouse Suspended: Task runs but waits for warehouse to resume (costs time!)
  • Insufficient Memory: Increase warehouse size
  • Table Locks: Previous task still running, new task blocked
  • Missing Data: Upstream task failed silently

Performance Optimization:

  • Right-size warehouse (xsmall for simple loads, large for complex transforms)
  • Stagger task timing to avoid concurrent runs on same warehouse
  • Monitor TASK_HISTORY for patterns
  • Set SUSPEND_TASK_AFTER_NUM_FAILURES to prevent cascading failures
↑ Back to top
29
Question 29
Zero Copy Clone: Instant Data Copies

Real-World Scenario: Your production Snowflake environment has 2TB of data. A developer needs to test a dbt refactor that touches 40 models. Provisioning a traditional dev environment would cost thousands in storage and take hours of data copying. With Zero Copy Clone, you get a full production-sized dev environment in seconds — at near-zero storage cost.

How Zero Copy Clone Works

-- Clone points to the SAME micro-partitions as source
-- Only NEW writes to the clone create new partitions (copy-on-write)
-- Storage cost = only the delta between clone and source

CREATE DATABASE analytics_dev    CLONE analytics_prod;   -- seconds, not hours
CREATE SCHEMA  staging_dev       CLONE analytics_prod.staging;
CREATE TABLE   orders_test       CLONE analytics_prod.marts.fact_orders;

-- Snapshot at a specific point in time (Time Travel clone)
CREATE TABLE orders_before_migration
    CLONE analytics_prod.marts.fact_orders
    BEFORE (TIMESTAMP => '2024-01-15 08:00:00'::TIMESTAMP);

-- Clone a schema at end of day for audit/compliance
CREATE SCHEMA audit.daily_snapshot_20240115
    CLONE analytics_prod.marts;

Dev/Test Workflow with Clones

-- Create a full dev environment per developer (or per PR)
-- Script run in CI on PR open:
CREATE DATABASE analytics_pr_142
    CLONE analytics_prod;

-- Developer works against analytics_pr_142 — full prod data, zero copy cost
-- dbt profiles.yml uses env var to point to the cloned database:
-- schema: "analytics_pr_{{ env_var('PR_NUMBER', 'dev') }}"

-- After PR merges / closes, drop the clone (CI cleanup step)
DROP DATABASE analytics_pr_142;

Data Migration Safety Net

-- Before running a risky migration, clone the target
-- If anything goes wrong, swap back instantly
CREATE TABLE marts.fact_orders_backup
    CLONE marts.fact_orders;

-- Run the migration
ALTER TABLE marts.fact_orders
    ADD COLUMN lifetime_value DECIMAL(12,2);

UPDATE marts.fact_orders fo
SET lifetime_value = (
    SELECT SUM(net_revenue)
    FROM marts.fact_orders x
    WHERE x.customer_sk = fo.customer_sk
);

-- Validate. If wrong:
DROP TABLE marts.fact_orders;
ALTER TABLE marts.fact_orders_backup RENAME TO marts.fact_orders;
-- Instant rollback — no data movement

Cost Mechanics

Scenario                  | Traditional Copy               | Zero Copy Clone
2TB production clone      | ~$46/month storage + copy time | ~$0 storage until writes diverge
10 developer environments | 10x storage cost               | Near-zero until each dev writes data
CI per PR (200 PRs/month) | Impractical                    | Feasible — clone + drop per PR
  • Clone inherits Time Travel and Fail-Safe settings from the source object
  • Clones are fully independent — DML on the clone does NOT affect the source
  • You can clone databases, schemas, tables, streams, and stages
  • Clones do NOT inherit future grants — you must re-grant access on the cloned object
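The copy-on-write mechanics behind all of this can be illustrated with a toy model (hypothetical classes, not Snowflake internals): the clone references the same partition objects as the source until one of them is written.

```python
class Table:
    """Toy copy-on-write table: partitions are shared until written."""
    def __init__(self, partitions):
        self.partitions = partitions          # list of immutable tuples

    def clone(self):
        # Shallow copy: both tables point at the SAME partition objects
        return Table(list(self.partitions))

    def overwrite_partition(self, i, rows):
        # Copy-on-write: only the written partition becomes a new object
        self.partitions[i] = tuple(rows)

prod = Table([("r1", "r2"), ("r3", "r4")])
dev = prod.clone()                            # "instant" — no data copied

# Before any write, every partition is shared (identity, not just equality)
assert all(a is b for a, b in zip(prod.partitions, dev.partitions))

dev.overwrite_partition(0, ["r1_fixed", "r2"])

# Only the changed partition diverges; the storage delta is one partition
assert dev.partitions[0] is not prod.partitions[0]
assert dev.partitions[1] is prod.partitions[1]
assert prod.partitions[0] == ("r1", "r2")     # source untouched
```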
↑ Back to top
30
Question 30
Timezone Conversion in Snowflake

Snowflake has three timestamp types. Convert between timezones easily.

Method 1: CONVERT_TIMEZONE (Recommended)

-- Convert UTC to EST
SELECT CONVERT_TIMEZONE('UTC', 'America/New_York', '2025-07-04 10:00:00'::TIMESTAMP_NTZ);
-- Result: 2025-07-04 06:00:00

-- With column
SELECT 
    event_id,
    event_timestamp,
    CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', event_timestamp) AS time_pacific
FROM events;

Method 2: Two-Argument CONVERT_TIMEZONE

-- Converts to the target zone, interpreting the input in the session timezone
-- (Snowflake does not support the SQL-standard AT TIME ZONE syntax)
SELECT CONVERT_TIMEZONE('America/New_York', '2025-07-04 10:00:00'::TIMESTAMP_NTZ);

Common Timezones:

  • UTC
  • America/New_York (EST/EDT)
  • America/Los_Angeles (PST/PDT)
  • Europe/London
  • Asia/Kolkata (IST)

Timestamp Types:

  • TIMESTAMP_LTZ: Stores UTC, displays in session timezone
  • TIMESTAMP_NTZ: No timezone, same everywhere
  • TIMESTAMP_TZ: With timezone offset
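The first example's result (10:00 UTC → 06:00 in New York on 2025-07-04) can be cross-checked outside Snowflake with Python's stdlib zoneinfo, which uses the same IANA timezone names:

```python
from datetime import datetime
from zoneinfo import ZoneInfo

# 2025-07-04 10:00 UTC → New York is on EDT (UTC-4) in July
utc_ts = datetime(2025, 7, 4, 10, 0, tzinfo=ZoneInfo("UTC"))
ny_ts = utc_ts.astimezone(ZoneInfo("America/New_York"))

assert ny_ts.hour == 6 and ny_ts.minute == 0
assert ny_ts.tzname() == "EDT"

# India has a half-hour offset (UTC+5:30) — a common interview gotcha
ist_ts = utc_ts.astimezone(ZoneInfo("Asia/Kolkata"))
assert (ist_ts.hour, ist_ts.minute) == (15, 30)
```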
↑ Back to top
31
Question 31
Clustering Keys for Query Performance

Clustering Keys optimize data layout for faster queries on specific columns.

How They Work:

  • Snowflake's automatic clustering service re-organizes micro-partitions
  • Groups similar values together physically
  • Improves query pruning and reduces data scanned
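How pruning pays off can be sketched with a toy model (not Snowflake internals): each micro-partition keeps min/max metadata for the clustering column, and a range filter skips any partition whose range cannot contain matching rows.

```python
# Well-clustered by order_date: each partition covers a narrow date range
clustered = [("2024-01-01", "2024-01-07"), ("2024-01-08", "2024-01-14"),
             ("2024-01-15", "2024-01-21"), ("2024-01-22", "2024-01-31")]

# Unclustered: every partition spans almost the whole month
unclustered = [("2024-01-01", "2024-01-30")] * 4

def partitions_scanned(parts, lo, hi):
    """Count partitions whose [min, max] range overlaps the filter range."""
    return sum(1 for pmin, pmax in parts if pmin <= hi and pmax >= lo)

# WHERE order_date BETWEEN '2024-01-10' AND '2024-01-12'
assert partitions_scanned(clustered, "2024-01-10", "2024-01-12") == 1
assert partitions_scanned(unclustered, "2024-01-10", "2024-01-12") == 4
```

With clustering, the same filter touches one partition instead of four — on a real table, that difference is thousands of micro-partitions never read from storage.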

Syntax:

-- Define during creation
CREATE TABLE orders (
    order_date DATE,
    customer_id INT,
    amount DECIMAL
) CLUSTER BY (order_date, customer_id);

-- Add after creation
ALTER TABLE orders CLUSTER BY (order_date);

-- Remove clustering
ALTER TABLE orders DROP CLUSTERING KEY;

When to Use:

  • Very large tables (100GB+)
  • Frequent filtering on specific columns (e.g., date, region)
  • High cardinality columns with skewed distribution

Cost Consideration:

Automatic clustering consumes credits to reorganize data in the background, so weigh that ongoing cost against the query-time savings it buys.

↑ Back to top
32
Question 32
Monitoring Snowflake Performance

Three Layers of Monitoring:

1. Snowflake Web UI

  • Query History: View all queries, duration, credits, status
  • Query Profile: Execution plan, bottleneck operators
  • Warehouses: Monitor status, current/historical usage

2. Account Usage Views (Query Historically)

-- Top 10 longest-running queries (last 7 days)
-- Note: QUERY_HISTORY has no per-query warehouse credit column;
-- credits_used_cloud_services covers cloud services only
SELECT query_id, user_name, warehouse_name, total_elapsed_time,
       credits_used_cloud_services
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= CURRENT_DATE - 7
ORDER BY total_elapsed_time DESC
LIMIT 10;

-- Daily warehouse costs
SELECT warehouse_name, DATE(start_time) as date,
       SUM(credits_used) as daily_credits
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
GROUP BY warehouse_name, date
ORDER BY daily_credits DESC;

Key Views:

  • QUERY_HISTORY: All query execution details
  • WAREHOUSE_METERING_HISTORY: Daily warehouse usage
  • AUTOMATIC_CLUSTERING_HISTORY: Clustering costs
  • PIPE_USAGE_HISTORY: Snowpipe credit consumption
  • TASK_HISTORY: Task execution details

3. External Tools

  • Datadog, New Relic: APM and monitoring
  • CloudWatch, Azure Monitor: Cloud-native dashboards
  • Custom BI dashboards: Pull from ACCOUNT_USAGE
↑ Back to top
33
Question 33
Ataccama vs Collibra: Data Governance Tools

Both are data governance platforms with different strengths.

Collibra - Business-Centric Governance

  • Strength: Business Glossary, stewardship, workflow automation
  • Focus: Business users, governance programs
  • Best for: Strong business alignment, change management
  • Unique: Powerful workflow engine for governance processes

Ataccama ONE - Data Quality & MDM Focus

  • Strength: Data quality, Master Data Management, ML/AI automation
  • Focus: Data engineers, quality teams
  • Best for: Data quality frameworks, MDM solutions
  • Unique: Strong data quality and deduplication engines
Aspect        | Collibra                | Ataccama
Core Focus    | Governance, Stewardship | Quality, MDM
Catalog Type  | Business Glossary First | Technical First
Primary Users | Business, Stewards      | Engineers, QA
Automation    | Workflow/Process        | ML/AI Quality Rules

Choice: Use Collibra if governance/stewardship is priority. Use Ataccama if data quality is critical.

↑ Back to top
34
Question 34
Snowpipe: Continuous Data Ingestion

Snowpipe loads data as soon as files appear in cloud storage (AWS S3/Azure/GCS).

Architecture: S3 → SNS → SQS → Snowpipe → COPY INTO

Key Steps:

1. Create Storage Integration

CREATE STORAGE INTEGRATION s3_pipe_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::ACCOUNT:role/SnowflakeRole'
  STORAGE_ALLOWED_LOCATIONS = ('s3://bucket/path/');

2. Create External Stage

CREATE STAGE raw_stage
  STORAGE_INTEGRATION = s3_pipe_int
  URL = 's3://bucket/data/'
  FILE_FORMAT = (TYPE = PARQUET);

3. Create Pipe

-- With AUTO_INGEST, Snowflake provisions the SQS queue itself; after
-- creation, point the S3 bucket's event notification at the pipe's
-- notification_channel ARN (visible via SHOW PIPES)
CREATE PIPE raw_data_pipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO raw_data FROM @raw_stage;

4. Optional: Post-Load Transformations

CREATE STREAM raw_stream ON TABLE raw_data;

CREATE TASK process_new_data
  WAREHOUSE = COMPUTE_WH    -- tasks need a warehouse (or serverless config)
  SCHEDULE = '1 MINUTE'     -- and a schedule; WHEN gates the actual runs
  WHEN SYSTEM$STREAM_HAS_DATA('raw_stream')
  AS
  MERGE INTO fact_data AS t
  USING raw_stream AS s ON t.id = s.id
  WHEN NOT MATCHED THEN INSERT ...;

Benefits: Automatic, serverless, minimal configuration

↑ Back to top
35
Question 35
Window Functions: Max & Nth Salary Query

Real-World Scenario: Your analytics team needs: (1) rank customers by LTV within each region, (2) calculate running revenue totals per day, (3) find the previous order date for each customer, (4) flag the top 3 products per category for a promotional report. All solvable with window functions — no self-joins needed.

Ranking Functions — RANK vs DENSE_RANK vs ROW_NUMBER

-- Find the 4th highest salary per department (classic interview question)
SELECT employee_id, employee_name, department, salary
FROM employees
QUALIFY DENSE_RANK() OVER (PARTITION BY department ORDER BY salary DESC) = 4;

-- ROW_NUMBER  → always unique (1,2,3,4,5)        — use for deduplication
-- RANK        → gaps on ties  (1,2,2,4,5)        — use for competition ranking
-- DENSE_RANK  → no gaps       (1,2,2,3,4)        — use for nth highest value
-- PERCENT_RANK → 0.0–1.0      percentile position
-- NTILE(4)    → quartile buckets (1,2,3,4)

-- Deduplicate: keep latest record per customer (ROW_NUMBER pattern)
SELECT * FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY customer_id
                              ORDER BY updated_at DESC) AS rn
    FROM staging.customers
)
WHERE rn = 1;   -- or use QUALIFY rn = 1

Aggregate Window Functions — Running Totals & Moving Averages

-- Running revenue total (cumulative sum resets per region per month)
SELECT
    order_date,
    region,
    daily_revenue,
    SUM(daily_revenue) OVER (
        PARTITION BY region, DATE_TRUNC('month', order_date)
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS running_monthly_revenue,

    -- 7-day moving average
    AVG(daily_revenue) OVER (
        PARTITION BY region
        ORDER BY order_date
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS revenue_7d_moving_avg,

    -- Running count of orders
    COUNT(order_id) OVER (
        PARTITION BY region
        ORDER BY order_date
        ROWS UNBOUNDED PRECEDING
    ) AS cumulative_order_count
FROM fact_daily_sales;

LAG & LEAD — Compare Rows to Previous / Next

-- Customer order gap analysis: how many days since last order?
SELECT
    customer_id,
    order_date,
    LAG(order_date) OVER (PARTITION BY customer_id ORDER BY order_date)
        AS previous_order_date,
    DATEDIFF('day',
        LAG(order_date) OVER (PARTITION BY customer_id ORDER BY order_date),
        order_date
    ) AS days_since_last_order,

    -- Flag customers who haven't ordered in 90+ days (churn signal)
    IFF(DATEDIFF('day',
        LAG(order_date) OVER (PARTITION BY customer_id ORDER BY order_date),
        order_date) > 90, TRUE, FALSE
    ) AS is_returning_after_gap
FROM fact_orders
ORDER BY customer_id, order_date;

FIRST_VALUE / LAST_VALUE — Carry First/Last Values Across Partition

-- Attach the first and most recent order date to every order row
SELECT
    customer_id,
    order_date,
    order_id,
    net_revenue,
    FIRST_VALUE(order_date) OVER (
        PARTITION BY customer_id ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS first_order_date,
    LAST_VALUE(order_date) OVER (
        PARTITION BY customer_id ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) AS latest_order_date,
    DATEDIFF('day', first_order_date, latest_order_date) AS customer_lifespan_days
FROM fact_orders;

RATIO_TO_REPORT — Percentage of Total (Snowflake-specific)

-- What % of region total revenue does each product contribute?
SELECT
    region,
    product_category,
    SUM(net_revenue)                                    AS category_revenue,
    RATIO_TO_REPORT(SUM(net_revenue)) OVER (PARTITION BY region) * 100
                                                        AS pct_of_region_revenue
FROM fact_orders
GROUP BY region, product_category
ORDER BY region, pct_of_region_revenue DESC;

Window Frame Cheatsheet

Frame Clause                                              | Includes                                 | Common Use
ROWS UNBOUNDED PRECEDING                                  | All rows from partition start to current | Running total
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW                  | Last 7 rows                              | 7-day moving average
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING  | Entire partition                         | FIRST_VALUE / LAST_VALUE over whole group
RANGE BETWEEN INTERVAL '7 DAYS' PRECEDING AND CURRENT ROW | Rows within 7 calendar days              | Rolling 7-day window on a time series
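For intuition, the two most common frames can be emulated over a plain Python list (illustrative numbers, not tied to any table above). Note how the moving-average window is partial at the start of the series, exactly as in SQL.

```python
# Hypothetical daily revenue series
daily_revenue = [100.0, 120.0, 90.0, 110.0, 130.0, 80.0, 100.0, 140.0]

# ROWS UNBOUNDED PRECEDING: running total
running_total = []
acc = 0.0
for r in daily_revenue:
    acc += r
    running_total.append(acc)

# ROWS BETWEEN 6 PRECEDING AND CURRENT ROW: 7-row moving average
# (windows at the start of the series contain fewer than 7 rows)
moving_avg = []
for i in range(len(daily_revenue)):
    window = daily_revenue[max(0, i - 6): i + 1]
    moving_avg.append(sum(window) / len(window))
```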
Question 36
Swap Gender Values in Table

Simple SQL to swap Male ↔ Female values:

UPDATE users
SET gender = CASE
    WHEN gender = 'Male' THEN 'Female'
    WHEN gender = 'Female' THEN 'Male'
    ELSE gender  -- Keep other values (null, other, unknown)
END
WHERE gender IN ('Male', 'Female');

Explanation:

  • CASE: Evaluates current gender value
  • WHEN gender = 'Male': Changes to Female
  • WHEN gender = 'Female': Changes to Male
  • ELSE gender: Safety net for any other values; redundant here because the WHERE clause already filters, but harmless
  • WHERE clause: Restricts the update to Male/Female rows, so unaffected rows are never rewritten (an optimization)

Alternative (Using IFF):

UPDATE users
SET gender = IFF(gender = 'Male', 'Female', IFF(gender = 'Female', 'Male', gender))
WHERE gender IN ('Male', 'Female');
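The CASE-based swap can be verified end to end with an in-memory SQLite table (hypothetical rows; SQLite is only used here because it ships with Python):

```python
import sqlite3

# Hypothetical users table; the UPDATE is the same CASE-based swap shown above
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, gender TEXT)")
conn.executemany("INSERT INTO users VALUES (?, ?)",
                 [(1, "Male"), (2, "Female"), (3, None), (4, "Other")])

conn.execute("""
    UPDATE users
    SET gender = CASE
        WHEN gender = 'Male' THEN 'Female'
        WHEN gender = 'Female' THEN 'Male'
        ELSE gender
    END
    WHERE gender IN ('Male', 'Female')
""")

rows = dict(conn.execute("SELECT id, gender FROM users ORDER BY id").fetchall())
assert rows == {1: "Female", 2: "Male", 3: None, 4: "Other"}
```

NULL and 'Other' rows are untouched, which is exactly what the WHERE clause guarantees.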
Question 37
DBT Project Structure & Best Practices

Real-World Scenario: A 200-model dbt project with no naming conventions causes every new hire to ask "where does this model live?" and every PR to touch 10 unrelated files. A well-structured project makes every model's purpose, owner, and position in the DAG immediately obvious.

Standard Project Layout

my_dbt_project/
├── dbt_project.yml              # Core config, model materializations, vars
├── profiles.yml                 # Connection config (usually in ~/.dbt/)
├── packages.yml                 # dbt-utils, dbt-expectations, etc.
│
├── models/
│   ├── staging/                 # stg_* — one model per source table
│   │   ├── salesforce/          # sub-folder per source system
│   │   │   ├── stg_sf_accounts.sql
│   │   │   ├── stg_sf_opportunities.sql
│   │   │   └── schema.yml       # sources + column-level docs + tests
│   │   ├── oracle_erp/
│   │   │   ├── stg_orders.sql
│   │   │   └── schema.yml
│   │   └── stripe/
│   │       ├── stg_payments.sql
│   │       └── schema.yml
│   │
│   ├── intermediate/            # int_* — joins, business logic
│   │   ├── int_orders_enriched.sql      # order + customer + product joined
│   │   ├── int_revenue_by_region.sql
│   │   └── schema.yml
│   │
│   └── marts/                   # fact_* dim_* — BI-ready, governed
│       ├── finance/
│       │   ├── fact_revenue.sql
│       │   ├── fact_invoices.sql
│       │   └── schema.yml
│       ├── sales/
│       │   ├── fact_orders.sql
│       │   ├── dim_customers.sql
│       │   └── schema.yml
│       └── operations/
│           ├── fact_inventory.sql
│           └── schema.yml
│
├── macros/                      # Reusable Jinja: generate_surrogate_key, etc.
├── snapshots/                   # SCD Type 2: snap_customers, snap_products
├── seeds/                       # dim_date.csv, region_codes.csv
├── tests/                       # Singular tests: reconciliation SQL
├── analyses/                    # Exploratory SQL (not deployed)
├── models/exposures.yml         # Documents BI tools consuming dbt models
└── .github/workflows/dbt_ci.yml # Slim CI config

Layer Responsibilities — The Contract

Layer        | Prefix       | Materialization     | Rule
Staging      | stg_         | view                | 1:1 with source. Rename, cast, light clean. NO joins. NO business logic.
Intermediate | int_         | ephemeral / view    | Joins between staging models. Business logic. NOT directly queried by BI.
Marts        | fact_ / dim_ | table / incremental | BI-ready. Governed. Documented. Wide tables acceptable. Conformed dims.

dbt_project.yml — Materializations & Tags by Layer

name: 'my_dbt_project'
version: '1.0.0'

vars:
  start_date: '2020-01-01'
  data_environment: "{{ env_var('DBT_TARGET', 'dev') }}"

models:
  my_dbt_project:
    staging:
      +materialized: view
      +tags: ['hourly']
      +schema: staging            # → ANALYTICS_DEV.STAGING in dev
    intermediate:
      +materialized: ephemeral    # inlined as CTE, zero storage cost
      +tags: ['hourly']
    marts:
      +materialized: table
      +tags: ['daily']
      +schema: marts
      +post_hook: "GRANT SELECT ON {{ this }} TO ROLE ANALYST_ROLE"
      finance:
        +tags: ['daily', 'finance']
      operations:
        +materialized: incremental   # ops models are large — use incremental
        +tags: ['hourly', 'operations']

Naming Conventions (enforce in PR review)

  • stg_<source>_<entity> — e.g. stg_oracle_orders, stg_sf_accounts
  • int_<verb>_<noun> — e.g. int_orders_enriched, int_revenue_attributed
  • fact_<event_or_process> — e.g. fact_orders, fact_daily_revenue
  • dim_<entity> — e.g. dim_customers, dim_products, dim_date
  • snap_<entity> — e.g. snap_customers (SCD2 history)
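Conventions like these can be enforced mechanically instead of by eyeballing PRs. A minimal sketch of such a CI helper, assuming the patterns below match your team's exact rules (the regexes are my assumptions, not part of any dbt tooling):

```python
import re

# Hypothetical CI check for the naming conventions listed above
PATTERNS = [
    r"^stg_[a-z0-9]+_[a-z0-9_]+$",   # stg_<source>_<entity>
    r"^int_[a-z0-9_]+$",             # int_<verb>_<noun>
    r"^fact_[a-z0-9_]+$",            # fact_<event_or_process>
    r"^dim_[a-z0-9_]+$",             # dim_<entity>
    r"^snap_[a-z0-9_]+$",            # snap_<entity>
]

def violations(model_names):
    """Return the model names that match none of the agreed patterns."""
    return [m for m in model_names
            if not any(re.match(p, m) for p in PATTERNS)]

bad = violations(["stg_sf_accounts", "orders_final", "dim_customers"])
assert bad == ["orders_final"]
```

Run it over the filenames under models/ in CI and fail the build on any violation.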
Question 38
MERGE vs UPSERT Explained

Real-World Scenario: Qlik Replicate delivers CDC events into Snowflake — INSERTs, UPDATEs, and DELETEs mixed together. A naive INSERT OVERWRITE would blow away the table. You need MERGE to apply each operation correctly, handle duplicates from the CDC stream, and close old SCD2 records atomically.

Basic MERGE Pattern

-- Classic upsert: update if exists, insert if new, delete if flagged
MERGE INTO staging.customers AS tgt
USING (
    -- Deduplicate CDC stream: take latest version per key
    SELECT * FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY customer_id
                                  ORDER BY header__change_seq DESC) AS rn
        FROM raw.customer_stream
        WHERE header__change_oper != 'B'   -- exclude before-images
    ) WHERE rn = 1
) AS src
ON tgt.customer_id = src.customer_id

WHEN MATCHED AND src.header__change_oper = 'D'
    THEN DELETE

WHEN MATCHED AND src.updated_at > tgt.updated_at
    THEN UPDATE SET
        tgt.email        = src.email,
        tgt.tier         = src.tier,
        tgt.updated_at   = src.updated_at,
        tgt._updated_at  = CURRENT_TIMESTAMP

WHEN NOT MATCHED AND src.header__change_oper != 'D'
    THEN INSERT (customer_id, email, tier, updated_at, _loaded_at)
         VALUES (src.customer_id, src.email, src.tier,
                 src.updated_at, CURRENT_TIMESTAMP);
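The dedup-then-apply flow of that MERGE can be sketched with plain Python dicts (hypothetical CDC events; field names loosely mirror the Qlik-style headers above): keep only the latest change per key, skip before-images, then apply insert/update/delete against a target map.

```python
# Hypothetical CDC events: oper I=insert, U=update, D=delete, B=before-image
events = [
    {"customer_id": 1, "oper": "I", "seq": 1, "email": "a@x.com"},
    {"customer_id": 1, "oper": "B", "seq": 2, "email": "a@x.com"},
    {"customer_id": 1, "oper": "U", "seq": 3, "email": "a@new.com"},
    {"customer_id": 2, "oper": "I", "seq": 1, "email": "b@x.com"},
    {"customer_id": 2, "oper": "D", "seq": 2, "email": None},
]

# Dedup: latest change per key, excluding before-images (the ROW_NUMBER step)
latest = {}
for e in events:
    if e["oper"] == "B":
        continue
    if e["customer_id"] not in latest or e["seq"] > latest[e["customer_id"]]["seq"]:
        latest[e["customer_id"]] = e

# Apply: the WHEN MATCHED / NOT MATCHED branches of the MERGE
target = {}
for key, e in latest.items():
    if e["oper"] == "D":
        target.pop(key, None)      # WHEN MATCHED AND oper = 'D' THEN DELETE
    else:
        target[key] = e["email"]   # upsert
```

Customer 2 was inserted then deleted within the batch, so after dedup only the delete survives and the customer never lands in the target.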

SCD Type 2 MERGE — Close Old Records & Open New Ones

-- When a customer's tier changes, close the old record and open a new one
-- This is what dbt snapshots automate — but useful to know the raw SQL

MERGE INTO marts.dim_customers AS tgt
USING (
    -- Source: latest version of each customer
    SELECT customer_id, email, tier, updated_at
    FROM staging.customers
    WHERE updated_at > (SELECT MAX(valid_from) FROM marts.dim_customers)
) AS src
ON tgt.customer_id = src.customer_id
   AND tgt.is_current = TRUE          -- only match against current records

-- Row exists and tier changed → close current record
WHEN MATCHED AND tgt.tier != src.tier
    THEN UPDATE SET
        tgt.valid_to   = CURRENT_DATE - 1,
        tgt.is_current = FALSE;

-- Note: After the MERGE, INSERT new open records for changed customers
-- (Snowflake MERGE doesn't support WHEN MATCHED THEN INSERT in same statement)
INSERT INTO marts.dim_customers (customer_sk, customer_id, email, tier,
                                  valid_from, valid_to, is_current)
SELECT
    MD5(src.customer_id || '|' || CURRENT_DATE)  AS customer_sk,  -- unique per version (id + open date)
    src.customer_id, src.email, src.tier,
    CURRENT_DATE                             AS valid_from,
    NULL                                     AS valid_to,
    TRUE                                     AS is_current
FROM staging.customers src
WHERE EXISTS (
    SELECT 1 FROM marts.dim_customers old
    WHERE old.customer_id = src.customer_id
      AND old.is_current = FALSE
      AND old.valid_to = CURRENT_DATE - 1    -- just closed by the MERGE above
);

Idempotent MERGE — Safe to Re-Run

-- A MERGE is idempotent if running it twice produces the same result
-- Key: include the updated_at guard in WHEN MATCHED

-- BAD (not idempotent): every re-run overwrites processed_at with a new timestamp
WHEN MATCHED THEN UPDATE SET tgt.processed_at = CURRENT_TIMESTAMP;

-- GOOD (idempotent): only update if source is actually newer
WHEN MATCHED AND src.updated_at > tgt.updated_at
    THEN UPDATE SET
        tgt.status     = src.status,
        tgt.updated_at = src.updated_at;
-- Running this 100 times produces same result as running once
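The updated_at guard is easy to demonstrate outside the warehouse. A minimal sketch with an in-memory dict standing in for the target table (rows are hypothetical):

```python
# Idempotent upsert: the updated_at guard makes re-runs no-ops
def merge(target, batch):
    for row in batch:
        key = row["id"]
        # WHEN MATCHED AND src.updated_at > tgt.updated_at THEN UPDATE
        # WHEN NOT MATCHED THEN INSERT
        if key not in target or row["updated_at"] > target[key]["updated_at"]:
            target[key] = {"status": row["status"], "updated_at": row["updated_at"]}
    return target

batch = [{"id": 1, "status": "shipped", "updated_at": "2024-01-15T10:00:00"}]
target = merge({}, batch)
after_first = {k: dict(v) for k, v in target.items()}

merge(target, batch)          # re-run the exact same batch
assert target == after_first  # no change: the merge is idempotent
```

The same guard also protects against out-of-order delivery: a stale row with an older updated_at is simply ignored.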

MERGE vs INSERT OVERWRITE vs dbt Strategies

Approach         | Best For                    | Handles Deletes               | Idempotent
MERGE (upsert)   | Mutable records, CDC        | Yes (WHEN MATCHED DELETE)     | Yes (with updated_at guard)
DELETE + INSERT  | Date partitions, aggregates | Yes (DELETE first)            | Yes
INSERT OVERWRITE | Full partition replace      | Yes (replace whole partition) | Yes
INSERT (append)  | Immutable event logs        | No                            | No (duplicates on re-run)
Question 39
Data Masking & Masking Policies

Data Masking hides sensitive data while maintaining utility for dev/test/analytics.

Snowflake Masking Policy (Dynamic, at Query Time):

-- Create masking policy
CREATE MASKING POLICY email_mask AS (val VARCHAR) RETURNS VARCHAR ->
    CASE
        WHEN CURRENT_ROLE() IN ('ANALYST_ROLE') THEN '****'
        WHEN CURRENT_ROLE() = 'DATA_OWNER_ROLE' THEN val
        ELSE 'MASKED'
    END;

-- Apply to column
ALTER TABLE customers ALTER COLUMN email SET MASKING POLICY email_mask;

How It Works:

  • Underlying data NOT modified
  • Masking applied at query time based on user role
  • Different users see different data
  • DATA_OWNER sees real emails, ANALYST sees masked

Common Masking Techniques:

  • Substitution: Replace with fixed character (****)
  • Redaction: Hide completely
  • Truncation: Show only partial (first 2 chars of email)
  • Hashing: Replace with hash
  • Encryption: Reversible transformation
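Toy versions of three of these techniques, sketched in Python (illustrative only, not how Snowflake implements masking policies internally):

```python
import hashlib

def substitute(value):
    return "****"                       # substitution: fixed replacement

def truncate(email):
    local, _, domain = email.partition("@")
    return local[:2] + "***@" + domain  # truncation: first 2 chars + domain

def hash_mask(value):
    # hashing: irreversible but deterministic, so masked values still join
    return hashlib.sha256(value.encode()).hexdigest()

email = "jane.doe@example.com"
```

Determinism is why hashing is the usual choice when analysts still need to join on a masked column.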
Question 40
Hashing vs Encryption

Two fundamental cryptographic concepts with different purposes:

Hashing - One-Way Function

  • Purpose: Integrity verification, password storage
  • Reversible: No - cannot reverse hash to get original
  • Deterministic: Same input always → same hash output
  • Examples: MD5 (32 hex chars; cryptographically broken, avoid for new uses), SHA-256 (64 hex chars)
  • Use: Verify a file hasn't been modified; store passwords (with a salted, slow algorithm such as bcrypt, since fast hashes like plain MD5/SHA-256 are unsafe for passwords)

Encryption - Two-Way Function

  • Purpose: Confidentiality, protect sensitive data
  • Reversible: Yes - decrypt with correct key to get original
  • Key-Dependent: Requires encryption and decryption key
  • Examples: AES-256, RSA
  • Use: Protect credit cards, SSN, medical records
Feature     | Hashing     | Encryption
Reversible  | No, one-way | Yes, two-way
Key Needed  | No          | Yes
Output Size | Fixed       | Variable
Purpose     | Integrity   | Confidentiality
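The hashing properties in the table (deterministic, one-way, fixed output size) can be checked directly with Python's standard hashlib:

```python
import hashlib

msg = b"quarterly_report.pdf contents"  # arbitrary sample bytes

h1 = hashlib.sha256(msg).hexdigest()
h2 = hashlib.sha256(msg).hexdigest()

assert h1 == h2           # deterministic: same input, same digest
assert len(h1) == 64      # SHA-256: fixed 64 hex chars (256 bits)...
assert len(hashlib.sha256(b"x" * 100_000).hexdigest()) == 64  # ...whatever the input size
assert len(hashlib.md5(msg).hexdigest()) == 32                # MD5: fixed 32 hex chars
assert hashlib.sha256(msg + b"!").hexdigest() != h1           # tiny change, different digest
```

Reversible encryption needs a key library (e.g. the third-party cryptography package); it is deliberately not shown here since no key means no decryption.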
Question 41
Snowflake S3 Storage Integration

Secure, credential-less access to AWS S3 from Snowflake using IAM roles.

Step 1: AWS Setup - Create IAM Policy

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject"],
      "Resource": "arn:aws:s3:::my-bucket/*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::my-bucket"
    }
  ]
}

Note: s3:ListBucket must target the bucket ARN itself, while s3:GetObject / s3:PutObject target the objects under it (/*).

Step 2: Create Snowflake Storage Integration

CREATE STORAGE INTEGRATION s3_int
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789012:role/SnowflakeRole'
  STORAGE_ALLOWED_LOCATIONS = ('s3://my-bucket/data/');

Step 3: Get Snowflake's IAM User Details

DESCRIBE STORAGE INTEGRATION s3_int;
-- Returns: STORAGE_AWS_IAM_USER_ARN, STORAGE_AWS_EXTERNAL_ID

Step 4: Update AWS IAM Role Trust Relationship

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::SNOWFLAKE-ACCOUNT:user/SNOWFLAKE-USER"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "EXTERNAL-ID-FROM-STEP3"
        }
      }
    }
  ]
}

Step 5: Create External Stage

CREATE STAGE s3_stage
  STORAGE_INTEGRATION = s3_int
  URL = 's3://my-bucket/data/'
  FILE_FORMAT = (TYPE = PARQUET);

Benefits: Secure (no exposed AWS keys), centralized control, easy to audit

Question 42
DBT Models, Tests, Seeds & Custom Tests — Deep Dive

Real-World Scenario: Your data team ingests Stripe payments, Salesforce CRM records and warehouse inventory into Snowflake. You use DBT to transform raw data into trusted analytics tables. One bad deploy introduced NULL order IDs in production — custom tests would have caught it before it reached the BI layer.

1. Models — SQL Transformations as Code

-- models/staging/stg_orders.sql
-- Staging layer: thin cleaning only, no business logic
{{ config(materialized='view') }}

SELECT
    order_id::INT                         AS order_id,
    customer_id::INT                      AS customer_id,
    UPPER(TRIM(status))                   AS status,
    amount::DECIMAL(10,2)                 AS amount,
    order_date::DATE                      AS order_date,
    _loaded_at                            AS ingested_at
FROM {{ source('stripe', 'raw_orders') }}
WHERE order_id IS NOT NULL

-- models/marts/fact_daily_revenue.sql
-- Mart layer: business logic, joined, aggregated
{{ config(materialized='table') }}

WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
    WHERE status = 'COMPLETED'
),
customers AS (
    SELECT * FROM {{ ref('stg_customers') }}
)
SELECT
    o.order_date,
    c.region,
    c.customer_segment,
    COUNT(o.order_id)     AS total_orders,
    SUM(o.amount)         AS total_revenue,
    AVG(o.amount)         AS avg_order_value
FROM orders o
JOIN customers c USING (customer_id)
GROUP BY 1, 2, 3

2. Built-in Schema Tests

# models/staging/schema.yml
version: 2

models:
  - name: stg_orders
    description: "Cleaned Stripe orders from raw layer"
    columns:
      - name: order_id
        description: "Primary key — must be unique and present"
        tests:
          - not_null
          - unique

      - name: status
        tests:
          - accepted_values:
              values: ['COMPLETED', 'PENDING', 'CANCELLED', 'REFUNDED']

      - name: customer_id
        tests:
          - not_null
          - relationships:
              to: ref('stg_customers')
              field: customer_id

      - name: amount
        tests:
          - not_null

3. Custom Singular Tests — SQL That Returns Failures

A singular test is a plain .sql file in tests/. DBT runs it and fails if any rows are returned. Think of it as: "show me the violations."

-- tests/assert_order_amount_positive.sql
-- Fails if any completed order has a zero or negative amount
SELECT
    order_id,
    amount,
    status
FROM {{ ref('stg_orders') }}
WHERE status = 'COMPLETED'
  AND amount <= 0

-- tests/assert_no_future_orders.sql
-- Fails if any order has a date beyond today (data quality anomaly)
SELECT
    order_id,
    order_date
FROM {{ ref('stg_orders') }}
WHERE order_date > CURRENT_DATE

-- tests/assert_revenue_matches_stripe.sql
-- Cross-source reconciliation: DBT total vs Stripe dashboard total
-- Fails if the variance exceeds 0.1%
WITH dbt_total AS (
    SELECT SUM(amount) AS total FROM {{ ref('stg_orders') }}
    WHERE status = 'COMPLETED'
      AND order_date = CURRENT_DATE - 1
),
stripe_control AS (
    -- Loaded from a separate control/audit table
    SELECT expected_revenue AS total
    FROM {{ ref('stripe_daily_control') }}
    WHERE report_date = CURRENT_DATE - 1
)
SELECT
    d.total                                    AS dbt_total,
    s.total                                    AS stripe_total,
    ABS(d.total - s.total) / NULLIF(s.total,0) AS pct_variance
FROM dbt_total d, stripe_control s
WHERE ABS(d.total - s.total) / NULLIF(s.total,0) > 0.001
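The variance rule at the heart of that reconciliation test is simple enough to unit-test in plain Python (numbers are illustrative):

```python
# Flag a failure when the DBT total drifts more than 0.1% from the control figure
def exceeds_variance(dbt_total, control_total, threshold=0.001):
    if not control_total:
        return True   # missing or zero control figure: surface it for investigation
    return abs(dbt_total - control_total) / control_total > threshold

assert exceeds_variance(100_200.0, 100_000.0) is True    # 0.2% variance: fail
assert exceeds_variance(100_050.0, 100_000.0) is False   # 0.05% variance: pass
```

The NULLIF(s.total, 0) in the SQL plays the same role as the zero-control guard here.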

4. Custom Generic Tests — Reusable Macros

Generic tests live in macros/ and can be applied to any column across any model via schema.yml.

-- macros/test_not_negative.sql
-- Reusable test: column must not contain negative values
{% test not_negative(model, column_name) %}

SELECT
    {{ column_name }} AS failing_value,
    COUNT(*) AS row_count
FROM {{ model }}
WHERE {{ column_name }} < 0
GROUP BY 1

{% endtest %}

-- macros/test_within_date_range.sql
-- Reusable test: date column must fall within a specified range
{% test within_date_range(model, column_name, min_date, max_date) %}

SELECT {{ column_name }}
FROM {{ model }}
WHERE {{ column_name }} < '{{ min_date }}'
   OR {{ column_name }} > '{{ max_date }}'

{% endtest %}

-- macros/test_row_count_above_threshold.sql
-- Reusable test: table must have at least N rows (catches empty-load bugs)
{% test row_count_above_threshold(model, column_name, threshold) %}

WITH row_count AS (
    SELECT COUNT(*) AS cnt FROM {{ model }}
)
SELECT cnt
FROM row_count
WHERE cnt < {{ threshold }}

{% endtest %}

Apply generic tests in schema.yml:

# Apply custom generic tests across models
models:
  - name: fact_daily_revenue
    columns:
      - name: total_revenue
        tests:
          - not_negative          # custom generic macro
          - not_null

      - name: order_date
        tests:
          - within_date_range:    # custom generic macro with params
              min_date: '2020-01-01'
              max_date: '{{ run_started_at.strftime("%Y-%m-%d") }}'

  - name: stg_orders
    tests:
      - row_count_above_threshold:  # model-level test (no column)
          column_name: order_id
          threshold: 1000           # alert if fewer than 1000 rows loaded

5. dbt-expectations — Great Expectations Integration

# Add statistical / distribution tests via dbt-expectations package
# packages.yml
packages:
  - package: calogica/dbt_expectations
    version: [">=0.9.0", "<1.0.0"]

# schema.yml — rich statistical tests
models:
  - name: fact_daily_revenue
    columns:
      - name: avg_order_value
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 1
              max_value: 50000
          - dbt_expectations.expect_column_mean_to_be_between:
              min_value: 80
              max_value: 500

      - name: total_orders
        tests:
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: number

    tests:
      - dbt_expectations.expect_table_row_count_to_be_between:
          min_value: 100
          max_value: 1000000

6. Seeds — Version-Controlled Reference Data

# seeds/region_mapping.csv
region_code,region_name,timezone,currency
US-EAST,US East,America/New_York,USD
US-WEST,US West,America/Los_Angeles,USD
EU-WEST,Europe West,Europe/London,GBP
APAC,Asia Pacific,Asia/Singapore,SGD

# seeds/properties.yml — add column-level docs and tests to seeds
seeds:
  - name: region_mapping
    description: "Static region reference loaded from Git"
    columns:
      - name: region_code
        tests: [not_null, unique]
      - name: currency
        tests:
          - accepted_values:
              values: ['USD', 'GBP', 'EUR', 'SGD', 'AUD']

7. Running & Triaging Tests

# Run all tests
dbt test

# Run tests for a specific model only
dbt test --select stg_orders

# Run only singular (one-off SQL) tests; use test_type:generic for schema tests
dbt test --select test_type:singular

# Run and store results for audit
dbt test --store-failures   # Failed rows saved to schema dbt_test__audit

# Compile a test to see the generated SQL before running
dbt compile --select test_type:generic

Test Severity — Warn vs Error

# schema.yml — set severity so non-critical issues warn but don't block
models:
  - name: stg_orders
    columns:
      - name: customer_id
        tests:
          - relationships:
              to: ref('stg_customers')
              field: customer_id
              severity: warn          # Warn only — orphaned orders alert, don't fail CI
          - not_null:
              severity: error         # Hard fail — pipeline stops
Question 43
DBT Incremental Strategies — merge, append, delete+insert & Snapshots

Real-World Scenario: Your e-commerce platform processes 2M orders/day into a Snowflake fact table. Running a full-refresh takes 45 minutes and costs $80 in credits. Incremental models reduce that to 3 minutes and $2 — but only if you choose the right strategy for your data pattern.

Incremental Model Fundamentals

On first run, DBT builds the full table. On subsequent runs, it only processes new/changed rows. The is_incremental() macro controls which rows to load:

-- models/marts/fact_orders.sql
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}

SELECT
    order_id,
    customer_id,
    status,
    amount,
    updated_at
FROM {{ source('raw', 'orders') }}

{% if is_incremental() %}
    -- Only process rows updated since our last run
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
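The watermark logic behind is_incremental() can be sketched with plain lists (hypothetical rows): on the first run there is no target, so everything loads; afterwards only rows past MAX(updated_at) load.

```python
# First run: no target yet, load everything (is_incremental() is False).
# Later runs: only rows newer than the target's watermark.
def rows_to_load(source_rows, target_rows):
    if not target_rows:
        return list(source_rows)
    watermark = max(r["updated_at"] for r in target_rows)
    return [r for r in source_rows if r["updated_at"] > watermark]

source = [{"id": 1, "updated_at": 1}, {"id": 2, "updated_at": 5}]

full_build  = rows_to_load(source, [])                   # both rows
incremental = rows_to_load(source, [{"updated_at": 3}])  # only id 2
```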

Strategy 1: merge (Default for Snowflake)

Performs a SQL MERGE — matched rows are updated, new rows are inserted. Requires a unique_key. Best for tables where existing records can change (orders updating status, inventory changing).

-- DBT generates this SQL under the hood
MERGE INTO analytics.fact_orders AS target
USING (
    -- Your model SQL (new/updated rows)
    SELECT order_id, customer_id, status, amount, updated_at
    FROM raw.orders
    WHERE updated_at > '2024-01-15 06:00:00'
) AS source
ON target.order_id = source.order_id

WHEN MATCHED THEN UPDATE SET
    target.status     = source.status,
    target.amount     = source.amount,
    target.updated_at = source.updated_at

WHEN NOT MATCHED THEN INSERT
    (order_id, customer_id, status, amount, updated_at)
VALUES
    (source.order_id, source.customer_id,
     source.status, source.amount, source.updated_at);

-- Config in schema.yml or model config block
{{ config(
    materialized='incremental',
    unique_key='order_id',              -- Required for merge
    incremental_strategy='merge',
    merge_update_columns=['status', 'amount', 'updated_at']  -- Only update these cols
) }}

When to use merge: Order management systems (status changes from pending → shipped → delivered), inventory tables, CRM contact updates, any mutable source where records evolve over time.

Strategy 2: append

Only inserts new rows — never updates existing ones. No unique_key needed. Fastest strategy, but only valid for truly immutable append-only sources.

-- models/marts/fact_clickstream.sql
-- Web click events: immutable once written, only new events arrive
{{ config(
    materialized='incremental',
    incremental_strategy='append'    -- No unique_key needed
) }}

SELECT
    event_id,
    session_id,
    user_id,
    page_url,
    event_type,
    event_timestamp
FROM {{ source('segment', 'raw_events') }}

{% if is_incremental() %}
    WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}

When to use append: Clickstream/event logs, IoT sensor readings, audit trails, financial transaction logs, server logs — any source where events are written once and never modified. Attempting append on mutable data creates duplicates.

Strategy 3: delete+insert (Partition Replacement)

Deletes all rows for a given partition (usually a date), then re-inserts the full partition. Avoids the overhead of row-level matching while still correcting late-arriving data. Ideal for date-partitioned fact tables.

-- models/marts/fact_daily_sales.sql
{{ config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='sale_date'            -- Partition key to delete+replace
) }}

SELECT
    sale_date,
    product_id,
    region,
    SUM(revenue)     AS total_revenue,
    COUNT(order_id)  AS order_count
FROM {{ source('raw', 'sales') }}

{% if is_incremental() %}
    -- Reprocess last 3 days to handle late-arriving data
    WHERE sale_date >= DATEADD('day', -3, CURRENT_DATE)
{% endif %}

GROUP BY 1, 2, 3

-- What DBT executes:
-- Step 1: Delete the affected partition
DELETE FROM analytics.fact_daily_sales
WHERE sale_date >= DATEADD('day', -3, CURRENT_DATE);

-- Step 2: Insert fresh data for that partition
INSERT INTO analytics.fact_daily_sales
SELECT sale_date, product_id, region,
       SUM(revenue), COUNT(order_id)
FROM raw.sales
WHERE sale_date >= DATEADD('day', -3, CURRENT_DATE)
GROUP BY 1, 2, 3;

When to use delete+insert: Daily aggregated fact tables, reporting tables partitioned by date, any scenario where you want full partition correctness without a row-level unique key. Handles late-arriving data naturally by re-processing the window.
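The partition-replacement semantics can be sketched with lists of dicts (hypothetical rows): drop every row in the affected dates, then insert the freshly aggregated rows, which also corrects late-arriving data.

```python
# delete+insert, sketched: replace whole date partitions atomically
def refresh_partitions(target, fresh_rows, dates_to_replace):
    kept = [r for r in target if r["sale_date"] not in dates_to_replace]          # DELETE
    return kept + [r for r in fresh_rows if r["sale_date"] in dates_to_replace]   # INSERT

target = [
    {"sale_date": "2024-01-13", "revenue": 10},
    {"sale_date": "2024-01-14", "revenue": 99},   # stale: missed late-arriving rows
]
fresh = [{"sale_date": "2024-01-14", "revenue": 120}]

result = refresh_partitions(target, fresh, {"2024-01-14"})
```

The untouched Jan 13 partition survives; Jan 14 is rebuilt with the corrected total.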

Strategy 4: insert_overwrite (BigQuery / Spark)

-- Note: insert_overwrite is for BigQuery/Spark, not Snowflake
-- Snowflake equivalent is delete+insert
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={
        "field": "order_date",
        "data_type": "date",
        "granularity": "day"
    }
) }}

Strategy Comparison Matrix

Strategy         | SQL Operation       | Needs unique_key | Handles Updates | Speed   | Best For
merge            | MERGE (upsert)      | Yes              | Yes             | Medium  | Mutable records (orders, CRM)
append           | INSERT only         | No               | No              | Fastest | Immutable events (clicks, logs)
delete+insert    | DELETE then INSERT  | Partition key    | Partition-level | Fast    | Date-partitioned aggregates
insert_overwrite | Overwrite partition | No               | Partition-level | Fast    | BigQuery / Spark partitions

DBT Snapshots — SCD Type 2 Automation

Snapshots track how a record changes over time. DBT automatically maintains dbt_valid_from, dbt_valid_to and dbt_scd_id columns — giving you a full audit trail without writing a single line of SCD logic.

-- snapshots/snap_customers.sql
-- Real-world: track customer tier changes for churn analysis
{% snapshot snap_customers %}

{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='timestamp',       -- Use updated_at to detect changes
        updated_at='updated_at'
    )
}}

SELECT
    customer_id,
    email,
    subscription_tier,    -- tracks: FREE -> PRO -> ENTERPRISE changes
    monthly_spend,
    account_manager,
    updated_at
FROM {{ source('salesforce', 'accounts') }}

{% endsnapshot %}

-- snapshots/snap_product_pricing.sql
-- Real-world: track price changes for accurate historical revenue calc
{% snapshot snap_product_pricing %}

{{
    config(
        target_schema='snapshots',
        unique_key='product_id',
        strategy='check',            -- Detect change in specific columns
        check_cols=['unit_price', 'discount_pct', 'is_active']
    )
}}

SELECT product_id, product_name, unit_price, discount_pct, is_active
FROM {{ source('erp', 'products') }}

{% endsnapshot %}

-- Querying snapshots for time-travel analysis
-- Q: What tier was customer 1001 in on Black Friday 2023?
SELECT
    customer_id,
    subscription_tier,
    dbt_valid_from,
    dbt_valid_to
FROM {{ ref('snap_customers') }}
WHERE customer_id = 1001
  AND dbt_valid_from <= '2023-11-24'
  AND (dbt_valid_to >  '2023-11-24' OR dbt_valid_to IS NULL);

-- Q: Which customers changed their tier in Q4 2023?
-- (window functions aren't allowed in WHERE, so compute LAG in a subquery first)
SELECT *
FROM (
    SELECT
        customer_id,
        LAG(subscription_tier) OVER (PARTITION BY customer_id ORDER BY dbt_valid_from) AS previous_tier,
        subscription_tier AS new_tier,
        dbt_valid_from    AS change_date
    FROM {{ ref('snap_customers') }}
)
WHERE previous_tier IS NOT NULL
  AND new_tier != previous_tier
  AND change_date BETWEEN '2023-10-01' AND '2023-12-31';
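The point-in-time lookup pattern (valid_from inclusive, valid_to exclusive, NULL meaning "still current") is worth internalizing; a minimal sketch with hypothetical history rows:

```python
from datetime import date

# Hypothetical SCD2 history for one customer
history = [
    {"tier": "FREE", "valid_from": date(2023, 1, 1), "valid_to": date(2023, 6, 1)},
    {"tier": "PRO",  "valid_from": date(2023, 6, 1), "valid_to": None},  # current
]

def tier_on(history, as_of):
    """Return the tier in effect on as_of, using half-open [valid_from, valid_to)."""
    for row in history:
        if row["valid_from"] <= as_of and (row["valid_to"] is None or row["valid_to"] > as_of):
            return row["tier"]
    return None  # no record covers that date

assert tier_on(history, date(2023, 11, 24)) == "PRO"   # Black Friday 2023
assert tier_on(history, date(2023, 3, 1)) == "FREE"
```

Half-open intervals guarantee every date matches exactly one record, with no gaps or overlaps at the boundaries.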

Advanced: Incremental + Snapshot Architecture Pattern

-- Layer 1: Staging (view) — thin cleaning
--   stg_orders, stg_customers, stg_products

-- Layer 2: Snapshots (SCD2) — history tracking
--   snap_customers (tier changes)
--   snap_product_pricing (price changes)

-- Layer 3: Intermediate (incremental/merge) — enrichment
--   int_orders_enriched: joins orders + current customer tier

-- Layer 4: Marts (table or incremental) — final analytics
--   fact_orders: incremental merge, unique_key=order_id
--   fact_daily_revenue: incremental delete+insert, partition=order_date
--   dim_customers: table, rebuilt nightly from snap_customers current view

Incremental Model Pitfalls & How to Avoid Them

  • Schema changes break incremental models — use on_schema_change='append_new_columns' to auto-handle new source columns
  • Backfilling silently skips old data — always use dbt run --full-refresh after schema changes or data corrections
  • Duplicate rows with append — if the source is not truly immutable, switch to merge with a unique_key
  • Incorrect watermark on first run — test is_incremental() carefully; on first run the table does not exist yet
  • Clock skew — subtract a small buffer from the watermark (updated_at > MAX(updated_at) - INTERVAL '5 minutes') to avoid missing in-flight rows
Question 44
Managing DBT Dependencies with ref()

The ref() function is the magic glue connecting DBT models.

What ref() Does:

-- In int_orders.sql
SELECT * FROM {{ ref('stg_orders') }}
WHERE total > 100

-- Compiles to:
SELECT * FROM "ANALYTICS"."STAGING"."stg_orders"
WHERE total > 100

Why ref() is Powerful:

  • No hardcoded schema names: Works in dev/test/prod
  • Automatic dependencies: DBT knows stg_orders must run first
  • DAG generation: Builds dependency graph automatically
  • Parallelization: Run independent models concurrently

Automatic DAG (Directed Acyclic Graph):

stg_customers ──┐
                ├──> int_customer_orders ──> fct_sales
stg_orders ─────┘
        (ref() creates the edges)

Execution Order:

dbt run:
1. Run stg_customers (no dependencies)
2. Run stg_orders (no dependencies)
3. Run int_customer_orders (depends on both)
4. Run fct_sales (depends on all above)

Best Practices:

  • Always use ref() instead of hardcoding table names
  • Use source() for raw tables: {{ source('raw', 'users') }}
  • Keep dependencies clear and linear when possible
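The DAG-plus-execution-order behavior that ref() gives you is ordinary topological sorting; a minimal sketch with Python's standard graphlib (model names match the example above):

```python
from graphlib import TopologicalSorter

# Each model maps to the set of models it ref()s (its predecessors)
deps = {
    "stg_customers": set(),
    "stg_orders": set(),
    "int_customer_orders": {"stg_customers", "stg_orders"},
    "fct_sales": {"int_customer_orders"},
}

# static_order() yields a valid execution order: dependencies always come first
order = list(TopologicalSorter(deps).static_order())
```

dbt does the same thing, plus running independent models (the two staging models here) in parallel threads.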
Question 45
Jinja Templating in DBT

Jinja adds programming logic to SQL. Processed before warehouse execution.

Simple Variable Example:

{% set start_date = '2020-01-01' %}

SELECT * FROM {{ ref('raw_data') }}
WHERE created_at >= '{{ start_date }}'

Environment-Specific Logic:

{% if target.name == 'prod' %}
    WHERE is_active = TRUE
{% elif target.name == 'dev' %}
    WHERE created_at >= CURRENT_DATE - 7
{% endif %}

Loop Through Columns:

{% set columns = ['customer_id', 'order_id', 'amount'] %}

SELECT
    {% for col in columns %}
        COALESCE({{ col }}, 0) as {{ col }}
        {% if not loop.last %},{% endif %}
    {% endfor %}
FROM orders
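What that loop compiles to can be reproduced with plain Python string building (jinja2 itself is a third-party package; this shows only the same DRY idea in stdlib Python):

```python
# Generate the same COALESCE select list the Jinja loop produces
columns = ["customer_id", "order_id", "amount"]

select_list = ",\n    ".join(f"COALESCE({col}, 0) AS {col}" for col in columns)
sql = f"SELECT\n    {select_list}\nFROM orders"
print(sql)
```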

Compiled Output (Production, combining the variable and environment snippets above):

SELECT * FROM "PROD"."RAW"."raw_data"
WHERE created_at >= '2020-01-01'
  AND is_active = TRUE

Benefits:

  • DRY - Don't Repeat Yourself
  • Environment-specific logic
  • Dynamic SQL generation
  • Macro reusability
Question 46
Data Governance Best Practices

Comprehensive approach to governing data assets:

1. Define Ownership & Stewardship

  • Assign data owners for each domain
  • Identify stewards (day-to-day managers)
  • Document responsibilities

2. Build Data Dictionary

  • Document ALL data elements
  • Business and technical definitions
  • Relationships between tables
  • Tool: Collibra, DataHub, or dbt docs

3. Metadata Management

  • Track technical metadata (schema, lineage)
  • Track business metadata (definitions, owners)
  • Implement data catalog

4. Quality Standards

  • Define quality rules (not_null, uniqueness)
  • Implement monitoring and alerts
  • Document remediation processes

5. Access Control

  • RBAC (Role-Based Access Control)
  • Row-level security for sensitive data
  • Column-level masking for PII

6. Data Lineage & Audit

  • Track data flow source → consumer
  • Maintain audit logs of access
  • Enable compliance auditing

7. Version Control & Documentation

  • All code (SQL, Python, DBT) in Git
  • Auto-generate docs from code
  • Change request process
Question 47
Data Quality Testing Strategies

Multi-Layer Approach to Data Quality:

1. Schema Tests (Column Constraints)

not_null: Column has no NULLs
unique: All values distinct
relationships: Foreign key constraint
accepted_values: Values from whitelist

2. Statistical Tests (Distribution)

  • Range checks: value between min/max
  • Outlier detection: flag extreme values
  • Distribution analysis: compare to expected

3. Business Logic Tests

  • Cross-table consistency: totals match
  • Recency checks: data updated recently
  • Completeness: all expected records present

4. Freshness Tests

-- Check last update time
SELECT MAX(updated_at) FROM orders
-- Should be within last 24 hours

Implementation (DBT Test):

-- schema.yml
models:
  - name: orders
    tests:
      - dbt_utils.recency:
          datepart: day
          interval: 1
    columns:
      - name: order_id
        tests:
          - not_null
          - unique
      - name: customer_id
        tests:
          - relationships:
              to: ref('dim_customer')
              field: customer_id

Tools: DBT built-in, Great Expectations, Soda SQL, Ataccama

48
Question 48
Building a Modern Data Stack

Typical Tech Stack Across Layers:

1. Source Systems

  • Operational databases (PostgreSQL, Oracle, MySQL)
  • SaaS applications (Salesforce, Shopify, HubSpot)
  • APIs and webhooks
  • Streaming platforms (Kafka, Kinesis)

2. Data Ingestion

  • Managed: Fivetran, Stitch
  • Open-source: Airbyte
  • AWS native: AWS DMS, AWS Glue
  • CDC-focused: Qlik Replicate, Debezium

3. Cloud Data Warehouse

  • Snowflake, BigQuery, Redshift

4. Transformation

  • DBT (recommended)
  • Spark for advanced processing
  • Python for ML/custom logic

5. Orchestration

  • Apache Airflow (most flexible)
  • Prefect, Dagster (Python-native)
  • Cloud native: Snowflake Tasks, Step Functions

6. Data Governance & Catalog

  • Collibra, Ataccama, DataHub

7. Analytics & BI

  • Tableau, Power BI, Looker

8. Reverse ETL (Optional)

  • Sync insights back to source systems
  • Tools: Hightouch, Census

Selection Criteria: Scalability, cost, ease of use, integrations, security, community support

49
Question 49
Future Trends in Data Engineering

Emerging Trends Shaping the Data Industry:

1. DataOps

Applying DevOps principles to data pipelines: CI/CD, monitoring, automation, version control

2. Real-Time Analytics

Move from batch to streaming: Kafka, streaming SQL, real-time dashboards

3. AI/ML Integration

ML models in data pipelines: feature engineering, model scoring, automated workflows

4. Data Mesh

Decentralized data ownership: domain-oriented teams managing their data products

5. Lakehouse Architecture

Combine data lake flexibility with warehouse performance: Delta Lake, Iceberg, Hudi

6. Serverless Data

Pay-per-query compute: Athena, BigQuery, and Snowflake replacing provisioned servers

7. Data Quality as First-Class

Quality built into pipelines, not afterthought. Tools like Great Expectations, Soda

8. Self-Service Analytics

Business users directly accessing and analyzing data without technical intermediaries

9. Reverse ETL

Syncing insights back to operational systems for immediate action

10. Privacy-Preserving Tech

Differential privacy, federated learning for sensitive data

Skills for Success: Cloud (AWS/Azure/GCP), DBT, Python, Spark, SQL, Docker, Git, ML basics

50
Question 50
DBT Snapshots: Implementing SCD Type 2

DBT Snapshots automate SCD Type 2 implementation without manual coding:

Timestamp Strategy (When source has updated_at):

{% snapshot dim_customer_snapshot %}
{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='timestamp',
        updated_at='updated_at'
    )
}}

SELECT customer_id, name, email, phone, updated_at
FROM {{ ref('stg_customers') }}

{% endsnapshot %}

Check Strategy (Track specific columns):

{% snapshot dim_product_snapshot %}
{{
    config(
        target_schema='snapshots',
        unique_key='product_id',
        strategy='check',
        check_cols=['price', 'stock_status']
    )
}}

SELECT product_id, name, price, stock_status
FROM {{ ref('stg_products') }}

{% endsnapshot %}

DBT Automatically Creates:

  • dbt_valid_from - When this version became active
  • dbt_valid_to - When superseded (NULL if current)
  • dbt_scd_id - Unique ID for tracking row versions

Querying Snapshots (Temporal):

-- Get current customer state
SELECT * FROM dim_customer_snapshot
WHERE dbt_valid_to IS NULL;

-- Get historical state at a point in time
SELECT * FROM dim_customer_snapshot
WHERE dbt_valid_from <= '2024-01-15'
  AND (dbt_valid_to > '2024-01-15' OR dbt_valid_to IS NULL);
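The same point-in-time logic can be sketched in Python. The customer history below is made-up illustrative data; the filter mirrors the `dbt_valid_from` / `dbt_valid_to` predicates above:

```python
from datetime import date

# SCD Type 2 history: each row is one version of the customer.
# valid_to is None for the current version (dbt_valid_to IS NULL).
history = [
    {"customer_id": 1, "tier": "silver",
     "valid_from": date(2024, 1, 1),  "valid_to": date(2024, 1, 20)},
    {"customer_id": 1, "tier": "gold",
     "valid_from": date(2024, 1, 20), "valid_to": None},
]

def as_of(rows, when):
    """Return the version(s) active at a given date."""
    return [r for r in rows
            if r["valid_from"] <= when
            and (r["valid_to"] is None or r["valid_to"] > when)]
```

Querying with a date before the change returns the old version; querying today returns the row whose `valid_to` is open-ended.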
51
Question 51
Schema Drift: Detection, Handling & Real-World Strategies

Schema drift occurs when the structure of source data changes unexpectedly — new columns appear, data types change, or columns are removed — breaking downstream pipelines.

Real-World Scenario:
Your e-commerce source system adds a new column loyalty_tier to the orders table overnight. Your Snowflake pipeline that does SELECT * into a fixed schema now fails because the target table does not have that column.

Types of Schema Drift

  • Additive drift — New columns added (least destructive, easiest to handle)
  • Destructive drift — Columns renamed, removed, or reordered
  • Type drift — Column data type changes (e.g., INT to VARCHAR)
  • Semantic drift — Column meaning changes without name change (hardest to detect)

Strategy 1: Use VARIANT for Flexible Ingestion

-- Land raw JSON as-is; schema changes won't break ingestion
CREATE TABLE raw_orders (
    ingested_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    source_system VARCHAR,
    payload       VARIANT   -- Entire row stored as JSON
);

-- Query specific fields dynamically
SELECT
    payload:order_id::INT         AS order_id,
    payload:customer_id::INT      AS customer_id,
    payload:loyalty_tier::VARCHAR AS loyalty_tier  -- New field, no DDL needed
FROM raw_orders;

Strategy 2: Detect Drift with Information Schema

-- Compare source vs target columns to surface drift
WITH source_cols AS (
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_name = 'SOURCE_ORDERS'
),
target_cols AS (
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_name = 'TARGET_ORDERS'
)
SELECT
    s.column_name,
    s.data_type AS source_type,
    t.data_type AS target_type,
    CASE
        WHEN t.column_name IS NULL     THEN 'NEW COLUMN — ADD TO TARGET'
        WHEN s.data_type != t.data_type THEN 'TYPE MISMATCH'
        ELSE 'OK'
    END AS drift_status
FROM source_cols s
LEFT JOIN target_cols t ON s.column_name = t.column_name
WHERE t.column_name IS NULL OR s.data_type != t.data_type;
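The comparison boils down to a dictionary diff over `{column: type}` maps. A Python sketch with illustrative column names, mirroring the two drift conditions in the SQL above:

```python
# Diff source vs target schemas: new columns and type mismatches.
def detect_drift(source_cols, target_cols):
    report = {}
    for col, src_type in source_cols.items():
        if col not in target_cols:
            report[col] = "NEW COLUMN - ADD TO TARGET"
        elif target_cols[col] != src_type:
            report[col] = "TYPE MISMATCH"
    return report

source = {"order_id": "NUMBER", "amount": "VARCHAR", "loyalty_tier": "VARCHAR"}
target = {"order_id": "NUMBER", "amount": "NUMBER"}
```

Running this diff on a schedule (or in CI) turns silent drift into an explicit, reviewable report.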

Strategy 3: Auto-Evolve with Snowflake COPY INTO

-- Snowflake natively handles additive drift during COPY
COPY INTO orders
FROM @my_stage/orders/
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
ON_ERROR = 'CONTINUE';

-- Enable column evolution (Snowflake feature)
ALTER TABLE orders SET ENABLE_SCHEMA_EVOLUTION = TRUE;

Strategy 4: DBT Schema Contracts

# schema.yml — define expected schema contracts
models:
  - name: stg_orders
    columns:
      - name: order_id
        tests: [not_null, unique]
      - name: amount
        tests:
          - dbt_expectations.expect_column_values_to_be_of_type:
              column_type: number
      - name: status
        tests:
          - accepted_values:
              values: ['pending', 'shipped', 'delivered', 'cancelled']

Strategy 5: Schema Registry for Kafka Pipelines

  • Use Confluent Schema Registry with AVRO/Protobuf to enforce contracts at the producer level
  • Configure BACKWARD compatibility — consumers can read old and new schemas
  • Breaking changes are rejected before they ever reach your pipeline

Handling Matrix

| Drift Type       | Recommended Approach                           | Risk      |
|------------------|------------------------------------------------|-----------|
| New column added | Auto-evolve or VARIANT landing zone            | Low       |
| Column removed   | Alert + graceful NULL fallback                 | Medium    |
| Type change      | Fail fast + alert on-call engineer             | High      |
| Column rename    | Schema registry + versioned contracts          | High      |
| Semantic drift   | Data observability tools (Monte Carlo, Bigeye) | Very High |

Best Practices:

  • Always land raw data in a schema-flexible zone (VARIANT / Parquet) before transforming
  • Use a two-layer architecture: raw (flexible) then curated (strict schema)
  • Instrument data observability alerts on column count, null rates, and type changes
  • Treat schema contracts as code — version them in Git alongside your dbt models
52
Question 52
Late-Arriving Data: Patterns, Detection & Correction Strategies

Late-arriving data occurs when records arrive in your pipeline after the time window they logically belong to has already been processed — causing incorrect aggregations, incomplete reports, and stale dashboards.

Real-World Scenario:
A mobile app records user click events locally and syncs when the device reconnects. Events from 2024-01-10 arrive in your pipeline on 2024-01-15. Your daily revenue report for Jan 10 was already published — and it is now wrong.

Root Causes

  • Offline mobile/IoT devices syncing after reconnection
  • Third-party API delays (payment gateways, logistics providers)
  • Network partitions or upstream pipeline failures
  • Timezone mismatches causing apparent late arrival
  • Manual data corrections or backfills from source systems

Strategy 1: Separate Event Time vs Processing Time

-- Always store BOTH timestamps
CREATE TABLE user_events (
    event_id        VARCHAR,
    user_id         INT,
    event_type      VARCHAR,
    event_time      TIMESTAMP,  -- When the event actually happened (source)
    ingested_at     TIMESTAMP DEFAULT CURRENT_TIMESTAMP  -- When we received it
);

-- Identify late arrivals (arrived more than 1 day after event)
SELECT
    event_id,
    event_time,
    ingested_at,
    DATEDIFF('hour', event_time, ingested_at) AS hours_late
FROM user_events
WHERE DATEDIFF('day', event_time, ingested_at) > 1
ORDER BY hours_late DESC;
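The lag computation itself is simple timestamp arithmetic. A minimal Python sketch with illustrative timestamps:

```python
from datetime import datetime

# Lateness = ingestion time minus event time, expressed in hours
# (the Python equivalent of DATEDIFF('hour', event_time, ingested_at)).
def hours_late(event_time, ingested_at):
    return (ingested_at - event_time).total_seconds() / 3600

evt = datetime(2024, 1, 10, 12, 0)   # when the event happened
ing = datetime(2024, 1, 15, 12, 0)   # when it reached the pipeline
```

An event that happened on Jan 10 but arrived on Jan 15 is 120 hours late — exactly the kind of record the SQL above surfaces.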

Strategy 2: Watermarking in Streaming Pipelines

-- Accept events up to 3 days late; skip older ones
INSERT INTO fact_events
SELECT
    event_id,
    user_id,
    event_time,
    ingested_at,
    'LATE_ARRIVAL' AS load_type
FROM staging_events s
WHERE
    event_time >= CURRENT_DATE - 3         -- Late acceptance window
    AND NOT EXISTS (
        SELECT 1 FROM fact_events f
        WHERE f.event_id = s.event_id       -- Deduplicate
    );
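A Python sketch of the same watermark-plus-dedup rule (event IDs, dates, and the 3-day window are illustrative):

```python
from datetime import date, timedelta

# Accept events inside the late-acceptance window that we have not
# already loaded; silently skip everything older than the watermark.
def accept_late(events, already_loaded, today, window_days=3):
    cutoff = today - timedelta(days=window_days)
    return [e for e in events
            if e["event_time"] >= cutoff
            and e["event_id"] not in already_loaded]

events = [
    {"event_id": "a", "event_time": date(2024, 1, 14)},  # inside window
    {"event_id": "b", "event_time": date(2024, 1, 5)},   # beyond watermark
    {"event_id": "c", "event_time": date(2024, 1, 14)},  # already loaded
]
```

Only events that are both recent enough and previously unseen make it through, matching the `WHERE`/`NOT EXISTS` pair in the SQL.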

Strategy 3: Reprocess Affected Partitions (Idempotent Backfill)

-- Step 1: Find which dates need reprocessing
SELECT
    event_time::DATE AS affected_date,
    COUNT(*)         AS late_records
FROM staging_events
WHERE ingested_at::DATE = CURRENT_DATE
  AND event_time::DATE  < CURRENT_DATE - 1
GROUP BY 1;

-- Step 2: Remove stale aggregates for those dates
DELETE FROM daily_event_summary
WHERE summary_date IN (
    SELECT DISTINCT event_time::DATE
    FROM staging_events
    WHERE ingested_at::DATE = CURRENT_DATE
      AND event_time::DATE < CURRENT_DATE - 1
);

-- Step 3: Recompute only the affected dates (safe to re-run)
INSERT INTO daily_event_summary
SELECT
    event_time::DATE        AS summary_date,
    event_type,
    COUNT(*)                AS event_count,
    COUNT(DISTINCT user_id) AS unique_users
FROM fact_events
WHERE event_time::DATE IN (
    SELECT DISTINCT event_time::DATE
    FROM staging_events
    WHERE ingested_at::DATE = CURRENT_DATE
      AND event_time::DATE < CURRENT_DATE - 1
)
GROUP BY 1, 2;

Strategy 4: DBT Incremental with Lookback Window

-- models/fact_events.sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
    on_schema_change='append_new_columns'
) }}

SELECT event_id, user_id, event_type, event_time, ingested_at
FROM {{ source('raw', 'events') }}

{% if is_incremental() %}
    -- Look back 3 days on every run to catch late arrivals
    WHERE event_time >= DATEADD('day', -3, CURRENT_DATE)
{% endif %}

Strategy 5: Audit-Trail Fact Table (Never Delete, Only Append)

-- Keep all versions with a change reason
CREATE TABLE fact_orders_audit (
    order_id       INT,
    order_date     DATE,
    amount         DECIMAL(10,2),
    record_version INT,
    is_current     BOOLEAN,
    change_reason  VARCHAR   -- 'INITIAL_LOAD', 'LATE_ARRIVAL', 'CORRECTION'
);

-- Always query the current version
SELECT * FROM fact_orders_audit WHERE is_current = TRUE;

-- Audit trail: what changed for a given order
SELECT order_id, record_version, amount, change_reason
FROM fact_orders_audit
WHERE order_id = 12345
ORDER BY record_version;

Handling Matrix by Latency

| Latency    | Pattern                           | Snowflake Tool                       |
|------------|-----------------------------------|--------------------------------------|
| < 1 hour   | Streaming watermark window        | Snowpipe + deduplication             |
| 1–24 hours | Rolling incremental with lookback | DBT incremental (-3 day window)      |
| 1–7 days   | Partition reprocessing            | DELETE + re-INSERT on affected dates |
| 7+ days    | Full historical backfill          | DBT full-refresh on affected models  |

Prevention & Monitoring:

  • Alert when MAX(ingested_at) - MAX(event_time) > threshold — signals upstream delays
  • Track ingestion lag as a first-class data quality metric in your observability stack
  • Design all aggregation pipelines to be idempotent — safe to re-run for any date range
  • Use Snowflake Time Travel to restore pre-correction snapshots if a backfill goes wrong
  • Document your SLA — e.g. "Reports may reflect data up to 3 days late" — to set stakeholder expectations
53
Question 53 — System Design
Design a Real-Time CDC Pipeline: Oracle / SQL Server → Snowflake via Qlik Replicate

Real-World Scenario: Your company supports 100+ business users across multiple teams. The source is a 500-table Oracle ERP with millions of rows mutating daily. Business needs sub-30-minute data freshness in Snowflake for operational dashboards. Design the full pipeline.

Architecture Overview

Oracle ERP / SQL Server / PostgreSQL
        |
        |  (Log-based CDC — reads redo/transaction logs, zero load on source)
        v
 Qlik Replicate (CDC Engine)
        |
        |  APPLY mode: continuous micro-batch MERGE into landing tables
        |  Full-load + ongoing replication, DDL change capture
        v
 Snowflake Landing Schema  (raw.<table>)
        |
        |  Snowflake Streams (track INSERT/UPDATE/DELETE offsets)
        v
 Snowflake Tasks  (scheduled every 5 min)
        |
        |  MERGE into staging schema — deduplicate, apply deletes
        v
 dbt Cloud  (hourly incremental runs)
        |
        |  stg_ → int_ → mart_ transformation layers
        v
 Snowflake Mart Schema — BI / Analytics consumers

Qlik Replicate Configuration

-- Qlik Replicate task settings (conceptual)
TASK MODE       : Full Load + CDC
APPLY MODE      : Transactional (row-level, ordered commits)
STAGING         : Direct Apply to Snowflake landing tables
ERROR HANDLING  : Suspend table on DML error, continue others
DDL HANDLING    : Auto-create / alter target columns on source DDL

-- Metadata columns Qlik adds to every landing row
header__change_seq     VARCHAR   -- Unique monotonic CDC offset
header__change_oper    CHAR(1)   -- I=Insert, U=Update, D=Delete, B=Before-image
header__timestamp      TIMESTAMP -- Source commit timestamp
header__stream_position VARCHAR   -- Log position for resume

Snowflake Landing → Staging via Streams + Tasks

-- Step 1: Stream on the landing table (tracks all Qlik-applied changes)
CREATE OR REPLACE STREAM raw.orders_stream
ON TABLE raw.orders
SHOW_INITIAL_ROWS = FALSE;   -- Only delta since last consumption

-- Step 2: Task to merge stream into clean staging table (runs every 5 min)
CREATE OR REPLACE TASK etl.merge_orders_task
    WAREHOUSE = 'TRANSFORM_WH'
    SCHEDULE  = '5 MINUTE'
    WHEN system$stream_has_data('raw.orders_stream')
AS
MERGE INTO staging.orders AS tgt
USING (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY order_id
               ORDER BY header__change_seq DESC
           ) AS rn
    FROM raw.orders_stream
    WHERE header__change_oper != 'B'    -- Exclude before-images
) AS src
ON tgt.order_id = src.order_id AND src.rn = 1

WHEN MATCHED AND src.header__change_oper = 'D'
    THEN DELETE

WHEN MATCHED AND src.header__change_oper IN ('I','U')
    THEN UPDATE SET
        tgt.customer_id  = src.customer_id,
        tgt.status       = src.status,
        tgt.amount       = src.amount,
        tgt.updated_at   = src.header__timestamp

WHEN NOT MATCHED AND src.header__change_oper != 'D'
    THEN INSERT (order_id, customer_id, status, amount, updated_at)
         VALUES (src.order_id, src.customer_id, src.status,
                 src.amount, src.header__timestamp);

ALTER TASK etl.merge_orders_task RESUME;
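The keep-latest-version-per-key logic at the heart of that MERGE can be sketched in Python. The change records below are illustrative and loosely mirror the Qlik header fields (`change_seq`, operation code):

```python
# Reduce a batch of CDC events to the latest version per key
# (the ROW_NUMBER() ... rn = 1 trick), then apply upserts and deletes.
def latest_per_key(changes, key="order_id", seq="change_seq"):
    best = {}
    for c in changes:
        k = c[key]
        if k not in best or c[seq] > best[k][seq]:
            best[k] = c
    return best

def apply_changes(target, changes):
    """target: {order_id: amount}; mutated in place and returned."""
    for k, c in latest_per_key(changes).items():
        if c["oper"] == "D":
            target.pop(k, None)      # WHEN MATCHED AND oper = 'D' THEN DELETE
        else:
            target[k] = c["amount"]  # upsert (MATCHED update / NOT MATCHED insert)
    return target

changes = [
    {"order_id": 1, "change_seq": 1, "oper": "I", "amount": 10},
    {"order_id": 1, "change_seq": 2, "oper": "U", "amount": 15},  # wins for key 1
    {"order_id": 2, "change_seq": 3, "oper": "D", "amount": None},
]
```

Even if several events arrive for one key in a single batch, only the highest `change_seq` is applied — which is why the merge stays correct under replays.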

dbt Incremental Layer on Top

-- models/staging/stg_orders.sql  (view — thin cleaning layer)
{{ config(materialized='view') }}
SELECT
    order_id, customer_id, UPPER(status) AS status,
    amount::DECIMAL(12,2) AS amount, updated_at
FROM {{ source('staging', 'orders') }}
WHERE order_id IS NOT NULL

-- models/marts/fact_orders.sql  (incremental merge)
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge',
    on_schema_change='append_new_columns'
) }}
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

Failure Handling & Recovery Design

  • Qlik Replicate checkpoint — stores log position; on restart it resumes from last committed offset, no data loss
  • Snowflake Stream offset — streams are transactional; if the Task fails mid-run, the stream offset is NOT advanced, so the next run re-processes the same delta safely
  • Duplicate protection — ROW_NUMBER() OVER (PARTITION BY pk ORDER BY change_seq DESC) ensures only the latest version of a key is applied even if Qlik delivers multiple events for one row
  • Schema drift — Qlik auto-ALTERs target; dbt uses on_schema_change='append_new_columns' to absorb new columns without breaking runs
  • Monitoring — alert on SYSTEM$STREAM_HAS_DATA being stale for >15 min; alert on Task failure via Snowflake email notifications

Latency Breakdown

| Hop                      | Typical Latency   | Tuning Lever                        |
|--------------------------|-------------------|-------------------------------------|
| Oracle log → Qlik Apply  | 1–5 sec           | Commit frequency, batch size        |
| Qlik → Snowflake landing | 5–30 sec          | Apply frequency, warehouse size     |
| Stream + Task merge      | 5 min (scheduled) | Reduce schedule interval            |
| dbt incremental run      | hourly            | Increase run frequency in dbt Cloud |
| End-to-end               | < 15 min          | Well within 30-min SLA              |
54
Question 54 — System Design
Design a Scalable ELT Platform: 5M+ Records/Day on Snowflake + dbt Cloud

Real-World Scenario: You inherit a single monolithic SQL transformation running nightly on a 2XL warehouse, taking 4 hours, failing regularly, and blocking BI reports until morning. Design a production-grade ELT platform that is resilient, observable, and cost-efficient.

Full Platform Architecture

┌─────────────────────────────────────────────────────────────┐
│                     SOURCE SYSTEMS                          │
│  Oracle ERP │ Salesforce CRM │ Stripe │ PostgreSQL App DB   │
└──────┬──────────────┬─────────────┬────────────┬────────────┘
       │              │             │            │
   Qlik Replicate  Fivetran      Airbyte     Custom Python
   (CDC / near-RT) (SaaS APIs)   (OSS)       (S3 drops)
       │              │             │            │
       └──────────────┴─────────────┴────────────┘
                              │
                    ┌─────────▼──────────┐
                    │   Snowflake RAW     │  ← Schema per source
                    │  (landing zone)     │    No transforms here
                    └─────────┬──────────┘
                              │   Streams + Tasks (micro-batch)
                    ┌─────────▼──────────┐
                    │  Snowflake STAGING  │  ← dbt stg_ models (views)
                    │                    │    Rename, cast, dedupe
                    └─────────┬──────────┘
                              │   dbt incremental models
                    ┌─────────▼──────────┐
                    │ Snowflake INTERMEDIATE│ ← dbt int_ models
                    │                    │    Joins, enrichment
                    └─────────┬──────────┘
                              │   dbt table / incremental
                    ┌─────────▼──────────┐
                    │  Snowflake MARTS    │  ← fact_ dim_ models
                    │                    │    BI-ready, governed
                    └─────────┬──────────┘
                              │
               ┌──────────────┼──────────────┐
           Tableau          Looker        Power BI
                    (Secure Views / roles)

Warehouse Strategy — Right-Sizing per Workload

-- Never use one warehouse for everything
-- Isolate workloads to prevent contention and enable independent scaling

-- Ingestion warehouse (Qlik / Snowpipe writes)
CREATE WAREHOUSE INGEST_WH
    WAREHOUSE_SIZE = 'SMALL'
    AUTO_SUSPEND   = 60
    AUTO_RESUME    = TRUE;

-- dbt transformation warehouse (scales up for heavy models)
CREATE WAREHOUSE TRANSFORM_WH
    WAREHOUSE_SIZE = 'MEDIUM'
    AUTO_SUSPEND   = 120
    AUTO_RESUME    = TRUE
    MAX_CLUSTER_COUNT = 3          -- Multi-cluster for parallel dbt runs
    SCALING_POLICY    = 'ECONOMY'; -- Scale conservatively

-- BI / analyst query warehouse (fast, responsive)
CREATE WAREHOUSE ANALYTICS_WH
    WAREHOUSE_SIZE = 'LARGE'
    AUTO_SUSPEND   = 300
    AUTO_RESUME    = TRUE;

-- Admin / monitoring (tiny, always on)
CREATE WAREHOUSE ADMIN_WH
    WAREHOUSE_SIZE = 'X-SMALL'
    AUTO_SUSPEND   = 60;

dbt Project Structure

dbt_project/
├── models/
│   ├── staging/          # stg_* — view, one-to-one with source tables
│   │   ├── salesforce/
│   │   ├── stripe/
│   │   └── oracle_erp/
│   ├── intermediate/     # int_* — joins, enrichment (ephemeral or view)
│   └── marts/
│       ├── finance/      # fact_revenue, fact_invoices
│       ├── sales/        # fact_orders, dim_customers
│       └── operations/   # fact_inventory, fact_shipments
├── macros/               # generate_surrogate_key, test_not_negative, etc.
├── snapshots/            # SCD Type 2 for slowly changing dims
├── seeds/                # region_codes, currency_map, etc.
├── tests/                # Singular tests — reconciliation, cross-source checks
├── analyses/             # Ad-hoc exploratory SQL (not deployed)
└── dbt_project.yml

dbt Cloud CI/CD Jobs

# Production job (dbt Cloud scheduler)
Job: "Production — Full Run"
  Schedule: Every hour
  Commands:
    - dbt source freshness          # Fail fast if sources are stale
    - dbt run --select tag:hourly   # Only hourly models
    - dbt test --select tag:hourly  # Test immediately after run
  Notifications: Slack on failure

# CI job (triggered on every PR to main)
Job: "Slim CI — PR Validation"
  Trigger: Pull Request opened / updated
  Commands:
    - dbt run  --select state:modified+ --defer --state ./prod-artifacts
    - dbt test --select state:modified+ --defer --state ./prod-artifacts
  # Only runs models changed in the PR + their downstream dependents
  # --defer uses production compiled state so unchanged upstream refs resolve

Observability Stack

-- 1. Source freshness checks in sources.yml
sources:
  - name: oracle_erp
    freshness:
      warn_after:  {count: 1,  period: hour}
      error_after: {count: 3,  period: hour}
    loaded_at_field: _loaded_at

-- 2. Row count anomaly detection via custom test
-- tests/assert_fact_orders_row_count_not_dropped.sql
WITH today AS (
    SELECT COUNT(*) AS cnt FROM {{ ref('fact_orders') }}
    WHERE order_date = CURRENT_DATE
),
yesterday AS (
    SELECT COUNT(*) AS cnt FROM {{ ref('fact_orders') }}
    WHERE order_date = CURRENT_DATE - 1
)
SELECT today.cnt AS today_count, yesterday.cnt AS yesterday_count
FROM today, yesterday
WHERE today.cnt < yesterday.cnt * 0.7    -- Alert if >30% drop vs yesterday

-- 3. Snowflake Query History monitoring
SELECT query_text, total_elapsed_time/1000 AS seconds,
       credits_used_cloud_services
FROM snowflake.account_usage.query_history
WHERE start_time >= CURRENT_DATE
  AND total_elapsed_time > 300000   -- Queries over 5 min
ORDER BY total_elapsed_time DESC;
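The 30%-drop rule in check 2 is just a threshold comparison; a minimal Python sketch (the 0.7 factor matches the SQL above):

```python
# Flag when today's row count falls more than 30% below yesterday's.
def row_count_dropped(today, yesterday, threshold=0.7):
    return today < yesterday * threshold
```

Keeping the rule this simple makes the alert easy to reason about; more sophisticated anomaly detection (seasonality, rolling baselines) can replace it later without changing the contract.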

Cost Governance

  • Tag every warehouse with a cost_center object tag and attach a resource monitor — alert at 80% of the monthly credit budget
  • Use Zero Copy Clone for dev/staging environments — no storage duplication cost
  • Model materialization decisions: views for staging (zero compute on build), incremental for large facts (avoid full rebuild), ephemeral for simple CTEs
  • Run dbt run --select tag:daily nightly for heavy models; tag lighter models tag:hourly
55
Question 55 — Advanced Snowflake
Snowflake Streams & Tasks: Event-Driven Pipeline Automation

Real-World Scenario: Rather than scheduling dbt runs on a fixed clock, you want Snowflake to automatically trigger transformations the moment new data arrives from Qlik Replicate. Zero polling, zero wasted credits on empty runs.

Streams — Change Data Capture Inside Snowflake

-- A Stream is a change-tracking cursor on a table
-- It records every INSERT, UPDATE, DELETE since the stream was last consumed

CREATE OR REPLACE STREAM raw.customer_stream
    ON TABLE raw.customers
    SHOW_INITIAL_ROWS = FALSE;   -- FALSE = only new changes going forward

-- Stream metadata columns automatically added:
-- METADATA$ACTION      : INSERT | DELETE  (UPDATE = DELETE + INSERT pair)
-- METADATA$ISUPDATE    : TRUE for the INSERT half of an UPDATE
-- METADATA$ROW_ID      : Internal unique row identifier

-- Consume the stream — reconstruct latest state of each customer
SELECT
    customer_id,
    email,
    tier,
    METADATA$ACTION,
    METADATA$ISUPDATE
FROM raw.customer_stream
WHERE METADATA$ACTION = 'INSERT'    -- After dedup, INSERTs = final state
ORDER BY METADATA$ROW_ID;
-- IMPORTANT: A Stream offset advances ONLY when consumed inside a DML
-- transaction that commits successfully. If the Task fails, offset is preserved.
-- This gives you exactly-once-like semantics.

-- Check if a stream has unconsumed data (use in Task WHEN clause)
SELECT SYSTEM$STREAM_HAS_DATA('raw.customer_stream');

Tasks — Scheduled or Event-Driven Orchestration

-- Root Task: fires every 2 minutes ONLY when stream has data
CREATE OR REPLACE TASK etl.process_customers_root
    WAREHOUSE = 'TRANSFORM_WH'
    SCHEDULE  = '2 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('raw.customer_stream')
AS
MERGE INTO staging.customers AS tgt
USING (
    SELECT customer_id, email, tier,
           ROW_NUMBER() OVER (PARTITION BY customer_id
                              ORDER BY METADATA$ROW_ID DESC) AS rn
    FROM raw.customer_stream
    WHERE METADATA$ACTION = 'INSERT'
) AS src ON tgt.customer_id = src.customer_id AND src.rn = 1
WHEN MATCHED     THEN UPDATE SET tgt.email = src.email, tgt.tier = src.tier
WHEN NOT MATCHED THEN INSERT (customer_id, email, tier)
                      VALUES (src.customer_id, src.email, src.tier);

-- Child Task: fires AFTER root task completes (DAG chaining)
CREATE OR REPLACE TASK etl.refresh_customer_dim_task
    WAREHOUSE = 'TRANSFORM_WH'
    AFTER etl.process_customers_root
AS
-- Refresh the dimension table by calling a stored procedure or dbt
CALL etl.sp_refresh_dim_customers();

-- Activate the DAG (tasks are SUSPENDED by default)
ALTER TASK etl.refresh_customer_dim_task RESUME;
ALTER TASK etl.process_customers_root    RESUME;

Task DAG — Multi-Step Pipeline

-- Build a full task tree: ingest → stage → mart
--
--  process_customers_root (every 2 min, stream-gated)
--          └── stage_customers_task
--                  └── refresh_dim_customers_task
--                          └── refresh_fact_orders_task
--
-- Monitor the DAG
SELECT *
FROM TABLE(information_schema.task_history(
    scheduled_time_range_start => DATEADD('hour', -1, CURRENT_TIMESTAMP),
    task_name => 'PROCESS_CUSTOMERS_ROOT'
))
ORDER BY scheduled_time DESC;

Stream Types Comparison

| Stream Type | Works On                    | Tracks Deletes | Best For                      |
|-------------|-----------------------------|----------------|-------------------------------|
| Standard    | Tables                      | Yes            | CDC pipelines, mutable tables |
| Append-Only | Tables                      | No             | Event logs, immutable inserts |
| Insert-Only | External / Directory Tables | No             | S3 file ingestion triggers    |

Gotchas & Best Practices

  • Stale stream — streams have a staleness limit equal to the table's DATA_RETENTION_TIME. If a stream is not consumed within that window, it becomes invalid. Set retention >= 14 days on critical source tables.
  • Multiple consumers — if two Tasks consume the same stream, they race. Create separate streams for separate consumers.
  • Zero-credit idle Tasks — a Task with a WHEN SYSTEM$STREAM_HAS_DATA() guard does NOT start a warehouse if the condition is false. Crucial for cost efficiency.
  • Serverless Tasks — use USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE instead of a named warehouse for Tasks that run <60 seconds; Snowflake auto-sizes and charges per-second.
56
Question 56 — Advanced dbt
dbt Macros, Jinja & Exposures — Reusable Patterns at Scale

Real-World Scenario: Your dbt project has grown to 200+ models. Developers are copy-pasting the same surrogate key logic, date spine generation, and column renaming patterns across dozens of files. Macros eliminate the duplication and make the whole project consistent.

Macro 1 — Reusable Surrogate Key (used across all dimension tables)

-- macros/generate_surrogate_key.sql
{% macro generate_surrogate_key(field_list) %}
    MD5(
        CONCAT_WS('||',
            {% for field in field_list %}
                COALESCE(CAST({{ field }} AS VARCHAR), '~~NULL~~')
                {% if not loop.last %}, {% endif %}
            {% endfor %}
        )
    )
{% endmacro %}

-- Usage in any dimension model
SELECT
    {{ generate_surrogate_key(['customer_id', 'source_system']) }} AS customer_sk,
    customer_id,
    email,
    tier
FROM {{ ref('stg_customers') }}
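The macro's hashing behaviour can be reproduced in Python. This sketch assumes the same `'||'` separator and `'~~NULL~~'` sentinel as the macro above:

```python
import hashlib

# MD5 over '||'-joined fields, with a sentinel so NULLs hash
# deterministically instead of collapsing the whole key to NULL.
def surrogate_key(*fields):
    parts = [str(f) if f is not None else "~~NULL~~" for f in fields]
    return hashlib.md5("||".join(parts).encode()).hexdigest()
```

The separator matters: without it, `("ab", "c")` and `("a", "bc")` would concatenate to the same string and collide. The sentinel keeps NULL distinct from empty string in the key space.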

Macro 2 — Dynamic Column Renaming (staging standardisation)

-- macros/rename_columns.sql
-- Accepts a dict of {old_name: new_name} and generates SELECT list
{% macro rename_columns(relation, column_map) %}
    SELECT
        {% for old_col, new_col in column_map.items() %}
            {{ old_col }} AS {{ new_col }}{% if not loop.last %},{% endif %}
        {% endfor %}
    FROM {{ relation }}
{% endmacro %}

-- Usage
{{ rename_columns(
    source('oracle_erp', 'CUST_MAST'),
    {
        'CUST_ID':    'customer_id',
        'CUST_EMAIL': 'email',
        'CUST_TIER':  'subscription_tier',
        'UPD_DT':     'updated_at'
    }
) }}

Macro 3 — Generate Date Spine (for gap-filling time series)

-- macros/date_spine.sql
{% macro date_spine(start_date, end_date) %}
WITH RECURSIVE date_series AS (
    SELECT {{ start_date }}::DATE AS d
    UNION ALL
    SELECT DATEADD('day', 1, d)
    FROM date_series
    WHERE d < {{ end_date }}::DATE
)
SELECT d AS date_day FROM date_series
{% endmacro %}

-- Usage: fill gaps in daily sales (days with zero orders still appear)
SELECT
    ds.date_day,
    COALESCE(s.total_revenue, 0) AS total_revenue
FROM ( {{ date_spine("'2023-01-01'", "CURRENT_DATE") }} ) ds
LEFT JOIN {{ ref('fact_daily_sales') }} s ON s.sale_date = ds.date_day
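A Python sketch of the spine-plus-gap-fill pattern, using made-up daily totals; the left-join-with-default becomes a dictionary lookup:

```python
from datetime import date, timedelta

# Generate every day in [start, end] inclusive (the "date spine").
def date_spine(start, end):
    days = (end - start).days
    return [start + timedelta(days=i) for i in range(days + 1)]

# Left-join sparse daily totals onto the spine; missing days get 0.
def gap_fill(start, end, daily_revenue):
    return [(d, daily_revenue.get(d, 0)) for d in date_spine(start, end)]

sales = {date(2024, 1, 1): 100, date(2024, 1, 3): 50}  # Jan 2 missing
```

Without the spine, a day with zero orders simply vanishes from the output — the spine guarantees every calendar day appears, which downstream charts and moving averages depend on.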

Macro 4 — Union All Sources (multi-region same schema)

-- macros/union_relations.sql
{% macro union_relations(relations) %}
    {% for relation in relations %}
        SELECT *, '{{ relation.identifier }}' AS source_table
        FROM {{ relation }}
        {% if not loop.last %} UNION ALL {% endif %}
    {% endfor %}
{% endmacro %}

-- Combine orders from US, EU, APAC schemas into one model
{{ union_relations([
    source('us_store',   'orders'),
    source('eu_store',   'orders'),
    source('apac_store', 'orders')
]) }}

Jinja Control Flow — Environment-Aware Models

-- Behave differently in dev vs prod using var() and target
{{ config(
    materialized = 'table' if target.name == 'prod' else 'view',
    post_hook    = "GRANT SELECT ON {{ this }} TO ROLE ANALYST_ROLE"
                   if target.name == 'prod' else ""
) }}

-- Limit rows in dev to speed up iteration
SELECT * FROM {{ ref('stg_orders') }}
{% if target.name != 'prod' %}
    LIMIT 10000
{% endif %}

Exposures — Document BI Consumption of dbt Models

# models/exposures.yml
# Exposures document WHERE dbt models are consumed (BI, APIs, etc.)
# Enables lineage: source → dbt model → Tableau dashboard

exposures:
  - name: executive_revenue_dashboard
    type: dashboard
    maturity: high          # high | medium | low
    url: https://tableau.company.com/views/ExecutiveRevenue
    description: "C-suite daily revenue KPIs sourced from dbt marts"
    owner:
      name: Analytics Engineering Team
      email: vsurya@duck.com
    depends_on:
      - ref('fact_daily_revenue')
      - ref('dim_customers')
      - ref('dim_products')

  - name: stripe_reconciliation_report
    type: analysis
    maturity: medium
    description: "Finance team daily Stripe vs warehouse reconciliation"
    owner:
      name: Finance Team
    depends_on:
      - ref('fact_orders')
      - ref('stg_stripe_charges')

Why Exposures Matter at Architect Level:

  • Run dbt ls --select +exposure:executive_revenue_dashboard to see every model that feeds a dashboard — instant blast radius analysis before a refactor
  • Exposures appear in the dbt DAG UI — stakeholders can trace data from Tableau all the way back to the Oracle source table
  • Used in Slim CI: only re-test models that feed changed exposures
Question 57 — Data Modelling
Dimensional Modelling at Scale: Star Schema, Surrogate Keys, Conformed Dimensions & Bus Matrix

Real-World Scenario: You are building the analytics data model for a B2B SaaS company with three subject areas — Sales, Finance, and Operations. BI teams in each area need to slice and dice independently, but the CEO needs a single cross-functional dashboard. You must design a model that works for both.

Star Schema Design — fact_orders

-- FACT TABLE: fact_orders (grain = one row per order line item)
CREATE TABLE marts.fact_orders (
    -- Surrogate keys (join to dims)
    order_sk        VARCHAR NOT NULL,   -- MD5 of order_id + source
    customer_sk     VARCHAR NOT NULL,
    product_sk      VARCHAR NOT NULL,
    date_sk         INT     NOT NULL,   -- YYYYMMDD integer for fast joins

    -- Degenerate dimensions (no dim table needed)
    order_id        INT     NOT NULL,
    order_line_no   INT     NOT NULL,

    -- Measures (always additive unless noted)
    quantity        INT,
    unit_price      DECIMAL(10,2),
    gross_revenue   DECIMAL(12,2),    -- additive
    discount_amount DECIMAL(10,2),    -- additive
    net_revenue     DECIMAL(12,2),    -- additive (gross - discount)
    cogs            DECIMAL(12,2),    -- additive
    gross_margin    DECIMAL(12,2),    -- additive (net_revenue - cogs)

    -- Audit
    _dbt_inserted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
CLUSTER BY (date_sk);                  -- Snowflake: prune by date on BI queries
-- DIMENSION: dim_customers  (SCD Type 2 via dbt snapshot)
CREATE TABLE marts.dim_customers (
    customer_sk      VARCHAR PRIMARY KEY,  -- surrogate key
    customer_id      INT     NOT NULL,     -- natural key (from source)
    customer_name    VARCHAR,
    email            VARCHAR,
    region           VARCHAR,
    segment          VARCHAR,   -- SMB | MID_MARKET | ENTERPRISE
    account_manager  VARCHAR,
    -- SCD2 columns
    valid_from       DATE    NOT NULL,
    valid_to         DATE,                -- NULL = current record
    is_current       BOOLEAN NOT NULL DEFAULT TRUE
);

-- Always join on both SK AND is_current=TRUE for current state
-- Join on SK alone for historical point-in-time analysis

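To make the two join modes concrete, here is a small Python sketch (hypothetical types and helper names, not Snowflake or dbt API) of current-state vs point-in-time lookup against SCD2 rows:

```python
# Sketch of the two SCD2 lookup modes described above. CustomerVersion and the
# helper names are illustrative, not part of any library.
from dataclasses import dataclass
from datetime import date
from typing import Optional

@dataclass
class CustomerVersion:
    customer_sk: str
    region: str
    valid_from: date
    valid_to: Optional[date]   # None = current record
    is_current: bool

def as_of(versions, on):
    """Point-in-time lookup: the version whose [valid_from, valid_to) covers `on`."""
    for v in versions:
        if v.valid_from <= on and (v.valid_to is None or on < v.valid_to):
            return v
    return None

def current(versions):
    """Current-state lookup: equivalent to joining on is_current = TRUE."""
    return next(v for v in versions if v.is_current)
```

A customer who moved from EU to US mid-2023 resolves to EU for 2022 revenue and US today, which is precisely what keeps historical revenue-by-region reports correct.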
Conformed Dimensions — Shared Across Subject Areas

-- A conformed dimension is defined ONCE and reused across all fact tables
-- dim_customers is conformed: used by fact_orders, fact_invoices, fact_support_tickets

-- Bus Matrix: shows which dims each fact uses (architect planning tool)
--
--                | dim_customers | dim_products | dim_date | dim_region | dim_sales_rep
-- ---------------|---------------|--------------|----------|------------|---------------
-- fact_orders    |      ✓        |      ✓       |    ✓     |     ✓      |      ✓
-- fact_invoices  |      ✓        |              |    ✓     |     ✓      |
-- fact_support   |      ✓        |              |    ✓     |            |
-- fact_inventory |               |      ✓       |    ✓     |     ✓      |
--
-- If dim_customers changes schema, ALL fact joins are affected → high blast radius
-- This is why conformed dims must be tightly governed and version-controlled in dbt

Surrogate Key Generation in dbt

-- models/marts/dim_customers.sql
-- Generate a stable, source-agnostic surrogate key
SELECT
    {{ dbt_utils.generate_surrogate_key(['customer_id', 'source_system']) }} AS customer_sk,
    customer_id,
    source_system,
    customer_name,
    email,
    region,
    segment,
    valid_from,
    valid_to,
    is_current
FROM {{ ref('snap_customers') }}  -- dbt snapshot provides SCD2 history

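A rough Python equivalent of what a surrogate-key macro like `dbt_utils.generate_surrogate_key` computes (an approximation of the idea, not the macro's exact output): hash the null-safe concatenation of the key columns.

```python
# Approximate sketch of MD5-based surrogate key generation. The '__null__'
# sentinel is illustrative; the real macro uses its own placeholder string.
import hashlib

def surrogate_key(*cols) -> str:
    # NULLs become a sentinel so (1, None) and (None, 1) hash differently
    parts = ['__null__' if c is None else str(c) for c in cols]
    return hashlib.md5('-'.join(parts).encode('utf-8')).hexdigest()
```

The key is stable (same inputs always produce the same hash) and source-aware, so the same customer_id arriving from two source systems yields two distinct surrogate keys.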
Date Dimension — Critical for BI Performance

-- seeds/dim_date.csv approach OR generate with macro
-- dim_date has one row per calendar day with every attribute BI needs
-- Never compute DATEPART() in BI queries — pre-compute in dim_date

SELECT
    TO_NUMBER(TO_CHAR(d, 'YYYYMMDD'))           AS date_sk,  -- INT join key
    d                                            AS full_date,
    EXTRACT(YEAR FROM d)                         AS year,
    EXTRACT(QUARTER FROM d)                      AS quarter,
    EXTRACT(MONTH FROM d)                        AS month,
    MONTHNAME(d)                                 AS month_name,
    EXTRACT(WEEK FROM d)                         AS week_of_year,
    DAYOFWEEK(d)                                 AS day_of_week,
    DAYNAME(d)                                   AS day_name,
    IFF(DAYOFWEEK(d) IN (0, 6), FALSE, TRUE)     AS is_weekday,  -- default WEEK_START: Sun=0, Sat=6
    IFF(d = LAST_DAY(d), TRUE, FALSE)            AS is_month_end,
    -- Fiscal year (example: FY starts April 1)
    IFF(MONTH(d) >= 4, YEAR(d), YEAR(d)-1)      AS fiscal_year,
    IFF(MONTH(d) >= 4,
        CEIL((MONTH(d)-3)/3.0),
        CEIL((MONTH(d)+9)/3.0))                  AS fiscal_quarter
FROM ( {{ date_spine("'2018-01-01'", "DATEADD('year',3,CURRENT_DATE)") }} )

Common Modelling Anti-Patterns to Avoid

Anti-Pattern                       | Problem                                                      | Fix
-----------------------------------|--------------------------------------------------------------|---------------------------------------------------
Natural keys as FK in facts        | Breaks historical joins when source key is recycled          | Always use surrogate keys
Snowflake schema (over-normalised) | Too many joins, slow BI queries                              | Denormalise into conformed dims
Semi-additive measures as additive | Wrong SUM on account balances                                | Document grain; use AVG or last-period value
No SCD on dimensions               | Customer region changes corrupt historical revenue by region | SCD2 via dbt snapshot
One giant fact table               | Mixed grain makes correct aggregation impossible             | One fact per grain; separate header and line facts
Question 58 — Cloud Architecture
AWS + Snowflake Integration: S3 Stages, IAM, Snowpipe & Event-Driven Ingestion

Real-World Scenario: A third-party logistics partner drops daily shipment files (Parquet) into an S3 bucket. You need these files auto-ingested into Snowflake within minutes of arrival — without polling, without a scheduler, and with full security via IAM roles (no hardcoded credentials).

Step 1: IAM Role-Based Trust (no credentials in Snowflake)

-- Step 1a: Create storage integration (Snowflake side)
CREATE OR REPLACE STORAGE INTEGRATION s3_logistics_int
    TYPE                      = EXTERNAL_STAGE
    STORAGE_PROVIDER          = 'S3'
    ENABLED                   = TRUE
    STORAGE_AWS_ROLE_ARN      = 'arn:aws:iam::123456789:role/snowflake-s3-role'
    STORAGE_ALLOWED_LOCATIONS = ('s3://logistics-drops/shipments/');

-- Retrieve the Snowflake-generated IAM principal to add to AWS trust policy
DESC INTEGRATION s3_logistics_int;
-- Copy: STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID
# Step 1b: AWS IAM Trust Policy (add Snowflake as trusted principal)
{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Principal": {
      "AWS": "<STORAGE_AWS_IAM_USER_ARN from DESC INTEGRATION>"
    },
    "Action": "sts:AssumeRole",
    "Condition": {
      "StringEquals": {
        "sts:ExternalId": "SNOWFLAKE_EXTERNAL_ID_FROM_DESC"
      }
    }
  }]
}

# IAM Role Permission Policy (attached to the role)
{
  "Statement": [{
    "Effect": "Allow",
    "Action": ["s3:GetObject","s3:GetObjectVersion","s3:ListBucket"],
    "Resource": [
      "arn:aws:s3:::logistics-drops",
      "arn:aws:s3:::logistics-drops/shipments/*"
    ]
  }]
}

Step 2: External Stage + File Format

-- Create the external stage pointing to S3 via integration (no keys!)
CREATE OR REPLACE STAGE raw.logistics_stage
    URL                 = 's3://logistics-drops/shipments/'
    STORAGE_INTEGRATION = s3_logistics_int
    FILE_FORMAT = (
        TYPE               = 'PARQUET'
        SNAPPY_COMPRESSION = TRUE
        NULL_IF            = ('NULL', 'null', '')
    );
-- Note: MATCH_BY_COLUMN_NAME is a COPY INTO option, not a file format
-- option, so it is specified on the COPY statement instead.

-- Inspect files in the stage
LIST @raw.logistics_stage;

-- Query Parquet directly without loading (schema-on-read)
-- (assumes a named file format exists: CREATE FILE FORMAT parquet_fmt TYPE = 'PARQUET')
SELECT $1:shipment_id::INT, $1:carrier::VARCHAR, $1:delivered_at::TIMESTAMP
FROM @raw.logistics_stage (FILE_FORMAT => 'parquet_fmt')
LIMIT 100;

Step 3: Snowpipe — Auto-Ingest on S3 PUT Event

-- Create the target landing table
-- (METADATA$FILENAME is only valid inside a COPY or stage query,
--  not as a column DEFAULT, so _file_name is populated by the pipe)
CREATE TABLE raw.shipments_landing (
    shipment_id   INT,
    order_id      INT,
    carrier       VARCHAR,
    status        VARCHAR,
    delivered_at  TIMESTAMP,
    _file_name    VARCHAR,   -- filled from METADATA$FILENAME during COPY
    _loaded_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create Snowpipe with AUTO_INGEST
CREATE OR REPLACE PIPE raw.shipments_pipe
    AUTO_INGEST = TRUE    -- S3 SQS event triggers this pipe
AS
COPY INTO raw.shipments_landing
FROM @raw.logistics_stage
FILE_FORMAT = (TYPE = 'PARQUET')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE              -- copy option, outside FILE_FORMAT
INCLUDE_METADATA = (_file_name = METADATA$FILENAME)  -- capture source file name
ON_ERROR = 'CONTINUE'
PURGE = FALSE;            -- Keep files in S3 for audit / replay

-- Get the SQS ARN to configure S3 Event Notification
SHOW PIPES LIKE 'shipments_pipe';
-- Copy: notification_channel → configure as S3 bucket notification
# S3 Event Notification (AWS Console or Terraform)
resource "aws_s3_bucket_notification" "snowpipe_trigger" {
  bucket = "logistics-drops"
  queue {
    queue_arn     = "<notification_channel from SHOW PIPES>"
    events        = ["s3:ObjectCreated:*"]
    filter_prefix = "shipments/"
    filter_suffix = ".parquet"
  }
}

Step 4: Monitor & Validate the Pipeline

-- Check Snowpipe ingestion history
SELECT system$pipe_status('raw.shipments_pipe');

SELECT *
FROM TABLE(information_schema.copy_history(
    TABLE_NAME   => 'SHIPMENTS_LANDING',
    START_TIME   => DATEADD('hour', -6, CURRENT_TIMESTAMP)
))
ORDER BY last_load_time DESC;

-- Validate row counts after each file
SELECT _file_name, COUNT(*) AS rows_loaded, MAX(_loaded_at) AS loaded_at
FROM raw.shipments_landing
GROUP BY 1
ORDER BY 3 DESC;

End-to-End Latency: < 3 minutes from S3 PUT to queryable in Snowflake

  • S3 PUT → SQS notification: ~1 sec
  • SQS → Snowpipe trigger: ~10–30 sec (Snowpipe polls SQS every 30s)
  • COPY INTO execution: 30–90 sec depending on file size and warehouse
  • Stream + Task fires downstream transformation within next schedule window
Question 59 — DevOps / dbt
dbt CI/CD with Slim CI, State Comparison & Git-Based Deployment

Real-World Scenario: Your dbt project has 300 models. Every PR currently runs the full project in CI — taking 45 minutes and costing $15 in credits per PR. With Slim CI, you only run models that changed in the PR and their downstream dependents. CI drops to 5 minutes and $2.

How Slim CI Works

-- dbt generates a manifest.json after every production run
-- manifest.json = compiled graph of all models, their SQL hashes, dependencies
--
-- On a PR branch, dbt compares:
--   branch manifest  (current PR state)
--   vs
--   production manifest  (last successful prod run — "deferred state")
--
-- state:modified+  selects ONLY:
--   - Models whose SQL/config changed in this PR
--   - ALL downstream models of those changed models (+)

dbt Cloud CI Job Setup

# dbt Cloud CI Job configuration
Job Name: "Slim CI — PR Check"
Trigger: Pull Request (dbt Cloud native integration with GitHub/GitLab)
Environment: CI  (separate Snowflake schema: dbt_ci_<PR_number>)
Deferral: Production environment  (uses prod manifest.json)

Commands:
  - dbt build --select state:modified+ --defer --state ./prod-artifacts --target ci
  # dbt build = dbt run + dbt test in one command

# What --defer does:
# For any model NOT in state:modified+, dbt resolves {{ ref('dim_customers') }}
# to the PRODUCTION table instead of trying to build it in CI schema
# Result: CI only builds changed models; unchanged upstream deps resolve from prod

GitHub Actions Pipeline (self-hosted dbt Core)

# .github/workflows/dbt_ci.yml
name: dbt Slim CI

on:
  pull_request:
    branches: [main]
    paths: ['models/**', 'macros/**', 'tests/**', 'dbt_project.yml']

jobs:
  slim_ci:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Install dbt
        run: pip install dbt-snowflake==1.8.0

      - name: Download prod manifest
        env:
          AWS_ACCESS_KEY_ID:     ${{ secrets.AWS_KEY }}
          AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET }}
        run: aws s3 cp s3://dbt-artifacts/prod/manifest.json ./prod-artifacts/manifest.json

      - name: dbt deps
        run: dbt deps

      - name: dbt Slim CI build
        env:
          SNOWFLAKE_ACCOUNT:   ${{ secrets.SNOWFLAKE_ACCOUNT }}
          SNOWFLAKE_USER:      ${{ secrets.SNOWFLAKE_USER }}
          SNOWFLAKE_PASSWORD:  ${{ secrets.SNOWFLAKE_PASSWORD }}
          SNOWFLAKE_ROLE:      CI_ROLE
          DBT_TARGET_SCHEMA:   ci_pr_${{ github.event.number }}
        run: |
          dbt build \
            --select state:modified+ \
            --defer \
            --state ./prod-artifacts \
            --target ci \
            --profiles-dir .

      # NOTE: in this pull_request-triggered workflow, github.ref is never
      # 'refs/heads/main', so this step would always be skipped. Upload the
      # refreshed manifest from a separate workflow triggered on push to main.
      - name: Upload updated manifest (belongs in a push-to-main workflow)
        if: github.ref == 'refs/heads/main'
        run: aws s3 cp ./target/manifest.json s3://dbt-artifacts/prod/manifest.json

Production Deployment Job

# dbt Cloud Production Job
Job Name: "Production — Hourly"
Schedule: Every hour (or triggered by Airflow / Orchestrator)
Environment: Production
Commands:
  - dbt source freshness --select source:oracle_erp source:stripe
  - dbt run  --select tag:hourly --target prod
  - dbt test --select tag:hourly --store-failures
  - dbt run  --select tag:daily  --target prod  # if midnight run
  - dbt docs generate                           # refresh lineage docs

# Post-run: upload manifest.json to S3 for next CI run
# Failure notification: Slack #data-alerts channel via webhook

Branch Strategy

# Git branching for dbt:
# main    → production (auto-deploy on merge)
# dev     → development environment (team integration testing)
# feat/*  → feature branches (CI runs on PR to main)
#
# Environments map to Snowflake schemas:
# main branch  → ANALYTICS schema  (prod)
# dev branch   → ANALYTICS_DEV schema
# CI PR run    → ANALYTICS_CI_123 schema  (auto-dropped after PR closes)
#
# profiles.yml uses env vars so same code runs in all environments:
snowflake_profile:
  target: "{{ env_var('DBT_TARGET', 'dev') }}"
  outputs:
    prod:
      type: snowflake
      schema: "{{ env_var('DBT_SCHEMA', 'analytics') }}"
      role:   "{{ env_var('SNOWFLAKE_ROLE', 'TRANSFORM_ROLE') }}"
Question 60 — Snowflake Performance
Snowflake Performance Tuning: Clustering, Query Pruning, Caching & Profiling

Real-World Scenario: A Tableau dashboard querying fact_orders (800M rows) takes 45 seconds. The SLA is under 5 seconds. You have a $5,000/month Snowflake bill and need to cut it by 30% without impacting query performance.

Step 1: Profile the Slow Query

-- Find expensive queries in the last 7 days
SELECT
    query_id,
    query_text,
    total_elapsed_time / 1000                                 AS elapsed_sec,
    bytes_scanned / POWER(1024, 3)                            AS gb_scanned,
    partitions_scanned,
    partitions_total,
    ROUND(partitions_scanned / NULLIF(partitions_total, 0) * 100, 1)  AS pct_partitions_scanned,
    credits_used_cloud_services
FROM snowflake.account_usage.query_history
WHERE start_time >= CURRENT_DATE - 7
  AND execution_status = 'SUCCESS'
ORDER BY total_elapsed_time DESC
LIMIT 20;

-- If pct_partitions_scanned is near 100%: clustering will help
-- If bytes_scanned is high but rows_produced is low: pruning is failing

Step 2: Clustering Keys — Micro-Partition Pruning

-- Snowflake automatically clusters data on ingest into micro-partitions (~100MB each)
-- But if queries always filter by ORDER_DATE and your data arrived randomly,
-- Snowflake scans ALL partitions instead of just the relevant date range.

-- Check current clustering quality
SELECT SYSTEM$CLUSTERING_INFORMATION('MARTS.FACT_ORDERS', '(order_date)');
-- Returns average_overlaps: high number = poor clustering = many partitions scanned

-- Add a clustering key on the most common BI filter column
ALTER TABLE marts.fact_orders
    CLUSTER BY (order_date);   -- Snowflake's Automatic Clustering maintains it in the background

-- For compound keys: Snowflake recommends ordering cluster columns from
-- lowest to highest cardinality
ALTER TABLE marts.fact_orders
    CLUSTER BY (region, order_date);

-- Monitor clustering progress (may take hours for large tables)
SELECT SYSTEM$CLUSTERING_INFORMATION('MARTS.FACT_ORDERS', '(region, order_date)');

Step 3: Understand Snowflake's 3-Layer Cache

-- Layer 1: Metadata Cache (FREE — no warehouse needed)
-- Stores table metadata, row counts, min/max per partition
-- Enables partition pruning WITHOUT scanning data
-- Always-on; no configuration needed

-- Layer 2: Result Cache (FREE — no warehouse needed)
-- Stores exact query results for 24 hours
-- If same query runs again with same data, returns instantly
-- Invalidated when underlying table changes
SELECT COUNT(*) FROM fact_orders WHERE order_date = '2024-01-15';
-- First run: 12 seconds. Second run: 4 milliseconds (result cache hit)

-- Layer 3: Local Disk Cache (per warehouse, virtual warehouse memory + SSD)
-- Caches raw micro-partition data on warehouse nodes
-- Subsequent queries scanning SAME data are much faster
-- Destroyed when warehouse suspends — key reason to NOT auto-suspend too aggressively
-- For frequently-queried marts: set AUTO_SUSPEND = 600 (10 min) not 60 sec

-- Check cache usage in query profile
SELECT query_id,
       bytes_scanned,
       bytes_written_to_result,
       percentage_scanned_from_cache  -- > 80% = excellent cache hit rate
FROM snowflake.account_usage.query_history
WHERE query_id = '<your_query_id>';

Step 4: Search Optimization Service (for selective point lookups)

-- For queries like: WHERE customer_id = 12345 (selective, non-clustering-key columns)
-- Clustering won't help here; Search Optimization will

ALTER TABLE marts.fact_orders
    ADD SEARCH OPTIMIZATION ON EQUALITY(customer_id, order_id);

-- Check cost/benefit before enabling (SOS has storage overhead)
SELECT SYSTEM$ESTIMATE_SEARCH_OPTIMIZATION_COSTS('MARTS.FACT_ORDERS',
                                                  'EQUALITY(customer_id)');
-- Returns estimated_query_credits_saved and search_access_cost

Step 5: Warehouse Right-Sizing

-- Find the optimal warehouse size by running the same query at different sizes
-- Cost doubles per size tier, but runtime often halves → sweet spot exists

-- Rule of thumb:
-- X-SMALL / SMALL  : simple queries, <10M rows
-- MEDIUM           : standard dbt transforms, 10M–500M rows
-- LARGE            : complex joins, window functions on 500M+ rows
-- X-LARGE+         : data loads, heavy aggregations, initial full-refresh

-- For concurrent BI users — use Multi-Cluster warehouses
ALTER WAREHOUSE ANALYTICS_WH SET
    MAX_CLUSTER_COUNT = 4          -- Scale out horizontally for concurrency
    MIN_CLUSTER_COUNT = 1
    SCALING_POLICY    = 'STANDARD'; -- Aggressive scale-out for BI latency

Step 6: Query Rewrite Patterns

-- BAD: Function on indexed column destroys pruning
SELECT * FROM fact_orders WHERE YEAR(order_date) = 2024;

-- GOOD: Range filter preserves micro-partition pruning
SELECT * FROM fact_orders
WHERE order_date BETWEEN '2024-01-01' AND '2024-12-31';

-- BAD: SELECT * from a 200-column wide table in a BI tool
SELECT * FROM fact_orders WHERE order_date = '2024-01-15';

-- GOOD: Only fetch columns the BI tool needs (columnar storage wins here)
SELECT order_id, customer_id, net_revenue FROM fact_orders
WHERE order_date = '2024-01-15';

-- BAD: DISTINCT on massive table (often a modelling symptom)
SELECT DISTINCT customer_id FROM fact_orders;

-- GOOD: Push deduplication upstream in dbt staging layer
Question 61 — Architecture
Data Quality Framework Design: dbt Tests, Freshness, Great Expectations & Observability

Real-World Scenario: After a silent data quality failure corrupted the monthly revenue report — which went undetected for 3 days — you are asked to design a comprehensive data quality framework that catches issues at every layer before they reach BI consumers.

Quality Framework Architecture — 4 Layers of Defence

Layer 1: Source Freshness Checks
    ↓  (fail fast if upstream is stale)
Layer 2: Schema Contract Tests (dbt built-in + custom generic)
    ↓  (fail if data shape changes)
Layer 3: Business Rule Validation (singular SQL tests + dbt-expectations)
    ↓  (fail if data violates domain logic)
Layer 4: Observability & Anomaly Detection (Monte Carlo / Bigeye / custom SQL)
    ↓  (alert on statistical drift — row counts, null rates, distribution shifts)

Layer 1: Source Freshness Monitoring

# sources.yml — enforce data arrival SLAs
sources:
  - name: oracle_erp
    database: RAW
    schema: ORACLE_LANDING
    freshness:
      warn_after:  { count: 1,  period: hour }
      error_after: { count: 4,  period: hour }
    loaded_at_field: header__timestamp

    tables:
      - name: orders
        freshness:
          warn_after:  { count: 30, period: minute }  # orders are critical
          error_after: { count: 90, period: minute }
      - name: inventory
        freshness:
          warn_after:  { count: 4,  period: hour }

# Run freshness check before any transformation
# dbt source freshness  → returns WARN or ERROR before wasting compute

Layer 2: Schema & Integrity Tests

# Critical tests that must NEVER fail (severity: error → blocks pipeline)
models:
  - name: fact_orders
    tests:
      - dbt_utils.equal_rowcount:           # Rowcount matches staging
          compare_model: ref('stg_orders')
      - dbt_utils.recency:                  # Table has rows from last 2 hours
          datepart: hour
          field: order_date
          interval: 2
    columns:
      - name: order_sk
        tests: [not_null, unique]
      - name: net_revenue
        tests:
          - not_null
          - not_negative             # Custom macro
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 0
              max_value: 1000000     # Max plausible order value
      - name: customer_sk
        tests:
          - relationships:
              to: ref('dim_customers')
              field: customer_sk
              severity: warn         # Orphans warn but don't block

Layer 3: Business Rule Singular Tests

-- tests/assert_revenue_reconciliation.sql
-- Revenue in Snowflake must match Stripe control total within 0.01%
-- This runs daily as part of the production dbt job
WITH snowflake_total AS (
    SELECT SUM(net_revenue) AS total
    FROM {{ ref('fact_orders') }}
    WHERE order_date = CURRENT_DATE - 1
      AND payment_gateway = 'STRIPE'
),
stripe_control AS (
    SELECT expected_revenue AS total
    FROM {{ ref('stripe_daily_reconciliation') }}
    WHERE report_date = CURRENT_DATE - 1
)
SELECT
    s.total   AS snowflake_revenue,
    c.total   AS stripe_control_total,
    ABS(s.total - c.total) / NULLIF(c.total, 0) * 100 AS variance_pct
FROM snowflake_total s, stripe_control c
WHERE ABS(s.total - c.total) / NULLIF(c.total, 0) > 0.0001   -- Fail if >0.01% off
-- tests/assert_no_duplicate_orders.sql
-- Catch if Qlik double-delivers an event (CDC dedup failure)
SELECT order_id, COUNT(*) AS cnt
FROM {{ ref('fact_orders') }}
WHERE order_date >= CURRENT_DATE - 3
GROUP BY order_id
HAVING COUNT(*) > 1

Layer 4: Statistical Anomaly Detection (custom SQL)

-- Store daily metrics and alert on Z-score anomalies
-- models/monitoring/mon_daily_row_counts.sql
{{ config(materialized='incremental', unique_key=['model_name', 'check_date']) }}

SELECT
    '{{ this.name }}'                        AS model_name,
    CURRENT_DATE                             AS check_date,
    COUNT(*)                                 AS row_count,
    SUM(net_revenue)                         AS total_revenue,
    COUNT(DISTINCT customer_sk)              AS unique_customers
FROM {{ ref('fact_orders') }}
WHERE order_date = CURRENT_DATE - 1
-- Anomaly alert: flag if today's row count deviates >3 sigma from 30-day avg
WITH stats AS (
    SELECT
        AVG(row_count)    AS mean_rows,
        STDDEV(row_count) AS stddev_rows
    FROM mon_daily_row_counts
    WHERE check_date BETWEEN CURRENT_DATE - 31 AND CURRENT_DATE - 1
)
SELECT
    t.check_date,
    t.row_count,
    s.mean_rows,
    (t.row_count - s.mean_rows) / NULLIF(s.stddev_rows, 0) AS z_score
FROM mon_daily_row_counts t, stats s
WHERE t.check_date = CURRENT_DATE - 1
  AND ABS((t.row_count - s.mean_rows) / NULLIF(s.stddev_rows, 0)) > 3;

Test Execution Strategy

When             | What Runs                              | Failure Action
-----------------|----------------------------------------|------------------------------
Every PR (CI)    | Tests on modified+ models only         | Block merge
Every hourly run | Source freshness + tag:hourly tests    | Slack alert, pause pipeline
Daily 06:00      | Full test suite + reconciliation tests | PagerDuty, incident ticket
Weekly           | Statistical anomaly queries (Z-score)  | Email data team lead
Question 62 — Senior Architect
Design a Multi-Source Lakehouse: Redshift-to-Snowflake Migration + Modern Stack

Real-World Scenario: Your client is migrating from a Redshift-based monolith (3TB, 800 tables, 50 reports) to a modern Snowflake + dbt + S3 Lakehouse. As the lead data engineer, you must design the migration, preserve data history, and ensure zero BI downtime.

Migration Architecture

PHASE 1 — ASSESSMENT (Week 1–2)
  - Inventory all 800 Redshift tables: size, row count, query frequency
  - Classify: Hot (queried daily), Warm (weekly), Cold (archival only)
  - Identify: Redshift-specific SQL (LISTAGG, DECODE, distribution keys)
  - Map: which tables feed which reports (use Redshift STL_QUERY logs)

PHASE 2 — FOUNDATION (Week 2–4)
  - Provision Snowflake: account, databases, warehouses, roles, RBAC
  - Set up S3 as the neutral landing zone (both Redshift and Snowflake read it)
  - Deploy dbt Cloud + connect to Snowflake CI/CD
  - Establish: RAW → STAGING → MARTS layer structure

PHASE 3 — PARALLEL RUN (Week 4–8)
  - Run BOTH Redshift and Snowflake in parallel
  - Validate: row counts, SUM of key metrics, NULL rates match daily
  - BI tools connected to BOTH; analysts validate dashboard numbers
  - Fix discrepancies in dbt models before cutover

PHASE 4 — CUTOVER (Week 8–10)
  - Freeze Redshift writes (maintenance window, e.g. Friday 10 PM)
  - Final delta load: CDC all changes since last full load
  - Switch BI tool data source connections to Snowflake
  - Monitor for 48 hours; rollback plan = flip BI connections back to Redshift

PHASE 5 — DECOMMISSION (Week 12+)
  - Redshift cluster to read-only for 30 days (safety net)
  - Migrate cold/archival data to S3 Parquet (Snowflake external tables)
  - Terminate Redshift cluster

Data Migration: Redshift → S3 → Snowflake

-- Step 1: Unload hot tables from Redshift to S3 in Parquet
UNLOAD ('SELECT * FROM sales.fact_orders')
TO 's3://migration-bucket/fact_orders/'
IAM_ROLE 'arn:aws:iam::123456789:role/redshift-s3-role'
FORMAT AS PARQUET
PARTITION BY (order_date)
MAXFILESIZE 256 MB;

-- Step 2: Load from S3 into Snowflake (external stage → COPY INTO)
CREATE STAGE migration.redshift_stage
    URL = 's3://migration-bucket/'
    STORAGE_INTEGRATION = s3_migration_int;

COPY INTO raw.fact_orders_migration
FROM @migration.redshift_stage/fact_orders/
FILE_FORMAT = (TYPE = 'PARQUET' MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE)
PATTERN = '.*\.parquet'
ON_ERROR = 'CONTINUE';

-- Step 3: Validate row counts match (ROWS is a reserved word in Snowflake,
-- so alias the count as row_count)
SELECT 'REDSHIFT'  AS source, COUNT(*) AS row_count FROM redshift_control_table
UNION ALL
SELECT 'SNOWFLAKE' AS source, COUNT(*) AS row_count FROM raw.fact_orders_migration;

SQL Compatibility: Redshift → Snowflake Translation Guide

Redshift SQL                | Snowflake Equivalent           | Notes
----------------------------|--------------------------------|------------------------------------------
LISTAGG(col, ',')           | LISTAGG(col, ',')              | Same syntax ✓
DECODE(col, v1, r1, v2, r2) | DECODE(col, v1, r1, v2, r2)    | Also supported in Snowflake ✓
GETDATE()                   | CURRENT_TIMESTAMP              | GETDATE() also works as an alias
DATEDIFF('day', a, b)       | DATEDIFF('day', a, b)          | Same ✓
DATEPART(month, d)          | DATE_PART('month', d)          | Quote the date part
NVL(a, b)                   | NVL(a, b) / COALESCE(a, b)     | Both work; COALESCE is more portable
DISTKEY / SORTKEY           | CLUSTER BY                     | Different mechanism
VARCHAR(MAX)                | VARCHAR (up to 16MB)           | Remove MAX

Modern Lakehouse Stack Post-Migration

┌─────────────────────────────────────────────────────────────────┐
│                       INGESTION LAYER                           │
│  Qlik Replicate (CDC)  │  Fivetran (SaaS)  │  Snowpipe (Files) │
└───────────────────────────────┬─────────────────────────────────┘
                                │
                    ┌───────────▼──────────┐
                    │    S3 Data Lake       │  Raw Parquet / JSON
                    │  (source of truth,    │  Partitioned by date
                    │   infinite retention) │  Lifecycle: 7yr Glacier
                    └───────────┬──────────┘
                                │  COPY INTO / External Tables
                    ┌───────────▼──────────┐
                    │  Snowflake (Compute)  │  Zero-copy over S3 data
                    │  RAW → STG → MARTS   │  via Iceberg / Ext Tables
                    └───────────┬──────────┘
                                │  dbt Cloud (Transform)
                    ┌───────────▼──────────┐
                    │    Semantic Layer     │  dbt metrics / Looker LookML
                    └───────────┬──────────┘
                                │
                ┌───────────────┼──────────────────┐
              Tableau        Power BI           Self-Service
           (Operational)   (Executive)        (Mode / Hex)

RBAC Design (Role-Based Access Control)

-- Principle of least privilege — no direct user-to-table grants
-- All access via roles; roles assigned to users

CREATE ROLE ANALYST_ROLE;           -- SELECT on marts only
CREATE ROLE ENGINEER_ROLE;          -- SELECT + INSERT on staging + marts
CREATE ROLE TRANSFORM_ROLE;         -- Used by dbt service account
CREATE ROLE INGEST_ROLE;            -- Used by Qlik / Fivetran service accounts
CREATE ROLE ADMIN_ROLE;             -- Full DDL + GRANT

-- dbt service account permissions
GRANT ROLE TRANSFORM_ROLE TO USER dbt_service_account;
GRANT USAGE ON WAREHOUSE TRANSFORM_WH TO ROLE TRANSFORM_ROLE;
GRANT USAGE, CREATE SCHEMA ON DATABASE ANALYTICS TO ROLE TRANSFORM_ROLE;
GRANT SELECT ON ALL TABLES IN SCHEMA RAW TO ROLE TRANSFORM_ROLE;
-- CREATE TABLE is a schema-level privilege; only DML privileges apply to future tables
GRANT CREATE TABLE ON SCHEMA ANALYTICS.STAGING TO ROLE TRANSFORM_ROLE;
GRANT SELECT, INSERT, UPDATE, DELETE
    ON FUTURE TABLES IN SCHEMA ANALYTICS.STAGING TO ROLE TRANSFORM_ROLE;

-- Analysts get Secure Views over marts (never raw data)
GRANT SELECT ON ALL VIEWS IN SCHEMA ANALYTICS.MARTS TO ROLE ANALYST_ROLE;
-- Column-level masking for PII
CREATE MASKING POLICY email_mask AS (val VARCHAR) RETURNS VARCHAR ->
    CASE WHEN CURRENT_ROLE() IN ('ADMIN_ROLE','ENGINEER_ROLE') THEN val
         ELSE '***@***.***'
    END;
ALTER TABLE marts.dim_customers
    MODIFY COLUMN email SET MASKING POLICY email_mask;
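The masking policy's branch logic is simple enough to test as a plain function (an illustrative sketch; in Snowflake, CURRENT_ROLE() is evaluated per query at read time):

```python
# The email_mask policy as a pure function: privileged roles see the value,
# everyone else gets the redacted placeholder.
def mask_email(value: str, current_role: str) -> str:
    if current_role in ('ADMIN_ROLE', 'ENGINEER_ROLE'):
        return value
    return '***@***.***'
```

Because the policy is attached to the column rather than baked into views, every query path (BI tool, ad hoc SQL, exports) gets the same masking behaviour.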
Question 63 — Orchestration
Apache Airflow: DAG Design, Sensors, XComs & Production Patterns

Real-World Scenario: Your daily pipeline must: wait for an S3 file from a partner to arrive (unknown time), then ingest it into Snowflake, run dbt, and only email the finance team when all models pass tests. If any step fails, retry twice and alert Slack. You need Airflow — not just a cron job.

Core Airflow Concepts

# DAG = Directed Acyclic Graph of tasks
# Operator = a single unit of work (SQL, Python, bash, sensor, etc.)
# Sensor = special operator that polls until a condition is true
# XCom = cross-task communication (pass values between tasks)
# TaskGroup = visual grouping of related tasks in the UI
# Connection = stored credential (Snowflake, AWS, dbt Cloud, etc.)

Production DAG: S3 Arrival → Ingest → dbt → Alert

# dags/daily_finance_pipeline.py
from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

SNOWFLAKE_CONN = 'snowflake_prod'
S3_BUCKET      = 'finance-drops'

def check_row_count(**context):
    """ShortCircuit: only proceed if ingestion loaded rows"""
    # SnowflakeOperator pushes its query results to XCom as a list of rows;
    # the last statement's result carries the loaded-row count
    results = context['ti'].xcom_pull(task_ids='ingest.copy_into_snowflake')
    return bool(results) and results[-1][0] > 0   # False = skip all downstream tasks

with DAG(
    dag_id            = 'daily_finance_pipeline',
    schedule_interval = '0 6 * * 1-5',   # 6 AM UTC, weekdays only
    start_date        = datetime(2024, 1, 1),
    catchup           = False,
    default_args = {
        'retries':        2,
        'retry_delay':    timedelta(minutes=10),
        'on_failure_callback': lambda ctx: SlackWebhookOperator(
            task_id='slack_fail',
            slack_webhook_conn_id='slack_data_alerts',
            message=f":red_circle: DAG {ctx['dag'].dag_id} failed on {ctx['ds']}"
        ).execute(ctx),
    },
    tags = ['finance', 'snowflake', 'dbt'],
) as dag:

    # 1. Wait for partner file (polls every 60s, times out after 4 hours)
    wait_for_file = S3KeySensor(
        task_id         = 'wait_for_s3_file',
        bucket_name     = S3_BUCKET,
        bucket_key      = 'finance/daily_report_{{ ds_nodash }}.parquet',
        aws_conn_id     = 'aws_prod',
        poke_interval   = 60,
        timeout         = 14400,   # 4 hours
        mode            = 'reschedule',  # releases worker slot while waiting
    )

    with TaskGroup('ingest') as ingest_group:
        copy_into = SnowflakeOperator(
            task_id          = 'copy_into_snowflake',
            snowflake_conn_id = SNOWFLAKE_CONN,
            sql = """
                COPY INTO raw.finance_report
                FROM @raw.finance_stage/finance/daily_report_{{ ds_nodash }}.parquet
                FILE_FORMAT = parquet_fmt
                ON_ERROR = 'ABORT_STATEMENT';
                -- Read rows_loaded from the COPY result (ROW_COUNT() is not a Snowflake function)
                SELECT "rows_loaded" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));
            """,
            warehouse = 'INGEST_WH',
        )
        row_check = ShortCircuitOperator(
            task_id         = 'check_rows_loaded',
            python_callable = check_row_count,   # Airflow 2.x injects context automatically
        )
        copy_into >> row_check

    # 2. dbt transform (Slim CI: only changed models)
    dbt_run = DbtCloudRunJobOperator(
        task_id           = 'dbt_daily_models',
        dbt_cloud_conn_id = 'dbt_cloud',
        job_id            = 12345,
        check_interval    = 30,
        timeout           = 3600,
    )

    # 3. Notify finance team on success
    notify = SlackWebhookOperator(
        task_id              = 'notify_finance_team',
        slack_webhook_conn_id = 'slack_finance',
        message = ":white_check_mark: Finance pipeline complete for {{ ds }}. Models ready in Snowflake.",
    )

    wait_for_file >> ingest_group >> dbt_run >> notify

Sensor Modes — Critical for Production

# mode='poke'       — sensor holds the worker slot the whole time (bad for long waits)
# mode='reschedule' — sensor releases worker slot between checks (cost-efficient)
# Use 'reschedule' for any sensor that may wait more than a few minutes

XCom — Passing Data Between Tasks

def extract_row_count(ti, **kwargs):
    # Push a value to XCom
    ti.xcom_push(key='rows_ingested', value=150000)

def validate_count(ti, **kwargs):
    # Pull value from another task
    count = ti.xcom_pull(task_ids='count_task', key='rows_ingested')
    if count < 1000:
        raise ValueError(f"Only {count} rows ingested — possible data issue")

# XCom is for small values (IDs, counts, flags) — never pass DataFrames via XCom

Airflow Best Practices

  • Idempotent tasks — every task must be safely re-runnable. Use MERGE not INSERT, use {{ ds }} templating for date partitions
  • Atomic tasks — each task does one thing. Don't combine ingest + transform in one SnowflakeOperator call
  • Use mode='reschedule' on all sensors to avoid starving the worker pool
  • TaskGroups — group related tasks visually but don't over-nest; max 2 levels deep
  • SLA alerts — set sla=timedelta(hours=2) on critical tasks; Airflow emails if a task misses its SLA
  • Pool usage — create Airflow Pools for Snowflake connections to cap concurrency and prevent warehouse overload
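The idempotency bullet can be made concrete with a small sketch: a task that renders a MERGE keyed on the Airflow logical date is safe to re-run, because a second run for the same date replaces that day's rows instead of appending duplicates. The table and column names (`staging_orders`, `marts.fact_orders`) are illustrative, not from the pipeline above.

```python
def build_idempotent_merge(table: str, ds: str) -> str:
    """Render a MERGE scoped to one logical date; re-running it replaces,
    never duplicates, that day's rows."""
    return (
        f"MERGE INTO {table} AS tgt "
        f"USING (SELECT * FROM staging_orders WHERE order_date = '{ds}') AS src "
        f"ON tgt.order_id = src.order_id "
        f"WHEN MATCHED THEN UPDATE SET tgt.amount = src.amount "
        f"WHEN NOT MATCHED THEN INSERT (order_id, order_date, amount) "
        f"VALUES (src.order_id, src.order_date, src.amount)"
    )

# Airflow would template the date as {{ ds }}; hard-coded here for illustration
sql = build_idempotent_merge("marts.fact_orders", "2024-01-15")
```

Because the statement is a pure function of the logical date, retries and backfills produce byte-identical SQL.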
Question 64 — Advanced Snowflake
Snowpark: Python DataFrames, UDFs & ML Pipelines Inside Snowflake

Real-World Scenario: Your analytics team needs to run a customer churn prediction model. Traditionally this means pulling data out of Snowflake into a Jupyter notebook, running Python, and writing predictions back — with data leaving the secure environment. Snowpark lets you run the entire pipeline inside Snowflake — no data movement, no separate compute cluster.

Snowpark Session & DataFrame API

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, sum as sum_, avg, when, lit
from snowflake.snowpark.types import IntegerType, FloatType

# Connect (same creds as SQL — uses warehouse, role, schema from profile)
session = Session.builder.configs({
    'account':   'orgname-acctname',
    'user':      'dbt_service_account',
    'password':  'xxx',
    'role':      'TRANSFORM_ROLE',
    'warehouse': 'TRANSFORM_WH',
    'database':  'ANALYTICS',
    'schema':    'MARTS',
}).create()

# DataFrame — lazy evaluation, pushes computation to Snowflake
orders = session.table('fact_orders')

# Filter + aggregate (no data leaves Snowflake)
daily_revenue = (orders
    .filter(col('status') == 'COMPLETED')
    .filter(col('order_date') >= '2024-01-01')
    .group_by('order_date', 'region')
    .agg(
        sum_('net_revenue').alias('total_revenue'),
        avg('net_revenue').alias('avg_order_value'),
    )
    .sort('order_date')
)

# Inspect without materialising
daily_revenue.show(10)
daily_revenue.explain()  # explain() itself prints the SQL Snowpark will execute

# Write result back to Snowflake
daily_revenue.write.mode('overwrite').save_as_table('MARTS.daily_revenue_summary')

Vectorised Python UDF (batch processing, fast)

from snowflake.snowpark.functions import pandas_udf, col
from snowflake.snowpark.types import FloatType
import pandas as pd

# Vectorised UDF: receives pd.Series batches, returns a pd.Series
# Much faster than row-by-row UDFs for large tables
@pandas_udf(name='calculate_ltv_score', is_permanent=True,
            stage_location='@raw.udf_stage', replace=True,
            input_types=[FloatType(), FloatType(), FloatType()],
            return_type=FloatType(),
            packages=['pandas'])
def calculate_ltv_score(total_spend: pd.Series,
                         order_count: pd.Series,
                         months_active: pd.Series) -> pd.Series:
    # LTV scoring: recency + frequency + monetary
    rfm_score = (
        (total_spend / total_spend.max()) * 0.5 +
        (order_count / order_count.max()) * 0.3 +
        (months_active / months_active.max()) * 0.2
    )
    return rfm_score.clip(0, 1)

# Use UDF in a DataFrame operation
customers = session.table('dim_customers')
scored = customers.with_column(
    'ltv_score',
    calculate_ltv_score(col('lifetime_spend'), col('order_count'), col('months_active'))
)
scored.write.mode('overwrite').save_as_table('MARTS.dim_customers_scored')
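The UDF body is ordinary column arithmetic, so the scoring rule can be unit-tested locally before it is ever registered in Snowflake. This sketch re-implements the same formula on plain Python lists; no session or pandas required.

```python
def ltv_scores(spend, orders, months):
    """Weighted RFM-style score: normalise by each column's max, weight
    0.5 / 0.3 / 0.2, clip to [0, 1] — same rule as the UDF above."""
    s_max, o_max, m_max = max(spend), max(orders), max(months)
    raw = [
        (s / s_max) * 0.5 + (o / o_max) * 0.3 + (m / m_max) * 0.2
        for s, o, m in zip(spend, orders, months)
    ]
    return [min(max(r, 0.0), 1.0) for r in raw]

# The customer who leads on every dimension scores exactly 1.0
scores = ltv_scores([1000.0, 500.0], [10, 10], [12, 6])   # -> [1.0, 0.65]
```

Testing the pure function first keeps the slow deploy-to-stage loop out of the debugging cycle.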

ML Pipeline inside Snowflake (Snowpark ML)

from snowflake.ml.modeling.preprocessing import StandardScaler
from snowflake.ml.modeling.ensemble import RandomForestClassifier

# Load features from Snowflake; split with Snowpark's DataFrame.random_split
features = session.table('ML.churn_features')
train_df, test_df = features.random_split([0.8, 0.2], seed=42)

# Scale features (runs inside Snowflake)
scaler = StandardScaler(
    input_cols  = ['total_spend', 'days_since_last_order', 'order_count'],
    output_cols = ['spend_scaled', 'recency_scaled', 'frequency_scaled']
)
train_scaled = scaler.fit(train_df).transform(train_df)

# Train model (runs on Snowflake compute)
clf = RandomForestClassifier(
    input_cols   = ['spend_scaled', 'recency_scaled', 'frequency_scaled'],
    label_cols   = ['is_churned'],
    output_cols  = ['predicted_churn'],
    n_estimators = 100,
)
clf.fit(train_scaled)

# Score entire customer table — predictions written back to Snowflake
predictions = clf.predict(scaler.transform(session.table('dim_customers')))
predictions.write.mode('overwrite').save_as_table('ML.churn_predictions')

When to Use Snowpark vs dbt vs Stored Procedures

| Tool                       | Best For                                            | Language    |
|----------------------------|-----------------------------------------------------|-------------|
| dbt                        | SQL transformations, testing, documentation, CI/CD  | SQL + Jinja |
| Snowpark Python            | Complex logic, ML, pandas-style operations, UDFs    | Python      |
| Stored Procedures (JS)     | Admin tasks, dynamic SQL, metadata operations       | JavaScript  |
| Stored Procedures (Python) | Python logic that must run transactionally with SQL | Python      |
Question 65 — Analytics Engineering
dbt Metrics & Semantic Layer: Defining Business Logic Once for All BI Tools

Real-World Scenario: Your finance team calculates "Monthly Recurring Revenue" one way in Tableau, the sales team calculates it differently in Looker, and the exec team uses yet another formula in Excel. Three definitions of MRR means three different numbers in board meetings. The dbt Semantic Layer solves this — define MRR once in dbt, all BI tools query the same definition.

dbt Semantic Layer Architecture

# The Semantic Layer sits between dbt models and BI tools
# BI tools query the Semantic Layer API instead of tables directly
# This means: metric logic lives in code, version-controlled, tested

BI Tool (Tableau / Looker / Mode)
         │  "Give me MRR by region for last quarter"
         ▼
dbt Semantic Layer API (MetricFlow)
         │  Translates to optimised SQL against your Snowflake marts
         ▼
Snowflake MARTS (fact_subscriptions, dim_customers, dim_date)

Defining a Semantic Model (dbt Core 1.6+)

# models/semantic_models/sem_orders.yml
# A semantic model describes the grain and measures of a model

semantic_models:
  - name: orders
    description: "Order-level semantic model — grain is one row per order"
    model: ref('fact_orders')

    defaults:
      agg_time_dimension: order_date

    entities:
      - name: order
        type: primary
        expr: order_sk
      - name: customer
        type: foreign
        expr: customer_sk

    dimensions:
      - name: order_date
        type: time
        type_params:
          time_granularity: day
      - name: region
        type: categorical
        expr: region
      - name: status
        type: categorical
        expr: status

    measures:
      - name: order_count
        description: "Total number of orders"
        agg: count
        expr: order_sk

      - name: gross_revenue
        description: "Sum of gross revenue before discounts"
        agg: sum
        expr: gross_revenue

      - name: net_revenue
        description: "Sum of net revenue after discounts"
        agg: sum
        expr: net_revenue

Defining Metrics on Top of Semantic Models

# models/metrics/metrics.yml
metrics:
  - name: monthly_recurring_revenue
    label: "MRR"
    description: "Monthly recurring revenue from active subscriptions"
    type: simple
    type_params:
      measure: net_revenue
    filter: |
      {{ Dimension('status') }} = 'ACTIVE'
        AND {{ Dimension('subscription_type') }} = 'RECURRING'

  - name: average_order_value
    label: "AOV"
    type: ratio
    type_params:
      numerator: net_revenue
      denominator: order_count

  - name: revenue_growth_mom
    label: "MoM Revenue Growth %"
    type: derived
    type_params:
      expr: "(current_period_revenue - prior_period_revenue) / prior_period_revenue * 100"
      metrics:
        - name: net_revenue
          alias: current_period_revenue   # no offset = current period
        - name: net_revenue
          alias: prior_period_revenue
          offset_window: 1 month

  - name: customer_lifetime_value
    label: "LTV"
    type: cumulative
    type_params:
      measure: net_revenue
      # no window ⇒ accumulate over all time (all revenue for each customer, ever)

Querying Metrics via CLI or API

# Query a metric directly from dbt CLI (MetricFlow)
mf query \
  --metrics monthly_recurring_revenue \
  --group-by metric_time__month,region \
  --where "region = 'US-EAST'" \
  --order metric_time__month

# Output: generates and runs optimised Snowflake SQL
# SELECT
#   DATE_TRUNC('month', order_date) AS metric_time__month,
#   region,
#   SUM(net_revenue) AS monthly_recurring_revenue
# FROM marts.fact_orders
# WHERE status = 'ACTIVE' AND subscription_type = 'RECURRING'
#   AND region = 'US-EAST'
# GROUP BY 1, 2
# ORDER BY 1
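As a rough mental model of that translation, the toy compiler below turns a simple-type metric definition into a filtered GROUP BY query. It is a stand-in for illustration only: the real MetricFlow engine also handles joins, time spines, and offsets.

```python
def compile_metric(metric: dict, group_by: list, table: str) -> str:
    """Compile a declarative 'simple' metric into GROUP BY SQL (toy version)."""
    dims = ", ".join(group_by)
    sql = f"SELECT {dims}, SUM({metric['measure']}) AS {metric['name']} FROM {table}"
    if metric.get("filter"):
        sql += f" WHERE {metric['filter']}"
    return sql + f" GROUP BY {dims}"

# Field names mirror the metric YAML above
mrr = {
    "name":    "monthly_recurring_revenue",
    "measure": "net_revenue",
    "filter":  "status = 'ACTIVE' AND subscription_type = 'RECURRING'",
}
sql = compile_metric(mrr, ["region"], "marts.fact_orders")
```

The point: BI tools send the metric name and dimensions; the governed SQL is generated, never hand-written.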

Why the Semantic Layer Matters for Analytics Engineers

  • Single source of truth — MRR is defined once in YAML, tested in CI, and used by every BI tool. No more metric drift between teams
  • Version-controlled business logic — when the CFO changes the MRR definition, you open a PR, get approval, and deploy. Full audit trail
  • Self-service with guardrails — analysts query metrics without writing SQL, but the SQL they get is governed and correct
  • Reuse measures across metrics — define net_revenue once as a measure, build MRR, AOV, LTV, cohort revenue all from the same base without duplication
Question 66 — Data Architecture
Data Contracts: Schema Enforcement Between Producers & Consumers

Real-World Scenario: The backend team adds a new field to the orders API response and renames customer_id to customerId (camelCase). Your Snowflake pipeline breaks silently — orders still load, but all customer joins now return NULL. You find out when the weekly revenue report is wrong. Data contracts would have caught this before the change was deployed.

What is a Data Contract?

# A data contract is a formal, versioned agreement between a data producer
# (backend team, source system) and data consumer (data engineering, analytics)
# specifying:
#   - Schema: column names, types, nullability
#   - Semantics: what each field means
#   - SLAs: freshness, availability guarantees
#   - Ownership: who is responsible for breaking changes
#   - Versioning: how breaking changes are communicated and rolled out

Contract as Code — YAML Schema Definition

# contracts/orders_v2.yml
# This file lives in the PRODUCER's repo (backend team)
# AND is referenced by the consumer's repo (data engineering)
# Breaking changes require a version bump and migration period

id: orders-contract
version: "2.1.0"
owner: backend-orders-team        # producer team (placeholder names)
consumers:
  - data-engineering
  - analytics

schema:
  fields:
    - name: order_id
      type: integer
      nullable: false
      description: "Unique order identifier. Never recycled."

    - name: customer_id
      type: integer
      nullable: false
      description: "FK to customers table. Always present for completed orders."

    - name: status
      type: string
      nullable: false
      enum: ["PENDING", "PROCESSING", "SHIPPED", "DELIVERED", "CANCELLED", "REFUNDED"]
      description: "Order lifecycle status. Only terminal states: DELIVERED, CANCELLED, REFUNDED."

    - name: amount_usd
      type: decimal(12,2)
      nullable: false
      min: 0
      description: "Order total in USD. Always positive. Tax inclusive."

    - name: created_at
      type: timestamp_tz
      nullable: false

sla:
  freshness_minutes: 5        # Data must be available within 5 min of creation
  availability_pct:  99.9     # 99.9% uptime guarantee

changelog:
  - version: "2.1.0"
    date: "2024-01-15"
    changes: "Added optional loyalty_tier field (nullable, non-breaking)"
  - version: "2.0.0"
    date: "2023-06-01"
    changes: "BREAKING: renamed customer_id from customerId. Migration period: 90 days."

Enforcing Contracts in dbt (Consumer Side)

# models/staging/schema.yml
# Reference the contract — dbt validates against it at compile time

models:
  - name: stg_orders
    config:
      contract:
        enforced: true        # dbt will validate column types match the model
    columns:
      - name: order_id
        data_type: int
        constraints:
          - type: not_null
          - type: primary_key

      - name: customer_id
        data_type: int
        constraints:
          - type: not_null
          - type: foreign_key
            to: ref('stg_customers')
            to_columns: [customer_id]

      - name: status
        data_type: varchar
        constraints:
          - type: not_null

# When contract: enforced: true, dbt adds a CREATE OR REPLACE TABLE with
# explicit column definitions — if your SQL returns wrong types, it fails at compile

Detecting Contract Violations (Producer Side)

-- Run in CI on producer's repo before merging schema changes
-- Compares current schema against the contract YAML

WITH contract_columns AS (
    -- What the contract says should exist
    SELECT 'order_id'    AS col, 'INTEGER' AS expected_type UNION ALL
    SELECT 'customer_id',         'INTEGER' UNION ALL
    SELECT 'status',              'VARCHAR' UNION ALL
    SELECT 'amount_usd',          'DECIMAL'
),
actual_columns AS (
    SELECT LOWER(column_name) AS col, data_type AS actual_type
    FROM information_schema.columns
    WHERE table_name = 'ORDERS' AND table_schema = 'APP'
)
SELECT
    c.col,
    c.expected_type,
    a.actual_type,
    CASE
        WHEN a.col IS NULL THEN 'MISSING COLUMN — BREAKING CHANGE'
        WHEN c.expected_type != a.actual_type THEN 'TYPE MISMATCH — BREAKING CHANGE'
        ELSE 'OK'
    END AS status
FROM contract_columns c
LEFT JOIN actual_columns a ON c.col = a.col
WHERE a.col IS NULL OR c.expected_type != a.actual_type;
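The same check works as a plain Python function a producer could run in CI before merging a schema change: compare the contract's declared columns against the live schema. Column names here mirror the contract YAML above.

```python
def check_contract(contract: dict, actual: dict) -> list:
    """Return human-readable violations: missing columns and type mismatches."""
    violations = []
    for col_name, expected_type in contract.items():
        actual_type = actual.get(col_name)
        if actual_type is None:
            violations.append(f"{col_name}: MISSING COLUMN (breaking change)")
        elif actual_type != expected_type:
            violations.append(f"{col_name}: TYPE MISMATCH (breaking change)")
    return violations

contract = {"order_id": "INTEGER", "customer_id": "INTEGER", "status": "VARCHAR"}
live     = {"order_id": "INTEGER", "customerId": "INTEGER", "status": "VARCHAR"}
problems = check_contract(contract, live)   # the camelCase rename is caught
```

Run in the producer's CI, this fails the pull request before the rename ever reaches Snowflake.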

Data Contract Lifecycle

  • Non-breaking changes (add nullable column, add enum value) — allowed without version bump, but must notify consumers
  • Breaking changes (rename column, change type, remove column, change enum values) — require major version bump, migration period (30–90 days), and dual-publishing the old + new schema simultaneously
  • Contract registry — store contracts centrally (Git repo, data catalog like DataHub) so any team can discover what data is available and what is guaranteed
  • Ownership — the producer is contractually responsible for notifying consumers before breaking changes and maintaining SLAs
Question 67 — Data Modelling
Slowly Changing Dimensions: All Types with Real-World Trade-offs

Real-World Scenario: A customer moves from the "SMB" segment to "Enterprise". How you handle this change determines whether your historical revenue reports show correct segmentation — or silently backfill the customer's entire history as Enterprise, inflating enterprise metrics and deflating SMB metrics going back years.

SCD Type 0 — Never Changes (retain original)

-- Use when: the original value at acquisition time is the analytically correct one
-- Example: customer's first acquisition channel — should never change even if
-- the customer signs up for a new product via a different channel later
CREATE TABLE dim_customers_t0 (
    customer_sk   VARCHAR PRIMARY KEY,
    customer_id   INT,
    first_channel VARCHAR,   -- set at acquisition, immutable forever
    signup_date   DATE
);
-- dbt: use a simple table materialization — no SCD logic needed

SCD Type 1 — Overwrite (no history)

-- Use when: corrections (data was wrong) or attributes where history is irrelevant
-- Example: fixing a customer's email address typo, updating a phone number
-- WARNING: destroys historical context — all past joins use the new value

-- dbt: standard incremental merge
{{ config(materialized='incremental', unique_key='customer_sk',
         incremental_strategy='merge') }}
SELECT
    {{ dbt_utils.generate_surrogate_key(['customer_id']) }} AS customer_sk,
    customer_id, email, phone_number, region
FROM {{ ref('stg_customers') }}
{% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

SCD Type 2 — Full History (most common, interview favourite)

-- Use when: dimension changes and history matters for analytics
-- Example: customer segment changes SMB → Enterprise
-- Each version gets its own row with validity dates

-- dim_customers with SCD2:
-- | customer_sk | customer_id | segment    | valid_from | valid_to   | is_current |
-- |-------------|-------------|------------|------------|------------|------------|
-- | abc123      | 1001        | SMB        | 2020-01-01 | 2023-06-14 | FALSE      |
-- | def456      | 1001        | ENTERPRISE | 2023-06-15 | NULL       | TRUE       |

-- dbt snapshot (automates SCD2)
{% snapshot snap_customers %}
{{
    config(
        target_schema='snapshots',
        unique_key='customer_id',
        strategy='check',
        check_cols=['segment', 'account_manager', 'region']
    )
}}
SELECT customer_id, name, email, segment, account_manager, region, updated_at
FROM {{ source('crm', 'customers') }}
{% endsnapshot %}

-- Point-in-time join: what segment was customer 1001 in on Black Friday 2022?
SELECT f.*, c.segment
FROM fact_orders f
JOIN dim_customers c
    ON f.customer_sk = c.customer_sk
    AND f.order_date BETWEEN c.valid_from AND COALESCE(c.valid_to, '9999-12-31');
-- Result: correctly shows SMB for pre-June 2023 orders, ENTERPRISE after
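The point-in-time logic of that join can be sketched as a plain function: given a customer's SCD2 version rows, return the attribute that was valid on a given date. A NULL `valid_to` (current row) is modelled as `None`.

```python
from datetime import date

def segment_on(versions, as_of: date):
    """versions: (segment, valid_from, valid_to) tuples; valid_to None = current row."""
    for segment, valid_from, valid_to in versions:
        # Same predicate as the SQL: valid_from <= date <= COALESCE(valid_to, 'max')
        if valid_from <= as_of <= (valid_to or date.max):
            return segment
    return None   # no version covers that date (customer did not exist yet)

history = [
    ("SMB",        date(2020, 1, 1),  date(2023, 6, 14)),
    ("ENTERPRISE", date(2023, 6, 15), None),
]
black_friday_segment = segment_on(history, date(2022, 11, 25))   # -> "SMB"
```

Exactly one version should match any date; overlapping or gapping validity windows are the classic SCD2 data-quality bug to test for.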

SCD Type 3 — Limited History (current + previous column)

-- Use when: only the most recent change matters and storage is constrained
-- Example: show both current and previous region for a customer
-- Limitation: only tracks ONE previous value — two changes ago is lost

CREATE TABLE dim_customers_t3 (
    customer_sk      VARCHAR,
    customer_id      INT,
    current_region   VARCHAR,
    previous_region  VARCHAR,    -- only the immediately prior value
    region_changed_at DATE
);

-- When region changes: shift current → previous, write new current
UPDATE dim_customers_t3 SET
    previous_region   = current_region,
    current_region    = :new_region,
    region_changed_at = CURRENT_DATE
WHERE customer_id = :customer_id;

SCD Type 4 — History Table (rapid change handling)

-- Use when: an attribute changes very frequently (e.g., real-time status)
-- Keep a lean current-state table + separate history table
-- Avoids SCD2 tables bloating with millions of status change rows

-- Current state (small, fast joins)
CREATE TABLE dim_order_status_current (
    order_id   INT PRIMARY KEY,
    status     VARCHAR,
    updated_at TIMESTAMP
);

-- Full history (append-only audit trail)
CREATE TABLE dim_order_status_history (
    order_id    INT,
    status      VARCHAR,
    valid_from  TIMESTAMP,
    valid_to    TIMESTAMP,
    changed_by  VARCHAR
);

-- BI joins to current table for dashboards (fast)
-- Data science queries history table for status transition analysis

SCD Type 6 — Hybrid (Type 1 + 2 + 3 combined)

-- Use when: you need history AND easy access to current value on every row
-- Adds "current_" columns to a Type 2 table — denormalised but very convenient for BI

CREATE TABLE dim_customers_t6 (
    customer_sk        VARCHAR PRIMARY KEY,
    customer_id        INT,
    segment            VARCHAR,    -- value at this VERSION's time
    current_segment    VARCHAR,    -- always the LATEST value (Type 1 overwrite)
    valid_from         DATE,
    valid_to           DATE,
    is_current         BOOLEAN
);
-- BI can join without date logic for current-state reports (use current_segment)
-- Data science uses segment + valid_from/valid_to for historical accuracy

SCD Type Comparison

| Type   | Rows per Change      | History               | Complexity | Best For                                 |
|--------|----------------------|-----------------------|------------|------------------------------------------|
| Type 0 | 0                    | Original only         | None       | Acquisition channel, first touch         |
| Type 1 | 0 (overwrite)        | None                  | Low        | Data corrections, non-analytic attrs     |
| Type 2 | Close old + open new | Full                  | High       | Segment, region, account manager changes |
| Type 3 | 0 (column update)    | One previous value    | Low        | When only last change matters            |
| Type 4 | 1 in history table   | Full (separate table) | Medium     | Rapidly changing attributes (status)     |
| Type 6 | Close old + open new | Full + current cols   | Very High  | BI convenience + historical accuracy     |
Question 68 — Streaming Architecture
Kafka + Snowflake: Real-Time Streaming Pipeline Design

Real-World Scenario: Your ride-sharing app emits 50,000 trip events per second. You need trip completion events in Snowflake within 30 seconds for operational monitoring, and in a clean aggregated form within 5 minutes for the driver payout system. Design the streaming pipeline.

Architecture: Kafka → Kafka Connect → Snowflake

Mobile App / Backend Services
        │  (publish events as Avro/JSON to Kafka topics)
        ▼
Apache Kafka Cluster (Confluent Cloud or self-hosted)
  Topics:
    - trip.completed        (50K events/sec)
    - trip.status_updated   (200K events/sec)
    - driver.location       (500K events/sec — usually filtered, not all to SF)
        │
        │  Kafka Connect — Snowflake Sink Connector
        │  (batches micro-groups every 30 seconds into S3 intermediate)
        ▼
Snowflake Snowpipe (auto-ingest from S3 buffer)
        │
        ▼
raw.trip_events_landing   (VARIANT column, schema-flexible)
        │
        │  Snowflake Stream + Task (every 1 min)
        ▼
staging.trip_events       (typed, deduplicated)
        │
        │  dbt incremental (every 5 min)
        ▼
marts.fact_trips          (aggregated, clustered by trip_date)

Kafka Connect Snowflake Sink Configuration

# kafka-connect-snowflake-sink.json
{
    "name": "snowflake-trip-sink",
    "config": {
        "connector.class":            "com.snowflake.kafka.connector.SnowflakeSinkConnector",
        "tasks.max":                  "4",
        "topics":                     "trip.completed",
        "snowflake.url.name":         "orgname-acctname.snowflakecomputing.com",
        "snowflake.user.name":        "kafka_service_account",
        "snowflake.private.key":      "${file:/secrets/snowflake.properties:private.key}",
        "snowflake.database.name":    "RAW",
        "snowflake.schema.name":      "KAFKA_LANDING",
        "snowflake.topic2table.map":  "trip.completed:trip_events",
        "key.converter":              "org.apache.kafka.connect.storage.StringConverter",
        "value.converter":            "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "https://schema-registry:8081",
        "buffer.count.records":       "10000",   # flush after 10K records...
        "buffer.flush.time":          "30",       # ...or 30 seconds
        "buffer.size.bytes":          "67108864", # ...or 64MB — whichever first
        "snowflake.metadata.createtable": "true"
    }
}
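The three `buffer.*` settings race each other: the connector flushes as soon as the first threshold is crossed. A toy model of that decision, with thresholds matching the config above:

```python
def first_flush_trigger(records: int, seconds: float, bytes_: int,
                        max_records=10_000, max_seconds=30.0,
                        max_bytes=64 * 1024 * 1024):
    """Return which buffer threshold fires first, or None to keep buffering."""
    if records >= max_records:
        return "count"
    if bytes_ >= max_bytes:
        return "size"
    if seconds >= max_seconds:
        return "time"
    return None

# At 50K events/sec, the record cap is hit in a fraction of a second,
# long before the 30 s time limit
trigger = first_flush_trigger(records=10_000, seconds=0.2, bytes_=8_000_000)
```

This is why lowering `buffer.flush.time` only helps low-volume topics; hot topics are already flushing on count or size.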

Landing Table Schema + Stream

-- Kafka connector creates this automatically, but here's the structure:
CREATE TABLE raw.trip_events (
    record_metadata VARIANT,   -- Kafka offset, partition, topic
    record_content  VARIANT    -- the actual event payload
);

-- Stream captures every new event landed by Kafka connector
CREATE STREAM raw.trip_events_stream ON TABLE raw.trip_events;

-- Task processes stream every 60 seconds
CREATE TASK raw.process_trips_task
    WAREHOUSE   = INGEST_WH
    SCHEDULE    = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('raw.trip_events_stream')
AS
INSERT INTO staging.trip_events
SELECT
    record_content:trip_id::VARCHAR                           AS trip_id,
    record_content:driver_id::INT                             AS driver_id,
    record_content:rider_id::INT                              AS rider_id,
    record_content:fare_amount::DECIMAL(10,2)                 AS fare_amount,
    record_content:completed_at::TIMESTAMP                    AS completed_at,
    -- CreateTime is epoch milliseconds, so convert with scale 3
    TO_TIMESTAMP(record_metadata:CreateTime::NUMBER, 3)       AS kafka_timestamp,
    -- Detect late arrivals (event time vs Kafka ingestion time)
    DATEDIFF('second',
        record_content:completed_at::TIMESTAMP,
        TO_TIMESTAMP(record_metadata:CreateTime::NUMBER, 3))  AS latency_seconds
FROM raw.trip_events_stream
WHERE record_content:event_type::VARCHAR = 'TRIP_COMPLETED'
  -- Deduplicate: skip if already processed
  AND record_content:trip_id::VARCHAR NOT IN (
      SELECT trip_id FROM staging.trip_events
      WHERE completed_at >= CURRENT_DATE - 1
  );

Handling Schema Evolution with Schema Registry

# Confluent Schema Registry enforces Avro schema compatibility
# BACKWARD compatibility: new schema can read data written with old schema
# Producers must register schema before publishing
# Breaking changes (rename/remove field) are REJECTED at the registry

# In practice: 
# - Add new optional fields freely (backward compatible)
# - Never rename or remove fields without a deprecation period
# - Use nullable fields with defaults for all new additions

# Example Avro schema for trip.completed topic:
{
    "type": "record",
    "name": "TripCompleted",
    "namespace": "com.rideshare.events",
    "fields": [
        {"name": "trip_id",       "type": "string"},
        {"name": "fare_amount",   "type": "double"},
        {"name": "completed_at",  "type": "long", "logicalType": "timestamp-millis"},
        {"name": "tip_amount",    "type": ["null","double"], "default": null}  -- v2: nullable
    ]
}
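BACKWARD compatibility in miniature: a new schema may add fields only if they carry a default, and may not drop existing fields. This is a simplified check over Avro-style field lists; the full rules (unions, aliases, promotions) live in Schema Registry itself.

```python
def is_backward_compatible(old_fields, new_fields):
    """True if readers of the new schema can still read old data."""
    old = {f["name"] for f in old_fields}
    new = {f["name"]: f for f in new_fields}
    if not old <= set(new):              # removed/renamed field: breaking
        return False
    added = set(new) - old
    return all("default" in new[name] for name in added)

v1 = [{"name": "trip_id", "type": "string"}]
v2 = v1 + [{"name": "tip_amount", "type": ["null", "double"], "default": None}]
ok  = is_backward_compatible(v1, v2)   # True: new field has a default
bad = is_backward_compatible(v2, v1)   # False: tip_amount was removed
```

A producer CI step running this kind of check is what turns "the backend renamed a field" from a silent NULL join into a failed build.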

Latency Budget

| Hop                            | Latency | Tuning                                     |
|--------------------------------|---------|--------------------------------------------|
| App → Kafka publish            | < 5 ms  | async producer, acks=1                     |
| Kafka → connector buffer flush | ≤ 30 s  | reduce buffer.flush.time                   |
| Snowpipe ingest                | 30–60 s | auto-ingest; hard to reduce below ~30 s    |
| Stream + Task to typed staging | ≤ 1 min | reduce Task schedule                       |
| Total: typed staging           | ~2 min  | misses the 30 s monitoring target; use     |
|                                |         | Snowpipe Streaming to land rows in seconds |
| dbt incremental marts          | +5 min  | 5-minute dbt job for the payout SLA        |
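Summing the worst-case hop latencies makes the gap explicit: the file-based Snowpipe path cannot meet a 30-second landing target, which is exactly the niche Snowpipe Streaming fills. Figures are the worst-case values from the budget above.

```python
# Worst-case seconds per hop: publish, connector buffer flush,
# Snowpipe ingest, Stream + Task processing
HOP_SECONDS = [0.005, 30, 60, 60]

total = sum(HOP_SECONDS)   # ~150 s worst case to typed staging
assert total > 30          # the file-based path cannot hit a 30 s landing SLA
```

When an interviewer pushes on the 30-second requirement, the expected answer is to swap the sink's ingestion method, not to shrink the buffers.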
Question 69 — FinOps / Architecture
Snowflake Cost Optimisation at Scale: Resource Monitors, Credits & Query Governance

Real-World Scenario: Your Snowflake bill jumped from $8K to $22K in one month. No new pipelines were added. The culprit: a BI analyst accidentally left a LARGE warehouse running without auto-suspend, and a broken dbt model ran a full table scan on a 500M-row table 48 times due to a retry loop. Here's how to prevent and diagnose this.

Step 1: Find the Cost Drivers

-- Top credit consumers by warehouse (last 30 days)
SELECT
    warehouse_name,
    ROUND(SUM(credits_used), 2)               AS total_credits,
    ROUND(SUM(credits_used) * 3.0, 2)         AS est_cost_usd,   -- ~$3/credit on Enterprise
    COUNT(DISTINCT query_id)                   AS query_count,
    MAX(warehouse_size)                        AS max_size_used
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= CURRENT_DATE - 30
GROUP BY warehouse_name
ORDER BY total_credits DESC;

-- Most expensive queries (by credits consumed)
SELECT
    query_id,
    LEFT(query_text, 80)                       AS query_preview,
    warehouse_name,
    total_elapsed_time / 1000                  AS elapsed_sec,
    credits_used_cloud_services,
    ROUND(bytes_scanned / POWER(1024,3), 2)    AS gb_scanned,
    partitions_scanned,
    partitions_total,
    ROUND(partitions_scanned * 100.0 / NULLIF(partitions_total,0), 1) AS pct_scanned
FROM snowflake.account_usage.query_history
WHERE start_time >= CURRENT_DATE - 7
  AND execution_status = 'SUCCESS'
ORDER BY total_elapsed_time DESC
LIMIT 20;
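The `pct_scanned` column above is the single best smell test for missing clustering: a query that scans nearly 100% of a large table's partitions gets no pruning benefit. The same arithmetic as the SQL expression, in Python:

```python
from typing import Optional

def pct_partitions_scanned(scanned: int, total: int) -> Optional[float]:
    """Mirror of the SQL expression: the NULLIF guard becomes a None return."""
    if total == 0:
        return None
    return round(scanned * 100.0 / total, 1)

well_pruned = pct_partitions_scanned(12, 4_000)     # 0.3  -> pruning is working
full_scan   = pct_partitions_scanned(3_980, 4_000)  # 99.5 -> clustering candidate
```

Queries near 100% on multi-hundred-GB tables are where a clustering key (or a rewritten filter on the clustering column) pays for itself fastest.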

Step 2: Resource Monitors — Budget Caps Per Warehouse

-- Create a resource monitor that alerts at 80% and suspends at 100%
CREATE RESOURCE MONITOR bi_warehouse_monitor
    WITH CREDIT_QUOTA = 500              -- 500 credits per month
    TRIGGERS
        ON 80 PERCENT DO NOTIFY          -- email account admins
        ON 100 PERCENT DO SUSPEND        -- suspend after running queries finish
        ON 110 PERCENT DO SUSPEND_IMMEDIATE;  -- hard stop, cancel in-flight queries

-- Assign to BI warehouse
ALTER WAREHOUSE BI_WH SET RESOURCE_MONITOR = bi_warehouse_monitor;

-- Account-level monitor (catch-all safety net)
CREATE RESOURCE MONITOR account_monthly_cap
    WITH CREDIT_QUOTA = 5000             -- total monthly account budget
    TRIGGERS
        ON 75 PERCENT DO NOTIFY
        ON 90 PERCENT DO NOTIFY
        ON 100 PERCENT DO SUSPEND;       -- suspends ALL warehouses
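As a quick sanity check, the percentage triggers above translate into absolute credit levels like this (a throwaway helper for the arithmetic, not a Snowflake API):

```python
# Where each percentage trigger fires, in absolute credits against the quota
def trigger_credits(quota: float, pcts: list[int]) -> dict[int, float]:
    return {p: quota * p / 100 for p in pcts}

# bi_warehouse_monitor: quota 500 -> notify at 400, suspend at 500, hard stop at 550
print(trigger_credits(500, [80, 100, 110]))
# account_monthly_cap: quota 5000 -> notify at 3750 and 4500, suspend at 5000
print(trigger_credits(5000, [75, 90, 100]))
```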

Step 3: Warehouse Right-Sizing & Auto-Suspend Tuning

-- Rule: auto-suspend should reflect the gap between queries
-- BI warehouse: users query in bursts → 5 min suspend (keep warm between clicks)
ALTER WAREHOUSE BI_WH SET AUTO_SUSPEND = 300;

-- dbt transform: runs in a scheduled job, idle between runs → 2 min suspend
ALTER WAREHOUSE TRANSFORM_WH SET AUTO_SUSPEND = 120;

-- Ingest: brief bursts → aggressive suspend
ALTER WAREHOUSE INGEST_WH SET AUTO_SUSPEND = 60;

-- Check how long warehouses actually run vs idle (find over-provisioned ones)
SELECT
    warehouse_name,
    ROUND(SUM(CASE WHEN credits_used > 0 THEN 1 ELSE 0 END) * 100.0
          / COUNT(*), 1)                        AS pct_time_active,
    ROUND(SUM(credits_used), 2)                 AS credits_used
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= CURRENT_DATE - 30
GROUP BY warehouse_name
ORDER BY pct_time_active ASC;
-- Warehouses with <10% active time are candidates for downsizing or consolidation

Step 4: Query Governance — Prevent Runaway Queries

-- Set statement timeout on warehouses (kill queries running over 10 min)
ALTER WAREHOUSE TRANSFORM_WH SET
    STATEMENT_TIMEOUT_IN_SECONDS = 600;    -- kill at 10 min

ALTER WAREHOUSE BI_WH SET
    STATEMENT_TIMEOUT_IN_SECONDS = 300;    -- BI queries should be fast

-- Set max concurrency to prevent one user starving others
ALTER WAREHOUSE BI_WH SET
    MAX_CONCURRENCY_LEVEL = 8;             -- max 8 parallel queries per cluster

-- Query tag: tag every dbt model run for cost attribution
-- In dbt_project.yml:
-- pre-hook: "ALTER SESSION SET QUERY_TAG = 'dbt_run|{{ model.name }}|{{ target.name }}'"
-- Now you can attribute credits to specific dbt models.
-- The tag is pipe-delimited ('dbt_run|model|target'), so split it (it is not JSON):
SELECT
    SPLIT_PART(query_tag, '|', 2)              AS dbt_model,
    SUM(credits_used_cloud_services)           AS cloud_services_credits
FROM snowflake.account_usage.query_history
WHERE query_tag LIKE '%dbt_run%'
  AND start_time >= CURRENT_DATE - 7
GROUP BY 1
ORDER BY 2 DESC;

Step 5: Storage Cost Optimisation

-- Find large tables consuming expensive Active Storage
SELECT
    table_schema,
    table_name,
    ROUND(active_bytes / POWER(1024,3), 2)              AS active_gb,
    ROUND(time_travel_bytes / POWER(1024,3), 2)         AS time_travel_gb,
    ROUND(failsafe_bytes / POWER(1024,3), 2)            AS failsafe_gb,
    ROUND((active_bytes + time_travel_bytes + failsafe_bytes) / POWER(1024,3), 2) AS total_gb
FROM snowflake.account_usage.table_storage_metrics
WHERE active_bytes > 1073741824    -- tables over 1GB
ORDER BY total_gb DESC;

-- Reduce Time Travel on non-critical tables to save storage
ALTER TABLE raw.clickstream_events SET DATA_RETENTION_TIME_IN_DAYS = 1;   -- 1 day (default is 1)
ALTER TABLE marts.fact_orders      SET DATA_RETENTION_TIME_IN_DAYS = 14;  -- keep 14 days for prod marts

-- Use Transient tables for staging (no Fail-Safe = 50% storage cost reduction)
CREATE TRANSIENT TABLE staging.orders_temp AS SELECT * FROM raw.orders;

Cost Optimisation Checklist

| Action                                            | Typical Savings        | Risk                       |
|---------------------------------------------------|------------------------|----------------------------|
| Auto-suspend all warehouses                       | 20–40%                 | Low                        |
| Right-size warehouses (downsize over-provisioned) | 15–30%                 | Low if tested              |
| Use Transient tables for staging                  | 10–20% storage         | Low (lose Fail-Safe)       |
| Add clustering keys to high-scan tables           | 20–50% on large tables | Low (background process)   |
| Move cold data to External Tables on S3           | 30–60% storage         | Medium (query performance) |
| Add STATEMENT_TIMEOUT to all warehouses           | Prevents runaway bills | Low                        |
Question 70 — Analytics Engineering
Analytics Engineering: Role, Responsibilities & Distinction from Data Engineering

Real-World Scenario: A company hires both a Data Engineer and an Analytics Engineer. The Data Engineer builds and maintains the ingestion pipeline from Salesforce → Snowflake raw. The Analytics Engineer picks it up from there — they own the dbt models, define metrics, write documentation, and make sure the revenue number in Tableau matches the revenue number the CFO sees in the board pack.

Role Distinction

| Dimension           | Data Engineer                                               | Analytics Engineer                                           |
|---------------------|-------------------------------------------------------------|--------------------------------------------------------------|
| Primary focus       | Data movement, reliability, infrastructure                  | Data transformation, modelling, trust                        |
| Primary tools       | Python, Spark, Airflow, Kafka, Qlik                         | dbt, SQL, BI tools                                           |
| Owns                | Pipelines, ingestion, raw landing                           | Staging → marts, metrics, docs, tests                        |
| Interacts with      | Backend engineers, DevOps, cloud                            | Business analysts, BI developers, product                    |
| Success metric      | Pipeline uptime, data freshness, latency                    | Metric accuracy, self-service adoption, trust                |
| Typical deliverable | "Raw orders table is in Snowflake within 5 min of creation" | "fact_orders is tested, documented, and all BI tools use it" |

The Analytics Engineer's Core Responsibilities

# 1. Transform raw data into trusted analytics models
# Input:  raw.oracle_orders (messy, source-schema-dependent)
# Output: marts.fact_orders (clean, tested, documented, BI-ready)

# 2. Define and govern metrics
# "Monthly Recurring Revenue" has ONE definition in dbt metrics
# All BI tools query the semantic layer — no metric drift

# 3. Write and maintain dbt tests
# Every mart has: not_null, unique, relationships, freshness, custom business rules

# 4. Document everything (for self-service analytics)
# Every column has a description, every model has a business context note
# dbt docs generate → searchable data catalog

# 5. Own the semantic layer
# dbt metrics / MetricFlow definitions
# Exposure YAML: documents which dashboards use which models

# 6. Enable stakeholder self-service
# Build well-named, well-documented marts so analysts can write their own SQL
# Replace ad-hoc Excel with governed dbt models

Analytics Engineering in Practice — A Day in the Life

# Morning standup: "fact_orders freshness test failed in production"
# ─────────────────────────────────────────────────────────────────
# 1. Check dbt Cloud job logs → source freshness check flagged RAW.ORACLE.ORDERS as stale
# 2. Check Qlik Replicate → task suspended due to source DB maintenance window
# 3. Notify stakeholders: "Revenue dashboard data is 3 hours behind, fixing now"
# 4. Resume Qlik task → stream catches up → dbt hourly job runs → dashboard fresh
# 5. Add monitoring: alert if source freshness > 2 hours (add to sources.yml)

# Afternoon: new business request
# ─────────────────────────────────────────────────────────────────
# Finance: "We need net revenue excluding marketplace fees, split by subscription type"
# AE response:
#   1. Check if source fields exist in stg_orders
#   2. Add marketplace_fee column to stg_orders if missing → PR → CI → deploy
#   3. Create int_orders_net_of_fees.sql → apply fee deduction logic
#   4. Add net_revenue_excl_fees to fact_orders → update schema.yml
#   5. Define metric: net_revenue_subscription in metrics.yml
#   6. Update exposure: finance_dashboard depends_on fact_orders
#   7. Run dbt test + dbt docs generate → share docs link with finance team

Key dbt Skills for an Analytics Engineer

# Modelling: staging → intermediate → marts, surrogate keys, SCD2
# Testing: schema tests, singular tests, dbt-expectations, freshness
# Documentation: column descriptions, model descriptions, exposures
# Macros: generate_surrogate_key, date_spine, union_relations
# Metrics: MetricFlow semantic models and metric definitions
# CI/CD: Slim CI, state:modified+, --defer, PR-based workflow
# Performance: incremental strategies, materialisation decisions
# Governance: post-hooks for GRANT, masking policies, row-level security

The Trust Chain

Raw data (untrusted, messy)
        │
        │  Data Engineer: reliable delivery
        ▼
Snowflake RAW schema
        │
        │  Analytics Engineer: transform, test, document, govern
        ▼
Snowflake MARTS schema (trusted, tested, documented)
        │
        │  Business Analysts / BI Developers: self-service
        ▼
Tableau / Looker / Power BI dashboards
        │
        ▼
Business decisions (the whole point)
Question 71 — Snowflake
Snowflake Cache Types: Result, Metadata & Warehouse (Local Disk) Cache — How Each Works

Snowflake has three distinct caching layers. Understanding which one fires — and when — is critical both for performance tuning and for explaining query behaviour to stakeholders.

Cache 1 — Result Cache (Global, FREE)

The result cache stores the exact output of every query for 24 hours; each reuse of a cached result resets that 24-hour clock, up to a maximum of 31 days from the original execution. If you run the same SQL again and the underlying data has not changed, Snowflake returns the cached result instantly, without spinning up a warehouse or consuming a single credit. This is the most powerful cache because it costs absolutely nothing to hit.

-- First run: hits the warehouse, takes 8 seconds
SELECT region, SUM(net_revenue) FROM fact_orders
WHERE order_date = '2024-03-01' GROUP BY 1;

-- Second run (within 24 hours, table unchanged): returns in < 100ms
-- Query profile shows "Results Reused" = 100%
SELECT region, SUM(net_revenue) FROM fact_orders
WHERE order_date = '2024-03-01' GROUP BY 1;

What invalidates the result cache?

  • Any DML on the underlying table (INSERT, UPDATE, DELETE, MERGE, COPY INTO)
  • The query SQL changes (even adding a space changes the hash)
  • Session parameters that affect results change (e.g. timezone, date format)
  • The 24-hour TTL expires
  • Functions like CURRENT_TIMESTAMP(), RANDOM(), UUID_STRING() disable result caching entirely for that query
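The invalidation rules above can be pictured as a cache keyed on the exact SQL text plus a table version. The sketch below is an illustration of that mental model only, not Snowflake's actual implementation:

```python
# Toy model of result-cache invalidation: any DML bumps the table version,
# and even a whitespace change in the SQL produces a different cache key.
table_version = 0
cache = {}

def run_query(sql):
    key = (sql, table_version)
    if key in cache:
        return cache[key], "cache hit"
    result = f"computed:{sql}@v{table_version}"   # stand-in for real execution
    cache[key] = result
    return result, "cache miss"

def dml():                    # INSERT / UPDATE / DELETE / MERGE / COPY INTO
    global table_version
    table_version += 1

_, s1 = run_query("SELECT region, SUM(x) FROM t GROUP BY 1")
_, s2 = run_query("SELECT region, SUM(x) FROM t GROUP BY 1")   # identical SQL -> hit
_, s3 = run_query("SELECT region,  SUM(x) FROM t GROUP BY 1")  # extra space -> miss
dml()
_, s4 = run_query("SELECT region, SUM(x) FROM t GROUP BY 1")   # DML -> miss
print(s1, s2, s3, s4)  # cache miss cache hit cache miss cache miss
```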

Cache 2 — Metadata Cache (Global, FREE — no warehouse needed)

The Cloud Services layer maintains a persistent metadata store containing row counts, min/max values per column per micro-partition, null counts, and distinct value counts. Simple metadata-only queries resolve entirely here — zero compute cost.

-- These queries answer from metadata cache ONLY — no warehouse needed
SELECT COUNT(*) FROM fact_orders;              -- row count from metadata
SELECT MIN(order_date), MAX(order_date)        -- min/max from partition stats
FROM fact_orders;

-- Check in Query Profile: "Cloud Services" time = 100%, no warehouse time
-- Credit attribution: metadata queries appear in cloud_services credits (usually <10%)

How metadata enables micro-partition pruning: Before a query even touches the warehouse, the query optimizer reads partition metadata to determine which micro-partitions cannot contain rows matching your WHERE clause — and skips them entirely. A query filtering WHERE order_date = '2024-03-01' on a 500-partition table may only read 3 partitions if the data is well-clustered.
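Conceptually, pruning is just an interval check against each partition's min/max stats. This sketch shows the idea only (the partition ranges are made up; Snowflake's internals are far more sophisticated):

```python
# Conceptual min/max partition pruning: skip any partition whose
# [min, max] date range cannot contain the filter value.
from datetime import date

# Hypothetical per-partition metadata: (min_order_date, max_order_date)
partitions = [
    (date(2024, 1, 1), date(2024, 1, 31)),
    (date(2024, 2, 1), date(2024, 2, 29)),
    (date(2024, 3, 1), date(2024, 3, 2)),
]

def prune(partitions, target):
    """Return indices of partitions that COULD contain rows for `target`."""
    return [i for i, (lo, hi) in enumerate(partitions) if lo <= target <= hi]

# WHERE order_date = '2024-03-01' -> only the third partition survives pruning
print(prune(partitions, date(2024, 3, 1)))  # [2]
```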

Cache 3 — Warehouse (Local Disk) Cache (Per Warehouse, Consumes Credits)

When a warehouse executes a query, it reads micro-partitions from S3 and caches them on the warehouse's local SSD. Subsequent queries that scan the same micro-partitions will read from local disk instead of S3 — significantly faster (SSD vs object storage network round-trip).

-- Query A: cold warehouse, scans 10 micro-partitions from S3 → 12 sec
SELECT * FROM fact_orders WHERE region = 'US-EAST' AND order_date = '2024-03-01';

-- Query B (same warehouse, same partitions, running 30 seconds later):
-- The 10 micro-partitions are now on local SSD → 2.5 sec
SELECT * FROM fact_orders WHERE region = 'US-EAST' AND order_date = '2024-02-29';

-- Check: in Query Profile, "Bytes Scanned from Cache" will be high

Critical: Warehouse cache is destroyed when the warehouse SUSPENDS. If your warehouse auto-suspends between runs, the next run starts cold again. This is why:

  • BI warehouses serving interactive dashboards should have longer auto-suspend (e.g., 5–10 min) to keep the cache warm between user clicks
  • ETL warehouses that run once per hour can suspend aggressively (60 sec) — they start cold anyway

Cache Summary Table

| Cache                  | Scope                   | Cost             | Survives Suspend     | Invalidated By                      |
|------------------------|-------------------------|------------------|----------------------|-------------------------------------|
| Result Cache           | Global (all sessions)   | FREE             | Yes (24h TTL)        | Table DML, SQL change, TTL expiry   |
| Metadata Cache         | Global (Cloud Services) | FREE             | Yes (persistent)     | Table DML (updates partition stats) |
| Warehouse (Disk) Cache | Per warehouse cluster   | Credits consumed | No — lost on suspend | Warehouse suspend/resize            |

Practical tuning tips:

  • Maximise result cache hits — standardise BI queries (same SQL = cache hit). Avoid CURRENT_TIMESTAMP() in SELECT; use CURRENT_DATE and filter in WHERE instead
  • Keep BI warehouse warm — set AUTO_SUSPEND = 300 (5 min) so the SSD cache survives between dashboard page loads
  • Use clustering keys to maximise metadata pruning — fewer partitions read = less S3 I/O = warehouse cache more effective for what is read
  • Multi-cluster warehouses do NOT share disk cache across clusters — each cluster has its own local cache
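The suspend behaviour is worth internalising: suspend releases the compute node and its SSD, so the cache cannot survive it. A toy model of why (an illustration only, not how Snowflake manages its disk cache):

```python
# Toy per-warehouse SSD cache: reads populate it, suspend wipes it.
class Warehouse:
    def __init__(self):
        self.local_cache = set()          # micro-partition ids held on local SSD

    def scan(self, partitions):
        """Return (partitions served from cache, partitions read from remote storage)."""
        hits = len(partitions & self.local_cache)
        misses = len(partitions - self.local_cache)
        self.local_cache |= partitions    # everything read is now cached locally
        return hits, misses

    def suspend(self):
        self.local_cache.clear()          # suspend releases the node and its SSD

wh = Warehouse()
print(wh.scan({1, 2, 3}))   # (0, 3)  cold: everything from remote storage
print(wh.scan({2, 3, 4}))   # (2, 1)  warm: overlapping partitions served locally
wh.suspend()
print(wh.scan({2, 3, 4}))   # (0, 3)  cold again after suspend
```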
Question 72 — SQL
TableA, TableB with 1 Column (Values: 1, 1, 0, NULL) — Find Results Across Different JOINs

Problem: Two tables each with a single column. TableA has values: 1, 1, 0, NULL. TableB has values: 1, 1, 0, NULL. Find the output for different JOIN types.

Sample Tables Setup

-- TableA (1 column)
-- val
-- ---
-- 1
-- 1
-- 0
-- NULL

-- TableB (1 column)
-- val
-- ---
-- 1
-- 1
-- 0
-- NULL

CREATE TABLE tableA (val INT);
INSERT INTO tableA VALUES (1), (1), (0), (NULL);

CREATE TABLE tableB (val INT);
INSERT INTO tableB VALUES (1), (1), (0), (NULL);

INNER JOIN ON tableA.val = tableB.val

SELECT A.val, B.val
FROM tableA A
INNER JOIN tableB B ON A.val = B.val;

-- Result:
-- val (A) | val (B)
-- ---------|--------
-- 1       | 1      ← two 1s in A × two 1s in B = 4 rows (cartesian on duplicates)
-- 1       | 1
-- 1       | 1
-- 1       | 1
-- 0       | 0      ← A(0) × B(0) = 1 row

-- KEY: NULL vs NULL = UNKNOWN (not TRUE) → no matches for NULL rows
-- Total: 5 rows

LEFT JOIN ON tableA.val = tableB.val

SELECT A.val, B.val
FROM tableA A
LEFT JOIN tableB B ON A.val = B.val;

-- Result:
-- val (A) | val (B)
-- ---------|--------
-- 1       | 1      (A not NULL, matches B)
-- 1       | 1      (A not NULL, matches B)
-- 1       | 1      (A not NULL, matches B)
-- 1       | 1      (A not NULL, matches B)
-- 0       | 0      (A not NULL, matches B)
-- NULL    | NULL   (A is NULL, no match in ON clause → filled with NULL)

-- KEY: All rows from Left table are preserved
-- Total: 6 rows

RIGHT JOIN ON tableA.val = tableB.val

SELECT A.val, B.val
FROM tableA A
RIGHT JOIN tableB B ON A.val = B.val;

-- Result:
-- val (A) | val (B)
-- ---------|--------
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 0       | 0      (matched)
-- NULL    | NULL   (B is NULL, no match from A → filled with NULL)

-- KEY: All rows from Right table are preserved
-- Total: 6 rows

FULL OUTER JOIN ON tableA.val = tableB.val

SELECT A.val, B.val
FROM tableA A
FULL OUTER JOIN tableB B ON A.val = B.val;

-- Result:
-- val (A) | val (B)
-- ---------|--------
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 1       | 1      (matched)
-- 0       | 0      (matched)
-- NULL    | NULL   (A's NULL unmatched)
-- NULL    | NULL   (B's NULL unmatched)

-- KEY: Union of LEFT + RIGHT semantics
-- Total: 7 rows

Key Takeaways

  • NULL joining rule: NULL = NULL is UNKNOWN in SQL, so NULL rows never match in ON clauses
  • Cartesian product: Duplicate values create multiple match combinations
  • INNER JOIN: Only matching rows (NULLs excluded)
  • LEFT JOIN: All left rows + matched right rows + NULL-filled for unmatched
  • RIGHT JOIN: All right rows + matched left rows + NULL-filled for unmatched
  • FULL OUTER: All rows from both tables, NULL-filled where no match

CROSS JOIN — cartesian product (all combinations)

SELECT A.val AS a_val, B.val AS b_val
FROM tableA A CROSS JOIN tableB B;
-- Returns 4 × 4 = 16 rows (every row in A paired with every row in B; NULLs included)
-- Use case: generating all combinations (e.g., all dates × all products)
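These row counts can be reproduced locally with SQLite (a stand-in here; NULL-matching behaviour in ON clauses is standard SQL and matches Snowflake):

```python
# Verify INNER / LEFT / CROSS join row counts for the 1, 1, 0, NULL tables.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE tableA (val INT);
    INSERT INTO tableA VALUES (1), (1), (0), (NULL);
    CREATE TABLE tableB (val INT);
    INSERT INTO tableB VALUES (1), (1), (0), (NULL);
""")

inner = con.execute(
    "SELECT COUNT(*) FROM tableA A INNER JOIN tableB B ON A.val = B.val"
).fetchone()[0]
left = con.execute(
    "SELECT COUNT(*) FROM tableA A LEFT JOIN tableB B ON A.val = B.val"
).fetchone()[0]
cross = con.execute(
    "SELECT COUNT(*) FROM tableA A CROSS JOIN tableB B"
).fetchone()[0]

print(inner, left, cross)  # 5 6 16  (NULL never matches; CROSS = 4 × 4)
```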

Quick Reference Card

| JOIN Type  | Rows Included                      | NULL key matched?                      | Duplicate key effect       |
|------------|------------------------------------|----------------------------------------|----------------------------|
| INNER      | Matching rows only                 | No                                     | Cartesian on matching side |
| LEFT       | All from left + matches from right | Left NULLs preserved (NULL fill right) | Cartesian on matching side |
| RIGHT      | All from right + matches from left | Right NULLs preserved (NULL fill left) | Cartesian on matching side |
| FULL OUTER | All from both sides                | All NULLs preserved, unmatched         | Cartesian on matching side |
| CROSS      | All combinations                   | Yes (no ON clause)                     | M × N rows always          |
Question 73 — SQL
Min, Max & Avg Salaries with user_id — Aggregation Patterns and Edge Cases

Salary aggregation questions are deceptively tricky — the common pitfall is knowing which user has the max/min salary, especially when multiple users tie. Let's cover all common patterns.

Sample Data

CREATE TABLE salaries (
    user_id   INT,
    dept      VARCHAR,
    salary    DECIMAL(10,2),
    hire_date DATE
);

INSERT INTO salaries VALUES
(1, 'Engineering', 95000, '2021-01-10'),
(2, 'Engineering', 120000,'2019-06-15'),
(3, 'Marketing',   72000, '2022-03-01'),
(4, 'Marketing',   95000, '2020-07-20'),
(5, 'HR',          58000, '2023-02-14'),
(6, 'HR',          58000, '2021-11-30'),  -- tie with user 5
(7, 'Engineering', 120000,'2020-01-05');  -- tie with user 2

Pattern 1 — Simple Global Aggregates

-- Min, Max, and Avg salary across all employees
SELECT
    MIN(salary) AS min_salary,
    MAX(salary) AS max_salary,
    ROUND(AVG(salary), 2) AS avg_salary,
    COUNT(*)    AS employee_count
FROM salaries;

-- Result:
-- min_salary | max_salary | avg_salary | employee_count
-- 58000      | 120000     | 88285.71   | 7

Pattern 2 — Find the user_id with max salary (handles ties)

-- WRONG approach: this doesn't give you the user_id
SELECT MAX(salary) AS max_salary FROM salaries;

-- CORRECT: use a subquery or QUALIFY to get user(s) with max salary
-- If there are ties, BOTH user_ids are returned (correct behaviour)
SELECT user_id, dept, salary
FROM salaries
WHERE salary = (SELECT MAX(salary) FROM salaries);
-- Returns: user_id=2 AND user_id=7 (both earn 120000)

-- Alternative using QUALIFY + RANK (handles ties naturally)
SELECT user_id, dept, salary
FROM salaries
QUALIFY RANK() OVER (ORDER BY salary DESC) = 1;
-- Same result: both tied users returned

-- Alternative using DENSE_RANK (same for global max)
SELECT user_id, dept, salary
FROM salaries
QUALIFY DENSE_RANK() OVER (ORDER BY salary DESC) = 1;

Pattern 3 — Min, Max, Avg per department WITH user_id of min/max earner

-- This is the hard version: include user_id for min and max salary in each dept
-- Challenge: standard GROUP BY can't include user_id (not in aggregate)

WITH dept_stats AS (
    SELECT
        dept,
        MIN(salary) AS min_salary,
        MAX(salary) AS max_salary,
        ROUND(AVG(salary), 2) AS avg_salary
    FROM salaries
    GROUP BY dept
),
min_earners AS (
    -- Get user_id(s) with min salary per dept
    SELECT s.dept, s.user_id AS min_user_id, s.salary AS min_salary
    FROM salaries s
    WHERE (s.dept, s.salary) IN (
        SELECT dept, MIN(salary) FROM salaries GROUP BY dept
    )
),
max_earners AS (
    -- Get user_id(s) with max salary per dept
    SELECT s.dept, s.user_id AS max_user_id, s.salary AS max_salary
    FROM salaries s
    WHERE (s.dept, s.salary) IN (
        SELECT dept, MAX(salary) FROM salaries GROUP BY dept
    )
)
SELECT
    ds.dept,
    ds.min_salary,
    mi.min_user_id,
    ds.max_salary,
    ma.max_user_id,
    ds.avg_salary
FROM dept_stats ds
JOIN min_earners mi ON ds.dept = mi.dept
JOIN max_earners ma ON ds.dept = ma.dept
ORDER BY ds.dept;

-- Result (ties multiply rows through the joins — HR's two-way tie on both
-- min and max produces 2 × 2 = 4 rows):
-- dept         | min_salary | min_user_id | max_salary | max_user_id | avg_salary
-- Engineering  | 95000      | 1           | 120000     | 2           | 111666.67
-- Engineering  | 95000      | 1           | 120000     | 7           | 111666.67  ← tie
-- HR           | 58000      | 5           | 58000      | 5           | 58000
-- HR           | 58000      | 5           | 58000      | 6           | 58000      ← tie
-- HR           | 58000      | 6           | 58000      | 5           | 58000      ← tie
-- HR           | 58000      | 6           | 58000      | 6           | 58000      ← tie
-- Marketing    | 72000      | 3           | 95000      | 4           | 83500

Pattern 4 — Cleaner version using window functions + QUALIFY

-- Rank employees within each dept by salary (for min use ASC, for max use DESC)
-- Then filter using QUALIFY for clean, readable SQL

-- Top earner per department (handles ties)
SELECT user_id, dept, salary
FROM salaries
QUALIFY DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary DESC) = 1;
-- Returns ALL tied top earners per dept

-- Lowest earner per department
SELECT user_id, dept, salary
FROM salaries
QUALIFY DENSE_RANK() OVER (PARTITION BY dept ORDER BY salary ASC) = 1;

-- Combined: attach min/max user_ids alongside dept averages
SELECT
    user_id,
    dept,
    salary,
    AVG(salary) OVER (PARTITION BY dept)  AS dept_avg,
    MIN(salary) OVER (PARTITION BY dept)  AS dept_min,
    MAX(salary) OVER (PARTITION BY dept)  AS dept_max,
    RANK() OVER (PARTITION BY dept ORDER BY salary DESC) AS salary_rank_in_dept
FROM salaries
ORDER BY dept, salary_rank_in_dept;

Pattern 5 — Employees earning above department average

SELECT user_id, dept, salary,
       ROUND(AVG(salary) OVER (PARTITION BY dept), 2) AS dept_avg
FROM salaries
QUALIFY salary > AVG(salary) OVER (PARTITION BY dept);

-- Result:
-- user_id | dept        | salary | dept_avg
-- 2       | Engineering | 120000 | 111666.67
-- 7       | Engineering | 120000 | 111666.67
-- 4       | Marketing   | 95000  | 83500.00

Common pitfalls in salary aggregation:

  • NULL salaries — AVG(salary) ignores NULLs; MIN/MAX also ignore NULLs. Use COALESCE(salary, 0) if you want NULLs treated as zero
  • Ties in min/max — always ask: "if two employees have the same max salary, return one or both?" DENSE_RANK returns all tied rows; ROW_NUMBER returns exactly one (arbitrary choice)
  • GROUP BY error — you cannot SELECT user_id in a GROUP BY query unless user_id is in the GROUP BY or aggregate. Use window functions or subqueries instead
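The tie behaviour is easy to verify locally with SQLite window functions (3.25+). QUALIFY is Snowflake-specific, so the rank filter moves into a subquery here:

```python
# Verify that BOTH users tied at the global max salary are returned.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE salaries (user_id INT, dept TEXT, salary REAL);
    INSERT INTO salaries VALUES
        (1,'Engineering',95000),(2,'Engineering',120000),(3,'Marketing',72000),
        (4,'Marketing',95000),(5,'HR',58000),(6,'HR',58000),(7,'Engineering',120000);
""")

top = con.execute("""
    SELECT user_id FROM (
        SELECT user_id, RANK() OVER (ORDER BY salary DESC) AS rnk FROM salaries
    ) WHERE rnk = 1 ORDER BY user_id
""").fetchall()
print(top)  # [(2,), (7,)] — the two users tied at 120000
```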
Question 74 — dbt
dbt is_incremental() Deep Dive: ref() Inside vs Outside, Failure Scenarios & Troubleshooting

is_incremental() is a Jinja macro that evaluates to TRUE only when the model is running in incremental mode AND the target table already exists. Understanding exactly when it fires — and when it does not — prevents the most common incremental model bugs.

Core Behaviour — When is_incremental() is TRUE vs FALSE

-- is_incremental() = TRUE when ALL of these are true:
-- 1. The model's materialization is 'incremental'
-- 2. The target table already exists in the warehouse
-- 3. The run is NOT a full-refresh (dbt run --full-refresh = FALSE)

-- is_incremental() = FALSE when:
-- 1. First-ever run (table doesn't exist yet) → dbt does a full build
-- 2. dbt run --full-refresh is passed → forces complete rebuild
-- 3. materialized != 'incremental'

ref() OUTSIDE is_incremental() — always executes

-- models/marts/fact_orders.sql
{{ config(materialized='incremental', unique_key='order_id') }}

-- This part runs on EVERY execution (full build AND incremental):
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}  -- ← ref() OUTSIDE the block
),
customers AS (
    SELECT * FROM {{ ref('stg_customers') }}  -- ← also OUTSIDE
)
SELECT
    o.order_id,
    o.customer_id,
    c.region,
    o.amount,
    o.updated_at
FROM orders o
JOIN customers c ON o.customer_id = c.customer_id

{% if is_incremental() %}
    -- This WHERE clause is ONLY added during incremental runs
    WHERE o.updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

-- On FIRST RUN (is_incremental() = FALSE):
-- Compiles to: SELECT ... FROM stg_orders JOIN stg_customers
-- (no WHERE clause → loads everything)

-- On SUBSEQUENT RUNS (is_incremental() = TRUE):
-- Compiles to: SELECT ... FROM stg_orders JOIN stg_customers
--              WHERE o.updated_at > (SELECT MAX(updated_at) FROM fact_orders)
-- (only loads new/updated rows)

ref() INSIDE is_incremental() — only executes on incremental runs

-- A ref() placed inside an {% if is_incremental() %} block
-- creates a dependency that is ONLY active during incremental runs
-- This is an unusual but valid pattern for lookback/enrichment logic

{{ config(materialized='incremental', unique_key='order_id') }}

SELECT
    o.order_id,
    o.amount,
    o.updated_at
FROM {{ ref('stg_orders') }} o  -- ← OUTSIDE: always a dependency

{% if is_incremental() %}
    -- ref() INSIDE: only joined during incremental runs
    -- On first run this entire block is skipped
    JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
    WHERE o.updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

-- RISK: On first run, dim_customers join is skipped → you may get different
-- column counts or logic between first run and incremental runs.
-- BEST PRACTICE: Keep all refs OUTSIDE the is_incremental block.
-- Use the block ONLY for WHERE / LIMIT filtering.

The {{ this }} macro — references the model's own target table

-- {{ this }} resolves to the fully-qualified table name of THIS model
-- In dev:  ANALYTICS_DEV.MARTS.FACT_ORDERS
-- In prod: ANALYTICS.MARTS.FACT_ORDERS

-- Common pattern: watermark via MAX of existing table
{% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

-- Safety: handle the case where the watermark column has all NULLs
{% if is_incremental() %}
    WHERE updated_at > (
        SELECT COALESCE(MAX(updated_at), '1900-01-01'::TIMESTAMP)
        FROM {{ this }}
    )
{% endif %}
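The COALESCE watermark pattern can be exercised end-to-end against SQLite (the table names mirror the model above but are illustrative; this is hand-written SQL, not the dbt-generated statement):

```python
# Incremental load with a COALESCE watermark: first run loads everything
# (empty target -> fallback date), later runs load only newer rows.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE stg_orders  (order_id INT, updated_at TEXT);
    CREATE TABLE fact_orders (order_id INT, updated_at TEXT);
    INSERT INTO stg_orders VALUES (1,'2024-03-01'), (2,'2024-03-02');
""")

INCREMENTAL_SQL = """
    INSERT INTO fact_orders
    SELECT order_id, updated_at FROM stg_orders
    WHERE updated_at > (SELECT COALESCE(MAX(updated_at), '1900-01-01')
                        FROM fact_orders)
"""

con.execute(INCREMENTAL_SQL)   # empty target -> watermark falls back -> loads both rows
first_load = con.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]

con.execute("INSERT INTO stg_orders VALUES (3, '2024-03-03')")
con.execute(INCREMENTAL_SQL)   # watermark = '2024-03-02' -> loads only order 3
second_load = con.execute("SELECT COUNT(*) FROM fact_orders").fetchone()[0]

print(first_load, second_load)  # 2 3
```

Without the COALESCE, `MAX(updated_at)` on an empty target is NULL, the comparison is UNKNOWN, and the model silently loads zero rows forever.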

When does a dbt incremental job FAIL? — Common causes

-- FAILURE 1: Schema mismatch — new column in source not in target table
-- Symptom: "Column 'loyalty_tier' does not exist in target table"
-- Cause: stg_orders got a new column; fact_orders still has old schema
-- Fix: Add on_schema_change config

{{ config(
    materialized='incremental',
    unique_key='order_id',
    on_schema_change='append_new_columns'  -- auto-add new columns to target
    -- Options: 'ignore' | 'fail' | 'append_new_columns' | 'sync_all_columns'
) }}

-- FAILURE 2: Watermark subquery returns NULL on first incremental run
-- Symptom: WHERE clause evaluates to "WHERE updated_at > NULL" → zero rows loaded
-- Cause: First run succeeded but loaded 0 rows (empty source); on 2nd run
--        MAX(updated_at) is NULL → WHERE filters everything out
-- Fix: Use COALESCE as shown above

-- FAILURE 3: unique_key has duplicates in the incoming batch
-- Symptom: "Duplicate row detected during DML action" (MERGE matched one
--          target row to multiple source rows)
-- Cause: Source table has duplicate PKs; dbt's generated MERGE fails
-- Fix: Deduplicate in the model before the MERGE executes:

SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY updated_at DESC) AS rn
FROM {{ ref('stg_orders') }}
QUALIFY rn = 1  -- keep only the latest version of each order_id

-- FAILURE 4: {{ this }} doesn't exist (table was manually dropped)
-- Symptom: "Object 'ANALYTICS.MARTS.FACT_ORDERS' does not exist"
-- Fix: Run with --full-refresh to rebuild the table from scratch:
-- dbt run --select fact_orders --full-refresh

-- FAILURE 5: Upstream model changed materialization type
-- Symptom: ref() resolves to a view instead of a table; performance degrades
-- or the view itself fails due to changed upstream schema
-- Fix: Run dbt run --select +fact_orders to rebuild from root

Scenario: Initial run succeeds, all subsequent runs fail

-- This is the "succeeds once, breaks forever" pattern — very common in practice

-- Example: fact_daily_revenue with delete+insert strategy
{{ config(
    materialized='incremental',
    incremental_strategy='delete+insert',
    unique_key='sale_date'
) }}

SELECT
    sale_date,
    SUM(revenue) AS total_revenue
FROM {{ ref('stg_orders') }}

{% if is_incremental() %}
    WHERE sale_date >= DATEADD('day', -3, CURRENT_DATE)
{% endif %}

GROUP BY sale_date

-- PROBLEM: On first run (no WHERE), loads all history. Works great.
-- On incremental runs: WHERE filters to last 3 days.
-- But stg_orders was truncated/rebuilt and ONLY has last 7 days of data.
-- SYMPTOM: Runs succeed but older dates silently disappear from the mart.
-- This is a data correctness failure, not a SQL error.

-- REAL FAILURE PATTERN:
-- First run: 10M rows, takes 45 min, succeeds
-- Second run: 50K rows (incremental), takes 2 min, succeeds
-- Third run: ERROR — stg_orders added a NOT NULL column "region"
--   but fact_orders was built without it
-- Fix: on_schema_change='append_new_columns' + dbt run --full-refresh

Troubleshooting checklist for incremental model failures

  • Check compiled SQL — run dbt compile --select <model> and inspect target/compiled/ to see what SQL was actually generated
  • Check is_incremental() evaluation — if the table doesn't exist, is_incremental() is FALSE; run dbt run --full-refresh to reset
  • Check watermark logic — run the MAX subquery manually in Snowflake to verify it returns the expected timestamp
  • Check schema drift — compare SHOW COLUMNS IN TABLE <target> vs the model's SELECT list
  • Add clock skew buffer — use updated_at > (SELECT DATEADD('minute', -5, MAX(updated_at)) FROM {{ this }}) to avoid missing in-flight rows; the unique_key MERGE deduplicates any re-loaded rows
  • Use --store-failures — dbt test --store-failures saves failing rows to an audit schema for inspection
Question 75 — SQL / Analytics
Maximum Winning Streak — player_id, event_id, winning_score, event_date (Multiple Events per Day)

What is a Winning Streak?

A winning streak means consecutive DAYS where the player won at least ONE match, regardless of losses on the same day. Key points:

  • Streak is day-based, not match-based: What matters is whether the player won AT LEAST ONE match on a day, not the total number of wins
  • Mixed results on same day: Win + Lose on same day = STREAK CONTINUES. The presence of at least one win counts
  • Complete loss day: If a day has ONLY losses or no matches, the streak BREAKS immediately
  • Consecutive days rule: Day1 has a win → Streak = 1. Day2 has a win → Streak = 2. Day3 has zero wins → Streak = 0 (reset). Day4 has a win → Streak = 1 (fresh count)
  • Key insight: Streak measures consecutive winning DAYS, not consecutive winning matches. One loss per day doesn't break the streak if there's at least one win

Example Timeline:

Day 1: Win, Lose, Win      → Streak = 1  (at least one win on Day 1; losses don't break it)
Day 2: Lose                → Streak = 0  (no wins on Day 2; streak broken)
Day 3: Win                 → Streak = 1  (fresh count; at least one win on Day 3)
Day 4: Win, Lose, Lose     → Streak = 2  (at least one win on Day 4; continues from Day 3)
Day 5: Lose, Lose          → Streak = 0  (no wins on Day 5; streak broken)
Day 6: Win                 → Streak = 1  (fresh count begins)

Maximum winning streak across all timeline = 2 (Day 3 through Day 4)

Streak problems are about consecutive "winning DAYS" — not consecutive individual wins. The key technique is the "gaps and islands" pattern, but applied at the DAY level: identify which days have at least one win, then find consecutive sequences of such days.
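Before the full Snowflake solution, the day-level gaps-and-islands idea can be verified locally with SQLite against the Day 1 – Day 6 timeline above (Snowflake's DATEADD becomes julianday arithmetic here):

```python
# Max winning streak via gaps-and-islands at the DAY level.
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE events (event_date TEXT, winning_score INT);
    INSERT INTO events VALUES
        ('2024-01-01',1),('2024-01-01',0),('2024-01-01',1),  -- Day 1: W,L,W
        ('2024-01-02',0),                                    -- Day 2: L
        ('2024-01-03',1),                                    -- Day 3: W
        ('2024-01-04',1),('2024-01-04',0),('2024-01-04',0),  -- Day 4: W,L,L
        ('2024-01-05',0),('2024-01-05',0),                   -- Day 5: L,L
        ('2024-01-06',1);                                    -- Day 6: W
""")

max_streak = con.execute("""
    WITH winning_days AS (              -- days with at least one win
        SELECT event_date FROM events
        GROUP BY event_date HAVING SUM(winning_score) > 0
    ),
    grouped AS (                        -- constant offset = same streak island
        SELECT julianday(event_date)
               - ROW_NUMBER() OVER (ORDER BY event_date) AS streak_group
        FROM winning_days
    )
    SELECT MAX(cnt)
    FROM (SELECT COUNT(*) AS cnt FROM grouped GROUP BY streak_group)
""").fetchone()[0]
print(max_streak)  # 2 (the Day 3 → Day 4 run)
```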

Sample Data

CREATE TABLE events (
    player_id     INT,
    event_id      INT,     -- global unique event sequence number
    winning_score INT,     -- 1 = player won this event, 0 = lost, NULL = did not participate
    event_date    DATE
);

INSERT INTO events VALUES
-- Player 1: Day1 (W+L) → Day2 (L only) → Day3+ (W → W+L+L)
(1, 1,  1, '2024-01-01'),  -- Day 1: Win
(1, 2,  0, '2024-01-01'),  -- Day 1: Lose (but Day 1 has at least 1 win → streak continues)
(1, 3,  0, '2024-01-02'),  -- Day 2: Lose only (no wins → streak breaks)
(1, 4,  1, '2024-01-03'),  -- Day 3: Win (fresh streak starts = 1)
(1, 5,  1, '2024-01-04'),  -- Day 4: Win (streak continues = 2)
(1, 6,  0, '2024-01-04'),  -- Day 4: Lose (but Day 4 has at least 1 win → streak still continues = 2)
(1, 7,  0, '2024-01-04'),  -- Day 4: Lose (still at least 1 win on Day 4)
-- Player 2: L → W → W+L → W+L+L
(2, 8,  0, '2024-01-01'),  -- Day 1: Lose only (no wins → no streak)
(2, 9,  1, '2024-01-02'),  -- Day 2: Win (fresh streak starts = 1)
(2, 10, 1, '2024-01-03'),  -- Day 3: Win (streak continues = 2)
(2, 11, 0, '2024-01-03'),  -- Day 3: Lose (but Day 3 has at least 1 win → streak still continues = 2)
(2, 12, 1, '2024-01-04'),  -- Day 4: Win (streak continues = 3)
(2, 13, 0, '2024-01-04'),  -- Day 4: Lose (but Day 4 has at least 1 win)
(2, 14, 0, '2024-01-04');  -- Day 4: Lose (still at least 1 win on Day 4 → streak = 3)
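Before writing the SQL, the expected answers can be sanity-checked with a small Python sketch of the same day-level logic (an illustration only, not part of the solution):

```python
from datetime import date, timedelta

# (player_id, winning_score, event_date) — mirrors the INSERT statements above
events = [
    (1, 1, date(2024, 1, 1)), (1, 0, date(2024, 1, 1)), (1, 0, date(2024, 1, 2)),
    (1, 1, date(2024, 1, 3)), (1, 1, date(2024, 1, 4)), (1, 0, date(2024, 1, 4)),
    (1, 0, date(2024, 1, 4)),
    (2, 0, date(2024, 1, 1)), (2, 1, date(2024, 1, 2)), (2, 1, date(2024, 1, 3)),
    (2, 0, date(2024, 1, 3)), (2, 1, date(2024, 1, 4)), (2, 0, date(2024, 1, 4)),
    (2, 0, date(2024, 1, 4)),
]

def max_winning_streak(events, player):
    # Step 1: days with at least one win for this player
    winning_days = sorted({d for p, w, d in events if p == player and w == 1})
    # Step 2: walk the winning days; consecutive calendar days extend the streak
    best = streak = 0
    prev = None
    for d in winning_days:
        streak = streak + 1 if prev == d - timedelta(days=1) else 1
        best = max(best, streak)
        prev = d
    return best

print(max_winning_streak(events, 1))  # 2  (Jan 3 + Jan 4)
print(max_winning_streak(events, 2))  # 3  (Jan 2 + Jan 3 + Jan 4)
```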

Step 1 — Identify days with at least one win per player

-- Group by player and date; if SUM(winning_score) > 0, that day has wins
WITH winning_days AS (
    SELECT
        player_id,
        event_date,
        SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) AS wins_on_day
    FROM events
    GROUP BY player_id, event_date
    HAVING SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) > 0
)
SELECT * FROM winning_days ORDER BY player_id, event_date;

Step 2 — Gaps and islands on DAYS (not individual matches)

-- For consecutive winning days, event_date minus that day's row number is constant.
-- A day with NO wins never appears in winning_days, so it creates a gap that shifts
-- this difference; each distinct value therefore identifies one streak ("island")

WITH winning_days AS (
    SELECT
        player_id,
        event_date,
        SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) AS wins_on_day
    FROM events
    GROUP BY player_id, event_date
    HAVING SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) > 0
),
ordered_days AS (
    SELECT
        player_id,
        event_date,
        ROW_NUMBER() OVER (PARTITION BY player_id ORDER BY event_date) AS day_seq,
        DATEADD(day,
                -ROW_NUMBER() OVER (PARTITION BY player_id ORDER BY event_date),
                event_date) AS streak_group
    FROM winning_days
)
SELECT player_id, event_date, day_seq, streak_group FROM ordered_days ORDER BY player_id, event_date;

                

Step 3 — Count consecutive winning days and find maximum streak

WITH winning_days AS (
    SELECT
        player_id,
        event_date,
        SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) AS wins_on_day
    FROM events
    GROUP BY player_id, event_date
    HAVING SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) > 0
),
-- Tag each winning day with a "day gap" marker
-- If the previous day (for this player) was also a winning day, gap = 0; otherwise gap = 1
day_gaps AS (
    SELECT
        player_id,
        event_date,
        wins_on_day,
        CASE 
            WHEN LAG(event_date) OVER (PARTITION BY player_id ORDER BY event_date)
                 = DATEADD(day, -1, event_date)
            THEN 0  -- consecutive day (previous day was yesterday)
            ELSE 1  -- new streak (gap or first day)
        END AS streak_start
    FROM winning_days
),
-- Assign streak ID by summing up all streak starts
streak_ids AS (
    SELECT
        player_id,
        event_date,
        SUM(streak_start) OVER (PARTITION BY player_id ORDER BY event_date ROWS UNBOUNDED PRECEDING) AS streak_id
    FROM day_gaps
),
-- Count the number of consecutive winning days per streak
streak_lengths AS (
    SELECT
        player_id,
        streak_id,
        COUNT(*) AS streak_length
    FROM streak_ids
    GROUP BY player_id, streak_id
)
SELECT
    player_id,
    MAX(streak_length) AS max_winning_streak
FROM streak_lengths
GROUP BY player_id
ORDER BY player_id;

-- Result:
-- player_id | max_winning_streak
-- 1         | 2                   ← Day 3 (Jan 3) + Day 4 (Jan 4)
-- 2         | 3                   ← Day 2 (Jan 2) + Day 3 (Jan 3) + Day 4 (Jan 4)

                

Alternative — Simpler approach: Check if each day has at least one win

-- Simplified: group by date, check SUM(wins), then detect consecutive date gaps

WITH daily_wins AS (
    SELECT
        player_id,
        event_date,
        SUM(CASE WHEN winning_score = 1 THEN 1 ELSE 0 END) AS total_wins
    FROM events
    GROUP BY player_id, event_date
),
winning_days_only AS (
    SELECT
        player_id,
        event_date,
        total_wins,
        ROW_NUMBER() OVER (PARTITION BY player_id ORDER BY event_date) AS day_seq
    FROM daily_wins
    WHERE total_wins > 0  -- only days with at least 1 win
),
-- Detect day gaps: if consecutive days are not 1 apart, a gap occurred
streak_markers AS (
    SELECT
        player_id,
        event_date,
        day_seq,
        DATEDIFF(day, LAG(event_date) OVER (PARTITION BY player_id ORDER BY event_date), event_date) AS day_gap
    FROM winning_days_only
),
-- Assign streak IDs based on gaps
streak_ids AS (
    SELECT
        player_id,
        event_date,
        SUM(CASE WHEN day_gap > 1 OR day_gap IS NULL THEN 1 ELSE 0 END) 
            OVER (PARTITION BY player_id ORDER BY event_date ROWS UNBOUNDED PRECEDING) AS streak_id
    FROM streak_markers
),
streak_counts AS (
    SELECT
        player_id,
        streak_id,
        COUNT(*) AS winning_days_in_streak
    FROM streak_ids
    GROUP BY player_id, streak_id
)
SELECT
    player_id,
    MAX(winning_days_in_streak) AS max_winning_streak
FROM streak_counts
GROUP BY player_id;

-- Result: Same as above
-- player_id | max_winning_streak
-- 1         | 2                   ← Day 3 + Day 4
-- 2         | 3                   ← Day 2 + Day 3 + Day 4

Key concepts to explain in interviews:

  • Day-level vs match-level: This problem counts consecutive DAYS with wins, not consecutive individual wins. One loss mixed with wins on the same day does NOT break the streak
  • Gaps and islands on dates: Use ROW_NUMBER() to assign day sequences, then detect gaps. A gap > 1 day marks the start of a new streak
  • DATEDIFF for date gaps: DATEDIFF(day, previous_date, current_date) tells you if days are consecutive. If > 1, there's a gap
  • NULL handling: A day with no events at all is not counted. Only days with at least one event are relevant. A day with all losses has zero wins — it breaks the streak
  • Business definition: Always clarify with the interviewer: does "streak" mean consecutive individual wins, or consecutive days with at least one win? This fundamentally changes the solution
Question 76 — SQL / Snowflake
LIKE, ILIKE, RLIKE, REGEXP_LIKE — Pattern Matching in Snowflake

Snowflake supports a spectrum of pattern matching: simple wildcards (LIKE/ILIKE), POSIX regex via the RLIKE operator, and the full REGEXP_* function family. Knowing when to use each, and their performance implications, is essential for data cleaning and filtering tasks.

LIKE — case-sensitive wildcard matching

-- Wildcards: % = any sequence of characters, _ = exactly one character

-- Emails ending in @gmail.com
SELECT email FROM users WHERE email LIKE '%@gmail.com';

-- Names starting with 'Sur'
SELECT name FROM employees WHERE name LIKE 'Sur%';

-- Exactly 5-character product codes
SELECT code FROM products WHERE code LIKE '_____';   -- 5 underscores

-- Contains 'cloud' anywhere in the string
SELECT description FROM docs WHERE description LIKE '%cloud%';

-- LIKE is case-sensitive in Snowflake by default!
SELECT * FROM users WHERE name LIKE 'surya%';   -- will NOT match 'Surya'

ILIKE — case-INsensitive wildcard matching (Snowflake extension)

-- Same as LIKE but ignores case — very useful for user-entered data

-- Match 'Surya', 'surya', 'SURYA' all at once
SELECT * FROM users WHERE name ILIKE 'surya%';

-- Find any cloud-related tag regardless of capitalisation
SELECT tag FROM labels WHERE tag ILIKE '%cloud%';

-- Standard SQL alternative (works everywhere):
SELECT * FROM users WHERE LOWER(name) LIKE 'surya%';
-- ILIKE is cleaner and often faster (no LOWER() function overhead)

RLIKE — regular expression matching (alias: REGEXP, REGEXP_LIKE)

-- RLIKE uses POSIX extended regex syntax
-- Returns TRUE/FALSE — use in WHERE clause

-- Valid email format check
SELECT email
FROM users
WHERE email RLIKE '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$';

-- Phone numbers in formats: 555-1234, (555) 123-4567, +1-555-123-4567
SELECT phone FROM contacts
WHERE phone RLIKE '^(\\+1-)?\\(?[0-9]{3}\\)?[-. ]?[0-9]{3}[-. ]?[0-9]{4}$';

-- Strings containing only digits
SELECT value FROM raw_data WHERE value RLIKE '^[0-9]+$';

-- Match Indian PAN card format: ABCDE1234F (5 letters, 4 digits, 1 letter)
SELECT pan FROM kyc_data WHERE pan RLIKE '^[A-Z]{5}[0-9]{4}[A-Z]$';

-- RLIKE is case-sensitive by default in Snowflake
-- For case-insensitive regex, use (?i) flag or REGEXP_LIKE with 'i' parameter:
SELECT name FROM employees WHERE name RLIKE '(?i)^surya';

REGEXP_LIKE, REGEXP_SUBSTR, REGEXP_REPLACE, REGEXP_COUNT — full family

-- REGEXP_LIKE: explicit function form of RLIKE
SELECT REGEXP_LIKE('Surya@gmail.com', '^[a-zA-Z]+@[a-zA-Z]+\\.com$') AS is_valid_email;
-- Returns: TRUE

-- REGEXP_LIKE with case-insensitive flag 'i'
SELECT * FROM users WHERE REGEXP_LIKE(email, '^surya.*\\.com$', 'i');

-- REGEXP_SUBSTR: extract the FIRST match from a string
-- Extract domain name from email
SELECT
    email,
    REGEXP_SUBSTR(email, '@(.+)', 1, 1, 'e', 1) AS domain
    -- params: (string, pattern, position, occurrence, flag, group)
FROM users;
-- 'suryateja@gmail.com' → 'gmail.com'

-- Extract all digits from a mixed string
SELECT REGEXP_SUBSTR('Order #12345 placed', '[0-9]+') AS order_number;
-- Returns: '12345'

-- REGEXP_REPLACE: replace matches with substitution
-- Mask credit card number (keep last 4 digits)
SELECT REGEXP_REPLACE('4111-1111-1111-1234', '[0-9]{4}-[0-9]{4}-[0-9]{4}-', '****-****-****-')
    AS masked_card;
-- Returns: '****-****-****-1234'

-- Remove all non-alphanumeric characters
SELECT REGEXP_REPLACE(phone_raw, '[^a-zA-Z0-9]', '') AS phone_clean
FROM contacts;

-- REGEXP_COUNT: count occurrences of a pattern
SELECT REGEXP_COUNT('the cat sat on the mat', 'at') AS count_at;
-- Returns: 3  (c-at, s-at, m-at)
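Because Snowflake's POSIX patterns overlap with most regex engines for cases like these, the patterns above can be sanity-checked quickly in Python's re module before running them in a warehouse (illustrative only, not Snowflake-specific behaviour):

```python
import re

# PAN card format: 5 letters, 4 digits, 1 letter (same pattern as the RLIKE above)
pan = re.compile(r'^[A-Z]{5}[0-9]{4}[A-Z]$')
print(bool(pan.match('ABCDE1234F')))   # True
print(bool(pan.match('AB1234CDEF')))   # False

# Extract the first run of digits (REGEXP_SUBSTR analogue)
print(re.search(r'[0-9]+', 'Order #12345 placed').group())   # 12345

# Count non-overlapping occurrences (REGEXP_COUNT analogue)
print(len(re.findall('at', 'the cat sat on the mat')))       # 3

# Case-insensitive flag, like RLIKE's (?i) or REGEXP_LIKE(..., 'i')
print(bool(re.match('(?i)^surya', 'Surya Teja')))            # True
```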

Comparison Table

Function        | Case Sensitive?            | Pattern Type     | Returns           | Best For
LIKE            | Yes                        | Wildcards (%, _) | TRUE/FALSE        | Simple prefix/suffix/contains
ILIKE           | No                         | Wildcards (%, _) | TRUE/FALSE        | Case-insensitive simple matching
RLIKE / REGEXP  | Yes (use (?i) to override) | Full POSIX regex | TRUE/FALSE        | Complex validation, WHERE clause
REGEXP_LIKE     | Configurable ('i' flag)    | Full POSIX regex | TRUE/FALSE        | Explicit regex WHERE filtering
REGEXP_SUBSTR   | Configurable               | Full POSIX regex | Matched substring | Extraction (email domain, order IDs)
REGEXP_REPLACE  | Configurable               | Full POSIX regex | Modified string   | Masking, cleaning, normalisation
REGEXP_COUNT    | Configurable               | Full POSIX regex | Integer count     | Counting pattern occurrences

Performance notes:

  • LIKE with a leading wildcard ('%value') cannot use partition metadata for pruning — it must scan all rows. Prefer trailing wildcards ('value%') where possible
  • ILIKE is generally faster than LOWER(col) LIKE 'val%' because it avoids the function transformation
  • RLIKE/REGEXP functions are significantly more expensive than LIKE — use only when simple wildcards are insufficient
  • For high-volume filtering on regex patterns, consider pre-extracting the relevant part into a separate column and indexing/clustering on it instead
Question 77 — Snowflake / JSON
Flatten Multi-Nested Arrays in Snowflake — Chained LATERAL FLATTEN

Real-world JSON payloads rarely have simple arrays — they have arrays of objects that themselves contain arrays. Chaining multiple LATERAL FLATTEN calls handles arbitrary nesting depth, one level at a time.

Sample JSON Structure — deeply nested

-- A single order payload with nested line items and nested discount codes per item:
{
  "order_id": "ORD-001",
  "customer_id": 1001,
  "items": [
    {
      "sku": "SHOE-RED",
      "quantity": 2,
      "price": 49.99,
      "discounts": [
        {"code": "SAVE10", "amount": 5.00},
        {"code": "LOYALTY", "amount": 2.50}
      ]
    },
    {
      "sku": "BAG-BLUE",
      "quantity": 1,
      "price": 89.00,
      "discounts": [
        {"code": "WELCOME", "amount": 10.00}
      ]
    }
  ]
}

Level 1 — Flatten the items array

-- First LATERAL FLATTEN unpacks the outer 'items' array
-- Each item becomes its own row, with the parent order_id preserved

SELECT
    o.value:order_id::VARCHAR          AS order_id,
    o.value:customer_id::INT           AS customer_id,
    item.value:sku::VARCHAR            AS sku,
    item.value:quantity::INT           AS quantity,
    item.value:price::DECIMAL(10,2)    AS unit_price,
    item.index                         AS item_position,    -- 0-based position in array
    item.value                         AS item_raw          -- full item object (for next level)
FROM raw.orders_landing AS o,
LATERAL FLATTEN(input => o.value:items) AS item;

-- Result: 2 rows for ORD-001 (one per item)
-- order_id | sku      | quantity | unit_price | item_position
-- ORD-001  | SHOE-RED | 2        | 49.99      | 0
-- ORD-001  | BAG-BLUE | 1        | 89.00      | 1

Level 2 — Chain a second FLATTEN to unpack discounts within each item

-- Each LATERAL FLATTEN can reference the output of the previous one
-- item.value is passed as input to the second FLATTEN

SELECT
    o.value:order_id::VARCHAR          AS order_id,
    o.value:customer_id::INT           AS customer_id,
    item.value:sku::VARCHAR            AS sku,
    item.value:quantity::INT           AS quantity,
    item.value:price::DECIMAL(10,2)    AS unit_price,
    item.index                         AS item_position,
    disc.value:code::VARCHAR           AS discount_code,
    disc.value:amount::DECIMAL(10,2)   AS discount_amount,
    disc.index                         AS discount_position
FROM raw.orders_landing AS o,
LATERAL FLATTEN(input => o.value:items)           AS item,   -- Level 1
LATERAL FLATTEN(input => item.value:discounts)    AS disc;   -- Level 2 (chained)

-- Result: 3 rows for ORD-001 (one per discount across all items)
-- order_id | sku      | unit_price | discount_code | discount_amount
-- ORD-001  | SHOE-RED | 49.99      | SAVE10        | 5.00
-- ORD-001  | SHOE-RED | 49.99      | LOYALTY       | 2.50
-- ORD-001  | BAG-BLUE | 89.00      | WELCOME       | 10.00

Handling optional/missing nested arrays — OUTER FLATTEN

-- Problem: if an item has NO discounts, the chained FLATTEN produces zero rows
-- for that item → the item disappears from results entirely

-- Solution: use OUTER => TRUE on the inner FLATTEN
-- OUTER = TRUE means "if the array is empty or null, still produce one row with NULL values"

SELECT
    o.value:order_id::VARCHAR          AS order_id,
    item.value:sku::VARCHAR            AS sku,
    item.value:price::DECIMAL(10,2)    AS unit_price,
    disc.value:code::VARCHAR           AS discount_code,   -- NULL if no discounts
    disc.value:amount::DECIMAL(10,2)   AS discount_amount  -- NULL if no discounts
FROM raw.orders_landing AS o,
LATERAL FLATTEN(input => o.value:items)                         AS item,
LATERAL FLATTEN(input => item.value:discounts, OUTER => TRUE)   AS disc;
-- Items with no discounts appear with NULL discount_code/amount (not dropped)
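The chained-FLATTEN shape, including the OUTER => TRUE behaviour, is easy to mirror in plain Python, which helps reason about expected row counts before writing the SQL (illustrative only; the third item is a hypothetical addition to show the empty-array case):

```python
# Same payload shape as the sample above, plus one item with no discounts
order = {
    "order_id": "ORD-001",
    "items": [
        {"sku": "SHOE-RED", "price": 49.99,
         "discounts": [{"code": "SAVE10", "amount": 5.00},
                       {"code": "LOYALTY", "amount": 2.50}]},
        {"sku": "BAG-BLUE", "price": 89.00,
         "discounts": [{"code": "WELCOME", "amount": 10.00}]},
        {"sku": "HAT-GREY", "price": 15.00, "discounts": []},  # hypothetical extra item
    ],
}

rows = []
for item in order["items"]:                       # Level 1 FLATTEN
    # OUTER => TRUE analogue: emit one row with NULLs for an empty array
    for disc in (item["discounts"] or [None]):    # Level 2 FLATTEN
        rows.append((order["order_id"], item["sku"],
                     disc["code"] if disc else None,
                     disc["amount"] if disc else None))

for r in rows:
    print(r)
# 4 rows: 2 for SHOE-RED, 1 for BAG-BLUE, 1 NULL-discount row for HAT-GREY
```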

Level 3 — Three levels of nesting (e.g., order → items → attributes → values)

-- If each discount itself has an array of "applicable_dates":
SELECT
    o.value:order_id::VARCHAR           AS order_id,
    item.value:sku::VARCHAR             AS sku,
    disc.value:code::VARCHAR            AS discount_code,
    apply_date.value::DATE              AS applicable_date
FROM raw.orders_landing AS o,
LATERAL FLATTEN(input => o.value:items)                          AS item,
LATERAL FLATTEN(input => item.value:discounts, OUTER => TRUE)    AS disc,
LATERAL FLATTEN(input => disc.value:applicable_dates, OUTER => TRUE) AS apply_date;
-- Three chained FLATTENs — each level adds one more join via LATERAL

FLATTEN in a dbt model

-- models/staging/stg_order_discounts.sql
-- Unpack orders → items → discounts into a clean relational table

{{ config(materialized='view') }}

SELECT
    src.value:order_id::VARCHAR         AS order_id,
    src.value:customer_id::INT          AS customer_id,
    item.value:sku::VARCHAR             AS sku,
    item.value:quantity::INT            AS quantity,
    item.value:price::DECIMAL(10,2)     AS unit_price,
    disc.value:code::VARCHAR            AS discount_code,
    disc.value:amount::DECIMAL(10,2)    AS discount_amount,
    src._loaded_at                      AS ingested_at
FROM {{ source('raw', 'orders_landing') }} src,
LATERAL FLATTEN(input => src.value:items)              item,
LATERAL FLATTEN(input => item.value:discounts, OUTER => TRUE) disc

Key FLATTEN parameters and output columns:

Parameter | Default  | Description
input     | required | The VARIANT / ARRAY / OBJECT to flatten
OUTER     | FALSE    | TRUE = produce a row with NULLs when input is empty/null (like LEFT JOIN)
RECURSIVE | FALSE    | TRUE = flatten all levels recursively (use with caution on deeply nested data)
MODE      | BOTH     | ARRAY / OBJECT / BOTH — what types to expand

Output column | Description
f.value       | The element value at this position
f.index       | 0-based position in the array (NULL for objects)
f.key         | Key name (for object flattening)
f.path        | Full dot-notation path to this element
Question 78 — FinOps / Architecture
Cost Savings on Data Engineering Projects — Strategies, Frameworks & Real Examples

Cost optimisation is a core skill for senior data engineers — interviewers want specific, quantified examples. Here is a framework for identifying, measuring, and implementing savings across compute, storage, ingestion, and pipeline design.

1. Compute Cost Savings — Warehouse Right-Sizing & Auto-Suspend

-- Identify warehouses that are over-provisioned (low activity %)
SELECT
    warehouse_name,
    SUM(credits_used)                                         AS total_credits,
    ROUND(SUM(credits_used) * 3.0, 2)                        AS est_cost_usd,
    ROUND(AVG(CASE WHEN credits_used > 0 THEN 100 ELSE 0 END), 1) AS pct_active
FROM snowflake.account_usage.warehouse_metering_history
WHERE start_time >= CURRENT_DATE - 30
GROUP BY 1 ORDER BY 3 DESC;

-- Action: downsize warehouses with <20% active time
ALTER WAREHOUSE ANALYTICS_WH SET WAREHOUSE_SIZE = 'MEDIUM';  -- was LARGE
-- Immediate 50% compute cost reduction on this warehouse

-- Action: aggressively set auto-suspend
ALTER WAREHOUSE DEV_WH SET AUTO_SUSPEND = 60;    -- 1 min for dev
ALTER WAREHOUSE ETL_WH SET AUTO_SUSPEND = 120;   -- 2 min for batch ETL
-- Typical savings: 20-40% on idle warehouse costs

2. Storage Cost Savings — Transient Tables & Retention Tuning

-- Staging tables don't need Time Travel or Fail-Safe
-- Transient tables have no Fail-Safe and 0-1 day Time Travel → ~50% lower storage cost

-- In dbt: set staging layer to transient
{{ config(materialized='table', transient=true) }}  -- staging models

-- For permanent tables: reduce retention on large, low-risk tables
ALTER TABLE raw.clickstream_events SET DATA_RETENTION_TIME_IN_DAYS = 1;   -- was 14
ALTER TABLE raw.server_logs SET DATA_RETENTION_TIME_IN_DAYS = 1;

-- Find Time Travel storage waste (high time_travel_bytes)
SELECT table_name,
       ROUND(time_travel_bytes / POWER(1024,3), 2) AS time_travel_gb,
       ROUND(failsafe_bytes    / POWER(1024,3), 2) AS failsafe_gb
FROM snowflake.account_usage.table_storage_metrics
ORDER BY (time_travel_bytes + failsafe_bytes) DESC LIMIT 20;
-- Typical savings: 10-30% on storage for large event/log tables

3. Query Efficiency — Eliminate Full Table Scans

-- Find queries scanning >80% of a large table's partitions (no pruning)
SELECT
    LEFT(query_text, 100) AS query_preview,
    ROUND(partitions_scanned * 100.0 / NULLIF(partitions_total, 0), 1) AS pct_scanned,
    total_elapsed_time / 1000 AS elapsed_sec,
    warehouse_name
FROM snowflake.account_usage.query_history
WHERE start_time >= CURRENT_DATE - 7
  AND partitions_total > 100
  AND partitions_scanned / NULLIF(partitions_total, 0) > 0.8
ORDER BY total_elapsed_time DESC;

-- Fix: add clustering key so BI queries prune by date
ALTER TABLE marts.fact_orders CLUSTER BY (order_date);
-- Typical savings: 40-70% query time reduction → cheaper compute needed

4. Pipeline Architecture — Incremental vs Full Refresh

-- Scenario: dbt model doing full rebuild of 500M row table nightly
-- Takes 45 min on X-LARGE warehouse ($80/run × 30 days = $2,400/month)

-- Fix: convert to incremental model
{{ config(
    materialized='incremental',
    unique_key='order_id',
    incremental_strategy='merge'
) }}
SELECT * FROM {{ ref('stg_orders') }}
{% if is_incremental() %}
    WHERE updated_at > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}

-- Result: 3 min on MEDIUM warehouse ($2/run × 30 days = $60/month)
-- Savings: $2,340/month on ONE model alone
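The savings arithmetic above is simple enough to encode as a reusable back-of-envelope helper (the per-run dollar figures are this scenario's assumptions, not Snowflake list prices):

```python
def monthly_cost(cost_per_run_usd, runs_per_month=30):
    """Back-of-envelope monthly cost for a nightly scheduled job."""
    return cost_per_run_usd * runs_per_month

full_refresh = monthly_cost(80)   # X-LARGE, 45 min nightly
incremental  = monthly_cost(2)    # MEDIUM, 3 min nightly
print(full_refresh, incremental, full_refresh - incremental)
# 2400 60 2340  (matches the $2,340/month figure above)
```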

5. Smart Scheduling — Only Run When Data Exists

-- Use Snowflake Streams to gate Tasks (zero credits if no new data)
CREATE TASK etl.process_orders_task
    WAREHOUSE = TRANSFORM_WH
    SCHEDULE  = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('raw.orders_stream')  -- only runs if stream has data
AS CALL etl.sp_merge_orders();

-- Without WHEN guard: task starts warehouse every 5 min, 288 times/day
-- With WHEN guard: warehouse starts only when data arrives → 80-95% fewer cold starts

6. Data Architecture — Move Cold Data to External Tables

-- Data older than 2 years: store in S3 Parquet, query via external table
-- S3 cost: ~$0.023/GB/month vs Snowflake storage: ~$40/TB/month
-- 10TB of cold data: $400/month in Snowflake vs $230/month in S3 (~$170/month saved)

CREATE EXTERNAL TABLE archive.orders_historical (
    order_id    INT    AS (value:order_id::INT),
    order_date  DATE   AS (value:order_date::DATE),
    net_revenue FLOAT  AS (value:net_revenue::FLOAT),
    -- Partition columns must be declared in the column list and are typically
    -- derived from the file path (here assuming paths like orders/2022/part-0.parquet)
    order_year  INT    AS (SPLIT_PART(METADATA$FILENAME, '/', 2)::INT)
)
PARTITION BY (order_year)
LOCATION = @archive.s3_archive_stage/orders/
FILE_FORMAT = (TYPE = 'PARQUET')
AUTO_REFRESH = TRUE;
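The cold-storage comparison can be checked the same way; the unit prices here are the approximate figures assumed above, not quoted rates:

```python
SNOWFLAKE_USD_PER_TB = 40.0     # assumed Snowflake storage rate from above
S3_USD_PER_GB        = 0.023    # assumed S3 Standard rate from above

def archive_saving_per_month(tb):
    snowflake = tb * SNOWFLAKE_USD_PER_TB
    # back-of-envelope: treat 1 TB as 1000 GB, round away float noise
    s3 = round(tb * 1000 * S3_USD_PER_GB, 2)
    return snowflake, s3, snowflake - s3

print(archive_saving_per_month(10))   # (400.0, 230.0, 170.0)
```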

7. Tooling Consolidation — Rationalise Ingestion Tools

  • Replace Fivetran (per-MAR pricing) with Airbyte (open source) for internal source connectors where Qlik Replicate already handles CDC — avoid double-paying for the same source
  • Consolidate overlapping dbt Cloud and Airflow orchestration — standardise on one orchestrator to reduce licensing + maintenance cost
  • Use Snowflake Tasks + Streams for simple Snowflake-internal pipelines instead of triggering Airflow for a single SQL statement

Cost Savings Summary Framework

Category  | Action                                     | Typical Savings
Compute   | Right-size + auto-suspend warehouses       | 20–40%
Compute   | Full refresh → incremental models          | 50–90% per model
Compute   | Stream-gated tasks (no empty runs)         | 80–95% on idle tasks
Storage   | Transient staging tables                   | ~50% on staging storage
Storage   | Reduce Time Travel retention on raw tables | 10–30%
Storage   | Move cold data to S3 external tables       | 30–60% on archive storage
Query     | Clustering keys on large fact tables       | 40–70% query time
Licensing | Consolidate ingestion tools                | Varies by contract
Question 79 — Security / Governance
Redaction vs Static Masking vs Dynamic Data Masking vs Row-Level Security — Complete Comparison

These four techniques all protect sensitive data but work at different layers, with different performance characteristics and governance implications. Interviewers often conflate them — here is a precise, side-by-side breakdown.

1. Data Redaction — Data is physically removed or replaced at source

Redaction means the sensitive value is permanently removed or overwritten in the storage layer — it does not exist in the table at all. There is no dynamic evaluation — the data is gone. Common in ETL pipelines before loading to analytics environments.

-- Static redaction during ETL: replace PII before loading to Snowflake
INSERT INTO staging.customers (customer_id, email, phone)
SELECT
    customer_id,
    '***@redacted.com'      AS email,    -- permanently replaced, original gone
    'XXX-XXX-XXXX'          AS phone
FROM source.raw_customers;

-- Use case: dev/test environments — load redacted data so developers
-- can work with realistic data shapes without ever seeing real PII
-- Unlike masking, there is NO way to recover the original — it never landed

2. Static (Format-Preserving) Masking — Data replaced consistently, value-preserving

Static masking replaces the value with a fake but realistic one — format is preserved so downstream logic still works. Unlike redaction, the substitution is often consistent (same input → same fake output), enabling joins across masked tables.

-- Static masking: replace with fake but format-preserving values
-- Used when: test/dev DB needs referential integrity but not real values

-- Mask credit card: keep format, replace digits
SELECT
    card_number,
    REGEXP_REPLACE(card_number, '[0-9]', 'X') AS masked_card
    -- 4111-1111-1111-1234 → XXXX-XXXX-XXXX-XXXX

-- Format-preserving: keep last 4 digits (realistic for display)
SELECT
    REGEXP_REPLACE(LEFT(card_number, LENGTH(card_number) - 4), '[0-9]', '*')
        || RIGHT(card_number, 4) AS display_card
    -- 4111-1111-1111-1234 → ****-****-****-1234

-- Consistent substitution using hash (same input → same fake output)
SELECT MD5(email) || '@fake.com' AS masked_email
FROM customers;
-- 'john@real.com' → always maps to same fake email → joins work
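The "consistent substitution" property is the key point: the same input always yields the same fake value, so masked tables still join. A minimal Python sketch of the MD5 approach:

```python
import hashlib

def masked_email(email):
    # Deterministic: identical inputs always map to the same fake address,
    # so joins between masked tables still line up.
    # Caveat: an unsalted hash of a guessable value can be reversed by
    # dictionary attack, so this is anonymisation, not strong protection.
    return hashlib.md5(email.encode()).hexdigest() + "@fake.com"

a = masked_email("john@real.com")
b = masked_email("john@real.com")
c = masked_email("jane@real.com")
print(a == b)   # True  (same input, same fake value)
print(a == c)   # False (different inputs stay distinct)
```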

3. Dynamic Data Masking (DDM) — Real-time masking at query time, role-based

DDM is the most powerful technique for production analytics environments. The real data is stored unmasked in Snowflake. When a query runs, the masking policy function is evaluated on the fly and the result depends on the querying user's role. Different roles see different values from the same physical table — with zero data duplication.

-- Step 1: Create masking policy with role-based logic
CREATE OR REPLACE MASKING POLICY pii.email_mask
    AS (val VARCHAR) RETURNS VARCHAR ->
    CASE
        -- DBA and data owners see the real email
        WHEN CURRENT_ROLE() IN ('SYSADMIN', 'DATA_OWNER_ROLE')
            THEN val
        -- Analysts see first letter + masked domain (useful for support)
        WHEN CURRENT_ROLE() = 'ANALYST_ROLE'
            THEN LEFT(val, 1) || '***@***.***'
        -- All other roles see fully masked value
        ELSE '***MASKED***'
    END;

-- Step 2: Apply policy to the column (one-time DDL)
ALTER TABLE staging.customers
    MODIFY COLUMN email SET MASKING POLICY pii.email_mask;

-- Step 3: Observe different results for different roles
USE ROLE DATA_OWNER_ROLE;
SELECT email FROM staging.customers;
-- Output: john.doe@gmail.com  (real value)

USE ROLE ANALYST_ROLE;
SELECT email FROM staging.customers;
-- Output: j***@***.***  (partially masked)

USE ROLE DEVELOPER_ROLE;
SELECT email FROM staging.customers;
-- Output: ***MASKED***

-- KEY: the data in Snowflake storage is ALWAYS the real email.
-- Masking is applied at query time — zero storage of duplicate masked data.
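The policy's CASE logic can be read as a pure function of (value, role); mirroring it in Python makes the role matrix easy to unit-test (role names are the ones assumed in the policy sketch above):

```python
def email_mask(val, current_role):
    # Mirrors the CASE branches of the pii.email_mask policy above
    if current_role in ("SYSADMIN", "DATA_OWNER_ROLE"):
        return val                          # owners see the real value
    if current_role == "ANALYST_ROLE":
        return val[0] + "***@***.***"       # partial mask for support work
    return "***MASKED***"                   # everyone else: fully masked

print(email_mask("john.doe@gmail.com", "DATA_OWNER_ROLE"))  # john.doe@gmail.com
print(email_mask("john.doe@gmail.com", "ANALYST_ROLE"))     # j***@***.***
print(email_mask("john.doe@gmail.com", "DEVELOPER_ROLE"))   # ***MASKED***
```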

4. Row-Level Security (Row Access Policies) — Control WHICH ROWS a user can see

Where DDM controls WHAT VALUE is shown in a column, Row-Level Security controls WHICH ROWS are returned at all. A user simply does not see rows they are not permitted to — the rows are filtered out as if they don't exist.

-- Use case: regional sales managers should only see their own region's data
-- Without RLS: all 10M rows returned to every user — must rely on WHERE clause
-- With RLS: Snowflake enforces the filter automatically, user can't bypass it

-- Step 1: Create a mapping table (user → allowed region)
CREATE TABLE access.region_access (
    user_name VARCHAR,
    region    VARCHAR
);
INSERT INTO access.region_access VALUES
('analyst_apac',   'APAC'),
('analyst_emea',   'EMEA'),
('analyst_us',     'US-EAST'),
('analyst_us',     'US-WEST');   -- user can see multiple regions

-- Step 2: Create row access policy
CREATE OR REPLACE ROW ACCESS POLICY security.region_filter
    AS (region_col VARCHAR) RETURNS BOOLEAN ->
    -- ADMIN sees all rows
    CURRENT_ROLE() = 'ADMIN_ROLE'
    -- Other users see only their allowed regions
    OR EXISTS (
        SELECT 1 FROM access.region_access
        WHERE user_name = CURRENT_USER()
          AND region    = region_col
    );

-- Step 3: Apply to table
ALTER TABLE marts.fact_orders
    ADD ROW ACCESS POLICY security.region_filter ON (region);

-- Now analyst_apac runs:
SELECT * FROM marts.fact_orders;
-- Only returns rows WHERE region = 'APAC' — automatically, silently, enforced
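Conceptually the row access policy is just a boolean predicate evaluated per row; a Python sketch of the mapping-table lookup (using the sample users from the INSERT above):

```python
# (user_name, region) pairs; mirrors the access.region_access mapping table
region_access = {
    ("analyst_apac", "APAC"), ("analyst_emea", "EMEA"),
    ("analyst_us", "US-EAST"), ("analyst_us", "US-WEST"),
}

def row_visible(region, current_user, current_role):
    # ADMIN sees all rows; everyone else only their mapped regions
    return current_role == "ADMIN_ROLE" or (current_user, region) in region_access

orders = [{"order_id": i, "region": r}
          for i, r in enumerate(["APAC", "EMEA", "US-EAST", "US-WEST"])]

visible = [o for o in orders if row_visible(o["region"], "analyst_us", "ANALYST_ROLE")]
print([o["region"] for o in visible])   # ['US-EAST', 'US-WEST']
```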

Combining DDM + Row-Level Security

-- Most enterprise setups combine both:
-- Row Access Policy: analyst sees only their region's rows
-- Masking Policy:    within those rows, email is masked unless they have DBO role
-- This gives two independent dimensions of access control

Comparison Table

Technique          | Where Data is Protected                       | Reversible?               | Role-Based?             | Best For
Redaction          | ETL pipeline (data never lands)               | No — original gone        | No                      | Dev/test environments, GDPR deletion
Static Masking     | Physical table (fake values stored)           | No — original replaced    | No                      | Test DB clones, consistent anonymisation
Dynamic Masking    | Query time (real data stored, masked at read) | Yes — real data preserved | Yes (CURRENT_ROLE)      | Production analytics, PII column-level control
Row-Level Security | Query time (filter on rows)                   | Yes — real data preserved | Yes (CURRENT_USER/ROLE) | Multi-tenant, regional access control
Question 80 — NoSQL
MongoDB Fundamentals — Documents, Collections, Queries, Aggregations & When to Use It

MongoDB is a document-oriented NoSQL database that stores data as JSON-like BSON documents. As a data engineer, you encounter MongoDB as a source system — understanding its data model and query language is essential for designing ingestion pipelines.

Core Concepts — SQL vs MongoDB Terminology

SQL Concept | MongoDB Equivalent   | Description
Database    | Database             | Same concept
Table       | Collection           | Group of documents
Row         | Document (BSON)      | JSON-like key-value object
Column      | Field                | Key in a document
Primary Key | _id                  | Auto-generated ObjectId or custom
JOIN        | $lookup (aggregation)| Expensive — denormalise instead
INDEX       | Index                | Same concept, different syntax
SELECT      | find() / aggregate() | Query documents

Document Structure — schemaless, nested, flexible

// A MongoDB order document (BSON stored as JSON)
{
    "_id": ObjectId("64f2a1b3c4e5f6789a0b1c2d"),
    "order_id": "ORD-001",
    "customer": {
        "id": 1001,
        "name": "Suryateja",
        "email": "surya@example.com"
    },
    "items": [
        { "sku": "SHOE-RED", "qty": 2, "price": 49.99 },
        { "sku": "BAG-BLUE", "qty": 1, "price": 89.00 }
    ],
    "status": "DELIVERED",
    "created_at": ISODate("2024-01-15T10:30:00Z"),
    "total": 188.98
}
// Key insight: nested objects and arrays are first-class — no JOIN needed
// But this makes querying and flattening for analytics harder

Basic CRUD Operations

// INSERT one document
db.orders.insertOne({
    order_id: "ORD-002",
    customer: { id: 1002, name: "Ravi" },
    status: "PENDING",
    total: 75.00
});

// FIND all delivered orders (equivalent to SELECT WHERE)
db.orders.find({ status: "DELIVERED" });

// FIND with projection (equivalent to SELECT specific columns)
db.orders.find(
    { status: "DELIVERED" },     // filter
    { order_id: 1, total: 1, _id: 0 }  // projection: 1=include, 0=exclude
);

// Nested field query: find orders where customer.id = 1001
db.orders.find({ "customer.id": 1001 });

// Array query: find orders containing SKU 'SHOE-RED' in items
db.orders.find({ "items.sku": "SHOE-RED" });

// UPDATE one document
db.orders.updateOne(
    { order_id: "ORD-001" },
    { $set: { status: "RETURNED" } }
);

// DELETE documents older than 1 year
db.orders.deleteMany({
    created_at: { $lt: new Date("2023-01-01") }
});

// Common query operators:
// $eq, $ne, $gt, $gte, $lt, $lte  → comparison
// $in, $nin                        → value in/not in array
// $and, $or, $not                  → logical
// $exists                          → field exists check
// $regex                           → regex match

Aggregation Pipeline — the MongoDB equivalent of GROUP BY + HAVING + JOIN

// Aggregation pipeline: each stage transforms the document stream
// Equivalent to: SELECT status, COUNT(*), SUM(total) FROM orders GROUP BY status

db.orders.aggregate([
    // Stage 1: Filter (WHERE)
    { $match: { status: { $in: ["DELIVERED", "RETURNED"] } } },

    // Stage 2: Unwind array (like LATERAL FLATTEN — one row per item)
    { $unwind: "$items" },

    // Stage 3: Project computed fields
    { $project: {
        order_id: 1,
        status: 1,
        sku: "$items.sku",
        line_total: { $multiply: ["$items.qty", "$items.price"] }
    }},

    // Stage 4: Group by (GROUP BY + aggregate)
    { $group: {
        _id: "$status",
        order_count: { $sum: 1 },
        total_revenue: { $sum: "$line_total" },
        avg_line_value: { $avg: "$line_total" }
    }},

    // Stage 5: Sort (ORDER BY)
    { $sort: { total_revenue: -1 } },

    // Stage 6: Limit (LIMIT)
    { $limit: 10 }
]);

// $lookup: LEFT JOIN equivalent (use sparingly — expensive)
db.orders.aggregate([
    { $lookup: {
        from: "customers",           // right collection
        localField: "customer.id",   // field in orders
        foreignField: "_id",         // field in customers
        as: "customer_details"       // output array field
    }},
    { $unwind: "$customer_details" }  // flatten the joined array
]);

Indexes — critical for query performance

// Without index: MongoDB does a full collection scan (like Snowflake without pruning)
// Single field index
db.orders.createIndex({ status: 1 });           // 1 = ascending, -1 = descending

// Compound index (order matters — most selective field first)
db.orders.createIndex({ status: 1, created_at: -1 });

// Index on nested field
db.orders.createIndex({ "customer.id": 1 });

// Check which indexes exist
db.orders.getIndexes();

// Explain query plan (check if index is used)
db.orders.find({ status: "DELIVERED" }).explain("executionStats");
// Look for: winningPlan.stage = "IXSCAN" (index scan) vs "COLLSCAN" (full scan)

MongoDB as a Data Engineering Source — CDC & Ingestion

// MongoDB Change Streams — native CDC for downstream pipelines
// Built on the replica-set oplog — MongoDB's equivalent of Oracle redo logs
// or SQL Server transaction logs (so they require a replica set)

const changeStream = db.orders.watch([
    { $match: { operationType: { $in: ['insert', 'update', 'delete'] } } }
]);

changeStream.on('change', (event) => {
    // event.operationType: 'insert' | 'update' | 'delete' | 'replace'
    // event.fullDocument: the document after change (for inserts/updates)
    // event.documentKey._id: the _id of the changed document
    // event.updateDescription: fields changed + removed (for updates)
    console.log(event);
    // Ship to Kafka topic → Snowflake via Kafka Connect
});

// Tools for MongoDB → Snowflake ingestion:
// Debezium MongoDB connector → Kafka → Snowflake Kafka Connector
// Fivetran MongoDB connector (managed, no-code)
// MongoDB Atlas Data Federation (if using Atlas cloud)

When to use MongoDB vs Snowflake

Factor         | Use MongoDB                             | Use Snowflake
Data shape     | Flexible, nested, schema evolves often  | Tabular, defined schema
Access pattern | Operational reads/writes (OLTP)         | Analytical queries (OLAP)
Scale          | High write throughput, low latency      | Large analytical scans
JOINs          | Avoid — denormalise into documents      | Excellent multi-table joins
Aggregations   | Limited, slower than SQL                | Highly optimised
Typical use    | App backend, product catalogue, CMS     | Data warehouse, analytics, BI
Question 81 — Snowflake / JSON
JSON Flattening in Snowflake — Complete Guide: PARSE_JSON, Dot Notation, FLATTEN, TRY_CAST

JSON flattening is the process of converting nested JSON structures into relational rows and columns. In Snowflake, the VARIANT type stores arbitrary JSON and a family of operators/functions lets you query and flatten it without pre-defining a schema.

VARIANT — Snowflake's JSON storage type

-- VARIANT stores any valid JSON: object, array, string, number, boolean, null
CREATE TABLE raw.events (
    event_id  INT,
    payload   VARIANT,      -- entire JSON document stored here
    loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Insert raw JSON (common during Snowpipe/COPY INTO loads)
INSERT INTO raw.events (event_id, payload) SELECT
    SEQ4() AS event_id,
    PARSE_JSON('{
        "user_id": 42,
        "event_type": "purchase",
        "amount": 149.99,
        "metadata": {
            "device": "mobile",
            "os": "iOS",
            "version": "17.2"
        },
        "tags": ["sale", "premium", "new_user"]
    }') AS payload;

Accessing values — colon (:) path notation + double-colon (::) casting

-- Colon notation navigates into the JSON structure
SELECT
    payload:user_id::INT             AS user_id,          -- top-level field
    payload:event_type::VARCHAR      AS event_type,
    payload:amount::DECIMAL(10,2)    AS amount,
    payload:metadata:device::VARCHAR AS device,           -- nested object field
    payload:metadata:os::VARCHAR     AS os,
    payload:metadata.version::VARCHAR AS version          -- dot notation also works

    -- Alternative: GET() function (useful for dynamic field names)
    -- GET(payload, 'user_id')::INT AS user_id_alt,

    -- GET_PATH() for deep nesting
    -- GET_PATH(payload, 'metadata.os')::VARCHAR AS os_alt

FROM raw.events;

-- Handling missing fields — returns NULL if path doesn't exist
SELECT payload:nonexistent_field::VARCHAR FROM raw.events;  -- returns NULL, no error

TRY_CAST and safe type coercion

-- Problem: source JSON may have type inconsistencies
-- payload:amount could be "149.99" (string) in some events, 149.99 (number) in others

-- TRY_CAST: returns NULL instead of ERROR on conversion failure
SELECT
    TRY_CAST(payload:amount::VARCHAR AS DECIMAL(10,2)) AS amount_safe,
    TRY_CAST(payload:user_id::VARCHAR AS INT)          AS user_id_safe
FROM raw.events;

-- TRY_ prefix functions (Snowflake family):
-- TRY_CAST, TRY_TO_DATE, TRY_TO_TIMESTAMP, TRY_TO_NUMBER, TRY_TO_DECIMAL
-- All return NULL on failure instead of raising an error
-- Essential for production ETL where source data quality is variable

Flattening arrays — LATERAL FLATTEN (review + examples)

-- Unpack the "tags" array into individual rows
SELECT
    payload:user_id::INT    AS user_id,
    payload:event_type::VARCHAR AS event_type,
    tag.value::VARCHAR      AS tag,
    tag.index               AS tag_position
FROM raw.events,
LATERAL FLATTEN(input => payload:tags) AS tag;

-- Result:
-- user_id | event_type | tag      | tag_position
-- 42      | purchase   | sale     | 0
-- 42      | purchase   | premium  | 1
-- 42      | purchase   | new_user | 2

Flattening JSON objects — LATERAL FLATTEN on an object

-- Flatten an object into key-value pairs (useful for EAV-style metadata)
SELECT
    payload:user_id::INT    AS user_id,
    meta.key                AS metadata_key,
    meta.value::VARCHAR     AS metadata_value
FROM raw.events,
LATERAL FLATTEN(input => payload:metadata) AS meta;

-- Result:
-- user_id | metadata_key | metadata_value
-- 42      | device       | mobile
-- 42      | os           | iOS
-- 42      | version      | 17.2

RECURSIVE FLATTEN — flatten all levels at once

-- RECURSIVE => TRUE traverses all nesting levels
-- Useful for exploration — avoid in production models, because the row count
-- and paths vary with however deeply each input record happens to be nested
SELECT
    f.key,
    f.value,
    f.path,
    f.index
FROM raw.events,
LATERAL FLATTEN(input => payload, RECURSIVE => TRUE) AS f;

-- Returns every key-value pair at every nesting level
-- path shows the dot-notation path: e.g., 'metadata.device', 'tags[0]'
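To see the shape of what `RECURSIVE => TRUE` emits, here is a small Python sketch (purely illustrative, not Snowflake internals) that walks a nested structure and yields (path, value) pairs in the same dot/bracket path style as the `path` column:

```python
def flatten_recursive(node, path=""):
    """Yield (path, value) pairs for every nested element, mimicking the
    path column of Snowflake's FLATTEN(..., RECURSIVE => TRUE)."""
    if isinstance(node, dict):
        for key, val in node.items():
            child = f"{path}.{key}" if path else key
            yield (child, val)                  # container rows are emitted too
            yield from flatten_recursive(val, child)
    elif isinstance(node, list):
        for i, val in enumerate(node):
            child = f"{path}[{i}]"
            yield (child, val)
            yield from flatten_recursive(val, child)

payload = {
    "user_id": 42,
    "metadata": {"device": "mobile", "os": "iOS"},
    "tags": ["sale", "premium"],
}
paths = dict(flatten_recursive(payload))
# paths includes 'metadata.device' -> 'mobile' and 'tags[0]' -> 'sale'
```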

Complete flattening pipeline — raw VARIANT → relational staging table

-- models/staging/stg_events.sql  (dbt model)
-- Full pipeline: VARIANT → typed, flat, relational rows

{{ config(materialized='view') }}

WITH flattened_tags AS (
    SELECT
        e.event_id,
        e.payload:user_id::INT              AS user_id,
        e.payload:event_type::VARCHAR       AS event_type,
        TRY_CAST(e.payload:amount::VARCHAR AS DECIMAL(10,2)) AS amount,
        e.payload:metadata:device::VARCHAR  AS device,
        e.payload:metadata:os::VARCHAR      AS os,
        TRY_TO_TIMESTAMP(e.payload:timestamp::VARCHAR)       AS event_timestamp,
        tag.value::VARCHAR                  AS tag,
        tag.index                           AS tag_position,
        e.loaded_at
    FROM {{ source('raw', 'events') }} e,
    LATERAL FLATTEN(input => e.payload:tags, OUTER => TRUE) AS tag
    -- OUTER => TRUE: events with empty tags[] still appear (with NULL tag)
)
SELECT * FROM flattened_tags
WHERE user_id IS NOT NULL   -- filter malformed records

PARSE_JSON vs OBJECT_CONSTRUCT vs TO_VARIANT

-- PARSE_JSON: converts a JSON string into VARIANT
SELECT PARSE_JSON('{"key": "value", "num": 42}');

-- OBJECT_CONSTRUCT: build a VARIANT object from key-value pairs
SELECT OBJECT_CONSTRUCT('user_id', 42, 'event', 'click', 'score', 9.5);
-- Useful for creating JSON payloads from relational columns

-- ARRAY_CONSTRUCT: build a VARIANT array
SELECT ARRAY_CONSTRUCT('tag1', 'tag2', 'tag3');

-- Combine them:
SELECT OBJECT_CONSTRUCT(
    'user_id', 42,
    'tags',    ARRAY_CONSTRUCT('sale', 'premium'),
    'meta',    OBJECT_CONSTRUCT('device', 'mobile', 'os', 'iOS')
) AS rebuilt_payload;

Common JSON flattening mistakes:

  • Casting without TRY_ — payload:amount::INT will error if amount is "N/A". Always use TRY_CAST for user-provided or third-party JSON
  • Missing OUTER on FLATTEN — if any record has an empty array, it silently disappears. Add OUTER => TRUE to preserve parent rows
  • Forgetting STRIP_OUTER_ARRAY in file format — if your JSON file is [{...},{...}] (array at root), add STRIP_OUTER_ARRAY = TRUE to your file format or it loads as one row
  • NULL vs missing field — payload:field::VARCHAR returns NULL for both a missing field and a JSON null value. Use IS_NULL_VALUE(payload:field) to distinguish
Question 82 — Data Engineering
API Data Acquisition — Elements, Authentication, Pagination, Push vs Pull, Error Handling & Steps

Ingesting data from APIs is one of the most common tasks in data engineering — Stripe, Salesforce, HubSpot, GitHub, and hundreds of SaaS tools expose their data via REST APIs. Understanding every layer of API interaction prevents the silent data gaps that plague naive implementations.

1. Core API Elements

Element          | Description                 | Example
Base URL         | Root endpoint of the API    | https://api.stripe.com/v1
Endpoint         | Specific resource path      | /charges, /customers/{id}
HTTP Method      | Action on the resource      | GET (read), POST (create), PUT/PATCH (update), DELETE
Headers          | Metadata sent with request  | Authorization, Content-Type, Accept
Query Parameters | Filter/sort/paginate in URL | ?limit=100&created_after=2024-01-01
Request Body     | Data sent (POST/PUT)        | JSON payload
Response Body    | Data returned by API        | JSON, XML, CSV
Status Code      | HTTP result code            | 200 OK, 201 Created, 400 Bad Request, 401 Unauthorized, 429 Rate Limited, 500 Server Error
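The elements above compose into one HTTP request. A stdlib-only sketch of how the base URL, endpoint, and query parameters combine (the Stripe URL and parameter names are just the examples from the table, not a real call):

```python
from urllib.parse import urlencode, urljoin

base_url = "https://api.stripe.com/v1/"                    # Base URL
endpoint = "charges"                                       # Endpoint (resource path)
params   = {"limit": 100, "created_after": "2024-01-01"}   # Query parameters
headers  = {                                               # Headers
    "Authorization": "Bearer sk_live_abc123xyz",
    "Accept": "application/json",
}

# GET request target = Base URL + Endpoint + "?" + encoded query string
url = urljoin(base_url, endpoint) + "?" + urlencode(params)
# → https://api.stripe.com/v1/charges?limit=100&created_after=2024-01-01
```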

2. Authentication Methods

import requests

# Method 1: API Key in Header (most common for data APIs)
headers = {
    "Authorization": "Bearer sk_live_abc123xyz",   # Stripe pattern
    "Content-Type": "application/json"
}
response = requests.get("https://api.stripe.com/v1/charges", headers=headers)

# Method 2: Basic Auth (username:password base64 encoded)
import base64
creds = base64.b64encode(b"api_key:").decode()
headers = {"Authorization": f"Basic {creds}"}

# Method 3: OAuth 2.0 (Salesforce, Google, HubSpot)
# Step 1: Get access token
token_response = requests.post("https://login.salesforce.com/services/oauth2/token", data={
    "grant_type":    "password",
    "client_id":     CLIENT_ID,
    "client_secret": CLIENT_SECRET,
    "username":      SF_USERNAME,
    "password":      SF_PASSWORD + SF_SECURITY_TOKEN
})
access_token = token_response.json()["access_token"]
instance_url = token_response.json()["instance_url"]

# Step 2: Use access token for API calls
headers = {"Authorization": f"Bearer {access_token}"}
records = requests.get(f"{instance_url}/services/data/v58.0/query",
                       params={"q": "SELECT Id, Name FROM Account"},
                       headers=headers).json()

# Method 4: API Key in Query Parameter (less secure, some older APIs)
response = requests.get("https://api.example.com/data?api_key=MY_KEY")

3. HTTP Status Codes — what to do with each

STATUS_HANDLING = {
    200: "Success — process the response body",
    201: "Created — resource was created (POST)",
    204: "No Content — success but empty body (DELETE)",
    400: "Bad Request — your request is malformed, fix the query/body",
    401: "Unauthorized — invalid/expired credentials, refresh token",
    403: "Forbidden — valid credentials but insufficient permissions",
    404: "Not Found — resource doesn't exist (log + skip, don't retry)",
    429: "Too Many Requests — rate limited, back off and retry",
    500: "Internal Server Error — API side issue, retry with backoff",
    502: "Bad Gateway — transient, retry",
    503: "Service Unavailable — API down, retry with exponential backoff",
}
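The table above reduces to a simple decision rule that ingestion code can branch on. A sketch (the function name `classify_status` is my own):

```python
def classify_status(code):
    """Map an HTTP status code to a pipeline action."""
    if 200 <= code < 300:
        return "process"        # success — consume the response body
    if code == 429 or code >= 500:
        return "retry"          # transient — back off and retry
    if code == 401:
        return "refresh_auth"   # credentials expired — re-authenticate, then retry
    return "fail"               # 400/403/404 etc. — fix the request or skip, don't retry
```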

4. Pagination Strategies

import time

# Strategy 1: Cursor-based pagination (most reliable — Stripe, GitHub)
# API returns a cursor pointing to next page; immune to data changes mid-fetch

def fetch_all_charges_cursor(base_url, headers):
    all_records = []
    params = {"limit": 100}
    while True:
        resp = requests.get(base_url + "/charges", headers=headers, params=params).json()
        all_records.extend(resp["data"])
        if not resp.get("has_more"):      # Stripe: has_more = False means last page
            break
        params["starting_after"] = resp["data"][-1]["id"]  # cursor = last record ID
    return all_records

# Strategy 2: Offset/Limit pagination (simple, common in REST APIs)
# WARNING: if records are inserted mid-fetch, you may miss records or get duplicates
def fetch_all_offset(base_url, headers):
    all_records, page, limit = [], 0, 100
    while True:
        resp = requests.get(base_url + "/orders",
                            headers=headers,
                            params={"limit": limit, "offset": page * limit}).json()
        if not resp["items"]:
            break
        all_records.extend(resp["items"])
        page += 1
    return all_records

# Strategy 3: Page-number pagination
def fetch_all_pages(base_url, headers):
    all_records, page = [], 1
    while True:
        resp = requests.get(base_url + "/records",
                            headers=headers,
                            params={"page": page, "per_page": 100}).json()
        all_records.extend(resp["results"])
        if page >= resp["total_pages"]:
            break
        page += 1
    return all_records

# Strategy 4: Link header pagination (GitHub REST API)
def fetch_all_link_header(url, headers):
    all_records = []
    while url:
        resp = requests.get(url, headers=headers)
        all_records.extend(resp.json())
        url = resp.links.get("next", {}).get("url")  # follow next link
    return all_records
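Cursor-pagination termination logic is easy to get wrong, so it is worth testing against a fake API. The sketch below simulates the Stripe-style `has_more` / `starting_after` contract in memory (`fake_api` is invented for the simulation):

```python
RECORDS = [{"id": f"ch_{i:03d}"} for i in range(250)]    # pretend server-side data

def fake_api(limit, starting_after=None):
    """Mimic Stripe-style cursor pagination over RECORDS."""
    start = 0
    if starting_after is not None:
        ids = [r["id"] for r in RECORDS]
        start = ids.index(starting_after) + 1            # resume after the cursor
    page = RECORDS[start:start + limit]
    return {"data": page, "has_more": start + limit < len(RECORDS)}

def fetch_all(limit=100):
    out, cursor = [], None
    while True:
        resp = fake_api(limit, starting_after=cursor)
        out.extend(resp["data"])
        if not resp["has_more"]:                         # last page reached
            break
        cursor = resp["data"][-1]["id"]                  # cursor = last record's ID
    return out

records = fetch_all()
# → 250 records across 3 pages, no gaps or duplicates
```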

5. Push vs Pull — Two Fundamental Acquisition Strategies

# PULL (polling) — your pipeline fetches data on a schedule
# ─────────────────────────────────────────────────────────
# - You control the frequency
# - Simple to implement (cron job + API call)
# - Always a latency gap between real-time and your data
# - Can hit rate limits if polling too frequently
# - Example: fetch Stripe charges every 15 minutes

while True:
    charges = fetch_charges_since(last_watermark)
    load_to_snowflake(charges)
    last_watermark = charges[-1]["created"] if charges else last_watermark
    time.sleep(900)  # 15-minute poll interval

# PUSH (webhooks/streaming) — the API sends data to YOU as it happens
# ─────────────────────────────────────────────────────────────────────
# - Real-time (data arrives seconds after the event)
# - No polling cost
# - You need a receiver endpoint (REST endpoint or message queue)
# - Source must support webhooks
# - Example: Stripe sends a webhook POST to your endpoint on charge.created

# Webhook receiver (Flask example)
from flask import Flask, request
import stripe  # official SDK — provides the signature verification used below

app = Flask(__name__)

@app.route("/stripe-webhook", methods=["POST"])
def stripe_webhook():
    payload   = request.get_data()
    sig_header = request.headers.get("Stripe-Signature")
    secret    = "whsec_your_secret"

    # Verify webhook authenticity (CRITICAL — never skip this)
    try:
        event = stripe.Webhook.construct_event(payload, sig_header, secret)
    except stripe.error.SignatureVerificationError:
        return "Invalid signature", 400

    if event["type"] == "charge.succeeded":
        charge = event["data"]["object"]
        load_charge_to_snowflake(charge)

    return "", 200

6. Rate Limiting — Handling 429 with Exponential Backoff

import time, random

def api_request_with_retry(url, headers, params=None, max_retries=5):
    for attempt in range(max_retries):
        resp = requests.get(url, headers=headers, params=params)

        if resp.status_code == 200:
            return resp.json()

        elif resp.status_code == 429:
            # Check Retry-After header (some APIs tell you exactly how long to wait)
            retry_after = int(resp.headers.get("Retry-After", 60))
            wait = retry_after + random.uniform(0, 10)  # jitter to avoid thundering herd
            print(f"Rate limited. Waiting {wait:.1f}s before retry {attempt+1}/{max_retries}")
            time.sleep(wait)

        elif resp.status_code in (500, 502, 503):
            # Exponential backoff: 1s, 2s, 4s, 8s, 16s...
            wait = (2 ** attempt) + random.uniform(0, 1)
            print(f"Server error {resp.status_code}. Waiting {wait:.1f}s...")
            time.sleep(wait)

        elif resp.status_code in (400, 401, 403, 404):
            # Non-retryable errors: fix the request or skip
            raise ValueError(f"Non-retryable error: {resp.status_code} {resp.text}")

    raise RuntimeError(f"Max retries exceeded for {url}")

7. Complete API Ingestion Pipeline — Steps

# STEP 1: Authentication setup
# - Obtain API key / OAuth token / service account
# - Store credentials in secrets manager (AWS Secrets Manager, not hardcoded)
# - Set up credential rotation if tokens expire

# STEP 2: Watermarking / Incremental state management
# - Store last successful extraction timestamp in a control table
# - Use this to fetch ONLY new/changed data (avoid re-fetching everything)
last_run = get_last_run_timestamp('stripe_charges')   # from control table

# STEP 3: Fetch data with pagination and rate limiting
all_data = []
for page_data in paginate_api(endpoint, since=last_run):
    all_data.extend(page_data)
    time.sleep(0.1)  # respect rate limits proactively

# STEP 4: Validate and transform
from datetime import datetime

for record in all_data:
    assert 'id' in record, f"Missing id: {record}"
    record['_ingested_at'] = datetime.utcnow().isoformat()

# STEP 5: Load to Snowflake (land raw, then transform)
# Land as VARIANT to handle schema evolution
write_to_snowflake_stage(all_data, stage_path)
execute_snowflake_copy_into(target_table)

# STEP 6: Update watermark (only after successful load, and only if rows arrived —
# an empty fetch must not crash on max() or reset the watermark)
if all_data:
    update_last_run_timestamp('stripe_charges', max(r['created'] for r in all_data))

# STEP 7: Monitor and alert
# - Log row counts, API response times, error rates
# - Alert on: zero rows (potential API issue), high error rate, stale watermark

Summary — Pull vs Push trade-offs

Dimension       | Pull (Polling)                               | Push (Webhooks/Streaming)
Latency         | Poll interval (minutes–hours)                | Near real-time (seconds)
Complexity      | Low (schedule + GET)                         | High (receiver + queue + dedup)
Reliability     | You control retries                          | Must ACK or webhook resends
Volume handling | Batch — good for large historical loads      | Event-by-event — good for low-latency
Rate limits     | Must manage carefully                        | Not applicable (API pushes to you)
Best for        | Historical backfill, less critical freshness | Payment events, real-time triggers, notifications