Package com.linkedin.davinci.ingestion
Class DefaultIngestionBackend
java.lang.Object
com.linkedin.davinci.ingestion.DefaultIngestionBackend
- All Implemented Interfaces: IngestionBackend, Closeable, AutoCloseable
- Direct Known Subclasses: IsolatedIngestionBackend
The default ingestion backend implementation. Ingestion will be done in the same JVM as the application.
-
Constructor Summary
Constructors
DefaultIngestionBackend(StorageMetadataService storageMetadataService, KafkaStoreIngestionService storeIngestionService, StorageService storageService, BlobTransferManager blobTransferManager, VeniceServerConfig serverConfig)
-
Method Summary
Modifier and Type, Method, Description
void addIngestionNotifier(VeniceNotifier ingestionListener)
void close()
CompletableFuture<Void> dropStoragePartitionGracefully(VeniceStoreVersionConfig storeConfig, int partition, int timeoutInSeconds, boolean removeEmptyStorageEngine)
    Stops subscribing to the specified topic partition and deletes the partition data from storage.
getStoreIngestionService
boolean hasCurrentVersionBootstrapping()
    Checks whether any current version is bootstrapping.
boolean isOffsetLagged(String store, int versionNumber, int partition, long blobTransferDisabledOffsetLagThreshold, boolean hybridStore)
    A helper method that decides whether to skip blob transfer and ingest directly from Kafka when some files have already been restored.
void killConsumptionTask(String topicName)
void removeStorageEngine(String topicName)
void setStorageEngineReference(String topicName, AtomicReference<StorageEngine> storageEngineReference)
void shutdownIngestionTask(String topicName)
void startConsumption(VeniceStoreVersionConfig storeConfig, int partition)
stopConsumption(VeniceStoreVersionConfig storeConfig, int partition)
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface com.linkedin.davinci.ingestion.IngestionBackend
dropStoragePartitionGracefully
-
Constructor Details
-
DefaultIngestionBackend
public DefaultIngestionBackend(StorageMetadataService storageMetadataService, KafkaStoreIngestionService storeIngestionService, StorageService storageService, BlobTransferManager blobTransferManager, VeniceServerConfig serverConfig)
-
-
Method Details
-
startConsumption
- Specified by: startConsumption in interface IngestionBackend
-
isOffsetLagged
public boolean isOffsetLagged(String store, int versionNumber, int partition, long blobTransferDisabledOffsetLagThreshold, boolean hybridStore)
A helper method that decides whether to skip blob transfer and ingest directly from Kafka when some files have already been restored.
1. If the store is a batch store, check whether the end of push has been received.
2. If the store is a hybrid store, check whether the offset lag is within the allowed threshold.
Note: If `blobTransferDisabledOffsetLagThreshold` is negative, the offset lag check is skipped and blob transfer always runs. Retained data may not be cleaned up unless a new host is added, which makes this feature difficult to validate; a negative threshold ensures blob transfer always runs in such cases.
- Parameters:
store - the store name
versionNumber - the version number
partition - the partition number
blobTransferDisabledOffsetLagThreshold - the maximum allowed offset lag. This value is controlled by the config BLOB_TRANSFER_DISABLED_OFFSET_LAG_THRESHOLD and defaults to 100000L. If the offset lag is within this threshold, bootstrapping from Kafka is allowed even if blob transfer is enabled. If the lag exceeds this threshold, bootstrapping should happen from blob transfer first.
hybridStore - whether the store is a hybrid store. For a hybrid store, the check uses the offset lag; for a batch store, it checks whether the batch push is done.
- Returns:
true if the store is lagged and needs to bootstrap from blob transfer; false if it can bootstrap from Kafka.
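The decision described for isOffsetLagged can be sketched as a pure function. This is a minimal illustration, not the Venice implementation: `OffsetLagSketch`, `isLagged`, and the `endOfPushReceived` and `offsetLag` inputs are hypothetical names standing in for state the real method would read from stored metadata. The sketch also assumes that the negative-threshold rule is applied before the store-type check, and that a lag exactly equal to the threshold counts as within it.

```java
final class OffsetLagSketch {
  /**
   * Returns true when the partition should bootstrap via blob transfer,
   * false when it can ingest directly from Kafka.
   * All parameters except the threshold are hypothetical inputs.
   */
  static boolean isLagged(boolean hybridStore, boolean endOfPushReceived, long offsetLag, long threshold) {
    // Negative threshold: the lag check is skipped and blob transfer always runs.
    if (threshold < 0) {
      return true;
    }
    // Batch store: treated as lagged until the end of push has been received.
    if (!hybridStore) {
      return !endOfPushReceived;
    }
    // Hybrid store: lagged only when the lag exceeds the threshold (assumed inclusive bound).
    return offsetLag > threshold;
  }

  public static void main(String[] args) {
    long threshold = 100_000L; // default stated for BLOB_TRANSFER_DISABLED_OFFSET_LAG_THRESHOLD
    System.out.println(isLagged(true, true, 250_000L, threshold)); // hybrid, lag over threshold
    System.out.println(isLagged(true, true, 500L, threshold));     // hybrid, lag within threshold
    System.out.println(isLagged(false, false, 0L, threshold));     // batch, push not finished
    System.out.println(isLagged(false, true, 0L, -1L));            // negative threshold
  }
}
```

Keeping the negative-threshold branch first mirrors the note that a negative value forces blob transfer regardless of what has been restored locally.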
-
stopConsumption
- Specified by: stopConsumption in interface IngestionBackend
-
killConsumptionTask
- Specified by: killConsumptionTask in interface IngestionBackend
-
shutdownIngestionTask
- Specified by: shutdownIngestionTask in interface IngestionBackend
-
removeStorageEngine
- Specified by: removeStorageEngine in interface IngestionBackend
-
dropStoragePartitionGracefully
public CompletableFuture<Void> dropStoragePartitionGracefully(VeniceStoreVersionConfig storeConfig, int partition, int timeoutInSeconds, boolean removeEmptyStorageEngine)
Description copied from interface: IngestionBackend
Stops subscribing to the specified topic partition and deletes the partition data from storage.
- Specified by: dropStoragePartitionGracefully in interface IngestionBackend
- Parameters:
storeConfig - the store version config
partition - the partition number to drop in the store version
timeoutInSeconds - the number of seconds to wait before timing out
removeEmptyStorageEngine - whether to drop the storage engine when dropping the last partition
- Returns:
a future for the drop partition action
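Because dropStoragePartitionGracefully returns a CompletableFuture<Void>, a caller that needs to know when the drop has finished has to wait on that future. The sketch below shows one way a caller might do so using only JDK classes. `DropPartitionSketch`, `PartitionDropper`, and `dropAndWait` are hypothetical names; the stand-in interface takes a String where the real method takes a VeniceStoreVersionConfig; and the extra five seconds of caller-side grace is an arbitrary assumption, not documented behavior.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class DropPartitionSketch {
  /** Hypothetical stand-in for the one backend method this sketch needs. */
  interface PartitionDropper {
    CompletableFuture<Void> dropStoragePartitionGracefully(
        String storeVersion, int partition, int timeoutInSeconds, boolean removeEmptyStorageEngine);
  }

  /** Returns true if the drop future completed normally within the caller-side bound. */
  static boolean dropAndWait(PartitionDropper backend, String storeVersion, int partition, int timeoutInSeconds) {
    CompletableFuture<Void> future =
        backend.dropStoragePartitionGracefully(storeVersion, partition, timeoutInSeconds, true);
    try {
      // Assumed policy: wait slightly longer than the timeout handed to the backend.
      future.get(timeoutInSeconds + 5L, TimeUnit.SECONDS);
      return true;
    } catch (TimeoutException | ExecutionException e) {
      // The drop did not finish in time, or it failed.
      return false;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can react to it.
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Waiting with a bound keeps a stuck drop from blocking the calling thread indefinitely; callers that do not need the result can instead attach a callback to the future.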
-
addIngestionNotifier
- Specified by: addIngestionNotifier in interface IngestionBackend
-
setStorageEngineReference
public void setStorageEngineReference(String topicName, AtomicReference<StorageEngine> storageEngineReference)
- Specified by: setStorageEngineReference in interface IngestionBackend
-
hasCurrentVersionBootstrapping
public boolean hasCurrentVersionBootstrapping()
Description copied from interface: IngestionBackend
Checks whether any current version is bootstrapping.
- Specified by: hasCurrentVersionBootstrapping in interface IngestionBackend
-
getStoreIngestionService
- Specified by: getStoreIngestionService in interface IngestionBackend
-
close
public void close()
- Specified by: close in interface AutoCloseable
- Specified by: close in interface Closeable
-