Data Ingestion

Overview

Flex Ingest builds on best-in-class ingestion engines such as Meltano, dlt (Data Load Tool), and SlingData to deliver a highly configurable, YAML-driven data ingestion experience: one configuration file can move data from any source to any destination. Flex Ingest also provides custom source and destination connectors for data ingestion and reverse ETL, plus custom adapters that combine high-performance native connectors with a data lake architecture that can be up to 20x faster than standard adapters.

Prior to setting up a Data Ingestion task, you'll need to create a Data Transformation and Orchestration job.


YAML Config Reference

Minimal Config

pipeline_name: my_pipeline
dataset: my_dataset
source: sql_database
destination: snowflake
tables:
  - users
  - orders

Complete Config

pipeline_name: my_pipeline
dataset: MY_DATASET
source: salesforce
destination: snowflake

# Memory management - process tables in batches
batch_size: 2                  # Tables per batch (frees memory between batches)

# Default settings for all tables
defaults:
  primary_key: Id
  incremental_column: SystemModstamp

# Source-specific tuning
tuning:
  bulk_threshold: 1000000      # Auto-bulk for tables > 1M rows
  parallelize: false           # Sequential extraction (safer for memory)
  max_text_length: 4000        # VARCHAR max for formula fields
  text_length_multiplier: 4    # UTF-8 byte safety (4x multiplier)
  preserve_case: true          # Keep camelCase field names
  include_deleted: true        # Capture soft deletes

# dlt options (maps to DLT__* env vars)
options:
  extract:
    workers: 1
  normalize:
    workers: 2
    naming: sql_cs_v1          # Quoted identifiers for case preservation
  load:
    workers: 2

tables:
  - Account
  - Contact
  - name: et4ae5__IndividualEmailResult__c
    bulk: true                 # Force Bulk API for this table

Config Sections

batch_size

Process tables in groups to prevent out-of-memory (OOM) errors on large pipelines.

batch_size: 2 # Process 2 tables, load to destination, free memory, repeat

When to use:

  • Container has limited memory (< 8GB)
  • Loading many large tables
  • Seeing OOM errors

How it works:

  1. Pipeline runs with first N tables
  2. Data loaded to destination
  3. Memory freed
  4. Next batch starts
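
The batching loop above can be sketched in a few lines of Python (an illustrative helper, not the actual Flex Ingest implementation):

```python
# Illustrative sketch of batch_size behavior: tables are processed in
# fixed-size groups so memory can be freed between pipeline passes.
def batches(tables, batch_size):
    """Yield tables in groups of batch_size."""
    for i in range(0, len(tables), batch_size):
        yield tables[i:i + batch_size]

# With batch_size: 2, five tables run as three pipeline passes.
# Each pass extracts its tables, loads them, then frees memory.
runs = list(batches(["Account", "Contact", "Opportunity", "Lead", "Case"], 2))
# runs == [["Account", "Contact"], ["Opportunity", "Lead"], ["Case"]]
```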

defaults

Settings applied to all tables (can be overridden per-table).

defaults:
  primary_key: Id
  incremental_column: SystemModstamp
  write_disposition: merge

When primary_key or incremental_column is set, write_disposition automatically defaults to merge.

tuning

Source-specific extraction settings.

Oracle Tuning

Key         | Description                 | Default | When to Change
parallelize | Extract tables in parallel  | true    | Set false if hitting connection limits
arraysize   | Rows per Oracle fetch batch | 100000  | Lower if OOM, higher if network is slow

Salesforce Tuning

Key                    | Description                           | Default | When to Change
bulk                   | Use Bulk API 2.0 for ALL tables       | false   | Set true for large datasets
bulk_threshold         | Auto-use Bulk API when table > N rows | None    | Set to 1000000 for mixed workloads
chunk_days             | Split bulk queries by date range      | None    | Set to 30-365 for very large tables (10M+)
parallelize            | Extract tables in parallel            | true    | Set false to reduce memory
max_text_length        | VARCHAR max for formula fields        | None    | Set to 4000 for Informatica compatibility
text_length_multiplier | Multiply VARCHAR lengths              | 1       | Set to 4 for UTF-8 byte safety
preserve_case          | Keep original field names             | false   | Set true with naming: sql_cs_v1
include_deleted        | Include soft-deleted records          | true    | Set false to exclude deleted rows

options

Maps directly to dlt configuration. Any nested key becomes DLT__<SECTION>__<KEY>.

options:
  extract:
    workers: 1                 # DLT__EXTRACT__WORKERS=1
    max_parallel_items: 5      # DLT__EXTRACT__MAX_PARALLEL_ITEMS=5
  normalize:
    workers: 2                 # DLT__NORMALIZE__WORKERS=2
    naming: sql_cs_v1          # DLT__NORMALIZE__NAMING=sql_cs_v1
  load:
    workers: 2                 # DLT__LOAD__WORKERS=2
    loader_file_format: jsonl
  data_writer:
    buffer_max_items: 5000     # In-memory rows before flushing to disk
    file_max_items: 100000     # Rows per intermediary file
    file_max_bytes: 10000000
  runtime:
    log_level: CRITICAL        # Silent - only pipeline-level output
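
The key-to-env-var mapping can be sketched as a small Python helper (hypothetical code, shown only to illustrate how nested sections join with double underscores):

```python
# Sketch of how nested options keys become DLT__* environment variables:
# each nesting level is uppercased and joined with a double underscore.
def to_env_vars(options, prefix="DLT"):
    """Flatten a nested options dict into DLT__SECTION__KEY env var names."""
    env = {}
    for key, value in options.items():
        name = f"{prefix}__{key.upper()}"
        if isinstance(value, dict):
            env.update(to_env_vars(value, name))   # recurse into sections
        else:
            env[name] = str(value)                 # env vars are strings
    return env

opts = {"normalize": {"workers": 2, "naming": "sql_cs_v1"}}
env = to_env_vars(opts)
# env == {"DLT__NORMALIZE__WORKERS": "2", "DLT__NORMALIZE__NAMING": "sql_cs_v1"}
```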

Key options reference:

Key                          | Default    | Description
extract.workers              | 5          | Threads for parallel extraction
extract.max_parallel_items   | 20         | Max queued async items during extraction
normalize.workers            | 1          | Processes for normalization (>1 uses multiprocessing)
normalize.naming             | snake_case | Identifier naming convention
load.workers                 | 20         | Threads for parallel loading
load.loader_file_format      | auto       | File format for load jobs (jsonl, parquet)
data_writer.buffer_max_items | 5000       | In-memory rows before flush to disk
runtime.log_level            | WARNING    | dlt log verbosity

Log levels:

Level    | Behavior
DEBUG    | Full dlt internals: schema diffs, file writes, normalization details
INFO     | Standard progress: extract counts, load status, step durations
WARNING  | Warnings only: data issues, retries, non-fatal errors
ERROR    | Errors only
CRITICAL | Silent: only pipeline-level output (table counts, batch timing)

Naming conventions (normalize.naming):

Convention | Case                    | Use When
snake_case | Insensitive (lowercase) | Default: most destinations
sql_ci_v1  | Insensitive (lowercase) | SQL-safe alternative to snake_case
sql_cs_v1  | Sensitive (quoted)      | Preserve Salesforce camelCase field names
direct     | Sensitive               | Pass-through with no transformation

Recommended workers by container memory:

Memory | extract.workers | normalize.workers | load.workers
4GB    | 1               | 1                 | 1
8GB    | 1               | 2                 | 2
16GB+  | 2               | 4                 | 4

Tables Configuration

Flat List

tables:
  - users
  - orders
  - products

Grouped by Schema

For sources with tables in multiple schemas:

tables:
  SCHEMA_A:
    - users
    - orders
  SCHEMA_B:
    - transactions

Per-Table Config

tables:
  - Account                              # Simple replace
  - name: Contact
    primary_key: Id
    incremental_column: SystemModstamp   # Auto-merge
  - name: LargeTable__c
    bulk: true                           # Force Bulk API
    chunk_days: 30                       # Date chunking for 10M+ rows

Column Hints

Override column types, precision, or constraints using dlt's native columns syntax:

tables:
  - name: ps_assignment_type
    columns:
      descrshort:
        data_type: text
        precision: 200         # VARCHAR(200)
      amount:
        data_type: decimal
        precision: 10
        scale: 2               # DECIMAL(10,2)
      event_time:
        data_type: timestamp
        precision: 3           # Milliseconds
        timezone: false        # TIMESTAMP_NTZ
      is_active:
        nullable: false        # NOT NULL constraint

Supported hints:

Hint      | Description                                                                  | Example
data_type | Column type: text, bigint, double, decimal, bool, date, timestamp            | text
precision | Length for text, total digits for decimal, fractional seconds for timestamp  | 200
scale     | Decimal places (for decimal type)                                            | 2
timezone  | Timezone awareness for timestamps                                            | false
nullable  | Allow NULL values                                                            | false
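
As an illustration of how these hints translate into destination column types, here is a hypothetical Python mapper (the real DDL is generated by dlt; this sketch covers only the Snowflake-style types shown above):

```python
# Illustrative mapping from column hints to Snowflake-style column types.
# Hypothetical helper, not part of Flex Ingest or dlt.
def hint_to_type(hint):
    """Render a hint dict as a destination column type string."""
    dt = hint.get("data_type", "text")
    if dt == "text" and "precision" in hint:
        return f"VARCHAR({hint['precision']})"
    if dt == "decimal":
        # precision = total digits, scale = decimal places
        return f"DECIMAL({hint.get('precision', 38)},{hint.get('scale', 0)})"
    if dt == "timestamp":
        # timezone: false produces a TIMESTAMP_NTZ-style type
        tz = "TZ" if hint.get("timezone", True) else "NTZ"
        return f"TIMESTAMP_{tz}({hint.get('precision', 6)})"
    return dt.upper()

hint_to_type({"data_type": "text", "precision": 200})           # VARCHAR(200)
hint_to_type({"data_type": "decimal", "precision": 10, "scale": 2})  # DECIMAL(10,2)
```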

Write Dispositions

Mode    | Behavior              | Use When
replace | Truncate and reload   | Full refresh, small tables
append  | Insert new rows       | Event logs, immutable data
merge   | Upsert by primary key | Incremental sync, mutable data

Auto-detection: When primary_key or incremental_column is set, write_disposition defaults to merge.
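
The auto-detection rule can be sketched as follows (a hypothetical helper; the fallback to append assumes dlt's default write disposition, which the text above does not spell out):

```python
# Sketch of the write_disposition auto-detection rule described above.
def resolve_disposition(cfg):
    """Resolve the effective write disposition for a table config dict."""
    if "write_disposition" in cfg:
        return cfg["write_disposition"]          # explicit setting wins
    if cfg.get("primary_key") or cfg.get("incremental_column"):
        return "merge"                           # auto-merge per the rule above
    return "append"                              # assumed fallback (dlt default)

resolve_disposition({"primary_key": "Id"})       # 'merge'
resolve_disposition({"write_disposition": "replace"})  # 'replace'
```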


Native Oracle Extraction

Features

  • Auto-detected - Uses native path when drivername contains "oracle"
  • Per-table schema - Grouped YAML format supports different schemas
  • Retry logic - Retries on connection failures (not mid-stream, to avoid duplicates)
  • Graceful skip - Missing tables (ORA-00942) logged and skipped
  • Arrow-native - Direct to columnar format, merge-compatible

Configuration

source: sql_database
source_credentials: connections.oracle_prod
tuning:
  arraysize: 100000            # Rows per fetch (tune for memory vs speed)
  parallelize: true            # Parallel table extraction
tables:
  HR:
    - employees
    - departments
  FINANCE:
    - transactions

Native Salesforce Extraction

Features

  • Any Salesforce object - Standard and custom objects (e.g., Academic_Advising__c)
  • Bulk API 2.0 - 10-40x faster for large tables
  • Auto-bulk threshold - Automatically switches API based on row count
  • Arrow-native - Direct to columnar, merge-compatible
  • Date chunking - Splits huge tables into date ranges to reduce memory
  • Timing diagnostics - See exactly where time is spent (API wait vs processing)

API Selection Guide

Table Size     | Recommended     | Config
< 100K rows    | REST            | (default)
100K - 1M rows | Either          | bulk_threshold: 100000
> 1M rows      | Bulk API        | bulk: true or bulk_threshold: 1000000
> 10M rows     | Bulk + chunking | bulk: true + chunk_days: 30
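
The selection logic in this table can be sketched as (illustrative only; the connector's actual decision code is not shown here):

```python
# Sketch of the REST-vs-Bulk decision: bulk: true forces Bulk API 2.0,
# bulk_threshold switches automatically once a table exceeds N rows.
def choose_api(row_count, bulk=False, bulk_threshold=None):
    """Pick the extraction API for a table of row_count rows."""
    if bulk:
        return "bulk"            # bulk: true forces Bulk API for this table
    if bulk_threshold is not None and row_count > bulk_threshold:
        return "bulk"            # auto-bulk past the configured threshold
    return "rest"                # REST API is the default

choose_api(50_000)                                # 'rest'
choose_api(2_000_000, bulk_threshold=1_000_000)   # 'bulk'
```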

Basic Usage

pipeline_name: salesforce
dataset: SALESFORCE_DATA
source: salesforce
destination: snowflake
tables:
  - Account
  - Contact
  - Custom_Object__c           # Custom objects work!

Incremental Sync (Recommended for Production)

pipeline_name: salesforce
dataset: SALESFORCE_DATA
source: salesforce
destination: snowflake
defaults:
  primary_key: Id
  incremental_column: SystemModstamp
tuning:
  bulk_threshold: 1000000      # Auto-bulk for large tables
  include_deleted: true        # Track soft deletes
tables:
  - Account
  - Contact
  - Opportunity
  - name: et4ae5__IndividualEmailResult__c
    bulk: true                 # Force bulk for known large table

How incremental works:

  1. First run: Full extract of all records
  2. Subsequent runs: Only fetches WHERE SystemModstamp > last_loaded_value
  3. Merge: Upserts by Id, updating existing and inserting new records
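
For illustration, a subsequent incremental run issues a SOQL query of roughly this shape (a sketch; the connector builds the real query, including proper datetime literals):

```python
# Illustrative shape of an incremental SOQL query: fetch only rows whose
# cursor field (e.g. SystemModstamp) moved past the last loaded value.
def incremental_query(obj, fields, cursor_field, last_value):
    """Build a SOQL string for a full or incremental extract."""
    soql = f"SELECT {', '.join(fields)} FROM {obj}"
    if last_value is not None:
        # Subsequent runs fetch only rows changed since the last load.
        soql += f" WHERE {cursor_field} > {last_value}"
    return soql

incremental_query("Account", ["Id", "Name"], "SystemModstamp",
                  "2024-01-01T00:00:00Z")
# SELECT Id, Name FROM Account WHERE SystemModstamp > 2024-01-01T00:00:00Z
```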

Key fields:

  • primary_key: Id - Salesforce record ID (always use this)
  • incremental_column: SystemModstamp - updated on any change (preferred over LastModifiedDate)
  • include_deleted: true - captures soft deletes so the IsDeleted flag is synced

Bulk API with Date Chunking

For very large tables (10M+ rows), split into date ranges to reduce memory:

tuning:
  bulk: true
  chunk_days: 30               # Query 30 days at a time
tables:
  - name: ActivityHistory__c
    chunk_field: CreatedDate   # Optional: override chunk field
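
The chunking behavior can be sketched as (hypothetical helper; the dates here are illustrative, and the connector manages the actual chunk boundaries):

```python
# Sketch of chunk_days splitting: one large extract becomes a series of
# date-bounded queries so each chunk fits in memory.
from datetime import date, timedelta

def date_chunks(start, end, chunk_days):
    """Yield (lower, upper) date bounds covering [start, end) in chunk_days steps."""
    while start < end:
        upper = min(start + timedelta(days=chunk_days), end)
        yield (start, upper)     # one bounded query per chunk
        start = upper

chunks = list(date_chunks(date(2024, 1, 1), date(2024, 3, 1), 30))
# Two chunks: 2024-01-01 to 2024-01-31, then 2024-01-31 to 2024-03-01
```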

Timing Diagnostics

The Bulk API path logs detailed timing:

Contact: 117 chunks, wait_sf=1341.6s, parse=39.9s, yield=62.2s

  • wait_sf - time waiting for Salesforce to deliver data (network/API bottleneck)
  • parse - time parsing CSV to Arrow
  • yield - time for dlt to process batches

If wait_sf dominates, the bottleneck is Salesforce. If yield dominates, increase batch_size or reduce normalize.workers.

Case-Preserving DDL (Informatica Compatibility)

tuning:
  max_text_length: 4000
  text_length_multiplier: 4
  preserve_case: true
options:
  normalize:
    naming: sql_cs_v1

Produces:

CREATE TABLE "Account" (
  "Id" VARCHAR(72),
  "Name" VARCHAR(1020),
  "_dlt_load_id" VARCHAR,
  "_dlt_id" VARCHAR
);

Memory Management

Symptoms of Memory Issues

  • Container killed (OOMKilled)
  • "Cannot allocate memory" errors
  • Pipeline hangs during normalize phase

Solutions

1. Reduce batch size:

batch_size: 1 # Process one table at a time

2. Reduce workers:

options:
  extract:
    workers: 1
  normalize:
    workers: 1
  load:
    workers: 1

3. Disable parallelization:

tuning:
  parallelize: false

4. Use date chunking for huge tables:

tuning:
  chunk_days: 30               # Frees memory between chunks

5. Lower Oracle fetch size:

tuning:
  arraysize: 10000             # Down from default 100000

Memory Budget Guide

Container Memory | Recommended Config
4GB              | batch_size: 1, all workers: 1, parallelize: false
8GB              | batch_size: 2, workers: 2, parallelize: false
16GB             | batch_size: 4, workers: 4, parallelize: true

Troubleshooting

"No data loaded" / Empty tables

  • Check source credentials and permissions
  • Verify table/object names (case-sensitive for custom objects)
  • Check for IsDeleted = true filter if include_deleted: false

Slow Salesforce extraction

  • Check timing logs: if wait_sf is high, Salesforce API is the bottleneck
  • Reduce number of fields if possible
  • Use Bulk API for tables > 100K rows
  • Use date chunking for tables > 10M rows

Merge not working / No _dlt_id column

  • Ensure primary_key and incremental_column are set
  • The pipeline auto-enables add_dlt_id for Arrow tables
  • Check that write_disposition resolved to merge (logged at startup)

OOM / Memory errors

See Memory Management section.

Oracle connection timeouts

The pipeline sets reasonable defaults (tcp_connect_timeout: 30 seconds; call_timeout: 600000 ms, i.e. 10 minutes). Override them in tuning if needed:

tuning:
  tcp_connect_timeout: 60
  call_timeout: 1200000

Best Practices

For First-Time Users

  1. Start small - Test with one small table first
  2. Use incremental - Set primary_key and incremental_column for production
  3. Monitor memory - Start with conservative settings, increase gradually
  4. Check timing logs - Understand where time is spent before optimizing

For Production

  1. Always use incremental for mutable data (not append-only logs)
  2. Set include_deleted: true for accurate state sync
  3. Use bulk_threshold instead of bulk: true for mixed table sizes
  4. Set batch_size based on container memory
  5. Use named connections for environment separation

Performance Optimization

  1. Salesforce: Use Bulk API for anything > 100K rows
  2. Oracle: Increase arraysize if memory allows
  3. General: Increase workers if CPU is underutilized
  4. Snowflake: Use a larger warehouse for faster COPY (scale up during load)