Class AggKafkaConsumerService
- java.lang.Object
-
- com.linkedin.venice.service.AbstractVeniceService
-
- com.linkedin.davinci.kafka.consumer.AggKafkaConsumerService
-
- All Implemented Interfaces:
java.io.Closeable
,java.lang.AutoCloseable
public class AggKafkaConsumerService extends AbstractVeniceService
AggKafkaConsumerService
supports Kafka consumer pool for multiple Kafka clusters from different data centers; for each Kafka bootstrap server url,AggKafkaConsumerService
will create oneKafkaConsumerService
.
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
-
Field Summary
-
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
-
Constructor Summary
Constructors Constructor Description AggKafkaConsumerService(PubSubConsumerAdapterFactory consumerFactory, TopicManagerContext.PubSubPropertiesSupplier pubSubPropertiesSupplier, VeniceServerConfig serverConfig, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, TopicExistenceChecker topicExistenceChecker, PubSubMessageDeserializer pubSubDeserializer, java.util.function.Consumer<java.lang.String> killIngestionTaskRunnable, java.util.function.Function<java.lang.String,java.lang.Boolean> isAAOrWCEnabledFunc, ReadOnlyStoreRepository metadataRepository)
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description AbstractKafkaConsumerService
createKafkaConsumerService(java.util.Properties consumerProperties)
Create a newKafkaConsumerService
given consumerProperties which must contain a value for "bootstrap.servers".long
getLatestOffsetBasedOnMetrics(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
long
getOffsetLagBasedOnMetrics(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
protected static java.lang.Runnable
getStuckConsumerDetectionAndRepairRunnable(java.util.Map<java.lang.String,AbstractKafkaConsumerService> kafkaServerToConsumerServiceMap, java.util.Map<java.lang.String,StoreIngestionTask> versionTopicStoreIngestionTaskMapping, long stuckConsumerRepairThresholdMs, long nonExistingTopicIngestionTaskKillThresholdMs, long nonExistingTopicRetryIntervalMs, StuckConsumerRepairStats stuckConsumerRepairStats, java.util.function.Consumer<java.lang.String> killIngestionTaskRunnable)
boolean
hasConsumerAssignedFor(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
boolean
startInner()
IMPORTANT: All newly created KafkaConsumerService are already started increateKafkaConsumerService(Properties)
, if this is no longer the case in future, make sure to update the startInner logic here.void
stopInner()
ConsumedDataReceiver<java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>>
subscribeConsumerFor(java.lang.String kafkaURL, StoreIngestionTask storeIngestionTask, PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastOffset)
void
unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
void
unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
-
-
-
Constructor Detail
-
AggKafkaConsumerService
public AggKafkaConsumerService(PubSubConsumerAdapterFactory consumerFactory, TopicManagerContext.PubSubPropertiesSupplier pubSubPropertiesSupplier, VeniceServerConfig serverConfig, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, TopicExistenceChecker topicExistenceChecker, PubSubMessageDeserializer pubSubDeserializer, java.util.function.Consumer<java.lang.String> killIngestionTaskRunnable, java.util.function.Function<java.lang.String,java.lang.Boolean> isAAOrWCEnabledFunc, ReadOnlyStoreRepository metadataRepository)
-
-
Method Detail
-
startInner
public boolean startInner()
IMPORTANT: All newly created KafkaConsumerService are already started increateKafkaConsumerService(Properties)
, if this is no longer the case in future, make sure to update the startInner logic here.- Specified by:
startInner
in classAbstractVeniceService
- Returns:
- true if the service is completely started,
false if it is still starting asynchronously (in this case, it is the implementer's
responsibility to set
AbstractVeniceService.serviceState
toAbstractVeniceService.ServiceState.STARTED
upon completion of the async work).
-
stopInner
public void stopInner() throws java.lang.Exception
- Specified by:
stopInner
in classAbstractVeniceService
- Throws:
java.lang.Exception
-
getStuckConsumerDetectionAndRepairRunnable
protected static java.lang.Runnable getStuckConsumerDetectionAndRepairRunnable(java.util.Map<java.lang.String,AbstractKafkaConsumerService> kafkaServerToConsumerServiceMap, java.util.Map<java.lang.String,StoreIngestionTask> versionTopicStoreIngestionTaskMapping, long stuckConsumerRepairThresholdMs, long nonExistingTopicIngestionTaskKillThresholdMs, long nonExistingTopicRetryIntervalMs, StuckConsumerRepairStats stuckConsumerRepairStats, java.util.function.Consumer<java.lang.String> killIngestionTaskRunnable)
-
createKafkaConsumerService
public AbstractKafkaConsumerService createKafkaConsumerService(java.util.Properties consumerProperties)
Create a newKafkaConsumerService
given consumerProperties which must contain a value for "bootstrap.servers". If aKafkaConsumerService
for the given "bootstrap.servers" (Kafka URL) has already been created, this method returns the createdKafkaConsumerService
.- Parameters:
consumerProperties
- consumer properties that are used to createKafkaConsumerService
-
hasConsumerAssignedFor
public boolean hasConsumerAssignedFor(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-
unsubscribeConsumerFor
public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-
unsubscribeConsumerFor
public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
-
subscribeConsumerFor
public ConsumedDataReceiver<java.util.List<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>> subscribeConsumerFor(java.lang.String kafkaURL, StoreIngestionTask storeIngestionTask, PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastOffset)
-
getOffsetLagBasedOnMetrics
public long getOffsetLagBasedOnMetrics(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-
getLatestOffsetBasedOnMetrics
public long getLatestOffsetBasedOnMetrics(java.lang.String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
-
-