Package com.linkedin.venice.listener
Class ReadQuotaEnforcementHandler
- java.lang.Object
  - io.netty.channel.ChannelHandlerAdapter
    - io.netty.channel.ChannelInboundHandlerAdapter
      - io.netty.channel.SimpleChannelInboundHandler<RouterRequest>
        - com.linkedin.venice.listener.ReadQuotaEnforcementHandler

All Implemented Interfaces:
RoutingDataRepository.RoutingDataChangedListener, StoreDataChangedListener, io.netty.channel.ChannelHandler, io.netty.channel.ChannelInboundHandler

@Sharable
public class ReadQuotaEnforcementHandler
extends io.netty.channel.SimpleChannelInboundHandler<RouterRequest>
implements RoutingDataRepository.RoutingDataChangedListener, StoreDataChangedListener
-
Constructor Summary
Constructors
Constructor | Description
ReadQuotaEnforcementHandler(long storageNodeRcuCapacity, ReadOnlyStoreRepository storeRepository, java.util.concurrent.CompletableFuture<HelixCustomizedViewOfflinePushRepository> customizedViewRepository, java.lang.String nodeId, AggServerQuotaUsageStats stats, io.tehuti.metrics.MetricsRepository metricsRepository) |
ReadQuotaEnforcementHandler(long storageNodeRcuCapacity, ReadOnlyStoreRepository storeRepository, java.util.concurrent.CompletableFuture<HelixCustomizedViewOfflinePushRepository> customizedViewRepository, java.lang.String nodeId, AggServerQuotaUsageStats stats, io.tehuti.metrics.MetricsRepository metricsRepository, java.time.Clock clock) |
-
Method Summary
Modifier and Type | Method | Description
void | channelRead0(io.netty.channel.ChannelHandlerContext ctx, RouterRequest request) |
boolean | checkInitAndQuotaEnabledToSkipQuotaEnforcement(io.netty.channel.ChannelHandlerContext ctx, RouterRequest request, Store store, boolean isGrpc) |
boolean | checkStoreNull(io.netty.channel.ChannelHandlerContext ctx, RouterRequest request, GrpcRequestContext grpcCtx, boolean isGrpc, Store store) |
TokenBucket | getBucketForStore(java.lang.String storeName) |
protected static double | getNodeResponsibilityForQuota(PartitionAssignment partitionAssignment, java.lang.String nodeId) | Recalculates the amount of quota that this node should serve given the partition assignment.
static int | getRcu(RouterRequest request) | Initially, this is the key count.
AggServerQuotaUsageStats | getStats() |
ReadOnlyStoreRepository | getStoreRepository() |
java.util.concurrent.ConcurrentMap<java.lang.String,TokenBucket> | getStoreVersionBuckets() |
void | handleResourceNoBucket(java.lang.String resourceName) | This method and the expiring set noBucketStores are used only to throttle the logging of such events.
boolean | handleServerOverCapacity(io.netty.channel.ChannelHandlerContext ctx, GrpcRequestContext grpcCtx, java.lang.String storeName, int rcu, boolean isGrpc) |
void | handleStoreChanged(Store store) | This is where we add a new TokenBucket for a new store version and remove irrelevant ones.
void | handleStoreCreated(Store store) | Do NOT try to acquire the lock of the store repository again in the implementation; otherwise a deadlock will occur.
void | handleStoreDeleted(java.lang.String storeName) |
boolean | handleTooManyRequests(io.netty.channel.ChannelHandlerContext ctx, RouterRequest request, GrpcRequestContext grpcCtx, Store store, int rcu, boolean isGrpc) |
void | init() | Initialize token buckets for all resources in the customized view repository.
boolean | isInitialized() | We only ever expect the initialized value to flip from false to true, but that initialization might happen in a different thread.
boolean | isNewNoBucketResource(java.lang.String resourceName) |
protected java.util.Set<java.lang.String> | listTopics() | For tests
void | onCustomizedViewAdded(PartitionAssignment partitionAssignment) |
void | onCustomizedViewChange(PartitionAssignment partitionAssignment) |
void | onExternalViewChange(PartitionAssignment partitionAssignment) | Handle routing data changed event.
void | onPartitionStatusChange(java.lang.String topic, ReadOnlyPartitionStatus partitionStatus) |
void | onRoutingDataDeleted(java.lang.String kafkaTopic) |
boolean | storageConsumeRcu(int rcu) |
-
Methods inherited from class io.netty.channel.SimpleChannelInboundHandler
acceptInboundMessage, channelRead
-
Methods inherited from class io.netty.channel.ChannelInboundHandlerAdapter
channelActive, channelInactive, channelReadComplete, channelRegistered, channelUnregistered, channelWritabilityChanged, exceptionCaught, userEventTriggered
-
Methods inherited from class io.netty.channel.ChannelHandlerAdapter
ensureNotSharable, handlerAdded, handlerRemoved, isSharable
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface com.linkedin.venice.meta.StoreDataChangedListener
handleStoreDeleted
-
Constructor Detail
-
ReadQuotaEnforcementHandler
public ReadQuotaEnforcementHandler(long storageNodeRcuCapacity, ReadOnlyStoreRepository storeRepository, java.util.concurrent.CompletableFuture<HelixCustomizedViewOfflinePushRepository> customizedViewRepository, java.lang.String nodeId, AggServerQuotaUsageStats stats, io.tehuti.metrics.MetricsRepository metricsRepository)
-
ReadQuotaEnforcementHandler
public ReadQuotaEnforcementHandler(long storageNodeRcuCapacity, ReadOnlyStoreRepository storeRepository, java.util.concurrent.CompletableFuture<HelixCustomizedViewOfflinePushRepository> customizedViewRepository, java.lang.String nodeId, AggServerQuotaUsageStats stats, io.tehuti.metrics.MetricsRepository metricsRepository, java.time.Clock clock)
-
Method Detail
-
getRcu
public static int getRcu(RouterRequest request)
Initially, this is the key count. As we develop a more accurate capacity model, this method can be refined.
Parameters:
request -
Returns:
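As a rough illustration of "the key count" as a cost measure, the sketch below charges one read capacity unit (RCU) per requested key. The ReadRequest class and the rcuOf method are hypothetical stand-ins, not part of the Venice API, and the real method may treat request types differently.

```java
final class RcuSketch {
  /** Hypothetical stand-in for a read request; only the key count matters here. */
  static final class ReadRequest {
    final int keyCount;

    ReadRequest(int keyCount) {
      this.keyCount = keyCount;
    }
  }

  /** Cost in read capacity units under the simple model: one unit per requested key. */
  static int rcuOf(ReadRequest request) {
    return request.keyCount;
  }
}
```

Under this model a single-key read costs 1 RCU and a 25-key batch read costs 25.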
-
getNodeResponsibilityForQuota
protected static double getNodeResponsibilityForQuota(PartitionAssignment partitionAssignment, java.lang.String nodeId)
Recalculates the amount of quota that this node should serve given the partition assignment. Assumes each partition gets an even portion of the quota, and divides each partition's portion evenly among that partition's readyToServe instances.
Parameters:
partitionAssignment -
nodeId -
Returns:
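The calculation described above can be sketched as follows. This is a minimal sketch under stated assumptions, not the class's actual code: the partition assignment is modeled as a plain list of ready-to-serve node IDs per partition, and the names QuotaShareSketch and nodeShare are hypothetical.

```java
import java.util.List;

final class QuotaShareSketch {
  /**
   * Fraction of a store's quota that one node should serve.
   * Each partition owns an equal slice of the quota, and that slice is
   * split evenly among the partition's ready-to-serve replicas.
   */
  static double nodeShare(List<List<String>> readyToServeByPartition, String nodeId) {
    if (readyToServeByPartition.isEmpty()) {
      return 0.0;
    }
    double sum = 0.0;
    for (List<String> ready : readyToServeByPartition) {
      if (ready.isEmpty()) {
        continue; // no ready replica: nobody is credited for this partition
      }
      long mine = ready.stream().filter(nodeId::equals).count();
      sum += mine / (double) ready.size();
    }
    // Average over all partitions so the result is a fraction of the whole quota.
    return sum / readyToServeByPartition.size();
  }
}
```

For three partitions served by [a, b], [a], and [b, c], node a is credited 1/2 + 1 + 0 = 1.5 partitions out of 3, so it is responsible for half of the quota.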
-
init
public final void init()
Initialize token buckets for all resources in the customized view repository.
-
isInitialized
public boolean isInitialized()
We only ever expect the initialized value to flip from false to true, but that initialization might happen in a different thread. By only checking the volatile variable if it is false, we guarantee that we see the change as early as possible and also allow the thread to cache the value once it goes true.
Returns:
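One way to realize the pattern described above is to pair a plain field with a volatile one. The sketch below is an assumption about the shape of such a check, with hypothetical names (InitFlagSketch, markInitialized); it is not taken from the class's source.

```java
final class InitFlagSketch {
  // Plain field: a reader thread may cache it. It is only ever written as true.
  private boolean initialized = false;
  // Volatile field: a write by the initializing thread becomes visible to other threads.
  private volatile boolean initializedVolatile = false;

  /** Called once by whichever thread finishes initialization. */
  void markInitialized() {
    initializedVolatile = true;
  }

  boolean isInitialized() {
    if (initialized) {
      return true; // fast path: no volatile read once the flag has been seen as true
    }
    if (initializedVolatile) {
      initialized = true; // remember the result so later calls take the fast path
      return true;
    }
    return false;
  }
}
```

The unsynchronized write to the plain field is harmless in this sketch because the value only ever moves from false to true, so a stale read merely falls back to the volatile check.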
-
channelRead0
public void channelRead0(io.netty.channel.ChannelHandlerContext ctx, RouterRequest request)
Specified by:
channelRead0 in class io.netty.channel.SimpleChannelInboundHandler<RouterRequest>
-
checkStoreNull
public boolean checkStoreNull(io.netty.channel.ChannelHandlerContext ctx, RouterRequest request, GrpcRequestContext grpcCtx, boolean isGrpc, Store store)
-
checkInitAndQuotaEnabledToSkipQuotaEnforcement
public boolean checkInitAndQuotaEnabledToSkipQuotaEnforcement(io.netty.channel.ChannelHandlerContext ctx, RouterRequest request, Store store, boolean isGrpc)
-
handleTooManyRequests
public boolean handleTooManyRequests(io.netty.channel.ChannelHandlerContext ctx, RouterRequest request, GrpcRequestContext grpcCtx, Store store, int rcu, boolean isGrpc)
-
handleResourceNoBucket
public void handleResourceNoBucket(java.lang.String resourceName)
This method and the expiring set noBucketStores are used only to throttle the logging of such events (a request for a resource that has no token bucket).
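Throttling a log message with an expiring set can be sketched as below. This is an illustrative sketch only: the class ThrottledLogSketch, its shouldLog method, the TTL parameter, and the injected time source are all hypothetical, and the real noBucketStores set may be implemented differently.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.LongSupplier;

final class ThrottledLogSketch {
  // Resource name -> time (ms) at which its entry expires.
  private final ConcurrentMap<String, Long> expiryByResource = new ConcurrentHashMap<>();
  private final long ttlMs;
  private final LongSupplier nowMs; // injectable time source, convenient for tests

  ThrottledLogSketch(long ttlMs, LongSupplier nowMs) {
    this.ttlMs = ttlMs;
    this.nowMs = nowMs;
  }

  /** Returns true at most once per TTL window for each resource name. */
  boolean shouldLog(String resource) {
    long now = nowMs.getAsLong();
    boolean[] isNew = {false};
    // compute() runs atomically per key on ConcurrentHashMap.
    expiryByResource.compute(resource, (key, expiry) -> {
      if (expiry == null || expiry <= now) {
        isNew[0] = true;
        return now + ttlMs; // start a new window
      }
      return expiry; // still inside the current window: stay silent
    });
    return isNew[0];
  }
}
```

A caller would log only when shouldLog returns true, so a resource that repeatedly lacks a bucket produces one log line per window instead of one per request.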
-
handleServerOverCapacity
public boolean handleServerOverCapacity(io.netty.channel.ChannelHandlerContext ctx, GrpcRequestContext grpcCtx, java.lang.String storeName, int rcu, boolean isGrpc)
-
onExternalViewChange
public void onExternalViewChange(PartitionAssignment partitionAssignment)
Description copied from interface: RoutingDataRepository.RoutingDataChangedListener
Handle routing data changed event.
Specified by:
onExternalViewChange in interface RoutingDataRepository.RoutingDataChangedListener
Parameters:
partitionAssignment - Newest partition assignment information, including the resource name and all instances assigned to this resource. If the number of partitions is 0, the Kafka topic has been deleted.
-
onCustomizedViewChange
public void onCustomizedViewChange(PartitionAssignment partitionAssignment)
Specified by:
onCustomizedViewChange in interface RoutingDataRepository.RoutingDataChangedListener
-
onCustomizedViewAdded
public void onCustomizedViewAdded(PartitionAssignment partitionAssignment)
Specified by:
onCustomizedViewAdded in interface RoutingDataRepository.RoutingDataChangedListener
-
onPartitionStatusChange
public void onPartitionStatusChange(java.lang.String topic, ReadOnlyPartitionStatus partitionStatus)
Specified by:
onPartitionStatusChange in interface RoutingDataRepository.RoutingDataChangedListener
-
onRoutingDataDeleted
public void onRoutingDataDeleted(java.lang.String kafkaTopic)
Specified by:
onRoutingDataDeleted in interface RoutingDataRepository.RoutingDataChangedListener
-
handleStoreCreated
public void handleStoreCreated(Store store)
Description copied from interface: StoreDataChangedListener
Do NOT try to acquire the lock of the store repository again in the implementation; otherwise a deadlock will occur.
Specified by:
handleStoreCreated in interface StoreDataChangedListener
-
handleStoreDeleted
public void handleStoreDeleted(java.lang.String storeName)
Specified by:
handleStoreDeleted in interface StoreDataChangedListener
-
handleStoreChanged
public void handleStoreChanged(Store store)
This is where we add a new TokenBucket for each new store version and remove irrelevant ones. We should keep the same number of token buckets as the number of active versions, because readers like FC might be lagging behind or vice versa. This way quota is still enforced properly during the version swap or transition period.
Specified by:
handleStoreChanged in interface StoreDataChangedListener
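The bookkeeping described above, one bucket per active version, can be sketched as follows. Everything here is an assumption made for illustration: the class VersionBucketsSketch, the placeholder Bucket type, the per-version quota argument, and the "storeName_v<version>" key format are not taken from this page.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class VersionBucketsSketch {
  /** Placeholder for a token bucket; only its identity and capacity matter here. */
  static final class Bucket {
    final long capacity;

    Bucket(long capacity) {
      this.capacity = capacity;
    }
  }

  // Keyed by an assumed "storeName_v<version>" resource name.
  final ConcurrentMap<String, Bucket> bucketsByTopic = new ConcurrentHashMap<>();

  void onStoreChanged(String storeName, Set<Integer> activeVersions, long quotaPerVersion) {
    String prefix = storeName + "_v";
    // Add a bucket for every active version that does not have one yet.
    for (int version : activeVersions) {
      bucketsByTopic.computeIfAbsent(prefix + version, topic -> new Bucket(quotaPerVersion));
    }
    // Drop buckets of this store whose version is no longer active.
    bucketsByTopic.keySet().removeIf(topic -> {
      if (!topic.startsWith(prefix)) {
        return false; // belongs to another store
      }
      String suffix = topic.substring(prefix.length());
      return suffix.matches("\\d+") && !activeVersions.contains(Integer.parseInt(suffix));
    });
  }
}
```

Because buckets for versions that remain active are left untouched, a reader still pinned to the previous version keeps being throttled by the same bucket while the new version's bucket is already in place.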
-
listTopics
protected java.util.Set<java.lang.String> listTopics()
For tests
Returns:
-
getBucketForStore
public TokenBucket getBucketForStore(java.lang.String storeName)
-
getStoreRepository
public ReadOnlyStoreRepository getStoreRepository()
-
getStoreVersionBuckets
public java.util.concurrent.ConcurrentMap<java.lang.String,TokenBucket> getStoreVersionBuckets()
-
isNewNoBucketResource
public boolean isNewNoBucketResource(java.lang.String resourceName)
-
storageConsumeRcu
public boolean storageConsumeRcu(int rcu)
-
getStats
public AggServerQuotaUsageStats getStats()
-