Class StoreBufferService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
com.linkedin.davinci.kafka.consumer.StoreBufferService
- All Implemented Interfaces:
Closeable, AutoCloseable
This class serves as a PubSubMessage buffer with an accompanying pool of drainer threads. The drainers
pull records out of the buffer and delegate persistence and validation to the appropriate StoreIngestionTask.
High-level idea:
1. StoreBufferService maintains a pool of a fixed (configurable) number of StoreBufferService.StoreBufferDrainer instances;
2. Each StoreBufferService.StoreBufferDrainer has a corresponding BlockingQueue, which buffers StoreBufferService.QueueNode entries;
3. All records belonging to the same topic+partition are allocated to the same drainer thread; otherwise DIV will fail;
4. For the logic that assigns a topic+partition to a drainer, see getDrainerIndexForConsumerRecord(PubSubMessage, int);
5. There is still one thread executing StoreIngestionTask for each topic. It handles admin actions such as subscribe, unsubscribe, and kill, and it also polls consumer records from Kafka and puts them into the blockingQueueArr maintained by StoreBufferService.
For now, the assumption is that a single consumer polling thread is fast enough to keep up with Kafka MM replication,
and that data processing is the slowest part. If polling later turns out to be slow as well, we may consider adopting a consumer
thread pool to speed up polling from local Kafka brokers.
-
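The assignment rule in points 2 to 4 can be illustrated with a minimal sketch. It is not the Venice implementation: the class name DrainerAssignmentSketch, the hash formula, and the use of String records with LinkedBlockingQueue are all assumptions made for illustration. The only property taken from the description above is that every record of a given topic+partition lands in the same drainer queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of per-drainer queues; not the actual StoreBufferService code. */
class DrainerAssignmentSketch {
  // One queue per drainer, mirroring the "one BlockingQueue per drainer" idea.
  private final List<BlockingQueue<String>> queues = new ArrayList<>();

  DrainerAssignmentSketch(int drainerNum) {
    if (drainerNum <= 0) {
      throw new IllegalArgumentException("drainerNum must be positive");
    }
    for (int i = 0; i < drainerNum; i++) {
      queues.add(new LinkedBlockingQueue<>());
    }
  }

  /**
   * Assumed hash: combines the topic name and the partition number.
   * Math.floorMod keeps the result in [0, drainerNum) even if the sum overflows.
   */
  int drainerIndexFor(String topic, int partition) {
    return Math.floorMod(31 * topic.hashCode() + partition, queues.size());
  }

  /** Routes a record to the queue that owns its topic+partition. */
  void enqueue(String topic, int partition, String record) {
    // The queues here are unbounded, so add() never blocks. A memory-bounded
    // queue would block the producer instead once it is full.
    queues.get(drainerIndexFor(topic, partition)).add(record);
  }

  /** Number of records currently waiting in the given drainer's queue. */
  int queuedCount(int drainerIndex) {
    return queues.get(drainerIndex).size();
  }
}
```

Because the index depends only on the topic and the partition, records of one topic+partition are consumed by a single drainer in the order they were enqueued, which is the ordering the validation step relies on.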
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
Constructor Summary
Constructor
StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sorted)
-
Method Summary
Modifier and Type, Method, Description
void drainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition): This function is used to drain all the records for the specified topic + partition.
CompletableFuture<Void> execSyncOffsetCommandAsync(PubSubTopicPartition topicPartition, StoreIngestionTask ingestionTask)
protected MemoryBoundBlockingQueue<com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode> getDrainerForConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, int partition)
protected int getDrainerIndexForConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, int partition)
long getMaxMemoryUsagePerDrainer()
long getMinMemoryUsagePerDrainer()
long getTotalMemoryUsage()
long getTotalRemainingMemory()
protected void internalDrainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition, int retryNum, int sleepIntervalInMS)
void putConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, StoreIngestionTask ingestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
boolean startInner()
void stopInner()
-
Constructor Details
-
StoreBufferService
public StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sorted)
-
-
Method Details
-
getDrainerForConsumerRecord
protected MemoryBoundBlockingQueue<com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode> getDrainerForConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, int partition)
-
getDrainerIndexForConsumerRecord
protected int getDrainerIndexForConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, int partition)
-
putConsumerRecord
public void putConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, StoreIngestionTask ingestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs) throws InterruptedException
- Specified by:
putConsumerRecord
in class AbstractStoreBufferService
- Throws:
InterruptedException
-
drainBufferedRecordsFromTopicPartition
public void drainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition) throws InterruptedException
Drains all the buffered records for the specified topic + partition. This keeps Kafka messages from two different subscriptions from overlapping, which could introduce complicated dependencies in StoreIngestionTask.
- Specified by:
drainBufferedRecordsFromTopicPartition
in class AbstractStoreBufferService
- Parameters:
topicPartition - the topic partition whose buffer should be drained
- Throws:
InterruptedException
-
internalDrainBufferedRecordsFromTopicPartition
protected void internalDrainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition, int retryNum, int sleepIntervalInMS) throws InterruptedException
- Throws:
InterruptedException
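This page does not describe how retryNum and sleepIntervalInMS are used. One plausible reading, sketched below purely as an assumption, is a bounded wait: check whether any buffered record still belongs to the topic partition, sleep between checks, and give up after the allowed number of attempts. The class DrainWaitSketch, the method waitUntilDrained, and the use of String labels in place of queue nodes are hypothetical.

```java
import java.util.Queue;

/** Hypothetical bounded-wait sketch; not the actual Venice drain logic. */
class DrainWaitSketch {
  /**
   * Waits until the queue holds no entry labelled with the given topic partition.
   * Each queue element is assumed to be the topic-partition label of one pending record.
   *
   * @return true if the partition was drained, false if the retries ran out first
   */
  static boolean waitUntilDrained(
      Queue<String> queue, String topicPartition, int retryNum, int sleepIntervalInMS)
      throws InterruptedException {
    for (int attempt = 0; attempt < retryNum; attempt++) {
      if (!queue.contains(topicPartition)) {
        return true; // nothing left for this partition
      }
      // Give the drainer thread time to consume the remaining records.
      Thread.sleep(sleepIntervalInMS);
    }
    // Final check after the last sleep.
    return !queue.contains(topicPartition);
  }
}
```

In this sketch the caller decides what a false result means, for example raising an error or retrying at a higher level.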
-
execSyncOffsetCommandAsync
public CompletableFuture<Void> execSyncOffsetCommandAsync(PubSubTopicPartition topicPartition, StoreIngestionTask ingestionTask) throws InterruptedException
- Specified by:
execSyncOffsetCommandAsync
in class AbstractStoreBufferService
- Throws:
InterruptedException
-
startInner
public boolean startInner()
- Specified by:
startInner
in class AbstractVeniceService
- Returns:
true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
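The return-value contract above can be illustrated with a small sketch. Everything in it is hypothetical: AsyncStartSketch, its State enum, and its start() method only imitate the roles of AbstractVeniceService, ServiceState, and the framework's start path, and are not copied from Venice.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical illustration of the startInner() contract; not Venice code. */
class AsyncStartSketch {
  enum State { STOPPED, STARTING, STARTED }

  private volatile State state = State.STOPPED;
  private final boolean startAsynchronously;
  private CompletableFuture<Void> pendingStart = CompletableFuture.completedFuture(null);

  AsyncStartSketch(boolean startAsynchronously) {
    this.startAsynchronously = startAsynchronously;
  }

  /** Stand-in for the framework method that calls startInner(). */
  void start() {
    state = State.STARTING;
    if (startInner()) {
      state = State.STARTED; // synchronous start: the caller flips the state
    }
  }

  /** Returns true when fully started, false when start-up continues in the background. */
  boolean startInner() {
    if (!startAsynchronously) {
      return true;
    }
    // Asynchronous start: this implementation must set STARTED itself when done.
    pendingStart = CompletableFuture.runAsync(() -> state = State.STARTED);
    return false;
  }

  /** Blocks until any background start-up work has finished. */
  void awaitStarted() {
    pendingStart.join();
  }

  State state() {
    return state;
  }
}
```

In both modes the service ends up in STARTED; the difference is only which side performs the transition.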
-
stopInner
public void stopInner() throws Exception
- Specified by:
stopInner
in class AbstractVeniceService
- Throws:
Exception
-
getTotalMemoryUsage
public long getTotalMemoryUsage()
- Specified by:
getTotalMemoryUsage
in class AbstractStoreBufferService
-
getTotalRemainingMemory
public long getTotalRemainingMemory()
- Specified by:
getTotalRemainingMemory
in class AbstractStoreBufferService
-
getMaxMemoryUsagePerDrainer
public long getMaxMemoryUsagePerDrainer()
- Specified by:
getMaxMemoryUsagePerDrainer
in class AbstractStoreBufferService
-
getMinMemoryUsagePerDrainer
public long getMinMemoryUsagePerDrainer()
- Specified by:
getMinMemoryUsagePerDrainer
in class AbstractStoreBufferService
-
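The four memory getters carry no description on this page. Judging by their names alone, they could be aggregates over the per-drainer queues; the sketch below shows that interpretation and nothing more. DrainerMemoryStatsSketch, its fields, and the formula for remaining memory (capacity per drainer times drainer count, minus total usage) are assumptions.

```java
import java.util.Arrays;

/** Hypothetical aggregation over per-drainer memory usage; not Venice code. */
class DrainerMemoryStatsSketch {
  private final long capacityPerDrainer;
  private final long[] usedBytesPerDrainer;

  DrainerMemoryStatsSketch(long capacityPerDrainer, long[] usedBytesPerDrainer) {
    this.capacityPerDrainer = capacityPerDrainer;
    this.usedBytesPerDrainer = usedBytesPerDrainer.clone(); // defensive copy
  }

  /** Sum of the usage of every drainer queue. */
  long totalMemoryUsage() {
    return Arrays.stream(usedBytesPerDrainer).sum();
  }

  /** Combined capacity of all queues minus what is currently used. */
  long totalRemainingMemory() {
    return capacityPerDrainer * usedBytesPerDrainer.length - totalMemoryUsage();
  }

  /** Usage of the fullest drainer queue, or 0 when there are no drainers. */
  long maxMemoryUsagePerDrainer() {
    return Arrays.stream(usedBytesPerDrainer).max().orElse(0L);
  }

  /** Usage of the emptiest drainer queue, or 0 when there are no drainers. */
  long minMemoryUsagePerDrainer() {
    return Arrays.stream(usedBytesPerDrainer).min().orElse(0L);
  }
}
```

Under this reading, a large gap between the maximum and minimum values would point to uneven assignment of topic partitions across drainers.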