Using the Bruin Python SDK

Skip the boilerplate. Use the Bruin Python SDK to query databases, manage connections, and access pipeline context from your Python assets with a few imports.

Overview

Goal - Learn how to use the Bruin Python SDK to eliminate boilerplate in your Python assets. Query databases, access typed connections, and read pipeline context with just a few imports.

Audience - Data engineers and analysts writing Python assets in Bruin who want cleaner, more maintainable pipeline code.

Prerequisites

  • Bruin CLI installed
  • A Bruin project with at least one Python asset
  • Python 3.10 or higher
  • A configured database connection (BigQuery, Postgres, Snowflake, DuckDB, etc.)

Why the SDK?

Before the SDK, a Python asset that ran a query required manually parsing credentials from environment variables, building a database client, handling auth, and executing the query. That's 10+ lines of setup before you write a single line of business logic.

With the SDK, it collapses to this:

from bruin import query, context

df = query(f"SELECT * FROM users WHERE dt >= '{context.start_date}'")

The SDK handles credential parsing, client initialization, and context injection for you.

The SDK pairs naturally with Python materialization. The SDK handles reading and transforming data; materialization handles writing a DataFrame back to your warehouse with strategies like merge or append. See Step 7 below.

Explore example project

A minimal Bruin project showing the Python SDK in action - browse the files to see how each SDK feature lands in a real asset.

python-sdk-example/
.bruin.yml
.gitignore
pyproject.toml
python-sdk-example
pipeline.yml
assets
materialize_users.py
sdk_context_dates.py
sdk_context_vars.py
sdk_errors.py
sdk_get_connection.py
sdk_query.py
.bruin.yml
default_environment: default
environments:
  default:
    connections:
      google_cloud_platform:
        - name: my_bigquery
          location: US
          project_id: <project-id>
          use_application_default_credentials: true
      postgres:
        - name: my_postgres
          username: <user>
          password: <password>
          host: <host-url>
          port: 5432
          database: <db>
      snowflake:
        - name: my_snowflake
          account: <account>
          username: <user>
          password: <password>
          warehouse: <warehouse>
          database: <db>
      duckdb:
        - name: duckdb-default
          path: duckdb.db

Steps

1) Install the SDK

Add bruin-sdk to your asset's requirements.txt. Use extras to pull in the database drivers you need:

bruin-sdk[bigquery]

Available extras:

  • bruin-sdk[bigquery] - Google BigQuery
  • bruin-sdk[snowflake] - Snowflake
  • bruin-sdk[postgres] - PostgreSQL
  • bruin-sdk[redshift] - Redshift
  • bruin-sdk[mssql] - Microsoft SQL Server
  • bruin-sdk[mysql] - MySQL
  • bruin-sdk[duckdb] - DuckDB
  • bruin-sdk[sheets] - Google Sheets
  • bruin-sdk[all] - Everything

Bruin will install the dependencies automatically when your asset runs.

2) Declare your connection in asset metadata

In your Python asset, declare the connection in the @bruin block. The SDK reads this to know which credentials to load:

""" @bruin
name: raw.active_users
connection: my_bigquery
@bruin """

from bruin import query

df = query("SELECT * FROM users WHERE active = true")
print(df.head())

No credential parsing, no client setup. query() uses the asset's default connection.

3) Run a query

The query(sql, connection=None) function returns a pandas DataFrame for SELECT, WITH, SHOW, DESCRIBE, and EXPLAIN statements, and None for DDL/DML operations like CREATE or INSERT.

Note: if you want to write the result back to a table, don't hand-roll CREATE TABLE statements - use Python materialization instead and return the DataFrame from materialize().

from bruin import query

# Uses the asset's default connection
df = query("SELECT * FROM users")

# Or specify a different one
df = query("SELECT * FROM events", connection="my_postgres")

# DDL/DML returns None
query("CREATE TABLE temp AS SELECT * FROM users")

4) Access pipeline context

The context module gives typed access to Bruin's runtime environment variables without manual os.environ parsing:

from bruin import query, context

if context.is_full_refresh:
    df = query("SELECT * FROM raw.events")
else:
    df = query(f"""
        SELECT * FROM raw.events
        WHERE event_date BETWEEN '{context.start_date}' AND '{context.end_date}'
    """)

Available properties include:

  • context.start_date / context.end_date - run window as date objects
  • context.start_datetime / context.end_datetime - same window with time
  • context.execution_date - the run's execution date
  • context.is_full_refresh - boolean flag for full refreshes
  • context.pipeline / context.asset_name - names of the current pipeline and asset
  • context.run_id - unique identifier for the run
  • context.vars - pipeline variables dict (JSON Schema types preserved)

