Class KafkaConsumerService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService
com.linkedin.davinci.kafka.consumer.KafkaConsumerService
- All Implemented Interfaces:
Closeable, AutoCloseable
- Direct Known Subclasses:
PartitionWiseKafkaConsumerService, TopicWiseKafkaConsumerService
KafkaConsumerService is used to manage a pool of consumption-related resources connected to a specific Kafka cluster.
The reasons to have this pool are:
1. To reduce the unnecessary overhead of having one consumer per store-version, each of which includes the internal
IO threads/connections to brokers and internal buffers;
2. To reduce the GC overhead when there are a lot of store versions bootstrapping/ingesting at the same time;
3. To have a predictable and configurable upper bound on the total amount of resources occupied by consumers, no matter how many store-versions are being hosted in the same instance.
The responsibilities of this class include:
1. Setting up a fixed-size pool of consumption units, where each unit contains exactly one:
a) SharedKafkaConsumer
b) ConsumptionTask
c) ConsumerSubscriptionCleaner
2. Receiving various calls to interrogate or mutate consumer state and delegating them to the correct unit, by maintaining a mapping of which unit belongs to which version-topic and subscribed topic-partition. Notably, the startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, long, ConsumedDataReceiver) function allows the caller to start funneling consumed data into a receiver (i.e. into another task).
3. Providing a single abstract function that must be overridden by subclasses in order to implement a consumption load-balancing strategy: pickConsumerForPartition(PubSubTopic, PubSubTopicPartition).
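The single-hook design in item 3 can be illustrated with a small, self-contained sketch. It does not use Venice's types: `AssignmentStrategy`, `LeastLoadedStrategy` and `SameConsumerPerTopicStrategy` are hypothetical names, topics and partitions are plain strings, and consumers are represented only by their index in a fixed-size pool. It shows how a base class can keep the pool size fixed while subclasses decide placement by overriding one method; it is not how PartitionWiseKafkaConsumerService or TopicWiseKafkaConsumerService are actually implemented.

```java
// Hypothetical sketch: consumers are identified by their index in a fixed-size pool.
abstract class AssignmentStrategy {
  protected final int poolSize;
  protected final int[] load; // number of partitions currently assigned to each consumer index

  AssignmentStrategy(int poolSize) {
    this.poolSize = poolSize;
    this.load = new int[poolSize];
  }

  // The single hook a subclass overrides, similar in spirit to pickConsumerForPartition.
  protected abstract int pickConsumerForPartition(String versionTopic, String topicPartition);

  int assign(String versionTopic, String topicPartition) {
    int index = pickConsumerForPartition(versionTopic, topicPartition);
    load[index]++;
    return index;
  }
}

// Spreads partitions so that every consumer carries a similar number of them.
final class LeastLoadedStrategy extends AssignmentStrategy {
  LeastLoadedStrategy(int poolSize) {
    super(poolSize);
  }

  @Override
  protected int pickConsumerForPartition(String versionTopic, String topicPartition) {
    int best = 0;
    for (int i = 1; i < poolSize; i++) {
      if (load[i] < load[best]) {
        best = i;
      }
    }
    return best;
  }
}

// Keeps all partitions of one version topic on the same consumer.
final class SameConsumerPerTopicStrategy extends AssignmentStrategy {
  SameConsumerPerTopicStrategy(int poolSize) {
    super(poolSize);
  }

  @Override
  protected int pickConsumerForPartition(String versionTopic, String topicPartition) {
    return Math.floorMod(versionTopic.hashCode(), poolSize);
  }
}
```

Because the pool size is fixed at construction, adding more store versions only changes how partitions are spread, never how many consumers exist.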
-
Nested Class Summary
Modifier and Type | Class | Description
static enum | | This consumer assignment strategy specifies how consumers from the consumer pool are allocated.
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService:
AbstractVeniceService.ServiceState
-
Field Summary
Modifier and Type | Field | Description
protected final AggKafkaConsumerServiceStats | aggStats |
protected final IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer, com.linkedin.davinci.kafka.consumer.ConsumptionTask> | consumerToConsumptionTask |
protected final Map<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer, ReentrantLock> | consumerToLocks | This read-only per-consumer lock protects the partition unsubscription and data-receiver setting operations.
protected final String | kafkaUrl |
protected final String | kafkaUrlForLogger |
protected final ConsumerPoolType | poolType |
protected final Map<PubSubTopic, Map<PubSubTopicPartition, com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>> | versionTopicToTopicPartitionToConsumer |
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService:
logger, serviceState
-
Constructor Summary
Modifier | Constructor
protected | KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, TopicExistenceChecker topicExistenceChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled)
-
Method Summary
Modifier and Type | Method | Description
com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer | assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition) | This function assigns a consumer for the given StoreIngestionTask and returns the assigned consumer.
void | batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition> topicPartitionsToUnSub) |
com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer | getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition) |
Map<PubSubTopicPartition,TopicPartitionIngestionInfo> | getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) |
long | getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) |
long | getMaxElapsedTimeMSSinceLastPollInConsumerPool() |
long | getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) |
boolean | hasAnySubscriptionFor(PubSubTopic versionTopic) |
protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer | pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition) |
protected void | removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition) |
void | setThreadFactory(RandomAccessDaemonThreadFactory threadFactory) |
void | startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastReadOffset, ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver) |
boolean | startInner() |
void | stopInner() |
void | unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs) | Stop the specific subscription associated with the given version topic.
void | unsubscribeAll(PubSubTopic versionTopic) | Stop all subscriptions associated with the given version topic.
Methods inherited from class com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService:
unSubscribe
-
Field Details
-
kafkaUrl
-
kafkaUrlForLogger
-
poolType
-
aggStats
-
consumerToConsumptionTask
protected final IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,com.linkedin.davinci.kafka.consumer.ConsumptionTask> consumerToConsumptionTask
-
versionTopicToTopicPartitionToConsumer
protected final Map<PubSubTopic,Map<PubSubTopicPartition,com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>> versionTopicToTopicPartitionToConsumer
-
consumerToLocks
protected final Map<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,ReentrantLock> consumerToLocks
This read-only per-consumer lock protects the partition unsubscription and data-receiver setting operations. Using the consumer's intrinsic lock may cause a race condition; refer to https://github.com/linkedin/venice/pull/1308.
-
-
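The consumerToLocks field pairs each consumer with a dedicated ReentrantLock instead of synchronizing on the consumer object itself. A minimal sketch of that pattern follows, assuming a hypothetical `PerConsumerLocks` class where consumers, partitions and receivers are plain strings. It only shows that one lock per consumer, held in a map that is never modified after construction, serializes subscription-plus-receiver setup against unsubscription; it does not reproduce the specific race discussed in the linked pull request.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: consumers are identified by string ids instead of SharedKafkaConsumer objects.
final class PerConsumerLocks {
  // Built once in the constructor and never modified afterwards (a read-only lock map).
  private final Map<String, ReentrantLock> consumerToLock;
  private final Map<String, Set<String>> consumerToPartitions = new ConcurrentHashMap<>();
  private final Map<String, String> partitionToReceiver = new ConcurrentHashMap<>();

  PerConsumerLocks(Set<String> consumerIds) {
    Map<String, ReentrantLock> locks = new HashMap<>();
    for (String id : consumerIds) {
      locks.put(id, new ReentrantLock());
      consumerToPartitions.put(id, ConcurrentHashMap.newKeySet());
    }
    this.consumerToLock = Collections.unmodifiableMap(locks);
  }

  // Subscribes a partition and attaches its data receiver as one step under the consumer's lock.
  void setDataReceiver(String consumerId, String partition, String receiver) {
    ReentrantLock lock = consumerToLock.get(consumerId);
    lock.lock();
    try {
      consumerToPartitions.get(consumerId).add(partition);
      partitionToReceiver.put(partition, receiver);
    } finally {
      lock.unlock();
    }
  }

  // Uses the same lock, so it cannot interleave with setDataReceiver for this consumer.
  void unsubscribe(String consumerId, String partition) {
    ReentrantLock lock = consumerToLock.get(consumerId);
    lock.lock();
    try {
      consumerToPartitions.get(consumerId).remove(partition);
      partitionToReceiver.remove(partition);
    } finally {
      lock.unlock();
    }
  }

  boolean isSubscribed(String consumerId, String partition) {
    return consumerToPartitions.get(consumerId).contains(partition);
  }

  String receiverFor(String partition) {
    return partitionToReceiver.get(partition);
  }
}
```

A separate lock object also keeps this coordination independent of any synchronization the consumer class may already perform on itself.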
Constructor Details
-
KafkaConsumerService
protected KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, TopicExistenceChecker topicExistenceChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled)
- Parameters:
statsOverride
- injection of stats, for test purposes
-
-
Method Details
-
getConsumerAssignedToVersionTopicPartition
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
- Specified by:
getConsumerAssignedToVersionTopicPartition
in class AbstractKafkaConsumerService
-
assignConsumerFor
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
This function assigns a consumer for the given StoreIngestionTask and returns the assigned consumer. It must be idempotent and thus return a previously assigned consumer (for the same params), if one exists.
- Specified by:
assignConsumerFor
in class AbstractKafkaConsumerService
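One way to satisfy the idempotency requirement is to look up the two-level version-topic to partition mapping before picking a consumer, and to pick only when no entry exists. The sketch below assumes a hypothetical `IdempotentAssigner` class with strings for topics and partitions and a caller-supplied picker function standing in for pickConsumerForPartition; it is an illustration of the requirement, not Venice's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Hypothetical sketch of idempotent assignment; not Venice's implementation.
final class IdempotentAssigner<C> {
  // versionTopic -> (topicPartition -> consumer), shaped like versionTopicToTopicPartitionToConsumer
  private final Map<String, Map<String, C>> assignments = new ConcurrentHashMap<>();
  // Stand-in for the subclass-specific pickConsumerForPartition strategy.
  private final BiFunction<String, String, C> picker;

  IdempotentAssigner(BiFunction<String, String, C> picker) {
    this.picker = picker;
  }

  // The picker runs only on the first call for a given (versionTopic, topicPartition);
  // later calls return the consumer recorded by that first call.
  C assignConsumerFor(String versionTopic, String topicPartition) {
    return assignments
        .computeIfAbsent(versionTopic, vt -> new ConcurrentHashMap<>())
        .computeIfAbsent(topicPartition, tp -> picker.apply(versionTopic, tp));
  }

  // Read-only lookup, analogous to getConsumerAssignedToVersionTopicPartition.
  C getAssigned(String versionTopic, String topicPartition) {
    Map<String, C> byPartition = assignments.get(versionTopic);
    return byPartition == null ? null : byPartition.get(topicPartition);
  }
}
```

Using computeIfAbsent on ConcurrentHashMap means concurrent callers asking for the same partition observe a single assignment rather than racing to pick two different consumers.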
-
pickConsumerForPartition
protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
-
removeTopicPartitionFromConsumptionTask
protected void removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition)
-
unsubscribeAll
public void unsubscribeAll(PubSubTopic versionTopic)
Stop all subscriptions associated with the given version topic.
- Specified by:
unsubscribeAll
in class AbstractKafkaConsumerService
-
unSubscribe
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
Stop the specific subscription associated with the given version topic.
- Specified by:
unSubscribe
in class AbstractKafkaConsumerService
-
batchUnsubscribe
public void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition> topicPartitionsToUnSub)
- Specified by:
batchUnsubscribe
in class AbstractKafkaConsumerService
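The three unsubscribe variants (unSubscribe, batchUnsubscribe, unsubscribeAll) all have to update the version-topic to partition mapping. The sketch below shows only that bookkeeping side, assuming a hypothetical `SubscriptionTable` with string ids; each call reports which consumer must drop which partitions, and the real methods' consumer calls, locking and `timeoutMs` handling are deliberately left out. `hasAnySubscriptionFor` is included to mirror the method of the same name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical bookkeeping for the unsubscribe variants; it only maintains
// versionTopic -> (topicPartition -> consumerId) and reports what each consumer must drop.
final class SubscriptionTable {
  private final Map<String, Map<String, String>> versionTopicToPartitionToConsumer = new HashMap<>();

  synchronized void record(String versionTopic, String topicPartition, String consumerId) {
    versionTopicToPartitionToConsumer
        .computeIfAbsent(versionTopic, k -> new HashMap<>())
        .put(topicPartition, consumerId);
  }

  // Stops a single subscription; returns the consumer that was serving it, or null.
  synchronized String unSubscribe(String versionTopic, String topicPartition) {
    Map<String, String> byPartition = versionTopicToPartitionToConsumer.get(versionTopic);
    if (byPartition == null) {
      return null;
    }
    String consumerId = byPartition.remove(topicPartition);
    if (byPartition.isEmpty()) {
      versionTopicToPartitionToConsumer.remove(versionTopic); // no subscriptions left for this topic
    }
    return consumerId;
  }

  // Stops several subscriptions of one version topic; returns consumerId -> partitions to drop.
  synchronized Map<String, Set<String>> batchUnsubscribe(String versionTopic, Set<String> topicPartitions) {
    Map<String, Set<String>> toDrop = new HashMap<>();
    for (String tp : topicPartitions) {
      String consumerId = unSubscribe(versionTopic, tp);
      if (consumerId != null) {
        toDrop.computeIfAbsent(consumerId, k -> new HashSet<>()).add(tp);
      }
    }
    return toDrop;
  }

  // Stops every subscription of the version topic (copying the key set before mutating the map).
  synchronized Map<String, Set<String>> unsubscribeAll(String versionTopic) {
    Map<String, String> byPartition = versionTopicToPartitionToConsumer.get(versionTopic);
    if (byPartition == null) {
      return new HashMap<>();
    }
    return batchUnsubscribe(versionTopic, new HashSet<>(byPartition.keySet()));
  }

  synchronized boolean hasAnySubscriptionFor(String versionTopic) {
    return versionTopicToPartitionToConsumer.containsKey(versionTopic);
  }
}
```

Grouping the result by consumer lets a batch call touch each consumer once instead of once per partition.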
-
startInner
public boolean startInner()
- Specified by:
startInner
in class AbstractVeniceService
- Returns:
- true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
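The return-value contract of startInner() can be sketched as follows. `SketchService`, `SketchServiceState`, `SyncStartingService` and `AsyncStartingService` are hypothetical stand-ins for AbstractVeniceService and its ServiceState, not the real classes: the base class marks the service STARTED only when startInner() returns true, and an asynchronous implementation returns false and sets the state itself once its background work completes.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical mirror of AbstractVeniceService.ServiceState; names are illustrative only.
enum SketchServiceState { NOT_STARTED, STARTING, STARTED }

// Hypothetical base class that enforces the startInner() return-value contract.
abstract class SketchService {
  protected volatile SketchServiceState serviceState = SketchServiceState.NOT_STARTED;

  // true: fully started; false: the subclass sets STARTED itself once its async work finishes.
  protected abstract boolean startInner();

  final void start() {
    serviceState = SketchServiceState.STARTING; // set before startInner so async code is not overwritten
    if (startInner()) {
      serviceState = SketchServiceState.STARTED;
    }
  }
}

// Completes its startup synchronously.
final class SyncStartingService extends SketchService {
  @Override
  protected boolean startInner() {
    return true;
  }
}

// Finishes startup on another thread and flips the state itself when done.
final class AsyncStartingService extends SketchService {
  CompletableFuture<Void> startup;

  @Override
  protected boolean startInner() {
    startup = CompletableFuture.runAsync(() -> {
      // Placeholder for slow startup work, e.g. launching consumption threads.
      serviceState = SketchServiceState.STARTED;
    });
    return false; // still starting asynchronously
  }
}
```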
-
stopInner
public void stopInner() throws Exception
- Specified by:
stopInner
in class AbstractVeniceService
- Throws:
Exception
-
hasAnySubscriptionFor
public boolean hasAnySubscriptionFor(PubSubTopic versionTopic)
- Specified by:
hasAnySubscriptionFor
in class AbstractKafkaConsumerService
-
getMaxElapsedTimeMSSinceLastPollInConsumerPool
public long getMaxElapsedTimeMSSinceLastPollInConsumerPool()
- Specified by:
getMaxElapsedTimeMSSinceLastPollInConsumerPool
in class AbstractKafkaConsumerService
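Judging by its name, getMaxElapsedTimeMSSinceLastPollInConsumerPool() reports how long ago the least recently polling consumer in the pool last polled. The sketch below is an assumption about that semantics, not the actual implementation: a hypothetical `PollTracker` records a timestamp per consumer after each poll and takes the maximum age across consumers, using an injectable clock so the arithmetic is testable. Returning 0 for an empty pool is also an assumption of the sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical: each consumption task calls recordPoll after every poll().
final class PollTracker {
  private final Map<String, Long> lastPollTimeMs = new ConcurrentHashMap<>();
  private final LongSupplier clockMs;

  PollTracker(LongSupplier clockMs) {
    this.clockMs = clockMs;
  }

  void recordPoll(String consumerId) {
    lastPollTimeMs.put(consumerId, clockMs.getAsLong());
  }

  // The slowest consumer determines the result, so one stuck consumer is visible
  // even when every other consumer in the pool is polling normally.
  long getMaxElapsedTimeMsSinceLastPoll() {
    long now = clockMs.getAsLong();
    long max = 0; // assumption: an empty pool reports 0
    for (long last : lastPollTimeMs.values()) {
      max = Math.max(max, now - last);
    }
    return max;
  }
}
```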
-
startConsumptionIntoDataReceiver
public void startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastReadOffset, ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver)
- Specified by:
startConsumptionIntoDataReceiver
in class AbstractKafkaConsumerService
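The class overview describes startConsumptionIntoDataReceiver as funneling consumed data into a receiver owned by another task. A minimal sketch of that hand-off follows, with every name hypothetical: `BatchReceiver` stands in for ConsumedDataReceiver, `Rec` for PubSubMessage, and `ConsumptionLoop.deliver` simulates one poll by grouping records per partition and writing each group to its registered receiver. Treating lastReadOffset as "resume right after this offset" is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ConsumedDataReceiver: receives one batch per partition per poll.
interface BatchReceiver<T> {
  void write(List<T> batch);
}

// Hypothetical record type standing in for PubSubMessage.
final class Rec {
  final String partition;
  final long offset;
  final String value;

  Rec(String partition, long offset, String value) {
    this.partition = partition;
    this.offset = offset;
    this.value = value;
  }
}

final class ConsumptionLoop {
  private final Map<String, BatchReceiver<Rec>> receivers = new HashMap<>();
  private final Map<String, Long> lastDelivered = new HashMap<>();

  // Assumption: consumption resumes right after lastReadOffset.
  void startConsumptionIntoDataReceiver(String partition, long lastReadOffset, BatchReceiver<Rec> receiver) {
    receivers.put(partition, receiver);
    lastDelivered.put(partition, lastReadOffset);
  }

  // Simulates one poll: groups records by partition and hands each group to its receiver.
  void deliver(List<Rec> polled) {
    Map<String, List<Rec>> byPartition = new HashMap<>();
    for (Rec r : polled) {
      Long last = lastDelivered.get(r.partition);
      if (last == null || r.offset <= last) {
        continue; // partition has no receiver, or the record was already read
      }
      byPartition.computeIfAbsent(r.partition, k -> new ArrayList<>()).add(r);
      lastDelivered.put(r.partition, r.offset);
    }
    byPartition.forEach((p, batch) -> receivers.get(p).write(batch));
  }
}
```

Keeping the receiver separate from the polling loop is what lets one shared consumer serve partitions belonging to many different ingestion tasks.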
-
getOffsetLagBasedOnMetrics
public long getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getOffsetLagBasedOnMetrics
in class AbstractKafkaConsumerService
-
getLatestOffsetBasedOnMetrics
public long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getLatestOffsetBasedOnMetrics
in class AbstractKafkaConsumerService
-
getIngestionInfoFor
public Map<PubSubTopicPartition,TopicPartitionIngestionInfo> getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getIngestionInfoFor
in class AbstractKafkaConsumerService
-
setThreadFactory
public void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory)
-