Class PartitionConsumptionState

java.lang.Object
com.linkedin.davinci.kafka.consumer.PartitionConsumptionState

public class PartitionConsumptionState extends Object
In-memory state that represents a replica's view of partition consumption.

This class tracks everything the replica cares about: how far it's consumed from the real-time (RT) and version topics (VT), what data has been processed, and what's been committed.

This state is not durable — it's periodically checkpointed by updating the OffsetRecord, which wraps the persisted PartitionState on disk. Note: OffsetRecord is not persisted to disk until the flush/sync operation is called.

When the replica is the leader for the partition, RT and remote VT positions are updated by directly consuming from the corresponding topics. For followers, these positions are derived from the leader footer attached to every version topic message or, if global DIV is enabled, from global DIV snapshots.

  • Constructor Details

    • PartitionConsumptionState

      public PartitionConsumptionState(String replicaId, int partition, OffsetRecord offsetRecord, PubSubContext pubSubContext, boolean hybrid)
  • Method Details

    • getPartition

      public int getPartition()
    • getLastVTProduceCallFuture

      public CompletableFuture<Void> getLastVTProduceCallFuture()
    • setLastVTProduceCallFuture

      public void setLastVTProduceCallFuture(CompletableFuture<Void> lastVTProduceCallFuture)
    • setCurrentVersionSupplier

      public void setCurrentVersionSupplier(BooleanSupplier isCurrentVersion)
    • getOffsetRecord

      public OffsetRecord getOffsetRecord()
    • setDeferredWrite

      public void setDeferredWrite(boolean deferredWrite)
    • isDeferredWrite

      public boolean isDeferredWrite()
    • isStarted

      public boolean isStarted()
    • isEndOfPushReceived

      public final boolean isEndOfPushReceived()
    • isWaitingForReplicationLag

      public boolean isWaitingForReplicationLag()
    • lagHasCaughtUp

      public void lagHasCaughtUp()
    • isCurrentVersion

      public boolean isCurrentVersion()
    • hasLagCaughtUp

      public boolean hasLagCaughtUp()
    • isNearlineMetricsRecordingValid

      public boolean isNearlineMetricsRecordingValid(long producerTimeStampInMs)
      Checks whether nearline latency metrics should be recorded for a message. The latency from pubsub broker to ready-to-serve is not calculated for messages that are still being caught up from previous pushes.
      Parameters:
      producerTimeStampInMs - producer timestamp of the message, in milliseconds
    • isCompletionReported

      public boolean isCompletionReported()
    • completionReported

      public void completionReported()
    • isSubscribed

      public boolean isSubscribed()
    • unsubscribe

      public void unsubscribe()
    • isLatchCreated

      public boolean isLatchCreated()
    • setLatchCreated

      public void setLatchCreated()
    • isLatchReleased

      public boolean isLatchReleased()
    • releaseLatch

      public void releaseLatch()
    • errorReported

      public void errorReported()
    • isErrorReported

      public boolean isErrorReported()
    • isComplete

      public boolean isComplete()
    • isHybrid

      public final boolean isHybrid()
    • isBatchOnly

      public boolean isBatchOnly()
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • getProcessedRecordSizeSinceLastSync

      public long getProcessedRecordSizeSinceLastSync()
    • incrementProcessedRecordSizeSinceLastSync

      public void incrementProcessedRecordSizeSinceLastSync(int recordSize)
    • resetProcessedRecordSizeSinceLastSync

      public void resetProcessedRecordSizeSinceLastSync()
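      The three ...ProcessedRecordSizeSinceLastSync methods suggest a byte counter that is incremented per record and reset when state is synced. The following is a minimal sketch of that pattern, not the actual ingestion code: the class SyncCounterSketch, the byte threshold, and the sync placeholder are all assumptions made for illustration.

```java
/** Hypothetical stand-in for the byte counter kept by PartitionConsumptionState. */
class SyncCounterSketch {
  private final long syncThresholdBytes; // assumed threshold; not part of the documented API
  private long processedSinceLastSync;
  private int syncCount;

  SyncCounterSketch(long syncThresholdBytes) {
    this.syncThresholdBytes = syncThresholdBytes;
  }

  /** Adds the record size, then "syncs" and resets once the threshold is reached. */
  void onRecordProcessed(int recordSize) {
    processedSinceLastSync += recordSize; // role of incrementProcessedRecordSizeSinceLastSync
    if (processedSinceLastSync >= syncThresholdBytes) {
      syncCount++; // placeholder for flushing the OffsetRecord to disk
      processedSinceLastSync = 0; // role of resetProcessedRecordSizeSinceLastSync
    }
  }

  long getProcessedSinceLastSync() {
    return processedSinceLastSync;
  }

  int getSyncCount() {
    return syncCount;
  }
}
```

      With a threshold of 100 bytes, records of 60 and 50 bytes would trigger one sync on the second record and leave the counter at zero.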
    • setLeaderFollowerState

      public void setLeaderFollowerState(LeaderFollowerStateType state)
    • getLeaderFollowerState

      public final LeaderFollowerStateType getLeaderFollowerState()
    • setLastLeaderPersistFuture

      public void setLastLeaderPersistFuture(Future<Void> future)
    • getLastLeaderPersistFuture

      public Future<Void> getLastLeaderPersistFuture()
    • getLastQueuedRecordPersistedFuture

      public CompletableFuture<Void> getLastQueuedRecordPersistedFuture()
    • setLastQueuedRecordPersistedFuture

      public void setLastQueuedRecordPersistedFuture(CompletableFuture<Void> lastQueuedRecordPersistedFuture)
    • setTopicSwitch

      public void setTopicSwitch(TopicSwitchWrapper topicSwitch)
      Updates the in-memory state for TopicSwitch whenever a new TopicSwitch message is encountered or after a restart.
    • getTopicSwitch

      public TopicSwitchWrapper getTopicSwitch()
    • setConsumeRemotely

      public void setConsumeRemotely(boolean isConsumingRemotely)
    • consumeRemotely

      public boolean consumeRemotely()
    • initializeExpectedChecksum

      public void initializeExpectedChecksum()
    • finalizeExpectedChecksum

      public void finalizeExpectedChecksum()
    • getVeniceWriterLazyRef

      public Lazy<VeniceWriter<byte[],byte[],byte[]>> getVeniceWriterLazyRef()
    • setVeniceWriterLazyRef

      public void setVeniceWriterLazyRef(Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterLazyRef)
    • maybeUpdateExpectedChecksum

      public void maybeUpdateExpectedChecksum(byte[] key, Put put)
      Updates the running checksum with a key/value pair received from a Kafka PUT message. If the checksum instance has not been configured via initializeExpectedChecksum(), this method does nothing, so callers do not need to check whether the checksum is configured.
      Parameters:
      key - the key of the PUT message
      put - the PUT message carrying the value
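      The "do nothing unless initialized" behavior described for maybeUpdateExpectedChecksum can be sketched with the JDK's MessageDigest. This is an illustration only: the class ExpectedChecksumSketch, the choice of MD5, and the use of a plain byte[] value in place of the Put message are assumptions, not the library's implementation.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical sketch of an optional running checksum over key/value pairs. */
class ExpectedChecksumSketch {
  private MessageDigest digest; // stays null until initialize() is called

  /** Plays the role of initializeExpectedChecksum(); MD5 is an assumed algorithm. */
  void initialize() {
    try {
      digest = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }

  /** Folds the pair into the checksum, or silently does nothing if not initialized. */
  void maybeUpdate(byte[] key, byte[] value) {
    if (digest == null) {
      return; // the caller does not need its own null check
    }
    digest.update(key);
    digest.update(value);
  }

  /** Returns the checksum bytes, or null when no checksum was configured. */
  byte[] finish() {
    return digest == null ? null : digest.digest();
  }
}
```

      Keeping the null check inside maybeUpdate means the consuming loop can call it for every PUT without branching on whether checksum validation is active.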
    • resetExpectedChecksum

      public void resetExpectedChecksum()
    • getExpectedChecksum

      public byte[] getExpectedChecksum()
    • getLatestMessageConsumedTimestampInMs

      public long getLatestMessageConsumedTimestampInMs()
    • setLatestMessageConsumedTimestampInMs

      public void setLatestMessageConsumedTimestampInMs(long consumedTimestampInMs)
    • getLatestPolledMessageTimestampInMs

      public long getLatestPolledMessageTimestampInMs()
    • setLatestPolledMessageTimestampInMs

      public void setLatestPolledMessageTimestampInMs(long timestampInMs)
    • getConsumptionStartTimeInMs

      public long getConsumptionStartTimeInMs()
    • setTransientRecord

      public void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord)
    • setTransientRecord

      public void setTransientRecord(int kafkaClusterId, PubSubPosition consumedPosition, byte[] key, byte[] value, int valueOffset, int valueLen, int valueSchemaId, org.apache.avro.generic.GenericRecord replicationMetadataRecord)
    • getTransientRecord

      public PartitionConsumptionState.TransientRecord getTransientRecord(byte[] key)
    • mayRemoveTransientRecord

      public PartitionConsumptionState.TransientRecord mayRemoveTransientRecord(int kafkaClusterId, PubSubPosition recordPosition, byte[] key)
      This operation is performed atomically and deletes the record only when the provided recordPosition matches.
      Parameters:
      kafkaClusterId -
      recordPosition -
      key -
      Returns:
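      An atomic "remove only if it still matches" operation of this kind can be expressed with ConcurrentHashMap.computeIfPresent. The sketch below is an assumption-laden illustration, not the real method: TransientRecordMapSketch, its Entry type, the long position (standing in for PubSubPosition), and the ByteBuffer key wrapper are hypothetical, and the return value is simply what computeIfPresent yields rather than a claim about what mayRemoveTransientRecord returns.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical sketch of conditional, atomic removal from a transient record map. */
class TransientRecordMapSketch {
  /** Minimal stand-in for a transient record: where it was consumed from. */
  static final class Entry {
    final int clusterId;
    final long position; // a long stands in for PubSubPosition here

    Entry(int clusterId, long position) {
      this.clusterId = clusterId;
      this.position = position;
    }
  }

  // ByteBuffer gives content-based equals/hashCode, which raw byte[] lacks.
  private final ConcurrentMap<ByteBuffer, Entry> records = new ConcurrentHashMap<>();

  void put(int clusterId, long position, byte[] key) {
    records.put(ByteBuffer.wrap(key.clone()), new Entry(clusterId, position));
  }

  /**
   * Removes the entry only if it still carries the given cluster id and position.
   * Returns the entry left in the map, or null if it was removed or never present.
   */
  Entry removeIfMatches(int clusterId, long position, byte[] key) {
    return records.computeIfPresent(
        ByteBuffer.wrap(key),
        (k, current) -> current.clusterId == clusterId && current.position == position ? null : current);
  }

  int size() {
    return records.size();
  }
}
```

      Because the comparison and the removal happen inside one computeIfPresent call, a newer record written for the same key between the check and the delete cannot be removed by mistake.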
    • getSourceTopicPartition

      public PubSubTopicPartition getSourceTopicPartition(PubSubTopic topic)
    • getTransientRecordMapSize

      public int getTransientRecordMapSize()
    • skipKafkaMessage

      public boolean skipKafkaMessage()
    • setSkipKafkaMessage

      public void setSkipKafkaMessage(boolean skipKafkaMessage)
    • recordReadyToServeInOffsetRecord

      public void recordReadyToServeInOffsetRecord()
      Records in the OffsetRecord associated with this PartitionConsumptionState that the ready-to-serve check has passed. The flag reaches disk once the OffsetRecord is checkpointed, and subsequent restarts consult it when determining whether the node should come online to serve traffic.
    • getReadyToServeInOffsetRecord

      public boolean getReadyToServeInOffsetRecord()
    • setLatestConsumedRtPosition

      public void setLatestConsumedRtPosition(String pubSubBrokerAddress, PubSubPosition lastConsumedRtPosition)
      Updates the in-memory latest real-time topic position consumed from the given upstream PubSub broker. This position reflects the most recent message observed by the leader during real-time (RT) consumption and is used for tracking ingestion progress.
      Parameters:
      pubSubBrokerAddress - the source PubSub broker address
      lastConsumedRtPosition - the latest consumed real-time topic position
    • getLatestConsumedRtPosition

      public PubSubPosition getLatestConsumedRtPosition(String pubSubBrokerAddress)
      Retrieves the latest real-time topic position consumed from the given upstream PubSub broker. If no position is recorded, returns PubSubSymbolicPosition.EARLIEST as the default.
      Parameters:
      pubSubBrokerAddress - the source PubSub broker address
      Returns:
      the latest consumed real-time topic position, or EARLIEST if not present
    • setDivRtCheckpointPosition

      public void setDivRtCheckpointPosition(String pubSubBrokerAddress, PubSubPosition divRtCheckpointPosition)
      Sets the real-time topic position up to which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker.
      Parameters:
      pubSubBrokerAddress - the source PubSub broker address
      divRtCheckpointPosition - the position up to which DIV has been generated and persisted
    • getDivRtCheckpointPosition

      public PubSubPosition getDivRtCheckpointPosition(String pubSubBrokerAddress)
      Returns the real-time topic position up to which Data Integrity Validation (DIV) has been checkpointed for the given upstream broker. If no checkpoint exists for the broker, PubSubSymbolicPosition.EARLIEST is returned.
      Parameters:
      pubSubBrokerAddress - the source PubSub broker address
      Returns:
      the checkpointed position for DIV, or EARLIEST if not found
    • setLatestProcessedVtPosition

      public void setLatestProcessedVtPosition(PubSubPosition vtPosition)
      Sets the latest processed local version topic position.
      Parameters:
      vtPosition - the version topic position to set
    • getLatestProcessedVtPosition

      public PubSubPosition getLatestProcessedVtPosition()
      Returns the latest processed local version topic position.
      Returns:
      the current version topic position
    • setLatestProcessedRemoteVtPosition

      public void setLatestProcessedRemoteVtPosition(PubSubPosition upstreamVtPosition)
      Sets the latest processed upstream version topic position.
      Parameters:
      upstreamVtPosition - the upstream version topic position to set
    • getLatestProcessedRemoteVtPosition

      public PubSubPosition getLatestProcessedRemoteVtPosition()
      Returns the latest processed upstream version topic position.
      Returns:
      the current upstream version topic position
    • setLatestProcessedRtPosition

      public void setLatestProcessedRtPosition(String pubSubBrokerAddress, PubSubPosition rtPosition)
      Updates the in-memory latest processed real-time topic position for the given upstream PubSub broker address.
      Parameters:
      pubSubBrokerAddress - the source PubSub broker address
      rtPosition - the latest real-time topic position to set
    • getLatestProcessedRtPosition

      public PubSubPosition getLatestProcessedRtPosition(String pubSubBrokerAddress)
      Retrieves the latest real-time topic position processed by the drainer for a given upstream PubSub broker address.

      This method first checks the in-memory state stored in latestProcessedUpstreamRtPositions, which reflects positions already processed and tracked by the drainer. If no entry is found, or if the position is PubSubSymbolicPosition.EARLIEST, it falls back to the offset record, which is periodically flushed to disk and may contain checkpointed upstream offsets.

      This fallback is necessary during initial processing of TopicSwitch control messages, where upstream offsets are written only to the OffsetRecord before actual records are processed. In such cases, the in-memory map may still be uninitialized.

      Parameters:
      pubSubBrokerAddress - the source PubSub broker address whose position is being queried
      Returns:
      the latest real-time topic position for the given broker address
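      The lookup order described above (in-memory map first, then the offset record when the entry is missing or EARLIEST) can be sketched as follows. Everything here is a simplification for illustration: ProcessedRtPositionSketch is hypothetical, positions are plain longs with -1 standing in for PubSubSymbolicPosition.EARLIEST, a second map stands in for the OffsetRecord, and returning EARLIEST when neither source has an entry is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the in-memory-then-checkpoint lookup for RT positions. */
class ProcessedRtPositionSketch {
  static final long EARLIEST = -1L; // stand-in for PubSubSymbolicPosition.EARLIEST

  private final Map<String, Long> inMemory = new ConcurrentHashMap<>();
  private final Map<String, Long> checkpointed = new ConcurrentHashMap<>(); // stand-in for OffsetRecord

  void setInMemory(String brokerAddress, long position) {
    inMemory.put(brokerAddress, position);
  }

  void setCheckpointed(String brokerAddress, long position) {
    checkpointed.put(brokerAddress, position);
  }

  /** Prefers the in-memory position; falls back when it is absent or EARLIEST. */
  long getLatestProcessed(String brokerAddress) {
    long position = inMemory.getOrDefault(brokerAddress, EARLIEST);
    if (position != EARLIEST) {
      return position;
    }
    // Covers the TopicSwitch case, where only the checkpointed copy is populated so far.
    return checkpointed.getOrDefault(brokerAddress, EARLIEST);
  }
}
```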
    • getLatestProcessedRtPositions

      public Map<String,PubSubPosition> getLatestProcessedRtPositions()
      Returns the in-memory map of latest real-time topic positions processed by the drainer for each upstream PubSub broker address.
      Returns:
      a map from PubSub broker address to the latest processed real-time topic position
    • getLeaderPosition

      public PubSubPosition getLeaderPosition(String pubSubBrokerAddress, boolean useCheckpointedDivRtPosition)
      Returns the position the leader should consume from, based on the current leader topic and whether real-time (RT) or version topic (VT) consumption is active.

      If the leader topic is an RT topic:

      • If useCheckpointedDivRtPosition is true, returns the last checkpointed RT position from global DIV (LCRO).
      • Otherwise, returns the latest processed RT position from in-memory state (or from the OffsetRecord if required).

      If the leader topic is a version topic:

      • If remote consumption is enabled, returns the latest processed remote VT position.
      • Otherwise, returns the latest processed local VT position.
      Parameters:
      pubSubBrokerAddress - the upstream PubSub broker address
      useCheckpointedDivRtPosition - whether to use the global DIV checkpoint (LCRO) as the RT start position
      Returns:
      the position the leader should consume from
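      The four cases listed for getLeaderPosition reduce to a small decision tree. The sketch below only names which tracked position would be selected. LeaderPositionSketch, its Source enum, and the boolean inputs are hypothetical simplifications, and the real method returns a PubSubPosition rather than a label.

```java
/** Hypothetical sketch of the selection logic described for getLeaderPosition. */
class LeaderPositionSketch {
  /** Labels for the positions the documentation says may be returned. */
  enum Source {
    DIV_RT_CHECKPOINT,
    LATEST_PROCESSED_RT,
    LATEST_PROCESSED_REMOTE_VT,
    LATEST_PROCESSED_LOCAL_VT
  }

  static Source choose(
      boolean leaderTopicIsRealTime,
      boolean useCheckpointedDivRtPosition,
      boolean consumeRemotely) {
    if (leaderTopicIsRealTime) {
      // RT leader topic: global DIV checkpoint if requested, else latest processed RT position.
      return useCheckpointedDivRtPosition ? Source.DIV_RT_CHECKPOINT : Source.LATEST_PROCESSED_RT;
    }
    // Version topic as leader topic: remote VT position when consuming remotely, else local.
    return consumeRemotely ? Source.LATEST_PROCESSED_REMOTE_VT : Source.LATEST_PROCESSED_LOCAL_VT;
  }
}
```

      In this sketch, consumeRemotely is ignored for an RT leader topic and useCheckpointedDivRtPosition is ignored for a version topic, which matches the case split in the description.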
    • setStartOfPushTimestamp

      public void setStartOfPushTimestamp(long startOfPushTimestamp)
    • getStartOfPushTimestamp

      public long getStartOfPushTimestamp()
    • setEndOfPushTimestamp

      public void setEndOfPushTimestamp(long endOfPushTimestamp)
    • getEndOfPushTimestamp

      public long getEndOfPushTimestamp()
    • getLatestConsumedVtPosition

      public PubSubPosition getLatestConsumedVtPosition()
    • setDataRecoveryCompleted

      public void setDataRecoveryCompleted(boolean dataRecoveryCompleted)
    • isDataRecoveryCompleted

      public boolean isDataRecoveryCompleted()
    • getLeaderGUID

      public GUID getLeaderGUID()
    • setLeaderGUID

      public void setLeaderGUID(GUID leaderGUID)
    • getLeaderHostId

      public String getLeaderHostId()
    • setLeaderHostId

      public void setLeaderHostId(String hostId)
    • isLeaderCompleted

      public boolean isLeaderCompleted()
    • getLeaderCompleteState

      public LeaderCompleteState getLeaderCompleteState()
    • setLeaderCompleteState

      public void setLeaderCompleteState(LeaderCompleteState leaderCompleteState)
    • getLastLeaderCompleteStateUpdateInMs

      public long getLastLeaderCompleteStateUpdateInMs()
    • setLastLeaderCompleteStateUpdateInMs

      public void setLastLeaderCompleteStateUpdateInMs(long lastLeaderCompleteStateUpdateInMs)
    • getReplicaId

      public String getReplicaId()
    • addIncPushVersionToPendingReportList

      public void addIncPushVersionToPendingReportList(String incPushVersion)
    • getPendingReportIncPushVersionList

      public List<String> getPendingReportIncPushVersionList()
    • clearPendingReportIncPushVersionList

      public void clearPendingReportIncPushVersionList()
    • getPubSubContext

      public PubSubContext getPubSubContext()
    • getReadyToServeTimeLagThresholdInMs

      public long getReadyToServeTimeLagThresholdInMs()
    • setReadyToServeTimeLagThresholdInMs

      public void setReadyToServeTimeLagThresholdInMs(long readyToServeTimeLagThresholdInMs)