Class StoreIngestionTask
java.lang.Object
com.linkedin.davinci.kafka.consumer.StoreIngestionTask
- All Implemented Interfaces:
Closeable, AutoCloseable, Runnable
- Direct Known Subclasses:
LeaderFollowerStoreIngestionTask
A runnable Kafka Consumer consuming messages from all the partitions assigned to the current node for a Kafka topic.
-
Nested Class Summary
Nested Classes
protected static enum StoreIngestionTask.DelegateConsumerRecordResult
This enum represents all potential results after calling delegateConsumerRecord(PubSubMessageProcessedResultWrapper, int, String, int, long, long). -
Field Summary
Fields (Modifier and Type, Field, Description; field names restored from the Field Details section below):
protected final AggKafkaConsumerService aggKafkaConsumerService
protected final SparseConcurrentList<Object> availableSchemaIds
protected final boolean batchReportIncPushStatusEnabled
protected final long bootstrapTimeoutInMs
protected static final int CHUNK_SCHEMA_ID
protected final InMemoryChunkAssembler chunkAssembler
protected final CompressionStrategy compressionStrategy
protected final Lazy<VeniceCompressor> compressor
protected final StorageEngineBackedCompressorFactory compressorFactory
protected final VeniceConcurrentHashMap<String,Long> consumedBytesSinceLastSync: Map of broker URL to the total bytes consumed by ConsumptionTask since the last Global RT DIV sync
protected final AtomicInteger consumerActionSequenceNumber
protected final PriorityBlockingQueue<ConsumerAction> consumerActionsQueue
protected final DataIntegrityValidator consumerDiv: The consumer and drainer DIV must remain separate.
protected final long databaseSyncBytesIntervalForDeferredWriteMode: Message bytes consuming interval before persisting offset in offset db for deferred-write database.
protected final long databaseSyncBytesIntervalForTransactionalMode: Message bytes consuming interval before persisting offset in offset db for transactional mode database.
protected int dataRecoverySourceVersionNumber
protected final com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck defaultReadyToServeChecker
protected final SparseConcurrentList<Object> deserializedSchemaIds
protected final DiskUsage diskUsage
protected final Consumer<DataValidationException> divErrorMetricCallback
protected final AtomicBoolean emitMetrics
protected final long emptyPollSleepMs
protected final int errorPartitionId: Used for reporting error when the partitionConsumptionStateMap is empty
protected Lazy<CountDownLatch> gracefulShutdownLatch
protected final HostLevelIngestionStats hostLevelIngestionStats
protected final String hostName
protected final Optional<HybridStoreConfig> hybridStoreConfig
protected AtomicInteger idleCounter
protected final com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher
protected final String ingestionTaskName
protected final boolean isChunked
protected final BooleanSupplier isCurrentVersion
protected boolean isDataRecovery
protected final boolean isDaVinciClient
protected final boolean isGlobalRtDivEnabled
protected final boolean isIsolatedIngestion
protected final boolean isRmdChunked
protected final AtomicBoolean isRunning
protected final boolean isSystemStore
protected final boolean isWriteComputationEnabled
kafkaClusterUrlResolver (see Field Details)
protected final Properties kafkaProps
protected final String kafkaVersionTopic: Topics used for this topic consumption. TODO: Using a PubSubVersionTopic and PubSubRealTimeTopic extending PubSubTopic for type safety.
protected static final long KILL_WAIT_TIME_MS
protected final int localKafkaClusterId
protected final String localKafkaServer
localKafkaServerSingletonSet (see Field Details)
protected final ChunkedValueManifestSerializer manifestSerializer
protected final MetaStoreWriter metaStoreWriter
protected final ExecutorService parallelProcessingThreadPool
protected final ConcurrentMap<Integer,PartitionConsumptionState> partitionConsumptionStateMap: Per-partition consumption state map
protected final int partitionCount: This would be the number of partitions in the StorageEngine and in version topics
protected final Map<Integer,AtomicInteger> partitionToPendingConsumerActionCountMap
protected final PubSubContext pubSubContext
protected final PubSubTopicRepository pubSubTopicRepository
protected final long readCycleDelayMs
protected final boolean readOnlyForBatchOnlyStoreEnabled
protected final PubSubTopic realTimeTopic
protected final AtomicBoolean recordLevelMetricEnabled
recordTransformerStats (see Field Details)
protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER
protected final boolean resetErrorReplicaEnabled
static long SCHEMA_POLLING_DELAY_MS
protected final ReadOnlySchemaRepository schemaRepository
protected final PubSubTopic separateRealTimeTopic
protected final VeniceServerConfig serverConfig
protected final StorageEngine storageEngine
protected final StorageMetadataService storageMetadataService
protected final StorageService storageService: storage destination for consumption
static long STORE_VERSION_POLLING_DELAY_MS
protected final AbstractStoreBufferService storeBufferService
protected final String storeName
protected final ReadOnlyStoreRepository storeRepository
protected final String storeVersionName
protected final int storeVersionPartitionCount
protected final TopicManagerRepository topicManagerRepository
protected final AggVersionedDIVStats versionedDIVStats
protected final AggVersionedIngestionStats versionedIngestionStats
protected final int versionNumber
versionRole (see Field Details)
protected final PubSubTopic versionTopic
protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
workloadType (see Field Details)
protected int writeComputeFailureCode
protected Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin -
Constructor Summary
Constructors
StoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeVersionConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, InternalDaVinciRecordTransformerConfig internalRecordTransformerConfig, Queue<VeniceNotifier> notifiers, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin) -
Method Summary
Methods (Modifier and Type, Method, Description; names dropped by extraction were restored from the Method Details section below):
protected abstract double calculateAssembledRecordSizeRatio(long recordSize)
protected abstract boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState partitionConsumptionState, long lag, long threshold, boolean shouldLogLag, LagType lagType): Checks whether the lag is acceptable for hybrid stores
protected void checkIngestionProgress(Store store)
protected abstract void checkLongRunningTaskState()
protected void cloneDrainerDivProducerStates(int partition, DataIntegrityValidator validator): We should only allow StoreIngestionTask to access drainerDiv.
void close(): Stops the consumer task.
protected void closeVeniceViewWriters(boolean doFlush)
void closeVeniceWriters(boolean doFlush)
void consumerBatchUnsubscribe(Set<PubSubTopicPartition> topicPartitionSet)
boolean consumerHasAnySubscription()
boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
void consumerResetOffset(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
void consumerSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, PubSubPosition startOffset, String kafkaURL): This method will try to resolve the actual topic-partition from the input Kafka URL and subscribe to the resolved topic-partition.
abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
protected Properties createKafkaConsumerProperties(Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely): Override the ConfigKeys.KAFKA_BOOTSTRAP_SERVERS config with a remote Kafka bootstrap url.
protected abstract StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
abstract void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
void disableMetricsEmission()
void dropStoragePartitionGracefully(PubSubTopicPartition topicPartition): Drops a storage partition gracefully.
void dumpPartitionConsumptionStates(AdminResponse response, ComplementSet<Integer> partitions): Invoked by admin request to dump the requested partition consumption states
void dumpStoreVersionState(AdminResponse response): Invoked by admin request to dump store version state metadata.
void enableMetricsEmission()
protected int extractUpstreamClusterId(DefaultPubSubMessage consumerRecord)
protected PubSubPosition extractUpstreamPosition(DefaultPubSubMessage consumerRecord): Extract the upstream position from the given consumer record's leader metadata.
protected CompressionStrategy getCompressionStrategy()
protected Lazy<VeniceCompressor> getCompressor()
protected abstract Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
getDataIntegrityValidator(): When Global RT DIV is enabled the ConsumptionTask's DIV is exclusively used to validate data integrity.
int getFailedIngestionPartitionCount()
protected HostLevelIngestionStats getHostLevelIngestionStats()
protected abstract IngestionBatchProcessor getIngestionBatchProcessor()
protected String getIngestionTaskName()
getKafkaVersionTopic()
protected static long getOffsetToOnlineLagThresholdPerPartition(Optional<HybridStoreConfig> hybridStoreConfig, String storeName, int partitionCount)
getPartitionConsumptionState(int partitionId)
protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
protected ReadOnlySchemaRepository getSchemaRepo()
getServerConfig()
getStorageEngine()
protected StoragePartitionConfig getStoragePartitionConfig(boolean sorted, PartitionConsumptionState partitionConsumptionState)
protected StoragePartitionConfig getStoragePartitionConfig(PartitionConsumptionState partitionConsumptionState)
getStorageUtilizationManager()
getStoreName()
protected TopicManager getTopicManager(String sourceKafkaServer): The function returns local or remote topic manager.
protected PubSubPosition getTopicPartitionEndPosition(String pubSubBrokerAddress, PubSubTopicPartition topicPartition)
protected AggVersionedDIVStats getVersionedDIVStats()
protected AggVersionedIngestionStats getVersionIngestionStats()
int getVersionNumber()
getVersionTopic()
abstract int getWriteComputeErrorCode()
boolean hasAllPartitionReportedCompleted()
boolean hasAnyPartitionConsumptionState(Predicate<PartitionConsumptionState> pcsPredicate)
boolean hasAnyPendingSubscription()
boolean hasAnySubscription()
boolean hasPendingPartitionIngestionAction(int userPartition)
protected boolean hasReplicas(): Checks whether there are any replicas currently present on this host.
boolean isActiveActiveReplicationEnabled()
protected boolean isChunked()
boolean isCurrentVersion()
protected abstract boolean isHybridFollower(PartitionConsumptionState partitionConsumptionState)
boolean isHybridMode()
boolean isIdleOverThreshold(): This helper function will check if the ingestion task has been idle for a long time.
boolean isIngestionTaskActive()
boolean isMetricsEmissionEnabled()
boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition): To check whether the given partition is still consuming messages from Kafka
boolean isProducingVersionTopicHealthy(): This function is checking the following conditions: 1. Whether the version topic exists or not.
protected boolean isReadyToServe(PartitionConsumptionState partitionConsumptionState): This function checks various conditions to verify if a store is ready to serve.
protected abstract boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
boolean isRunning(): A function to allow the service to get the current status of the task.
boolean isSeparatedRealtimeTopicEnabled()
boolean isTransientRecordBufferUsed(PartitionConsumptionState partitionConsumptionState): This is not a per record state.
boolean isUserSystemStore()
void kill()
protected void logStorageOperationWhileUnsubscribed(int partition)
maybeSendIngestionHeartbeat(): For L/F hybrid stores, the leader periodically writes a special SOS message to the RT topic.
protected abstract long measureHybridHeartbeatLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag): Measure the hybrid heartbeat lag for the replica being tracked in `partitionConsumptionState`.
protected abstract long measureHybridHeartbeatTimestamp(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag): Measure the hybrid heartbeat timestamp for the replica being tracked in `partitionConsumptionState`.
protected abstract long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag): Measure the hybrid offset lag for the replica being tracked in `partitionConsumptionState`.
protected long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopicPartition pubSubTopicPartition, PubSubPosition currentPosition)
protected static long measureLagWithCallToPubSub(String pubSubBrokerAddress, PubSubTopicPartition pubSubTopicPartition, PubSubPosition currentPosition, Function<String, TopicManager> topicManagerProvider)
protected long minZeroLag(long value): Because of timing considerations, it is possible that some lag metrics could compute negative values.
protected int nextSeqNum(): Applies a unique and increasing sequence number to each consumer action, so that equal-priority actions are polled from the queue in FIFO order.
protected void processCommonConsumerAction(ConsumerAction consumerAction)
protected abstract void processConsumerAction(ConsumerAction message, Store store)
void processConsumerRecord(DefaultPubSubMessage record, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs): This function will be invoked in StoreBufferService to process buffered PubSubMessage.
protected void processControlMessageForViews(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState): This isn't used for ingestion outside L/F, so we NoOp here and rely on the actual implementation in LeaderFollowerStoreIngestionTask
protected void processEndOfIncrementalPush(ControlMessage endOfIncrementalPush, PartitionConsumptionState partitionConsumptionState, long pubSubMessageTime)
protected void processEndOfPush(KafkaMessageEnvelope endOfPushKME, PubSubPosition offset, PartitionConsumptionState partitionConsumptionState, EndOfPush endOfPush)
protected void processStartOfIncrementalPush(ControlMessage startOfIncrementalPush, PartitionConsumptionState partitionConsumptionState, long pubSubMessageTime)
protected void processTopicSwitch(ControlMessage controlMessage, int partition, PubSubPosition offset, PartitionConsumptionState partitionConsumptionState)
protected void produceToStoreBufferService(DefaultPubSubMessage consumedRecord, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs): This function will produce a pair of a consumer record and its derived produced record to the writer buffers maintained by StoreBufferService.
protected void produceToStoreBufferServiceOrKafka(Iterable<DefaultPubSubMessage> records, PubSubTopicPartition topicPartition, String kafkaUrl, int kafkaClusterId): This function is in charge of producing the consumer records to the writer buffers maintained by StoreBufferService.
protected void produceToStoreBufferServiceOrKafkaInBatch(Iterable<DefaultPubSubMessage> records, PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState, String kafkaUrl, int kafkaClusterId)
abstract void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
protected void putInStorageEngine(int partition, byte[] keyBytes, Put put): Persist Put record to storage engine.
protected void recordAssembledRecordSize(int keyLen, ByteBuffer valueBytes, ByteBuffer rmdBytes, long currentTimeMs): Records metrics for the original size of fully-assembled records (key + value) and RMD by utilizing the field ChunkedValueManifest.size.
protected abstract void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
void recordChecksumVerificationFailure()
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecord, String kafkaUrl)
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
protected abstract void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
protected void refreshIngestionContextIfChanged(...)
protected void removeFromStorageEngine(int partition, byte[] keyBytes, Delete delete)
void reportError(String message, int userPartition, Exception e)
protected abstract void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState): Check if the ingestion progress has reached the end of the version topic.
void resetPartitionConsumptionOffset(PubSubTopicPartition topicPartition): Adds an asynchronous resetting partition consumption offset request for the task.
protected TopicSwitch resolveSourceKafkaServersWithinTopicSwitch(TopicSwitch originalTopicSwitch): Applies name resolution to all Kafka URLs in the provided TopicSwitch.
protected abstract void resubscribe(PartitionConsumptionState partitionConsumptionState)
void run(): Polls the producer for new messages in an infinite loop by a dedicated consumer thread and processes the new messages by the current thread.
void setLastConsumerException(...)
void setLastStoreIngestionException(...)
protected void setPartitionConsumptionState(int partition, PartitionConsumptionState pcs)
protected boolean shouldPersistRecord(DefaultPubSubMessage record, PartitionConsumptionState partitionConsumptionState)
protected boolean shouldProcessRecord(...): Common record check for different state models: check whether the server continues receiving messages after EOP for a batch-only store.
boolean shutdownAndWait(int waitTime): This method is a blocking call to wait for the StoreIngestionTask to fully shut down within the given time.
protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
void subscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction, Optional<PubSubPosition> pubSubPosition): Adds an asynchronous partition subscription request for the task.
void subscribePartition(PubSubTopicPartition topicPartition, Optional<PubSubPosition> pubSubPosition)
protected void throwIfNotRunning()
protected void throwOrLogStorageFailureDependingIfStillSubscribed(int partition, VeniceException e)
protected void unsubscribeFromTopic(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState): This method unsubscribes the topic-partition derived from the input topic.
unSubscribePartition(PubSubTopicPartition topicPartition)
CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction): Adds an asynchronous partition unsubscription request for the task.
protected void updateAndSyncOffsetFromSnapshot(PartitionTracker vtDivSnapshot, PubSubTopicPartition topicPartition): This version of the method syncs using a PartitionTracker object which contains the vtSegments and LCVP
protected abstract void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun): Maintain the latest processed offsets by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord.
abstract void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
void updateOffsetMetadataAndSync(int partitionId)
protected void updateOffsetMetadataAndSyncOffset(...)
protected void updateOffsetMetadataAndSyncOffset(DataIntegrityValidator div, PartitionConsumptionState pcs)
protected abstract void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState): Sync the metadata about offset in OffsetRecord.
protected abstract Iterable<DefaultPubSubMessage> validateAndFilterOutDuplicateMessagesFromLeaderTopic(Iterable<DefaultPubSubMessage> records, String kafkaUrl, PubSubTopicPartition topicPartition)
protected static void validateEndOfPushReceivedBeforeTopicSwitch(PartitionConsumptionState partitionConsumptionState, PubSubPosition position): Validates that END_OF_PUSH has been received before processing TOPIC_SWITCH.
protected void validateMessage(PartitionTracker.TopicType type, DataIntegrityValidator validator, DefaultPubSubMessage consumerRecord, PartitionConsumptionState partitionConsumptionState, boolean tolerateMissingMessagesForRealTimeTopic): Message validation using DIV.
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState): The purpose of this function is to wait for the complete processing (including persistence to disk) of all the messages that were consumed from this kafka {topic, partition} prior to calling this function.
protected StoreVersionState waitVersionStateAvailable(String kafkaTopic)
-
Field Details
-
SCHEMA_POLLING_DELAY_MS
public static long SCHEMA_POLLING_DELAY_MS -
STORE_VERSION_POLLING_DELAY_MS
public static long STORE_VERSION_POLLING_DELAY_MS -
WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED -
KILL_WAIT_TIME_MS
protected static final long KILL_WAIT_TIME_MS -
-
CHUNK_SCHEMA_ID
protected static final int CHUNK_SCHEMA_ID -
REDUNDANT_LOGGING_FILTER
-
storageService
storage destination for consumption -
storageEngine
-
kafkaVersionTopic
Topics used for this topic consumption. TODO: Using a PubSubVersionTopic and PubSubRealTimeTopic extending PubSubTopic for type safety. -
versionTopic
-
realTimeTopic
-
separateRealTimeTopic
-
storeName
-
storeVersionName
-
isSystemStore
protected final boolean isSystemStore -
versionNumber
protected final int versionNumber -
schemaRepository
-
storeRepository
-
ingestionTaskName
-
kafkaProps
-
isRunning
-
emitMetrics
-
consumerActionSequenceNumber
-
consumerActionsQueue
-
partitionToPendingConsumerActionCountMap
-
storageMetadataService
-
topicManagerRepository
-
partitionConsumptionStateMap
Per-partition consumption state map -
storeBufferService
-
consumerDiv
The consumer and drainer DIV must remain separate. Since the consumer is always ahead of the drainer, the consumer would be validating data ahead of the actual persisted data on the drainer. NOTE: Currently, the state clearing happens only in drainerDiv which persists its state to disk. consumerDiv is transient, not persisted to disk, and its state is not expected to grow as large. Thus, bouncing effectively clears it (which is not the case for drainerDiv). Later on, we could trigger state cleaning for this consumer DIV as well, if deemed necessary. NOTE: When isGlobalRtDivEnabled() is enabled, this will be used by leaders to produce Global RT DIV state to local VT. This will also be used to send DIV snapshots to the drainer to persist the VT + RT DIV on-disk. -
consumedBytesSinceLastSync
Map of broker URL to the total bytes consumed by ConsumptionTask since the last Global RT DIV sync -
hostLevelIngestionStats
-
versionedDIVStats
-
versionedIngestionStats
-
recordTransformerStats
-
isCurrentVersion
-
hybridStoreConfig
-
divErrorMetricCallback
-
readCycleDelayMs
protected final long readCycleDelayMs -
emptyPollSleepMs
protected final long emptyPollSleepMs -
diskUsage
-
databaseSyncBytesIntervalForTransactionalMode
protected final long databaseSyncBytesIntervalForTransactionalMode
Message bytes consuming interval before persisting offset in offset db for transactional mode database. -
databaseSyncBytesIntervalForDeferredWriteMode
protected final long databaseSyncBytesIntervalForDeferredWriteMode
Message bytes consuming interval before persisting offset in offset db for deferred-write database. -
serverConfig
-
errorPartitionId
protected final int errorPartitionId
Used for reporting error when the partitionConsumptionStateMap is empty -
defaultReadyToServeChecker
protected final com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck defaultReadyToServeChecker -
availableSchemaIds
-
deserializedSchemaIds
-
idleCounter
-
aggKafkaConsumerService
-
writeComputeFailureCode
protected int writeComputeFailureCode -
isWriteComputationEnabled
protected final boolean isWriteComputationEnabled -
partitionCount
protected final int partitionCount
This would be the number of partitions in the StorageEngine and in version topics -
storeVersionPartitionCount
protected final int storeVersionPartitionCount -
bootstrapTimeoutInMs
protected final long bootstrapTimeoutInMs -
isIsolatedIngestion
protected final boolean isIsolatedIngestion -
ingestionNotificationDispatcher
protected final com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher -
chunkAssembler
-
localKafkaServer
-
localKafkaClusterId
protected final int localKafkaClusterId -
localKafkaServerSingletonSet
-
isDaVinciClient
protected final boolean isDaVinciClient -
isDataRecovery
protected boolean isDataRecovery -
dataRecoverySourceVersionNumber
protected int dataRecoverySourceVersionNumber -
readOnlyForBatchOnlyStoreEnabled
protected final boolean readOnlyForBatchOnlyStoreEnabled -
metaStoreWriter
-
kafkaClusterUrlResolver
-
resetErrorReplicaEnabled
protected final boolean resetErrorReplicaEnabled -
compressionStrategy
-
compressorFactory
-
compressor
-
isChunked
protected final boolean isChunked -
isRmdChunked
protected final boolean isRmdChunked -
manifestSerializer
-
pubSubContext
-
pubSubTopicRepository
-
recordLevelMetricEnabled
-
isGlobalRtDivEnabled
protected final boolean isGlobalRtDivEnabled -
versionRole
-
workloadType
-
batchReportIncPushStatusEnabled
protected final boolean batchReportIncPushStatusEnabled -
parallelProcessingThreadPool
-
gracefulShutdownLatch
-
zkHelixAdmin
-
hostName
-
-
Constructor Details
-
StoreIngestionTask
public StoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeVersionConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, InternalDaVinciRecordTransformerConfig internalRecordTransformerConfig, Queue<VeniceNotifier> notifiers, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
-
Method Details
-
getIngestionBatchProcessor
-
getStorageEngine
-
getIngestionTaskName
-
getVersionNumber
public int getVersionNumber() -
throwIfNotRunning
protected void throwIfNotRunning() -
nextSeqNum
protected int nextSeqNum() Applies a unique and increasing sequence number to each consumer action, so if there are multiple consumer actions in the queue and they have the same priority, whichever was added first into the queue will be polled out first from the queue (FIFO).- Returns:
- a unique and increasing sequence number for a new consumer action.
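As a minimal illustrative sketch (not the actual implementation), such a counter can be backed by an AtomicInteger like the consumerActionSequenceNumber field listed in the Field Summary; equal-priority actions are then ordered by the sequence number assigned at enqueue time:

    import java.util.concurrent.atomic.AtomicInteger;

    class ConsumerActionSequencer {
      // Mirrors the consumerActionSequenceNumber field: one counter per task.
      private final AtomicInteger consumerActionSequenceNumber = new AtomicInteger(0);

      // Every call returns a strictly larger value, so two actions with equal
      // priority fall back to FIFO order inside a priority queue.
      int nextSeqNum() {
        return consumerActionSequenceNumber.incrementAndGet();
      }
    }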
-
subscribePartition
public void subscribePartition(PubSubTopicPartition topicPartition, Optional<PubSubPosition> pubSubPosition) -
subscribePartition
public void subscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction, Optional<PubSubPosition> pubSubPosition) Adds an asynchronous partition subscription request for the task. -
unSubscribePartition
-
unSubscribePartition
public CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction) Adds an asynchronous partition unsubscription request for the task. -
dropStoragePartitionGracefully
Drops a storage partition gracefully. This is always a Helix triggered action. -
hasAnySubscription
public boolean hasAnySubscription() -
hasAnyPendingSubscription
public boolean hasAnyPendingSubscription() -
isIdleOverThreshold
public boolean isIdleOverThreshold() This helper function will check if the ingestion task has been idle for a long time. -
resetPartitionConsumptionOffset
Adds an asynchronous resetting partition consumption offset request for the task. -
getStoreName
-
isUserSystemStore
public boolean isUserSystemStore() -
promoteToLeader
public abstract void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) -
demoteToStandby
public abstract void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) -
hasPendingPartitionIngestionAction
public boolean hasPendingPartitionIngestionAction(int userPartition) -
kill
public void kill() -
getStoragePartitionConfig
protected StoragePartitionConfig getStoragePartitionConfig(PartitionConsumptionState partitionConsumptionState) -
getStoragePartitionConfig
protected StoragePartitionConfig getStoragePartitionConfig(boolean sorted, PartitionConsumptionState partitionConsumptionState) -
isHybridFollower
-
checkAndLogIfLagIsAcceptableForHybridStore
protected abstract boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState partitionConsumptionState, long lag, long threshold, boolean shouldLogLag, LagType lagType) Checks whether the lag is acceptable for hybrid stores -
isReadyToServe
This function checks various conditions to verify if a store is ready to serve. Lag = (Source Max Offset - SOBR Source Offset) - (Current Offset - SOBR Destination Offset).- Returns:
- true if EOP was received and (for hybrid stores) if lag <= threshold
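As a worked example with hypothetical numbers: with Source Max Offset = 1000, SOBR Source Offset = 800, Current Offset = 900, and SOBR Destination Offset = 750, the lag is (1000 - 800) - (900 - 750) = 200 - 150 = 50, so a hybrid store replica with a threshold of at least 50 (and EOP received) would report ready to serve.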
-
isRealTimeBufferReplayStarted
protected abstract boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState) -
measureHybridOffsetLag
protected abstract long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag) Measure the hybrid offset lag for the replica being tracked in `partitionConsumptionState`. -
measureHybridHeartbeatLag
protected abstract long measureHybridHeartbeatLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag) Measure the hybrid heartbeat lag for the replica being tracked in `partitionConsumptionState`. If it is a Da Vinci client, return HeartbeatMonitoringService.INVALID_HEARTBEAT_LAG before it is implemented. -
measureHybridHeartbeatTimestamp
protected abstract long measureHybridHeartbeatTimestamp(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag) Measure the hybrid heartbeat timestamp for the replica being tracked in `partitionConsumptionState`. If it is a Da Vinci client, return HeartbeatMonitoringService.INVALID_MESSAGE_TIMESTAMP before it is implemented. -
reportIfCatchUpVersionTopicOffset
protected abstract void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState) Check if the ingestion progress has reached the end of the version topic. This is currently only used by LeaderFollowerStoreIngestionTask. -
produceToStoreBufferService
protected void produceToStoreBufferService(DefaultPubSubMessage consumedRecord, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs) throws InterruptedException This function will produce a pair of a consumer record and its derived produced record to the writer buffers maintained by StoreBufferService.- Parameters:
consumedRecord - received consumer record
leaderProducedRecordContext - derived leaderProducedRecordContext
partition -
kafkaUrl -
- Throws:
InterruptedException
-
validateAndFilterOutDuplicateMessagesFromLeaderTopic
protected abstract Iterable<DefaultPubSubMessage> validateAndFilterOutDuplicateMessagesFromLeaderTopic(Iterable<DefaultPubSubMessage> records, String kafkaUrl, PubSubTopicPartition topicPartition) -
produceToStoreBufferServiceOrKafka
protected void produceToStoreBufferServiceOrKafka(Iterable<DefaultPubSubMessage> records, PubSubTopicPartition topicPartition, String kafkaUrl, int kafkaClusterId) throws InterruptedException This function is in charge of producing the consumer records to the writer buffers maintained by StoreBufferService. This function may modify the original record in KME and it is unsafe to use the payload from KME directly after this call.- Parameters:
records - received consumer records
topicPartition -
- Throws:
InterruptedException
-
produceToStoreBufferServiceOrKafkaInBatch
protected void produceToStoreBufferServiceOrKafkaInBatch(Iterable<DefaultPubSubMessage> records, PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState, String kafkaUrl, int kafkaClusterId) throws InterruptedException - Throws:
InterruptedException
-
checkIngestionProgress
- Throws:
InterruptedException
-
refreshIngestionContextIfChanged
- Throws:
InterruptedException
-
isIngestionTaskActive
public boolean isIngestionTaskActive() -
run
public void run() Polls the producer for new messages in an infinite loop by a dedicated consumer thread and processes the new messages by the current thread. -
updateOffsetMetadataAndSyncOffset
-
updateOffsetMetadataAndSyncOffset
protected void updateOffsetMetadataAndSyncOffset(DataIntegrityValidator div, @Nonnull PartitionConsumptionState pcs) -
updateAndSyncOffsetFromSnapshot
protected void updateAndSyncOffsetFromSnapshot(PartitionTracker vtDivSnapshot, PubSubTopicPartition topicPartition) This version of the method syncs using a PartitionTracker object which contains the vtSegments and LCVP -
closeVeniceWriters
public void closeVeniceWriters(boolean doFlush) -
closeVeniceViewWriters
protected void closeVeniceViewWriters(boolean doFlush) -
resolveSourceKafkaServersWithinTopicSwitch
Applies name resolution to all Kafka URLs in the provided TopicSwitch. Useful for translating URLs that came from a different runtime (e.g. from the controller, or from state persisted by a previous run of the same server).- Returns:
- the same TopicSwitch, mutated such that all Kafka URLs it contains are guaranteed to be usable by the current Venice server instance
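A minimal sketch of this kind of in-place translation, assuming the TopicSwitch exposes its source Kafka servers as a mutable List<CharSequence> (an assumption for illustration) and reusing a resolver function like the kafkaClusterUrlResolver field:

    import java.util.List;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    final class TopicSwitchUrlResolution {
      // Illustrative only: resolve every source Kafka URL and write the
      // resolved URLs back into the same list.
      static void resolveInPlace(List<CharSequence> sourceKafkaServers, Function<String, String> kafkaClusterUrlResolver) {
        List<CharSequence> resolved = sourceKafkaServers.stream()
            .map(url -> (CharSequence) kafkaClusterUrlResolver.apply(url.toString()))
            .collect(Collectors.toList());
        sourceKafkaServers.clear();
        sourceKafkaServers.addAll(resolved);
      }
    }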
-
getOffsetToOnlineLagThresholdPerPartition
protected static long getOffsetToOnlineLagThresholdPerPartition(Optional<HybridStoreConfig> hybridStoreConfig, String storeName, int partitionCount) -
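The name suggests converting a store-level hybrid offset lag threshold into a per-partition one. A hedged sketch, assuming HybridStoreConfig.getOffsetLagThresholdToGoOnline() as the store-level input and even division as the policy (both assumptions):

    import java.util.Optional;
    // com.linkedin.venice.meta.HybridStoreConfig is assumed to be on the classpath.

    static long thresholdPerPartition(Optional<HybridStoreConfig> hybridStoreConfig, String storeName, int partitionCount) {
      if (!hybridStoreConfig.isPresent()) {
        throw new IllegalArgumentException("Store " + storeName + " has no hybrid config");
      }
      long storeLevelThreshold = hybridStoreConfig.get().getOffsetLagThresholdToGoOnline();
      if (storeLevelThreshold < 0) {
        return storeLevelThreshold; // a negative threshold conventionally disables the check
      }
      // Spread the store-level budget evenly, never rounding a positive budget down to zero.
      return Math.max(storeLevelThreshold / partitionCount, 1);
    }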
processCommonConsumerAction
protected void processCommonConsumerAction(ConsumerAction consumerAction) throws InterruptedException - Throws:
InterruptedException
-
getTopicPartitionEndPosition
protected PubSubPosition getTopicPartitionEndPosition(String pubSubBrokerAddress, PubSubTopicPartition topicPartition) - Returns:
- the end position for the topic partition in SIT, or a
PubSubSymbolicPosition.LATEST value if it failed to get it. N.B.: The returned end position is the last successfully replicated message plus one. If the partition has never been written to, the end position is equal to the start position.
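To make the N.B. concrete with hypothetical numbers: a partition whose last successfully replicated message sits at offset 41 has an end position of 42 (last message plus one), while a partition that has never been written to reports an end position equal to its start position.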
-
checkLongRunningTaskState
- Throws:
InterruptedException
-
processConsumerAction
protected abstract void processConsumerAction(ConsumerAction message, Store store) throws InterruptedException - Throws:
InterruptedException
-
getConsumptionSourceKafkaAddress
protected abstract Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState) -
startConsumingAsLeader
-
getRealTimeDataSourceKafkaAddress
protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState) -
getPartitionConsumptionState
-
hasAnyPartitionConsumptionState
-
getFailedIngestionPartitionCount
public int getFailedIngestionPartitionCount() -
shouldProcessRecord
Common record check for different state models: check whether the server continues receiving messages after EOP for a batch-only store. -
shouldPersistRecord
protected boolean shouldPersistRecord(DefaultPubSubMessage record, PartitionConsumptionState partitionConsumptionState) -
processConsumerRecord
public void processConsumerRecord(DefaultPubSubMessage record, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs) This function will be invoked in StoreBufferService to process buffered PubSubMessage. -
recordHeartbeatReceived
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecord, String kafkaUrl) -
setLastConsumerException
-
setLastStoreIngestionException
-
recordChecksumVerificationFailure
public void recordChecksumVerificationFailure() -
recordAssembledRecordSize
protected void recordAssembledRecordSize(int keyLen, ByteBuffer valueBytes, ByteBuffer rmdBytes, long currentTimeMs) Records metrics for the original size of fully-assembled records (key + value) and RMD by utilizing the field ChunkedValueManifest.size. Also records the ratio of assembled record size to maximum allowed size, which is intended to be used to alert customers about how close they are to hitting the size limit.- Parameters:
keyLen - The size of the record's key
valueBytes - Put.putValue, which is expected to be a serialized ChunkedValueManifest
rmdBytes - Put.replicationMetadataPayload, which can be a serialized ChunkedValueManifest if RMD chunking was enabled, or just the RMD payload otherwise
-
recordAssembledRecordSizeRatio
protected abstract void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs) -
calculateAssembledRecordSizeRatio
protected abstract double calculateAssembledRecordSizeRatio(long recordSize) -
measureLagWithCallToPubSub
protected long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopicPartition pubSubTopicPartition, PubSubPosition currentPosition) - Parameters:
pubSubServerName - Pub Sub deployment to interrogate
pubSubTopicPartition - the topic partition to measure lag for- Returns:
- the lag, or Long.MAX_VALUE (9223372036854775807L) if it failed to measure it. N.B.: Note that the returned lag can be negative since the end offset used in the calculation is cached.
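As a hypothetical illustration of the N.B.: if the cached end offset is 1000 but the consumer's current position has already advanced to 1005, the computed lag is 1000 - 1005 = -5. Callers that cannot tolerate negative values can clamp the result with minZeroLag(long) below.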
-
measureLagWithCallToPubSub
protected static long measureLagWithCallToPubSub(String pubSubBrokerAddress, PubSubTopicPartition pubSubTopicPartition, PubSubPosition currentPosition, Function<String, TopicManager> topicManagerProvider) -
getWriteComputeErrorCode
public abstract int getWriteComputeErrorCode() -
updateLeaderTopicOnFollower
public abstract void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState) -
minZeroLag
protected long minZeroLag(long value) Because of timing considerations, it is possible that some lag metrics could compute negative values. Negative lag does not make sense so the intent is to ease interpretation by applying a lower bound of zero on these metrics... -
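The described bound amounts to a one-line clamp; a minimal sketch (the real method may differ in details such as logging):

    // Clamp possibly negative lag metrics to zero for easier interpretation.
    protected long minZeroLag(long value) {
      return Math.max(0L, value);
    }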
isHybridMode
public boolean isHybridMode() -
processEndOfPush
protected void processEndOfPush(KafkaMessageEnvelope endOfPushKME, PubSubPosition offset, PartitionConsumptionState partitionConsumptionState, EndOfPush endOfPush) -
processStartOfIncrementalPush
protected void processStartOfIncrementalPush(ControlMessage startOfIncrementalPush, PartitionConsumptionState partitionConsumptionState, long pubSubMessageTime) -
processEndOfIncrementalPush
protected void processEndOfIncrementalPush(ControlMessage endOfIncrementalPush, PartitionConsumptionState partitionConsumptionState, long pubSubMessageTime) -
processControlMessageForViews
protected void processControlMessageForViews(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState) This isn't used for ingestion outside L/F, so we NoOp here and rely on the actual implementation inLeaderFollowerStoreIngestionTask -
processTopicSwitch
protected void processTopicSwitch(ControlMessage controlMessage, int partition, PubSubPosition offset, PartitionConsumptionState partitionConsumptionState) -
updateOffsetMetadataInOffsetRecord
protected abstract void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState) Sync the metadata about offset in OffsetRecord. PartitionConsumptionState will pass through some information to OffsetRecord for persistence, and offset rewind/split brain has been guarded in updateLatestInMemoryProcessedOffset(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState, com.linkedin.venice.pubsub.api.DefaultPubSubMessage, com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext, java.lang.String, boolean). -
updateLatestInMemoryProcessedOffset
protected abstract void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, DefaultPubSubMessage consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun) Maintain the latest processed offsets by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Before updating the offset in memory, the underlying storage engine should have persisted the given record. Dry-run mode will only do the offset rewind check and won't update the processed offset. -
recordWriterStats
protected abstract void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState) -
validateMessage
protected void validateMessage(PartitionTracker.TopicType type, DataIntegrityValidator validator, DefaultPubSubMessage consumerRecord, PartitionConsumptionState partitionConsumptionState, boolean tolerateMissingMessagesForRealTimeTopic) Message validation using DIV. Leaders should pass in the validator instance from LeaderFollowerStoreIngestionTask; and drainers should pass in the validator instance from StoreIngestionTask. 1. If valid DIV errors happen after EOP is received, no fatal exceptions will be thrown, but the errors will be recorded into the DIV metrics. 2. Any DIV errors from unregistered producers after EOP will be ignored. 3. Any DIV errors on records older than logCompactionDelayInMs will be ignored. -
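The three rules can be read as a small decision function. The sketch below is illustrative only; the predicates (endOfPushReceived, producerRegistered, olderThanLogCompactionDelay) are hypothetical stand-ins for state the real code derives from the partition state and the record:

    // Illustrative tolerance logic for the DIV rules above, not the actual implementation.
    static boolean shouldIgnoreDivError(boolean endOfPushReceived, boolean producerRegistered, boolean olderThanLogCompactionDelay) {
      if (endOfPushReceived && !producerRegistered) {
        return true; // rule 2: unregistered producer after EOP
      }
      if (olderThanLogCompactionDelay) {
        return true; // rule 3: record older than logCompactionDelayInMs
      }
      // Rule 1 is softer: after EOP, errors are recorded in DIV metrics rather
      // than thrown as fatal exceptions, but they are not silently ignored.
      return false;
    }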
cloneDrainerDivProducerStates
We should only allow StoreIngestionTask to access drainerDiv; other components like leaders in LeaderFollowerStoreIngestionTask should never access the DIV validator in the drainer, because message consumption in the leader is ahead of the drainer, and leaders and drainers process messages at different paces. -
putInStorageEngine
Persist Put record to storage engine. -
removeFromStorageEngine
-
throwOrLogStorageFailureDependingIfStillSubscribed
-
logStorageOperationWhileUnsubscribed
protected void logStorageOperationWhileUnsubscribed(int partition) -
consumerHasAnySubscription
public boolean consumerHasAnySubscription() -
consumerHasSubscription
public boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) -
unsubscribeFromTopic
protected void unsubscribeFromTopic(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) This method unsubscribes the topic-partition derived from the input topic. If it is a real-time topic and the separate RT topic is enabled, it will also unsubscribe from the separate real-time topic. -
consumerBatchUnsubscribe
-
consumerUnSubscribeAllTopics
public abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState) -
consumerSubscribe
public void consumerSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, PubSubPosition startOffset, String kafkaURL) This method will try to resolve actual topic-partition from input Kafka URL and subscribe to the resolved topic-partition. -
consumerResetOffset
public void consumerResetOffset(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) -
waitVersionStateAvailable
protected StoreVersionState waitVersionStateAvailable(String kafkaTopic) throws InterruptedException - Throws:
InterruptedException
-
close
public void close() Stops the consumer task.- Specified by:
close in interface AutoCloseable- Specified by:
close in interface Closeable
-
shutdownAndWait
public boolean shutdownAndWait(int waitTime) This method is a blocking call to wait for the StoreIngestionTask to fully shut down within the given time.- Parameters:
waitTime - Maximum wait time for the shutdown operation.- Returns:
- whether able to gracefully shut down within the waitTime
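A plausible caller-side pattern (hypothetical; the unit of waitTime is not stated in this signature, so the value below is only an example):

    // Request a graceful stop, then block until shutdown completes or times out.
    static void stopGracefully(StoreIngestionTask task) {
      task.close(); // Stops the consumer task (see close() above).
      if (!task.shutdownAndWait(30)) {
        System.err.println("StoreIngestionTask did not shut down within the wait time");
      }
    }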
-
isRunning
public boolean isRunning() A function to allow the service to get the current status of the task. This would allow the service to create a new task if required. -
getVersionTopic
-
isMetricsEmissionEnabled
public boolean isMetricsEmissionEnabled() -
enableMetricsEmission
public void enableMetricsEmission() -
disableMetricsEmission
public void disableMetricsEmission() -
isPartitionConsumingOrHasPendingIngestionAction
public boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition) To check whether the given partition is still consuming messages from Kafka -
createKafkaConsumerProperties
protected Properties createKafkaConsumerProperties(Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely) Override the ConfigKeys.KAFKA_BOOTSTRAP_SERVERS config with a remote Kafka bootstrap url. -
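A minimal sketch of the described override, assuming ConfigKeys.KAFKA_BOOTSTRAP_SERVERS resolves to the standard bootstrap.servers Kafka property:

    import java.util.Properties;

    // Clone the local consumer config; when consuming remotely, repoint the
    // bootstrap servers at the remote Kafka cluster.
    static Properties consumerProps(Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely) {
      Properties props = new Properties();
      props.putAll(localConsumerProps);
      if (consumeRemotely) {
        props.setProperty("bootstrap.servers", remoteKafkaSourceAddress);
      }
      return props;
    }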
resubscribe
protected abstract void resubscribe(PartitionConsumptionState partitionConsumptionState) throws InterruptedException - Throws:
InterruptedException
-
reportError
-
isActiveActiveReplicationEnabled
public boolean isActiveActiveReplicationEnabled() -
dumpPartitionConsumptionStates
public void dumpPartitionConsumptionStates(AdminResponse response, ComplementSet<Integer> partitions) Invoked by admin request to dump the requested partition consumption states -
dumpStoreVersionState
Invoked by admin request to dump store version state metadata. -
getServerConfig
-
updateOffsetMetadataAndSync
public void updateOffsetMetadataAndSync(int partitionId) -
getTopicManager
The function returns local or remote topic manager.- Parameters:
sourceKafkaServer - The address of the source kafka bootstrap server.- Returns:
- topic manager
-
waitForAllMessageToBeProcessedFromTopicPartition
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState) throws InterruptedException The purpose of this function is to wait for the complete processing (including persistence to disk) of all the messages that were consumed from this kafka {topic, partition} prior to calling this function. This will make the calling thread block.- Parameters:
topicPartition - for which to wait- Throws:
InterruptedException
-
delegateConsumerRecord
protected abstract StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs) -
recordProcessedRecordStats
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize) -
isTransientRecordBufferUsed
This is not a per record state. Rather, it's used to indicate whether the transient record buffer is being used at all for this ingestion task/partition. The criteria are the following: 1. For L/F mode, only the WC ingestion task needs this buffer. 2. The transient record buffer is only used post-EOP. -
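Read as a predicate, the two criteria combine roughly as follows; isWriteComputationEnabled is a real field listed above, while isEndOfPushReceived() is a hypothetical accessor standing in for the post-EOP check:

    // Illustrative only: the buffer is relevant for write-compute (WC) tasks in
    // L/F mode, and only once the partition is past end-of-push (EOP).
    boolean transientRecordBufferUsed(PartitionConsumptionState pcs, boolean isWriteComputationEnabled) {
      return isWriteComputationEnabled && pcs.isEndOfPushReceived();
    }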
setPartitionConsumptionState
-
getVersionedDIVStats
-
getVersionIngestionStats
-
getCompressionStrategy
-
getCompressor
-
isChunked
protected boolean isChunked() -
getSchemaRepo
-
getHostLevelIngestionStats
-
getKafkaVersionTopic
-
extractUpstreamPosition
Extract the upstream position from the given consumer record's leader metadata.- Parameters:
consumerRecord- the consumer record to extract upstream position from- Returns:
- the upstream position if available, otherwise PubSubSymbolicPosition.EARLIEST
-
extractUpstreamClusterId
-
maybeSendIngestionHeartbeat
For L/F hybrid stores, the leader periodically writes a special SOS message to the RT topic. Check LeaderFollowerStoreIngestionTask.maybeSendIngestionHeartbeat() for more details. -
isProducingVersionTopicHealthy
public boolean isProducingVersionTopicHealthy() This function is checking the following conditions: 1. Whether the version topic exists or not. -
isCurrentVersion
public boolean isCurrentVersion() -
hasAllPartitionReportedCompleted
public boolean hasAllPartitionReportedCompleted() -
isSeparatedRealtimeTopicEnabled
public boolean isSeparatedRealtimeTopicEnabled() -
getDataIntegrityValidator
When Global RT DIV is enabled the ConsumptionTask's DIV is exclusively used to validate data integrity. -
getStorageUtilizationManager
-
hasReplicas
protected boolean hasReplicas() Checks whether there are any replicas currently present on this host. For batch-only stores, a replica might be removed from the PCS map but still physically present on the host. The activeReplicaCount counter tracks the replica's lifecycle from the bootstrap stage through to the drop stage. -
validateEndOfPushReceivedBeforeTopicSwitch
protected static void validateEndOfPushReceivedBeforeTopicSwitch(PartitionConsumptionState partitionConsumptionState, PubSubPosition position) Validates that END_OF_PUSH has been received before processing TOPIC_SWITCH.- Parameters:
partitionConsumptionState - The partition consumption state to validate
position - The position/offset for error reporting- Throws:
VeniceException- if END_OF_PUSH has not been received
-