Class AggKafkaConsumerService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.AggKafkaConsumerService
- All Implemented Interfaces:
Closeable,AutoCloseable
AggKafkaConsumerService supports Kafka consumer pool for multiple Kafka clusters from different data centers;
for each Kafka bootstrap server url, AggKafkaConsumerService will create one KafkaConsumerService.-
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState -
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState -
Constructor Summary
ConstructorsConstructorDescriptionAggKafkaConsumerService(PubSubConsumerAdapterFactory consumerFactory, TopicManagerContext.PubSubPropertiesSupplier pubSubPropertiesSupplier, VeniceServerConfig serverConfig, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, StaleTopicChecker staleTopicChecker, PubSubMessageDeserializer pubSubDeserializer, Consumer<String> killIngestionTaskRunnable, Function<String, Boolean> isAAOrWCEnabledFunc, ReadOnlyStoreRepository metadataRepository) -
Method Summary
Modifier and TypeMethodDescriptioncreateKafkaConsumerService(Properties consumerProperties) Create a newKafkaConsumerServicegiven consumerProperties which must contain a value for "bootstrap.servers".getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, String regionName) static intgetKeyLevelLockMaxPoolSizeBasedOnServerConfig(VeniceServerConfig serverConfig, int partitionCount) longgetLatestOffsetBasedOnMetrics(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) protected static RunnablegetStuckConsumerDetectionAndRepairRunnable(org.apache.logging.log4j.Logger logger, Time time, Map<String, AbstractKafkaConsumerService> kafkaServerToConsumerServiceMap, Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping, long stuckConsumerRepairThresholdMs, long nonExistingTopicIngestionTaskKillThresholdMs, long nonExistingTopicRetryIntervalMs, long consumerPollTrackerStaleThresholdMs, StuckConsumerRepairStats stuckConsumerRepairStats, Consumer<String> killIngestionTaskRunnable) booleanhasConsumerAssignedFor(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) booleanIMPORTANT: All newly created KafkaConsumerService are already started increateKafkaConsumerService(Properties), if this is no longer the case in future, make sure to update the startInner logic here.voidsubscribeConsumerFor(String kafkaURL, StoreIngestionTask storeIngestionTask, PartitionReplicaIngestionContext partitionReplicaIngestionContext, PubSubPosition lastOffset) voidunsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) voidunsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
-
Constructor Details
-
AggKafkaConsumerService
public AggKafkaConsumerService(PubSubConsumerAdapterFactory consumerFactory, TopicManagerContext.PubSubPropertiesSupplier pubSubPropertiesSupplier, VeniceServerConfig serverConfig, IngestionThrottler ingestionThrottler, KafkaClusterBasedRecordThrottler kafkaClusterBasedRecordThrottler, io.tehuti.metrics.MetricsRepository metricsRepository, StaleTopicChecker staleTopicChecker, PubSubMessageDeserializer pubSubDeserializer, Consumer<String> killIngestionTaskRunnable, Function<String, Boolean> isAAOrWCEnabledFunc, ReadOnlyStoreRepository metadataRepository)
-
-
Method Details
-
startInner
public boolean startInner()IMPORTANT: All newly created KafkaConsumerService are already started increateKafkaConsumerService(Properties), if this is no longer the case in future, make sure to update the startInner logic here.- Specified by:
startInnerin classAbstractVeniceService- Returns:
- true if the service is completely started,
false if it is still starting asynchronously (in this case, it is the implementer's
responsibility to set
AbstractVeniceService.serviceStatetoAbstractVeniceService.ServiceState.STARTEDupon completion of the async work).
-
stopInner
- Specified by:
stopInnerin classAbstractVeniceService- Throws:
Exception
-
getStuckConsumerDetectionAndRepairRunnable
protected static Runnable getStuckConsumerDetectionAndRepairRunnable(org.apache.logging.log4j.Logger logger, Time time, Map<String, AbstractKafkaConsumerService> kafkaServerToConsumerServiceMap, Map<String, StoreIngestionTask> versionTopicStoreIngestionTaskMapping, long stuckConsumerRepairThresholdMs, long nonExistingTopicIngestionTaskKillThresholdMs, long nonExistingTopicRetryIntervalMs, long consumerPollTrackerStaleThresholdMs, StuckConsumerRepairStats stuckConsumerRepairStats, Consumer<String> killIngestionTaskRunnable) -
createKafkaConsumerService
Create a newKafkaConsumerServicegiven consumerProperties which must contain a value for "bootstrap.servers". If aKafkaConsumerServicefor the given "bootstrap.servers" (Kafka URL) has already been created, this method returns the createdKafkaConsumerService.- Parameters:
consumerProperties- consumer properties that are used to createKafkaConsumerService
-
hasConsumerAssignedFor
public boolean hasConsumerAssignedFor(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) -
unsubscribeConsumerFor
public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) -
unsubscribeConsumerFor
public void unsubscribeConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs) -
subscribeConsumerFor
public ConsumedDataReceiver<List<DefaultPubSubMessage>> subscribeConsumerFor(String kafkaURL, StoreIngestionTask storeIngestionTask, PartitionReplicaIngestionContext partitionReplicaIngestionContext, PubSubPosition lastOffset) -
getLatestOffsetBasedOnMetrics
public long getLatestOffsetBasedOnMetrics(String kafkaURL, PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition) -
getIngestionInfoFor
public String getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, String regionName) -
getKeyLevelLockMaxPoolSizeBasedOnServerConfig
public static int getKeyLevelLockMaxPoolSizeBasedOnServerConfig(VeniceServerConfig serverConfig, int partitionCount)
-