Class KeyLevelLocksManager


  • public class KeyLevelLocksManager
    extends java.lang.Object
    A helper class to return the same lock for the same raw key bytes. There is an upper limit of how many locks available. If a lock is already assigned to a key and the lock is being used, and if another thread comes in and asks for a lock for the same key, the same lock will be returned; if there is no lock assigned to the requested key, pick the next available locks from the pool, or create a new lock when the pool is empty. Current use case of this lock manager is inside Active/Active write path: During Active/Active ingestion, the below data flow must be in a critical session for the same key: Read existing value/RMD from transient record cache/disk -> perform DCR and decide incoming value wins -> update transient record cache -> produce to VT (just call send, no need to wait for the produce future in the critical session) Therefore, put the above critical session in key level locking will have the minimum lock contention; to avoid creating too much locks, we can build a pool of locks. Theoretically, the pool size doesn't need to exceed the number of potential real-time topic partitions from different source regions --- let's assume the number of RT source regions is x, the number of topic partitions are y, the Active/Active write-path could at most handle x * y different keys at the same time. If there are more use cases that could leverage this key level lock manager in future, feel free to do so, and extend/update the class if necessary.
    • Constructor Summary

      Constructors 
      Modifier Constructor Description
      protected KeyLevelLocksManager​(java.lang.String storeVersion, int initialPoolSize, int maxPoolSize)  
    • Method Summary

      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • KeyLevelLocksManager

        protected KeyLevelLocksManager​(java.lang.String storeVersion,
                                       int initialPoolSize,
                                       int maxPoolSize)