Data Ingestion
Overview
Flex Ingest leverages best-in-class ingestion engines like Meltano, dlt (Data Load Tool), SlingData, and more to deliver a highly configurable, YAML-driven data ingestion experience. One configuration file can move data from any source to any destination. Flex Ingest also provides custom source and destination connectors for data ingestion and reverse ETL, along with custom adapters that leverage high-performance native connectors and a data lake architecture, delivering up to 20x the throughput of standard adapters.
Prior to setting up a Data Ingestion task, you'll need to create a Data Transformation and Orchestration job.
YAML Config Reference
Minimal Config
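A minimal sketch, assuming a top-level source/destination layout (the connection names are hypothetical; only the `dataset`, `tables`, and `defaults` keys are taken from this reference):

```yaml
# Hypothetical named connections; layout is illustrative only.
source: oracle_prod
destination: snowflake_dw
dataset: raw_data
tables:
  - name: CUSTOMERS
```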
Complete Config
Config Sections
batch_size
Process tables in groups to prevent out-of-memory (OOM) errors on large pipelines.
When to use:
- Container has limited memory (< 8GB)
- Loading many large tables
- Seeing OOM errors
How it works:
- The pipeline runs with the first N tables
- Data is loaded to the destination
- Memory is freed
- The next batch starts
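The batching behavior above is controlled by a single key, for example:

```yaml
# Load tables two at a time; memory is freed between batches.
batch_size: 2
```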
defaults
Settings applied to all tables (can be overridden per-table).
When `primary_key` or `incremental_column` is set, `write_disposition` automatically defaults to `merge`.
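A sketch of a `defaults` block (the column names are hypothetical):

```yaml
defaults:
  primary_key: ID
  incremental_column: UPDATED_AT
  # write_disposition is omitted: it defaults to merge because
  # primary_key / incremental_column are set.
```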
dataset
Use `{SOURCE_SCHEMA}` in the dataset name to automatically route each source schema to its own destination schema:
The pipeline runs one sub-pipeline per schema group, substituting the actual schema name into the dataset. Requires tables in grouped format.
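For example (the `raw_` prefix is hypothetical):

```yaml
# Each source schema is routed to its own destination schema.
dataset: raw_{SOURCE_SCHEMA}
```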
validate_rowcounts
After loading, compare source row counts to destination row counts to catch silent data loss.
Can also be set per-table to target specific tables:
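For example (the table name is hypothetical):

```yaml
# Globally: compare counts for every table after loading.
validate_rowcounts: true

# Or per-table, to target specific tables:
tables:
  - name: ORDERS
    validate_rowcounts: true
```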
tuning
Source-specific extraction settings.
Oracle Tuning
| Key | Description | Default | When to Change |
|---|---|---|---|
| parallelize | Extract tables in parallel | true | Set false if hitting connection limits |
| arraysize | Rows per Oracle fetch batch | 100000 | Lower if OOM, higher if network is slow |
| lob_arraysize | Rows per fetch batch for tables with LOB columns (CLOB/BLOB) | 1000 | Increase carefully – LOB columns require extra round-trips per value |
| encoding_errors | How to handle unencodable characters in string columns | "replace" | "ignore" silently drops bad chars; "strict" raises an exception |
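A `tuning` sketch combining the Oracle keys above, with values chosen for a memory-constrained run (defaults noted inline):

```yaml
tuning:
  parallelize: true          # default; set false if hitting connection limits
  arraysize: 50000           # lowered from the 100000 default to reduce memory
  lob_arraysize: 1000        # default; LOB columns cost extra round-trips per value
  encoding_errors: "replace" # default; "ignore" drops bad chars, "strict" raises
```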
Salesforce Tuning
| Key | Description | Default | When to Change |
|---|---|---|---|
| bulk | API strategy: false (REST always), true (Bulk always), "full_only" (Bulk for full loads, REST for incrementals) | false | Use "full_only" for large tables with small daily changes |
| bulk_threshold | Auto-use Bulk API when total table rows > N. Counts all rows, not just the incremental subset. | None | Set to 1000000; prefer bulk: "full_only" for incremental workloads |
| bulk_yield_size | Rows per yield batch during Bulk extraction | 100000 | Lower to 10000 for very wide tables (100+ fields) or low-memory servers |
| chunk_days | Split bulk queries by date range | None | Set to 30–365 for very large tables (10M+ rows) |
| parallelize | Extract tables in parallel | true | Set false to reduce memory |
| max_text_length | VARCHAR max for formula fields | None | Set to 4000 for Informatica compatibility |
| text_length_multiplier | Multiply VARCHAR lengths | 1 | Set to 4 for UTF-8 byte safety |
| preserve_case | Keep original field names | false | Set true with naming: sql_cs_v1 |
| include_deleted | Include soft-deleted records | true | Set false to exclude deleted rows |
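A Salesforce `tuning` sketch for large tables with small daily changes, using the keys above:

```yaml
tuning:
  bulk: "full_only"         # Bulk API for full loads, REST for incrementals
  bulk_threshold: 1000000   # auto-switch to Bulk above 1M total rows
  chunk_days: 90            # split bulk queries into 90-day date ranges
  max_text_length: 4000     # Informatica-compatible VARCHAR cap
```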
options
Maps directly to dlt configuration. Any nested key becomes `DLT__<SECTION>__<KEY>`.
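A sketch of a nested `options` block, with the environment variable each key maps to noted inline:

```yaml
options:
  extract:
    workers: 2          # DLT__EXTRACT__WORKERS
  normalize:
    workers: 4          # DLT__NORMALIZE__WORKERS
  load:
    workers: 4          # DLT__LOAD__WORKERS
  runtime:
    log_level: INFO     # DLT__RUNTIME__LOG_LEVEL
```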
Key options reference:
| Key | Default | Description |
|---|---|---|
| extract.workers | 5 | Threads for parallel extraction |
| extract.max_parallel_items | 20 | Max queued async items during extraction |
| normalize.workers | 1 | Processes for normalization (>1 uses multiprocessing) |
| normalize.naming | snake_case | Identifier naming convention |
| load.workers | 20 | Threads for parallel loading |
| load.loader_file_format | auto | File format for load jobs (jsonl, parquet) |
| data_writer.buffer_max_items | 5000 | In-memory rows before flush to disk |
| runtime.log_level | WARNING | dlt log verbosity |
Log levels:
| Level | Behavior |
|---|---|
| DEBUG | Full dlt internals – schema diffs, file writes, normalization details |
| INFO | Standard progress – extract counts, load status, step durations |
| WARNING | Warnings only – data issues, retries, non-fatal errors |
| ERROR | Errors only |
| CRITICAL | Silent – only pipeline-level output (table counts, batch timing) |
Naming conventions (normalize.naming):
Built-in dlt conventions:
| Convention | Quoted | Normalization | Use When |
|---|---|---|---|
| snake_case | No | Lowercase snake_case | Default – most destinations |
| sql_ci_v1 | No | Lowercase, SQL-safe | Alternative to snake_case; strips special chars like $ |
| sql_cs_v1 | Yes | As-is | Preserve Salesforce camelCase field names |
| direct | Yes | Pass-through | Preserve special characters (e.g., $) – but quotes identifiers |
FlexFlow custom conventions:
| Convention | Quoted | Normalization | Snowflake Result |
|---|---|---|---|
| flexflow_naming.unquoted | No | As-is | UPPERCASE (DB default fold), special chars preserved |
| flexflow_naming.uppercase | Yes | Uppercase | "UPPERCASE" (exact, case-sensitive) |
| flexflow_naming.lowercase | Yes | Lowercase | "lowercase" (exact, case-sensitive) |
`flexflow_naming.unquoted` is recommended for Oracle → Snowflake pipelines: identifiers arrive unquoted, so Snowflake folds them to uppercase by default, and special characters like $ in table names are preserved as-is.
Note: Switching naming conventions on a pipeline that has already loaded data requires clearing the local pipeline state directory first.
Recommended workers by container memory:
| Memory | extract.workers | normalize.workers | load.workers |
|---|---|---|---|
| 4GB | 1 | 1 | 1 |
| 8GB | 1 | 2 | 2 |
| 16GB+ | 2 | 4 | 4 |
pipelines_dir
Override the default dlt pipeline state directory (`~/.dlt/pipelines/`). Useful for container deployments or non-standard filesystem layouts.
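For example (the path is hypothetical):

```yaml
# Keep dlt pipeline state on a mounted volume
# instead of ~/.dlt/pipelines/.
pipelines_dir: /mnt/state/dlt_pipelines
```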
Also configurable via an environment variable, which takes precedence over the config value.
Tables Configuration
Flat List
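A sketch of the flat format (table names hypothetical):

```yaml
tables:
  - name: CUSTOMERS
  - name: ORDERS
  - name: ORDER_ITEMS
```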
Grouped by Schema
For sources with tables in multiple schemas:
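A sketch, assuming schema names serve as the grouping keys (schema and table names hypothetical):

```yaml
tables:
  SATURN:
    - name: STVTERM
    - name: SPRIDEN
  GENERAL:
    - name: GORADID
```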
Wildcards
Use * (matches any sequence) and ? (matches a single character) in schema or table names:
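A sketch using the grouped format (schema and pattern names hypothetical):

```yaml
tables:
  SATURN:
    - name: STV*                  # matches every SATURN table starting with STV
      write_disposition: replace  # copied to each expanded table
```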
Expansion is logged at startup: `SATURN.STV* -> 12 table(s)`. Any per-table settings on a wildcard entry (`write_disposition`, `bulk`, `chunk_days`, etc.) are copied to every expanded table. Non-matching patterns are skipped with a warning.
Per-Table Config
Column Hints
Override column types, precision, or constraints using dlt's native columns syntax:
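A sketch (table and column names hypothetical):

```yaml
tables:
  - name: ORDERS
    columns:
      ORDER_TOTAL:
        data_type: decimal
        precision: 18
        scale: 2
      NOTES:
        data_type: text
        precision: 4000
        nullable: true
```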
Supported hints:
| Hint | Description | Example |
|---|---|---|
| data_type | Column type: text, bigint, double, decimal, bool, date, timestamp | text |
| precision | Length for text, total digits for decimal, fractional seconds for timestamp | 200 |
| scale | Decimal places (for decimal type) | 2 |
| timezone | Timezone awareness for timestamps | false |
| nullable | Allow NULL values | false |
Write Dispositions
| Mode | Behavior | Use When |
|---|---|---|
| replace | Truncate and reload | Full refresh, small tables |
| append | Insert new rows | Event logs, immutable data |
| merge | Upsert by primary key | Incremental sync, mutable data |
Auto-detection: When `primary_key` or `incremental_column` is set, `write_disposition` defaults to `merge`.
Native Oracle Extraction
Features
- Auto-detected – Uses the native path when `drivername` contains "oracle"
- Per-table schema – Grouped YAML format supports different schemas
- Retry logic – Retries on connection failures (not mid-stream, to avoid duplicates)
- Graceful skip – Missing tables (ORA-00942) are logged and skipped
- Arrow-native – Extracts directly to columnar format, merge-compatible
Configuration
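A sketch: native Oracle extraction needs no dedicated switch (it is auto-detected from `drivername`), so configuration is limited to the `tuning` keys documented above:

```yaml
# Native extraction is auto-detected when drivername contains "oracle".
tuning:
  parallelize: true
  arraysize: 100000
```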
Native Salesforce Extraction
Features
- Any Salesforce object – Standard and custom objects (e.g., `Academic_Advising__c`)
- Bulk API 2.0 – 10–40x faster for large tables
- Auto-bulk threshold – Automatically switch API based on row count
- Arrow-native – Direct to columnar, merge-compatible
- Date chunking – Split huge tables into date ranges to reduce memory
- Timing diagnostics – See exactly where time is spent (API wait vs. processing)
API Selection Guide
| Table Size | Recommended | Config |
|---|---|---|
| < 100K rows | REST | (default) |
| 100K – 1M rows | Either | bulk_threshold: 100000 |
| > 1M rows | Bulk API | bulk: true or bulk_threshold: 1000000 |
| > 10M rows | Bulk + chunking | bulk: true + chunk_days: 30 |
Basic Usage
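A sketch (the first two are standard Salesforce objects; the custom object is from the example above):

```yaml
tables:
  - name: Account
  - name: Contact
  - name: Academic_Advising__c   # custom object
```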
Incremental Sync (Recommended for Production)
How incremental works:
- First run: Full extract of all records
- Subsequent runs: Only fetches `WHERE SystemModstamp > last_loaded_value`
- Merge: Upserts by `Id`, updating existing and inserting new records

Key fields:
- `primary_key: Id` – Salesforce record ID (always use this)
- `incremental_column: SystemModstamp` – Updated on any change (preferred over `LastModifiedDate`)
- `include_deleted: true` – Captures soft deletes so the `IsDeleted` flag is synced
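The key fields above can be expressed per-table, for example:

```yaml
tables:
  - name: Account
    primary_key: Id
    incremental_column: SystemModstamp
    # write_disposition resolves to merge automatically;
    # include_deleted defaults to true, so IsDeleted is synced.
```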
Bulk API with Date Chunking
For very large tables (10M+ rows), split into date ranges to reduce memory:
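For example (the object name is hypothetical):

```yaml
tables:
  - name: Task
    bulk: true
    chunk_days: 30   # one Bulk query per 30-day date range
```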
Timing Diagnostics
The Bulk API path logs detailed timing:
- `wait_sf` – Time waiting for Salesforce to deliver data (network/API bottleneck)
- `parse` – Time parsing CSV to Arrow
- `yield` – Time for dlt to process batches
If `wait_sf` dominates, the bottleneck is Salesforce. If `yield` dominates, increase `batch_size` or reduce `normalize.workers`.
Case-Preserving DDL (Informatica Compatibility)
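A sketch of the relevant settings, per the Salesforce tuning and naming tables above:

```yaml
tuning:
  preserve_case: true     # keep original field names
  max_text_length: 4000   # VARCHAR cap for formula fields
options:
  normalize:
    naming: sql_cs_v1     # quoted, as-is identifiers
```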
With `preserve_case: true` and `naming: sql_cs_v1`, the generated DDL quotes identifiers, preserving the original Salesforce camelCase field names.
Memory Management
Symptoms of Memory Issues
- Container killed (OOMKilled)
- "Cannot allocate memory" errors
- Pipeline hangs during normalize phase
Solutions
1. Reduce the batch size (`batch_size`)
2. Reduce worker counts (`extract.workers`, `normalize.workers`, `load.workers`)
3. Disable parallelization (`parallelize: false`)
4. Use date chunking for huge tables (`chunk_days`)
5. Lower the Oracle fetch size (`arraysize`)
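A conservative profile combining these levers, in line with the 8GB row of the worker-recommendation and memory budget tables (the `arraysize` value is illustrative):

```yaml
batch_size: 2
tuning:
  parallelize: false
  arraysize: 25000    # Oracle sources; smaller fetch batches
options:
  extract:
    workers: 1
  normalize:
    workers: 2
  load:
    workers: 2
```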
Memory Budget Guide
| Container Memory | Recommended Config |
|---|---|
| 4GB | batch_size: 1, all workers: 1, parallelize: false |
| 8GB | batch_size: 2, workers: 2, parallelize: false |
| 16GB | batch_size: 4, workers: 4, parallelize: true |
Troubleshooting
"No data loaded" / Empty tables
- Check source credentials and permissions
- Verify table/object names (case-sensitive for custom objects)
- Check for an `IsDeleted = true` filter if `include_deleted: false`
Slow Salesforce extraction
- Check timing logs: if `wait_sf` is high, the Salesforce API is the bottleneck
- Reduce the number of fields if possible
- Use Bulk API for tables > 100K rows
- Use date chunking for tables > 10M rows
Merge not working / No _dlt_id column
- Ensure `primary_key` and `incremental_column` are set
- The pipeline auto-enables `add_dlt_id` for Arrow tables
- Check that `write_disposition` resolved to `merge` (logged at startup)
OOM / Memory errors
See Memory Management section.
Oracle connection timeouts
The pipeline sets reasonable defaults (`tcp_connect_timeout`: 30s, `call_timeout`: 600000ms). Override in `tuning` if needed:
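For example (values are illustrative, doubling the stated defaults):

```yaml
tuning:
  tcp_connect_timeout: 60    # seconds
  call_timeout: 1200000      # milliseconds
```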
Best Practices
For First-Time Users
- Start small – Test with one small table first
- Use incremental – Set `primary_key` and `incremental_column` for production
- Monitor memory – Start with conservative settings, increase gradually
- Check timing logs – Understand where time is spent before optimizing
For Production
- Always use incremental for mutable data (not append-only logs)
- Set `include_deleted: true` for accurate state sync
- Use `bulk_threshold` instead of `bulk: true` for mixed table sizes
- Set `batch_size` based on container memory
- Use named connections for environment separation
Performance Optimization
- Salesforce: Use the Bulk API for anything > 100K rows
- Oracle: Increase `arraysize` if memory allows
- General: Increase workers if CPU is underutilized
- Snowflake: Use a larger warehouse for faster COPY (scale up during load)