Class PartitionConsumptionState
This class tracks everything the replica cares about: how far it has consumed from the real-time (RT) and version topics (VT), what data has been processed, and what has been committed.
This state is not durable; it is periodically checkpointed by updating the OffsetRecord, which wraps the persisted PartitionState on disk.
Note: OffsetRecord is not persisted to disk until the flush/sync operation is called.
When the replica is the leader for the partition, RT and remote VT positions are updated by directly consuming from the corresponding topics. For followers, these positions are derived from the leader footer attached to every version topic message, or from global DIV snapshots if global DIV is enabled.
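The leader/follower split described above can be illustrated with a minimal sketch. It is not the Venice implementation: positions are plain long offsets, LeaderFooter and ReplicaRtTracker are hypothetical names, and the global DIV snapshot path is omitted. The leader records RT positions as it consumes the RT topic, while a follower copies them from the footer carried by each VT message.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the leader footer attached to each VT message:
// which upstream broker the leader consumed from and the RT position it reached.
final class LeaderFooter {
  final String upstreamBroker;
  final long upstreamRtPosition;

  LeaderFooter(String upstreamBroker, long upstreamRtPosition) {
    this.upstreamBroker = upstreamBroker;
    this.upstreamRtPosition = upstreamRtPosition;
  }
}

// Simplified, hypothetical replica state. Real positions are PubSubPosition
// objects; plain longs are an assumption made to keep the sketch small.
final class ReplicaRtTracker {
  private final Map<String, Long> rtPositions = new HashMap<>();
  private boolean leader;

  void setLeader(boolean leader) {
    this.leader = leader;
  }

  // Leader path: the position comes straight from consuming the RT topic.
  void onRtRecordConsumed(String broker, long rtPosition) {
    if (!leader) {
      throw new IllegalStateException("only the leader consumes the RT topic directly");
    }
    rtPositions.put(broker, rtPosition);
  }

  // Follower path: the position is derived from the footer of a VT message.
  void onVtRecordConsumed(LeaderFooter footer) {
    if (leader) {
      return; // the leader already tracks RT positions directly
    }
    rtPositions.put(footer.upstreamBroker, footer.upstreamRtPosition);
  }

  // -1 is this sketch's "nothing seen yet" marker.
  long rtPosition(String broker) {
    return rtPositions.getOrDefault(broker, -1L);
  }
}
```

A follower that receives a VT message whose footer says "dc-1 at 42" will report 42 for dc-1 without ever touching the RT topic itself.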
-
Nested Class Summary
static class PartitionConsumptionState.TransientRecord: This immutable class holds an association between a key, a value, and the source offset of the consumed message.
-
Constructor Summary
PartitionConsumptionState(String replicaId, int partition, OffsetRecord offsetRecord, PubSubContext pubSubContext, boolean hybrid)
-
Method Summary
void addIncPushVersionToPendingReportList(String incPushVersion)
getDivRtCheckpointPosition(String pubSubBrokerAddress): Returns the real-time topic position up to which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker.
getLatestConsumedRtPosition(String pubSubBrokerAddress): Retrieves the latest real-time topic position consumed from the given upstream PubSub broker.
getLatestProcessedRemoteVtPosition: Returns the latest processed upstream version topic position.
getLatestProcessedRtPosition(String pubSubBrokerAddress): Retrieves the latest real-time topic position processed by the drainer for a given upstream PubSub broker address.
getLatestProcessedRtPositions: Returns the in-memory map of latest real-time topic positions processed by the drainer for each upstream PubSub broker address.
getLatestProcessedVtPosition: Returns the latest processed local version topic position.
PubSubPosition getLeaderPosition(String pubSubBrokerAddress, boolean useCheckpointedDivRtPosition): Returns the position the leader should consume from, based on the current leader topic and whether real-time (RT) or version topic (VT) consumption is active.
getTransientRecord(byte[] key)
void incrementProcessedRecordSizeSinceLastSync(int recordSize)
final boolean isHybrid()
boolean isNearlineMetricsRecordingValid(long producerTimeStampInMs): Checks whether the latency from the PubSub broker to ready-to-serve should be recorded for a message, so that messages being caught up from previous pushes are ignored.
void maybeUpdateExpectedChecksum(byte[] key, Put put): Updates the checksum with the key/value pair received in a Kafka PUT message.
PartitionConsumptionState.TransientRecord mayRemoveTransientRecord(int kafkaClusterId, PubSubPosition recordPosition, byte[] key): Atomically removes the transient record for the given key, but only when the provided recordPosition matches the record's source offset.
void recordReadyToServeInOffsetRecord(): Records in the OffsetRecord associated with this PartitionConsumptionState that the ready-to-serve check has passed.
void setConsumeRemotely(boolean isConsumingRemotely)
void setCurrentVersionSupplier(BooleanSupplier isCurrentVersion)
void setDataRecoveryCompleted(boolean dataRecoveryCompleted)
void setDeferredWrite(boolean deferredWrite)
void setDivRtCheckpointPosition(String pubSubBrokerAddress, PubSubPosition divRtCheckpointPosition): Sets the real-time topic position up to which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker.
void setEndOfPushTimestamp(long endOfPushTimestamp)
void setLastLeaderCompleteStateUpdateInMs(long lastLeaderCompleteStateUpdateInMs)
void setLastLeaderPersistFuture(Future<Void> future)
void setLastQueuedRecordPersistedFuture(CompletableFuture<Void> lastQueuedRecordPersistedFuture)
void setLastVTProduceCallFuture(CompletableFuture<Void> lastVTProduceCallFuture)
void setLatestConsumedRtPosition(String pubSubBrokerAddress, PubSubPosition lastConsumedRtPosition): Updates the in-memory latest real-time topic position consumed from the given upstream PubSub broker.
void setLatestMessageConsumedTimestampInMs(long consumedTimestampInMs)
void setLatestPolledMessageTimestampInMs(long timestampInMs)
void setLatestProcessedRemoteVtPosition(PubSubPosition upstreamVtPosition): Sets the latest processed upstream version topic position.
void setLatestProcessedRtPosition(String pubSubBrokerAddress, PubSubPosition rtPosition): Updates the in-memory latest real-time topic position for the given upstream PubSub broker address.
void setLatestProcessedVtPosition(PubSubPosition vtPosition): Sets the latest processed local version topic position.
void setLeaderCompleteState(LeaderCompleteState leaderCompleteState)
void setLeaderGUID(GUID leaderGUID)
void setLeaderHostId(String hostId)
void setReadyToServeTimeLagThresholdInMs(long readyToServeTimeLagThresholdInMs)
void setSkipKafkaMessage(boolean skipKafkaMessage)
void setStartOfPushTimestamp(long startOfPushTimestamp)
void setTopicSwitch(TopicSwitchWrapper topicSwitch): Updates the in-memory TopicSwitch state whenever a new TopicSwitch message is encountered or after a restart.
void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, byte[] value, int valueOffset, int valueLen, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord)
void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord)
void setVeniceWriterLazyRef(Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef)
toString()
-
Constructor Details
-
PartitionConsumptionState
public PartitionConsumptionState(String replicaId, int partition, OffsetRecord offsetRecord, PubSubContext pubSubContext, boolean hybrid)
-
-
Method Details
-
getPartition
public int getPartition() -
getLastVTProduceCallFuture
-
setLastVTProduceCallFuture
-
setCurrentVersionSupplier
-
getOffsetRecord
-
setDeferredWrite
public void setDeferredWrite(boolean deferredWrite) -
isDeferredWrite
public boolean isDeferredWrite() -
isStarted
public boolean isStarted() -
isEndOfPushReceived
public final boolean isEndOfPushReceived() -
isWaitingForReplicationLag
public boolean isWaitingForReplicationLag() -
lagHasCaughtUp
public void lagHasCaughtUp() -
isCurrentVersion
public boolean isCurrentVersion() -
hasLagCaughtUp
public boolean hasLagCaughtUp() -
isNearlineMetricsRecordingValid
public boolean isNearlineMetricsRecordingValid(long producerTimeStampInMs)
Checks whether the latency from the PubSub broker to ready-to-serve should be recorded for a message, so that messages being caught up from previous pushes are ignored.
Parameters:
producerTimeStampInMs - the producer timestamp of the message
-
isCompletionReported
public boolean isCompletionReported() -
completionReported
public void completionReported() -
isSubscribed
public boolean isSubscribed() -
unsubscribe
public void unsubscribe() -
isLatchCreated
public boolean isLatchCreated() -
setLatchCreated
public void setLatchCreated() -
isLatchReleased
public boolean isLatchReleased() -
releaseLatch
public void releaseLatch() -
errorReported
public void errorReported() -
isErrorReported
public boolean isErrorReported() -
isComplete
public boolean isComplete() -
isHybrid
public final boolean isHybrid() -
isBatchOnly
public boolean isBatchOnly() -
toString
-
getProcessedRecordSizeSinceLastSync
public long getProcessedRecordSizeSinceLastSync() -
incrementProcessedRecordSizeSinceLastSync
public void incrementProcessedRecordSizeSinceLastSync(int recordSize) -
resetProcessedRecordSizeSinceLastSync
public void resetProcessedRecordSizeSinceLastSync() -
setLeaderFollowerState
-
getLeaderFollowerState
-
setLastLeaderPersistFuture
-
getLastLeaderPersistFuture
-
getLastQueuedRecordPersistedFuture
-
setLastQueuedRecordPersistedFuture
public void setLastQueuedRecordPersistedFuture(CompletableFuture<Void> lastQueuedRecordPersistedFuture) -
setTopicSwitch
Updates the in-memory TopicSwitch state whenever a new TopicSwitch message is encountered or after a restart. -
getTopicSwitch
-
setConsumeRemotely
public void setConsumeRemotely(boolean isConsumingRemotely) -
consumeRemotely
public boolean consumeRemotely() -
initializeExpectedChecksum
public void initializeExpectedChecksum() -
finalizeExpectedChecksum
public void finalizeExpectedChecksum() -
getVeniceWriterLazyRef
-
setVeniceWriterLazyRef
-
maybeUpdateExpectedChecksum
Updates the running checksum with the key/value pair received in a Kafka PUT message. If the checksum instance has not been configured via initializeExpectedChecksum(), this method does nothing, which keeps the caller's code clean.
Parameters: key, put
-
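The checksum methods (initializeExpectedChecksum, maybeUpdateExpectedChecksum, finalizeExpectedChecksum, getExpectedChecksum, resetExpectedChecksum) form a lifecycle. The sketch below shows that lifecycle under assumptions: MD5 via java.security.MessageDigest stands in for whatever checksum type Venice actually configures, the value is passed as raw bytes instead of a Put, and ExpectedChecksumSketch is a hypothetical name. The key point is that an update before initialization is a silent no-op.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of the expected-checksum lifecycle.
// MD5 is an assumption; the real checksum algorithm is not stated here.
final class ExpectedChecksumSketch {
  private MessageDigest digest; // null until initializeExpectedChecksum() is called
  private byte[] expectedChecksum;

  void initializeExpectedChecksum() {
    try {
      digest = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
    expectedChecksum = null;
  }

  // No-op when no checksum was initialized, so callers need no null checks.
  void maybeUpdateExpectedChecksum(byte[] key, byte[] value) {
    if (digest == null) {
      return;
    }
    digest.update(key);
    digest.update(value);
  }

  // Produces the final checksum bytes from everything fed in so far.
  void finalizeExpectedChecksum() {
    if (digest != null) {
      expectedChecksum = digest.digest();
    }
  }

  byte[] getExpectedChecksum() {
    return expectedChecksum;
  }

  void resetExpectedChecksum() {
    digest = null;
    expectedChecksum = null;
  }
}
```

Two replicas that feed identical key/value sequences into initialized trackers end up with identical checksum bytes, which is what makes the checksum useful for verification.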
resetExpectedChecksum
public void resetExpectedChecksum() -
getExpectedChecksum
public byte[] getExpectedChecksum() -
getLatestMessageConsumedTimestampInMs
public long getLatestMessageConsumedTimestampInMs() -
setLatestMessageConsumedTimestampInMs
public void setLatestMessageConsumedTimestampInMs(long consumedTimestampInMs) -
getLatestPolledMessageTimestampInMs
public long getLatestPolledMessageTimestampInMs() -
setLatestPolledMessageTimestampInMs
public void setLatestPolledMessageTimestampInMs(long timestampInMs) -
getConsumptionStartTimeInMs
public long getConsumptionStartTimeInMs() -
setTransientRecord
public void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord) -
setTransientRecord
public void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, byte[] value, int valueOffset, int valueLen, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord) -
getTransientRecord
-
mayRemoveTransientRecord
public PartitionConsumptionState.TransientRecord mayRemoveTransientRecord(int kafkaClusterId, PubSubPosition recordPosition, byte[] key)
Atomically removes the transient record for the given key, but only when the provided recordPosition matches the record's source offset.
Parameters: kafkaClusterId, recordPosition, key
-
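The "remove only if the position still matches" behavior is a compare-and-remove. A minimal sketch, assuming a ConcurrentHashMap keyed by ByteBuffer (raw byte[] keys have identity-based equals/hashCode), long positions, and that the cluster id is matched as well; TransientRecordSketch and TransientRecordCache are hypothetical names. Returning the removed record, or null when nothing was removed, is also an assumption about the return convention.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical immutable record: the value plus where it was consumed from.
final class TransientRecordSketch {
  final int kafkaClusterId;
  final long consumedPosition;
  final byte[] value;

  TransientRecordSketch(int kafkaClusterId, long consumedPosition, byte[] value) {
    this.kafkaClusterId = kafkaClusterId;
    this.consumedPosition = consumedPosition;
    this.value = value;
  }
}

final class TransientRecordCache {
  // byte[] keys are wrapped so that equal contents map to the same entry.
  private final ConcurrentMap<ByteBuffer, TransientRecordSketch> records = new ConcurrentHashMap<>();

  void setTransientRecord(int kafkaClusterId, long consumedPosition, byte[] key, byte[] value) {
    records.put(ByteBuffer.wrap(key), new TransientRecordSketch(kafkaClusterId, consumedPosition, value));
  }

  TransientRecordSketch getTransientRecord(byte[] key) {
    return records.get(ByteBuffer.wrap(key));
  }

  // computeIfPresent runs atomically per key in ConcurrentHashMap, so a newer
  // write for the same key (different position) cannot be removed by mistake.
  TransientRecordSketch mayRemoveTransientRecord(int kafkaClusterId, long recordPosition, byte[] key) {
    AtomicReference<TransientRecordSketch> removed = new AtomicReference<>();
    records.computeIfPresent(ByteBuffer.wrap(key), (k, current) -> {
      if (current.kafkaClusterId == kafkaClusterId && current.consumedPosition == recordPosition) {
        removed.set(current);
        return null; // returning null removes the mapping
      }
      return current; // position moved on: keep the newer record
    });
    return removed.get();
  }

  int getTransientRecordMapSize() {
    return records.size();
  }
}
```

The design choice matters when a drainer finishes persisting an older write after a newer one has already been cached: the stale removal is rejected and the newer record stays visible.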
getSourceTopicPartition
-
getTransientRecordMapSize
public int getTransientRecordMapSize() -
skipKafkaMessage
public boolean skipKafkaMessage() -
setSkipKafkaMessage
public void setSkipKafkaMessage(boolean skipKafkaMessage) -
recordReadyToServeInOffsetRecord
public void recordReadyToServeInOffsetRecord()
Records in the OffsetRecord associated with this PartitionConsumptionState that the ready-to-serve check has passed. The flag is persisted to disk once the OffsetRecord is checkpointed, and on subsequent restarts it is consulted to decide whether the node should come online to serve traffic. -
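Because the flag lives in memory until the OffsetRecord is flushed, a restart before the next checkpoint loses it. The sketch below models that with a hypothetical DurableStore standing in for the on-disk PartitionState and a hypothetical OffsetRecordSketch; neither is the Venice API.

```java
// Hypothetical stand-in for the persisted PartitionState on disk.
final class DurableStore {
  private boolean persistedReadyToServe;

  void write(boolean readyToServe) {
    persistedReadyToServe = readyToServe;
  }

  boolean read() {
    return persistedReadyToServe;
  }
}

// Hypothetical in-memory record that is only durable after checkpoint().
final class OffsetRecordSketch {
  private final DurableStore store;
  private boolean readyToServe;

  OffsetRecordSketch(DurableStore store) {
    this.store = store;
    this.readyToServe = store.read(); // on (re)start, load the last checkpoint
  }

  void recordReadyToServe() {
    readyToServe = true; // in memory only
  }

  boolean isReadyToServe() {
    return readyToServe;
  }

  // The flush/sync step that makes the in-memory state durable.
  void checkpoint() {
    store.write(readyToServe);
  }
}
```

Constructing a fresh OffsetRecordSketch over the same store simulates a restart: it sees the flag only if checkpoint() ran first.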
getReadyToServeInOffsetRecord
public boolean getReadyToServeInOffsetRecord() -
setLatestConsumedRtPosition
public void setLatestConsumedRtPosition(String pubSubBrokerAddress, PubSubPosition lastConsumedRtPosition)
Updates the in-memory latest real-time topic position consumed from the given upstream PubSub broker. This position reflects the most recent message observed by the leader during real-time (RT) consumption and is used to track ingestion progress.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
lastConsumedRtPosition - the latest consumed real-time topic position
-
getLatestConsumedRtPosition
Retrieves the latest real-time topic position consumed from the given upstream PubSub broker. If no position is recorded, returns PubSubSymbolicPosition.EARLIEST as the default.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
Returns:
the latest consumed real-time topic position, or EARLIEST if not present
-
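The per-broker lookup with an EARLIEST default (used here and, per its description, by getDivRtCheckpointPosition) can be sketched as a map read with getOrDefault. SketchPosition and RtPositionsByBroker are hypothetical; SketchPosition.EARLIEST only mirrors the role of PubSubSymbolicPosition.EARLIEST, and the choice of ConcurrentHashMap is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical position type; EARLIEST plays the role of "start from the beginning".
final class SketchPosition {
  static final SketchPosition EARLIEST = new SketchPosition(-1L);
  final long offset;

  SketchPosition(long offset) {
    this.offset = offset;
  }
}

final class RtPositionsByBroker {
  // One entry per upstream broker address.
  private final Map<String, SketchPosition> latestConsumed = new ConcurrentHashMap<>();

  void setLatestConsumedRtPosition(String broker, SketchPosition position) {
    latestConsumed.put(broker, position);
  }

  // Missing brokers fall back to EARLIEST instead of returning null.
  SketchPosition getLatestConsumedRtPosition(String broker) {
    return latestConsumed.getOrDefault(broker, SketchPosition.EARLIEST);
  }
}
```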
setDivRtCheckpointPosition
public void setDivRtCheckpointPosition(String pubSubBrokerAddress, PubSubPosition divRtCheckpointPosition)
Sets the real-time topic position up to which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
divRtCheckpointPosition - the position up to which DIV has been generated and persisted
-
getDivRtCheckpointPosition
Returns the real-time topic position up to which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker. If no checkpoint exists for the broker, PubSubSymbolicPosition.EARLIEST is returned.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
Returns:
the checkpointed DIV position, or EARLIEST if not found
-
setLatestProcessedVtPosition
Sets the latest processed local version topic position.
Parameters:
vtPosition - the version topic position to set
-
getLatestProcessedVtPosition
Returns the latest processed local version topic position.
Returns:
the current version topic position
-
setLatestProcessedRemoteVtPosition
Sets the latest processed upstream version topic position.
Parameters:
upstreamVtPosition - the upstream version topic position to set
-
getLatestProcessedRemoteVtPosition
Returns the latest processed upstream version topic position.
Returns:
the current upstream version topic position
-
setLatestProcessedRtPosition
Updates the in-memory latest real-time topic position for the given upstream PubSub broker address.
Parameters:
pubSubBrokerAddress - the source PubSub broker address
rtPosition - the latest real-time topic position to set
-
getLatestProcessedRtPosition
Retrieves the latest real-time topic position processed by the drainer for a given upstream PubSub broker address.
This method first checks the in-memory state stored in latestProcessedUpstreamRtPositions, which reflects positions already processed and tracked by the drainer. If no entry is found, or if the position is PubSubSymbolicPosition.EARLIEST, it falls back to the OffsetRecord, which is periodically flushed to disk and may contain checkpointed upstream offsets.
This fallback is necessary during initial processing of TopicSwitch control messages, where upstream offsets are written only to the OffsetRecord before any actual records are processed. In such cases, the in-memory map may still be uninitialized.
Parameters:
pubSubBrokerAddress - the source PubSub broker address whose position is being queried
Returns:
the latest real-time topic position for the given broker address
-
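The in-memory-first lookup with an OffsetRecord fallback can be sketched as below. Both maps, the long positions, the EARLIEST sentinel of -1, and the name ProcessedRtLookup are assumptions for illustration; in particular, returning EARLIEST when the OffsetRecord has no entry either is this sketch's choice, not documented behavior.

```java
import java.util.HashMap;
import java.util.Map;

final class ProcessedRtLookup {
  static final long EARLIEST = -1L; // hypothetical sentinel for PubSubSymbolicPosition.EARLIEST

  // Stand-in for latestProcessedUpstreamRtPositions (drainer-tracked, in memory).
  private final Map<String, Long> inMemory = new HashMap<>();
  // Stand-in for upstream offsets checkpointed in the OffsetRecord.
  private final Map<String, Long> offsetRecord = new HashMap<>();

  void setLatestProcessedRtPosition(String broker, long position) {
    inMemory.put(broker, position);
  }

  // e.g. written while handling a TopicSwitch, before any records are processed.
  void checkpointUpstreamPosition(String broker, long position) {
    offsetRecord.put(broker, position);
  }

  long getLatestProcessedRtPosition(String broker) {
    Long position = inMemory.get(broker);
    if (position == null || position == EARLIEST) {
      // In-memory map not yet populated: fall back to the checkpointed value.
      return offsetRecord.getOrDefault(broker, EARLIEST);
    }
    return position;
  }
}
```

In the TopicSwitch scenario, checkpointUpstreamPosition runs first, so a query made before the drainer has processed anything still returns the checkpointed upstream offset.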
getLatestProcessedRtPositions
Returns the in-memory map of latest real-time topic positions processed by the drainer for each upstream PubSub broker address.
Returns:
a map from PubSub broker address to the latest processed real-time topic position
-
getLeaderPosition
public PubSubPosition getLeaderPosition(String pubSubBrokerAddress, boolean useCheckpointedDivRtPosition)
Returns the position the leader should consume from, based on the current leader topic and whether real-time (RT) or version topic (VT) consumption is active.
If the leader topic is an RT topic:
- If useCheckpointedDivRtPosition is true, returns the last checkpointed RT position from global DIV (LCRO).
- Otherwise, returns the latest processed RT position from in-memory state (or from the OffsetRecord if required).
If the leader topic is a version topic:
- If remote consumption is enabled, returns the latest processed remote VT position.
- Otherwise, returns the latest processed local VT position.
Parameters:
pubSubBrokerAddress - the upstream PubSub broker address
useCheckpointedDivRtPosition - whether to use the global DIV checkpoint (LCRO) as the RT start position
Returns:
the position the leader should consume from
-
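The four-way decision above can be written as a short sketch. LeaderPositionSketch and its fields are hypothetical, positions are plain longs with -1 standing in for EARLIEST, and the OffsetRecord fallback for processed RT positions is omitted for brevity.

```java
import java.util.Map;

final class LeaderPositionSketch {
  static final long EARLIEST = -1L; // hypothetical sentinel

  // Hypothetical inputs a real implementation would read from the consumption state.
  boolean leaderTopicIsRealTime;
  boolean consumingRemotely;
  Map<String, Long> divRtCheckpoints;     // global DIV checkpoint (LCRO) per broker
  Map<String, Long> processedRtPositions; // latest processed RT position per broker
  long processedRemoteVtPosition;
  long processedLocalVtPosition;

  long getLeaderPosition(String broker, boolean useCheckpointedDivRtPosition) {
    if (leaderTopicIsRealTime) {
      // RT leader topic: choose between the DIV checkpoint and the processed position.
      return useCheckpointedDivRtPosition
          ? divRtCheckpoints.getOrDefault(broker, EARLIEST)
          : processedRtPositions.getOrDefault(broker, EARLIEST);
    }
    // VT leader topic: remote vs local VT position; the broker argument is unused here.
    return consumingRemotely ? processedRemoteVtPosition : processedLocalVtPosition;
  }
}
```

Note that useCheckpointedDivRtPosition only matters while the leader topic is an RT topic; for a version topic the choice depends solely on whether consumption is remote.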
setStartOfPushTimestamp
public void setStartOfPushTimestamp(long startOfPushTimestamp) -
getStartOfPushTimestamp
public long getStartOfPushTimestamp() -
setEndOfPushTimestamp
public void setEndOfPushTimestamp(long endOfPushTimestamp) -
getEndOfPushTimestamp
public long getEndOfPushTimestamp() -
getLatestConsumedVtPosition
-
setDataRecoveryCompleted
public void setDataRecoveryCompleted(boolean dataRecoveryCompleted) -
isDataRecoveryCompleted
public boolean isDataRecoveryCompleted() -
getLeaderGUID
-
setLeaderGUID
-
getLeaderHostId
-
setLeaderHostId
-
isLeaderCompleted
public boolean isLeaderCompleted() -
getLeaderCompleteState
-
setLeaderCompleteState
-
getLastLeaderCompleteStateUpdateInMs
public long getLastLeaderCompleteStateUpdateInMs() -
setLastLeaderCompleteStateUpdateInMs
public void setLastLeaderCompleteStateUpdateInMs(long lastLeaderCompleteStateUpdateInMs) -
getReplicaId
-
addIncPushVersionToPendingReportList
-
getPendingReportIncPushVersionList
-
clearPendingReportIncPushVersionList
public void clearPendingReportIncPushVersionList() -
getPubSubContext
-
getReadyToServeTimeLagThresholdInMs
public long getReadyToServeTimeLagThresholdInMs() -
setReadyToServeTimeLagThresholdInMs
public void setReadyToServeTimeLagThresholdInMs(long readyToServeTimeLagThresholdInMs)
-