Class MockInMemoryConsumer
- java.lang.Object
-
- com.linkedin.venice.unit.kafka.consumer.MockInMemoryConsumer
-
- All Implemented Interfaces:
PubSubConsumerAdapter
,java.io.Closeable
,java.lang.AutoCloseable
public class MockInMemoryConsumer extends java.lang.Object implements PubSubConsumerAdapter
APubSubConsumerAdapter
implementation which reads messages from theInMemoryKafkaBroker
. Used in unit tests as a lightweight alternative to a full-fledged integration test. Can be configured with variousPollStrategy
implementations in order to tweak the consuming behavior. WhenMockInMemoryConsumer
is used to simulate the shared consumer behavior, there might be 2 different threads calling the methods from this class. For example, consumer task thread fromKafkaConsumerService
will periodically callpoll(long)
andStoreIngestionTask
thread is callingresetOffset(PubSubTopicPartition)
, which may cause test failure. TODO: Remove synchronized keyword in this class when consumer operations in consumption task is event-driven.
-
-
Constructor Summary
Constructors Constructor Description MockInMemoryConsumer(InMemoryKafkaBroker broker, PollStrategy pollStrategy, PubSubConsumerAdapter delegate)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
batchUnsubscribe(java.util.Set<PubSubTopicPartition> pubSubTopicPartitionSet)
Unsubscribes the consumer from a batch of topic-partitions.java.lang.Long
beginningOffset(PubSubTopicPartition partition, java.time.Duration timeout)
Retrieves the beginning offset for the specified PubSub topic-partition.void
close()
Closes the PubSub consumer and releases any associated resources.java.lang.Long
endOffset(PubSubTopicPartition pubSubTopicPartition)
Retrieves the end offset for the specified PubSub topic-partition.java.util.Map<PubSubTopicPartition,java.lang.Long>
endOffsets(java.util.Collection<PubSubTopicPartition> partitions, java.time.Duration timeout)
Retrieves the end offsets for a collection of PubSub topic-partitions.java.util.Set<PubSubTopicPartition>
getAssignment()
Retrieves the set of PubSub topic-partitions currently assigned to the consumer.java.util.Map<PubSubTopicPartition,java.lang.Long>
getOffsets()
boolean
hasAnySubscription()
Checks if the consumer has any active topic-partition subscriptions.boolean
hasSubscription(PubSubTopicPartition pubSubTopicPartition)
Checks if the consumer is currently subscribed to the specified PubSub topic-partition.java.lang.Long
offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp)
Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition.java.lang.Long
offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp, java.time.Duration timeout)
Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition.java.util.List<PubSubTopicPartitionInfo>
partitionsFor(PubSubTopic topic)
Retrieves the list of partitions associated with a given Pub-Sub topic.void
pause(PubSubTopicPartition pubSubTopicPartition)
Pauses message consumption for the specified PubSub topic-partition.java.util.Map<PubSubTopicPartition,java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>>
poll(long timeout)
Polls the Kafka consumer for messages from the subscribed topic partitions within the specified time duration.void
resetOffset(PubSubTopicPartition pubSubTopicPartition)
Resets the offset for a specific topic-partition, seeking it to the beginning of the topic.void
resume(PubSubTopicPartition pubSubTopicPartition)
Resumes message consumption for the specified PubSub topic-partition.void
setMockInMemoryAdminAdapter(MockInMemoryAdminAdapter adminAdapter)
void
subscribe(PubSubTopicPartition pubSubTopicPartition, long lastReadOffset)
Subscribes to a topic-partition if it is not already subscribed.void
unSubscribe(PubSubTopicPartition pubSubTopicPartition)
Unsubscribes the consumer from a specified topic-partition.-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
-
Methods inherited from interface com.linkedin.venice.pubsub.api.PubSubConsumerAdapter
getLatestOffset, getOffsetLag
-
-
-
-
Constructor Detail
-
MockInMemoryConsumer
public MockInMemoryConsumer(InMemoryKafkaBroker broker, PollStrategy pollStrategy, PubSubConsumerAdapter delegate)
- Parameters:
delegate
- Can be used to pass a mock, in order to verify calls. Note: functions that return do not return the result of the mock, but rather the results of the in-memory consumer components.
-
-
Method Detail
-
subscribe
public void subscribe(PubSubTopicPartition pubSubTopicPartition, long lastReadOffset)
Description copied from interface:PubSubConsumerAdapter
Subscribes to a topic-partition if it is not already subscribed. If the topic-partition is already subscribed, this method is a no-op. The method assumes that the topic-partition exists.- Specified by:
subscribe
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The topic-partition to subscribe to.lastReadOffset
- The last read offset for the topic-partition. A poll call following a subscribe call will return messages from the offset (lastReadOffset + 1).
-
unSubscribe
public void unSubscribe(PubSubTopicPartition pubSubTopicPartition)
Description copied from interface:PubSubConsumerAdapter
Unsubscribes the consumer from a specified topic-partition. If the consumer was previously subscribed to the given partition, it will be unsubscribed, and the associated partition assignments and tracked offsets will be updated accordingly.- Specified by:
unSubscribe
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition to unsubscribe from.
-
batchUnsubscribe
public void batchUnsubscribe(java.util.Set<PubSubTopicPartition> pubSubTopicPartitionSet)
Description copied from interface:PubSubConsumerAdapter
Unsubscribes the consumer from a batch of topic-partitions.- Specified by:
batchUnsubscribe
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartitionSet
- A set of topic-partitions to unsubscribe from.
-
resetOffset
public void resetOffset(PubSubTopicPartition pubSubTopicPartition)
Description copied from interface:PubSubConsumerAdapter
Resets the offset for a specific topic-partition, seeking it to the beginning of the topic. If the topic partition is not currently subscribed, aPubSubUnsubscribedTopicPartitionException
is thrown.- Specified by:
resetOffset
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to reset the offset.
-
close
public void close()
Description copied from interface:PubSubConsumerAdapter
Closes the PubSub consumer and releases any associated resources.- Specified by:
close
in interfacejava.lang.AutoCloseable
- Specified by:
close
in interfacejava.io.Closeable
- Specified by:
close
in interfacePubSubConsumerAdapter
-
poll
public java.util.Map<PubSubTopicPartition,java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>> poll(long timeout)
Description copied from interface:PubSubConsumerAdapter
Polls the Kafka consumer for messages from the subscribed topic partitions within the specified time duration.- Specified by:
poll
in interfacePubSubConsumerAdapter
- Parameters:
timeout
- The maximum time, in milliseconds, to wait for messages to be polled.- Returns:
- A mapping of PubSub topic partitions to lists of PubSub messages retrieved from Kafka.
-
hasAnySubscription
public boolean hasAnySubscription()
Description copied from interface:PubSubConsumerAdapter
Checks if the consumer has any active topic-partition subscriptions.- Specified by:
hasAnySubscription
in interfacePubSubConsumerAdapter
- Returns:
- true if the consumer is subscribed to one or more topic partitions, false otherwise.
-
hasSubscription
public boolean hasSubscription(PubSubTopicPartition pubSubTopicPartition)
Description copied from interface:PubSubConsumerAdapter
Checks if the consumer is currently subscribed to the specified PubSub topic-partition.- Specified by:
hasSubscription
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition to check for subscription.- Returns:
- true if the consumer is subscribed to the given topic-partition, false otherwise.
-
getOffsets
public java.util.Map<PubSubTopicPartition,java.lang.Long> getOffsets()
-
pause
public void pause(PubSubTopicPartition pubSubTopicPartition)
Description copied from interface:PubSubConsumerAdapter
Pauses message consumption for the specified PubSub topic-partition. If the partition was not previously subscribed, this method is a no-op.- Specified by:
pause
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to pause message consumption.
-
resume
public void resume(PubSubTopicPartition pubSubTopicPartition)
Description copied from interface:PubSubConsumerAdapter
Resumes message consumption for the specified PubSub topic-partition. If the partition was not previously paused or if they were not subscribed at all, this method is a no-op.- Specified by:
resume
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic partition for which to resume message consumption.
-
getAssignment
public java.util.Set<PubSubTopicPartition> getAssignment()
Description copied from interface:PubSubConsumerAdapter
Retrieves the set of PubSub topic-partitions currently assigned to the consumer.- Specified by:
getAssignment
in interfacePubSubConsumerAdapter
- Returns:
- A set of PubSub topic-partitions representing the current assignment of the consumer.
-
offsetForTime
public java.lang.Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp, java.time.Duration timeout)
Description copied from interface:PubSubConsumerAdapter
Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message is found,null
will be returned for the partition.- Specified by:
offsetForTime
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to fetch the offset.timestamp
- The target timestamp to search for in milliseconds since the Unix epoch.timeout
- The maximum duration to wait for the operation to complete.- Returns:
- The offset of the first message with a timestamp greater than or equal to the target timestamp,
or
null
if no such message is found for the partition.
-
offsetForTime
public java.lang.Long offsetForTime(PubSubTopicPartition pubSubTopicPartition, long timestamp)
Description copied from interface:PubSubConsumerAdapter
Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message is found,null
will be returned for the partition.- Specified by:
offsetForTime
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic-partition for which to fetch the offset.timestamp
- The target timestamp to search for in milliseconds since the Unix epoch.- Returns:
- The offset of the first message with a timestamp greater than or equal to the target timestamp,
or
null
if no such message is found for the partition.
-
beginningOffset
public java.lang.Long beginningOffset(PubSubTopicPartition partition, java.time.Duration timeout)
Description copied from interface:PubSubConsumerAdapter
Retrieves the beginning offset for the specified PubSub topic-partition.- Specified by:
beginningOffset
in interfacePubSubConsumerAdapter
- Parameters:
partition
- The PubSub topic-partition for which to fetch the beginning offset.timeout
- The maximum duration to wait for the operation to complete.- Returns:
- The beginning offset of the specified topic-partition. If topic-partition exists but has no messages, the offset will be 0.
-
endOffsets
public java.util.Map<PubSubTopicPartition,java.lang.Long> endOffsets(java.util.Collection<PubSubTopicPartition> partitions, java.time.Duration timeout)
Description copied from interface:PubSubConsumerAdapter
Retrieves the end offsets for a collection of PubSub topic-partitions. The end offset represents the highest offset available in each specified partition, i.e., offset of the last message + 1. If there are no messages in a partition, the end offset will be 0.- Specified by:
endOffsets
in interfacePubSubConsumerAdapter
- Parameters:
partitions
- A collection of PubSub topic-partitions for which to fetch the end offsets.timeout
- The maximum duration to wait for the operation to complete.- Returns:
- A mapping of PubSub topic partitions to their respective end offsets, or an empty map if the offsets cannot be determined.
-
endOffset
public java.lang.Long endOffset(PubSubTopicPartition pubSubTopicPartition)
Description copied from interface:PubSubConsumerAdapter
Retrieves the end offset for the specified PubSub topic-partition. The end offset represents the highest offset available in each specified partition, i.e., offset of the last message + 1. If there are no messages in a partition, the end offset will be 0.- Specified by:
endOffset
in interfacePubSubConsumerAdapter
- Parameters:
pubSubTopicPartition
- The PubSub topic partition for which to fetch the end offset.- Returns:
- The end offset of the specified topic partition.
-
partitionsFor
public java.util.List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic topic)
Description copied from interface:PubSubConsumerAdapter
Retrieves the list of partitions associated with a given Pub-Sub topic.- Specified by:
partitionsFor
in interfacePubSubConsumerAdapter
- Parameters:
topic
- The Pub-Sub topic for which partition information is requested.- Returns:
- A list of
PubSubTopicPartitionInfo
representing the partitions of the topic, ornull
if the topic does not exist.
-
setMockInMemoryAdminAdapter
public void setMockInMemoryAdminAdapter(MockInMemoryAdminAdapter adminAdapter)
-
-