Package com.linkedin.davinci.consumer
Class VeniceChangelogConsumerImpl<K,V>
- java.lang.Object
-
- com.linkedin.davinci.consumer.VeniceChangelogConsumerImpl<K,V>
-
- All Implemented Interfaces:
VeniceChangelogConsumer<K,V>
- Direct Known Subclasses:
VeniceAfterImageConsumerImpl
public class VeniceChangelogConsumerImpl<K,V> extends java.lang.Object implements VeniceChangelogConsumer<K,V>
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected class
VeniceChangelogConsumerImpl.HeartbeatReporterThread
-
Field Summary
-
Constructor Summary
Constructors Constructor Description VeniceChangelogConsumerImpl(ChangelogClientConfig changelogClientConfig, PubSubConsumerAdapter pubSubConsumer)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description void
close()
Release the internal resources.protected java.util.Optional<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>>
convertPubSubMessageToPubSubChangeEventMessage(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> message, PubSubTopicPartition pubSubTopicPartition)
protected java.util.List<java.lang.Long>
extractOffsetVectorFromMessage(int valueSchemaId, int rmdProtocolId, java.nio.ByteBuffer replicationMetadataPayload)
protected BasicConsumerStats
getChangeCaptureStats()
protected ChangelogClientConfig
getChangelogClientConfig()
protected VeniceChangelogConsumerImpl.HeartbeatReporterThread
getHeartbeatReporterThread()
protected VeniceChangeCoordinate
getLatestCoordinate(java.lang.Integer partition)
int
getPartitionCount()
protected PubSubConsumerAdapter
getPubSubConsumer()
protected java.lang.Long
getSubscribeTime()
protected java.util.Set<PubSubTopicPartition>
getTopicAssignment()
protected PubSubTopicPartition
getTopicPartition(java.lang.Integer partition)
protected VeniceCompressor
getVersionCompressor(PubSubTopicPartition topicPartition)
protected boolean
handleControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, java.lang.String topicSuffix, byte[] key, long timestamp)
Handle control message from the given topic.protected boolean
handleVersionSwapControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, java.lang.String topicSuffix)
protected java.util.Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>>
internalPoll(long timeoutInMs, java.lang.String topicSuffix)
protected java.util.Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>>
internalPoll(long timeoutInMs, java.lang.String topicSuffix, boolean includeControlMessage)
protected java.util.concurrent.CompletableFuture<java.lang.Void>
internalSeek(java.util.Set<java.lang.Integer> partitions, PubSubTopic targetTopic, com.linkedin.davinci.consumer.VeniceChangelogConsumerImpl.SeekFunction seekAction)
java.util.concurrent.CompletableFuture<java.lang.Void>
internalSeekToTail(java.util.Set<java.lang.Integer> partitions, java.lang.String topicSuffix)
java.util.concurrent.CompletableFuture<java.lang.Void>
internalSeekToTimestamps(java.util.Map<java.lang.Integer,java.lang.Long> timestamps, java.lang.String topicSuffix)
protected java.util.concurrent.CompletableFuture<java.lang.Void>
internalSubscribe(java.util.Set<java.lang.Integer> partitions, PubSubTopic topic)
boolean
isCaughtUp()
Checks whether all subscribed partitions are caught up during bootstrap.void
pause()
Pause the client on all subscriptions.void
pause(java.util.Set<java.lang.Integer> partitions)
Pause the client on all or subset of partitions this client is subscribed to.java.util.Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>>
poll(long timeoutInMs)
Polling function to get any available messages from the underlying system for all partitions subscribed.protected <T> T
processRecordBytes(java.nio.ByteBuffer decompressedBytes, T deserializedValue, byte[] key, java.nio.ByteBuffer value, PubSubTopicPartition partition, int valueSchemaId, long recordOffset)
void
resume()
Pause the client on all subscriptions.void
resume(java.util.Set<java.lang.Integer> partitions)
Resume the client on all or subset of partitions this client is subscribed to and has paused.java.util.concurrent.CompletableFuture<java.lang.Void>
seekToBeginningOfPush()
Seek to the beginning of the push for subscribed partitions.java.util.concurrent.CompletableFuture<java.lang.Void>
seekToBeginningOfPush(java.util.Set<java.lang.Integer> partitions)
Seek to the beginning of the push for a set of partitions.java.util.concurrent.CompletableFuture<java.lang.Void>
seekToCheckpoint(java.util.Set<VeniceChangeCoordinate> checkpoints)
Seek the provided checkpoints for the specified partitions.java.util.concurrent.CompletableFuture<java.lang.Void>
seekToEndOfPush()
Seek to the end of the push for all subscribed partitions.java.util.concurrent.CompletableFuture<java.lang.Void>
seekToEndOfPush(java.util.Set<java.lang.Integer> partitions)
Seek to the end of the last push for a given set of partitions.java.util.concurrent.CompletableFuture<java.lang.Void>
seekToTail()
Seek to the end of events which have been transmitted to Venice for all subscribed partitions.java.util.concurrent.CompletableFuture<java.lang.Void>
seekToTail(java.util.Set<java.lang.Integer> partitions)
Seek to the end of events which have been transmitted to Venice and start consuming new events.java.util.concurrent.CompletableFuture<java.lang.Void>
seekToTimestamp(java.lang.Long timestamp)
Seek to the specified timestamp for all subscribed partitions.java.util.concurrent.CompletableFuture<java.lang.Void>
seekToTimestamps(java.util.Map<java.lang.Integer,java.lang.Long> timestamps)
Seek to the provided timestamps for the specified partitions based on wall clock time for when this message was processed by Venice and produced to change capture.protected void
setStoreRepository(ThinClientMetaStoreBasedRepository repository)
java.util.concurrent.CompletableFuture<java.lang.Void>
subscribe(java.util.Set<java.lang.Integer> partitions)
Subscribe a set of partitions for a store to this VeniceChangelogConsumer.java.util.concurrent.CompletableFuture<java.lang.Void>
subscribeAll()
Subscribe all partitions belonging to a specific store.protected boolean
switchToNewTopic(PubSubTopic newTopic, java.lang.String topicSuffix, java.lang.Integer partition)
void
unsubscribe(java.util.Set<java.lang.Integer> partitions)
Stop ingesting messages from a set of partitions for a specific store.void
unsubscribeAll()
Stop ingesting messages from all partitions.
-
-
-
Field Detail
-
partitionCount
protected final int partitionCount
-
subscribeTime
protected long subscribeTime
-
NO_OP_COMPRESSOR
protected static final VeniceCompressor NO_OP_COMPRESSOR
-
compressorFactory
protected final CompressorFactory compressorFactory
-
compressorMap
protected final java.util.HashMap<java.lang.Integer,VeniceCompressor> compressorMap
-
storeDeserializerCache
protected StoreDeserializerCache<V> storeDeserializerCache
-
rmdDeserializerCache
protected StoreDeserializerCache<org.apache.avro.generic.GenericRecord> rmdDeserializerCache
-
specificValueClass
protected java.lang.Class specificValueClass
-
storeRepository
protected ThinClientMetaStoreBasedRepository storeRepository
-
userEventChunkingAdapter
protected final AbstractAvroChunkingAdapter<V> userEventChunkingAdapter
-
schemaReader
protected final SchemaReader schemaReader
-
pubSubTopicRepository
protected final PubSubTopicRepository pubSubTopicRepository
-
partitionToPutMessageCount
protected final java.util.Map<java.lang.Integer,java.util.concurrent.atomic.AtomicLong> partitionToPutMessageCount
-
partitionToDeleteMessageCount
protected final java.util.Map<java.lang.Integer,java.util.concurrent.atomic.AtomicLong> partitionToDeleteMessageCount
-
partitionToBootstrapState
protected final java.util.Map<java.lang.Integer,java.lang.Boolean> partitionToBootstrapState
-
startTimestamp
protected final long startTimestamp
-
keyDeserializer
protected final RecordDeserializer<K> keyDeserializer
-
replicationMetadataSchemaRepository
protected final ReplicationMetadataSchemaRepository replicationMetadataSchemaRepository
-
storeName
protected final java.lang.String storeName
-
pubSubConsumer
protected final PubSubConsumerAdapter pubSubConsumer
-
currentVersionHighWatermarks
protected final java.util.Map<java.lang.Integer,java.util.List<java.lang.Long>> currentVersionHighWatermarks
-
currentVersionLastHeartbeat
protected final java.util.Map<java.lang.Integer,java.lang.Long> currentVersionLastHeartbeat
-
currentValuePayloadSize
protected final int[] currentValuePayloadSize
-
changelogClientConfig
protected final ChangelogClientConfig changelogClientConfig
-
chunkAssembler
protected final ChunkAssembler chunkAssembler
-
changeCaptureStats
protected final BasicConsumerStats changeCaptureStats
-
heartbeatReporterThread
protected final VeniceChangelogConsumerImpl.HeartbeatReporterThread heartbeatReporterThread
-
-
Constructor Detail
-
VeniceChangelogConsumerImpl
public VeniceChangelogConsumerImpl(ChangelogClientConfig changelogClientConfig, PubSubConsumerAdapter pubSubConsumer)
-
-
Method Detail
-
getPartitionCount
public int getPartitionCount()
- Specified by:
getPartitionCount
in interfaceVeniceChangelogConsumer<K,V>
- Returns:
- total number of store partitions
-
subscribe
public java.util.concurrent.CompletableFuture<java.lang.Void> subscribe(java.util.Set<java.lang.Integer> partitions)
Description copied from interface:VeniceChangelogConsumer
Subscribe a set of partitions for a store to this VeniceChangelogConsumer. The VeniceChangelogConsumer should try to consume messages from all partitions that are subscribed to it.- Specified by:
subscribe
in interfaceVeniceChangelogConsumer<K,V>
- Parameters:
partitions
- the set of partition to subscribe and consume- Returns:
- a future which completes when the partitions are ready to be consumed data
-
internalSubscribe
protected java.util.concurrent.CompletableFuture<java.lang.Void> internalSubscribe(java.util.Set<java.lang.Integer> partitions, PubSubTopic topic)
-
getVersionCompressor
protected VeniceCompressor getVersionCompressor(PubSubTopicPartition topicPartition)
-
seekToBeginningOfPush
public java.util.concurrent.CompletableFuture<java.lang.Void> seekToBeginningOfPush(java.util.Set<java.lang.Integer> partitions)
Description copied from interface:VeniceChangelogConsumer
Seek to the beginning of the push for a set of partitions. This is analogous to doing a bootstrap of data for the consumer. This seek will ONLY seek to the beginning of the version which is currently serving data, and the consumer will switch to reading data from a new version (should one get created) once it has read up to the point in the change capture stream that indicates the version swap (which can only occur after consuming all the data in the last push). This instructs the consumer to consume data from the batch push.- Specified by:
seekToBeginningOfPush
in interfaceVeniceChangelogConsumer<K,V>
- Parameters:
partitions
- the set of partitions to seek with- Returns:
- a future which completes when the operation has succeeded for all partitions.
-
seekToBeginningOfPush
public java.util.concurrent.CompletableFuture<java.lang.Void> seekToBeginningOfPush()
Description copied from interface:VeniceChangelogConsumer
Seek to the beginning of the push for subscribed partitions. SeeVeniceChangelogConsumer.seekToBeginningOfPush(Set)
for more information.- Specified by:
seekToBeginningOfPush
in interfaceVeniceChangelogConsumer<K,V>
- Returns:
- a future which completes when the partitions are ready to be consumed data
-
seekToEndOfPush
public java.util.concurrent.CompletableFuture<java.lang.Void> seekToEndOfPush(java.util.Set<java.lang.Integer> partitions)
Description copied from interface:VeniceChangelogConsumer
Seek to the end of the last push for a given set of partitions. This instructs the consumer to begin consuming events which are transmitted to Venice following the last batch push.- Specified by:
seekToEndOfPush
in interfaceVeniceChangelogConsumer<K,V>
- Parameters:
partitions
- the set of partitions to seek with- Returns:
- a future which completes when the operation has succeeded for all partitions.
-
pause
public void pause()
Description copied from interface:VeniceChangelogConsumer
Pause the client on all subscriptions. SeeVeniceChangelogConsumer.pause(Set)
for more information.- Specified by:
pause
in interfaceVeniceChangelogConsumer<K,V>
-
resume
public void resume(java.util.Set<java.lang.Integer> partitions)
Description copied from interface:VeniceChangelogConsumer
Resume the client on all or subset of partitions this client is subscribed to and has paused.- Specified by:
resume
in interfaceVeniceChangelogConsumer<K,V>
-
resume
public void resume()
Description copied from interface:VeniceChangelogConsumer
Pause the client on all subscriptions. SeeVeniceChangelogConsumer.resume(Set)
for more information.- Specified by:
resume
in interfaceVeniceChangelogConsumer<K,V>
-
pause
public void pause(java.util.Set<java.lang.Integer> partitions)
Description copied from interface:VeniceChangelogConsumer
Pause the client on all or subset of partitions this client is subscribed to. Calls toVeniceChangelogConsumer.poll(long)
will not return results from paused partitions untilVeniceChangelogConsumer.resume(Set)
is called again later for those partitions.- Specified by:
pause
in interfaceVeniceChangelogConsumer<K,V>
-
seekToEndOfPush
public java.util.concurrent.CompletableFuture<java.lang.Void> seekToEndOfPush()
Description copied from interface:VeniceChangelogConsumer
Seek to the end of the push for all subscribed partitions. SeeVeniceChangelogConsumer.seekToEndOfPush(Set)
for more information.- Specified by:
seekToEndOfPush
in interfaceVeniceChangelogConsumer<K,V>
- Returns:
- a future which completes when the operation has succeeded for all partitions.
-
seekToTail
public java.util.concurrent.CompletableFuture<java.lang.Void> seekToTail(java.util.Set<java.lang.Integer> partitions)
Description copied from interface:VeniceChangelogConsumer
Seek to the end of events which have been transmitted to Venice and start consuming new events. This will ONLY consume events transmitted via nearline and incremental push. It will not read batch push data.- Specified by:
seekToTail
in interfaceVeniceChangelogConsumer<K,V>
- Parameters:
partitions
- the set of partitions to seek with- Returns:
- a future which completes when the operation has succeeded for all partitions.
-
internalSeekToTail
public java.util.concurrent.CompletableFuture<java.lang.Void> internalSeekToTail(java.util.Set<java.lang.Integer> partitions, java.lang.String topicSuffix)
-
seekToTail
public java.util.concurrent.CompletableFuture<java.lang.Void> seekToTail()
Description copied from interface:VeniceChangelogConsumer
Seek to the end of events which have been transmitted to Venice for all subscribed partitions. SeeVeniceChangelogConsumer.seekToTail(Set)
for more information.- Specified by:
seekToTail
in interfaceVeniceChangelogConsumer<K,V>
- Returns:
- a future which completes when the operation has succeeded for all partitions.
-
seekToCheckpoint
public java.util.concurrent.CompletableFuture<java.lang.Void> seekToCheckpoint(java.util.Set<VeniceChangeCoordinate> checkpoints) throws VeniceCoordinateOutOfRangeException
Description copied from interface:VeniceChangelogConsumer
Seek the provided checkpoints for the specified partitions. Note about checkpoints: Checkpoints have the following properties and should be considered: -Checkpoints are NOT comparable or valid across partitions. -Checkpoints are NOT comparable or valid across colos -Checkpoints are NOT comparable across store versions -It is not possible to determine the number of events between two checkpoints -It is possible that a checkpoint is no longer on retention. In such case, we will return an exception to the caller.- Specified by:
seekToCheckpoint
in interfaceVeniceChangelogConsumer<K,V>
- Returns:
- a future which completes when seek has completed for all partitions
- Throws:
VeniceCoordinateOutOfRangeException
- if passed checkpoint is no longer valid
-
subscribeAll
public java.util.concurrent.CompletableFuture<java.lang.Void> subscribeAll()
Description copied from interface:VeniceChangelogConsumer
Subscribe all partitions belonging to a specific store.- Specified by:
subscribeAll
in interfaceVeniceChangelogConsumer<K,V>
- Returns:
- a future which completes when all partitions are ready to be consumed data
-
seekToTimestamps
public java.util.concurrent.CompletableFuture<java.lang.Void> seekToTimestamps(java.util.Map<java.lang.Integer,java.lang.Long> timestamps)
Description copied from interface:VeniceChangelogConsumer
Seek to the provided timestamps for the specified partitions based on wall clock time for when this message was processed by Venice and produced to change capture. Note, this API can only be used to seek on nearline data applied to the current serving version in Venice. This will not seek on data transmitted via Batch Push. If the provided timestamp is lower than the earliest timestamp on a given stream, the earliest event will be returned. THIS WILL NOT SEEK TO DATA WHICH WAS APPLIED ON A PREVIOUS VERSION. You should never seek back in time to a timestamp which is smaller than the current time - rewindTimeInSeconds configured in the hybrid settings for this Venice store. The timestamp passed to this function should be associated to timestamps processed by this interface. The timestamp returned by {@link PubSubMessage.getPubSubMessageTime()} refers to the time when Venice processed the event, and calls to this method will seek based on that sequence of events. Note: it bears no relation to timestamps provided by upstream producers when writing to Venice where a user may optionally provide a timestamp at time of producing a record.- Specified by:
seekToTimestamps
in interfaceVeniceChangelogConsumer<K,V>
- Parameters:
timestamps
- a map keyed by a partition ID, and the timestamp checkpoints to seek for each partition.- Returns:
-
internalSeekToTimestamps
public java.util.concurrent.CompletableFuture<java.lang.Void> internalSeekToTimestamps(java.util.Map<java.lang.Integer,java.lang.Long> timestamps, java.lang.String topicSuffix)
-
seekToTimestamp
public java.util.concurrent.CompletableFuture<java.lang.Void> seekToTimestamp(java.lang.Long timestamp)
Description copied from interface:VeniceChangelogConsumer
Seek to the specified timestamp for all subscribed partitions. SeeVeniceChangelogConsumer.seekToTimestamps(Map)
for more information.- Specified by:
seekToTimestamp
in interfaceVeniceChangelogConsumer<K,V>
- Returns:
- a future which completes when the operation has succeeded for all partitions.
-
isCaughtUp
public boolean isCaughtUp()
Description copied from interface:VeniceChangelogConsumer
Checks whether all subscribed partitions are caught up during bootstrap. If a partition's (currentTimestamp - latestMessageTimestamp) is smaller or equal to 1 min, we consider this partition is caught up.- Specified by:
isCaughtUp
in interfaceVeniceChangelogConsumer<K,V>
- Returns:
- True if all subscribed partitions have caught up.
-
internalSeek
protected java.util.concurrent.CompletableFuture<java.lang.Void> internalSeek(java.util.Set<java.lang.Integer> partitions, PubSubTopic targetTopic, com.linkedin.davinci.consumer.VeniceChangelogConsumerImpl.SeekFunction seekAction)
-
unsubscribe
public void unsubscribe(java.util.Set<java.lang.Integer> partitions)
Description copied from interface:VeniceChangelogConsumer
Stop ingesting messages from a set of partitions for a specific store.- Specified by:
unsubscribe
in interfaceVeniceChangelogConsumer<K,V>
- Parameters:
partitions
- The set of topic partitions to unsubscribe
-
unsubscribeAll
public void unsubscribeAll()
Description copied from interface:VeniceChangelogConsumer
Stop ingesting messages from all partitions.- Specified by:
unsubscribeAll
in interfaceVeniceChangelogConsumer<K,V>
-
poll
public java.util.Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> poll(long timeoutInMs)
Description copied from interface:VeniceChangelogConsumer
Polling function to get any available messages from the underlying system for all partitions subscribed.- Specified by:
poll
in interfaceVeniceChangelogConsumer<K,V>
- Parameters:
timeoutInMs
- The maximum time to block/wait in between two polling requests (must not be greater thanLong.MAX_VALUE
milliseconds)- Returns:
- a collection of messages since the last fetch for the subscribed list of topic partitions
-
internalPoll
protected java.util.Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> internalPoll(long timeoutInMs, java.lang.String topicSuffix, boolean includeControlMessage)
-
internalPoll
protected java.util.Collection<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> internalPoll(long timeoutInMs, java.lang.String topicSuffix)
-
handleControlMessage
protected boolean handleControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, java.lang.String topicSuffix, byte[] key, long timestamp)
Handle control message from the given topic. Returns true if a topic switch should occur and records should be returned- Parameters:
controlMessage
- the encountered control messagepubSubTopicPartition
- the topic that this control message was concerned- Returns:
- true if a version switch happened, false if otherwise
-
processRecordBytes
protected <T> T processRecordBytes(java.nio.ByteBuffer decompressedBytes, T deserializedValue, byte[] key, java.nio.ByteBuffer value, PubSubTopicPartition partition, int valueSchemaId, long recordOffset) throws java.io.IOException
- Throws:
java.io.IOException
-
convertPubSubMessageToPubSubChangeEventMessage
protected java.util.Optional<PubSubMessage<K,ChangeEvent<V>,VeniceChangeCoordinate>> convertPubSubMessageToPubSubChangeEventMessage(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> message, PubSubTopicPartition pubSubTopicPartition)
-
extractOffsetVectorFromMessage
protected java.util.List<java.lang.Long> extractOffsetVectorFromMessage(int valueSchemaId, int rmdProtocolId, java.nio.ByteBuffer replicationMetadataPayload)
-
handleVersionSwapControlMessage
protected boolean handleVersionSwapControlMessage(ControlMessage controlMessage, PubSubTopicPartition pubSubTopicPartition, java.lang.String topicSuffix)
-
getTopicAssignment
protected java.util.Set<PubSubTopicPartition> getTopicAssignment()
-
switchToNewTopic
protected boolean switchToNewTopic(PubSubTopic newTopic, java.lang.String topicSuffix, java.lang.Integer partition)
-
close
public void close()
Description copied from interface:VeniceChangelogConsumer
Release the internal resources.- Specified by:
close
in interfaceVeniceChangelogConsumer<K,V>
-
setStoreRepository
protected void setStoreRepository(ThinClientMetaStoreBasedRepository repository)
-
getLatestCoordinate
protected VeniceChangeCoordinate getLatestCoordinate(java.lang.Integer partition)
-
getTopicPartition
protected PubSubTopicPartition getTopicPartition(java.lang.Integer partition)
-
getPubSubConsumer
protected PubSubConsumerAdapter getPubSubConsumer()
-
getChangelogClientConfig
protected ChangelogClientConfig getChangelogClientConfig()
-
getSubscribeTime
protected java.lang.Long getSubscribeTime()
-
getHeartbeatReporterThread
protected VeniceChangelogConsumerImpl.HeartbeatReporterThread getHeartbeatReporterThread()
-
getChangeCaptureStats
protected BasicConsumerStats getChangeCaptureStats()
-
-