Data Ingestion
Overview
Flex Ingest builds on best-in-class ingestion engines such as Meltano, dlt (Data Load Tool), and SlingData to deliver a highly configurable, YAML-driven data ingestion experience: one configuration file can move data from any source to any destination. Flex Ingest also provides custom source and destination connectors for data ingestion and reverse ETL, plus custom adapters that use high-performance native connectors and a data lake architecture up to 20x faster than the standard adapters.
Prior to setting up a Data Ingestion task, you'll need to create a Data Transformation and Orchestration job.
YAML Config Reference
Minimal Config
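The exact top-level schema depends on your Flex Ingest installation; the sketch below assumes a `source`/`destination` pair with named connections (placeholder names) and uses only the `tables` key documented on this page.

```yaml
# Minimal config sketch: one source, one destination, two tables.
# `source`/`destination` shape and connection names are illustrative.
source:
  type: oracle
  connection: oracle_prod        # named connection (placeholder)

destination:
  type: snowflake
  connection: snowflake_prod

tables:
  - CUSTOMERS
  - ORDERS
```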
Complete Config
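A fuller sketch combining the sections documented below (`batch_size`, `defaults`, `tuning`, `options`, per-table overrides). The `source`/`destination` block shape and connection names are assumptions; every other key appears in this page's reference tables.

```yaml
# Fuller config sketch tying the documented sections together.
source:
  type: salesforce
  connection: sf_prod            # placeholder named connection

destination:
  type: snowflake
  connection: snowflake_prod

batch_size: 2                    # tables per batch (memory control)

defaults:                        # applied to every table unless overridden
  primary_key: Id
  incremental_column: SystemModstamp   # merge is auto-selected

tuning:                          # source-specific extraction settings
  bulk_threshold: 1000000
  include_deleted: true

options:                         # maps to dlt config (DLT__SECTION__KEY)
  extract:
    workers: 1
  normalize:
    workers: 2
  load:
    workers: 2
  runtime:
    log_level: INFO

tables:
  - Account
  - Opportunity
  - name: EVENT_LOG__c           # per-table override of the defaults
    write_disposition: append
```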
Config Sections
batch_size
Process tables in groups to prevent out-of-memory (OOM) errors on large pipelines.
When to use:
- Container has limited memory (< 8GB)
- Loading many large tables
- Seeing OOM errors
How it works:
- Pipeline runs with first N tables
- Data loaded to destination
- Memory freed
- Next batch starts
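The batching behavior above can be sketched as:

```yaml
# Sketch: load tables two at a time to bound peak memory.
batch_size: 2

tables:
  - LARGE_TABLE_A
  - LARGE_TABLE_B
  - LARGE_TABLE_C
  - LARGE_TABLE_D    # runs as batches: (A, B), then (C, D)
```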
defaults
Settings applied to all tables (can be overridden per-table).
When primary_key or incremental_column is set, write_disposition automatically defaults to merge.
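A sketch of shared defaults with one per-table override, using the auto-merge behavior described above:

```yaml
# Sketch: defaults applied to all tables, overridden for one.
defaults:
  primary_key: ID
  incremental_column: UPDATED_AT   # merge is implied by these two keys

tables:
  - CUSTOMERS
  - ORDERS
  - name: AUDIT_LOG
    write_disposition: append      # per-table override of the default
```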
tuning
Source-specific extraction settings.
Oracle Tuning
| Key | Description | Default | When to Change |
|---|---|---|---|
| parallelize | Extract tables in parallel | true | Set false if hitting connection limits |
| arraysize | Rows per Oracle fetch batch | 100000 | Lower if OOM, higher if network is slow |
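The two Oracle keys above in context, as a sketch for a memory-constrained run:

```yaml
# Sketch: Oracle extraction settings from the table above.
tuning:
  parallelize: false   # avoid hitting session/connection limits
  arraysize: 50000     # smaller fetch batches if the container is memory-bound
```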
Salesforce Tuning
| Key | Description | Default | When to Change |
|---|---|---|---|
| bulk | Use Bulk API 2.0 for ALL tables | false | Set true for large datasets |
| bulk_threshold | Auto-use Bulk API when table > N rows | None | Set to 1000000 for mixed workloads |
| chunk_days | Split bulk queries by date range | None | Set to 30–365 for very large tables (10M+) |
| parallelize | Extract tables in parallel | true | Set false to reduce memory |
| max_text_length | VARCHAR max for formula fields | None | Set to 4000 for Informatica compatibility |
| text_length_multiplier | Multiply VARCHAR lengths | 1 | Set to 4 for UTF-8 byte safety |
| preserve_case | Keep original field names | false | Set true with naming: sql_cs_v1 |
| include_deleted | Include soft-deleted records | true | Set false to exclude deleted rows |
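A sketch combining the Salesforce tuning keys above for a mixed workload:

```yaml
# Sketch: Salesforce extraction settings from the table above.
tuning:
  bulk_threshold: 1000000   # Bulk API 2.0 only for tables > 1M rows
  chunk_days: 90            # split bulk queries into 90-day windows
  include_deleted: true     # keep the IsDeleted flag in sync
  max_text_length: 4000     # cap formula-field VARCHARs (Informatica)
  preserve_case: true       # pair with options.normalize.naming: sql_cs_v1
```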
options
Maps directly to dlt configuration. Any nested key becomes `DLT__<SECTION>__<KEY>`.
Key options reference:
| Key | Default | Description |
|---|---|---|
| extract.workers | 5 | Threads for parallel extraction |
| extract.max_parallel_items | 20 | Max queued async items during extraction |
| normalize.workers | 1 | Processes for normalization (>1 uses multiprocessing) |
| normalize.naming | snake_case | Identifier naming convention |
| load.workers | 20 | Threads for parallel loading |
| load.loader_file_format | auto | File format for load jobs (jsonl, parquet) |
| data_writer.buffer_max_items | 5000 | In-memory rows before flush to disk |
| runtime.log_level | WARNING | dlt log verbosity |
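A sketch showing how nested `options` keys map to the dlt environment variables described above:

```yaml
# Sketch: nested options keys and the dlt env vars they become.
options:
  extract:
    workers: 2                    # -> DLT__EXTRACT__WORKERS=2
  normalize:
    naming: sql_cs_v1             # -> DLT__NORMALIZE__NAMING=sql_cs_v1
  load:
    loader_file_format: parquet   # -> DLT__LOAD__LOADER_FILE_FORMAT=parquet
  runtime:
    log_level: INFO               # -> DLT__RUNTIME__LOG_LEVEL=INFO
```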
Log levels:
| Level | Behavior |
|---|---|
| DEBUG | Full dlt internals – schema diffs, file writes, normalization details |
| INFO | Standard progress – extract counts, load status, step durations |
| WARNING | Warnings only – data issues, retries, non-fatal errors |
| ERROR | Errors only |
| CRITICAL | Silent – only pipeline-level output (table counts, batch timing) |
Naming conventions (normalize.naming):
| Convention | Case | Use When |
|---|---|---|
| snake_case | Insensitive (lowercase) | Default – most destinations |
| sql_ci_v1 | Insensitive (lowercase) | SQL-safe alternative to snake_case |
| sql_cs_v1 | Sensitive (quoted) | Preserve Salesforce camelCase field names |
| direct | Sensitive | Pass-through with no transformation |
Recommended workers by container memory:
| Memory | extract.workers | normalize.workers | load.workers |
|---|---|---|---|
| 4GB | 1 | 1 | 1 |
| 8GB | 1 | 2 | 2 |
| 16GB+ | 2 | 4 | 4 |
Tables Configuration
Flat List
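A sketch of the flat-list form, where all tables come from the source's default schema:

```yaml
# Sketch: flat table list.
tables:
  - CUSTOMERS
  - ORDERS
  - INVOICES
```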
Grouped by Schema
For sources with tables in multiple schemas:
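A sketch of the grouped form, assuming the grouped format is a mapping of schema name to table list:

```yaml
# Sketch: tables grouped under their schemas (assumed shape).
tables:
  SALES:
    - ORDERS
    - INVOICES
  HR:
    - EMPLOYEES
```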
Per-Table Config
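A sketch mixing bare table names with per-table settings, assuming a `name` key distinguishes the mapping form:

```yaml
# Sketch: per-table configuration alongside plain names.
tables:
  - CUSTOMERS                        # inherits everything from defaults
  - name: ORDERS
    primary_key: ORDER_ID
    incremental_column: UPDATED_AT   # resolves to merge
  - name: EVENT_LOG
    write_disposition: append
```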
Column Hints
Override column types, precision, or constraints using dlt's native columns syntax:
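A sketch of dlt's `columns` syntax on one table, using the hints listed below:

```yaml
# Sketch: column hints overriding inferred types.
tables:
  - name: ORDERS
    columns:
      total_amount:
        data_type: decimal
        precision: 18
        scale: 2
      notes:
        data_type: text
        precision: 200     # VARCHAR(200)
        nullable: false
```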
Supported hints:
| Hint | Description | Example |
|---|---|---|
| data_type | Column type: text, bigint, double, decimal, bool, date, timestamp | text |
| precision | Length for text, total digits for decimal, fractional seconds for timestamp | 200 |
| scale | Decimal places (for decimal type) | 2 |
| timezone | Timezone awareness for timestamps | false |
| nullable | Allow NULL values | false |
Write Dispositions
| Mode | Behavior | Use When |
|---|---|---|
| replace | Truncate and reload | Full refresh, small tables |
| append | Insert new rows | Event logs, immutable data |
| merge | Upsert by primary key | Incremental sync, mutable data |
Auto-detection: When primary_key or incremental_column is set, write_disposition defaults to merge.
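One table per disposition, as a sketch:

```yaml
# Sketch: all three write dispositions side by side.
tables:
  - name: DIM_COUNTRY
    write_disposition: replace     # small table, full refresh
  - name: CLICK_EVENTS
    write_disposition: append      # immutable event log
  - name: CUSTOMERS
    primary_key: ID                # merge auto-selected by these keys
    incremental_column: UPDATED_AT
```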
Native Oracle Extraction
Features
- Auto-detected – Uses the native path when `drivername` contains "oracle"
- Per-table schema – Grouped YAML format supports different schemas
- Retry logic – Retries on connection failures (not mid-stream, to avoid duplicates)
- Graceful skip – Missing tables (ORA-00942) are logged and skipped
- Arrow-native – Extracts directly to columnar format, merge-compatible
Configuration
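A sketch of a native Oracle setup using the features above; the `source` block shape and connection name are placeholders:

```yaml
# Sketch: native Oracle extraction with per-table schemas.
# The native path is selected when the connection's drivername
# contains "oracle".
source:
  type: oracle
  connection: oracle_prod   # placeholder named connection

tuning:
  parallelize: true
  arraysize: 100000

tables:
  SALES:                    # grouped format: per-table schema support
    - ORDERS
  FINANCE:
    - LEDGER_ENTRIES
```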
Native Salesforce Extraction
Features
- Any Salesforce object – Standard and custom objects (e.g., `Academic_Advising__c`)
- Bulk API 2.0 – 10–40x faster for large tables
- Auto-bulk threshold – Automatically switches API based on row count
- Arrow-native – Direct to columnar, merge-compatible
- Date chunking – Split huge tables into date ranges to reduce memory
- Timing diagnostics – See exactly where time is spent (API wait vs. processing)
API Selection Guide
| Table Size | Recommended | Config |
|---|---|---|
| < 100K rows | REST | (default) |
| 100K – 1M rows | Either | bulk_threshold: 100000 |
| > 1M rows | Bulk API | bulk: true or bulk_threshold: 1000000 |
| > 10M rows | Bulk + chunking | bulk: true + chunk_days: 30 |
Basic Usage
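A sketch of the simplest Salesforce pull (full extract over the REST API); the `source` block shape and connection name are placeholders:

```yaml
# Sketch: basic Salesforce extraction, REST API, full refresh.
source:
  type: salesforce
  connection: sf_prod       # placeholder named connection

tables:
  - Account
  - Contact
  - Academic_Advising__c    # custom objects work the same way
```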
Incremental Sync (Recommended for Production)
How incremental works:
- First run: Full extract of all records
- Subsequent runs: Only fetches `WHERE SystemModstamp > last_loaded_value`
- Merge: Upserts by `Id`, updating existing and inserting new records
Key fields:
- `primary_key: Id` – Salesforce record ID (always use this)
- `incremental_column: SystemModstamp` – Updated on any change (preferred over LastModifiedDate)
- `include_deleted: true` – Captures soft deletes so the `IsDeleted` flag is synced
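The key fields above combine into a production-style incremental sketch:

```yaml
# Sketch: incremental Salesforce sync with soft-delete capture.
defaults:
  primary_key: Id
  incremental_column: SystemModstamp   # write_disposition resolves to merge

tuning:
  include_deleted: true                # keep IsDeleted in sync

tables:
  - Account
  - Opportunity
```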
Bulk API with Date Chunking
For very large tables (10M+ rows), split into date ranges to reduce memory:
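A sketch for a 10M+ row table using the `bulk` and `chunk_days` keys from the tuning reference:

```yaml
# Sketch: Bulk API 2.0 plus date chunking for a very large table.
tuning:
  bulk: true
  chunk_days: 30        # one bulk query per 30-day date window

tables:
  - name: Task
    primary_key: Id
    incremental_column: SystemModstamp
```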
Timing Diagnostics
The Bulk API path logs detailed timing:
- `wait_sf` – Time waiting for Salesforce to deliver data (network/API bottleneck)
- `parse` – Time parsing CSV to Arrow
- `yield` – Time for dlt to process batches
If `wait_sf` dominates, the bottleneck is Salesforce. If `yield` dominates, increase `batch_size` or reduce `normalize.workers`.
Case-Preserving DDL (Informatica Compatibility)
With `preserve_case: true` and the `sql_cs_v1` naming convention, the generated DDL keeps quoted, case-sensitive identifiers (e.g. `"AccountId"` rather than `account_id`), which Informatica expects.
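A sketch of the settings involved, all taken from the tuning and options references above:

```yaml
# Sketch: preserve Salesforce camelCase identifiers end to end.
tuning:
  preserve_case: true
  max_text_length: 4000   # cap formula-field VARCHARs for Informatica

options:
  normalize:
    naming: sql_cs_v1     # case-sensitive, quoted identifiers
```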
Memory Management
Symptoms of Memory Issues
- Container killed (OOMKilled)
- "Cannot allocate memory" errors
- Pipeline hangs during normalize phase
Solutions
1. Reduce `batch_size`
2. Reduce workers (`extract.workers`, `normalize.workers`, `load.workers`)
3. Disable parallelization (`tuning.parallelize: false`)
4. Use date chunking for huge tables (`chunk_days`, Salesforce bulk)
5. Lower the Oracle fetch size (`arraysize`)
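The five levers above combined into one sketch for a memory-constrained container (the Salesforce and Oracle keys apply only to their respective sources):

```yaml
# Sketch: conservative settings for a small container.
batch_size: 1             # 1. fewer tables in memory at once

options:
  extract:
    workers: 1            # 2. reduce workers across all phases
  normalize:
    workers: 1
  load:
    workers: 1
  data_writer:
    buffer_max_items: 1000

tuning:
  parallelize: false      # 3. disable parallel table extraction
  chunk_days: 30          # 4. date chunking (Salesforce bulk sources)
  arraysize: 25000        # 5. smaller fetch batches (Oracle sources)
```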
Memory Budget Guide
| Container Memory | Recommended Config |
|---|---|
| 4GB | batch_size: 1, all workers: 1, parallelize: false |
| 8GB | batch_size: 2, workers: 2, parallelize: false |
| 16GB | batch_size: 4, workers: 4, parallelize: true |
Troubleshooting
"No data loaded" / Empty tables
- Check source credentials and permissions
- Verify table/object names (case-sensitive for custom objects)
- Check for an `IsDeleted = true` filter if `include_deleted: false`
Slow Salesforce extraction
- Check timing logs: if `wait_sf` is high, the Salesforce API is the bottleneck
- Reduce the number of fields if possible
- Use Bulk API for tables > 100K rows
- Use date chunking for tables > 10M rows
Merge not working / No _dlt_id column
- Ensure `primary_key` and `incremental_column` are set
- The pipeline auto-enables `add_dlt_id` for Arrow tables
- Check that `write_disposition` resolved to `merge` (logged at startup)
OOM / Memory errors
See Memory Management section.
Oracle connection timeouts
The pipeline sets reasonable defaults (`tcp_connect_timeout`: 30s, `call_timeout`: 600000ms). Override them in `tuning` if needed:
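A sketch of the override, assuming the two timeout keys live directly under `tuning` with the names and units quoted above:

```yaml
# Sketch: longer Oracle timeouts for slow networks or long-running calls.
tuning:
  tcp_connect_timeout: 60   # seconds to establish a TCP connection
  call_timeout: 1800000     # per-call timeout in milliseconds (30 min)
```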
Best Practices
For First-Time Users
- Start small – Test with one small table first
- Use incremental – Set `primary_key` and `incremental_column` for production
- Monitor memory – Start with conservative settings and increase gradually
- Check timing logs – Understand where time is spent before optimizing
For Production
- Always use incremental for mutable data (not append-only logs)
- Set `include_deleted: true` for accurate state sync
- Use `bulk_threshold` instead of `bulk: true` for mixed table sizes
- Set `batch_size` based on container memory
- Use named connections for environment separation
Performance Optimization
- Salesforce: Use the Bulk API for anything > 100K rows
- Oracle: Increase `arraysize` if memory allows
- General: Increase workers if CPU is underutilized
- Snowflake: Use a larger warehouse for faster COPY (scale up during load)