Class KafkaStoreIngestionService

  • All Implemented Interfaces:
    StoreIngestionService, java.io.Closeable, java.lang.AutoCloseable

    public class KafkaStoreIngestionService
    extends AbstractVeniceService
    implements StoreIngestionService
    Assumes a one-to-one mapping between a Venice store and a Kafka topic. Manages the Kafka topics and partitions that need to be consumed for the stores on this node. Launches a StoreIngestionTask for each store version to consume and process messages. Uses the "new" Kafka Consumer.
    • Method Detail

      • hasCurrentVersionBootstrapping

        public boolean hasCurrentVersionBootstrapping()
      • hasCurrentVersionBootstrapping

        public static boolean hasCurrentVersionBootstrapping​(java.util.Map<java.lang.String,​StoreIngestionTask> ingestionTaskMap)
      • stopInner

        public void stopInner()
        Stops all the Kafka consumption tasks. Closes all the Kafka clients.
        Specified by:
        stopInner in class AbstractVeniceService
      • startConsumption

        public void startConsumption​(VeniceStoreVersionConfig veniceStore,
                                     int partitionId)
        Starts consuming messages from the Kafka partition corresponding to the Venice partition. Subscribes to the partition if required.
        Specified by:
        startConsumption in interface StoreIngestionService
        Parameters:
        veniceStore - Venice Store for the partition.
        partitionId - Venice partition's id.
      • shutdownStoreIngestionTask

        public void shutdownStoreIngestionTask​(java.lang.String topicName)
        Closes the specified StoreIngestionTask and waits up to 10 seconds for it to shut down fully.
        Parameters:
        topicName - Topic name of the ingestion task to be shut down.
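        The close-then-wait behavior described for shutdownStoreIngestionTask can be illustrated with a minimal sketch. It is not the Venice implementation: DemoTask, closeAndWait and runDemo are hypothetical names, and the task here is a plain Runnable submitted to a standard ExecutorService. The sketch only shows the general pattern of signalling a task to close and then waiting on its Future with a timeout.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedShutdownSketch {
  /** Hypothetical stand-in for an ingestion task; not the Venice StoreIngestionTask. */
  static class DemoTask implements Runnable {
    private volatile boolean running = true;

    void close() {
      running = false; // cooperative stop signal
    }

    @Override
    public void run() {
      while (running) {
        try {
          Thread.sleep(10); // simulate work
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
  }

  /** Signals the task to close, then waits up to timeoutSeconds. Returns true if the task ended in time. */
  static boolean closeAndWait(DemoTask task, Future<?> handle, long timeoutSeconds) {
    task.close();
    try {
      handle.get(timeoutSeconds, TimeUnit.SECONDS);
      return true;
    } catch (TimeoutException e) {
      return false; // still running after the bounded wait
    } catch (ExecutionException e) {
      return true; // the task ended, although with a failure
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Starts one demo task and shuts it down with the given bound. */
  static boolean runDemo(long timeoutSeconds) {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    try {
      DemoTask task = new DemoTask();
      Future<?> handle = pool.submit(task);
      return closeAndWait(task, handle, timeoutSeconds);
    } finally {
      pool.shutdownNow();
    }
  }

  public static void main(String[] args) {
    System.out.println("stopped within timeout: " + runDemo(10));
  }
}
```

        A bounded wait keeps a stuck task from blocking the caller indefinitely; what the caller does after a timeout is left open in this sketch.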
      • waitIngestionTaskToCompleteAllPartitionPendingActions

        public void waitIngestionTaskToCompleteAllPartitionPendingActions​(java.lang.String topicName,
                                                                          int partition,
                                                                          long retryIntervalInMs,
                                                                          int numRetries)
      • topicPartitionHasAnyPendingActions

        public boolean topicPartitionHasAnyPendingActions​(java.lang.String topic,
                                                          int partition)
      • isLiveUpdateSuppressionEnabled

        public boolean isLiveUpdateSuppressionEnabled()
      • updateStatsEmission

        protected void updateStatsEmission​(java.util.NavigableMap<java.lang.String,​StoreIngestionTask> taskMap,
                                           java.lang.String storeName,
                                           int maximumVersion)
        Finds the task that matches both storeName and maximumVersion, enables metrics emission for that task, and updates the ingestion stats with it; disables metrics emission for all tasks that do not match the maximum version.
      • updateStatsEmission

        protected void updateStatsEmission​(java.util.NavigableMap<java.lang.String,​StoreIngestionTask> taskMap,
                                           java.lang.String storeName)
        Goes through all known ingestion tasks on this server node and finds the task that matches storeName and has the largest version number; if metrics emission is not yet enabled for that task, enables it and updates the store ingestion stats.
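        The selection logic described for the two updateStatsEmission overloads can be sketched as follows. This is an illustration under stated assumptions, not the Venice code: DemoTask, versionOf, findMaxVersion and updateEmission are hypothetical names, and the sketch assumes topic names of the form <storeName>_v<version>. It also assumes that only tasks of the given store have their emission flag changed.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class StatsEmissionSketch {
  /** Hypothetical stand-in for an ingestion task; it only tracks a metrics flag. */
  static class DemoTask {
    boolean emitMetrics;
  }

  /** Parses the version from "<storeName>_v<N>" (assumed naming); returns -1 if the topic does not match. */
  static int versionOf(String topic, String storeName) {
    String prefix = storeName + "_v";
    if (!topic.startsWith(prefix)) {
      return -1;
    }
    try {
      return Integer.parseInt(topic.substring(prefix.length()));
    } catch (NumberFormatException e) {
      return -1;
    }
  }

  /** Returns the largest version among the store's tasks, or -1 if the store has none. */
  static int findMaxVersion(NavigableMap<String, DemoTask> taskMap, String storeName) {
    int max = -1;
    for (String topic : taskMap.keySet()) {
      max = Math.max(max, versionOf(topic, storeName));
    }
    return max;
  }

  /** Enables emission for the store's task at maximumVersion and disables it for the store's other tasks. */
  static void updateEmission(NavigableMap<String, DemoTask> taskMap, String storeName, int maximumVersion) {
    for (Map.Entry<String, DemoTask> entry : taskMap.entrySet()) {
      int version = versionOf(entry.getKey(), storeName);
      if (version >= 0) {
        entry.getValue().emitMetrics = (version == maximumVersion);
      }
    }
  }

  public static void main(String[] args) {
    NavigableMap<String, DemoTask> taskMap = new TreeMap<>();
    taskMap.put("store_v9", new DemoTask());
    taskMap.put("store_v10", new DemoTask());
    int max = findMaxVersion(taskMap, "store");
    updateEmission(taskMap, "store", max);
    System.out.println("max version: " + max);
  }
}
```

        The version is parsed numerically rather than taken from the map's last key because string ordering places "store_v10" before "store_v9".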
      • stopConsumption

        public java.util.concurrent.CompletableFuture<java.lang.Void> stopConsumption​(VeniceStoreVersionConfig veniceStore,
                                                                                      int partitionId)
        Stops consuming messages from the Kafka partition corresponding to the Venice partition.
        Specified by:
        stopConsumption in interface StoreIngestionService
        Parameters:
        veniceStore - Venice Store for the partition.
        partitionId - Venice partition's id.
      • stopConsumptionAndWait

        public void stopConsumptionAndWait​(VeniceStoreVersionConfig veniceStore,
                                           int partitionId,
                                           int sleepSeconds,
                                           int numRetries,
                                           boolean whetherToResetOffset)
        Stops consuming messages from the Kafka partition corresponding to the Venice partition and waits up to (sleepSeconds * numRetries) seconds to make sure partition consumption has stopped.
        Specified by:
        stopConsumptionAndWait in interface StoreIngestionService
        Parameters:
        veniceStore - Venice Store for the partition.
        partitionId - Venice partition's id.
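        sleepSeconds - Seconds to wait between checks that consumption has stopped.
        numRetries - Maximum number of checks; the total wait is at most sleepSeconds * numRetries seconds.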
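        The bounded wait of at most sleepSeconds * numRetries can be sketched as a simple polling loop. This is a sketch under assumptions, not the Venice implementation: waitUntilStopped and the isStillConsuming supplier are hypothetical, and the interval is given in milliseconds here so the demo runs quickly, whereas the documented method takes seconds.

```java
import java.util.function.BooleanSupplier;

public class StopAndWaitSketch {
  /**
   * Polls isStillConsuming up to numRetries times, sleeping sleepMillis between checks.
   * Returns true once consumption is observed to have stopped, false if the bound is exhausted.
   */
  static boolean waitUntilStopped(BooleanSupplier isStillConsuming, long sleepMillis, int numRetries) {
    for (int attempt = 0; attempt < numRetries; attempt++) {
      if (!isStillConsuming.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(sleepMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    // One final check after the last sleep.
    return !isStillConsuming.getAsBoolean();
  }

  public static void main(String[] args) {
    int[] checks = {0};
    // Hypothetical condition: consumption is reported as active for the first three checks.
    boolean stopped = waitUntilStopped(() -> checks[0]++ < 3, 10, 5);
    System.out.println("stopped: " + stopped);
  }
}
```

        The total sleep time is bounded by sleepMillis * numRetries, which mirrors the bound stated in the method description.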
      • killConsumptionTask

        public boolean killConsumptionTask​(java.lang.String topicName)
        Description copied from interface: StoreIngestionService
        Kills all running consumption for the given store.
        Specified by:
        killConsumptionTask in interface StoreIngestionService
        Parameters:
        topicName - Venice topic (store and version number) for the corresponding consumer task that needs to be killed. For topics that are not in the map, killConsumptionTask takes no action at all, not even logging.
        Returns:
        true if a kill is needed and called, otherwise false
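        The return contract of killConsumptionTask can be sketched with a plain map lookup. DemoTask and killIfRunning are hypothetical names, and treating a task that is no longer running as "no kill needed" is an assumption of this sketch rather than documented behavior.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class KillTaskSketch {
  /** Hypothetical stand-in for an ingestion task. */
  static class DemoTask {
    private volatile boolean running = true;

    boolean isRunning() {
      return running;
    }

    void kill() {
      running = false;
    }
  }

  /** Kills the task for topicName if it exists and is running; returns true only if a kill was issued. */
  static boolean killIfRunning(Map<String, DemoTask> tasks, String topicName) {
    DemoTask task = tasks.get(topicName);
    if (task == null || !task.isRunning()) {
      return false; // unknown topic or nothing to kill: no action taken
    }
    task.kill();
    return true;
  }

  public static void main(String[] args) {
    Map<String, DemoTask> tasks = new ConcurrentHashMap<>();
    tasks.put("store_v1", new DemoTask());
    System.out.println("first call: " + killIfRunning(tasks, "store_v1"));
    System.out.println("second call: " + killIfRunning(tasks, "store_v1"));
    System.out.println("unknown topic: " + killIfRunning(tasks, "store_v2"));
  }
}
```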
      • addIngestionNotifier

        public void addIngestionNotifier​(VeniceNotifier notifier)
        Description copied from interface: StoreIngestionService
        Adds a notifier that receives notifications about the various statuses of the consumption tasks, such as start, completed, progress, and error states. Multiple notifiers can be added for the same consumption tasks, and all of them are notified in order.
        Specified by:
        addIngestionNotifier in interface StoreIngestionService
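        The "notified in order" guarantee can be illustrated with a small registry that keeps notifiers in insertion order. This is a sketch, not the Venice notifier API: DemoNotifier, its single onStatus method, and NotifierRegistry are hypothetical and much simpler than VeniceNotifier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class NotifierOrderSketch {
  /** Hypothetical, simplified notifier; the real VeniceNotifier interface is not reproduced here. */
  interface DemoNotifier {
    void onStatus(String topic, int partition, String status);
  }

  /** Keeps notifiers in registration order and publishes each status to all of them in that order. */
  static class NotifierRegistry {
    private final List<DemoNotifier> notifiers = new CopyOnWriteArrayList<>();

    void add(DemoNotifier notifier) {
      notifiers.add(notifier);
    }

    void publish(String topic, int partition, String status) {
      for (DemoNotifier notifier : notifiers) {
        notifier.onStatus(topic, partition, status);
      }
    }
  }

  /** Registers two notifiers, publishes two statuses, and returns the observed call order. */
  static List<String> runDemo() {
    List<String> log = new ArrayList<>();
    NotifierRegistry registry = new NotifierRegistry();
    registry.add((topic, partition, status) -> log.add("first:" + status));
    registry.add((topic, partition, status) -> log.add("second:" + status));
    registry.publish("store_v1", 0, "STARTED");
    registry.publish("store_v1", 0, "COMPLETED");
    return log;
  }

  public static void main(String[] args) {
    System.out.println(runDemo());
  }
}
```

        A copy-on-write list is one way to let notifiers be added while a status is being published; it is a design choice of this sketch only.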
      • getIngestingTopicsWithVersionStatusNotOnline

        public java.util.Set<java.lang.String> getIngestingTopicsWithVersionStatusNotOnline()
        Description copied from interface: StoreIngestionService
        Gets the names of topics that are currently maintained by the ingestion service and whose corresponding version status is not online. Topics with an invalid store or version number are also included in the returned set.
        Specified by:
        getIngestingTopicsWithVersionStatusNotOnline in interface StoreIngestionService
        Returns:
        a Set of topic names.
      • getStoreVersionCompressionDictionary

        public java.nio.ByteBuffer getStoreVersionCompressionDictionary​(java.lang.String topicName)
      • getConsumptionSnapshots

        public AdminResponse getConsumptionSnapshots​(java.lang.String topicName,
                                                     ComplementSet<java.lang.Integer> partitions)
      • getTopicPartitionIngestionContext

        public ReplicaIngestionResponse getTopicPartitionIngestionContext​(java.lang.String versionTopic,
                                                                          java.lang.String topicName,
                                                                          int partitionId)
      • updatePartitionOffsetRecords

        public void updatePartitionOffsetRecords​(java.lang.String topicName,
                                                 int partition,
                                                 java.nio.ByteBuffer offsetRecordByteBuffer)
        Updates, in the main process, the latest offset records of all sub-partitions fetched from the isolated ingestion process, so that the main process's in-memory storage metadata service is aware of the latest updates and does not restart ingestion from scratch.
      • getPartitionOffsetRecords

        public java.nio.ByteBuffer getPartitionOffsetRecords​(java.lang.String topicName,
                                                             int partition)
        This method should only be called when the forked ingestion process is handing an ingestion task over to the main process. It collects the user partition's latest OffsetRecords from the partition consumption states (PCS). In theory, the PCS should be available in this situation because the topic has not been unsubscribed yet. If it is not available, an exception is thrown, since that state is unexpected.
      • syncTopicPartitionOffset

        public void syncTopicPartitionOffset​(java.lang.String topicName,
                                             int partition)
        Updates the offset metadata and syncs it to storage for the specified topic partition. This method is invoked only when the isolated ingestion process reports topic partition completion, to make sure the ingestion progress is persisted.