Class DefaultIngestionBackend

java.lang.Object
com.linkedin.davinci.ingestion.DefaultIngestionBackend
All Implemented Interfaces:
IngestionBackend, Closeable, AutoCloseable
Direct Known Subclasses:
IsolatedIngestionBackend

public class DefaultIngestionBackend extends Object implements IngestionBackend
The default ingestion backend implementation. Ingestion will be done in the same JVM as the application.
  • Constructor Details

  • Method Details

    • startConsumption

      public void startConsumption(VeniceStoreVersionConfig storeConfig, int partition)
      Specified by:
      startConsumption in interface IngestionBackend
    • isOffsetLagged

      public boolean isOffsetLagged(String store, int versionNumber, int partition, long blobTransferDisabledOffsetLagThreshold, boolean hybridStore)
      Helper method that decides whether to skip blob transfer and ingest directly from Kafka when some files have already been restored locally.
      1. If the store is a batch store, check whether the end of push has been received.
      2. If the store is a hybrid store, check whether the offset lag is within the allowed threshold.
      Note: If `blobTransferDisabledOffsetLagThreshold` is negative, the offset lag check is skipped and blob transfer always runs. Retained data may not be cleaned up unless a new host is added, which makes the blob transfer feature difficult to validate; a negative `blobTransferDisabledOffsetLagThreshold` ensures blob transfer always runs in such cases.
      Parameters:
      store - the store name
      versionNumber - the version number
      partition - the partition number
      blobTransferDisabledOffsetLagThreshold - the maximum allowed offset lag. This value is controlled by the config BLOB_TRANSFER_DISABLED_OFFSET_LAG_THRESHOLD and defaults to 100000L. If the offset lag is within this threshold, bootstrapping from Kafka is allowed even if blob transfer is enabled. If the lag exceeds this threshold, bootstrapping should first be attempted via blob transfer.
      hybridStore - whether the store is a hybrid store. For a hybrid store, the check is based on the offset lag. For a batch store, the check is based on whether the batch push is done.
      Returns:
      true if the store is lagged and needs to bootstrap from blob transfer; false if it can bootstrap from Kafka.
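
The decision rules described for isOffsetLagged can be sketched as a small pure function. This is a minimal illustration, not the DefaultIngestionBackend implementation: the class name OffsetLagDecisionSketch and the inputs endOfPushReceived and offsetLag are hypothetical stand-ins for state the real method looks up itself, and two details are assumptions (the negative-threshold rule is applied before the store-type check, and a lag exactly equal to the threshold counts as "within" it).

```java
// Hypothetical sketch of the documented decision rules; not Venice source code.
final class OffsetLagDecisionSketch {
  private OffsetLagDecisionSketch() {
  }

  /**
   * @param hybridStore       whether the store is a hybrid store
   * @param endOfPushReceived assumed input: whether end of push was seen for this partition
   * @param offsetLag         assumed input: current offset lag for this partition
   * @param lagThreshold      stands in for blobTransferDisabledOffsetLagThreshold
   * @return true to bootstrap from blob transfer, false to bootstrap from Kafka
   */
  static boolean isOffsetLagged(
      boolean hybridStore,
      boolean endOfPushReceived,
      long offsetLag,
      long lagThreshold) {
    // Negative threshold: skip the lag check so that blob transfer always runs.
    if (lagThreshold < 0) {
      return true;
    }
    // Batch store: treated as lagged until end of push has been received.
    if (!hybridStore) {
      return !endOfPushReceived;
    }
    // Hybrid store: lagged only when the lag exceeds the allowed threshold.
    return offsetLag > lagThreshold;
  }

  public static void main(String[] args) {
    System.out.println(isOffsetLagged(true, true, 50L, 100_000L));      // false
    System.out.println(isOffsetLagged(true, true, 200_000L, 100_000L)); // true
    System.out.println(isOffsetLagged(true, true, 0L, -1L));            // true
    System.out.println(isOffsetLagged(false, false, 0L, 100_000L));     // true
  }
}
```

Keeping the rules in a side-effect-free function like this makes the three cases (negative threshold, batch, hybrid) easy to unit test in isolation.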
    • stopConsumption

      public CompletableFuture<Void> stopConsumption(VeniceStoreVersionConfig storeConfig, int partition)
      Specified by:
      stopConsumption in interface IngestionBackend
    • killConsumptionTask

      public void killConsumptionTask(String topicName)
      Specified by:
      killConsumptionTask in interface IngestionBackend
    • shutdownIngestionTask

      public void shutdownIngestionTask(String topicName)
      Specified by:
      shutdownIngestionTask in interface IngestionBackend
    • removeStorageEngine

      public void removeStorageEngine(String topicName)
      Specified by:
      removeStorageEngine in interface IngestionBackend
    • dropStoragePartitionGracefully

      public CompletableFuture<Void> dropStoragePartitionGracefully(VeniceStoreVersionConfig storeConfig, int partition, int timeoutInSeconds, boolean removeEmptyStorageEngine)
      Description copied from interface: IngestionBackend
      This method stops subscribing to the specified topic partition and deletes the partition data from storage.
      Specified by:
      dropStoragePartitionGracefully in interface IngestionBackend
      Parameters:
      storeConfig - Store version config
      partition - Partition number to be dropped in the store version.
      timeoutInSeconds - Number of seconds to wait before timeout.
      removeEmptyStorageEngine - Whether to drop storage engine when dropping the last partition.
      Returns:
      a future for the drop partition action.
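
Because dropStoragePartitionGracefully returns a CompletableFuture<Void>, a caller may want to block on the result with its own bound. The following is a minimal sketch using only java.util.concurrent; the class DropFutureSketch and the helper awaitDrop are hypothetical, and the futures in main are stand-ins for the one the backend would return. It does not describe how timeoutInSeconds is handled inside the backend.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical caller-side helper; not part of the Venice API.
final class DropFutureSketch {
  private DropFutureSketch() {
  }

  /**
   * Waits for a drop-partition future to finish.
   *
   * @return true if the future completed normally within the bound, false otherwise
   */
  static boolean awaitDrop(CompletableFuture<Void> dropFuture, long callerTimeoutSeconds) {
    try {
      dropFuture.get(callerTimeoutSeconds, TimeUnit.SECONDS);
      return true;
    } catch (TimeoutException | ExecutionException e) {
      // The drop did not finish in time, or it failed with an exception.
      return false;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can observe it.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    // Stand-in for a drop that has already completed successfully.
    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    System.out.println(awaitDrop(done, 5));

    // Stand-in for a drop that failed.
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("simulated failure"));
    System.out.println(awaitDrop(failed, 5));
  }
}
```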
    • addIngestionNotifier

      public void addIngestionNotifier(VeniceNotifier ingestionListener)
      Specified by:
      addIngestionNotifier in interface IngestionBackend
    • setStorageEngineReference

      public void setStorageEngineReference(String topicName, AtomicReference<StorageEngine> storageEngineReference)
      Specified by:
      setStorageEngineReference in interface IngestionBackend
    • hasCurrentVersionBootstrapping

      public boolean hasCurrentVersionBootstrapping()
      Description copied from interface: IngestionBackend
      Checks whether any current version is still bootstrapping.
      Specified by:
      hasCurrentVersionBootstrapping in interface IngestionBackend
    • getStoreIngestionService

      public KafkaStoreIngestionService getStoreIngestionService()
      Specified by:
      getStoreIngestionService in interface IngestionBackend
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
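
Since the class implements Closeable and AutoCloseable, an instance can be scoped with try-with-resources where a short lifetime is appropriate, for example in a test. The sketch below shows the pattern with a hypothetical stand-in type, StandInBackend, because constructing a real DefaultIngestionBackend requires dependencies not covered on this page. Like the close() signature above, the stand-in declares no checked exception, so no catch block is needed.

```java
import java.io.Closeable;

// Hypothetical illustration of the try-with-resources pattern; not Venice source code.
final class CloseSketch {
  private CloseSketch() {
  }

  /** Stand-in for a backend whose close() declares no checked exception. */
  static final class StandInBackend implements Closeable {
    private boolean closed = false;

    @Override
    public void close() {
      closed = true;
    }

    boolean isClosed() {
      return closed;
    }
  }

  /** Uses the stand-in inside try-with-resources and reports whether it was closed. */
  static boolean useAndClose() {
    StandInBackend backend = new StandInBackend();
    try (StandInBackend scoped = backend) {
      // Work with the backend would go here; close() runs when this block exits.
      if (scoped.isClosed()) {
        throw new IllegalStateException("backend closed too early");
      }
    }
    return backend.isClosed();
  }

  public static void main(String[] args) {
    System.out.println(useAndClose());
  }
}
```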