Class KafkaStoreIngestionService
java.lang.Object
com.linkedin.venice.service.AbstractVeniceService
com.linkedin.davinci.kafka.consumer.KafkaStoreIngestionService
- All Implemented Interfaces:
StoreIngestionService, Closeable, AutoCloseable
public class KafkaStoreIngestionService
extends AbstractVeniceService
implements StoreIngestionService
Assumes a one-to-one mapping between a Venice store and a Kafka topic.
Manages the Kafka topics and partitions that need to be consumed for the stores on this node.
Launches a StoreIngestionTask for each store version to consume and process messages.
Uses the "new" Kafka consumer.
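As a rough illustration of the "one StoreIngestionTask per store version" idea, the sketch below keeps a single task per version topic and reuses it for every partition of that version. It is a simplified model, not Venice code: VersionTopicTaskRegistry, IngestionTask, subscribe, and the topic strings are hypothetical names chosen for the example.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: at most one ingestion task per store-version topic.
// IngestionTask is a stand-in that only records subscribed partitions; it is not
// Venice's StoreIngestionTask.
class VersionTopicTaskRegistry {
    static final class IngestionTask {
        final String versionTopic;
        final Set<Integer> partitions = ConcurrentHashMap.newKeySet();

        IngestionTask(String versionTopic) {
            this.versionTopic = versionTopic;
        }
    }

    private final Map<String, IngestionTask> topicToTask = new ConcurrentHashMap<>();

    // Reuses the task for this version topic, creating it atomically on first use,
    // then subscribes the partition to it.
    IngestionTask subscribe(String versionTopic, int partition) {
        IngestionTask task = topicToTask.computeIfAbsent(versionTopic, IngestionTask::new);
        task.partitions.add(partition);
        return task;
    }

    int taskCount() {
        return topicToTask.size();
    }

    public static void main(String[] args) {
        VersionTopicTaskRegistry registry = new VersionTopicTaskRegistry();
        IngestionTask p0 = registry.subscribe("storeA_v1", 0);
        IngestionTask p1 = registry.subscribe("storeA_v1", 1);
        registry.subscribe("storeA_v2", 0);
        System.out.println(p0 == p1);             // true: same version topic, same task
        System.out.println(registry.taskCount()); // 2: one task per version topic
    }
}
```

Using computeIfAbsent on a ConcurrentHashMap keeps task creation atomic, so two concurrent subscriptions for the same version topic cannot end up with two tasks.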
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.venice.service.AbstractVeniceService
AbstractVeniceService.ServiceState
Field Summary
Fields inherited from class com.linkedin.venice.service.AbstractVeniceService
logger, serviceState
Constructor Summary
KafkaStoreIngestionService(StorageService storageService, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, ReadOnlyStoreRepository metadataRepo, ReadOnlySchemaRepository schemaRepo, ReadOnlyLiveClusterConfigRepository liveClusterConfigRepository, io.tehuti.metrics.MetricsRepository metricsRepository, Optional<SchemaReader> kafkaMessageEnvelopeSchemaReader, Optional<ClientConfig> clientConfig, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer, Optional<HelixReadOnlyZKSharedSchemaRepository> zkSharedSchemaRepository, ICProvider icProvider, boolean isIsolatedIngestion, StorageEngineBackedCompressorFactory compressorFactory, Optional<ObjectCacheBackend> cacheBackend, boolean isDaVinciClient, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory, Optional<SSLFactory> sslFactory, HeartbeatMonitoringService heartbeatMonitoringService, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin, AdaptiveThrottlerSignalService adaptiveThrottlerSignalService, Optional<com.linkedin.d2.balancer.D2Client> d2Client)
Method Summary
- void addIngestionNotifier(VeniceNotifier notifier): Adds a notifier to receive notifications about the various states of the consumption tasks, such as start, completed, progress, and error.
- void attemptToPrintIngestionInfoFor(String storeName, Integer version, Integer partition, String regionName)
- boolean containsRunningConsumption(VeniceStoreVersionConfig veniceStore): Checks whether there is a running consumption task for the given store.
- boolean containsRunningConsumption(String topic): Checks whether there is a running consumption task for the given store-version topic.
- void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- CompletableFuture<Void> dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId): Drops the corresponding Venice partition gracefully.
- AggVersionedIngestionStats getAggVersionedIngestionStats(): Get AggVersionedStorageIngestionStats.
- getConsumptionSnapshots(String topicName, ComplementSet<Integer> partitions)
- getIngestingTopicsWithVersionStatusNotOnline(): Gets the names of topics that are currently maintained by the ingestion service and whose version status is not online.
- getInternalRecordTransformerConfig(String storeName)
- final ReadOnlyStoreRepository getMetadataRepo()
- getPartitionOffsetRecords(String topicName, int partition): Should only be called when the forked ingestion process is handing over an ingestion task to the main process.
- Optional<PubSubPosition> getPubSubPosition(VeniceStoreVersionConfig veniceStore, int partitionId, Long timestamp, PubSubPosition pubSubPosition)
- getStoreIngestionTask(String topicName)
- getStoreVersionCompressionDictionary(String topicName)
- protected Map<String,StoreIngestionTask> getTopicNameToIngestionTaskMap()
- ReplicaIngestionResponse getTopicPartitionIngestionContext(String versionTopic, String topicName, int partitionId)
- boolean hasCurrentVersionBootstrapping()
- static boolean hasCurrentVersionBootstrapping(Map<String, StoreIngestionTask> ingestionTaskMap)
- boolean isDaVinciClient()
- boolean isLiveUpdateSuppressionEnabled()
- boolean isPartitionConsuming(String topic, int partitionId): Checks whether the specified partition is still being consumed.
- boolean killConsumptionTask(String topicName): Kills all running consumption of the given store.
- void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- void recordIngestionFailure(String storeName)
- void registerRecordTransformerConfig(String storeName, DaVinciRecordTransformerConfig recordTransformerConfig)
- void replaceAndAddTestNotifier(VeniceNotifier notifier)
- void shutdownStoreIngestionTask(String topicName): Closes the specified StoreIngestionTask and waits up to 10 seconds for it to shut down fully.
- void startConsumption(VeniceStoreVersionConfig veniceStore, int partitionId, Optional<PubSubPosition> pubSubPosition): Starts consuming messages from the Kafka partition corresponding to the Venice partition.
- boolean startInner(): Starts the Kafka consumption tasks for already subscribed partitions.
- CompletableFuture<Void> stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId): Stops consuming messages from the Kafka partition corresponding to the Venice partition.
- void stopConsumptionAndWait(VeniceStoreVersionConfig veniceStore, int partitionId, int sleepSeconds, int numRetries, boolean whetherToResetOffset): Stops consuming messages from the Kafka partition corresponding to the Venice partition and waits up to (sleepSeconds * numRetries) seconds to make sure partition consumption has stopped.
- void stopInner(): Stops all the Kafka consumption tasks.
- void syncTopicPartitionOffset(String topicName, int partition): Updates offset metadata and syncs it to storage for the specified topic partition.
- boolean topicPartitionHasAnyPendingActions(String topic, int partition)
- void updatePartitionOffsetRecords(String topicName, int partition, ByteBuffer offsetRecordByteBuffer): Updates, in the main process, all sub-partitions' latest offset records fetched from the isolated ingestion process, so that the main process's in-memory storage metadata service is aware of the latest updates and does not restart ingestion from scratch.
- protected void updateStatsEmission(NavigableMap<String, StoreIngestionTask> taskMap, String storeName): Goes through all known ingestion tasks on this server node and finds the task that matches storeName and has the largest version number; if that task does not have metric emission enabled, enables it and updates the store ingestion stats.
- protected void updateStatsEmission(NavigableMap<String, StoreIngestionTask> taskMap, String storeName, int maximumVersion): Finds the task that matches both storeName and maximumVersion, enables metric emission for it and updates the ingestion stats with it, and disables metric emission for every task that is not the maximum version.
- void waitIngestionTaskToCompleteAllPartitionPendingActions(String topicName, int partition, long retryIntervalInMs, int numRetries)
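The two updateStatsEmission overloads can be modeled roughly as follows, under stated assumptions: Task is a hypothetical stand-in for StoreIngestionTask, the two-argument overload is assumed to compute the maximum version and delegate to the three-argument one, "disable emission" is interpreted as applying to the other tasks of the same store, and the stats-update step is left out.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the two updateStatsEmission overloads described in the summary.
// Task is a stand-in for StoreIngestionTask; the "update ingestion stats" step is omitted.
class StatsEmissionSketch {
    static final class Task {
        final String storeName;
        final int version;
        boolean metricsEnabled;

        Task(String storeName, int version, boolean metricsEnabled) {
            this.storeName = storeName;
            this.version = version;
            this.metricsEnabled = metricsEnabled;
        }
    }

    // Assumed delegation: find the largest version of storeName, then apply the 3-arg overload.
    static void updateStatsEmission(NavigableMap<String, Task> taskMap, String storeName) {
        int maxVersion = -1;
        for (Task task : taskMap.values()) {
            if (task.storeName.equals(storeName)) {
                maxVersion = Math.max(maxVersion, task.version);
            }
        }
        if (maxVersion >= 0) {
            updateStatsEmission(taskMap, storeName, maxVersion);
        }
    }

    // Enables emission on the task matching storeName and maximumVersion; disables it on
    // every other task of the same store. Tasks of other stores are left untouched.
    static void updateStatsEmission(NavigableMap<String, Task> taskMap, String storeName, int maximumVersion) {
        for (Task task : taskMap.values()) {
            if (task.storeName.equals(storeName)) {
                task.metricsEnabled = task.version == maximumVersion;
            }
        }
    }

    public static void main(String[] args) {
        NavigableMap<String, Task> tasks = new TreeMap<>();
        tasks.put("storeA_v1", new Task("storeA", 1, true));
        tasks.put("storeA_v2", new Task("storeA", 2, false));
        tasks.put("storeB_v5", new Task("storeB", 5, true));
        updateStatsEmission(tasks, "storeA");
        tasks.forEach((topic, task) -> System.out.println(topic + " -> " + task.metricsEnabled));
    }
}
```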
Constructor Details
KafkaStoreIngestionService
public KafkaStoreIngestionService(StorageService storageService, VeniceConfigLoader veniceConfigLoader, StorageMetadataService storageMetadataService, ClusterInfoProvider clusterInfoProvider, ReadOnlyStoreRepository metadataRepo, ReadOnlySchemaRepository schemaRepo, ReadOnlyLiveClusterConfigRepository liveClusterConfigRepository, io.tehuti.metrics.MetricsRepository metricsRepository, Optional<SchemaReader> kafkaMessageEnvelopeSchemaReader, Optional<ClientConfig> clientConfig, InternalAvroSpecificSerializer<PartitionState> partitionStateSerializer, Optional<HelixReadOnlyZKSharedSchemaRepository> zkSharedSchemaRepository, ICProvider icProvider, boolean isIsolatedIngestion, StorageEngineBackedCompressorFactory compressorFactory, Optional<ObjectCacheBackend> cacheBackend, boolean isDaVinciClient, RemoteIngestionRepairService remoteIngestionRepairService, PubSubClientsFactory pubSubClientsFactory, Optional<SSLFactory> sslFactory, HeartbeatMonitoringService heartbeatMonitoringService, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin, AdaptiveThrottlerSignalService adaptiveThrottlerSignalService, Optional<com.linkedin.d2.balancer.D2Client> d2Client)
Method Details
startInner
public boolean startInner()
Starts the Kafka consumption tasks for already subscribed partitions.
- Specified by:
startInner in class AbstractVeniceService
- Returns:
true if the service is completely started, false if it is still starting asynchronously (in this case, it is the implementer's responsibility to set AbstractVeniceService.serviceState to AbstractVeniceService.ServiceState.STARTED upon completion of the async work).
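The Returns note for startInner describes a two-mode contract. Below is a minimal sketch, assuming (as the note implies) that the base class marks the service STARTED itself when startInner returns true. AsyncStartSketch, ServiceState, start, and awaitStartup are hypothetical stand-ins for AbstractVeniceService and its serviceState field.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the startInner() return-value contract. ServiceState and the
// start() wrapper are stand-ins, not Venice's AbstractVeniceService.
class AsyncStartSketch {
    enum ServiceState { STOPPED, STARTING, STARTED }

    final AtomicReference<ServiceState> serviceState = new AtomicReference<>(ServiceState.STOPPED);
    private volatile CompletableFuture<Void> pendingStart = CompletableFuture.completedFuture(null);

    // Returns true if startup finished synchronously. Returns false if work continues in the
    // background; in that case this method itself arranges for STARTED to be set on completion.
    boolean startInner(boolean startAsynchronously) {
        if (!startAsynchronously) {
            return true;
        }
        serviceState.set(ServiceState.STARTING);
        pendingStart = CompletableFuture
            .runAsync(() -> { /* e.g. resume consumption for already subscribed partitions */ })
            .thenRun(() -> serviceState.set(ServiceState.STARTED));
        return false;
    }

    // Assumed base-class behavior: mark STARTED only when startInner() returned true.
    boolean start(boolean startAsynchronously) {
        boolean completed = startInner(startAsynchronously);
        if (completed) {
            serviceState.set(ServiceState.STARTED);
        }
        return completed;
    }

    void awaitStartup() {
        pendingStart.join();
    }

    public static void main(String[] args) {
        AsyncStartSketch service = new AsyncStartSketch();
        System.out.println(service.start(true));        // false: still starting in the background
        service.awaitStartup();
        System.out.println(service.serviceState.get()); // STARTED
    }
}
```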
hasCurrentVersionBootstrapping
public boolean hasCurrentVersionBootstrapping()
hasCurrentVersionBootstrapping
public static boolean hasCurrentVersionBootstrapping(Map<String, StoreIngestionTask> ingestionTaskMap)
stopInner
public void stopInner()
Stops all the Kafka consumption tasks. Closes all the Kafka clients.
- Specified by:
stopInner in class AbstractVeniceService
getPubSubPosition
public Optional<PubSubPosition> getPubSubPosition(VeniceStoreVersionConfig veniceStore, int partitionId, Long timestamp, PubSubPosition pubSubPosition)
- Specified by:
getPubSubPosition in interface StoreIngestionService
startConsumption
public void startConsumption(VeniceStoreVersionConfig veniceStore, int partitionId, Optional<PubSubPosition> pubSubPosition)
Starts consuming messages from the Kafka partition corresponding to the Venice partition. Subscribes to the partition if required.
- Specified by:
startConsumption in interface StoreIngestionService
- Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
pubSubPosition
shutdownStoreIngestionTask
public void shutdownStoreIngestionTask(String topicName)
This method closes the specified StoreIngestionTask and waits up to 10 seconds for it to shut down fully.
- Parameters:
topicName - Topic name of the ingestion task to be shut down.
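A bounded shutdown like the one described for shutdownStoreIngestionTask is commonly built from a cooperative stop flag plus ExecutorService.awaitTermination. The sketch assumes the task runs on its own executor thread and polls a flag; LoopTask and shutdown are hypothetical names, and the 10-second limit is passed in as a parameter.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "close the task, then wait a bounded time for it to finish".
// LoopTask stands in for an ingestion task whose run loop polls a volatile flag.
class BoundedShutdownSketch {
    static final class LoopTask implements Runnable {
        private volatile boolean running = true;

        void close() {
            running = false;
        }

        @Override
        public void run() {
            while (running) {
                try {
                    Thread.sleep(10); // stand-in for one poll/process iteration
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Signals the task to stop, then waits up to timeoutSeconds for its thread to exit.
    // Returns true if it terminated in time.
    static boolean shutdown(LoopTask task, ExecutorService executor, long timeoutSeconds) {
        task.close();
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        LoopTask task = new LoopTask();
        executor.execute(task);
        System.out.println(shutdown(task, executor, 10)); // true: the loop sees close() quickly
    }
}
```

If awaitTermination returns false, a caller could escalate with executor.shutdownNow(), which interrupts the worker thread; whether the real service does this is not stated here.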
promoteToLeader
public void promoteToLeader(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- Specified by:
promoteToLeader in interface StoreIngestionService
demoteToStandby
public void demoteToStandby(VeniceStoreVersionConfig veniceStoreVersionConfig, int partitionId, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- Specified by:
demoteToStandby in interface StoreIngestionService
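promoteToLeader and demoteToStandby both take a LeaderSessionIdChecker, which this page does not document. One common fencing pattern such a checker could follow, assumed here and not confirmed by the source, is that each state transition captures a session id and any later transition invalidates earlier ones, so a queued but stale promotion can be skipped. All class and method names below are illustrative.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of session-id fencing for leader/standby transitions.
// The class and method names are illustrative only.
class LeaderSessionFenceSketch {
    private final AtomicLong latestSessionId = new AtomicLong();

    static final class SessionIdChecker {
        private final long assignedSessionId;
        private final AtomicLong latestSessionIdHandle;

        SessionIdChecker(long assignedSessionId, AtomicLong latestSessionIdHandle) {
            this.assignedSessionId = assignedSessionId;
            this.latestSessionIdHandle = latestSessionIdHandle;
        }

        // Valid only while no newer transition has been issued for this partition.
        boolean isSessionIdValid() {
            return assignedSessionId == latestSessionIdHandle.get();
        }
    }

    // Each new transition bumps the shared id, invalidating checkers issued earlier.
    SessionIdChecker beginTransition() {
        long id = latestSessionId.incrementAndGet();
        return new SessionIdChecker(id, latestSessionId);
    }

    public static void main(String[] args) {
        LeaderSessionFenceSketch partition = new LeaderSessionFenceSketch();
        SessionIdChecker promotion = partition.beginTransition();
        SessionIdChecker demotion = partition.beginTransition(); // arrives before promotion runs
        System.out.println(promotion.isSessionIdValid()); // false: the queued promotion is stale
        System.out.println(demotion.isSessionIdValid());  // true
    }
}
```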
waitIngestionTaskToCompleteAllPartitionPendingActions
public void waitIngestionTaskToCompleteAllPartitionPendingActions(String topicName, int partition, long retryIntervalInMs, int numRetries)
topicPartitionHasAnyPendingActions
public boolean topicPartitionHasAnyPendingActions(String topic, int partition)
isLiveUpdateSuppressionEnabled
public boolean isLiveUpdateSuppressionEnabled()
getVeniceConfigLoader
- Specified by:
getVeniceConfigLoader in interface StoreIngestionService
getVeniceWriterFactory
- Specified by:
getVeniceWriterFactory in interface StoreIngestionService
stopConsumption
public CompletableFuture<Void> stopConsumption(VeniceStoreVersionConfig veniceStore, int partitionId)
Stops consuming messages from the Kafka partition corresponding to the Venice partition.
- Specified by:
stopConsumption in interface StoreIngestionService
- Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
stopConsumptionAndWait
public void stopConsumptionAndWait(VeniceStoreVersionConfig veniceStore, int partitionId, int sleepSeconds, int numRetries, boolean whetherToResetOffset)
Stops consuming messages from the Kafka partition corresponding to the Venice partition and waits up to (sleepSeconds * numRetries) seconds to make sure partition consumption has stopped.
- Specified by:
stopConsumptionAndWait in interface StoreIngestionService
- Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
sleepSeconds
numRetries
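The "wait up to (sleepSeconds * numRetries) seconds" behavior amounts to a bounded polling loop. In this sketch the consumption check is a BooleanSupplier standing in for something like isPartitionConsuming; StopAndWaitSketch and waitUntilStopped are hypothetical, and the sleep is taken in milliseconds so the loop can also be exercised quickly.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a bounded wait of at most numRetries sleeps after a stop request.
// isConsuming stands in for a check such as isPartitionConsuming(topic, partitionId).
class StopAndWaitSketch {
    // Returns true as soon as consumption is observed to have stopped, or false if it is
    // still running after numRetries sleeps of sleepMillis each.
    static boolean waitUntilStopped(BooleanSupplier isConsuming, long sleepMillis, int numRetries) {
        for (int attempt = 0; attempt < numRetries; attempt++) {
            if (!isConsuming.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return !isConsuming.getAsBoolean();
    }

    public static void main(String[] args) {
        int sleepSeconds = 1;
        int numRetries = 5;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(1500);
        // Simulated partition that stops consuming about 1.5 seconds after the stop request.
        boolean stopped = waitUntilStopped(
            () -> System.nanoTime() < deadline,
            TimeUnit.SECONDS.toMillis(sleepSeconds),
            numRetries);
        System.out.println(stopped); // true: stopped within sleepSeconds * numRetries = 5 s
    }
}
```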
dropStoragePartitionGracefully
public CompletableFuture<Void> dropStoragePartitionGracefully(VeniceStoreVersionConfig veniceStore, int partitionId)
Drops the corresponding Venice partition gracefully. This should only be called after stopConsumptionAndWait(com.linkedin.davinci.config.VeniceStoreVersionConfig, int, int, int, boolean) has been called.
- Parameters:
veniceStore - Venice store for the partition.
partitionId - Venice partition's id.
- Returns:
a future for the drop partition action.
killConsumptionTask
public boolean killConsumptionTask(String topicName)
Description copied from interface: StoreIngestionService
Kills all running consumption of the given store.
- Specified by:
killConsumptionTask in interface StoreIngestionService
- Parameters:
topicName - Venice topic (store and version number) of the consumer task to be killed. No action is taken, not even logging, when killConsumptionTask is invoked for a topic that is not in the map.
- Returns:
true if a kill is needed and called, otherwise false.
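The documented return contract of killConsumptionTask (true only when a kill is needed and called, and a silent no-op for unknown topics) can be modeled with a plain map lookup. KillTaskSketch and its Task class are hypothetical stand-ins; whether the real service also removes the task from its map is not covered by the description and is left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the killConsumptionTask return contract. Task is a stand-in;
// removal of the task from the map is intentionally not modeled.
class KillTaskSketch {
    static final class Task {
        volatile boolean killed;

        void kill() {
            killed = true;
        }
    }

    final Map<String, Task> topicToTask = new ConcurrentHashMap<>();

    // Returns true only if a task exists for topicName and kill() was invoked on it.
    // Unknown topics are a silent no-op: no exception and no log line.
    boolean killConsumptionTask(String topicName) {
        Task task = topicToTask.get(topicName);
        if (task == null) {
            return false;
        }
        task.kill();
        return true;
    }

    public static void main(String[] args) {
        KillTaskSketch service = new KillTaskSketch();
        service.topicToTask.put("storeA_v3", new Task());
        System.out.println(service.killConsumptionTask("storeA_v3")); // true
        System.out.println(service.killConsumptionTask("storeA_v9")); // false
    }
}
```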
addIngestionNotifier
public void addIngestionNotifier(VeniceNotifier notifier)
Description copied from interface: StoreIngestionService
Adds a notifier to receive notifications about the various states of the consumption tasks, such as start, completed, progress, and error. Multiple notifiers can be added for the same consumption tasks, and all of them are notified in order.
- Specified by:
addIngestionNotifier in interface StoreIngestionService
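Ordered fan-out to multiple notifiers, as described for addIngestionNotifier, can be sketched with a CopyOnWriteArrayList. The Notifier interface with a single completed callback is a simplification assumed for the example; the documented notifications also cover start, progress, and error states.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch: notifiers are kept in registration order and every one of them is
// invoked for each event. Notifier is a stand-in interface, not Venice's VeniceNotifier.
class NotifierChainSketch {
    interface Notifier {
        void completed(String topic, int partition);
    }

    // Copy-on-write lets ingestion threads iterate safely while notifiers are being added.
    private final List<Notifier> notifiers = new CopyOnWriteArrayList<>();

    void addIngestionNotifier(Notifier notifier) {
        notifiers.add(notifier);
    }

    void reportCompleted(String topic, int partition) {
        for (Notifier notifier : notifiers) {
            notifier.completed(topic, partition);
        }
    }

    public static void main(String[] args) {
        NotifierChainSketch chain = new NotifierChainSketch();
        chain.addIngestionNotifier((topic, p) -> System.out.println("metrics: " + topic + "/" + p));
        chain.addIngestionNotifier((topic, p) -> System.out.println("push status: " + topic + "/" + p));
        chain.reportCompleted("storeA_v1", 0); // prints the metrics line, then the push status line
    }
}
```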
replaceAndAddTestNotifier
public void replaceAndAddTestNotifier(VeniceNotifier notifier)
- Specified by:
replaceAndAddTestNotifier in interface StoreIngestionService
containsRunningConsumption
public boolean containsRunningConsumption(VeniceStoreVersionConfig veniceStore)
Description copied from interface: StoreIngestionService
Checks whether there is a running consumption task for the given store.
- Specified by:
containsRunningConsumption in interface StoreIngestionService
containsRunningConsumption
public boolean containsRunningConsumption(String topic)
Description copied from interface: StoreIngestionService
Checks whether there is a running consumption task for the given store-version topic.
- Specified by:
containsRunningConsumption in interface StoreIngestionService
isPartitionConsuming
public boolean isPartitionConsuming(String topic, int partitionId)
Description copied from interface: StoreIngestionService
Checks whether the specified partition is still being consumed.
- Specified by:
isPartitionConsuming in interface StoreIngestionService
getIngestingTopicsWithVersionStatusNotOnline
Description copied from interface: StoreIngestionService
Gets the names of topics that are currently maintained by the ingestion service and whose version status is not online. Topics with an invalid store or version number are also included in the returned set.
- Specified by:
getIngestingTopicsWithVersionStatusNotOnline in interface StoreIngestionService
- Returns:
a Set of topic names.
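The filter behind getIngestingTopicsWithVersionStatusNotOnline can be pictured as below. This is a sketch under assumptions: VersionStatus is a simplified stand-in enum, and a topic absent from the status map models an invalid store or version number, which the description says must also be returned.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the filter described above. VersionStatus is a simplified stand-in
// enum, and a topic missing from statusByTopic models an invalid store or version number.
class NotOnlineTopicsSketch {
    enum VersionStatus { STARTED, ONLINE, ERROR }

    static Set<String> ingestingTopicsNotOnline(Collection<String> ingestingTopics,
                                                Map<String, VersionStatus> statusByTopic) {
        Set<String> result = new TreeSet<>();
        for (String topic : ingestingTopics) {
            VersionStatus status = statusByTopic.get(topic); // null: invalid store/version
            if (status != VersionStatus.ONLINE) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, VersionStatus> statuses = new HashMap<>();
        statuses.put("storeA_v1", VersionStatus.ONLINE);
        statuses.put("storeA_v2", VersionStatus.STARTED);
        System.out.println(ingestingTopicsNotOnline(
            Arrays.asList("storeA_v1", "storeA_v2", "deletedStore_v4"), statuses));
        // [deletedStore_v4, storeA_v2]
    }
}
```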
recordIngestionFailure
public void recordIngestionFailure(String storeName)
- Specified by:
recordIngestionFailure in interface StoreIngestionService
getAggVersionedIngestionStats
public AggVersionedIngestionStats getAggVersionedIngestionStats()
Description copied from interface: StoreIngestionService
Get AggVersionedStorageIngestionStats
- Specified by:
getAggVersionedIngestionStats in interface StoreIngestionService
- Returns:
an instance of AggVersionedIngestionStats
getStoreVersionCompressionDictionary
getStoreIngestionTask
- Specified by:
getStoreIngestionTask in interface StoreIngestionService
isDaVinciClient
public boolean isDaVinciClient()
getConsumptionSnapshots
getTopicPartitionIngestionContext
public ReplicaIngestionResponse getTopicPartitionIngestionContext(String versionTopic, String topicName, int partitionId)
updatePartitionOffsetRecords
public void updatePartitionOffsetRecords(String topicName, int partition, ByteBuffer offsetRecordByteBuffer)
This method updates, in the main process, all sub-partitions' latest offset records fetched from the isolated ingestion process, so that the main process's in-memory storage metadata service is aware of the latest updates and does not restart ingestion from scratch.
getPartitionOffsetRecords
This method should only be called when the forked ingestion process is handing over an ingestion task to the main process. It collects the user partition's latest OffsetRecords from the partition consumption states (PCS). In theory, the PCS should be available in this situation because the topic has not yet been unsubscribed. If it is not available, an exception is thrown, since this is unexpected.
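The handover described for getPartitionOffsetRecords and updatePartitionOffsetRecords can be illustrated end to end. In this hypothetical sketch an offset record is reduced to one long value serialized into a ByteBuffer, and plain maps stand in for the forked process's partition consumption state and the main process's storage metadata; the method names are loosely modeled on the documented ones but are not Venice APIs.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the offset handover between the forked ingestion process and the
// main process. An "offset record" is reduced to a single long; the encoding is illustrative.
class OffsetHandoverSketch {
    // Forked-process side: pcsOffsets stands in for partition consumption state (PCS).
    static ByteBuffer getPartitionOffsetRecord(Map<Integer, Long> pcsOffsets, int partition) {
        Long offset = pcsOffsets.get(partition);
        if (offset == null) {
            // PCS should still exist because the topic has not been unsubscribed yet.
            throw new IllegalStateException("No partition consumption state for partition " + partition);
        }
        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
        buffer.putLong(offset);
        buffer.flip(); // make the buffer readable from position 0
        return buffer;
    }

    // Main-process side: store the handed-over offset so ingestion resumes from it
    // instead of restarting from scratch.
    static void updatePartitionOffsetRecord(Map<Integer, Long> mainProcessOffsets, int partition,
                                            ByteBuffer offsetRecordByteBuffer) {
        mainProcessOffsets.put(partition, offsetRecordByteBuffer.getLong(offsetRecordByteBuffer.position()));
    }

    public static void main(String[] args) {
        Map<Integer, Long> forkedPcs = new HashMap<>();
        forkedPcs.put(3, 1_024L);
        Map<Integer, Long> mainProcess = new HashMap<>();
        updatePartitionOffsetRecord(mainProcess, 3, getPartitionOffsetRecord(forkedPcs, 3));
        System.out.println(mainProcess.get(3)); // 1024
    }
}
```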
syncTopicPartitionOffset
public void syncTopicPartitionOffset(String topicName, int partition)
Updates offset metadata and syncs it to storage for the specified topic partition. This method is invoked only when the isolated ingestion process reports topic partition completion, to make sure ingestion progress is persisted.
getMetadataRepo
public final ReadOnlyStoreRepository getMetadataRepo()
getPubSubContext
getKafkaValueSerializer
getTopicNameToIngestionTaskMap
protected Map<String,StoreIngestionTask> getTopicNameToIngestionTaskMap()
registerRecordTransformerConfig
public void registerRecordTransformerConfig(String storeName, DaVinciRecordTransformerConfig recordTransformerConfig)
getInternalRecordTransformerConfig
attemptToPrintIngestionInfoFor
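public void attemptToPrintIngestionInfoFor(String storeName, Integer version, Integer partition, String regionName)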