Class VersionSwapMessageState

java.lang.Object
com.linkedin.davinci.consumer.VersionSwapMessageState

public class VersionSwapMessageState extends Object
A class created to indicate that the changelog consumer is undergoing a version swap. It also keeps the state associated with that version swap. The class is thread safe: every method that reads or changes mutable state is synchronized. Races can still occur, however, when a caller performs a sequence of calls that depend on each other; such sequences require an external lock. For example, a caller might (1) call getFindNewTopicCheckpointFuture() and, if the future is complete, (2) call getNewTopicVersionSwapCheckpoints(). By the time (2) runs, the result from (1) may no longer be valid, because a different thread may have slipped in between and changed the state.
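The check-then-read sequence described above can be sketched as follows. This is a minimal illustration, not the actual Venice code: SwapStateSketch is a hypothetical stand-in for VersionSwapMessageState (checkpoints are plain strings rather than VeniceChangeCoordinate), and swapLock, publish, reset, and checkpointsIfReady are names invented for this example.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for VersionSwapMessageState. Each method is synchronized,
// so a single call is safe, but a sequence of calls is not atomic.
class SwapStateSketch {
  private CompletableFuture<Void> future = new CompletableFuture<>();
  private Set<String> checkpoints = new HashSet<>();

  synchronized CompletableFuture<Void> getFindNewTopicCheckpointFuture() {
    return future;
  }

  synchronized void setFindNewTopicCheckpointFuture(CompletableFuture<Void> newFuture) {
    future = newFuture;
  }

  synchronized Set<String> getNewTopicVersionSwapCheckpoints() {
    return new HashSet<>(checkpoints);
  }

  synchronized void setNewTopicVersionSwapCheckpoints(Set<String> newCheckpoints) {
    checkpoints = new HashSet<>(newCheckpoints);
  }
}

class ExternalLockSketch {
  // Assumed external lock, shared by every thread that touches the state as a unit.
  private final Object swapLock = new Object();
  private final SwapStateSketch state = new SwapStateSketch();

  // Writer: store the checkpoints and complete the future as one step.
  void publish(Set<String> checkpoints) {
    synchronized (swapLock) {
      state.setNewTopicVersionSwapCheckpoints(checkpoints);
      state.getFindNewTopicCheckpointFuture().complete(null);
    }
  }

  // Writer: clear the checkpoints and install a fresh, incomplete future as one step.
  void reset() {
    synchronized (swapLock) {
      state.setNewTopicVersionSwapCheckpoints(new HashSet<>());
      state.setFindNewTopicCheckpointFuture(new CompletableFuture<>());
    }
  }

  // Reader: steps (1) and (2) run under the same lock, so no writer can slip in between.
  Optional<Set<String>> checkpointsIfReady() {
    synchronized (swapLock) {
      if (state.getFindNewTopicCheckpointFuture().isDone()) {
        return Optional.of(state.getNewTopicVersionSwapCheckpoints());
      }
      return Optional.empty();
    }
  }
}
```

The external lock only helps if every thread that performs a dependent sequence uses the same lock object; a caller that bypasses it reintroduces the race.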
  • Constructor Details

    • VersionSwapMessageState

      public VersionSwapMessageState(VersionSwap versionSwap, int totalRegionCount, Set<PubSubTopicPartition> currentAssignment, long versionSwapStartTimestamp)
  • Method Details

    • getVersionSwapLowWatermarkPosition

      public PubSubPosition getVersionSwapLowWatermarkPosition(String topic, int partitionId)
      Gets the pub sub position of the first relevant version swap message for the given partition. Returns null if the partition has not consumed its version swap message yet. This is acceptable because different partitions may progress towards the version swap at different paces. For example, partition 0 may have already consumed its version swap message while partition 1 is still consuming regular messages from the old version topic and has not yet encountered any version swap message.
      Parameters:
      topic - where the version swap message originated from
      partitionId - of the version topic
      Returns:
      the pub sub position or null
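A caller should treat a null low watermark as "not there yet" rather than as an error. The sketch below illustrates that handling under stated assumptions: LowWatermarkSketch is a hypothetical stand-in, positions are plain long values instead of PubSubPosition, and recordFirstRelevantSwap, describe, and the topic name used with them are invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in that tracks the first relevant version swap position per partition.
class LowWatermarkSketch {
  // Key is "topic-partitionId"; value is a long standing in for PubSubPosition.
  private final Map<String, Long> lowWatermarks = new HashMap<>();

  // Keeps only the first position recorded for a partition; later ones are ignored.
  synchronized void recordFirstRelevantSwap(String topic, int partitionId, long position) {
    lowWatermarks.putIfAbsent(topic + "-" + partitionId, position);
  }

  // Returns null when the partition has not consumed a version swap message yet.
  synchronized Long getVersionSwapLowWatermarkPosition(String topic, int partitionId) {
    return lowWatermarks.get(topic + "-" + partitionId);
  }

  // Caller-side handling: null means the partition is still on regular messages.
  static String describe(LowWatermarkSketch state, String topic, int partitionId) {
    Long position = state.getVersionSwapLowWatermarkPosition(topic, partitionId);
    if (position == null) {
      return "partition " + partitionId + ": still consuming the old version topic";
    }
    return "partition " + partitionId + ": first relevant version swap at " + position;
  }
}
```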
    • getOldVersionTopic

      public String getOldVersionTopic()
    • getNewVersionTopic

      public String getNewVersionTopic()
    • getVersionSwapGenerationId

      public long getVersionSwapGenerationId()
    • setFindNewTopicCheckpointFuture

      public void setFindNewTopicCheckpointFuture(CompletableFuture<Void> findNewTopicCheckpointFuture)
    • getFindNewTopicCheckpointFuture

      public CompletableFuture<Void> getFindNewTopicCheckpointFuture()
    • setNewTopicVersionSwapCheckpoints

      public void setNewTopicVersionSwapCheckpoints(Map<Integer,VeniceChangeCoordinate> newTopicVersionSwapCheckpoints)
    • setNewTopicEOPCheckpoints

      public void setNewTopicEOPCheckpoints(Map<Integer,VeniceChangeCoordinate> newTopicEOPCheckpoints)
    • getNewTopicVersionSwapCheckpoints

      public Set<VeniceChangeCoordinate> getNewTopicVersionSwapCheckpoints()
    • getNewTopicCheckpointsWithEOPAsBackup

      public Set<VeniceChangeCoordinate> getNewTopicCheckpointsWithEOPAsBackup()
      Intended as a backup strategy for when any partition has still not completed the version swap within the timeout. The remaining partitions are resumed from EOP (end of push) instead of from the first relevant version swap message in the new topic.
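One way to picture the backup strategy is as a per-partition choice between two checkpoint maps. The sketch below is an assumption about the shape of that merge, not the actual implementation: EopBackupSketch and its method are hypothetical, and checkpoints are strings rather than VeniceChangeCoordinate.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class EopBackupSketch {
  // For each assigned partition, use the version swap checkpoint if the partition
  // completed the swap, and fall back to the EOP checkpoint if it did not.
  static Map<Integer, String> checkpointsWithEopAsBackup(
      Set<Integer> assigned,
      Set<Integer> incomplete,
      Map<Integer, String> swapCheckpoints,
      Map<Integer, String> eopCheckpoints) {
    Map<Integer, String> result = new TreeMap<>();
    for (Integer partition : assigned) {
      String checkpoint = incomplete.contains(partition)
          ? eopCheckpoints.get(partition)
          : swapCheckpoints.get(partition);
      // Partitions without any known checkpoint are left out of the result.
      if (checkpoint != null) {
        result.put(partition, checkpoint);
      }
    }
    return result;
  }
}
```

Resuming an incomplete partition from EOP trades precision for progress: the consumer may see duplicates, but it does not stay behind on the old topic.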
    • getAssignedPartitions

      public Set<Integer> getAssignedPartitions()
    • getIncompletePartitions

      public Set<Integer> getIncompletePartitions()
    • getVersionSwapStartTimestamp

      public long getVersionSwapStartTimestamp()
      If the version swap has reached its timeout, we need to forcefully seek to the new topic, using the EOP positions for any remaining partitions as a backup plan. This covers a variety of edge cases (e.g. the consumer is not polling fast enough, the consumer's starting position was in between version swaps, or a region is down). In all these cases it is better to move to the new topic and consume many duplicate messages than to stay on the old topic, which will eventually be deleted.
      Returns:
      the timestamp when the version swap was started.
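The timeout decision described above could look roughly like the sketch below. It assumes the start timestamp is in epoch milliseconds and that the caller supplies the timeout; SwapTimeoutSketch, Action, and decide are hypothetical names, and the source does not state the unit or the timeout value.

```java
import java.time.Duration;

class SwapTimeoutSketch {
  enum Action { KEEP_WAITING, SEEK_TO_SWAP_CHECKPOINTS, SEEK_WITH_EOP_BACKUP }

  // Decide what to do next, given whether every partition has finished the swap
  // and how long ago the swap started (timestamps assumed to be epoch milliseconds).
  static Action decide(boolean allPartitionsDone, long swapStartMs, long nowMs, Duration timeout) {
    if (allPartitionsDone) {
      return Action.SEEK_TO_SWAP_CHECKPOINTS;
    }
    if (nowMs - swapStartMs >= timeout.toMillis()) {
      // Timed out: move to the new topic anyway and accept duplicate messages.
      return Action.SEEK_WITH_EOP_BACKUP;
    }
    return Action.KEEP_WAITING;
  }
}
```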
    • isVersionSwapMessagesReceivedForAllPartitions

      public boolean isVersionSwapMessagesReceivedForAllPartitions()
      Returns:
      true if all partitions have received all the version swap messages required for this version swap event. This means we can subscribe to the new version topic and resume normal consumption from the first relevant version swap message.
    • handleVersionSwap

      public boolean handleVersionSwap(VersionSwap versionSwap, PubSubTopicPartition pubSubTopicPartition, PubSubPosition position)
      Handle the version swap message and check if all version swap messages related to this version swap event have been received for the given partition.
      Parameters:
      versionSwap - message that we just received.
      pubSubTopicPartition - where we received the version swap message.
      position - of the version swap message.
      Returns:
      true if all version swap messages related to this version swap event have been received.
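The bookkeeping behind handleVersionSwap, getIncompletePartitions, and handleUnsubscribe can be pictured with the sketch below. It rests on an assumption the source does not confirm: that a partition is complete once it has seen one version swap message from each of totalRegionCount regions. SwapProgressSketch and the sourceRegion argument are hypothetical, and partitions are plain integers instead of PubSubTopicPartition.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class SwapProgressSketch {
  private final int totalRegionCount;
  private final Set<Integer> assigned;
  // Partition id -> regions whose version swap message has been seen (assumed model).
  private final Map<Integer, Set<String>> regionsSeen = new HashMap<>();

  SwapProgressSketch(int totalRegionCount, Set<Integer> currentAssignment) {
    this.totalRegionCount = totalRegionCount;
    this.assigned = new HashSet<>(currentAssignment);
  }

  // Records one version swap message; returns true once the partition has seen all regions.
  synchronized boolean handleVersionSwap(int partition, String sourceRegion) {
    if (!assigned.contains(partition)) {
      return false;
    }
    Set<String> seen = regionsSeen.computeIfAbsent(partition, key -> new HashSet<>());
    seen.add(sourceRegion); // a duplicate from the same region does not count twice
    return seen.size() >= totalRegionCount;
  }

  synchronized Set<Integer> getIncompletePartitions() {
    Set<Integer> incomplete = new TreeSet<>();
    for (Integer partition : assigned) {
      Set<String> seen = regionsSeen.get(partition);
      if (seen == null || seen.size() < totalRegionCount) {
        incomplete.add(partition);
      }
    }
    return incomplete;
  }

  synchronized boolean isVersionSwapMessagesReceivedForAllPartitions() {
    return getIncompletePartitions().isEmpty();
  }

  // Unsubscribed partitions no longer block completion of the swap.
  synchronized void handleUnsubscribe(Set<Integer> partitions) {
    assigned.removeAll(partitions);
    regionsSeen.keySet().removeAll(partitions);
  }
}
```

In this model, unsubscribing the last incomplete partition is enough to make the swap complete for the remaining assignment.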
    • handleUnsubscribe

      public void handleUnsubscribe(Set<Integer> partitions)
      Remove unsubscribed partitions from the ongoing version swap states.
      Parameters:
      partitions - to unsubscribe
    • isVersionSwapRelevant

      public static boolean isVersionSwapRelevant(String currentVersionTopic, String clientRegion, VersionSwap versionSwap)
      Determines if the version swap message is relevant or not based on the version topic that it was consumed from and the version swap message content.
      Parameters:
      currentVersionTopic - where the version swap message was consumed from.
      clientRegion - of the consumer.
      versionSwap - message that was consumed.
      Returns:
      true if the version swap message is relevant and should be processed.