Class BlobTransferIngestionHelper

java.lang.Object
com.linkedin.davinci.kafka.consumer.BlobTransferIngestionHelper

public class BlobTransferIngestionHelper extends Object
Utility class encapsulating blob transfer logic for use within StoreIngestionTask. Methods are extracted from StoreIngestionTask and DefaultIngestionBackend for better readability.
  • Constructor Details

  • Method Details

    • shouldStartBlobTransfer

      public boolean shouldStartBlobTransfer(ConsumerAction consumerAction, Store store, String storeName, int versionNumber, int partition, String replicaId, boolean isDaVinciClient, boolean isHybrid, String kafkaVersionTopic, PubSubContext pubSubContext)
      Determines whether blob transfer should be started for a given partition. Checks blob transfer manager availability, consumer action type, store config, and replica lag.
    • shouldEnableBlobTransfer

      public boolean shouldEnableBlobTransfer(Store store, boolean isDaVinciClient)
      Checks store/server config to determine if blob transfer is enabled.
    • isReplicaLaggedAndNeedBlobTransfer

      public boolean isReplicaLaggedAndNeedBlobTransfer(String storeName, int versionNumber, int partition, String replicaId, boolean isHybrid, String kafkaVersionTopic, PubSubContext pubSubContext)
      Checks if the replica is lagged enough to warrant blob transfer instead of Kafka bootstrap.
    • startBlobTransferAsyncForPartition

      public void startBlobTransferAsyncForPartition(int partition, PartitionConsumptionState pcs, StorageEngine storageEngine, String storeName, int versionNumber, VeniceStoreVersionConfig storeVersionConfig, String kafkaVersionTopic)
      Starts an async blob transfer for the given partition. Prepares storage, cleans up directories, opens a blob-transfer-in-progress partition, and kicks off the async transfer. The returned future is stored on the PCS.
    • requestPendingBlobTransferCancellation

      public void requestPendingBlobTransferCancellation(PartitionConsumptionState pcs)
      Cancels a pending blob transfer for the given PCS if one is in progress. This is fire-and-forget — used by UNSUBSCRIBE which doesn't delete files.
    • cancelBlobTransferAndAwaitTermination

      public void cancelBlobTransferAndAwaitTermination(int partition, int timeoutInSeconds, String replicaId)
      Waits for blob transfer to reach a final state before returning. Used before dropping a partition to ensure Netty has finished writing files. Polls the BlobTransferStatusTrackingManager until the transfer reaches a final state (null, TRANSFER_COMPLETED, or TRANSFER_CANCELLED).
    • validateDirectoriesAfterBlobTransfer

      public void validateDirectoriesAfterBlobTransfer(String storeName, int versionNumber, int partition, boolean transferSuccessful, String replicaId)
      Validates directories after blob transfer completes. On success: clean up any remaining temp directory. On failure: clean up both partition and temp directories.
    • adjustStoragePartitionWhenBlobTransferComplete

      public void adjustStoragePartitionWhenBlobTransferComplete(StorageEngine storageEngine, int partition, String replicaId)
      Adjusts the storage partition after blob transfer completes. Drops the blob-transfer-in-progress partition and creates a new one with default options (RocksDB open).
    • updateBlobTransferResponseStats

      public void updateBlobTransferResponseStats(String storeName, int versionNumber, boolean transferSucceeded, CompletableFuture<Void> future)
      Records blob transfer response stats.
    • getRequestTableFormat

      public BlobTransferUtils.BlobTransferTableFormat getRequestTableFormat()
      Determines the RocksDB table format to request for blob transfer.
    • requestBlobTransferCancellation

      public void requestBlobTransferCancellation(String replicaId)
      Signals the blob transfer manager to cancel a transfer for the given replica.
    • clearTransferStatus

      public void clearTransferStatus(String replicaId)
      Clears the tracking manager status for the given replica. Called after blob transfer is fully handled (success or failure) to prevent stale state.
    • getBlobTransferManager

      public BlobTransferManager getBlobTransferManager()