Class StoreIngestionTask

  • All Implemented Interfaces:, java.lang.AutoCloseable, java.lang.Runnable
    Direct Known Subclasses:

    public abstract class StoreIngestionTask
    extends java.lang.Object
    implements java.lang.Runnable,
    A runnable Kafka consumer that consumes messages from all the partitions assigned to the current node for a Kafka topic.
    • Field Detail


      • SCHEMA_POLLING_DELAY_MS

        public static long SCHEMA_POLLING_DELAY_MS
      • STORE_VERSION_POLLING_DELAY_MS

        public static long STORE_VERSION_POLLING_DELAY_MS
      • storageEngineRepository

        protected final StorageEngineRepository storageEngineRepository
        storage destination for consumption
      • kafkaVersionTopic

        protected final java.lang.String kafkaVersionTopic
        Topic used for this consumption. TODO: Use a PubSubVersionTopic and a PubSubRealTimeTopic, both extending PubSubTopic, for type safety.
      • versionTopic

        protected final PubSubTopic versionTopic
      • realTimeTopic

        protected final PubSubTopic realTimeTopic
      • storeName

        protected final java.lang.String storeName
      • versionNumber

        protected final int versionNumber
      • ingestionTaskName

        protected final java.lang.String ingestionTaskName
      • kafkaProps

        protected final java.util.Properties kafkaProps
      • isRunning

        protected final java.util.concurrent.atomic.AtomicBoolean isRunning
      • emitMetrics

        protected final java.util.concurrent.atomic.AtomicBoolean emitMetrics
      • consumerActionSequenceNumber

        protected final java.util.concurrent.atomic.AtomicInteger consumerActionSequenceNumber
      • consumerActionsQueue

        protected final java.util.concurrent.PriorityBlockingQueue<ConsumerAction> consumerActionsQueue
      • partitionToPendingConsumerActionCountMap

        protected final java.util.Map<java.lang.Integer,​java.util.concurrent.atomic.AtomicInteger> partitionToPendingConsumerActionCountMap
      • partitionConsumptionStateMap

        protected final java.util.concurrent.ConcurrentMap<java.lang.Integer,​PartitionConsumptionState> partitionConsumptionStateMap
        Per-partition consumption state map
      • isCurrentVersion

        protected final java.util.function.BooleanSupplier isCurrentVersion
      • hybridStoreConfig

        protected final java.util.Optional<HybridStoreConfig> hybridStoreConfig
      • divErrorMetricCallback

        protected final java.util.function.Consumer<DataValidationException> divErrorMetricCallback
      • readCycleDelayMs

        protected final long readCycleDelayMs
      • emptyPollSleepMs

        protected final long emptyPollSleepMs
      • diskUsage

        protected final DiskUsage diskUsage
      • databaseSyncBytesIntervalForTransactionalMode

        protected final long databaseSyncBytesIntervalForTransactionalMode
        Number of message bytes to consume before persisting the offset in the offset DB, for a transactional-mode database.
      • databaseSyncBytesIntervalForDeferredWriteMode

        protected final long databaseSyncBytesIntervalForDeferredWriteMode
        Number of message bytes to consume before persisting the offset in the offset DB, for a deferred-write database.
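        The two byte-interval fields above describe a policy in which the offset is persisted once enough message bytes have been consumed. A minimal sketch, assuming that behavior; OffsetSyncTracker, recordProcessed and the rule that a non-positive interval disables syncing are hypothetical and not part of the Venice API:

```java
/**
 * Hypothetical sketch of a byte-interval sync policy. The class and method
 * names are illustrative and are not part of StoreIngestionTask.
 */
final class OffsetSyncTracker {
  private final long syncBytesInterval; // e.g. the value of databaseSyncBytesIntervalForTransactionalMode
  private long bytesSinceLastSync = 0;
  private int syncCount = 0;

  OffsetSyncTracker(long syncBytesInterval) {
    this.syncBytesInterval = syncBytesInterval;
  }

  /** Accumulates one processed record; returns true when the offset should be persisted now. */
  boolean recordProcessed(int recordSizeBytes) {
    bytesSinceLastSync += recordSizeBytes;
    // Assumption: a non-positive interval disables byte-based syncing.
    if (syncBytesInterval > 0 && bytesSinceLastSync >= syncBytesInterval) {
      bytesSinceLastSync = 0; // start a new interval after each sync
      syncCount++;
      return true;
    }
    return false;
  }

  int getSyncCount() {
    return syncCount;
  }
}
```

        With an interval of 100 bytes, three 40-byte records trigger one sync on the third record.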
      • defaultReadyToServeChecker

        protected final com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck defaultReadyToServeChecker
      • deserializedSchemaIds

        protected final SparseConcurrentList<java.lang.Object> deserializedSchemaIds
      • idleCounter

        protected int idleCounter
      • writeComputeFailureCode

        protected int writeComputeFailureCode
      • isWriteComputationEnabled

        protected final boolean isWriteComputationEnabled
      • subPartitionCount

        protected final int subPartitionCount
        The number of partitions in the StorageEngine and in the version topic.
      • amplificationFactor

        protected final int amplificationFactor
      • storeVersionPartitionCount

        protected final int storeVersionPartitionCount
      • bootstrapTimeoutInMs

        protected final long bootstrapTimeoutInMs
      • isIsolatedIngestion

        protected final boolean isIsolatedIngestion
      • localKafkaServer

        protected final java.lang.String localKafkaServer
      • localKafkaClusterId

        protected final int localKafkaClusterId
      • localKafkaServerSingletonSet

        protected final java.util.Set<java.lang.String> localKafkaServerSingletonSet
      • isDaVinciClient

        protected final boolean isDaVinciClient
      • isDataRecovery

        protected boolean isDataRecovery
      • dataRecoverySourceVersionNumber

        protected int dataRecoverySourceVersionNumber
      • kafkaClusterUrlResolver

        protected final java.util.function.Function<java.lang.String,​java.lang.String> kafkaClusterUrlResolver
      • readOnlyForBatchOnlyStoreEnabled

        protected final boolean readOnlyForBatchOnlyStoreEnabled
      • isChunked

        protected final boolean isChunked
    • Method Detail

      • getIngestionTaskName

        public java.lang.String getIngestionTaskName()
      • getVersionNumber

        public int getVersionNumber()
      • isFutureVersion

        public boolean isFutureVersion()
      • throwIfNotRunning

        protected void throwIfNotRunning()
      • nextSeqNum

        protected int nextSeqNum()
        Applies a unique, increasing sequence number to each consumer action, so that if multiple consumer actions in the queue have the same priority, the one added to the queue first is polled from the queue first (FIFO).
        Returns:
        a unique, increasing sequence number for a new consumer action.
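        The FIFO tie-break described above can be sketched with java.util.concurrent.PriorityBlockingQueue and a comparator that orders by priority, then by sequence number. SketchAction and ActionQueueSketch are hypothetical stand-ins for ConsumerAction and consumerActionsQueue, and the assumption that a higher priority value is polled first is not confirmed by this page:

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for ConsumerAction: only a name, a priority and a sequence number. */
final class SketchAction {
  final String name;
  final int priority; // assumption: a higher value is polled first
  final int seqNum;

  SketchAction(String name, int priority, int seqNum) {
    this.name = name;
    this.priority = priority;
    this.seqNum = seqNum;
  }
}

final class ActionQueueSketch {
  private final AtomicInteger sequence = new AtomicInteger(0);

  // Highest priority first; among equal priorities, the lower sequence number (added earlier) wins.
  private final PriorityBlockingQueue<SketchAction> queue = new PriorityBlockingQueue<>(
      11,
      Comparator.comparingInt((SketchAction a) -> a.priority)
          .reversed()
          .thenComparingInt(a -> a.seqNum));

  int nextSeqNum() {
    return sequence.incrementAndGet();
  }

  void add(String name, int priority) {
    queue.add(new SketchAction(name, priority, nextSeqNum()));
  }

  SketchAction poll() {
    return queue.poll();
  }
}
```

        Adding a(1), b(5), c(1), d(5) in that order polls them as b, d, a, c: priority decides first, and insertion order breaks ties.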
      • subscribePartition

        public void subscribePartition​(PubSubTopicPartition topicPartition,
                                       java.util.Optional<LeaderFollowerStateType> leaderState,
                                       boolean isHelixTriggeredAction)
        Adds an asynchronous partition subscription request for the task.
      • unSubscribePartition

        public java.util.concurrent.CompletableFuture<java.lang.Void> unSubscribePartition​(PubSubTopicPartition topicPartition)
      • unSubscribePartition

        public java.util.concurrent.CompletableFuture<java.lang.Void> unSubscribePartition​(PubSubTopicPartition topicPartition,
                                                                                           boolean isHelixTriggeredAction)
        Adds an asynchronous partition unsubscription request for the task.
      • hasAnySubscription

        public boolean hasAnySubscription()
      • resetPartitionConsumptionOffset

        public void resetPartitionConsumptionOffset​(PubSubTopicPartition topicPartition)
        Adds an asynchronous request to reset the partition's consumption offset for the task.
      • getStoreName

        public java.lang.String getStoreName()
      • isUserSystemStore

        public boolean isUserSystemStore()
      • hasPendingPartitionIngestionAction

        public boolean hasPendingPartitionIngestionAction​(int userPartition)
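        The subscription methods above enqueue requests rather than acting immediately, and unSubscribePartition returns a CompletableFuture. A minimal sketch of one way such requests can be tracked, assuming a FIFO queue, a per-partition pending counter in the spirit of partitionToPendingConsumerActionCountMap, and a future completed by the task thread; AsyncPartitionActionsSketch and processQueuedActions are hypothetical names:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

final class AsyncPartitionActionsSketch {
  /** A queued request: the target partition and the future completed once it is processed. */
  private static final class Request {
    final int partition;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    Request(int partition) {
      this.partition = partition;
    }
  }

  private final LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();
  private final Map<Integer, AtomicInteger> pendingCount = new ConcurrentHashMap<>();

  /** Enqueues an unsubscribe request and returns immediately. */
  CompletableFuture<Void> unSubscribePartition(int partition) {
    Request request = new Request(partition);
    pendingCount.computeIfAbsent(partition, p -> new AtomicInteger()).incrementAndGet();
    queue.add(request);
    return request.future;
  }

  boolean hasPendingPartitionIngestionAction(int partition) {
    AtomicInteger count = pendingCount.get(partition);
    return count != null && count.get() > 0;
  }

  /** Called from the task thread: drains the queue in FIFO order. */
  void processQueuedActions() {
    Request request;
    while ((request = queue.poll()) != null) {
      // The actual unsubscribe work would happen here.
      pendingCount.get(request.partition).decrementAndGet();
      request.future.complete(null);
    }
  }
}
```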
      • kill

        public void kill()
      • checkAndLogIfLagIsAcceptableForHybridStore

        protected abstract boolean checkAndLogIfLagIsAcceptableForHybridStore​(PartitionConsumptionState partitionConsumptionState,
                                                                              long offsetLag,
                                                                              long offsetThreshold,
                                                                              boolean shouldLogLag,
                                                                              LagType lagType,
                                                                              long latestConsumedProducerTimestamp)
        Checks whether the lag is acceptable for hybrid stores
      • isReadyToServe

        protected boolean isReadyToServe​(PartitionConsumptionState partitionConsumptionState)
        Checks various conditions to verify whether a store is ready to serve. The lag is computed as:
        Lag = (Source Max Offset - SOBR Source Offset) - (Current Offset - SOBR Destination Offset)
        where SOBR stands for Start of Buffer Replay.
        Returns:
        true if EOP was received and, for hybrid stores, the lag is <= the threshold.
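        The readiness rule above can be written out as two small functions. A sketch under the assumption that the lag inputs are plain offsets and that the threshold is already known; ReadyToServeSketch is a hypothetical helper, not the real isReadyToServe logic, which checks further conditions:

```java
final class ReadyToServeSketch {
  /** Lag = (sourceMaxOffset - sobrSourceOffset) - (currentOffset - sobrDestinationOffset). */
  static long lag(long sourceMaxOffset, long sobrSourceOffset, long currentOffset, long sobrDestinationOffset) {
    return (sourceMaxOffset - sobrSourceOffset) - (currentOffset - sobrDestinationOffset);
  }

  /** Ready if EOP was received and, for hybrid stores only, the lag is within the threshold. */
  static boolean isReadyToServe(boolean endOfPushReceived, boolean isHybrid, long lag, long threshold) {
    if (!endOfPushReceived) {
      return false;
    }
    return !isHybrid || lag <= threshold;
  }
}
```

        For example, lag(1000, 200, 750, 0) is (1000 - 200) - (750 - 0) = 50, so a hybrid store with a threshold of 100 would be ready once EOP has been received.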
      • isReadyToServeAnnouncedWithRTLag

        public boolean isReadyToServeAnnouncedWithRTLag()
      • getLatestConsumedProducerTimestampWithSubPartition

        protected long getLatestConsumedProducerTimestampWithSubPartition​(long consumedProducerTimestamp,
                                                                          PartitionConsumptionState partitionConsumptionState)
      • isRealTimeBufferReplayStarted

        protected abstract boolean isRealTimeBufferReplayStarted​(PartitionConsumptionState partitionConsumptionState)
      • measureHybridOffsetLag

        protected abstract long measureHybridOffsetLag​(PartitionConsumptionState partitionConsumptionState,
                                                       boolean shouldLogLag)
        Measures the hybrid offset lag for the partition tracked in `partitionConsumptionState`.
      • reportIfCatchUpVersionTopicOffset

        protected abstract void reportIfCatchUpVersionTopicOffset​(PartitionConsumptionState partitionConsumptionState)
        Checks whether ingestion has reached the end of the version topic. This is currently used only by LeaderFollowerStoreIngestionTask.
      • produceToStoreBufferService

        protected void produceToStoreBufferService​(PubSubMessage<KafkaKey,​KafkaMessageEnvelope,​java.lang.Long> consumedRecord,
                                                   LeaderProducedRecordContext leaderProducedRecordContext,
                                                   int subPartition,
                                                   java.lang.String kafkaUrl,
                                                   long beforeProcessingRecordTimestampNs,
                                                   long currentTimeForMetricsMs)
                                            throws java.lang.InterruptedException
        Produces a consumer record, paired with its derived produced record, to the writer buffers maintained by StoreBufferService.
        Parameters:
        consumedRecord - received consumer record
        leaderProducedRecordContext - derived leaderProducedRecordContext
        subPartition -
        kafkaUrl -
      • produceToStoreBufferServiceOrKafka

        protected void produceToStoreBufferServiceOrKafka​(java.lang.Iterable<PubSubMessage<KafkaKey,​KafkaMessageEnvelope,​java.lang.Long>> records,
                                                          PubSubTopicPartition topicPartition,
                                                          java.lang.String kafkaUrl,
                                                          int kafkaClusterId)
                                                   throws java.lang.InterruptedException
        Produces the consumer records to the writer buffers maintained by StoreBufferService. This function may modify the original record in the KME (KafkaMessageEnvelope), so it is unsafe to use the payload from the KME directly after this call.
        Parameters:
        records - received consumer records
        topicPartition -
      • checkIngestionProgress

        protected void checkIngestionProgress​(Store store)
                                       throws java.lang.InterruptedException
      • isIngestionTaskActive

        public boolean isIngestionTaskActive()
      • run

        public void run()
        Polls for new messages in an infinite loop on a dedicated consumer thread, and processes the new messages on the current thread.
        Specified by:
        run in interface java.lang.Runnable
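        The run()/close() contract can be illustrated with a loop that keeps cycling while an AtomicBoolean flag (as in the isRunning field) is set, pausing readCycleDelayMs between cycles. This is a hypothetical sketch; what the real task does in each cycle is not shown on this page:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class RunLoopSketch implements Runnable, AutoCloseable {
  private final AtomicBoolean isRunning = new AtomicBoolean(true);
  private final AtomicInteger iterations = new AtomicInteger();
  private final long readCycleDelayMs;

  RunLoopSketch(long readCycleDelayMs) {
    this.readCycleDelayMs = readCycleDelayMs;
  }

  @Override
  public void run() {
    while (isRunning.get()) {
      // One cycle: the real task would process consumer actions, check progress, etc.
      iterations.incrementAndGet();
      try {
        Thread.sleep(readCycleDelayMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status and exit
        return;
      }
    }
  }

  @Override
  public void close() {
    isRunning.set(false); // the loop exits before starting its next cycle
  }

  boolean isRunning() {
    return isRunning.get();
  }

  int getIterations() {
    return iterations.get();
  }
}
```

        A caller would typically start it with new Thread(task).start() and later call task.close() to let the loop exit.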
      • closeVeniceWriters

        public void closeVeniceWriters​(boolean doFlush)
      • closeVeniceViewWriters

        protected void closeVeniceViewWriters()
      • resolveSourceKafkaServersWithinTopicSwitch

        protected TopicSwitch resolveSourceKafkaServersWithinTopicSwitch​(TopicSwitch originalTopicSwitch)
        Applies name resolution to all Kafka URLs in the provided TopicSwitch. Useful for translating URLs that came from a different runtime (e.g. from the controller, or from state persisted by a previous run of the same server).
        Returns:
        the same TopicSwitch, mutated so that every Kafka URL it contains is guaranteed to be usable by the current Venice server instance.
      • getOffsetToOnlineLagThresholdPerPartition

        protected static long getOffsetToOnlineLagThresholdPerPartition​(java.util.Optional<HybridStoreConfig> hybridStoreConfig,
                                                                        ReadOnlyStoreRepository storeRepository,
                                                                        java.lang.String storeName,
                                                                        int subPartitionCount)
      • processCommonConsumerAction

        protected void processCommonConsumerAction​(ConsumerAction consumerAction)
                                            throws java.lang.InterruptedException
      • getTopicPartitionEndOffSet

        protected long getTopicPartitionEndOffSet​(java.lang.String kafkaUrl,
                                                  PubSubTopic pubSubTopic,
                                                  int partition)
        Returns:
        the end offset in Kafka for the topic partition in SIT (StoreIngestionTask), or a negative value if it could not be retrieved. N.B.: The returned end offset is the offset of the last successfully replicated message plus one. If the partition has never been written to, the end offset is 0.
      • getPartitionOffsetLagBasedOnMetrics

        protected long getPartitionOffsetLagBasedOnMetrics​(java.lang.String kafkaSourceAddress,
                                                           PubSubTopic topic,
                                                           int partition)
      • checkLongRunningTaskState

        protected abstract void checkLongRunningTaskState()
                                                   throws java.lang.InterruptedException
      • processConsumerAction

        protected abstract void processConsumerAction​(ConsumerAction message,
                                                      Store store)
                                               throws java.lang.InterruptedException
      • getConsumptionSourceKafkaAddress

        protected abstract java.util.Set<java.lang.String> getConsumptionSourceKafkaAddress​(PartitionConsumptionState partitionConsumptionState)
      • getRealTimeDataSourceKafkaAddress

        protected java.util.Set<java.lang.String> getRealTimeDataSourceKafkaAddress​(PartitionConsumptionState partitionConsumptionState)
      • hasAnyPartitionConsumptionState

        public boolean hasAnyPartitionConsumptionState​(java.util.function.Predicate<PartitionConsumptionState> pcsPredicate)
      • getFailedIngestionPartitionCount

        public int getFailedIngestionPartitionCount()
      • shouldProcessRecord

        protected boolean shouldProcessRecord​(PubSubMessage<KafkaKey,​KafkaMessageEnvelope,​java.lang.Long> record,
                                              int subPartition)
        Common record check for the different state models: checks whether the server keeps receiving messages after EOP for a batch-only store.
      • getLeaderState

        public LeaderFollowerStateType getLeaderState​(int partition)
        Retrieves the current LeaderFollowerState from the partition's PCS (PartitionConsumptionState). IsolatedIngestionServer uses this method to sync the user partition's leader/follower status from the child process to the parent process under ingestion isolation.
      • setLastConsumerException

        public void setLastConsumerException​(java.lang.Exception e)
      • setLastStoreIngestionException

        public void setLastStoreIngestionException​(java.lang.Exception e)
      • recordChecksumVerificationFailure

        public void recordChecksumVerificationFailure()
      • getBatchReplicationLag

        public abstract long getBatchReplicationLag()
      • getLeaderOffsetLag

        public abstract long getLeaderOffsetLag()
      • getBatchLeaderOffsetLag

        public abstract long getBatchLeaderOffsetLag()
      • getHybridLeaderOffsetLag

        public abstract long getHybridLeaderOffsetLag()
      • measureLagWithCallToPubSub

        protected long measureLagWithCallToPubSub​(java.lang.String pubSubServerName,
                                                  PubSubTopic topic,
                                                  int partition,
                                                  long currentOffset)
        Parameters:
        pubSubServerName - PubSub deployment to interrogate
        topic - topic to measure
        partition - for which to measure lag
        Returns:
        the lag, or Long.MAX_VALUE (9223372036854775807L) if it could not be measured. N.B.: The returned lag can be negative, since the end offset used in the calculation is cached.
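        Given the end-offset convention documented for getTopicPartitionEndOffSet (last message plus one), a lag measurement can be sketched as follows. It assumes currentOffset is the offset of the last consumed message (-1 before anything is consumed); LagSketch and the exact formula are illustrative, not taken from the Venice source:

```java
final class LagSketch {
  /**
   * cachedEndOffset: end offset as reported, possibly stale; negative means the lookup failed.
   * currentOffset: offset of the last consumed message, -1 if nothing was consumed (assumption).
   */
  static long measureLag(long cachedEndOffset, long currentOffset) {
    if (cachedEndOffset < 0) {
      return Long.MAX_VALUE; // failed to measure: report "infinite" lag
    }
    // The end offset is the last message's offset plus one, so the last available offset is endOffset - 1.
    return (cachedEndOffset - 1) - currentOffset;
  }
}
```

        With a stale cached end offset of 10 and a current offset of 12, the sketch returns -3, which is the kind of negative value the N.B. above warns about.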
      • measureLagWithCallToPubSub

        protected static long measureLagWithCallToPubSub​(java.lang.String pubSubServerName,
                                                         PubSubTopic topic,
                                                         int partition,
                                                         long currentOffset,
                                                         java.util.function.Function<java.lang.String,​TopicManager> topicManagerProvider)
      • getFollowerOffsetLag

        public abstract long getFollowerOffsetLag()
        Measures the offset lag between the follower and the leader.
      • getBatchFollowerOffsetLag

        public abstract long getBatchFollowerOffsetLag()
      • getHybridFollowerOffsetLag

        public abstract long getHybridFollowerOffsetLag()
      • getRegionHybridOffsetLag

        public abstract long getRegionHybridOffsetLag​(int regionId)
      • getWriteComputeErrorCode

        public abstract int getWriteComputeErrorCode()
      • updateLeaderTopicOnFollower

        public abstract void updateLeaderTopicOnFollower​(PartitionConsumptionState partitionConsumptionState)
      • minZeroLag

        protected long minZeroLag​(long value)
        Because of timing considerations, some lag metrics may compute negative values. Negative lag does not make sense, so to ease interpretation, these metrics are given a lower bound of zero.
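        Applying a lower bound of zero is a single Math.max call; a sketch with a hypothetical wrapper class:

```java
final class MinZeroLagSketch {
  /** Clamps a possibly negative lag (for example, one computed from a stale cached end offset) to zero. */
  static long minZeroLag(long value) {
    return Math.max(0L, value);
  }
}
```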
      • isHybridMode

        public boolean isHybridMode()
      • updateLatestInMemoryProcessedOffset

        protected abstract void updateLatestInMemoryProcessedOffset​(PartitionConsumptionState partitionConsumptionState,
                                                                    PubSubMessage<KafkaKey,​KafkaMessageEnvelope,​java.lang.Long> consumerRecordWrapper,
                                                                    LeaderProducedRecordContext leaderProducedRecordContext,
                                                                    java.lang.String kafkaUrl,
                                                                    boolean dryRun)
        Maintains, in memory, the latest offsets processed by drainers; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Before the in-memory offset is updated, the underlying storage engine should already have persisted the given record. Dry-run mode only performs the offset-rewind check and does not update the processed offset.
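        The dry-run behavior above can be sketched as a rewind check that advances the stored offset only when dryRun is false. ProcessedOffsetSketch is hypothetical, and treating an offset at or below the latest processed one as a rewind is an assumption:

```java
final class ProcessedOffsetSketch {
  private long latestProcessedOffset = -1; // assumption: -1 means nothing processed yet

  /**
   * Runs the offset-rewind check and, unless dryRun is set, records the new offset.
   * Returns true if a rewind (an offset not above the latest processed one) was detected.
   */
  boolean update(long offset, boolean dryRun) {
    boolean rewind = offset <= latestProcessedOffset;
    if (!dryRun && !rewind) {
      latestProcessedOffset = offset; // the record is assumed to be persisted already
    }
    return rewind;
  }

  long getLatestProcessedOffset() {
    return latestProcessedOffset;
  }
}
```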
      • recordWriterStats

        protected void recordWriterStats​(long consumerTimestampMs,
                                         long producerBrokerLatencyMs,
                                         long brokerConsumerLatencyMs,
                                         PartitionConsumptionState partitionConsumptionState)
      • validateMessage

        protected void validateMessage​(KafkaDataIntegrityValidator validator,
                                       PubSubMessage<KafkaKey,​KafkaMessageEnvelope,​java.lang.Long> consumerRecord,
                                       boolean endOfPushReceived,
                                       PartitionConsumptionState partitionConsumptionState)
        Message validation using DIV (Data Integrity Validation). Leaders should pass in the validator instance from LeaderFollowerStoreIngestionTask, and drainers should pass in the validator instance from StoreIngestionTask.
        1. If valid DIV errors occur after EOP is received, no fatal exception is thrown, but the errors are recorded in the DIV metrics.
        2. DIV errors from unregistered producers after EOP are ignored.
        3. DIV errors on records older than logCompactionDelayInMs are ignored.
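        The three DIV rules can be sketched as a decision function. The names are hypothetical, and two points are assumptions rather than documented behavior: that DIV errors before EOP are fatal (implied by rule 1), and that rule 3 compares the record's age with logCompactionDelayInMs regardless of EOP:

```java
final class DivErrorPolicySketch {
  enum Outcome { THROW, RECORD_METRIC_ONLY, IGNORE }

  static Outcome onDivError(
      boolean endOfPushReceived, boolean producerRegistered, long recordAgeMs, long logCompactionDelayInMs) {
    // Rule 3 (assumed interpretation): records older than the log compaction delay are ignored.
    if (recordAgeMs > logCompactionDelayInMs) {
      return Outcome.IGNORE;
    }
    if (endOfPushReceived) {
      // Rule 2: unregistered producers after EOP are ignored.
      if (!producerRegistered) {
        return Outcome.IGNORE;
      }
      // Rule 1: after EOP, errors are recorded in metrics but are not fatal.
      return Outcome.RECORD_METRIC_ONLY;
    }
    // Assumption: before EOP, a DIV error is fatal.
    return Outcome.THROW;
  }
}
```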
      • cloneProducerStates

        protected void cloneProducerStates​(int partition,
                                           KafkaDataIntegrityValidator validator)
        Only StoreIngestionTask should access kafkaDataIntegrityValidator. Other components, such as leaders in LeaderFollowerStoreIngestionTask, should never access the drainer's DIV validator: message consumption in the leader runs ahead of the drainer, so leaders and drainers process messages at different paces.
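        The isolation described above amounts to giving the leader a copy of the drainer's producer state instead of a shared reference. A minimal sketch with a hypothetical validator that tracks only the last sequence number per producer; the real KafkaDataIntegrityValidator state is richer:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical validator: last seen sequence number per producer GUID, per partition. */
final class SketchValidator {
  final Map<Integer, Map<String, Integer>> lastSeqByPartition = new HashMap<>();

  void record(int partition, String producerGuid, int seq) {
    lastSeqByPartition.computeIfAbsent(partition, p -> new HashMap<>()).put(producerGuid, seq);
  }

  Integer lastSeq(int partition, String producerGuid) {
    Map<String, Integer> states = lastSeqByPartition.get(partition);
    return states == null ? null : states.get(producerGuid);
  }

  /** Copies this validator's state for one partition into the target, as an independent snapshot. */
  void cloneProducerStates(int partition, SketchValidator target) {
    Map<String, Integer> source = lastSeqByPartition.getOrDefault(partition, Map.of());
    target.lastSeqByPartition.put(partition, new HashMap<>(source));
  }
}
```

        Because the target receives a new map, later updates by the leader do not leak back into the drainer's state.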
      • putInStorageEngine

        protected void putInStorageEngine​(int partition,
                                          byte[] keyBytes,
                                          Put put)
        Persist Put record to storage engine.
      • removeFromStorageEngine

        protected void removeFromStorageEngine​(int partition,
                                               byte[] keyBytes,
                                               Delete delete)
      • throwOrLogStorageFailureDependingIfStillSubscribed

        protected void throwOrLogStorageFailureDependingIfStillSubscribed​(int partition,
                                                                          VeniceException e)
      • logStorageOperationWhileUnsubscribed

        protected void logStorageOperationWhileUnsubscribed​(int partition)
      • consumerHasAnySubscription

        public boolean consumerHasAnySubscription()
      • consumerBatchUnsubscribe

        public void consumerBatchUnsubscribe​(java.util.Set<PubSubTopicPartition> topicPartitionSet)
      • consumerUnSubscribeAllTopics

        public abstract void consumerUnSubscribeAllTopics​(PartitionConsumptionState partitionConsumptionState)
      • consumerSubscribe

        public void consumerSubscribe​(PubSubTopicPartition topicPartition,
                                      long startOffset,
                                      java.lang.String kafkaURL)
      • waitVersionStateAvailable

        protected StoreVersionState waitVersionStateAvailable​(java.lang.String kafkaTopic)
                                                       throws java.lang.InterruptedException
      • close

        public void close()
        Stops the consumer task.
        Specified by:
        close in interface java.lang.AutoCloseable
        Specified by:
        close in interface
      • shutdown

        public void shutdown​(int waitTime)
        Blocks until the StoreIngestionTask has fully shut down, waiting at most the given time.
        Parameters:
        waitTime - Maximum wait time for the shutdown operation.
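        A blocking shutdown with a bounded wait can be sketched with ExecutorService.shutdown() and awaitTermination(). The unit of waitTime is not stated on this page; the sketch assumes milliseconds, and BlockingShutdownSketch is a hypothetical class:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class BlockingShutdownSketch {
  private final AtomicBoolean isRunning = new AtomicBoolean(true);
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  BlockingShutdownSketch() {
    executor.submit(() -> {
      while (isRunning.get()) {
        try {
          Thread.sleep(1); // stands in for one ingestion loop iteration
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
    });
  }

  /** Signals the loop to stop, then blocks for at most waitTimeMs; returns true if fully stopped. */
  boolean shutdown(int waitTimeMs) {
    isRunning.set(false);
    executor.shutdown(); // no new tasks; the running loop observes the cleared flag
    try {
      return executor.awaitTermination(waitTimeMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```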
      • isRunning

        public boolean isRunning()
        Allows the service to get the current status of the task, so that it can create a new task if required.
      • getVersionTopic

        public PubSubTopic getVersionTopic()
      • getRealtimeTopic

        public PubSubTopic getRealtimeTopic()
      • isMetricsEmissionEnabled

        public boolean isMetricsEmissionEnabled()
      • enableMetricsEmission

        public void enableMetricsEmission()
      • disableMetricsEmission

        public void disableMetricsEmission()
      • isPartitionConsumingOrHasPendingIngestionAction

        public boolean isPartitionConsumingOrHasPendingIngestionAction​(int userPartition)
        Checks whether the given partition is still consuming messages from Kafka or has a pending ingestion action.
      • createKafkaConsumerProperties

        protected java.util.Properties createKafkaConsumerProperties​(java.util.Properties localConsumerProps,
                                                                     java.lang.String remoteKafkaSourceAddress,
                                                                     boolean consumeRemotely)
        Overrides the CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG config with a remote Kafka bootstrap URL.
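        A minimal sketch of the override, assuming the local properties are copied rather than mutated and that the override applies only when consumeRemotely is true. The key string "bootstrap.servers" is the value of Kafka's CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; ConsumerPropsSketch is hypothetical:

```java
import java.util.Properties;

final class ConsumerPropsSketch {
  // Same value as Kafka's CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, inlined to avoid the kafka-clients dependency.
  static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

  static Properties createKafkaConsumerProperties(
      Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely) {
    Properties props = new Properties();
    props.putAll(localConsumerProps); // copy so the caller's properties are not mutated
    if (consumeRemotely) {
      props.setProperty(BOOTSTRAP_SERVERS_CONFIG, remoteKafkaSourceAddress);
    }
    return props;
  }
}
```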
      • reportError

        public void reportError​(java.lang.String message,
                                int userPartition,
                                java.lang.Exception e)
      • getAmplificationFactor

        public int getAmplificationFactor()
      • isActiveActiveReplicationEnabled

        public boolean isActiveActiveReplicationEnabled()
      • dumpPartitionConsumptionStates

        public void dumpPartitionConsumptionStates​(AdminResponse response,
                                                   ComplementSet<java.lang.Integer> partitions)
        Invoked by an admin request to dump the requested partition consumption states.
      • dumpStoreVersionState

        public void dumpStoreVersionState​(AdminResponse response)
        Invoked by admin request to dump store version state metadata.
      • updateOffsetMetadataAndSync

        public void updateOffsetMetadataAndSync​(java.lang.String topic,
                                                int partitionId)
      • getTopicManager

        protected TopicManager getTopicManager​(java.lang.String sourceKafkaServer)
        Returns the local or remote topic manager.
        Parameters:
        sourceKafkaServer - The address of the source Kafka bootstrap server.
        Returns:
        topic manager
      • waitForAllMessageToBeProcessedFromTopicPartition

        protected void waitForAllMessageToBeProcessedFromTopicPartition​(PubSubTopicPartition topicPartition,
                                                                        PartitionConsumptionState partitionConsumptionState)
                                                                 throws java.lang.InterruptedException
        Waits for the complete processing (including persistence to disk) of all the messages that were consumed from this Kafka {topic, partition} before this function was called. The calling thread blocks until that processing is done.
        Parameters:
        topicPartition - for which to wait
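        If all records of a partition are processed in order by a single drainer thread (an assumption here), waiting for everything consumed so far reduces to submitting a marker task and waiting for it. A sketch using a single-threaded ExecutorService; DrainBarrierSketch is a hypothetical stand-in for the drainers maintained by StoreBufferService:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

final class DrainBarrierSketch {
  // Hypothetical single drainer: tasks run one at a time, in submission order.
  private final ExecutorService drainer = Executors.newSingleThreadExecutor();
  final AtomicInteger persisted = new AtomicInteger();

  void submitRecord() {
    drainer.submit(() -> {
      persisted.incrementAndGet(); // stands in for "persist the record to disk"
    });
  }

  /** Blocks until every record submitted before this call has been processed. */
  void waitForAllSubmittedRecords() throws InterruptedException, ExecutionException {
    Future<?> marker = drainer.submit(() -> { }); // FIFO: runs only after all earlier records
    marker.get();
  }

  void close() {
    drainer.shutdown();
  }
}
```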
      • recordProcessedRecordStats

        protected void recordProcessedRecordStats​(PartitionConsumptionState partitionConsumptionState,
                                                  int processedRecordSize)
      • isSegmentControlMsg

        protected boolean isSegmentControlMsg​(ControlMessageType msgType)
      • isTransientRecordBufferUsed

        public boolean isTransientRecordBufferUsed()
        This is not a per-record state. Rather, it indicates whether the transient record buffer is used at all by this ingestion task. In L/F (leader/follower) mode, only the WC (write compute) ingestion task needs this buffer.
      • setPartitionConsumptionState

        protected void setPartitionConsumptionState​(int partition,
                                                    PartitionConsumptionState pcs)
      • isChunked

        protected boolean isChunked()
      • getKafkaVersionTopic

        protected java.lang.String getKafkaVersionTopic()
      • isStuckByMemoryConstraint

        public boolean isStuckByMemoryConstraint()
      • shouldUpdateUpstreamOffset

        protected boolean shouldUpdateUpstreamOffset​(PubSubMessage<KafkaKey,​KafkaMessageEnvelope,​java.lang.Long> consumerRecord)
        Validates whether the given consumerRecord has a valid upstream offset to update from.
        Parameters:
        consumerRecord -
        Returns:
        true if the record is not null and contains a valid upstream offset; otherwise false.
      • isProducingVersionTopicHealthy

        public boolean isProducingVersionTopicHealthy()
        Checks the following conditions: 1. Whether the version topic exists.