Writing Pipelines¶
Aqueducts pipelines are declarative YAML configurations that define data processing workflows. This guide covers the key concepts and patterns for building effective pipelines.
Pipeline Structure¶
Every Aqueducts pipeline follows this basic structure:
# yaml-language-server: $schema=https://raw.githubusercontent.com/vigimite/aqueducts/main/json_schema/aqueducts.schema.json
version: "v2" # Schema version
sources: [...] # Where to read data from
stages: [...] # How to transform the data
destination: {...} # Where to write the results (optional)
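To give a sense of how these pieces fit together, here is a minimal end-to-end sketch that reads a local CSV, aggregates it, and writes Parquet. The file paths, source name, and column names are illustrative, not taken from this guide:

version: "v2"
sources:
  - type: file
    name: raw_sales
    format:
      type: csv
      options:
        has_header: true
        delimiter: ","
    location: ./data/sales.csv
stages:
  - - name: totals
      query: "SELECT region, sum(amount) AS total FROM raw_sales GROUP BY region"
destination:
  type: file
  name: totals_output
  location: ./results/totals.parquet
  format:
    type: parquet
    options: {}
  single_file: true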
Editor Support & Validation¶
For the best development experience, add the schema directive at the top of your pipeline files:
# yaml-language-server: $schema=https://raw.githubusercontent.com/vigimite/aqueducts/main/json_schema/aqueducts.schema.json
This enables:
- Autocompletion for all configuration options
- Real-time validation with error highlighting
- Inline documentation on hover
- Schema-aware suggestions for valid values
Supported Editors:
- VS Code (with YAML extension)
- Neovim (with yaml-language-server)
- IntelliJ IDEA / PyCharm
- Any editor with YAML Language Server support
Data Sources¶
Sources define where your pipeline reads data from. Aqueducts supports multiple source types:
File Sources¶
Read from individual files in various formats:
sources:
  - type: file
    name: sales_data
    format:
      type: csv
      options:
        has_header: true
        delimiter: ","
    location: ./data/sales.csv
Supported formats: CSV, Parquet, JSON
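Parquet and JSON sources use the same shape and only swap the format block. As a rough sketch (assuming the JSON format type is spelled json and needs no extra options; check the schema for the exact spelling and available options):

sources:
  - type: file
    name: events
    format:
      type: json
      options: {}
    location: ./data/events.json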
Directory Sources¶
Process all files in a directory:
sources:
  - type: directory
    name: daily_logs
    format:
      type: parquet
      options: {}
    location: s3://bucket/logs/
    partition_columns:
      - ["date", "date32"]
Delta Table Sources¶
Read from Delta Lake tables:
sources:
  - type: delta
    name: user_events
    location: s3://datalake/events/
    # Optional: read specific version or timestamp
    version: 42
    # timestamp: "2024-01-15T10:30:00Z"
ODBC Sources¶
Feature Flag Required: ODBC support requires the odbc feature flag during installation.
sources:
  - type: odbc
    name: postgres_table
    connection_string: "Driver={PostgreSQL};Server=localhost;Database=test;"
    load_query: "SELECT * FROM users WHERE created_at > '2024-01-01'"
Cloud Storage Configuration¶
All file and directory sources can read from cloud object storage (for example s3:// locations) with authentication.
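The exact configuration keys for storage credentials are defined in the pipeline schema. As an illustration only, an S3-backed source might pass object-store options alongside the location; the storage_config field name and the option keys below are assumptions, so verify them against the schema reference:

sources:
  - type: file
    name: cloud_data
    format:
      type: parquet
      options: {}
    location: s3://my-bucket/data/events.parquet
    storage_config:            # assumed field name, see the schema reference
      AWS_REGION: us-east-1
      AWS_ACCESS_KEY_ID: ${aws_access_key_id}
      AWS_SECRET_ACCESS_KEY: ${aws_secret_access_key}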
Data Transformation (Stages)¶
Stages define SQL queries that transform your data. Aqueducts uses DataFusion for SQL processing.
Basic Stages¶
stages:
  - - name: daily_summary
      query: >
        SELECT
          date_trunc('day', timestamp) as date,
          count(*) as events,
          avg(value) as avg_value
        FROM source_data
        GROUP BY 1
        ORDER BY 1
Parallel Execution¶
Stages at the same level execute in parallel:
stages:
  # These run in parallel
  - - name: sales_summary
      query: "SELECT region, sum(amount) AS total_sales FROM sales GROUP BY region"
    - name: user_summary
      query: "SELECT country, count(*) AS user_count FROM users GROUP BY country"
  # This runs after both above stages complete
  - - name: combined_report
      query: >
        SELECT s.region, s.total_sales, u.user_count
        FROM sales_summary s
        JOIN user_summary u ON s.region = u.country
Debugging Stages¶
Add debugging options to inspect your data:
stages:
  - - name: debug_stage
      query: "SELECT * FROM source_data LIMIT 10"
      show: 10            # Print 10 rows to stdout
      explain: true       # Show query execution plan
      print_schema: true  # Print output schema
Data Destinations¶
Destinations define where to write your pipeline results.
File Destinations¶
Write to various file formats:
destination:
  type: file
  name: output
  location: ./results/output.parquet
  format:
    type: parquet
    options: {}
  single_file: true
  partition_columns: ["region"]
Delta Table Destinations¶
Write to Delta Lake with advanced operations:
destination:
  type: delta
  name: target_table
  location: s3://datalake/target/
  write_mode:
    operation: upsert     # append, replace, or upsert
    params: ["user_id"]   # upsert key columns
  partition_columns: ["date"]
  schema:
    - name: user_id
      data_type: int64
      nullable: false
    - name: name
      data_type: string
      nullable: true
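The write_mode block selects how results are applied to the table. For contrast with the upsert shown above, a plain append would swap just that block; this is a minimal sketch that assumes append needs no key columns:

write_mode:
  operation: append   # assumed: no params needed for a plain append

The rest of the destination definition stays the same as in the example above.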
Advanced Features¶
Parameter Templating¶
Make pipelines reusable with parameter substitution:
sources:
  - type: file
    name: data
    location: ./data/${environment}/${date}.csv

destination:
  type: file
  location: ./output/${environment}_${date}_results.parquet
Supply concrete values for environment and date when you execute the pipeline; each ${...} placeholder is substituted before the run.
Schema Definitions¶
Define explicit schemas for type safety:
sources:
  - type: file
    name: typed_data
    format:
      type: csv
      options:
        schema:
          - name: user_id
            data_type: int64
            nullable: false
          - name: score
            data_type: "decimal<10,2>"
            nullable: true
Complex Data Types¶
Aqueducts supports rich data types, including the following (a schema sketch using several of them appears after the list):
- Basic types: string, int64, float64, bool, date32
- Complex types: list<string>, struct<name:string,age:int32>
- Temporal types: timestamp<millisecond,UTC>, date32
- Decimal types: decimal<10,2> (precision, scale)
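These type names plug into any data_type field, such as the schema definitions shown earlier. A short sketch combining a few of them (column names are illustrative):

schema:
  - name: tags
    data_type: "list<string>"
    nullable: true
  - name: profile
    data_type: "struct<name:string,age:int32>"
    nullable: true
  - name: created_at
    data_type: "timestamp<millisecond,UTC>"
    nullable: false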
See the Schema Reference for complete type documentation.
Examples¶
Explore real-world pipeline patterns:
- Simple Pipeline: Basic CSV processing
- Complex Pipeline: Multi-source Delta operations
- ODBC Pipeline: Database integration
Best Practices¶
Performance Tips
- Use partitioning for large datasets
- Leverage parallel stages for independent operations
- Consider Delta Lake for complex update patterns
- Use explicit schemas for better performance and type safety
Common Pitfalls
- Ensure column names match between stages and joins
- Check data types when joining tables from different sources
- Be mindful of memory usage with large datasets