Build the NYC Taxi Pipeline
Create a three-layer pipeline with Python ingestion, SQL staging, and report assets - complete with materialization, dependencies, and quality checks.
Overview
In this step, you'll build the full NYC taxi pipeline from scratch using real data. The pipeline has three layers:
- Ingestion - Python asset + seed files
- Staging - SQL transformation
- Reports - SQL aggregation
Steps
1) Initialize the NYC taxi template
bruin init zoomcamp
This creates a pipeline with placeholder files and TODOs for you to complete. Each file includes instructions in the comments.
2) Configure the pipeline
Edit pipeline.yml:
name: nyc_taxi
schedule: daily
start_date: "2022-01-01"

default_connections:
  duckdb: duckdb_default

variables:
  - name: taxi_types
    type: array
    default:
      - yellow
3) Build the ingestion asset (Python)
Create a Python asset that:
- Connects to the NYC TLC API
- Uses BRUIN_START_DATE and BRUIN_END_DATE to generate a list of months
- Uses the taxi_types custom variable to filter by taxi type
- Returns a DataFrame via the materialize() function
Set the materialization to table with the append strategy, so each run inserts new data without overwriting existing rows.
You can also define a requirements.txt in your pipeline folder - Bruin automatically creates an isolated environment and installs the dependencies.
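The month expansion the ingestion asset needs can be done with the standard library alone. A minimal sketch, assuming Bruin injects the run window as BRUIN_START_DATE and BRUIN_END_DATE environment variables (the `month_range` helper name is made up for illustration):

```python
import os
from datetime import datetime


def month_range(start: str, end: str) -> list[str]:
    """Expand an inclusive YYYY-MM-DD date range into YYYY-MM month strings."""
    start_d = datetime.strptime(start, "%Y-%m-%d").date()
    end_d = datetime.strptime(end, "%Y-%m-%d").date()
    months = []
    y, m = start_d.year, start_d.month
    # Walk month by month until we pass the end date's month
    while (y, m) <= (end_d.year, end_d.month):
        months.append(f"{y:04d}-{m:02d}")
        m += 1
        if m > 12:
            y, m = y + 1, 1
    return months


# Bruin exposes the run window via environment variables at run time
start = os.environ.get("BRUIN_START_DATE", "2022-01-01")
end = os.environ.get("BRUIN_END_DATE", "2022-03-31")
print(month_range(start, end))  # ['2022-01', '2022-02', '2022-03']
```

Each resulting month string can then be used to build the TLC file URL for every taxi type in the taxi_types variable.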
4) Add seed files for lookup tables
Create a YAML seed asset pointing to a local CSV file (e.g., payment type lookup). Add built-in quality checks:
columns:
  - name: payment_type
    checks:
      - name: not_null
      - name: unique
Bruin runs these quality checks automatically after the asset finishes.
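Putting the pieces together, a full seed asset file might look like the sketch below. The asset name, type string, and CSV path are assumptions for illustration — follow the TODO comments in your template for the exact values:

```yaml
# Hypothetical seed asset definition; adjust name/type/path to your template
name: raw.payment_types
type: duckdb.seed

parameters:
  path: ../seeds/payment_types.csv

columns:
  - name: payment_type
    checks:
      - name: not_null
      - name: unique
```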
5) Build the staging asset (SQL)
Create a DuckDB SQL asset that:
- Depends on the ingestion asset and the seed file
- Cleans and deduplicates the raw data
- Joins with the payment type lookup table
- Uses table materialization (drop and recreate each run)
Add a custom quality check:
custom_checks:
  - name: row_count_check
    query: "SELECT COUNT(*) > 0 FROM staging.trips"
    value: 1
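A staging asset could be sketched as a SQL file with an embedded Bruin definition block. The asset names, column names, and join key below are assumptions, not the template's exact schema:

```sql
/* @bruin
name: staging.trips
type: duckdb.sql
materialization:
  type: table
depends:
  - raw.trips
  - raw.payment_types
@bruin */

-- Deduplicate the raw trips and attach the payment type description
SELECT DISTINCT
    t.pickup_datetime,
    t.dropoff_datetime,
    t.taxi_type,
    p.description AS payment_type_description
FROM raw.trips t
LEFT JOIN raw.payment_types p
    ON t.payment_type = p.payment_type
```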
6) Build the report asset (SQL)
Create a SQL asset that aggregates by trip date, taxi type, and payment type. Set dependencies to the staging asset and use the same table materialization.
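Assuming the hypothetical staging schema above, the report aggregation might look like this sketch (names and columns are illustrative):

```sql
/* @bruin
name: reports.trip_summary
type: duckdb.sql
materialization:
  type: table
depends:
  - staging.trips
@bruin */

-- Daily trip counts per taxi type and payment type
SELECT
    CAST(pickup_datetime AS DATE) AS trip_date,
    taxi_type,
    payment_type_description,
    COUNT(*) AS trip_count
FROM staging.trips
GROUP BY 1, 2, 3
```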
7) Run the full pipeline
Open pipeline.yml in the Bruin panel and run the entire pipeline. Check the lineage tab to see all assets and their dependencies visualized as a DAG.
Key concepts covered
- Python materialization with materialize() returning a DataFrame
- Seed assets for loading local CSV files
- Built-in quality checks (not_null, unique) and custom checks
- Dependencies creating a three-layer DAG
- Materialization strategies - append for ingestion, table for transformations
- Custom variables for parameterized ingestion