Package com.linkedin.davinci.consumer
Class VeniceChangelogConsumerImpl<K,V>
java.lang.Object
com.linkedin.davinci.consumer.VeniceChangelogConsumerImpl<K,V>
- All Implemented Interfaces:
VeniceChangelogConsumer<K,V>
- Direct Known Subclasses:
VeniceAfterImageConsumerImpl
public class VeniceChangelogConsumerImpl<K,V>
extends Object
implements VeniceChangelogConsumer<K,V>
-
Nested Class Summary
Nested Classes (Modifier and Type: Class)
- protected class VeniceChangelogConsumerImpl<K,V>.HeartbeatReporterThread
Field Summary
Fields (Modifier and Type)
- protected final BasicConsumerStats
- protected final ChangelogClientConfig
- protected final ChunkAssembler
- protected final CompressorFactory
- protected final VeniceConcurrentHashMap<Integer,AtomicLong>
- protected final long
- protected final ConcurrentHashMap<Integer,Long>
- protected final VeniceChangelogConsumerImpl<K,V>.HeartbeatReporterThread
- protected final AtomicBoolean
- protected final RecordDeserializer<K>
- protected static final VeniceCompressor
- protected final Map<Integer,AtomicLong>
- protected final Map<Integer,AtomicLong>
- protected final PubSubConsumerAdapter
- protected final PubSubContext
- protected final PubSubMessageDeserializer
- protected final PubSubPositionDeserializer
- protected final VeniceConcurrentHashMap<String,VeniceCompressor>
- protected final PubSubTopicRepository
- protected final ReplicationMetadataSchemaRepository
- protected StoreDeserializerCache<org.apache.avro.generic.GenericRecord>
- protected final SchemaReader
- protected final ExecutorService
- protected Class
- protected final long
- protected StoreDeserializerCache<V>
- protected final String
- protected NativeMetadataRepositoryViewAdapter
- protected long
- protected final ReadWriteLock
- protected final AbstractAvroChunkingAdapter<V>
Constructor Summary
Constructors (Constructor: Description)
- VeniceChangelogConsumerImpl(ChangelogClientConfig changelogClientConfig, PubSubConsumerAdapter pubSubConsumer, PubSubMessageDeserializer pubSubMessageDeserializer, VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory)
Method Summary
Methods (Modifier and Type, Method: Description)
- void close(): Release the internal resources.
- protected Optional<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> convertPubSubMessageToPubSubChangeEventMessage(DefaultPubSubMessage message, PubSubTopicPartition pubSubTopicPartition)
- protected List<Long> extractOffsetVectorFromMessage(int valueSchemaId, int rmdProtocolId, ByteBuffer replicationMetadataPayload)
- protected BasicConsumerStats getChangeCaptureStats
- protected ChangelogClientConfig getChangelogClientConfig
- protected PubSubTopic getCurrentServingVersionTopic
- protected VeniceChangelogConsumerImpl<K,V>.HeartbeatReporterThread getHeartbeatReporterThread
- getLastHeartbeatPerPartition: Returns the timestamp of the last heartbeat received for each subscribed partition.
- protected VeniceChangeCoordinate getLatestCoordinate(Integer partition)
- protected long getNextConsumerSequenceId(int partition)
- int getPartitionCount()
- protected List<PubSubTopicPartition> getPartitionListToSubscribe(Set<Integer> partitions, Set<PubSubTopicPartition> topicPartitionSet, PubSubTopic topic)
- protected Long getSubscribeTime
- protected Set<PubSubTopicPartition> getTopicAssignment
- protected PubSubTopicPartition getTopicPartition(Integer partition)
- protected VeniceCompressor getVersionCompressor(PubSubTopic pubSubTopic)
- protected boolean handleControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, String topicSuffix, byte[] key, long timestamp): Handle a control message from the given topic.
- protected boolean handleVersionSwapControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, String topicSuffix, Integer upstreamPartition)
- protected Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> internalPoll(long timeoutInMs, String topicSuffix)
- protected Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> internalPoll(long timeoutInMs, String topicSuffix, boolean includeControlMessage)
- protected CompletableFuture<Void> internalSeek(Set<Integer> partitions, PubSubTopic targetTopic, com.linkedin.davinci.consumer.VeniceChangelogConsumerImpl.SeekFunction seekAction)
- internalSeekToTail(Set<Integer> partitions, String topicSuffix)
- internalSeekToTimestamps(Map<Integer, Long> timestamps, String topicSuffix)
- internalSeekToTimestamps(Map<Integer, Long> timestamps, String topicSuffix, org.apache.logging.log4j.Logger logger)
- protected CompletableFuture<Void> internalSubscribe(Set<Integer> partitions, PubSubTopic topic)
- boolean isCaughtUp(): Checks whether all subscribed partitions are caught up during bootstrap.
- void pause(): Pause the client on all subscriptions.
- void pause(Set): Pause the client on all or a subset of the partitions this client is subscribed to.
- poll(long timeoutInMs): Polling function to get any available messages from the underlying system for all subscribed partitions.
- protected <T> T processRecordBytes(ByteBuffer decompressedBytes, T deserializedValue, byte[] key, ByteBuffer value, PubSubTopicPartition partition, int valueSchemaId, PubSubPosition recordOffset)
- void resume(): Resume the client on all subscriptions.
- void resume(Set): Resume the client on all or a subset of the partitions this client is subscribed to and has paused.
- seekToBeginningOfPush(): Seek to the beginning of the push for subscribed partitions.
- seekToBeginningOfPush(Set<Integer> partitions): Seek to the beginning of the push for a set of partitions.
- CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> checkpoints): Seek to the provided checkpoints for the specified partitions.
- seekToEndOfPush(): Seek to the end of the push for all subscribed partitions.
- seekToEndOfPush(Set<Integer> partitions): Seek to the end of the last push for a given set of partitions.
- seekToTail(): Seek to the end of events which have been transmitted to Venice for all subscribed partitions.
- seekToTail(Set<Integer> partitions): Seek to the end of events which have been transmitted to Venice and start consuming new events.
- seekToTimestamp(Long timestamp): Seek to the specified timestamp for all subscribed partitions.
- seekToTimestamps(Map<Integer, Long> timestamps): Seek to the provided timestamps for the specified partitions, based on the wall-clock time at which each message was processed by Venice and produced to change capture.
- protected void setStoreRepository
- subscribe(Set): Subscribe a set of partitions for a store to this VeniceChangelogConsumer, with the earliest position in the topic for each partition.
- subscribeAll(): Subscribe all partitions belonging to a specific store, with the earliest position in the topic for each partition.
- protected boolean switchToNewTopic(PubSubTopic newTopic, String topicSuffix, Integer partition)
- protected void synchronousSeek(Set<Integer> partitions, PubSubTopic targetTopic, com.linkedin.davinci.consumer.VeniceChangelogConsumerImpl.SeekFunction seekAction)
- protected void synchronousSeekToCheckpoint(Set<VeniceChangeCoordinate> checkpoints)
- void unsubscribe(Set<Integer> partitions): Stop ingesting messages from a set of partitions for a specific store.
- void unsubscribeAll(): Stop ingesting messages from all partitions.
-
Field Details
-
subscribeTime
protected long subscribeTime
subscriptionLock
-
NO_OP_COMPRESSOR
-
compressorFactory
-
pubSubTopicNameToCompressorMap
-
storeDeserializerCache
-
rmdDeserializerCache
-
specificValueClass
-
storeRepository
-
userEventChunkingAdapter
-
schemaReader
-
partitionToPutMessageCount
-
partitionToDeleteMessageCount
-
partitionToBootstrapState
-
startTimestamp
protected final long startTimestamp
isSubscribed
-
keyDeserializer
-
replicationMetadataSchemaRepository
-
storeName
-
pubSubConsumer
-
pubSubTopicRepository
-
pubSubPositionDeserializer
-
pubSubMessageDeserializer
-
pubSubContext
-
seekExecutorService
-
currentVersionHighWatermarks
-
currentVersionLastHeartbeat
-
changelogClientConfig
-
chunkAssembler
-
changeCaptureStats
-
heartbeatReporterThread
-
consumerSequenceIdGeneratorMap
-
consumerSequenceIdStartingValue
protected final long consumerSequenceIdStartingValue
-
-
Constructor Details
-
VeniceChangelogConsumerImpl
public VeniceChangelogConsumerImpl(ChangelogClientConfig changelogClientConfig, PubSubConsumerAdapter pubSubConsumer, PubSubMessageDeserializer pubSubMessageDeserializer, VeniceChangelogConsumerClientFactory veniceChangelogConsumerClientFactory)
-
-
Method Details
-
getPartitionCount
public int getPartitionCount()
- Specified by: getPartitionCount in interface VeniceChangelogConsumer<K,V>
- Returns: total number of store partitions
-
subscribe
Description copied from interface: VeniceChangelogConsumer
Subscribe a set of partitions for a store to this VeniceChangelogConsumer, with the earliest position in the topic for each partition. The VeniceChangelogConsumer should try to consume messages from all partitions that are subscribed to it.
- Specified by: subscribe in interface VeniceChangelogConsumer<K,V>
- Parameters: partitions - the set of partitions to subscribe to and consume
- Returns: a future which completes when data from the partitions is ready to be consumed
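The subscribe-then-poll contract can be illustrated with a toy in-memory model. This is a sketch under stated assumptions, not Venice code: `ToyChangelog`, its per-partition string logs, and its argument-free `poll()` are hypothetical stand-ins that mimic two documented behaviors, namely that a new subscription starts at the earliest position of each partition and that each poll returns only what arrived since the last fetch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory stand-in for a changelog consumer; NOT the Venice API.
final class ToyChangelog {
  private final Map<Integer, List<String>> logs;                   // partition -> ordered events
  private final Map<Integer, Integer> positions = new HashMap<>(); // partition -> next index to read

  ToyChangelog(Map<Integer, List<String>> logs) {
    this.logs = logs;
  }

  // Subscribes each partition at its earliest position (index 0).
  // Validates every partition first so a bad request changes nothing.
  CompletableFuture<Void> subscribe(Set<Integer> partitions) {
    for (int p : partitions) {
      if (!logs.containsKey(p)) {
        return CompletableFuture.failedFuture(new IllegalArgumentException("Unknown partition " + p));
      }
    }
    for (int p : partitions) {
      positions.put(p, 0);
    }
    return CompletableFuture.completedFuture(null);
  }

  // Returns every event written since the last poll on each subscribed partition.
  List<String> poll() {
    List<String> batch = new ArrayList<>();
    for (Map.Entry<Integer, Integer> entry : positions.entrySet()) {
      List<String> log = logs.get(entry.getKey());
      batch.addAll(log.subList(entry.getValue(), log.size()));
      entry.setValue(log.size());
    }
    return batch;
  }
}
```

Returning a future mirrors the documented "completes when data from the partitions is ready"; in this toy model the data is ready immediately, so the future is already complete.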
-
internalSubscribe
-
getVersionCompressor
-
seekToBeginningOfPush
Description copied from interface: VeniceChangelogConsumer
Seek to the beginning of the push for a set of partitions. This is analogous to bootstrapping the consumer's data: it instructs the consumer to consume the data from the batch push. The seek goes ONLY to the beginning of the version which is currently serving data. If a new version is created, the consumer switches to reading from it once it reaches the point in the change capture stream that indicates the version swap, which can only occur after all the data in the last push has been consumed.
- Specified by: seekToBeginningOfPush in interface VeniceChangelogConsumer<K,V>
- Parameters: partitions - the set of partitions to seek
- Returns: a future which completes when the operation has succeeded for all partitions.
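The version-swap behavior described above can be modeled with a toy reader. This is a sketch, not the Venice implementation: `VersionEvent`, `BootstrapReader`, the topic names `store_v1`/`store_v2`, and the `resumeAt` field (the position in the new topic at which reading continues) are all hypothetical. It shows only the ordering rule: the serving version is read from its first record, and the reader moves to the newer version only when it reaches the swap marker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical event in a version topic: a data record or a version-swap marker.
final class VersionEvent {
  final String value;     // non-null for data records
  final String nextTopic; // non-null for a version-swap marker
  final int resumeAt;     // assumption: position in nextTopic to continue from

  private VersionEvent(String value, String nextTopic, int resumeAt) {
    this.value = value;
    this.nextTopic = nextTopic;
    this.resumeAt = resumeAt;
  }

  static VersionEvent data(String value) {
    return new VersionEvent(value, null, -1);
  }

  static VersionEvent versionSwap(String nextTopic, int resumeAt) {
    return new VersionEvent(null, nextTopic, resumeAt);
  }
}

// Hypothetical bootstrap reader; NOT the Venice API.
final class BootstrapReader {
  private final Map<String, List<VersionEvent>> topics;
  private String currentTopic;

  BootstrapReader(Map<String, List<VersionEvent>> topics, String servingTopic) {
    this.topics = topics;
    this.currentTopic = servingTopic;
  }

  // Reads the serving version from its beginning and switches to a newer
  // version only when the version-swap marker is reached.
  List<String> readAll() {
    List<String> out = new ArrayList<>();
    List<VersionEvent> log = topics.get(currentTopic);
    int position = 0;
    while (position < log.size()) {
      VersionEvent event = log.get(position++);
      if (event.nextTopic != null) {
        currentTopic = event.nextTopic;
        log = topics.get(currentTopic);
        position = event.resumeAt;
      } else {
        out.add(event.value);
      }
    }
    return out;
  }

  String currentTopic() {
    return currentTopic;
  }
}
```

Continuing at `resumeAt` rather than at position 0 keeps the toy reader from re-delivering the new version's batch data; how the real consumer picks its position in the new topic is not described here.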
-
seekToBeginningOfPush
Description copied from interface: VeniceChangelogConsumer
Seek to the beginning of the push for subscribed partitions. See VeniceChangelogConsumer.seekToBeginningOfPush(Set) for more information.
- Specified by: seekToBeginningOfPush in interface VeniceChangelogConsumer<K,V>
- Returns: a future which completes when the partitions are ready for their data to be consumed
-
seekToEndOfPush
Description copied from interface: VeniceChangelogConsumer
Seek to the end of the last push for a given set of partitions. This instructs the consumer to begin consuming events which are transmitted to Venice following the last batch push.
- Specified by: seekToEndOfPush in interface VeniceChangelogConsumer<K,V>
- Parameters: partitions - the set of partitions to seek
- Returns: a future which completes when the operation has succeeded for all partitions.
-
pause
public void pause()
Description copied from interface: VeniceChangelogConsumer
Pause the client on all subscriptions. See VeniceChangelogConsumer.pause(Set) for more information.
- Specified by: pause in interface VeniceChangelogConsumer<K,V>
-
resume
Description copied from interface: VeniceChangelogConsumer
Resume the client on all or a subset of partitions this client is subscribed to and has paused.
- Specified by: resume in interface VeniceChangelogConsumer<K,V>
-
resume
public void resume()
Description copied from interface: VeniceChangelogConsumer
Resume the client on all subscriptions. See VeniceChangelogConsumer.resume(Set) for more information.
- Specified by: resume in interface VeniceChangelogConsumer<K,V>
-
pause
Description copied from interface: VeniceChangelogConsumer
Pause the client on all or a subset of the partitions this client is subscribed to. Calls to VeniceChangelogConsumer.poll(long) will not return results from paused partitions until VeniceChangelogConsumer.resume(Set) is called for those partitions.
- Specified by: pause in interface VeniceChangelogConsumer<K,V>
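The pause/resume contract for poll can be sketched with a small in-memory model. `PausableConsumer` and its string events are hypothetical and are not part of Venice; the sketch assumes every partition is subscribed from its earliest position and that a paused partition keeps its read position, so resuming delivers whatever accumulated while it was paused.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical consumer showing how pause/resume gate poll(); NOT the Venice API.
final class PausableConsumer {
  private final Map<Integer, List<String>> logs;
  private final Map<Integer, Integer> positions = new TreeMap<>(); // partition -> next index
  private final Set<Integer> paused = new HashSet<>();

  PausableConsumer(Map<Integer, List<String>> logs) {
    this.logs = logs;
    for (Integer partition : logs.keySet()) {
      positions.put(partition, 0); // every partition subscribed from its earliest position
    }
  }

  void pause(Set<Integer> partitions) {
    paused.addAll(partitions);
  }

  void resume(Set<Integer> partitions) {
    paused.removeAll(partitions);
  }

  // Skips paused partitions without advancing them, so nothing is lost on resume.
  List<String> poll() {
    List<String> batch = new ArrayList<>();
    for (Map.Entry<Integer, Integer> entry : positions.entrySet()) {
      if (paused.contains(entry.getKey())) {
        continue;
      }
      List<String> log = logs.get(entry.getKey());
      batch.addAll(log.subList(entry.getValue(), log.size()));
      entry.setValue(log.size());
    }
    return batch;
  }
}
```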
-
seekToEndOfPush
Description copied from interface: VeniceChangelogConsumer
Seek to the end of the push for all subscribed partitions. See VeniceChangelogConsumer.seekToEndOfPush(Set) for more information.
- Specified by: seekToEndOfPush in interface VeniceChangelogConsumer<K,V>
- Returns: a future which completes when the operation has succeeded for all partitions.
-
seekToTail
Description copied from interface: VeniceChangelogConsumer
Seek to the end of events which have been transmitted to Venice and start consuming new events. This will ONLY consume events transmitted via nearline and incremental push. It will not read batch push data.
- Specified by: seekToTail in interface VeniceChangelogConsumer<K,V>
- Parameters: partitions - the set of partitions to seek
- Returns: a future which completes when the operation has succeeded for all partitions.
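The three seek targets (beginning of push, end of push, tail) can be compared on a single partition modeled as a list. This is a toy sketch under assumptions: the log is a `List<String>`, the end of the batch push is marked by a plain `END_OF_PUSH` entry, and `SeekMode` and `SeekPositions` are hypothetical names, not Venice types. Each mode returns the index of the first entry the consumer would read after the seek.

```java
import java.util.List;

// Hypothetical seek targets over one partition's change log; NOT Venice types.
enum SeekMode { BEGINNING_OF_PUSH, END_OF_PUSH, TAIL }

final class SeekPositions {
  // Assumption: the end of the batch push appears as a plain marker entry.
  static final String END_OF_PUSH_MARKER = "END_OF_PUSH";

  // Index of the first entry a consumer would read after the seek.
  static int startIndex(List<String> partitionLog, SeekMode mode) {
    switch (mode) {
      case BEGINNING_OF_PUSH:
        return 0; // replay the batch push from its first record
      case END_OF_PUSH: {
        int marker = partitionLog.indexOf(END_OF_PUSH_MARKER);
        if (marker < 0) {
          // Toy choice: treat a push that has not finished as an error.
          throw new IllegalStateException("Batch push has not finished");
        }
        return marker + 1; // first nearline or incremental-push event after the batch
      }
      case TAIL:
        return partitionLog.size(); // only events that arrive from now on
      default:
        throw new AssertionError(mode);
    }
  }
}
```

For a log of two batch records, the marker, and two nearline events, the three modes start at indices 0, 3, and 5 respectively.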
-
internalSeekToTail
-
getCurrentServingVersionTopic
-
seekToTail
Description copied from interface: VeniceChangelogConsumer
Seek to the end of events which have been transmitted to Venice for all subscribed partitions. See VeniceChangelogConsumer.seekToTail(Set) for more information.
- Specified by: seekToTail in interface VeniceChangelogConsumer<K,V>
- Returns: a future which completes when the operation has succeeded for all partitions.
-
seekToCheckpoint
public CompletableFuture<Void> seekToCheckpoint(Set<VeniceChangeCoordinate> checkpoints) throws VeniceCoordinateOutOfRangeException
Description copied from interface: VeniceChangelogConsumer
Seek to the provided checkpoints for the specified partitions.
Note about checkpoints: checkpoints have the following properties, which should be taken into account:
- Checkpoints are NOT comparable or valid across partitions.
- Checkpoints are NOT comparable or valid across regions.
- Checkpoints are NOT comparable across store versions.
- It is not possible to determine the number of events between two checkpoints.
- A checkpoint may no longer be on retention. In that case, an exception is returned to the caller.
- Specified by: seekToCheckpoint in interface VeniceChangelogConsumer<K,V>
- Returns: a future which completes when the seek has completed for all partitions
- Throws: VeniceCoordinateOutOfRangeException - if a passed checkpoint is no longer valid
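The checkpoint rules above can be made concrete with a toy model. `Checkpoint` and `CheckpointTracker` are hypothetical classes, not `VeniceChangeCoordinate`: the sketch assumes a checkpoint is identified by region, store version, partition, and a numeric position, refuses to order checkpoints from different streams, keeps the last checkpoint seen per partition (for example, to persist and seek back to after a restart), and rejects a checkpoint that has fallen off retention. The real API reports that last case with VeniceCoordinateOutOfRangeException; the sketch uses IllegalStateException instead.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical checkpoint, meaningful only within one partition of one store version in one region.
final class Checkpoint {
  final String region;
  final int storeVersion;
  final int partition;
  final long position;

  Checkpoint(String region, int storeVersion, int partition, long position) {
    this.region = region;
    this.storeVersion = storeVersion;
    this.partition = partition;
    this.position = position;
  }

  // Refuses to order checkpoints from different partitions, regions, or versions.
  boolean isAfter(Checkpoint other) {
    if (!region.equals(other.region) || storeVersion != other.storeVersion || partition != other.partition) {
      throw new IllegalArgumentException("Checkpoints from different streams are not comparable");
    }
    return position > other.position;
  }
}

// Hypothetical helper that remembers the latest checkpoint per partition.
final class CheckpointTracker {
  private final Map<Integer, Checkpoint> latest = new HashMap<>();

  // Events within a partition arrive in order, so the last one seen is the latest.
  void record(Checkpoint checkpoint) {
    latest.put(checkpoint.partition, checkpoint);
  }

  Checkpoint latestFor(int partition) {
    return latest.get(partition);
  }

  // Mimics the documented failure mode: a checkpoint older than retention is rejected.
  static void validateRetention(Checkpoint checkpoint, long earliestRetainedPosition) {
    if (checkpoint.position < earliestRetainedPosition) {
      throw new IllegalStateException("Checkpoint is no longer on retention");
    }
  }
}
```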
-
synchronousSeekToCheckpoint
-
subscribeAll
Description copied from interface: VeniceChangelogConsumer
Subscribe all partitions belonging to a specific store, with the earliest position in the topic for each partition.
- Specified by: subscribeAll in interface VeniceChangelogConsumer<K,V>
- Returns: a future which completes when all partitions are ready for their data to be consumed
-
seekToTimestamps
Description copied from interface: VeniceChangelogConsumer
Seek to the provided timestamps for the specified partitions, based on the wall-clock time at which each message was processed by Venice and produced to change capture. Note that this API can only be used to seek on nearline data applied to the current serving version in Venice; it will not seek on data transmitted via batch push. If the provided timestamp is lower than the earliest timestamp on a given stream, the earliest event will be returned. THIS WILL NOT SEEK TO DATA WHICH WAS APPLIED ON A PREVIOUS VERSION. Never seek back to a timestamp smaller than the current time minus rewindTimeInSeconds, as configured in the hybrid settings for this Venice store. The timestamps passed to this method should correspond to the timestamps processed by this interface: the timestamp returned by PubSubMessage.getPubSubMessageTime() is the time at which Venice processed the event, and this method seeks based on that sequence of events. It bears no relation to the timestamps that upstream producers may optionally provide when writing a record to Venice.
- Specified by: seekToTimestamps in interface VeniceChangelogConsumer<K,V>
- Parameters: timestamps - a map keyed by partition ID, with the timestamp checkpoint to seek to for each partition.
- Returns:
internalSeekToTimestamps
-
internalSeekToTimestamps
-
seekToTimestamp
Description copied from interface: VeniceChangelogConsumer
Seek to the specified timestamp for all subscribed partitions. See VeniceChangelogConsumer.seekToTimestamps(Map) for more information.
- Specified by: seekToTimestamp in interface VeniceChangelogConsumer<K,V>
- Returns: a future which completes when the operation has succeeded for all partitions.
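Two of the timestamp-seek rules (a timestamp older than the stream maps to the earliest event, and a seek should stay within current time minus rewindTimeInSeconds) can be sketched as plain functions. `TimestampSeek` is a hypothetical helper, not Venice code. It assumes the partition's processing timestamps are available as an ascending `long[]` in milliseconds and that the seek lands on the first event whose timestamp is at or after the target; that "at or after" rule is an assumption borrowed from common pub-sub seek-by-time semantics.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helpers for timestamp-based seeking; NOT the Venice API.
final class TimestampSeek {
  // Binary search for the first event whose processing time is >= targetMs.
  // A target older than every event maps to 0 (the earliest event);
  // a target newer than every event maps to the end of the stream.
  static int seekIndex(long[] eventTimesMs, long targetMs) {
    int lo = 0;
    int hi = eventTimesMs.length;
    while (lo < hi) {
      int mid = (lo + hi) >>> 1;
      if (eventTimesMs[mid] < targetMs) {
        lo = mid + 1;
      } else {
        hi = mid;
      }
    }
    return lo;
  }

  // True if the target is not older than now - rewindTimeInSeconds.
  static boolean withinRewindWindow(long targetMs, long nowMs, long rewindTimeInSeconds) {
    return targetMs >= nowMs - TimeUnit.SECONDS.toMillis(rewindTimeInSeconds);
  }
}
```

For events processed at 100, 200, and 300 ms, a target of 50 lands on index 0, 200 on index 1, 250 on index 2, and 400 past the last event.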
-
isCaughtUp
public boolean isCaughtUp()
Description copied from interface: VeniceChangelogConsumer
Checks whether all subscribed partitions are caught up during bootstrap. A partition is considered caught up if its (currentTimestamp - latestMessageTimestamp) is less than or equal to 1 minute.
- Specified by: isCaughtUp in interface VeniceChangelogConsumer<K,V>
- Returns: true if all subscribed partitions have caught up.
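The caught-up rule reduces to a per-partition comparison against a 1-minute threshold. `CatchUpCheck` is a hypothetical helper that assumes the latest message timestamp of each subscribed partition is available as epoch milliseconds in a `Map<Integer, Long>`. How the real method treats a consumer with no subscribed partitions is not stated; this sketch returns true in that case because `allMatch` on an empty stream is true.

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical restatement of the documented caught-up rule; NOT the Venice implementation.
final class CatchUpCheck {
  static final long THRESHOLD_MS = TimeUnit.MINUTES.toMillis(1);

  // Caught up only if every partition satisfies now - latestMessageTimestamp <= 1 minute.
  static boolean isCaughtUp(Map<Integer, Long> latestMessageTimestampMs, long nowMs) {
    return latestMessageTimestampMs.values().stream()
        .allMatch(ts -> nowMs - ts <= THRESHOLD_MS);
  }
}
```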
-
internalSeek
protected CompletableFuture<Void> internalSeek(Set<Integer> partitions, PubSubTopic targetTopic, com.linkedin.davinci.consumer.VeniceChangelogConsumerImpl.SeekFunction seekAction)
synchronousSeek
protected void synchronousSeek(Set<Integer> partitions, PubSubTopic targetTopic, com.linkedin.davinci.consumer.VeniceChangelogConsumerImpl.SeekFunction seekAction)
getPartitionListToSubscribe
protected List<PubSubTopicPartition> getPartitionListToSubscribe(Set<Integer> partitions, Set<PubSubTopicPartition> topicPartitionSet, PubSubTopic topic)
unsubscribe
Description copied from interface: VeniceChangelogConsumer
Stop ingesting messages from a set of partitions for a specific store.
- Specified by: unsubscribe in interface VeniceChangelogConsumer<K,V>
- Parameters: partitions - the set of topic partitions to unsubscribe from
-
unsubscribeAll
public void unsubscribeAll()
Description copied from interface: VeniceChangelogConsumer
Stop ingesting messages from all partitions.
- Specified by: unsubscribeAll in interface VeniceChangelogConsumer<K,V>
-
poll
Description copied from interface: VeniceChangelogConsumer
Polling function to get any available messages from the underlying system for all subscribed partitions.
- Specified by: poll in interface VeniceChangelogConsumer<K,V>
- Parameters: timeoutInMs - the maximum time to block/wait between two polling requests (must not be greater than Long.MAX_VALUE milliseconds)
- Returns: a collection of messages received since the last fetch for the subscribed topic partitions
-
internalPoll
protected Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> internalPoll(long timeoutInMs, String topicSuffix, boolean includeControlMessage)
internalPoll
protected Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> internalPoll(long timeoutInMs, String topicSuffix)
handleControlMessage
protected boolean handleControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, String topicSuffix, byte[] key, long timestamp)
Handle a control message from the given topic. Returns true if a topic switch should occur and records should be returned.
- Parameters:
controlMessage - the encountered control message
pubSubTopicPartition - the topic partition that this control message concerns
- Returns: true if a version switch happened, false otherwise
-
processRecordBytes
protected <T> T processRecordBytes(ByteBuffer decompressedBytes, T deserializedValue, byte[] key, ByteBuffer value, PubSubTopicPartition partition, int valueSchemaId, PubSubPosition recordOffset) throws IOException
- Throws: IOException
-
convertPubSubMessageToPubSubChangeEventMessage
protected Optional<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> convertPubSubMessageToPubSubChangeEventMessage(DefaultPubSubMessage message, PubSubTopicPartition pubSubTopicPartition)
extractOffsetVectorFromMessage
protected List<Long> extractOffsetVectorFromMessage(int valueSchemaId, int rmdProtocolId, ByteBuffer replicationMetadataPayload)
handleVersionSwapControlMessage
protected boolean handleVersionSwapControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, String topicSuffix, Integer upstreamPartition)
getTopicAssignment
-
switchToNewTopic
-
close
public void close()
Description copied from interface: VeniceChangelogConsumer
Release the internal resources.
- Specified by: close in interface VeniceChangelogConsumer<K,V>
-
setStoreRepository
-
getLatestCoordinate
-
getTopicPartition
-
getChangelogClientConfig
-
getSubscribeTime
-
getHeartbeatReporterThread
-
getChangeCaptureStats
-
getNextConsumerSequenceId
protected long getNextConsumerSequenceId(int partition)
getLastHeartbeatPerPartition
Description copied from interface: VeniceChangelogConsumer
Returns the timestamp of the last heartbeat received for each subscribed partition. Heartbeats are messages sent periodically by Venice servers to measure lag.
- Specified by: getLastHeartbeatPerPartition in interface VeniceChangelogConsumer<K,V>
- Returns: a map of partition number to the timestamp, in milliseconds, of the last heartbeat received for that partition.
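Because heartbeats arrive periodically, the time elapsed since a partition's last heartbeat gives a rough lag estimate. `HeartbeatLag` is a hypothetical helper, not part of Venice; it assumes the per-partition result has the shape partition number to epoch-millisecond timestamp, as the return description above states, and clamps negative values (for example, from clock skew) to zero.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical lag helpers built on per-partition heartbeat timestamps; NOT the Venice API.
final class HeartbeatLag {
  // Lag per partition, in ms, measured as time elapsed since the last heartbeat.
  static Map<Integer, Long> lagMs(Map<Integer, Long> lastHeartbeatMs, long nowMs) {
    Map<Integer, Long> lag = new TreeMap<>();
    lastHeartbeatMs.forEach((partition, ts) -> lag.put(partition, Math.max(0L, nowMs - ts)));
    return lag;
  }

  // Partition with the largest lag (lowest partition number on ties), or -1 if none reported.
  static int mostLaggingPartition(Map<Integer, Long> lastHeartbeatMs, long nowMs) {
    int worst = -1;
    long worstLag = -1;
    for (Map.Entry<Integer, Long> entry : lagMs(lastHeartbeatMs, nowMs).entrySet()) {
      if (entry.getValue() > worstLag) {
        worst = entry.getKey();
        worstLag = entry.getValue();
      }
    }
    return worst;
  }
}
```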
-