Class StoreIngestionTask
java.lang.Object
com.linkedin.davinci.kafka.consumer.StoreIngestionTask
- All Implemented Interfaces:
- Closeable, AutoCloseable, Runnable
- Direct Known Subclasses:
- LeaderFollowerStoreIngestionTask
A runnable Kafka consumer that consumes messages from all the partitions assigned to the current node for a Kafka topic.
- 
Nested Class Summary
Nested Classes
- protected static enum StoreIngestionTask.DelegateConsumerRecordResult: This enum represents all potential results after calling delegateConsumerRecord(PubSubMessageProcessedResultWrapper, int, String, int, long, long).
- 
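Field Summary
Fields
- protected final AggKafkaConsumerService aggKafkaConsumerService
- protected final SparseConcurrentList<Object> availableSchemaIds
- protected final boolean batchReportIncPushStatusEnabled
- protected final long bootstrapTimeoutInMs
- protected static final int CHUNK_SCHEMA_ID
- protected final InMemoryChunkAssembler chunkAssembler
- protected final CompressionStrategy compressionStrategy
- protected final Lazy<VeniceCompressor> compressor
- protected final StorageEngineBackedCompressorFactory compressorFactory
- protected final VeniceConcurrentHashMap<String,Long> consumedBytesSinceLastSync: Map of broker URL to the total bytes consumed by ConsumptionTask since the last Global RT DIV sync.
- protected final AtomicInteger consumerActionSequenceNumber
- protected final PriorityBlockingQueue<ConsumerAction> consumerActionsQueue
- protected final DataIntegrityValidator consumerDiv: The consumer and drainer DIV must remain separate.
- protected final long databaseSyncBytesIntervalForDeferredWriteMode: Number of message bytes to consume before persisting the offset in the offset DB, for a deferred-write database.
- protected final long databaseSyncBytesIntervalForTransactionalMode: Number of message bytes to consume before persisting the offset in the offset DB, for a transactional-mode database.
- protected int dataRecoverySourceVersionNumber
- protected final com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck defaultReadyToServeChecker
- protected final SparseConcurrentList<Object> deserializedSchemaIds
- protected final DiskUsage diskUsage
- protected final Consumer<DataValidationException> divErrorMetricCallback
- protected final AtomicBoolean emitMetrics
- protected final long emptyPollSleepMs
- protected final int errorPartitionId: Used for reporting errors when the partitionConsumptionStateMap is empty.
- protected Lazy<CountDownLatch> gracefulShutdownLatch
- protected final HostLevelIngestionStats hostLevelIngestionStats
- protected final String hostName
- protected final Optional<HybridStoreConfig> hybridStoreConfig
- protected AtomicInteger idleCounter
- protected final com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher
- protected final String ingestionTaskName
- protected final boolean isChunked
- protected final BooleanSupplier isCurrentVersion
- protected boolean isDataRecovery
- protected final boolean isDaVinciClient
- protected final boolean isGlobalRtDivEnabled
- protected final boolean isIsolatedIngestion
- protected final boolean isRmdChunked
- protected final AtomicBoolean isRunning
- protected final boolean isSystemStore
- protected final boolean isWriteComputationEnabled
- protected final Properties kafkaProps
- protected final String kafkaVersionTopic: Topics used for this topic consumption. TODO: Use a PubSubVersionTopic and a PubSubRealTimeTopic extending PubSubTopic for type safety.
- protected static final long KILL_WAIT_TIME_MS
- protected final int localKafkaClusterId
- protected final String localKafkaServer
- protected final ChunkedValueManifestSerializer manifestSerializer
- protected final MetaStoreWriter metaStoreWriter
- protected final ExecutorService parallelProcessingThreadPool
- protected final ConcurrentMap<Integer,PartitionConsumptionState> partitionConsumptionStateMap: Per-partition consumption state map.
- protected final int partitionCount: The number of partitions in the StorageEngine and in the version topics.
- protected final Map<Integer,AtomicInteger> partitionToPendingConsumerActionCountMap
- protected final PubSubContext pubSubContext
- protected final PubSubTopicRepository pubSubTopicRepository
- protected final long readCycleDelayMs
- protected final boolean readOnlyForBatchOnlyStoreEnabled
- protected final PubSubTopic realTimeTopic
- protected final AtomicBoolean recordLevelMetricEnabled
- protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER
- protected final boolean resetErrorReplicaEnabled
- static long SCHEMA_POLLING_DELAY_MS
- protected final ReadOnlySchemaRepository schemaRepository
- protected final PubSubTopic separateRealTimeTopic
- protected final VeniceServerConfig serverConfig
- protected final StorageEngine storageEngine
- protected final StorageMetadataService storageMetadataService
- protected final StorageService storageService: Storage destination for consumption.
- static long STORE_VERSION_POLLING_DELAY_MS
- protected final AbstractStoreBufferService storeBufferService
- protected final String storeName
- protected final ReadOnlyStoreRepository storeRepository
- protected final String storeVersionName
- protected final int storeVersionPartitionCount
- protected final TopicManagerRepository topicManagerRepository
- protected final AggVersionedDIVStats versionedDIVStats
- protected final AggVersionedIngestionStats versionedIngestionStats
- protected final int versionNumber
- protected final PubSubTopic versionTopic
- protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
- protected int writeComputeFailureCode
- protected Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin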
- 
Constructor Summary
Constructors
- StoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeVersionConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, InternalDaVinciRecordTransformerConfig internalRecordTransformerConfig, Queue<VeniceNotifier> notifiers, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
- 
Method Summary
Methods
- protected abstract double calculateAssembledRecordSizeRatio(long recordSize)
- protected abstract boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState partitionConsumptionState, long lag, long threshold, boolean shouldLogLag, LagType lagType): Checks whether the lag is acceptable for hybrid stores.
- protected void checkIngestionProgress(Store store)
- protected abstract void …
- protected void cloneDrainerDivProducerStates(int partition, DataIntegrityValidator validator): Only StoreIngestionTask should be allowed to access drainerDiv; other components, such as leaders in LeaderFollowerStoreIngestionTask, should never access the drainer's DIV validator, because message consumption in the leader is ahead of the drainer and leaders and drainers process messages at different paces.
- void close(): Stops the consumer task.
- protected void closeVeniceViewWriters(boolean doFlush)
- void closeVeniceWriters(boolean doFlush)
- void consumerBatchUnsubscribe(Set<PubSubTopicPartition> topicPartitionSet)
- boolean …
- boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
- void consumerResetOffset(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
- void consumerSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, PubSubPosition startOffset, String kafkaURL): This method will try to resolve the actual topic-partition from the input Kafka URL and subscribe to the resolved topic-partition.
- abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
- protected Properties createKafkaConsumerProperties(Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely): Overrides the ConfigKeys.KAFKA_BOOTSTRAP_SERVERS config with a remote Kafka bootstrap URL.
- protected abstract StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
- abstract void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- void dropStoragePartitionGracefully(PubSubTopicPartition topicPartition): Drops a storage partition gracefully.
- void dumpPartitionConsumptionStates(AdminResponse response, ComplementSet<Integer> partitions): Invoked by an admin request to dump the requested partition consumption states.
- void dumpStoreVersionState(AdminResponse response): Invoked by an admin request to dump store version state metadata.
- void …
- protected int extractUpstreamClusterId(DefaultPubSubMessage consumerRecord)
- protected PubSubPosition extractUpstreamPosition(DefaultPubSubMessage consumerRecord): Extracts the upstream position from the given consumer record's leader metadata.
- protected CompressionStrategy …
- protected Lazy<VeniceCompressor> …
- protected abstract Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
- …: When Global RT DIV is enabled, the ConsumptionTask's DIV is used exclusively to validate data integrity.
- int …
- protected HostLevelIngestionStats …
- protected abstract IngestionBatchProcessor …
- protected String …
- protected static long getOffsetToOnlineLagThresholdPerPartition(Optional<HybridStoreConfig> hybridStoreConfig, String storeName, int partitionCount)
- … getPartitionConsumptionState(int partitionId)
- protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
- protected ReadOnlySchemaRepository …
- protected StoragePartitionConfig getStoragePartitionConfig(boolean sorted, PartitionConsumptionState partitionConsumptionState)
- protected StoragePartitionConfig getStoragePartitionConfig(PartitionConsumptionState partitionConsumptionState)
- protected TopicManager getTopicManager(String sourceKafkaServer): Returns the local or remote topic manager.
- protected PubSubPosition getTopicPartitionEndPosition(String pubSubBrokerAddress, PubSubTopicPartition topicPartition)
- protected AggVersionedDIVStats …
- protected AggVersionedIngestionStats …
- int …
- abstract int …
- boolean …
- boolean hasAnyPartitionConsumptionState(Predicate<PartitionConsumptionState> pcsPredicate)
- boolean …
- boolean …
- boolean hasPendingPartitionIngestionAction(int userPartition)
- protected boolean …: Checks whether there are any replicas currently present on this host.
- boolean …
- protected boolean …
- boolean …
- protected abstract boolean isHybridFollower(PartitionConsumptionState partitionConsumptionState)
- boolean …
- boolean isIdleOverThreshold(): This helper function checks whether the ingestion task has been idle for a long time.
- boolean …
- boolean …
- boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition): Checks whether the given partition is still consuming messages from Kafka.
- boolean …: This function is checking the following conditions: 1.
- protected boolean isReadyToServe(PartitionConsumptionState partitionConsumptionState): This function checks various conditions to verify whether a store is ready to serve.
- protected abstract boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
- boolean …: A function that allows the service to get the current status of the task.
- boolean …
- boolean isTransientRecordBufferUsed(PartitionConsumptionState partitionConsumptionState): This is not a per-record state.
- boolean …
- void kill()
- protected void logStorageOperationWhileUnsubscribed(int partition)
- …: For L/F hybrid stores, the leader periodically writes a special SOS message to the RT topic.
- protected abstract long measureHybridHeartbeatLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag): Measures the hybrid heartbeat lag for the replica being tracked in `partitionConsumptionState`.
- protected abstract long measureHybridHeartbeatTimestamp(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag): Measures the hybrid heartbeat timestamp for the replica being tracked in `partitionConsumptionState`.
- protected abstract long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag): Measures the hybrid offset lag for the replica being tracked in `partitionConsumptionState`.
- protected long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopicPartition pubSubTopicPartition, PubSubPosition currentPosition)
- protected static long measureLagWithCallToPubSub(String pubSubBrokerAddress, PubSubTopicPartition pubSubTopicPartition, PubSubPosition currentPosition, Function<String, TopicManager> topicManagerProvider)
- protected long minZeroLag(long value): Because of timing considerations, some lag metrics can compute negative values.
- protected int nextSeqNum(): Applies a unique, increasing sequence number to each consumer action, so that when multiple consumer actions in the queue have the same priority, whichever was added to the queue first is polled from the queue first (FIFO).
- protected void processCommonConsumerAction(ConsumerAction consumerAction)
- protected abstract void processConsumerAction(ConsumerAction message, Store store)
- void processConsumerRecord(DefaultPubSubMessage record, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs): This function is invoked in StoreBufferService to process a buffered PubSubMessage.
- protected void processControlMessageForViews(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState): This isn't used for ingestion outside L/F, so it is a no-op here; the actual implementation is in LeaderFollowerStoreIngestionTask.
- protected void processEndOfIncrementalPush(ControlMessage endOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
- protected void processEndOfPush(KafkaMessageEnvelope endOfPushKME, PubSubPosition offset, PartitionConsumptionState partitionConsumptionState)
- protected void processStartOfIncrementalPush(ControlMessage startOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
- protected void processTopicSwitch(ControlMessage controlMessage, int partition, PubSubPosition offset, PartitionConsumptionState partitionConsumptionState)
- protected void produceToStoreBufferService(DefaultPubSubMessage consumedRecord, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs): This function produces a consumer record, together with its derived produced record, to the writer buffers maintained by StoreBufferService.
- protected void produceToStoreBufferServiceOrKafka(Iterable<DefaultPubSubMessage> records, PubSubTopicPartition topicPartition, String kafkaUrl, int kafkaClusterId): This function is in charge of producing the consumer records to the writer buffers maintained by StoreBufferService.
- protected void produceToStoreBufferServiceOrKafkaInBatch(Iterable<DefaultPubSubMessage> records, PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState, String kafkaUrl, int kafkaClusterId)
- abstract void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- protected void putInStorageEngine(int partition, byte[] keyBytes, Put put): Persists a Put record to the storage engine.
- protected void recordAssembledRecordSize(int keyLen, ByteBuffer valueBytes, ByteBuffer rmdBytes, long currentTimeMs): Records metrics for the original size of fully assembled records (key + value) and RMD by utilizing the field ChunkedValueManifest.size.
- protected abstract void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
- void recordChecksumVerificationFailure()
- protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecord, String kafkaUrl)
- protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
- protected abstract void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
- protected void …
- protected void removeFromStorageEngine(int partition, byte[] keyBytes, Delete delete)
- void reportError(String message, int userPartition, Exception e)
- protected abstract void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState): Checks whether the ingestion progress has reached the end of the version topic.
- void resetPartitionConsumptionOffset(PubSubTopicPartition topicPartition): Adds an asynchronous request to reset the partition consumption offset for the task.
- protected TopicSwitch resolveSourceKafkaServersWithinTopicSwitch(TopicSwitch originalTopicSwitch): Applies name resolution to all Kafka URLs in the provided TopicSwitch.
- protected abstract void resubscribe(PartitionConsumptionState partitionConsumptionState)
- void run(): Polls the producer for new messages in an infinite loop on a dedicated consumer thread and processes the new messages on the current thread.
- void …
- void …
- protected void setPartitionConsumptionState(int partition, PartitionConsumptionState pcs)
- protected boolean shouldPersistRecord(DefaultPubSubMessage record, PartitionConsumptionState partitionConsumptionState)
- protected boolean …: Common record check for different state models: checks whether the server continues receiving messages after EOP for a batch-only store.
- boolean shutdownAndWait(int waitTime): This method is a blocking call that waits for StoreIngestionTask to fully shut down within the given time.
- protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
- void subscribePartition(PubSubTopicPartition topicPartition)
- void subscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction): Adds an asynchronous partition subscription request for the task.
- protected void …
- protected void throwOrLogStorageFailureDependingIfStillSubscribed(int partition, VeniceException e)
- protected void unsubscribeFromTopic(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState): This method unsubscribes from the input topic-partition.
- … unSubscribePartition(PubSubTopicPartition topicPartition)
- CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction): Adds an asynchronous partition unsubscription request for the task.
- protected void updateAndSyncOffsetFromSnapshot(PartitionTracker vtDivSnapshot, PubSubTopicPartition topicPartition): This version of the method syncs using a PartitionTracker object, which contains the vtSegments and LCVP.
- protected abstract void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun): Maintains in memory the latest offsets processed by drainers; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord.
- abstract void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
- void updateOffsetMetadataAndSync(int partitionId)
- protected void …
- protected void …
- protected abstract void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState): Syncs the offset metadata in OffsetRecord.
- protected abstract Iterable<DefaultPubSubMessage> validateAndFilterOutDuplicateMessagesFromLeaderTopic(Iterable<DefaultPubSubMessage> records, String kafkaUrl, PubSubTopicPartition topicPartition)
- protected static void validateEndOfPushReceivedBeforeTopicSwitch(PartitionConsumptionState partitionConsumptionState, PubSubPosition position): Validates that END_OF_PUSH has been received before processing TOPIC_SWITCH.
- protected void validateMessage(PartitionTracker.TopicType type, DataIntegrityValidator validator, DefaultPubSubMessage consumerRecord, PartitionConsumptionState partitionConsumptionState, boolean tolerateMissingMessagesForRealTimeTopic): Message validation using DIV.
- protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState): The purpose of this function is to wait for the complete processing (including persistence to disk) of all the messages that were consumed from this Kafka {topic, partition} before this function was called.
- protected StoreVersionState waitVersionStateAvailable(String kafkaTopic)
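The createKafkaConsumerProperties entry describes overriding the bootstrap-servers setting with a remote Kafka address. A minimal sketch of that idea with java.util.Properties, assuming the key is the standard Kafka client key `bootstrap.servers` (the real ConfigKeys.KAFKA_BOOTSTRAP_SERVERS constant may map to a different string) and that the local properties should be copied rather than mutated. The class name is hypothetical:

```java
import java.util.Properties;

// Hypothetical helper: a sketch of the remote-bootstrap override, not Venice's implementation.
final class RemoteConsumerPropsSketch {
  // Assumption: stands in for ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; the real key string may differ.
  static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";

  private RemoteConsumerPropsSketch() {
  }

  static Properties createConsumerProperties(
      Properties localConsumerProps,
      String remoteKafkaSourceAddress,
      boolean consumeRemotely) {
    // Work on a copy (entries only; a defaults chain is not copied) so the caller's
    // local consumer properties are never mutated.
    Properties props = new Properties();
    props.putAll(localConsumerProps);
    if (consumeRemotely) {
      // Point the consumer at the remote cluster instead of the local one.
      props.setProperty(BOOTSTRAP_SERVERS_KEY, remoteKafkaSourceAddress);
    }
    return props;
  }
}
```

Copying first keeps the local configuration reusable for subscriptions that still consume from the local cluster.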
- 
Field Details
- 
SCHEMA_POLLING_DELAY_MS
public static long SCHEMA_POLLING_DELAY_MS
- 
STORE_VERSION_POLLING_DELAY_MS
public static long STORE_VERSION_POLLING_DELAY_MS
- 
WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
- 
KILL_WAIT_TIME_MS
protected static final long KILL_WAIT_TIME_MS
- 
CHUNK_SCHEMA_ID
protected static final int CHUNK_SCHEMA_ID
- 
REDUNDANT_LOGGING_FILTER
- 
storageService
Storage destination for consumption.
- 
storageEngine
- 
kafkaVersionTopic
Topics used for this topic consumption. TODO: Use a PubSubVersionTopic and a PubSubRealTimeTopic extending PubSubTopic for type safety.
- 
versionTopic
- 
realTimeTopic
- 
separateRealTimeTopic
- 
storeName
- 
storeVersionName
- 
isSystemStore
protected final boolean isSystemStore
- 
versionNumber
protected final int versionNumber
- 
schemaRepository
- 
storeRepository
- 
ingestionTaskName
- 
kafkaProps
- 
isRunning
- 
emitMetrics
- 
consumerActionSequenceNumber
- 
consumerActionsQueue
- 
partitionToPendingConsumerActionCountMap
- 
storageMetadataService
- 
topicManagerRepository
- 
partitionConsumptionStateMap
Per-partition consumption state map.
- 
storeBufferService
- 
consumerDiv
The consumer and drainer DIV must remain separate. Since the consumer is always ahead of the drainer, the consumer validates data ahead of what the drainer has actually persisted.
NOTE: Currently, state clearing happens only in drainerDiv, which persists its state to disk. consumerDiv is transient (not persisted to disk), and its state is not expected to grow as large, so a bounce effectively clears it (which is not the case for drainerDiv). State cleaning could later be triggered for this consumer DIV as well, if deemed necessary.
NOTE: When isGlobalRtDivEnabled() is true, leaders use this validator to produce the Global RT DIV state to the local VT. It is also used to send DIV snapshots to the drainer, which persists the VT + RT DIV on disk.
- 
consumedBytesSinceLastSync
Map of broker URL to the total bytes consumed by ConsumptionTask since the last Global RT DIV sync.
- 
hostLevelIngestionStats
- 
versionedDIVStats
- 
versionedIngestionStats
- 
recordTransformerStats
- 
isCurrentVersion
- 
hybridStoreConfig
- 
divErrorMetricCallback
- 
readCycleDelayMs
protected final long readCycleDelayMs
- 
emptyPollSleepMs
protected final long emptyPollSleepMs
- 
diskUsage
- 
databaseSyncBytesIntervalForTransactionalMode
protected final long databaseSyncBytesIntervalForTransactionalMode
Number of message bytes to consume before persisting the offset in the offset DB, for a database in transactional mode.
- 
databaseSyncBytesIntervalForDeferredWriteMode
protected final long databaseSyncBytesIntervalForDeferredWriteMode
Number of message bytes to consume before persisting the offset in the offset DB, for a database in deferred-write mode.
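The two databaseSyncBytesInterval fields describe a byte budget: the offset is persisted only after enough message bytes have been consumed since the last sync. A minimal sketch of that bookkeeping, assuming a per-partition counter and a single interval per instance (the transactional vs. deferred-write distinction would only select which interval to pass in). Class and method names are hypothetical:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of byte-interval offset syncing; names are illustrative only.
final class ByteIntervalSyncSketch {
  private final long syncBytesInterval;
  // Bytes processed per partition since that partition's last offset sync.
  private final ConcurrentMap<Integer, AtomicLong> bytesSinceLastSync = new ConcurrentHashMap<>();

  ByteIntervalSyncSketch(long syncBytesInterval) {
    this.syncBytesInterval = syncBytesInterval;
  }

  /**
   * Adds messageBytes to the partition's running total and returns true when the total
   * reaches the interval, meaning the caller should persist the offset now. The counter
   * is reset in that case. Assumes one thread updates a given partition at a time.
   */
  boolean recordAndCheckSync(int partition, long messageBytes) {
    AtomicLong counter = bytesSinceLastSync.computeIfAbsent(partition, p -> new AtomicLong());
    if (counter.addAndGet(messageBytes) >= syncBytesInterval) {
      counter.set(0L);
      return true;
    }
    return false;
  }
}
```

A larger interval means fewer offset writes but more messages to re-consume after a restart, which is the trade-off these two settings expose.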
- 
serverConfig
- 
errorPartitionId
protected final int errorPartitionId
Used for reporting errors when the partitionConsumptionStateMap is empty.
- 
defaultReadyToServeChecker
protected final com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck defaultReadyToServeChecker
- 
availableSchemaIds
- 
deserializedSchemaIds
- 
idleCounter
- 
aggKafkaConsumerService
- 
writeComputeFailureCode
protected int writeComputeFailureCode
- 
isWriteComputationEnabled
protected final boolean isWriteComputationEnabled
- 
partitionCount
protected final int partitionCount
The number of partitions in the StorageEngine and in the version topics.
- 
storeVersionPartitionCount
protected final int storeVersionPartitionCount
- 
bootstrapTimeoutInMs
protected final long bootstrapTimeoutInMs
- 
isIsolatedIngestion
protected final boolean isIsolatedIngestion
- 
ingestionNotificationDispatcher
protected final com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher
- 
chunkAssembler
- 
localKafkaServer
- 
localKafkaClusterId
protected final int localKafkaClusterId
- 
localKafkaServerSingletonSet
- 
isDaVinciClient
protected final boolean isDaVinciClient
- 
isDataRecovery
protected boolean isDataRecovery
- 
dataRecoverySourceVersionNumber
protected int dataRecoverySourceVersionNumber
- 
readOnlyForBatchOnlyStoreEnabled
protected final boolean readOnlyForBatchOnlyStoreEnabled
- 
metaStoreWriter
- 
kafkaClusterUrlResolver
- 
resetErrorReplicaEnabled
protected final boolean resetErrorReplicaEnabled
- 
compressionStrategy
- 
compressorFactory
- 
compressor
- 
isChunked
protected final boolean isChunked
- 
isRmdChunked
protected final boolean isRmdChunked
- 
manifestSerializer
- 
pubSubContext
- 
pubSubTopicRepository
- 
recordLevelMetricEnabled
- 
isGlobalRtDivEnabled
protected final boolean isGlobalRtDivEnabled
- 
versionRole
- 
workloadType
- 
batchReportIncPushStatusEnabled
protected final boolean batchReportIncPushStatusEnabled
- 
parallelProcessingThreadPool
- 
gracefulShutdownLatch
- 
zkHelixAdmin
- 
hostName
 
- 
- 
Constructor Details
- 
StoreIngestionTask
public StoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeVersionConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, InternalDaVinciRecordTransformerConfig internalRecordTransformerConfig, Queue<VeniceNotifier> notifiers, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
 
- 
- 
Method Details
- 
getIngestionBatchProcessor
- 
getStorageEngine
- 
getIngestionTaskName
- 
getVersionNumber
public int getVersionNumber()
- 
throwIfNotRunning
protected void throwIfNotRunning()
- 
nextSeqNum
protected int nextSeqNum()
Applies a unique, increasing sequence number to each consumer action, so that when multiple consumer actions in the queue have the same priority, whichever was added to the queue first is polled from the queue first (FIFO).
- Returns:
- a unique, increasing sequence number for a new consumer action.
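The nextSeqNum() contract can be illustrated with a standard PriorityBlockingQueue whose comparator breaks priority ties by sequence number. A sketch, assuming a lower priority value means higher priority; the Action class stands in for ConsumerAction and is hypothetical:

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of priority + sequence-number ordering; not the real ConsumerAction.
final class SequencedActionQueueSketch {
  static final class Action {
    final String name;
    final int priority; // assumption: a lower value means a higher priority
    final int sequenceNumber;

    Action(String name, int priority, int sequenceNumber) {
      this.name = name;
      this.priority = priority;
      this.sequenceNumber = sequenceNumber;
    }
  }

  private final AtomicInteger sequence = new AtomicInteger();
  // Order by priority first, then by sequence number so equal priorities stay FIFO.
  private final PriorityBlockingQueue<Action> queue = new PriorityBlockingQueue<>(
      11,
      Comparator.comparingInt((Action a) -> a.priority).thenComparingInt(a -> a.sequenceNumber));

  /** Mirrors the idea behind nextSeqNum(): every new action gets a larger number. */
  private int nextSeqNum() {
    return sequence.incrementAndGet();
  }

  void add(String name, int priority) {
    queue.add(new Action(name, priority, nextSeqNum()));
  }

  /** Returns the next action's name, or null if the queue is empty. */
  String poll() {
    Action action = queue.poll();
    return action == null ? null : action.name;
  }
}
```

The tie-breaker is needed because PriorityBlockingQueue makes no ordering guarantee among elements that compare as equal. An int counter would wrap around after about two billion actions, which this sketch ignores.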
 
- 
subscribePartition
- 
subscribePartition
Adds an asynchronous partition subscription request for the task.
- 
unSubscribePartition
- 
unSubscribePartition
public CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction)
Adds an asynchronous partition unsubscription request for the task.
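unSubscribePartition returns a CompletableFuture<Void> and only enqueues the request. A minimal sketch of that request-then-complete pattern, assuming the ingestion loop drains the queue and completes each future once the action has been applied. Everything except the subscribePartition and unSubscribePartition names is hypothetical:

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of asynchronous (un)subscription requests completed by the task loop.
final class AsyncSubscriptionSketch {
  private static final class Request {
    final int partition;
    final boolean subscribe;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    Request(int partition, boolean subscribe) {
      this.partition = partition;
      this.subscribe = subscribe;
    }
  }

  // Callers on other threads enqueue; only the ingestion loop drains.
  private final ConcurrentLinkedQueue<Request> pending = new ConcurrentLinkedQueue<>();
  private final Set<Integer> subscribed = ConcurrentHashMap.newKeySet();

  CompletableFuture<Void> subscribePartition(int partition) {
    return enqueue(partition, true);
  }

  CompletableFuture<Void> unSubscribePartition(int partition) {
    return enqueue(partition, false);
  }

  private CompletableFuture<Void> enqueue(int partition, boolean subscribe) {
    Request request = new Request(partition, subscribe);
    pending.add(request);
    return request.future; // not complete yet: the caller returns immediately
  }

  /** Called from the ingestion loop: applies queued requests and completes their futures. */
  void processPendingActions() {
    Request request;
    while ((request = pending.poll()) != null) {
      if (request.subscribe) {
        subscribed.add(request.partition);
      } else {
        subscribed.remove(request.partition);
      }
      request.future.complete(null);
    }
  }

  boolean isSubscribed(int partition) {
    return subscribed.contains(partition);
  }
}
```

A caller that needs to know when the unsubscription has taken effect can block on the returned future, for example with `future.get(timeout, unit)`.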
- 
dropStoragePartitionGracefully
Drops a storage partition gracefully. This is always a Helix-triggered action.
- 
hasAnySubscription
public boolean hasAnySubscription()
- 
hasAnyPendingSubscription
public boolean hasAnyPendingSubscription()
- 
isIdleOverThreshold
public boolean isIdleOverThreshold()
This helper function checks whether the ingestion task has been idle for a long time.
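One plausible way to implement the idle check is a counter of consecutive idle loop cycles, which would fit the idleCounter field (an AtomicInteger). This is an assumption: the threshold, and what counts as a cycle with work, are hypothetical here:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: count consecutive idle loop cycles and compare with a threshold.
final class IdleTrackerSketch {
  private final AtomicInteger idleCounter = new AtomicInteger();
  private final int idleCycleThreshold; // hypothetical config value

  IdleTrackerSketch(int idleCycleThreshold) {
    this.idleCycleThreshold = idleCycleThreshold;
  }

  /** Called once per loop iteration; any useful work resets the idle streak. */
  void onCycle(boolean didWork) {
    if (didWork) {
      idleCounter.set(0);
    } else {
      idleCounter.incrementAndGet();
    }
  }

  boolean isIdleOverThreshold() {
    return idleCounter.get() > idleCycleThreshold;
  }
}
```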
- 
resetPartitionConsumptionOffset
Adds an asynchronous request to reset the partition consumption offset for the task.
- 
getStoreName
- 
isUserSystemStore
public boolean isUserSystemStore()
- 
promoteToLeader
public abstract void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- 
demoteToStandby
public abstract void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- 
hasPendingPartitionIngestionAction
public boolean hasPendingPartitionIngestionAction(int userPartition)
- 
kill
public void kill()
- 
getStoragePartitionConfig
protected StoragePartitionConfig getStoragePartitionConfig(PartitionConsumptionState partitionConsumptionState)
- 
getStoragePartitionConfig
protected StoragePartitionConfig getStoragePartitionConfig(boolean sorted, PartitionConsumptionState partitionConsumptionState)
- 
isHybridFollower
- 
checkAndLogIfLagIsAcceptableForHybridStore
protected abstract boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState partitionConsumptionState, long lag, long threshold, boolean shouldLogLag, LagType lagType)
Checks whether the lag is acceptable for hybrid stores.
- 
isReadyToServe
This function checks various conditions to verify whether a store is ready to serve, where:
Lag = (Source Max Offset - SOBR Source Offset) - (Current Offset - SOBR Destination Offset)
- Returns:
- true if EOP was received and (for hybrid stores) if lag <= threshold
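The lag formula and the return condition can be expressed as a small helper. This sketch assumes every term is a plain long offset (the real code works with PubSubPosition values) and that EOP and hybrid status are passed in as booleans; the class is hypothetical:

```java
// Hypothetical sketch of the ready-to-serve lag check using plain long offsets.
final class ReadyToServeSketch {
  private ReadyToServeSketch() {
  }

  /** Lag = (Source Max Offset - SOBR Source Offset) - (Current Offset - SOBR Destination Offset). */
  static long lag(long sourceMaxOffset, long sobrSourceOffset, long currentOffset, long sobrDestinationOffset) {
    // How far the source has advanced since SOBR, minus how far this replica has advanced since SOBR.
    return (sourceMaxOffset - sobrSourceOffset) - (currentOffset - sobrDestinationOffset);
  }

  /** True if EOP was received and, for hybrid stores, the lag is within the threshold. */
  static boolean isReadyToServe(boolean endOfPushReceived, boolean isHybrid, long lag, long threshold) {
    if (!endOfPushReceived) {
      return false;
    }
    return !isHybrid || lag <= threshold;
  }
}
```

For example, with a source max offset of 1000, an SOBR source offset of 400, a current offset of 750 and an SOBR destination offset of 200, the lag is 600 - 550 = 50.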
 
- 
isRealTimeBufferReplayStarted
protected abstract boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
- 
measureHybridOffsetLag
protected abstract long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
Measures the hybrid offset lag for the replica being tracked in `partitionConsumptionState`.
- 
measureHybridHeartbeatLag
protected abstract long measureHybridHeartbeatLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
Measures the hybrid heartbeat lag for the replica being tracked in `partitionConsumptionState`. For a Da Vinci client, it returns HeartbeatMonitoringService.INVALID_HEARTBEAT_LAG until this is implemented.
- 
measureHybridHeartbeatTimestamp
protected abstract long measureHybridHeartbeatTimestamp(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
Measures the hybrid heartbeat timestamp for the replica being tracked in `partitionConsumptionState`. For a Da Vinci client, it returns HeartbeatMonitoringService.INVALID_MESSAGE_TIMESTAMP until this is implemented.
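Both heartbeat methods return a sentinel for Da Vinci clients. A minimal sketch of a heartbeat-lag measurement, assuming lag is the wall-clock time since the last heartbeat and using a hypothetical -1 sentinel in place of HeartbeatMonitoringService.INVALID_HEARTBEAT_LAG, whose actual value is not given here:

```java
// Hypothetical sketch of heartbeat-lag measurement; the sentinel value is an assumption.
final class HeartbeatLagSketch {
  // Stands in for HeartbeatMonitoringService.INVALID_HEARTBEAT_LAG (real value not shown here).
  static final long INVALID_HEARTBEAT_LAG = -1L;

  private HeartbeatLagSketch() {
  }

  static long measureHeartbeatLag(boolean isDaVinciClient, long lastHeartbeatTimestampMs, long nowMs) {
    if (isDaVinciClient) {
      // Not implemented for Da Vinci clients: report the sentinel instead of a lag.
      return INVALID_HEARTBEAT_LAG;
    }
    // Clamp at zero: clock skew can make the heartbeat look newer than "now".
    return Math.max(0L, nowMs - lastHeartbeatTimestampMs);
  }
}
```

The clamp follows the rationale the method summary gives for minZeroLag: timing effects can make a lag computation come out negative.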
- 
reportIfCatchUpVersionTopicOffset
protected abstract void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState)
Checks whether the ingestion progress has reached the end of the version topic. This is currently used only in LeaderFollowerStoreIngestionTask.
- 
produceToStoreBufferService
protected void produceToStoreBufferService(DefaultPubSubMessage consumedRecord, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs) throws InterruptedException
This function produces a consumer record, together with its derived produced record, to the writer buffers maintained by StoreBufferService.
- Parameters:
- consumedRecord - received consumer record
- leaderProducedRecordContext - derived leaderProducedRecordContext
- partition
- kafkaUrl
- Throws:
- InterruptedException
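The internals of StoreBufferService are not described on this page. A common way to hand records to a pool of writer buffers while preserving per-partition order is to route each partition to a fixed bounded queue; the sketch below assumes that design and uses hypothetical names throughout:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of routing records to per-drainer buffers; not StoreBufferService itself.
final class DrainerRoutingSketch {
  static final class BufferedRecord {
    final int partition;
    final String payload;

    BufferedRecord(int partition, String payload) {
      this.partition = partition;
      this.payload = payload;
    }
  }

  private final List<BlockingQueue<BufferedRecord>> drainerQueues = new ArrayList<>();

  DrainerRoutingSketch(int drainerCount, int capacityPerDrainer) {
    for (int i = 0; i < drainerCount; i++) {
      drainerQueues.add(new ArrayBlockingQueue<>(capacityPerDrainer));
    }
  }

  /** A partition always maps to the same drainer, so its records keep their order. */
  int drainerIndexFor(int partition) {
    return Math.floorMod(partition, drainerQueues.size());
  }

  /** Blocks while the target buffer is full, hence the InterruptedException. */
  void produce(BufferedRecord record) throws InterruptedException {
    drainerQueues.get(drainerIndexFor(record.partition)).put(record);
  }

  /** What a drainer thread would call to take its next record. */
  BufferedRecord take(int drainerIndex) throws InterruptedException {
    return drainerQueues.get(drainerIndex).take();
  }
}
```

The bounded queues give natural backpressure: when drainers fall behind, the consuming thread blocks in `put` instead of buffering without limit.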
 
- 
validateAndFilterOutDuplicateMessagesFromLeaderTopic
protected abstract Iterable<DefaultPubSubMessage> validateAndFilterOutDuplicateMessagesFromLeaderTopic(Iterable<DefaultPubSubMessage> records, String kafkaUrl, PubSubTopicPartition topicPartition)
- 
produceToStoreBufferServiceOrKafka
protected void produceToStoreBufferServiceOrKafka(Iterable<DefaultPubSubMessage> records, PubSubTopicPartition topicPartition, String kafkaUrl, int kafkaClusterId) throws InterruptedException
This function is in charge of producing the consumer records to the writer buffers maintained by StoreBufferService. It may modify the original record in the KME, so it is unsafe to use the payload from the KME directly after this call.
- Parameters:
- records - received consumer records
- topicPartition
- Throws:
- InterruptedException
 
- 
produceToStoreBufferServiceOrKafkaInBatch
protected void produceToStoreBufferServiceOrKafkaInBatch(Iterable<DefaultPubSubMessage> records, PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState, String kafkaUrl, int kafkaClusterId) throws InterruptedException
- Throws:
- InterruptedException
 
- 
checkIngestionProgress
- Throws:
- InterruptedException
 
- 
refreshIngestionContextIfChanged
- Throws:
- InterruptedException
 
- 
isIngestionTaskActive
public boolean isIngestionTaskActive()
- 
run
public void run()
Polls the producer for new messages in an infinite loop on a dedicated consumer thread and processes the new messages on the current thread.
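The run(), close() and shutdownAndWait(int) entries suggest a loop guarded by a running flag plus a latch that signals completion, consistent with the isRunning (AtomicBoolean) and gracefulShutdownLatch (CountDownLatch) fields. A minimal sketch under that assumption; the loop body is a placeholder, and treating the wait time as seconds is an assumption since the unit is not documented here:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of a stoppable ingestion loop; not the real StoreIngestionTask.run().
final class IngestionLoopSketch implements Runnable, AutoCloseable {
  private final AtomicBoolean isRunning = new AtomicBoolean(true);
  // Counted down exactly once, when run() exits for any reason.
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);
  final AtomicLong iterations = new AtomicLong();

  @Override
  public void run() {
    try {
      while (isRunning.get()) {
        // Placeholder for one cycle: process consumer actions, poll, hand records to buffers.
        iterations.incrementAndGet();
        Thread.sleep(1);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the owner of this thread can observe it.
      Thread.currentThread().interrupt();
    } finally {
      shutdownLatch.countDown();
    }
  }

  /** Non-blocking stop request: the loop exits at its next check of the flag. */
  @Override
  public void close() {
    isRunning.set(false);
  }

  /** Requests a stop, then blocks up to waitTimeSeconds; returns true if the loop finished. */
  boolean shutdownAndWait(int waitTimeSeconds) throws InterruptedException {
    close();
    return shutdownLatch.await(waitTimeSeconds, TimeUnit.SECONDS);
  }
}
```

Separating the non-blocking stop request from the blocking wait lets callers that cannot block (for example, state-transition handlers) still stop the task.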
- 
updateOffsetMetadataAndSyncOffset
- 
updateOffsetMetadataAndSyncOffset
protected void updateOffsetMetadataAndSyncOffset(DataIntegrityValidator div, @Nonnull PartitionConsumptionState pcs)
- 
updateAndSyncOffsetFromSnapshot
protected void updateAndSyncOffsetFromSnapshot(PartitionTracker vtDivSnapshot, PubSubTopicPartition topicPartition)
This version of the method syncs using a PartitionTracker object, which contains the vtSegments and LCVP.
- 
closeVeniceWriters
public void closeVeniceWriters(boolean doFlush)
- 
closeVeniceViewWriters
protected void closeVeniceViewWriters(boolean doFlush)
- 
resolveSourceKafkaServersWithinTopicSwitch
Applies name resolution to all Kafka URLs in the provided TopicSwitch. This is useful for translating URLs that came from a different runtime (e.g., from the controller, or from state persisted by a previous run of the same server).
- Returns:
- the same TopicSwitch, mutated such that all Kafka URLs it contains are guaranteed to be usable by the current Venice server instance
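A minimal sketch of in-place URL resolution, using a hypothetical SimpleTopicSwitch that holds only a list of source servers and a caller-supplied resolver function (the real TopicSwitch and resolver types are not shown on this page):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of resolving every Kafka URL inside a TopicSwitch-like object.
final class TopicSwitchResolutionSketch {
  /** Simplified stand-in for the TopicSwitch control message: only the source servers. */
  static final class SimpleTopicSwitch {
    final List<String> sourceKafkaServers;

    SimpleTopicSwitch(List<String> sourceKafkaServers) {
      // Mutable copy so resolution can rewrite entries in place.
      this.sourceKafkaServers = new ArrayList<>(sourceKafkaServers);
    }
  }

  private TopicSwitchResolutionSketch() {
  }

  /** Rewrites each URL with the resolver and returns the same, now mutated, instance. */
  static SimpleTopicSwitch resolve(SimpleTopicSwitch topicSwitch, Function<String, String> urlResolver) {
    topicSwitch.sourceKafkaServers.replaceAll(urlResolver::apply);
    return topicSwitch;
  }
}
```

Returning the same instance matches the documented contract; callers holding a reference to the original object see the resolved URLs too.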
 
- 
getOffsetToOnlineLagThresholdPerPartitionprotected static long getOffsetToOnlineLagThresholdPerPartition(Optional<HybridStoreConfig> hybridStoreConfig, String storeName, int partitionCount) 
- 
processCommonConsumerActionprotected void processCommonConsumerAction(ConsumerAction consumerAction) throws InterruptedException - Throws:
- InterruptedException
 
- 
getTopicPartitionEndPosition
protected PubSubPosition getTopicPartitionEndPosition(String pubSubBrokerAddress, PubSubTopicPartition topicPartition)
- Returns:
- the end position for the topic partition in SIT, or PubSubSymbolicPosition.LATEST if it failed to get it. N.B.: The returned end position is the position of the last successfully replicated message plus one. If the partition has never been written to, the end position is equal to the start position.
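The "last message plus one" rule can be sketched with plain long offsets. This is a hypothetical helper (`endOffset`). It is not the real method, which returns a PubSubPosition. The input array of written offsets is assumed to be sorted in ascending order.

```java
public class EndPositionSketch {
    /**
     * Hypothetical helper: returns the offset one past the last written message,
     * or startOffset if the partition has never been written to.
     * writtenOffsets is assumed to be sorted in ascending order.
     */
    static long endOffset(long startOffset, long[] writtenOffsets) {
        if (writtenOffsets.length == 0) {
            return startOffset; // empty partition: end == start
        }
        return writtenOffsets[writtenOffsets.length - 1] + 1;
    }

    public static void main(String[] args) {
        System.out.println(endOffset(0L, new long[] {0L, 1L, 2L})); // 3
        System.out.println(endOffset(0L, new long[0]));             // 0
    }
}
```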
 
- 
checkLongRunningTaskState
- Throws:
- InterruptedException
 
- 
processConsumerAction
protected abstract void processConsumerAction(ConsumerAction message, Store store) throws InterruptedException
- Throws:
- InterruptedException
 
- 
getConsumptionSourceKafkaAddress
protected abstract Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
- 
startConsumingAsLeader
- 
getRealTimeDataSourceKafkaAddress
protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
- 
getPartitionConsumptionState
- 
hasAnyPartitionConsumptionState
- 
getFailedIngestionPartitionCount
public int getFailedIngestionPartitionCount()
- 
shouldProcessRecord
Common record check for different state models: checks whether the server continues to receive messages after EOP for a batch-only store.
- 
shouldPersistRecord
protected boolean shouldPersistRecord(DefaultPubSubMessage record, PartitionConsumptionState partitionConsumptionState)
- 
processConsumerRecord
public void processConsumerRecord(DefaultPubSubMessage record, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
This function is invoked in StoreBufferService to process a buffered PubSubMessage.
- 
recordHeartbeatReceived
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecord, String kafkaUrl)
- 
setLastConsumerException
- 
setLastStoreIngestionException
- 
recordChecksumVerificationFailure
public void recordChecksumVerificationFailure()
- 
recordAssembledRecordSize
protected void recordAssembledRecordSize(int keyLen, ByteBuffer valueBytes, ByteBuffer rmdBytes, long currentTimeMs)
Records metrics for the original size of fully assembled records (key + value) and RMD, using the ChunkedValueManifest.size field. Also records the ratio of the assembled record size to the maximum allowed size, which is intended to alert customers about how close they are to hitting the size limit.
- Parameters:
- keyLen - the size of the record's key
- valueBytes - Put.putValue, which is expected to be a serialized ChunkedValueManifest
- rmdBytes - Put.replicationMetadataPayload, which can be a serialized ChunkedValueManifest if RMD chunking was enabled, or just the RMD payload otherwise
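The size and ratio arithmetic can be sketched as follows. This is a minimal illustration, assuming the assembled value size has already been read from a manifest's size field. `HYPOTHETICAL_MAX_RECORD_SIZE_BYTES` is an invented constant, not Venice's configured limit.

```java
public class AssembledSizeSketch {
    // Hypothetical limit for illustration; the real maximum comes from configuration.
    static final long HYPOTHETICAL_MAX_RECORD_SIZE_BYTES = 10L * 1024 * 1024;

    /** Key length plus the assembled value size taken from a manifest's size field. */
    static long assembledRecordSize(int keyLen, int manifestValueSize) {
        return (long) keyLen + manifestValueSize;
    }

    /** Fraction of the size limit a record uses; values near 1.0 are close to the limit. */
    static double assembledRecordSizeRatio(long recordSize, long maxRecordSize) {
        return (double) recordSize / maxRecordSize;
    }

    public static void main(String[] args) {
        long size = assembledRecordSize(1024, 5 * 1024 * 1024 - 1024);
        System.out.println(size); // 5242880
        System.out.println(assembledRecordSizeRatio(size, HYPOTHETICAL_MAX_RECORD_SIZE_BYTES)); // 0.5
    }
}
```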
 
- 
recordAssembledRecordSizeRatio
protected abstract void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
- 
calculateAssembledRecordSizeRatio
protected abstract double calculateAssembledRecordSizeRatio(long recordSize)
- 
measureLagWithCallToPubSub
protected long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopicPartition pubSubTopicPartition, PubSubPosition currentPosition)
- Parameters:
- pubSubServerName - the PubSub deployment to interrogate
- pubSubTopicPartition - the topic partition to measure lag for
- Returns:
- the lag, or Long.MAX_VALUE (9223372036854775807L) if it failed to measure it. N.B.: The returned lag can be negative, since the end offset used in the calculation is cached.
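The sketch below shows why a cached end offset can yield negative lag, and how a failure maps to the sentinel value. It works on plain long offsets. The supplier stands in for a cached end-offset lookup. It assumes a hypothetical `nextOffsetToConsume` that is one past the last consumed message, so a fully caught-up consumer reports zero. This is not the real PubSubPosition-based calculation.

```java
import java.util.function.LongSupplier;

public class LagSketch {
    // Same sentinel as the documented failure value, 9223372036854775807L.
    static final long LAG_MEASUREMENT_FAILED = Long.MAX_VALUE;

    /**
     * Hypothetical lag over plain offsets: cachedEndOffset - nextOffsetToConsume.
     * A stale cached end offset can make the result negative.
     */
    static long measureLag(LongSupplier cachedEndOffset, long nextOffsetToConsume) {
        try {
            return cachedEndOffset.getAsLong() - nextOffsetToConsume;
        } catch (RuntimeException e) {
            // Any failure to obtain the end offset is reported as the sentinel.
            return LAG_MEASUREMENT_FAILED;
        }
    }

    public static void main(String[] args) {
        System.out.println(measureLag(() -> 110L, 100L)); // 10
        System.out.println(measureLag(() -> 100L, 105L)); // -5 (stale cached end offset)
    }
}
```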
 
- 
measureLagWithCallToPubSub
protected static long measureLagWithCallToPubSub(String pubSubBrokerAddress, PubSubTopicPartition pubSubTopicPartition, PubSubPosition currentPosition, Function<String, TopicManager> topicManagerProvider)
- 
getWriteComputeErrorCode
public abstract int getWriteComputeErrorCode()
- 
updateLeaderTopicOnFollower
public abstract void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
- 
minZeroLag
protected long minZeroLag(long value)
Because of timing considerations, some lag metrics can compute negative values. Negative lag does not make sense, so the intent is to ease interpretation by applying a lower bound of zero to these metrics.
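A lower bound of zero amounts to a clamp. Here is a minimal sketch using `Math.max`. The real method may do more, such as logging. A lag computed against a stale, cached end offset is the kind of negative input it absorbs.

```java
public class MinZeroLagSketch {
    /** Clamps a possibly negative lag to zero. */
    static long minZeroLag(long value) {
        return Math.max(0L, value);
    }

    public static void main(String[] args) {
        System.out.println(minZeroLag(-5L)); // 0
        System.out.println(minZeroLag(42L)); // 42
    }
}
```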
- 
isHybridMode
public boolean isHybridMode()
- 
processEndOfPush
protected void processEndOfPush(KafkaMessageEnvelope endOfPushKME, PubSubPosition offset, PartitionConsumptionState partitionConsumptionState)
- 
processStartOfIncrementalPush
protected void processStartOfIncrementalPush(ControlMessage startOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
- 
processEndOfIncrementalPush
protected void processEndOfIncrementalPush(ControlMessage endOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
- 
processControlMessageForViews
protected void processControlMessageForViews(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
This isn't used for ingestion outside L/F, so it is a no-op here; the actual implementation is in LeaderFollowerStoreIngestionTask.
- 
processTopicSwitch
protected void processTopicSwitch(ControlMessage controlMessage, int partition, PubSubPosition offset, PartitionConsumptionState partitionConsumptionState)
- 
updateOffsetMetadataInOffsetRecord
protected abstract void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState)
Syncs the offset metadata into OffsetRecord. PartitionConsumptionState passes some information through to OffsetRecord for persistence, and offset rewind/split brain is already guarded against in updateLatestInMemoryProcessedOffset(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState, com.linkedin.venice.pubsub.api.DefaultPubSubMessage, com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext, java.lang.String, boolean).
- 
updateLatestInMemoryProcessedOffset
protected abstract void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun)
Maintains the latest offsets processed by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Before the offset is updated in memory, the underlying storage engine should have persisted the given record. Dry-run mode only performs the offset rewind check and does not update the processed offset.
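The following sketch separates the rewind check from the in-memory update. It uses plain long offsets in a hypothetical class. Two choices are assumptions, not Venice's documented behavior: treating any offset that is not strictly greater than the last one as a rewind, and reporting a rewind by throwing.

```java
public class InMemoryOffsetSketch {
    private long latestProcessed = -1L; // -1 means nothing processed yet

    long latestProcessedOffset() {
        return latestProcessed;
    }

    /**
     * Rewind check plus in-memory update. The caller is expected to have persisted the
     * record to storage already. In dry-run mode only the check runs.
     */
    void updateLatestProcessedOffset(long recordOffset, boolean dryRun) {
        if (recordOffset <= latestProcessed) {
            // Assumption: a rewind is reported by throwing.
            throw new IllegalStateException(
                "Offset rewind: " + recordOffset + " <= last processed " + latestProcessed);
        }
        if (!dryRun) {
            latestProcessed = recordOffset;
        }
    }

    public static void main(String[] args) {
        InMemoryOffsetSketch tracker = new InMemoryOffsetSketch();
        tracker.updateLatestProcessedOffset(5L, false);
        tracker.updateLatestProcessedOffset(7L, true); // check only, no update
        System.out.println(tracker.latestProcessedOffset()); // 5
    }
}
```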
- 
recordWriterStats
protected abstract void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
- 
validateMessage
protected void validateMessage(PartitionTracker.TopicType type, DataIntegrityValidator validator, DefaultPubSubMessage consumerRecord, PartitionConsumptionState partitionConsumptionState, boolean tolerateMissingMessagesForRealTimeTopic)
Message validation using DIV. Leaders should pass in the validator instance from LeaderFollowerStoreIngestionTask, and drainers should pass in the validator instance from StoreIngestionTask.
1. If valid DIV errors happen after EOP is received, no fatal exceptions will be thrown, but the errors will be recorded in the DIV metrics.
2. DIV errors that occur for unregistered producers after EOP will be ignored.
3. DIV errors for records that fall beyond logCompactionDelayInMs will be ignored.
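The three rules can be read as a small decision function. This hypothetical sketch makes three assumptions that the text does not state: errors before EOP are fatal, rule 3 compares the record's age with logCompactionDelayInMs, and the "ignore" rules take precedence over the metric-only rule.

```java
public class DivErrorPolicySketch {
    enum Outcome { THROW, RECORD_METRIC_ONLY, IGNORE }

    /** Hypothetical decision for a single DIV error under the assumptions stated above. */
    static Outcome onDivError(boolean endOfPushReceived, boolean producerRegistered,
                              long recordAgeMs, long logCompactionDelayInMs) {
        if (recordAgeMs > logCompactionDelayInMs) {
            return Outcome.IGNORE;             // rule 3 (assumed age comparison)
        }
        if (endOfPushReceived && !producerRegistered) {
            return Outcome.IGNORE;             // rule 2
        }
        if (endOfPushReceived) {
            return Outcome.RECORD_METRIC_ONLY; // rule 1
        }
        return Outcome.THROW;                  // before EOP (assumed fatal)
    }

    public static void main(String[] args) {
        System.out.println(onDivError(false, true, 0L, 1000L)); // THROW
        System.out.println(onDivError(true, true, 0L, 1000L));  // RECORD_METRIC_ONLY
        System.out.println(onDivError(true, false, 0L, 1000L)); // IGNORE
    }
}
```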
- 
cloneDrainerDivProducerStates
Only StoreIngestionTask should be allowed to access drainerDiv. Other components, such as leaders in LeaderFollowerStoreIngestionTask, should never access the drainer's DIV validator: message consumption in the leader runs ahead of the drainer, so leaders and drainers process messages at different paces.
- 
putInStorageEngine
Persists a Put record to the storage engine.
- 
removeFromStorageEngine
- 
throwOrLogStorageFailureDependingIfStillSubscribed
- 
logStorageOperationWhileUnsubscribed
protected void logStorageOperationWhileUnsubscribed(int partition)
- 
consumerHasAnySubscription
public boolean consumerHasAnySubscription()
- 
consumerHasSubscription
public boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
- 
unsubscribeFromTopic
protected void unsubscribeFromTopic(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
Unsubscribes from the topic-partition given by the inputs. If it is a real-time topic and the separate RT topic is enabled, it also unsubscribes from the separate real-time topic.
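Here is a minimal sketch of the "also drop the separate RT subscription" behavior. It uses a set of topic-partition keys. The naming conventions `isRealTimeTopic` (a "_rt" suffix) and `separateRealTimeTopicOf` (a "_sep" suffix) are hypothetical placeholders, not Venice's actual topic-naming helpers.

```java
import java.util.HashSet;
import java.util.Set;

public class UnsubscribeSketch {
    private final Set<String> subscriptions = new HashSet<>();
    private final boolean separateRealTimeTopicEnabled;

    UnsubscribeSketch(boolean separateRealTimeTopicEnabled) {
        this.separateRealTimeTopicEnabled = separateRealTimeTopicEnabled;
    }

    // Hypothetical naming conventions used only in this sketch.
    static boolean isRealTimeTopic(String topic) {
        return topic.endsWith("_rt");
    }

    static String separateRealTimeTopicOf(String realTimeTopic) {
        return realTimeTopic + "_sep";
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    void subscribe(String topic, int partition) {
        subscriptions.add(key(topic, partition));
    }

    boolean isSubscribed(String topic, int partition) {
        return subscriptions.contains(key(topic, partition));
    }

    /** Removes the topic-partition and, for a real-time topic, its separate RT counterpart. */
    void unsubscribeFromTopic(String topic, int partition) {
        subscriptions.remove(key(topic, partition));
        if (separateRealTimeTopicEnabled && isRealTimeTopic(topic)) {
            subscriptions.remove(key(separateRealTimeTopicOf(topic), partition));
        }
    }

    public static void main(String[] args) {
        UnsubscribeSketch consumer = new UnsubscribeSketch(true);
        consumer.subscribe("store_rt", 0);
        consumer.subscribe("store_rt_sep", 0);
        consumer.subscribe("store_v1", 0);
        consumer.unsubscribeFromTopic("store_rt", 0);
        System.out.println(consumer.isSubscribed("store_rt_sep", 0)); // false
        System.out.println(consumer.isSubscribed("store_v1", 0));     // true
    }
}
```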
- 
consumerBatchUnsubscribe
- 
consumerUnSubscribeAllTopics
public abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
- 
consumerSubscribe
public void consumerSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, PubSubPosition startOffset, String kafkaURL)
Tries to resolve the actual topic-partition from the input Kafka URL and subscribes to the resolved topic-partition.
- 
consumerResetOffset
public void consumerResetOffset(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
- 
waitVersionStateAvailable
protected StoreVersionState waitVersionStateAvailable(String kafkaTopic) throws InterruptedException
- Throws:
- InterruptedException
 
- 
close
public void close()
Stops the consumer task.
- Specified by:
- close in interface AutoCloseable
- Specified by:
- close in interface Closeable
 
- 
shutdownAndWait
public boolean shutdownAndWait(int waitTime)
A blocking call that waits, for at most the given time, until the StoreIngestionTask has fully shut down.
- Parameters:
- waitTime - maximum wait time for the shutdown operation.
- Returns:
- whether the task was able to shut down gracefully within waitTime
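The "signal, then block with a timeout, then report success" pattern can be sketched with `ExecutorService.awaitTermination`. The following is a minimal illustration, not the real implementation. The placeholder loop and the choice of seconds as the unit for `waitTime` are both assumptions, since the documentation does not state a unit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownSketch {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Starts a placeholder ingestion loop that runs until the flag is cleared. */
    void start() {
        executor.execute(() -> {
            while (running.get()) {
                try {
                    Thread.sleep(10); // stands in for one poll/process iteration
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /** Signals the loop to stop and blocks for at most waitTimeSeconds; true means graceful shutdown. */
    boolean shutdownAndWait(int waitTimeSeconds) {
        running.set(false);
        executor.shutdown();
        try {
            return executor.awaitTermination(waitTimeSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ShutdownSketch task = new ShutdownSketch();
        task.start();
        System.out.println(task.shutdownAndWait(5)); // true once the loop observes the flag
    }
}
```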
 
- 
isRunning
public boolean isRunning()
Allows the service to get the current status of the task, so that it can create a new task if required.
- 
getVersionTopic
- 
isMetricsEmissionEnabled
public boolean isMetricsEmissionEnabled()
- 
enableMetricsEmission
public void enableMetricsEmission()
- 
disableMetricsEmission
public void disableMetricsEmission()
- 
isPartitionConsumingOrHasPendingIngestionAction
public boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition)
Checks whether the given partition is still consuming messages from Kafka or has a pending ingestion action.
- 
createKafkaConsumerProperties
protected Properties createKafkaConsumerProperties(Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely)
Overrides the ConfigKeys.KAFKA_BOOTSTRAP_SERVERS config with a remote Kafka bootstrap URL.
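The override can be sketched with `java.util.Properties`. `BOOTSTRAP_SERVERS_KEY` is a hypothetical key that stands in for ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, whose actual string value is not shown here. Applying the override only when `consumeRemotely` is true is an assumption about how the flag is used.

```java
import java.util.Properties;

public class ConsumerPropsSketch {
    // Hypothetical key standing in for ConfigKeys.KAFKA_BOOTSTRAP_SERVERS.
    static final String BOOTSTRAP_SERVERS_KEY = "hypothetical.bootstrap.servers";

    /** Copies the local properties and, when consuming remotely, points bootstrap servers at the remote cluster. */
    static Properties createConsumerProperties(Properties localConsumerProps, String remoteKafkaSourceAddress,
                                               boolean consumeRemotely) {
        Properties props = new Properties();
        props.putAll(localConsumerProps); // copy, so the caller's object is not mutated
        if (consumeRemotely) {
            props.setProperty(BOOTSTRAP_SERVERS_KEY, remoteKafkaSourceAddress);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties local = new Properties();
        local.setProperty(BOOTSTRAP_SERVERS_KEY, "localhost:9092");
        Properties remote = createConsumerProperties(local, "remote-dc.example.com:9092", true);
        System.out.println(remote.getProperty(BOOTSTRAP_SERVERS_KEY)); // remote-dc.example.com:9092
        System.out.println(local.getProperty(BOOTSTRAP_SERVERS_KEY));  // localhost:9092
    }
}
```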
- 
resubscribe
protected abstract void resubscribe(PartitionConsumptionState partitionConsumptionState) throws InterruptedException
- Throws:
- InterruptedException
 
- 
reportError
- 
isActiveActiveReplicationEnabled
public boolean isActiveActiveReplicationEnabled()
- 
dumpPartitionConsumptionStates
public void dumpPartitionConsumptionStates(AdminResponse response, ComplementSet<Integer> partitions)
Invoked by an admin request to dump the requested partition consumption states.
- 
dumpStoreVersionState
Invoked by an admin request to dump store version state metadata.
- 
getServerConfig
- 
updateOffsetMetadataAndSync
public void updateOffsetMetadataAndSync(int partitionId)
- 
getTopicManager
Returns the local or remote topic manager.
- Parameters:
- sourceKafkaServer - the address of the source Kafka bootstrap server.
- Returns:
- topic manager
 
- 
waitForAllMessageToBeProcessedFromTopicPartition
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState) throws InterruptedException
Waits for the complete processing (including persistence to disk) of all messages that were consumed from this Kafka {topic, partition} before this function was called. The calling thread blocks until that processing finishes.
- Parameters:
- topicPartition - the topic-partition to wait for
- Throws:
- InterruptedException
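One common way to wait for "everything consumed before this call" is a FIFO barrier. The caller enqueues a marker behind all earlier work and blocks until the single drainer reaches it. The sketch below uses that technique with a hypothetical `DrainBarrierSketch` and a Runnable queue. It is an illustration of the idea, not necessarily how StoreIngestionTask implements the wait.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class DrainBarrierSketch {
    private final BlockingQueue<Runnable> buffer = new LinkedBlockingQueue<>();
    private final AtomicInteger persisted = new AtomicInteger();

    /** Starts a single daemon drainer thread that runs buffered work in FIFO order. */
    void start() {
        Thread drainer = new Thread(() -> {
            try {
                while (true) {
                    buffer.take().run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "hypothetical-drainer");
        drainer.setDaemon(true);
        drainer.start();
    }

    /** Enqueues a stand-in for "persist one consumed record". */
    void enqueueRecord() {
        buffer.add(persisted::incrementAndGet);
    }

    int persistedCount() {
        return persisted.get();
    }

    /** Enqueues a marker behind all prior work and blocks until the drainer reaches it. */
    void waitForAllPriorRecords() throws InterruptedException {
        CountDownLatch marker = new CountDownLatch(1);
        buffer.add(marker::countDown);
        marker.await();
    }

    public static void main(String[] args) throws InterruptedException {
        DrainBarrierSketch sketch = new DrainBarrierSketch();
        sketch.start();
        for (int i = 0; i < 1000; i++) {
            sketch.enqueueRecord();
        }
        sketch.waitForAllPriorRecords();
        System.out.println(sketch.persistedCount()); // 1000
    }
}
```

The barrier only works because a single drainer processes the queue in order. With several drainers per partition, each one would need its own marker.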
 
- 
delegateConsumerRecord
protected abstract StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
- 
recordProcessedRecordStats
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
- 
isTransientRecordBufferUsed
This is not a per-record state. Rather, it indicates whether the transient record buffer is used at all for this ingestion task/partition. The criteria are as follows:
1. In L/F mode, only the WC (write compute) ingestion task needs this buffer.
2. The transient record buffer is only used post-EOP.
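The two criteria combine as a conjunction. In this minimal sketch, the task is assumed to be in L/F mode, and both inputs are hypothetical boolean flags rather than the real PartitionConsumptionState accessors.

```java
public class TransientBufferSketch {
    /**
     * Hypothetical check for an L/F ingestion task: the buffer is used only by
     * write-compute tasks (criterion 1) and only after END_OF_PUSH (criterion 2).
     */
    static boolean isTransientRecordBufferUsed(boolean writeComputeEnabled, boolean endOfPushReceived) {
        return writeComputeEnabled && endOfPushReceived;
    }

    public static void main(String[] args) {
        System.out.println(isTransientRecordBufferUsed(true, false)); // false: still in batch phase
        System.out.println(isTransientRecordBufferUsed(true, true));  // true
    }
}
```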
- 
setPartitionConsumptionState
- 
getVersionedDIVStats
- 
getVersionIngestionStats
- 
getCompressionStrategy
- 
getCompressor
- 
isChunked
protected boolean isChunked()
- 
getSchemaRepo
- 
getHostLevelIngestionStats
- 
getKafkaVersionTopic
- 
extractUpstreamPosition
Extracts the upstream position from the given consumer record's leader metadata.
- Parameters:
- consumerRecord - the consumer record to extract the upstream position from
- Returns:
- the upstream position if available, otherwise PubSubSymbolicPosition.EARLIEST
 
- 
extractUpstreamClusterId
- 
maybeSendIngestionHeartbeat
For L/F hybrid stores, the leader periodically writes a special SOS message to the RT topic. See LeaderFollowerStoreIngestionTask.maybeSendIngestionHeartbeat() for more details.
- 
isProducingVersionTopicHealthy
public boolean isProducingVersionTopicHealthy()
This function checks whether the version topic exists.
- 
isCurrentVersion
public boolean isCurrentVersion()
- 
hasAllPartitionReportedCompleted
public boolean hasAllPartitionReportedCompleted()
- 
isSeparatedRealtimeTopicEnabled
public boolean isSeparatedRealtimeTopicEnabled()
- 
getDataIntegrityValidator
When Global RT DIV is enabled, the ConsumptionTask's DIV is used exclusively to validate data integrity.
- 
getStorageUtilizationManager
- 
hasReplicas
protected boolean hasReplicas()
Checks whether any replicas are currently present on this host. For batch-only stores, a replica might be removed from the PCS map but still be physically present on the host. The activeReplicaCount counter tracks the replica's lifecycle from the bootstrap stage through to the drop stage.
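The point of a separate counter is that removing a partition's consumption state does not mean the replica has left the host. The sketch below shows this with a hypothetical PCS map and an `AtomicInteger`. The method names (`bootstrapReplica`, `removeFromPcsMap`, `dropReplica`) are invented for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class ReplicaCounterSketch {
    private final Map<Integer, String> pcsMap = new ConcurrentHashMap<>(); // hypothetical: partition -> state
    private final AtomicInteger activeReplicaCount = new AtomicInteger();

    void bootstrapReplica(int partition) {
        pcsMap.put(partition, "CONSUMING");
        activeReplicaCount.incrementAndGet();
    }

    /** Stops tracking consumption state; the replica's data is still on this host. */
    void removeFromPcsMap(int partition) {
        pcsMap.remove(partition);
    }

    /** Drops the replica from the host; only now does the counter go down. */
    void dropReplica(int partition) {
        pcsMap.remove(partition);
        activeReplicaCount.decrementAndGet();
    }

    int trackedPartitionCount() {
        return pcsMap.size();
    }

    boolean hasReplicas() {
        return activeReplicaCount.get() > 0;
    }

    public static void main(String[] args) {
        ReplicaCounterSketch host = new ReplicaCounterSketch();
        host.bootstrapReplica(0);
        host.removeFromPcsMap(0); // e.g. batch-only store finished ingesting
        System.out.println(host.trackedPartitionCount() + " " + host.hasReplicas()); // 0 true
        host.dropReplica(0);
        System.out.println(host.hasReplicas()); // false
    }
}
```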
- 
validateEndOfPushReceivedBeforeTopicSwitch
protected static void validateEndOfPushReceivedBeforeTopicSwitch(PartitionConsumptionState partitionConsumptionState, PubSubPosition position)
Validates that END_OF_PUSH has been received before processing TOPIC_SWITCH.
- Parameters:
- partitionConsumptionState - the partition consumption state to validate
- position - the position/offset for error reporting
- Throws:
- VeniceException - if END_OF_PUSH has not been received
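The ordering guard can be sketched as a precondition check. In this sketch, `PartitionState` is a hypothetical stand-in for PartitionConsumptionState, the position is a plain long, and `IllegalStateException` replaces VeniceException so that the example has no external dependencies.

```java
public class TopicSwitchGuardSketch {
    /** Hypothetical, minimal stand-in for PartitionConsumptionState. */
    static final class PartitionState {
        final int partition;
        final boolean endOfPushReceived;

        PartitionState(int partition, boolean endOfPushReceived) {
            this.partition = partition;
            this.endOfPushReceived = endOfPushReceived;
        }
    }

    /** Throws if TOPIC_SWITCH arrives before END_OF_PUSH; the position is included for error reporting. */
    static void validateEndOfPushReceivedBeforeTopicSwitch(PartitionState state, long position) {
        if (!state.endOfPushReceived) {
            throw new IllegalStateException("Received TOPIC_SWITCH at position " + position
                + " for partition " + state.partition + " before END_OF_PUSH");
        }
    }

    public static void main(String[] args) {
        validateEndOfPushReceivedBeforeTopicSwitch(new PartitionState(3, true), 42L); // passes
        try {
            validateEndOfPushReceivedBeforeTopicSwitch(new PartitionState(3, false), 42L);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```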
 
 
-