Package com.linkedin.venice.writer
Class VeniceWriter<K,V,U>
java.lang.Object
com.linkedin.venice.writer.AbstractVeniceWriter<K,V,U>
com.linkedin.venice.writer.VeniceWriter<K,V,U>
- All Implemented Interfaces:
Closeable, AutoCloseable
Class which acts as the primary writer API.
-
Nested Class Summary
Modifier and TypeClassDescriptionstatic class
static interface
An interface which enables the key to contain parts of the within it, which is useful for control messages and chunked values. -
Field Summary
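Modifier and Type | Field | Description
static final long | APP_DEFAULT_LOGICAL_TS | This sentinel value indicates that the Venice Samza apps have not provided the logical timestamp.
static final String | CHECK_SUM_TYPE |
static final String | CLOSE_TIMEOUT_MS |
static final String | DEFAULT_CHECK_SUM_TYPE | Default checksum type.
static final int | DEFAULT_CLOSE_TIMEOUT_MS | This controls the Kafka producer's close timeout.
static final LeaderMetadataWrapper | DEFAULT_LEADER_METADATA_WRAPPER |
static final int | DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING | Default number of attempts when trying to produce to a Kafka topic and an exception is caught saying the topic is missing.
static final int | DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES | 950 KB for user payload should be conservative enough.
static final int | DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID |
static final long | DEFAULT_UPSTREAM_OFFSET | The default value of the "upstreamOffset" field in the Avro record LeaderMetadata.
static final byte[] | EMPTY_BYTE_ARRAY |
static final ByteBuffer | EMPTY_BYTE_BUFFER |
static final String | ENABLE_CHUNKING |
static final String | ENABLE_RMD_CHUNKING |
static final String | MAX_ATTEMPTS_WHEN_TOPIC_MISSING |
static final String | MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS | A negative value or 0 will disable the feature.
static final String | MAX_RECORD_SIZE_BYTES | Maximum Venice record size.
static final String | MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES | Chunk size.
static final AtomicLong | OPEN_VENICE_WRITER_COUNT | A static counter shared by all VeniceWriter instances to track the number of active VeniceWriter instances.
static final String | PRODUCER_QUEUE_SIZE |
static final String | PRODUCER_THREAD_COUNT |
static final int | UNLIMITED_MAX_RECORD_SIZE | The default for maxRecordSizeBytes is unlimited / unset (-1) just to be safe.
static final long | VENICE_DEFAULT_LOGICAL_TS | This sentinel value indicates that the Venice Samza apps do not support the logical timestamp.
static final int | VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID | This sentinel value indicates that metadata version id is not present.
static final int | VENICE_DEFAULT_VALUE_SCHEMA_ID | This sentinel value indicates that value schema id is not present.
static final AtomicLong | VENICE_WRITER_CLOSE_FAILED_COUNT | A static counter shared by all VeniceWriter instances to track the number of VeniceWriter instances that failed to close.
static final String | VENICE_WRITER_CONFIG_PREFIX |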
Fields inherited from class com.linkedin.venice.writer.AbstractVeniceWriter
topicName
-
Constructor Summary
Constructor | Description
VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter) |
VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter, org.apache.avro.Schema overrideProtocolSchema) | This constructor is currently used only in tests, in order to override the behavior of passing the protocol schema into the header of control messages.
-
Method Summary
Modifier and Type | Method | Description
Future<PubSubProduceResult> | asyncSendControlMessage(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper) | This API should only be used in the Leader/Standby model for store ingestion.
void | broadcastEndOfIncrementalPush(String version, Map<String, String> debugInfo) |
void | broadcastEndOfPush(Map<String, String> debugInfo) | This function might need synchronized locking.
void | broadcastStartOfIncrementalPush(String version, Map<String, String> debugInfo) |
void | broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Map<String, String> debugInfo) |
void | broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Optional<ByteBuffer> optionalCompressionDictionary, Map<String, String> debugInfo) |
void | broadcastStartOfPush(boolean sorted, Map<String, String> debugInfo) |
void | broadcastStartOfPush(Map<String, String> debugInfo) |
void | broadcastTopicSwitch(List<CharSequence> sourceKafkaCluster, String sourceTopicName, Long rewindStartTimestamp, Map<String, String> debugInfo) |
void | broadcastVersionSwap(String oldServingVersionTopic, String newServingVersionTopic, Map<String, String> debugInfo) | Broadcast control message to real-time topic partition, to be consumed by the Venice leader.
void | close() |
void | close(boolean gracefulClose) | Close the VeniceWriter.
 | closeAsync(boolean gracefulClose) |
CompletableFuture<VeniceResourceCloseResult> | closeAsync(boolean gracefulClose, boolean retryOnGracefulCloseFailure) |
void | closePartition(int partition) | Close a single partition from this writer.
Future<PubSubProduceResult> | delete(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper) | DIV pass-through mode for delete.
CompletableFuture<PubSubProduceResult> | delete(K key, long logicalTs, PubSubProducerCallback callback) | Execute a standard "delete" on the key.
 | delete(K key, PubSubProducerCallback callback) | Execute a standard "delete" on the key.
CompletableFuture<PubSubProduceResult> | delete(K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata) |
CompletableFuture<PubSubProduceResult> | delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper) | Execute a standard "delete" on the key.
CompletableFuture<PubSubProduceResult> | delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs) | Execute a standard "delete" on the key.
CompletableFuture<PubSubProduceResult> | delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, DeleteMetadata deleteMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest) | Execute a standard "delete" on the key.
CompletableFuture<PubSubProduceResult> | delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata) | Execute a standard "delete" on the key.
void | deleteDeprecatedChunk(byte[] serializedKey, int partition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata) | This method produces a DELETE request to a deprecated chunk key.
void | flush() | Call flush on the internal PubSubProducerAdapter.
static ControlMessage | generateHeartbeatMessage(CheckSumType checkSumType) |
static KafkaMessageEnvelope | getHeartbeatKME(long originTimeStampMs, LeaderMetadataWrapper leaderMetadataWrapper, ControlMessage heartBeatMessage, String writerId) |
protected KafkaMessageEnvelope | getKafkaMessageEnvelope(MessageType messageType, boolean isEndOfSegment, int partition, boolean incrementSequenceNumber, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs) | A utility function to centralize some boilerplate code for the instantiation of SpecificRecord classes holding the content of our Kafka values.
static PubSubMessageHeader | getLeaderCompleteStateHeader(LeaderCompleteState leaderCompleteState) |
int | getMaxRecordSizeBytes() |
int | getMaxSizeForUserPayloadPerMessageInBytes() |
 | getTime() |
boolean | isChunkingNeededForRecord(int recordSize) |
boolean | isRecordTooLarge(int recordSize) |
Future<PubSubProduceResult> | put(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper) | Deprecated.
Future<PubSubProduceResult> | put(K key, V value, int valueSchemaId, long logicalTs, PubSubProducerCallback callback) | Execute a standard "put" on the key.
CompletableFuture<PubSubProduceResult> | put(K key, V value, int valueSchemaId, PubSubProducerCallback callback) | Execute a standard "put" on the key.
Future<PubSubProduceResult> | put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper) | VeniceWriter in the leader replica should call this API to fulfill extra metadata information --- upstreamOffset.
CompletableFuture<PubSubProduceResult> | put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata) |
CompletableFuture<PubSubProduceResult> | put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest) | Execute a standard "put" on the key.
Future<PubSubProduceResult> | put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest) |
Future<PubSubProduceResult> | put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata) |
CompletableFuture<PubSubProduceResult> | sendControlMessage(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper) | Main function for sending control messages.
void | sendControlMessageWithRetriesForNonExistentTopic(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper) | This function sends a control message into the prescribed partition.
CompletableFuture<PubSubProduceResult> | sendHeartbeat(PubSubTopicPartition topicPartition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs) |
CompletableFuture<PubSubProduceResult> | sendHeartbeat(String topicName, int partitionNumber, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs) |
void | sendStartOfSegment(int partition, Map<String, String> debugInfo) | Send a ControlMessageType.START_OF_SEGMENT control message into the designated partition.
 | toString() |
Future<PubSubProduceResult> | update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback) |
Future<PubSubProduceResult> | update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback, long logicalTs) |
Methods inherited from class com.linkedin.venice.writer.AbstractVeniceWriter
put
-
Field Details
-
VENICE_WRITER_CONFIG_PREFIX
-
CLOSE_TIMEOUT_MS
-
CHECK_SUM_TYPE
-
ENABLE_CHUNKING
-
ENABLE_RMD_CHUNKING
-
MAX_ATTEMPTS_WHEN_TOPIC_MISSING
-
MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS
A negative value or 0 will disable the feature.
-
MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
Chunk size. Default: 972800
N.B.: This must be configured in relation to the following configs:
1. Kafka broker's global setting: `message.max.bytes` (default is 1000012, or ~976 KB)
2. Kafka broker's topic setting: `max.message.bytes` (default is 1000012, or ~976 KB)
3. Kafka producer's setting: `max.request.size` (default is 1048576)
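To make the relationship between these limits concrete, the sketch below computes how much headroom the 972800-byte default leaves under the tightest of the Kafka limits quoted in this entry. The class and method names are illustrative only and are not part of Venice; the numeric values are the defaults cited above.

```java
// Illustrative sketch, not Venice code: compares the chunk-size default
// with the Kafka limits quoted in this entry.
final class PayloadSizeHeadroom {
    // 950 KB, the default chunk size cited above.
    static final int DEFAULT_MAX_PAYLOAD_BYTES = 950 * 1024;
    // Default cited above for message.max.bytes / max.message.bytes.
    static final int BROKER_MESSAGE_MAX_BYTES = 1_000_012;
    // Default cited above for the producer's max.request.size.
    static final int PRODUCER_MAX_REQUEST_SIZE = 1_048_576;

    /** Bytes left under the tightest limit for keys, headers, and envelope overhead. */
    static int headroomBytes(int maxPayloadBytes) {
        int tightestLimit = Math.min(BROKER_MESSAGE_MAX_BYTES, PRODUCER_MAX_REQUEST_SIZE);
        return tightestLimit - maxPayloadBytes;
    }

    public static void main(String[] args) {
        System.out.println("payload limit = " + DEFAULT_MAX_PAYLOAD_BYTES + " bytes");
        System.out.println("headroom      = " + headroomBytes(DEFAULT_MAX_PAYLOAD_BYTES) + " bytes");
    }
}
```

If the chunk size is raised, the same subtraction shows how quickly the margin for per-message overhead shrinks.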
-
MAX_RECORD_SIZE_BYTES
Maximum Venice record size. Default: -1
Large records can cause performance issues, so this setting is used to detect and prevent them. Not to be confused with Kafka record size (which is the ~1MB limit MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES is designed to comply with). Venice records refer to a Venice key-value pair, which can be spread across 1+ Kafka records / events.
Basically: Chunking Not Needed < ~1MB < Chunking Needed < Max Record Size
1. If the data in a batch push contains records larger than this setting, the push job will fail.
2. If a partial update creates records larger than this setting, consumption will be paused and manual intervention will be necessary.
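The ordering "Chunking Not Needed < ~1MB < Chunking Needed < Max Record Size" can be sketched as a three-way classification. This is a minimal illustration under stated assumptions: the method names mirror isChunkingNeededForRecord and isRecordTooLarge from this class, but the comparisons shown here (strictly greater than, with -1 meaning unlimited) are assumptions, not the actual VeniceWriter implementation.

```java
// Illustrative sketch, not Venice code: classifies a record size against
// the chunk size and the maximum record size described above.
final class RecordSizeSketch {
    static final int UNLIMITED_MAX_RECORD_SIZE = -1;

    enum SizeClass { FITS_IN_ONE_MESSAGE, NEEDS_CHUNKING, TOO_LARGE }

    private final int maxPayloadPerMessageBytes; // chunk size
    private final int maxRecordSizeBytes;        // -1 means unlimited

    RecordSizeSketch(int maxPayloadPerMessageBytes, int maxRecordSizeBytes) {
        this.maxPayloadPerMessageBytes = maxPayloadPerMessageBytes;
        this.maxRecordSizeBytes = maxRecordSizeBytes;
    }

    // Assumption: a record needs chunking when it exceeds the per-message payload limit.
    boolean isChunkingNeededForRecord(int recordSize) {
        return recordSize > maxPayloadPerMessageBytes;
    }

    // Assumption: the limit is only enforced when it is not the "unlimited" sentinel.
    boolean isRecordTooLarge(int recordSize) {
        return maxRecordSizeBytes != UNLIMITED_MAX_RECORD_SIZE && recordSize > maxRecordSizeBytes;
    }

    SizeClass classify(int recordSize) {
        if (isRecordTooLarge(recordSize)) {
            return SizeClass.TOO_LARGE;
        }
        return isChunkingNeededForRecord(recordSize)
            ? SizeClass.NEEDS_CHUNKING
            : SizeClass.FITS_IN_ONE_MESSAGE;
    }
}
```

With the unlimited default, no record is ever classified as too large, which is why a cluster-level limit is recommended elsewhere on this page.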
-
PRODUCER_THREAD_COUNT
-
PRODUCER_QUEUE_SIZE
-
DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
public static final int DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
950 KB for user payload should be conservative enough.
-
UNLIMITED_MAX_RECORD_SIZE
public static final int UNLIMITED_MAX_RECORD_SIZE
The default for maxRecordSizeBytes is unlimited / unset (-1) just to be safe. A more specific default value should be set at the cluster level using the controller config com.linkedin.venice.ConfigKeys#CONTROLLER_DEFAULT_MAX_RECORD_SIZE_BYTES.
-
DEFAULT_CLOSE_TIMEOUT_MS
public static final int DEFAULT_CLOSE_TIMEOUT_MS
This controls the Kafka producer's close timeout.
-
DEFAULT_CHECK_SUM_TYPE
Default checksum type. N.B.: Only MD5 (and having no checksums) supports checkpointing mid-checksum. -
DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING
public static final int DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING
Default number of attempts when trying to produce to a Kafka topic and an exception is caught saying the topic is missing.
-
DEFAULT_UPSTREAM_OFFSET
public static final long DEFAULT_UPSTREAM_OFFSET
The default value of the "upstreamOffset" field in the Avro record LeaderMetadata. Even though we have set the default value for the "upstreamOffset" field as -1, the initial value for the long field "upstreamOffset" is still 0 when we construct a LeaderMetadata record. Default field values are primarily used when reading records that don't have those fields, typically when we deserialize a record from an older version to a newer version.
-
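The pitfall described for DEFAULT_UPSTREAM_OFFSET comes from Java itself: a long field starts at 0 unless it is assigned. The sketch below uses a hypothetical stand-in class rather than the generated Avro LeaderMetadata, and it assumes the constant equals -1 as the description suggests.

```java
// Illustrative sketch, not Venice code: shows why the sentinel must be
// assigned explicitly after constructing a record.
final class UpstreamOffsetDefaultSketch {
    // Assumption: mirrors the -1 default described above.
    static final long DEFAULT_UPSTREAM_OFFSET = -1L;

    /** Hypothetical stand-in for the generated Avro LeaderMetadata class. */
    static final class LeaderMetadataStandIn {
        long upstreamOffset; // Java initializes long fields to 0, not to the schema default
    }

    /** Builds a record and applies the sentinel explicitly. */
    static LeaderMetadataStandIn newLeaderMetadata() {
        LeaderMetadataStandIn metadata = new LeaderMetadataStandIn();
        metadata.upstreamOffset = DEFAULT_UPSTREAM_OFFSET;
        return metadata;
    }

    public static void main(String[] args) {
        System.out.println("fresh record:  " + new LeaderMetadataStandIn().upstreamOffset);
        System.out.println("after default: " + newLeaderMetadata().upstreamOffset);
    }
}
```

Without the explicit assignment, a freshly built record would carry offset 0, which is indistinguishable from a real offset in the source topic.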
DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID
public static final int DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID -
OPEN_VENICE_WRITER_COUNT
A static counter shared by all VeniceWriter instances to track the number of active VeniceWriter instances.
-
VENICE_WRITER_CLOSE_FAILED_COUNT
A static counter shared by all VeniceWriter instances to track the number of VeniceWriter instances that failed to close.
-
VENICE_DEFAULT_LOGICAL_TS
public static final long VENICE_DEFAULT_LOGICAL_TS
This sentinel value indicates that the Venice Samza apps do not support the logical timestamp.
-
APP_DEFAULT_LOGICAL_TS
public static final long APP_DEFAULT_LOGICAL_TS
This sentinel value indicates that the Venice Samza apps have not provided the logical timestamp.
-
VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID
public static final int VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID
This sentinel value indicates that metadata version id is not present.
-
VENICE_DEFAULT_VALUE_SCHEMA_ID
public static final int VENICE_DEFAULT_VALUE_SCHEMA_ID
This sentinel value indicates that value schema id is not present.
-
EMPTY_BYTE_ARRAY
public static final byte[] EMPTY_BYTE_ARRAY -
EMPTY_BYTE_BUFFER
-
DEFAULT_LEADER_METADATA_WRAPPER
-
-
Constructor Details
-
VeniceWriter
public VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter) -
VeniceWriter
public VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter, org.apache.avro.Schema overrideProtocolSchema)
This constructor is currently used only in tests, in order to override the behavior of passing the protocol schema into the header of control messages.
- Parameters:
overrideProtocolSchema - The schema to pass in CM headers, or null to omit that header entirely
-
-
Method Details
-
generateHeartbeatMessage
-
close
public void close(boolean gracefulClose)
Close the VeniceWriter.
- Specified by:
close in class AbstractVeniceWriter<K,V,U>
- Parameters:
gracefulClose - whether to end the segments and send an END_OF_SEGMENT control message.
-
closeAsync
-
closeAsync
public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose, boolean retryOnGracefulCloseFailure) -
close
public void close() -
flush
public void flush()
Call flush on the internal PubSubProducerAdapter.
- Specified by:
flush in class AbstractVeniceWriter<K,V,U>
-
toString
-
getProducerGUID
-
getTopicName
- Overrides:
getTopicName in class AbstractVeniceWriter<K,V,U>
- Returns:
the Kafka topic name that this VeniceWriter instance writes into.
-
delete
Execute a standard "delete" on the key.
- Parameters:
key - The key to delete in storage.
callback - Callback executed after the Kafka producer finishes sending the message.
- Returns:
a java.util.concurrent.CompletableFuture for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
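The interplay between the callback and the returned future can be sketched with plain JDK types. Everything below is a stand-in: ProduceResult and ProducerCallback are hypothetical simplifications of PubSubProduceResult and PubSubProducerCallback, and the simulated send does not touch Kafka or Venice.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch, not Venice code: a simulated asynchronous delete that
// invokes a callback and exposes the result through a CompletableFuture.
final class ProduceFutureSketch {
    /** Hypothetical stand-in for PubSubProduceResult. */
    static final class ProduceResult {
        final int partition;
        final long offset;

        ProduceResult(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Hypothetical stand-in for PubSubProducerCallback. */
    interface ProducerCallback {
        void onCompletion(ProduceResult result, Throwable error);
    }

    /** Pretends to produce a delete for the key; the acknowledgement is simulated. */
    static CompletableFuture<ProduceResult> delete(String key, ProducerCallback callback) {
        return CompletableFuture
            .supplyAsync(() -> new ProduceResult(0, 42L)) // simulated broker acknowledgement
            .whenComplete((result, error) -> {
                if (callback != null) {
                    callback.onCompletion(result, error);
                }
            });
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<ProduceResult> future =
            delete("user-123", (result, error) -> System.out.println("callback ran, error=" + error));
        ProduceResult result = future.get(); // blocks until the simulated send completes
        System.out.println("partition=" + result.partition + ", offset=" + result.offset);
    }
}
```

Callers that do not want to block can rely on the callback alone and ignore the returned future.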
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, long logicalTs, PubSubProducerCallback callback)
Execute a standard "delete" on the key.
- Parameters:
key - The key to delete in storage.
logicalTs - A timestamp field to indicate when this record was produced from the app's point of view.
callback - Callback executed after the Kafka producer finishes sending the message.
- Returns:
a java.util.concurrent.CompletableFuture for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
Execute a standard "delete" on the key.
- Parameters:
key - The key to delete in storage.
callback - Callback function invoked by the Kafka producer after sending the message.
leaderMetadataWrapper - The leader metadata of this message in the source topic:
  -1: VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic.
  >=0: The leader replica consumed a delete message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
- Returns:
a java.util.concurrent.CompletableFuture for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
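The two cases for the upstream offset carried by leaderMetadataWrapper can be written down as a small classification. This is only a sketch of the semantics stated above; the enum and method names are hypothetical and do not exist in Venice.

```java
// Illustrative sketch, not Venice code: interprets the upstream offset
// values described for leaderMetadataWrapper.
final class UpstreamOffsetSketch {
    enum Origin {
        DIRECT_WRITE,   // -1: written by a Samza app or the VPJ plugin
        LEADER_FORWARD  // >= 0: forwarded by the leader from the real-time topic
    }

    static Origin originOf(long upstreamOffset) {
        if (upstreamOffset == -1L) {
            return Origin.DIRECT_WRITE;
        }
        if (upstreamOffset >= 0L) {
            return Origin.LEADER_FORWARD;
        }
        // Values below -1 are not described above, so this sketch rejects them.
        throw new IllegalArgumentException("unexpected upstream offset: " + upstreamOffset);
    }
}
```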
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
Execute a standard "delete" on the key.
- Parameters:
key - The key to delete in storage.
callback - Callback executed after the Kafka producer finishes sending the message.
leaderMetadataWrapper - The leader metadata of this message in the source topic:
  -1: VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic.
  >=0: The leader replica consumed a delete message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
logicalTs - A timestamp field to indicate when this record was produced from the app's point of view.
- Returns:
a java.util.concurrent.CompletableFuture for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata)
Execute a standard "delete" on the key.
- Parameters:
key - The key to delete in storage.
callback - Callback executed after the Kafka producer finishes sending the message.
leaderMetadataWrapper - The leader metadata of this message in the source topic:
  -1: VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic.
  >=0: The leader replica consumed a delete message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
deleteMetadata - a DeleteMetadata containing replication metadata related fields.
- Returns:
a java.util.concurrent.CompletableFuture for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata)
- Specified by:
delete in class AbstractVeniceWriter<K,V,U>
-
deleteDeprecatedChunk
public void deleteDeprecatedChunk(byte[] serializedKey, int partition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata)
This method produces a DELETE request to a deprecated chunk key.
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, DeleteMetadata deleteMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
Execute a standard "delete" on the key.
- Parameters:
key - The key to delete in storage.
callback - Callback function invoked by the Kafka producer after sending the message.
leaderMetadataWrapper - The leader metadata of this message in the source topic:
  -1: VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic.
  >=0: The leader replica consumed a DELETE message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
logicalTs - A timestamp field to indicate when this record was produced from the app's point of view.
deleteMetadata - a DeleteMetadata containing replication metadata related fields (can be null).
- Returns:
a java.util.concurrent.CompletableFuture for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback)
Execute a standard "put" on the key.
- Specified by:
put in class AbstractVeniceWriter<K,V,U>
- Parameters:
key - The key to put in storage.
value - The value to be associated with the given key.
valueSchemaId - The value schema id for the given value.
callback - Callback function invoked by the Kafka producer after sending the message.
- Returns:
a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
put
public Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata)
- Specified by:
put in class AbstractVeniceWriter<K,V,U>
-
put
public Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, long logicalTs, PubSubProducerCallback callback)
Execute a standard "put" on the key.
- Parameters:
key - The key to put in storage.
value - The value to be associated with the given key.
valueSchemaId - The value schema id for the given value.
logicalTs - A timestamp field to indicate when this record was produced from the app's point of view.
callback - Callback function invoked by the Kafka producer after sending the message.
- Returns:
a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
put
public Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
VeniceWriter in the leader replica should call this API to fulfill extra metadata information --- upstreamOffset. UpstreamOffset is the offset of the PUT message in the source topic:
  -1: VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic.
  >=0: The leader replica consumed a put message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
-
put
public Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest) -
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata) -
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
Execute a standard "put" on the key. VeniceReducer and VeniceSystemProducer should call this API.
- Parameters:
key - The key to put in storage.
value - The value to be associated with the given key.
valueSchemaId - The value schema id for the given value.
callback - Callback function invoked by the Kafka producer after sending the message.
leaderMetadataWrapper - The leader metadata of this message in the source topic:
  -1: VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic.
  >=0: The leader replica consumed a put message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
logicalTs - A timestamp field to indicate when this record was produced from the app's point of view.
putMetadata - a PutMetadata containing replication metadata related fields (can be null).
- Returns:
a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
put
@Deprecated public Future<PubSubProduceResult> put(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
Deprecated.
Write a message with the Kafka message envelope (KME) passed in. This lets users reuse an existing KME to improve performance. If this is called, VeniceWriter will also reuse the existing DIV data (producer metadata). This is the "pass-through" mode.
TODO: move pass-through support into a server-specific extension of VeniceWriter
-
delete
public Future<PubSubProduceResult> delete(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
DIV pass-through mode for delete.
-
update
public Future<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback)
- Specified by:
update in class AbstractVeniceWriter<K,V,U>
-
update
public Future<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback, long logicalTs) -
broadcastStartOfPush
- Parameters:
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
-
broadcastStartOfPush
-
broadcastStartOfPush
public void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Map<String, String> debugInfo)
- Parameters:
sorted - whether the messages between the 'StartOfPush' and 'EndOfPush' control messages in the current topic partition are lexicographically sorted by key bytes.
chunked - whether the push has chunking support enabled, in which case all keys are to be appended with ChunkedKeySuffix and the Put may contain either chunks or ChunkedValueManifest records, in addition to regular (small) values.
compressionStrategy - the store-version's CompressionStrategy.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
-
broadcastStartOfPush
public void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Optional<ByteBuffer> optionalCompressionDictionary, Map<String, String> debugInfo)
- Parameters:
sorted - whether the messages between the 'StartOfPush' and 'EndOfPush' control messages in the current topic partition are lexicographically sorted by key bytes.
chunked - whether the push has chunking support enabled, in which case all keys are to be appended with ChunkedKeySuffix and the Put may contain either chunks or ChunkedValueManifest records, in addition to regular (small) values.
compressionStrategy - the store-version's CompressionStrategy.
optionalCompressionDictionary - The raw bytes of the dictionary used to compress/decompress records.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
-
broadcastEndOfPush
This function might need synchronized locking. It might be possible that segments are ended while another thread is broadcasting EOP. However, we could not trigger this scenario using tests. This issue could surface when we support fully automatic stream reprocessing in the future. Users would call broadcastEndOfPush inside their Samza processors, which can have multiple threads.
Here is an example without synchronization:
- Thread A: broadcastControlMessage execution completes.
- Thread B: is in the middle of executing broadcastControlMessage.
- Thread A: begins endAllSegments.
- Segments that Thread B was writing to have now been ended, causing the consumer to see messages not inside segments.
- Parameters:
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
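One way to rule out the interleaving described above is to make "broadcast, then end all segments" a single critical section. The sketch below is a toy model under that assumption: it tracks only whether a segment is open and counts messages written outside one. It is not the VeniceWriter implementation, and all names are hypothetical.

```java
// Illustrative sketch, not Venice code: a lock makes the EOP broadcast and
// the segment ending atomic, so no thread writes into an ended segment.
final class EndOfPushLockSketch {
    private final Object lock = new Object();
    private boolean segmentOpen = false;
    private int messagesOutsideSegment = 0;
    private int endOfPushCount = 0;

    /** Broadcasts EOP and ends the segment as one atomic step. */
    void broadcastEndOfPush() {
        synchronized (lock) {
            segmentOpen = true;        // a segment is (re)started before writing
            broadcastControlMessage(); // the EOP itself
            segmentOpen = false;       // models endAllSegments
        }
    }

    // Only called while holding the lock.
    private void broadcastControlMessage() {
        if (!segmentOpen) {
            messagesOutsideSegment++;  // the failure mode described above
        }
        endOfPushCount++;
    }

    int messagesOutsideSegment() {
        synchronized (lock) {
            return messagesOutsideSegment;
        }
    }

    int endOfPushCount() {
        synchronized (lock) {
            return endOfPushCount;
        }
    }

    /** Runs several threads against one writer and returns it for inspection. */
    static EndOfPushLockSketch runConcurrently(int threadCount, int callsPerThread) {
        EndOfPushLockSketch writer = new EndOfPushLockSketch();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < callsPerThread; j++) {
                    writer.broadcastEndOfPush();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return writer;
    }
}
```

The trade-off is that concurrent callers serialize on the lock, which is usually acceptable for a control message sent once per push.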
-
broadcastTopicSwitch
-
broadcastVersionSwap
public void broadcastVersionSwap(@Nonnull String oldServingVersionTopic, @Nonnull String newServingVersionTopic, Map<String, String> debugInfo)
Broadcast control message to real-time topic partition, to be consumed by the Venice leader. Partition high watermarks are left to the local Venice leader to prepare and then produce to the version topic partition.
- Parameters:
oldServingVersionTopic - the version topic the change capture consumer should switch from.
newServingVersionTopic - the version topic the change capture consumer should switch to.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
-
broadcastStartOfIncrementalPush
-
broadcastEndOfIncrementalPush
-
closePartition
public void closePartition(int partition)
Close a single partition from this writer. It will send a final EOS to the partition and remove it from the segment map.
- Parameters:
partition - The partition to be closed.
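The "send a final EOS, then remove from the segment map" sequence can be modeled with a map keyed by partition. This is a simplified, single-threaded sketch with hypothetical names. In particular, treating a close on a partition with no open segment as a no-op is an assumption of the sketch, not documented behavior.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch, not Venice code: models per-partition segments and
// the end-of-segment message sent when a partition is closed.
final class ClosePartitionSketch {
    // partition -> current segment number (only the key presence matters here)
    private final Map<Integer, Integer> segmentByPartition = new HashMap<>();
    // Everything "produced", in order, for inspection.
    private final List<String> produced = new ArrayList<>();

    void write(int partition, String payload) {
        if (!segmentByPartition.containsKey(partition)) {
            segmentByPartition.put(partition, 0);
            produced.add("p" + partition + ":START_OF_SEGMENT");
        }
        produced.add("p" + partition + ":" + payload);
    }

    void closePartition(int partition) {
        if (segmentByPartition.containsKey(partition)) {
            produced.add("p" + partition + ":END_OF_SEGMENT"); // final EOS
            segmentByPartition.remove(partition);              // drop from the segment map
        }
    }

    boolean hasOpenSegment(int partition) {
        return segmentByPartition.containsKey(partition);
    }

    List<String> produced() {
        return produced;
    }
}
```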
-
getLeaderCompleteStateHeader
public static PubSubMessageHeader getLeaderCompleteStateHeader(LeaderCompleteState leaderCompleteState) -
sendStartOfSegment
Send a ControlMessageType.START_OF_SEGMENT control message into the designated partition.
- Parameters:
partition - the Kafka partition to write to.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
-
sendControlMessageWithRetriesForNonExistentTopic
public void sendControlMessageWithRetriesForNonExistentTopic(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
This function sends a control message into the prescribed partition. If the Kafka topic does not exist, this function will try again for a total of maxAttemptsWhenTopicMissing attempts. Note that this retry behavior does not happen in sendMessage(KeyProvider, MessageType, Object, int, PubSubProducerCallback, LeaderMetadataWrapper, long) because that function returns a Future, and it is Future.get() which throws the relevant exception. In any case, the topic should be seeded with a ControlMessageType.START_OF_SEGMENT at first, and therefore, there should be no cases where a topic has not been created yet, and we attempt to write a data message first, prior to a control message. If a topic did disappear later on in the VeniceWriter's lifecycle, then it would be appropriate to let that Future fail.
This function has a synchronized block because if the retries need to be exercised, then it would cause a DIV failure if another message slipped in after the first attempt and before the eventually successful attempt.
- Parameters:
controlMessage - a ControlMessage instance to persist into Kafka.
partition - the Kafka partition to write to.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
callback - callback to execute when the record has been acknowledged by the Kafka server (null means no callback)
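The retry-inside-a-lock pattern described for this method can be sketched as follows. The exception type, the Runnable standing in for the send, and the absence of any wait between attempts are all assumptions of this sketch. It illustrates why the whole retry loop holds one lock; it is not the actual VeniceWriter code.

```java
// Illustrative sketch, not Venice code: retries a send while holding a lock
// so that no other message can slip in between attempts.
final class TopicMissingRetrySketch {
    /** Hypothetical exception standing in for a "topic is missing" failure. */
    static final class TopicMissingException extends RuntimeException {
        TopicMissingException(String message) {
            super(message);
        }
    }

    private final Object lock = new Object();
    private final int maxAttemptsWhenTopicMissing;

    TopicMissingRetrySketch(int maxAttemptsWhenTopicMissing) {
        if (maxAttemptsWhenTopicMissing < 1) {
            throw new IllegalArgumentException("at least one attempt is required");
        }
        this.maxAttemptsWhenTopicMissing = maxAttemptsWhenTopicMissing;
    }

    /** Runs the send, retrying on a missing topic; returns the number of attempts used. */
    int sendWithRetries(Runnable sendControlMessage) {
        synchronized (lock) {
            for (int attempt = 1; ; attempt++) {
                try {
                    sendControlMessage.run();
                    return attempt;
                } catch (TopicMissingException e) {
                    if (attempt >= maxAttemptsWhenTopicMissing) {
                        throw e; // attempts exhausted, surface the failure
                    }
                    // A real writer would likely wait before retrying; omitted here.
                }
            }
        }
    }
}
```

Because other producing paths would need the same lock, a message from another thread cannot be sequenced between a failed attempt and the successful one, which is the DIV concern mentioned above.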
-
sendControlMessage
public CompletableFuture<PubSubProduceResult> sendControlMessage(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
Main function for sending control messages. Synchronization is minimal and there is no error checking. See also: sendControlMessageWithRetriesForNonExistentTopic(ControlMessage, int, Map, PubSubProducerCallback, LeaderMetadataWrapper)
-
-
asyncSendControlMessage
public Future<PubSubProduceResult> asyncSendControlMessage(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
This API should only be used in the Leader/Standby model for store ingestion. Producer DIV will be recalculated (not DIV pass-through mode); the checksum for the input partition in this producer will also be updated.
-
getHeartbeatKME
public static KafkaMessageEnvelope getHeartbeatKME(long originTimeStampMs, LeaderMetadataWrapper leaderMetadataWrapper, ControlMessage heartBeatMessage, String writerId) -
sendHeartbeat
public CompletableFuture<PubSubProduceResult> sendHeartbeat(PubSubTopicPartition topicPartition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs) -
sendHeartbeat
public CompletableFuture<PubSubProduceResult> sendHeartbeat(String topicName, int partitionNumber, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs) -
getKafkaMessageEnvelope
protected KafkaMessageEnvelope getKafkaMessageEnvelope(MessageType messageType, boolean isEndOfSegment, int partition, boolean incrementSequenceNumber, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
A utility function to centralize some boilerplate code for the instantiation of SpecificRecord classes holding the content of our Kafka values. Note: the payloadUnion must be set on the returned object before producing into Kafka.
- Parameters:
messageType - an instance of the MessageType enum.
- Returns:
A KafkaMessageEnvelope for producing into Kafka
-
getTime
-
getMaxRecordSizeBytes
public int getMaxRecordSizeBytes() -
isRecordTooLarge
public boolean isRecordTooLarge(int recordSize) -
isChunkingNeededForRecord
public boolean isChunkingNeededForRecord(int recordSize) -
getMaxSizeForUserPayloadPerMessageInBytes
public int getMaxSizeForUserPayloadPerMessageInBytes() -
getDestination
- Returns:
a string of the format topicName@brokerAddress
-
getProducerAdapter
-