Class KafkaStoreIngestionService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService
- All Implemented Interfaces:
StoreIngestionService, Closeable, AutoCloseable
public class KafkaStoreIngestionService
extends AbstractVeniceService
implements StoreIngestionService
Assumes a one-to-one mapping between a Venice store and a Kafka topic.
Manages the Kafka topics and partitions that need to be consumed for the stores on this node.
Launches a StoreIngestionTask for each store version to consume and process messages.
Uses the "new" Kafka consumer.
-
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
-
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
-
Constructor Summary
Constructor | Description
KafkaStoreIngestionService(StorageService storageService, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, ReadOnlyStoreRepository metadataRepo, ReadOnlySchemaRepository schemaRepo, ReadOnlyLiveClusterConfigRepository liveClusterConfigRepository, io.tehuti.metrics.MetricsRepository metricsRepository, Optional<SchemaReader> kafkaMessageEnvelopeSchemaReader, Optional<ClientConfig> clientConfig, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer, Optional<HelixReadOnlyZKSharedSchemaRepository> zkSharedSchemaRepository, ICProvider icProvider, boolean isIsolatedIngestion, StorageEngineBackedCompressorFactory compressorFactory, Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerConfig recordTransformerConfig, boolean isDaVinciClient, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory, Optional<SSLFactory> sslFactory, HeartbeatMonitoringService heartbeatMonitoringService, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin, AdaptiveThrottlerSignalService adaptiveThrottlerSignalService) |
-
Method Summary
Modifier and Type | Method | Description
void | addIngestionNotifier(VeniceNotifier notifier) | Adds a notifier that receives notifications about the various states of the consumption tasks, such as start, completed, progress, and error.
boolean | containsRunningConsumption(VeniceStoreVersionConfig veniceStore) | Checks whether there is a running consumption task for the given store.
boolean | containsRunningConsumption(String topic) | Checks whether there is a running consumption task for the given store version topic.
void | demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) |
CompletableFuture<Void> | dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) | Drops the corresponding Venice partition gracefully.
AggVersionedIngestionStats | getAggVersionedIngestionStats() | Gets the AggVersionedIngestionStats.
| getConsumptionSnapshots(String topicName, ComplementSet<Integer> partitions) |
Set<String> | getIngestingTopicsWithVersionStatusNotOnline() | Gets the names of topics that are currently maintained by the ingestion service and whose corresponding version status is not online.
| getKafkaValueSerializer() |
final ReadOnlyStoreRepository | getMetadataRepo() |
| getPartitionOffsetRecords(String topicName, int partition) | This method should only be called when the forked ingestion process is handing over the ingestion task to the main process.
| getStoreIngestionTask(String topicName) |
| getStoreVersionCompressionDictionary(String topicName) |
ReplicaIngestionResponse | getTopicPartitionIngestionContext(String versionTopic, String topicName, int partitionId) |
| getVeniceConfigLoader() |
boolean | hasCurrentVersionBootstrapping() |
static boolean | hasCurrentVersionBootstrapping(Map<String, StoreIngestionTask> ingestionTaskMap) |
boolean | isLiveUpdateSuppressionEnabled() |
boolean | isPartitionConsuming(String topic, int partitionId) | Checks whether the specified partition is still being consumed.
boolean | killConsumptionTask(String topicName) | Kills all running consumptions of the given store.
void | promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) |
void | recordIngestionFailure(String storeName) |
void | replaceAndAddTestNotifier(VeniceNotifier notifier) |
void | shutdownStoreIngestionTask(String topicName) | Closes the specified StoreIngestionTask and waits up to 10 seconds for it to shut down fully.
void | startConsumption(VeniceStoreVersionConfig veniceStore, int partitionId) | Starts consuming messages from the Kafka partition corresponding to the Venice partition.
boolean | startInner() | Starts the Kafka consumption tasks for already subscribed partitions.
CompletableFuture<Void> | stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId) | Stops consuming messages from the Kafka partition corresponding to the Venice partition.
void | stopConsumptionAndWait(VeniceStoreVersionConfig veniceStore, int partitionId, int sleepSeconds, int numRetries, boolean whetherToResetOffset) | Stops consuming messages from the Kafka partition corresponding to the Venice partition and waits up to sleepSeconds * numRetries seconds to make sure partition consumption is stopped.
void | stopInner() | Stops all the Kafka consumption tasks.
void | syncTopicPartitionOffset(String topicName, int partition) | Updates offset metadata and syncs it to storage for the specified topic partition.
boolean | topicPartitionHasAnyPendingActions(String topic, int partition) |
void | updatePartitionOffsetRecords(String topicName, int partition, ByteBuffer offsetRecordByteBuffer) | Updates, in the main process, the latest offset records of all sub-partitions fetched from the isolated ingestion process, so that the main process's in-memory storage metadata service is aware of the latest updates and does not restart ingestion from scratch.
protected void | updateStatsEmission(NavigableMap<String, StoreIngestionTask> taskMap, String storeName) | Goes through all known ingestion tasks on this server node and finds the task that matches storeName and has the largest version number. If that task does not have metric emission enabled, enables it and updates the store ingestion stats.
protected void | updateStatsEmission(NavigableMap<String, StoreIngestionTask> taskMap, String storeName, int maximumVersion) | Finds the task that matches both storeName and maximumVersion, enables metric emission for it, and updates the ingestion stats with it. Disables metric emission for all tasks that do not have the maximum version.
void | waitIngestionTaskToCompleteAllPartitionPendingActions(String topicName, int partition, long retryIntervalInMs, int numRetries) |
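The two updateStatsEmission overloads describe a selection rule. Among the tasks that belong to storeName, only the one with the largest version emits metrics. The sketch below illustrates that rule under stated assumptions. Task and StatsEmission are hypothetical stand-ins, not StoreIngestionTask or Venice code. The map is assumed to be keyed by version topic names such as store_v1. The actual stats update is omitted.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical stand-in for StoreIngestionTask: it only knows its store, its version,
// and whether it currently emits metrics.
class Task {
    final String storeName;
    final int version;
    boolean metricsEnabled;

    Task(String storeName, int version) {
        this.storeName = storeName;
        this.version = version;
    }
}

class StatsEmission {
    // First overload: find the largest version of storeName among all known tasks,
    // then delegate to the second overload (a choice made by this sketch).
    static void updateStatsEmission(NavigableMap<String, Task> taskMap, String storeName) {
        int maxVersion = -1;
        for (Task task : taskMap.values()) {
            if (task.storeName.equals(storeName)) {
                maxVersion = Math.max(maxVersion, task.version);
            }
        }
        if (maxVersion >= 0) {
            updateStatsEmission(taskMap, storeName, maxVersion);
        }
    }

    // Second overload: enable emission for the task with maximumVersion and
    // disable it for every other task of the same store.
    static void updateStatsEmission(NavigableMap<String, Task> taskMap, String storeName, int maximumVersion) {
        for (Task task : taskMap.values()) {
            if (task.storeName.equals(storeName)) {
                task.metricsEnabled = task.version == maximumVersion;
            }
        }
    }
}
```

In this sketch the first overload delegates to the second, so calling it also disables emission on older versions. The documentation for the first overload only states that the max-version task gets emission enabled.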
-
Constructor Details
-
KafkaStoreIngestionService
public KafkaStoreIngestionService(StorageService storageService, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, ReadOnlyStoreRepository metadataRepo, ReadOnlySchemaRepository schemaRepo, ReadOnlyLiveClusterConfigRepository liveClusterConfigRepository, io.tehuti.metrics.MetricsRepository metricsRepository, Optional<SchemaReader> kafkaMessageEnvelopeSchemaReader, Optional<ClientConfig> clientConfig, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer, Optional<HelixReadOnlyZKSharedSchemaRepository> zkSharedSchemaRepository, ICProvider icProvider, boolean isIsolatedIngestion, StorageEngineBackedCompressorFactory compressorFactory, Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerConfig recordTransformerConfig, boolean isDaVinciClient, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory, Optional<SSLFactory> sslFactory, HeartbeatMonitoringService heartbeatMonitoringService, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin, AdaptiveThrottlerSignalService adaptiveThrottlerSignalService)
-
-
Method Details
-
startInner
public boolean startInner()
Starts the Kafka consumption tasks for already subscribed partitions.
- Specified by:
startInner in class AbstractVeniceService
- Returns:
true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
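The return contract of startInner permits asynchronous startup. The following minimal sketch shows that contract. AsyncStartingService and its ServiceState enum are hypothetical mirrors of AbstractVeniceService, whose real start logic is not shown on this page. Here, startInner returns false, and the background work is what moves the state to STARTED.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical mirror of AbstractVeniceService.ServiceState.
enum ServiceState { STOPPED, STARTING, STARTED }

class AsyncStartingService {
    volatile ServiceState serviceState = ServiceState.STOPPED;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    // Returns false because startup finishes on another thread; that thread is then
    // responsible for moving the state to STARTED.
    boolean startInner() {
        executor.submit(() -> {
            // Placeholder for the asynchronous startup work.
            serviceState = ServiceState.STARTED;
        });
        return false;
    }

    // Hypothetical base-class style start(): only synchronous starters are marked
    // STARTED here; asynchronous ones mark themselves.
    void start() {
        serviceState = ServiceState.STARTING;
        if (startInner()) {
            serviceState = ServiceState.STARTED;
        }
    }

    // Test helper: waits for the background work, then releases the executor thread.
    void awaitAndShutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```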
-
hasCurrentVersionBootstrapping
public boolean hasCurrentVersionBootstrapping()
-
hasCurrentVersionBootstrapping
public static boolean hasCurrentVersionBootstrapping(Map<String, StoreIngestionTask> ingestionTaskMap)
-
stopInner
public void stopInner()
Stops all the Kafka consumption tasks and closes all the Kafka clients.
- Specified by:
stopInner in class AbstractVeniceService
-
startConsumption
public void startConsumption(VeniceStoreVersionConfig veniceStore, int partitionId)
Starts consuming messages from the Kafka partition corresponding to the Venice partition. Subscribes to the partition if required.
- Specified by:
startConsumption in interface StoreIngestionService
- Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
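The "one task per store version, subscribe only if required" bookkeeping can be sketched with a concurrent map. This sketch makes several simplifications. IngestionRegistry and IngestionTask are hypothetical classes. Version topics are plain strings instead of VeniceStoreVersionConfig. This startConsumption returns a boolean (true if a new subscription was made) only to make the behaviour observable; the documented method returns void.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical per-version task: tracks which partitions it consumes.
class IngestionTask {
    final String versionTopic;
    final Set<Integer> subscribedPartitions = ConcurrentHashMap.newKeySet();

    IngestionTask(String versionTopic) {
        this.versionTopic = versionTopic;
    }

    // Returns true only when the partition was not already subscribed.
    boolean subscribe(int partitionId) {
        return subscribedPartitions.add(partitionId);
    }
}

class IngestionRegistry {
    // One task per version topic, created lazily on first use.
    private final Map<String, IngestionTask> tasks = new ConcurrentHashMap<>();

    boolean startConsumption(String versionTopic, int partitionId) {
        IngestionTask task = tasks.computeIfAbsent(versionTopic, IngestionTask::new);
        return task.subscribe(partitionId);
    }

    int taskCount() {
        return tasks.size();
    }
}
```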
-
shutdownStoreIngestionTask
public void shutdownStoreIngestionTask(String topicName)
Closes the specified StoreIngestionTask and waits up to 10 seconds for it to shut down fully.
- Parameters:
topicName - Topic name of the ingestion task to be shut down.
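A close-then-wait-with-timeout step like this one can be expressed with the standard ExecutorService API. The following minimal sketch assumes, hypothetically, that the task runs on its own executor and stops when a shared flag is cleared. TaskShutdown.shutdownAndWait is an illustrative helper, not a Venice method.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class TaskShutdown {
    // Signals the task to stop, then waits up to timeoutSeconds for it to finish.
    // Returns true if the task terminated within the timeout.
    static boolean shutdownAndWait(ExecutorService taskExecutor, AtomicBoolean running, long timeoutSeconds) {
        running.set(false);        // cooperative close signal checked by the task loop
        taskExecutor.shutdown();   // accept no new work
        try {
            return taskExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling TaskShutdown.shutdownAndWait(executor, runningFlag, 10) mirrors the documented 10-second limit. A false result means the task was still running when the wait expired.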
-
promoteToLeader
public void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- Specified by:
promoteToLeader in interface StoreIngestionService
-
demoteToStandby
public void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- Specified by:
demoteToStandby in interface StoreIngestionService
-
waitIngestionTaskToCompleteAllPartitionPendingActions
public void waitIngestionTaskToCompleteAllPartitionPendingActions(String topicName, int partition, long retryIntervalInMs, int numRetries)
-
topicPartitionHasAnyPendingActions
public boolean topicPartitionHasAnyPendingActions(String topic, int partition)
-
isLiveUpdateSuppressionEnabled
public boolean isLiveUpdateSuppressionEnabled()
-
getVeniceConfigLoader
- Specified by:
getVeniceConfigLoader in interface StoreIngestionService
-
stopConsumption
public CompletableFuture<Void> stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId)
Stops consuming messages from the Kafka partition corresponding to the Venice partition.
- Specified by:
stopConsumption in interface StoreIngestionService
- Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
-
stopConsumptionAndWait
public void stopConsumptionAndWait(VeniceStoreVersionConfig veniceStore, int partitionId, int sleepSeconds, int numRetries, boolean whetherToResetOffset)
Stops consuming messages from the Kafka partition corresponding to the Venice partition and waits up to sleepSeconds * numRetries seconds to make sure partition consumption is stopped.
- Specified by:
stopConsumptionAndWait in interface StoreIngestionService
- Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
sleepSeconds -
numRetries -
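A maximum wait of sleepSeconds * numRetries suggests a bounded poll. This page does not confirm that the method is implemented that way. The following minimal sketch shows such a loop. BoundedStopWait is hypothetical. A BooleanSupplier stands in for a check like isPartitionConsuming. The interval is in milliseconds so the example runs quickly.

```java
import java.util.function.BooleanSupplier;

class BoundedStopWait {
    // Returns true once isConsuming reports false, or false if the partition is still
    // consuming after numRetries checks spaced sleepMillis apart.
    static boolean waitUntilStopped(BooleanSupplier isConsuming, long sleepMillis, int numRetries) {
        for (int attempt = 0; attempt < numRetries; attempt++) {
            if (!isConsuming.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // One final check after the last sleep.
        return !isConsuming.getAsBoolean();
    }
}
```

Returning a boolean rather than void, as the documented method does, lets the caller decide whether a timeout is an error.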
-
dropStoragePartitionGracefully
public CompletableFuture<Void> dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId)
Drops the corresponding Venice partition gracefully. This should only be called after stopConsumptionAndWait(com.linkedin.davinci.config.VeniceStoreVersionConfig, int, int, int, boolean) has been called.
- Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
- Returns:
a future for the drop partition action.
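The precondition is an ordering constraint: stop consumption first, then drop storage. The documented contract names stopConsumptionAndWait, which blocks. The straightforward approach is therefore to call it and only then call dropStoragePartitionGracefully. Where only futures are available, the same ordering can be expressed by chaining. The following sketch does this with hypothetical stopPartition and dropPartition stand-ins that record the order of calls.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class PartitionLifecycle {
    final List<String> events = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for stopping consumption of a partition.
    CompletableFuture<Void> stopPartition(int partitionId) {
        return CompletableFuture.runAsync(() -> events.add("stopped " + partitionId));
    }

    // Hypothetical stand-in for dropping the partition's local storage.
    CompletableFuture<Void> dropPartition(int partitionId) {
        return CompletableFuture.runAsync(() -> events.add("dropped " + partitionId));
    }

    // The drop starts only after the stop future has completed successfully;
    // if the stop fails, the returned future fails and nothing is dropped.
    CompletableFuture<Void> stopThenDrop(int partitionId) {
        return stopPartition(partitionId).thenCompose(ignored -> dropPartition(partitionId));
    }
}
```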
-
killConsumptionTask
public boolean killConsumptionTask(String topicName)
Description copied from interface: StoreIngestionService
Kills all running consumptions of the given store.
- Specified by:
killConsumptionTask in interface StoreIngestionService
- Parameters:
topicName - Venice topic (store and version number) for the corresponding consumer task that needs to be killed. No action, including logging, is taken when killConsumptionTask is invoked on a topic that is not in the map.
- Returns:
true if a kill is needed and called, otherwise false.
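The kill contract maps onto an atomic map removal: return true only if a task for the topic exists and is killed, and otherwise take no action and log nothing. The following minimal sketch uses hypothetical TaskTable and KillableTask types. Removing the task from the map is an assumption of the sketch, not documented behaviour.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical task with a kill switch.
class KillableTask {
    volatile boolean killed;

    void kill() {
        killed = true;
    }
}

class TaskTable {
    final Map<String, KillableTask> tasks = new ConcurrentHashMap<>();

    // Returns true if a task for the topic existed and was killed. For an unknown
    // topic it does nothing (no logging) and returns false.
    boolean killConsumptionTask(String topicName) {
        KillableTask task = tasks.remove(topicName);
        if (task == null) {
            return false;
        }
        task.kill();
        return true;
    }
}
```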
-
addIngestionNotifier
public void addIngestionNotifier(VeniceNotifier notifier)
Description copied from interface: StoreIngestionService
Adds a notifier that receives notifications about the various states of the consumption tasks, such as start, completed, progress, and error. Multiple notifiers can be added for the same consumption tasks, and all of them are notified in order.
- Specified by:
addIngestionNotifier in interface StoreIngestionService
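Notifying several notifiers in the order they were added is the observer pattern applied to an ordered, thread-safe list. The following minimal sketch uses a hypothetical Notifier interface and NotifierRegistry class. VeniceNotifier's real callback methods are not listed on this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical notifier interface; VeniceNotifier's actual callbacks differ.
interface Notifier {
    void onEvent(String topic, int partition, String status);
}

class NotifierRegistry {
    // CopyOnWriteArrayList keeps insertion order and tolerates concurrent registration.
    private final List<Notifier> notifiers = new CopyOnWriteArrayList<>();

    void addIngestionNotifier(Notifier notifier) {
        notifiers.add(notifier);
    }

    // Every registered notifier is called, in the order it was added.
    void broadcast(String topic, int partition, String status) {
        for (Notifier n : notifiers) {
            n.onEvent(topic, partition, status);
        }
    }
}
```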
-
replaceAndAddTestNotifier
public void replaceAndAddTestNotifier(VeniceNotifier notifier)
- Specified by:
replaceAndAddTestNotifier in interface StoreIngestionService
-
containsRunningConsumption
public boolean containsRunningConsumption(VeniceStoreVersionConfig veniceStore)
Description copied from interface: StoreIngestionService
Checks whether there is a running consumption task for the given store.
- Specified by:
containsRunningConsumption in interface StoreIngestionService
-
containsRunningConsumption
public boolean containsRunningConsumption(String topic)
Description copied from interface: StoreIngestionService
Checks whether there is a running consumption task for the given store version topic.
- Specified by:
containsRunningConsumption in interface StoreIngestionService
-
isPartitionConsuming
public boolean isPartitionConsuming(String topic, int partitionId)
Description copied from interface: StoreIngestionService
Checks whether the specified partition is still being consumed.
- Specified by:
isPartitionConsuming in interface StoreIngestionService
-
getIngestingTopicsWithVersionStatusNotOnline
Description copied from interface: StoreIngestionService
Gets the names of topics that are currently maintained by the ingestion service and whose corresponding version status is not online. Topics with an invalid store or version number are also included in the returned set.
- Specified by:
getIngestingTopicsWithVersionStatusNotOnline in interface StoreIngestionService
- Returns:
a Set of topic names.
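The selection rule can be written as a filter. A topic is kept if its version status is anything other than ONLINE, or if its store or version cannot be resolved at all. The following minimal sketch assumes a hypothetical VersionStatus enum and a lookup function that returns Optional.empty() for an invalid store or version number.

```java
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical subset of version statuses.
enum VersionStatus { STARTED, PUSHED, ONLINE, ERROR }

class NotOnlineTopics {
    static Set<String> select(Collection<String> ingestingTopics,
                              Function<String, Optional<VersionStatus>> statusLookup) {
        Set<String> result = new TreeSet<>();
        for (String topic : ingestingTopics) {
            Optional<VersionStatus> status = statusLookup.apply(topic);
            // An unresolvable store or version counts as "not online", per the documented contract.
            if (!status.isPresent() || status.get() != VersionStatus.ONLINE) {
                result.add(topic);
            }
        }
        return result;
    }
}
```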
-
recordIngestionFailure
public void recordIngestionFailure(String storeName)
- Specified by:
recordIngestionFailure in interface StoreIngestionService
-
getAggVersionedIngestionStats
Description copied from interface: StoreIngestionService
Gets the AggVersionedIngestionStats.
- Specified by:
getAggVersionedIngestionStats in interface StoreIngestionService
- Returns:
an instance of AggVersionedIngestionStats.
-
getStoreVersionCompressionDictionary
-
getStoreIngestionTask
- Specified by:
getStoreIngestionTask in interface StoreIngestionService
-
getConsumptionSnapshots
-
getTopicPartitionIngestionContext
public ReplicaIngestionResponse getTopicPartitionIngestionContext(String versionTopic, String topicName, int partitionId)
-
updatePartitionOffsetRecords
public void updatePartitionOffsetRecords(String topicName, int partition, ByteBuffer offsetRecordByteBuffer)
Updates, in the main process, the latest offset records of all sub-partitions fetched from the isolated ingestion process, so that the main process's in-memory storage metadata service is aware of the latest updates and does not restart ingestion from scratch.
-
getPartitionOffsetRecords
This method should only be called when the forked ingestion process is handing over the ingestion task to the main process. It collects the user partition's latest OffsetRecords from the partition consumption states (PCS). In theory, PCS should be available in this situation because the topic has not been unsubscribed yet. If it is not available, an exception is thrown, because this is unexpected.
-
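getPartitionOffsetRecords and updatePartitionOffsetRecords describe the two halves of an isolated-ingestion handover. The forked process reads the latest offset from its partition consumption state and fails loudly if that state is missing. The main process records the offset so that ingestion resumes instead of restarting from scratch. The following minimal sketch assumes that an offset record reduces to a single long serialized into a ByteBuffer; the real OffsetRecord carries more state. OffsetHandover is a hypothetical class that simulates both processes in one object.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OffsetHandover {
    // Forked-process side: latest offset per partition, standing in for PCS.
    final Map<Integer, Long> partitionConsumptionState = new ConcurrentHashMap<>();
    // Main-process side: in-memory storage metadata, keyed by partition.
    final Map<Integer, Long> mainProcessOffsets = new ConcurrentHashMap<>();

    // Mirrors getPartitionOffsetRecords: PCS is expected to exist, so absence is an error.
    ByteBuffer getPartitionOffsetRecord(int partition) {
        Long offset = partitionConsumptionState.get(partition);
        if (offset == null) {
            throw new IllegalStateException("No partition consumption state for partition " + partition);
        }
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(offset);
        buffer.flip(); // make the buffer readable from position 0
        return buffer;
    }

    // Mirrors updatePartitionOffsetRecords: store the handed-over offset in the main process.
    void updatePartitionOffsetRecord(int partition, ByteBuffer offsetRecord) {
        // duplicate() leaves the caller's buffer position untouched.
        mainProcessOffsets.put(partition, offsetRecord.duplicate().getLong());
    }
}
```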
syncTopicPartitionOffset
public void syncTopicPartitionOffset(String topicName, int partition)
Updates offset metadata and syncs it to storage for the specified topic partition. This method is invoked only when the isolated ingestion process reports topic partition completion, to make sure the ingestion progress is persisted.
-
getMetadataRepo
-
getKafkaValueSerializer
-