Class StoreIngestionTask
java.lang.Object
com.linkedin.davinci.kafka.consumer.StoreIngestionTask
- All Implemented Interfaces:
Closeable, AutoCloseable, Runnable
- Direct Known Subclasses:
LeaderFollowerStoreIngestionTask
A runnable Kafka consumer that consumes messages from all the partitions of a Kafka topic that are assigned to the current node.
-
Nested Class Summary
protected static enum StoreIngestionTask.DelegateConsumerRecordResult
  This enum represents all potential results after calling delegateConsumerRecord(PubSubMessageProcessedResultWrapper, int, String, int, long, long). -
Field Summary
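protected final AggKafkaConsumerService aggKafkaConsumerService
protected final SparseConcurrentList<Object> availableSchemaIds
protected final boolean batchReportIncPushStatusEnabled
protected final long bootstrapTimeoutInMs
protected final ChunkAssembler chunkAssembler
protected final CompressionStrategy compressionStrategy
protected final Lazy<VeniceCompressor> compressor
protected final StorageEngineBackedCompressorFactory compressorFactory
protected final AtomicInteger consumerActionSequenceNumber
protected final PriorityBlockingQueue<ConsumerAction> consumerActionsQueue
protected final long databaseSyncBytesIntervalForDeferredWriteMode
  The number of message bytes consumed between offset syncs to the offset DB, for a deferred-write database.
protected final long databaseSyncBytesIntervalForTransactionalMode
  The number of message bytes consumed between offset syncs to the offset DB, for a transactional-mode database.
protected int dataRecoverySourceVersionNumber
protected final com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck defaultReadyToServeChecker
protected final SparseConcurrentList<Object> deserializedSchemaIds
protected final DiskUsage diskUsage
protected final Consumer<DataValidationException> divErrorMetricCallback
protected final AtomicBoolean emitMetrics
protected final long emptyPollSleepMs
protected final int errorPartitionId
  Used for reporting errors when the partitionConsumptionStateMap is empty.
protected Lazy<CountDownLatch> gracefulShutdownLatch
protected final HostLevelIngestionStats hostLevelIngestionStats
protected final String hostName
protected final Optional<HybridStoreConfig> hybridStoreConfig
protected int idleCounter
protected final com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher
protected final String ingestionTaskName
protected final boolean isChunked
protected final BooleanSupplier isCurrentVersion
protected boolean isDataRecovery
protected final boolean isDaVinciClient
protected final boolean isGlobalRtDivEnabled
protected final boolean isIsolatedIngestion
protected final boolean isRmdChunked
protected final AtomicBoolean isRunning
protected final boolean isSeparatedRealtimeTopicEnabled
protected final boolean isWriteComputationEnabled
protected final Properties kafkaProps
protected final String kafkaVersionTopic
  Topic names used by this ingestion task. TODO: Use PubSubVersionTopic and PubSubRealTimeTopic types extending PubSubTopic for type safety.
protected static final long KILL_WAIT_TIME_MS
protected final int localKafkaClusterId
protected final String localKafkaServer
protected final ChunkedValueManifestSerializer manifestSerializer
protected final MetaStoreWriter metaStoreWriter
protected final ExecutorService parallelProcessingThreadPool
protected final ConcurrentMap<Integer, PartitionConsumptionState> partitionConsumptionStateMap
  Per-partition consumption state map.
protected final int partitionCount
  The number of partitions in the StorageEngine and in version topics.
protected final Map<Integer, AtomicInteger> partitionToPendingConsumerActionCountMap
protected final PubSubTopicRepository pubSubTopicRepository
protected final long readCycleDelayMs
protected final boolean readOnlyForBatchOnlyStoreEnabled
protected final PubSubTopic realTimeTopic
protected final AtomicBoolean recordLevelMetricEnabled
protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER
protected final boolean resetErrorReplicaEnabled
static long SCHEMA_POLLING_DELAY_MS
protected final ReadOnlySchemaRepository schemaRepository
protected final VeniceServerConfig serverConfig
protected final AbstractStorageEngine storageEngine
protected final StorageEngineRepository storageEngineRepository
protected final StorageMetadataService storageMetadataService
protected final StorageService storageService
  Storage destination for consumption.
static long STORE_VERSION_POLLING_DELAY_MS
protected final AbstractStoreBufferService storeBufferService
protected final String storeName
protected final ReadOnlyStoreRepository storeRepository
protected final int storeVersionPartitionCount
protected final TopicManagerRepository topicManagerRepository
protected final AggVersionedDIVStats versionedDIVStats
protected final AggVersionedIngestionStats versionedIngestionStats
protected final int versionNumber
protected final PubSubTopic versionTopic
protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
protected int writeComputeFailureCode
protected Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin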
-
Constructor Summary
StoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeVersionConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerConfig recordTransformerConfig, Queue<VeniceNotifier> notifiers, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin) -
Method Summary
protected abstract double calculateAssembledRecordSizeRatio(long recordSize)
protected abstract boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState partitionConsumptionState, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp)
  Checks whether the lag is acceptable for hybrid stores.
protected void checkIngestionProgress(Store store)
protected abstract void
protected void cloneProducerStates(int partition, KafkaDataIntegrityValidator validator)
  We should only allow StoreIngestionTask to access kafkaDataIntegrityValidator; other components, such as leaders in LeaderFollowerStoreIngestionTask, should never access the DIV validator in the drainer, because message consumption in the leader is ahead of the drainer: leaders and drainers process messages at different paces.
void close()
  Stops the consumer task.
protected void closeVeniceViewWriters()
void closeVeniceWriters(boolean doFlush)
void consumerBatchUnsubscribe(Set<PubSubTopicPartition> topicPartitionSet)
boolean
boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
void consumerResetOffset(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
void consumerSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, long startOffset, String kafkaURL)
  This method tries to resolve the actual topic-partition from the input Kafka URL and subscribes to the resolved topic-partition.
abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
protected Properties createKafkaConsumerProperties(Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely)
  Overrides the ConfigKeys.KAFKA_BOOTSTRAP_SERVERS config with a remote Kafka bootstrap URL.
protected abstract StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
abstract void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
void dropStoragePartitionGracefully(PubSubTopicPartition topicPartition)
  Drops a storage partition gracefully.
void dumpPartitionConsumptionStates(AdminResponse response, ComplementSet<Integer> partitions)
  Invoked by an admin request to dump the requested partition consumption states.
void dumpStoreVersionState(AdminResponse response)
  Invoked by an admin request to dump store version state metadata.
void
abstract long getBatchFollowerOffsetLag()
abstract long getBatchLeaderOffsetLag()
abstract long getBatchReplicationLag()
protected CompressionStrategy
protected Lazy<VeniceCompressor>
protected abstract Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
int getFailedIngestionPartitionCount()
abstract long getFollowerOffsetLag()
  Measures the offset lag between follower and leader.
protected HostLevelIngestionStats
abstract long getHybridFollowerOffsetLag()
abstract long getHybridLeaderOffsetLag()
protected abstract IngestionBatchProcessor
protected String
abstract long getLeaderOffsetLag()
protected static long getOffsetToOnlineLagThresholdPerPartition(Optional<HybridStoreConfig> hybridStoreConfig, String storeName, int partitionCount)
getPartitionConsumptionState(int partitionId)
protected long getPartitionOffsetLagBasedOnMetrics(String kafkaSourceAddress, PubSubTopic topic, int partition)
protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
abstract long getRegionHybridOffsetLag(int regionId)
protected ReadOnlySchemaRepository
protected StoragePartitionConfig getStoragePartitionConfig(boolean sorted, PartitionConsumptionState partitionConsumptionState)
protected StoragePartitionConfig getStoragePartitionConfig(PartitionConsumptionState partitionConsumptionState)
protected TopicManager getTopicManager(String sourceKafkaServer)
  Returns the local or remote topic manager.
protected long getTopicPartitionEndOffSet(String kafkaUrl, PubSubTopic pubSubTopic, int partition)
protected AggVersionedDIVStats
protected AggVersionedIngestionStats
int getVersionNumber()
abstract int getWriteComputeErrorCode()
boolean
boolean hasAnyPartitionConsumptionState(Predicate<PartitionConsumptionState> pcsPredicate)
boolean hasAnySubscription()
boolean hasPendingPartitionIngestionAction(int userPartition)
boolean
protected boolean
boolean
boolean
protected abstract boolean isHybridFollower(PartitionConsumptionState partitionConsumptionState)
boolean
boolean
boolean
boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition)
  Checks whether the given partition is still consuming messages from Kafka.
boolean
  This function checks the following conditions: 1.
protected boolean isReadyToServe(PartitionConsumptionState partitionConsumptionState)
  This function checks various conditions to verify if a store is ready to serve.
boolean isReadyToServeAnnouncedWithRTLag()
protected abstract boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
boolean
  A function to allow the service to get the current status of the task.
protected boolean isSegmentControlMsg(ControlMessageType msgType)
boolean
boolean
boolean
  This is not a per-record state.
boolean
void kill()
protected void logStorageOperationWhileUnsubscribed(int partition)
  For L/F hybrid stores, the leader periodically writes a special SOS message to the RT topic.
protected abstract long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
  Measures the hybrid offset lag for the partition being tracked in `partitionConsumptionState`.
protected long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopic topic, int partition, long currentOffset)
protected static long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopic topic, int partition, long currentOffset, Function<String, TopicManager> topicManagerProvider)
protected long minZeroLag(long value)
  Because of timing considerations, some lag metrics can compute negative values.
protected int nextSeqNum()
  Applies a unique and increasing sequence number to each consumer action, so that if multiple consumer actions in the queue have the same priority, whichever was added to the queue first is polled first (FIFO).
protected void processCommonConsumerAction(ConsumerAction consumerAction)
protected abstract void processConsumerAction(ConsumerAction message, Store store)
void processConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
  This function is invoked in StoreBufferService to process a buffered PubSubMessage.
protected void processControlMessageForViews(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
  This isn't used for ingestion outside L/F, so it is a no-op here and relies on the actual implementation in LeaderFollowerStoreIngestionTask.
protected void processEndOfIncrementalPush(ControlMessage endOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
protected void processEndOfPush(KafkaMessageEnvelope endOfPushKME, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
protected void processStartOfIncrementalPush(ControlMessage startOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
protected void produceToStoreBufferService(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumedRecord, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs)
  This function produces a pair of a consumer record and its derived produced record to the writer buffers maintained by StoreBufferService.
protected void produceToStoreBufferServiceOrKafka(Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records, PubSubTopicPartition topicPartition, String kafkaUrl, int kafkaClusterId)
  This function is in charge of producing the consumer records to the writer buffers maintained by StoreBufferService.
protected void produceToStoreBufferServiceOrKafkaInBatch(Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records, PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState, String kafkaUrl, int kafkaClusterId)
abstract void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
protected void putInStorageEngine(int partition, byte[] keyBytes, Put put)
  Persists a Put record to the storage engine.
protected void recordAssembledRecordSize(int keyLen, ByteBuffer valueBytes, ByteBuffer rmdBytes, long currentTimeMs)
  Records metrics for the original size of fully assembled records (key + value) and RMD by using the field ChunkedValueManifest.size.
protected abstract void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
void recordChecksumVerificationFailure()
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, String kafkaUrl)
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
protected abstract void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
protected void removeFromStorageEngine(int partition, byte[] keyBytes, Delete delete)
void reportError(String message, int userPartition, Exception e)
protected abstract void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState)
  Checks whether the ingestion progress has reached the end of the version topic.
void resetPartitionConsumptionOffset(PubSubTopicPartition topicPartition)
  Adds an asynchronous request to reset the partition consumption offset for the task.
protected TopicSwitch resolveSourceKafkaServersWithinTopicSwitch(TopicSwitch originalTopicSwitch)
  Applies name resolution to all Kafka URLs in the provided TopicSwitch.
protected abstract void resubscribe(PartitionConsumptionState partitionConsumptionState)
void run()
  Polls the producer for new messages in an infinite loop on a dedicated consumer thread and processes the new messages on the current thread.
void
void
protected void setPartitionConsumptionState(int partition, PartitionConsumptionState pcs)
protected abstract boolean shouldCheckLeaderCompleteStateInFollower()
protected boolean shouldPersistRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record, PartitionConsumptionState partitionConsumptionState)
protected boolean
  Common record check for different state models: checks whether the server continues receiving messages after EOP for a batch-only store.
protected boolean shouldUpdateUpstreamOffset(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord)
  Validates whether the given consumerRecord has a valid upstream offset to update from.
boolean shutdownAndWait(int waitTime)
  This method is a blocking call that waits up to the given time for the StoreIngestionTask to fully shut down.
protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
void subscribePartition(PubSubTopicPartition topicPartition)
void subscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction)
  Adds an asynchronous partition subscription request for the task.
protected void throwIfNotRunning()
protected void throwOrLogStorageFailureDependingIfStillSubscribed(int partition, VeniceException e)
void unsubscribeFromTopic(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
  This method unsubscribes the topic-partition from the input.
unSubscribePartition(PubSubTopicPartition topicPartition)
CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction)
  Adds an asynchronous partition unsubscription request for the task.
protected void
protected abstract void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun)
  Maintains the latest offsets processed by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord.
abstract void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
void updateOffsetMetadataAndSync(String topic, int partitionId)
protected void
protected abstract void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState)
  Syncs the offset metadata in OffsetRecord.
protected abstract Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> validateAndFilterOutDuplicateMessagesFromLeaderTopic(Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records, String kafkaUrl, PubSubTopicPartition topicPartition)
protected void validateMessage(PartitionTracker.TopicType type, KafkaDataIntegrityValidator validator, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, boolean endOfPushReceived, PartitionConsumptionState partitionConsumptionState)
  Message validation using DIV.
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState)
  Waits for the complete processing (including persistence to disk) of all the messages that were consumed from this Kafka {topic, partition} before this function was called.
protected StoreVersionState waitVersionStateAvailable(String kafkaTopic)
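The createKafkaConsumerProperties row above describes overriding the bootstrap-servers setting so that a consumer can read from a remote cluster. A minimal sketch of that idea, assuming the override applies only when consumeRemotely is true and that the caller's properties must stay untouched. BOOTSTRAP_SERVERS_KEY is a hypothetical stand-in, because the actual value of ConfigKeys.KAFKA_BOOTSTRAP_SERVERS is not shown on this page:

```java
import java.util.Properties;

// Hypothetical helper illustrating the bootstrap-server override; not the Venice implementation.
final class ConsumerPropsSketch {
  // Assumption: stand-in for ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; the real key string may differ.
  static final String BOOTSTRAP_SERVERS_KEY = "kafka.bootstrap.servers";

  static Properties createConsumerProperties(
      Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely) {
    // Copy first so the caller's local properties are never mutated.
    Properties props = new Properties();
    props.putAll(localConsumerProps);
    if (consumeRemotely) {
      // Point the consumer at the remote cluster instead of the local one.
      props.setProperty(BOOTSTRAP_SERVERS_KEY, remoteKafkaSourceAddress);
    }
    return props;
  }
}
```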
-
Field Details
-
SCHEMA_POLLING_DELAY_MS
public static long SCHEMA_POLLING_DELAY_MS -
STORE_VERSION_POLLING_DELAY_MS
public static long STORE_VERSION_POLLING_DELAY_MS -
WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED -
KILL_WAIT_TIME_MS
protected static final long KILL_WAIT_TIME_MS -
REDUNDANT_LOGGING_FILTER
-
storageService
Storage destination for consumption. -
storageEngineRepository
-
storageEngine
-
kafkaVersionTopic
Topic names used by this ingestion task. TODO: Use PubSubVersionTopic and PubSubRealTimeTopic types extending PubSubTopic for type safety. -
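The TODO above proposes separate version-topic and real-time-topic types that both extend PubSubTopic, so that passing the wrong kind of topic fails at compile time. A minimal sketch of that design, using hypothetical TopicSketch, VersionTopicSketch and RealTimeTopicSketch types instead of the real PubSubTopic API:

```java
// Hypothetical stand-in for PubSubTopic; the real interface has more members.
interface TopicSketch {
  String getName();
}

// Distinct types for version topics and real-time topics (the idea in the TODO).
final class VersionTopicSketch implements TopicSketch {
  private final String name;

  VersionTopicSketch(String name) {
    this.name = name;
  }

  @Override
  public String getName() {
    return name;
  }
}

final class RealTimeTopicSketch implements TopicSketch {
  private final String name;

  RealTimeTopicSketch(String name) {
    this.name = name;
  }

  @Override
  public String getName() {
    return name;
  }
}

final class TopicTypeSafetyDemo {
  // Accepts only version topics: passing a RealTimeTopicSketch here does not compile,
  // whereas a plain String or a shared topic type would allow the mix-up.
  static String describeVersionTopic(VersionTopicSketch topic) {
    return "version topic " + topic.getName();
  }
}
```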
versionTopic
-
realTimeTopic
-
storeName
-
versionNumber
protected final int versionNumber -
schemaRepository
-
storeRepository
-
ingestionTaskName
-
kafkaProps
-
isRunning
-
emitMetrics
-
consumerActionSequenceNumber
-
consumerActionsQueue
-
partitionToPendingConsumerActionCountMap
-
storageMetadataService
-
topicManagerRepository
-
partitionConsumptionStateMap
Per-partition consumption state map -
storeBufferService
-
hostLevelIngestionStats
-
versionedDIVStats
-
versionedIngestionStats
-
isCurrentVersion
-
hybridStoreConfig
-
divErrorMetricCallback
-
readCycleDelayMs
protected final long readCycleDelayMs -
emptyPollSleepMs
protected final long emptyPollSleepMs -
diskUsage
-
databaseSyncBytesIntervalForTransactionalMode
protected final long databaseSyncBytesIntervalForTransactionalMode
The number of message bytes consumed between offset syncs to the offset DB, for a transactional-mode database. -
databaseSyncBytesIntervalForDeferredWriteMode
protected final long databaseSyncBytesIntervalForDeferredWriteMode
The number of message bytes consumed between offset syncs to the offset DB, for a deferred-write database. -
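Both databaseSyncBytesInterval fields describe a byte-count threshold: the offset is persisted to the offset DB only after enough message bytes have been consumed since the previous sync. A minimal sketch of that accounting, assuming one tracker per partition and a hypothetical syncOffset callback standing in for the offset-DB write (the transactional versus deferred-write distinction would only change which interval is passed in):

```java
import java.util.function.LongConsumer;

// Hypothetical per-partition tracker: persist the offset once enough bytes have been consumed.
final class OffsetSyncTracker {
  private final long syncIntervalBytes;
  private final LongConsumer syncOffset; // stand-in for writing the offset record to the offset DB
  private long bytesSinceLastSync = 0;

  OffsetSyncTracker(long syncIntervalBytes, LongConsumer syncOffset) {
    this.syncIntervalBytes = syncIntervalBytes;
    this.syncOffset = syncOffset;
  }

  /** Accounts for one processed message; returns true if this call triggered a sync. */
  boolean onMessageProcessed(long offset, int messageBytes) {
    bytesSinceLastSync += messageBytes;
    if (bytesSinceLastSync >= syncIntervalBytes) {
      syncOffset.accept(offset); // persist the offset of the latest processed message
      bytesSinceLastSync = 0;
      return true;
    }
    return false;
  }
}
```

With a 100-byte interval, messages of 60 and 50 bytes trigger one sync at the second message, and the counter then starts again from zero.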
serverConfig
-
errorPartitionId
protected final int errorPartitionId
Used for reporting errors when the partitionConsumptionStateMap is empty. -
defaultReadyToServeChecker
protected final com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck defaultReadyToServeChecker -
availableSchemaIds
-
deserializedSchemaIds
-
idleCounter
protected int idleCounter -
aggKafkaConsumerService
-
writeComputeFailureCode
protected int writeComputeFailureCode -
isWriteComputationEnabled
protected final boolean isWriteComputationEnabled -
isSeparatedRealtimeTopicEnabled
protected final boolean isSeparatedRealtimeTopicEnabled -
partitionCount
protected final int partitionCount
The number of partitions in the StorageEngine and in version topics. -
storeVersionPartitionCount
protected final int storeVersionPartitionCount -
bootstrapTimeoutInMs
protected final long bootstrapTimeoutInMs -
isIsolatedIngestion
protected final boolean isIsolatedIngestion -
ingestionNotificationDispatcher
protected final com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher -
chunkAssembler
-
localKafkaServer
-
localKafkaClusterId
protected final int localKafkaClusterId -
localKafkaServerSingletonSet
-
isDaVinciClient
protected final boolean isDaVinciClient -
isDataRecovery
protected boolean isDataRecovery -
dataRecoverySourceVersionNumber
protected int dataRecoverySourceVersionNumber -
readOnlyForBatchOnlyStoreEnabled
protected final boolean readOnlyForBatchOnlyStoreEnabled -
metaStoreWriter
-
kafkaClusterUrlResolver
-
resetErrorReplicaEnabled
protected final boolean resetErrorReplicaEnabled -
compressionStrategy
-
compressorFactory
-
compressor
-
isChunked
protected final boolean isChunked -
isRmdChunked
protected final boolean isRmdChunked -
manifestSerializer
-
pubSubTopicRepository
-
recordLevelMetricEnabled
-
isGlobalRtDivEnabled
protected final boolean isGlobalRtDivEnabled -
versionRole
-
workloadType
-
batchReportIncPushStatusEnabled
protected final boolean batchReportIncPushStatusEnabled -
parallelProcessingThreadPool
-
gracefulShutdownLatch
-
zkHelixAdmin
-
hostName
-
-
Constructor Details
-
StoreIngestionTask
public StoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeVersionConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerConfig recordTransformerConfig, Queue<VeniceNotifier> notifiers, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
-
Method Details
-
getIngestionBatchProcessor
-
getStorageEngine
-
getIngestionTaskName
-
getVersionNumber
public int getVersionNumber() -
isFutureVersion
public boolean isFutureVersion() -
throwIfNotRunning
protected void throwIfNotRunning() -
nextSeqNum
protected int nextSeqNum()
Applies a unique and increasing sequence number to each consumer action, so that if multiple consumer actions in the queue have the same priority, whichever was added to the queue first is polled first (FIFO).
- Returns:
- a unique and increasing sequence number for a new consumer action.
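nextSeqNum is needed because PriorityBlockingQueue makes no ordering guarantee among elements of equal priority. A minimal sketch of the tie-breaking, assuming a hypothetical ActionSketch whose smaller priority value is polled first (the real ConsumerAction ordering is not shown on this page):

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical consumer action: a lower priority value is polled first; seq breaks ties.
final class ActionSketch {
  static final Comparator<ActionSketch> ORDER =
      Comparator.<ActionSketch>comparingInt(a -> a.priority).thenComparingInt(a -> a.seq);

  final String name;
  final int priority;
  final int seq;

  ActionSketch(String name, int priority, int seq) {
    this.name = name;
    this.priority = priority;
    this.seq = seq;
  }
}

final class ActionQueueSketch {
  private final AtomicInteger sequence = new AtomicInteger();
  final PriorityBlockingQueue<ActionSketch> queue = new PriorityBlockingQueue<>(11, ActionSketch.ORDER);

  // Same idea as nextSeqNum(): a unique, increasing number per enqueued action.
  int nextSeqNum() {
    return sequence.incrementAndGet();
  }

  void add(String name, int priority) {
    queue.add(new ActionSketch(name, priority, nextSeqNum()));
  }
}
```

Without the sequence number, two actions with the same priority could be polled in either order.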
-
subscribePartition
-
subscribePartition
Adds an asynchronous partition subscription request for the task. -
unSubscribePartition
-
unSubscribePartition
public CompletableFuture<Void> unSubscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction) Adds an asynchronous partition unsubscription request for the task. -
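subscribePartition, unSubscribePartition and resetPartitionConsumptionOffset are described as adding asynchronous requests, and unSubscribePartition returns a CompletableFuture<Void>. A minimal sketch of that pattern, assuming the request is queued, applied later by the ingestion thread, and its future completed only once it has been applied. The per-partition pending counter mirrors the intent of partitionToPendingConsumerActionCountMap and hasPendingPartitionIngestionAction, but every name here is hypothetical:

```java
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: callers enqueue actions; the ingestion thread applies them later.
final class AsyncActionSketch {
  enum ActionType { SUBSCRIBE, UNSUBSCRIBE }

  private static final class Action {
    final ActionType type;
    final int partition;
    final CompletableFuture<Void> future = new CompletableFuture<>();

    Action(ActionType type, int partition) {
      this.type = type;
      this.partition = partition;
    }
  }

  private final Queue<Action> actions = new ConcurrentLinkedQueue<>();
  private final Map<Integer, AtomicInteger> pendingCount = new ConcurrentHashMap<>();
  final Set<Integer> subscribedPartitions = ConcurrentHashMap.newKeySet();

  // Callable from any thread (e.g. a Helix state transition); returns immediately.
  CompletableFuture<Void> enqueue(ActionType type, int partition) {
    Action action = new Action(type, partition);
    pendingCount.computeIfAbsent(partition, p -> new AtomicInteger()).incrementAndGet();
    actions.add(action);
    return action.future; // completes only after the ingestion thread has applied the action
  }

  boolean hasPendingPartitionIngestionAction(int partition) {
    AtomicInteger count = pendingCount.get(partition);
    return count != null && count.get() > 0;
  }

  // Called from the ingestion thread's loop.
  void processPendingActions() {
    Action action;
    while ((action = actions.poll()) != null) {
      switch (action.type) {
        case SUBSCRIBE:
          subscribedPartitions.add(action.partition);
          break;
        case UNSUBSCRIBE:
          subscribedPartitions.remove(action.partition);
          break;
      }
      pendingCount.get(action.partition).decrementAndGet();
      action.future.complete(null);
    }
  }
}
```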
dropStoragePartitionGracefully
Drops a storage partition gracefully. This is always a Helix triggered action. -
hasAnySubscription
public boolean hasAnySubscription() -
resetPartitionConsumptionOffset
Adds an asynchronous request to reset the partition consumption offset for the task. -
getStoreName
-
isUserSystemStore
public boolean isUserSystemStore() -
promoteToLeader
public abstract void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) -
demoteToStandby
public abstract void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) -
hasPendingPartitionIngestionAction
public boolean hasPendingPartitionIngestionAction(int userPartition) -
kill
public void kill() -
getStoragePartitionConfig
protected StoragePartitionConfig getStoragePartitionConfig(PartitionConsumptionState partitionConsumptionState) -
getStoragePartitionConfig
protected StoragePartitionConfig getStoragePartitionConfig(boolean sorted, PartitionConsumptionState partitionConsumptionState) -
isHybridFollower
-
shouldCheckLeaderCompleteStateInFollower
protected abstract boolean shouldCheckLeaderCompleteStateInFollower() -
checkAndLogIfLagIsAcceptableForHybridStore
protected abstract boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState partitionConsumptionState, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp) Checks whether the lag is acceptable for hybrid stores -
isReadyToServe
This function checks various conditions to verify if a store is ready to serve.
Lag = (Source Max Offset - SOBR Source Offset) - (Current Offset - SOBR Destination Offset)
- Returns:
- true if EOP was received and (for hybrid stores) if lag <= threshold
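As a worked example of the lag formula above, a minimal sketch assuming the four offsets are already known (SOBR presumably denoting the start-of-buffer-replay checkpoint) and that the hybrid check is lag <= threshold, as the return description states. With a source max offset of 1500, an SOBR source offset of 1000, a current offset of 700 and an SOBR destination offset of 300, the lag is (1500 - 1000) - (700 - 300) = 100. The class and method names are hypothetical:

```java
// Hypothetical helpers mirroring the documented formula and return condition.
final class ReadyToServeSketch {
  // Lag = (Source Max Offset - SOBR Source Offset) - (Current Offset - SOBR Destination Offset)
  static long lag(long sourceMaxOffset, long sobrSourceOffset, long currentOffset, long sobrDestinationOffset) {
    long producedSinceSobr = sourceMaxOffset - sobrSourceOffset; // accumulated in the source since SOBR
    long consumedSinceSobr = currentOffset - sobrDestinationOffset; // applied by this replica since SOBR
    return producedSinceSobr - consumedSinceSobr;
  }

  // True if EOP was received and, for hybrid stores, the lag is within the threshold.
  static boolean isReadyToServe(boolean endOfPushReceived, boolean isHybrid, long lag, long threshold) {
    if (!endOfPushReceived) {
      return false;
    }
    return !isHybrid || lag <= threshold;
  }
}
```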
-
isReadyToServeAnnouncedWithRTLag
public boolean isReadyToServeAnnouncedWithRTLag() -
isRealTimeBufferReplayStarted
protected abstract boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState) -
measureHybridOffsetLag
protected abstract long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag) Measures the hybrid offset lag for the partition being tracked in `partitionConsumptionState`. -
reportIfCatchUpVersionTopicOffset
protected abstract void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState)
Checks whether the ingestion progress has reached the end of the version topic. This is currently only used in LeaderFollowerStoreIngestionTask. -
produceToStoreBufferService
protected void produceToStoreBufferService(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumedRecord, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs) throws InterruptedException
This function produces a pair of a consumer record and its derived produced record to the writer buffers maintained by StoreBufferService.
- Parameters:
consumedRecord - received consumer record
leaderProducedRecordContext - derived leaderProducedRecordContext
partition -
kafkaUrl -
- Throws:
InterruptedException
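The description above says each consumed record, paired with its derived leader-produced record, is handed to writer buffers owned by StoreBufferService, which later process it through processConsumerRecord. A minimal sketch of one plausible buffer layout, assuming (not confirmed by this page) one bounded queue per drainer and a partition-to-drainer mapping by hash, so that records of the same partition are drained in order. All names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical writer-buffer layout: one bounded queue per drainer thread.
final class StoreBufferSketch {
  // A consumed record paired with its (optional) leader-produced counterpart.
  static final class BufferedRecord {
    final int partition;
    final String consumedRecord;
    final String leaderProducedRecord; // null when there is no derived record

    BufferedRecord(int partition, String consumedRecord, String leaderProducedRecord) {
      this.partition = partition;
      this.consumedRecord = consumedRecord;
      this.leaderProducedRecord = leaderProducedRecord;
    }
  }

  private final List<BlockingQueue<BufferedRecord>> drainerQueues = new ArrayList<>();

  StoreBufferSketch(int drainerCount, int capacityPerDrainer) {
    for (int i = 0; i < drainerCount; i++) {
      drainerQueues.add(new ArrayBlockingQueue<>(capacityPerDrainer));
    }
  }

  // Same partition -> same drainer, so records of one partition are drained in order.
  int drainerIndex(int partition) {
    return Math.floorMod(Integer.hashCode(partition), drainerQueues.size());
  }

  // Blocks while the drainer's queue is full, applying back-pressure to the consumer thread.
  void put(BufferedRecord record) throws InterruptedException {
    drainerQueues.get(drainerIndex(record.partition)).put(record);
  }

  // Non-blocking read by a drainer, which would then hand the record to the processing step.
  BufferedRecord poll(int drainerIndex) {
    return drainerQueues.get(drainerIndex).poll();
  }
}
```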
-
validateAndFilterOutDuplicateMessagesFromLeaderTopic
protected abstract Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> validateAndFilterOutDuplicateMessagesFromLeaderTopic(Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records, String kafkaUrl, PubSubTopicPartition topicPartition) -
produceToStoreBufferServiceOrKafka
protected void produceToStoreBufferServiceOrKafka(Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records, PubSubTopicPartition topicPartition, String kafkaUrl, int kafkaClusterId) throws InterruptedException
This function is in charge of producing the consumer records to the writer buffers maintained by StoreBufferService. This function may modify the original record in the KME, so it is unsafe to use the payload from the KME directly after this call.
- Parameters:
records - received consumer records
topicPartition -
- Throws:
InterruptedException
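Because produceToStoreBufferServiceOrKafka may modify the record inside the KME, a caller that still needs the original payload afterward should copy it first. A minimal sketch of such a defensive copy for a ByteBuffer payload; the helper name is hypothetical, and ByteBuffer.duplicate() alone is not enough because it shares the underlying bytes:

```java
import java.nio.ByteBuffer;

final class PayloadCopySketch {
  // Deep-copies the readable bytes of src; src's position and limit are left unchanged.
  static ByteBuffer deepCopy(ByteBuffer src) {
    ByteBuffer copy = ByteBuffer.allocate(src.remaining());
    copy.put(src.duplicate()); // duplicate(): independent position/limit, but shared content
    copy.flip(); // make the copied bytes readable from index 0
    return copy;
  }
}
```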
-
produceToStoreBufferServiceOrKafkaInBatch
protected void produceToStoreBufferServiceOrKafkaInBatch(Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records, PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState, String kafkaUrl, int kafkaClusterId) throws InterruptedException
- Throws:
InterruptedException
-
checkIngestionProgress
- Throws:
InterruptedException
-
updateIngestionRoleIfStoreChanged
- Throws:
InterruptedException
-
isIngestionTaskActive
public boolean isIngestionTaskActive() -
run
public void run()
Polls the producer for new messages in an infinite loop on a dedicated consumer thread and processes the new messages on the current thread. -
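run() is described as an infinite polling loop on a dedicated thread; close() stops the task, and shutdownAndWait(int) blocks until shutdown completes within the given time. A minimal sketch of how these pieces can fit together, assuming an AtomicBoolean running flag (compare the isRunning field) and a CountDownLatch released when the loop exits (compare gracefulShutdownLatch). The loop body is a placeholder, and the sketch takes milliseconds because the unit of the real waitTime parameter is not stated here:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical ingestion loop with cooperative shutdown.
final class IngestionLoopSketch implements Runnable, AutoCloseable {
  private final AtomicBoolean running = new AtomicBoolean(true);
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);

  @Override
  public void run() {
    try {
      while (running.get()) {
        // Placeholder: process consumer actions, poll records, hand them to the buffer service.
        Thread.sleep(1);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status and fall through to shutdown
    } finally {
      shutdownLatch.countDown(); // signal that the loop has fully exited
    }
  }

  // Requests a stop; the loop exits the next time it checks the flag.
  @Override
  public void close() {
    running.set(false);
  }

  boolean isRunning() {
    return running.get();
  }

  // Blocks until the loop has exited or the timeout elapses; true means a clean shutdown.
  boolean shutdownAndWait(long waitTimeMs) throws InterruptedException {
    close();
    return shutdownLatch.await(waitTimeMs, TimeUnit.MILLISECONDS);
  }
}
```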
updateOffsetMetadataAndSyncOffset
-
closeVeniceWriters
public void closeVeniceWriters(boolean doFlush) -
closeVeniceViewWriters
protected void closeVeniceViewWriters() -
resolveSourceKafkaServersWithinTopicSwitch
Applies name resolution to all Kafka URLs in the provided TopicSwitch. Useful for translating URLs that came from a different runtime (e.g. from the controller, or from state persisted by a previous run of the same server).
- Returns:
- the same TopicSwitch, mutated such that all Kafka URLs it contains are guaranteed to be usable by the current Venice server instance
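A minimal sketch of the resolution step, assuming a hypothetical TopicSwitchSketch that keeps its source servers in a mutable list and a resolver function that maps each URL to the form the current server can use (here, a simple alias table). The real TopicSwitch schema and resolver are not shown on this page; the sketch only illustrates the documented contract of returning the same object, mutated:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for TopicSwitch, holding only the source server URLs.
final class TopicSwitchSketch {
  final List<String> sourceKafkaServers;

  TopicSwitchSketch(List<String> sourceKafkaServers) {
    this.sourceKafkaServers = new ArrayList<>(sourceKafkaServers); // mutable copy
  }
}

final class TopicSwitchResolverSketch {
  // Resolves every URL in place and returns the same object.
  static TopicSwitchSketch resolve(TopicSwitchSketch topicSwitch, Function<String, String> urlResolver) {
    List<String> servers = topicSwitch.sourceKafkaServers;
    for (int i = 0; i < servers.size(); i++) {
      servers.set(i, urlResolver.apply(servers.get(i)));
    }
    return topicSwitch;
  }

  // Example resolver: translate known aliases and leave other URLs unchanged.
  static Function<String, String> aliasResolver(Map<String, String> aliases) {
    return url -> aliases.getOrDefault(url, url);
  }
}
```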
-
getOffsetToOnlineLagThresholdPerPartition
protected static long getOffsetToOnlineLagThresholdPerPartition(Optional<HybridStoreConfig> hybridStoreConfig, String storeName, int partitionCount) -
processCommonConsumerAction
protected void processCommonConsumerAction(ConsumerAction consumerAction) throws InterruptedException
- Throws:
InterruptedException
-
getTopicPartitionEndOffSet
- Returns:
- the end offset in Kafka for the topic partition in SIT, or a negative value if it failed to get it. N.B.: The returned end offset is the offset of the last successfully replicated message plus one. If the partition has never been written to, the end offset is 0.
-
getPartitionOffsetLagBasedOnMetrics
protected long getPartitionOffsetLagBasedOnMetrics(String kafkaSourceAddress, PubSubTopic topic, int partition) -
checkLongRunningTaskState
- Throws:
InterruptedException
-
processConsumerAction
protected abstract void processConsumerAction(ConsumerAction message, Store store) throws InterruptedException
- Throws:
InterruptedException
-
getConsumptionSourceKafkaAddress
protected abstract Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState) -
startConsumingAsLeader
-
getRealTimeDataSourceKafkaAddress
protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState) -
getPartitionConsumptionState
-
hasAnyPartitionConsumptionState
-
getFailedIngestionPartitionCount
public int getFailedIngestionPartitionCount() -
shouldProcessRecord
Common record check for different state models: checks whether the server continues receiving messages after EOP for a batch-only store. -
shouldPersistRecord
protected boolean shouldPersistRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record, PartitionConsumptionState partitionConsumptionState) -
processConsumerRecord
public void processConsumerRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
This function is invoked in StoreBufferService to process a buffered PubSubMessage. -
recordHeartbeatReceived
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, String kafkaUrl) -
setLastConsumerException
-
setLastStoreIngestionException
-
recordChecksumVerificationFailure
public void recordChecksumVerificationFailure() -
recordAssembledRecordSize
protected void recordAssembledRecordSize(int keyLen, ByteBuffer valueBytes, ByteBuffer rmdBytes, long currentTimeMs)
Records metrics for the original size of fully assembled records (key + value) and RMD by using the field ChunkedValueManifest.size. Also records the ratio of the assembled record size to the maximum allowed size, which is intended to alert customers about how close they are to hitting the size limit.
- Parameters:
keyLen - The size of the record's key
valueBytes - Put.putValue, which is expected to be a serialized ChunkedValueManifest
rmdBytes - Put.replicationMetadataPayload, which can be a serialized ChunkedValueManifest if RMD chunking was enabled, or just the RMD payload otherwise
-
recordAssembledRecordSizeRatio
protected abstract void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs) -
calculateAssembledRecordSizeRatio
protected abstract double calculateAssembledRecordSizeRatio(long recordSize) -
getBatchReplicationLag
public abstract long getBatchReplicationLag() -
getLeaderOffsetLag
public abstract long getLeaderOffsetLag() -
getBatchLeaderOffsetLag
public abstract long getBatchLeaderOffsetLag() -
getHybridLeaderOffsetLag
public abstract long getHybridLeaderOffsetLag() -
measureLagWithCallToPubSub
protected long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopic topic, int partition, long currentOffset)
- Parameters:
pubSubServerName - PubSub deployment to interrogate
topic - topic to measure
partition - partition for which to measure lag
- Returns:
the lag, or 9223372036854775807L (Long.MAX_VALUE) if it failed to measure it. N.B.: the returned lag can be negative since the end offset used in the calculation is cached.
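A minimal sketch of the measurement contract described above, assuming lag is computed as the cached end offset minus currentOffset (the exact formula is not stated here). LagMeasurer and the LongSupplier standing in for the cached end-offset lookup are hypothetical, not Venice APIs. It shows both documented behaviors: the Long.MAX_VALUE sentinel on failure, and a possibly negative result when the cached end offset is stale.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: the LongSupplier stands in for a cached end-offset lookup against PubSub.
final class LagMeasurer {
    private LagMeasurer() {}

    /**
     * Returns (cached end offset - currentOffset), or Long.MAX_VALUE (9223372036854775807L)
     * if the end offset cannot be fetched. The result may be negative when the cache is stale.
     */
    static long measureLag(LongSupplier cachedEndOffset, long currentOffset) {
        try {
            return cachedEndOffset.getAsLong() - currentOffset;
        } catch (RuntimeException e) {
            return Long.MAX_VALUE; // failed to measure
        }
    }
}
```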
-
measureLagWithCallToPubSub
protected static long measureLagWithCallToPubSub(String pubSubServerName, PubSubTopic topic, int partition, long currentOffset, Function<String, TopicManager> topicManagerProvider) -
getFollowerOffsetLag
public abstract long getFollowerOffsetLag() Measures the offset lag between follower and leader. -
getBatchFollowerOffsetLag
public abstract long getBatchFollowerOffsetLag() -
getHybridFollowerOffsetLag
public abstract long getHybridFollowerOffsetLag() -
getRegionHybridOffsetLag
public abstract long getRegionHybridOffsetLag(int regionId) -
getWriteComputeErrorCode
public abstract int getWriteComputeErrorCode() -
updateLeaderTopicOnFollower
public abstract void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState) -
minZeroLag
protected long minZeroLag(long value) Because of timing considerations, some lag metrics can compute negative values. Negative lag does not make sense, so this method eases interpretation by applying a lower bound of zero to these metrics. -
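The lower bound described above can be expressed as a one-line clamp. This is a sketch assuming a plain Math.max; LagBounds is a hypothetical holder class, and the actual implementation may differ.

```java
// Hypothetical holder class illustrating the documented intent of minZeroLag.
final class LagBounds {
    private LagBounds() {}

    /** Clamps a lag metric so that negative values caused by timing skew are reported as zero. */
    static long minZeroLag(long value) {
        return Math.max(0L, value);
    }
}
```

A lag computed from a stale cached end offset, such as -5, would then be reported as 0, while positive lags pass through unchanged.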
isHybridMode
public boolean isHybridMode() -
processEndOfPush
protected void processEndOfPush(KafkaMessageEnvelope endOfPushKME, int partition, long offset, PartitionConsumptionState partitionConsumptionState) -
processStartOfIncrementalPush
protected void processStartOfIncrementalPush(ControlMessage startOfIncrementalPush, PartitionConsumptionState partitionConsumptionState) -
processEndOfIncrementalPush
protected void processEndOfIncrementalPush(ControlMessage endOfIncrementalPush, PartitionConsumptionState partitionConsumptionState) -
processControlMessageForViews
protected void processControlMessageForViews(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState) This is not used for ingestion outside L/F, so it is a no-op here; the actual implementation is in LeaderFollowerStoreIngestionTask.
-
processTopicSwitch
protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState) -
updateOffsetMetadataInOffsetRecord
protected abstract void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState) Syncs the offset metadata in OffsetRecord. PartitionConsumptionState passes some information through to OffsetRecord for persistence; offset rewind and split brain are guarded against in updateLatestInMemoryProcessedOffset(PartitionConsumptionState, PubSubMessage, LeaderProducedRecordContext, String, boolean).
- Parameters:
partitionConsumptionState -
-
updateLatestInMemoryProcessedOffset
protected abstract void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun) Maintains in memory the latest offsets processed by drainers; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Before the offset is updated in memory, the underlying storage engine should have persisted the given record. Dry-run mode only performs the offset rewind check and does not update the processed offset. -
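A sketch of the in-memory bookkeeping with a dry-run rewind check. It assumes a rewind means an offset that is not strictly greater than the last processed one, and that each partition is drained by a single thread. InMemoryOffsetTracker is hypothetical and tracks raw offsets only, not Venice's PartitionConsumptionState or OffsetRecord.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; not thread-safe, assumes one drainer thread per tracker.
final class InMemoryOffsetTracker {
    private final Map<Integer, Long> latestProcessed = new HashMap<>();

    /**
     * Call only after the record at newOffset has been persisted.
     * Rejects offset rewinds; in dry-run mode it performs only that check.
     */
    void update(int partition, long newOffset, boolean dryRun) {
        long previous = latest(partition);
        if (newOffset <= previous) {
            throw new IllegalStateException(
                "Offset rewind on partition " + partition + ": " + newOffset + " <= " + previous);
        }
        if (!dryRun) {
            latestProcessed.put(partition, newOffset);
        }
    }

    /** Latest processed offset, or -1 if nothing has been processed for the partition. */
    long latest(int partition) {
        return latestProcessed.getOrDefault(partition, -1L);
    }
}
```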
recordWriterStats
protected abstract void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState) -
validateMessage
protected void validateMessage(PartitionTracker.TopicType type, KafkaDataIntegrityValidator validator, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, boolean endOfPushReceived, PartitionConsumptionState partitionConsumptionState) Message validation using DIV. Leaders should pass in the validator instance from LeaderFollowerStoreIngestionTask, and drainers should pass in the validator instance from StoreIngestionTask.
1. If valid DIV errors happen after EOP is received, no fatal exceptions are thrown, but the errors are recorded in the DIV metrics.
2. DIV errors from unregistered producers that happen after EOP are ignored.
3. DIV errors on records that are beyond logCompactionDelayInMs are ignored. -
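The three rules above can be read as a small decision function. This sketch assumes the ignore rules (3 and 2) take precedence over rule 1, and that errors before EOP remain fatal, which the list implies but does not state. DivErrorPolicy and its boolean inputs are hypothetical; the real validateMessage works on KafkaDataIntegrityValidator state.

```java
// Hypothetical sketch of the documented DIV error rules; names and precedence are assumptions.
final class DivErrorPolicy {
    enum Outcome { THROW, RECORD_METRIC_ONLY, IGNORE }

    private DivErrorPolicy() {}

    static Outcome onDivError(
            boolean endOfPushReceived,
            boolean producerRegistered,
            boolean beyondLogCompactionDelay) {
        if (beyondLogCompactionDelay) {
            return Outcome.IGNORE;              // rule 3
        }
        if (endOfPushReceived && !producerRegistered) {
            return Outcome.IGNORE;              // rule 2
        }
        if (endOfPushReceived) {
            return Outcome.RECORD_METRIC_ONLY;  // rule 1: counted in DIV metrics, not fatal
        }
        return Outcome.THROW;                   // before EOP: assumed fatal
    }
}
```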
cloneProducerStates
Only StoreIngestionTask should be allowed to access kafkaDataIntegrityValidator. Other components, such as leaders in LeaderFollowerStoreIngestionTask, should never access the drainer's DIV validator: message consumption in the leader is ahead of the drainer, so leaders and drainers process messages at different paces. -
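To illustrate why leaders and drainers should not share validator state, here is a sketch in which the leader works on its own copy of producer state. It assumes producer state can be reduced to a map from producer GUID to last validated sequence number; ProducerStateSnapshot is hypothetical, and real DIV state is richer.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: producer state reduced to "producer GUID -> last validated sequence number".
final class ProducerStateSnapshot {
    private ProducerStateSnapshot() {}

    /**
     * Returns an independent copy of the drainer's producer state, so the leader can
     * advance its own copy at its own pace without mutating the drainer's state.
     */
    static Map<String, Integer> cloneProducerStates(Map<String, Integer> drainerState) {
        // Integer values are immutable, so copying the map is enough to decouple the two.
        return new HashMap<>(drainerState);
    }
}
```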
putInStorageEngine
Persist Put record to storage engine. -
removeFromStorageEngine
-
throwOrLogStorageFailureDependingIfStillSubscribed
-
logStorageOperationWhileUnsubscribed
protected void logStorageOperationWhileUnsubscribed(int partition) -
consumerHasAnySubscription
public boolean consumerHasAnySubscription() -
consumerHasSubscription
public boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) -
unsubscribeFromTopic
public void unsubscribeFromTopic(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) This method unsubscribes from the input topic-partition. If it is a real-time topic and separate RT topic is enabled, it also unsubscribes from the separate real-time topic. -
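A sketch of the unsubscribe logic described above, with subscriptions held as topic-partition strings. SubscriptionSketch is hypothetical, and the mapping from a real-time topic to its separate RT topic is injected because its naming scheme is not given here.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch: subscriptions are tracked as "topic-partition" strings.
final class SubscriptionSketch {
    private final Set<String> subscriptions = new HashSet<>();
    private final boolean separateRtTopicEnabled;
    // Maps a real-time topic to its separate RT topic; the naming scheme is left to the caller.
    private final Function<String, String> separateRtTopicOf;

    SubscriptionSketch(boolean separateRtTopicEnabled, Function<String, String> separateRtTopicOf) {
        this.separateRtTopicEnabled = separateRtTopicEnabled;
        this.separateRtTopicOf = separateRtTopicOf;
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    void subscribe(String topic, int partition) {
        subscriptions.add(key(topic, partition));
    }

    boolean isSubscribed(String topic, int partition) {
        return subscriptions.contains(key(topic, partition));
    }

    /** Unsubscribes the topic-partition and, for a real-time topic with separate RT enabled, the separate RT one too. */
    void unsubscribeFromTopic(String topic, boolean isRealTimeTopic, int partition) {
        subscriptions.remove(key(topic, partition));
        if (isRealTimeTopic && separateRtTopicEnabled) {
            subscriptions.remove(key(separateRtTopicOf.apply(topic), partition));
        }
    }
}
```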
consumerBatchUnsubscribe
-
consumerUnSubscribeAllTopics
public abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState) -
consumerSubscribe
public void consumerSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, long startOffset, String kafkaURL) This method tries to resolve the actual topic-partition from the input Kafka URL and subscribes to the resolved topic-partition. -
consumerResetOffset
public void consumerResetOffset(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState) -
waitVersionStateAvailable
protected StoreVersionState waitVersionStateAvailable(String kafkaTopic) throws InterruptedException - Throws:
InterruptedException
-
close
public void close() Stops the consumer task.
- Specified by:
close in interface AutoCloseable
- Specified by:
close in interface Closeable
-
shutdownAndWait
public boolean shutdownAndWait(int waitTime) This method is a blocking call that waits for StoreIngestionTask to fully shut down within the given time.
- Parameters:
waitTime - Maximum wait time for the shutdown operation.
- Returns:
whether the task was able to shut down gracefully within waitTime
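A generic sketch of a blocking shutdown with a timeout, built on ExecutorService.shutdown and awaitTermination. ShutdownSketch is hypothetical, and it assumes waitTime is in seconds; the unit used by StoreIngestionTask is not stated here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a polling loop owned by a single-thread executor.
final class ShutdownSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;

    void start() {
        executor.submit(() -> {
            while (running) {
                try {
                    Thread.sleep(10); // stand-in for polling and processing records
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /** Signals the loop to stop, then blocks up to waitTimeSeconds; true means graceful shutdown. */
    boolean shutdownAndWait(int waitTimeSeconds) {
        running = false;
        executor.shutdown();
        try {
            return executor.awaitTermination(waitTimeSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Returning the result of awaitTermination lets the caller distinguish a graceful stop from a timeout, matching the boolean contract above.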
-
isRunning
public boolean isRunning() Allows the service to get the current status of the task, so that it can create a new task if required. -
getVersionTopic
-
isMetricsEmissionEnabled
public boolean isMetricsEmissionEnabled() -
enableMetricsEmission
public void enableMetricsEmission() -
disableMetricsEmission
public void disableMetricsEmission() -
isPartitionConsumingOrHasPendingIngestionAction
public boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition) Checks whether the given partition is still consuming messages from Kafka. -
createKafkaConsumerProperties
protected Properties createKafkaConsumerProperties(Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely) Overrides the ConfigKeys.KAFKA_BOOTSTRAP_SERVERS config with a remote Kafka bootstrap URL. -
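A sketch of the override, assuming it only applies when consumeRemotely is true and that the local properties should be copied rather than mutated. BOOTSTRAP_SERVERS_KEY is a placeholder for ConfigKeys.KAFKA_BOOTSTRAP_SERVERS, whose actual string value is not shown in this document.

```java
import java.util.Properties;

// Hypothetical sketch; BOOTSTRAP_SERVERS_KEY is a placeholder for ConfigKeys.KAFKA_BOOTSTRAP_SERVERS.
final class ConsumerPropsSketch {
    static final String BOOTSTRAP_SERVERS_KEY = "kafka.bootstrap.servers"; // placeholder value

    private ConsumerPropsSketch() {}

    static Properties createKafkaConsumerProperties(
            Properties localConsumerProps, String remoteKafkaSourceAddress, boolean consumeRemotely) {
        Properties props = new Properties();
        props.putAll(localConsumerProps); // copy so the caller's properties stay untouched
        if (consumeRemotely) {
            props.setProperty(BOOTSTRAP_SERVERS_KEY, remoteKafkaSourceAddress);
        }
        return props;
    }
}
```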
resubscribe
protected abstract void resubscribe(PartitionConsumptionState partitionConsumptionState) throws InterruptedException - Throws:
InterruptedException
-
reportError
-
isActiveActiveReplicationEnabled
public boolean isActiveActiveReplicationEnabled() -
dumpPartitionConsumptionStates
public void dumpPartitionConsumptionStates(AdminResponse response, ComplementSet<Integer> partitions) Invoked by admin request to dump the requested partition consumption states -
dumpStoreVersionState
Invoked by admin request to dump store version state metadata. -
getServerConfig
-
updateOffsetMetadataAndSync
-
getTopicManager
Returns the local or remote topic manager.
- Parameters:
sourceKafkaServer - The address of the source Kafka bootstrap server.
- Returns:
topic manager
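A sketch of local-versus-remote resolution, assuming the local topic manager is returned for the local bootstrap address and remote ones are created lazily and cached per address. TopicManagerResolver is hypothetical and generic in the manager type, so it does not depend on Venice's TopicManager.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch; T stands in for a topic manager type.
final class TopicManagerResolver<T> {
    private final String localKafkaServer;
    private final T localTopicManager;
    private final Function<String, T> remoteFactory;
    private final Map<String, T> remoteManagers = new ConcurrentHashMap<>();

    TopicManagerResolver(String localKafkaServer, T localTopicManager, Function<String, T> remoteFactory) {
        this.localKafkaServer = localKafkaServer;
        this.localTopicManager = localTopicManager;
        this.remoteFactory = remoteFactory;
    }

    /** Local manager for the local address; otherwise a lazily created, cached remote manager. */
    T getTopicManager(String sourceKafkaServer) {
        if (localKafkaServer.equals(sourceKafkaServer)) {
            return localTopicManager;
        }
        return remoteManagers.computeIfAbsent(sourceKafkaServer, remoteFactory);
    }
}
```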
-
waitForAllMessageToBeProcessedFromTopicPartition
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState) throws InterruptedException Waits for the complete processing (including persistence to disk) of all the messages that were consumed from this Kafka {topic, partition} before this function was called. The calling thread blocks until then.
- Parameters:
topicPartition - the topic-partition for which to wait
- Throws:
InterruptedException
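One common way to implement this kind of wait is a sentinel placed behind all previously buffered work in a FIFO queue: when the drainer runs the sentinel, everything before it has been processed. DrainBarrierSketch is hypothetical and uses a single drainer thread; it is not necessarily how Venice implements the method.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: one drainer thread processes buffered work in FIFO order.
final class DrainBarrierSketch {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    void start() {
        Thread drainer = new Thread(() -> {
            try {
                while (true) {
                    queue.take().run(); // stand-in for "process and persist one record"
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop draining when interrupted
            }
        }, "drainer");
        drainer.setDaemon(true);
        drainer.start();
    }

    void enqueue(Runnable work) {
        queue.add(work);
    }

    /** Blocks until every item enqueued before this call has been processed. */
    void waitForAllMessagesToBeProcessed() throws InterruptedException {
        CountDownLatch marker = new CountDownLatch(1);
        queue.put(marker::countDown); // the sentinel runs only after all earlier items
        marker.await();
    }
}
```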
-
delegateConsumerRecord
protected abstract StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs) -
recordProcessedRecordStats
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize) -
isSegmentControlMsg
-
isTransientRecordBufferUsed
public boolean isTransientRecordBufferUsed() This is not a per-record state; rather, it indicates whether the transient record buffer is used at all by this ingestion task. In L/F mode, only the WC (write compute) ingestion task needs this buffer. -
setPartitionConsumptionState
-
getVersionedDIVStats
-
getVersionIngestionStats
-
getCompressionStrategy
-
getCompressor
-
isChunked
protected boolean isChunked() -
getSchemaRepo
-
getHostLevelIngestionStats
-
getKafkaVersionTopic
-
isStuckByMemoryConstraint
public boolean isStuckByMemoryConstraint() -
shouldUpdateUpstreamOffset
protected boolean shouldUpdateUpstreamOffset(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord) Validates whether the given consumerRecord has a valid upstream offset to update from.
- Parameters:
consumerRecord -
- Returns:
true if the record is not null and contains a valid upstream offset; otherwise false.
-
maybeSendIngestionHeartbeat
For L/F hybrid stores, the leader periodically writes a special SOS message to the RT topic. Check LeaderFollowerStoreIngestionTask.maybeSendIngestionHeartbeat() for more details. -
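The periodic write can be sketched as a time-based throttle: send when the configured interval has elapsed since the last heartbeat, otherwise skip. HeartbeatThrottle, its injected clock, and the interval are hypothetical; the real behavior lives in LeaderFollowerStoreIngestionTask.maybeSendIngestionHeartbeat().

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: the clock, interval, and send action are injected for illustration.
final class HeartbeatThrottle {
    private final long intervalMs;
    private final LongSupplier clockMs;
    private final Runnable sendHeartbeat;
    private long lastSentMs = -1L; // -1 means no heartbeat has been sent yet

    HeartbeatThrottle(long intervalMs, LongSupplier clockMs, Runnable sendHeartbeat) {
        this.intervalMs = intervalMs;
        this.clockMs = clockMs;
        this.sendHeartbeat = sendHeartbeat;
    }

    /** Sends a heartbeat if the interval has elapsed since the last one; returns whether it sent. */
    boolean maybeSendIngestionHeartbeat() {
        long now = clockMs.getAsLong();
        if (lastSentMs >= 0 && now - lastSentMs < intervalMs) {
            return false; // too soon since the previous heartbeat
        }
        sendHeartbeat.run(); // e.g. write the special SOS message to the RT topic
        lastSentMs = now;
        return true;
    }
}
```

Injecting the clock keeps the throttle testable; in a running service, System::currentTimeMillis would be a natural choice.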
isProducingVersionTopicHealthy
public boolean isProducingVersionTopicHealthy() This function checks the following conditions: 1. Whether the version topic exists. -
isCurrentVersion
public boolean isCurrentVersion() -
hasAllPartitionReportedCompleted
public boolean hasAllPartitionReportedCompleted() -
isSeparatedRealtimeTopicEnabled
public boolean isSeparatedRealtimeTopicEnabled()
-