Step 3 · Intermediate · 16 min

Build the NYC Taxi Pipeline

Create a three-layer pipeline with Python ingestion, SQL staging, and report assets - complete with materialization, dependencies, and quality checks.

Tags: Bruin CLI, SQL, Python
Learning paths: Data Engineer

Overview

In this step, you'll build the full NYC taxi pipeline from scratch using real data. The pipeline has three layers:

  1. Ingestion - Python asset + seed files
  2. Staging - SQL transformation
  3. Reports - SQL aggregation

Steps

1) Initialize the NYC taxi template

bruin init zoomcamp

This creates a pipeline with placeholder files and TODOs for you to complete. Each file includes instructions in the comments.

2) Configure the pipeline

Edit pipeline.yml:

name: nyc_taxi
schedule: daily
start_date: "2022-01-01"
default_connections:
  duckdb: duckdb_default
variables:
  - name: taxi_types
    type: array
    default:
      - yellow

3) Build the ingestion asset (Python)

Create a Python asset that:

  • Connects to the NYC TLC API
  • Uses BRUIN_START_DATE and BRUIN_END_DATE to generate a list of months
  • Uses the taxi_types custom variable to filter by taxi type
  • Returns a DataFrame via the materialize() function

Set the materialization type to table with the append strategy: each run inserts new data without overwriting existing rows.

You can also define a requirements.txt in your pipeline folder - Bruin automatically creates an isolated environment and installs the dependencies.
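The ingestion asset described above can be sketched roughly as follows. This is a minimal illustration, not the template's actual code: the asset name, the `"""@bruin ... @bruin"""` header layout, the TLC download URL, and especially how custom variables reach the script (read here from a hypothetical `BRUIN_VARS` JSON environment variable) are assumptions to verify against the template comments and the Bruin docs.

```python
"""@bruin
name: ingestion.trips
type: python
materialization:
  type: table
  strategy: append
@bruin"""

import json
import os
from datetime import date


def months_between(start: str, end: str) -> list[str]:
    """Return the YYYY-MM strings covering the [start, end] date range."""
    s, e = date.fromisoformat(start), date.fromisoformat(end)
    months, y, m = [], s.year, s.month
    while (y, m) <= (e.year, e.month):
        months.append(f"{y:04d}-{m:02d}")
        m += 1
        if m > 12:
            y, m = y + 1, 1
    return months


def materialize():
    # Imported here so the helper above stays dependency-free.
    import pandas as pd

    # Bruin exposes the run window as environment variables.
    start = os.environ.get("BRUIN_START_DATE", "2022-01-01")
    end = os.environ.get("BRUIN_END_DATE", "2022-01-31")

    # Hypothetical variable injection -- check the Bruin docs for the
    # real mechanism used by the template.
    taxi_types = json.loads(os.environ.get("BRUIN_VARS", "{}")).get(
        "taxi_types", ["yellow"]
    )

    frames = []
    for taxi in taxi_types:
        for month in months_between(start, end):
            # Public TLC parquet files follow this naming pattern.
            url = (
                "https://d37ci6vzurychx.cloudfront.net/trip-data/"
                f"{taxi}_tripdata_{month}.parquet"
            )
            df = pd.read_parquet(url)
            df["taxi_type"] = taxi
            frames.append(df)
    return pd.concat(frames, ignore_index=True)
```

The returned DataFrame is what Bruin appends to the target table on each run, so the function itself never writes to the database.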

4) Add seed files for lookup tables

Create a YAML seed asset pointing to a local CSV file (e.g., payment type lookup). Add built-in quality checks:

columns:
  - name: payment_type
    checks:
      - name: not_null
      - name: unique

Bruin runs these quality checks automatically after the asset finishes.
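Putting the pieces together, the whole seed asset file might look like the sketch below. The asset name, the `duckdb.seed` type, and the `parameters.path` key are assumptions drawn from the template's TODO comments, so cross-check them against your generated files:

```yaml
name: ingestion.payment_lookup
type: duckdb.seed
parameters:
  path: payment_lookup.csv
columns:
  - name: payment_type
    checks:
      - name: not_null
      - name: unique
  - name: description
```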

5) Build the staging asset (SQL)

Create a DuckDB SQL asset that:

  • Depends on the ingestion asset and the seed file
  • Cleans and deduplicates the raw data
  • Joins with the payment type lookup table
  • Uses table materialization (drop and recreate each run)
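As a sketch, such a staging asset could look like the following. The asset names, column names, and cleaning rules are illustrative; the part to mirror from the template is the `/* @bruin ... @bruin */` header that declares dependencies and materialization:

```sql
/* @bruin
name: staging.trips
type: duckdb.sql
materialization:
  type: table
depends:
  - ingestion.trips
  - ingestion.payment_lookup
@bruin */

SELECT DISTINCT
    t.pickup_datetime::date AS trip_date,
    t.taxi_type,
    p.description AS payment_type_name,
    t.trip_distance,
    t.total_amount
FROM ingestion.trips t
LEFT JOIN ingestion.payment_lookup p
    ON t.payment_type = p.payment_type
WHERE t.total_amount >= 0
```

`SELECT DISTINCT` handles deduplication here; the `depends` list is what places this asset downstream of both ingestion assets in the DAG.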

Add a custom quality check:

custom_checks:
  - name: row_count_check
    query: "SELECT COUNT(*) > 0 FROM staging.trips"
    value: 1

6) Build the report asset (SQL)

Create a SQL asset that aggregates by trip date, taxi type, and payment type. Set dependencies to the staging asset and use the same table materialization.
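A hedged sketch of the report asset, following the same header convention (names and metrics are illustrative):

```sql
/* @bruin
name: reports.daily_trips
type: duckdb.sql
materialization:
  type: table
depends:
  - staging.trips
@bruin */

SELECT
    trip_date,
    taxi_type,
    payment_type_name,
    COUNT(*) AS trip_count,
    SUM(total_amount) AS total_revenue
FROM staging.trips
GROUP BY trip_date, taxi_type, payment_type_name
```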

7) Run the full pipeline

Open pipeline.yml in the Bruin panel and run the entire pipeline. Check the lineage tab to see all assets and their dependencies visualized as a DAG.

Key concepts covered

  • Python materialization with materialize() returning a DataFrame
  • Seed assets for loading local CSV files
  • Built-in quality checks (not_null, unique) and custom checks
  • Dependencies creating a three-layer DAG
  • Materialization strategies - append for ingestion, table for transformations
  • Custom variables for parameterized ingestion