Package com.linkedin.venice.producer
Class PartitionedProducerExecutor
java.lang.Object
com.linkedin.venice.producer.PartitionedProducerExecutor
Executor for partition-based parallel processing in Venice Producer.
This class enables partition-based workers that eliminate head-of-line blocking while maintaining per-key ordering. The key insight is that ordering only matters within the same partition (same key maps to same partition), so different partitions can run in parallel.
Key design principles:
- Each worker is a single-threaded executor handling a subset of partitions
- Tasks for the same partition always go to the same worker, preserving order
- Different partitions can be processed in parallel by different workers
- A blocking policy provides backpressure when queues are full (the caller blocks until space is available)
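The principles above can be sketched with plain `java.util.concurrent` building blocks. This is a minimal illustration, not the Venice implementation: the class name `PartitionedWorkersSketch`, its methods, and the rejection handler that blocks on `put` are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for illustration only; not the Venice class. */
final class PartitionedWorkersSketch {
  private final ThreadPoolExecutor[] workers;

  PartitionedWorkersSketch(int workerCount, int queueCapacity) {
    // Blocking policy: when a worker queue is full, the caller waits for space.
    RejectedExecutionHandler blockWhenFull = (task, executor) -> {
      if (executor.isShutdown()) {
        throw new RejectedExecutionException("executor is shut down");
      }
      try {
        executor.getQueue().put(task);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("interrupted while waiting for queue space", e);
      }
    };
    workers = new ThreadPoolExecutor[workerCount];
    for (int i = 0; i < workerCount; i++) {
      // One thread per worker, so tasks routed to a worker run in submission order.
      workers[i] = new ThreadPoolExecutor(
          1, 1, 0L, TimeUnit.MILLISECONDS,
          new ArrayBlockingQueue<Runnable>(queueCapacity), blockWhenFull);
    }
  }

  /** The same partition always maps to the same worker. */
  void submit(int partition, Runnable task) {
    workers[Math.floorMod(partition, workers.length)].execute(task);
  }

  boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
    for (ThreadPoolExecutor worker : workers) {
      worker.shutdown();
    }
    boolean allDone = true;
    for (ThreadPoolExecutor worker : workers) {
      allDone &= worker.awaitTermination(timeout, unit);
    }
    return allDone;
  }

  /** Submits taskCount tasks for a single partition and returns the order they ran in. */
  static List<Integer> runInOrderDemo(int taskCount) {
    PartitionedWorkersSketch sketch = new PartitionedWorkersSketch(4, 2);
    List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
    for (int i = 0; i < taskCount; i++) {
      final int n = i;
      sketch.submit(7, () -> seen.add(n));
    }
    try {
      sketch.shutdownAndWait(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen;
  }
}
```

Calling `PartitionedWorkersSketch.runInOrderDemo(50)` submits more tasks than the queue holds, so the submitting thread blocks at times, yet the tasks for partition 7 still run in submission order.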
Execution flow:
asyncPut(key) -> partition = hash(key) % numWorkers
                     |
              Worker[partition % W]
                1. preprocess(key,val)   <- Same thread does both
                2. veniceWriter.put()    <- Non-blocking
                     |
              Kafka callback -> complete userFuture
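The flow above could look roughly like the following sketch. Everything here is assumed for illustration: `AsyncPutFlowSketch`, `preprocess`, `send`, and the single-threaded `fakeKafkaThread` that stands in for the Kafka client's callback thread are not Venice APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the execution flow; not the Venice producer. */
final class AsyncPutFlowSketch {
  private final ExecutorService[] workers;
  // Assumption: a single thread standing in for the Kafka client's callback thread.
  private final ExecutorService fakeKafkaThread = Executors.newSingleThreadExecutor();

  AsyncPutFlowSketch(int workerCount) {
    workers = new ExecutorService[workerCount];
    for (int i = 0; i < workerCount; i++) {
      workers[i] = Executors.newSingleThreadExecutor();
    }
  }

  CompletableFuture<String> asyncPut(String key, String value) {
    CompletableFuture<String> userFuture = new CompletableFuture<>();
    // Follows the diagram: derive a partition from the key, then pick the worker.
    int partition = Math.floorMod(key.hashCode(), workers.length);
    workers[partition % workers.length].execute(() -> {
      try {
        String record = preprocess(key, value);            // 1. preprocess on the worker
        send(record, () -> userFuture.complete(record));   // 2. non-blocking dispatch
      } catch (RuntimeException e) {
        userFuture.completeExceptionally(e);
      }
    });
    return userFuture;
  }

  /** Placeholder for serialization and other per-record work. */
  private static String preprocess(String key, String value) {
    return key + "=" + value;
  }

  /** Placeholder for a non-blocking write; the acknowledgement arrives on another thread. */
  private void send(String record, Runnable onAck) {
    fakeKafkaThread.execute(onAck);
  }

  void close() {
    for (ExecutorService worker : workers) {
      worker.shutdown();
    }
    fakeKafkaThread.shutdown();
  }

  /** Runs one put end to end and returns the acknowledged record. */
  static String demo() {
    AsyncPutFlowSketch producer = new AsyncPutFlowSketch(2);
    try {
      return producer.asyncPut("user-1", "hello").get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      producer.close();
    }
  }
}
```

The caller thread only computes the partition and enqueues the task; preprocessing and dispatch happen on the worker, and the user future is completed from the simulated callback thread.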
Execution modes (both pools optional):
- workerCount=0, callbackThreadCount=0: Fully inline (preprocess + dispatch on caller thread, callback on Kafka thread)
- workerCount=0, callbackThreadCount>0: Inline preprocessing, callback on dedicated threads
- workerCount>0, callbackThreadCount=0: Default - parallel workers, callback on Kafka thread
- workerCount>0, callbackThreadCount>0: Full async - parallel workers + callback isolation
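As a compact restatement of the four modes, the sketch below maps the two counts to where each stage runs. `ExecutionModeSketch` and its methods are hypothetical helpers written for this example; only the rules (a count of 0 disables the pool) come from the list above.

```java
/** Hypothetical helper that restates the execution modes; not part of Venice. */
final class ExecutionModeSketch {
  /** Preprocessing and dispatch run on a worker only when workers are enabled. */
  static String preprocessThread(int workerCount) {
    return workerCount > 0 ? "worker" : "caller";
  }

  /** Callbacks run on the callback pool only when it is enabled. */
  static String callbackThread(int callbackThreadCount) {
    return callbackThreadCount > 0 ? "callback pool" : "Kafka thread";
  }

  static String describe(int workerCount, int callbackThreadCount) {
    return "preprocess on " + preprocessThread(workerCount)
        + ", callback on " + callbackThread(callbackThreadCount);
  }

  public static void main(String[] args) {
    int[][] modes = {{0, 0}, {0, 2}, {4, 0}, {4, 2}};
    for (int[] mode : modes) {
      System.out.println("workerCount=" + mode[0] + ", callbackThreadCount=" + mode[1]
          + ": " + describe(mode[0], mode[1]));
    }
  }
}
```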
Constructor Summary
Constructors
| Constructor | Description |
| --- | --- |
| PartitionedProducerExecutor(int workerCount, int workerQueueCapacity, int callbackThreadCount, int callbackQueueCapacity, String storeName, io.tehuti.metrics.MetricsRepository metricsRepository) | Creates a new partitioned producer executor. |
Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| boolean | awaitTermination(long timeout, TimeUnit unit) | Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first. |
| void | executeCallback(Runnable callback) | Execute callback (for user future completion). |
| int | getCallbackQueueSize() | Get callback executor queue depth. |
| int | getTotalWorkerQueueSize() | Get total queue depth across all workers. |
| int | getWorkerCount() | |
| int | getWorkerQueueSize(int workerIndex) | Get queue depth for specific worker (for metrics). |
| boolean | isCallbackExecutorEnabled() | |
| boolean | isWorkersEnabled() | |
| void | shutdown() | Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted. |
| void | shutdownNow() | Attempts to stop all actively executing tasks and halts the processing of waiting tasks. |
| void | submit(partition, task) | Submit work for a specific partition. |
Constructor Details
PartitionedProducerExecutor
public PartitionedProducerExecutor(int workerCount, int workerQueueCapacity, int callbackThreadCount, int callbackQueueCapacity, String storeName, io.tehuti.metrics.MetricsRepository metricsRepository)
Creates a new partitioned producer executor.
Parameters:
- workerCount: number of partition workers (0 to disable and execute inline)
- workerQueueCapacity: queue capacity per worker for backpressure
- callbackThreadCount: number of callback threads (0 to disable and run on Kafka thread)
- callbackQueueCapacity: queue capacity for callback executor
- storeName: store name for naming threads and metrics
- metricsRepository: metrics repository for registering thread pool stats (may be null)
Method Details
submit
Submit work for a specific partition. If workers are enabled, the task is routed to worker[partition % workerCount]. If workers are disabled, the task executes inline on the caller thread.
Parameters:
- partition: the partition number used for routing to the appropriate worker
- task: the work to execute (preprocessing + dispatch)
executeCallback
Execute callback (for user future completion). If the callback executor is enabled, the callback is handed off to the callback pool. If the callback executor is disabled, the callback runs inline on the caller (Kafka) thread.
Parameters:
- callback: the callback to execute
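The hand-off rule for callbacks can be illustrated with a small sketch. `CallbackDispatchSketch` is a hypothetical class for this example; using a fixed thread pool for the callback executor is an assumption, not a statement about the Venice internals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of callback dispatch; not the Venice class. */
final class CallbackDispatchSketch {
  // null means the callback executor is disabled (callbackThreadCount == 0).
  private final ExecutorService callbackPool;

  CallbackDispatchSketch(int callbackThreadCount) {
    this.callbackPool =
        callbackThreadCount > 0 ? Executors.newFixedThreadPool(callbackThreadCount) : null;
  }

  void executeCallback(Runnable callback) {
    if (callbackPool != null) {
      callbackPool.execute(callback); // hand off so the calling thread is not held up
    } else {
      callback.run();                 // run inline on the calling thread
    }
  }

  void shutdown() {
    if (callbackPool != null) {
      callbackPool.shutdown();
    }
  }

  /** Reports whether a callback ran on the thread that called executeCallback. */
  static boolean ranOnCallingThread(int callbackThreadCount) {
    CallbackDispatchSketch dispatcher = new CallbackDispatchSketch(callbackThreadCount);
    Thread caller = Thread.currentThread();
    CompletableFuture<Thread> ranOn = new CompletableFuture<>();
    dispatcher.executeCallback(() -> ranOn.complete(Thread.currentThread()));
    try {
      return ranOn.get(5, TimeUnit.SECONDS) == caller;
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      dispatcher.shutdown();
    }
  }
}
```

With a thread count of 0 the callback runs on the calling thread; with a positive count it runs on a pool thread, which keeps slow user callbacks off the thread that delivered the acknowledgement.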
isWorkersEnabled
public boolean isWorkersEnabled()
Returns: whether worker threads are enabled (workerCount > 0)
isCallbackExecutorEnabled
public boolean isCallbackExecutorEnabled()
Returns: whether callback executor is enabled (callbackThreadCount > 0)
getWorkerQueueSize
public int getWorkerQueueSize(int workerIndex)
Get queue depth for specific worker (for metrics).
Parameters:
- workerIndex: the worker index
Returns: queue size for the specified worker, or 0 if workers disabled
getTotalWorkerQueueSize
public int getTotalWorkerQueueSize()
Get total queue depth across all workers.
Returns: sum of all worker queue depths, or 0 if workers disabled
getCallbackQueueSize
public int getCallbackQueueSize()
Get callback executor queue depth.
Returns: callback queue size, or 0 if callback executor disabled
getWorkerCount
public int getWorkerCount()
Returns: number of workers, or 0 if workers disabled
shutdown
public void shutdown()
Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
shutdownNow
public void shutdownNow()
Attempts to stop all actively executing tasks and halts the processing of waiting tasks. This method should be called after shutdown() and awaitTermination(long, TimeUnit) if tasks did not complete within the timeout.
awaitTermination
Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
Parameters:
- timeout: the maximum time to wait
- unit: the time unit of the timeout argument
Returns: true if all executors terminated, false if timeout elapsed
Throws:
- InterruptedException: if interrupted while waiting
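The three shutdown methods are meant to be used in sequence: shutdown(), then awaitTermination(long, TimeUnit), then shutdownNow() if the timeout elapsed. The sketch below shows that sequence against a plain `java.util.concurrent.ExecutorService` as a stand-in, since the methods here have the same names and described behavior; `GracefulShutdownSketch` is a hypothetical helper, and whether PartitionedProducerExecutor itself implements `ExecutorService` is not stated in this page.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing the shutdown sequence on a standard ExecutorService. */
final class GracefulShutdownSketch {
  /**
   * Returns true if the executor terminated within the timeout,
   * false if it had to be stopped forcibly.
   */
  static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
    executor.shutdown(); // stop accepting new tasks; queued tasks still run
    try {
      if (executor.awaitTermination(timeout, unit)) {
        return true;
      }
      executor.shutdownNow(); // timeout elapsed: interrupt running tasks, drop queued ones
      return false;
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```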