# Using the Aqueducts framework in your application

## Quickstart
To load and execute an Aqueducts pipeline, first read the YAML configuration from a local file path:
```rust
use std::collections::HashMap;
use aqueduct::prelude::*;

// Provide params that will be substituted in the aqueduct template
let params = HashMap::from_iter(vec![
    ("date".into(), "2024-01-01".into()),
]);

// Load pipeline from file
let aqueduct = Aqueduct::try_from_yml("./examples/aqueduct_pipeline_example.yml", params).unwrap();
```
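Parameters are referenced inside the template with `${...}` placeholders, the same syntax used in the destination examples further down. A hypothetical excerpt showing where the `date` param above could be substituted:

```yaml
# hypothetical excerpt from aqueduct_pipeline_example.yml
destination:
  type: Delta
  name: example_output
  # `${date}` is replaced with the "date" param provided at load time
  location: ./examples/output/${date}
  # ...
```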
We can then execute the pipeline:
```rust
use aqueduct::prelude::*;

// Optionally set up a `SessionContext` to register necessary object stores, UDFs, or UDAFs
let result = run_pipeline(aqueduct, None).await.unwrap();
```
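To customize the context, pass one in place of `None`. A minimal sketch, assuming the second argument is an `Option` over a DataFusion `SessionContext` (inferred from the `None` in the example above):

```rust
use aqueduct::prelude::*;
use datafusion::prelude::SessionContext;

// Build a custom context and register whatever the pipeline needs on it
// (object stores, UDFs, UDAFs) before handing it to the runner. That
// `run_pipeline` accepts a DataFusion `SessionContext` is an assumption.
let ctx = SessionContext::new();
// e.g. ctx.register_udf(my_udf) or ctx.register_object_store(&url, store)
let result = run_pipeline(aqueduct, Some(ctx)).await.unwrap();
```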
The pipeline execution will:

- register all sources in the `SessionContext`, using each source's name as its table identifier
- execute all defined stages sequentially, top to bottom, caching the result of each stage as a table named after the stage, so downstream stages can reference it in SQL by that name (see the sketch after this list)
- write the result of the final stage to the destination, if one is defined
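To make the wiring concrete, here is a minimal hypothetical pipeline in which the stage `daily_avg` queries the source `readings` and the destination receives `daily_avg`'s output; the source fields are assumptions, covered in the sections below:

```yaml
sources:
  - type: File                 # assumed source variant; see the Sources section
    name: readings             # registered as the table `readings`
    # ...
stages:
  - - name: daily_avg          # result cached as the table `daily_avg`
      query: SELECT date, avg(temp_c) AS avg_temp_c FROM readings GROUP BY date
destination:
  type: Delta                  # written from the final stage's result
  name: example_output
  # ...
```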
## Example YAML configurations

Here are some examples of how to use the Aqueducts deserialization schema for YAML files.
### Sources

#### Example
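A sketch of a file-based source that registers the `readings` table used by the stage examples below; the `type`, `file_type`, `options`, and `location` fields here are assumptions about the schema:

```yaml
sources:
  - type: File                 # assumed source variant
    name: readings             # table identifier used in the stage queries
    file_type:
      type: Csv                # assumed file-type configuration
      options: {}
    location: ./examples/temp_readings.csv  # hypothetical path
```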
### Processing stages

#### Example
```yaml
stages:
  - - name: show_all
      query: SELECT * FROM readings
      show: 0 # show complete result set
    - name: show_limit
      query: SELECT * FROM readings
      show: 10 # show 10 values
    - name: print_schema
      query: SELECT * FROM readings
      print_schema: true # print the data frame schema to stdout
    - name: explain
      query: SELECT * FROM readings
      explain: true # print the query plan to stdout
    - name: explain_analyze
      query: SELECT * FROM readings
      explain_analyze: true # print the query plan with execution statistics to stdout, takes precedence over explain
    - name: combine
      query: SELECT * FROM readings
      # combine multiple debug options together
      explain_analyze: true
      print_schema: true
      show: 10
```
### Destination configuration

#### Example
Appending to the table:

```yaml
destination:
  type: Delta
  name: example_output
  location: ${local_path}/examples/output_delta_example/${run_id}
  storage_options: {}
  table_properties: {}
  write_mode:
    # appends data to the table
    operation: Append
  # columns by which to partition the table
  partition_cols:
    - date
  # table schema using the deserialization provided by `deltalake::kernel::StructField`
  schema:
    - name: date
      type: date
      nullable: true
      metadata: {}
    - name: location_id
      type: integer
      nullable: true
      metadata: {}
    - name: avg_temp_c
      type: double
      nullable: true
      metadata: {}
    - name: avg_humidity
      type: double
      nullable: true
      metadata: {}
```
Upserting, with `date` as the "primary" key:

```yaml
destination:
  type: Delta
  name: example_output
  location: ${local_path}/examples/output_delta_example/${run_id}
  storage_options: {}
  table_properties: {}
  write_mode:
    # upserts using the date as the "primary" key
    operation: Upsert
    params:
      - date
  # columns by which to partition the table
  partition_cols:
    - date
  # table schema using the deserialization provided by `deltalake::kernel::StructField`
  schema:
    - name: date
      type: date
      nullable: true
      metadata: {}
    - name: location_id
      type: integer
      nullable: true
      metadata: {}
    - name: avg_temp_c
      type: double
      nullable: true
      metadata: {}
    - name: avg_humidity
      type: double
      nullable: true
      metadata: {}
```
Replacing, deleting all existing data for the given `date` before writing:

```yaml
destination:
  type: Delta
  name: example_output
  location: ${local_path}/examples/output_delta_example/${run_id}
  storage_options: {}
  table_properties: {}
  write_mode:
    # replaces using the date column to delete all data for that date
    operation: Replace
    params:
      - column: date
        value: '2024-01-01'
  # columns by which to partition the table
  partition_cols:
    - date
  # table schema using the deserialization provided by `deltalake::kernel::StructField`
  schema:
    - name: date
      type: date
      nullable: true
      metadata: {}
    - name: location_id
      type: integer
      nullable: true
      metadata: {}
    - name: avg_temp_c
      type: double
      nullable: true
      metadata: {}
    - name: avg_humidity
      type: double
      nullable: true
      metadata: {}
```