Using the Bruin Python SDK

Skip the boilerplate. Use the Bruin Python SDK to query databases, manage connections, and access pipeline context from your Python assets with a few imports.

Overview

Goal - Learn how to use the Bruin Python SDK to eliminate boilerplate in your Python assets. Query databases, access typed connections, and read pipeline context with just a few imports.

Audience - Data engineers and analysts writing Python assets in Bruin who want cleaner, more maintainable pipeline code.

Prerequisites

  • Bruin CLI installed
  • A Bruin project with at least one Python asset
  • Python 3.10 or higher
  • A configured database connection (BigQuery, Postgres, Snowflake, DuckDB, etc.)

Why the SDK?

Before the SDK, a Python asset that ran a query required manually parsing credentials from environment variables, building a database client, handling auth, and executing the query. That's 10+ lines of setup before you write a single line of business logic.

With the SDK, it collapses to this:

from bruin import query, context

df = query(f"SELECT * FROM users WHERE dt >= '{context.start_date}'")

The SDK handles credential parsing, client initialization, and context injection for you.

The SDK pairs naturally with Python materialization. The SDK handles reading and transforming data; materialization handles writing a DataFrame back to your warehouse with strategies like merge or append. See Step 7 below.


Steps

1) Install the SDK

Add bruin-sdk to your asset's requirements.txt. Use extras to pull in the database drivers you need:

bruin-sdk[bigquery]

Available extras:

  • bruin-sdk[bigquery] - Google BigQuery
  • bruin-sdk[snowflake] - Snowflake
  • bruin-sdk[postgres] - PostgreSQL
  • bruin-sdk[redshift] - Redshift
  • bruin-sdk[mssql] - Microsoft SQL Server
  • bruin-sdk[mysql] - MySQL
  • bruin-sdk[duckdb] - DuckDB
  • bruin-sdk[sheets] - Google Sheets
  • bruin-sdk[all] - Everything

Bruin will install the dependencies automatically when your asset runs.

2) Declare your connection in asset metadata

In your Python asset, declare the connection in the @bruin block. The SDK reads this to know which credentials to load:

""" @bruin
name: raw.active_users
connection: my_bigquery
@bruin """

from bruin import query

df = query("SELECT * FROM users WHERE active = true")
print(df.head())

No credential parsing, no client setup. query() uses the asset's default connection.

3) Run a query

The query(sql, connection=None) function returns a pandas DataFrame for SELECT, WITH, SHOW, DESCRIBE, and EXPLAIN statements, and None for DDL/DML operations like CREATE or INSERT.

Note: if you want to write the result back to a table, don't hand-roll CREATE TABLE statements - use Python materialization instead and return the DataFrame from materialize().

from bruin import query

# Uses the asset's default connection
df = query("SELECT * FROM users")

# Or specify a different one
df = query("SELECT * FROM events", connection="my_postgres")

# DDL/DML returns None
query("CREATE TABLE temp AS SELECT * FROM users")

4) Access pipeline context

The context module gives typed access to Bruin's runtime environment variables without manual os.environ parsing:

from bruin import query, context

if context.is_full_refresh:
    df = query("SELECT * FROM raw.events")
else:
    df = query(f"""
        SELECT * FROM raw.events
        WHERE event_date BETWEEN '{context.start_date}' AND '{context.end_date}'
    """)

Available properties include:

  • context.start_date / context.end_date - run window as date objects
  • context.start_datetime / context.end_datetime - same window with time
  • context.execution_date - the run's execution date
  • context.is_full_refresh - boolean flag for full refreshes
  • context.pipeline / context.asset_name - names of the current pipeline and asset
  • context.run_id - unique identifier for the run
  • context.vars - pipeline variables dict (JSON Schema types preserved)

5) Use pipeline variables

Pipeline variables defined in pipeline.yml are exposed through context.vars with their types preserved:

from bruin import query, context

segment = context.vars["segment"]
lookback = context.vars["lookback_days"]

df = query(f"""
    SELECT * FROM customers
    WHERE segment = '{segment}'
    AND created_at >= DATEADD(day, -{lookback}, CURRENT_DATE())
""")

6) Work with multiple connections

When you need a second connection, list it under secrets in the asset metadata:

""" @bruin
name: staging.active_users
connection: my_bigquery
secrets:
    - key: my_postgres
@bruin """

from bruin import query, get_connection

# Reads from Postgres
df = query("SELECT * FROM users WHERE active = true", connection="my_postgres")

# Writes to BigQuery using the typed client
bq = get_connection("my_bigquery")
df.to_gbq(
    "staging.active_users",
    project_id=bq.raw["project_id"],
    credentials=bq.credentials,
    if_exists="replace",
)

Writing data manually like this works, but for most cases you're better off using Python materialization - return the DataFrame from materialize() and let Bruin handle the load with a proper strategy (merge, append, delete+insert, or create+replace).

7) Combine the SDK with materialization

The SDK and Python materialization compose cleanly: the SDK reads and transforms; materialization writes.

"""@bruin
name: analytics.active_users
image: python:3.13
connection: my_bigquery

materialization:
  type: table
  strategy: merge

columns:
  - name: id
    type: integer
    primary_key: true
  - name: name
    type: string
    update_on_merge: true
@bruin"""

from bruin import query, context

def materialize():
    if context.is_full_refresh:
        return query("SELECT id, name FROM raw.users WHERE active = true")

    return query(f"""
        SELECT id, name FROM raw.users
        WHERE active = true
          AND last_seen_at BETWEEN '{context.start_date}' AND '{context.end_date}'
    """)

Bruin calls materialize(), captures the returned DataFrame, and merges it into analytics.active_users using the primary key you declared.

8) Use typed connection objects

get_connection(name) returns an object with lazy-initialized database clients:

from bruin import get_connection

conn = get_connection("my_bigquery")

conn.name     # Connection name
conn.type     # Connection type string
conn.raw      # Parsed connection JSON
conn.client   # Native client, initialized on first access
df = conn.query("SELECT * FROM users")

Client types returned by conn.client:

  • google_cloud_platformbigquery.Client
  • snowflakesnowflake.connector.Connection
  • postgres / redshiftpsycopg2.connection
  • mssqlpymssql.Connection
  • mysqlmysql.connector.Connection
  • duckdbduckdb.DuckDBPyConnection

GCP connections also expose conn.bigquery(), conn.sheets(), and conn.credentials for finer-grained service access.

9) Handle errors

The SDK ships a small exception hierarchy so you can catch specific failure modes:

from bruin.exceptions import (
    BruinError,                # base class
    ConnectionNotFoundError,   # connection name unknown
    ConnectionParseError,      # invalid connection JSON
    ConnectionTypeError,       # unsupported connection type
    QueryError,                # SQL execution failure
)

try:
    df = query("SELECT * FROM users")
except QueryError as e:
    print(f"Query failed: {e}")

If a database extra is missing (e.g. bruin-sdk[bigquery] not installed), the SDK raises a clear error telling you which extra to add.


Key takeaways

  • The SDK removes boilerplate: credentials, clients, and context are handled for you
  • query() returns a pandas DataFrame for reads, None for writes
  • context gives you typed access to run dates, pipeline variables, and the full-refresh flag
  • get_connection() exposes the native database client when you need more control
  • Declare connections in the @bruin metadata block - the default goes under connection:, extras go under secrets: