Package com.linkedin.davinci.consumer
Class VersionSwapMessageState
java.lang.Object
com.linkedin.davinci.consumer.VersionSwapMessageState
A class initialized to indicate that the changelog consumer is undergoing version swap. The class is also keeping
various states about this version swap. This class is thread safe by having all methods that change or read a state
that's mutable synchronized. However, it's important to keep in mind that race can still occur if the caller is
trying to perform a sequence of events that depend on each other. In those scenarios an external lock is required.
E.g. (1) getFindNewTopicCheckpointFuture() and if the future is complete call (2) getNewTopicVersionSwapCheckpoints()
By the time (2) is called it's possible that the result from (1) is no longer valid. A different thread slipped in
between and changed the state.
-
Constructor Summary
ConstructorsConstructorDescriptionVersionSwapMessageState(VersionSwap versionSwap, int totalRegionCount, Set<PubSubTopicPartition> currentAssignment, long versionSwapStartTimestamp) -
Method Summary
Modifier and TypeMethodDescriptionIntended to be used as a backup strategy if any partition still did not complete version swap within the timeout.longgetVersionSwapLowWatermarkPosition(String topic, int partitionId) Get the pub sub position of the first relevant version swap message for the given partition.longIf we have reached the timeout for the version swap, we need to forcefully seek to the new topic using the EOP positions for any remaining partitions as our backup plan which should cover a variety of edge cases (e.g.voidhandleUnsubscribe(Set<Integer> partitions) Remove unsubscribed partitions from the ongoing version swap states.booleanhandleVersionSwap(VersionSwap versionSwap, PubSubTopicPartition pubSubTopicPartition, PubSubPosition position) Handle the version swap message and check if all version swap messages related to this version swap event have been received for the given partition.booleanstatic booleanisVersionSwapRelevant(String currentVersionTopic, String clientRegion, VersionSwap versionSwap) Determines if the version swap message is relevant or not based on the version topic that it was consumed from and the version swap message content.voidsetFindNewTopicCheckpointFuture(CompletableFuture<Void> findNewTopicCheckpointFuture) voidsetNewTopicEOPCheckpoints(Map<Integer, VeniceChangeCoordinate> newTopicEOPCheckpoints) voidsetNewTopicVersionSwapCheckpoints(Map<Integer, VeniceChangeCoordinate> newTopicVersionSwapCheckpoints)
-
Constructor Details
-
VersionSwapMessageState
public VersionSwapMessageState(VersionSwap versionSwap, int totalRegionCount, Set<PubSubTopicPartition> currentAssignment, long versionSwapStartTimestamp)
-
-
Method Details
-
getVersionSwapLowWatermarkPosition
Get the pub sub position of the first relevant version swap message for the given partition. Null will be returned if the partition have not consumed its version swap yet. This is acceptable because different partitions could be making progress towards version swap at different pace. e.g. partition 0 consumed its version swap message already but partition 1 could still be consuming regular messages from the old version topic before encountering any version swap messages.- Parameters:
topic- where the version swap message originated frompartitionId- of the version topic- Returns:
- the pub sub position or null
-
getOldVersionTopic
-
getNewVersionTopic
-
getVersionSwapGenerationId
public long getVersionSwapGenerationId() -
setFindNewTopicCheckpointFuture
-
getFindNewTopicCheckpointFuture
-
setNewTopicVersionSwapCheckpoints
public void setNewTopicVersionSwapCheckpoints(Map<Integer, VeniceChangeCoordinate> newTopicVersionSwapCheckpoints) -
setNewTopicEOPCheckpoints
-
getNewTopicVersionSwapCheckpoints
-
getNewTopicCheckpointsWithEOPAsBackup
Intended to be used as a backup strategy if any partition still did not complete version swap within the timeout. Remaining partitions will be resumed from EOP instead of first relevant version swap message in the new topic. -
getAssignedPartitions
-
getIncompletePartitions
-
getVersionSwapStartTimestamp
public long getVersionSwapStartTimestamp()If we have reached the timeout for the version swap, we need to forcefully seek to the new topic using the EOP positions for any remaining partitions as our backup plan which should cover a variety of edge cases (e.g. consumer is not polling fast enough, consumer starting position was in between version swaps, a region is down, etc...) In all these edge cases it's better to go to the new topic and consume a lot of duplicate messages than staying on the old topic which will eventually be deleted.- Returns:
- the timestamp when the version swap was started.
-
isVersionSwapMessagesReceivedForAllPartitions
public boolean isVersionSwapMessagesReceivedForAllPartitions()- Returns:
- true if all partitions have received all the version swap messages required for this version swap event. This means we can subscribe to the new version topic and resume normal consumption from the first relevant version swap message.
-
handleVersionSwap
public boolean handleVersionSwap(VersionSwap versionSwap, PubSubTopicPartition pubSubTopicPartition, PubSubPosition position) Handle the version swap message and check if all version swap messages related to this version swap event have been received for the given partition.- Parameters:
versionSwap- message that we just received.pubSubTopicPartition- where we received the version swap message.position- of the version swap message.- Returns:
- true if all version swap messages related to this version swap event have been received.
-
handleUnsubscribe
Remove unsubscribed partitions from the ongoing version swap states.- Parameters:
partitions- to unsubscribe
-
isVersionSwapRelevant
public static boolean isVersionSwapRelevant(String currentVersionTopic, String clientRegion, VersionSwap versionSwap) Determines if the version swap message is relevant or not based on the version topic that it was consumed from and the version swap message content.- Parameters:
currentVersionTopic- where the version swap message was consumed from.clientRegion- of the consumer.versionSwap- message that was consumed.- Returns:
- if the version swap message is relevant and should be processed.
-