Class StorageUtilizationManager
java.lang.Object
com.linkedin.davinci.kafka.consumer.StorageUtilizationManager
- All Implemented Interfaces:
StoreDataChangedListener
This class has the following responsibilities:
1. Keep track of storage utilization, at least on disk, and potentially also in-memory (when hybrid quota
enforcement is enabled)..
2. Take action (pause/resume) on the associated consumer if storage quota is breached (and enforcement is
enabled).
3. Listen to store config changes related to quota limits and whether enforcement is enabled, and react
accordingly.
4: Report replica status changes if the above actions affect them.
TODO: Consider whether this is tech debt and if we could/should decouple status reporting from this class.
This would allow us to stop passing in the
ingestionNotificationDispatcher
which in turn may allow us
to stop mutating the entries in partitionConsumptionStateMap
(in which case, we could pass
a map where the values are a read-only interface implemented by PartitionConsumptionState
and thus preventing mutations of this state from here).
For a Samza RT job, if a partition exhausts the partition-level quota, we will stall the consumption for
this partition. i.e. stop polling and processing records for this partition until more space is available
(quota gets bumped or db compactions shrink disk usage); if only some partitions violates the quota, the
job will pause these partitions while keep processing the other good partitions.
Assumptions:
1. every partition in a store shares similar size;
2. no write-compute messages/partial updates/incremental push-
Constructor Summary
ConstructorDescriptionStorageUtilizationManager
(AbstractStorageEngine storageEngine, Store store, String versionTopic, int partitionCount, Map<Integer, PartitionConsumptionState> partitionConsumptionStateMap, boolean isHybridQuotaEnabledInServer, boolean isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled, boolean isSeparateRealtimeTopicEnabled, com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher, TopicPartitionConsumerFunction pausePartition, TopicPartitionConsumerFunction resumePartition) -
Method Summary
Modifier and TypeMethodDescriptionvoid
Enforce partition level quota for the map.void
enforcePartitionQuota
(int partition, long additionalRecordSizeUsed) double
protected long
protected long
void
handleStoreChanged
(Store store) void
handleStoreCreated
(Store store) Do NOT try to acquire the lock of store repository again in the implementation, otherwise a dead lock issue will happen.void
handleStoreDeleted
(String storeName) boolean
void
initPartition
(int partition) protected boolean
isPartitionPausedIngestion
(int partition) protected boolean
void
void
removePartition
(int partition) Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface com.linkedin.venice.meta.StoreDataChangedListener
handleStoreDeleted
-
Constructor Details
-
StorageUtilizationManager
public StorageUtilizationManager(AbstractStorageEngine storageEngine, Store store, String versionTopic, int partitionCount, Map<Integer, PartitionConsumptionState> partitionConsumptionStateMap, boolean isHybridQuotaEnabledInServer, boolean isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled, boolean isSeparateRealtimeTopicEnabled, com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher, TopicPartitionConsumerFunction pausePartition, TopicPartitionConsumerFunction resumePartition) - Parameters:
store
- a snapshot of theStore
associated with this manager at the time of construction N.B.: It's important not to hang on to a reference of this param since it will become stale when the store config changes. Refreshing the things we need out of the store object is handled inhandleStoreChanged(Store)
.
-
-
Method Details
-
handleStoreCreated
Description copied from interface:StoreDataChangedListener
Do NOT try to acquire the lock of store repository again in the implementation, otherwise a dead lock issue will happen.- Specified by:
handleStoreCreated
in interfaceStoreDataChangedListener
-
handleStoreDeleted
- Specified by:
handleStoreDeleted
in interfaceStoreDataChangedListener
-
handleStoreChanged
- Specified by:
handleStoreChanged
in interfaceStoreDataChangedListener
-
initPartition
public void initPartition(int partition) -
removePartition
public void removePartition(int partition) -
checkAllPartitionsQuota
public void checkAllPartitionsQuota()Enforce partition level quota for the map. This function could be invoked by multiple threads when shared consumer is being used. CheckStoreIngestionTask.produceToStoreBufferServiceOrKafka(java.lang.Iterable<com.linkedin.venice.pubsub.api.PubSubMessage<com.linkedin.venice.message.KafkaKey, com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope, java.lang.Long>>, com.linkedin.venice.pubsub.api.PubSubTopicPartition, java.lang.String, int)
andStoreIngestionTask.checkIngestionProgress(com.linkedin.venice.meta.Store)
to find more details. -
enforcePartitionQuota
public void enforcePartitionQuota(int partition, long additionalRecordSizeUsed) -
isPartitionPausedIngestion
protected boolean isPartitionPausedIngestion(int partition) -
hasPausedPartitionIngestion
public boolean hasPausedPartitionIngestion() -
getStoreQuotaInBytes
protected long getStoreQuotaInBytes() -
getPartitionQuotaInBytes
protected long getPartitionQuotaInBytes() -
isVersionOnline
protected boolean isVersionOnline() -
getDiskQuotaUsage
public double getDiskQuotaUsage() -
notifyFlushToDisk
-