Da Vinci Client
The Da Vinci client allows you to eagerly load some or all partitions of a dataset and perform queries against the resulting local cache. Future updates to the data continue to be streamed in and applied to the local cache.
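As a rough mental model (not the Da Vinci API), the following Java sketch loads only the subscribed partitions from a snapshot, applies later streamed updates to the same in-memory map, and answers reads locally. The class name `LocalPartitionedCache` and its hash-modulo partitioner are hypothetical assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of an eagerly loaded, partition-subscribed local cache.
// This is NOT the Da Vinci API; names and the partitioning scheme are assumptions.
class LocalPartitionedCache<K, V> {
    private final int partitionCount;
    private final Set<Integer> subscribedPartitions;
    private final Map<K, V> cache = new HashMap<>();

    LocalPartitionedCache(int partitionCount, Set<Integer> subscribedPartitions) {
        this.partitionCount = partitionCount;
        this.subscribedPartitions = subscribedPartitions;
    }

    // Assumed partitioner for the sketch: key hash modulo partition count.
    int partitionFor(K key) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    // Eager load: copy every snapshot record that falls in a subscribed partition.
    void bootstrap(Map<K, V> snapshot) {
        snapshot.forEach(this::applyUpdate);
    }

    // Streamed update after bootstrap; a null value models a delete.
    void applyUpdate(K key, V value) {
        if (!subscribedPartitions.contains(partitionFor(key))) {
            return; // record belongs to a partition this client did not load
        }
        if (value == null) {
            cache.remove(key);
        } else {
            cache.put(key, value);
        }
    }

    // Reads are answered from local memory, with no remote call.
    V get(K key) {
        return cache.get(key);
    }

    int size() {
        return cache.size();
    }
}
```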
Record Transformer (DVRT)
The Record Transformer lets you hook into Da Vinci’s data ingestion process to react to every record change in real time.
What Does It Do?
- React to changes: Get notified when records are added, updated, or deleted
- Transform data: Modify records as they come in (add fields, filter data, etc.)
- Forward data: Send Venice data to other systems (databases, search indexes, analytics)
Quick Start Guide
Step 1: Implement the Interface
Extend DaVinciRecordTransformer and implement:
- transform(key, value, partitionId): Transform data before local persistence. Returns a DaVinciRecordTransformerResult with one of:
  - UNCHANGED: Keep the original value
  - TRANSFORMED: Use the new transformed value
  - SKIP: Drop this record entirely
- processPut(key, value, partitionId): Handle record updates
- processDelete(key, partitionId): Handle deletions (optional)
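To make the three outcomes concrete, here is a self-contained sketch. `ResultType`, `TransformResult`, `UppercaseTransformer`, and `IngestionStep` are hypothetical stand-ins, not Venice classes, and the real `transform` signature differs; the sketch only shows how UNCHANGED, TRANSFORMED, and SKIP might map to what gets persisted locally.

```java
import java.util.Locale;
import java.util.Optional;

// Hypothetical stand-ins mirroring the three outcomes described above.
// These are NOT Venice's DaVinciRecordTransformerResult; real signatures differ.
enum ResultType { UNCHANGED, TRANSFORMED, SKIP }

final class TransformResult<V> {
    final ResultType type;
    final V value; // only meaningful when type == TRANSFORMED

    private TransformResult(ResultType type, V value) {
        this.type = type;
        this.value = value;
    }

    static <V> TransformResult<V> unchanged() { return new TransformResult<>(ResultType.UNCHANGED, null); }
    static <V> TransformResult<V> transformed(V value) { return new TransformResult<>(ResultType.TRANSFORMED, value); }
    static <V> TransformResult<V> skip() { return new TransformResult<>(ResultType.SKIP, null); }
}

// Example policy: drop empty values, keep already-uppercase values, uppercase the rest.
class UppercaseTransformer {
    TransformResult<String> transform(String key, String value, int partitionId) {
        if (value.isEmpty()) {
            return TransformResult.skip();
        }
        String upper = value.toUpperCase(Locale.ROOT);
        if (upper.equals(value)) {
            return TransformResult.unchanged();
        }
        return TransformResult.transformed(upper);
    }
}

// Assumed ingestion step: decide what, if anything, gets persisted locally.
class IngestionStep {
    static Optional<String> valueToPersist(TransformResult<String> result, String original) {
        switch (result.type) {
            case UNCHANGED:
                return Optional.of(original);
            case TRANSFORMED:
                return Optional.of(result.value);
            default: // SKIP
                return Optional.empty();
        }
    }
}
```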
Step 2: Configure and Register
Build a DaVinciRecordTransformerConfig and register it:
DaVinciRecordTransformerConfig config = new DaVinciRecordTransformerConfig.Builder()
    .setRecordTransformerFunction(YourTransformer::new)
    .build();
DaVinciConfig daVinciConfig = new DaVinciConfig();
daVinciConfig.setRecordTransformerConfig(config);
For custom constructor parameters: if your transformer’s constructor needs parameters beyond the default ones Venice provides, supply a lambda that passes them through:
// Your custom parameter
String databasePath = "/my/path";
DaVinciRecordTransformerFunctionalInterface transformerFunction = (
    // Venice-provided parameters:
    storeName, storeVersion, keySchema, inputValueSchema, outputValueSchema, config) ->
    new YourTransformer(
        // Venice-provided parameters
        storeName, storeVersion, keySchema, inputValueSchema, outputValueSchema, config,
        // Your custom parameter
        databasePath);
DaVinciRecordTransformerConfig config = new DaVinciRecordTransformerConfig.Builder()
    .setRecordTransformerFunction(transformerFunction)
    .build();
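The pattern above relies on ordinary Java closure capture: the lambda matches the factory's parameter list and captures `databasePath` from the enclosing scope. A stripped-down sketch, using a hypothetical two-parameter `TransformerFactory` in place of Venice's real functional interface (which passes more parameters):

```java
// Hypothetical, simplified factory interface; Venice's real functional interface
// also passes schemas and config.
@FunctionalInterface
interface TransformerFactory {
    MyTransformer create(String storeName, int storeVersion);
}

class MyTransformer {
    final String storeName;
    final int storeVersion;
    final String databasePath; // custom parameter, not supplied by the framework

    MyTransformer(String storeName, int storeVersion, String databasePath) {
        this.storeName = storeName;
        this.storeVersion = storeVersion;
        this.databasePath = databasePath;
    }
}

class FactoryWiring {
    // The lambda takes exactly the factory's parameters and captures databasePath
    // from the enclosing scope, so every instance it builds receives it.
    static TransformerFactory withDatabasePath(String databasePath) {
        return (storeName, storeVersion) -> new MyTransformer(storeName, storeVersion, databasePath);
    }
}
```

Since each store version gets its own transformer instance, every call to the factory produces a fresh object that carries the same captured path.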
Key Concepts
Version Management
Venice stores have versions. When new data is pushed, Venice creates a future version while the current version continues serving traffic. Once ready, Venice atomically swaps the future version to become the new current version.
For Record Transformers:
- Each version gets its own transformer instance
- During a push, you’ll have transformers for both current and future versions running in parallel
- Use onStartVersionIngestion(isCurrentVersion) to initialize resources
- Use onEndVersionIngestion(currentVersion) to clean up when a version stops serving
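The current/future swap described above can be approximated with an `AtomicReference`: the future version is built off to the side while readers keep hitting the current one, then published in a single atomic step. `VersionSnapshot` and `VersionedView` are hypothetical names; this models the idea and is not how Venice implements it.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable snapshot of one store version; not Venice internals.
final class VersionSnapshot {
    final int number;
    final Map<String, String> data;

    VersionSnapshot(int number, Map<String, String> data) {
        this.number = number;
        this.data = Map.copyOf(data); // immutable once built
    }
}

class VersionedView {
    private final AtomicReference<VersionSnapshot> current;

    VersionedView(VersionSnapshot initial) {
        this.current = new AtomicReference<>(initial);
    }

    // Readers see exactly one complete version, never a mix of two.
    String read(String key) {
        return current.get().data.get(key);
    }

    int currentVersion() {
        return current.get().number;
    }

    // Publishes the future version atomically; an older version never replaces a newer one.
    boolean promote(VersionSnapshot future) {
        VersionSnapshot result = current.updateAndGet(c -> future.number > c.number ? future : c);
        return result == future;
    }
}
```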
Best Practice: Separate Data by Version
When propagating Venice data to external systems (databases, search indexes, etc.), always separate data from different versions into independent storage locations. Think of it as maintaining one database table per Venice store version.
Why Version Separation Matters:
- Prevents data races: Multiple versions writing to the same table creates race conditions
- Avoids record leaks: Old version data won’t pollute your current dataset
- Enables clean transitions: You can atomically switch to new data once ready
Implementation Strategy:
- Create version-specific storage (e.g., user_profiles_v1, user_profiles_v2)
- Maintain a pointer to the current version (database views, atomic pointer, etc.)
- Switch the pointer atomically when Venice promotes a new current version
- Clean up old versions once they are no longer needed
See the example below:
// Fields and hooks inside your DaVinciRecordTransformer subclass.
// externalDB is a placeholder for your external database client.
private final String tableName = getStoreName() + "_v" + getStoreVersion();
private final String currentViewName = getStoreName() + "_current_version";

@Override
public void onStartVersionIngestion(boolean isCurrentVersion) {
    // Initialize resources for this version
    if (!externalDB.containsTable(tableName)) {
        externalDB.createTable(tableName);
    }
    // Maintain pointer to current version
    if (isCurrentVersion) {
        externalDB.createOrReplaceView(currentViewName, "SELECT * FROM " + tableName);
    }
}

@Override
public void onEndVersionIngestion(int currentVersion) {
    // Only clean up if this version is no longer serving
    if (currentVersion != getStoreVersion()) {
        String newCurrentTableName = getStoreName() + "_v" + currentVersion;
        // Update view to point to new current version
        externalDB.createOrReplaceView(currentViewName, "SELECT * FROM " + newCurrentTableName);
        // Delete old version
        externalDB.dropTable(tableName);
    }
}
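To try the hook logic without a real database, the sketch below replaces `externalDB` with a hypothetical in-memory stand-in (`InMemoryExternalDB`) and wraps the two hooks in a plain class (`VersionTableHandler`) rather than a real `DaVinciRecordTransformer` subclass. One simplification to note: its "view" is an alias for a single table instead of a SQL `SELECT *` view.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for externalDB. A "view" here is just an alias
// for one table rather than a SQL view.
class InMemoryExternalDB {
    private final Map<String, Map<String, String>> tables = new HashMap<>();
    private final Map<String, String> views = new HashMap<>(); // view name -> table name

    synchronized boolean containsTable(String table) { return tables.containsKey(table); }
    synchronized void createTable(String table) { tables.put(table, new HashMap<>()); }
    synchronized void dropTable(String table) { tables.remove(table); }
    synchronized void createOrReplaceView(String view, String table) { views.put(view, table); }
    synchronized void upsert(String table, String key, String value) { tables.get(table).put(key, value); }

    // Reads through the view; returns null if the view, table, or key is missing.
    synchronized String readView(String view, String key) {
        String table = views.get(view);
        Map<String, String> rows = (table == null) ? null : tables.get(table);
        return (rows == null) ? null : rows.get(key);
    }
}

// Hypothetical per-version handler with the same hook logic as the example above.
class VersionTableHandler {
    private final InMemoryExternalDB db;
    private final String storeName;
    private final int storeVersion;
    private final String tableName;
    private final String currentViewName;

    VersionTableHandler(InMemoryExternalDB db, String storeName, int storeVersion) {
        this.db = db;
        this.storeName = storeName;
        this.storeVersion = storeVersion;
        this.tableName = storeName + "_v" + storeVersion;
        this.currentViewName = storeName + "_current_version";
    }

    void onStartVersionIngestion(boolean isCurrentVersion) {
        if (!db.containsTable(tableName)) {
            db.createTable(tableName);
        }
        if (isCurrentVersion) {
            db.createOrReplaceView(currentViewName, tableName);
        }
    }

    void processPut(String key, String value) {
        db.upsert(tableName, key, value); // each version writes only to its own table
    }

    void onEndVersionIngestion(int currentVersion) {
        if (currentVersion != storeVersion) {
            db.createOrReplaceView(currentViewName, storeName + "_v" + currentVersion);
            db.dropTable(tableName);
        }
    }
}
```

Driving it through a push (v1 current, v2 ingesting as a future version, then v1's `onEndVersionIngestion(2)`) shows why version separation matters: a key written only to v1 disappears from the view after the switch instead of leaking into the new dataset.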
Key Behaviors
- Lazy deserialization: Keys/values are deserialized lazily to avoid unnecessary CPU/memory overhead if you only need to inspect some records or parameters
- Startup replay: Venice replays existing records from disk on startup to rebuild external state
- Compatibility checks: Implementation changes are automatically detected and local state is rebuilt to prevent stale data
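Lazy deserialization boils down to memoization: decode on first access, cache the result, and skip the cost entirely if the value is never read. A minimal sketch; `LazyValue` and `Deserializer` are hypothetical and are not the lazy wrapper or decoder Venice uses internally.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical memoizing wrapper illustrating lazy deserialization.
final class LazyValue<T> {
    private Supplier<T> supplier;
    private T value;
    private boolean computed;

    LazyValue(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    // Runs the supplier on the first call only; later calls return the cached result.
    synchronized T get() {
        if (!computed) {
            value = supplier.get();
            computed = true;
            supplier = null; // release the supplier (and the raw bytes it captures)
        }
        return value;
    }

    synchronized boolean isComputed() {
        return computed;
    }
}

class Deserializer {
    static int calls = 0; // counts decodes so the laziness is observable

    // Stand-in for an expensive decode such as Avro; here it is plain UTF-8.
    static String decode(byte[] bytes) {
        calls++;
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```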
Featured Implementations
- BootstrappingVeniceChangelogConsumerDaVinciRecordTransformerImpl: The new Venice Change Data Capture (CDC) client was built using the record transformer.
- DuckDBDaVinciRecordTransformer: Forwards Venice data to DuckDB, allowing you to query your Venice data via SQL.
Configuration Options
Required:
- setRecordTransformerFunction: Function that creates your transformer instances
Optional:
- setKeyClass: Set this if you want to deserialize keys into Avro SpecificRecords.
- setOutputValueClass + setOutputValueSchema: Required together when changing the value type/schema or when using Avro SpecificRecords for values.
- setStoreRecordsInDaVinci (default: true): Persist records into Da Vinci’s local disk.
- setAlwaysBootstrapFromVersionTopic (default: false): Set this to true if storeRecordsInDaVinci is false and you’re storing records in memory without being backed by disk.
- setSkipCompatibilityChecks (default: false): Consider setting this to true when returning UNCHANGED during transform, or during frequent changes to the interface without modifying the transform logic.