AWS Athena
Bruin supports AWS Athena as a query engine, which means you can use Bruin to build tables and views in your data lake with Athena.
WARNING
Bruin materializations will always create Iceberg tables on Athena. You can still write SQL scripts for legacy tables and not use materialization features.
Connection
In order to have set up an Athena connection, you need to add a configuration item to connections in the .bruin.yml file complying with the following schema:
connections:
athena:
- name: "connection_name"
region: "us-west-2"
database: "some_database"
access_key_id: "XXXXXXXX"
secret_access_key: "YYYYYYYY"
query_results_path: "s3://some-bucket/some-path"
session_token: "ZZZZZZZ" # optional
profile: "some_profile" # optionalYou have two ways to set credentials:
- You can put your
access_key_idandsecret_access_key, as well as an optionalsession_tokenhere directly. - Alternatively, you can put your
profilehere, and if you have your local AWS credentials in~/.awsconfigured, Bruin will use them.
The field database is optional, if not provided, it will default to default.
WARNING
The results of the materialization as well as any temporary tables Bruin needs to create will be stored at the location defined by query_results_path. This location must be writable and might be required to be empty at the beginning.
Athena Assets
athena.sql
Runs a materialized Athena asset or an SQL script. For detailed parameters, you can check Definition Schema page.
Examples
Create a view to aggregate website traffic data
/* @bruin
name: website_traffic.view
type: athena.sql
materialization:
type: view
@bruin */
select
date,
count(distinct user_id) as unique_visitors,
sum(page_views) as total_page_views,
avg(session_duration) as avg_session_duration
from raw_web_traffic
group by date;Create a table to analyze daily sales performance:
/* @bruin
name: daily_sales_analysis.view
type: athena.sql
materialization:
type: table
@bruin */
select
order_date,
sum(total_amount) as total_sales,
count(distinct order_id) as total_orders,
avg(total_amount) as avg_order_value
from sales_data
group by order_date;Bruin Athena assets support partitioning by one column only
/* @bruin
name: daily_sales_analysis.view
type: athena.sql
materialization:
type: table
partition_by: order_date # <----------
@bruin */
select
order_date,
sum(total_amount) as total_sales,
count(distinct order_id) as total_orders,
avg(total_amount) as avg_order_value
from sales_data
group by order_date;athena.seed
athena.seed is a special type of asset used to represent CSV files that contain data that is prepared outside of your pipeline that will be loaded into your Athena database. Bruin supports seed assets natively, allowing you to simply drop a CSV file in your pipeline and ensuring the data is loaded to the Athena database.
You can define seed assets in a file ending with .yaml:
name: dashboard.hello
type: athena.seed
parameters:
path: seed.csvParameters:
path: Thepathparameter is the path to the CSV file that will be loaded into the data platform. path is relative to the asset definition file.
Examples: Load csv into a Athena database
The examples below show how to load a CSV into an Athena database.
name: dashboard.hello
type: athena.seed
parameters:
path: seed.csvExample CSV:
name,networking_through,position,contact_date
Y,LinkedIn,SDE,2024-01-01
B,LinkedIn,SDE 2,2024-01-01athena.sensor.table
Sensors are a special type of assets that are used to wait on certain external signals.
Checks if a table exists in Athena, runs by default every 30 seconds until this table is available.
name: string
type: string
parameters:
table: string
poke_interval: int (optional)Parameters:
table:table_idformat.catalogue_idanddatabase_idare then taken from the configuration inbruin.yml.poke_interval: The interval between retries in seconds (default 30 seconds).
athena.sensor.query
Checks if a query returns any results in Athena, runs by default every 30 seconds until this query returns any results.
name: string
type: string
parameters:
query: string
poke_interval: int (optional)Parameters:
query: Query you expect to return any resultspoke_interval: The interval between retries in seconds (default 30 seconds).
Example: Partitioned upstream table
Checks if the data available in upstream table for end date of the run.
name: analytics_123456789.events
type: athena.sensor.query
parameters:
query: select exists(select 1 from upstream_table where dt = "{{ end_date }}"Example: Streaming upstream table
Checks if there is any data after end timestamp, by assuming that older data is not appended to the table.
name: analytics_123456789.events
type: athena.sensor.query
parameters:
query: select exists(select 1 from upstream_table where inserted_at > "{{ end_timestamp }}"