Venice P2P Transfer Bootstrapping Architecture
High-Performance Peer-to-Peer Data Bootstrap
Accelerating Venice Node Deployment Through Direct Peer-to-Peer RocksDB Snapshot Transfer
Problem & Solution
The Problem
Bootstrap Bottlenecks
- Kafka-Only Recovery: All nodes bootstrap exclusively from Kafka brokers
- Resource Intensive: Replaying data from the PubSub system is time-consuming during cluster recovery and inefficient under high-update workloads
- Scalability Limits: Broker capacity becomes the recovery bottleneck
Real-World Impact
- Extended MTTR (Mean Time to Restore or Repair) during outages
- Cascading broker overload
- Slower cluster expansion
- Increased operational overhead
The Solution
Direct P2P Transfer
- Peer-to-Peer: Direct node-to-node data transfer
- RocksDB Snapshots: Consistent point-in-time data copies
- Intelligent Fallback: Automatic recovery via Kafka ingestion on failure
- Low Risk: Minimal deployment risk for DaVinci clients
Key Benefits
- Faster node recovery and scaling
- Reduced Kafka broker load
System Architecture Flow
Venice Blob Transfer Complete Flow
Step 1: Peer Discovery
+--------------+  Discovery Request   +--------------+   Query Helix/ZK   +--------------+
| Client Node  | -------------------> |  Discovery   | -----------------> |   Metadata   |
| (Needs Data) |                      |   Service    |                    |  Repository  |
+--------------+                      +--------------+                    +--------------+
       ^                                     |                                   |
       |           Return Host List         |                                   |
       +------------------------------------+ <---------------------------------+
            Host List: [host1, host2, host3...]
Step 2: Sequential Host Attempts
+--------------+  T1: Try Host 1   +----------------+
| Client Node  | ----------------> |    Server A    |
|              | <---- FAIL ------ | (No Data/Busy) |
+--------------+                   +----------------+
                  T2: Try Host 2   +----------------+
                 ----------------> |    Server B    |
                 <---- FAIL ------ | (Table Format  |
                                   |   not Match)   |
                                   +----------------+
                  T3: Try Host 3   +----------------+
                 ----------------> |    Server C    |
                 <--- SUCCESS ---- |  (Not busy,    |
                                   | Format Match)  |
                                   +----------------+
Step 3: Start Transfer
+------------------+                                  +--------------+
|   Client Node    |                                  |   Server C   |
|                  |                                  +--------------+
|   Receives:      | <-- 1.1 File: file1.sst --------------|
|   1. Files       | <-- 1.2 File: file2.sst --------------|
|                  | <-- 1.3 File: MANIFEST_00 ------------|
|   2. Metadata    | <-- 2. Metadata ----------------------|
|   3. COMPLETE    | <-- 3. COMPLETE Flag -----------------|
+------------------+
Step 4: Client Processing after receiving the COMPLETE flag
+--------------+   Validate Files   +--------------+    Atomic Rename     +--------------+
| Client Node  | -----------------> | Temp Folder  | -------------------> |  Partition   |
|              |       (MD5)        | /store/temp_ | (temp -> partition)  |   Folder     |
+--------------+                    |     p1/      |                      |  /store/p1/  |
                                    +--------------+                      +--------------+
Step 5: Kafka Ingestion Fallback (Always Happens)
+--------------+   Resume/Fill Gap    +--------------+
| Client Node  | -------------------> |    Kafka     |
|              |   From snapshot      |  Ingestion   |
|              |  offset to latest    |              |
+--------------+                      +--------------+
Step 1: Peer Discovery
- Venice Server: Query Helix CustomizedViewRepository for COMPLETED nodes
- DaVinci Application: Query DaVinci push job report for ready-to-serve nodes
Step 2: Client-Side P2P Transfer
Connectivity Check ----> Peer Selection ----> Sequential Request
        |                       |                      |
Parallel connection      Filter & shuffle      Try peers until
check with caching       connectable hosts     success or exhaust
Step 3: Data Transfer
Snapshot Creation ----> File Streaming ----> Metadata Sync
        |                      |                    |
 Server creates         Chunked transfer    Offset + Version State
 RocksDB snapshot      with MD5 validation   after files complete
Step 4: Completion
- Success Path: After validating all files, atomically rename the temp directory to the partition directory, then initiate Kafka ingestion to synchronize any remaining offset gap.
- Fallback Path: If any error occurs, clean up the temp directory and retry with the next peer; if all peers fail, fall back to Kafka bootstrap from the beginning.
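The atomic rename in the success path can be sketched with JDK NIO alone; `AtomicPromote` and its method names are illustrative, not Venice's actual classes:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

public class AtomicPromote {
    public static Path promote(Path tempDir, Path partitionDir) throws IOException {
        // Drop any stale partition data so the rename target is free
        deleteRecursively(partitionDir);
        // ATOMIC_MOVE makes the promotion all-or-nothing on the same filesystem,
        // so readers never observe a half-written partition directory
        return Files.move(tempDir, partitionDir, StandardCopyOption.ATOMIC_MOVE);
    }

    private static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Delete children before parents (reverse order of the walk)
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```

The same recursive delete also serves the fallback path, where the partial temp directory must be removed before retrying with the next peer.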
Key Components
- DefaultIngestionBackend - Bootstrap orchestration entry point
- NettyP2PBlobTransferManager - P2P transfer coordinator
- BlobSnapshotManager - Server-side snapshot lifecycle
- NettyFileTransferClient - High-performance client
π₯ Client-Side Process
Process Flow
1. Discover Peers -> 2. Check Connectivity -> 3. Request Data -> 4. Fallback to Kafka
Step 1: Peer Discovery
Venice Server:
- Query Helix CustomizedViewRepository
- Find COMPLETED nodes
- Filter by store/version/partition
DaVinci Application:
- Query DaVinci push job report
- Find ready-to-serve nodes
- Extract available peer list
Step 2: Connectivity Checking
Smart caching strategy for large peer sets:
+--------------+     +--------------+     +--------------------+     +--------------+
| Purge Stale  | --> |  Filter Bad  | --> |  Check Host        | --> | Update Cache |
|   Records    |     |    Hosts     |     |  Connectivity in   |     |   Results    |
|              |     |              |     |  Parallel          |     |              |
+--------------+     +--------------+     +--------------------+     +--------------+
- Parallel Testing: Multiple host connections simultaneously
- Smart Caching: Remember good/bad hosts with timestamps
- Timeout Management: 1-minute connectivity check limit
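The caching flow above can be sketched as a JDK-only checker: purge stale cache records, answer cached hosts immediately, probe the rest in parallel with a per-probe timeout, then record the results. `ConnectivityChecker`, `cacheTtl`, and `probeTimeoutMs` are illustrative names, not Venice's real API:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ConnectivityChecker {
    record CacheEntry(boolean reachable, Instant checkedAt) {}

    private final Map<String, CacheEntry> cache = new ConcurrentHashMap<>();
    private final Duration cacheTtl;
    private final int probeTimeoutMs;

    public ConnectivityChecker(Duration cacheTtl, int probeTimeoutMs) {
        this.cacheTtl = cacheTtl;
        this.probeTimeoutMs = probeTimeoutMs;
    }

    public List<String> filterConnectable(List<String> hosts, int port) throws InterruptedException {
        Instant now = Instant.now();
        // 1. Purge cache records older than the TTL
        cache.entrySet().removeIf(e -> e.getValue().checkedAt().plus(cacheTtl).isBefore(now));
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, Math.min(hosts.size(), 16)));
        try {
            // 2-3. Reuse cached results; probe unknown hosts in parallel
            Map<String, Future<Boolean>> probes = new LinkedHashMap<>();
            for (String host : hosts) {
                CacheEntry hit = cache.get(host);
                probes.put(host, hit != null
                    ? CompletableFuture.completedFuture(hit.reachable())
                    : pool.submit(() -> probe(host, port)));
            }
            // 4. Record results with a fresh timestamp; keep only reachable hosts
            List<String> good = new ArrayList<>();
            for (Map.Entry<String, Future<Boolean>> e : probes.entrySet()) {
                boolean ok;
                try {
                    ok = e.getValue().get(probeTimeoutMs + 1000L, TimeUnit.MILLISECONDS);
                } catch (ExecutionException | TimeoutException ex) {
                    ok = false;
                }
                cache.put(e.getKey(), new CacheEntry(ok, Instant.now()));
                if (ok) {
                    good.add(e.getKey());
                }
            }
            Collections.shuffle(good); // spread transfer load across reachable peers
            return good;
        } finally {
            pool.shutdownNow();
        }
    }

    private boolean probe(String host, int port) {
        try (Socket s = new Socket()) {
            s.connect(new InetSocketAddress(host, port), probeTimeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```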
Step 3: Sequential Data Request
For Each Peer (Shuffled List):
+--------------+    +--------------+    +--------------+    +---------------+    +--------------+    +------------------+
| Send Request | -> | Receive Data | -> |   Receive    | -> |    Receive    | -> |   Validate   | -> |   Rename Temp    |
|   HTTP GET   |    |  File Chunks |    |   Metadata   |    | COMPLETE_FLAG |    |     MD5      |    | to Partition Dir |
+--------------+    +--------------+    +--------------+    +---------------+    +--------------+    +------------------+
Error Handling in Different Scenarios
Connection Failures (VenicePeersConnectionException)
- Network timeout
- SSL handshake failure
- Host unreachable
- Action: Try next peer
File Not Found (VeniceBlobTransferFileNotFoundException)
- Snapshot missing
- Stale snapshot
- Server too busy (rejects with a 429 error)
- Action: Try next peer
Transfer Errors (Data Integrity Issues)
- Checksum mismatch
- File size mismatch
- Network interruption: the server starts a deployment or shuts down unexpectedly
- Action: Cleanup + next peer
Comprehensive Cleanup Process
+--------------+     +--------------+     +--------------+     +--------------+
| Force Flush  | --> |  Close File  | --> |    Delete    | --> |    Reset     |
| File Channel |     |   Handles    |     | Partial Dir  |     |   Handler    |
+--------------+     +--------------+     +--------------+     +--------------+
- Zero Partial State: Complete cleanup ensures no corruption
- Handler Reset: Ready for next peer attempt
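The four cleanup steps can be sketched with JDK NIO; `TransferCleanup` is an illustrative name, not Venice's actual handler class:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class TransferCleanup {
    private FileChannel channel;
    private Path tempDir;

    public TransferCleanup(FileChannel channel, Path tempDir) {
        this.channel = channel;
        this.tempDir = tempDir;
    }

    public void cleanup() throws IOException {
        // 1-2. Force-flush buffered writes to disk, then close the file handle
        if (channel != null && channel.isOpen()) {
            channel.force(true);
            channel.close();
        }
        // 3. Delete the partially written temp directory, children before parents
        if (tempDir != null && Files.exists(tempDir)) {
            try (Stream<Path> paths = Files.walk(tempDir)) {
                paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                    try {
                        Files.delete(p);
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
            }
        }
        // 4. Reset handler state so the next peer attempt starts clean
        channel = null;
        tempDir = null;
    }
}
```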
Step 4: Kafka Fallback Strategy
Two-Phase Strategy:
- Phase 1 - Blob Transfer: Rapid bulk data transfer via P2P
- Phase 2 - Kafka Fill-Gap: Even if blob transfer fails, deployment continues with Kafka ingestion
Zero-Risk Design: After a successful blob transfer, Kafka ingestion always follows to synchronize any data between the snapshot offset and the latest offset, guaranteeing full deployment.
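The two-phase strategy reduces to a small orchestration pattern: attempt the blob transfer, and whatever its outcome, always run Kafka ingestion from the resulting offset. The functional interfaces below are illustrative, not Venice's real APIs:

```java
public class BootstrapOrchestrator {
    interface BlobTransfer {
        long transfer() throws Exception; // returns the snapshot's offset on success
    }

    interface KafkaIngestion {
        void consumeFrom(long offset);
    }

    static void bootstrap(BlobTransfer blob, KafkaIngestion kafka) {
        long startOffset;
        try {
            // Phase 1: rapid bulk data copy via P2P blob transfer
            startOffset = blob.transfer();
        } catch (Exception e) {
            // Blob transfer failed: fall back to a full Kafka bootstrap
            startOffset = 0;
        }
        // Phase 2: Kafka ingestion always runs, filling the gap between the
        // snapshot offset and the latest offset (or replaying from the start)
        kafka.consumeFrom(startOffset);
    }
}
```

Because Phase 2 is unconditional, a blob-transfer failure only costs time, never correctness.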
Server-Side Process
Process Flow
1. Request Reception & Validation -> 2. Prepare Snapshot & Metadata -> 3. File Transfer & Chunking -> 4. Metadata Transfer & Completion
Step 1: Request Reception & Validation
Incoming Request Pipeline:
+--------------+     +--------------+     +--------------+     +--------------+
|   HTTP GET   | --> |  Parse URI   | --> |   Validate   | --> |    Check     |
|   Request    |     |  & Extract   |     |    Table     |     | Concurrency  |
|              |     |  Parameters  |     |    Format    |     |    Limits    |
+--------------+     +--------------+     +--------------+     +--------------+
- URI Parsing: /storeName/version/partition/tableFormat
- Format Check: PLAIN_TABLE vs BLOCK_BASED_TABLE match verification
- Concurrency Control: Global limit enforcement; reject with 429 if over limit
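The URI parsing and 429-based concurrency control can be sketched as follows; `BlobTransferRequest` and `tryAdmit` are illustrative names, not Venice's actual classes:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class BlobTransferRequest {
    final String storeName;
    final int version;
    final int partition;
    final String tableFormat;

    BlobTransferRequest(String storeName, int version, int partition, String tableFormat) {
        this.storeName = storeName;
        this.version = version;
        this.partition = partition;
        this.tableFormat = tableFormat;
    }

    // Parse the URI format described above: /storeName/version/partition/tableFormat
    static BlobTransferRequest parse(String uri) {
        String trimmed = uri.startsWith("/") ? uri.substring(1) : uri;
        String[] parts = trimmed.split("/");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Expected /storeName/version/partition/tableFormat: " + uri);
        }
        return new BlobTransferRequest(parts[0], Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), parts[3]);
    }

    // Global concurrency control: admit the request or reject with HTTP 429
    private static final AtomicInteger inFlight = new AtomicInteger();

    static int tryAdmit(int maxConcurrent) {
        if (inFlight.incrementAndGet() > maxConcurrent) {
            inFlight.decrementAndGet();
            return 429; // Too Many Requests
        }
        return 200;
    }

    static void release() {
        inFlight.decrementAndGet();
    }
}
```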
Step 2: Prepare Snapshot & Metadata
Snapshot Lifecycle:
- Check staleness (configurable TTL)
- Verify concurrent user limits
- Create a new snapshot if the existing one is stale or does not exist
- Prepare partition metadata
Metadata Preparation:
- Serialization of StoreVersionState (enables synchronization of hybrid store configuration parameters)
- OffsetRecord encapsulation (captures the snapshotβs offset for accurate state synchronization)
- JSON metadata response
Step 3: File Transfer & Chunking Strategy (Server/Client Single-File Transfer Details)
Server-Side Adaptive Chunking Algorithm:
Step 1: File Preparation
+---------------------+   +---------------------+   +---------------------+   +---------------------+   +---------------------+
|   Open File for     |-->|   Calculate File    |-->|    Generate MD5     |-->|    Prepare HTTP     |-->|    Send Response    |
|      Reading        |   |       Length        |   |      Checksum       |   |      Response       |   |       Headers       |
|                     |   |                     |   |                     |   |                     |   |                     |
|   Open file in      |   |  Get file size for  |   |  Generate checksum  |   | Set content headers |   |  Send headers to    |
|   read-only mode    |   |  response headers   |   |   for validation    |   |  and file metadata  |   |   client first      |
+---------------------+   +---------------------+   +---------------------+   +---------------------+   +---------------------+
Step 2: Adaptive Chunking Strategy
+--------------------------+   +--------------------------+   +--------------------------+
|    Calculate Optimal     |-->|      Create Chunked      |-->|      Wrap for HTTP       |
|  Chunk Size (16KB-2MB)   |   |       File Handler       |   |        Streaming         |
|                          |   |                          |   |                          |
|   Determine best chunk   |   | Set up memory-efficient  |   |     Prepare for HTTP     |
|   size based on file     |   |   streaming mechanism    |   |     chunked transfer     |
+--------------------------+   +--------------------------+   +--------------------------+
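The exact chunk-size formula is not shown in this document, so the sketch below simply scales the chunk with file size and clamps it to the 16KB-2MB bounds mentioned above; `ChunkSizer` and `targetChunkCount` are assumptions for illustration:

```java
public class ChunkSizer {
    static final long MIN_CHUNK = 16 * 1024;       // 16 KB lower bound
    static final long MAX_CHUNK = 2 * 1024 * 1024; // 2 MB upper bound

    // Pick a chunk size so the file splits into roughly targetChunkCount
    // pieces, clamped to the [16KB, 2MB] adaptive range
    static long optimalChunkSize(long fileLength, int targetChunkCount) {
        long raw = Math.max(1, fileLength / targetChunkCount);
        return Math.min(MAX_CHUNK, Math.max(MIN_CHUNK, raw));
    }
}
```

Small files get the minimum chunk (low overhead), while very large files hit the 2MB cap so no single write monopolizes memory or the channel.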
Step 3: Efficient File Streaming
+----------------------+   +----------------------+   +----------------------+   +----------------------+
|   Stream File Data   |-->|    Netty Chunked     |-->|   Monitor Transfer   |-->|  Complete Transfer   |
|      in Chunks       |   |    Write Handler     |   |       Progress       |   |                      |
|                      |   |                      |   |                      |   |                      |
| ctx.writeAndFlush()  |   |   Memory-efficient   |   | Log success/failure  |   | Ready for next file  |
|  non-blocking write  |   |    file streaming    |   |       per file       |   |     or metadata      |
+----------------------+   +----------------------+   +----------------------+   +----------------------+
Client-Side High-Performance Reception:
Step 1: File Setup
+--------------------------+   +--------------------------+   +--------------------------+
|      Parse Headers       |-->|     Create Temp Dir      |-->|    Create Empty File     |
|                          |   |                          |   |                          |
|  - Extract filename      |   |  - Make /store/temp_p0/  |   |  - Create empty file     |
|  - Extract file size     |   |  - Delete if exists      |   |  - Open FileChannel      |
|  - Extract MD5 hash      |   |                          |   |    in WRITE mode         |
+--------------------------+   +--------------------------+   +--------------------------+
Step 2: Write Data Chunks (repeated for each arriving chunk)
+--------------------------+   +--------------------------+   +--------------------------+
|     Receive Network      |-->|    Convert to Channel    |-->|      Write to File       |
|        Data Chunk        |   |                          |   |                          |
|                          |   |  - ByteBuf -> Stream     |   |  - transferFrom() call   |
|  - HttpContent arrives   |   |  - Stream -> Channel     |   |  - Update file position  |
|  - Extract ByteBuf       |   |                          |   |                          |
+--------------------------+   +--------------------------+   +--------------------------+
Step 3: Complete File
+--------------------------+   +--------------------------+   +--------------------------+
|   Last Chunk Received    |-->|     Flush and Close      |-->|     Async Validation     |
|                          |   |                          |   |                          |
|  - Detect end of file    |   |  - Force data to disk    |   |  - Submit MD5 check      |
|  - Validate file size    |   |  - Close FileChannel     |   |  - Reset handler state   |
|                          |   |                          |   |  - Ready for next file   |
+--------------------------+   +--------------------------+   +--------------------------+
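The MD5 check submitted in the validation step reduces to digesting the finished file and comparing against the checksum the server sent in the response headers. This JDK-only sketch (`FileChecksum` is an illustrative name) shows that step:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

public class FileChecksum {
    // Stream the file through an MD5 digest so even large .sst files
    // are validated without loading them fully into memory
    static String md5Hex(Path file) throws IOException, NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        try (InputStream in = Files.newInputStream(file)) {
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
        }
        return HexFormat.of().formatHex(md.digest());
    }

    // Compare against the checksum received in the server's headers
    static boolean matches(Path file, String expectedMd5Hex) throws IOException, NoSuchAlgorithmException {
        return md5Hex(file).equalsIgnoreCase(expectedMd5Hex);
    }
}
```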
Data Flow Transformation in the Convert-to-Channel Step
Network -> ByteBuf -> ByteBufInputStream -> ReadableByteChannel -> FileChannel
   |          |               |                      |                  |
   |          |               |                      |                  +-- Disk File
   |          |               |                      +-- NIO Channel (efficient)
   |          |               +-- Java Stream (bridge)
   |          +-- Netty Buffer
   +-- Raw network packets
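The tail of this chain can be demonstrated without a Netty dependency: a `ByteArrayInputStream` stands in for the `ByteBufInputStream` wrapping the network buffer, and the `Stream -> ReadableByteChannel -> FileChannel.transferFrom()` flow is the same. `ChunkWriter` is an illustrative name:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;

public class ChunkWriter {
    // Write one arriving chunk at the tracked file position and
    // return the position for the next chunk
    static long writeChunk(FileChannel file, byte[] networkChunk, long position) throws IOException {
        try (ReadableByteChannel src = Channels.newChannel(new ByteArrayInputStream(networkChunk))) {
            long pos = position;
            long remaining = networkChunk.length;
            while (remaining > 0) {
                // transferFrom moves bytes from the source channel into the file;
                // loop because a single call may transfer fewer bytes than requested
                long n = file.transferFrom(src, pos, remaining);
                if (n <= 0) {
                    break;
                }
                pos += n;
                remaining -= n;
            }
            return pos;
        }
    }
}
```

The channel-to-channel transfer lets the JVM avoid an intermediate copy through a user-space buffer where the platform supports it.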
Step 4: Metadata Transfer & Completion Protocol
Critical Ordering for Data Consistency:
+--------------+     +--------------+     +--------------+     +--------------+
|  1. Prepare  | --> | 2. Transfer  | --> |   3. Send    | --> |   4. Send    |
|   RocksDB    |     |  All Files   |     |   Metadata   |     |   COMPLETE   |
|   Snapshot   |     |  (with MD5)  |     |  (Offset +   |     |    Signal    |
|              |     |              |     |     SVS)     |     |              |
+--------------+     +--------------+     +--------------+     +--------------+
Why This Ordering is Critical:
✗ Wrong Order (Metadata -> Files -> Complete):
- Client updates offset records immediately
- If file transfer fails, offset state is corrupted
- Need offset/SVS rollback mechanisms
- Increased error handling complexity and risk
✓ Correct Order (Files -> Metadata -> Complete):
- Files transferred and validated first
- Metadata only sent after successful file transfer
- Atomic state update on COMPLETE signal
- Lower-risk consistency guarantee
Metadata Consistency Protocol:
- Metadata: OffsetRecord + StoreVersionState captured before snapshot creation time
- JSON Serialization: Structured metadata transfer with size validation
Traffic Control
Global Traffic Shaping
Shared Rate Limiting: Single GlobalChannelTrafficShapingHandler instance controls bandwidth across ALL blob transfer channels globally
Traffic Management Strategy
Global Control Architecture:
+--------------+     +--------------+     +--------------+
|    Single    | --> |   Monitor    | --> |   Enforce    |
|  Controller  |     | All Channels |     |  Bandwidth   |
|   Instance   |     |   Globally   |     |    Limits    |
+--------------+     +--------------+     +--------------+
- Write Limit: Global bytes/sec across all channels
- Read Limit: Global bytes/sec across all channels
- Check Interval: 1 second monitoring cycle
Adaptive Throttling System
Dynamic Rate Adjustment:
- Increment/Decrement: 20%
- Range: 20% - 200% of base rate
- Separate read/write throttlers
- Idle threshold handling
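The dynamic rate adjustment described above (20% steps, clamped to 20%-200% of the base rate) can be sketched as a small stateful adjuster; `AdaptiveThrottler` and its method names are illustrative, not Venice's real throttler API:

```java
public class AdaptiveThrottler {
    static final double STEP = 0.20;  // 20% increment/decrement per adjustment
    static final double FLOOR = 0.20; // never below 20% of the base rate
    static final double CEIL = 2.00;  // never above 200% of the base rate

    private final long baseRatePerSec;
    private long currentRatePerSec;

    AdaptiveThrottler(long baseRatePerSec) {
        this.baseRatePerSec = baseRatePerSec;
        this.currentRatePerSec = baseRatePerSec;
    }

    // Step the limit up by 20%, capped at 200% of the base rate
    long increase() {
        currentRatePerSec = Math.min((long) (baseRatePerSec * CEIL), (long) (currentRatePerSec * (1 + STEP)));
        return currentRatePerSec;
    }

    // Step the limit down by 20%, floored at 20% of the base rate
    long decrease() {
        currentRatePerSec = Math.max((long) (baseRatePerSec * FLOOR), (long) (currentRatePerSec * (1 - STEP)));
        return currentRatePerSec;
    }

    long current() {
        return currentRatePerSec;
    }
}
```

Separate read and write throttlers would each hold one such instance, adjusted on every check interval.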
Configuration & Operations
Configurations
Receiver/Client Feature Enablement Control
Venice Server:
- Store Level:
blobTransferInServerEnabled
- Application Level:
blob.transfer.receiver.server.policy
DaVinci Application:
- Store Level:
blobTransferEnabled
Sender/Server Feature Enablement Control
All Applications:
- Application Level:
blob.transfer.manager.enabled
Performance Tuning Parameters
Thresholds
- Offset Lag: Skip blob transfer if the node is not lagging enough
- Snapshot TTL: Maintain snapshot freshness
- Snapshot Cleanup Interval: Reclaim disk storage
Bandwidth Limits
- Client Read: Client-side read bytes per second
- Service Write: Server-side write bytes per second
- Adaptive Range: 20% - 200% of base rate
- Max Concurrent Snapshot Users: Limit the number of transfer requests a server serves concurrently
Timeouts
- Transfer Max: Server-side maximum timeout for transferring files
- Receive Max: Client-side maximum timeout for receiving files
Summary
Venice Blob Transfer: Key Features & Benefits
Performance Excellence
- Intelligent peer discovery and selection
- Fast failover with comprehensive error handling
- High-performance Netty streaming architecture
- Efficient file operations and adaptive chunking
Rock-Solid Reliability
- Consistent RocksDB snapshots with point-in-time guarantees
- Comprehensive error handling and automatic cleanup
- Data integrity validation with MD5 and size checks
- Automatic Kafka fallback for 100% data coverage
Security & Control
- End-to-end SSL/TLS encryption
- Certificate-based ACL validation
- Global traffic rate limiting and adaptive throttling
- Comprehensive timeout and connection management
Operational Excellence
- Flexible multi-level configuration
- Automated snapshot lifecycle management
- Graceful degradation and low-risk deployment
Result: Faster node recovery, reduced infrastructure load, and improved cluster scalability with enterprise-grade reliability