Class BlobTransferIngestionHelper
java.lang.Object
com.linkedin.davinci.kafka.consumer.BlobTransferIngestionHelper
Utility class encapsulating blob transfer logic for use within
StoreIngestionTask.
Methods are extracted from StoreIngestionTask and
DefaultIngestionBackend for better readability.-
Constructor Summary
ConstructorsConstructorDescriptionBlobTransferIngestionHelper(BlobTransferManager blobTransferManager, StorageService storageService, StorageMetadataService storageMetadataService, VeniceServerConfig serverConfig, Set<String> blobTransferDisabledStores) -
Method Summary
Modifier and TypeMethodDescriptionvoidadjustStoragePartitionWhenBlobTransferComplete(StorageEngine storageEngine, int partition, String replicaId) Adjusts the storage partition after blob transfer completes.voidcancelBlobTransferAndAwaitTermination(int partition, int timeoutInSeconds, String replicaId) Waits for blob transfer to reach a final state before returning.voidclearTransferStatus(String replicaId) Clears the tracking manager status for the given replica.Determines the RocksDB table format to request for blob transfer.booleanisReplicaLaggedAndNeedBlobTransfer(String storeName, int versionNumber, int partition, String replicaId, boolean isHybrid, String kafkaVersionTopic, PubSubContext pubSubContext) Checks if the replica is lagged enough to warrant blob transfer instead of Kafka bootstrap.voidrequestBlobTransferCancellation(String replicaId) Signals the blob transfer manager to cancel a transfer for the given replica.voidCancels a pending blob transfer for the given PCS if one is in progress.booleanshouldEnableBlobTransfer(Store store, boolean isDaVinciClient) Checks store/server config to determine if blob transfer is enabled.booleanshouldStartBlobTransfer(ConsumerAction consumerAction, Store store, String storeName, int versionNumber, int partition, String replicaId, boolean isDaVinciClient, boolean isHybrid, String kafkaVersionTopic, PubSubContext pubSubContext) Determines whether blob transfer should be started for a given partition.voidstartBlobTransferAsyncForPartition(int partition, PartitionConsumptionState pcs, StorageEngine storageEngine, String storeName, int versionNumber, VeniceStoreVersionConfig storeVersionConfig, String kafkaVersionTopic) Starts an async blob transfer for the given partition.voidupdateBlobTransferResponseStats(String storeName, int versionNumber, boolean transferSucceeded, CompletableFuture<Void> future) Records blob transfer response stats.voidvalidateDirectoriesAfterBlobTransfer(String storeName, int versionNumber, int partition, boolean transferSuccessful, String replicaId) Validates directories after blob transfer completes.
-
Constructor Details
-
BlobTransferIngestionHelper
public BlobTransferIngestionHelper(BlobTransferManager blobTransferManager, StorageService storageService, StorageMetadataService storageMetadataService, VeniceServerConfig serverConfig, Set<String> blobTransferDisabledStores)
-
-
Method Details
-
shouldStartBlobTransfer
public boolean shouldStartBlobTransfer(ConsumerAction consumerAction, Store store, String storeName, int versionNumber, int partition, String replicaId, boolean isDaVinciClient, boolean isHybrid, String kafkaVersionTopic, PubSubContext pubSubContext) Determines whether blob transfer should be started for a given partition. Checks blob transfer manager availability, consumer action type, store config, and replica lag. -
shouldEnableBlobTransfer
Checks store/server config to determine if blob transfer is enabled. -
isReplicaLaggedAndNeedBlobTransfer
public boolean isReplicaLaggedAndNeedBlobTransfer(String storeName, int versionNumber, int partition, String replicaId, boolean isHybrid, String kafkaVersionTopic, PubSubContext pubSubContext) Checks if the replica is lagged enough to warrant blob transfer instead of Kafka bootstrap. -
startBlobTransferAsyncForPartition
public void startBlobTransferAsyncForPartition(int partition, PartitionConsumptionState pcs, StorageEngine storageEngine, String storeName, int versionNumber, VeniceStoreVersionConfig storeVersionConfig, String kafkaVersionTopic) Starts an async blob transfer for the given partition. Prepares storage, cleans up directories, opens a blob-transfer-in-progress partition, and kicks off the async transfer. The returned future is stored on the PCS. -
requestPendingBlobTransferCancellation
Cancels a pending blob transfer for the given PCS if one is in progress. This is fire-and-forget — used by UNSUBSCRIBE which doesn't delete files. -
cancelBlobTransferAndAwaitTermination
public void cancelBlobTransferAndAwaitTermination(int partition, int timeoutInSeconds, String replicaId) Waits for blob transfer to reach a final state before returning. Used before dropping a partition to ensure Netty has finished writing files. Polls the BlobTransferStatusTrackingManager until the transfer reaches a final state (null, TRANSFER_COMPLETED, or TRANSFER_CANCELLED). -
validateDirectoriesAfterBlobTransfer
public void validateDirectoriesAfterBlobTransfer(String storeName, int versionNumber, int partition, boolean transferSuccessful, String replicaId) Validates directories after blob transfer completes. On success: clean up any remaining temp directory. On failure: clean up both partition and temp directories. -
adjustStoragePartitionWhenBlobTransferComplete
public void adjustStoragePartitionWhenBlobTransferComplete(StorageEngine storageEngine, int partition, String replicaId) Adjusts the storage partition after blob transfer completes. Drops the blob-transfer-in-progress partition and creates a new one with default options (RocksDB open). -
updateBlobTransferResponseStats
public void updateBlobTransferResponseStats(String storeName, int versionNumber, boolean transferSucceeded, CompletableFuture<Void> future) Records blob transfer response stats. -
getRequestTableFormat
Determines the RocksDB table format to request for blob transfer. -
requestBlobTransferCancellation
Signals the blob transfer manager to cancel a transfer for the given replica. -
clearTransferStatus
Clears the tracking manager status for the given replica. Called after blob transfer is fully handled (success or failure) to prevent stale state. -
getBlobTransferManager
-