Class MockInMemoryConsumerAdapter
- All Implemented Interfaces:
PubSubConsumerAdapter
,Closeable
,AutoCloseable
PubSubConsumerAdapter
implementation which reads messages from the InMemoryPubSubBroker
.
Used in unit tests as a lightweight alternative to a full-fledged integration test. Can be configured
with various PollStrategy
implementations in order to tweak the consuming behavior.
When MockInMemoryConsumerAdapter
is used to simulate the shared consumer behavior, there might be 2 different threads calling the methods
from this class. For example, consumer task thread from KafkaConsumerService
will
periodically call poll(long)
and StoreIngestionTask
thread
is calling resetOffset(PubSubTopicPartition)
, which may cause test failure.-
Constructor Summary
ConstructorsConstructorDescriptionMockInMemoryConsumerAdapter
(InMemoryPubSubBroker broker, PollStrategy pollStrategy, PubSubConsumerAdapter delegate) -
Method Summary
Modifier and TypeMethodDescriptionvoid
batchUnsubscribe
(Set<PubSubTopicPartition> pubSubTopicPartitionSet) Unsubscribes the consumer from a batch of topic-partitions.beginningPosition
(PubSubTopicPartition pubSubTopicPartition, Duration timeout) Retrieves the beginning position for the specified PubSub topic-partition.beginningPositions
(Collection<PubSubTopicPartition> partitions, Duration timeout) void
close()
Closes the PubSub consumer and releases any associated resources.long
comparePositions
(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2) Compares two PubSub positions within the specified topic partition.decodePosition
(PubSubTopicPartition partition, int positionTypeId, ByteBuffer buffer) Decodes the givenByteBuffer
into aPubSubPosition
for the specified topic partition.endOffset
(PubSubTopicPartition pubSubTopicPartition) Retrieves the end offset for the specified PubSub topic-partition.endOffsets
(Collection<PubSubTopicPartition> partitions, Duration timeout) Retrieves the end offsets for a collection of PubSub topic-partitions.endPosition
(PubSubTopicPartition pubSubTopicPartition) endPositions
(Collection<PubSubTopicPartition> partitions, Duration timeout) Retrieves the set of PubSub topic-partitions currently assigned to the consumer.getPositionByTimestamp
(PubSubTopicPartition pubSubTopicPartition, long timestamp) Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition.getPositionByTimestamp
(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout) Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition.boolean
Checks if the consumer has any active topic-partition subscriptions.boolean
hasSubscription
(PubSubTopicPartition pubSubTopicPartition) Checks if the consumer is currently subscribed to the specified PubSub topic-partition.offsetForTime
(PubSubTopicPartition pubSubTopicPartition, long timestamp) Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition.offsetForTime
(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout) Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition.partitionsFor
(PubSubTopic topic) Retrieves the list of partitions associated with a given Pub-Sub topic.void
pause
(PubSubTopicPartition pubSubTopicPartition) Pauses message consumption for the specified PubSub topic-partition.poll
(long timeout) Polls the Kafka consumer for messages from the subscribed topic partitions within the specified time duration.long
positionDifference
(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2) Computes the relative difference between twoPubSubPosition
instances for a givenPubSubTopicPartition
, asposition1 - position2
.void
resetOffset
(PubSubTopicPartition pubSubTopicPartition) Resets the offset for a specific topic-partition, seeking it to the beginning of the topic.void
resume
(PubSubTopicPartition pubSubTopicPartition) Resumes message consumption for the specified PubSub topic-partition.void
setMockInMemoryAdminAdapter
(MockInMemoryAdminAdapter adminAdapter) void
subscribe
(PubSubTopicPartition pubSubTopicPartition, long lastReadOffset) Subscribes to a topic-partition if it is not already subscribed.void
subscribe
(PubSubTopicPartition pubSubTopicPartition, PubSubPosition lastReadPubSubPosition) Subscribes to a specified topic-partition if it is not already subscribed.void
subscribe
(PubSubTopicPartition pubSubTopicPartition, PubSubPosition position, boolean isInclusive) Subscribes to a specified topic-partition if it is not already subscribed.void
unSubscribe
(PubSubTopicPartition pubSubTopicPartition) Unsubscribes the consumer from a specified topic-partition.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
beginningPosition, decodePosition, getLatestOffset, getOffsetLag
-
Constructor Details
-
MockInMemoryConsumerAdapter
public MockInMemoryConsumerAdapter(InMemoryPubSubBroker broker, PollStrategy pollStrategy, PubSubConsumerAdapter delegate) - Parameters:
delegate
- Can be used to pass a mock, in order to verify calls. Note: functions that return do not return the result of the mock, but rather the results of the in-memory consumer components.
-
-
Method Details
-
subscribe
Description copied from interface:PubSubConsumerAdapter
Subscribes to a topic-partition if it is not already subscribed. If the topic-partition is already subscribed, this method is a no-op. The method assumes that the topic-partition exists.- Specified by:
subscribe
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The topic-partition to subscribe to.lastReadOffset
- The last read offset for the topic-partition. A poll call following a subscribe call will return messages from the offset (lastReadOffset + 1).
-
subscribe
public void subscribe(PubSubTopicPartition pubSubTopicPartition, PubSubPosition lastReadPubSubPosition) Description copied from interface:PubSubConsumerAdapter
Subscribes to a specified topic-partition if it is not already subscribed. If the topic-partition is already subscribed, this method performs no action. The subscription uses the providedPubSubPosition
to determine the starting offset for consumption. If the position isPubSubSymbolicPosition.EARLIEST
, the consumer will seek to the earliest available message. If it isPubSubSymbolicPosition.LATEST
, the consumer will seek to the latest available message. If a custom position is provided, implementations should resolve it to the corresponding offset or position in the underlying pub-sub system. Implementations of this interface should ensure proper validation of the topic-partition existence and manage consumer assignments. This method does not guarantee immediate subscription state changes and may defer them based on implementation details.- Specified by:
subscribe
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- the topic-partition to subscribe tolastReadPubSubPosition
- the last known position for the topic-partition
-
subscribe
public void subscribe(@Nonnull PubSubTopicPartition pubSubTopicPartition, @Nonnull PubSubPosition position, boolean isInclusive) Description copied from interface:PubSubConsumerAdapter
Subscribes to a specified topic-partition if it is not already subscribed. If the topic-partition is already subscribed, this method performs no action.The subscription uses the provided
PubSubPosition
to determine the starting position for consumption. If the position isPubSubSymbolicPosition.EARLIEST
, the consumer will seek to the earliest available message. If it isPubSubSymbolicPosition.LATEST
, the consumer will seek to the latest available message. If a concrete position is provided, implementations should resolve it to a specific offset or internal position based on the underlying pub-sub system.The
inclusive
flag determines whether the message at the specified position (if resolvable to an offset or equivalent) should be included in consumption:- If
true
, the consumer should begin from the exact position specified. - If
false
, consumption should begin immediately after the specified position.
Implementations should validate the topic-partition's existence and handle any necessary assignment or internal state initialization. The method does not guarantee immediate subscription effect and may defer changes depending on the consumer's execution model.
- Specified by:
subscribe
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- the topic-partition to subscribe toposition
- the position from which consumption should beginisInclusive
- whether to include the message at the given position
- If
-
unSubscribe
Description copied from interface:PubSubConsumerAdapter
Unsubscribes the consumer from a specified topic-partition. If the consumer was previously subscribed to the given partition, it will be unsubscribed, and the associated partition assignments and tracked offsets will be updated accordingly.- Specified by:
unSubscribe
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition to unsubscribe from.
-
batchUnsubscribe
Description copied from interface:PubSubConsumerAdapter
Unsubscribes the consumer from a batch of topic-partitions.- Specified by:
batchUnsubscribe
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartitionSet
- A set of topic-partitions to unsubscribe from.
-
resetOffset
Description copied from interface:PubSubConsumerAdapter
Resets the offset for a specific topic-partition, seeking it to the beginning of the topic. If the topic partition is not currently subscribed, aPubSubUnsubscribedTopicPartitionException
is thrown.- Specified by:
resetOffset
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to reset the offset.
-
close
public void close()Description copied from interface:PubSubConsumerAdapter
Closes the PubSub consumer and releases any associated resources.- Specified by:
close
in interfaceAutoCloseable
- Specified by:
close
in interfaceCloseable
- Specified by:
close
in interfacePubSubConsumerAdapter
-
poll
Description copied from interface:PubSubConsumerAdapter
Polls the Kafka consumer for messages from the subscribed topic partitions within the specified time duration.- Specified by:
poll
in interfacePubSubConsumerAdapter
- Parameters:
timeout
- The maximum time, in milliseconds, to wait for messages to be polled.- Returns:
- A mapping of PubSub topic partitions to lists of PubSub messages retrieved from Kafka.
-
hasAnySubscription
public boolean hasAnySubscription()Description copied from interface:PubSubConsumerAdapter
Checks if the consumer has any active topic-partition subscriptions.- Specified by:
hasAnySubscription
in interfacePubSubConsumerAdapter
- Returns:
- true if the consumer is subscribed to one or more topic partitions, false otherwise.
-
hasSubscription
Description copied from interface:PubSubConsumerAdapter
Checks if the consumer is currently subscribed to the specified PubSub topic-partition.- Specified by:
hasSubscription
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition to check for subscription.- Returns:
- true if the consumer is subscribed to the given topic-partition, false otherwise.
-
getLastReadPositions
-
pause
Description copied from interface:PubSubConsumerAdapter
Pauses message consumption for the specified PubSub topic-partition. If the partition was not previously subscribed, this method is a no-op.- Specified by:
pause
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to pause message consumption.
-
resume
Description copied from interface:PubSubConsumerAdapter
Resumes message consumption for the specified PubSub topic-partition. If the partition was not previously paused or if they were not subscribed at all, this method is a no-op.- Specified by:
resume
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic partition for which to resume message consumption.
-
getAssignment
Description copied from interface:PubSubConsumerAdapter
Retrieves the set of PubSub topic-partitions currently assigned to the consumer.- Specified by:
getAssignment
in interfacePubSubConsumerAdapter
- Returns:
- A set of PubSub topic-partitions representing the current assignment of the consumer.
-
offsetForTime
public Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout) Description copied from interface:PubSubConsumerAdapter
Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message is found,null
will be returned for the partition.- Specified by:
offsetForTime
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to fetch the offset.timestamp
- The target timestamp to search for in milliseconds since the Unix epoch.timeout
- The maximum duration to wait for the operation to complete.- Returns:
- The offset of the first message with a timestamp greater than or equal to the target timestamp,
or
null
if no such message is found for the partition.
-
getPositionByTimestamp
public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout) Description copied from interface:PubSubConsumerAdapter
Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message is found,null
will be returned for the partition.- Specified by:
getPositionByTimestamp
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to fetch the offset.timestamp
- The target timestamp to search for in milliseconds since the Unix epoch.timeout
- The maximum duration to wait for the operation to complete.- Returns:
- The offset of the first message with a timestamp greater than or equal to the target timestamp,
or
null
if no such message is found for the partition.
-
offsetForTime
Description copied from interface:PubSubConsumerAdapter
Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message is found,null
will be returned for the partition.- Specified by:
offsetForTime
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to fetch the offset.timestamp
- The target timestamp to search for in milliseconds since the Unix epoch.- Returns:
- The offset of the first message with a timestamp greater than or equal to the target timestamp,
or
null
if no such message is found for the partition.
-
getPositionByTimestamp
public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) Description copied from interface:PubSubConsumerAdapter
Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message is found,null
will be returned for the partition.- Specified by:
getPositionByTimestamp
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to fetch the offset.timestamp
- The target timestamp to search for in milliseconds since the Unix epoch.- Returns:
- The offset of the first message with a timestamp greater than or equal to the target timestamp,
or
null
if no such message is found for the partition.
-
beginningPosition
public PubSubPosition beginningPosition(PubSubTopicPartition pubSubTopicPartition, Duration timeout) Description copied from interface:PubSubConsumerAdapter
Retrieves the beginning position for the specified PubSub topic-partition.- Specified by:
beginningPosition
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to fetch the beginning offset.timeout
- The maximum duration to wait for the operation to complete.- Returns:
- The beginning offset of the specified topic-partition. If topic-partition exists but has no messages, the offset will be 0.
-
beginningPositions
public Map<PubSubTopicPartition,PubSubPosition> beginningPositions(Collection<PubSubTopicPartition> partitions, Duration timeout) - Specified by:
beginningPositions
in interfacePubSubConsumerAdapter
-
endOffsets
public Map<PubSubTopicPartition,Long> endOffsets(Collection<PubSubTopicPartition> partitions, Duration timeout) Description copied from interface:PubSubConsumerAdapter
Retrieves the end offsets for a collection of PubSub topic-partitions. The end offset represents the highest offset available in each specified partition, i.e., offset of the last message + 1. If there are no messages in a partition, the end offset will be 0.- Specified by:
endOffsets
in interfacePubSubConsumerAdapter
- Parameters:
partitions
- A collection of PubSub topic-partitions for which to fetch the end offsets.timeout
- The maximum duration to wait for the operation to complete.- Returns:
- A mapping of PubSub topic partitions to their respective end offsets, or an empty map if the offsets cannot be determined.
-
endPositions
public Map<PubSubTopicPartition,PubSubPosition> endPositions(Collection<PubSubTopicPartition> partitions, Duration timeout) - Specified by:
endPositions
in interfacePubSubConsumerAdapter
-
endOffset
Description copied from interface:PubSubConsumerAdapter
Retrieves the end offset for the specified PubSub topic-partition. The end offset represents the highest offset available in each specified partition, i.e., offset of the last message + 1. If there are no messages in a partition, the end offset will be 0.- Specified by:
endOffset
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic partition for which to fetch the end offset.- Returns:
- The end offset of the specified topic partition.
-
endPosition
- Specified by:
endPosition
in interfacePubSubConsumerAdapter
-
partitionsFor
Description copied from interface:PubSubConsumerAdapter
Retrieves the list of partitions associated with a given Pub-Sub topic.- Specified by:
partitionsFor
in interfacePubSubConsumerAdapter
- Parameters:
topic
- The Pub-Sub topic for which partition information is requested.- Returns:
- A list of
PubSubTopicPartitionInfo
representing the partitions of the topic, ornull
if the topic does not exist.
-
comparePositions
public long comparePositions(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2) Description copied from interface:PubSubConsumerAdapter
Compares two PubSub positions within the specified topic partition.- Specified by:
comparePositions
in interfacePubSubConsumerAdapter
- Parameters:
partition
- The topic partition where the comparison is being performed.position1
- The first PubSub position.position2
- The second PubSub position.- Returns:
- A negative value if
position1
is behindposition2
, zero if equal, or a positive value ifposition1
is ahead ofposition2
.
-
positionDifference
public long positionDifference(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2) Description copied from interface:PubSubConsumerAdapter
Computes the relative difference between twoPubSubPosition
instances for a givenPubSubTopicPartition
, asposition1 - position2
.Implementations must resolve symbolic positions such as
PubSubSymbolicPosition.EARLIEST
andPubSubSymbolicPosition.LATEST
to concrete positions based on the partition's start and end positions. This ensures that symbolic references can be treated consistently during subtraction.For example:
- If both positions are concrete, the result is the logical offset difference between them.
- If
position1
is symbolic (e.g., EARLIEST), it must be resolved to the concrete beginning position. - If
position2
is symbolic (e.g., LATEST), it must be resolved to the concrete end position.
- Specified by:
positionDifference
in interfacePubSubConsumerAdapter
- Parameters:
partition
- The topic partition for which the difference is calculated.position1
- The first PubSub position (minuend).position2
- The second PubSub position (subtrahend).- Returns:
- The signed offset difference between
position1
andposition2
.
-
decodePosition
public PubSubPosition decodePosition(PubSubTopicPartition partition, int positionTypeId, ByteBuffer buffer) Description copied from interface:PubSubConsumerAdapter
Decodes the givenByteBuffer
into aPubSubPosition
for the specified topic partition.- Specified by:
decodePosition
in interfacePubSubConsumerAdapter
- Parameters:
partition
- The topic partition this position belongs to.positionTypeId
- The type ID of the position, which indicates how to decode the byte buffer.buffer
- TheByteBuffer
containing the encoded position.- Returns:
- The decoded
PubSubPosition
.
-
setMockInMemoryAdminAdapter
-