Class KafkaConsumerService
- java.lang.Object
-
- com.linkedin.venice.service.AbstractVeniceService
-
- com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService
-
- com.linkedin.davinci.kafka.consumer.KafkaConsumerService
-
- All Implemented Interfaces:
java.io.Closeable
,java.lang.AutoCloseable
- Direct Known Subclasses:
PartitionWiseKafkaConsumerService
,TopicWiseKafkaConsumerService
public abstract class KafkaConsumerService extends AbstractKafkaConsumerService
KafkaConsumerService
is used to manage a pool of consumption-related resources connected to a specific Kafka cluster. The reasons to have this pool are: 1. To reduce the unnecessary overhead of having one consumer per store-version, each of which includes the internal IO threads/connections to brokers and internal buffers; 2. To reduce the GC overhead when there are a lot of store versions bootstrapping/ingesting at the same time; 3. To have a predictable and configurable upper bound on the total amount of resources occupied by consumers become, no matter how many store-versions are being hosted in the same instance; The responsibilities of this class include: 1. Setting up a fixed size pool of consumption unit, where each unit contains exactly one: a)SharedKafkaConsumer
b)ConsumptionTask
c)ConsumerSubscriptionCleaner
2. Receive various calls to interrogate or mutate consumer state, and delegate them to the correct unit, by maintaining a mapping of which unit belongs to which version-topic and subscribed topic-partition. Notably, thestartConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, long, ConsumedDataReceiver)
function allows the caller to start funneling consumed data into a receiver (i.e. into another task). 3. Provide a single abstract function that must be overridden by subclasses in order to implement a consumption load balancing strategy:pickConsumerForPartition(PubSubTopic, PubSubTopicPartition)
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description static class
KafkaConsumerService.ConsumerAssignmentStrategy
This consumer assignment strategy specify how consumers from consumer pool are allocated.-
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
-
Field Summary
Fields Modifier and Type Field Description protected AggKafkaConsumerServiceStats
aggStats
protected IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,com.linkedin.davinci.kafka.consumer.ConsumptionTask>
consumerToConsumptionTask
protected java.util.Map<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,java.util.concurrent.locks.ReentrantLock>
consumerToLocks
This read-only per consumer lock is for protecting the partition unsubscription and data receiver setting operations.protected java.lang.String
kafkaUrl
protected java.lang.String
kafkaUrlForLogger
protected ConsumerPoolType
poolType
protected java.util.Map<PubSubTopic,java.util.Map<PubSubTopicPartition,com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>>
versionTopicToTopicPartitionToConsumer
-
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
-
Constructor Summary
Constructors Modifier Constructor Description protected
KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, java.util.Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, java.lang.String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, TopicExistenceChecker topicExistenceChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled)
-
Method Summary
All Methods Instance Methods Abstract Methods Concrete Methods Modifier and Type Method Description com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer
assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
This function assigns a consumer for the givenStoreIngestionTask
and returns the assigned consumer.void
batchUnsubscribe(PubSubTopic versionTopic, java.util.Set<PubSubTopicPartition> topicPartitionsToUnSub)
com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer
getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
java.util.Map<PubSubTopicPartition,TopicPartitionIngestionInfo>
getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
long
getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
long
getMaxElapsedTimeMSSinceLastPollInConsumerPool()
long
getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
boolean
hasAnySubscriptionFor(PubSubTopic versionTopic)
protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer
pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
protected void
removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition)
void
setThreadFactory(RandomAccessDaemonThreadFactory threadFactory)
void
startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastReadOffset, ConsumedDataReceiver<java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>> consumedDataReceiver)
boolean
startInner()
void
stopInner()
void
unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
Stop specific subscription associated with the given version topic.void
unsubscribeAll(PubSubTopic versionTopic)
Stop all subscription associated with the given version topic.-
Methods inherited from class com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService
unSubscribe
-
-
-
-
Field Detail
-
kafkaUrl
protected final java.lang.String kafkaUrl
-
kafkaUrlForLogger
protected final java.lang.String kafkaUrlForLogger
-
poolType
protected final ConsumerPoolType poolType
-
aggStats
protected final AggKafkaConsumerServiceStats aggStats
-
consumerToConsumptionTask
protected final IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,com.linkedin.davinci.kafka.consumer.ConsumptionTask> consumerToConsumptionTask
-
versionTopicToTopicPartitionToConsumer
protected final java.util.Map<PubSubTopic,java.util.Map<PubSubTopicPartition,com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>> versionTopicToTopicPartitionToConsumer
-
consumerToLocks
protected final java.util.Map<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,java.util.concurrent.locks.ReentrantLock> consumerToLocks
This read-only per consumer lock is for protecting the partition unsubscription and data receiver setting operations. Using consumer intrinsic lock may cause race condition, refer https://github.com/linkedin/venice/pull/1308
-
-
Constructor Detail
-
KafkaConsumerService
protected KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, java.util.Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, java.lang.String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, TopicExistenceChecker topicExistenceChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled)
- Parameters:
statsOverride
- injection of stats, for test purposes
-
-
Method Detail
-
getConsumerAssignedToVersionTopicPartition
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
- Specified by:
getConsumerAssignedToVersionTopicPartition
in classAbstractKafkaConsumerService
-
assignConsumerFor
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
This function assigns a consumer for the givenStoreIngestionTask
and returns the assigned consumer. Must be idempotent and thus return previously a assigned consumer (for the same params) if any exists.- Specified by:
assignConsumerFor
in classAbstractKafkaConsumerService
-
pickConsumerForPartition
protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
-
removeTopicPartitionFromConsumptionTask
protected void removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition)
-
unsubscribeAll
public void unsubscribeAll(PubSubTopic versionTopic)
Stop all subscription associated with the given version topic.- Specified by:
unsubscribeAll
in classAbstractKafkaConsumerService
-
unSubscribe
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
Stop specific subscription associated with the given version topic.- Specified by:
unSubscribe
in classAbstractKafkaConsumerService
-
batchUnsubscribe
public void batchUnsubscribe(PubSubTopic versionTopic, java.util.Set<PubSubTopicPartition> topicPartitionsToUnSub)
- Specified by:
batchUnsubscribe
in classAbstractKafkaConsumerService
-
startInner
public boolean startInner()
- Specified by:
startInner
in classAbstractVeniceService
- Returns:
- true if the service is completely started,
false if it is still starting asynchronously (in this case, it is the implementer's
responsibility to set
AbstractVeniceService.serviceState
toAbstractVeniceService.ServiceState.STARTED
upon completion of the async work).
-
stopInner
public void stopInner() throws java.lang.Exception
- Specified by:
stopInner
in classAbstractVeniceService
- Throws:
java.lang.Exception
-
hasAnySubscriptionFor
public boolean hasAnySubscriptionFor(PubSubTopic versionTopic)
- Specified by:
hasAnySubscriptionFor
in classAbstractKafkaConsumerService
-
getMaxElapsedTimeMSSinceLastPollInConsumerPool
public long getMaxElapsedTimeMSSinceLastPollInConsumerPool()
- Specified by:
getMaxElapsedTimeMSSinceLastPollInConsumerPool
in classAbstractKafkaConsumerService
-
startConsumptionIntoDataReceiver
public void startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastReadOffset, ConsumedDataReceiver<java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>> consumedDataReceiver)
- Specified by:
startConsumptionIntoDataReceiver
in classAbstractKafkaConsumerService
-
getOffsetLagBasedOnMetrics
public long getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getOffsetLagBasedOnMetrics
in classAbstractKafkaConsumerService
-
getLatestOffsetBasedOnMetrics
public long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getLatestOffsetBasedOnMetrics
in classAbstractKafkaConsumerService
-
getIngestionInfoFor
public java.util.Map<PubSubTopicPartition,TopicPartitionIngestionInfo> getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getIngestionInfoFor
in classAbstractKafkaConsumerService
-
setThreadFactory
public void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory)
-
-