Ingestr Assets

Ingestr is a CLI tool that allows you to easily move data between platforms. Bruin supports ingestr natively as an asset type.

Using Ingestr, you can move data from:

  • your production databases like:
    • MSSQL
    • MySQL
    • Oracle
  • your daily tools like:
    • Notion
    • Google Sheets
    • Airtable
  • other platforms such as:
    • Hubspot
    • Salesforce
    • Google Analytics
    • Facebook Ads
    • Google Ads

to your data warehouses:

  • Google BigQuery
  • Snowflake
  • AWS Redshift
  • Azure Synapse
  • Postgres

INFO

You can read more about the capabilities of ingestr in its documentation.

Asset structure

yaml
name: string
type: ingestr
connection: string # optional, defaults to the default connection for the destination platform in pipeline.yml
parameters:
  source: string # optional, used when the source cannot be inferred from the connection alone, e.g. a GCP connection used for a Google Sheets source
  source_connection: string
  source_table: string
  destination: bigquery | snowflake | redshift | synapse | postgres
  
  # optional
  incremental_strategy: replace | append | merge | delete+insert
  incremental_key: string
  sql_backend: pyarrow | sqlalchemy
  loader_file_format: jsonl | csv | parquet

Parameter reference

| Parameter | Required | Ingestr flag | Description |
| --- | --- | --- | --- |
| connection | No | --dest-uri | Destination connection; defaults to the pipeline's connection for the asset. |
| source_connection | Yes | --source-uri | Name of the configured source connection. Bruin resolves it to the URI passed to Ingestr. |
| source | No | n/a | Overrides the inferred source type. For example, set gsheets when reusing a BigQuery connection for Google Sheets. |
| source_table | Yes | --source-table | Table, sheet, or resource identifier to pull from the source. |
| file_type | No | --source-table suffix | Appended to source_table as table#type for connectors that need a file format hint (csv, jsonl, parquet). |
| destination | Yes | --dest-uri | Logical destination used to select the target connection; Bruin converts it into the URI supplied to Ingestr. |
| incremental_strategy | No | --incremental-strategy | Passes the incremental loading strategy (replace, append, merge, delete+insert) to Ingestr. |
| incremental_key | No | --incremental-key | Column that determines incremental progress. When the column is defined with type date, Bruin also forwards it through the --columns option so Ingestr treats it as a date field. |
| partition_by | No | --partition-by | Comma-separated list of destination columns to partition by. |
| cluster_by | No | --cluster-by | Comma-separated list of destination clustering keys. |
| loader_file_format | No | --loader-file-format | Overrides the loader file format (jsonl, csv, parquet). |
| loader_file_size | No | --loader-file-size | Sets the maximum loader file size accepted by Ingestr. |
| sql_backend | No | --sql-backend | Selects the SQL backend Ingestr should use (pyarrow or sqlalchemy). |
| schema_naming | No | --schema-naming | Controls how Ingestr normalizes schema names. Accepted values match the Ingestr CLI. |
| extract_parallelism | No | --extract-parallelism | Limits the number of concurrent extraction workers. |
| sql_reflection_level | No | --sql-reflection-level | Tunes the amount of schema reflection performed against the source. |
| sql_limit | No | --sql-limit | Applies a LIMIT clause when extracting from the source. |
| sql_exclude_columns | No | --sql-exclude-columns | List of columns to skip during extraction. |
| staging_bucket | No | --staging-bucket | Overrides the staging bucket that Ingestr uses for intermediate files. |
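
For instance, here is a hypothetical asset combining several of the optional parameters above; the connection and table names are illustrative:

yaml
name: raw.events
type: ingestr
parameters:
  source_connection: mysql_prod
  source_table: public.events
  destination: bigquery
  incremental_strategy: merge
  incremental_key: updated_at
  loader_file_format: parquet
  sql_limit: 100000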

Column metadata

Define columns on the asset to enrich the metadata passed to Ingestr. Columns flagged as primary_key: true are translated into repeated --primary-key flags, and date-typed incremental keys automatically surface through --columns. See Column metadata for the syntax.
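
For example, a minimal sketch of column definitions on an ingestr asset (the column names are illustrative):

yaml
name: raw.transactions
type: ingestr
parameters:
  source_connection: mysql_prod
  source_table: public.transactions
  destination: bigquery
columns:
  - name: id
    type: integer
    primary_key: true # forwarded to Ingestr as a --primary-key flag
  - name: updated_at
    type: date # date-typed columns are also passed through --columns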

Run configuration

Pipeline run options propagate to ingestr automatically:

  • When a run defines an interval start or end date, Bruin appends --interval-start and --interval-end with the resolved timestamps (including interval modifiers, when enabled).
  • Running with --full-refresh adds the --full-refresh flag to Ingestr.
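
As an illustration, runs like the following (the dates and asset path are placeholders) would forward the interval and refresh options to the underlying ingestr command:

sh
bruin run --start-date 2024-01-01 --end-date 2024-01-31 ./assets/raw.transactions.asset.yml
bruin run --full-refresh ./assets/raw.transactions.asset.yml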

Examples

The examples below show how to use the ingestr asset type in your pipeline. Feel free to adapt them to your needs.

Copy a table from MySQL to BigQuery

yaml
name: raw.transactions
type: ingestr
parameters:
  source_connection: mysql_prod
  source_table: public.transactions
  destination: bigquery

Copy a table from Microsoft SQL Server to Snowflake incrementally

This example shows how to use the updated_at column to incrementally load data from Microsoft SQL Server into Snowflake.

yaml
name: raw.transactions
type: ingestr
parameters:
  source_connection: mssql_prod
  source_table: dbo.transactions
  destination: snowflake
  incremental_strategy: append
  incremental_key: updated_at

Copy data from Google Sheets to Snowflake

This example shows how to copy data from Google Sheets into your Snowflake database.

yaml
name: raw.manual_orders
type: ingestr
parameters:
  source: gsheets
  source_connection: gcp-default
  source_table: <mysheetid>.<sheetname>
  destination: snowflake