Class AggKafkaConsumerService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.AggKafkaConsumerService
- All Implemented Interfaces:
Closeable
,AutoCloseable
AggKafkaConsumerService
supports Kafka consumer pool for multiple Kafka clusters from different data centers;
for each Kafka bootstrap server url, AggKafkaConsumerService
will create one KafkaConsumerService
.-
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
Constructor Summary
ConstructorDescriptionAggKafkaConsumerService
(PubSubConsumerAdapterFactory consumerFactory, TopicManagerContext.PubSubPropertiesSupplier pubSubPropertiesSupplier, VeniceServerConfig serverConfig, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, TopicExistenceChecker topicExistenceChecker, PubSubMessageDeserializer pubSubDeserializer, Consumer<String> killIngestionTaskRunnable, Function<String, Boolean> isAAOrWCEnabledFunc, ReadOnlyStoreRepository metadataRepository) -
Method Summary
Modifier and TypeMethodDescriptioncreateKafkaConsumerService
(Properties consumerProperties) Create a newKafkaConsumerService
given consumerProperties which must contain a value for "bootstrap.servers".long
getLatestOffsetBasedOnMetrics
(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) long
getOffsetLagBasedOnMetrics
(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) protected static Runnable
getStuckConsumerDetectionAndRepairRunnable
(Map<String, AbstractKafkaConsumerService> kafkaServerToConsumerServiceMap, Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping, long stuckConsumerRepairThresholdMs, long nonExistingTopicIngestionTaskKillThresholdMs, long nonExistingTopicRetryIntervalMs, StuckConsumerRepairStats stuckConsumerRepairStats, Consumer<String> killIngestionTaskRunnable) boolean
hasConsumerAssignedFor
(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) boolean
IMPORTANT: All newly created KafkaConsumerService are already started increateKafkaConsumerService(Properties)
, if this is no longer the case in future, make sure to update the startInner logic here.void
subscribeConsumerFor
(String kafkaURL, StoreIngestionTask storeIngestionTask, PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastOffset) void
unsubscribeConsumerFor
(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) void
unsubscribeConsumerFor
(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
-
Constructor Details
-
AggKafkaConsumerService
public AggKafkaConsumerService(PubSubConsumerAdapterFactory consumerFactory, TopicManagerContext.PubSubPropertiesSupplier pubSubPropertiesSupplier, VeniceServerConfig serverConfig, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, TopicExistenceChecker topicExistenceChecker, PubSubMessageDeserializer pubSubDeserializer, Consumer<String> killIngestionTaskRunnable, Function<String, Boolean> isAAOrWCEnabledFunc, ReadOnlyStoreRepository metadataRepository)
-
-
Method Details
-
startInner
public boolean startInner()IMPORTANT: All newly created KafkaConsumerService are already started increateKafkaConsumerService(Properties)
, if this is no longer the case in future, make sure to update the startInner logic here.- Specified by:
startInner
in classAbstractVeniceService
- Returns:
- true if the service is completely started,
false if it is still starting asynchronously (in this case, it is the implementer's
responsibility to set
AbstractVeniceService.serviceState
toAbstractVeniceService.ServiceState.STARTED
upon completion of the async work).
-
stopInner
- Specified by:
stopInner
in classAbstractVeniceService
- Throws:
Exception
-
getStuckConsumerDetectionAndRepairRunnable
protected static Runnable getStuckConsumerDetectionAndRepairRunnable(Map<String, AbstractKafkaConsumerService> kafkaServerToConsumerServiceMap, Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping, long stuckConsumerRepairThresholdMs, long nonExistingTopicIngestionTaskKillThresholdMs, long nonExistingTopicRetryIntervalMs, StuckConsumerRepairStats stuckConsumerRepairStats, Consumer<String> killIngestionTaskRunnable) -
createKafkaConsumerService
Create a newKafkaConsumerService
given consumerProperties which must contain a value for "bootstrap.servers". If aKafkaConsumerService
for the given "bootstrap.servers" (Kafka URL) has already been created, this method returns the createdKafkaConsumerService
.- Parameters:
consumerProperties
- consumer properties that are used to createKafkaConsumerService
-
hasConsumerAssignedFor
public boolean hasConsumerAssignedFor(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) -
unsubscribeConsumerFor
public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) -
unsubscribeConsumerFor
public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs) -
subscribeConsumerFor
public ConsumedDataReceiver<List<PubSubMessage<KafkaKey,KafkaMessageEnvelope, subscribeConsumerForLong>>> (String kafkaURL, StoreIngestionTask storeIngestionTask, PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastOffset) -
getOffsetLagBasedOnMetrics
public long getOffsetLagBasedOnMetrics(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) -
getLatestOffsetBasedOnMetrics
public long getLatestOffsetBasedOnMetrics(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-