Package com.linkedin.davinci.consumer
Class VeniceChangelogConsumerDaVinciRecordTransformerImpl<K,V>
java.lang.Object
com.linkedin.davinci.consumer.VeniceChangelogConsumerDaVinciRecordTransformerImpl<K,V>
- All Implemented Interfaces:
  BootstrappingVeniceChangelogConsumer<K,V>, VeniceChangelogConsumer<K,V>
@Experimental
public class VeniceChangelogConsumerDaVinciRecordTransformerImpl<K,V>
extends Object
implements BootstrappingVeniceChangelogConsumer<K,V>, VeniceChangelogConsumer<K,V>
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
class | |
-
Constructor Summary
Constructors
Constructor | Description
VeniceChangelogConsumerDaVinciRecordTransformerImpl(ChangelogClientConfig changelogClientConfig) |
-
Method Summary
Modifier and Type | Method | Description
void | close() | Release the internal resources.
int | getPartitionCount() |
boolean | isCaughtUp() | In the experimental client, once this becomes true it stays true, even if the consumer starts to lag after the bootstrapping phase.
boolean | isStarted() |
void | pause() | Pause the client on all subscriptions.
void | pause(Set<Integer> partitions) | Pause the client on all or a subset of partitions this client is subscribed to.
 | poll(long timeoutInMs) | Polls for the next batch of change events.
void | resume() | Resume the client on all subscriptions.
void | resume(Set<Integer> partitions) | Resume the client on all or a subset of partitions this client is subscribed to and has paused.
 | seekToBeginningOfPush() | Seek to the beginning of the push for subscribed partitions.
 | seekToBeginningOfPush(Set<Integer> partitions) | Seek to the beginning of the push for a set of partitions.
 | seekToCheckpoint(Set<VeniceChangeCoordinate> checkpoints) | Seek to the provided checkpoints for the specified partitions.
 | seekToEndOfPush() | Seek to the end of the push for all subscribed partitions.
 | seekToEndOfPush(Set<Integer> partitions) | Seek to the end of the last push for a given set of partitions.
 | seekToTail() | Seek to the end of events which have been transmitted to Venice for all subscribed partitions.
 | seekToTail(Set<Integer> partitions) | Seek to the end of events which have been transmitted to Venice and start consuming new events.
 | seekToTimestamp(Long timestamp) | Seek to the specified timestamp for all subscribed partitions.
 | seekToTimestamps(Map<Integer, Long> timestamps) | Seek to the provided timestamps for the specified partitions, based on the wall-clock time at which each message was processed by Venice and produced to change capture.
protected void | setBackgroundReporterThreadSleepIntervalSeconds(long interval) |
 | start() |
 | start(Set<Integer> partitions) | Start performs both a topic subscription and catch-up.
void | stop() |
 | subscribe(Set<Integer> partitions) | Subscribe a set of partitions for a store to this VeniceChangelogConsumer, with the earliest position in the topic for each partition.
 | subscribeAll() | Subscribe all partitions belonging to a specific store, with the earliest position in the topic for each partition.
void | unsubscribe(Set<Integer> partitions) | Stop ingesting messages from a set of partitions for a specific store.
void | unsubscribeAll() | Stop ingesting messages from all partitions.
-
Constructor Details
-
VeniceChangelogConsumerDaVinciRecordTransformerImpl
public VeniceChangelogConsumerDaVinciRecordTransformerImpl(ChangelogClientConfig changelogClientConfig)
-
Method Details
-
start
Description copied from interface: BootstrappingVeniceChangelogConsumer
Start performs both a topic subscription and catch-up. The client looks at the latest offset in the server and syncs bootstrap data up to that point in changes. Once that is done for all partitions, the future completes.
NOTE: This future may take some time to complete, depending on how much data needs to be ingested to catch up with the time at which this client started.
NOTE: In the experimental client, the future completes when there is at least one message to be polled. It does not wait for all partitions to catch up, because loading every message into a buffer would cause an out-of-memory error. Instead, use BootstrappingVeniceChangelogConsumer.isCaughtUp() to determine when all subscribed partitions have caught up.
NOTE: In the experimental client, passing an empty set subscribes to all partitions of the store.
- Specified by:
  start in interface BootstrappingVeniceChangelogConsumer<K,V>
- Parameters:
  partitions - the partition IDs to catch up with
- Returns:
  a future that completes once catch-up is complete for all passed-in partitions.
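Because the experimental client's start() future can complete before every partition has caught up, a caller typically keeps polling until isCaughtUp() reports true. The sketch below models that loop in plain Java so it runs without Venice on the classpath. BootstrapSource and BootstrapDrain are hypothetical names, and the simplified signatures (a generic element type instead of PubSubMessage, a CompletableFuture<Void> return from start()) are assumptions rather than this class's exact API.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for a bootstrapping changelog consumer.
// Not the Venice interface: the element type and signatures are assumptions.
interface BootstrapSource<E> {
  CompletableFuture<Void> start();

  Collection<E> poll(long timeoutInMs);

  boolean isCaughtUp();
}

final class BootstrapDrain {
  private BootstrapDrain() {}

  // Waits for start() and then polls until isCaughtUp() is true. In the experimental
  // client the start() future only signals that something is ready to poll, so the
  // isCaughtUp() check, not the future, marks the end of the bootstrap phase.
  static <E> List<E> drainUntilCaughtUp(BootstrapSource<E> source, long pollTimeoutMs) {
    source.start().join();
    List<E> received = new ArrayList<>();
    do {
      received.addAll(source.poll(pollTimeoutMs));
    } while (!source.isCaughtUp());
    return received;
  }
}
```

A real application would keep calling poll after this returns in order to receive live updates; the loop only marks where bootstrap ends.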
-
start
- Specified by:
  start in interface BootstrappingVeniceChangelogConsumer<K,V>
-
stop
- Specified by:
  stop in interface BootstrappingVeniceChangelogConsumer<K,V>
- Throws:
  Exception
-
getPartitionCount
public int getPartitionCount()
- Specified by:
  getPartitionCount in interface VeniceChangelogConsumer<K,V>
- Returns:
  the total number of store partitions
-
subscribe
Description copied from interface: VeniceChangelogConsumer
Subscribe a set of partitions for a store to this VeniceChangelogConsumer, with the earliest position in the topic for each partition. The VeniceChangelogConsumer should try to consume messages from all partitions that are subscribed to it.
- Specified by:
  subscribe in interface VeniceChangelogConsumer<K,V>
- Parameters:
  partitions - the set of partitions to subscribe to and consume
- Returns:
  a future which completes when data from the partitions is ready to be consumed
-
subscribeAll
Description copied from interface: VeniceChangelogConsumer
Subscribe all partitions belonging to a specific store, with the earliest position in the topic for each partition.
- Specified by:
  subscribeAll in interface VeniceChangelogConsumer<K,V>
- Returns:
  a future which completes when all partitions are ready to be consumed
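Two notes on this page describe how the covered partitions are chosen: the experimental start(Set) treats an empty set as "all partitions of the store", and subscribeAll() covers every partition. A hypothetical helper that resolves a requested set the same way might look like the following. It assumes partition IDs run from 0 to getPartitionCount() - 1, and its rejection of out-of-range IDs is an illustrative choice, not documented behavior.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical helper: resolves which partitions a start/subscribe-style call should cover.
final class PartitionSelection {
  private PartitionSelection() {}

  // An empty request means "every partition of the store" (the experimental start(Set) rule).
  // partitionCount plays the role of getPartitionCount(); IDs are assumed to be 0-based.
  static Set<Integer> resolve(Set<Integer> requested, int partitionCount) {
    if (requested.isEmpty()) {
      return Collections.unmodifiableSet(
          IntStream.range(0, partitionCount).boxed().collect(Collectors.toCollection(TreeSet::new)));
    }
    for (int p : requested) {
      // Illustrative validation only; this page does not say how invalid IDs are handled.
      if (p < 0 || p >= partitionCount) {
        throw new IllegalArgumentException("partition " + p + " is outside [0, " + partitionCount + ")");
      }
    }
    return Collections.unmodifiableSet(new TreeSet<>(requested));
  }
}
```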
-
unsubscribe
Description copied from interface: VeniceChangelogConsumer
Stop ingesting messages from a set of partitions for a specific store.
- Specified by:
  unsubscribe in interface VeniceChangelogConsumer<K,V>
- Parameters:
  partitions - the set of topic partitions to unsubscribe
-
unsubscribeAll
public void unsubscribeAll()
Description copied from interface: VeniceChangelogConsumer
Stop ingesting messages from all partitions.
- Specified by:
  unsubscribeAll in interface VeniceChangelogConsumer<K,V>
-
seekToBeginningOfPush
Description copied from interface: VeniceChangelogConsumer
Seek to the beginning of the push for a set of partitions. This is analogous to doing a bootstrap of data for the consumer. This seek will ONLY seek to the beginning of the version which is currently serving data, and the consumer will switch to reading data from a new version (should one get created) once it has read up to the point in the change capture stream that indicates the version swap (which can only occur after consuming all the data in the last push). This instructs the consumer to consume data from the batch push.
- Specified by:
  seekToBeginningOfPush in interface VeniceChangelogConsumer<K,V>
- Parameters:
  partitions - the set of partitions to seek with
- Returns:
  a future which completes when the operation has succeeded for all partitions.
-
seekToBeginningOfPush
Description copied from interface: VeniceChangelogConsumer
Seek to the beginning of the push for subscribed partitions. See VeniceChangelogConsumer.seekToBeginningOfPush(Set) for more information.
- Specified by:
  seekToBeginningOfPush in interface VeniceChangelogConsumer<K,V>
- Returns:
  a future which completes when the partitions are ready to be consumed
-
seekToEndOfPush
Description copied from interface: VeniceChangelogConsumer
Seek to the end of the last push for a given set of partitions. This instructs the consumer to begin consuming events which are transmitted to Venice following the last batch push.
- Specified by:
  seekToEndOfPush in interface VeniceChangelogConsumer<K,V>
- Parameters:
  partitions - the set of partitions to seek with
- Returns:
  a future which completes when the operation has succeeded for all partitions.
-
seekToEndOfPush
Description copied from interface: VeniceChangelogConsumer
Seek to the end of the push for all subscribed partitions. See VeniceChangelogConsumer.seekToEndOfPush(Set) for more information.
- Specified by:
  seekToEndOfPush in interface VeniceChangelogConsumer<K,V>
- Returns:
  a future which completes when the operation has succeeded for all partitions.
-
seekToTail
Description copied from interface: VeniceChangelogConsumer
Seek to the end of events which have been transmitted to Venice and start consuming new events. This will ONLY consume events transmitted via nearline and incremental push. It will not read batch push data.
- Specified by:
  seekToTail in interface VeniceChangelogConsumer<K,V>
- Parameters:
  partitions - the set of partitions to seek with
- Returns:
  a future which completes when the operation has succeeded for all partitions.
-
seekToTail
Description copied from interface: VeniceChangelogConsumer
Seek to the end of events which have been transmitted to Venice for all subscribed partitions. See VeniceChangelogConsumer.seekToTail(Set) for more information.
- Specified by:
  seekToTail in interface VeniceChangelogConsumer<K,V>
- Returns:
  a future which completes when the operation has succeeded for all partitions.
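The three seek families differ in where reading resumes within the current version. The toy model below (hypothetical SeekMode and SeekModel types, with the string "EOP" standing in for the end-of-push point) computes the first index a consumer would read in a single partition's stream. It only illustrates the descriptions above; real positions are opaque Venice coordinates, not list indices.

```java
import java.util.List;

// Hypothetical model of one partition's stream in the current version:
// batch-push records, then an end-of-push point, then nearline/incremental-push records.
enum SeekMode { BEGINNING_OF_PUSH, END_OF_PUSH, TAIL }

final class SeekModel {
  static final String END_OF_PUSH_MARKER = "EOP";

  private SeekModel() {}

  // Returns the index of the first entry a consumer would read after the given seek.
  static int startIndex(List<String> log, SeekMode mode) {
    switch (mode) {
      case BEGINNING_OF_PUSH:
        return 0; // replay the batch push and everything after it
      case END_OF_PUSH: {
        int eop = log.indexOf(END_OF_PUSH_MARKER);
        if (eop < 0) {
          // Model choice only; the real client's behavior here is not described on this page.
          throw new IllegalStateException("push has not ended yet");
        }
        return eop + 1; // only events written after the last batch push
      }
      case TAIL:
        return log.size(); // skip everything already written; only new events
      default:
        throw new AssertionError(mode);
    }
  }
}
```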
-
seekToCheckpoint
Description copied from interface: VeniceChangelogConsumer
Seek to the provided checkpoints for the specified partitions.
Note about checkpoints: checkpoints have the following properties, which should be taken into account:
  - Checkpoints are NOT comparable or valid across partitions.
  - Checkpoints are NOT comparable or valid across regions.
  - Checkpoints are NOT comparable across store versions.
  - It is not possible to determine the number of events between two checkpoints.
  - A checkpoint may no longer be on retention. In that case, an exception is returned to the caller.
- Specified by:
  seekToCheckpoint in interface VeniceChangelogConsumer<K,V>
- Returns:
  a future which completes when the seek has completed for all partitions
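Since checkpoints are only meaningful per partition (and per region and store version), one simple way to prepare for seekToCheckpoint(Set) is to remember the latest checkpoint observed on each partition and hand the collected set back after a restart. CheckpointTracker is a hypothetical helper: its type parameter C stands in for VeniceChangeCoordinate, and it assumes messages within a partition are observed in order, so the most recent one is the furthest along. It deliberately never compares checkpoints with each other.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping helper: keeps the most recent checkpoint seen per partition.
// C stands in for VeniceChangeCoordinate. Checkpoints are never compared, only replaced,
// because they are not comparable across partitions.
final class CheckpointTracker<C> {
  private final Map<Integer, C> latestByPartition = new ConcurrentHashMap<>();

  // Call for each polled message, with the partition and checkpoint it carried.
  void record(int partition, C checkpoint) {
    latestByPartition.put(partition, checkpoint);
  }

  // The set one would pass to seekToCheckpoint(Set) when resuming.
  Set<C> checkpointsForSeek() {
    return new HashSet<>(latestByPartition.values());
  }

  // Drop partitions that are no longer consumed, e.g. after unsubscribe(Set).
  void forget(Collection<Integer> partitions) {
    partitions.forEach(latestByPartition::remove);
  }
}
```

Because checkpoints are also not comparable across regions or store versions, a real application would need to discard or separate saved checkpoints when either changes; the sketch does not handle that.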
-
seekToTimestamps
Description copied from interface: VeniceChangelogConsumer
Seek to the provided timestamps for the specified partitions, based on the wall-clock time at which each message was processed by Venice and produced to change capture. Note that this API can only be used to seek on nearline data applied to the current serving version in Venice; it will not seek on data transmitted via batch push. If the provided timestamp is lower than the earliest timestamp on a given stream, the earliest event will be returned. THIS WILL NOT SEEK TO DATA WHICH WAS APPLIED ON A PREVIOUS VERSION. You should never seek back in time to a timestamp smaller than the current time minus the rewindTimeInSeconds configured in the hybrid settings for this Venice store.
The timestamps passed to this method should correspond to timestamps processed by this interface. The timestamp returned by PubSubMessage.getPubSubMessageTime() refers to the time when Venice processed the event, and calls to this method seek based on that sequence of events. Note: it bears no relation to the timestamps that upstream producers may optionally provide when writing records to Venice.
- Specified by:
  seekToTimestamps in interface VeniceChangelogConsumer<K,V>
- Parameters:
  timestamps - a map keyed by partition ID, giving the timestamp checkpoint to seek to for each partition.
- Returns:
  a future which completes when the operation has succeeded for all partitions.
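The rewind guidance above amounts to a lower bound of (current time minus rewindTimeInSeconds) on any seek timestamp. A hypothetical planner that builds the Map<Integer, Long> argument for seekToTimestamps and applies that bound could look like this. It assumes timestamps are epoch milliseconds and that the caller already knows the store's rewindTimeInSeconds; neither detail is stated on this page.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for preparing a seekToTimestamps(Map<Integer, Long>) call.
final class TimestampSeekPlanner {
  private TimestampSeekPlanner() {}

  // Earliest timestamp (assumed epoch millis) that respects the rewind window.
  static long earliestSafeTimestamp(long nowMs, long rewindTimeInSeconds) {
    return nowMs - TimeUnit.SECONDS.toMillis(rewindTimeInSeconds);
  }

  // Uses one requested timestamp for every given partition (the same shape as the
  // single-timestamp seekToTimestamp(Long) variant), clamped so it is never older than
  // the rewind window. The clamp is this sketch's choice, not something the client does.
  static Map<Integer, Long> plan(Set<Integer> partitions, long requestedMs, long nowMs, long rewindTimeInSeconds) {
    long target = Math.max(requestedMs, earliestSafeTimestamp(nowMs, rewindTimeInSeconds));
    Map<Integer, Long> timestamps = new TreeMap<>();
    for (int p : partitions) {
      timestamps.put(p, target);
    }
    return timestamps;
  }
}
```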
-
seekToTimestamp
Description copied from interface: VeniceChangelogConsumer
Seek to the specified timestamp for all subscribed partitions. See VeniceChangelogConsumer.seekToTimestamps(Map) for more information.
- Specified by:
  seekToTimestamp in interface VeniceChangelogConsumer<K,V>
- Returns:
  a future which completes when the operation has succeeded for all partitions.
-
pause
Description copied from interface: VeniceChangelogConsumer
Pause the client on all or a subset of partitions this client is subscribed to. Calls to VeniceChangelogConsumer.poll(long) will not return results from paused partitions until VeniceChangelogConsumer.resume(Set) is called for those partitions.
- Specified by:
  pause in interface VeniceChangelogConsumer<K,V>
-
pause
public void pause()
Description copied from interface: VeniceChangelogConsumer
Pause the client on all subscriptions. See VeniceChangelogConsumer.pause(Set) for more information.
- Specified by:
  pause in interface VeniceChangelogConsumer<K,V>
-
resume
Description copied from interface: VeniceChangelogConsumer
Resume the client on all or a subset of partitions this client is subscribed to and has paused.
- Specified by:
  resume in interface VeniceChangelogConsumer<K,V>
-
resume
public void resume()
Description copied from interface: VeniceChangelogConsumer
Resume the client on all subscriptions. See VeniceChangelogConsumer.resume(Set) for more information.
- Specified by:
  resume in interface VeniceChangelogConsumer<K,V>
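Reading "poll will not return results from paused partitions until resume is called" as holding events back rather than discarding them, the pause and resume contract can be modeled with one queue per partition. PausablePartitions and its deliver method are hypothetical, single-threaded stand-ins for illustration; they are not how this class implements pausing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, single-threaded model of pause/resume; not this class's implementation.
final class PausablePartitions {
  private final Map<Integer, Deque<String>> pending = new TreeMap<>();
  private final Set<Integer> paused = new HashSet<>();

  PausablePartitions(Set<Integer> subscribed) {
    for (int p : subscribed) {
      pending.put(p, new ArrayDeque<>());
    }
  }

  // Simulates an event arriving on a subscribed partition.
  void deliver(int partition, String event) {
    Deque<String> queue = pending.get(partition);
    if (queue == null) {
      throw new IllegalArgumentException("not subscribed: " + partition);
    }
    queue.addLast(event);
  }

  void pause(Set<Integer> partitions) { paused.addAll(partitions); }

  void pause() { paused.addAll(pending.keySet()); } // all subscriptions

  void resume(Set<Integer> partitions) { paused.removeAll(partitions); }

  void resume() { paused.clear(); } // all subscriptions

  // Returns queued events from unpaused partitions; paused partitions keep theirs queued.
  List<String> poll() {
    List<String> out = new ArrayList<>();
    for (Map.Entry<Integer, Deque<String>> entry : pending.entrySet()) {
      if (paused.contains(entry.getKey())) {
        continue;
      }
      Deque<String> queue = entry.getValue();
      while (!queue.isEmpty()) {
        out.add(queue.pollFirst());
      }
    }
    return out;
  }
}
```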
-
close
public void close()
Description copied from interface: VeniceChangelogConsumer
Release the internal resources.
- Specified by:
  close in interface VeniceChangelogConsumer<K,V>
-
poll
Description copied from interface: BootstrappingVeniceChangelogConsumer
Polls for the next batch of change events. The first records returned after calling start() come from the bootstrap state. Once this state is consumed, subsequent calls to poll are based on recent updates to the Venice store.
In the experimental client, records are returned in batches of up to MAX_BUFFER_SIZE records, so the initial calls to poll return records from the bootstrap state until the partitions have caught up. Additionally, if the buffer reaches MAX_BUFFER_SIZE before the timeout is hit, poll returns immediately.
If the PubSubMessage came from disk (after a restart), the following fields are set to sentinel values, because record metadata is not stored in order to reduce disk utilization:
  - PubSubMessageTime
  - Position
  - WriterSchemaId
  - ReplicationMetadataPayload
- Specified by:
  poll in interface BootstrappingVeniceChangelogConsumer<K,V>
- Specified by:
  poll in interface VeniceChangelogConsumer<K,V>
- Parameters:
  timeoutInMs - the maximum timeout of the poll invocation
- Returns:
  a collection of Venice PubSubMessages
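The experimental poll contract described above (return early once MAX_BUFFER_SIZE records are available, otherwise wait out the timeout and return what has arrived) can be sketched with a BlockingQueue. BufferedPoller and its maxBufferSize parameter are hypothetical; this models the documented behavior and is not the client's actual buffering code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the documented experimental poll contract; not the client's code.
final class BufferedPoller<E> {
  private final BlockingQueue<E> buffer;
  private final int maxBufferSize; // stands in for MAX_BUFFER_SIZE

  BufferedPoller(BlockingQueue<E> buffer, int maxBufferSize) {
    if (maxBufferSize <= 0) {
      throw new IllegalArgumentException("maxBufferSize must be positive");
    }
    this.buffer = buffer;
    this.maxBufferSize = maxBufferSize;
  }

  // Collects records until maxBufferSize is reached (then returns immediately) or the
  // timeout expires (then returns whatever has arrived, possibly nothing).
  List<E> poll(long timeoutInMs) {
    List<E> batch = new ArrayList<>(maxBufferSize);
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutInMs);
    try {
      while (batch.size() < maxBufferSize) {
        long remainingNanos = deadline - System.nanoTime();
        if (remainingNanos <= 0) {
          break;
        }
        E next = buffer.poll(remainingNanos, TimeUnit.NANOSECONDS);
        if (next == null) {
          break; // timed out with a partial (or empty) batch
        }
        batch.add(next);
        // Take anything else already buffered without waiting, up to the size limit.
        buffer.drainTo(batch, maxBufferSize - batch.size());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status; return the partial batch
    }
    return batch;
  }
}
```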
-
isCaughtUp
public boolean isCaughtUp()
Description copied from interface: BootstrappingVeniceChangelogConsumer
In the experimental client, once this becomes true it stays true, even if the consumer starts to lag after the bootstrapping phase.
- Specified by:
  isCaughtUp in interface BootstrappingVeniceChangelogConsumer<K,V>
- Specified by:
  isCaughtUp in interface VeniceChangelogConsumer<K,V>
- Returns:
  true if all subscribed partitions have caught up.
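The sticky semantics of isCaughtUp() in the experimental client can be modeled as a latch: the flag flips once every subscribed partition has caught up, and lag reported afterwards never resets it. CaughtUpLatch and its two callback methods are hypothetical names for illustration; treating lag during bootstrap as delaying the flag is this sketch's assumption.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the experimental client's "sticky" isCaughtUp() flag.
final class CaughtUpLatch {
  private final Set<Integer> notYetCaughtUp = new HashSet<>();
  private boolean caughtUp;

  CaughtUpLatch(Set<Integer> subscribedPartitions) {
    notYetCaughtUp.addAll(subscribedPartitions);
    caughtUp = notYetCaughtUp.isEmpty();
  }

  // Hypothetical callback: a partition has reached the point it was behind at startup.
  synchronized void onPartitionCaughtUp(int partition) {
    notYetCaughtUp.remove(partition);
    if (notYetCaughtUp.isEmpty()) {
      caughtUp = true; // latched: no code path sets it back to false
    }
  }

  // Hypothetical callback: lag during bootstrap delays the flag; lag afterwards is ignored.
  synchronized void onPartitionLagging(int partition) {
    if (!caughtUp) {
      notYetCaughtUp.add(partition);
    }
  }

  synchronized boolean isCaughtUp() {
    return caughtUp;
  }
}
```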
-
getRecordTransformerConfig
-
setBackgroundReporterThreadSleepIntervalSeconds
protected void setBackgroundReporterThreadSleepIntervalSeconds(long interval)
-
isStarted
public boolean isStarted()
-
getSubscribedPartitions