Class PartitionConsumptionState
This class tracks everything the replica cares about: how far it's consumed from the real-time (RT) and version topics (VT), what data has been processed, and what's been committed.
This state is not durable. It is periodically checkpointed by updating the OffsetRecord, which wraps the PartitionState persisted on disk.
Note: OffsetRecord is not persisted to disk until the flush/sync operation is called.
When the replica is the leader for the partition, RT and remote VT positions are updated by directly consuming from the corresponding topics. For followers, these positions are derived from the leader footer attached to every version topic message or, if global DIV is enabled, from global DIV snapshots.
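The relationship between in-memory state, checkpointing, and sync can be illustrated with a minimal sketch. Every type below (FakeDisk, FakeOffsetRecord, FakeConsumptionState) is a hypothetical stand-in invented for illustration, and positions are modeled as plain long values; none of this is the actual Venice implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration; these are not the real Venice classes.
final class CheckpointSketch {
  /** Plays the role of durable storage. */
  static final class FakeDisk {
    final Map<String, Long> persisted = new HashMap<>();
  }

  /** Plays the role of OffsetRecord: holds checkpointed values, durable only after sync(). */
  static final class FakeOffsetRecord {
    private final Map<String, Long> checkpointed = new HashMap<>();

    void checkpoint(String field, long position) {
      checkpointed.put(field, position);
    }

    void sync(FakeDisk disk) {
      disk.persisted.putAll(checkpointed);
    }
  }

  /** Plays the role of the in-memory consumption state. */
  static final class FakeConsumptionState {
    private final FakeOffsetRecord offsetRecord = new FakeOffsetRecord();
    private long latestProcessedVtPosition = -1L;

    void process(long position) {
      latestProcessedVtPosition = position; // in-memory only
    }

    void checkpoint() {
      // Copies the in-memory value into the record; still not on disk.
      offsetRecord.checkpoint("latestProcessedVtPosition", latestProcessedVtPosition);
    }

    FakeOffsetRecord offsetRecord() {
      return offsetRecord;
    }
  }

  public static void main(String[] args) {
    FakeDisk disk = new FakeDisk();
    FakeConsumptionState state = new FakeConsumptionState();
    state.process(5L);
    state.checkpoint();
    System.out.println("durable before sync: " + disk.persisted);
    state.offsetRecord().sync(disk);
    System.out.println("durable after sync: " + disk.persisted);
  }
}
```

In this sketch, progress made after the last sync is lost on a crash, which is the practical consequence of the note above.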
-
Nested Class Summary
Nested Classes
static class PartitionConsumptionState.TransientRecord
    This immutable class holds an association between a key and value and the source offset of the consumed message.
-
Constructor Summary
Constructors
PartitionConsumptionState(PubSubTopicPartition partitionReplica, OffsetRecord offsetRecord, PubSubContext pubSubContext, boolean hybrid, org.apache.avro.Schema keySchema)
-
Method Summary
void addIncPushVersionToPendingReportList(String incPushVersion)
void clearPendingReportIncPushVersionList()
void completionReported()
boolean consumeRemotely()
void enableKeyUrnCompressionUponStartOfPush(List<String> keyUrnFields)
void errorReported()
void finalizeExpectedChecksum()
long getConsumptionStartTimeInMs()
getDivRtCheckpointPosition(String pubSubBrokerAddress)
    Returns the real-time topic position till which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker.
long getEndOfPushTimestamp()
byte[] getExpectedChecksum()
getKeyDictCompressor
getKeyUrnCompressionDict
long getLastLeaderCompleteStateUpdateInMs()
getLastLeaderPersistFuture
getLastQueuedRecordPersistedFuture
getLastVTProduceCallFuture
getLatestConsumedRtPosition(String pubSubBrokerAddress)
    Retrieves the latest real-time topic position consumed from the given upstream PubSub broker.
getLatestConsumedVtPosition
long getLatestMessageConsumedTimestampInMs()
long getLatestPolledMessageTimestampInMs()
getLatestProcessedRemoteVtPosition
    Returns the latest processed upstream version topic position.
getLatestProcessedRtPosition(String pubSubBrokerAddress)
    Retrieves the latest real-time topic position processed by the drainer for a given upstream PubSub broker address.
getLatestProcessedRtPositions
    Returns the in-memory map of latest real-time topic positions processed by the drainer for each upstream PubSub broker address.
getLatestProcessedVtPosition
    Returns the latest processed local version topic position.
getLeaderCompleteState
final LeaderFollowerStateType getLeaderFollowerState
getLeaderGUID
getLeaderHostId
PubSubPosition getLeaderPosition(String pubSubBrokerAddress, boolean useCheckpointedDivRtPosition)
    Returns the position the leader should consume from, based on the current leader topic and whether real-time (RT) or version topic (VT) consumption is active.
getOffsetRecord
int getPartition()
getPendingReportIncPushVersionList
long getProcessedRecordSizeSinceLastSync()
getPubSubContext
boolean getReadyToServeInOffsetRecord()
long getReadyToServeTimeLagThresholdInMs()
getReplicaId
getReplicaTopicPartition
getSourceTopicPartition
long getStartOfPushTimestamp()
getTopicSwitch
getTransientRecord(byte[] key)
int getTransientRecordMapSize()
Lazy<VeniceWriter<byte[], byte[], byte[]>> getVeniceWriterLazyRef
boolean hasLagCaughtUp()
void incrementProcessedRecordSizeSinceLastSync(int recordSize)
void initializeExpectedChecksum()
boolean isBatchOnly()
boolean isComplete()
boolean isCompletionReported()
boolean isCurrentVersion()
boolean isDataRecoveryCompleted()
boolean isDeferredWrite()
final boolean isEndOfPushReceived()
boolean isErrorReported()
final boolean isHybrid()
boolean isLatchCreated()
boolean isLatchReleased()
boolean isLeaderCompleted()
boolean isNearlineMetricsRecordingValid(long producerTimeStampInMs)
    Check used to skip calculating the latency from PubSub broker to ready-to-serve for messages that are being caught up from previous pushes.
boolean isStarted()
boolean isSubscribed()
boolean isWaitingForReplicationLag()
void lagHasCaughtUp()
void maybeUpdateExpectedChecksum(byte[] key, Put put)
    Keeps updating the checksum with each key/value pair received from a Kafka PUT message.
PartitionConsumptionState.TransientRecord mayRemoveTransientRecord(int kafkaClusterId, PubSubPosition recordPosition, byte[] key)
    This operation is performed atomically to delete the record only when the provided sourceOffset matches.
void recordReadyToServeInOffsetRecord()
    Records in the OffsetRecord associated with this PartitionConsumptionState that the ready-to-serve check has passed.
void releaseLatch()
void resetExpectedChecksum()
void resetProcessedRecordSizeSinceLastSync()
void setConsumeRemotely(boolean isConsumingRemotely)
void setCurrentVersionSupplier(BooleanSupplier isCurrentVersion)
void setDataRecoveryCompleted(boolean dataRecoveryCompleted)
void setDeferredWrite(boolean deferredWrite)
void setDivRtCheckpointPosition(String pubSubBrokerAddress, PubSubPosition divRtCheckpointPosition)
    Sets the real-time topic position till which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker.
void setEndOfPushTimestamp(long endOfPushTimestamp)
void setLastLeaderCompleteStateUpdateInMs(long lastLeaderCompleteStateUpdateInMs)
void setLastLeaderPersistFuture(Future<Void> future)
void setLastQueuedRecordPersistedFuture(CompletableFuture<Void> lastQueuedRecordPersistedFuture)
void setLastVTProduceCallFuture(CompletableFuture<Void> lastVTProduceCallFuture)
void setLatchCreated()
void setLatestConsumedRtPosition(String pubSubBrokerAddress, PubSubPosition lastConsumedRtPosition)
    Updates the in-memory latest real-time topic position consumed from the given upstream PubSub broker.
void setLatestMessageConsumedTimestampInMs(long consumedTimestampInMs)
void setLatestPolledMessageTimestampInMs(long timestampInMs)
void setLatestProcessedRemoteVtPosition(PubSubPosition upstreamVtPosition)
    Sets the latest processed upstream version topic position.
void setLatestProcessedRtPosition(String pubSubBrokerAddress, PubSubPosition rtPosition)
    Updates the in-memory latest real-time topic position for the given upstream PubSub broker address.
void setLatestProcessedVtPosition(PubSubPosition vtPosition)
    Sets the latest processed local version topic position.
void setLeaderCompleteState(LeaderCompleteState leaderCompleteState)
void setLeaderFollowerState
void setLeaderGUID(GUID leaderGUID)
void setLeaderHostId(String hostId)
void setReadyToServeTimeLagThresholdInMs(long readyToServeTimeLagThresholdInMs)
void setSkipKafkaMessage(boolean skipKafkaMessage)
void setStartOfPushTimestamp(long startOfPushTimestamp)
void setTopicSwitch(TopicSwitchWrapper topicSwitch)
    Updates the in-memory state for TopicSwitch whenever a new TopicSwitch message is encountered or after a restart.
void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, byte[] value, int valueOffset, int valueLen, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord)
void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord)
void setVeniceWriterLazyRef(Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef)
boolean skipKafkaMessage()
toString()
void unsubscribe()
-
Constructor Details
-
PartitionConsumptionState
public PartitionConsumptionState(PubSubTopicPartition partitionReplica, OffsetRecord offsetRecord, PubSubContext pubSubContext, boolean hybrid, org.apache.avro.Schema keySchema)
-
-
Method Details
-
getPartition
public int getPartition() -
getLastVTProduceCallFuture
-
setLastVTProduceCallFuture
public void setLastVTProduceCallFuture(CompletableFuture<Void> lastVTProduceCallFuture) -
setCurrentVersionSupplier
public void setCurrentVersionSupplier(BooleanSupplier isCurrentVersion) -
getOffsetRecord
-
setDeferredWrite
public void setDeferredWrite(boolean deferredWrite) -
isDeferredWrite
public boolean isDeferredWrite() -
isStarted
public boolean isStarted() -
isEndOfPushReceived
public final boolean isEndOfPushReceived() -
isWaitingForReplicationLag
public boolean isWaitingForReplicationLag() -
lagHasCaughtUp
public void lagHasCaughtUp() -
isCurrentVersion
public boolean isCurrentVersion() -
hasLagCaughtUp
public boolean hasLagCaughtUp() -
isNearlineMetricsRecordingValid
public boolean isNearlineMetricsRecordingValid(long producerTimeStampInMs)
Check used to skip calculating the latency from PubSub broker to ready-to-serve for messages that are being caught up from previous pushes.
Parameters:
producerTimeStampInMs - timestamp of the message
-
isCompletionReported
public boolean isCompletionReported() -
completionReported
public void completionReported() -
isSubscribed
public boolean isSubscribed() -
unsubscribe
public void unsubscribe() -
isLatchCreated
public boolean isLatchCreated() -
setLatchCreated
public void setLatchCreated() -
isLatchReleased
public boolean isLatchReleased() -
releaseLatch
public void releaseLatch() -
errorReported
public void errorReported() -
isErrorReported
public boolean isErrorReported() -
isComplete
public boolean isComplete() -
isHybrid
public final boolean isHybrid() -
isBatchOnly
public boolean isBatchOnly() -
toString
public String toString() -
getProcessedRecordSizeSinceLastSync
public long getProcessedRecordSizeSinceLastSync() -
incrementProcessedRecordSizeSinceLastSync
public void incrementProcessedRecordSizeSinceLastSync(int recordSize) -
resetProcessedRecordSizeSinceLastSync
public void resetProcessedRecordSizeSinceLastSync() -
setLeaderFollowerState
-
getLeaderFollowerState
-
setLastLeaderPersistFuture
public void setLastLeaderPersistFuture(Future<Void> future) -
getLastLeaderPersistFuture
-
getLastQueuedRecordPersistedFuture
-
setLastQueuedRecordPersistedFuture
public void setLastQueuedRecordPersistedFuture(CompletableFuture<Void> lastQueuedRecordPersistedFuture) -
setTopicSwitch
public void setTopicSwitch(TopicSwitchWrapper topicSwitch)
Updates the in-memory state for TopicSwitch whenever a new TopicSwitch message is encountered or after a restart. -
getTopicSwitch
-
setConsumeRemotely
public void setConsumeRemotely(boolean isConsumingRemotely) -
consumeRemotely
public boolean consumeRemotely() -
initializeExpectedChecksum
public void initializeExpectedChecksum() -
finalizeExpectedChecksum
public void finalizeExpectedChecksum() -
getVeniceWriterLazyRef
-
setVeniceWriterLazyRef
public void setVeniceWriterLazyRef(Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef) -
maybeUpdateExpectedChecksum
public void maybeUpdateExpectedChecksum(byte[] key, Put put)
Keeps updating the checksum with each key/value pair received from a Kafka PUT message. If the checksum instance has not been configured via initializeExpectedChecksum(), this method does nothing, which spares callers from performing that check themselves.
Parameters:
key -
put -
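The "do nothing unless initialized" behavior described above can be sketched as follows. This is only an illustration under stated assumptions: the class ExpectedChecksumSketch is hypothetical, java.util.zip.CRC32 is swapped in for whatever checksum type the real class uses, and the Put message is reduced to a plain byte[] value.

```java
import java.util.zip.CRC32;

// Hypothetical sketch; CRC32 and the byte[] value are assumptions, not the real Venice types.
final class ExpectedChecksumSketch {
  private CRC32 checksum; // stays null until initializeExpectedChecksum() is called

  void initializeExpectedChecksum() {
    checksum = new CRC32();
  }

  void maybeUpdateExpectedChecksum(byte[] key, byte[] value) {
    if (checksum == null) {
      return; // not configured: silently skip, so callers need no guard of their own
    }
    checksum.update(key, 0, key.length);
    checksum.update(value, 0, value.length);
  }

  boolean isInitialized() {
    return checksum != null;
  }

  long currentValue() {
    return checksum == null ? 0L : checksum.getValue();
  }

  public static void main(String[] args) {
    ExpectedChecksumSketch sketch = new ExpectedChecksumSketch();
    sketch.maybeUpdateExpectedChecksum(new byte[] {1}, new byte[] {2}); // no-op
    sketch.initializeExpectedChecksum();
    sketch.maybeUpdateExpectedChecksum(new byte[] {1}, new byte[] {2}); // now tracked
    System.out.println(sketch.currentValue());
  }
}
```

Placing the null check inside the method is what lets every call site invoke it unconditionally.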
-
resetExpectedChecksum
public void resetExpectedChecksum() -
getExpectedChecksum
public byte[] getExpectedChecksum() -
getLatestMessageConsumedTimestampInMs
public long getLatestMessageConsumedTimestampInMs() -
setLatestMessageConsumedTimestampInMs
public void setLatestMessageConsumedTimestampInMs(long consumedTimestampInMs) -
getLatestPolledMessageTimestampInMs
public long getLatestPolledMessageTimestampInMs() -
setLatestPolledMessageTimestampInMs
public void setLatestPolledMessageTimestampInMs(long timestampInMs) -
getConsumptionStartTimeInMs
public long getConsumptionStartTimeInMs() -
setTransientRecord
public void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord) -
setTransientRecord
public void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, byte[] value, int valueOffset, int valueLen, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord) -
getTransientRecord
-
mayRemoveTransientRecord
public PartitionConsumptionState.TransientRecord mayRemoveTransientRecord(int kafkaClusterId, PubSubPosition recordPosition, byte[] key)
This operation is performed atomically and deletes the record only when the provided source offset (recordPosition) matches.
Parameters:
kafkaClusterId -
recordPosition -
key -
Returns:
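One way to get an atomic "remove only if it still matches" operation is ConcurrentHashMap.computeIfPresent, sketched below. The class TransientRecordSketch, its Record type, the use of long for positions, the ByteBuffer key wrapper, and the return convention (whatever remains mapped after the call, or null) are all assumptions made for this example; the source does not state how the real method is implemented or what it returns.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a compare-and-remove on a transient record cache.
final class TransientRecordSketch {
  static final class Record {
    final int clusterId;
    final long position; // stand-in for PubSubPosition
    final byte[] value;

    Record(int clusterId, long position, byte[] value) {
      this.clusterId = clusterId;
      this.position = position;
      this.value = value;
    }
  }

  // byte[] has identity-based equals/hashCode, so keys are wrapped in ByteBuffer.
  private final ConcurrentMap<ByteBuffer, Record> records = new ConcurrentHashMap<>();

  void set(int clusterId, long position, byte[] key, byte[] value) {
    records.put(ByteBuffer.wrap(key.clone()), new Record(clusterId, position, value));
  }

  /** Removes the record only if it still carries the given source; returns what remains mapped. */
  Record mayRemove(int clusterId, long position, byte[] key) {
    return records.computeIfPresent(
        ByteBuffer.wrap(key),
        (k, existing) ->
            existing.clusterId == clusterId && existing.position == position
                ? null // returning null removes the mapping atomically
                : existing); // a newer write replaced it: keep it
  }

  int size() {
    return records.size();
  }

  public static void main(String[] args) {
    TransientRecordSketch cache = new TransientRecordSketch();
    byte[] key = {42};
    cache.set(0, 10L, key, new byte[] {1});
    cache.mayRemove(0, 9L, key); // stale position: record stays
    System.out.println(cache.size());
    cache.mayRemove(0, 10L, key); // matching position: record removed
    System.out.println(cache.size());
  }
}
```

The match check and the removal happen inside a single computeIfPresent call, so a concurrent write to the same key cannot slip in between them.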
-
getSourceTopicPartition
-
getTransientRecordMapSize
public int getTransientRecordMapSize() -
skipKafkaMessage
public boolean skipKafkaMessage() -
setSkipKafkaMessage
public void setSkipKafkaMessage(boolean skipKafkaMessage) -
recordReadyToServeInOffsetRecord
public void recordReadyToServeInOffsetRecord()
Records in the OffsetRecord associated with this PartitionConsumptionState that the ready-to-serve check has passed. This is persisted to disk once the OffsetRecord is checkpointed, and subsequent restarts consult it when determining whether the node should come online to serve traffic. -
getReadyToServeInOffsetRecord
public boolean getReadyToServeInOffsetRecord() -
setLatestConsumedRtPosition
public void setLatestConsumedRtPosition(String pubSubBrokerAddress, PubSubPosition lastConsumedRtPosition)
Updates the in-memory latest real-time topic position consumed from the given upstream PubSub broker. This position reflects the most recent message observed by the leader during real-time (RT) consumption and is used for tracking ingestion progress.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
lastConsumedRtPosition - the latest consumed real-time topic position
-
getLatestConsumedRtPosition
Retrieves the latest real-time topic position consumed from the given upstream PubSub broker. If no position is recorded, returns PubSubSymbolicPosition.EARLIEST as the default.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
Returns:
the latest consumed real-time topic position, or EARLIEST if not present
-
setDivRtCheckpointPosition
public void setDivRtCheckpointPosition(String pubSubBrokerAddress, PubSubPosition divRtCheckpointPosition)
Sets the real-time topic position till which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
divRtCheckpointPosition - the position till which DIV has been generated and persisted
-
getDivRtCheckpointPosition
Returns the real-time topic position till which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker. If no checkpoint exists for the broker, PubSubSymbolicPosition.EARLIEST is returned.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
Returns:
the checkpointed position for DIV, or EARLIEST if not found
-
setLatestProcessedVtPosition
public void setLatestProcessedVtPosition(PubSubPosition vtPosition)
Sets the latest processed local version topic position.
Parameters:
vtPosition - the version topic position to set
-
getLatestProcessedVtPosition
Returns the latest processed local version topic position.
Returns:
the current version topic position
-
setLatestProcessedRemoteVtPosition
public void setLatestProcessedRemoteVtPosition(PubSubPosition upstreamVtPosition)
Sets the latest processed upstream version topic position.
Parameters:
upstreamVtPosition - the upstream version topic position to set
-
getLatestProcessedRemoteVtPosition
Returns the latest processed upstream version topic position.
Returns:
the current upstream version topic position
-
setLatestProcessedRtPosition
public void setLatestProcessedRtPosition(String pubSubBrokerAddress, PubSubPosition rtPosition)
Updates the in-memory latest real-time topic position for the given upstream PubSub broker address.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
rtPosition - the latest real-time topic position to set
-
getLatestProcessedRtPosition
Retrieves the latest real-time topic position processed by the drainer for a given upstream PubSub broker address.
This method first checks the in-memory state stored in latestProcessedUpstreamRtPositions, which reflects positions already processed and tracked by the drainer. If no entry is found, or if the position is PubSubSymbolicPosition.EARLIEST, it falls back to the offset record, which is periodically flushed to disk and may contain checkpointed upstream offsets.
This fallback is necessary during initial processing of TopicSwitch control messages, where upstream offsets are written only to the OffsetRecord before actual records are processed. In such cases, the in-memory map may still be uninitialized.
Parameters:
pubSubBrokerAddress - the source PubSub broker address whose position is being queried
Returns:
the latest real-time topic position for the given broker address
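The lookup order described above (in-memory map first, offset record as fallback) can be sketched roughly as follows. The class ProcessedRtPositionSketch is hypothetical, positions are modeled as long with -1 standing in for PubSubSymbolicPosition.EARLIEST, and the offset record is reduced to a second map; returning EARLIEST when neither source has an entry is also an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the in-memory-first lookup with an offset-record fallback.
final class ProcessedRtPositionSketch {
  static final long EARLIEST = -1L; // stand-in for PubSubSymbolicPosition.EARLIEST

  private final Map<String, Long> inMemory = new ConcurrentHashMap<>();
  private final Map<String, Long> offsetRecord = new ConcurrentHashMap<>(); // stand-in for OffsetRecord

  void setLatestProcessed(String brokerAddress, long position) {
    inMemory.put(brokerAddress, position);
  }

  void writeToOffsetRecord(String brokerAddress, long position) {
    offsetRecord.put(brokerAddress, position);
  }

  long getLatestProcessed(String brokerAddress) {
    Long position = inMemory.get(brokerAddress);
    if (position != null && position != EARLIEST) {
      return position; // the drainer has already tracked a real position
    }
    // Missing or EARLIEST in memory: consult the checkpointed record instead.
    return offsetRecord.getOrDefault(brokerAddress, EARLIEST);
  }

  public static void main(String[] args) {
    ProcessedRtPositionSketch state = new ProcessedRtPositionSketch();
    // A TopicSwitch-like step writes only to the offset record before any record is processed.
    state.writeToOffsetRecord("broker-a", 42L);
    System.out.println(state.getLatestProcessed("broker-a"));
    state.setLatestProcessed("broker-a", 50L);
    System.out.println(state.getLatestProcessed("broker-a"));
  }
}
```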
-
getLatestProcessedRtPositions
Returns the in-memory map of latest real-time topic positions processed by the drainer for each upstream PubSub broker address.- Returns:
- a map from PubSub broker address to the latest processed real-time topic position
-
getLeaderPosition
public PubSubPosition getLeaderPosition(String pubSubBrokerAddress, boolean useCheckpointedDivRtPosition)
Returns the position the leader should consume from, based on the current leader topic and whether real-time (RT) or version topic (VT) consumption is active.
If the leader topic is an RT topic:
- If useCheckpointedDivRtPosition is true, returns the last checkpointed RT position from global DIV (LCRP).
- Otherwise, returns the latest processed RT position from in-memory state (or OffsetRecord if required).
If the leader topic is a version topic:
- If remote consumption is enabled, returns the latest processed remote VT position.
- Otherwise, returns the latest processed local VT position.
Parameters:
pubSubBrokerAddress - the upstream PubSub broker address
useCheckpointedDivRtPosition - whether to use the global DIV checkpoint (LCRP) as the RT start position
Returns:
the position the leader should consume from
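The four cases above reduce to a two-level branch, sketched here as a pure function. LeaderPositionSketch and its parameter list are hypothetical: positions are modeled as long, and the leader-topic type and remote-consumption flag are passed in as booleans rather than read from object state as the real method presumably does.

```java
// Hypothetical sketch of the decision described for getLeaderPosition.
final class LeaderPositionSketch {
  static long leaderPosition(
      boolean leaderTopicIsRealTime,
      boolean useCheckpointedDivRtPosition,
      boolean consumeRemotely,
      long divRtCheckpointPosition,
      long latestProcessedRtPosition,
      long latestProcessedRemoteVtPosition,
      long latestProcessedLocalVtPosition) {
    if (leaderTopicIsRealTime) {
      // RT topic: choose between the global DIV checkpoint and the processed RT position.
      return useCheckpointedDivRtPosition ? divRtCheckpointPosition : latestProcessedRtPosition;
    }
    // Version topic: choose between the remote and the local processed VT position.
    return consumeRemotely ? latestProcessedRemoteVtPosition : latestProcessedLocalVtPosition;
  }

  public static void main(String[] args) {
    // RT leader topic, resuming from the DIV checkpoint.
    System.out.println(leaderPosition(true, true, false, 10L, 20L, 30L, 40L));
    // VT leader topic, consuming remotely.
    System.out.println(leaderPosition(false, false, true, 10L, 20L, 30L, 40L));
  }
}
```

Note that in this sketch useCheckpointedDivRtPosition has no effect when the leader topic is a version topic, matching the description above.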
-
setStartOfPushTimestamp
public void setStartOfPushTimestamp(long startOfPushTimestamp) -
getStartOfPushTimestamp
public long getStartOfPushTimestamp() -
setEndOfPushTimestamp
public void setEndOfPushTimestamp(long endOfPushTimestamp) -
getEndOfPushTimestamp
public long getEndOfPushTimestamp() -
getLatestConsumedVtPosition
-
setDataRecoveryCompleted
public void setDataRecoveryCompleted(boolean dataRecoveryCompleted) -
isDataRecoveryCompleted
public boolean isDataRecoveryCompleted() -
getLeaderGUID
-
setLeaderGUID
public void setLeaderGUID(GUID leaderGUID) -
getLeaderHostId
-
setLeaderHostId
public void setLeaderHostId(String hostId) -
isLeaderCompleted
public boolean isLeaderCompleted() -
getLeaderCompleteState
-
setLeaderCompleteState
public void setLeaderCompleteState(LeaderCompleteState leaderCompleteState) -
getLastLeaderCompleteStateUpdateInMs
public long getLastLeaderCompleteStateUpdateInMs() -
setLastLeaderCompleteStateUpdateInMs
public void setLastLeaderCompleteStateUpdateInMs(long lastLeaderCompleteStateUpdateInMs) -
getReplicaId
-
getReplicaTopicPartition
-
addIncPushVersionToPendingReportList
public void addIncPushVersionToPendingReportList(String incPushVersion) -
getPendingReportIncPushVersionList
-
clearPendingReportIncPushVersionList
public void clearPendingReportIncPushVersionList() -
getPubSubContext
-
getReadyToServeTimeLagThresholdInMs
public long getReadyToServeTimeLagThresholdInMs() -
setReadyToServeTimeLagThresholdInMs
public void setReadyToServeTimeLagThresholdInMs(long readyToServeTimeLagThresholdInMs) -
enableKeyUrnCompressionUponStartOfPush
public void enableKeyUrnCompressionUponStartOfPush(List<String> keyUrnFields) -
getKeyUrnCompressionDict
-
getKeyDictCompressor
-