Class KafkaConsumerService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService
com.linkedin.davinci.kafka.consumer.KafkaConsumerService
- All Implemented Interfaces:
- Closeable, AutoCloseable
- Direct Known Subclasses:
- PartitionWiseKafkaConsumerService
KafkaConsumerService is used to manage a pool of consumption-related resources connected to a specific Kafka
 cluster.
 The reasons to have this pool are:
 1. To reduce the unnecessary overhead of having one consumer per store-version, each of which includes its own internal
    IO threads/connections to brokers and internal buffers;
 2. To reduce the GC overhead when many store-versions are bootstrapping/ingesting at the same time;
 3. To have a predictable and configurable upper bound on the total amount of resources occupied by consumers,
    no matter how many store-versions are hosted in the same instance.
 The responsibilities of this class include:
 1. Setting up a fixed-size pool of consumption units, where each unit contains exactly one:
    a) SharedKafkaConsumer
    b) ConsumptionTask
    c) ConsumerSubscriptionCleaner
 2. Receiving various calls to interrogate or mutate consumer state, and delegating them to the correct unit by
    maintaining a mapping of which unit belongs to which version-topic and subscribed topic-partition. Notably,
    the #startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, PubSubPosition, ConsumedDataReceiver) function allows the
    caller to start funneling consumed data into a receiver (i.e. into another task).
 3. Providing a single abstract function that subclasses must override in order to implement a consumption
    load-balancing strategy: pickConsumerForPartition(PubSubTopic, PubSubTopicPartition).
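A minimal sketch of the pool structure described above, in plain Java. Everything here is hypothetical: `PoolUnit` stands in for a unit holding a SharedKafkaConsumer, a ConsumptionTask and a ConsumerSubscriptionCleaner, topics and partitions are plain strings, and the least-loaded strategy is an illustrative choice, not the strategy used by PartitionWiseKafkaConsumerService or any other Venice subclass. What it shows is the shape: the pool size is fixed at construction, and the only policy decision is delegated to one abstract method, analogous to pickConsumerForPartition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one consumption unit (a SharedKafkaConsumer, its ConsumptionTask
// and its ConsumerSubscriptionCleaner in the real class).
final class PoolUnit {
  final int id;
  int partitionCount; // number of topic-partitions currently placed on this unit

  PoolUnit(int id) {
    this.id = id;
  }
}

// Fixed-size pool: the number of units is set once and does not grow with the number of store-versions.
abstract class FixedUnitPool {
  protected final List<PoolUnit> units = new ArrayList<>();

  FixedUnitPool(int poolSize) {
    for (int i = 0; i < poolSize; i++) {
      units.add(new PoolUnit(i));
    }
  }

  // The single load-balancing hook, analogous to pickConsumerForPartition.
  protected abstract PoolUnit pickUnitForPartition(String versionTopic, String topicPartition);

  // Places a new topic-partition on whichever unit the strategy selects.
  PoolUnit place(String versionTopic, String topicPartition) {
    PoolUnit unit = pickUnitForPartition(versionTopic, topicPartition);
    unit.partitionCount++;
    return unit;
  }
}

// Illustrative strategy (an assumption, not Venice's): pick the least-loaded unit, lowest id on ties.
final class LeastLoadedUnitPool extends FixedUnitPool {
  LeastLoadedUnitPool(int poolSize) {
    super(poolSize);
  }

  @Override
  protected PoolUnit pickUnitForPartition(String versionTopic, String topicPartition) {
    PoolUnit best = units.get(0);
    for (PoolUnit candidate : units) {
      if (candidate.partitionCount < best.partitionCount) {
        best = candidate;
      }
    }
    return best;
  }
}
```

With two units, four placements alternate between unit 0 and unit 1, so each unit ends up with two partitions. Adding more store-versions only adds load to existing units; it never adds consumers, which is the bound that reason 3 above asks for.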
Nested Class Summary
- Nested Classes
  - static enum: This consumer assignment strategy specifies how consumers from the consumer pool are allocated.
- Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService: AbstractVeniceService.ServiceState
Field Summary
- protected final AggKafkaConsumerServiceStats aggStats
- protected final ConsumerPollTracker consumerPollTracker
- protected final IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,com.linkedin.davinci.kafka.consumer.ConsumptionTask> consumerToConsumptionTask
- protected final Map<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,ReentrantLock> consumerToLocks
  This read-only per-consumer lock protects the partition unsubscription and data-receiver setting operations.
- protected final InactiveTopicPartitionChecker inactiveTopicPartitionChecker
- protected final String kafkaUrl
- protected final String kafkaUrlForLogger
- protected final ConsumerPoolType poolType
- protected final Map<PubSubTopic,Map<PubSubTopicPartition,com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>> versionTopicToTopicPartitionToConsumer
- Fields inherited from class com.linkedin.venice.service.AbstractVeniceService: logger, serviceState
Constructor Summary
- protected KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, StaleTopicChecker staleTopicChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled, VeniceServerConfig serverConfig)
Method Summary
- com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
  This function assigns a consumer for the given StoreIngestionTask and returns the assigned consumer.
- void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition> topicPartitionsToUnSub)
- static String convertTopicPartitionIngestionInfoMapToStr(Map<PubSubTopicPartition, TopicPartitionIngestionInfo> topicPartitionIngestionInfoMap)
- com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
- Map<PubSubTopicPartition,TopicPartitionIngestionInfo> getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, boolean respectRedundantLoggingFilter)
  This is for providing ingestion-related information for a specific topic-partition from the implementation of this class.
- long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- long getMaxElapsedTimeMSSinceLastPollInConsumerPool()
- getStaleTopicPartitions(long thresholdTimestamp)
- boolean hasAnySubscriptionFor(PubSubTopic versionTopic)
- protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
- protected void removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition)
- void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory)
- void startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, PubSubPosition lastReadPosition, ConsumedDataReceiver<List<DefaultPubSubMessage>> consumedDataReceiver)
- boolean startInner()
- void stopInner()
- void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
  Stops the specific subscription associated with the given version topic.
- void unsubscribeAll(PubSubTopic versionTopic)
  Stops all subscriptions associated with the given version topic.
- Methods inherited from class com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService: unSubscribe
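Field Details
- kafkaUrl
  protected final String kafkaUrl
- kafkaUrlForLogger
  protected final String kafkaUrlForLogger
- poolType
  protected final ConsumerPoolType poolType
- aggStats
  protected final AggKafkaConsumerServiceStats aggStats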
- consumerToConsumptionTask
  protected final IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,com.linkedin.davinci.kafka.consumer.ConsumptionTask> consumerToConsumptionTask
- versionTopicToTopicPartitionToConsumer
  protected final Map<PubSubTopic,Map<PubSubTopicPartition,com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>> versionTopicToTopicPartitionToConsumer
- consumerToLocks
  protected final Map<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,ReentrantLock> consumerToLocks
  This read-only per-consumer lock protects the partition unsubscription and data-receiver setting operations. Using the consumer's intrinsic lock may cause a race condition; refer to https://github.com/linkedin/venice/pull/1308
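A sketch of the per-consumer locking pattern this field describes, under stated assumptions: `FakeSharedConsumer` is a hypothetical stand-in that tracks subscribed partitions and their receivers, and "read-only" is read here as "the consumer-to-lock map is built once and never modified afterwards", so the sketch wraps it with Collections.unmodifiableMap. Each operation takes the dedicated ReentrantLock of the consumer it touches instead of synchronizing on the consumer object itself. The linked pull request explains why the intrinsic lock was a problem in Venice; this sketch does not reproduce that scenario.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical stand-in for a shared consumer: its subscriptions and the receiver per partition.
final class FakeSharedConsumer {
  final Set<String> subscribed = new HashSet<>();
  final Map<String, Consumer<String>> receivers = new HashMap<>();
}

final class PerConsumerLocking {
  // Built once in the constructor and never modified afterwards ("read-only").
  private final Map<FakeSharedConsumer, ReentrantLock> consumerToLocks;

  PerConsumerLocking(List<FakeSharedConsumer> pool) {
    Map<FakeSharedConsumer, ReentrantLock> locks = new HashMap<>();
    for (FakeSharedConsumer consumer : pool) {
      locks.put(consumer, new ReentrantLock());
    }
    this.consumerToLocks = Collections.unmodifiableMap(locks);
  }

  // Subscribes and installs the receiver while holding the consumer's dedicated lock.
  void setDataReceiver(FakeSharedConsumer consumer, String partition, Consumer<String> receiver) {
    ReentrantLock lock = consumerToLocks.get(consumer);
    lock.lock();
    try {
      consumer.subscribed.add(partition);
      consumer.receivers.put(partition, receiver);
    } finally {
      lock.unlock();
    }
  }

  // Uses the same lock, so a receiver update and an unsubscription on one consumer never interleave.
  void unsubscribe(FakeSharedConsumer consumer, String partition) {
    ReentrantLock lock = consumerToLocks.get(consumer);
    lock.lock();
    try {
      consumer.subscribed.remove(partition);
      consumer.receivers.remove(partition);
    } finally {
      lock.unlock();
    }
  }
}
```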
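- consumerPollTracker
  protected final ConsumerPollTracker consumerPollTracker
- inactiveTopicPartitionChecker
  protected final InactiveTopicPartitionChecker inactiveTopicPartitionChecker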
 
Constructor Details
- KafkaConsumerService
  protected KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, StaleTopicChecker staleTopicChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled, VeniceServerConfig serverConfig)
  Parameters:
  - statsOverride - injection of stats, for test purposes
 
 
Method Details
- getConsumerAssignedToVersionTopicPartition
  public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
  Specified by: getConsumerAssignedToVersionTopicPartition in class AbstractKafkaConsumerService
 
- assignConsumerFor
  public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
  This function assigns a consumer for the given StoreIngestionTask and returns the assigned consumer. It must be idempotent and thus return a previously assigned consumer (for the same params) if one exists.
  Specified by: assignConsumerFor in class AbstractKafkaConsumerService
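One way to meet the idempotency requirement, sketched with hypothetical String keys and a generic consumer type rather than Venice's PubSubTopic and SharedKafkaConsumer: nest two ConcurrentHashMaps (mirroring versionTopicToTopicPartitionToConsumer) and let computeIfAbsent invoke the picking strategy only when no consumer is recorded yet. ConcurrentHashMap applies computeIfAbsent atomically per key, so concurrent callers for the same topic-partition also receive the same consumer. This illustrates the contract; it is not the class's actual implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

// Sketch of an idempotent assignConsumerFor over hypothetical String keys and a generic consumer type C.
final class IdempotentAssigner<C> {
  // versionTopic -> (topicPartition -> consumer), analogous to versionTopicToTopicPartitionToConsumer.
  private final Map<String, Map<String, C>> versionTopicToPartitionToConsumer = new ConcurrentHashMap<>();
  // Stands in for pickConsumerForPartition; should be quick and must not modify the maps above.
  private final BiFunction<String, String, C> picker;

  IdempotentAssigner(BiFunction<String, String, C> picker) {
    this.picker = picker;
  }

  // Calls the picker only when nothing is recorded yet; otherwise returns the recorded consumer.
  C assignConsumerFor(String versionTopic, String topicPartition) {
    return versionTopicToPartitionToConsumer
        .computeIfAbsent(versionTopic, vt -> new ConcurrentHashMap<>())
        .computeIfAbsent(topicPartition, tp -> picker.apply(versionTopic, tp));
  }

  // Lookup without assignment, analogous to getConsumerAssignedToVersionTopicPartition.
  C getAssigned(String versionTopic, String topicPartition) {
    Map<String, C> byPartition = versionTopicToPartitionToConsumer.get(versionTopic);
    return byPartition == null ? null : byPartition.get(topicPartition);
  }
}
```

Calling assignConsumerFor twice with the same arguments returns the same consumer and runs the picker once; a lookup for an unassigned pair returns null instead of triggering an assignment.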
 
- pickConsumerForPartition
  protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
- removeTopicPartitionFromConsumptionTask
  protected void removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition)
- unsubscribeAll
  public void unsubscribeAll(PubSubTopic versionTopic)
  Stops all subscriptions associated with the given version topic.
  Specified by: unsubscribeAll in class AbstractKafkaConsumerService
 
- unSubscribe
  public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
  Stops the specific subscription associated with the given version topic.
  Specified by: unSubscribe in class AbstractKafkaConsumerService
 
- batchUnsubscribe
  public void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition> topicPartitionsToUnSub)
  Specified by: batchUnsubscribe in class AbstractKafkaConsumerService
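The page gives no description for batchUnsubscribe. A common shape for such a batch operation, assumed here rather than taken from Venice, is to first group the requested partitions by the consumer that owns them, so that each consumer (and its lock) is handled once per batch instead of once per partition. The hypothetical helper below does only that grouping, over String partitions and a generic consumer type; partitions with no assigned consumer are skipped.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class BatchUnsubscribeSketch {
  // Groups the partitions to unsubscribe by their owning consumer, using the given assignment map.
  static <C> Map<C, Set<String>> groupByConsumer(Map<String, C> partitionToConsumer, Set<String> partitionsToUnsub) {
    Map<C, Set<String>> grouped = new HashMap<>();
    for (String topicPartition : partitionsToUnsub) {
      C consumer = partitionToConsumer.get(topicPartition);
      if (consumer == null) {
        continue; // not assigned for this version topic, so there is nothing to unsubscribe
      }
      grouped.computeIfAbsent(consumer, k -> new HashSet<>()).add(topicPartition);
    }
    return grouped;
  }
}
```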
 
- startInner
  public boolean startInner()
  Specified by: startInner in class AbstractVeniceService
  Returns:
  - true if the service is completely started,
    false if it is still starting asynchronously (in this case, it is the implementer's
    responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
 
- stopInner
  public void stopInner() throws Exception
  Specified by: stopInner in class AbstractVeniceService
  Throws:
  - Exception
 
- hasAnySubscriptionFor
  public boolean hasAnySubscriptionFor(PubSubTopic versionTopic)
  Specified by: hasAnySubscriptionFor in class AbstractKafkaConsumerService
 
- getMaxElapsedTimeMSSinceLastPollInConsumerPool
  public long getMaxElapsedTimeMSSinceLastPollInConsumerPool()
  Specified by: getMaxElapsedTimeMSSinceLastPollInConsumerPool in class AbstractKafkaConsumerService
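The method name suggests it reports the longest time any consumer in the pool has gone without polling, a natural health signal for a shared pool where one stuck consumer stalls every partition it serves. A sketch of that computation, assuming each consumer's last poll time is available as epoch milliseconds in a map keyed by a hypothetical consumer type; the real method's data source is not described on this page, and returning 0 for an empty pool is an assumed convention.

```java
import java.util.Map;

final class PollLagSketch {
  // Largest (nowMs - lastPollMs) over all consumers; 0 when the map is empty (assumed convention).
  static <C> long maxElapsedSinceLastPoll(Map<C, Long> lastPollTimeMs, long nowMs) {
    long max = 0L;
    for (long lastPollMs : lastPollTimeMs.values()) {
      max = Math.max(max, nowMs - lastPollMs);
    }
    return max;
  }
}
```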
 
- convertTopicPartitionIngestionInfoMapToStr
  public static String convertTopicPartitionIngestionInfoMapToStr(Map<PubSubTopicPartition, TopicPartitionIngestionInfo> topicPartitionIngestionInfoMap)
- startConsumptionIntoDataReceiver
  public void startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, PubSubPosition lastReadPosition, ConsumedDataReceiver<List<DefaultPubSubMessage>> consumedDataReceiver)
  Specified by: startConsumptionIntoDataReceiver in class AbstractKafkaConsumerService
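A minimal sketch of the "funnel consumed data into a receiver" idea from the class description: a receiver is registered per topic-partition, and each poll cycle hands every partition's batch to its receiver. Records are plain strings, java.util.function.Consumer stands in for ConsumedDataReceiver, and the lastReadPosition argument (where consumption would resume) is left out; all of these simplifications are assumptions of the sketch, not Venice behavior.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Sketch: route each polled batch to the receiver registered for its topic-partition.
final class ReceiverDispatchSketch {
  // topicPartition -> receiver; java.util.function.Consumer stands in for ConsumedDataReceiver.
  private final Map<String, Consumer<List<String>>> receivers = new HashMap<>();

  // Simplified registration: no lastReadPosition and no broker subscription.
  void startConsumptionIntoDataReceiver(String topicPartition, Consumer<List<String>> receiver) {
    receivers.put(topicPartition, receiver);
  }

  // Delivers one poll cycle's batches; batches for partitions without a receiver are ignored.
  void dispatch(Map<String, List<String>> polledBatches) {
    for (Map.Entry<String, List<String>> entry : polledBatches.entrySet()) {
      Consumer<List<String>> receiver = receivers.get(entry.getKey());
      if (receiver != null) {
        receiver.accept(entry.getValue());
      }
    }
  }
}
```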
 
- getStaleTopicPartitions
  Specified by: getStaleTopicPartitions in class AbstractKafkaConsumerService
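The page documents neither the return type nor the exact meaning of "stale" for getStaleTopicPartitions(long thresholdTimestamp). Assuming a topic-partition counts as stale when its last recorded poll happened before thresholdTimestamp, the check reduces to a filter over per-partition timestamps, sketched below with hypothetical String keys. Both the definition of stale and the Set<String> result are assumptions.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class StalePartitionSketch {
  // Assumed semantics: a partition is stale if its last poll happened before thresholdTimestamp.
  static Set<String> staleTopicPartitions(Map<String, Long> lastPollTimestamp, long thresholdTimestamp) {
    Set<String> stale = new TreeSet<>(); // sorted only to make the result deterministic
    for (Map.Entry<String, Long> entry : lastPollTimestamp.entrySet()) {
      if (entry.getValue() < thresholdTimestamp) {
        stale.add(entry.getKey());
      }
    }
    return stale;
  }
}
```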
 
- getLatestOffsetBasedOnMetrics
  public long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
  Specified by: getLatestOffsetBasedOnMetrics in class AbstractKafkaConsumerService
 
- getIngestionInfoFor
  public Map<PubSubTopicPartition,TopicPartitionIngestionInfo> getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, boolean respectRedundantLoggingFilter)
  Description copied from class: AbstractKafkaConsumerService
  This is for providing ingestion-related information for a specific topic-partition from the implementation of this class.
  Specified by: getIngestionInfoFor in class AbstractKafkaConsumerService
  Parameters:
  - respectRedundantLoggingFilter - guides whether the info map needs to be prepared: set to true when calling from heartbeat monitoring to enable rate-limiting; set to false for admin commands or tests where all info is needed.
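A sketch of what respectRedundantLoggingFilter controls, with a hypothetical time-window filter standing in for the real one: when the flag is true (heartbeat monitoring), the info is prepared at most once per key per window; when it is false (admin commands, tests), it is always prepared. The window length, the per-key granularity and both class names are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical filter: lets a given key through at most once per time window.
final class RedundantLogFilterSketch {
  private final long windowMs;
  private final Map<String, Long> lastAllowedMs = new HashMap<>();

  RedundantLogFilterSketch(long windowMs) {
    this.windowMs = windowMs;
  }

  // Returns true if this occurrence falls inside the window of the last allowed one (i.e. skip it).
  synchronized boolean isRedundant(String key, long nowMs) {
    Long last = lastAllowedMs.get(key);
    if (last != null && nowMs - last < windowMs) {
      return true;
    }
    lastAllowedMs.put(key, nowMs);
    return false;
  }
}

final class IngestionInfoGate {
  // Mirrors the documented flag: when respectRedundantLoggingFilter is false, always prepare the info.
  static boolean shouldPrepareInfo(
      RedundantLogFilterSketch filter, String key, boolean respectRedundantLoggingFilter, long nowMs) {
    return !respectRedundantLoggingFilter || !filter.isRedundant(key, nowMs);
  }
}
```

With a 1000 ms window, a heartbeat call at t=0 prepares the info, a second heartbeat call at t=500 is suppressed, an admin call (flag false) at t=600 still gets the info, and a heartbeat call at t=1500 is allowed again.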
 
- setThreadFactory
  public void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory)