Class StoreBufferService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
com.linkedin.davinci.kafka.consumer.StoreBufferService
- All Implemented Interfaces:
- Closeable, AutoCloseable
This class serves as a PubSubMessage buffer with an accompanying pool of drainer threads. The drainers
 pull records out of the buffer and delegate persistence and validation to the appropriate StoreIngestionTask.
 High-level idea:
 1. StoreBufferService maintains a pool with a fixed (configurable) number of StoreBufferService.StoreBufferDrainer threads;
 2. Each StoreBufferService.StoreBufferDrainer has a corresponding BlockingQueue, which buffers StoreBufferService.QueueNode instances;
 3. All records belonging to the same topic+partition are allocated to the same drainer thread; otherwise DIV (data integrity validation) will fail;
 4. For the logic that assigns a topic+partition to a drainer, see #getDrainerIndexForConsumerRecord(PubSubMessage, int);
 5. There is still one thread executing StoreIngestionTask for each topic, which handles admin actions such
 as subscribe, unsubscribe, and kill, and also polls consumer records from Kafka and puts them into the blockingQueueArr
 maintained by StoreBufferService.
 For now, the assumption is that a single consumer polling thread is fast enough to keep up with Kafka MM replication,
 and that data processing is the slowest part. If polling later turns out to be slow as well, we may consider adopting a consumer
 thread pool to speed up polling from local Kafka brokers.
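The queue-per-drainer layout described above can be sketched as follows. This is a minimal illustration, not the actual StoreBufferService code: the class DrainerAssignmentSketch, its Node type, and the hash-plus-partition mapping are all assumptions made for the example. The only property taken from the description is that every record of a given topic+partition must land in the same drainer queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch; names and the mapping function are assumptions, not Venice code.
class DrainerAssignmentSketch {
  /** Simplified stand-in for a buffered record (not the real QueueNode). */
  static final class Node {
    final String topic;
    final int partition;
    final String payload;

    Node(String topic, int partition, String payload) {
      this.topic = topic;
      this.partition = partition;
      this.payload = payload;
    }
  }

  // One bounded queue per drainer; the number of queues is fixed at construction time.
  private final List<BlockingQueue<Node>> queues = new ArrayList<>();

  DrainerAssignmentSketch(int drainerNum, int capacityPerDrainer) {
    for (int i = 0; i < drainerNum; i++) {
      queues.add(new LinkedBlockingQueue<>(capacityPerDrainer));
    }
  }

  /**
   * Assumed mapping: a deterministic function of topic and partition, so the same
   * topic+partition always yields the same index. floorMod keeps the result
   * non-negative even when the hash is negative.
   */
  static int drainerIndex(String topic, int partition, int drainerNum) {
    return Math.floorMod(topic.hashCode() + partition, drainerNum);
  }

  /** Blocks when the target queue is full, which applies back-pressure to the caller. */
  void put(Node node) throws InterruptedException {
    int index = drainerIndex(node.topic, node.partition, queues.size());
    queues.get(index).put(node);
  }

  int queueSize(int index) {
    return queues.get(index).size();
  }

  public static void main(String[] args) throws InterruptedException {
    DrainerAssignmentSketch sketch = new DrainerAssignmentSketch(4, 16);
    sketch.put(new Node("store_v1", 2, "first"));
    sketch.put(new Node("store_v1", 2, "second"));
    int index = drainerIndex("store_v1", 2, 4);
    System.out.println("drainer " + index + " holds " + sketch.queueSize(index) + " records");
  }
}
```

Because the index depends only on the topic and the partition, a single drainer thread sees each partition's records in the order they were enqueued, which is the ordering guarantee that item 3 relies on.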
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
- AbstractVeniceService.ServiceState
- 
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
- logger, serviceState
- 
Constructor Summary
- StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, LogContext logContext, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sorted)
- 
Method Summary
- void drainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition): Drains all the records for the specified topic + partition.
- CompletableFuture<Void> execSyncOffsetCommandAsync(PubSubTopicPartition topicPartition, StoreIngestionTask ingestionTask)
- void execSyncOffsetFromSnapshotAsync(PubSubTopicPartition topicPartition, PartitionTracker vtDivSnapshot, StoreIngestionTask ingestionTask)
- protected MemoryBoundBlockingQueue<com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode> getDrainerForConsumerRecord(DefaultPubSubMessage consumerRecord, int partition)
- protected int getDrainerIndexForConsumerRecord(DefaultPubSubMessage consumerRecord, int partition)
- long getMaxMemoryUsagePerDrainer()
- long getMinMemoryUsagePerDrainer()
- long getTotalMemoryUsage()
- long getTotalRemainingMemory()
- protected void internalDrainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition, int retryNum, int sleepIntervalInMS)
- void putConsumerRecord(DefaultPubSubMessage consumerRecord, StoreIngestionTask ingestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
- boolean startInner()
- void stopInner()
- 
Constructor Details
StoreBufferService
public StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, LogContext logContext, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sorted)
 
- 
- 
Method Details
getDrainerForConsumerRecord
protected MemoryBoundBlockingQueue<com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode> getDrainerForConsumerRecord(DefaultPubSubMessage consumerRecord, int partition)
- 
getDrainerIndexForConsumerRecord
protected int getDrainerIndexForConsumerRecord(DefaultPubSubMessage consumerRecord, int partition)
- 
putConsumerRecord
public void putConsumerRecord(DefaultPubSubMessage consumerRecord, StoreIngestionTask ingestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs) throws InterruptedException
- Specified by:
- putConsumerRecord in class AbstractStoreBufferService
- Throws:
- InterruptedException
 
- 
drainBufferedRecordsFromTopicPartition
public void drainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition) throws InterruptedException
Drains all the records for the specified topic + partition. The reason is that we don't want Kafka messages from two different subscriptions to overlap, which could introduce complicated dependencies in StoreIngestionTask.
- Specified by:
- drainBufferedRecordsFromTopicPartition in class AbstractStoreBufferService
- Parameters:
- topicPartition - for which to drain buffer
- Throws:
- InterruptedException
 
- 
internalDrainBufferedRecordsFromTopicPartition
protected void internalDrainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition, int retryNum, int sleepIntervalInMS) throws InterruptedException
- Throws:
- InterruptedException
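This page does not document how retryNum and sleepIntervalInMS are used. One plausible reading, shown here purely as an assumption, is a bounded wait: check whether any buffered record for the topic+partition remains, and if so sleep and check again, up to a fixed number of retries. The class DrainWaitSketch, its Node type, and the boolean return value are hypothetical and are not part of the Venice API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a bounded drain wait; not the actual Venice implementation.
class DrainWaitSketch {
  /** Simplified stand-in for a buffered record, keyed by a topic+partition string. */
  static final class Node {
    final String topicPartition;

    Node(String topicPartition) {
      this.topicPartition = topicPartition;
    }
  }

  /**
   * Returns true as soon as the queue holds no record for the given topic+partition,
   * or false if records are still pending after retryNum retries.
   */
  static boolean waitUntilDrained(
      BlockingQueue<Node> queue,
      String topicPartition,
      int retryNum,
      int sleepIntervalInMS) throws InterruptedException {
    for (int attempt = 0; attempt <= retryNum; attempt++) {
      // Weakly consistent scan of the queue; safe while drainers keep removing nodes.
      boolean pending = queue.stream().anyMatch(n -> n.topicPartition.equals(topicPartition));
      if (!pending) {
        return true;
      }
      if (attempt < retryNum) {
        Thread.sleep(sleepIntervalInMS);
      }
    }
    return false;
  }

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Node> queue = new LinkedBlockingQueue<>();
    queue.put(new Node("store_v1-0"));
    // Nothing drains the queue here, so the wait gives up after the retries run out.
    System.out.println(waitUntilDrained(queue, "store_v1-0", 2, 10));
  }
}
```

A caller that must not start a new subscription until the old records are gone would treat a false result as a failure to drain rather than proceed.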
 
- 
execSyncOffsetCommandAsync
public CompletableFuture<Void> execSyncOffsetCommandAsync(PubSubTopicPartition topicPartition, StoreIngestionTask ingestionTask) throws InterruptedException
- Specified by:
- execSyncOffsetCommandAsync in class AbstractStoreBufferService
- Throws:
- InterruptedException
 
- 
execSyncOffsetFromSnapshotAsync
public void execSyncOffsetFromSnapshotAsync(PubSubTopicPartition topicPartition, PartitionTracker vtDivSnapshot, StoreIngestionTask ingestionTask) throws InterruptedException
- Specified by:
- execSyncOffsetFromSnapshotAsync in class AbstractStoreBufferService
- Throws:
- InterruptedException
 
- 
startInner
public boolean startInner()
- Specified by:
- startInner in class AbstractVeniceService
- Returns:
- true if the service is completely started,
         false if it is still starting asynchronously (in this case, it is the implementer's
         responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
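The return-value contract above can be illustrated with a small stand-alone sketch. MiniService, SyncService, AsyncService, and the three-value ServiceState enum below are hypothetical simplifications written for this example; they are not the real AbstractVeniceService, and which of the two paths StoreBufferService takes is not stated on this page.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the startInner() contract; not the real AbstractVeniceService.
class StartInnerContractSketch {
  enum ServiceState { STOPPED, STARTING, STARTED }

  abstract static class MiniService {
    final AtomicReference<ServiceState> serviceState = new AtomicReference<>(ServiceState.STOPPED);

    /** Returns true when fully started, false when startup continues asynchronously. */
    abstract boolean startInner();

    final void start() {
      serviceState.set(ServiceState.STARTING);
      if (startInner()) {
        // Synchronous case: the base class can mark the service as started.
        serviceState.set(ServiceState.STARTED);
      }
      // Asynchronous case: the subclass must set STARTED itself once its work completes.
    }
  }

  static final class SyncService extends MiniService {
    @Override
    boolean startInner() {
      return true;
    }
  }

  static final class AsyncService extends MiniService {
    // Completed by whoever performs the (simulated) background warm-up.
    final CompletableFuture<Void> warmUp = new CompletableFuture<>();

    @Override
    boolean startInner() {
      warmUp.thenRun(() -> serviceState.set(ServiceState.STARTED));
      return false;
    }
  }

  public static void main(String[] args) {
    AsyncService service = new AsyncService();
    service.start();
    System.out.println(service.serviceState.get());
    service.warmUp.complete(null);
    System.out.println(service.serviceState.get());
  }
}
```

The design point is that returning false transfers responsibility for the final state transition from the base class to the implementer.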
 
- 
stopInner
public void stopInner() throws Exception
- Specified by:
- stopInner in class AbstractVeniceService
- Throws:
- Exception
 
- 
getTotalMemoryUsage
public long getTotalMemoryUsage()
- Specified by:
- getTotalMemoryUsage in class AbstractStoreBufferService
 
- 
getTotalRemainingMemory
public long getTotalRemainingMemory()
- Specified by:
- getTotalRemainingMemory in class AbstractStoreBufferService
 
- 
getMaxMemoryUsagePerDrainer
public long getMaxMemoryUsagePerDrainer()
- Specified by:
- getMaxMemoryUsagePerDrainer in class AbstractStoreBufferService
 
- 
getMinMemoryUsagePerDrainer
public long getMinMemoryUsagePerDrainer()
- Specified by:
- getMinMemoryUsagePerDrainer in class AbstractStoreBufferService
 
 
-