Aqueduct

Definition for an "Aqueduct" data pipeline. An aqueduct defines a complete data processing pipeline with sources, transformation stages, and an optional destination. Most configuration uses sensible defaults to minimize verbosity.

Root Properties

version

Property Details

Type: string
Required: No
Default: v2

Schema version for migration compatibility

sources

Property Details

Type: array
Required: Yes

Definition of the data sources for this pipeline

stages

Property Details

Type: array
Required: Yes

A sequential list of transformations to execute within the context of this pipeline. Nested stages are executed in parallel

destination

Property Details

Type: anyOf
Required: No

Destination for the final step of the "Aqueduct"; takes the output of the last stage as input for the write operation

Type: Destination

Type: null
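Putting the root properties together, a minimal "Aqueduct" might look like the following sketch. All names, paths and the query are illustrative, and the nested list under stages reflects the parallel-group reading of the stages description above:

version: v2
sources:
  - type: file
    name: raw_events
    format:
      type: csv
      options:
        has_header: true
    location: ./data/events.csv
stages:
  - - name: daily_counts
      query: SELECT event_date, count(*) AS events FROM raw_events GROUP BY event_date
destination:
  type: file
  name: daily_counts_out
  format:
    type: parquet
    options: {}
  location: ./output/daily_counts.parquet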

Type Definitions

Source

A data source for aqueducts pipelines. Sources define where data is read from and include various formats and storage systems. Each source type has specific configuration options for its format and location.

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| name | string | | Name of the in-memory table, existence will be checked at runtime | - |

Enum Values: - type: in_memory
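As a sketch, an in-memory source entry is just the discriminator plus a table name (the name here is hypothetical; the table's existence is only checked at runtime):

sources:
  - type: in_memory
    name: preloaded_table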

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| name | string | | Name of the file source; will be the registered table name in the SQL context | - |
| format | SourceFileType | | File format of the file to be ingested. Supports "Parquet" for parquet files, "Csv" for CSV files and "Json" for JSON files | - |
| location | Location | | A URL or path to the location of the file. Supports relative local paths | - |
| storage_config | object | | Storage configuration for the file. See the delta-rs GitHub repo for the available keys (e.g. https://github.com/delta-io/delta-rs/blob/main/crates/aws/src/storage.rs) and the "object_store" docs (e.g. https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html) | {} |

Enum Values: - type: file
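An illustrative file source entry (name and path are hypothetical), assuming format nests a SourceFileType object as defined below:

sources:
  - type: file
    name: sales
    format:
      type: csv
      options:
        has_header: true
    location: ./data/sales.csv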

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| name | string | | Name of the directory source; will be the registered table name in the SQL context | - |
| format | SourceFileType | | File format of the files to be ingested. Supports "Parquet" for parquet files, "Csv" for CSV files and "Json" for JSON files | - |
| partition_columns | array | | Columns to partition the table by. This is a list of key-value tuples where the key is the column name and the value is a DataType | [] |
| location | Location | | A URL or path to the location of the directory. Supports relative local paths | - |
| storage_config | object | | Storage configuration for the directory. See the delta-rs GitHub repo for the available keys (e.g. https://github.com/delta-io/delta-rs/blob/main/crates/aws/src/storage.rs) and the "object_store" docs (e.g. https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html) | {} |

Enum Values: - type: directory
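An illustrative directory source reading a partitioned set of parquet files; the location and storage key are placeholders, and the [column, DataType] tuple syntax for partition_columns is an assumption based on the description above:

sources:
  - type: directory
    name: events
    format:
      type: parquet
      options: {}
    location: s3://my-bucket/raw/events/
    partition_columns:
      - [event_date, date32]   # tuple syntax assumed: [column name, DataType]
    storage_config:
      AWS_REGION: eu-central-1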

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| name | string | | Name of the ODBC source; will be the registered table name in the SQL context | - |
| load_query | string | | Query to execute when fetching data from the ODBC connection. This query executes eagerly, before the data is processed by the pipeline. The size of the data returned by the query cannot exceed the work memory | - |
| connection_string | string | | ODBC connection string. See the respective database's connection string syntax (e.g. https://www.connectionstrings.com/postgresql-odbc-driver-psqlodbc/) | - |

Enum Values: - type: odbc
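An illustrative ODBC source; the driver name and credentials are placeholders:

sources:
  - type: odbc
    name: customers
    load_query: SELECT id, name, created_at FROM customers
    connection_string: "Driver={PostgreSQL Unicode};Server=localhost;Port=5432;Database=mydb;Uid=user;Pwd=secret;"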

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| name | string | | Name of the delta source; will be the registered table name in the SQL context | - |
| location | Location | | A URL or path to the location of the delta table. Supports relative local paths | - |
| storage_config | object | | Storage configuration for the delta table. See the delta-rs GitHub repo for the available keys (e.g. https://github.com/delta-io/delta-rs/blob/main/crates/aws/src/storage.rs) and the "object_store" docs (e.g. https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html) | {} |
| version | integer \| null | | Delta table version to read from. When unspecified, the latest version is read | - |
| timestamp | string \| null | | RFC3339 timestamp to read the delta table at. When unspecified, the latest version is read | - |

Enum Values: - type: delta
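An illustrative delta source pinned to a table version (location is hypothetical; presumably only one of version or timestamp would be set, since each defaults to the latest version):

sources:
  - type: delta
    name: orders
    location: s3://my-bucket/tables/orders
    version: 42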

SourceFileType

File type of the source file; supports "Parquet", "Csv" or "Json"

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| options | ParquetSourceOptions | | | - |

Enum Values: - type: parquet

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| options | CsvSourceOptions | | | - |

Enum Values: - type: csv

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| options | JsonSourceOptions | | | - |

Enum Values: - type: json

ParquetSourceOptions

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| schema | array | | Schema to read this Parquet with. Schema definition using universal Field types | [] |

Field

A field definition that can be used across all aqueducts backends. Fields define the structure of data with a name, data type, nullable flag, and optional description.

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| name | string | | The name of the field | - |
| data_type | string | | Data type specification. Examples: 'string', 'int64', 'bool', 'list', 'struct', 'timestamp', 'decimal<10,2>' | - |
| nullable | boolean | | Whether the field can contain null values | True |
| description | string \| null | | Optional description of what this field represents | - |

CsvSourceOptions

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| has_header | boolean | | Set to "true" to treat the first row of the CSV as the header; column names will be inferred from the header. If there is no header, the column names are "column_1, column_2, ... column_x" | True |
| delimiter | string | | Delimiter character to read this CSV with | , |
| schema | array | | Schema to read this CSV with. Schema definition using universal Field types | [] |
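For example, a CSV format block with explicit options and an inline schema might look like this sketch (field names are illustrative):

format:
  type: csv
  options:
    has_header: false
    delimiter: ";"
    schema:
      - name: id
        data_type: int64
        nullable: false
      - name: amount
        data_type: "decimal<10,2>"
        nullable: true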

JsonSourceOptions

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| schema | array | | Schema to read this JSON with. Schema definition using universal Field types | [] |

Location

A file path or URL. File paths will be converted to file:// URLs. Examples: '/tmp/data.csv', './data.csv', 'https://example.com/data.csv', 's3://bucket/data.csv'

DataType

Universal data type that can be converted to Arrow, Delta, and other formats. DataType supports all common data types and can be parsed from user-friendly string representations. This provides a unified schema definition that works across different backends. When used in YAML/JSON configurations, data types are specified as strings that are automatically parsed into the appropriate DataType variant.

Data Type Format Examples

Basic Types:

  • "string" or "utf8" - UTF-8 string
  • "int32", "int", or "integer" - 32-bit signed integer
  • "int64" or "long" - 64-bit signed integer
  • "float32" or "float" - 32-bit floating point
  • "float64" or "double" - 64-bit floating point
  • "bool" or "boolean" - Boolean value
  • "date32" or "date" - Date as days since epoch

Complex Types:

  • "list<string>" - List of strings
  • "struct<name:string,age:int32>" - Struct with name and age fields
  • "decimal<10,2>" - Decimal with precision 10, scale 2
  • "timestamp" - Timestamp with time unit and timezone
  • "map<string,int32>" - Map from string keys to int32 values

YAML Configuration Example:

schema:
  - name: user_id
    data_type: int64
    nullable: false
  - name: email
    data_type: string
    nullable: true
  - name: scores
    data_type: "list<float64>"
    nullable: true
  - name: profile
    data_type: "struct<name:string,age:int32>"
    nullable: true

Type: string

Allowed Values: Boolean, Int8, Int16, Int32, Int64, UInt8, UInt16, UInt32, UInt64, Float32, Float64, Utf8, LargeUtf8, Binary, LargeBinary, Date32, Date64

Each of the remaining DataType variants is an object with a single property, where the property name selects the variant and the value supplies its parameters:

| Variant Property | Type |
|------------------|------|
| FixedSizeBinary | integer |
| Time32 | TimeUnit |
| Time64 | TimeUnit |
| Timestamp | array |
| Duration | TimeUnit |
| Interval | IntervalUnit |
| Decimal128 | array |
| Decimal256 | array |
| List | Field |
| LargeList | Field |
| FixedSizeList | array |
| Struct | array |
| Map | array |
| Union | array |
| Dictionary | array |

Stage

A processing stage in an aqueducts pipeline. Stages execute SQL queries against the available data sources and previous stage results. Each stage creates a named table that can be referenced by subsequent stages.

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| name | string | | Name of the stage, used as the table name for the result of this stage | - |
| query | string | | SQL query that is executed against a DataFusion context. See the DataFusion SQL reference for more information: https://datafusion.apache.org/user-guide/sql/index.html | - |
| show | integer \| null | | Prints the result of this stage to stdout, limited to the given number of rows (up to "usize"). Set the value to 0 to print without a limit | - |
| explain | boolean | | When set to 'true', the stage will output the query execution plan | False |
| explain_analyze | boolean | | When set to 'true', the stage will output the query execution plan with added execution metrics | False |
| print_schema | boolean | | When set to 'true', the stage will pretty-print the output schema of the executed query | False |
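An illustrative stage layout (table and column names are hypothetical), assuming the nested-list form in which stages within an inner list run in parallel and the outer list runs sequentially:

stages:
  - - name: cleaned
      query: SELECT * FROM raw_events WHERE user_id IS NOT NULL
      show: 10
    - name: rejected
      query: SELECT * FROM raw_events WHERE user_id IS NULL
  - - name: summary
      query: SELECT count(*) AS total FROM cleaned
      print_schema: true

Here cleaned and rejected would execute in parallel, and summary would run afterwards against the cleaned result.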

Destination

Target output destination for aqueducts pipelines. Destinations define where processed data is written and include various formats and storage systems with their specific configuration options.

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| name | string | | Name to register the table with in the provided "SessionContext" | - |

Enum Values: - type: in_memory

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| name | string | | Name of the file to write | - |
| location | Location | | Location of the file as a URL, e.g. file:///tmp/output.csv, s3://bucket_name/prefix/output.parquet, s3://bucket_name/prefix | - |
| format | DestinationFileType | | File format; supported types are Parquet and CSV | - |
| single_file | boolean | | Whether to write a single file (can be used to overwrite the destination file) | True |
| partition_columns | array | | Columns to partition the table by | [] |
| storage_config | object | | Object store storage configuration | {} |

Enum Values: - type: file
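An illustrative file destination writing a single CSV (path is hypothetical):

destination:
  type: file
  name: report
  format:
    type: csv
    options:
      has_header: true
  location: file:///tmp/report.csv
  single_file: true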

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| name | string | | Name of the destination | - |
| connection_string | string | | ODBC connection string. See the respective database's connection string syntax (e.g. https://www.connectionstrings.com/postgresql-odbc-driver-psqlodbc/) | - |
| write_mode | WriteMode | | Strategy for performing the ODBC write operation | - |
| batch_size | integer | | Batch size for inserts (defaults to 1000) | 1000 |

Enum Values: - type: odbc
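An illustrative ODBC destination using the append write mode (connection details are placeholders):

destination:
  type: odbc
  name: customers_out
  connection_string: "Driver={PostgreSQL Unicode};Server=localhost;Port=5432;Database=mydb;Uid=user;Pwd=secret;"
  write_mode:
    operation: append
  batch_size: 500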

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| name | string | | Name of the destination | - |
| location | Location | | A URL or path to the location of the delta table. Supports relative local paths | - |
| write_mode | DeltaWriteMode | | Write mode for the delta destination | - |
| storage_config | object | | Storage configuration for the delta table. See the delta-rs GitHub repo for the available keys (e.g. https://github.com/delta-io/delta-rs/blob/main/crates/aws/src/storage.rs) and the "object_store" docs (e.g. https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html) | {} |
| partition_columns | array | | Partition columns for the delta table | [] |
| table_properties | object | | DeltaTable table properties: https://docs.delta.io/latest/table-properties.html | {} |
| metadata | object | | Custom metadata to include with the table creation | {} |
| schema | array | | Table schema definition using universal Field types | [] |

Enum Values: - type: delta
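An illustrative delta destination (location, columns and schema are hypothetical):

destination:
  type: delta
  name: orders_silver
  location: s3://my-bucket/tables/orders_silver
  write_mode:
    operation: append
  partition_columns:
    - order_date
  schema:
    - name: order_id
      data_type: int64
      nullable: false
    - name: order_date
      data_type: date32
      nullable: false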

DestinationFileType

File type and options for destinations

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| options | object | | | - |

Enum Values: - type: parquet

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |
| options | CsvDestinationOptions | | | - |

Enum Values: - type: csv

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| type | string | | | - |

Enum Values: - type: json

CsvDestinationOptions

CSV destination options

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| has_header | boolean | | Set to "true" to include headers in the CSV | True |
| delimiter | string | | Delimiter character to write the CSV with | , |
| compression | string \| null | | Compression type for the CSV output | - |

WriteMode

Write modes for the "Destination" output.

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| operation | string | | | - |

Enum Values: - operation: append

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| operation | string | | | - |
| transaction | CustomStatements | | | - |

Enum Values: - operation: custom

CustomStatements

SQL statements for "Custom" write mode.

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| pre_insert | string \| null | | Optional (non-insert) preliminary statement | - |
| insert | string | | Insert prepared statement | - |
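An illustrative custom write mode (both statements are hypothetical; the insert statement is a prepared statement, so its parameters are presumably bound from the stage output per row):

write_mode:
  operation: custom
  transaction:
    pre_insert: DELETE FROM target_table WHERE load_date = CURRENT_DATE
    insert: INSERT INTO target_table (id, value, load_date) VALUES (?, ?, CURRENT_DATE)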

DeltaWriteMode

Write mode for delta destinations

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| operation | string | | | - |

Enum Values: - operation: append

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| operation | string | | | - |
| params | array | | | - |

Enum Values: - operation: upsert

Type: object

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| operation | string | | | - |
| params | array | | | - |

Enum Values: - operation: replace

ReplaceCondition

Condition used to build a predicate for data replacement.

Properties:

| Property | Type | Required | Description | Default |
|----------|------|----------|-------------|---------|
| column | string | | Column name to match against | - |
| value | string | | Value to match for replacement | - |
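A sketch of the two parameterized delta write modes, assuming upsert takes a list of key columns and replace takes a list of ReplaceCondition objects as defined above (all values illustrative):

# upsert: merge on the given key column(s)
write_mode:
  operation: upsert
  params:
    - order_id

# replace: replace the rows matching the condition(s)
write_mode:
  operation: replace
  params:
    - column: event_date
      value: "2024-01-01"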