Class MemoryBoundBlockingQueue<T extends Measurable>

  • Type Parameters:
    T -
    All Implemented Interfaces:
    java.lang.Iterable<T>, java.util.Collection<T>, java.util.concurrent.BlockingQueue<T>, java.util.Queue<T>

    public class MemoryBoundBlockingQueue<T extends Measurable>
    extends java.lang.Object
    implements java.util.concurrent.BlockingQueue<T>
    This class is a generic implementation of a memory-bound blocking queue. The queue is bounded by the memory usage of the Measurable objects buffered inside it.

    To guarantee some degree of fairness, choose a notifyDeltaInByte that suits the maximum size of the messages that could be buffered. Because threads can put messages of various sizes into the shared queue, MemoryBoundBlockingQueue does not notify a waiting thread (the 'put' thread) as soon as a message is processed; it waits until the freed memory reaches the configured notifyDeltaInByte.

    The reason behind this design: when the buffer is full and the processing thread keeps processing small messages, a bigger message would not get a chance to be queued, since the memory freed by a processed small message is not enough to fit the bigger message. With this delta config, MemoryBoundBlockingQueue guarantees some degree of fairness among messages of various sizes when the buffer is full.

    When tuning this config, consider the following tradeoffs:
    1. notifyDeltaInByte must be smaller than memoryCapacityInByte;
    2. If the delta is too big, some buffer space is wasted, since waiting threads are not notified even when some memory (less than the delta) is available;
    3. If the delta is too small, a big message may not get a chance to be buffered when the queue is full.
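    The delayed-notification idea described above can be sketched roughly as follows. This is a simplified illustration, not the actual MemoryBoundBlockingQueue source: SizedItem is a hypothetical stand-in for Measurable, DeltaNotifySketch and its members are invented names, and waking producers when the buffer drains completely is an extra safeguard assumed here so the sketch cannot stall.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for Measurable; the real interface may look different. */
interface SizedItem {
  int sizeInByte();
}

/** Illustrative sketch only; not the actual MemoryBoundBlockingQueue source. */
final class DeltaNotifySketch<T extends SizedItem> {
  private final Queue<T> items = new ArrayDeque<>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition hasMemory = lock.newCondition();
  private final Condition hasItems = lock.newCondition();
  private final long memoryCapacityInByte;
  private final long notifyDeltaInByte;
  private long usedInByte;
  private long freedSinceLastNotify;
  private int notifyCount; // kept only so the behavior can be observed

  DeltaNotifySketch(long memoryCapacityInByte, long notifyDeltaInByte) {
    // Assumption: the delta is positive and strictly smaller than the capacity.
    if (notifyDeltaInByte <= 0 || notifyDeltaInByte >= memoryCapacityInByte) {
      throw new IllegalArgumentException("notifyDeltaInByte must be in (0, memoryCapacityInByte)");
    }
    this.memoryCapacityInByte = memoryCapacityInByte;
    this.notifyDeltaInByte = notifyDeltaInByte;
  }

  /** Blocks until enough memory is free to hold the item. */
  void put(T item) throws InterruptedException {
    int size = item.sizeInByte();
    if (size > memoryCapacityInByte) {
      throw new IllegalArgumentException("item can never fit into the buffer");
    }
    lock.lock();
    try {
      while (memoryCapacityInByte - usedInByte < size) {
        hasMemory.await();
      }
      items.add(item);
      usedInByte += size;
      hasItems.signal();
    } finally {
      lock.unlock();
    }
  }

  /** Removes the head; producers are woken only after enough memory has been freed. */
  T take() throws InterruptedException {
    lock.lock();
    try {
      while (items.isEmpty()) {
        hasItems.await();
      }
      T item = items.remove();
      int size = item.sizeInByte();
      usedInByte -= size;
      freedSinceLastNotify += size;
      // Notify when the accumulated freed memory reaches the delta.
      // The isEmpty() check is this sketch's own safeguard against a stall.
      if (freedSinceLastNotify >= notifyDeltaInByte || items.isEmpty()) {
        freedSinceLastNotify = 0;
        notifyCount++;
        hasMemory.signalAll();
      }
      return item;
    } finally {
      lock.unlock();
    }
  }

  long memoryUsage() {
    lock.lock();
    try {
      return usedInByte;
    } finally {
      lock.unlock();
    }
  }

  int notifications() {
    lock.lock();
    try {
      return notifyCount;
    } finally {
      lock.unlock();
    }
  }
}
```

    With a capacity of 100 bytes and a delta of 50 bytes, taking a single 30-byte item does not wake any producer in this sketch; the second take brings the freed total to 60 bytes and triggers the notification.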
    • Field Summary

      Fields 
      Modifier and Type Field Description
      static int LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE
      Estimated per-node overhead, based on the LinkedList.Node implementation: three references (about 24 bytes) plus the Node object itself (about 24 bytes).
    • Constructor Summary

      Constructors 
      Constructor Description
      MemoryBoundBlockingQueue​(long memoryCapacityInByte, long notifyDeltaInByte)  
    • Method Summary

      All Methods Instance Methods Concrete Methods 
      Modifier and Type Method Description
      boolean add​(T t)  
      boolean addAll​(java.util.Collection<? extends T> c)  
      void clear()  
      boolean contains​(java.lang.Object o)  
      boolean containsAll​(java.util.Collection<?> c)  
      int drainTo​(java.util.Collection<? super T> c)  
      int drainTo​(java.util.Collection<? super T> c, int maxElements)  
      T element()  
      long getMemoryUsage()  
      boolean isEmpty()  
      java.util.Iterator<T> iterator()  
      boolean offer​(T t)  
      boolean offer​(T t, long timeout, java.util.concurrent.TimeUnit unit)  
      T peek()  
      T poll()  
      T poll​(long timeout, java.util.concurrent.TimeUnit unit)  
      void put​(T record)  
      int remainingCapacity()  
      long remainingMemoryCapacityInByte()  
      T remove()  
      boolean remove​(java.lang.Object o)  
      boolean removeAll​(java.util.Collection<?> c)  
      boolean retainAll​(java.util.Collection<?> c)  
      int size()  
      T take()  
      java.lang.Object[] toArray()  
      <T1> T1[] toArray​(T1[] a)  
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
      • Methods inherited from interface java.util.Collection

        equals, hashCode, parallelStream, removeIf, spliterator, stream, toArray
      • Methods inherited from interface java.lang.Iterable

        forEach
    • Field Detail

      • LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE

        public static final int LINKED_QUEUE_NODE_OVERHEAD_IN_BYTE
        Estimated per-node overhead, based on the LinkedList.Node implementation: three references (about 24 bytes) plus the Node object itself (about 24 bytes). This value can be adjusted later if necessary.
        See Also:
        Constant Field Values
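        To see why a per-node overhead matters for capacity planning, the arithmetic can be sketched as below. The 48-byte figure is only the sum of the two estimates quoted in the description, not the confirmed constant value, and the idea that the queue charges each element its measured size plus this overhead is an assumption; NodeOverheadEstimate and its methods are hypothetical names.

```java
/** Illustrative arithmetic only; names and the 48-byte figure are assumptions. */
final class NodeOverheadEstimate {
  // Assumed breakdown taken from the field description.
  static final int ASSUMED_REFERENCES_IN_BYTE = 24;
  static final int ASSUMED_NODE_OBJECT_IN_BYTE = 24;
  static final int ASSUMED_NODE_OVERHEAD_IN_BYTE =
      ASSUMED_REFERENCES_IN_BYTE + ASSUMED_NODE_OBJECT_IN_BYTE;

  private NodeOverheadEstimate() {}

  /** Bytes charged against the capacity for one buffered element (assumed model). */
  static long chargedBytes(int payloadSizeInByte) {
    return (long) payloadSizeInByte + ASSUMED_NODE_OVERHEAD_IN_BYTE;
  }

  /** Upper bound on how many equally sized elements fit into the given capacity. */
  static long maxBufferedElements(long memoryCapacityInByte, int payloadSizeInByte) {
    return memoryCapacityInByte / chargedBytes(payloadSizeInByte);
  }

  public static void main(String[] args) {
    long oneMiB = 1024L * 1024L;
    // Small payloads: the overhead dominates the charged size.
    System.out.println(maxBufferedElements(oneMiB, 16));
    // Larger payloads: the overhead is a small fraction.
    System.out.println(maxBufferedElements(oneMiB, 1000));
  }
}
```

        Under these assumptions, a 16-byte payload is charged 64 bytes, so the overhead is three quarters of the cost of each small element.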
    • Constructor Detail

      • MemoryBoundBlockingQueue

        public MemoryBoundBlockingQueue​(long memoryCapacityInByte,
                                        long notifyDeltaInByte)
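        When picking the two constructor arguments, the tradeoffs listed in the class description can be checked with a small helper like the one below. This is a hedged sketch: NotifyDeltaTuning and its methods are hypothetical, the requirement that the delta be positive is an assumption, and sizing the delta to the largest expected message is only one possible heuristic, not a rule stated by this class.

```java
/** Illustrative helper; the names and the heuristic are assumptions, not documented API. */
final class NotifyDeltaTuning {
  private NotifyDeltaTuning() {}

  /** Tradeoff 1: the delta must be smaller than the capacity (positivity is assumed here). */
  static void validate(long memoryCapacityInByte, long notifyDeltaInByte) {
    if (notifyDeltaInByte <= 0 || notifyDeltaInByte >= memoryCapacityInByte) {
      throw new IllegalArgumentException(
          "notifyDeltaInByte (" + notifyDeltaInByte + ") must be positive and smaller than "
              + "memoryCapacityInByte (" + memoryCapacityInByte + ")");
    }
  }

  /**
   * Heuristic (assumption): use the largest expected message size as the delta, so that a
   * notification usually follows enough freed memory for a big message to have a chance.
   */
  static long suggestDelta(long memoryCapacityInByte, long maxMessageSizeInByte) {
    validate(memoryCapacityInByte, maxMessageSizeInByte);
    return maxMessageSizeInByte;
  }

  /** Tradeoff 2: up to delta - 1 freed bytes can sit idle before waiting threads are notified. */
  static long worstCaseIdleBytes(long notifyDeltaInByte) {
    return notifyDeltaInByte - 1;
  }

  public static void main(String[] args) {
    long capacity = 64L * 1024 * 1024; // 64 MiB buffer, example value
    long delta = suggestDelta(capacity, 1024L * 1024); // 1 MiB largest message, example value
    System.out.println("delta=" + delta + ", worst-case idle bytes=" + worstCaseIdleBytes(delta));
  }
}
```

        The resulting values would then be passed as memoryCapacityInByte and notifyDeltaInByte to the constructor above.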
    • Method Detail

      • getMemoryUsage

        public long getMemoryUsage()
      • remainingMemoryCapacityInByte

        public long remainingMemoryCapacityInByte()
      • put

        public void put​(T record)
                 throws java.lang.InterruptedException
        Specified by:
        put in interface java.util.concurrent.BlockingQueue<T extends Measurable>
        Throws:
        java.lang.InterruptedException
      • take

        public T take()
               throws java.lang.InterruptedException
        Specified by:
        take in interface java.util.concurrent.BlockingQueue<T extends Measurable>
        Throws:
        java.lang.InterruptedException
      • offer

        public boolean offer​(T t,
                             long timeout,
                             java.util.concurrent.TimeUnit unit)
                      throws java.lang.InterruptedException
        Specified by:
        offer in interface java.util.concurrent.BlockingQueue<T extends Measurable>
        Throws:
        java.lang.InterruptedException
      • add

        public boolean add​(T t)
        Specified by:
        add in interface java.util.concurrent.BlockingQueue<T extends Measurable>
        Specified by:
        add in interface java.util.Collection<T extends Measurable>
        Specified by:
        add in interface java.util.Queue<T extends Measurable>
      • offer

        public boolean offer​(T t)
        Specified by:
        offer in interface java.util.concurrent.BlockingQueue<T extends Measurable>
        Specified by:
        offer in interface java.util.Queue<T extends Measurable>
      • remove

        public T remove()
        Specified by:
        remove in interface java.util.Queue<T extends Measurable>
      • poll

        public T poll()
        Specified by:
        poll in interface java.util.Queue<T extends Measurable>
      • element

        public T element()
        Specified by:
        element in interface java.util.Queue<T extends Measurable>
      • peek

        public T peek()
        Specified by:
        peek in interface java.util.Queue<T extends Measurable>
      • poll

        public T poll​(long timeout,
                      java.util.concurrent.TimeUnit unit)
               throws java.lang.InterruptedException
        Specified by:
        poll in interface java.util.concurrent.BlockingQueue<T extends Measurable>
        Throws:
        java.lang.InterruptedException
      • remainingCapacity

        public int remainingCapacity()
        Specified by:
        remainingCapacity in interface java.util.concurrent.BlockingQueue<T extends Measurable>
      • remove

        public boolean remove​(java.lang.Object o)
        Specified by:
        remove in interface java.util.concurrent.BlockingQueue<T extends Measurable>
        Specified by:
        remove in interface java.util.Collection<T extends Measurable>
      • containsAll

        public boolean containsAll​(java.util.Collection<?> c)
        Specified by:
        containsAll in interface java.util.Collection<T extends Measurable>
      • addAll

        public boolean addAll​(java.util.Collection<? extends T> c)
        Specified by:
        addAll in interface java.util.Collection<T extends Measurable>
      • removeAll

        public boolean removeAll​(java.util.Collection<?> c)
        Specified by:
        removeAll in interface java.util.Collection<T extends Measurable>
      • retainAll

        public boolean retainAll​(java.util.Collection<?> c)
        Specified by:
        retainAll in interface java.util.Collection<T extends Measurable>
      • clear

        public void clear()
        Specified by:
        clear in interface java.util.Collection<T extends Measurable>
      • size

        public int size()
        Specified by:
        size in interface java.util.Collection<T extends Measurable>
      • isEmpty

        public boolean isEmpty()
        Specified by:
        isEmpty in interface java.util.Collection<T extends Measurable>
      • contains

        public boolean contains​(java.lang.Object o)
        Specified by:
        contains in interface java.util.concurrent.BlockingQueue<T extends Measurable>
        Specified by:
        contains in interface java.util.Collection<T extends Measurable>
      • iterator

        public java.util.Iterator<T> iterator()
        Specified by:
        iterator in interface java.util.Collection<T extends Measurable>
        Specified by:
        iterator in interface java.lang.Iterable<T extends Measurable>
      • toArray

        public java.lang.Object[] toArray()
        Specified by:
        toArray in interface java.util.Collection<T extends Measurable>
      • toArray

        public <T1> T1[] toArray​(T1[] a)
        Specified by:
        toArray in interface java.util.Collection<T extends Measurable>
      • drainTo

        public int drainTo​(java.util.Collection<? super T> c)
        Specified by:
        drainTo in interface java.util.concurrent.BlockingQueue<T extends Measurable>
      • drainTo

        public int drainTo​(java.util.Collection<? super T> c,
                           int maxElements)
        Specified by:
        drainTo in interface java.util.concurrent.BlockingQueue<T extends Measurable>