Class StoreBufferService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.AbstractStoreBufferService
com.linkedin.davinci.kafka.consumer.StoreBufferService
All Implemented Interfaces:
Closeable, AutoCloseable
This class serves as a PubSubMessage buffer with an accompanying pool of drainer threads. The drainers
pull records out of the buffer and delegate persistence and validation to the appropriate StoreIngestionTask.
High-level idea:
1. StoreBufferService maintains a pool of a fixed (configurable) number of StoreBufferService.StoreBufferDrainer threads;
2. Each StoreBufferService.StoreBufferDrainer has a corresponding BlockingQueue, which buffers StoreBufferService.QueueNode entries;
3. All records belonging to the same topic+partition are assigned to the same drainer thread; otherwise DIV (data integrity validation) will fail;
4. For the logic that assigns a topic+partition to a drainer, see #getDrainerIndexForConsumerRecord(DefaultPubSubMessage, int);
5. There is still one thread executing a StoreIngestionTask per topic. It handles admin actions such as subscribe,
unsubscribe, and kill, and also polls consumer records from Kafka and puts them into the blockingQueueArr
maintained by StoreBufferService.
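Steps 3 and 4 can be illustrated with a minimal routing sketch. It assumes a drainer index derived from a hash of the topic name and partition number, reduced modulo the drainer count; the class DrainerRoutingSketch, its method, and the hash combination are hypothetical and not taken from getDrainerIndexForConsumerRecord. The property that matters is that the index depends only on topic and partition, so one drainer sees every record of a given topic+partition, in order.

```java
import java.util.Objects;

/** Hypothetical sketch of topic+partition -> drainer routing; not Venice's actual hash. */
final class DrainerRoutingSketch {
  private DrainerRoutingSketch() {
  }

  /**
   * Returns a drainer index in [0, drainerNum) that depends only on (topicName, partition),
   * so all records of one topic-partition are handled by the same drainer thread.
   */
  static int drainerIndex(String topicName, int partition, int drainerNum) {
    if (drainerNum <= 0) {
      throw new IllegalArgumentException("drainerNum must be positive: " + drainerNum);
    }
    // Objects.hash combines both fields; floorMod keeps the result non-negative even for negative hashes.
    int hash = Objects.hash(topicName, partition);
    return Math.floorMod(hash, drainerNum);
  }
}
```

Because the mapping is a plain modulo over a hash, balance across drainers is only statistical: a few hot partitions can still land on the same drainer.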
For now, the assumption is that a single consumer polling thread is fast enough to keep up with Kafka MM replication,
and that data processing is the slowest part. If polling later also proves slow, we may consider adopting a consumer
thread pool to speed up polling from local Kafka brokers.
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
Constructor Summary
Constructor
StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, LogContext logContext, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sorted)
Method Summary
Modifier and Type | Method | Description
void | drainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition) | This function is used to drain all the records for the specified topic + partition.
CompletableFuture<Void> | execSyncOffsetCommandAsync(PubSubTopicPartition topicPartition, StoreIngestionTask ingestionTask)
void | execSyncOffsetFromSnapshotAsync(PubSubTopicPartition topicPartition, PartitionTracker vtDivSnapshot, StoreIngestionTask ingestionTask)
protected MemoryBoundBlockingQueue<com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode> | getDrainerForConsumerRecord(DefaultPubSubMessage consumerRecord, int partition)
protected int | getDrainerIndexForConsumerRecord(DefaultPubSubMessage consumerRecord, int partition)
long | getMaxMemoryUsagePerDrainer()
long | getMinMemoryUsagePerDrainer()
long | getTotalMemoryUsage()
long | getTotalRemainingMemory()
protected void | internalDrainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition, int retryNum, int sleepIntervalInMS)
void | putConsumerRecord(DefaultPubSubMessage consumerRecord, StoreIngestionTask ingestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
boolean | startInner()
void | stopInner()
Constructor Details
StoreBufferService
public StoreBufferService(int drainerNum, long bufferCapacityPerDrainer, long bufferNotifyDelta, boolean queueLeaderWrites, LogContext logContext, io.tehuti.metrics.MetricsRepository metricsRepository, boolean sorted)
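The bufferCapacityPerDrainer and bufferNotifyDelta parameters suggest that each drainer's queue is bounded by the bytes its buffered records occupy rather than by entry count. The sketch below shows one reading of that, under stated assumptions: put blocks while the new entry would exceed the byte capacity, and take wakes blocked producers only once at least notifyDelta bytes are free, to avoid waking them after every small removal. MemoryBoundQueueSketch and the caller-supplied entry size are hypothetical; this is not the MemoryBoundBlockingQueue implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical memory-bounded FIFO illustrating one reading of bufferCapacityPerDrainer / bufferNotifyDelta. */
final class MemoryBoundQueueSketch<T> {
  /** A buffered value together with the byte size the caller attributed to it. */
  private static final class Entry<V> {
    final V value;
    final long sizeInBytes;

    Entry(V value, long sizeInBytes) {
      this.value = value;
      this.sizeInBytes = sizeInBytes;
    }
  }

  private final Deque<Entry<T>> entries = new ArrayDeque<>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notFull = lock.newCondition();
  private final Condition notEmpty = lock.newCondition();
  private final long capacityInBytes;
  private final long notifyDeltaInBytes;
  private long usageInBytes = 0;

  MemoryBoundQueueSketch(long capacityInBytes, long notifyDeltaInBytes) {
    // A delta above the capacity could never be reached, so blocked producers would never wake.
    if (capacityInBytes <= 0 || notifyDeltaInBytes <= 0 || notifyDeltaInBytes > capacityInBytes) {
      throw new IllegalArgumentException("need 0 < notifyDelta <= capacity");
    }
    this.capacityInBytes = capacityInBytes;
    this.notifyDeltaInBytes = notifyDeltaInBytes;
  }

  /** Blocks the producer while adding this entry would push usage above the byte capacity. */
  void put(T value, long sizeInBytes) throws InterruptedException {
    if (sizeInBytes < 0 || sizeInBytes > capacityInBytes) {
      throw new IllegalArgumentException("entry size out of range: " + sizeInBytes);
    }
    lock.lockInterruptibly();
    try {
      while (usageInBytes + sizeInBytes > capacityInBytes) {
        notFull.await();
      }
      entries.addLast(new Entry<>(value, sizeInBytes));
      usageInBytes += sizeInBytes;
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  /** Blocks until an entry is available; wakes producers only once at least notifyDelta bytes are free. */
  T take() throws InterruptedException {
    lock.lockInterruptibly();
    try {
      while (entries.isEmpty()) {
        notEmpty.await();
      }
      Entry<T> entry = entries.removeFirst();
      usageInBytes -= entry.sizeInBytes;
      if (capacityInBytes - usageInBytes >= notifyDeltaInBytes) {
        notFull.signalAll();
      }
      return entry.value;
    } finally {
      lock.unlock();
    }
  }

  long getMemoryUsage() {
    lock.lock();
    try {
      return usageInBytes;
    } finally {
      lock.unlock();
    }
  }

  long getRemainingMemory() {
    lock.lock();
    try {
      return capacityInBytes - usageInBytes;
    } finally {
      lock.unlock();
    }
  }
}
```

A larger notify delta trades some producer latency for fewer wake-ups.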
Method Details
getDrainerForConsumerRecord
protected MemoryBoundBlockingQueue<com.linkedin.davinci.kafka.consumer.StoreBufferService.QueueNode> getDrainerForConsumerRecord(DefaultPubSubMessage consumerRecord, int partition)
getDrainerIndexForConsumerRecord
protected int getDrainerIndexForConsumerRecord(DefaultPubSubMessage consumerRecord, int partition)
putConsumerRecord
public void putConsumerRecord(DefaultPubSubMessage consumerRecord, StoreIngestionTask ingestionTask, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs) throws InterruptedException
Specified by:
putConsumerRecord in class AbstractStoreBufferService
Throws:
InterruptedException
drainBufferedRecordsFromTopicPartition
public void drainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition) throws InterruptedException
This function is used to drain all the records for the specified topic + partition. The reason is that we don't want Kafka messages from two different subscriptions to overlap, which could introduce complicated dependencies in StoreIngestionTask.
Specified by:
drainBufferedRecordsFromTopicPartition in class AbstractStoreBufferService
Parameters:
topicPartition - the topic + partition whose buffered records should be drained
Throws:
InterruptedException
internalDrainBufferedRecordsFromTopicPartition
protected void internalDrainBufferedRecordsFromTopicPartition(PubSubTopicPartition topicPartition, int retryNum, int sleepIntervalInMS) throws InterruptedException
Throws:
InterruptedException
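The retryNum and sleepIntervalInMS parameters suggest a bounded poll-and-sleep wait. A minimal sketch, assuming the service can count the records still buffered for a topic-partition: check that count up to retryNum times, sleeping sleepIntervalInMS between checks, and fail if records remain. PartitionDrainSketch, its counters, and the plain String standing in for PubSubTopicPartition are hypothetical; the actual drain mechanism may differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-partition bookkeeping used to wait until a partition's buffered records are drained. */
final class PartitionDrainSketch {
  private final Map<String, AtomicLong> pendingByPartition = new ConcurrentHashMap<>();

  /** Called by the producer side when a record for topicPartition is buffered. */
  void onBuffered(String topicPartition) {
    pendingByPartition.computeIfAbsent(topicPartition, k -> new AtomicLong()).incrementAndGet();
  }

  /** Called by the drainer after it has fully processed one record for topicPartition. */
  void onProcessed(String topicPartition) {
    AtomicLong pending = pendingByPartition.get(topicPartition);
    if (pending == null) {
      throw new IllegalStateException("nothing buffered for " + topicPartition);
    }
    pending.decrementAndGet();
  }

  long pendingCount(String topicPartition) {
    AtomicLong pending = pendingByPartition.get(topicPartition);
    return pending == null ? 0L : pending.get();
  }

  /**
   * Checks up to retryNum times, sleeping sleepIntervalInMS between checks, and fails if
   * records for topicPartition are still buffered afterwards.
   */
  void drain(String topicPartition, int retryNum, int sleepIntervalInMS) throws InterruptedException {
    for (int attempt = 0; attempt < retryNum; attempt++) {
      if (pendingCount(topicPartition) == 0) {
        return;
      }
      Thread.sleep(sleepIntervalInMS);
    }
    if (pendingCount(topicPartition) != 0) {
      throw new IllegalStateException(
          "records still buffered for " + topicPartition + " after " + retryNum + " retries");
    }
  }
}
```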
execSyncOffsetCommandAsync
public CompletableFuture<Void> execSyncOffsetCommandAsync(PubSubTopicPartition topicPartition, StoreIngestionTask ingestionTask) throws InterruptedException
Specified by:
execSyncOffsetCommandAsync in class AbstractStoreBufferService
Throws:
InterruptedException
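execSyncOffsetCommandAsync returns a CompletableFuture<Void>. One plausible way to get ordering guarantees from such a command, assumed here rather than taken from the source, is to queue it on the same single drainer thread as the partition's records, so it runs only after every record buffered before it and the future completes once it has run. The sketch models a drainer as a single-thread ExecutorService; SyncCommandSketch and its methods are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical single drainer whose records and sync commands share one FIFO thread. */
final class SyncCommandSketch implements AutoCloseable {
  // A single-thread executor runs submitted tasks one at a time, in submission order.
  private final ExecutorService drainerThread = Executors.newSingleThreadExecutor();
  private final List<String> processedRecords = new CopyOnWriteArrayList<>();

  /** Buffers a record; the drainer thread "processes" it later by appending it to processedRecords. */
  void putRecord(String record) {
    drainerThread.execute(() -> processedRecords.add(record));
  }

  /** Queues syncAction behind every record already buffered; the future completes once it has run. */
  CompletableFuture<Void> execSyncCommandAsync(Runnable syncAction) {
    return CompletableFuture.runAsync(syncAction, drainerThread);
  }

  List<String> processedRecords() {
    return processedRecords;
  }

  @Override
  public void close() {
    drainerThread.shutdown();
  }
}
```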
execSyncOffsetFromSnapshotAsync
public void execSyncOffsetFromSnapshotAsync(PubSubTopicPartition topicPartition, PartitionTracker vtDivSnapshot, StoreIngestionTask ingestionTask) throws InterruptedException
Specified by:
execSyncOffsetFromSnapshotAsync in class AbstractStoreBufferService
Throws:
InterruptedException
startInner
public boolean startInner()
Specified by:
startInner in class AbstractVeniceService
Returns:
true if the service is completely started,
false if it is still starting asynchronously (in this case, it is the implementer's
responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED
upon completion of the async work).
stopInner
public void stopInner() throws Exception
Specified by:
stopInner in class AbstractVeniceService
Throws:
Exception
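The startInner contract distinguishes a complete start (true) from an asynchronous one (false). A minimal sketch, assuming one thread per drainer queue: starting every thread before returning makes the start complete, so it returns true, and stopping interrupts each thread and joins it. DrainerPoolSketch is hypothetical and, unlike a production service, simply drops records still queued at stop time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

/** Hypothetical drainer pool showing a synchronous start (cf. startInner returning true) and an interrupting stop. */
final class DrainerPoolSketch<T> {
  private final List<BlockingQueue<T>> queues = new ArrayList<>();
  private final List<Thread> threads = new ArrayList<>();
  private final Consumer<T> processor;

  DrainerPoolSketch(int drainerNum, Consumer<T> processor) {
    for (int i = 0; i < drainerNum; i++) {
      queues.add(new LinkedBlockingQueue<>());
    }
    this.processor = processor;
  }

  /** Starts one thread per queue before returning, so the pool is fully started when this returns true. */
  boolean start() {
    for (int i = 0; i < queues.size(); i++) {
      BlockingQueue<T> queue = queues.get(i);
      Thread thread = new Thread(() -> {
        try {
          while (true) {
            processor.accept(queue.take());
          }
        } catch (InterruptedException e) {
          // Interruption is this sketch's stop signal; the thread exits.
        }
      }, "drainer-" + i);
      thread.setDaemon(true);
      thread.start();
      threads.add(thread);
    }
    return true;
  }

  void put(int drainerIndex, T item) throws InterruptedException {
    queues.get(drainerIndex).put(item);
  }

  /** Interrupts every drainer and waits for it to exit; items still queued are dropped. */
  void stop() throws InterruptedException {
    for (Thread thread : threads) {
      thread.interrupt();
    }
    for (Thread thread : threads) {
      thread.join();
    }
  }

  boolean anyAlive() {
    return threads.stream().anyMatch(Thread::isAlive);
  }
}
```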
getTotalMemoryUsage
public long getTotalMemoryUsage()
Specified by:
getTotalMemoryUsage in class AbstractStoreBufferService
getTotalRemainingMemory
public long getTotalRemainingMemory()
Specified by:
getTotalRemainingMemory in class AbstractStoreBufferService
getMaxMemoryUsagePerDrainer
public long getMaxMemoryUsagePerDrainer()
Specified by:
getMaxMemoryUsagePerDrainer in class AbstractStoreBufferService
getMinMemoryUsagePerDrainer
public long getMinMemoryUsagePerDrainer()
Specified by:
getMinMemoryUsagePerDrainer in class AbstractStoreBufferService
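The four memory getters can be read as aggregations over per-drainer buffer usage. A minimal sketch, assuming every drainer has the same capacity (bufferCapacityPerDrainer) and that per-drainer usage is available as an array; BufferMemoryStatsSketch is hypothetical, and returning 0 for an empty pool is a choice made here, not documented behavior.

```java
import java.util.Arrays;

/** Hypothetical aggregation of per-drainer buffer usage into four service-level memory metrics. */
final class BufferMemoryStatsSketch {
  private BufferMemoryStatsSketch() {
  }

  /** Sum of the bytes currently buffered across all drainers. */
  static long totalMemoryUsage(long[] usagePerDrainer) {
    return Arrays.stream(usagePerDrainer).sum();
  }

  /** Assumes every drainer has the same capacity, as bufferCapacityPerDrainer suggests. */
  static long totalRemainingMemory(long[] usagePerDrainer, long capacityPerDrainer) {
    return capacityPerDrainer * usagePerDrainer.length - totalMemoryUsage(usagePerDrainer);
  }

  /** Usage of the most loaded drainer, or 0 when there are no drainers. */
  static long maxMemoryUsagePerDrainer(long[] usagePerDrainer) {
    return Arrays.stream(usagePerDrainer).max().orElse(0L);
  }

  /** Usage of the least loaded drainer, or 0 when there are no drainers. */
  static long minMemoryUsagePerDrainer(long[] usagePerDrainer) {
    return Arrays.stream(usagePerDrainer).min().orElse(0L);
  }
}
```

A large gap between the max and min per-drainer values is one way to spot the routing skew that hash-based partition assignment can produce.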