Class ApacheKafkaConsumerAdapter

java.lang.Object
com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter
All Implemented Interfaces:
PubSubConsumerAdapter, Closeable, AutoCloseable

public class ApacheKafkaConsumerAdapter extends Object implements PubSubConsumerAdapter
This class is not thread safe because the internal KafkaConsumer is not thread safe. It is the caller's responsibility to ensure that its methods are called in a thread-safe manner, for example by confining each instance to a single thread or by synchronizing access externally.
  • Method Details

    • subscribe

      public void subscribe(@Nonnull PubSubTopicPartition pubSubTopicPartition, @Nonnull PubSubPosition lastReadPubSubPosition)
      Subscribes to a specified topic-partition if it is not already subscribed. If the topic-partition is already subscribed, this method performs no action. The subscription uses the provided PubSubPosition to determine the starting offset for consumption. If the position is PubSubSymbolicPosition.EARLIEST, the consumer will seek to the earliest available message. If it is PubSubSymbolicPosition.LATEST, the consumer will seek to the latest offset. If an instance of ApacheKafkaOffsetPosition is provided, the consumer will seek to the specified offset plus one, so that consumption resumes with the message immediately after the last-read one.
      Specified by:
      subscribe in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - the topic-partition to subscribe to
      lastReadPubSubPosition - the last known position for the topic-partition
      Throws:
      IllegalArgumentException - if lastReadPubSubPosition is null, or is neither a supported symbolic position (EARLIEST or LATEST) nor an instance of ApacheKafkaOffsetPosition
      PubSubTopicDoesNotExistException - if the specified topic does not exist
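      The position-resolution rules for this overload can be modeled in a few lines. The sketch below is hypothetical and self-contained, not the Venice implementation: SubscriptionSketch, Kind, Position, and resolveSeekOffset are illustrative names, partitions are plain strings, and the beginning and end offsets that a real consumer would obtain from the broker are passed in as arguments. It shows the no-op on re-subscription and the "offset plus one" rule for a concrete last-read position.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of subscribe(partition, lastReadPosition); names are illustrative, not Venice APIs.
class SubscriptionSketch {
  enum Kind { EARLIEST, LATEST, OFFSET }

  // Stand-in for PubSubPosition: a symbolic marker or a concrete Kafka-style offset.
  record Position(Kind kind, long offset) {
    static final Position EARLIEST = new Position(Kind.EARLIEST, -1);
    static final Position LATEST = new Position(Kind.LATEST, -1);

    static Position ofOffset(long offset) {
      return new Position(Kind.OFFSET, offset);
    }
  }

  // partition -> offset of the next message the consumer will read
  private final Map<String, Long> nextOffsets = new HashMap<>();

  // beginning/end would come from the broker in a real consumer; here they are arguments.
  static long resolveSeekOffset(Position lastRead, long beginning, long end) {
    if (lastRead == null) {
      throw new IllegalArgumentException("lastReadPubSubPosition must not be null");
    }
    return switch (lastRead.kind()) {
      case EARLIEST -> beginning;
      case LATEST -> end;
      // A concrete position is the LAST READ offset, so consumption resumes right after it.
      case OFFSET -> lastRead.offset() + 1;
    };
  }

  // Returns true if a subscription was created, false if the partition was already subscribed (no-op).
  boolean subscribe(String partition, Position lastRead, long beginning, long end) {
    if (nextOffsets.containsKey(partition)) {
      return false;
    }
    nextOffsets.put(partition, resolveSeekOffset(lastRead, beginning, end));
    return true;
  }

  Long nextOffset(String partition) {
    return nextOffsets.get(partition);
  }

  public static void main(String[] args) {
    SubscriptionSketch consumer = new SubscriptionSketch();
    consumer.subscribe("store_v1-0", Position.ofOffset(41), 0, 100);
    System.out.println(consumer.nextOffset("store_v1-0")); // prints 42
  }
}
```

      Because the no-op check happens before resolution, re-subscribing with a different position does not move an existing subscription in this model.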
    • subscribe

      public void subscribe(@Nonnull PubSubTopicPartition pubSubTopicPartition, @Nonnull PubSubPosition position, boolean isInclusive)
      Description copied from interface: PubSubConsumerAdapter
      Subscribes to a specified topic-partition if it is not already subscribed. If the topic-partition is already subscribed, this method performs no action.

      The subscription uses the provided PubSubPosition to determine the starting position for consumption. If the position is PubSubSymbolicPosition.EARLIEST, the consumer will seek to the earliest available message. If it is PubSubSymbolicPosition.LATEST, the consumer will seek to the latest available message. If a concrete position is provided, implementations should resolve it to a specific offset or internal position based on the underlying pub-sub system.

      The inclusive flag determines whether the message at the specified position (if resolvable to an offset or equivalent) should be included in consumption:

      • If true, the consumer should begin from the exact position specified.
      • If false, consumption should begin immediately after the specified position.

      Implementations should validate the topic-partition's existence and handle any necessary assignment or internal state initialization. The method does not guarantee that the subscription takes effect immediately; depending on the consumer's execution model, changes may be deferred.

      Specified by:
      subscribe in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - the topic-partition to subscribe to
      position - the position from which consumption should begin
      isInclusive - whether to include the message at the given position
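      For an offset-based system such as Kafka, the isInclusive flag reduces to a one-line decision on the first offset to read. The sketch below assumes the position has already been resolved to a non-negative numeric offset; InclusiveSeekSketch and firstOffsetToRead are hypothetical names. The exclusive case matches the "offset plus one" behavior described for the two-argument overload.

```java
// Hypothetical helper illustrating the isInclusive flag for a concrete, offset-based position.
class InclusiveSeekSketch {
  // inclusive = true  -> start AT the given offset
  // inclusive = false -> start immediately AFTER it
  static long firstOffsetToRead(long positionOffset, boolean isInclusive) {
    if (positionOffset < 0) {
      throw new IllegalArgumentException("offset must be non-negative: " + positionOffset);
    }
    return isInclusive ? positionOffset : positionOffset + 1;
  }

  public static void main(String[] args) {
    long checkpoint = 41;
    System.out.println(firstOffsetToRead(checkpoint, true));  // prints 41: re-read the checkpointed record
    System.out.println(firstOffsetToRead(checkpoint, false)); // prints 42: resume after it
  }
}
```

      As a rule of thumb, a stored "next position to read" pairs naturally with isInclusive = true, while a stored "last position read" pairs with isInclusive = false.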
    • unSubscribe

      public void unSubscribe(PubSubTopicPartition pubSubTopicPartition)
      Description copied from interface: PubSubConsumerAdapter
      Unsubscribes the consumer from a specified topic-partition. If the consumer was previously subscribed to the given partition, it will be unsubscribed, and the associated partition assignments and tracked offsets will be updated accordingly.
      Specified by:
      unSubscribe in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic-partition to unsubscribe from.
    • batchUnsubscribe

      public void batchUnsubscribe(Set<PubSubTopicPartition> pubSubTopicPartitionsToUnsubscribe)
      Description copied from interface: PubSubConsumerAdapter
      Unsubscribes the consumer from a batch of topic-partitions.
      Specified by:
      batchUnsubscribe in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartitionsToUnsubscribe - A set of topic-partitions to unsubscribe from.
    • resetOffset

      public void resetOffset(PubSubTopicPartition pubSubTopicPartition)
      Description copied from interface: PubSubConsumerAdapter
      Resets the offset for a specific topic-partition, seeking to the beginning of that partition. If the topic-partition is not currently subscribed, a PubSubUnsubscribedTopicPartitionException is thrown.
      Specified by:
      resetOffset in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic-partition for which to reset the offset.
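      The reset contract can be modeled with two maps: one for the next offset to read and one for each partition's beginning offset. This is a hypothetical sketch; ResetOffsetSketch is an illustrative name and UnsubscribedPartitionException merely stands in for PubSubUnsubscribedTopicPartitionException. Note that the beginning offset need not be 0, since retention may already have deleted the oldest messages.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of resetOffset(...) semantics; not the Venice implementation.
class ResetOffsetSketch {
  // Stand-in for PubSubUnsubscribedTopicPartitionException.
  static class UnsubscribedPartitionException extends RuntimeException {
    UnsubscribedPartitionException(String partition) {
      super("Partition is not subscribed: " + partition);
    }
  }

  private final Map<String, Long> nextOffsets = new HashMap<>();
  private final Map<String, Long> beginningOffsets = new HashMap<>();

  void subscribe(String partition, long beginningOffset, long nextOffset) {
    beginningOffsets.put(partition, beginningOffset);
    nextOffsets.put(partition, nextOffset);
  }

  void resetOffset(String partition) {
    if (!nextOffsets.containsKey(partition)) {
      throw new UnsubscribedPartitionException(partition);
    }
    // Seek back to the first offset still retained in the partition.
    nextOffsets.put(partition, beginningOffsets.get(partition));
  }

  long nextOffset(String partition) {
    return nextOffsets.get(partition);
  }

  public static void main(String[] args) {
    ResetOffsetSketch c = new ResetOffsetSketch();
    c.subscribe("store_v1-0", 10, 500);
    c.resetOffset("store_v1-0");
    System.out.println(c.nextOffset("store_v1-0")); // prints 10
  }
}
```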
    • poll

      public Map<PubSubTopicPartition,List<DefaultPubSubMessage>> poll(long timeoutMs)
      Description copied from interface: PubSubConsumerAdapter
      Polls the Kafka consumer for messages from the subscribed topic partitions within the specified time duration.
      Specified by:
      poll in interface PubSubConsumerAdapter
      Parameters:
      timeoutMs - The maximum time, in milliseconds, to wait for messages to be polled.
      Returns:
      A mapping of PubSub topic partitions to lists of PubSub messages retrieved from Kafka.
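      A caller typically walks the returned map partition by partition. The sketch below is hypothetical: Message stands in for DefaultPubSubMessage, and process is an illustrative name. It assumes that each partition's list is in offset order and records the offset of the last message handled per partition, the kind of value a caller could later pass back to subscribe as the last-read position.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical handling of one poll(timeoutMs) result; Message stands in for DefaultPubSubMessage.
class PollResultSketch {
  record Message(long offset, String value) {}

  // Processes one batch and returns, per partition, the offset of the last message handled.
  // Partitions with an empty list get no entry.
  static Map<String, Long> process(Map<String, List<Message>> batch) {
    Map<String, Long> lastProcessed = new HashMap<>();
    for (Map.Entry<String, List<Message>> entry : batch.entrySet()) {
      for (Message m : entry.getValue()) {
        // Application logic would go here; this sketch only records progress.
        lastProcessed.put(entry.getKey(), m.offset());
      }
    }
    return lastProcessed;
  }

  public static void main(String[] args) {
    Map<String, List<Message>> batch = new LinkedHashMap<>();
    batch.put("store_v1-0", List.of(new Message(7, "a"), new Message(8, "b")));
    batch.put("store_v1-1", List.of());
    System.out.println(process(batch)); // prints {store_v1-0=8}
  }
}
```

      Since the adapter is not thread safe, a loop like this would normally run on the same single thread that calls poll.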
    • hasAnySubscription

      public boolean hasAnySubscription()
      Description copied from interface: PubSubConsumerAdapter
      Checks if the consumer has any active topic-partition subscriptions.
      Specified by:
      hasAnySubscription in interface PubSubConsumerAdapter
      Returns:
      true if the consumer is subscribed to one or more topic partitions, false otherwise.
    • hasSubscription

      public boolean hasSubscription(PubSubTopicPartition pubSubTopicPartition)
      Description copied from interface: PubSubConsumerAdapter
      Checks if the consumer is currently subscribed to the specified PubSub topic-partition.
      Specified by:
      hasSubscription in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic-partition to check for subscription.
      Returns:
      true if the consumer is subscribed to the given topic-partition, false otherwise.
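      The subscription bookkeeping behind unSubscribe, batchUnsubscribe, hasAnySubscription, and hasSubscription can be modeled as operations on a set. This is a hypothetical in-memory sketch (SubscriptionTracker is an illustrative name); it assumes unsubscribing an unknown partition is harmless, and it does not model the underlying consumer assignment or tracked offsets that the real adapter also updates.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical in-memory model of subscription bookkeeping; not Venice code.
class SubscriptionTracker {
  private final Set<String> assigned = new HashSet<>();

  void subscribe(String partition) {
    assigned.add(partition); // Set.add is already a no-op for duplicates
  }

  void unSubscribe(String partition) {
    assigned.remove(partition); // no-op if the partition was never subscribed
  }

  void batchUnsubscribe(Collection<String> partitions) {
    // Removes the whole batch in one bulk update rather than N separate calls.
    assigned.removeAll(partitions);
  }

  boolean hasSubscription(String partition) {
    return assigned.contains(partition);
  }

  boolean hasAnySubscription() {
    return !assigned.isEmpty();
  }

  public static void main(String[] args) {
    SubscriptionTracker t = new SubscriptionTracker();
    t.subscribe("store_v1-0");
    t.subscribe("store_v1-1");
    t.batchUnsubscribe(Set.of("store_v1-0", "store_v1-1"));
    System.out.println(t.hasAnySubscription()); // prints false
  }
}
```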
    • pause

      public void pause(PubSubTopicPartition pubSubTopicPartition)
      Pauses message consumption for the specified topic-partition. If the partition was not previously subscribed, this method is a no-op.
      Specified by:
      pause in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic-partition for which to pause message consumption.
    • resume

      public void resume(PubSubTopicPartition pubSubTopicPartition)
      Resumes message consumption for the specified topic-partition. If the partition was not previously paused, or was not subscribed at all, this method is a no-op.
      Specified by:
      resume in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic partition for which to resume message consumption.
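      The no-op rules for pause and resume can be captured with a "subscribed" set and a "paused" set, where the partitions that poll would fetch from are the difference of the two. This is a hypothetical model (PauseResumeSketch and fetchable are illustrative names), not the adapter's implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of pause/resume no-op rules; not the Venice implementation.
class PauseResumeSketch {
  private final Set<String> subscribed = new HashSet<>();
  private final Set<String> paused = new HashSet<>();

  void subscribe(String partition) {
    subscribed.add(partition);
  }

  void pause(String partition) {
    if (!subscribed.contains(partition)) {
      return; // not subscribed: no-op
    }
    paused.add(partition);
  }

  void resume(String partition) {
    // Not paused (including "not subscribed at all"): remove() simply returns false.
    paused.remove(partition);
  }

  // Partitions that poll() would currently fetch from.
  Set<String> fetchable() {
    Set<String> result = new HashSet<>(subscribed);
    result.removeAll(paused);
    return result;
  }

  public static void main(String[] args) {
    PauseResumeSketch c = new PauseResumeSketch();
    c.subscribe("store_v1-0");
    c.pause("store_v1-1"); // never subscribed: ignored
    c.pause("store_v1-0");
    System.out.println(c.fetchable()); // prints []
    c.resume("store_v1-0");
    System.out.println(c.fetchable()); // prints [store_v1-0]
  }
}
```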
    • getAssignment

      public Set<PubSubTopicPartition> getAssignment()
      Description copied from interface: PubSubConsumerAdapter
      Retrieves the set of PubSub topic-partitions currently assigned to the consumer.
      Specified by:
      getAssignment in interface PubSubConsumerAdapter
      Returns:
      A set of PubSub topic-partitions representing the current assignment of the consumer.
    • close

      public void close()
      Description copied from interface: PubSubConsumerAdapter
      Closes the PubSub consumer and releases any associated resources.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
      Specified by:
      close in interface PubSubConsumerAdapter
    • getOffsetLag

      public long getOffsetLag(PubSubTopicPartition pubSubTopicPartition)
      Description copied from interface: PubSubConsumerAdapter
      Retrieves the consuming offset lag for a PubSub topic partition. The offset lag represents the difference between the last consumed message offset and the latest available message offset for the partition.
      Specified by:
      getOffsetLag in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic partition for which to fetch the offset lag.
      Returns:
      The offset lag, which is zero or a positive value if a valid lag was collected by the consumer, or -1 if the lag cannot be determined or is not applicable.
    • getLatestOffset

      public long getLatestOffset(PubSubTopicPartition pubSubTopicPartition)
      Returns the latest offset for the given topic-partition. The latest offsets are derived from the lag metric and may be outdated or imprecise.
      Specified by:
      getLatestOffset in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - the topic-partition for which the latest offset is requested
      Returns:
      the latest offset, or -1 if tracking is unavailable
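      The relationship between getOffsetLag and getLatestOffset can be sketched as plain arithmetic. This is an assumption-labeled model, not the adapter's code: it assumes, as with Kafka's per-partition records-lag consumer metric, that lag is measured from the consumer's position (the next offset it will fetch) to the partition's end offset, so adding the lag back to the position recovers the end offset. LagSketch, offsetLag, and latestOffsetFromLag are hypothetical names; -1 mirrors the "cannot be determined" sentinel documented above.

```java
// Hypothetical arithmetic relating consumer position, lag, and latest offset.
class LagSketch {
  static final long UNKNOWN = -1;

  // Lag = end offset - position; clamped at 0 in case the end-offset snapshot is older than the position.
  static long offsetLag(long position, long endOffset) {
    if (position < 0 || endOffset < 0) {
      return UNKNOWN;
    }
    return Math.max(0, endOffset - position);
  }

  // Inverse derivation: latest offset recovered from the position and a (possibly stale) lag value.
  static long latestOffsetFromLag(long position, long lag) {
    if (position < 0 || lag < 0) {
      return UNKNOWN;
    }
    return position + lag;
  }

  public static void main(String[] args) {
    System.out.println(offsetLag(95, 100));          // prints 5
    System.out.println(latestOffsetFromLag(95, 5));  // prints 100
  }
}
```

      Because a lag metric is only refreshed when the consumer fetches, a value derived this way is only as fresh as the last update, which is why the latest offset may be outdated or imprecise.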
    • getPositionByTimestamp

      public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp, Duration timeout)
      Description copied from interface: PubSubConsumerAdapter
      Retrieves the position of the first message whose timestamp is greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message exists, null is returned.
      Specified by:
      getPositionByTimestamp in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic-partition for which to fetch the position.
      timestamp - The target timestamp to search for in milliseconds since the Unix epoch.
      timeout - The maximum duration to wait for the operation to complete.
      Returns:
      The position of the first message with a timestamp greater than or equal to the target timestamp, or null if no such message exists in the partition.
    • getPositionByTimestamp

      public PubSubPosition getPositionByTimestamp(PubSubTopicPartition pubSubTopicPartition, long timestamp)
      Description copied from interface: PubSubConsumerAdapter
      Retrieves the position of the first message whose timestamp is greater than or equal to the target timestamp for the specified PubSub topic-partition. If no such message exists, null is returned.
      Specified by:
      getPositionByTimestamp in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic-partition for which to fetch the position.
      timestamp - The target timestamp to search for in milliseconds since the Unix epoch.
      Returns:
      The position of the first message with a timestamp greater than or equal to the target timestamp, or null if no such message exists in the partition.
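      The "first message with timestamp greater than or equal to the target" contract is a lower-bound search. The sketch below runs it over an in-memory list purely for illustration; in Kafka the lookup is served by the broker's time index, not by scanning messages client-side. TimestampLookupSketch, LogEntry, and offsetForTimestamp are hypothetical names, and the binary search assumes timestamps are non-decreasing in offset order.

```java
import java.util.List;

// Hypothetical illustration of the getPositionByTimestamp contract over an in-memory log.
class TimestampLookupSketch {
  record LogEntry(long offset, long timestampMs) {}

  // Returns the offset of the first entry whose timestamp >= targetMs, or null if none.
  // ASSUMPTION: timestamps are non-decreasing in offset order, which makes binary search valid.
  static Long offsetForTimestamp(List<LogEntry> log, long targetMs) {
    int lo = 0;
    int hi = log.size(); // search window is [lo, hi)
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (log.get(mid).timestampMs() < targetMs) {
        lo = mid + 1; // answer lies strictly to the right of mid
      } else {
        hi = mid; // mid qualifies; look for an earlier match
      }
    }
    if (lo == log.size()) {
      return null; // every entry is older than the target
    }
    return log.get(lo).offset();
  }

  public static void main(String[] args) {
    List<LogEntry> log = List.of(
        new LogEntry(0, 100), new LogEntry(1, 200), new LogEntry(2, 200), new LogEntry(3, 300));
    System.out.println(offsetForTimestamp(log, 150)); // prints 1
    System.out.println(offsetForTimestamp(log, 301)); // prints null
  }
}
```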
    • beginningPosition

      public PubSubPosition beginningPosition(PubSubTopicPartition pubSubTopicPartition, Duration timeout)
      Description copied from interface: PubSubConsumerAdapter
      Retrieves the beginning position for the specified PubSub topic-partition.
      Specified by:
      beginningPosition in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic-partition for which to fetch the beginning position.
      timeout - The maximum duration to wait for the operation to complete.
      Returns:
      The beginning position of the specified topic-partition. If the topic-partition exists but has no messages, the offset will be 0.
    • beginningPositions

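      public Map<PubSubTopicPartition,PubSubPosition> beginningPositions(Collection<PubSubTopicPartition> partitions, Duration timeout)
      Retrieves the beginning positions for a collection of PubSub topic-partitions.
      Specified by:
      beginningPositions in interface PubSubConsumerAdapter
      Parameters:
      partitions - A collection of PubSub topic-partitions for which to fetch the beginning positions.
      timeout - The maximum duration to wait for the operation to complete.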
    • endPositions

      public Map<PubSubTopicPartition,PubSubPosition> endPositions(Collection<PubSubTopicPartition> partitions, Duration timeout)
      Description copied from interface: PubSubConsumerAdapter
      Retrieves the end positions for a collection of PubSub topic-partitions. The end position of a partition is the position immediately after its last message (i.e., the position of the last message + 1), not the position of the last message itself. If a partition has no messages, its end position equals its beginning position.
      Specified by:
      endPositions in interface PubSubConsumerAdapter
      Parameters:
      partitions - A collection of PubSub topic-partitions for which to fetch the end positions.
      timeout - The maximum duration to wait for the operation to complete.
      Returns:
      A mapping of PubSub topic partitions to their respective end positions, or an empty map if the positions cannot be determined.
    • endPosition

      public PubSubPosition endPosition(PubSubTopicPartition pubSubTopicPartition)
      Description copied from interface: PubSubConsumerAdapter
      Retrieves the end position for the specified PubSub topic-partition. The end position is the position immediately after the last message in the partition (i.e., the position of the last message + 1). If the partition has no messages, the end position equals the beginning position.
      Specified by:
      endPosition in interface PubSubConsumerAdapter
      Parameters:
      pubSubTopicPartition - The PubSub topic partition for which to fetch the end position.
      Returns:
      The end position of the specified topic-partition.
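      The "last message + 1" convention means a partition's retained messages occupy the half-open offset range [beginning, end). The sketch below illustrates that arithmetic; OffsetRangeSketch, endOffset, and remaining are hypothetical names, and it assumes contiguous offsets. In Kafka, compaction and transaction markers can leave gaps, so end minus beginning is an upper bound on the message count rather than an exact count.

```java
// Hypothetical illustration of beginning/end positions as a half-open offset range [beginning, end).
class OffsetRangeSketch {
  // End offset when retained messages occupy [beginning, beginning + count); equals beginning if empty.
  static long endOffset(long beginningOffset, long retainedMessageCount) {
    return beginningOffset + retainedMessageCount; // = last message offset + 1 when count > 0
  }

  // Offsets a consumer starting at 'from' can still read before reaching the end (never negative).
  static long remaining(long from, long endOffset) {
    return Math.max(0, endOffset - from);
  }

  public static void main(String[] args) {
    System.out.println(endOffset(10, 5)); // prints 15 (last message is at offset 14)
    System.out.println(endOffset(10, 0)); // prints 10 (empty: end == beginning)
  }
}
```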
    • partitionsFor

      public List<PubSubTopicPartitionInfo> partitionsFor(PubSubTopic topic)
      Retrieves the list of partitions associated with a given PubSub topic.
      Specified by:
      partitionsFor in interface PubSubConsumerAdapter
      Parameters:
      topic - The PubSub topic for which partition information is requested.
      Returns:
      A list of PubSubTopicPartitionInfo representing the partitions of the topic, or null if the topic does not exist.
    • comparePositions

      public long comparePositions(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2)
      Compares two PubSubPosition instances for a given PubSubTopicPartition.

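      Special symbolic positions are handled with the following order:

      • PubSubSymbolicPosition.EARLIEST ranks below every concrete position.
      • PubSubSymbolicPosition.LATEST ranks above every concrete position.

      If both positions are concrete (e.g., ApacheKafkaOffsetPosition), they must be of the same type and are compared by their offset values.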
      Specified by:
      comparePositions in interface PubSubConsumerAdapter
      Parameters:
      partition - the topic partition context (not used in current implementation, but required for interface compatibility)
      position1 - the first position to compare (must not be null)
      position2 - the second position to compare (must not be null)
      Returns:
      a negative value if position1 is less than position2, zero if equal, or positive if greater
      Throws:
      IllegalArgumentException - if either position is null or unsupported
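      A comparator with these semantics can be sketched by ranking position kinds and falling back to offset comparison only when both positions are concrete. This is a hypothetical model: ComparePositionsSketch, Kind, and Position are illustrative names, and it assumes EARLIEST ranks below and LATEST above every concrete offset, with symbolic positions compared by rank alone (so LATEST is never resolved to a real end offset here).

```java
// Hypothetical model of comparePositions; the symbolic ranking is an assumption of this sketch.
class ComparePositionsSketch {
  // Declaration order encodes the assumed ranking: EARLIEST < any concrete OFFSET < LATEST.
  enum Kind { EARLIEST, OFFSET, LATEST }

  record Position(Kind kind, long offset) {
    static Position earliest() { return new Position(Kind.EARLIEST, -1); }
    static Position latest() { return new Position(Kind.LATEST, -1); }
    static Position of(long offset) { return new Position(Kind.OFFSET, offset); }
  }

  // Negative if p1 < p2, zero if equal, positive if p1 > p2.
  static long comparePositions(Position p1, Position p2) {
    if (p1 == null || p2 == null) {
      throw new IllegalArgumentException("positions must not be null");
    }
    if (p1.kind() != p2.kind()) {
      return p1.kind().ordinal() - p2.kind().ordinal(); // different kinds: rank decides
    }
    if (p1.kind() == Kind.OFFSET) {
      return Long.compare(p1.offset(), p2.offset()); // both concrete: compare offsets
    }
    return 0; // EARLIEST vs. EARLIEST or LATEST vs. LATEST
  }

  public static void main(String[] args) {
    System.out.println(comparePositions(Position.earliest(), Position.of(0)) < 0); // prints true
    System.out.println(comparePositions(Position.of(9), Position.latest()) < 0);   // prints true
    System.out.println(comparePositions(Position.of(7), Position.of(7)));          // prints 0
  }
}
```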
    • positionDifference

      public long positionDifference(PubSubTopicPartition partition, PubSubPosition position1, PubSubPosition position2)
      Description copied from interface: PubSubConsumerAdapter
      Computes the relative difference between two PubSubPosition instances for a given PubSubTopicPartition, as position1 - position2.

      Implementations must resolve symbolic positions such as PubSubSymbolicPosition.EARLIEST and PubSubSymbolicPosition.LATEST to concrete positions based on the partition's start and end positions. This ensures that symbolic references can be treated consistently during subtraction.

      For example:

      • If both positions are concrete, the result is the logical offset difference between them.
      • If position1 is symbolic (e.g., EARLIEST), it must be resolved to the concrete beginning position.
      • If position2 is symbolic (e.g., LATEST), it must be resolved to the concrete end position.
      Specified by:
      positionDifference in interface PubSubConsumerAdapter
      Parameters:
      partition - The topic partition for which the difference is calculated.
      position1 - The first PubSub position (minuend).
      position2 - The second PubSub position (subtrahend).
      Returns:
      The signed offset difference between position1 and position2.
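      The resolve-then-subtract rule can be sketched for an offset-based system as follows. This is a hypothetical model (PositionDifferenceSketch, Kind, Position, and resolve are illustrative names); it resolves EARLIEST to the partition's beginning offset and LATEST to its end offset (last message + 1), with both offsets passed in rather than fetched from a broker.

```java
// Hypothetical model of positionDifference: resolve symbolic positions, then subtract offsets.
class PositionDifferenceSketch {
  enum Kind { EARLIEST, LATEST, OFFSET }

  record Position(Kind kind, long offset) {
    static Position earliest() { return new Position(Kind.EARLIEST, -1); }
    static Position latest() { return new Position(Kind.LATEST, -1); }
    static Position of(long offset) { return new Position(Kind.OFFSET, offset); }
  }

  // EARLIEST -> beginning offset, LATEST -> end offset (last message + 1), OFFSET -> itself.
  static long resolve(Position p, long beginningOffset, long endOffset) {
    return switch (p.kind()) {
      case EARLIEST -> beginningOffset;
      case LATEST -> endOffset;
      case OFFSET -> p.offset();
    };
  }

  // position1 - position2 after resolution.
  static long positionDifference(Position p1, Position p2, long beginningOffset, long endOffset) {
    return resolve(p1, beginningOffset, endOffset) - resolve(p2, beginningOffset, endOffset);
  }

  public static void main(String[] args) {
    // Partition currently spans offsets [100, 250).
    System.out.println(positionDifference(Position.latest(), Position.of(200), 100, 250));   // prints 50
    System.out.println(positionDifference(Position.of(120), Position.earliest(), 100, 250)); // prints 20
  }
}
```

      The first call reads as "how far is offset 200 behind the end of the partition", a lag-style query expressed purely through positions.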
    • decodePosition

      public PubSubPosition decodePosition(PubSubTopicPartition partition, int positionTypeId, ByteBuffer buffer)
      Description copied from interface: PubSubConsumerAdapter
      Decodes the given ByteBuffer into a PubSubPosition for the specified topic partition.
      Specified by:
      decodePosition in interface PubSubConsumerAdapter
      Parameters:
      partition - The topic partition this position belongs to.
      positionTypeId - The type ID of the position, which indicates how to decode the byte buffer.
      buffer - The ByteBuffer containing the encoded position.
      Returns:
      The decoded PubSubPosition.
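      Decoding dispatches on positionTypeId and then reads the type-specific payload from the buffer. The sketch below is hypothetical throughout: the type ID value, the 8-byte big-endian layout for an offset, and the names PositionCodecSketch, encode, and decode are assumptions chosen for illustration, not Venice's actual wire format. It reads from a duplicate so that decoding does not advance the caller's buffer.

```java
import java.nio.ByteBuffer;

// Hypothetical wire format for an offset-based position; the type ID value and layout are assumptions.
class PositionCodecSketch {
  static final int OFFSET_POSITION_TYPE_ID = 0; // hypothetical

  static ByteBuffer encode(long offset) {
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES); // default big-endian byte order
    buf.putLong(offset);
    buf.flip(); // make the written bytes readable from position 0
    return buf;
  }

  static long decode(int positionTypeId, ByteBuffer buffer) {
    if (positionTypeId != OFFSET_POSITION_TYPE_ID) {
      throw new IllegalArgumentException("Unsupported position type id: " + positionTypeId);
    }
    if (buffer.remaining() < Long.BYTES) {
      throw new IllegalArgumentException("Buffer too short: " + buffer.remaining() + " bytes");
    }
    // duplicate() is big-endian too, and reading from it leaves the caller's buffer position unchanged.
    return buffer.duplicate().getLong();
  }

  public static void main(String[] args) {
    System.out.println(decode(OFFSET_POSITION_TYPE_ID, encode(12345L))); // prints 12345
  }
}
```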