Class StoreIngestionTask
- java.lang.Object
-
- com.linkedin.davinci.kafka.consumer.StoreIngestionTask
-
- All Implemented Interfaces:
java.io.Closeable, java.lang.AutoCloseable, java.lang.Runnable
- Direct Known Subclasses:
LeaderFollowerStoreIngestionTask
public abstract class StoreIngestionTask extends java.lang.Object implements java.lang.Runnable, java.io.Closeable
A runnable Kafka consumer that consumes messages from all the partitions of a Kafka topic assigned to the current node.
-
-
Nested Class Summary
Nested Classes
Modifier and Type Class Description
protected static class
StoreIngestionTask.DelegateConsumerRecordResult
This enum represents all potential results after calling delegateConsumerRecord(PubSubMessageProcessedResultWrapper, int, String, int, long, long).
-
Field Summary
Fields
Modifier and Type Field Description
protected AggKafkaConsumerService
aggKafkaConsumerService
protected SparseConcurrentList<java.lang.Object>
availableSchemaIds
protected boolean
batchReportIncPushStatusEnabled
protected long
bootstrapTimeoutInMs
protected ChunkAssembler
chunkAssembler
protected CompressionStrategy
compressionStrategy
protected Lazy<VeniceCompressor>
compressor
protected StorageEngineBackedCompressorFactory
compressorFactory
protected java.util.concurrent.atomic.AtomicInteger
consumerActionSequenceNumber
protected java.util.concurrent.PriorityBlockingQueue<ConsumerAction>
consumerActionsQueue
protected long
databaseSyncBytesIntervalForDeferredWriteMode
Message bytes consuming interval before persisting offset in offset db for deferred-write database.
protected long
databaseSyncBytesIntervalForTransactionalMode
Message bytes consuming interval before persisting offset in offset db for transactional mode database.
protected int
dataRecoverySourceVersionNumber
protected com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck
defaultReadyToServeChecker
protected SparseConcurrentList<java.lang.Object>
deserializedSchemaIds
protected DiskUsage
diskUsage
protected java.util.function.Consumer<DataValidationException>
divErrorMetricCallback
protected java.util.concurrent.atomic.AtomicBoolean
emitMetrics
protected long
emptyPollSleepMs
protected int
errorPartitionId
Used for reporting error when the partitionConsumptionStateMap is empty
protected java.util.concurrent.CountDownLatch
gracefulShutdownLatch
protected HostLevelIngestionStats
hostLevelIngestionStats
protected java.lang.String
hostName
protected java.util.Optional<HybridStoreConfig>
hybridStoreConfig
protected int
idleCounter
protected com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher
ingestionNotificationDispatcher
protected java.lang.String
ingestionTaskName
protected boolean
isChunked
protected java.util.function.BooleanSupplier
isCurrentVersion
protected boolean
isDataRecovery
protected boolean
isDaVinciClient
protected boolean
isGlobalRtDivEnabled
protected boolean
isIsolatedIngestion
protected boolean
isRmdChunked
protected java.util.concurrent.atomic.AtomicBoolean
isRunning
protected boolean
isSeparatedRealtimeTopicEnabled
protected boolean
isWriteComputationEnabled
protected java.util.function.Function<java.lang.String,java.lang.String>
kafkaClusterUrlResolver
protected java.util.Properties
kafkaProps
protected java.lang.String
kafkaVersionTopic
Topics used for this topic consumption. TODO: Using a PubSubVersionTopic and PubSubRealTimeTopic extending PubSubTopic for type safety.
protected static long
KILL_WAIT_TIME_MS
protected int
localKafkaClusterId
protected java.lang.String
localKafkaServer
protected java.util.Set<java.lang.String>
localKafkaServerSingletonSet
protected ChunkedValueManifestSerializer
manifestSerializer
protected MetaStoreWriter
metaStoreWriter
protected java.util.concurrent.ExecutorService
parallelProcessingThreadPool
protected java.util.concurrent.ConcurrentMap<java.lang.Integer,PartitionConsumptionState>
partitionConsumptionStateMap
Per-partition consumption state map
protected int
partitionCount
This would be the number of partitions in the StorageEngine and in version topics
protected java.util.Map<java.lang.Integer,java.util.concurrent.atomic.AtomicInteger>
partitionToPendingConsumerActionCountMap
protected PubSubTopicRepository
pubSubTopicRepository
protected long
readCycleDelayMs
protected boolean
readOnlyForBatchOnlyStoreEnabled
protected PubSubTopic
realTimeTopic
protected java.util.concurrent.atomic.AtomicBoolean
recordLevelMetricEnabled
protected static RedundantExceptionFilter
REDUNDANT_LOGGING_FILTER
protected boolean
resetErrorReplicaEnabled
static long
SCHEMA_POLLING_DELAY_MS
protected ReadOnlySchemaRepository
schemaRepository
protected VeniceServerConfig
serverConfig
protected AbstractStorageEngine
storageEngine
protected StorageEngineRepository
storageEngineRepository
protected StorageMetadataService
storageMetadataService
protected StorageService
storageService
storage destination for consumption
static long
STORE_VERSION_POLLING_DELAY_MS
protected AbstractStoreBufferService
storeBufferService
protected java.lang.String
storeName
protected ReadOnlyStoreRepository
storeRepository
protected int
storeVersionPartitionCount
protected TopicManagerRepository
topicManagerRepository
protected AggVersionedDIVStats
versionedDIVStats
protected AggVersionedIngestionStats
versionedIngestionStats
protected int
versionNumber
protected PartitionReplicaIngestionContext.VersionRole
versionRole
protected PubSubTopic
versionTopic
protected static long
WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
protected PartitionReplicaIngestionContext.WorkloadType
workloadType
protected int
writeComputeFailureCode
protected Lazy<org.apache.helix.manager.zk.ZKHelixAdmin>
zkHelixAdmin
-
Constructor Summary
Constructors
Constructor Description
StoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, java.util.Properties kafkaConsumerProperties, java.util.function.BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, java.util.Queue<VeniceNotifier> notifiers, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
Method Summary
Modifier and Type Method Description
protected abstract double
calculateAssembledRecordSizeRatio(long recordSize)
protected abstract boolean
checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState partitionConsumptionState, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp)
Checks whether the lag is acceptable for hybrid stores
protected void
checkIngestionProgress(Store store)
protected abstract void
checkLongRunningTaskState()
protected void
cloneProducerStates(int partition, KafkaDataIntegrityValidator validator)
We should only allow StoreIngestionTask to access kafkaDataIntegrityValidator; other components, like leaders in LeaderFollowerStoreIngestionTask, should never access the DIV validator in the drainer, because message consumption in the leader is ahead of the drainer: leaders and drainers process messages at different paces.
void
close()
Stops the consumer task.
protected void
closeVeniceViewWriters()
void
closeVeniceWriters(boolean doFlush)
void
consumerBatchUnsubscribe(java.util.Set<PubSubTopicPartition> topicPartitionSet)
boolean
consumerHasAnySubscription()
boolean
consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
void
consumerResetOffset(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
void
consumerSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, long startOffset, java.lang.String kafkaURL)
This method will try to resolve the actual topic-partition from the input Kafka URL and subscribe to the resolved topic-partition.
abstract void
consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
protected java.util.Properties
createKafkaConsumerProperties(java.util.Properties localConsumerProps, java.lang.String remoteKafkaSourceAddress, boolean consumeRemotely)
Override the CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG config with a remote Kafka bootstrap url.
protected abstract StoreIngestionTask.DelegateConsumerRecordResult
delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
abstract void
demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
void
disableMetricsEmission()
java.util.concurrent.CompletableFuture<java.lang.Void>
dropStoragePartitionGracefully(PubSubTopicPartition topicPartition)
Drops a storage partition gracefully.
void
dumpPartitionConsumptionStates(AdminResponse response, ComplementSet<java.lang.Integer> partitions)
Invoked by admin request to dump the requested partition consumption states
void
dumpStoreVersionState(AdminResponse response)
Invoked by admin request to dump store version state metadata.
void
enableMetricsEmission()
abstract long
getBatchFollowerOffsetLag()
abstract long
getBatchLeaderOffsetLag()
abstract long
getBatchReplicationLag()
protected CompressionStrategy
getCompressionStrategy()
protected Lazy<VeniceCompressor>
getCompressor()
protected abstract java.util.Set<java.lang.String>
getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
int
getFailedIngestionPartitionCount()
abstract long
getFollowerOffsetLag()
Measure the offset lag between follower and leader
protected HostLevelIngestionStats
getHostLevelIngestionStats()
abstract long
getHybridFollowerOffsetLag()
abstract long
getHybridLeaderOffsetLag()
protected abstract IngestionBatchProcessor
getIngestionBatchProcessor()
java.lang.String
getIngestionTaskName()
protected java.lang.String
getKafkaVersionTopic()
abstract long
getLeaderOffsetLag()
protected static long
getOffsetToOnlineLagThresholdPerPartition(java.util.Optional<HybridStoreConfig> hybridStoreConfig, java.lang.String storeName, int partitionCount)
PartitionConsumptionState
getPartitionConsumptionState(int partitionId)
protected long
getPartitionOffsetLagBasedOnMetrics(java.lang.String kafkaSourceAddress, PubSubTopic topic, int partition)
protected java.util.Set<java.lang.String>
getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
abstract long
getRegionHybridOffsetLag(int regionId)
protected ReadOnlySchemaRepository
getSchemaRepo()
VeniceServerConfig
getServerConfig()
AbstractStorageEngine
getStorageEngine()
protected StoragePartitionConfig
getStoragePartitionConfig(boolean sorted, PartitionConsumptionState partitionConsumptionState)
protected StoragePartitionConfig
getStoragePartitionConfig(PartitionConsumptionState partitionConsumptionState)
java.lang.String
getStoreName()
protected TopicManager
getTopicManager(java.lang.String sourceKafkaServer)
The function returns local or remote topic manager.
protected long
getTopicPartitionEndOffSet(java.lang.String kafkaUrl, PubSubTopic pubSubTopic, int partition)
protected AggVersionedDIVStats
getVersionedDIVStats()
protected AggVersionedIngestionStats
getVersionIngestionStats()
int
getVersionNumber()
PubSubTopic
getVersionTopic()
abstract int
getWriteComputeErrorCode()
boolean
hasAllPartitionReportedCompleted()
boolean
hasAnyPartitionConsumptionState(java.util.function.Predicate<PartitionConsumptionState> pcsPredicate)
boolean
hasAnySubscription()
boolean
hasPendingPartitionIngestionAction(int userPartition)
boolean
isActiveActiveReplicationEnabled()
protected boolean
isChunked()
boolean
isCurrentVersion()
protected boolean
isDaVinciClient()
boolean
isFutureVersion()
protected abstract boolean
isHybridFollower(PartitionConsumptionState partitionConsumptionState)
boolean
isHybridMode()
boolean
isIngestionTaskActive()
boolean
isMetricsEmissionEnabled()
boolean
isPartitionConsumingOrHasPendingIngestionAction(int userPartition)
Checks whether the given partition is still consuming messages from Kafka
boolean
isProducingVersionTopicHealthy()
This function is checking the following conditions: 1.
protected boolean
isReadyToServe(PartitionConsumptionState partitionConsumptionState)
This function checks various conditions to verify if a store is ready to serve.
boolean
isReadyToServeAnnouncedWithRTLag()
protected abstract boolean
isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
boolean
isRunning()
A function to allow the service to get the current status of the task.
protected boolean
isSegmentControlMsg(ControlMessageType msgType)
boolean
isSeparatedRealtimeTopicEnabled()
boolean
isStuckByMemoryConstraint()
boolean
isTransientRecordBufferUsed()
This is not a per record state.
boolean
isUserSystemStore()
void
kill()
protected void
logStorageOperationWhileUnsubscribed(int partition)
protected abstract java.util.Set<java.lang.String>
maybeSendIngestionHeartbeat()
For L/F hybrid stores, the leader periodically writes a special SOS message to the RT topic.
protected abstract long
measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
Measure the hybrid offset lag for partition being tracked in `partitionConsumptionState`.
protected long
measureLagWithCallToPubSub(java.lang.String pubSubServerName, PubSubTopic topic, int partition, long currentOffset)
protected static long
measureLagWithCallToPubSub(java.lang.String pubSubServerName, PubSubTopic topic, int partition, long currentOffset, java.util.function.Function<java.lang.String,TopicManager> topicManagerProvider)
protected long
minZeroLag(long value)
Because of timing considerations, it is possible that some lag metrics could compute negative values.
protected int
nextSeqNum()
Applies a unique and increasing sequence number to each consumer action, so that if multiple consumer actions in the queue have the same priority, whichever was added to the queue first is polled first (FIFO).
protected void
processCommonConsumerAction(ConsumerAction consumerAction)
protected abstract void
processConsumerAction(ConsumerAction message, Store store)
void
processConsumerRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs)
This function will be invoked in StoreBufferService to process buffered PubSubMessage.
protected void
processEndOfIncrementalPush(ControlMessage endOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
protected void
processEndOfPush(KafkaMessageEnvelope endOfPushKME, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
protected void
processStartOfIncrementalPush(ControlMessage startOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
protected boolean
processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
protected void
processVersionSwapMessage(ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
This isn't really used for ingestion outside of A/A, so we NoOp here and rely on the actual implementation in ActiveActiveStoreIngestionTask
protected void
produceToStoreBufferService(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumedRecord, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs)
This function will produce a pair of a consumer record and its derived produced record to the writer buffers maintained by StoreBufferService.
protected void
produceToStoreBufferServiceOrKafka(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, PubSubTopicPartition topicPartition, java.lang.String kafkaUrl, int kafkaClusterId)
This function is in charge of producing the consumer records to the writer buffers maintained by StoreBufferService.
protected void
produceToStoreBufferServiceOrKafkaInBatch(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState, java.lang.String kafkaUrl, int kafkaClusterId)
abstract void
promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
protected void
putInStorageEngine(int partition, byte[] keyBytes, Put put)
Persist Put record to storage engine.
protected void
recordAssembledRecordSize(int keyLen, java.nio.ByteBuffer valueBytes, java.nio.ByteBuffer rmdBytes, long currentTimeMs)
Records metrics for the original size of full-assembled records (key + value) and RMD by utilizing the field ChunkedValueManifest.size.
protected abstract void
recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
void
recordChecksumVerificationFailure()
protected void
recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, java.lang.String kafkaUrl)
protected void
recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
protected abstract void
recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
protected void
removeFromStorageEngine(int partition, byte[] keyBytes, Delete delete)
void
reportError(java.lang.String message, int userPartition, java.lang.Exception e)
protected abstract void
reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState)
Check if the ingestion progress has reached the end of the version topic.
void
resetPartitionConsumptionOffset(PubSubTopicPartition topicPartition)
Adds an asynchronous request to reset the partition consumption offset for the task.
protected TopicSwitch
resolveSourceKafkaServersWithinTopicSwitch(TopicSwitch originalTopicSwitch)
Applies name resolution to all Kafka URLs in the provided TopicSwitch.
protected abstract void
resubscribe(PartitionConsumptionState partitionConsumptionState)
void
run()
Polls the producer for new messages in an infinite loop by a dedicated consumer thread and processes the new messages by current thread.
void
setLastConsumerException(java.lang.Exception e)
void
setLastStoreIngestionException(java.lang.Exception e)
protected void
setPartitionConsumptionState(int partition, PartitionConsumptionState pcs)
protected abstract boolean
shouldCheckLeaderCompleteStateInFollower()
protected boolean
shouldPersistRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record, PartitionConsumptionState partitionConsumptionState)
protected boolean
shouldProcessRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record)
Common record check for different state models: check whether server continues receiving messages after EOP for a batch-only store.
protected boolean
shouldUpdateUpstreamOffset(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord)
Validate if the given consumerRecord has a valid upstream offset to update from.
void
shutdownAndWait(int waitTime)
This method is a blocking call that waits for StoreIngestionTask to fully shut down within the given time.
protected void
startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
void
subscribePartition(PubSubTopicPartition topicPartition)
void
subscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction)
Adds an asynchronous partition subscription request for the task.
protected void
throwIfNotRunning()
protected void
throwOrLogStorageFailureDependingIfStillSubscribed(int partition, VeniceException e)
void
unsubscribeFromTopic(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
This method unsubscribes topic-partition from the input.
java.util.concurrent.CompletableFuture<java.lang.Void>
unSubscribePartition(PubSubTopicPartition topicPartition)
java.util.concurrent.CompletableFuture<java.lang.Void>
unSubscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction)
Adds an asynchronous partition unsubscription request for the task.
protected void
updateIngestionRoleIfStoreChanged(Store store)
protected abstract void
updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, java.lang.String kafkaUrl, boolean dryRun)
Maintain the latest processed offsets by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord.
abstract void
updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
void
updateOffsetMetadataAndSync(java.lang.String topic, int partitionId)
protected void
updateOffsetMetadataAndSyncOffset(PartitionConsumptionState pcs)
protected abstract void
updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState)
Sync the metadata about offset in OffsetRecord.
protected abstract java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>
validateAndFilterOutDuplicateMessagesFromLeaderTopic(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, java.lang.String kafkaUrl, PubSubTopicPartition topicPartition)
protected void
validateMessage(PartitionTracker.TopicType type, KafkaDataIntegrityValidator validator, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, boolean endOfPushReceived, PartitionConsumptionState partitionConsumptionState)
Message validation using DIV.
protected void
waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState)
The purpose of this function is to wait for the complete processing (including persistence to disk) of all the messages that were consumed from this Kafka {topic, partition} prior to calling this function.
protected StoreVersionState
waitVersionStateAvailable(java.lang.String kafkaTopic)
-
-
-
Field Detail
-
SCHEMA_POLLING_DELAY_MS
public static long SCHEMA_POLLING_DELAY_MS
-
STORE_VERSION_POLLING_DELAY_MS
public static long STORE_VERSION_POLLING_DELAY_MS
-
WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
protected static final long WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED
-
KILL_WAIT_TIME_MS
protected static final long KILL_WAIT_TIME_MS
- See Also:
- Constant Field Values
-
REDUNDANT_LOGGING_FILTER
protected static final RedundantExceptionFilter REDUNDANT_LOGGING_FILTER
-
storageService
protected final StorageService storageService
storage destination for consumption
-
storageEngineRepository
protected final StorageEngineRepository storageEngineRepository
-
storageEngine
protected final AbstractStorageEngine storageEngine
-
kafkaVersionTopic
protected final java.lang.String kafkaVersionTopic
Topics used for this topic consumption. TODO: Using a PubSubVersionTopic and PubSubRealTimeTopic extending PubSubTopic for type safety.
-
versionTopic
protected final PubSubTopic versionTopic
-
realTimeTopic
protected final PubSubTopic realTimeTopic
-
storeName
protected final java.lang.String storeName
-
versionNumber
protected final int versionNumber
-
schemaRepository
protected final ReadOnlySchemaRepository schemaRepository
-
storeRepository
protected final ReadOnlyStoreRepository storeRepository
-
ingestionTaskName
protected final java.lang.String ingestionTaskName
-
kafkaProps
protected final java.util.Properties kafkaProps
-
isRunning
protected final java.util.concurrent.atomic.AtomicBoolean isRunning
-
emitMetrics
protected final java.util.concurrent.atomic.AtomicBoolean emitMetrics
-
consumerActionSequenceNumber
protected final java.util.concurrent.atomic.AtomicInteger consumerActionSequenceNumber
-
consumerActionsQueue
protected final java.util.concurrent.PriorityBlockingQueue<ConsumerAction> consumerActionsQueue
-
partitionToPendingConsumerActionCountMap
protected final java.util.Map<java.lang.Integer,java.util.concurrent.atomic.AtomicInteger> partitionToPendingConsumerActionCountMap
-
storageMetadataService
protected final StorageMetadataService storageMetadataService
-
topicManagerRepository
protected final TopicManagerRepository topicManagerRepository
-
partitionConsumptionStateMap
protected final java.util.concurrent.ConcurrentMap<java.lang.Integer,PartitionConsumptionState> partitionConsumptionStateMap
Per-partition consumption state map
-
storeBufferService
protected final AbstractStoreBufferService storeBufferService
-
hostLevelIngestionStats
protected final HostLevelIngestionStats hostLevelIngestionStats
-
versionedDIVStats
protected final AggVersionedDIVStats versionedDIVStats
-
versionedIngestionStats
protected final AggVersionedIngestionStats versionedIngestionStats
-
isCurrentVersion
protected final java.util.function.BooleanSupplier isCurrentVersion
-
hybridStoreConfig
protected final java.util.Optional<HybridStoreConfig> hybridStoreConfig
-
divErrorMetricCallback
protected final java.util.function.Consumer<DataValidationException> divErrorMetricCallback
-
readCycleDelayMs
protected final long readCycleDelayMs
-
emptyPollSleepMs
protected final long emptyPollSleepMs
-
diskUsage
protected final DiskUsage diskUsage
-
databaseSyncBytesIntervalForTransactionalMode
protected final long databaseSyncBytesIntervalForTransactionalMode
Message bytes consuming interval before persisting offset in offset db for transactional mode database.
-
databaseSyncBytesIntervalForDeferredWriteMode
protected final long databaseSyncBytesIntervalForDeferredWriteMode
Message bytes consuming interval before persisting offset in offset db for deferred-write database.
-
serverConfig
protected final VeniceServerConfig serverConfig
-
errorPartitionId
protected final int errorPartitionId
Used for reporting error when the partitionConsumptionStateMap is empty
-
defaultReadyToServeChecker
protected final com.linkedin.davinci.kafka.consumer.StoreIngestionTask.ReadyToServeCheck defaultReadyToServeChecker
-
availableSchemaIds
protected final SparseConcurrentList<java.lang.Object> availableSchemaIds
-
deserializedSchemaIds
protected final SparseConcurrentList<java.lang.Object> deserializedSchemaIds
-
idleCounter
protected int idleCounter
-
aggKafkaConsumerService
protected final AggKafkaConsumerService aggKafkaConsumerService
-
writeComputeFailureCode
protected int writeComputeFailureCode
-
isWriteComputationEnabled
protected final boolean isWriteComputationEnabled
-
isSeparatedRealtimeTopicEnabled
protected final boolean isSeparatedRealtimeTopicEnabled
-
partitionCount
protected final int partitionCount
This would be the number of partitions in the StorageEngine and in version topics
-
storeVersionPartitionCount
protected final int storeVersionPartitionCount
-
bootstrapTimeoutInMs
protected final long bootstrapTimeoutInMs
-
isIsolatedIngestion
protected final boolean isIsolatedIngestion
-
ingestionNotificationDispatcher
protected final com.linkedin.davinci.kafka.consumer.IngestionNotificationDispatcher ingestionNotificationDispatcher
-
chunkAssembler
protected final ChunkAssembler chunkAssembler
-
localKafkaServer
protected final java.lang.String localKafkaServer
-
localKafkaClusterId
protected final int localKafkaClusterId
-
localKafkaServerSingletonSet
protected final java.util.Set<java.lang.String> localKafkaServerSingletonSet
-
isDaVinciClient
protected final boolean isDaVinciClient
-
isDataRecovery
protected boolean isDataRecovery
-
dataRecoverySourceVersionNumber
protected int dataRecoverySourceVersionNumber
-
readOnlyForBatchOnlyStoreEnabled
protected final boolean readOnlyForBatchOnlyStoreEnabled
-
metaStoreWriter
protected final MetaStoreWriter metaStoreWriter
-
kafkaClusterUrlResolver
protected final java.util.function.Function<java.lang.String,java.lang.String> kafkaClusterUrlResolver
-
resetErrorReplicaEnabled
protected final boolean resetErrorReplicaEnabled
-
compressionStrategy
protected final CompressionStrategy compressionStrategy
-
compressorFactory
protected final StorageEngineBackedCompressorFactory compressorFactory
-
compressor
protected final Lazy<VeniceCompressor> compressor
-
isChunked
protected final boolean isChunked
-
isRmdChunked
protected final boolean isRmdChunked
-
manifestSerializer
protected final ChunkedValueManifestSerializer manifestSerializer
-
pubSubTopicRepository
protected final PubSubTopicRepository pubSubTopicRepository
-
recordLevelMetricEnabled
protected final java.util.concurrent.atomic.AtomicBoolean recordLevelMetricEnabled
-
isGlobalRtDivEnabled
protected final boolean isGlobalRtDivEnabled
-
versionRole
protected volatile PartitionReplicaIngestionContext.VersionRole versionRole
-
workloadType
protected volatile PartitionReplicaIngestionContext.WorkloadType workloadType
-
batchReportIncPushStatusEnabled
protected final boolean batchReportIncPushStatusEnabled
-
parallelProcessingThreadPool
protected final java.util.concurrent.ExecutorService parallelProcessingThreadPool
-
gracefulShutdownLatch
protected final java.util.concurrent.CountDownLatch gracefulShutdownLatch
-
zkHelixAdmin
protected Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin
-
hostName
protected final java.lang.String hostName
-
-
Constructor Detail
-
StoreIngestionTask
public StoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, java.util.Properties kafkaConsumerProperties, java.util.function.BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, java.util.Queue<VeniceNotifier> notifiers, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
-
Method Detail
-
getIngestionBatchProcessor
protected abstract IngestionBatchProcessor getIngestionBatchProcessor()
-
getStorageEngine
public AbstractStorageEngine getStorageEngine()
-
getIngestionTaskName
public java.lang.String getIngestionTaskName()
-
getVersionNumber
public int getVersionNumber()
-
isFutureVersion
public boolean isFutureVersion()
-
throwIfNotRunning
protected void throwIfNotRunning()
-
nextSeqNum
protected int nextSeqNum()
Applies a unique and increasing sequence number to each consumer action, so that if multiple consumer actions in the queue have the same priority, whichever was added to the queue first is polled first (FIFO).
- Returns:
- a unique and increasing sequence number for a new consumer action.
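The tie-breaking behavior described for nextSeqNum() can be illustrated with a minimal, self-contained sketch. It is not the Venice implementation: the Action class, its priority field, the "smaller value is polled first" ordering, and the helper names enqueue and pollName are all assumptions made for illustration. Only the combination of a java.util.concurrent.PriorityBlockingQueue with a java.util.concurrent.atomic.AtomicInteger counter mirrors the consumerActionsQueue and consumerActionSequenceNumber fields listed on this page.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; Action stands in for ConsumerAction and is not the Venice class.
class SeqNumFifoSketch {
  static final class Action implements Comparable<Action> {
    final String name;
    final int priority; // assumption: a smaller value is polled first
    final int seqNum;

    Action(String name, int priority, int seqNum) {
      this.name = name;
      this.priority = priority;
      this.seqNum = seqNum;
    }

    @Override
    public int compareTo(Action other) {
      int byPriority = Integer.compare(priority, other.priority);
      // Equal priority: fall back to the sequence number, so the earlier action wins.
      return byPriority != 0 ? byPriority : Integer.compare(seqNum, other.seqNum);
    }
  }

  private final AtomicInteger sequence = new AtomicInteger();
  private final PriorityBlockingQueue<Action> queue = new PriorityBlockingQueue<>();

  // Hands out a unique, strictly increasing number per action.
  int nextSeqNum() {
    return sequence.incrementAndGet();
  }

  void enqueue(String name, int priority) {
    queue.add(new Action(name, priority, nextSeqNum()));
  }

  // Returns the name of the next action, or null if the queue is empty.
  String pollName() {
    Action next = queue.poll();
    return next == null ? null : next.name;
  }

  public static void main(String[] args) {
    SeqNumFifoSketch sketch = new SeqNumFifoSketch();
    sketch.enqueue("subscribe-p0", 1);
    sketch.enqueue("subscribe-p1", 1);
    sketch.enqueue("kill", 0);
    System.out.println(sketch.pollName());
    System.out.println(sketch.pollName());
    System.out.println(sketch.pollName());
  }
}
```

In this sketch the "kill" action is polled first because of its priority value, and the two "subscribe" actions then come out in insertion order. Without the sequence number, a priority queue gives no ordering guarantee between elements that compare as equal.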
-
subscribePartition
public void subscribePartition(PubSubTopicPartition topicPartition)
-
subscribePartition
public void subscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction)
Adds an asynchronous partition subscription request for the task.
-
unSubscribePartition
public java.util.concurrent.CompletableFuture<java.lang.Void> unSubscribePartition(PubSubTopicPartition topicPartition)
-
unSubscribePartition
public java.util.concurrent.CompletableFuture<java.lang.Void> unSubscribePartition(PubSubTopicPartition topicPartition, boolean isHelixTriggeredAction)
Adds an asynchronous partition unsubscription request for the task.
-
dropStoragePartitionGracefully
public java.util.concurrent.CompletableFuture<java.lang.Void> dropStoragePartitionGracefully(PubSubTopicPartition topicPartition)
Drops a storage partition gracefully. This is always a Helix triggered action.
-
hasAnySubscription
public boolean hasAnySubscription()
-
resetPartitionConsumptionOffset
public void resetPartitionConsumptionOffset(PubSubTopicPartition topicPartition)
Adds an asynchronous request to reset the partition consumption offset for the task.
-
getStoreName
public java.lang.String getStoreName()
-
isUserSystemStore
public boolean isUserSystemStore()
-
promoteToLeader
public abstract void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
-
demoteToStandby
public abstract void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
-
hasPendingPartitionIngestionAction
public boolean hasPendingPartitionIngestionAction(int userPartition)
-
kill
public void kill()
-
getStoragePartitionConfig
protected StoragePartitionConfig getStoragePartitionConfig(PartitionConsumptionState partitionConsumptionState)
-
getStoragePartitionConfig
protected StoragePartitionConfig getStoragePartitionConfig(boolean sorted, PartitionConsumptionState partitionConsumptionState)
-
isHybridFollower
protected abstract boolean isHybridFollower(PartitionConsumptionState partitionConsumptionState)
-
shouldCheckLeaderCompleteStateInFollower
protected abstract boolean shouldCheckLeaderCompleteStateInFollower()
-
checkAndLogIfLagIsAcceptableForHybridStore
protected abstract boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState partitionConsumptionState, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp)
Checks whether the lag is acceptable for hybrid stores.
-
isReadyToServe
protected boolean isReadyToServe(PartitionConsumptionState partitionConsumptionState)
This function checks various conditions to verify whether a store is ready to serve. Lag = (Source Max Offset - SOBR Source Offset) - (Current Offset - SOBR Destination Offset)
- Returns:
- true if EOP was received and (for hybrid stores) if lag <= threshold
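As a worked illustration of the lag formula and the return condition above, the sketch below computes the lag from four offsets and applies the "EOP received, and lag <= threshold for hybrid stores" rule. The class, method, and parameter names are hypothetical, and the real method checks additional conditions that are not modeled here.

```java
public class HybridLagSketch {
  /** Lag = (Source Max Offset - SOBR Source Offset) - (Current Offset - SOBR Destination Offset). */
  public static long lag(
      long sourceMaxOffset,
      long sobrSourceOffset,
      long currentOffset,
      long sobrDestinationOffset) {
    long sourceProgress = sourceMaxOffset - sobrSourceOffset;
    long destinationProgress = currentOffset - sobrDestinationOffset;
    return sourceProgress - destinationProgress;
  }

  /** True if EOP was received and, for hybrid stores only, the lag is within the threshold. */
  public static boolean isReadyToServe(boolean endOfPushReceived, boolean hybrid, long lag, long threshold) {
    if (!endOfPushReceived) {
      return false;
    }
    return !hybrid || lag <= threshold;
  }
}
```

For example, with a source max offset of 1000, an SOBR source offset of 200, a current offset of 750, and an SOBR destination offset of 50, the lag is (1000 - 200) - (750 - 50) = 100.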
-
isReadyToServeAnnouncedWithRTLag
public boolean isReadyToServeAnnouncedWithRTLag()
-
isRealTimeBufferReplayStarted
protected abstract boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
-
measureHybridOffsetLag
protected abstract long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
Measures the hybrid offset lag for the partition being tracked in `partitionConsumptionState`.
-
reportIfCatchUpVersionTopicOffset
protected abstract void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState partitionConsumptionState)
Checks whether the ingestion progress has reached the end of the version topic. This is currently only used by LeaderFollowerStoreIngestionTask.
-
produceToStoreBufferService
protected void produceToStoreBufferService(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumedRecord, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs, long currentTimeForMetricsMs) throws java.lang.InterruptedException
This function produces a pair consisting of a consumer record and its derived produced record to the writer buffers maintained by StoreBufferService.
- Parameters:
consumedRecord - received consumer record
leaderProducedRecordContext - derived leaderProducedRecordContext
partition -
kafkaUrl -
- Throws:
java.lang.InterruptedException
-
validateAndFilterOutDuplicateMessagesFromLeaderTopic
protected abstract java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> validateAndFilterOutDuplicateMessagesFromLeaderTopic(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, java.lang.String kafkaUrl, PubSubTopicPartition topicPartition)
-
produceToStoreBufferServiceOrKafka
protected void produceToStoreBufferServiceOrKafka(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, PubSubTopicPartition topicPartition, java.lang.String kafkaUrl, int kafkaClusterId) throws java.lang.InterruptedException
This function is in charge of producing the consumer records to the writer buffers maintained by StoreBufferService. This function may modify the original record in KME, and it is unsafe to use the payload from KME directly after this call.
- Parameters:
records - received consumer records
topicPartition -
- Throws:
java.lang.InterruptedException
-
produceToStoreBufferServiceOrKafkaInBatch
protected void produceToStoreBufferServiceOrKafkaInBatch(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState, java.lang.String kafkaUrl, int kafkaClusterId) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
checkIngestionProgress
protected void checkIngestionProgress(Store store) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
updateIngestionRoleIfStoreChanged
protected void updateIngestionRoleIfStoreChanged(Store store) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
isIngestionTaskActive
public boolean isIngestionTaskActive()
-
run
public void run()
Polls the producer for new messages in an infinite loop using a dedicated consumer thread and processes the new messages on the current thread.
- Specified by:
run in interface java.lang.Runnable
-
updateOffsetMetadataAndSyncOffset
protected void updateOffsetMetadataAndSyncOffset(PartitionConsumptionState pcs)
-
closeVeniceWriters
public void closeVeniceWriters(boolean doFlush)
-
closeVeniceViewWriters
protected void closeVeniceViewWriters()
-
resolveSourceKafkaServersWithinTopicSwitch
protected TopicSwitch resolveSourceKafkaServersWithinTopicSwitch(TopicSwitch originalTopicSwitch)
Applies name resolution to all Kafka URLs in the provided TopicSwitch. Useful for translating URLs that came from a different runtime (e.g. from the controller, or from state persisted by a previous run of the same server).
- Returns:
- the same TopicSwitch, mutated such that all Kafka URLs it contains are guaranteed to be usable by the current Venice server instance
-
getOffsetToOnlineLagThresholdPerPartition
protected static long getOffsetToOnlineLagThresholdPerPartition(java.util.Optional<HybridStoreConfig> hybridStoreConfig, java.lang.String storeName, int partitionCount)
-
processCommonConsumerAction
protected void processCommonConsumerAction(ConsumerAction consumerAction) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
getTopicPartitionEndOffSet
protected long getTopicPartitionEndOffSet(java.lang.String kafkaUrl, PubSubTopic pubSubTopic, int partition)
- Returns:
- the end offset in kafka for the topic partition in SIT, or a negative value if it failed to get it. N.B.: The returned end offset is the last successfully replicated message plus one. If the partition has never been written to, the end offset is 0.
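The end-offset convention in the note above can be illustrated with a small sketch. The helper below is hypothetical and only models the stated rule (last replicated offset plus one, or 0 for a partition that has never been written to). It does not query a broker.

```java
import java.util.OptionalLong;

public class EndOffsetSketch {
  /**
   * @param lastReplicatedOffset offset of the last successfully replicated message,
   *                             or empty if the partition has never been written to
   * @return the end offset under the convention described above
   */
  public static long endOffset(OptionalLong lastReplicatedOffset) {
    return lastReplicatedOffset.isPresent() ? lastReplicatedOffset.getAsLong() + 1 : 0L;
  }
}
```

Under this convention, a consumer whose next offset to read equals the end offset has caught up with everything replicated so far.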
-
getPartitionOffsetLagBasedOnMetrics
protected long getPartitionOffsetLagBasedOnMetrics(java.lang.String kafkaSourceAddress, PubSubTopic topic, int partition)
-
checkLongRunningTaskState
protected abstract void checkLongRunningTaskState() throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
processConsumerAction
protected abstract void processConsumerAction(ConsumerAction message, Store store) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
getConsumptionSourceKafkaAddress
protected abstract java.util.Set<java.lang.String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
-
startConsumingAsLeader
protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
-
getRealTimeDataSourceKafkaAddress
protected java.util.Set<java.lang.String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
-
getPartitionConsumptionState
public PartitionConsumptionState getPartitionConsumptionState(int partitionId)
-
hasAnyPartitionConsumptionState
public boolean hasAnyPartitionConsumptionState(java.util.function.Predicate<PartitionConsumptionState> pcsPredicate)
-
getFailedIngestionPartitionCount
public int getFailedIngestionPartitionCount()
-
shouldProcessRecord
protected boolean shouldProcessRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record)
Common record check for different state models: check whether server continues receiving messages after EOP for a batch-only store.
-
shouldPersistRecord
protected boolean shouldPersistRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record, PartitionConsumptionState partitionConsumptionState)
-
processConsumerRecord
public void processConsumerRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs)
This function is invoked in StoreBufferService to process a buffered PubSubMessage.
-
recordHeartbeatReceived
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, java.lang.String kafkaUrl)
-
setLastConsumerException
public void setLastConsumerException(java.lang.Exception e)
-
setLastStoreIngestionException
public void setLastStoreIngestionException(java.lang.Exception e)
-
recordChecksumVerificationFailure
public void recordChecksumVerificationFailure()
-
recordAssembledRecordSize
protected void recordAssembledRecordSize(int keyLen, java.nio.ByteBuffer valueBytes, java.nio.ByteBuffer rmdBytes, long currentTimeMs)
Records metrics for the original size of fully assembled records (key + value) and RMD by using the field ChunkedValueManifest.size. Also records the ratio of assembled record size to maximum allowed size, which is intended to be used to alert customers about how close they are to hitting the size limit.
- Parameters:
keyLen - The size of the record's key
valueBytes - Put.putValue, which is expected to be a serialized ChunkedValueManifest
rmdBytes - Put.replicationMetadataPayload, which can be a serialized ChunkedValueManifest if RMD chunking was enabled, or just the RMD payload otherwise
-
recordAssembledRecordSizeRatio
protected abstract void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
-
calculateAssembledRecordSizeRatio
protected abstract double calculateAssembledRecordSizeRatio(long recordSize)
-
getBatchReplicationLag
public abstract long getBatchReplicationLag()
-
getLeaderOffsetLag
public abstract long getLeaderOffsetLag()
-
getBatchLeaderOffsetLag
public abstract long getBatchLeaderOffsetLag()
-
getHybridLeaderOffsetLag
public abstract long getHybridLeaderOffsetLag()
-
measureLagWithCallToPubSub
protected long measureLagWithCallToPubSub(java.lang.String pubSubServerName, PubSubTopic topic, int partition, long currentOffset)
- Parameters:
pubSubServerName - Pub Sub deployment to interrogate
topic - topic to measure
partition - for which to measure lag
- Returns:
- the lag, or 9223372036854775807L (Long.MAX_VALUE) if it failed to measure it. N.B.: the returned lag can be negative, since the end offset used in the calculation is cached.
-
measureLagWithCallToPubSub
protected static long measureLagWithCallToPubSub(java.lang.String pubSubServerName, PubSubTopic topic, int partition, long currentOffset, java.util.function.Function<java.lang.String,TopicManager> topicManagerProvider)
-
getFollowerOffsetLag
public abstract long getFollowerOffsetLag()
Measures the offset lag between follower and leader.
-
getBatchFollowerOffsetLag
public abstract long getBatchFollowerOffsetLag()
-
getHybridFollowerOffsetLag
public abstract long getHybridFollowerOffsetLag()
-
getRegionHybridOffsetLag
public abstract long getRegionHybridOffsetLag(int regionId)
-
getWriteComputeErrorCode
public abstract int getWriteComputeErrorCode()
-
updateLeaderTopicOnFollower
public abstract void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
-
minZeroLag
protected long minZeroLag(long value)
Because of timing considerations, it is possible that some lag metrics could compute negative values. Negative lag does not make sense so the intent is to ease interpretation by applying a lower bound of zero on these metrics...
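The lower bound described above amounts to clamping the value at zero. A minimal sketch, assuming the method does nothing beyond applying that bound (the class name is hypothetical):

```java
public class MinZeroLagSketch {
  /** Returns the given lag, or 0 if the computed lag is negative. */
  public static long minZeroLag(long value) {
    return Math.max(0L, value);
  }
}
```

For instance, a lag computed as -5 because of a stale cached end offset would be reported as 0, while a lag of 42 is reported unchanged.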
-
isHybridMode
public boolean isHybridMode()
-
processEndOfPush
protected void processEndOfPush(KafkaMessageEnvelope endOfPushKME, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
-
processStartOfIncrementalPush
protected void processStartOfIncrementalPush(ControlMessage startOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
-
processEndOfIncrementalPush
protected void processEndOfIncrementalPush(ControlMessage endOfIncrementalPush, PartitionConsumptionState partitionConsumptionState)
-
processVersionSwapMessage
protected void processVersionSwapMessage(ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
This isn't really used for ingestion outside of A/A, so we NoOp here and rely on the actual implementation in ActiveActiveStoreIngestionTask.
-
processTopicSwitch
protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
-
updateOffsetMetadataInOffsetRecord
protected abstract void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState)
Syncs the offset metadata in OffsetRecord. PartitionConsumptionState passes some information through to OffsetRecord for persistence; offset rewind and split brain are guarded against in updateLatestInMemoryProcessedOffset(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState, com.linkedin.venice.pubsub.api.PubSubMessage<com.linkedin.venice.message.KafkaKey, com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope, java.lang.Long>, com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext, java.lang.String, boolean).
- Parameters:
partitionConsumptionState -
-
updateLatestInMemoryProcessedOffset
protected abstract void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, java.lang.String kafkaUrl, boolean dryRun)
Maintains in memory the latest offsets processed by drainers; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Before the offset is updated in memory, the underlying storage engine should have persisted the given record. Dry-run mode only performs the offset rewind check and does not update the processed offset.
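The dry-run behavior described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the actual implementation: the class and method names are hypothetical, a "rewind" is modeled simply as a new offset lower than the latest processed one, and the check is modeled as throwing an exception.

```java
public class InMemoryOffsetSketch {
  // Assumption: -1 means that nothing has been processed yet.
  private long latestProcessedOffset = -1L;

  public long getLatestProcessedOffset() {
    return latestProcessedOffset;
  }

  /**
   * Checks for an offset rewind and, unless dryRun is set, records the new offset.
   *
   * @throws IllegalStateException if newOffset is behind the latest processed offset
   */
  public void update(long newOffset, boolean dryRun) {
    if (newOffset < latestProcessedOffset) {
      throw new IllegalStateException(
          "Offset rewind detected: " + newOffset + " < " + latestProcessedOffset);
    }
    if (!dryRun) {
      latestProcessedOffset = newOffset; // only a real run moves the in-memory offset
    }
  }
}
```

A dry run lets a caller validate a record before it is persisted, and the non-dry-run call then records the offset once persistence has succeeded.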
-
recordWriterStats
protected abstract void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
-
validateMessage
protected void validateMessage(PartitionTracker.TopicType type, KafkaDataIntegrityValidator validator, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, boolean endOfPushReceived, PartitionConsumptionState partitionConsumptionState)
Message validation using DIV. Leaders should pass in the validator instance from LeaderFollowerStoreIngestionTask, and drainers should pass in the validator instance from StoreIngestionTask.
1. If valid DIV errors happen after EOP is received, no fatal exceptions will be thrown, but the errors will be recorded in the DIV metrics.
2. Any DIV errors that happen to unregistered producers after EOP will be ignored.
3. Any DIV errors that happen to records that are after logCompactionDelayInMs will be ignored.
-
cloneProducerStates
protected void cloneProducerStates(int partition, KafkaDataIntegrityValidator validator)
Only StoreIngestionTask should be allowed to access kafkaDataIntegrityValidator; other components, such as leaders in LeaderFollowerStoreIngestionTask, should never access the DIV validator in the drainer. Message consumption in the leader is ahead of the drainer, so leaders and drainers process messages at different paces.
-
putInStorageEngine
protected void putInStorageEngine(int partition, byte[] keyBytes, Put put)
Persist Put record to storage engine.
-
removeFromStorageEngine
protected void removeFromStorageEngine(int partition, byte[] keyBytes, Delete delete)
-
throwOrLogStorageFailureDependingIfStillSubscribed
protected void throwOrLogStorageFailureDependingIfStillSubscribed(int partition, VeniceException e)
-
logStorageOperationWhileUnsubscribed
protected void logStorageOperationWhileUnsubscribed(int partition)
-
consumerHasAnySubscription
public boolean consumerHasAnySubscription()
-
consumerHasSubscription
public boolean consumerHasSubscription(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
-
unsubscribeFromTopic
public void unsubscribeFromTopic(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
This method unsubscribes from the input topic-partition. If it is a real-time topic and the separate RT topic is enabled, it also unsubscribes from the separate real-time topic.
-
consumerBatchUnsubscribe
public void consumerBatchUnsubscribe(java.util.Set<PubSubTopicPartition> topicPartitionSet)
-
consumerUnSubscribeAllTopics
public abstract void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
-
consumerSubscribe
public void consumerSubscribe(PubSubTopic pubSubTopic, PartitionConsumptionState partitionConsumptionState, long startOffset, java.lang.String kafkaURL)
This method tries to resolve the actual topic-partition from the input Kafka URL and subscribes to the resolved topic-partition.
-
consumerResetOffset
public void consumerResetOffset(PubSubTopic topic, PartitionConsumptionState partitionConsumptionState)
-
waitVersionStateAvailable
protected StoreVersionState waitVersionStateAvailable(java.lang.String kafkaTopic) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
close
public void close()
Stops the consumer task.
- Specified by:
close in interface java.lang.AutoCloseable
- Specified by:
close in interface java.io.Closeable
-
shutdownAndWait
public void shutdownAndWait(int waitTime)
This method is a blocking call that waits for StoreIngestionTask to fully shut down within the given time.
- Parameters:
waitTime - Maximum wait time for the shutdown operation.
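A bounded, blocking shutdown of a Runnable task is commonly built from a running flag and a latch. The sketch below shows that general pattern only. The class is hypothetical, the wait time is assumed to be in seconds (the unit is not stated above), and the boolean return value is added for illustration, whereas the documented method returns void.

```java
import java.io.Closeable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch implements Runnable, Closeable {
  private volatile boolean running = true;
  private final CountDownLatch shutdownLatch = new CountDownLatch(1);

  @Override
  public void run() {
    try {
      while (running) {
        Thread.sleep(10); // placeholder for polling and processing messages
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      shutdownLatch.countDown(); // signal that the loop has exited
    }
  }

  /** Asks the loop to stop; does not wait for it. */
  @Override
  public void close() {
    running = false;
  }

  public boolean isRunning() {
    return running;
  }

  /** Requests a stop, then blocks until the loop exits or the wait time elapses. */
  public boolean shutdownAndWait(int waitTimeSeconds) {
    close();
    try {
      return shutdownLatch.await(waitTimeSeconds, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

A caller would typically start the task on its own thread and later call shutdownAndWait with a bound it is willing to block for.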
-
isRunning
public boolean isRunning()
A function to allow the service to get the current status of the task. This would allow the service to create a new task if required.
-
getVersionTopic
public PubSubTopic getVersionTopic()
-
isMetricsEmissionEnabled
public boolean isMetricsEmissionEnabled()
-
enableMetricsEmission
public void enableMetricsEmission()
-
disableMetricsEmission
public void disableMetricsEmission()
-
isPartitionConsumingOrHasPendingIngestionAction
public boolean isPartitionConsumingOrHasPendingIngestionAction(int userPartition)
Checks whether the given partition is still consuming messages from Kafka.
-
createKafkaConsumerProperties
protected java.util.Properties createKafkaConsumerProperties(java.util.Properties localConsumerProps, java.lang.String remoteKafkaSourceAddress, boolean consumeRemotely)
Overrides the CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG config with a remote Kafka bootstrap URL.
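The override described above can be sketched with plain java.util.Properties. This is an illustration under assumptions, not the actual method body: the class is hypothetical, the config key is inlined as the string "bootstrap.servers" (the value of Kafka's CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) to avoid a Kafka dependency, and copying the input rather than mutating it is a design choice of this sketch.

```java
import java.util.Properties;

public class RemoteConsumerPropertiesSketch {
  // Value of Kafka's CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, inlined for self-containment.
  static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";

  public static Properties create(
      Properties localConsumerProps,
      String remoteKafkaSourceAddress,
      boolean consumeRemotely) {
    // Assumption: work on a copy so the caller's local properties stay untouched.
    Properties newProps = new Properties();
    newProps.putAll(localConsumerProps);
    if (consumeRemotely) {
      newProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, remoteKafkaSourceAddress);
    }
    return newProps;
  }
}
```

All other consumer settings carry over unchanged; only the bootstrap address differs when consuming remotely.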
-
resubscribe
protected abstract void resubscribe(PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
reportError
public void reportError(java.lang.String message, int userPartition, java.lang.Exception e)
-
isActiveActiveReplicationEnabled
public boolean isActiveActiveReplicationEnabled()
-
dumpPartitionConsumptionStates
public void dumpPartitionConsumptionStates(AdminResponse response, ComplementSet<java.lang.Integer> partitions)
Invoked by admin request to dump the requested partition consumption states.
-
dumpStoreVersionState
public void dumpStoreVersionState(AdminResponse response)
Invoked by admin request to dump store version state metadata.
-
getServerConfig
public VeniceServerConfig getServerConfig()
-
updateOffsetMetadataAndSync
public void updateOffsetMetadataAndSync(java.lang.String topic, int partitionId)
-
getTopicManager
protected TopicManager getTopicManager(java.lang.String sourceKafkaServer)
Returns the local or remote topic manager.
- Parameters:
sourceKafkaServer - The address of the source Kafka bootstrap server.
- Returns:
- topic manager
-
waitForAllMessageToBeProcessedFromTopicPartition
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
Waits for the complete processing (including persistence to disk) of all the messages that were consumed from this Kafka {topic, partition} prior to calling this function. This blocks the calling thread.
- Parameters:
topicPartition - for which to wait
- Throws:
java.lang.InterruptedException
-
delegateConsumerRecord
protected abstract StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
-
recordProcessedRecordStats
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
-
isSegmentControlMsg
protected boolean isSegmentControlMsg(ControlMessageType msgType)
-
isTransientRecordBufferUsed
public boolean isTransientRecordBufferUsed()
This is not a per-record state. Rather, it indicates whether the transient record buffer is being used at all for this ingestion task. In L/F mode, only the WC ingestion task needs this buffer.
-
setPartitionConsumptionState
protected void setPartitionConsumptionState(int partition, PartitionConsumptionState pcs)
-
getVersionedDIVStats
protected AggVersionedDIVStats getVersionedDIVStats()
-
getVersionIngestionStats
protected AggVersionedIngestionStats getVersionIngestionStats()
-
getCompressionStrategy
protected CompressionStrategy getCompressionStrategy()
-
getCompressor
protected Lazy<VeniceCompressor> getCompressor()
-
isChunked
protected boolean isChunked()
-
getSchemaRepo
protected ReadOnlySchemaRepository getSchemaRepo()
-
getHostLevelIngestionStats
protected HostLevelIngestionStats getHostLevelIngestionStats()
-
getKafkaVersionTopic
protected java.lang.String getKafkaVersionTopic()
-
isStuckByMemoryConstraint
public boolean isStuckByMemoryConstraint()
-
shouldUpdateUpstreamOffset
protected boolean shouldUpdateUpstreamOffset(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord)
Validates whether the given consumerRecord has a valid upstream offset to update from.
- Parameters:
consumerRecord -
- Returns:
- true if the record is not null and contains a valid upstream offset, otherwise false.
-
maybeSendIngestionHeartbeat
protected abstract java.util.Set<java.lang.String> maybeSendIngestionHeartbeat()
For L/F hybrid stores, the leader periodically writes a special SOS message to the RT topic. See LeaderFollowerStoreIngestionTask.maybeSendIngestionHeartbeat() for more details.
-
isProducingVersionTopicHealthy
public boolean isProducingVersionTopicHealthy()
This function checks the following condition: 1. Whether the version topic exists.
-
isCurrentVersion
public boolean isCurrentVersion()
-
hasAllPartitionReportedCompleted
public boolean hasAllPartitionReportedCompleted()
-
isSeparatedRealtimeTopicEnabled
public boolean isSeparatedRealtimeTopicEnabled()
-
isDaVinciClient
protected boolean isDaVinciClient()
-
-