Class MemoryBoundBlockingQueue<T extends Measurable>
- java.lang.Object
-
- com.linkedin.venice.utils.collections.MemoryBoundBlockingQueue<T>
-
- Type Parameters:
T
-
- All Implemented Interfaces:
java.lang.Iterable<T>, java.util.Collection<T>, java.util.concurrent.BlockingQueue<T>, java.util.Queue<T>
public class MemoryBoundBlockingQueue<T extends Measurable> extends java.lang.Object implements java.util.concurrent.BlockingQueue<T>
This class is a generic implementation of a memory-bound blocking queue. The queue is bounded by the memory usage of each Measurable object buffered inside it. To guarantee some degree of fairness, choose a notifyDeltaInByte that suits the maximum size of the messages that could be buffered.
Because threads may put messages of various sizes into the shared queue, MemoryBoundBlockingQueue does not notify the waiting threads (the 'put' threads) as soon as a message is processed. It notifies them only once the freed memory reaches the configured notifyDeltaInByte.
The reason behind this design: when the buffer is full and the processing thread keeps processing small messages, a bigger message would never get a chance to be queued, because the memory freed by a processed small message is not enough to fit the bigger message. With this delta config, MemoryBoundBlockingQueue guarantees some degree of fairness among messages of various sizes when the buffer is full.
When tuning this config, consider the following tradeoffs:
1. notifyDeltaInByte must be smaller than memoryCapacityInByte;
2. If the delta is too big, buffer space is wasted, since waiting threads are not notified even when some memory (less than the delta) is available;
3. If the delta is too small, a big message may not get a chance to be buffered when the queue is full.
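The delayed-notification behavior described above can be illustrated with a minimal, self-contained sketch. It is not the Venice implementation: the `Sized` interface is a hypothetical stand-in for Measurable, `SketchQueue` and its field names are invented for illustration, and the choice to accumulate freed bytes and reset the counter on each notification is an assumption drawn from the description.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

final class DeltaNotifySketch {
  /** Hypothetical stand-in for Measurable: reports an object's size in bytes. */
  interface Sized {
    int sizeInBytes();
  }

  /** Simplified queue: producers block on memory, and are woken only after enough is freed. */
  static final class SketchQueue<T extends Sized> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition hasEnoughMemory = lock.newCondition();
    private final long notifyDeltaInByte;
    private long remainingMemoryInByte;
    private long freedSinceLastNotifyInByte;
    private int notifications; // demo-only counter of producer wake-ups

    SketchQueue(long memoryCapacityInByte, long notifyDeltaInByte) {
      if (notifyDeltaInByte >= memoryCapacityInByte) {
        throw new IllegalArgumentException("notifyDeltaInByte must be smaller than memoryCapacityInByte");
      }
      this.remainingMemoryInByte = memoryCapacityInByte;
      this.notifyDeltaInByte = notifyDeltaInByte;
    }

    void put(T item) throws InterruptedException {
      int size = item.sizeInBytes();
      lock.lockInterruptibly();
      try {
        // Wait until the item fits; an item larger than the capacity would wait forever.
        while (remainingMemoryInByte < size) {
          hasEnoughMemory.await();
        }
        queue.add(item);
        remainingMemoryInByte -= size;
        notEmpty.signal();
      } finally {
        lock.unlock();
      }
    }

    T take() throws InterruptedException {
      lock.lockInterruptibly();
      try {
        while (queue.isEmpty()) {
          notEmpty.await();
        }
        T item = queue.remove();
        int size = item.sizeInBytes();
        remainingMemoryInByte += size;
        freedSinceLastNotifyInByte += size;
        // Assumed policy: wake producers only once the accumulated freed bytes reach the delta.
        if (freedSinceLastNotifyInByte >= notifyDeltaInByte) {
          hasEnoughMemory.signalAll();
          freedSinceLastNotifyInByte = 0;
          notifications++;
        }
        return item;
      } finally {
        lock.unlock();
      }
    }
  }

  /** Single-threaded walk-through: capacity 100 bytes, delta 40 bytes, five 10-byte items. */
  static long[] demo() {
    SketchQueue<Sized> q = new SketchQueue<>(100, 40);
    try {
      for (int i = 0; i < 5; i++) {
        q.put(() -> 10);
      }
      q.take(); // 10 bytes freed: below the delta, no wake-up yet
      long afterOneTake = q.notifications;
      q.take();
      q.take();
      q.take(); // 40 bytes freed in total: the delta is reached, producers are woken
      long afterFourTakes = q.notifications;
      return new long[] {afterOneTake, afterFourTakes, q.remainingMemoryInByte};
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }
}
```

In this sketch, a producer blocked on a large item is not woken by each small removal; it is woken once per notifyDeltaInByte of freed memory, which is the fairness tradeoff the tuning notes describe.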
-
-
Field Summary
Fields
Modifier and Type    Field    Description
static int    LINKED_LIST_NODE_SHALLOW_OVERHEAD    Considering the node implementation (LinkedList.Node), the overhead is three references.
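As a rough illustration of what "three references" means for memory accounting, the sketch below adds a fixed per-node cost to each buffered payload. The 8-byte reference size is an assumption (the actual size depends on the JVM, for example 4 bytes with compressed references), and the class, constant, and method names are hypothetical rather than part of this API.

```java
final class NodeOverheadSketch {
  // Assumption: 8 bytes per reference; the real value depends on JVM settings.
  static final int ASSUMED_REFERENCE_SIZE_IN_BYTES = 8;

  // java.util.LinkedList.Node holds three references: item, next, and prev.
  static final int ASSUMED_NODE_SHALLOW_OVERHEAD = 3 * ASSUMED_REFERENCE_SIZE_IN_BYTES;

  /** Estimated bytes charged for one buffered entry: payload plus linked-list node overhead. */
  static long entryCostInBytes(int payloadSizeInBytes) {
    return (long) payloadSizeInBytes + ASSUMED_NODE_SHALLOW_OVERHEAD;
  }
}
```

Under these assumptions a 100-byte payload would be charged 124 bytes against the queue's memory capacity.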
-
Constructor Summary
Constructors
Constructor    Description
MemoryBoundBlockingQueue(long memoryCapacityInByte, long notifyDeltaInByte)
-
Method Summary
Modifier and Type    Method
boolean    add(T t)
boolean    addAll(java.util.Collection<? extends T> c)
void    clear()
boolean    contains(java.lang.Object o)
boolean    containsAll(java.util.Collection<?> c)
int    drainTo(java.util.Collection<? super T> c)
int    drainTo(java.util.Collection<? super T> c, int maxElements)
T    element()
long    getMemoryUsage()
boolean    isEmpty()
java.util.Iterator<T>    iterator()
boolean    offer(T t)
boolean    offer(T t, long timeout, java.util.concurrent.TimeUnit unit)
T    peek()
T    poll()
T    poll(long timeout, java.util.concurrent.TimeUnit unit)
void    put(T record)
int    remainingCapacity()
long    remainingMemoryCapacityInByte()
T    remove()
boolean    remove(java.lang.Object o)
boolean    removeAll(java.util.Collection<?> c)
boolean    retainAll(java.util.Collection<?> c)
int    size()
T    take()
java.lang.Object[]    toArray()
<T1> T1[]    toArray(T1[] a)
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Method Detail
-
getMemoryUsage
public long getMemoryUsage()
-
remainingMemoryCapacityInByte
public long remainingMemoryCapacityInByte()
-
put
public void put(T record) throws java.lang.InterruptedException
- Specified by:
put
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Throws:
java.lang.InterruptedException
-
take
public T take() throws java.lang.InterruptedException
- Specified by:
take
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Throws:
java.lang.InterruptedException
-
offer
public boolean offer(T t, long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Specified by:
offer
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Throws:
java.lang.InterruptedException
-
add
public boolean add(T t)
- Specified by:
add
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Specified by:
add
in interface java.util.Collection<T extends Measurable>
- Specified by:
add
in interface java.util.Queue<T extends Measurable>
-
offer
public boolean offer(T t)
- Specified by:
offer
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Specified by:
offer
in interface java.util.Queue<T extends Measurable>
-
remove
public T remove()
- Specified by:
remove
in interface java.util.Queue<T extends Measurable>
-
poll
public T poll()
- Specified by:
poll
in interface java.util.Queue<T extends Measurable>
-
element
public T element()
- Specified by:
element
in interface java.util.Queue<T extends Measurable>
-
peek
public T peek()
- Specified by:
peek
in interface java.util.Queue<T extends Measurable>
-
poll
public T poll(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Specified by:
poll
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Throws:
java.lang.InterruptedException
-
remainingCapacity
public int remainingCapacity()
- Specified by:
remainingCapacity
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
-
remove
public boolean remove(java.lang.Object o)
- Specified by:
remove
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Specified by:
remove
in interface java.util.Collection<T extends Measurable>
-
containsAll
public boolean containsAll(java.util.Collection<?> c)
- Specified by:
containsAll
in interface java.util.Collection<T extends Measurable>
-
addAll
public boolean addAll(java.util.Collection<? extends T> c)
- Specified by:
addAll
in interface java.util.Collection<T extends Measurable>
-
removeAll
public boolean removeAll(java.util.Collection<?> c)
- Specified by:
removeAll
in interface java.util.Collection<T extends Measurable>
-
retainAll
public boolean retainAll(java.util.Collection<?> c)
- Specified by:
retainAll
in interface java.util.Collection<T extends Measurable>
-
clear
public void clear()
- Specified by:
clear
in interface java.util.Collection<T extends Measurable>
-
size
public int size()
- Specified by:
size
in interface java.util.Collection<T extends Measurable>
-
isEmpty
public boolean isEmpty()
- Specified by:
isEmpty
in interface java.util.Collection<T extends Measurable>
-
contains
public boolean contains(java.lang.Object o)
- Specified by:
contains
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
- Specified by:
contains
in interface java.util.Collection<T extends Measurable>
-
iterator
public java.util.Iterator<T> iterator()
- Specified by:
iterator
in interface java.util.Collection<T extends Measurable>
- Specified by:
iterator
in interface java.lang.Iterable<T extends Measurable>
-
toArray
public java.lang.Object[] toArray()
- Specified by:
toArray
in interface java.util.Collection<T extends Measurable>
-
toArray
public <T1> T1[] toArray(T1[] a)
- Specified by:
toArray
in interface java.util.Collection<T extends Measurable>
-
drainTo
public int drainTo(java.util.Collection<? super T> c)
- Specified by:
drainTo
in interface java.util.concurrent.BlockingQueue<T extends Measurable>
-
drainTo
public int drainTo(java.util.Collection<? super T> c, int maxElements)
- Specified by:
drainTo
in interface java.util.concurrent.BlockingQueue<T extends Measurable>