5) Use pipeline variables

Pipeline variables defined in pipeline.yml are exposed through context.vars with their types preserved:

from bruin import query, context

segment = context.vars["segment"]
lookback = context.vars["lookback_days"]

df = query(f"""
    SELECT * FROM customers
    WHERE segment = '{segment}'
    AND created_at >= DATEADD(day, -{lookback}, CURRENT_DATE())
""")

6) Work with multiple connections

One of the biggest wins of the SDK is that it handles reading from and writing to multiple data warehouses and databases through a single, consistent interface. You don't need to import google.cloud.bigquery, import psycopg2, import snowflake.connector, import duckdb, and then juggle each library's auth, client, and cursor semantics. The SDK handles all of that - credential parsing, driver selection, and client initialization - behind query() and get_connection().

Want to read from Postgres and write to BigQuery in the same asset? Just declare both connections in the @bruin block and call query(..., connection=...). The SDK wires up the right client for each source and destination automatically.

""" @bruin
name: staging.active_users
connection: my_bigquery
secrets:
    - key: my_postgres
    - key: my_snowflake
@bruin """

from bruin import query, get_connection

# Read from Postgres
users = query("SELECT * FROM users WHERE active = true", connection="my_postgres")

# Read from Snowflake - same API, different warehouse
events = query("SELECT user_id, COUNT(*) AS events FROM events GROUP BY 1", connection="my_snowflake")

# Write to BigQuery using the typed client
bq = get_connection("my_bigquery")
users.to_gbq(
    "staging.active_users",
    project_id=bq.raw["project_id"],
    credentials=bq.credentials,
    if_exists="replace",
)

No separate SDKs to install, no manual credential plumbing, no per-warehouse client setup. One import, many connections.

Writing data manually like this works, but for most cases you're better off using Python materialization - return the DataFrame from materialize() and let Bruin handle the load with a proper strategy (merge, append, delete+insert, or create+replace).

7) Combine the SDK with materialization

The SDK and Python materialization compose cleanly: the SDK reads and transforms; materialization writes.

"""@bruin
name: analytics.active_users
image: python:3.13
connection: my_bigquery

materialization:
  type: table
  strategy: merge

columns:
  - name: id
    type: integer
    primary_key: true
  - name: name
    type: string
    update_on_merge: true
@bruin"""

from bruin import query, context

def materialize():
    if context.is_full_refresh:
        return query("SELECT id, name FROM raw.users WHERE active = true")

    return query(f"""
        SELECT id, name FROM raw.users
        WHERE active = true
          AND last_seen_at BETWEEN '{context.start_date}' AND '{context.end_date}'
    """)

Bruin calls materialize(), captures the returned DataFrame, and merges it into analytics.active_users using the primary key you declared.

8) Use typed connection objects

get_connection(name) returns an object with lazy-initialized database clients:

from bruin import get_connection

conn = get_connection("my_bigquery")

conn.name     # Connection name
conn.type     # Connection type string
conn.raw      # Parsed connection JSON
conn.client   # Native client, initialized on first access
df = conn.query("SELECT * FROM users")

Client types returned by conn.client:

  • google_cloud_platform → bigquery.Client
  • snowflake → snowflake.connector.Connection
  • postgres / redshift → psycopg2.connection
  • mssql → pymssql.Connection
  • mysql → mysql.connector.Connection
  • duckdb → duckdb.DuckDBPyConnection

GCP connections also expose conn.bigquery(), conn.sheets(), and conn.credentials for finer-grained service access.

9) Handle errors

The SDK ships a small exception hierarchy so you can catch specific failure modes:

from bruin.exceptions import (
    BruinError,                # base class
    ConnectionNotFoundError,   # connection name unknown
    ConnectionParseError,      # invalid connection JSON
    ConnectionTypeError,       # unsupported connection type
    QueryError,                # SQL execution failure
)

try:
    df = query("SELECT * FROM users")
except QueryError as e:
    print(f"Query failed: {e}")

If a database extra is missing (e.g. bruin-sdk[bigquery] not installed), the SDK raises a clear error telling you which extra to add.


Key takeaways

  • The SDK removes boilerplate: credentials, clients, and context are handled for you
  • query() returns a pandas DataFrame for reads, None for writes
  • context gives you typed access to run dates, pipeline variables, and the full-refresh flag
  • get_connection() exposes the native database client when you need more control
  • Declare connections in the @bruin metadata block - the default goes under connection:, extras go under secrets: