Class StoreIngestionTask

java.lang.Object
com.linkedin.davinci.kafka.consumer.StoreIngestionTask
All Implemented Interfaces:
Closeable, AutoCloseable, Runnable
Direct Known Subclasses:
LeaderFollowerStoreIngestionTask

public abstract class StoreIngestionTask extends Object implements Runnable, Closeable
A runnable Kafka consumer that consumes messages from all the partitions assigned to the current node for a given Kafka topic.
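A minimal, hypothetical sketch of how a hosting service might drive an already-constructed task; the concrete subclass and constructor are omitted, and the executor, topic-partition object, and pub-sub package names below are assumptions rather than part of this class:

  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;
  import com.linkedin.venice.pubsub.api.PubSubTopicPartition;

  final class IngestionDriverSketch {
    // Assumes `task` and `topicPartition` were built elsewhere by the hosting service.
    static void runIngestion(StoreIngestionTask task, PubSubTopicPartition topicPartition) throws Exception {
      ExecutorService executor = Executors.newSingleThreadExecutor();
      executor.submit(task);                           // run() polls and processes messages in a loop
      task.subscribePartition(topicPartition);         // enqueues an asynchronous SUBSCRIBE consumer action
      // ... ingestion proceeds until the partition is ready to serve ...
      task.unSubscribePartition(topicPartition).get(); // waits for the asynchronous UNSUBSCRIBE to complete
      executor.shutdown();
    }
  }
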
  • Field Details

    • SCHEMA_POLLING_DELAY_MS

      public static long SCHEMA_POLLING_DELAY_MS
    • STORE_VERSION_POLLING_DELAY_MS

      public static long STORE_VERSION_POLLING_DELAY_MS
    • WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED

      protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
    • KILL_WAIT_TIME_MS

      protected static final long KILL_WAIT_TIME_MS
    • REDUNDANT_LOGGING_FILTER

      protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER
    • storageService

      protected final StorageService storageService
      storage destination for consumption
    • storageEngineRepository

      protected final StorageEngineRepository storageEngineRepository
    • storageEngine

      protected final AbstractStorageEngine storageEngine
    • kafkaVersionTopic

      protected final String kafkaVersionTopic
      Topics used for this topic's consumption. TODO: use a PubSubVersionTopic and a PubSubRealTimeTopic extending PubSubTopic for type safety.
    • versionTopic

      protected final PubSubTopic versionTopic
    • realTimeTopic

      protected final PubSubTopic realTimeTopic
    • storeName

      protected final String storeName
    • versionNumber

      protected final int versionNumber
    • schemaRepository

      protected final ReadOnlySchemaRepository schemaRepository
    • storeRepository

      protected final ReadOnlyStoreRepository storeRepository
    • ingestionTaskName

      protected final String ingestionTaskName
    • kafkaProps

      protected final Properties kafkaProps
    • isRunning

      protected final AtomicBoolean isRunning
    • emitMetrics

      protected final AtomicBoolean emitMetrics
    • consumerActionSequenceNumber

      protected final AtomicInteger consumerActionSequenceNumber
    • consumerActionsQueue

      protected final PriorityBlockingQueue<ConsumerAction> consumerActionsQueue
    • partitionToPendingConsumerActionCountMap

      protected final Map<Integer,AtomicInteger> partitionToPendingConsumerActionCountMap
    • storageMetadataService

      protected final StorageMetadataService storageMetadataService
    • topicManagerRepository

      protected final TopicManagerRepository topicManagerRepository
    • partitionConsumptionStateMap

      protected final ConcurrentMap<Integer,PartitionConsumptionState> partitionConsumptionStateMap
      Per-partition consumption state map
    • storeBufferService

      protected final AbstractStoreBufferService storeBufferService
    • hostLevelIngestionStats

      protected final HostLevelIngestionStats hostLevelIngestionStats
    • versionedDIVStats

      protected final AggVersionedDIVStats versionedDIVStats
    • versionedIngestionStats

      protected final AggVersionedIngestionStats versionedIngestionStats
    • isCurrentVersion

      protected final BooleanSupplier isCurrentVersion
    • hybridStoreConfig

      protected final Optional<HybridStoreConfig> hybridStoreConfig
    • divErrorMetricCallback

      protected final Consumer<DataValidationException> divErrorMetricCallback
    • readCycleDelayMs

      protected final long readCycleDelayMs
    • emptyPollSleepMs

      protected final long emptyPollSleepMs
    • diskUsage

      protected final DiskUsage diskUsage
    • databaseSyncBytesIntervalForTransactionalMode

      protected final long databaseSyncBytesIntervalForTransactionalMode
      Number of message bytes consumed between offset persists in the offset DB for a transactional-mode database.
    • databaseSyncBytesIntervalForDeferredWriteMode

      protected final long databaseSyncBytesIntervalForDeferredWriteMode
      Number of message bytes consumed between offset persists in the offset DB for a deferred-write database.
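      An illustrative sketch of the byte-interval policy these two fields configure (the helper and its parameters below are hypothetical stand-ins, not the task's actual internals):

        final class OffsetSyncPolicySketch {
          // Decide whether to persist the offset based on bytes consumed since the last sync,
          // using whichever interval applies to the partition's write mode.
          static boolean shouldSyncOffset(
              long bytesConsumedSinceLastSync,
              boolean deferredWriteMode,
              long databaseSyncBytesIntervalForTransactionalMode,
              long databaseSyncBytesIntervalForDeferredWriteMode) {
            long interval = deferredWriteMode
                ? databaseSyncBytesIntervalForDeferredWriteMode
                : databaseSyncBytesIntervalForTransactionalMode;
            return bytesConsumedSinceLastSync >= interval;
          }
        }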
    • serverConfig

      protected final VeniceServerConfig serverConfig
    • errorPartitionId

      protected final int errorPartitionId
      Used for reporting errors when the partitionConsumptionStateMap is empty
    • defaultReadyToServeChecker

      protected final com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck defaultReadyToServeChecker
    • availableSchemaIds

      protected final SparseConcurrentList<Object> availableSchemaIds
    • deserializedSchemaIds

      protected final SparseConcurrentList<Object> deserializedSchemaIds
    • idleCounter

      protected int idleCounter
    • aggKafkaConsumerService

      protected final AggKafkaConsumerService aggKafkaConsumerService
    • writeComputeFailureCode

      protected int writeComputeFailureCode
    • isWriteComputationEnabled

      protected final boolean isWriteComputationEnabled
    • isSeparatedRealtimeTopicEnabled

      protected final boolean isSeparatedRealtimeTopicEnabled
    • partitionCount

      protected final int partitionCount
      The number of partitions in the StorageEngine and in the version topics
    • storeVersionPartitionCount

      protected final int storeVersionPartitionCount
    • bootstrapTimeoutInMs

      protected final long bootstrapTimeoutInMs
    • isIsolatedIngestion

      protected final boolean isIsolatedIngestion
    • ingestionNotificationDispatcher

      protected final com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher
    • chunkAssembler

      protected final ChunkAssembler chunkAssembler
    • localKafkaServer

      protected final String localKafkaServer
    • localKafkaClusterId

      protected final int localKafkaClusterId
    • localKafkaServerSingletonSet

      protected final Set<String> localKafkaServerSingletonSet
    • isDaVinciClient

      protected final boolean isDaVinciClient
    • isDataRecovery

      protected boolean isDataRecovery
    • dataRecoverySourceVersionNumber

      protected int dataRecoverySourceVersionNumber
    • readOnlyForBatchOnlyStoreEnabled

      protected final boolean readOnlyForBatchOnlyStoreEnabled
    • metaStoreWriter

      protected final MetaStoreWriter metaStoreWriter
    • kafkaClusterUrlResolver

      protected final Function<String,String> kafkaClusterUrlResolver
    • resetErrorReplicaEnabled

      protected final boolean resetErrorReplicaEnabled
    • compressionStrategy

      protected final CompressionStrategy compressionStrategy
    • compressorFactory

      protected final StorageEngineBackedCompressorFactory compressorFactory
    • compressor

      protected final Lazy<VeniceCompressor> compressor
    • isChunked

      protected final boolean isChunked
    • isRmdChunked

      protected final boolean isRmdChunked
    • manifestSerializer

      protected final ChunkedValueManifestSerializer manifestSerializer
    • pubSubTopicRepository

      protected final PubSubTopicRepository pubSubTopicRepository
    • recordLevelMetricEnabled

      protected final AtomicBoolean recordLevelMetricEnabled
    • isGlobalRtDivEnabled

      protected final boolean isGlobalRtDivEnabled
    • versionRole

      protected volatile PartitionReplicaIngestionContext.VersionRole versionRole
    • workloadType

      protected volatile PartitionReplicaIngestionContext.WorkloadType workloadType
    • batchReportIncPushStatusEnabled

      protected final boolean batchReportIncPushStatusEnabled
    • parallelProcessingThreadPool

      protected final ExecutorService parallelProcessingThreadPool
    • gracefulShutdownLatch

      protected Lazy<CountDownLatch> gracefulShutdownLatch
    • zkHelixAdmin

      protected Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin
    • hostName

      protected final String hostName
  • Constructor Details

  • Method Details

    • getIngestionBatchProcessor

      protected abstract IngestionBatchProcessor getIngestionBatchProcessor()
    • getStorageEngine

      public AbstractStorageEngine getStorageEngine()
    • getIngestionTaskName

      public String getIngestionTaskName()
    • getVersionNumber

      public int getVersionNumber()
    • isFutureVersion

      public boolean isFutureVersion()
    • throwIfNotRunning

      protected void throwIfNotRunning()
    • nextSeqNum

      protected int nextSeqNum()
      Applies a unique and increasing sequence number to each consumer action, so that if multiple consumer actions in the queue have the same priority, whichever was added to the queue first is polled out first (FIFO).
      Returns:
      a unique and increasing sequence number for a new consumer action.
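      An illustrative, self-contained sketch of the ordering idea: equal-priority actions in a PriorityBlockingQueue fall back to the sequence number, giving FIFO behavior (the action type and comparator below are hypothetical stand-ins, not ConsumerAction itself):

        import java.util.Comparator;
        import java.util.concurrent.PriorityBlockingQueue;
        import java.util.concurrent.atomic.AtomicInteger;

        final class SequencedActionSketch {
          static final class Action {
            final int priority;       // lower value = higher priority
            final int sequenceNumber; // assigned from nextSeqNum()
            Action(int priority, int sequenceNumber) {
              this.priority = priority;
              this.sequenceNumber = sequenceNumber;
            }
          }

          private final AtomicInteger consumerActionSequenceNumber = new AtomicInteger(0);

          int nextSeqNum() {
            return consumerActionSequenceNumber.incrementAndGet();
          }

          PriorityBlockingQueue<Action> newQueue() {
            return new PriorityBlockingQueue<>(
                11,
                Comparator.<Action>comparingInt(a -> a.priority)
                    .thenComparingInt(a -> a.sequenceNumber)); // FIFO tie-break among equal priorities
          }
        }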
    • subscribePartition

      public void subscribePartition(PubSubTopicPartition topicPartition)
    • subscribePartition

      public void subscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction)
      Adds an asynchronous partition subscription request for the task.
    • unSubscribePartition

      public CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition)
    • unSubscribePartition

      public CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction)
      Adds an asynchronous partition unsubscription request for the task.
    • dropStoragePartitionGracefully

      public CompletableFuture<Void> dropStoragePartitionGracefully(PubSubTopicPartition topicPartition)
      Drops a storage partition gracefully. This is always a Helix triggered action.
    • hasAnySubscription

      public boolean hasAnySubscription()
    • resetPartitionConsumptionOffset

      public void resetPartitionConsumptionOffset(PubSubTopicPartition topicPartition)
      Adds an asynchronous resetting partition consumption offset request for the task.
    • getStoreName

      public String getStoreName()
    • isUserSystemStore

      public boolean isUserSystemStore()
    • promoteToLeader

      public abstract void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
    • demoteToStandby

      public abstract void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
    • hasPendingPartitionIngestionAction

      public boolean hasPendingPartitionIngestionAction(int userPartition)
    • kill

      public void kill()
    • getStoragePartitionConfig

      protected StoragePartitionConfig getStoragePartitionConfig(PartitionConsumptionState partitionConsumptionState)
    • getStoragePartitionConfig

      protected StoragePartitionConfig getStoragePartitionConfig(boolean sorted, PartitionConsumptionState partitionConsumptionState)
    • isHybridFollower

      protected abstract boolean isHybridFollower(PartitionConsumptionState partitionConsumptionState)
    • shouldCheckLeaderCompleteStateInFollower

      protected abstract boolean shouldCheckLeaderCompleteStateInFollower()
    • checkAndLogIfLagIsAcceptableForHybridStore

      protected abstract boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState partitionConsumptionState, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp)
      Checks whether the lag is acceptable for hybrid stores
    • isReadyToServe

      protected boolean isReadyToServe(PartitionConsumptionState partitionConsumptionState)
      This function checks various conditions to verify if a store is ready to serve. Lag = (Source Max Offset - SOBR Source Offset) - (Current Offset - SOBR Destination Offset)
      Returns:
      true if EOP was received and (for hybrid stores) if lag <= threshold
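      A minimal sketch of the quoted lag formula, with plain long offsets standing in for the values the task reads from PartitionConsumptionState and the brokers (SOBR = Start Of Buffer Replay):

        final class HybridLagSketch {
          static boolean isLagAcceptable(
              long sourceMaxOffset, long sobrSourceOffset,
              long currentOffset, long sobrDestinationOffset,
              long threshold) {
            long lag = (sourceMaxOffset - sobrSourceOffset) - (currentOffset - sobrDestinationOffset);
            return lag <= threshold; // combined with EOP received, this is the hybrid ready-to-serve condition
          }
        }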
    • isReadyToServeAnnouncedWithRTLag

      public boolean isReadyToServeAnnouncedWithRTLag()
    • isRealTimeBufferReplayStarted

      protected abstract boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
    • measureHybridOffsetLag

      protected abstract long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
      Measure the hybrid offset lag for partition being tracked in `partitionConsumptionState`.
    • reportIfCatchUpVersionTopicOffset

      protected abstract void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState)
      Checks whether the ingestion progress has reached the end of the version topic. This is currently only used by LeaderFollowerStoreIngestionTask.
    • produceToStoreBufferService

      protected void produceToStoreBufferService(PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long> consumedRecord, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs) throws InterruptedException
      This function produces a pair of the consumer record and its derived produced record to the writer buffers maintained by StoreBufferService.
      Parameters:
      consumedRecord - the received consumer record
      leaderProducedRecordContext - the derived leaderProducedRecordContext
      partition -
      kafkaUrl -
      Throws:
      InterruptedException
    • validateAndFilterOutDuplicateMessagesFromLeaderTopic

      protected abstract Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long>> validateAndFilterOutDuplicateMessagesFromLeaderTopic(Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long>> records, String kafkaUrl, PubSubTopicPartition topicPartition)
    • produceToStoreBufferServiceOrKafka

      protected void produceToStoreBufferServiceOrKafka(Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long>> records, PubSubTopicPartition topicPartition, String kafkaUrl, int kafkaClusterId) throws InterruptedException
      This function is in charge of producing the consumer records to the writer buffers maintained by StoreBufferService. This function may modify the original record in KME and it is unsafe to use the payload from KME directly after this call.
      Parameters:
      records - the received consumer records
      topicPartition -
      Throws:
      InterruptedException
    • produceToStoreBufferServiceOrKafkaInBatch

      protected void produceToStoreBufferServiceOrKafkaInBatch(Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long>> records, PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState, String kafkaUrl, int kafkaClusterId) throws InterruptedException
      Throws:
      InterruptedException
    • checkIngestionProgress

      protected void checkIngestionProgress(Store store) throws InterruptedException
      Throws:
      InterruptedException
    • updateIngestionRoleIfStoreChanged

      protected void updateIngestionRoleIfStoreChanged(Store store) throws InterruptedException
      Throws:
      InterruptedException
    • isIngestionTaskActive

      public boolean isIngestionTaskActive()
    • run

      public void run()
      Polls the producer for new messages in an infinite loop on a dedicated consumer thread and processes the new messages on the current thread.
      Specified by:
      run in interface Runnable
    • updateOffsetMetadataAndSyncOffset

      protected void updateOffsetMetadataAndSyncOffset(PartitionConsumptionState pcs)
    • closeVeniceWriters

      public void closeVeniceWriters(boolean doFlush)
    • closeVeniceViewWriters

      protected void closeVeniceViewWriters()
    • resolveSourceKafkaServersWithinTopicSwitch

      protected TopicSwitch resolveSourceKafkaServersWithinTopicSwitch(TopicSwitch originalTopicSwitch)
      Applies name resolution to all Kafka URLs in the provided TopicSwitch. Useful for translating URLs that came from a different runtime (e.g. from the controller, or from state persisted by a previous run of the same server).
      Returns:
      the same TopicSwitch, mutated such that all Kafka URLs it contains are guaranteed to be usable by the current Venice server instance
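      A hedged sketch of the resolution step, assuming the source Kafka URLs are exposed as a list of CharSequence and that a resolver function (in the spirit of the kafkaClusterUrlResolver field) maps each URL to one usable by this server; both are assumptions, not verified against TopicSwitch:

        import java.util.List;
        import java.util.function.Function;
        import java.util.stream.Collectors;

        final class KafkaUrlResolutionSketch {
          static List<CharSequence> resolveKafkaUrls(
              List<CharSequence> sourceKafkaServers, Function<String, String> urlResolver) {
            return sourceKafkaServers.stream()
                .map(url -> (CharSequence) urlResolver.apply(url.toString()))
                .collect(Collectors.toList());
          }
        }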
    • getOffsetToOnlineLagThresholdPerPartition

      protected static long getOffsetToOnlineLagThresholdPerPartition(Optional<HybridStoreConfig> hybridStoreConfig, String storeName, int partitionCount)
    • processCommonConsumerAction

      protected void processCommonConsumerAction(ConsumerAction consumerAction) throws InterruptedException
      Throws:
      InterruptedException
    • getTopicPartitionEndOffSet

      protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTopic, int partition)
      Returns:
      the end offset in Kafka for the topic-partition in the SIT, or a negative value if it failed to get it. N.B.: the returned end offset is the offset of the last successfully replicated message plus one. If the partition has never been written to, the end offset is 0.
    • getPartitionOffsetLagBasedOnMetrics

      protected long getPartitionOffsetLagBasedOnMetrics(String kafkaSourceAddress, PubSubTopic topic, int partition)
    • checkLongRunningTaskState

      protected abstract void checkLongRunningTaskState() throws InterruptedException
      Throws:
      InterruptedException
    • processConsumerAction

      protected abstract void processConsumerAction(ConsumerAction message, Store store) throws InterruptedException
      Throws:
      InterruptedException
    • getConsumptionSourceKafkaAddress

      protected abstract Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
    • startConsumingAsLeader

      protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
    • getRealTimeDataSourceKafkaAddress

      protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
    • getPartitionConsumptionState

      public PartitionConsumptionState getPartitionConsumptionState(int partitionId)
    • hasAnyPartitionConsumptionState

      public boolean hasAnyPartitionConsumptionState(Predicate<PartitionConsumptionState> pcsPredicate)
    • getFailedIngestionPartitionCount

      public int getFailedIngestionPartitionCount()
    • shouldProcessRecord

      protected boolean shouldProcessRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long> record)
      Common record check for different state models: check whether server continues receiving messages after EOP for a batch-only store.
    • shouldPersistRecord

      protected boolean shouldPersistRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long> record, PartitionConsumptionState partitionConsumptionState)
    • processConsumerRecord

      public void processConsumerRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long> record, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
      This function will be invoked in StoreBufferService to process buffered PubSubMessage.
    • recordHeartbeatReceived

      protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long> consumerRecord, String kafkaUrl)
    • setLastConsumerException

      public void setLastConsumerException(Exception e)
    • setLastStoreIngestionException

      public void setLastStoreIngestionException(Exception e)
    • recordChecksumVerificationFailure

      public void recordChecksumVerificationFailure()
    • recordAssembledRecordSize

      protected void recordAssembledRecordSize(int keyLen, ByteBuffer valueBytes, ByteBuffer rmdBytes, long currentTimeMs)
      Records metrics for the original size of fully assembled records (key + value) and RMD by utilizing the field ChunkedValueManifest.size. Also records the ratio of the assembled record size to the maximum allowed size, which is intended to alert customers about how close they are to hitting the size limit.
      Parameters:
      keyLen - The size of the record's key
      valueBytes - Put.putValue which is expected to be a serialized ChunkedValueManifest
      rmdBytes - Put.replicationMetadataPayload which can be a serialized ChunkedValueManifest if RMD chunking was enabled or just the RMD payload otherwise
    • recordAssembledRecordSizeRatio

      protected abstract void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
    • calculateAssembledRecordSizeRatio

      protected abstract double calculateAssembledRecordSizeRatio(long recordSize)
    • getBatchReplicationLag

      public abstract long getBatchReplicationLag()
    • getLeaderOffsetLag

      public abstract long getLeaderOffsetLag()
    • getBatchLeaderOffsetLag

      public abstract long getBatchLeaderOffsetLag()
    • getHybridLeaderOffsetLag

      public abstract long getHybridLeaderOffsetLag()
    • measureLagWithCallToPubSub

      protected long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopic topic, int partition, long currentOffset)
      Parameters:
      pubSubServerName - Pub Sub deployment to interrogate
      topic - topic to measure
      partition - for which to measure lag
      Returns:
      the lag, or Long.MAX_VALUE (9223372036854775807L) if it failed to measure it. N.B.: the returned lag can be negative since the end offset used in the calculation is cached.
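      A small sketch of how a caller might interpret this return value, mapping the Long.MAX_VALUE failure sentinel to "unknown" and clamping negatives in the same spirit as minZeroLag(long) below (the helper itself is hypothetical):

        import java.util.OptionalLong;

        final class LagNormalizationSketch {
          static OptionalLong normalizeLag(long measuredLag) {
            if (measuredLag == Long.MAX_VALUE) {
              return OptionalLong.empty();                     // measurement failed
            }
            return OptionalLong.of(Math.max(measuredLag, 0L)); // cached end offset can make raw lag negative
          }
        }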
    • measureLagWithCallToPubSub

      protected static long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopic topic, int partition, long currentOffset, Function<String,TopicManager> topicManagerProvider)
    • getFollowerOffsetLag

      public abstract long getFollowerOffsetLag()
      Measure the offset lag between follower and leader
    • getBatchFollowerOffsetLag

      public abstract long getBatchFollowerOffsetLag()
    • getHybridFollowerOffsetLag

      public abstract long getHybridFollowerOffsetLag()
    • getRegionHybridOffsetLag

      public abstract long getRegionHybridOffsetLag(int regionId)
    • getWriteComputeErrorCode

      public abstract int getWriteComputeErrorCode()
    • updateLeaderTopicOnFollower

      public abstract void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
    • minZeroLag

      protected long minZeroLag(long value)
      Because of timing considerations, it is possible that some lag metrics could compute negative values. Negative lag does not make sense, so the intent is to ease interpretation by applying a lower bound of zero on these metrics...
    • isHybridMode

      public boolean isHybridMode()
    • processEndOfPush

      protected void processEndOfPush(KafkaMessageEnvelope endOfPushKME, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
    • processStartOfIncrementalPush

      protected void processStartOfIncrementalPush(ControlMessage startOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
    • processEndOfIncrementalPush

      protected void processEndOfIncrementalPush(ControlMessage endOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
    • processControlMessageForViews

      protected void processControlMessageForViews(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
      This isn't used for ingestion outside L/F, so we no-op here and rely on the actual implementation in LeaderFollowerStoreIngestionTask.
    • processTopicSwitch

      protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
    • updateOffsetMetadataInOffsetRecord

      protected abstract void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState)
      Parameters:
      partitionConsumptionState -
    • updateLatestInMemoryProcessedOffset

      protected abstract void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun)
      Maintains the latest offsets processed by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Prior to updating the offset in memory, the underlying storage engine should have persisted the given record. Dry-run mode only performs the offset rewind check and does not update the processed offset.
    • recordWriterStats

      protected abstract void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
    • validateMessage

      protected void validateMessage(PartitionTracker.TopicType type, KafkaDataIntegrityValidator validator, PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long> consumerRecord, boolean endOfPushReceived, PartitionConsumptionState partitionConsumptionState)
      Message validation using DIV. Leaders should pass in the validator instance from LeaderFollowerStoreIngestionTask; drainers should pass in the validator instance from StoreIngestionTask. 1. If valid DIV errors happen after EOP is received, no fatal exceptions are thrown, but the errors are recorded in the DIV metrics. 2. DIV errors from unregistered producers after EOP are ignored. 3. DIV errors for records older than logCompactionDelayInMs are ignored.
    • cloneProducerStates

      protected void cloneProducerStates(int partition, KafkaDataIntegrityValidator validator)
      Only StoreIngestionTask should access kafkaDataIntegrityValidator; other components, such as the leaders in LeaderFollowerStoreIngestionTask, should never access the DIV validator in the drainer, because message consumption in the leader is ahead of the drainer: leaders and drainers process messages at different paces.
    • putInStorageEngine

      protected void putInStorageEngine(int partition, byte[] keyBytes, Put put)
      Persist Put record to storage engine.
    • removeFromStorageEngine

      protected void removeFromStorageEngine(int partition, byte[] keyBytes, Delete delete)
    • throwOrLogStorageFailureDependingIfStillSubscribed

      protected void throwOrLogStorageFailureDependingIfStillSubscribed(int partition, VeniceException e)
    • logStorageOperationWhileUnsubscribed

      protected void logStorageOperationWhileUnsubscribed(int partition)
    • consumerHasAnySubscription

      public boolean consumerHasAnySubscription()
    • consumerHasSubscription

      public boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
    • unsubscribeFromTopic

      public void unsubscribeFromTopic(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
      This method unsubscribes from the given topic-partition. If it is a real-time topic and the separate RT topic is enabled, it also unsubscribes from the separate real-time topic.
    • consumerBatchUnsubscribe

      public void consumerBatchUnsubscribe(Set<PubSubTopicPartition> topicPartitionSet)
    • consumerUnSubscribeAllTopics

      public abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
    • consumerSubscribe

      public void consumerSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, long startOffset, String kafkaURL)
      This method will try to resolve the actual topic-partition from the input Kafka URL and subscribe to the resolved topic-partition.
    • consumerResetOffset

      public void consumerResetOffset(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
    • waitVersionStateAvailable

      protected StoreVersionState waitVersionStateAvailable(String kafkaTopic) throws InterruptedException
      Throws:
      InterruptedException
    • close

      public void close()
      Stops the consumer task.
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
    • shutdownAndWait

      public boolean shutdownAndWait(int waitTime)
      This method is a blocking call that waits for the StoreIngestionTask to fully shut down within the given time.
      Parameters:
      waitTime - Maximum wait time for the shutdown operation.
      Returns:
      whether the task was able to gracefully shut down within the waitTime
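      A hedged usage sketch; the fallback to close() after an unsuccessful graceful shutdown is an assumption about how a hosting service might react, not documented behavior:

        import com.linkedin.davinci.kafka.consumer.StoreIngestionTask;

        final class ShutdownSketch {
          static void stopTask(StoreIngestionTask task, int waitTime) throws Exception {
            if (!task.shutdownAndWait(waitTime)) {
              // Did not shut down gracefully within waitTime; stop the consumer task directly.
              task.close();
            }
          }
        }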
    • isRunning

      public boolean isRunning()
      A function to allow the service to get the current status of the task. This would allow the service to create a new task if required.
    • getVersionTopic

      public PubSubTopic getVersionTopic()
    • isMetricsEmissionEnabled

      public boolean isMetricsEmissionEnabled()
    • enableMetricsEmission

      public void enableMetricsEmission()
    • disableMetricsEmission

      public void disableMetricsEmission()
    • isPartitionConsumingOrHasPendingIngestionAction

      public boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition)
      Checks whether the given partition is still consuming messages from Kafka
    • createKafkaConsumerProperties

      protected Properties createKafkaConsumerProperties(Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely)
      Overrides the ConfigKeys.KAFKA_BOOTSTRAP_SERVERS config with a remote Kafka bootstrap URL.
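      A minimal sketch of the override described here; the literal "kafka.bootstrap.servers" key is an assumption standing in for ConfigKeys.KAFKA_BOOTSTRAP_SERVERS to keep the snippet self-contained:

        import java.util.Properties;

        final class RemoteConsumerPropsSketch {
          static Properties withRemoteBootstrap(
              Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely) {
            Properties props = new Properties();
            props.putAll(localConsumerProps);
            if (consumeRemotely) {
              // Assumed value of ConfigKeys.KAFKA_BOOTSTRAP_SERVERS
              props.setProperty("kafka.bootstrap.servers", remoteKafkaSourceAddress);
            }
            return props;
          }
        }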
    • resubscribe

      protected abstract void resubscribe(PartitionConsumptionState partitionConsumptionState) throws InterruptedException
      Throws:
      InterruptedException
    • reportError

      public void reportError(String message, int userPartition, Exception e)
    • isActiveActiveReplicationEnabled

      public boolean isActiveActiveReplicationEnabled()
    • dumpPartitionConsumptionStates

      public void dumpPartitionConsumptionStates(AdminResponse response, ComplementSet<Integer> partitions)
      Invoked by admin request to dump the requested partition consumption states
    • dumpStoreVersionState

      public void dumpStoreVersionState(AdminResponse response)
      Invoked by admin request to dump store version state metadata.
    • getServerConfig

      public VeniceServerConfig getServerConfig()
    • updateOffsetMetadataAndSync

      public void updateOffsetMetadataAndSync(String topic, int partitionId)
    • getTopicManager

      protected TopicManager getTopicManager(String sourceKafkaServer)
      Returns the local or remote topic manager.
      Parameters:
      sourceKafkaServer - the address of the source Kafka bootstrap server.
      Returns:
      topic manager
    • waitForAllMessageToBeProcessedFromTopicPartition

      protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState) throws InterruptedException
      The purpose of this function is to wait for the complete processing (including persistence to disk) of all the messages that were consumed from this Kafka {topic, partition} prior to calling this function. It blocks the calling thread.
      Parameters:
      topicPartition - for which to wait
      Throws:
      InterruptedException
    • delegateConsumerRecord

      protected abstract StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,Long> consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
    • recordProcessedRecordStats

      protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
    • isSegmentControlMsg

      protected boolean isSegmentControlMsg(ControlMessageType msgType)
    • isTransientRecordBufferUsed

      public boolean isTransientRecordBufferUsed()
      This is not a per-record state. Rather, it indicates whether the transient record buffer is being used at all for this ingestion task. In L/F mode, only the WC ingestion task needs this buffer.
    • setPartitionConsumptionState

      protected void setPartitionConsumptionState(int partition, PartitionConsumptionState pcs)
    • getVersionedDIVStats

      protected AggVersionedDIVStats getVersionedDIVStats()
    • getVersionIngestionStats

      protected AggVersionedIngestionStats getVersionIngestionStats()
    • getCompressionStrategy

      protected CompressionStrategy getCompressionStrategy()
    • getCompressor

      protected Lazy<VeniceCompressor> getCompressor()
    • isChunked

      protected boolean isChunked()
    • getSchemaRepo

      protected ReadOnlySchemaRepository getSchemaRepo()
    • getHostLevelIngestionStats

      protected HostLevelIngestionStats getHostLevelIngestionStats()
    • getKafkaVersionTopic

      protected String getKafkaVersionTopic()
    • isStuckByMemoryConstraint

      public boolean isStuckByMemoryConstraint()
    • shouldUpdateUpstreamOffset

      protected boolean shouldUpdateUpstreamOffset(PubSubMessage<KafkaKey,KafkaMessageEnvelope,Long> consumerRecord)
      Validate if the given consumerRecord has a valid upstream offset to update from.
      Parameters:
      consumerRecord -
      Returns:
      true if the record is not null and contains a valid upstream offset; otherwise false.
    • maybeSendIngestionHeartbeat

      protected abstract Set<String> maybeSendIngestionHeartbeat()
      For L/F hybrid stores, the leader periodically writes a special SOS message to the RT topic. Check LeaderFollowerStoreIngestionTask.maybeSendIngestionHeartbeat() for more details.
    • isProducingVersionTopicHealthy

      public boolean isProducingVersionTopicHealthy()
      This function checks the following conditions: 1. Whether the version topic exists.
    • isCurrentVersion

      public boolean isCurrentVersion()
    • hasAllPartitionReportedCompleted

      public boolean hasAllPartitionReportedCompleted()
    • isSeparatedRealtimeTopicEnabled

      public boolean isSeparatedRealtimeTopicEnabled()