Using the Bruin Python SDK
Skip the boilerplate. Use the Bruin Python SDK to query databases, manage connections, and access pipeline context from your Python assets with a few imports.
Overview
Goal - Learn how to use the Bruin Python SDK to eliminate boilerplate in your Python assets. Query databases, access typed connections, and read pipeline context with just a few imports.
Audience - Data engineers and analysts writing Python assets in Bruin who want cleaner, more maintainable pipeline code.
Prerequisites
- Bruin CLI installed
- A Bruin project with at least one Python asset
- Python 3.10 or higher
- A configured database connection (BigQuery, Postgres, Snowflake, DuckDB, etc.)
Why the SDK?
Before the SDK, a Python asset that ran a query required manually parsing credentials from environment variables, building a database client, handling auth, and executing the query. That's 10+ lines of setup before you write a single line of business logic.
With the SDK, it collapses to this:
from bruin import query, context
df = query(f"SELECT * FROM users WHERE dt >= '{context.start_date}'")
The SDK handles credential parsing, client initialization, and context injection for you.
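For contrast, the manual setup described above might look roughly like the sketch below. It assumes the connection details arrive as a JSON string in an environment variable named after the connection, with Postgres-style fields (host, port, username, password, database). The helper names `load_connection` and `postgres_dsn` are hypothetical and are not part of the SDK.

```python
import json
import os
from collections.abc import Mapping
from urllib.parse import quote

# Assumed field names for a Postgres-style connection payload.
REQUIRED_FIELDS = {"host", "port", "username", "password", "database"}


def load_connection(name: str, env: Mapping[str, str] = os.environ) -> dict:
    """Parse connection details from a JSON-encoded environment variable.

    Assumption: the variable is named after the connection and holds JSON.
    """
    raw = env.get(name)
    if raw is None:
        raise KeyError(f"connection {name!r} is not set in the environment")
    details = json.loads(raw)
    missing = REQUIRED_FIELDS - details.keys()
    if missing:
        raise ValueError(f"connection {name!r} is missing {sorted(missing)}")
    return details


def postgres_dsn(details: dict) -> str:
    """Build a PostgreSQL connection URI, percent-encoding the credentials."""
    user = quote(str(details["username"]), safe="")
    password = quote(str(details["password"]), safe="")
    return (
        f"postgresql://{user}:{password}"
        f"@{details['host']}:{details['port']}/{details['database']}"
    )
```

Even this much only produces a connection string. Opening the client, running the query, and converting the result to a DataFrame would still follow, and that is the part `query()` is described as absorbing.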
The SDK pairs naturally with Python materialization. The SDK handles reading and transforming data; materialization handles writing a DataFrame back to your warehouse with strategies like merge or append. See Step 7 below.
Explore example project
A minimal Bruin project showing the Python SDK in action - browse the files to see how each SDK feature lands in a real asset.
default_environment: default
environments:
  default:
    connections:
      google_cloud_platform:
        - name: my_bigquery
          location: US
          project_id: <project-id>
          use_application_default_credentials: true
      postgres:
        - name: my_postgres
          username: <user>
          password: <password>
          host: <host-url>
          port: 5432
          database: <db>
      snowflake:
        - name: my_snowflake
          account: <account>
          username: <user>
          password: <password>
          warehouse: <warehouse>
          database: <db>
      duckdb:
        - name: duckdb-default
          path: duckdb.db
Steps
1) Install the SDK
Add bruin-sdk to your asset's requirements.txt. Use extras to pull in the database drivers you need:
bruin-sdk[bigquery]
Available extras:
- bruin-sdk[bigquery] - Google BigQuery
- bruin-sdk[snowflake] - Snowflake
- bruin-sdk[postgres] - PostgreSQL
- bruin-sdk[redshift] - Redshift
- bruin-sdk[mssql] - Microsoft SQL Server
- bruin-sdk[mysql] - MySQL
- bruin-sdk[duckdb] - DuckDB
- bruin-sdk[sheets] - Google Sheets
- bruin-sdk[all] - Everything
Bruin will install the dependencies automatically when your asset runs.
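When running an asset's code outside of Bruin (for example in a local notebook), it can help to check up front whether the driver modules are importable. The sketch below is a generic standard-library check, not an SDK feature. The function name `missing_modules` is hypothetical, and which module each extra provides is an assumption you would fill in yourself.

```python
import importlib.util


def missing_modules(required: dict[str, str]) -> list[str]:
    """Return the labels whose module cannot be found in this environment."""
    missing = []
    for label, module_name in required.items():
        try:
            found = importlib.util.find_spec(module_name) is not None
        except ModuleNotFoundError:
            # Raised for dotted names when the parent package is itself absent.
            found = False
        if not found:
            missing.append(label)
    return missing
```

A call such as `missing_modules({"postgres": "psycopg2", "duckdb": "duckdb"})` would then list the labels to add to requirements.txt, assuming those extras map to those modules.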
2) Declare your connection in asset metadata
In your Python asset, declare the connection in the @bruin block. The SDK reads this to know which credentials to load:
""" @bruin
name: raw.active_users
connection: my_bigquery
@bruin """
from bruin import query
df = query("SELECT * FROM users WHERE active = true")
print(df.head())
No credential parsing, no client setup. query() uses the asset's default connection.
3) Run a query
The query(sql, connection=None) function returns a pandas DataFrame for SELECT, WITH, SHOW, DESCRIBE, and EXPLAIN statements, and None for DDL/DML operations like CREATE or INSERT.
Note: if you want to write the result back to a table, don't hand-roll CREATE TABLE statements - use Python materialization instead and return the DataFrame from materialize().
from bruin import query
# Uses the asset's default connection
df = query("SELECT * FROM users")
# Or specify a different one
df = query("SELECT * FROM events", connection="my_postgres")
# DDL/DML returns None
query("CREATE TABLE temp AS SELECT * FROM users")
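The return-value rule above can be pictured as a check on the statement's first keyword. The sketch below only illustrates that rule. The source does not show how the SDK actually classifies statements, and the name `returns_rows` is hypothetical. It does not handle SQL that begins with a comment.

```python
import re

# Statement types the text lists as returning a DataFrame.
ROW_RETURNING = {"SELECT", "WITH", "SHOW", "DESCRIBE", "EXPLAIN"}
_FIRST_WORD = re.compile(r"\s*([A-Za-z]+)")


def returns_rows(sql: str) -> bool:
    """Guess whether a statement yields rows, based on its first keyword."""
    match = _FIRST_WORD.match(sql)
    return bool(match) and match.group(1).upper() in ROW_RETURNING
```

A check like this is useful in calling code that must not assume a DataFrame comes back, for example before calling `.head()` on the result.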
4) Access pipeline context
The context module gives typed access to Bruin's runtime environment variables without manual os.environ parsing:
from bruin import query, context
if context.is_full_refresh:
    df = query("SELECT * FROM raw.events")
else:
    df = query(f"""
        SELECT * FROM raw.events
        WHERE event_date BETWEEN '{context.start_date}' AND '{context.end_date}'
    """)
Available properties include:
- context.start_date / context.end_date - run window as date objects
- context.start_datetime / context.end_datetime - same window with time
- context.execution_date - the run's execution date
- context.is_full_refresh - boolean flag for full refreshes
- context.pipeline / context.asset_name - names of the current pipeline and asset
- context.run_id - unique identifier for the run
- context.vars - pipeline variables dict (JSON Schema types preserved)
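The full-refresh branch shown in this step can be pulled into a small pure function, which makes the incremental logic testable without a database. This is a sketch. `build_events_query` is a hypothetical helper, and it assumes the window boundaries are `datetime.date` values as the list above describes.

```python
from datetime import date


def build_events_query(is_full_refresh: bool, start: date, end: date) -> str:
    """Return the SQL for a full reload or for the current run window."""
    base = "SELECT * FROM raw.events"
    if is_full_refresh:
        return base
    if start > end:
        raise ValueError(f"window start {start} is after window end {end}")
    # isoformat() yields YYYY-MM-DD, independent of locale settings.
    return (
        f"{base} WHERE event_date BETWEEN "
        f"'{start.isoformat()}' AND '{end.isoformat()}'"
    )
```

Inside an asset, the call would presumably be `query(build_events_query(context.is_full_refresh, context.start_date, context.end_date))`.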
5) Use pipeline variables
Pipeline variables defined in pipeline.yml are exposed through context.vars with their types preserved:
from bruin import query, context
segment = context.vars["segment"]
lookback = context.vars["lookback_days"]
df = query(f"""
SELECT * FROM customers
WHERE segment = '{segment}'
AND created_at >= DATEADD(day, -{lookback}, CURRENT_DATE())
""")
6) Work with multiple connections
One of the biggest wins of the SDK is that it handles reading from and writing to multiple data warehouses and databases through a single, consistent interface. You don't need to import google.cloud.bigquery, import psycopg2, import snowflake.connector, import duckdb, and then juggle each library's auth, client, and cursor semantics. The SDK handles all of that - credential parsing, driver selection, and client initialization - behind query() and get_connection().
Want to read from Postgres and write to BigQuery in the same asset? Just declare both connections in the @bruin block and call query(..., connection=...). The SDK wires up the right client for each source and destination automatically.
""" @bruin
name: staging.active_users
connection: my_bigquery
secrets:
- key: my_postgres
- key: my_snowflake
@bruin """
from bruin import query, get_connection
# Read from Postgres
users = query("SELECT * FROM users WHERE active = true", connection="my_postgres")
# Read from Snowflake - same API, different warehouse
events = query("SELECT user_id, COUNT(*) AS events FROM events GROUP BY 1", connection="my_snowflake")
# Write to BigQuery using the typed client
bq = get_connection("my_bigquery")
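# DataFrame.to_gbq needs the pandas-gbq package; pandas 2.2+ deprecates it
# in favor of pandas_gbq.to_gbq, which takes the same arguments.
users.to_gbq(
    "staging.active_users",
    project_id=bq.raw["project_id"],
    credentials=bq.credentials,
    if_exists="replace",
)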
No separate SDKs to install, no manual credential plumbing, no per-warehouse client setup. One import, many connections.
Writing data manually like this works, but for most cases you're better off using Python materialization - return the DataFrame from materialize() and let Bruin handle the load with a proper strategy (merge, append, delete+insert, or create+replace).
7) Combine the SDK with materialization
The SDK and Python materialization compose cleanly: the SDK reads and transforms; materialization writes.
"""@bruin
name: analytics.active_users
image: python:3.13
connection: my_bigquery
materialization:
  type: table
  strategy: merge
columns:
  - name: id
    type: integer
    primary_key: true
  - name: name
    type: string
    update_on_merge: true
@bruin"""
from bruin import query, context

def materialize():
    if context.is_full_refresh:
        return query("SELECT id, name FROM raw.users WHERE active = true")
    return query(f"""
        SELECT id, name FROM raw.users
        WHERE active = true
        AND last_seen_at BETWEEN '{context.start_date}' AND '{context.end_date}'
    """)
Bruin calls materialize(), captures the returned DataFrame, and merges it into analytics.active_users using the primary key you declared.
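To make the merge behavior concrete, here is a plain-Python model of a merge on a primary key: rows that match an existing key have their `update_on_merge` columns overwritten, and rows with a new key are inserted. This is an illustration under that assumed reading of the strategy. The real load runs inside the warehouse, and `merge_rows` is a hypothetical name.

```python
def merge_rows(
    existing: list[dict],
    incoming: list[dict],
    primary_key: str,
    update_columns: list[str],
) -> list[dict]:
    """Model a keyed merge: update matched rows, insert unmatched ones."""
    merged = {row[primary_key]: dict(row) for row in existing}
    for row in incoming:
        key = row[primary_key]
        if key in merged:
            # Matched: only the columns flagged for update are overwritten.
            for column in update_columns:
                if column in row:
                    merged[key][column] = row[column]
        else:
            # Unmatched: the incoming row is inserted as-is.
            merged[key] = dict(row)
    return list(merged.values())
```

In the asset above, `id` plays the role of `primary_key` and `name` is the only column in `update_columns`.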
8) Use typed connection objects
get_connection(name) returns an object with lazy-initialized database clients:
from bruin import get_connection
conn = get_connection("my_bigquery")
conn.name # Connection name
conn.type # Connection type string
conn.raw # Parsed connection JSON
conn.client # Native client, initialized on first access
df = conn.query("SELECT * FROM users")
Client types returned by conn.client:
- google_cloud_platform → bigquery.Client
- snowflake → snowflake.connector.Connection
- postgres / redshift → psycopg2.connection
- mssql → pymssql.Connection
- mysql → mysql.connector.Connection
- duckdb → duckdb.DuckDBPyConnection
GCP connections also expose conn.bigquery(), conn.sheets(), and conn.credentials for finer-grained service access.
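"Lazy-initialized" here means the native client is only built the first time `conn.client` is read. The sketch below shows one common way to get that behavior in Python, using `functools.cached_property` with `sqlite3` as a stand-in driver. `LazyConnection` is a hypothetical class for illustration and makes no claim about how the SDK implements it.

```python
import sqlite3
from functools import cached_property
from typing import Any, Callable


class LazyConnection:
    """Hypothetical stand-in for a connection object with a lazy client."""

    def __init__(
        self, name: str, conn_type: str, raw: dict, factory: Callable[[dict], Any]
    ) -> None:
        self.name = name
        self.type = conn_type
        self.raw = raw
        self._factory = factory
        self.init_count = 0  # exposed only to make the laziness observable

    @cached_property
    def client(self) -> Any:
        # Runs once, on first attribute access; the result is cached afterwards.
        self.init_count += 1
        return self._factory(self.raw)


conn = LazyConnection(
    name="local",
    conn_type="sqlite",
    raw={"path": ":memory:"},
    factory=lambda raw: sqlite3.connect(raw["path"]),
)
```

The practical upshot is that calling `get_connection()` for a connection you end up not using should cost nothing beyond parsing its configuration.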
9) Handle errors
The SDK ships a small exception hierarchy so you can catch specific failure modes:
from bruin import query
from bruin.exceptions import (
    BruinError,               # base class
    ConnectionNotFoundError,  # connection name unknown
    ConnectionParseError,     # invalid connection JSON
    ConnectionTypeError,      # unsupported connection type
    QueryError,               # SQL execution failure
)

try:
    df = query("SELECT * FROM users")
except QueryError as e:
    print(f"Query failed: {e}")
If a database extra is missing (e.g. bruin-sdk[bigquery] not installed), the SDK raises a clear error telling you which extra to add.
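A specific exception type also makes it straightforward to retry only the failures you consider transient. The helper below is a generic sketch with exponential backoff. `with_retries` is a hypothetical name, and it takes the exception class as a parameter so that it does not depend on the SDK being installed.

```python
import time
from typing import Callable, Type, TypeVar

T = TypeVar("T")


def with_retries(
    fn: Callable[[], T],
    retry_on: Type[BaseException],
    attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on retry_on with exponentially growing delays."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

In an asset this might be called as `with_retries(lambda: query("SELECT * FROM users"), QueryError)`. Retrying only pays off for transient problems such as timeouts; a query that fails on a syntax error will fail the same way on every attempt.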
Key takeaways
- The SDK removes boilerplate: credentials, clients, and context are handled for you
- query() returns a pandas DataFrame for reads, None for writes
- context gives you typed access to run dates, pipeline variables, and the full-refresh flag
- get_connection() exposes the native database client when you need more control
- Declare connections in the @bruin metadata block - the default goes under connection:, extras go under secrets:
Helpful links
- Bruin Python SDK documentation
- bruin-sdk on PyPI
- Source on GitHub
- Python assets in Bruin
- Python materialization tutorial - pair with the SDK to load DataFrames back into your warehouse