Class StoreBufferService
- java.lang.Object
  - com.linkedin.venice.service.AbstractVeniceService
    - com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
      - com.linkedin.davinci.kafka.consumer.StoreBufferService
- All Implemented Interfaces:
java.io.Closeable, java.lang.AutoCloseable
public class StoreBufferService extends AbstractStoreBufferService
This class serves as a PubSubMessage buffer with an accompanying pool of drainer threads. The drainers pull records out of the buffer and delegate persistence and validation to the appropriate StoreIngestionTask.

High-level idea:
1. StoreBufferService maintains a fixed-size (configurable) pool of StoreBufferService.StoreBufferDrainer threads;
2. Each StoreBufferService.StoreBufferDrainer has a corresponding BlockingQueue, which buffers StoreBufferService.QueueNode entries;
3. All records belonging to the same topic+partition are allocated to the same drainer thread; otherwise, DIV (data integrity validation) would fail;
4. For the logic that assigns a topic+partition to a drainer, see getDrainerIndexForConsumerRecord(PubSubMessage, int);
5. There is still one thread executing a StoreIngestionTask for each topic. It handles admin actions such as subscribe, unsubscribe and kill, and also polls consumer records from Kafka and puts them into the blockingQueueArr maintained by StoreBufferService.

For now, the assumption is that a single consumer polling thread is fast enough to keep up with Kafka MM replication, and that data processing is the slowest part. If polling later turns out to be slow as well, we may consider adopting a consumer thread pool to speed up polling from local Kafka brokers.
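The buffer/drainer split described above can be illustrated with a minimal, self-contained Java sketch. Everything in it is hypothetical: Rec stands in for PubSubMessage, a plain LinkedBlockingQueue stands in for the memory-bound queue, and the routing formula floorMod(topic.hashCode() + partition, drainerNum) is an assumption, not necessarily what getDrainerIndexForConsumerRecord(PubSubMessage, int) computes. What it shows is the invariant from item 3: all records of one topic+partition go to the same FIFO queue and are consumed by a single thread, so their relative order is preserved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch of a buffer backed by a fixed pool of drainer threads.
final class DrainerPoolSketch {
  // Simplified stand-in for a PubSubMessage (hypothetical).
  static final class Rec {
    final String topic;
    final int partition;
    final String payload;

    Rec(String topic, int partition, String payload) {
      this.topic = topic;
      this.partition = partition;
      this.payload = payload;
    }
  }

  // Sentinel that tells a drainer thread to exit.
  private static final Rec STOP = new Rec("", -1, "");

  private final List<BlockingQueue<Rec>> queues = new ArrayList<>();
  private final List<Thread> drainers = new ArrayList<>();

  DrainerPoolSketch(int drainerNum, Consumer<Rec> processor) {
    for (int i = 0; i < drainerNum; i++) {
      BlockingQueue<Rec> queue = new LinkedBlockingQueue<>();
      queues.add(queue);
      // Each drainer owns exactly one queue and consumes it in FIFO order.
      drainers.add(new Thread(() -> {
        try {
          for (Rec r = queue.take(); r != STOP; r = queue.take()) {
            processor.accept(r);
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }, "drainer-" + i));
    }
  }

  void start() {
    drainers.forEach(Thread::start);
  }

  // ASSUMED routing: the same topic+partition always maps to the same drainer index.
  int drainerIndex(String topic, int partition) {
    return Math.floorMod(topic.hashCode() + partition, queues.size());
  }

  // Called by the polling thread. LinkedBlockingQueue is effectively unbounded here;
  // the real service uses a MemoryBoundBlockingQueue instead.
  void put(Rec r) throws InterruptedException {
    queues.get(drainerIndex(r.topic, r.partition)).put(r);
  }

  // Enqueue one sentinel per drainer; each drainer exits after finishing earlier records.
  void stop() throws InterruptedException {
    for (BlockingQueue<Rec> q : queues) {
      q.put(STOP);
    }
    for (Thread t : drainers) {
      t.join();
    }
  }
}
```

Routing by topic+partition rather than round-robin is what lets DIV see each partition's records in order; the trade-off is that a single hot partition can only ever use one drainer thread.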
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
Constructor Summary
- StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, StoreBufferServiceStats stats)
  Constructor for testing
- StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sorted)
Method Summary
- void drainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition)
  This function is used to drain all the records for the specified topic + partition.
- java.util.concurrent.CompletableFuture<java.lang.Void> execSyncOffsetCommandAsync(PubSubTopicPartition topicPartition, StoreIngestionTask ingestionTask)
- protected MemoryBoundBlockingQueue<com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode> getDrainerForConsumerRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, int partition)
- protected int getDrainerIndexForConsumerRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, int partition)
- long getMaxMemoryUsagePerDrainer()
- long getMinMemoryUsagePerDrainer()
- long getTotalMemoryUsage()
- long getTotalRemainingMemory()
- protected void internalDrainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition, int retryNum, int sleepIntervalInMS)
- void putConsumerRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, StoreIngestionTask ingestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs)
- boolean startInner()
- void stopInner()
Constructor Detail
StoreBufferService
public StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sorted)
StoreBufferService
public StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, StoreBufferServiceStats stats)
Constructor for testing
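The constructors take bufferCapacityPerDrainer and bufferNotifyDelta, and getDrainerForConsumerRecord returns a MemoryBoundBlockingQueue, which suggests each drainer queue is bounded by bytes rather than by element count. The sketch below is a hypothetical illustration of that idea, not Venice's MemoryBoundBlockingQueue: it assumes capacityBytes corresponds to bufferCapacityPerDrainer and that notifyDeltaBytes (standing in for bufferNotifyDelta) is how much memory must be freed before blocked producers are woken.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical memory-bound blocking queue; not Venice's MemoryBoundBlockingQueue.
final class MemoryBoundQueueSketch {
  private final Deque<byte[]> items = new ArrayDeque<>();
  private final long capacityBytes;    // plays the role of bufferCapacityPerDrainer (assumption)
  private final long notifyDeltaBytes; // plays the role of bufferNotifyDelta (assumption)
  private long usedBytes = 0;
  private long freedSinceLastNotify = 0;

  MemoryBoundQueueSketch(long capacityBytes, long notifyDeltaBytes) {
    this.capacityBytes = capacityBytes;
    this.notifyDeltaBytes = notifyDeltaBytes;
  }

  synchronized void put(byte[] item) throws InterruptedException {
    // Block while the item would overflow the budget; an oversized item is still
    // admitted into an empty queue so that it cannot block forever.
    while (!items.isEmpty() && usedBytes + item.length > capacityBytes) {
      wait();
    }
    items.addLast(item);
    usedBytes += item.length;
    notifyAll(); // wake a drainer waiting in take()
  }

  synchronized byte[] take() throws InterruptedException {
    while (items.isEmpty()) {
      wait();
    }
    byte[] item = items.removeFirst();
    usedBytes -= item.length;
    freedSinceLastNotify += item.length;
    // Wake blocked producers only after enough memory was freed (or the queue emptied),
    // instead of after every small record.
    if (freedSinceLastNotify >= notifyDeltaBytes || items.isEmpty()) {
      freedSinceLastNotify = 0;
      notifyAll();
    }
    return item;
  }

  synchronized long usedBytes() {
    return usedBytes;
  }

  synchronized long remainingBytes() {
    return capacityBytes - usedBytes;
  }
}
```

Batching the wake-ups by a byte delta avoids waking every blocked producer each time a tiny record is drained, at the cost of producers occasionally waiting a little longer than strictly necessary.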
Method Detail
getDrainerForConsumerRecord
protected MemoryBoundBlockingQueue<com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode> getDrainerForConsumerRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, int partition)
getDrainerIndexForConsumerRecord
protected int getDrainerIndexForConsumerRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, int partition)
putConsumerRecord
public void putConsumerRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, StoreIngestionTask ingestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs) throws java.lang.InterruptedException
- Specified by:
putConsumerRecord
in class AbstractStoreBufferService
- Throws:
java.lang.InterruptedException
drainBufferedRecordsFromTopicPartition
public void drainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition) throws java.lang.InterruptedException
This function is used to drain all the records for the specified topic + partition. The reason is that we don't want Kafka messages from two different subscriptions to overlap, which could introduce complicated dependencies in StoreIngestionTask.
- Specified by:
drainBufferedRecordsFromTopicPartition
in class AbstractStoreBufferService
- Parameters:
topicPartition - the topic-partition whose buffered records should be drained
- Throws:
java.lang.InterruptedException
internalDrainBufferedRecordsFromTopicPartition
protected void internalDrainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition, int retryNum, int sleepIntervalInMS) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
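The retryNum and sleepIntervalInMS parameters suggest a bounded poll-and-sleep loop. A hypothetical sketch of that pattern follows; the per-partition pending counter, the plain string key and the IllegalStateException on timeout are all assumptions made for illustration, not the method's actual implementation.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper; keys are simplified "topic-partition" strings.
final class PendingRecordTracker {
  private final ConcurrentHashMap<String, AtomicLong> pending = new ConcurrentHashMap<>();

  // Called by the polling thread right after a record is queued.
  void onQueued(String topicPartition) {
    counter(topicPartition).incrementAndGet();
  }

  // Called by the drainer thread once the record has been fully processed.
  void onProcessed(String topicPartition) {
    counter(topicPartition).decrementAndGet();
  }

  long pendingCount(String topicPartition) {
    AtomicLong c = pending.get(topicPartition);
    return c == null ? 0 : c.get();
  }

  // Poll up to retryNum times, sleeping sleepIntervalInMS between checks.
  void drain(String topicPartition, int retryNum, int sleepIntervalInMS) throws InterruptedException {
    for (int attempt = 0; attempt < retryNum; attempt++) {
      if (pendingCount(topicPartition) == 0) {
        return;
      }
      Thread.sleep(sleepIntervalInMS);
    }
    if (pendingCount(topicPartition) != 0) {
      throw new IllegalStateException(
          "Buffer for " + topicPartition + " not drained after " + retryNum + " retries");
    }
  }

  private AtomicLong counter(String topicPartition) {
    return pending.computeIfAbsent(topicPartition, k -> new AtomicLong());
  }
}
```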
execSyncOffsetCommandAsync
public java.util.concurrent.CompletableFuture<java.lang.Void> execSyncOffsetCommandAsync(PubSubTopicPartition topicPartition, StoreIngestionTask ingestionTask) throws java.lang.InterruptedException
- Specified by:
execSyncOffsetCommandAsync
in class AbstractStoreBufferService
- Throws:
java.lang.InterruptedException
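Because a drainer consumes its queue in FIFO order, one plausible way to run a per-partition command only after that partition's already-buffered records is to enqueue a command node in the same queue and complete a CompletableFuture when the drainer reaches it. The sketch below is hypothetical (DataNode, CommandNode and drainOne are invented names) and steps the drainer manually so the ordering is easy to follow; it is not a description of how execSyncOffsetCommandAsync is actually implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an ordered command node queued behind data records.
final class OrderedCommandSketch {
  interface Node {}

  static final class DataNode implements Node {
    final String payload;

    DataNode(String payload) {
      this.payload = payload;
    }
  }

  static final class CommandNode implements Node {
    final Runnable action;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    CommandNode(Runnable action) {
      this.action = action;
    }
  }

  private final BlockingQueue<Node> queue = new LinkedBlockingQueue<>();
  final List<String> processed = new ArrayList<>(); // touched only by the drainer

  void put(String payload) throws InterruptedException {
    queue.put(new DataNode(payload));
  }

  // The returned future completes only after every node queued before it was processed.
  CompletableFuture<Void> execCommandAsync(Runnable action) throws InterruptedException {
    CommandNode node = new CommandNode(action);
    queue.put(node);
    return node.future;
  }

  // One iteration of a drainer loop: process exactly one node.
  void drainOne() throws InterruptedException {
    Node node = queue.take();
    if (node instanceof DataNode) {
      processed.add(((DataNode) node).payload);
    } else {
      CommandNode cmd = (CommandNode) node;
      try {
        cmd.action.run();
        cmd.future.complete(null);
      } catch (RuntimeException e) {
        cmd.future.completeExceptionally(e);
      }
    }
  }
}
```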
startInner
public boolean startInner()
- Specified by:
startInner
in class AbstractVeniceService
- Returns:
true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
stopInner
public void stopInner() throws java.lang.Exception
- Specified by:
stopInner
in class AbstractVeniceService
- Throws:
java.lang.Exception
getTotalMemoryUsage
public long getTotalMemoryUsage()
- Specified by:
getTotalMemoryUsage
in class AbstractStoreBufferService
getTotalRemainingMemory
public long getTotalRemainingMemory()
- Specified by:
getTotalRemainingMemory
in class AbstractStoreBufferService
getMaxMemoryUsagePerDrainer
public long getMaxMemoryUsagePerDrainer()
- Specified by:
getMaxMemoryUsagePerDrainer
in class AbstractStoreBufferService
getMinMemoryUsagePerDrainer
public long getMinMemoryUsagePerDrainer()
- Specified by:
getMinMemoryUsagePerDrainer
in class AbstractStoreBufferService
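The four memory getters above can be related to per-drainer usage as in this hypothetical sketch. It assumes every drainer has the same capacity (bufferCapacityPerDrainer), so remaining memory is drainerNum × capacity minus total usage; the class name and constructor are invented, and the real service may compute these values differently.

```java
import java.util.Arrays;

// Hypothetical aggregation of per-drainer memory usage; not the service's actual code.
final class BufferMemoryStatsSketch {
  private final long[] usagePerDrainer;
  private final long capacityPerDrainer; // assumed equal for every drainer

  BufferMemoryStatsSketch(long[] usagePerDrainer, long capacityPerDrainer) {
    this.usagePerDrainer = usagePerDrainer.clone();
    this.capacityPerDrainer = capacityPerDrainer;
  }

  long getTotalMemoryUsage() {
    return Arrays.stream(usagePerDrainer).sum();
  }

  long getTotalRemainingMemory() {
    return capacityPerDrainer * usagePerDrainer.length - getTotalMemoryUsage();
  }

  long getMaxMemoryUsagePerDrainer() {
    return Arrays.stream(usagePerDrainer).max().orElse(0);
  }

  long getMinMemoryUsagePerDrainer() {
    return Arrays.stream(usagePerDrainer).min().orElse(0);
  }
}
```

A large gap between the max and min per-drainer usage would point to skew in how topic-partitions are spread across drainers.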