Class DualWriteVeniceWriter
- All Implemented Interfaces:
Closeable, AutoCloseable
Wraps an AbstractVeniceWriter and one or more ExternalStorageWriters (one per
DUAL_WRITE target region) so each batch-push record is written to every external sink first and
then produced to Kafka. The Kafka future is what the caller waits on for at-least-once semantics;
external-sink durability is synchronous from the caller's point of view, so a failed external write throws
before the Kafka produce starts and the Spark task is retried.
When the push targets multiple regions, the same record is fanned out to every regional writer before the Kafka produce. The guarantee is that no Kafka produce happens for a batch until every regional external write for that batch has succeeded: if any regional write fails (after its bounded retry) the exception propagates before the produce and fails the Spark task. Note this is not an all-or-nothing write across the external sinks within a failed attempt — an earlier region in the fan-out may already hold the batch while a later one failed. Consistency is restored by the whole-partition retry: external writes are idempotent on key, so the next attempt overwrites the partially-written region rather than leaving it divergent.
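The ordering and recovery behavior described above can be illustrated with a minimal sketch. Everything in it is hypothetical: `Region` is an in-memory stand-in for a regional external sink (not the real ExternalStorageWriter), and `producedKeys` stands in for the Kafka produce. It only shows that a regional failure prevents the produce, that an earlier region may already hold the batch, and that a retry converges because writes overwrite by key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FanOutThenProduceSketch {
  /** Hypothetical regional sink: a map whose writes overwrite by key, so they are idempotent. */
  static final class Region {
    final Map<String, String> store = new LinkedHashMap<>();
    int failuresLeft; // number of upcoming batchPut calls that should fail

    Region(int failuresLeft) {
      this.failuresLeft = failuresLeft;
    }

    void batchPut(Map<String, String> batch) {
      if (failuresLeft > 0) {
        failuresLeft--;
        throw new IllegalStateException("regional write failed");
      }
      store.putAll(batch);
    }
  }

  final List<Region> regions;
  final List<String> producedKeys = new ArrayList<>(); // stand-in for the Kafka produce

  FanOutThenProduceSketch(List<Region> regions) {
    this.regions = regions;
  }

  /** Writes the batch to every region in order; only then records the produce. */
  void writeBatch(Map<String, String> batch) {
    for (Region region : regions) {
      region.batchPut(batch); // a throw here skips the produce below
    }
    producedKeys.addAll(batch.keySet());
  }
}
```

In this sketch a failed first attempt leaves the first region holding the batch and the second one empty, with nothing produced. Calling `writeBatch` again with the same batch (the analogue of the whole-partition retry) brings both regions to the same contents.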
Region fan-out is sequential. Each region's batchPut (including its bounded retries and
backoff) completes before the next region's begins, on the single partition-writer task thread. This keeps
the failure semantics and ordering simple, but the per-batch external-write latency is the sum across
regions. TODO(future iteration): parallelize the per-region fan-out (e.g. a small per-task executor that
issues the regional batchPuts concurrently and then joins, aggregating failures) when cross-region
write latency dominates the push.
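One possible shape for the parallel fan-out mentioned in the TODO is sketched below. It is not part of this class: `ParallelFanOutSketch`, `fanOut`, and the use of `Consumer<List<T>>` as a stand-in for a regional batchPut are all assumptions made for illustration. The sketch issues every regional write concurrently, joins on all of them, and throws a single exception that carries each regional failure as a suppressed exception.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

final class ParallelFanOutSketch {
  /** Runs all regional writes concurrently, waits for every one, then reports failures together. */
  static <T> void fanOut(List<Consumer<List<T>>> regionalWriters, List<T> batch) {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, regionalWriters.size()));
    try {
      List<Future<?>> pending = new ArrayList<>();
      for (Consumer<List<T>> writer : regionalWriters) {
        pending.add(pool.submit(() -> writer.accept(batch)));
      }
      RuntimeException failure = null;
      for (Future<?> future : pending) {
        try {
          future.get(); // join: wait for this region before deciding the outcome
        } catch (ExecutionException e) {
          if (failure == null) {
            failure = new RuntimeException("regional fan-out failed");
          }
          failure.addSuppressed(e.getCause());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException("interrupted while waiting for regional writes", e);
        }
      }
      if (failure != null) {
        throw failure; // thrown only after every region has finished
      }
    } finally {
      pool.shutdownNow();
    }
  }
}
```

With this shape the per-batch latency is bounded by the slowest region rather than the sum, at the cost of a thread pool per task and of every region being attempted even when one fails early.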
The writer buffers up to batchSize consecutive put records and flushes them as a single
ExternalStorageWriter.batchPut(List) (per regional writer) before the corresponding Kafka produces
fire. batchSize = 1 (the default) disables buffering: every record is forwarded immediately as a
one-element batch, matching pre-batching semantics. flush() and close() drain any pending
buffer before doing their own work, so the producer's record ordering is preserved.
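The buffering rule can be sketched as follows. This is a simplified illustration, not the class's code: `BatchBufferSketch` and its two `Consumer` parameters are hypothetical stand-ins for the external batchPut and the Kafka produce, and records are plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchBufferSketch {
  private final int batchSize;
  private final Consumer<List<String>> externalBatchPut; // stand-in for the external batchPut
  private final Consumer<String> produce;                // stand-in for the Kafka produce
  private final List<String> buffer = new ArrayList<>();

  BatchBufferSketch(int batchSize, Consumer<List<String>> externalBatchPut, Consumer<String> produce) {
    if (batchSize < 1) {
      throw new IllegalArgumentException("batchSize must be >= 1");
    }
    this.batchSize = batchSize;
    this.externalBatchPut = externalBatchPut;
    this.produce = produce;
  }

  /** Buffers the record and drains once batchSize records have accumulated. */
  void put(String record) {
    buffer.add(record);
    if (buffer.size() >= batchSize) {
      drain();
    }
  }

  /** Drains any partial batch so nothing stays buffered. */
  void flush() {
    drain();
  }

  private void drain() {
    if (buffer.isEmpty()) {
      return;
    }
    List<String> batch = new ArrayList<>(buffer);
    externalBatchPut.accept(batch); // external write first; a throw here skips the produces
    buffer.clear();
    for (String record : batch) {
      produce.accept(record); // produces fire in arrival order
    }
  }
}
```

With a batch size of 1 every `put` drains immediately as a one-element batch, which is the unbuffered behavior described above.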
update and delete are not supported — batch pushes from clean input never call either.
Both throw UnsupportedOperationException so a stray invocation fails the Spark task loudly rather
than silently leaving the external sink and Venice in divergent states.
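As a small illustration of the fail-loudly choice, the sketch below uses a hypothetical `PutOnlyWriterSketch` with simplified signatures (they are not the real method signatures of this class). The unsupported operations throw synchronously instead of returning a future, so a stray call cannot be ignored by a caller that never inspects the result.

```java
import java.util.concurrent.CompletableFuture;

final class PutOnlyWriterSketch {
  /** Supported path; the body is a placeholder for the dual write. */
  CompletableFuture<Void> put(byte[] key, byte[] value) {
    return CompletableFuture.completedFuture(null);
  }

  /** Unsupported: throws immediately rather than returning a failed future. */
  CompletableFuture<Void> delete(byte[] key) {
    throw new UnsupportedOperationException("delete is not supported by this writer");
  }

  /** Unsupported: throws immediately rather than returning a failed future. */
  CompletableFuture<Void> update(byte[] key, byte[] update) {
    throw new UnsupportedOperationException("update is not supported by this writer");
  }
}
```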
-
Field Summary
Fields inherited from class com.linkedin.venice.writer.AbstractVeniceWriter
topicName -
Constructor Summary
Constructors
DualWriteVeniceWriter(String topicName, AbstractVeniceWriter<byte[], byte[], byte[]> kafkaWriter, ExternalStorageWriter externalWriter, int batchSize)
Convenience constructor for callers that don't need the buffered retry policy (e.g. unit tests that exercise the wrapper's other behaviors).
DualWriteVeniceWriter(String topicName, AbstractVeniceWriter<byte[], byte[], byte[]> kafkaWriter, ExternalStorageWriter externalWriter, int batchSize, int batchPutRetries, long batchPutRetryBackoffMs)
Single-region convenience overload.
DualWriteVeniceWriter(String topicName, AbstractVeniceWriter<byte[], byte[], byte[]> kafkaWriter, List<ExternalStorageWriter> externalWriters, int batchSize, int batchPutRetries, long batchPutRetryBackoffMs)
-
Method Summary
void close()
void close(boolean gracefulClose)
delete(byte[] key, long logicalTimestamp, PubSubProducerCallback callback)
delete(byte[] key, PubSubProducerCallback callback)
delete(byte[] key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata)
void flush()
put(byte[] key, byte[] value, int valueSchemaId, long logicalTimestamp, PubSubProducerCallback callback)
put(byte[] key, byte[] value, int valueSchemaId, long logicalTimestamp, PubSubProducerCallback callback, PutMetadata putMetadata)
put(byte[] key, byte[] value, int valueSchemaId, PubSubProducerCallback callback)
put(byte[] key, byte[] value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata)
update(byte[] key, byte[] update, int valueSchemaId, int derivedSchemaId, long logicalTimestamp, PubSubProducerCallback callback)
update(byte[] key, byte[] update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback)
Methods inherited from class com.linkedin.venice.writer.AbstractVeniceWriter
getTopicName, put
-
Constructor Details
-
DualWriteVeniceWriter
public DualWriteVeniceWriter(String topicName, AbstractVeniceWriter<byte[], byte[], byte[]> kafkaWriter, ExternalStorageWriter externalWriter, int batchSize)
Convenience constructor for callers that don't need the buffered retry policy (e.g. unit tests that exercise the wrapper's other behaviors). batchPutRetries = 0 means a single attempt: the original exception propagates immediately.
-
DualWriteVeniceWriter
public DualWriteVeniceWriter(String topicName, AbstractVeniceWriter<byte[], byte[], byte[]> kafkaWriter, ExternalStorageWriter externalWriter, int batchSize, int batchPutRetries, long batchPutRetryBackoffMs)
Single-region convenience overload.
-
DualWriteVeniceWriter
public DualWriteVeniceWriter(String topicName, AbstractVeniceWriter<byte[], byte[], byte[]> kafkaWriter, List<ExternalStorageWriter> externalWriters, int batchSize, int batchPutRetries, long batchPutRetryBackoffMs)
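The two retry parameters can be read as a bounded retry loop. The sketch below is an assumption about their meaning, not the class's code: it treats batchPutRetries as the number of extra attempts after the first (consistent with 0 meaning one attempt) and batchPutRetryBackoffMs as a fixed sleep between attempts. The actual backoff schedule is not stated in this page. `BoundedRetrySketch` and `runWithRetries` are hypothetical names.

```java
final class BoundedRetrySketch {
  /**
   * Runs the action, retrying up to `retries` extra times with a fixed pause.
   * When the budget is exhausted the last exception is rethrown unchanged.
   */
  static void runWithRetries(Runnable action, int retries, long backoffMs) {
    int retriesUsed = 0;
    while (true) {
      try {
        action.run();
        return;
      } catch (RuntimeException e) {
        if (retriesUsed >= retries) {
          throw e; // with retries = 0 this is the first and only attempt
        }
        retriesUsed++;
        try {
          Thread.sleep(backoffMs);
        } catch (InterruptedException interrupted) {
          Thread.currentThread().interrupt();
          throw e; // stop retrying if the task thread is interrupted
        }
      }
    }
  }
}
```

Under these assumptions a value of 2 allows at most three attempts in total, and the worst-case added latency per batch is roughly two backoff intervals plus the time of the attempts themselves.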
-
-
Method Details
-
put
public CompletableFuture<PubSubProduceResult> put(byte[] key, byte[] value, int valueSchemaId, PubSubProducerCallback callback)
- Specified by:
put in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
put
public CompletableFuture<PubSubProduceResult> put(byte[] key, byte[] value, int valueSchemaId, long logicalTimestamp, PubSubProducerCallback callback)
- Specified by:
put in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
put
public CompletableFuture<PubSubProduceResult> put(byte[] key, byte[] value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata)
- Specified by:
put in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
put
public CompletableFuture<PubSubProduceResult> put(byte[] key, byte[] value, int valueSchemaId, long logicalTimestamp, PubSubProducerCallback callback, PutMetadata putMetadata)
- Specified by:
put in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
delete
- Specified by:
delete in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
delete
public CompletableFuture<PubSubProduceResult> delete(byte[] key, long logicalTimestamp, PubSubProducerCallback callback)
- Specified by:
delete in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
delete
public CompletableFuture<PubSubProduceResult> delete(byte[] key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata)
- Specified by:
delete in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
update
public Future<PubSubProduceResult> update(byte[] key, byte[] update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback)
- Specified by:
update in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
update
public CompletableFuture<PubSubProduceResult> update(byte[] key, byte[] update, int valueSchemaId, int derivedSchemaId, long logicalTimestamp, PubSubProducerCallback callback)
- Specified by:
update in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
flush
public void flush()
- Specified by:
flush in class AbstractVeniceWriter<byte[], byte[], byte[]>
-
close
- Throws:
IOException
-
close
- Specified by:
close in class AbstractVeniceWriter<byte[], byte[], byte[]>
- Throws:
IOException
-