Class StorageUtilizationManager
- java.lang.Object
-
- com.linkedin.davinci.kafka.consumer.StorageUtilizationManager
-
- All Implemented Interfaces:
StoreDataChangedListener
public class StorageUtilizationManager extends java.lang.Object implements StoreDataChangedListener
This class has the following responsibilities:
1. Keep track of storage utilization, at least on disk, and potentially also in memory (when hybrid quota enforcement is enabled).
2. Take action (pause/resume) on the associated consumer if the storage quota is breached (and enforcement is enabled).
3. Listen to store config changes related to quota limits and to whether enforcement is enabled, and react accordingly.
4. Report replica status changes if the above actions affect them.
TODO: Consider whether this is tech debt and whether we could/should decouple status reporting from this class. This would allow us to stop passing in the ingestionNotificationDispatcher, which in turn may allow us to stop mutating the entries in partitionConsumptionStateMap (in which case we could pass a map whose values are a read-only interface implemented by PartitionConsumptionState, thus preventing mutations of this state from here).
For a Samza RT job, if a partition exhausts the partition-level quota, we stall consumption for that partition, i.e. we stop polling and processing its records until more space is available (the quota gets bumped or DB compactions shrink disk usage). If only some partitions violate the quota, the job pauses those partitions while it keeps processing the other, healthy partitions.
Assumptions: 1. every partition in a store is of similar size; 2. no write-compute messages, partial updates, or incremental pushes.
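The pause/resume behaviour described above can be illustrated with a minimal sketch. It is not the actual implementation: the class QuotaSketch, the method onUsageRemeasured, the use of IntConsumer callbacks in place of TopicPartitionConsumerFunction, the even split of the store quota across partitions (suggested by assumption 1), and the choice of ">=" as the breach condition are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.IntConsumer;

/** Hypothetical, simplified stand-in; not the real StorageUtilizationManager. */
class QuotaSketch {
  private final long partitionQuotaInBytes;
  private final Map<Integer, Long> usageByPartition = new HashMap<>();
  private final Set<Integer> pausedPartitions = new HashSet<>();
  private final IntConsumer pausePartition;
  private final IntConsumer resumePartition;

  QuotaSketch(long storeQuotaInBytes, int partitionCount, IntConsumer pausePartition, IntConsumer resumePartition) {
    // Assumption: partitions are of similar size, so the store quota is split evenly.
    this.partitionQuotaInBytes = storeQuotaInBytes / partitionCount;
    this.pausePartition = pausePartition;
    this.resumePartition = resumePartition;
  }

  /** Adds the size of newly ingested records to the partition's usage, then applies the quota. */
  synchronized void enforcePartitionQuota(int partition, long additionalRecordSizeUsed) {
    long usage = usageByPartition.merge(partition, additionalRecordSizeUsed, Long::sum);
    applyQuota(partition, usage);
  }

  /** Hypothetical hook: replaces the tracked usage, e.g. after a compaction shrank disk usage. */
  synchronized void onUsageRemeasured(int partition, long usageInBytes) {
    usageByPartition.put(partition, usageInBytes);
    applyQuota(partition, usageInBytes);
  }

  private void applyQuota(int partition, long usage) {
    if (usage >= partitionQuotaInBytes) {
      // Pause only on the transition, so the callback fires once per breach.
      if (pausedPartitions.add(partition)) {
        pausePartition.accept(partition);
      }
    } else if (pausedPartitions.remove(partition)) {
      // Usage dropped back under the quota: resume the partition.
      resumePartition.accept(partition);
    }
  }

  synchronized boolean isPartitionPausedIngestion(int partition) {
    return pausedPartitions.contains(partition);
  }

  synchronized boolean hasPausedPartitionIngestion() {
    return !pausedPartitions.isEmpty();
  }
}
```

In this sketch only the breaching partition is paused; other partitions keep ingesting, which mirrors the per-partition stalling described for Samza RT jobs.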
-
-
Constructor Summary
Constructor Description
StorageUtilizationManager(AbstractStorageEngine storageEngine, Store store, java.lang.String versionTopic, int partitionCount, java.util.Map<java.lang.Integer,PartitionConsumptionState> partitionConsumptionStateMap, boolean isHybridQuotaEnabledInServer, boolean isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled, boolean isSeparateRealtimeTopicEnabled, com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher, TopicPartitionConsumerFunction pausePartition, TopicPartitionConsumerFunction resumePartition)
-
Method Summary
Modifier and Type Method Description
void
checkAllPartitionsQuota()
Enforce partition level quota for the map.
void
enforcePartitionQuota(int partition, long additionalRecordSizeUsed)
double
getDiskQuotaUsage()
protected long
getPartitionQuotaInBytes()
protected long
getStoreQuotaInBytes()
void
handleStoreChanged(Store store)
void
handleStoreCreated(Store store)
Do NOT try to acquire the store repository lock again in the implementation; otherwise a deadlock will occur.
void
handleStoreDeleted(java.lang.String storeName)
boolean
hasPausedPartitionIngestion()
void
initPartition(int partition)
protected boolean
isPartitionPausedIngestion(int partition)
protected boolean
isVersionOnline()
void
notifyFlushToDisk(PartitionConsumptionState pcs)
void
removePartition(int partition)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface com.linkedin.venice.meta.StoreDataChangedListener
handleStoreDeleted
-
Constructor Detail
-
StorageUtilizationManager
public StorageUtilizationManager(AbstractStorageEngine storageEngine, Store store, java.lang.String versionTopic, int partitionCount, java.util.Map<java.lang.Integer,PartitionConsumptionState> partitionConsumptionStateMap, boolean isHybridQuotaEnabledInServer, boolean isServerCalculateQuotaUsageBasedOnPartitionsAssignmentEnabled, boolean isSeparateRealtimeTopicEnabled, com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher, TopicPartitionConsumerFunction pausePartition, TopicPartitionConsumerFunction resumePartition)
- Parameters:
store - a snapshot of the Store associated with this manager at the time of construction. N.B.: It's important not to hang on to a reference to this param, since it will become stale when the store config changes. Refreshing the things we need out of the store object is handled in handleStoreChanged(Store).
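The note on the store parameter can be illustrated as follows: copy the needed values out of the snapshot at construction time, and refresh them whenever a newer snapshot arrives. This is a sketch under assumptions, not the actual implementation; StoreSnapshot and QuotaSettingsSketch are hypothetical stand-ins, and deriving the partition quota by even division is an assumption.

```java
/** Hypothetical, minimal stand-in for the few Store fields this sketch needs. */
final class StoreSnapshot {
  final String name;
  final long storageQuotaInBytes;
  final boolean hybridQuotaEnabled;

  StoreSnapshot(String name, long storageQuotaInBytes, boolean hybridQuotaEnabled) {
    this.name = name;
    this.storageQuotaInBytes = storageQuotaInBytes;
    this.hybridQuotaEnabled = hybridQuotaEnabled;
  }
}

/** Hypothetical holder that keeps copied values rather than a reference to the snapshot. */
final class QuotaSettingsSketch {
  private final String storeName;
  private final int partitionCount;
  // volatile so that readers on other threads see refreshed values.
  private volatile long storeQuotaInBytes;
  private volatile boolean hybridQuotaEnabled;

  QuotaSettingsSketch(StoreSnapshot store, int partitionCount) {
    this.storeName = store.name;
    this.partitionCount = partitionCount;
    copyFrom(store); // the snapshot itself is not retained
  }

  /** Refreshes the copied values from a newer snapshot of the same store. */
  void handleStoreChanged(StoreSnapshot store) {
    if (storeName.equals(store.name)) {
      copyFrom(store);
    }
  }

  private void copyFrom(StoreSnapshot store) {
    this.storeQuotaInBytes = store.storageQuotaInBytes;
    this.hybridQuotaEnabled = store.hybridQuotaEnabled;
  }

  long getStoreQuotaInBytes() {
    return storeQuotaInBytes;
  }

  long getPartitionQuotaInBytes() {
    // Assumption: the store quota is divided evenly across partitions.
    return storeQuotaInBytes / partitionCount;
  }

  boolean isHybridQuotaEnabled() {
    return hybridQuotaEnabled;
  }
}
```

Because only primitive values are kept, a later config change cannot leave this object reading from a stale snapshot; it simply sees whatever the most recent handleStoreChanged call copied in.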
-
-
Method Detail
-
handleStoreCreated
public void handleStoreCreated(Store store)
Description copied from interface: StoreDataChangedListener
Do NOT try to acquire the store repository lock again in the implementation; otherwise a deadlock will occur.
- Specified by:
handleStoreCreated
in interface StoreDataChangedListener
-
handleStoreDeleted
public void handleStoreDeleted(java.lang.String storeName)
- Specified by:
handleStoreDeleted
in interface StoreDataChangedListener
-
handleStoreChanged
public void handleStoreChanged(Store store)
- Specified by:
handleStoreChanged
in interface StoreDataChangedListener
-
initPartition
public void initPartition(int partition)
-
removePartition
public void removePartition(int partition)
-
checkAllPartitionsQuota
public void checkAllPartitionsQuota()
Enforce partition level quota for the map. This function could be invoked by multiple threads when a shared consumer is being used. See StoreIngestionTask.produceToStoreBufferServiceOrKafka(java.lang.Iterable<com.linkedin.venice.pubsub.api.PubSubMessage<com.linkedin.venice.message.KafkaKey, com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope, java.lang.Long>>, com.linkedin.venice.pubsub.api.PubSubTopicPartition, java.lang.String, int) and StoreIngestionTask.checkIngestionProgress(com.linkedin.venice.meta.Store) for more details.
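Since this method may be called from several threads, one way to keep a check over all partitions safe is to use concurrent collections and to fire each pause or resume callback only on a state transition. The following is a sketch under assumptions, not the actual implementation: AllPartitionsCheckSketch, reportUsage, and the IntConsumer callbacks are hypothetical, and the sketch does not guarantee any ordering between callbacks issued by different threads.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntConsumer;

/** Hypothetical sketch of a check over all partitions that tolerates concurrent callers. */
class AllPartitionsCheckSketch {
  private final Map<Integer, Long> usageByPartition = new ConcurrentHashMap<>();
  private final Set<Integer> pausedPartitions = ConcurrentHashMap.newKeySet();
  private final long partitionQuotaInBytes;
  private final IntConsumer pausePartition;
  private final IntConsumer resumePartition;

  AllPartitionsCheckSketch(long partitionQuotaInBytes, IntConsumer pausePartition, IntConsumer resumePartition) {
    this.partitionQuotaInBytes = partitionQuotaInBytes;
    this.pausePartition = pausePartition;
    this.resumePartition = resumePartition;
  }

  /** Hypothetical hook: records the latest measured usage of a partition. */
  void reportUsage(int partition, long usageInBytes) {
    usageByPartition.put(partition, usageInBytes);
  }

  void checkAllPartitionsQuota() {
    for (Map.Entry<Integer, Long> entry : usageByPartition.entrySet()) {
      int partition = entry.getKey();
      if (entry.getValue() >= partitionQuotaInBytes) {
        // add() returns true for only one caller, so a breach pauses the partition once.
        if (pausedPartitions.add(partition)) {
          pausePartition.accept(partition);
        }
      } else if (pausedPartitions.remove(partition)) {
        // remove() returns true only if the partition was paused, so resume also fires once.
        resumePartition.accept(partition);
      }
    }
  }

  boolean hasPausedPartitionIngestion() {
    return !pausedPartitions.isEmpty();
  }
}
```

Repeated or concurrent calls are then idempotent with respect to the callbacks: calling the check twice while a partition stays over quota pauses it only once.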
-
enforcePartitionQuota
public void enforcePartitionQuota(int partition, long additionalRecordSizeUsed)
-
isPartitionPausedIngestion
protected boolean isPartitionPausedIngestion(int partition)
-
hasPausedPartitionIngestion
public boolean hasPausedPartitionIngestion()
-
getStoreQuotaInBytes
protected long getStoreQuotaInBytes()
-
getPartitionQuotaInBytes
protected long getPartitionQuotaInBytes()
-
isVersionOnline
protected boolean isVersionOnline()
-
getDiskQuotaUsage
public double getDiskQuotaUsage()
-
notifyFlushToDisk
public void notifyFlushToDisk(PartitionConsumptionState pcs)
-
-