Package com.linkedin.venice.writer
Class VeniceWriter<K,V,U>
- java.lang.Object
-
- com.linkedin.venice.writer.AbstractVeniceWriter<K,V,U>
-
- com.linkedin.venice.writer.VeniceWriter<K,V,U>
-
- All Implemented Interfaces:
java.io.Closeable, java.lang.AutoCloseable
public class VeniceWriter<K,V,U> extends AbstractVeniceWriter<K,V,U>
Class which acts as the primary writer API.
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description):
static interface
VeniceWriter.KeyProvider
An interface which enables the key to contain parts of the producerMetadata within it, which is useful for control messages and chunked values.
-
Field Summary
Fields (Modifier and Type, Field, Description):
static long
APP_DEFAULT_LOGICAL_TS
This sentinel value indicates that the Venice Samza apps have not provided the logical timestamp.
static java.lang.String
CHECK_SUM_TYPE
static java.lang.String
CLOSE_TIMEOUT_MS
static java.lang.String
DEFAULT_CHECK_SUM_TYPE
Default checksum type.
static int
DEFAULT_CLOSE_TIMEOUT_MS
This controls the Kafka producer's close timeout.
static LeaderMetadataWrapper
DEFAULT_LEADER_METADATA_WRAPPER
static int
DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING
Default number of attempts when trying to produce to a Kafka topic and an exception is caught saying the topic is missing.
static int
DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
950 KB for user payload should be conservative enough.
static int
DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID
static long
DEFAULT_UPSTREAM_OFFSET
The default value of the "upstreamOffset" field in the Avro record LeaderMetadata.
static byte[]
EMPTY_BYTE_ARRAY
static java.nio.ByteBuffer
EMPTY_BYTE_BUFFER
static java.lang.String
ENABLE_CHUNKING
static java.lang.String
ENABLE_RMD_CHUNKING
static java.lang.String
MAX_ATTEMPTS_WHEN_TOPIC_MISSING
static java.lang.String
MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS
A negative value or 0 will disable the feature.
static java.lang.String
MAX_RECORD_SIZE_BYTES
Maximum Venice record size.
static java.lang.String
MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
Chunk size.
static java.util.concurrent.atomic.AtomicLong
OPEN_VENICE_WRITER_COUNT
A static counter shared by all VeniceWriter instances to track the number of active VeniceWriter instances.
static int
UNLIMITED_MAX_RECORD_SIZE
The default for maxRecordSizeBytes is unlimited / unset (-1) just to be safe.
static long
VENICE_DEFAULT_LOGICAL_TS
This sentinel value indicates that the Venice Samza apps do not support the logical timestamp.
static int
VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID
This sentinel value indicates that the metadata version id is not present.
static int
VENICE_DEFAULT_VALUE_SCHEMA_ID
This sentinel value indicates that the value schema id is not present.
static java.util.concurrent.atomic.AtomicLong
VENICE_WRITER_CLOSE_FAILED_COUNT
A static counter shared by all VeniceWriter instances to track the number of VeniceWriter instances that fail to close.
static java.lang.String
VENICE_WRITER_CONFIG_PREFIX
-
Fields inherited from class com.linkedin.venice.writer.AbstractVeniceWriter
topicName
-
-
Constructor Summary
Constructors (Constructor, Description):
VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter)
VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter, org.apache.avro.Schema overrideProtocolSchema)
This constructor is currently used only in tests, in order to override the behavior of passing the protocol schema into the header of control messages.
-
Method Summary
Methods (Modifier and Type, Method, Description):
java.util.concurrent.Future<PubSubProduceResult>
asyncSendControlMessage(ControlMessage controlMessage, int partition, java.util.Map<java.lang.String,java.lang.String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
This API should only be used in the Leader/Standby model for store ingestion.
void
broadcastEndOfIncrementalPush(java.lang.String version, java.util.Map<java.lang.String,java.lang.String> debugInfo)
void
broadcastEndOfPush(java.util.Map<java.lang.String,java.lang.String> debugInfo)
This function might need synchronized locking.
void
broadcastStartOfIncrementalPush(java.lang.String version, java.util.Map<java.lang.String,java.lang.String> debugInfo)
void
broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, java.util.Map<java.lang.String,java.lang.String> debugInfo)
void
broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, java.util.Optional<java.nio.ByteBuffer> optionalCompressionDictionary, java.util.Map<java.lang.String,java.lang.String> debugInfo)
void
broadcastStartOfPush(boolean sorted, java.util.Map<java.lang.String,java.lang.String> debugInfo)
void
broadcastStartOfPush(java.util.Map<java.lang.String,java.lang.String> debugInfo)
void
broadcastTopicSwitch(java.util.List<java.lang.CharSequence> sourceKafkaCluster, java.lang.String sourceTopicName, java.lang.Long rewindStartTimestamp, java.util.Map<java.lang.String,java.lang.String> debugInfo)
void
broadcastVersionSwap(java.lang.String oldServingVersionTopic, java.lang.String newServingVersionTopic, java.util.Map<java.lang.String,java.lang.String> debugInfo)
Broadcast control message to real-time topic partition, to be consumed by the Venice leader.
void
close()
void
close(boolean gracefulClose)
Close the VeniceWriter.
java.util.concurrent.CompletableFuture<VeniceResourceCloseResult>
closeAsync(boolean gracefulClose)
java.util.concurrent.CompletableFuture<VeniceResourceCloseResult>
closeAsync(boolean gracefulClose, boolean retryOnGracefulCloseFailure)
void
closePartition(int partition)
Close a single partition from this writer.
java.util.concurrent.Future<PubSubProduceResult>
delete(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
DIV pass-through mode for delete.
java.util.concurrent.Future<PubSubProduceResult>
delete(K key, long logicalTs, PubSubProducerCallback callback)
Execute a standard "delete" on the key.
java.util.concurrent.Future<PubSubProduceResult>
delete(K key, PubSubProducerCallback callback)
Execute a standard "delete" on the key.
java.util.concurrent.Future<PubSubProduceResult>
delete(K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata)
java.util.concurrent.Future<PubSubProduceResult>
delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
Execute a standard "delete" on the key.
java.util.concurrent.Future<PubSubProduceResult>
delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
Execute a standard "delete" on the key.
java.util.concurrent.Future<PubSubProduceResult>
delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, DeleteMetadata deleteMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
Execute a standard "delete" on the key.
java.util.concurrent.Future<PubSubProduceResult>
delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata)
Execute a standard "delete" on the key.
void
deleteDeprecatedChunk(byte[] serializedKey, int partition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata)
This method produces a DELETE request to a deprecated chunk key.
void
flush()
Call flush on the internal PubSubProducerAdapter.
static ControlMessage
generateHeartbeatMessage(CheckSumType checkSumType)
java.lang.String
getDestination()
static KafkaMessageEnvelope
getHeartbeatKME(long originTimeStampMs, LeaderMetadataWrapper leaderMetadataWrapper, ControlMessage heartBeatMessage, java.lang.String writerId)
protected KafkaMessageEnvelope
getKafkaMessageEnvelope(MessageType messageType, boolean isEndOfSegment, int partition, boolean incrementSequenceNumber, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
A utility function to centralize some boilerplate code for the instantiation of SpecificRecord classes holding the content of our Kafka values.
static PubSubMessageHeader
getLeaderCompleteStateHeader(LeaderCompleteState leaderCompleteState)
int
getMaxRecordSizeBytes()
int
getMaxSizeForUserPayloadPerMessageInBytes()
GUID
getProducerGUID()
Time
getTime()
java.lang.String
getTopicName()
boolean
isChunkingNeededForRecord(int recordSize)
boolean
isRecordTooLarge(int recordSize)
java.util.concurrent.Future<PubSubProduceResult>
put(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
Deprecated.
java.util.concurrent.Future<PubSubProduceResult>
put(K key, V value, int valueSchemaId, long logicalTs, PubSubProducerCallback callback)
Execute a standard "put" on the key.
java.util.concurrent.CompletableFuture<PubSubProduceResult>
put(K key, V value, int valueSchemaId, PubSubProducerCallback callback)
Execute a standard "put" on the key.
java.util.concurrent.Future<PubSubProduceResult>
put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
VeniceWriter in the leader replica should call this API to fill in the extra metadata information: upstreamOffset.
java.util.concurrent.CompletableFuture<PubSubProduceResult>
put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata)
java.util.concurrent.CompletableFuture<PubSubProduceResult>
put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
Execute a standard "put" on the key.
java.util.concurrent.Future<PubSubProduceResult>
put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
java.util.concurrent.Future<PubSubProduceResult>
put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata)
java.util.concurrent.CompletableFuture<PubSubProduceResult>
sendControlMessage(ControlMessage controlMessage, int partition, java.util.Map<java.lang.String,java.lang.String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
Main function for sending control messages.
void
sendControlMessageWithRetriesForNonExistentTopic(ControlMessage controlMessage, int partition, java.util.Map<java.lang.String,java.lang.String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
This function sends a control message into the prescribed partition.
java.util.concurrent.CompletableFuture<PubSubProduceResult>
sendHeartbeat(PubSubTopicPartition topicPartition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs)
void
sendStartOfSegment(int partition, java.util.Map<java.lang.String,java.lang.String> debugInfo)
Send a ControlMessageType.START_OF_SEGMENT control message into the designated partition.
java.lang.String
toString()
java.util.concurrent.Future<PubSubProduceResult>
update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback)
java.util.concurrent.Future<PubSubProduceResult>
update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback, long logicalTs)
-
Methods inherited from class com.linkedin.venice.writer.AbstractVeniceWriter
put
-
-
-
-
Field Detail
-
VENICE_WRITER_CONFIG_PREFIX
public static final java.lang.String VENICE_WRITER_CONFIG_PREFIX
- See Also:
- Constant Field Values
-
CLOSE_TIMEOUT_MS
public static final java.lang.String CLOSE_TIMEOUT_MS
- See Also:
- Constant Field Values
-
CHECK_SUM_TYPE
public static final java.lang.String CHECK_SUM_TYPE
- See Also:
- Constant Field Values
-
ENABLE_CHUNKING
public static final java.lang.String ENABLE_CHUNKING
- See Also:
- Constant Field Values
-
ENABLE_RMD_CHUNKING
public static final java.lang.String ENABLE_RMD_CHUNKING
- See Also:
- Constant Field Values
-
MAX_ATTEMPTS_WHEN_TOPIC_MISSING
public static final java.lang.String MAX_ATTEMPTS_WHEN_TOPIC_MISSING
- See Also:
- Constant Field Values
-
MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS
public static final java.lang.String MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS
A negative value or 0 will disable the feature.
- See Also:
- Constant Field Values
-
MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
public static final java.lang.String MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
Chunk size. Default: 972800
N.B.: This must be configured in relation to the following configs:
1. Kafka broker's global setting: `message.max.bytes` (default is 1000012, or ~976 KB)
2. Kafka broker's topic setting: `max.message.bytes` (default is 1000012, or ~976 KB)
3. Kafka producer's setting: `max.request.size` (default is 1048576)
- See Also:
- Constant Field Values
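The relationship between the default chunk size and the Kafka limits listed for this field is plain arithmetic. The sketch below only reproduces the numbers quoted in the description; the class name ChunkSizeHeadroom and the headroom helper are illustrative and not part of Venice.

```java
// Illustrative arithmetic only; ChunkSizeHeadroom is a hypothetical class, not part of Venice.
final class ChunkSizeHeadroom {
  // Values quoted in the field description.
  static final int DEFAULT_CHUNK_SIZE_BYTES = 950 * 1024;      // 972800
  static final int KAFKA_BROKER_MESSAGE_MAX_BYTES = 1000012;   // message.max.bytes / max.message.bytes default
  static final int KAFKA_PRODUCER_MAX_REQUEST_SIZE = 1048576;  // max.request.size default

  /** Bytes left under a limit for everything other than the user payload of one full chunk. */
  static int headroom(int limitBytes, int chunkSizeBytes) {
    return limitBytes - chunkSizeBytes;
  }

  public static void main(String[] args) {
    System.out.println("chunk size: " + DEFAULT_CHUNK_SIZE_BYTES);
    System.out.println("headroom under broker limit: "
        + headroom(KAFKA_BROKER_MESSAGE_MAX_BYTES, DEFAULT_CHUNK_SIZE_BYTES));
    System.out.println("headroom under producer limit: "
        + headroom(KAFKA_PRODUCER_MAX_REQUEST_SIZE, DEFAULT_CHUNK_SIZE_BYTES));
  }
}
```

If the chunk size is raised, the same subtraction shows how quickly the margin under the broker limit disappears, which is why the three Kafka settings have to be adjusted together.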
-
MAX_RECORD_SIZE_BYTES
public static final java.lang.String MAX_RECORD_SIZE_BYTES
Maximum Venice record size. Default: -1
Large records can cause performance issues, so this setting is used to detect and prevent them. Not to be confused with Kafka record size (which is the ~1MB limit MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES is designed to comply with). Venice records refer to a Venice key-value pair, which can be spread across 1+ Kafka records / events.
Basically: Chunking Not Needed < ~1MB < Chunking Needed < Max Record Size
1. If a batch push data contains records larger than this setting, the push job will fail.
2. If a partial update creates records larger than this setting, consumption will be paused and manual intervention will be necessary.
- See Also:
- Constant Field Values
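The ordering "Chunking Not Needed < ~1MB < Chunking Needed < Max Record Size" can be read as a three-way classification of a record by its size. The following is a minimal sketch under stated assumptions: RecordSizeCheck, Outcome and classify are hypothetical names, the strict ">" comparisons are a guess at the boundary behaviour, and the actual checks behind isChunkingNeededForRecord and isRecordTooLarge may differ.

```java
// Hypothetical sketch; not the actual VeniceWriter implementation.
final class RecordSizeCheck {
  // -1 means "unlimited / unset", as stated for UNLIMITED_MAX_RECORD_SIZE.
  static final int UNLIMITED_MAX_RECORD_SIZE = -1;

  enum Outcome { NO_CHUNKING, CHUNKING_NEEDED, TOO_LARGE }

  /**
   * Classifies a record by size.
   * Assumption: boundaries are exclusive (a record exactly at a limit is still accepted).
   */
  static Outcome classify(int recordSizeBytes, int chunkSizeBytes, int maxRecordSizeBytes) {
    boolean limited = maxRecordSizeBytes != UNLIMITED_MAX_RECORD_SIZE;
    if (limited && recordSizeBytes > maxRecordSizeBytes) {
      return Outcome.TOO_LARGE;
    }
    if (recordSizeBytes > chunkSizeBytes) {
      return Outcome.CHUNKING_NEEDED;
    }
    return Outcome.NO_CHUNKING;
  }

  public static void main(String[] args) {
    int chunkSize = 972800;           // default chunk size quoted in this page
    int maxRecordSize = 10_000_000;   // arbitrary example limit, not a Venice default
    System.out.println(classify(500, chunkSize, maxRecordSize));
    System.out.println(classify(2_000_000, chunkSize, maxRecordSize));
    System.out.println(classify(20_000_000, chunkSize, maxRecordSize));
    System.out.println(classify(20_000_000, chunkSize, UNLIMITED_MAX_RECORD_SIZE));
  }
}
```

With the limit left at -1, no record is ever classified as too large, which matches the "unlimited / unset" meaning of the default.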
-
DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
public static final int DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
950 KB for user payload should be conservative enough.
- See Also:
- Constant Field Values
-
UNLIMITED_MAX_RECORD_SIZE
public static final int UNLIMITED_MAX_RECORD_SIZE
The default for maxRecordSizeBytes is unlimited / unset (-1) just to be safe. A more specific default value should be set using com.linkedin.venice.ConfigKeys#CONTROLLER_DEFAULT_MAX_RECORD_SIZE_BYTES, the controller config on the cluster level.
- See Also:
- Constant Field Values
-
DEFAULT_CLOSE_TIMEOUT_MS
public static final int DEFAULT_CLOSE_TIMEOUT_MS
This controls the Kafka producer's close timeout.
- See Also:
- Constant Field Values
-
DEFAULT_CHECK_SUM_TYPE
public static final java.lang.String DEFAULT_CHECK_SUM_TYPE
Default checksum type. N.B.: Only MD5 (and having no checksums) supports checkpointing mid-checksum.
-
DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING
public static final int DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING
Default number of attempts when trying to produce to a Kafka topic and an exception is caught saying the topic is missing.
- See Also:
- Constant Field Values
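A bounded number of attempts against a topic that may not exist yet is a standard retry loop. The sketch below is a generic illustration only: TopicMissingRetry, TopicMissingException, produceWithRetries and the fixed back-off are all hypothetical, and this page does not say how VeniceWriter detects the missing topic or how long it waits between attempts.

```java
import java.util.function.Supplier;

// Hypothetical sketch of a bounded retry; not the actual VeniceWriter logic.
final class TopicMissingRetry {
  /** Stand-in for whatever exception signals that the topic is missing. */
  static final class TopicMissingException extends RuntimeException {
    TopicMissingException(String message) {
      super(message);
    }
  }

  /** Runs the attempt up to maxAttempts times, retrying only on TopicMissingException. */
  static <T> T produceWithRetries(Supplier<T> attempt, int maxAttempts, long backoffMs) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    TopicMissingException last = null;
    for (int i = 1; i <= maxAttempts; i++) {
      try {
        return attempt.get();
      } catch (TopicMissingException e) {
        last = e;
        if (i < maxAttempts) {
          sleep(backoffMs); // fixed back-off is an assumption made for this sketch
        }
      }
    }
    throw last; // all attempts failed; surface the last failure
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting to retry", e);
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    String result = produceWithRetries(() -> {
      if (++calls[0] < 3) {
        throw new TopicMissingException("topic does not exist yet");
      }
      return "sent";
    }, 5, 10L);
    System.out.println(result + " after " + calls[0] + " attempts"); // prints "sent after 3 attempts"
  }
}
```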
-
DEFAULT_UPSTREAM_OFFSET
public static final long DEFAULT_UPSTREAM_OFFSET
The default value of the "upstreamOffset" field in the Avro record LeaderMetadata. Even though we have set the default value for "upstreamOffset" field as -1, the initial value for the long field "upstreamOffset" is still 0 when we construct a LeaderMetadata record. Default field values are primarily used when reading records that don't have those fields, typically when we deserialize a record from older version to newer version.
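The point about the initial value can be shown without Avro: a Java long field starts at 0 no matter what default a schema declares, so the -1 sentinel has to be assigned explicitly. In the sketch below, LeaderMetadataLike is a hypothetical stand-in for the generated LeaderMetadata class, and the value -1 is taken from the description of this field.

```java
// Hypothetical stand-in classes; the real LeaderMetadata is an Avro-generated record.
final class UpstreamOffsetDefault {
  // Value taken from the field description (-1).
  static final long DEFAULT_UPSTREAM_OFFSET = -1L;

  /** Mimics a record class with a primitive long field. */
  static final class LeaderMetadataLike {
    long upstreamOffset; // Java initialises this to 0, independent of any schema default
  }

  /** Builds a record and sets the sentinel explicitly, since construction alone leaves 0. */
  static LeaderMetadataLike newWithSentinel() {
    LeaderMetadataLike metadata = new LeaderMetadataLike();
    metadata.upstreamOffset = DEFAULT_UPSTREAM_OFFSET;
    return metadata;
  }

  public static void main(String[] args) {
    System.out.println(new LeaderMetadataLike().upstreamOffset); // prints 0
    System.out.println(newWithSentinel().upstreamOffset);        // prints -1
  }
}
```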
-
DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID
public static final int DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID
-
OPEN_VENICE_WRITER_COUNT
public static final java.util.concurrent.atomic.AtomicLong OPEN_VENICE_WRITER_COUNT
A static counter shared by all VeniceWriter instances to track the number of active VeniceWriter instances.
-
VENICE_WRITER_CLOSE_FAILED_COUNT
public static final java.util.concurrent.atomic.AtomicLong VENICE_WRITER_CLOSE_FAILED_COUNT
A static counter shared by all VeniceWriter instances to track the number of VeniceWriter instances that fail to close.
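The two counters above follow a common pattern: a shared AtomicLong is bumped when an instance is created and adjusted when it is closed. The sketch below is an assumption-laden illustration: CountingWriter is a hypothetical class, and whether a failed close also decrements the open counter in Venice is not stated on this page (here it does not).

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of shared open/close-failure counters; not the actual VeniceWriter code.
final class CountingWriter implements Closeable {
  static final AtomicLong OPEN_COUNT = new AtomicLong();
  static final AtomicLong CLOSE_FAILED_COUNT = new AtomicLong();

  private final boolean failOnClose; // lets the example simulate a failing close
  private boolean closed;

  CountingWriter(boolean failOnClose) {
    this.failOnClose = failOnClose;
    OPEN_COUNT.incrementAndGet();
  }

  @Override
  public synchronized void close() {
    if (closed) {
      return; // closing twice must not change the counters again
    }
    closed = true;
    if (failOnClose) {
      // Assumption: a failed close is recorded and the writer still counts as open.
      CLOSE_FAILED_COUNT.incrementAndGet();
      return;
    }
    OPEN_COUNT.decrementAndGet();
  }

  public static void main(String[] args) {
    // try-with-resources works because the class implements Closeable.
    try (CountingWriter writer = new CountingWriter(false)) {
      System.out.println("open while in use: " + OPEN_COUNT.get());
    }
    System.out.println("open after close: " + OPEN_COUNT.get());
  }
}
```

Because VeniceWriter implements java.io.Closeable, the same try-with-resources shape applies to it, although the real class offers close(boolean gracefulClose) for callers that need to control how segments are ended.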
-
VENICE_DEFAULT_LOGICAL_TS
public static final long VENICE_DEFAULT_LOGICAL_TS
This sentinel value indicates that the Venice Samza apps do not support the logical timestamp.
- See Also:
- Constant Field Values
-
APP_DEFAULT_LOGICAL_TS
public static final long APP_DEFAULT_LOGICAL_TS
This sentinel value indicates that the Venice Samza apps have not provided the logical timestamp.
- See Also:
- Constant Field Values
-
VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID
public static final int VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID
This sentinel value indicates that the metadata version id is not present.
- See Also:
- Constant Field Values
-
VENICE_DEFAULT_VALUE_SCHEMA_ID
public static final int VENICE_DEFAULT_VALUE_SCHEMA_ID
This sentinel value indicates that the value schema id is not present.
- See Also:
- Constant Field Values
-
EMPTY_BYTE_ARRAY
public static final byte[] EMPTY_BYTE_ARRAY
-
EMPTY_BYTE_BUFFER
public static final java.nio.ByteBuffer EMPTY_BYTE_BUFFER
-
DEFAULT_LEADER_METADATA_WRAPPER
public static final LeaderMetadataWrapper DEFAULT_LEADER_METADATA_WRAPPER
-
-
Constructor Detail
-
VeniceWriter
public VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter)
-
VeniceWriter
public VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter, org.apache.avro.Schema overrideProtocolSchema)
This constructor is currently used only in tests, in order to override the behavior of passing the protocol schema into the header of control messages.
- Parameters:
overrideProtocolSchema
- The schema to pass in CM headers, or null to omit that header entirely
-
-
Method Detail
-
generateHeartbeatMessage
public static ControlMessage generateHeartbeatMessage(CheckSumType checkSumType)
-
close
public void close(boolean gracefulClose)
Close the VeniceWriter.
- Specified by:
close
in class AbstractVeniceWriter<K,V,U>
- Parameters:
gracefulClose
- whether to end the segments and send END_OF_SEGMENT control message.
-
closeAsync
public java.util.concurrent.CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose)
-
closeAsync
public java.util.concurrent.CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose, boolean retryOnGracefulCloseFailure)
-
close
public void close()
-
flush
public void flush()
Call flush on the internal PubSubProducerAdapter.
- Specified by:
flush
in class AbstractVeniceWriter<K,V,U>
-
toString
public java.lang.String toString()
- Overrides:
toString
in class java.lang.Object
-
getProducerGUID
public GUID getProducerGUID()
-
getTopicName
public java.lang.String getTopicName()
- Overrides:
getTopicName
in class AbstractVeniceWriter<K,V,U>
- Returns:
- the Kafka topic name that this VeniceWriter instance writes into.
-
delete
public java.util.concurrent.Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback)
Execute a standard "delete" on the key.
- Parameters:
key
- The key to delete in storage.
callback
- Callback executed after the Kafka producer finishes sending the message.
- Returns:
- a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
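The blocking behaviour of get() described under Returns can be illustrated with the JDK alone. This is a minimal sketch, assuming nothing about Venice internals: FutureGetDemo, FakeResult and await are hypothetical names, and CompletableFuture is used only to produce an already-completed or already-failed future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// JDK-only illustration of Future.get(); FakeResult is a hypothetical stand-in for a produce result.
final class FutureGetDemo {
  static final class FakeResult {
    final long offset;

    FakeResult(long offset) {
      this.offset = offset;
    }
  }

  /** Blocks until the future completes, then reports either the result or the send failure. */
  static String await(Future<FakeResult> future) {
    try {
      FakeResult result = future.get(); // blocks until the request completes
      return "offset=" + result.offset;
    } catch (ExecutionException e) {
      // The exception raised while sending is wrapped as the cause.
      return "failed: " + e.getCause().getMessage();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "interrupted";
    }
  }

  public static void main(String[] args) {
    CompletableFuture<FakeResult> succeeded = CompletableFuture.completedFuture(new FakeResult(42L));
    CompletableFuture<FakeResult> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("broker unavailable"));

    System.out.println(await(succeeded)); // prints "offset=42"
    System.out.println(await(failed));    // prints "failed: broker unavailable"
  }
}
```

Callers that do not want to block can rely on the PubSubProducerCallback argument instead of calling get().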
-
delete
public java.util.concurrent.Future<PubSubProduceResult> delete(K key, long logicalTs, PubSubProducerCallback callback)
Execute a standard "delete" on the key.
- Parameters:
key
- The key to delete in storage.
logicalTs
- A timestamp field indicating when this record was produced, from the app's point of view.
callback
- Callback executed after the Kafka producer finishes sending the message.
- Returns:
- a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
delete
public java.util.concurrent.Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
Execute a standard "delete" on the key.
- Parameters:
key
- The key to delete in storage.
callback
- Callback function invoked by the Kafka producer after sending the message.
leaderMetadataWrapper
- The leader metadata of this message in the source topic. -1: VeniceWriter is sending this message in a Samza app to the real-time topic, or it is sending the message in the VPJ plugin to the version topic. >=0: the leader replica consumes a delete message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
- Returns:
- a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
delete
public java.util.concurrent.Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
Execute a standard "delete" on the key.
- Parameters:
key
- The key to delete in storage.
callback
- Callback executed after the Kafka producer finishes sending the message.
leaderMetadataWrapper
- The leader metadata of this message in the source topic. -1: VeniceWriter is sending this message in a Samza app to the real-time topic, or it is sending the message in the VPJ plugin to the version topic. >=0: the leader replica consumes a delete message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
logicalTs
- A timestamp field indicating when this record was produced, from the app's point of view.
- Returns:
- a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
delete
public java.util.concurrent.Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata)
Execute a standard "delete" on the key.
- Parameters:
key
- The key to delete in storage.
callback
- Callback executed after the Kafka producer finishes sending the message.
leaderMetadataWrapper
- The leader metadata of this message in the source topic. -1: VeniceWriter is sending this message in a Samza app to the real-time topic, or it is sending the message in the VPJ plugin to the version topic. >=0: the leader replica consumes a delete message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
deleteMetadata
- A DeleteMetadata containing replication metadata related fields.
- Returns:
- a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
delete
public java.util.concurrent.Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata)
- Specified by:
delete
in class AbstractVeniceWriter<K,V,U>
-
deleteDeprecatedChunk
public void deleteDeprecatedChunk(byte[] serializedKey, int partition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata)
This method produces a DELETE request to a deprecated chunk key.
-
delete
public java.util.concurrent.Future<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, DeleteMetadata deleteMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
Execute a standard "delete" on the key.
- Parameters:
key
- The key to delete in storage.
callback
- Callback function invoked by the Kafka producer after sending the message.
leaderMetadataWrapper
- The leader metadata of this message in the source topic. -1: VeniceWriter is sending this message in a Samza app to the real-time topic, or it is sending the message in the VPJ plugin to the version topic. >=0: the leader replica consumes a DELETE message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
logicalTs
- A timestamp field indicating when this record was produced, from the app's point of view.
deleteMetadata
- A DeleteMetadata containing replication metadata related fields (can be null).
- Returns:
- a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
put
public java.util.concurrent.CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback)
Execute a standard "put" on the key.
- Specified by:
put
in class AbstractVeniceWriter<K,V,U>
- Parameters:
key
- The key to put in storage.
value
- The value to be associated with the given key.
valueSchemaId
- Value schema id for the given value.
callback
- Callback function invoked by the Kafka producer after sending the message.
- Returns:
- a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
put
public java.util.concurrent.Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata)
- Specified by:
put
in class AbstractVeniceWriter<K,V,U>
-
put
public java.util.concurrent.Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, long logicalTs, PubSubProducerCallback callback)
Execute a standard "put" on the key.
- Parameters:
key
- The key to put in storage.
value
- The value to be associated with the given key.
valueSchemaId
- Value schema id for the given value.
logicalTs
- A timestamp field indicating when this record was produced, from the app's point of view.
callback
- Callback function invoked by the Kafka producer after sending the message.
- Returns:
- a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
put
public java.util.concurrent.Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
VeniceWriter in the leader replica should call this API to fill in the extra metadata information: upstreamOffset. UpstreamOffset is the offset of the PUT message in the source topic:
-1: VeniceWriter is sending this message in a Samza app to the real-time topic, or it is sending the message in the VPJ plugin to the version topic;
>=0: the leader replica consumes a put message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
-
put
public java.util.concurrent.Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
-
put
public java.util.concurrent.CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata)
-
put
public java.util.concurrent.CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
Execute a standard "put" on the key. VeniceReducer and VeniceSystemProducer should call this API.
- Parameters:
key
- The key to put in storage.
value
- The value to be associated with the given key.
valueSchemaId
- Value schema id for the given value.
callback
- Callback function invoked by the Kafka producer after sending the message.
leaderMetadataWrapper
- The leader metadata of this message in the source topic. -1: VeniceWriter is sending this message in a Samza app to the real-time topic, or it is sending the message in the VPJ plugin to the version topic. >=0: the leader replica consumes a put message from the real-time topic, and the VeniceWriter in the leader is sending this message to the version topic with extra info: the offset in the real-time topic.
logicalTs
- A timestamp field indicating when this record was produced, from the app's point of view.
putMetadata
- A PutMetadata containing replication metadata related fields (can be null).
- Returns:
- a java.util.concurrent.Future for the RecordMetadata that will be assigned to this record. Invoking get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
-
put
@Deprecated public java.util.concurrent.Future<PubSubProduceResult> put(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
Deprecated.
Write a message with the Kafka message envelope (KME) passed in. This allows users to reuse an existing KME to improve performance. If this is called, VeniceWriter will also reuse the existing DIV data (producer metadata). This is the "pass-through" mode. TODO: move pass-through support into a server-specific extension of VeniceWriter.
-
delete
public java.util.concurrent.Future<PubSubProduceResult> delete(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
DIV pass-through mode for delete
-
update
public java.util.concurrent.Future<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback)
- Specified by:
update
in class AbstractVeniceWriter<K,V,U>
-
update
public java.util.concurrent.Future<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback, long logicalTs)
-
broadcastStartOfPush
public void broadcastStartOfPush(java.util.Map<java.lang.String,java.lang.String> debugInfo)
- Parameters:
debugInfo
- arbitrary key/value pairs of information that will be propagated alongside the control message.
-
broadcastStartOfPush
public void broadcastStartOfPush(boolean sorted, java.util.Map<java.lang.String,java.lang.String> debugInfo)
-
broadcastStartOfPush
public void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, java.util.Map<java.lang.String,java.lang.String> debugInfo)
- Parameters:
sorted
- whether the messages between the 'StartOfPush' control message and the 'EndOfPush' control message in the current topic partition are lexicographically sorted by key bytes
chunked
- whether the push has chunking support enabled, in which case all keys are to be appended with ChunkedKeySuffix and the Put may contain either chunks or ChunkedValueManifest records, in addition to regular (small) values.
compressionStrategy
- the store-version's CompressionStrategy
debugInfo
- arbitrary key/value pairs of information that will be propagated alongside the control message.
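To illustrate what "lexicographically sorted by key bytes" can mean for the sorted flag, here is a minimal sketch that checks a list of serialized keys. It assumes an unsigned byte-by-byte comparison, which the documentation above does not state; the class SortedKeysSketch and its method are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; the unsigned ordering is an assumption.
final class SortedKeysSketch {
  /** Returns true if no key is smaller than its predecessor in unsigned lexicographic byte order. */
  static boolean isSortedByKeyBytes(List<byte[]> keys) {
    for (int i = 1; i < keys.size(); i++) {
      // Arrays.compareUnsigned (Java 9+) compares byte by byte, treating each byte as 0..255;
      // a key that is a strict prefix of another key sorts first.
      if (Arrays.compareUnsigned(keys.get(i - 1), keys.get(i)) > 0) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<byte[]> sorted =
        Arrays.asList(new byte[] {0x01}, new byte[] {0x01, 0x02}, new byte[] {(byte) 0x80});
    List<byte[]> unsorted = Arrays.asList(new byte[] {(byte) 0x80}, new byte[] {0x01});
    System.out.println(isSortedByKeyBytes(sorted));
    System.out.println(isSortedByKeyBytes(unsorted));
  }
}
```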
-
broadcastStartOfPush
public void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, java.util.Optional<java.nio.ByteBuffer> optionalCompressionDictionary, java.util.Map<java.lang.String,java.lang.String> debugInfo)
- Parameters:
sorted
- whether the messages between the 'StartOfPush' control message and the 'EndOfPush' control message in the current topic partition are lexicographically sorted by key bytes
chunked
- whether the push has chunking support enabled, in which case all keys are to be appended with ChunkedKeySuffix and the Put may contain either chunks or ChunkedValueManifest records, in addition to regular (small) values.
compressionStrategy
- the store-version's CompressionStrategy
optionalCompressionDictionary
- The raw bytes of the dictionary used to compress/decompress records.
debugInfo
- arbitrary key/value pairs of information that will be propagated alongside the control message.
-
broadcastEndOfPush
public void broadcastEndOfPush(java.util.Map<java.lang.String,java.lang.String> debugInfo)
This function might need synchronized locking. Segments could be ended while another thread is broadcasting EOP, although we could not trigger this scenario using tests. The issue could surface once fully automatic stream reprocessing is supported: users would call broadcastEndOfPush inside their Samza processors, which can have multiple threads. Here is an example without synchronization:
- Thread A: broadcastControlMessage execution completes.
- Thread B: is in the middle of executing broadcastControlMessage.
- Thread A: begins endAllSegments.
- Segments that Thread B was writing to have now been ended, causing the consumer to see messages that are not inside segments.
- Parameters:
debugInfo
- arbitrary key/value pairs of information that will be propagated alongside the control message.
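The interleaving described for broadcastEndOfPush can be avoided by making the broadcast and the segment-ending step one critical section. The sketch below is one possible arrangement, not the actual VeniceWriter code: the class EndOfPushLockSketch, its event log, and the two step names recorded in it are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; not the VeniceWriter implementation.
final class EndOfPushLockSketch {
  private final Object lock = new Object();
  private final List<String> events = new ArrayList<>();

  /** Broadcasting EOP and ending segments happen under one lock, so they cannot interleave. */
  void broadcastEndOfPush(String caller) {
    synchronized (lock) {
      events.add(caller + ":broadcastControlMessage");
      events.add(caller + ":endAllSegments");
    }
  }

  List<String> snapshot() {
    synchronized (lock) {
      return new ArrayList<>(events);
    }
  }

  /** True if every broadcast is immediately followed by the same caller's endAllSegments. */
  static boolean pairsAreContiguous(List<String> events) {
    if (events.size() % 2 != 0) {
      return false;
    }
    for (int i = 0; i < events.size(); i += 2) {
      String first = events.get(i);
      String caller = first.substring(0, first.indexOf(':'));
      if (!first.equals(caller + ":broadcastControlMessage")
          || !events.get(i + 1).equals(caller + ":endAllSegments")) {
        return false;
      }
    }
    return true;
  }

  private static Thread worker(EndOfPushLockSketch writer, String name, int calls) {
    return new Thread(() -> {
      for (int i = 0; i < calls; i++) {
        writer.broadcastEndOfPush(name);
      }
    });
  }

  /** Runs two threads ("A" and "B") against one writer and returns the recorded events. */
  static List<String> runTwoThreads(int callsPerThread) {
    EndOfPushLockSketch writer = new EndOfPushLockSketch();
    Thread a = worker(writer, "A", callsPerThread);
    Thread b = worker(writer, "B", callsPerThread);
    a.start();
    b.start();
    try {
      a.join();
      b.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return writer.snapshot();
  }

  public static void main(String[] args) {
    System.out.println(pairsAreContiguous(runTwoThreads(1000)));
  }
}
```

Without the synchronized block, Thread A's endAllSegments entry could land between Thread B's two entries, which is the scenario listed above.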
-
broadcastTopicSwitch
public void broadcastTopicSwitch(@Nonnull java.util.List<java.lang.CharSequence> sourceKafkaCluster, @Nonnull java.lang.String sourceTopicName, @Nonnull java.lang.Long rewindStartTimestamp, java.util.Map<java.lang.String,java.lang.String> debugInfo)
-
broadcastVersionSwap
public void broadcastVersionSwap(@Nonnull java.lang.String oldServingVersionTopic, @Nonnull java.lang.String newServingVersionTopic, java.util.Map<java.lang.String,java.lang.String> debugInfo)
Broadcasts a control message to the real-time topic partition, to be consumed by the Venice leader. Partition high watermarks are left to the local Venice leader to prepare and then produce to the version topic partition.
- Parameters:
oldServingVersionTopic
- the version topic the change capture consumer should switch from.
newServingVersionTopic
- the version topic the change capture consumer should switch to.
debugInfo
- arbitrary key/value pairs of information that will be propagated alongside the control message.
-
broadcastStartOfIncrementalPush
public void broadcastStartOfIncrementalPush(java.lang.String version, java.util.Map<java.lang.String,java.lang.String> debugInfo)
-
broadcastEndOfIncrementalPush
public void broadcastEndOfIncrementalPush(java.lang.String version, java.util.Map<java.lang.String,java.lang.String> debugInfo)
-
closePartition
public void closePartition(int partition)
Closes a single partition of this writer. It sends a final EOS to the partition and removes it from the segment map.
- Parameters:
partition
- The partition to be closed.
-
getLeaderCompleteStateHeader
public static PubSubMessageHeader getLeaderCompleteStateHeader(LeaderCompleteState leaderCompleteState)
-
sendStartOfSegment
public void sendStartOfSegment(int partition, java.util.Map<java.lang.String,java.lang.String> debugInfo)
Send a ControlMessageType.START_OF_SEGMENT control message into the designated partition.
- Parameters:
partition
- the Kafka partition to write to.
debugInfo
- arbitrary key/value pairs of information that will be propagated alongside the control message.
-
sendControlMessageWithRetriesForNonExistentTopic
public void sendControlMessageWithRetriesForNonExistentTopic(ControlMessage controlMessage, int partition, java.util.Map<java.lang.String,java.lang.String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
This function sends a control message into the prescribed partition. If the Kafka topic does not exist, this function will try again for a total of maxAttemptsWhenTopicMissing attempts. Note that this retry behavior does not happen in sendMessage(KeyProvider, MessageType, Object, int, PubSubProducerCallback, LeaderMetadataWrapper, long) because that function returns a Future, and it is Future.get() which throws the relevant exception. In any case, the topic should be seeded with a ControlMessageType.START_OF_SEGMENT at first, and therefore, there should be no cases where a topic has not been created yet, and we attempt to write a data message first, prior to a control message. If a topic did disappear later on in the VeniceWriter's lifecycle, then it would be appropriate to let that Future fail.
This function has a synchronized block because if the retries need to be exercised, then it would cause a DIV failure if another message slipped in after the first attempt and before the eventually successful attempt.
- Parameters:
controlMessage
- a ControlMessage instance to persist into Kafka.
partition
- the Kafka partition to write to.
debugInfo
- arbitrary key/value pairs of information that will be propagated alongside the control message.
callback
- callback to execute when the record has been acknowledged by the Kafka server (null means no callback)
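The retry-under-lock behavior described for sendControlMessageWithRetriesForNonExistentTopic can be sketched as a bounded loop inside a synchronized block. This is a simplified illustration, not the actual implementation: RetrySketch, its Attempt interface, and TopicMissingException are hypothetical names, and any wait between attempts is omitted.

```java
// Hypothetical illustration only; not the VeniceWriter implementation.
final class RetrySketch {
  /** Hypothetical stand-in for the exception signalling that the topic does not exist. */
  static final class TopicMissingException extends RuntimeException {
    TopicMissingException(String message) {
      super(message);
    }
  }

  /** Hypothetical stand-in for one synchronous send attempt. */
  interface Attempt {
    void send();
  }

  private final Object lock = new Object();
  private final int maxAttemptsWhenTopicMissing;

  RetrySketch(int maxAttemptsWhenTopicMissing) {
    this.maxAttemptsWhenTopicMissing = maxAttemptsWhenTopicMissing;
  }

  /**
   * Tries the send up to maxAttemptsWhenTopicMissing times and returns the number of attempts used.
   * The whole loop holds the lock so that no other message can slip in between a failed attempt
   * and the eventually successful one.
   */
  int sendWithRetries(Attempt attempt) {
    synchronized (lock) {
      for (int attemptNumber = 1;; attemptNumber++) {
        try {
          attempt.send();
          return attemptNumber;
        } catch (TopicMissingException e) {
          if (attemptNumber >= maxAttemptsWhenTopicMissing) {
            throw e; // attempts exhausted: surface the failure to the caller
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    int[] calls = {0};
    RetrySketch writer = new RetrySketch(5);
    int used = writer.sendWithRetries(() -> {
      if (++calls[0] < 3) {
        throw new TopicMissingException("topic does not exist yet");
      }
    });
    System.out.println("attempts used: " + used);
  }
}
```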
-
sendControlMessage
public java.util.concurrent.CompletableFuture<PubSubProduceResult> sendControlMessage(ControlMessage controlMessage, int partition, java.util.Map<java.lang.String,java.lang.String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
Main function for sending control messages. Synchronization is minimal and there is no error checking. See also: sendControlMessageWithRetriesForNonExistentTopic(ControlMessage, int, Map, PubSubProducerCallback, LeaderMetadataWrapper)
-
asyncSendControlMessage
public java.util.concurrent.Future<PubSubProduceResult> asyncSendControlMessage(ControlMessage controlMessage, int partition, java.util.Map<java.lang.String,java.lang.String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
This API should only be used in the Leader/Standby model for store ingestion. The producer DIV is recalculated (this is not DIV pass-through mode), and the checksum for the input partition in this producer is also updated.
-
getHeartbeatKME
public static KafkaMessageEnvelope getHeartbeatKME(long originTimeStampMs, LeaderMetadataWrapper leaderMetadataWrapper, ControlMessage heartBeatMessage, java.lang.String writerId)
-
sendHeartbeat
public java.util.concurrent.CompletableFuture<PubSubProduceResult> sendHeartbeat(PubSubTopicPartition topicPartition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs)
-
getKafkaMessageEnvelope
protected KafkaMessageEnvelope getKafkaMessageEnvelope(MessageType messageType, boolean isEndOfSegment, int partition, boolean incrementSequenceNumber, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
A utility function to centralize some boilerplate code for the instantiation of SpecificRecord classes holding the content of our Kafka values. Note: the payloadUnion must be set on the returned object before producing into Kafka.
- Parameters:
messageType
- an instance of the MessageType enum.
- Returns:
- A KafkaMessageEnvelope for producing into Kafka
-
getTime
public Time getTime()
-
getMaxRecordSizeBytes
public int getMaxRecordSizeBytes()
-
isRecordTooLarge
public boolean isRecordTooLarge(int recordSize)
-
isChunkingNeededForRecord
public boolean isChunkingNeededForRecord(int recordSize)
-
getMaxSizeForUserPayloadPerMessageInBytes
public int getMaxSizeForUserPayloadPerMessageInBytes()
-
getDestination
public java.lang.String getDestination()
- Returns:
- a string of the format topicName@brokerAddress
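As a small illustration of the topicName@brokerAddress format, the sketch below builds such a string and splits it back into its two parts. DestinationSketch and its methods are hypothetical helpers, not part of VeniceWriter, and the sample topic and broker values are made up.

```java
// Hypothetical illustration only; not part of the Venice API.
final class DestinationSketch {
  /** Joins the two parts in the documented topicName@brokerAddress layout. */
  static String format(String topicName, String brokerAddress) {
    return topicName + "@" + brokerAddress;
  }

  /** Splits at the first '@' and returns {topicName, brokerAddress}. */
  static String[] parse(String destination) {
    int at = destination.indexOf('@');
    if (at < 0) {
      throw new IllegalArgumentException("expected topicName@brokerAddress: " + destination);
    }
    return new String[] {destination.substring(0, at), destination.substring(at + 1)};
  }

  public static void main(String[] args) {
    String destination = format("store_v1", "broker.example:9092");
    String[] parts = parse(destination);
    System.out.println(destination + " -> topic=" + parts[0] + ", broker=" + parts[1]);
  }
}
```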
-
-