Interface PubSubConsumerAdapter
- All Superinterfaces:
AutoCloseable,Closeable
- All Known Implementing Classes:
ApacheKafkaConsumerAdapter,MockInMemoryConsumerAdapter
PubSubConstants.PUBSUB_CONSUMER_API_DEFAULT_TIMEOUT_MS.-
Method Summary
Modifier and TypeMethodDescriptionvoidbatchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicPartitionSet) Unsubscribes the consumer from a batch of topic-partitions.default PubSubPositionbeginningPosition(PubSubTopicPartition pubSubTopicPartition) beginningPosition(PubSubTopicPartition pubSubTopicPartition, Duration timeout) Retrieves the beginning position for the specified PubSub topic-partition.beginningPositions(Collection<PubSubTopicPartition> partitions, Duration timeout) voidclose()Closes the PubSub consumer and releases any associated resources.longcomparePositions(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2) Compares two PubSub positions within the specified topic partition.default PubSubPositiondecodePosition(PubSubTopicPartition partition, int positionTypeId, byte[] data) Decodes the given type-encoded byte array into aPubSubPositionfor the specified topic partition.decodePosition(PubSubTopicPartition partition, int positionTypeId, ByteBuffer buffer) Decodes the givenByteBufferinto aPubSubPositionfor the specified topic partition.endPosition(PubSubTopicPartition pubSubTopicPartition) Retrieves the end position for the specified PubSub topic-partition.endPositions(Collection<PubSubTopicPartition> partitions, Duration timeout) Retrieves the end positions for a collection of PubSub topic-partitions.Retrieves the set of PubSub topic-partitions currently assigned to the consumer.default longgetLatestOffset(PubSubTopicPartition pubSubTopicPartition) Retrieves the latest available offset for a PubSub topic partition.default longgetOffsetLag(PubSubTopicPartition pubSubTopicPartition) Retrieves the consuming offset lag for a PubSub topic partition.getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp) Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition.getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout) Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition.booleanChecks if the consumer has any active topic-partition subscriptions.booleanhasSubscription(PubSubTopicPartition pubSubTopicPartition) Checks if the consumer is currently subscribed to the specified PubSub topic-partition.partitionsFor(PubSubTopic pubSubTopic) Retrieves the list of partitions associated with a given Pub-Sub topic.voidpause(PubSubTopicPartition pubSubTopicPartition) Pauses message consumption for the specified PubSub topic-partition.poll(long timeoutMs) Polls the Kafka consumer for messages from the subscribed topic partitions within the specified time duration.longpositionDifference(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2) Computes the relative difference between twoPubSubPositioninstances for a givenPubSubTopicPartition, asposition1 - position2.voidresetOffset(PubSubTopicPartition pubSubTopicPartition) Resets the offset for a specific topic-partition, seeking it to the beginning of the topic.voidresume(PubSubTopicPartition pubSubTopicPartition) Resumes message consumption for the specified PubSub topic-partition.voidsubscribe(PubSubTopicPartition pubSubTopicPartition, PubSubPosition lastReadPubSubPosition) Subscribes to a specified topic-partition if it is not already subscribed.voidsubscribe(PubSubTopicPartition pubSubTopicPartition, PubSubPosition position, boolean isInclusive) Subscribes to a specified topic-partition if it is not already subscribed.voidunSubscribe(PubSubTopicPartition pubSubTopicPartition) Unsubscribes the consumer from a specified topic-partition.
-
Method Details
-
subscribe
void subscribe(@Nonnull PubSubTopicPartition pubSubTopicPartition, @Nonnull PubSubPosition lastReadPubSubPosition) Subscribes to a specified topic-partition if it is not already subscribed. If the topic-partition is already subscribed, this method performs no action. The subscription uses the providedPubSubPositionto determine the starting offset for consumption. If the position isPubSubSymbolicPosition.EARLIEST, the consumer will seek to the earliest available message. If it isPubSubSymbolicPosition.LATEST, the consumer will seek to the latest available message. If a custom position is provided, implementations should resolve it to the corresponding offset or position in the underlying pub-sub system. Implementations of this interface should ensure proper validation of the topic-partition existence and manage consumer assignments. This method does not guarantee immediate subscription state changes and may defer them based on implementation details.- Parameters:
pubSubTopicPartition- the topic-partition to subscribe tolastReadPubSubPosition- the last known position for the topic-partition- Throws:
IllegalArgumentException- if lastReadPubSubPosition is null or of an unsupported typePubSubTopicDoesNotExistException- if the specified topic does not exist
-
subscribe
void subscribe(@Nonnull PubSubTopicPartition pubSubTopicPartition, @Nonnull PubSubPosition position, boolean isInclusive) Subscribes to a specified topic-partition if it is not already subscribed. If the topic-partition is already subscribed, this method performs no action.The subscription uses the provided
PubSubPositionto determine the starting position for consumption. If the position isPubSubSymbolicPosition.EARLIEST, the consumer will seek to the earliest available message. If it isPubSubSymbolicPosition.LATEST, the consumer will seek to the latest available message. If a concrete position is provided, implementations should resolve it to a specific offset or internal position based on the underlying pub-sub system.The
inclusiveflag determines whether the message at the specified position (if resolvable to an offset or equivalent) should be included in consumption:- If
true, the consumer should begin from the exact position specified. - If
false, consumption should begin immediately after the specified position.
Implementations should validate the topic-partition's existence and handle any necessary assignment or internal state initialization. The method does not guarantee immediate subscription effect and may defer changes depending on the consumer's execution model.
- Parameters:
pubSubTopicPartition- the topic-partition to subscribe toposition- the position from which consumption should beginisInclusive- whether to include the message at the given position- Throws:
IllegalArgumentException- ifpositionis null or of an unsupported typePubSubTopicDoesNotExistException- if the specified topic does not exist
- If
-
unSubscribe
Unsubscribes the consumer from a specified topic-partition. If the consumer was previously subscribed to the given partition, it will be unsubscribed, and the associated partition assignments and tracked offsets will be updated accordingly.- Parameters:
pubSubTopicPartition- The PubSub topic-partition to unsubscribe from.
-
batchUnsubscribe
Unsubscribes the consumer from a batch of topic-partitions.- Parameters:
pubSubTopicPartitionSet- A set of topic-partitions to unsubscribe from.
-
resetOffset
void resetOffset(PubSubTopicPartition pubSubTopicPartition) throws PubSubUnsubscribedTopicPartitionException Resets the offset for a specific topic-partition, seeking it to the beginning of the topic. If the topic partition is not currently subscribed, aPubSubUnsubscribedTopicPartitionExceptionis thrown.- Parameters:
pubSubTopicPartition- The PubSub topic-partition for which to reset the offset.- Throws:
PubSubUnsubscribedTopicPartitionException- If the specified topic-partition is not currently subscribed.
-
close
void close()Closes the PubSub consumer and releases any associated resources.- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceCloseable
-
poll
Polls the Kafka consumer for messages from the subscribed topic partitions within the specified time duration.- Parameters:
timeoutMs- The maximum time, in milliseconds, to wait for messages to be polled.- Returns:
- A mapping of PubSub topic partitions to lists of PubSub messages retrieved from Kafka.
- Throws:
PubSubClientException- If there is an error during message retrieval from Kafka.PubSubClientRetriableException- If a retriable exception occurs during polling attempts, with retries as configured.
-
hasAnySubscription
boolean hasAnySubscription()Checks if the consumer has any active topic-partition subscriptions.- Returns:
- true if the consumer is subscribed to one or more topic partitions, false otherwise.
-
hasSubscription
Checks if the consumer is currently subscribed to the specified PubSub topic-partition.- Parameters:
pubSubTopicPartition- The PubSub topic-partition to check for subscription.- Returns:
- true if the consumer is subscribed to the given topic-partition, false otherwise.
-
pause
Pauses message consumption for the specified PubSub topic-partition. If the partition was not previously subscribed, this method is a no-op.- Parameters:
pubSubTopicPartition- The PubSub topic-partition for which to pause message consumption.
-
resume
Resumes message consumption for the specified PubSub topic-partition. If the partition was not previously paused or if they were not subscribed at all, this method is a no-op.- Parameters:
pubSubTopicPartition- The PubSub topic partition for which to resume message consumption.
-
getAssignment
Set<PubSubTopicPartition> getAssignment()Retrieves the set of PubSub topic-partitions currently assigned to the consumer.- Returns:
- A set of PubSub topic-partitions representing the current assignment of the consumer.
-
getOffsetLag
Retrieves the consuming offset lag for a PubSub topic partition. The offset lag represents the difference between the last consumed message offset and the latest available message offset for the partition.- Parameters:
pubSubTopicPartition- The PubSub topic partition for which to fetch the offset lag.- Returns:
- The offset lag, which is zero or a positive value if a valid lag was collected by the consumer, or -1 if the lag cannot be determined or is not applicable.
-
getLatestOffset
Retrieves the latest available offset for a PubSub topic partition.- Parameters:
pubSubTopicPartition- The PubSub topic partition for which to fetch the latest offset.- Returns:
- The latest offset, which is zero or a positive value if an offset was collected by the consumer, or -1 if the offset cannot be determined or is not applicable.
-
getPositionByTimestamp
PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout) Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message is found,nullwill be returned for the partition.- Parameters:
pubSubTopicPartition- The PubSub topic-partition for which to fetch the offset.timestamp- The target timestamp to search for in milliseconds since the Unix epoch.timeout- The maximum duration to wait for the operation to complete.- Returns:
- The offset of the first message with a timestamp greater than or equal to the target timestamp,
or
nullif no such message is found for the partition. - Throws:
PubSubOpTimeoutException- If the operation times out while fetching the offset.PubSubClientException- If there is an error while attempting to fetch the offset.
-
getPositionByTimestamp
Retrieves the offset of the first message with a timestamp greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message is found,nullwill be returned for the partition.- Parameters:
pubSubTopicPartition- The PubSub topic-partition for which to fetch the offset.timestamp- The target timestamp to search for in milliseconds since the Unix epoch.- Returns:
- The offset of the first message with a timestamp greater than or equal to the target timestamp,
or
nullif no such message is found for the partition. - Throws:
PubSubOpTimeoutException- If the operation times out while fetching the offset.PubSubClientException- If there is an error while attempting to fetch the offset.
-
beginningPosition
Retrieves the beginning position for the specified PubSub topic-partition.- Parameters:
pubSubTopicPartition- The PubSub topic-partition for which to fetch the beginning offset.timeout- The maximum duration to wait for the operation to complete.- Returns:
- The beginning offset of the specified topic-partition. If topic-partition exists but has no messages, the offset will be 0.
- Throws:
PubSubOpTimeoutException- If the operation times out while fetching the beginning offset.PubSubClientException- If there is an error while attempting to fetch the beginning offset.
-
beginningPosition
-
beginningPositions
Map<PubSubTopicPartition,PubSubPosition> beginningPositions(Collection<PubSubTopicPartition> partitions, Duration timeout) -
endPositions
Map<PubSubTopicPartition,PubSubPosition> endPositions(Collection<PubSubTopicPartition> partitions, Duration timeout) Retrieves the end positions for a collection of PubSub topic-partitions. The end position represents the highest position available in each specified partition, i.e., position of the last message + 1. If there are no messages in a partition, the end position will be equal to the beginning position.- Parameters:
partitions- A collection of PubSub topic-partitions for which to fetch the end positions.timeout- The maximum duration to wait for the operation to complete.- Returns:
- A mapping of PubSub topic partitions to their respective end positions, or an empty map if the positions cannot be determined.
- Throws:
PubSubOpTimeoutException- If the operation times out while fetching the end positions.PubSubClientException- If there is an error while attempting to fetch the end positions.
-
endPosition
Retrieves the end position for the specified PubSub topic-partition. The end position represents the highest position available in each specified partition, i.e., position of the last message + 1. If there are no messages in a partition, the end position will be equal to the beginning position.- Parameters:
pubSubTopicPartition- The PubSub topic partition for which to fetch the end position.- Returns:
- The end offset of the specified topic partition.
- Throws:
PubSubOpTimeoutException- If the operation times out while fetching the end position.PubSubClientException- If there is an error while attempting to fetch the end position.
-
partitionsFor
Retrieves the list of partitions associated with a given Pub-Sub topic.- Parameters:
pubSubTopic- The Pub-Sub topic for which partition information is requested.- Returns:
- A list of
PubSubTopicPartitionInforepresenting the partitions of the topic, ornullif the topic does not exist.
-
comparePositions
long comparePositions(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2) Compares two PubSub positions within the specified topic partition.- Parameters:
partition- The topic partition where the comparison is being performed.position1- The first PubSub position.position2- The second PubSub position.- Returns:
- A negative value if
position1is behindposition2, zero if equal, or a positive value ifposition1is ahead ofposition2.
-
positionDifference
long positionDifference(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2) Computes the relative difference between twoPubSubPositioninstances for a givenPubSubTopicPartition, asposition1 - position2.Implementations must resolve symbolic positions such as
PubSubSymbolicPosition.EARLIESTandPubSubSymbolicPosition.LATESTto concrete positions based on the partition's start and end positions. This ensures that symbolic references can be treated consistently during subtraction.For example:
- If both positions are concrete, the result is the logical offset difference between them.
- If
position1is symbolic (e.g., EARLIEST), it must be resolved to the concrete beginning position. - If
position2is symbolic (e.g., LATEST), it must be resolved to the concrete end position.
- Parameters:
partition- The topic partition for which the difference is calculated.position1- The first PubSub position (minuend).position2- The second PubSub position (subtrahend).- Returns:
- The signed offset difference between
position1andposition2. - Throws:
IllegalArgumentException- if either position isnull, or if symbolic positions cannot be resolved.
-
decodePosition
default PubSubPosition decodePosition(PubSubTopicPartition partition, int positionTypeId, byte[] data) Decodes the given type-encoded byte array into aPubSubPositionfor the specified topic partition.- Parameters:
partition- The topic partition this position belongs to.positionTypeId- The type ID of the position, which indicates how to decode the byte array.data- The byte array containing the encoded position.- Returns:
- The decoded
PubSubPosition. - Throws:
IllegalArgumentException- if the data cannot be decoded into a validPubSubPosition.
-
decodePosition
PubSubPosition decodePosition(PubSubTopicPartition partition, int positionTypeId, ByteBuffer buffer) Decodes the givenByteBufferinto aPubSubPositionfor the specified topic partition.- Parameters:
partition- The topic partition this position belongs to.positionTypeId- The type ID of the position, which indicates how to decode the byte buffer.buffer- TheByteBuffercontaining the encoded position.- Returns:
- The decoded
PubSubPosition. - Throws:
IllegalArgumentException- if the buffer cannot be decoded into a validPubSubPosition.
-