Class KafkaConsumerService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService
com.linkedin.davinci.kafka.consumer.KafkaConsumerService
- All Implemented Interfaces:
Closeable, AutoCloseable
- Direct Known Subclasses:
PartitionWiseKafkaConsumerService
KafkaConsumerService is used to manage a pool of consumption-related resources connected to a specific Kafka
cluster.
The reasons to have this pool are:
1. To reduce the unnecessary overhead of having one consumer per store-version, each of which includes internal
IO threads/connections to brokers and internal buffers;
2. To reduce the GC overhead when there are a lot of store-versions bootstrapping/ingesting at the same time;
3. To have a predictable and configurable upper bound on the total amount of resources occupied by consumers,
no matter how many store-versions are being hosted on the same instance.
The responsibilities of this class include:
1. Setting up a fixed-size pool of consumption units, where each unit contains exactly one:
a) SharedKafkaConsumer
b) ConsumptionTask
c) ConsumerSubscriptionCleaner
2. Receiving various calls to interrogate or mutate consumer state, and delegating them to the correct unit by
maintaining a mapping of which unit belongs to which version-topic and subscribed topic-partition. Notably,
the #startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext, long, ConsumedDataReceiver) function allows the
caller to start funneling consumed data into a receiver (i.e. into another task).
3. Providing a single abstract function that must be overridden by subclasses in order to implement a consumption
load balancing strategy: pickConsumerForPartition(PubSubTopic, PubSubTopicPartition); a minimal sketch of such a subclass follows.
Nested Class Summary
Nested Classes
- static enum: This consumer assignment strategy specifies how consumers from the consumer pool are allocated.
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
- AbstractVeniceService.ServiceState
-
Field Summary
Fields
- protected final AggKafkaConsumerServiceStats aggStats
- protected final ConsumerPollTracker consumerPollTracker
- protected final IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,com.linkedin.davinci.kafka.consumer.ConsumptionTask> consumerToConsumptionTask
- protected final Map<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,ReentrantLock> consumerToLocks
  This read-only per-consumer lock is for protecting the partition unsubscription and data receiver setting operations.
- protected final InactiveTopicPartitionChecker inactiveTopicPartitionChecker
- protected final String kafkaUrl
- protected final String kafkaUrlForLogger
- protected final ConsumerPoolType poolType
- protected final Map<PubSubTopic,Map<PubSubTopicPartition,com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>> versionTopicToTopicPartitionToConsumer
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
Constructor Summary
Constructors
protected KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, StaleTopicChecker staleTopicChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled, VeniceServerConfig serverConfig)
-
Method Summary
- com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
  This function assigns a consumer for the given StoreIngestionTask and returns the assigned consumer.
- void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition> topicPartitionsToUnSub)
- static String convertTopicPartitionIngestionInfoMapToStr(Map<PubSubTopicPartition,TopicPartitionIngestionInfo> topicPartitionIngestionInfoMap)
- com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
- Map<PubSubTopicPartition,TopicPartitionIngestionInfo> getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, boolean respectRedundantLoggingFilter)
  This is for providing ingestion-related information for a specific topic partition from the implementation of this class.
- long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- long getMaxElapsedTimeMSSinceLastPollInConsumerPool()
- getStaleTopicPartitions(long thresholdTimestamp)
- boolean hasAnySubscriptionFor(PubSubTopic versionTopic)
- protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
- protected void removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition)
- void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory)
- void startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, PubSubPosition lastReadPosition, ConsumedDataReceiver<List<DefaultPubSubMessage>> consumedDataReceiver)
- boolean startInner()
- void stopInner()
- void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
  Stop the specific subscription associated with the given version topic.
- void unsubscribeAll(PubSubTopic versionTopic)
  Stop all subscriptions associated with the given version topic.
Methods inherited from class com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService
unSubscribe
-
Field Details
-
kafkaUrl
protected final String kafkaUrl
-
kafkaUrlForLogger
protected final String kafkaUrlForLogger
-
poolType
protected final ConsumerPoolType poolType
-
aggStats
protected final AggKafkaConsumerServiceStats aggStats
-
consumerToConsumptionTask
protected final IndexedMap<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,com.linkedin.davinci.kafka.consumer.ConsumptionTask> consumerToConsumptionTask
-
versionTopicToTopicPartitionToConsumer
protected final Map<PubSubTopic,Map<PubSubTopicPartition,com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer>> versionTopicToTopicPartitionToConsumer
-
consumerToLocks
protected final Map<com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer,ReentrantLock> consumerToLocks
This read-only per-consumer lock is for protecting the partition unsubscription and data receiver setting operations. Using the consumer's intrinsic lock may cause a race condition; refer to https://github.com/linkedin/venice/pull/1308
-
consumerPollTracker
protected final ConsumerPollTracker consumerPollTracker
-
inactiveTopicPartitionChecker
protected final InactiveTopicPartitionChecker inactiveTopicPartitionChecker
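As context for the consumerToLocks field above, here is a minimal sketch of the per-consumer locking pattern it describes; doUnsubscribeAndClearDataReceiver() is a hypothetical placeholder for the guarded work, not a Venice API.
  // Sketch only: guard unsubscription and data-receiver changes with the
  // dedicated ReentrantLock rather than synchronizing on the consumer itself.
  ReentrantLock consumerLock = consumerToLocks.get(consumer);
  consumerLock.lock();
  try {
    doUnsubscribeAndClearDataReceiver(consumer, topicPartition); // hypothetical placeholder
  } finally {
    consumerLock.unlock();
  }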
-
Constructor Details
-
KafkaConsumerService
protected KafkaConsumerService(ConsumerPoolType poolType, PubSubConsumerAdapterFactory pubSubConsumerAdapterFactory, Properties consumerProperties, long readCycleDelayMs, int numOfConsumersPerKafkaCluster, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, String kafkaClusterAlias, long sharedConsumerNonExistingTopicCleanupDelayMS, StaleTopicChecker staleTopicChecker, boolean liveConfigBasedKafkaThrottlingEnabled, PubSubMessageDeserializer pubSubDeserializer, Time time, AggKafkaConsumerServiceStats statsOverride, boolean isKafkaConsumerOffsetCollectionEnabled, ReadOnlyStoreRepository metadataRepository, boolean isUnregisterMetricForDeletedStoreEnabled, VeniceServerConfig serverConfig) - Parameters:
statsOverride - injection of stats, for test purposes
-
-
Method Details
-
getConsumerAssignedToVersionTopicPartition
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
- Specified by:
getConsumerAssignedToVersionTopicPartition in class AbstractKafkaConsumerService
-
assignConsumerFor
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
This function assigns a consumer for the given StoreIngestionTask and returns the assigned consumer. Must be idempotent and thus return the previously assigned consumer (for the same params) if one exists.
- Specified by:
assignConsumerFor in class AbstractKafkaConsumerService
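A small usage sketch of the idempotency contract stated above; consumerService, versionTopic and topicPartition are assumed to be already-available instances.
  // Repeated calls with the same version topic and partition must hand back the same consumer.
  SharedKafkaConsumer first = consumerService.assignConsumerFor(versionTopic, topicPartition);
  SharedKafkaConsumer second = consumerService.assignConsumerFor(versionTopic, topicPartition);
  assert first == second; // idempotent: the previously assigned consumer is returned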
-
pickConsumerForPartition
protected abstract com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer pickConsumerForPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition) -
removeTopicPartitionFromConsumptionTask
protected void removeTopicPartitionFromConsumptionTask(PubSubConsumerAdapter consumer, PubSubTopicPartition topicPartition) -
unsubscribeAll
public void unsubscribeAll(PubSubTopic versionTopic)
Stop all subscriptions associated with the given version topic.
- Specified by:
unsubscribeAll in class AbstractKafkaConsumerService
-
unSubscribe
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
Stop the specific subscription associated with the given version topic.
- Specified by:
unSubscribe in class AbstractKafkaConsumerService
-
batchUnsubscribe
public void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition> topicPartitionsToUnSub) - Specified by:
batchUnsubscribe in class AbstractKafkaConsumerService
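A sketch contrasting the three unsubscription entry points; consumerService, versionTopic, partitionA and partitionB are illustrative placeholders.
  // Stop a single topic-partition, bounded by the given timeout in milliseconds.
  consumerService.unSubscribe(versionTopic, partitionA, 30_000L);
  // Stop several topic-partitions of the same version topic in one call.
  consumerService.batchUnsubscribe(versionTopic, Set.of(partitionA, partitionB));
  // Stop everything still subscribed for the version topic.
  consumerService.unsubscribeAll(versionTopic);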
-
startInner
public boolean startInner()
- Specified by:
startInner in class AbstractVeniceService
- Returns:
true if the service is completely started,
false if it is still starting asynchronously (in this case, it is the implementer's
responsibility to set
AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
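For context on this return contract, here is a hedged sketch of an implementation that finishes starting asynchronously; doExpensiveStart() is a placeholder, and serviceState is assumed to be the atomic state holder inherited from AbstractVeniceService.
  @Override
  public boolean startInner() {
    // Finish startup on a background thread and report that we are not fully started yet.
    new Thread(() -> {
      doExpensiveStart(); // placeholder for the real asynchronous startup work
      // Per the contract above, flip the state once the async work completes.
      serviceState.set(AbstractVeniceService.ServiceState.STARTED);
    }, "async-start").start();
    return false;
  }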
-
stopInner
public void stopInner() throws Exception
- Specified by:
stopInner in class AbstractVeniceService
- Throws:
Exception
-
hasAnySubscriptionFor
public boolean hasAnySubscriptionFor(PubSubTopic versionTopic)
- Specified by:
hasAnySubscriptionFor in class AbstractKafkaConsumerService
-
getMaxElapsedTimeMSSinceLastPollInConsumerPool
public long getMaxElapsedTimeMSSinceLastPollInConsumerPool()
- Specified by:
getMaxElapsedTimeMSSinceLastPollInConsumerPool in class AbstractKafkaConsumerService
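A small monitoring sketch built on this metric; the 60-second threshold and the plain stdout logging are illustrative only.
  // Flag the pool as potentially stuck if no shared consumer has polled recently.
  long maxElapsedMs = consumerService.getMaxElapsedTimeMSSinceLastPollInConsumerPool();
  if (maxElapsedMs > 60_000L) {
    System.out.println("Consumer pool has not polled for " + maxElapsedMs + " ms");
  }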
-
convertTopicPartitionIngestionInfoMapToStr
public static String convertTopicPartitionIngestionInfoMapToStr(Map<PubSubTopicPartition, TopicPartitionIngestionInfo> topicPartitionIngestionInfoMap) -
startConsumptionIntoDataReceiver
public void startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, PubSubPosition lastReadPosition, ConsumedDataReceiver<List<DefaultPubSubMessage>> consumedDataReceiver) - Specified by:
startConsumptionIntoDataReceiver in class AbstractKafkaConsumerService
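A hedged sketch of wiring consumption into a downstream receiver; partitionReplicaIngestionContext and lastReadPosition are assumed to be supplied by the caller, and QueueingDataReceiver is a hypothetical ConsumedDataReceiver implementation, not a Venice class.
  // Start funneling consumed records for this partition replica into the receiver.
  ConsumedDataReceiver<List<DefaultPubSubMessage>> receiver = new QueueingDataReceiver(); // hypothetical implementation
  consumerService.startConsumptionIntoDataReceiver(partitionReplicaIngestionContext, lastReadPosition, receiver);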
-
getStaleTopicPartitions
- Specified by:
getStaleTopicPartitions in class AbstractKafkaConsumerService
-
getLatestOffsetBasedOnMetrics
public long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) - Specified by:
getLatestOffsetBasedOnMetrics in class AbstractKafkaConsumerService
-
getIngestionInfoFor
public Map<PubSubTopicPartition,TopicPartitionIngestionInfo> getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, boolean respectRedundantLoggingFilter)
Description copied from class: AbstractKafkaConsumerService
This is for providing ingestion-related information for a specific topic partition from the implementation of this class.
- Specified by:
getIngestionInfoFor in class AbstractKafkaConsumerService
- Parameters:
respectRedundantLoggingFilter - guides whether the info map needs to be prepared: set to true when calling from heartbeat monitoring to enable rate limiting; set to false for admin commands or tests where all info is needed.
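A usage sketch combining this method with convertTopicPartitionIngestionInfoMapToStr for an admin/debug path (so the redundant-logging filter is disabled); consumerService, versionTopic and topicPartition are placeholders.
  // Gather ingestion info for the topic partition and print it in string form.
  Map<PubSubTopicPartition, TopicPartitionIngestionInfo> info =
      consumerService.getIngestionInfoFor(versionTopic, topicPartition, /* respectRedundantLoggingFilter */ false);
  System.out.println(KafkaConsumerService.convertTopicPartitionIngestionInfoMapToStr(info));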
-
setThreadFactory
public void setThreadFactory(RandomAccessDaemonThreadFactory threadFactory)
-