Class KafkaConsumerServiceDelegator
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService
com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator
- All Implemented Interfaces:
Closeable, AutoCloseable
This delegator implementation distributes partition requests across different consumer services.
When {@link ConfigKeys#SERVER_DEDICATED_CONSUMER_POOL_FOR_AA_WC_LEADER_ENABLED} is off, this class
always returns the default consumer service.
When the option is on, it returns the dedicated consumer service if the topic partition belongs
to a real-time topic and the corresponding store has active/active or write compute enabled.
A dedicated consumer pool is used for leader replicas of active/active or write compute stores because
processing their writes before they are put into the drainer queue is much more expensive than for other replicas.
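The routing rule described above can be sketched as follows. This is only an illustration under stated assumptions, not the class's actual implementation: the `Pool` enum, the `choosePool` helper, the `_rt` suffix check for real-time topics, and the idea that `isAAWCStoreFunc` is keyed by the version topic name are all hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.function.Function;

// Hypothetical sketch of the pool-selection rule; none of these names come from Venice.
final class ConsumerPoolRoutingSketch {
  enum Pool { DEFAULT, DEDICATED_AA_WC_LEADER }

  // Assumption: a real-time topic is recognized by an "_rt" suffix.
  static boolean isRealTimeTopic(String topicName) {
    return topicName.endsWith("_rt");
  }

  // Assumption: isAAWCStoreFunc answers "is active/active or write compute enabled?"
  // for the store behind the given version topic name.
  static Pool choosePool(
      boolean dedicatedPoolEnabled,
      String versionTopicName,
      String partitionTopicName,
      Function<String, Boolean> isAAWCStoreFunc) {
    if (!dedicatedPoolEnabled) {
      // Option off: always use the default consumer service.
      return Pool.DEFAULT;
    }
    if (isRealTimeTopic(partitionTopicName) && isAAWCStoreFunc.apply(versionTopicName)) {
      // Option on, real-time topic, AA/WC store: use the dedicated pool.
      return Pool.DEDICATED_AA_WC_LEADER;
    }
    return Pool.DEFAULT;
  }
}
```

With the option on, only a real-time topic partition of an active/active or write compute store is routed to the dedicated pool; every other combination falls back to the default service.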
-
Nested Class Summary
Modifier and Type    Class    Description
class
class
static enum
class
class
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
Constructor Summary
Constructor    Description
KafkaConsumerServiceDelegator(VeniceServerConfig serverConfig, com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator.KafkaConsumerServiceBuilder consumerServiceConstructor, Function<String, Boolean> isAAWCStoreFunc)
-
Method Summary
Modifier and Type    Method    Description
com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer    assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
void    batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition> topicPartitionsToUnSub)
com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer    getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)    There might be no consumer service found for the version topic and topic partition, which means the topic partition is not subscribed.
Map<PubSubTopicPartition,TopicPartitionIngestionInfo>    getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
long    getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
long    getMaxElapsedTimeMSSinceLastPollInConsumerPool()
long    getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
boolean    hasAnySubscriptionFor(PubSubTopic versionTopic)
void    startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastReadOffset, ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver)
boolean    startInner()
void    stopInner()
void    unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
void    unsubscribeAll(PubSubTopic versionTopic)
Methods inherited from class com.linkedin.davinci.kafka.consumer.AbstractKafkaConsumerService
unSubscribe
-
Constructor Details
-
KafkaConsumerServiceDelegator
public KafkaConsumerServiceDelegator(VeniceServerConfig serverConfig, com.linkedin.davinci.kafka.consumer.KafkaConsumerServiceDelegator.KafkaConsumerServiceBuilder consumerServiceConstructor, Function<String, Boolean> isAAWCStoreFunc)
-
-
Method Details
-
getConsumerAssignedToVersionTopicPartition
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer getConsumerAssignedToVersionTopicPartition(PubSubTopic versionTopic, PubSubTopicPartition topicPartition)
There might be no consumer service found for the version topic and topic partition, which means the topic partition is not subscribed. In this case, this method returns null; callers should check the return value and handle it properly.
- Specified by:
getConsumerAssignedToVersionTopicPartition in class AbstractKafkaConsumerService
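A minimal sketch of the caller-side null check this contract asks for. Everything here is a hypothetical stand-in (the `Consumer` class, the string-keyed map, and the `describe` helper are not Venice types); it only illustrates treating a null result as "not subscribed" rather than dereferencing it.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins; the real method works with PubSubTopic,
// PubSubTopicPartition, and SharedKafkaConsumer.
final class AssignedConsumerLookupSketch {
  static final class Consumer {
    final String id;

    Consumer(String id) {
      this.id = id;
    }
  }

  private final Map<String, Consumer> assignments = new ConcurrentHashMap<>();

  private static String key(String versionTopic, int partition) {
    return versionTopic + "-" + partition;
  }

  void assign(String versionTopic, int partition, Consumer consumer) {
    assignments.put(key(versionTopic, partition), consumer);
  }

  // Follows the documented contract: returns null when nothing is subscribed.
  Consumer getConsumerAssignedTo(String versionTopic, int partition) {
    return assignments.get(key(versionTopic, partition));
  }

  // Caller side: check for null before using the consumer.
  String describe(String versionTopic, int partition) {
    Consumer consumer = getConsumerAssignedTo(versionTopic, partition);
    if (consumer == null) {
      return "not subscribed";
    }
    return "consumer " + consumer.id;
  }
}
```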
-
assignConsumerFor
public com.linkedin.davinci.kafka.consumer.SharedKafkaConsumer assignConsumerFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
assignConsumerFor in class AbstractKafkaConsumerService
-
unsubscribeAll
public void unsubscribeAll(PubSubTopic versionTopic)
- Specified by:
unsubscribeAll in class AbstractKafkaConsumerService
-
unSubscribe
public void unSubscribe(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition, long timeoutMs)
- Specified by:
unSubscribe in class AbstractKafkaConsumerService
-
batchUnsubscribe
public void batchUnsubscribe(PubSubTopic versionTopic, Set<PubSubTopicPartition> topicPartitionsToUnSub)
- Specified by:
batchUnsubscribe in class AbstractKafkaConsumerService
-
hasAnySubscriptionFor
public boolean hasAnySubscriptionFor(PubSubTopic versionTopic)
- Specified by:
hasAnySubscriptionFor in class AbstractKafkaConsumerService
-
getMaxElapsedTimeMSSinceLastPollInConsumerPool
public long getMaxElapsedTimeMSSinceLastPollInConsumerPool()
- Specified by:
getMaxElapsedTimeMSSinceLastPollInConsumerPool in class AbstractKafkaConsumerService
-
startConsumptionIntoDataReceiver
public void startConsumptionIntoDataReceiver(PartitionReplicaIngestionContext partitionReplicaIngestionContext, long lastReadOffset, ConsumedDataReceiver<List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumedDataReceiver)
- Specified by:
startConsumptionIntoDataReceiver in class AbstractKafkaConsumerService
-
getOffsetLagBasedOnMetrics
public long getOffsetLagBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getOffsetLagBasedOnMetrics in class AbstractKafkaConsumerService
-
getLatestOffsetBasedOnMetrics
public long getLatestOffsetBasedOnMetrics(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getLatestOffsetBasedOnMetrics in class AbstractKafkaConsumerService
-
getIngestionInfoFor
public Map<PubSubTopicPartition,TopicPartitionIngestionInfo> getIngestionInfoFor(PubSubTopic versionTopic, PubSubTopicPartition pubSubTopicPartition)
- Specified by:
getIngestionInfoFor in class AbstractKafkaConsumerService
-
startInner
public boolean startInner() throws Exception
- Specified by:
startInner in class AbstractVeniceService
- Returns:
true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
- Throws:
Exception
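The return-value contract above can be illustrated with a small sketch. The `SketchService` base class, its `start()` wrapper, and the `warmUp` future are hypothetical and only mimic the documented behavior; they are not the real AbstractVeniceService code.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the startInner() contract; not Venice code.
final class AsyncStartSketch {
  enum ServiceState { STOPPED, STARTING, STARTED }

  abstract static class SketchService {
    protected volatile ServiceState serviceState = ServiceState.STOPPED;

    abstract boolean startInner() throws Exception;

    // Assumed wrapper: marks the service STARTED only when startInner() returns true.
    final void start() throws Exception {
      serviceState = ServiceState.STARTING;
      if (startInner()) {
        serviceState = ServiceState.STARTED;
      }
    }
  }

  static final class AsyncService extends SketchService {
    // Completed by whoever finishes the asynchronous start-up work.
    final CompletableFuture<Void> warmUp = new CompletableFuture<>();

    @Override
    boolean startInner() {
      // The implementer sets STARTED itself once the async work is done.
      warmUp.thenRun(() -> serviceState = ServiceState.STARTED);
      return false; // still starting asynchronously
    }
  }
}
```

In this sketch the service stays in `STARTING` after `start()` returns and moves to `STARTED` only when `warmUp` completes.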
-
stopInner
public void stopInner() throws Exception
- Specified by:
stopInner in class AbstractVeniceService
- Throws:
Exception
-