Class PartitionedProducerExecutor

java.lang.Object
com.linkedin.venice.producer.PartitionedProducerExecutor

public class PartitionedProducerExecutor extends Object
Executor for partition-based parallel processing in Venice Producer.

This class enables partition-based workers that eliminate head-of-line blocking while maintaining per-key ordering. The key insight is that ordering only matters within the same partition (same key maps to same partition), so different partitions can run in parallel.

Key design principles:

  • Each worker is a single-threaded executor handling a subset of partitions
  • Tasks for the same partition always go to the same worker, preserving order
  • Different partitions can be processed in parallel by different workers
  • A blocking policy provides backpressure: when a worker's queue is full, the caller blocks until space is available
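
The principles above can be illustrated with a minimal, self-contained sketch. It is not the Venice implementation: the class name `PartitionedWorkersSketch`, the helper `runOrderForPartition`, and the choice of `ThreadPoolExecutor` with a custom rejection handler are assumptions made for illustration only. The sketch assumes `workerCount > 0` and non-negative partition numbers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of partition-pinned single-threaded workers; not the Venice code. */
final class PartitionedWorkersSketch {
  private final ThreadPoolExecutor[] workers;

  PartitionedWorkersSketch(int workerCount, int queueCapacity) {
    // Blocking policy (assumed shape): when the bounded queue is full,
    // the submitting thread waits in put() until a slot frees up.
    RejectedExecutionHandler blockWhenFull = (task, pool) -> {
      if (pool.isShutdown()) {
        throw new RejectedExecutionException("Executor is shut down");
      }
      try {
        pool.getQueue().put(task);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
      }
    };
    workers = new ThreadPoolExecutor[workerCount];
    for (int i = 0; i < workerCount; i++) {
      // One thread per worker, so tasks routed to it run strictly in FIFO order.
      workers[i] = new ThreadPoolExecutor(
          1, 1, 0L, TimeUnit.MILLISECONDS,
          new ArrayBlockingQueue<Runnable>(queueCapacity),
          blockWhenFull);
    }
  }

  /** Same partition always maps to the same worker (partition assumed non-negative). */
  void submit(int partition, Runnable task) {
    workers[partition % workers.length].execute(task);
  }

  void shutdownAndWait() {
    for (ThreadPoolExecutor w : workers) {
      w.shutdown();
    }
    for (ThreadPoolExecutor w : workers) {
      try {
        w.awaitTermination(10, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Submits {@code count} tasks for one partition and returns the order in which they ran. */
  static List<Integer> runOrderForPartition(int partition, int count) {
    PartitionedWorkersSketch exec = new PartitionedWorkersSketch(4, 2);
    List<Integer> seen = new CopyOnWriteArrayList<>();
    for (int i = 0; i < count; i++) {
      final int seq = i;
      exec.submit(partition, () -> seen.add(seq));
    }
    exec.shutdownAndWait();
    return new ArrayList<>(seen);
  }
}
```

With a queue capacity of 2 and 20 submissions, the caller is likely to hit the full-queue path several times. Because a single thread submits and a single thread drains, the tasks still run in submission order.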

Execution flow:

 asyncPut(key) -> partition = hash(key) % numWorkers
                          |
              Worker[partition % W]
                1. preprocess(key,val)  <- Same thread does both
                2. veniceWriter.put()   <- Non-blocking
                          |
              Kafka callback -> complete userFuture
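
The flow in the diagram can be sketched in a few lines of Java. Everything here is a hypothetical stand-in: `ExecutionFlowSketch`, `fakeSend`, and the `fakeKafkaThread` executor only imitate the shape of a non-blocking write followed by an asynchronous acknowledgement, and the string concatenation stands in for real preprocessing.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of the asyncPut flow; not the Venice producer code. */
final class ExecutionFlowSketch {
  // Stand-in for the worker that owns the key's partition.
  private final ExecutorService worker = Executors.newSingleThreadExecutor();
  // Stand-in for the Kafka I/O thread that fires the send callback.
  private final ExecutorService fakeKafkaThread = Executors.newSingleThreadExecutor();

  CompletableFuture<String> asyncPut(String key, String value) {
    CompletableFuture<String> userFuture = new CompletableFuture<>();
    worker.execute(() -> {
      try {
        // 1. Preprocess on the worker thread (placeholder for serialization etc.).
        String record = key + "=" + value;
        // 2. Dispatch on the same thread; the call returns without waiting for the ack.
        fakeSend(record, () -> userFuture.complete(record));
      } catch (RuntimeException e) {
        userFuture.completeExceptionally(e);
      }
    });
    return userFuture;
  }

  /** Imitates a non-blocking write whose callback runs later on another thread. */
  private void fakeSend(String record, Runnable onAck) {
    fakeKafkaThread.execute(onAck);
  }

  void close() {
    worker.shutdown();
    fakeKafkaThread.shutdown();
  }

  /** Runs one put end to end and returns the value the user future completed with. */
  static String demo() {
    ExecutionFlowSketch sketch = new ExecutionFlowSketch();
    try {
      return sketch.asyncPut("k1", "v1").get(5, TimeUnit.SECONDS);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      sketch.close();
    }
  }
}
```

The caller thread only enqueues the task and receives a future; preprocessing and dispatch share one worker thread, and completion happens on whichever thread delivers the acknowledgement.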
 

Execution modes (both pools optional):

  • workerCount=0, callbackThreadCount=0: Fully inline (preprocess + dispatch on caller thread, callback on Kafka thread)
  • workerCount=0, callbackThreadCount>0: Inline preprocessing, callback on dedicated threads
  • workerCount>0, callbackThreadCount=0: Default - parallel workers, callback on Kafka thread
  • workerCount>0, callbackThreadCount>0: Full async - parallel workers + callback isolation

Constructor Details

    • PartitionedProducerExecutor

      public PartitionedProducerExecutor(int workerCount, int workerQueueCapacity, int callbackThreadCount, int callbackQueueCapacity, String storeName, io.tehuti.metrics.MetricsRepository metricsRepository)
      Creates a new partitioned producer executor.
      Parameters:
      workerCount - number of partition workers (0 to disable and execute inline)
      workerQueueCapacity - queue capacity per worker for backpressure
      callbackThreadCount - number of callback threads (0 to disable and run on Kafka thread)
      callbackQueueCapacity - queue capacity for callback executor
      storeName - store name for naming threads and metrics
      metricsRepository - metrics repository for registering thread pool stats (may be null)

Method Details

    • submit

      public void submit(int partition, Runnable task)
      Submits work for a specific partition. If workers are enabled, the task is routed to worker[partition % workerCount]. If workers are disabled, the task executes inline on the caller thread.
      Parameters:
      partition - the partition number used for routing to the appropriate worker
      task - the work to execute (preprocessing + dispatch)
    • executeCallback

      public void executeCallback(Runnable callback)
      Executes a callback (for user future completion). If the callback executor is enabled, the callback is handed off to the callback pool. If the callback executor is disabled, the callback runs inline on the calling (Kafka) thread.
      Parameters:
      callback - the callback to execute
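
The inline-versus-pool decision described for executeCallback can be sketched as follows. This is an illustration under assumptions, not the actual method body: `CallbackDispatchSketch` and `ranOnCallerThread` are hypothetical names, and the sketch uses an unbounded fixed thread pool for brevity, whereas the documented constructor takes a callbackQueueCapacity.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of optional callback isolation; not the Venice code. */
final class CallbackDispatchSketch {
  // Null when callbackThreadCount == 0, meaning callbacks run on the calling thread.
  private final ExecutorService callbackPool;

  CallbackDispatchSketch(int callbackThreadCount) {
    this.callbackPool =
        callbackThreadCount > 0 ? Executors.newFixedThreadPool(callbackThreadCount) : null;
  }

  void executeCallback(Runnable callback) {
    if (callbackPool != null) {
      callbackPool.execute(callback); // hand off so the calling thread is not held up
    } else {
      callback.run(); // inline on the calling thread
    }
  }

  void shutdownAndWait() {
    if (callbackPool == null) {
      return;
    }
    callbackPool.shutdown();
    try {
      callbackPool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /** Reports whether a callback ran on the thread that called executeCallback. */
  static boolean ranOnCallerThread(int callbackThreadCount) {
    CallbackDispatchSketch sketch = new CallbackDispatchSketch(callbackThreadCount);
    AtomicReference<Thread> ranOn = new AtomicReference<>();
    sketch.executeCallback(() -> ranOn.set(Thread.currentThread()));
    sketch.shutdownAndWait();
    return ranOn.get() == Thread.currentThread();
  }
}
```

Running callbacks inline avoids a thread hop but lets slow user code occupy the Kafka thread; a dedicated pool trades that hop for isolation.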
    • isWorkersEnabled

      public boolean isWorkersEnabled()
      Returns:
      whether worker threads are enabled (workerCount > 0)
    • isCallbackExecutorEnabled

      public boolean isCallbackExecutorEnabled()
      Returns:
      whether callback executor is enabled (callbackThreadCount > 0)
    • getWorkerQueueSize

      public int getWorkerQueueSize(int workerIndex)
      Gets the queue depth for a specific worker (for metrics).
      Parameters:
      workerIndex - the worker index
      Returns:
      queue size for the specified worker, or 0 if workers disabled
    • getTotalWorkerQueueSize

      public int getTotalWorkerQueueSize()
      Get total queue depth across all workers.
      Returns:
      sum of all worker queue depths, or 0 if workers disabled
    • getCallbackQueueSize

      public int getCallbackQueueSize()
      Get callback executor queue depth.
      Returns:
      callback queue size, or 0 if callback executor disabled
    • getWorkerCount

      public int getWorkerCount()
      Returns:
      number of workers, or 0 if workers disabled
    • shutdown

      public void shutdown()
      Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
    • shutdownNow

      public void shutdownNow()
      Attempts to stop all actively executing tasks and halts the processing of waiting tasks. This method should be called after shutdown() and awaitTermination(long, TimeUnit) if tasks did not complete within the timeout.
    • awaitTermination

      public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
      Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.
      Parameters:
      timeout - the maximum time to wait
      unit - the time unit of the timeout argument
      Returns:
      true if all executors terminated, false if timeout elapsed
      Throws:
      InterruptedException - if interrupted while waiting
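
The shutdown(), awaitTermination(long, TimeUnit), and shutdownNow() methods above are meant to be called in that order. A minimal sketch of the sequence follows, written against `java.util.concurrent.ExecutorService` so that it is self-contained; `GracefulShutdownSketch` and `shutdownGracefully` are hypothetical names. Since PartitionedProducerExecutor documents the same three method names, the same sequence would presumably apply to it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper showing the shutdown -> awaitTermination -> shutdownNow sequence. */
final class GracefulShutdownSketch {
  /** Returns true if the executor drained within the timeout, false if it was forced. */
  static boolean shutdownGracefully(ExecutorService executor, long timeout, TimeUnit unit) {
    executor.shutdown(); // stop accepting new tasks; queued tasks still run
    try {
      if (executor.awaitTermination(timeout, unit)) {
        return true;
      }
      executor.shutdownNow(); // timeout elapsed: interrupt running tasks, drop queued ones
      return false;
    } catch (InterruptedException e) {
      executor.shutdownNow();
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }

  /** Runs one trivial task and then shuts the pool down gracefully. */
  static boolean demo() {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    pool.execute(() -> { });
    return shutdownGracefully(pool, 5, TimeUnit.SECONDS);
  }
}
```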