Class KafkaStoreIngestionService
java.lang.Object
  com.linkedin.venice.service.AbstractVeniceService
    com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService
All Implemented Interfaces:
StoreIngestionService, java.io.Closeable, java.lang.AutoCloseable
public class KafkaStoreIngestionService extends AbstractVeniceService implements StoreIngestionService
Assumes a one-to-one mapping between a Venice store and a Kafka topic. Manages the Kafka topics and partitions that need to be consumed for the stores on this node. Launches a StoreIngestionTask for each store version to consume and process messages. Uses the "new" Kafka consumer.
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
Constructor Summary
KafkaStoreIngestionService(StorageService storageService, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, ReadOnlyStoreRepository metadataRepo, ReadOnlySchemaRepository schemaRepo, ReadOnlyLiveClusterConfigRepository liveClusterConfigRepository, io.tehuti.metrics.MetricsRepository metricsRepository, java.util.Optional<SchemaReader> kafkaMessageEnvelopeSchemaReader, java.util.Optional<ClientConfig> clientConfig, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer, java.util.Optional<HelixReadOnlyZKSharedSchemaRepository> zkSharedSchemaRepository, ICProvider icProvider, boolean isIsolatedIngestion, StorageEngineBackedCompressorFactory compressorFactory, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, boolean isDaVinciClient, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory, java.util.Optional<SSLFactory> sslFactory, HeartbeatMonitoringService heartbeatMonitoringService, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
Method Summary
void addIngestionNotifier(VeniceNotifier notifier) - Adds a notifier that receives notifications about the status of consumption tasks, such as start, completion, progress, and error states.
boolean containsRunningConsumption(VeniceStoreVersionConfig veniceStore) - Checks whether there is a running consumption task for the given store.
boolean containsRunningConsumption(java.lang.String topic) - Checks whether there is a running consumption task for the given store-version topic.
void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
java.util.concurrent.CompletableFuture<java.lang.Void> dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId) - Drops the corresponding Venice partition gracefully.
AggVersionedIngestionStats getAggVersionedIngestionStats() - Gets the AggVersionedIngestionStats.
AdminResponse getConsumptionSnapshots(java.lang.String topicName, ComplementSet<java.lang.Integer> partitions)
java.util.Set<java.lang.String> getIngestingTopicsWithVersionStatusNotOnline() - Gets the names of topics currently maintained by the ingestion service whose corresponding version status is not online.
KafkaValueSerializer getKafkaValueSerializer()
ReadOnlyStoreRepository getMetadataRepo()
java.nio.ByteBuffer getPartitionOffsetRecords(java.lang.String topicName, int partition) - Should only be called when the forked ingestion process is handing over the ingestion task to the main process.
StoreIngestionTask getStoreIngestionTask(java.lang.String topicName)
java.nio.ByteBuffer getStoreVersionCompressionDictionary(java.lang.String topicName)
ReplicaIngestionResponse getTopicPartitionIngestionContext(java.lang.String versionTopic, java.lang.String topicName, int partitionId)
VeniceConfigLoader getVeniceConfigLoader()
boolean hasCurrentVersionBootstrapping()
static boolean hasCurrentVersionBootstrapping(java.util.Map<java.lang.String,StoreIngestionTask> ingestionTaskMap)
boolean isLiveUpdateSuppressionEnabled()
boolean isPartitionConsuming(java.lang.String topic, int partitionId) - Checks whether the specified partition is still being consumed.
boolean killConsumptionTask(java.lang.String topicName) - Kills all running consumption tasks for the given store version.
void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
void recordIngestionFailure(java.lang.String storeName)
void replaceAndAddTestNotifier(VeniceNotifier notifier)
void shutdownStoreIngestionTask(java.lang.String topicName) - Closes the specified StoreIngestionTask and waits up to 10 seconds for it to shut down fully.
void startConsumption(VeniceStoreVersionConfig veniceStore, int partitionId) - Starts consuming messages from the Kafka partition corresponding to the Venice partition.
boolean startInner() - Starts the Kafka consumption tasks for already subscribed partitions.
java.util.concurrent.CompletableFuture<java.lang.Void> stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId) - Stops consuming messages from the Kafka partition corresponding to the Venice partition.
void stopConsumptionAndWait(VeniceStoreVersionConfig veniceStore, int partitionId, int sleepSeconds, int numRetries, boolean whetherToResetOffset) - Stops consuming messages from the Kafka partition corresponding to the Venice partition and waits up to (sleepSeconds * numRetries) seconds to make sure partition consumption is stopped.
void stopInner() - Stops all the Kafka consumption tasks.
void syncTopicPartitionOffset(java.lang.String topicName, int partition) - Updates offset metadata and syncs it to storage for the specified topic partition.
boolean topicPartitionHasAnyPendingActions(java.lang.String topic, int partition)
void updatePartitionOffsetRecords(java.lang.String topicName, int partition, java.nio.ByteBuffer offsetRecordByteBuffer) - Updates, in the main process, the latest offset records of all sub-partitions fetched from the isolated ingestion process, so that the main process's in-memory storage metadata service is aware of the latest updates and does not restart ingestion from scratch.
protected void updateStatsEmission(java.util.NavigableMap<java.lang.String,StoreIngestionTask> taskMap, java.lang.String storeName) - Goes through all known ingestion tasks on this server node and finds the task that matches storeName and has the largest version number; if that task does not have metric emission enabled, enables it and updates the store ingestion stats.
protected void updateStatsEmission(java.util.NavigableMap<java.lang.String,StoreIngestionTask> taskMap, java.lang.String storeName, int maximumVersion) - Finds the task that matches both storeName and maximumVersion, enables metric emission for that task, and updates ingestion stats with it; disables metric emission for all other tasks that do not match maximumVersion.
void waitIngestionTaskToCompleteAllPartitionPendingActions(java.lang.String topicName, int partition, long retryIntervalInMs, int numRetries)
Constructor Detail
KafkaStoreIngestionService
public KafkaStoreIngestionService(StorageService storageService, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, ReadOnlyStoreRepository metadataRepo, ReadOnlySchemaRepository schemaRepo, ReadOnlyLiveClusterConfigRepository liveClusterConfigRepository, io.tehuti.metrics.MetricsRepository metricsRepository, java.util.Optional<SchemaReader> kafkaMessageEnvelopeSchemaReader, java.util.Optional<ClientConfig> clientConfig, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer, java.util.Optional<HelixReadOnlyZKSharedSchemaRepository> zkSharedSchemaRepository, ICProvider icProvider, boolean isIsolatedIngestion, StorageEngineBackedCompressorFactory compressorFactory, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, boolean isDaVinciClient, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory, java.util.Optional<SSLFactory> sslFactory, HeartbeatMonitoringService heartbeatMonitoringService, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
Method Detail
startInner
public boolean startInner()
Starts the Kafka consumption tasks for already subscribed partitions.
Specified by: startInner in class AbstractVeniceService
Returns: true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
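The async-start contract above can be sketched as follows. This is a minimal illustration, not Venice code: AsyncStartSketch, SketchServiceState (including its NOT_STARTED and STARTING constants), and the start() wrapper are hypothetical, and start() only assumes that the base class marks the service STARTED when startInner() returns true.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for AbstractVeniceService.ServiceState. Only STARTED is named in
// the documentation; the other constants are illustrative.
enum SketchServiceState { NOT_STARTED, STARTING, STARTED }

// Hypothetical service illustrating the startInner() contract.
class AsyncStartSketch {
  private final AtomicReference<SketchServiceState> serviceState =
      new AtomicReference<>(SketchServiceState.NOT_STARTED);
  private final boolean startAsynchronously;
  private CompletableFuture<Void> asyncWork = CompletableFuture.completedFuture(null);

  AsyncStartSketch(boolean startAsynchronously) {
    this.startAsynchronously = startAsynchronously;
  }

  // Assumed base-class behavior: mark STARTED only when startInner() reports full startup.
  void start() {
    serviceState.set(SketchServiceState.STARTING);
    if (startInner()) {
      serviceState.set(SketchServiceState.STARTED);
    }
  }

  boolean startInner() {
    if (!startAsynchronously) {
      return true; // completely started
    }
    // Still starting: this implementation must set STARTED itself once the async work ends.
    asyncWork = CompletableFuture
        .runAsync(() -> { /* e.g. resume consumption for already subscribed partitions */ })
        .thenRun(() -> serviceState.set(SketchServiceState.STARTED));
    return false;
  }

  void awaitAsyncWork() {
    asyncWork.join();
  }

  SketchServiceState state() {
    return serviceState.get();
  }
}
```

Returning false hands responsibility for the final state transition to the implementation, which is why the STARTED update is chained onto the asynchronous work rather than performed in start().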
-
hasCurrentVersionBootstrapping
public boolean hasCurrentVersionBootstrapping()
-
hasCurrentVersionBootstrapping
public static boolean hasCurrentVersionBootstrapping(java.util.Map<java.lang.String,StoreIngestionTask> ingestionTaskMap)
-
stopInner
public void stopInner()
Stops all the Kafka consumption tasks. Closes all the Kafka clients.
Specified by: stopInner in class AbstractVeniceService
-
startConsumption
public void startConsumption(VeniceStoreVersionConfig veniceStore, int partitionId)
Starts consuming messages from the Kafka partition corresponding to the Venice partition. Subscribes to the partition if required.
Specified by: startConsumption in interface StoreIngestionService
Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
-
shutdownStoreIngestionTask
public void shutdownStoreIngestionTask(java.lang.String topicName)
Closes the specified StoreIngestionTask and waits up to 10 seconds for it to shut down fully.
Parameters:
topicName - Topic name of the ingestion task to shut down.
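A minimal sketch of the close-then-bounded-wait pattern, assuming a task whose close() only signals its run loop and a latch that is released when the loop exits. SketchIngestionTask and ShutdownSketch are hypothetical stand-ins, not the real StoreIngestionTask API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical ingestion task: close() only asks the run loop to stop; the latch is
// released once the loop has actually exited.
class SketchIngestionTask implements Runnable {
  private volatile boolean running = true;
  private final CountDownLatch exited = new CountDownLatch(1);

  @Override
  public void run() {
    try {
      while (running) {
        LockSupport.parkNanos(1_000_000L); // stand-in for polling and processing records
      }
    } finally {
      exited.countDown();
    }
  }

  void close() {
    running = false;
  }

  boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
    return exited.await(timeout, unit);
  }
}

class ShutdownSketch {
  static final long SHUTDOWN_TIMEOUT_SECONDS = 10;

  // Signals the task to close, then waits a bounded time; true if it fully stopped.
  static boolean shutdownStoreIngestionTask(SketchIngestionTask task) {
    task.close();
    try {
      return task.awaitShutdown(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
      return false;
    }
  }
}
```

Returning false on timeout is a choice made for the sketch; the documentation above does not say what the real method does if the task is still running after 10 seconds.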
-
promoteToLeader
public void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
Specified by: promoteToLeader in interface StoreIngestionService
-
demoteToStandby
public void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
Specified by: demoteToStandby in interface StoreIngestionService
-
waitIngestionTaskToCompleteAllPartitionPendingActions
public void waitIngestionTaskToCompleteAllPartitionPendingActions(java.lang.String topicName, int partition, long retryIntervalInMs, int numRetries)
-
topicPartitionHasAnyPendingActions
public boolean topicPartitionHasAnyPendingActions(java.lang.String topic, int partition)
-
isLiveUpdateSuppressionEnabled
public boolean isLiveUpdateSuppressionEnabled()
-
getVeniceConfigLoader
public VeniceConfigLoader getVeniceConfigLoader()
Specified by: getVeniceConfigLoader in interface StoreIngestionService
-
updateStatsEmission
protected void updateStatsEmission(java.util.NavigableMap<java.lang.String,StoreIngestionTask> taskMap, java.lang.String storeName, int maximumVersion)
Finds the task that matches both storeName and maximumVersion, enables metric emission for that task, and updates ingestion stats with it; disables metric emission for all other tasks that do not match maximumVersion.
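The three-argument variant can be sketched like this. StatsTask, its fields, and the statsUpdates counter are hypothetical stand-ins for StoreIngestionTask and its stats reporting; restricting the disable step to tasks of the same store is an assumption.

```java
import java.util.NavigableMap;

// Hypothetical stand-in for StoreIngestionTask, reduced to what this sketch needs.
class StatsTask {
  final String storeName;
  final int version;
  boolean metricsEnabled;
  int statsUpdates; // how many times this task fed the store-level ingestion stats

  StatsTask(String storeName, int version) {
    this.storeName = storeName;
    this.version = version;
  }
}

class StatsEmissionSketch {
  // Enables emission on the (storeName, maximumVersion) task and disables it on that
  // store's other versions. Tasks of other stores are left untouched (an assumption).
  static void updateStatsEmission(NavigableMap<String, StatsTask> taskMap, String storeName, int maximumVersion) {
    for (StatsTask task : taskMap.values()) {
      if (!task.storeName.equals(storeName)) {
        continue;
      }
      if (task.version == maximumVersion) {
        task.metricsEnabled = true;
        task.statsUpdates++; // stand-in for "update ingestion stats with this task"
      } else {
        task.metricsEnabled = false;
      }
    }
  }
}
```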
-
updateStatsEmission
protected void updateStatsEmission(java.util.NavigableMap<java.lang.String,StoreIngestionTask> taskMap, java.lang.String storeName)
Goes through all known ingestion tasks on this server node and finds the task that matches storeName and has the largest version number; if that task does not have metric emission enabled, enables it and updates the store ingestion stats.
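Locating the largest version for a store is the core of the two-argument variant. The sketch below assumes version-topic keys of the hypothetical form <store>_v<version>; MaxVersionSketch is illustrative only.

```java
import java.util.NavigableMap;

class MaxVersionSketch {
  private static final String VERSION_SEPARATOR = "_v"; // hypothetical "<store>_v<version>" naming

  // Returns the topic with the largest version for storeName, or null if there is none.
  static String findMaxVersionTopic(NavigableMap<String, ?> taskMap, String storeName) {
    String best = null;
    int bestVersion = Integer.MIN_VALUE;
    for (String topic : taskMap.keySet()) {
      int idx = topic.lastIndexOf(VERSION_SEPARATOR);
      if (idx < 0 || !topic.substring(0, idx).equals(storeName)) {
        continue; // not a version topic of this store
      }
      int version;
      try {
        version = Integer.parseInt(topic.substring(idx + VERSION_SEPARATOR.length()));
      } catch (NumberFormatException e) {
        continue; // malformed suffix; skip rather than fail
      }
      if (version > bestVersion) {
        bestVersion = version;
        best = topic;
      }
    }
    return best;
  }
}
```

The version is parsed instead of taking the last matching key because the map's String ordering is lexicographic: store_v9 sorts after store_v10.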
-
stopConsumption
public java.util.concurrent.CompletableFuture<java.lang.Void> stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId)
Stops consuming messages from the Kafka partition corresponding to the Venice partition.
Specified by: stopConsumption in interface StoreIngestionService
Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
-
stopConsumptionAndWait
public void stopConsumptionAndWait(VeniceStoreVersionConfig veniceStore, int partitionId, int sleepSeconds, int numRetries, boolean whetherToResetOffset)
Stops consuming messages from the Kafka partition corresponding to the Venice partition and waits up to (sleepSeconds * numRetries) seconds to make sure partition consumption is stopped.
Specified by: stopConsumptionAndWait in interface StoreIngestionService
Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
sleepSeconds - Seconds to sleep between checks.
numRetries - Maximum number of checks.
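A minimal polling loop bounded by roughly interval * numRetries, as described above. StopAndWaitSketch and its BooleanSupplier probe are hypothetical; a caller might pass a check such as isPartitionConsuming(topic, partitionId) and Duration.ofSeconds(sleepSeconds). Returning a boolean is a choice made for the sketch, since the real method returns void.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

class StopAndWaitSketch {
  // Checks isConsuming up to numRetries times, sleeping between checks, then once more.
  // Total sleep is bounded by interval * numRetries. Returns true once consumption stopped.
  static boolean waitForStop(BooleanSupplier isConsuming, Duration interval, int numRetries) {
    for (int attempt = 0; attempt < numRetries; attempt++) {
      if (!isConsuming.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(interval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // give up early but keep the interrupt visible
        return false;
      }
    }
    return !isConsuming.getAsBoolean();
  }
}
```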
-
dropStoragePartitionGracefully
public java.util.concurrent.CompletableFuture<java.lang.Void> dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId)
Drops the corresponding Venice partition gracefully. This should only be called after stopConsumptionAndWait(com.linkedin.davinci.config.VeniceStoreVersionConfig, int, int, int, boolean) has been called.
Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
Returns: a future for the drop partition action.
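The required ordering can be captured in a small helper. PartitionOps and DropPartitionSketch are hypothetical simplifications (the real methods also take a VeniceStoreVersionConfig), and the polling values are illustrative, not Venice defaults.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified view of the two service calls.
interface PartitionOps {
  void stopConsumptionAndWait(int partitionId, int sleepSeconds, int numRetries, boolean whetherToResetOffset);

  CompletableFuture<Void> dropStoragePartitionGracefully(int partitionId);
}

class DropPartitionSketch {
  // Illustrative values only: check every second, at most 30 times.
  static final int SLEEP_SECONDS = 1;
  static final int NUM_RETRIES = 30;

  // Honors the documented ordering: stop consumption and wait, then drop gracefully.
  static CompletableFuture<Void> retirePartition(PartitionOps ops, int partitionId, boolean resetOffset) {
    ops.stopConsumptionAndWait(partitionId, SLEEP_SECONDS, NUM_RETRIES, resetOffset);
    return ops.dropStoragePartitionGracefully(partitionId);
  }
}
```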
-
killConsumptionTask
public boolean killConsumptionTask(java.lang.String topicName)
Description copied from interface: StoreIngestionService
Kills all running consumption tasks for the given store version.
Specified by: killConsumptionTask in interface StoreIngestionService
Parameters:
topicName - Venice topic (store and version number) for the corresponding consumer task that needs to be killed. No action, including logging, is taken when killConsumptionTask is invoked on a topic that is not in the map.
Returns: true if a kill is needed and called, otherwise false.
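The documented return contract maps naturally onto a concurrent topic-to-task map. KillTaskSketch and its nested Task are hypothetical; treating a second kill of the same task as "not needed" is an assumption about what "a kill is needed" means.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

class KillTaskSketch {
  // Hypothetical minimal task whose kill switch can be flipped only once.
  static final class Task {
    private final AtomicBoolean killed = new AtomicBoolean(false);

    // True only for the call that actually performs the kill.
    boolean kill() {
      return killed.compareAndSet(false, true);
    }
  }

  // Version topic -> task, assuming one task per store-version topic.
  final Map<String, Task> topicToTask = new ConcurrentHashMap<>();

  // Unknown topic: return false with no side effects (not even logging).
  // Known topic: true only if this call performed the kill.
  boolean killConsumptionTask(String topicName) {
    Task task = topicToTask.get(topicName);
    return task != null && task.kill();
  }
}
```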
-
addIngestionNotifier
public void addIngestionNotifier(VeniceNotifier notifier)
Description copied from interface: StoreIngestionService
Adds a notifier that receives notifications about the status of consumption tasks, such as start, completion, progress, and error states. Multiple notifiers can be added for the same consumption tasks, and all of them will be notified in order.
Specified by: addIngestionNotifier in interface StoreIngestionService
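Ordered fan-out to several notifiers can be sketched with a copy-on-write list, which preserves insertion order and tolerates registration while a notification is in progress. SketchNotifier and its single completed(...) callback are hypothetical, not the VeniceNotifier interface.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical notifier callback with a single event type.
interface SketchNotifier {
  void completed(String topic, int partitionId);
}

class NotifierRegistrySketch {
  private final List<SketchNotifier> notifiers = new CopyOnWriteArrayList<>();

  void addIngestionNotifier(SketchNotifier notifier) {
    notifiers.add(notifier);
  }

  // Every registered notifier is invoked, in the order it was added.
  void reportCompleted(String topic, int partitionId) {
    for (SketchNotifier n : notifiers) {
      n.completed(topic, partitionId);
    }
  }
}
```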
-
replaceAndAddTestNotifier
public void replaceAndAddTestNotifier(VeniceNotifier notifier)
Specified by: replaceAndAddTestNotifier in interface StoreIngestionService
-
containsRunningConsumption
public boolean containsRunningConsumption(VeniceStoreVersionConfig veniceStore)
Description copied from interface: StoreIngestionService
Checks whether there is a running consumption task for the given store.
Specified by: containsRunningConsumption in interface StoreIngestionService
-
containsRunningConsumption
public boolean containsRunningConsumption(java.lang.String topic)
Description copied from interface: StoreIngestionService
Checks whether there is a running consumption task for the given store-version topic.
Specified by: containsRunningConsumption in interface StoreIngestionService
-
isPartitionConsuming
public boolean isPartitionConsuming(java.lang.String topic, int partitionId)
Description copied from interface: StoreIngestionService
Checks whether the specified partition is still being consumed.
Specified by: isPartitionConsuming in interface StoreIngestionService
-
getIngestingTopicsWithVersionStatusNotOnline
public java.util.Set<java.lang.String> getIngestingTopicsWithVersionStatusNotOnline()
Description copied from interface: StoreIngestionService
Gets the names of topics currently maintained by the ingestion service whose corresponding version status is not online. Topics with an invalid store or version number are also included in the returned set.
Specified by: getIngestingTopicsWithVersionStatusNotOnline in interface StoreIngestionService
Returns: a Set of topic names.
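A sketch of the filtering rule, assuming a lookup that returns an empty Optional for an invalid store or version. SketchVersionStatus and NotOnlineTopicsSketch are hypothetical names.

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical version states; only ONLINE versus "anything else" matters here.
enum SketchVersionStatus { STARTED, PUSHED, ONLINE, ERROR }

class NotOnlineTopicsSketch {
  // statusLookup returns Optional.empty() when the topic's store or version is invalid.
  static Set<String> notOnline(Set<String> ingestingTopics,
      Function<String, Optional<SketchVersionStatus>> statusLookup) {
    Set<String> result = new TreeSet<>();
    for (String topic : ingestingTopics) {
      Optional<SketchVersionStatus> status = statusLookup.apply(topic);
      // An invalid store or version counts as "not online", matching the documented behavior.
      if (!status.isPresent() || status.get() != SketchVersionStatus.ONLINE) {
        result.add(topic);
      }
    }
    return result;
  }
}
```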
-
recordIngestionFailure
public void recordIngestionFailure(java.lang.String storeName)
Specified by: recordIngestionFailure in interface StoreIngestionService
-
getAggVersionedIngestionStats
public AggVersionedIngestionStats getAggVersionedIngestionStats()
Description copied from interface: StoreIngestionService
Gets the AggVersionedIngestionStats.
Specified by: getAggVersionedIngestionStats in interface StoreIngestionService
Returns: an instance of AggVersionedIngestionStats
-
getStoreVersionCompressionDictionary
public java.nio.ByteBuffer getStoreVersionCompressionDictionary(java.lang.String topicName)
-
getStoreIngestionTask
public StoreIngestionTask getStoreIngestionTask(java.lang.String topicName)
Specified by: getStoreIngestionTask in interface StoreIngestionService
-
getConsumptionSnapshots
public AdminResponse getConsumptionSnapshots(java.lang.String topicName, ComplementSet<java.lang.Integer> partitions)
-
getTopicPartitionIngestionContext
public ReplicaIngestionResponse getTopicPartitionIngestionContext(java.lang.String versionTopic, java.lang.String topicName, int partitionId)
-
updatePartitionOffsetRecords
public void updatePartitionOffsetRecords(java.lang.String topicName, int partition, java.nio.ByteBuffer offsetRecordByteBuffer)
Updates, in the main process, the latest offset records of all sub-partitions fetched from the isolated ingestion process, so that the main process's in-memory storage metadata service is aware of the latest updates and does not restart ingestion from scratch.
-
getPartitionOffsetRecords
public java.nio.ByteBuffer getPartitionOffsetRecords(java.lang.String topicName, int partition)
This method should only be called when the forked ingestion process is handing over the ingestion task to the main process. It collects the user partition's latest OffsetRecords from the partition consumption states (PCS). PCS should be available in this situation because the topic has not yet been unsubscribed; if it is not available, an exception is thrown because this state is unexpected.
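The handover between getPartitionOffsetRecords and updatePartitionOffsetRecords can be sketched as a ByteBuffer round trip. In this hypothetical OffsetHandoverSketch the record is a single long offset and the topic parameter is omitted; the real record is a serialized OffsetRecord.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class OffsetHandoverSketch {
  // Hypothetical stand-in for partition consumption state: latest offset per partition.
  final Map<Integer, Long> pcsOffsets = new ConcurrentHashMap<>();

  // Forked-process side: fail fast when PCS is missing, since the topic is still subscribed.
  ByteBuffer getPartitionOffsetRecords(int partition) {
    Long offset = pcsOffsets.get(partition);
    if (offset == null) {
      throw new IllegalStateException("No partition consumption state for partition " + partition);
    }
    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
    buffer.putLong(offset);
    buffer.flip(); // ready for reading from position 0
    return buffer;
  }

  // Main-process side: record the handed-over offset so ingestion does not restart from scratch.
  void updatePartitionOffsetRecords(int partition, ByteBuffer record) {
    pcsOffsets.put(partition, record.getLong(record.position()));
  }
}
```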
-
syncTopicPartitionOffset
public void syncTopicPartitionOffset(java.lang.String topicName, int partition)
Updates offset metadata and syncs it to storage for the specified topic partition. This method is invoked only when the isolated ingestion process reports topic-partition completion, to make sure ingestion progress is persisted.
-
getMetadataRepo
public final ReadOnlyStoreRepository getMetadataRepo()
-
getKafkaValueSerializer
public KafkaValueSerializer getKafkaValueSerializer()