Package com.linkedin.venice.writer
Class VeniceWriter<K,V,U>
java.lang.Object
com.linkedin.venice.writer.AbstractVeniceWriter<K,V,U>
com.linkedin.venice.writer.VeniceWriter<K,V,U>
- All Implemented Interfaces:
Closeable, AutoCloseable
- Direct Known Subclasses:
ComplexVeniceWriter
Class which acts as the primary writer API.
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description)
- static class …
- static interface …: An interface which enables the key to contain parts of the … within it, which is useful for control messages and chunked values.
-
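Field Summary
Fields (Modifier and Type, Field, Description)
- static final long APP_DEFAULT_LOGICAL_TS: This sentinel value indicates that the Venice Samza apps have not provided the logical timestamp.
- static final String CHECK_SUM_TYPE
- static final String CLOSE_TIMEOUT_MS
- static final String DEFAULT_CHECK_SUM_TYPE: Default checksum type.
- static final int DEFAULT_CLOSE_TIMEOUT_MS: This controls the Kafka producer's close timeout.
- static final LeaderMetadataWrapper DEFAULT_LEADER_METADATA_WRAPPER
- static final int DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING: Default number of attempts when trying to produce to a Kafka topic and an exception is caught saying the topic is missing.
- static final long DEFAULT_MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS: Default elapsed time for a segment, in milliseconds.
- static final int DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES: 950 KB for user payload should be conservative enough.
- static final long DEFAULT_TERM_ID
- static final int DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID
- static final byte[] EMPTY_BYTE_ARRAY
- static final ByteBuffer EMPTY_BYTE_BUFFER
- static final String ENABLE_CHUNKING
- static final String ENABLE_RMD_CHUNKING
- static final String ENABLE_UNCOMPRESSED_RECORD_SIZE_LIMIT: Enable the maximum record size check before record compression.
- protected final VeniceKafkaSerializer<K> keySerializer
- static final Map<LeaderCompleteState,PubSubMessageHeader> LEADER_COMPLETE_STATE_HEADERS
- protected final org.apache.logging.log4j.Logger logger
- static final String MAX_ATTEMPTS_WHEN_TOPIC_MISSING
- static final String MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS: A negative value or 0 disables the feature.
- static final String MAX_RECORD_SIZE_BYTES: Maximum Venice record size.
- static final String MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES: Chunk size.
- protected final int numberOfPartitions
- static final AtomicLong OPEN_VENICE_WRITER_COUNT: A static counter shared by all VeniceWriter instances to track the number of active VeniceWriter instances.
- protected final VenicePartitioner partitioner
- static final String PRODUCER_QUEUE_SIZE
- static final String PRODUCER_THREAD_COUNT
- static final int UNLIMITED_MAX_RECORD_SIZE: The default for maxRecordSizeBytes is unlimited / unset (-1) just to be safe.
- protected final VeniceKafkaSerializer<V> valueSerializer
- static final long VENICE_DEFAULT_LOGICAL_TS: This sentinel value indicates that the Venice Samza apps do not support the logical timestamp.
- static final int VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID: This sentinel value indicates that the metadata version id is not present.
- static final int VENICE_DEFAULT_VALUE_SCHEMA_ID: This sentinel value indicates that the value schema id is not present.
- static final AtomicLong VENICE_WRITER_CLOSE_FAILED_COUNT: A static counter shared by all VeniceWriter instances to track the number of VeniceWriter instances that fail to close.
- static final String VENICE_WRITER_CONFIG_PREFIX
Fields inherited from class com.linkedin.venice.writer.AbstractVeniceWriter
topicName
-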
Constructor Summary
Constructors (Constructor, Description)
- VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter)
- VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter, org.apache.avro.Schema overrideProtocolSchema): This constructor is currently used only in tests, to override the behavior of passing the protocol schema into the header of control messages.
-
Method Summary
Methods (Modifier and Type, Method, Description)
- asyncSendControlMessage(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper): This API should only be used in the Leader/Standby model for store ingestion.
- void broadcastEndOfIncrementalPush(String version, Map<String, String> debugInfo)
- void broadcastEndOfPush(Map<String, String> debugInfo): This function might need synchronized locking.
- void broadcastStartOfIncrementalPush(String version, Map<String, String> debugInfo)
- void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Map<String, String> debugInfo)
- void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Optional<ByteBuffer> optionalCompressionDictionary, Map<String, String> debugInfo)
- void broadcastStartOfPush(boolean sorted, Map<String, String> debugInfo)
- void broadcastStartOfPush(Map<String, String> debugInfo)
- void broadcastTopicSwitch(List<CharSequence> sourceKafkaCluster, String sourceTopicName, Long rewindStartTimestamp, Map<String, String> debugInfo)
- void broadcastVersionSwap(String oldServingVersionTopic, String newServingVersionTopic, Map<String, String> debugInfo): Broadcast a control message to the real-time or version topic partition.
- protected Put buildPutPayload(byte[] serializedValue, int valueSchemaId, PutMetadata putMetadata)
- void close()
- void close(boolean gracefulClose): Close the VeniceWriter.
- closeAsync(boolean gracefulClose)
- CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose, boolean retryOnGracefulCloseFailure)
- void closePartition(int partition): Close a single partition from this writer.
- protected CompletableFuture<PubSubProduceResult> delete(byte[] serializedKey, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, DeleteMetadata deleteMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, int partition)
- Future<PubSubProduceResult> delete(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper): DIV pass-through mode for delete.
- CompletableFuture<PubSubProduceResult> delete(K key, long logicalTs, PubSubProducerCallback callback): Execute a standard "delete" on the key.
- delete(K key, PubSubProducerCallback callback): Execute a standard "delete" on the key.
- CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata)
- CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper): Execute a standard "delete" on the key.
- CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs): Execute a standard "delete" on the key.
- CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, DeleteMetadata deleteMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest): Execute a standard "delete" on the key.
- CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata): Execute a standard "delete" on the key.
- void deleteDeprecatedChunk(byte[] serializedKey, int partition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata, boolean isGlobalRtDiv): This method produces a DELETE request to a deprecated chunk key.
- void flush(): Call flush on the internal PubSubProducerAdapter.
- static ControlMessage generateHeartbeatMessage(CheckSumType checkSumType)
- static KafkaMessageEnvelope getHeartbeatKME(long originTimeStampMs, LeaderMetadataWrapper leaderMetadataWrapper, ControlMessage heartBeatMessage, String writerId)
- getKafkaMessageEnvelope(MessageType messageType, boolean isEndOfSegment, int partition, boolean incrementSequenceNumber, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs): A utility function to centralize some boilerplate code for the instantiation of SpecificRecord classes holding the content of our Kafka values.
- int …
- int …
- protected int getPartition(byte[] key)
- getTime()
- boolean isChunkingNeededForRecord(int recordSize)
- boolean isRecordTooLarge(int recordSize)
- nonBlockingBroadcastVersionSwapWithRegionInfo(String oldServingVersionTopic, String newServingVersionTopic, String sourceRegion, String destinationRegion, long generationId, Map<String, String> debugInfo): Similar to broadcastVersionSwap(String, String, Map), but with region info intended to guide the Venice change capture consumer to perform the version swap correctly in a true A/A setup (with unique writers in each region).
- CompletableFuture<PubSubProduceResult> put(byte[] serializedKey, byte[] serializedValue, int partition, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, boolean isGlobalRtDiv)
- CompletableFuture<PubSubProduceResult> put(byte[] serializedKey, byte[] serializedValue, int partition, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, boolean isGlobalRtDiv, PubSubMessageHeaders pubSubMessageHeaders): Write a record with new DIV to a predetermined partition.
- Future<PubSubProduceResult> put(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper): Deprecated.
- CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, long logicalTs, PubSubProducerCallback callback): Execute a standard "put" on the key.
- CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, long logicalTimestamp, PubSubProducerCallback callback, PutMetadata putMetadata)
- CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback): Execute a standard "put" on the key.
- Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper): VeniceWriter in the leader replica should call this API to fill in extra metadata information: upstreamOffset.
- CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata)
- CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest): Execute a standard "put" on the key.
- CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, PubSubMessageHeaders pubSubMessageHeaders): Write a record with new DIV to a predetermined partition.
- CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata)
- protected CompletableFuture<PubSubProduceResult> putLargeValue(byte[] serializedKey, byte[] serializedValue, int valueSchemaId, PubSubProducerCallback callback, int partition, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, boolean isGlobalRtDiv): This function implements chunking of a large value into many small values.
- sendControlMessage(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper): Main function for sending control messages.
- void sendControlMessageWithRetriesForNonExistentTopic(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper): This function sends a control message into the prescribed partition.
- sendHeartbeat(PubSubTopicPartition topicPartition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs)
- sendHeartbeat(String topicName, int partitionNumber, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs)
- protected CompletableFuture<PubSubProduceResult> sendMessage(VeniceWriter.KeyProvider keyProvider, MessageType messageType, Object payload, int partition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs): Data messages such as PUT and DELETE should call this API to enable the DIV check.
- void sendStartOfSegment(int partition, Map<String, String> debugInfo): Send a ControlMessageType.START_OF_SEGMENT control message into the designated partition.
- String toString()
- CompletableFuture<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, long logicalTimestamp, PubSubProducerCallback callback)
- Future<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback)
- CompletableFuture<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback, long logicalTs)
Methods inherited from class com.linkedin.venice.writer.AbstractVeniceWriter
put
-
Field Details
-
logger
protected final org.apache.logging.log4j.Logger logger
-
VENICE_WRITER_CONFIG_PREFIX
-
CLOSE_TIMEOUT_MS
-
CHECK_SUM_TYPE
-
ENABLE_CHUNKING
-
ENABLE_RMD_CHUNKING
-
MAX_ATTEMPTS_WHEN_TOPIC_MISSING
-
MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS
Maximum elapsed time for a segment, in milliseconds. A negative value or 0 disables the feature.
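The rule above (a positive limit enables time-based segment rollover, while 0 or a negative value turns it off) can be sketched as follows. `SegmentTimer` and its methods are hypothetical names for illustration, not VeniceWriter internals, and comparing against the limit with `>=` is an assumption.

```java
/**
 * Hypothetical helper: decides whether the current segment has been open longer
 * than the configured maximum. A maxElapsedMs of 0 or less disables the check.
 */
final class SegmentTimer {
    private final long maxElapsedMs;
    private long segmentStartMs;

    SegmentTimer(long maxElapsedMs, long nowMs) {
        this.maxElapsedMs = maxElapsedMs;
        this.segmentStartMs = nowMs;
    }

    /** Returns true when the segment should be ended (and a new one started). */
    boolean shouldEndSegment(long nowMs) {
        if (maxElapsedMs <= 0) {
            return false; // feature disabled by a negative value or 0
        }
        return nowMs - segmentStartMs >= maxElapsedMs;
    }

    /** Resets the timer when a new segment begins. */
    void startNewSegment(long nowMs) {
        this.segmentStartMs = nowMs;
    }
}
```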
-
MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
Chunk size. Default: 972800 (950 KB).
N.B.: This must be configured in relation to the following configs:
1. Kafka broker's global setting: `message.max.bytes` (default is 1000012, or ~976 KB)
2. Kafka broker's topic setting: `max.message.bytes` (default is 1000012, or ~976 KB)
3. Kafka producer's setting: `max.request.size` (default is 1048576, or 1024 KB)
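The constraints above reduce to simple arithmetic. The sketch below, whose class and method names are hypothetical, computes how much room the default chunk size leaves under the tightest of the three Kafka defaults, and how many chunks a value needs via ceiling division. It assumes that per-message overhead (key, headers, envelope) must fit into that headroom, which is why the chunk size sits below the Kafka limits rather than equal to them.

```java
/** Hypothetical helper illustrating chunk-size arithmetic; not part of the Venice API. */
final class ChunkSizing {
    static final int DEFAULT_CHUNK_SIZE = 950 * 1024;        // 972800 bytes
    static final int BROKER_MESSAGE_MAX_BYTES = 1_000_012;   // Kafka default message.max.bytes
    static final int TOPIC_MAX_MESSAGE_BYTES = 1_000_012;    // Kafka default max.message.bytes
    static final int PRODUCER_MAX_REQUEST_SIZE = 1_048_576;  // Kafka default max.request.size

    private ChunkSizing() {}

    /** Bytes left for keys, headers and envelope under the tightest of the three limits. */
    static int headroomBytes(int chunkSize, int brokerMax, int topicMax, int producerMax) {
        int tightest = Math.min(brokerMax, Math.min(topicMax, producerMax));
        return tightest - chunkSize;
    }

    /** Number of chunks needed to carry a value of the given size (ceiling division). */
    static long chunkCount(long valueSize, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        return (valueSize + chunkSize - 1) / chunkSize;
    }
}
```

With the defaults listed above, 1000012 - 972800 = 27212 bytes remain for everything other than the user payload, and a 2,000,000-byte value needs 3 chunks.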
-
MAX_RECORD_SIZE_BYTES
Maximum Venice record size. Default: -1
Large records can cause performance issues, so this setting is used to detect and prevent them. Not to be confused with the Kafka record size (which is the ~1MB limit that MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES is designed to comply with). A Venice record is a Venice key-value pair, which can be spread across one or more Kafka records / events. Basically: Chunking Not Needed < ~1MB < Chunking Needed < Max Record Size
1. If batch push data contains records larger than this setting, the push job will fail.
2. If a partial update creates records larger than this setting, consumption will be paused and manual intervention will be necessary.
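The size tiers above can be expressed as a small classification function. This is a hypothetical sketch, not the implementation of `isChunkingNeededForRecord` or `isRecordTooLarge`; it assumes that the ~1MB boundary is the per-message user payload size and that -1 means "no maximum".

```java
/** Hypothetical size tiers for a Venice record. */
enum RecordSizeClass { SINGLE_MESSAGE, CHUNKED, TOO_LARGE }

/** Hypothetical classifier; not the VeniceWriter implementation. */
final class RecordSizeClassifier {
    static final int UNLIMITED = -1; // mirrors the documented "unlimited / unset" default

    private RecordSizeClassifier() {}

    static RecordSizeClass classify(int recordSize, int maxPayloadPerMessage, int maxRecordSize) {
        // The max record size check applies only when a limit is configured.
        if (maxRecordSize != UNLIMITED && recordSize > maxRecordSize) {
            return RecordSizeClass.TOO_LARGE;
        }
        // Anything above the per-message payload limit must be split into chunks.
        return recordSize > maxPayloadPerMessage ? RecordSizeClass.CHUNKED : RecordSizeClass.SINGLE_MESSAGE;
    }
}
```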
-
ENABLE_UNCOMPRESSED_RECORD_SIZE_LIMIT
Enable the maximum record size check on the record size before compression. Default: false
-
PRODUCER_THREAD_COUNT
-
PRODUCER_QUEUE_SIZE
-
DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
public static final int DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
950 KB for user payload should be conservative enough.
-
UNLIMITED_MAX_RECORD_SIZE
public static final int UNLIMITED_MAX_RECORD_SIZE
The default for maxRecordSizeBytes is unlimited / unset (-1) just to be safe. A more specific default value should be set at the cluster level using the controller config ConfigKeys.DEFAULT_MAX_RECORD_SIZE_BYTES.
-
DEFAULT_CLOSE_TIMEOUT_MS
public static final int DEFAULT_CLOSE_TIMEOUT_MS
This controls the Kafka producer's close timeout.
-
DEFAULT_CHECK_SUM_TYPE
Default checksum type. N.B.: Only MD5 (or using no checksum at all) supports checkpointing mid-checksum.
-
DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING
public static final int DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING
Default number of attempts when trying to produce to a Kafka topic and an exception is caught saying the topic is missing.
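A bounded retry of the kind this default governs might look like the sketch below. `TopicMissingException`, `TopicMissingRetry`, and the fixed backoff are hypothetical; only the idea of retrying a produce call a limited number of times when the topic is reported missing comes from the description above, and the attempt count is passed in rather than taken from DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for a "topic does not exist" error reported by the producer. */
class TopicMissingException extends RuntimeException {
    TopicMissingException(String message) {
        super(message);
    }
}

/** Hypothetical bounded-retry helper; not the VeniceWriter implementation. */
final class TopicMissingRetry {
    private TopicMissingRetry() {}

    /**
     * Calls send up to maxAttempts times in total, retrying only on TopicMissingException.
     * Other exceptions propagate immediately; after the last failed attempt the most
     * recent TopicMissingException is rethrown.
     */
    static <T> T sendWithRetries(Supplier<T> send, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        TopicMissingException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return send.get();
            } catch (TopicMissingException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs); // give the topic a chance to appear
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt(); // keep interrupt status, stop retrying
                        throw e;
                    }
                }
            }
        }
        throw last;
    }
}
```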
-
DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID
public static final int DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID
-
DEFAULT_TERM_ID
public static final long DEFAULT_TERM_ID
-
OPEN_VENICE_WRITER_COUNT
A static counter shared by all VeniceWriter instances to track the number of active VeniceWriter instances.
-
VENICE_WRITER_CLOSE_FAILED_COUNT
A static counter shared by all VeniceWriter instances to track the number of VeniceWriter instances that fail to close.
-
DEFAULT_MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS
public static final long DEFAULT_MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS
Default elapsed time for a segment, in milliseconds.
-
VENICE_DEFAULT_LOGICAL_TS
public static final long VENICE_DEFAULT_LOGICAL_TS
This sentinel value indicates that the Venice Samza apps do not support the logical timestamp.
-
APP_DEFAULT_LOGICAL_TS
public static final long APP_DEFAULT_LOGICAL_TS
This sentinel value indicates that the Venice Samza apps have not provided the logical timestamp.
-
VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID
public static final int VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID
This sentinel value indicates that the metadata version id is not present.
-
VENICE_DEFAULT_VALUE_SCHEMA_ID
public static final int VENICE_DEFAULT_VALUE_SCHEMA_ID
This sentinel value indicates that the value schema id is not present.
-
EMPTY_BYTE_ARRAY
public static final byte[] EMPTY_BYTE_ARRAY
-
EMPTY_BYTE_BUFFER
-
DEFAULT_LEADER_METADATA_WRAPPER
-
keySerializer
-
valueSerializer
-
partitioner
-
numberOfPartitions
protected final int numberOfPartitions
-
LEADER_COMPLETE_STATE_HEADERS
-
-
Constructor Details
-
VeniceWriter
public VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter)
-
VeniceWriter
public VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter, org.apache.avro.Schema overrideProtocolSchema)
This constructor is currently used only in tests, to override the behavior of passing the protocol schema into the header of control messages.
- Parameters:
overrideProtocolSchema - the schema to pass in control message (CM) headers, or null to omit that header entirely.
-
-
Method Details
-
generateHeartbeatMessage
-
close
public void close(boolean gracefulClose)
Close the VeniceWriter.
- Specified by:
close in class AbstractVeniceWriter<K,V,U>
- Parameters:
gracefulClose - whether to end the segments and send END_OF_SEGMENT control messages.
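The effect of `gracefulClose` can be modeled with a writer that remembers which partitions have an open segment. In this hypothetical sketch, a graceful close emits one END_OF_SEGMENT marker per open partition, while a non-graceful close simply drops them; opening a segment (START_OF_SEGMENT) on the first write to a partition is an assumption made for illustration, not documented VeniceWriter behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of a writer that tracks which partitions have an open segment. */
final class SegmentTrackingWriter {
    private final Set<Integer> openSegments = new TreeSet<>(); // iterates in ascending partition order
    private final List<String> sentControlMessages = new ArrayList<>();

    /** Opens a segment on the first write to a partition. */
    void write(int partition) {
        if (openSegments.add(partition)) {
            sentControlMessages.add("START_OF_SEGMENT p" + partition);
        }
    }

    /** Graceful close ends every open segment; non-graceful close just forgets them. */
    void close(boolean gracefulClose) {
        if (gracefulClose) {
            for (int partition : openSegments) {
                sentControlMessages.add("END_OF_SEGMENT p" + partition);
            }
        }
        openSegments.clear();
    }

    List<String> sentControlMessages() {
        return sentControlMessages;
    }
}
```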
-
closeAsync
-
closeAsync
public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose, boolean retryOnGracefulCloseFailure)
-
close
public void close()
-
flush
public void flush()
Call flush on the internal PubSubProducerAdapter.
- Specified by:
flush in class AbstractVeniceWriter<K,V,U>
-
toString
-
getProducerGUID
-
getTopicName
- Overrides:
getTopicName in class AbstractVeniceWriter<K,V,U>
- Returns:
the Kafka topic name that this VeniceWriter instance writes into.
-
delete
Execute a standard "delete" on the key.
- Specified by:
delete in class AbstractVeniceWriter<K,V,U>
- Parameters:
key - the key to delete in storage.
callback - the callback executed after the Kafka producer completes sending the message.
- Returns:
a CompletableFuture for the PubSubProduceResult that will be assigned to this record. Invoking get() on this future blocks until the associated request completes, then returns the result for the record or throws any exception that occurred while sending the record.
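The returned future and the callback are two ways to observe the same asynchronous send. The sketch below uses a hypothetical stand-in, `AsyncDeleteDemo.delete`, with a `BiConsumer` in place of `PubSubProducerCallback`, a `Long` offset in place of `PubSubProduceResult`, and a plain `Executor` as the producer thread; invoking the callback before completing the future is an assumption of the sketch, not documented VeniceWriter behavior.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for an asynchronous writer; not the VeniceWriter API. */
final class AsyncDeleteDemo {
    private AsyncDeleteDemo() {}

    /**
     * Pretends to send a DELETE on a producer thread. The callback runs first, then the
     * returned future completes with a fake offset (or exceptionally on failure).
     */
    static CompletableFuture<Long> delete(String key, Executor producerThread,
                                          BiConsumer<Long, Throwable> callback) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        producerThread.execute(() -> {
            if (key == null) {
                IllegalArgumentException error = new IllegalArgumentException("key must not be null");
                callback.accept(null, error);
                future.completeExceptionally(error);
            } else {
                long fakeOffset = 42L; // placeholder for broker-assigned metadata
                callback.accept(fakeOffset, null);
                future.complete(fakeOffset);
            }
        });
        return future;
    }
}
```

`join()` behaves like `get()` but wraps failures in an unchecked `CompletionException`; when blocking is not required, `thenAccept` or `whenComplete` avoid tying up the calling thread.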
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, long logicalTs, PubSubProducerCallback callback)
Execute a standard "delete" on the key.
- Specified by:
delete in class AbstractVeniceWriter<K,V,U>
- Parameters:
key - the key to delete in storage.
logicalTs - a timestamp indicating when this record was produced, from the application's point of view.
callback - the callback executed after the Kafka producer completes sending the message.
- Returns:
a CompletableFuture for the PubSubProduceResult that will be assigned to this record. Invoking get() on this future blocks until the associated request completes, then returns the result for the record or throws any exception that occurred while sending the record.
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
Execute a standard "delete" on the key.
- Parameters:
key - the key to delete in storage.
callback - the callback function invoked by the Kafka producer after sending the message.
leaderMetadataWrapper - the leader metadata of this message in the source topic. Its upstream offset is:
  -1 when VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic;
  >= 0 when the leader replica consumed a DELETE message from the real-time topic and the VeniceWriter in the leader is sending it to the version topic, with the offset in the real-time topic as extra info.
- Returns:
a CompletableFuture for the PubSubProduceResult that will be assigned to this record. Invoking get() on this future blocks until the associated request completes, then returns the result for the record or throws any exception that occurred while sending the record.
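The two cases for the upstream offset can be captured in a small helper. `UpstreamOffsetDemo` is hypothetical; it operates on a bare `long` rather than on `LeaderMetadataWrapper`, whose accessors are not shown on this page, and rejecting other negative values is an assumption.

```java
/** Hypothetical helper interpreting an upstream offset; not part of the Venice API. */
final class UpstreamOffsetDemo {
    static final long NO_UPSTREAM_OFFSET = -1L; // sentinel described in the parameter docs

    private UpstreamOffsetDemo() {}

    /** Describes which write path produced a record, based on its upstream offset. */
    static String describe(long upstreamOffset) {
        if (upstreamOffset == NO_UPSTREAM_OFFSET) {
            return "direct write (Samza app to real-time topic, or VPJ to version topic)";
        }
        if (upstreamOffset >= 0) {
            return "leader replay from real-time topic offset " + upstreamOffset;
        }
        throw new IllegalArgumentException("unexpected upstream offset: " + upstreamOffset);
    }
}
```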
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
Execute a standard "delete" on the key.
- Parameters:
key - the key to delete in storage.
callback - the callback executed after the Kafka producer completes sending the message.
leaderMetadataWrapper - the leader metadata of this message in the source topic. Its upstream offset is:
  -1 when VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic;
  >= 0 when the leader replica consumed a DELETE message from the real-time topic and the VeniceWriter in the leader is sending it to the version topic, with the offset in the real-time topic as extra info.
logicalTs - a timestamp indicating when this record was produced, from the application's point of view.
- Returns:
a CompletableFuture for the PubSubProduceResult that will be assigned to this record. Invoking get() on this future blocks until the associated request completes, then returns the result for the record or throws any exception that occurred while sending the record.
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata)
Execute a standard "delete" on the key.
- Parameters:
key - the key to delete in storage.
callback - the callback executed after the Kafka producer completes sending the message.
leaderMetadataWrapper - the leader metadata of this message in the source topic. Its upstream offset is:
  -1 when VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic;
  >= 0 when the leader replica consumed a DELETE message from the real-time topic and the VeniceWriter in the leader is sending it to the version topic, with the offset in the real-time topic as extra info.
deleteMetadata - a DeleteMetadata containing replication-metadata-related fields.
- Returns:
a CompletableFuture for the PubSubProduceResult that will be assigned to this record. Invoking get() on this future blocks until the associated request completes, then returns the result for the record or throws any exception that occurred while sending the record.
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata)
- Specified by:
delete in class AbstractVeniceWriter<K,V,U>
-
deleteDeprecatedChunk
public void deleteDeprecatedChunk(byte[] serializedKey, int partition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata, boolean isGlobalRtDiv)
This method produces a DELETE request to a deprecated chunk key.
-
delete
public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, DeleteMetadata deleteMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
Execute a standard "delete" on the key.
- Parameters:
key - the key to delete in storage.
callback - the callback function invoked by the Kafka producer after sending the message.
leaderMetadataWrapper - the leader metadata of this message in the source topic. Its upstream offset is:
  -1 when VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic;
  >= 0 when the leader replica consumed a DELETE message from the real-time topic and the VeniceWriter in the leader is sending it to the version topic, with the offset in the real-time topic as extra info.
logicalTs - a timestamp indicating when this record was produced, from the application's point of view.
deleteMetadata - a DeleteMetadata containing replication-metadata-related fields (can be null).
- Returns:
a CompletableFuture for the PubSubProduceResult that will be assigned to this record. Invoking get() on this future blocks until the associated request completes, then returns the result for the record or throws any exception that occurred while sending the record.
-
delete
protected CompletableFuture<PubSubProduceResult> delete(byte[] serializedKey, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, DeleteMetadata deleteMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, int partition)
-
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback)
Execute a standard "put" on the key.
- Specified by:
put in class AbstractVeniceWriter<K,V,U>
- Parameters:
key - the key to put in storage.
value - the value to be associated with the given key.
valueSchemaId - the value schema id for the given value.
callback - the callback function invoked by the Kafka producer after sending the message.
- Returns:
a CompletableFuture for the PubSubProduceResult that will be assigned to this record. Invoking get() on this future blocks until the associated request completes, then returns the result for the record or throws any exception that occurred while sending the record.
-
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata)
- Specified by:
put in class AbstractVeniceWriter<K,V,U>
-
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, long logicalTimestamp, PubSubProducerCallback callback, PutMetadata putMetadata)
- Specified by:
put in class AbstractVeniceWriter<K,V,U>
-
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, long logicalTs, PubSubProducerCallback callback)
Execute a standard "put" on the key.
- Specified by:
put in class AbstractVeniceWriter<K,V,U>
- Parameters:
key - the key to put in storage.
value - the value to be associated with the given key.
valueSchemaId - the value schema id for the given value.
logicalTs - a timestamp indicating when this record was produced, from the application's point of view.
callback - the callback function invoked by the Kafka producer after sending the message.
- Returns:
a CompletableFuture for the PubSubProduceResult that will be assigned to this record. Invoking get() on this future blocks until the associated request completes, then returns the result for the record or throws any exception that occurred while sending the record.
-
put
public Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
VeniceWriter in the leader replica should call this API to fill in extra metadata information: upstreamOffset. UpstreamOffset is the offset of the PUT message in the source topic:
  -1 when VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic;
  >= 0 when the leader replica consumed a PUT message from the real-time topic and the VeniceWriter in the leader is sending it to the version topic, with the offset in the real-time topic as extra info.
-
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata)
-
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
Execute a standard "put" on the key. VeniceReducer and VeniceSystemProducer should call this API.
- Parameters:
key - the key to put in storage.
value - the value to be associated with the given key.
valueSchemaId - the value schema id for the given value.
callback - the callback function invoked by the Kafka producer after sending the message.
leaderMetadataWrapper - the leader metadata of this message in the source topic. Its upstream offset is:
  -1 when VeniceWriter is sending this message from a Samza app to the real-time topic, or from the VPJ plugin to the version topic. If views are enabled, the metadata wrapper can also contain a view partition map to be sent as a PubSubMessageHeader;
  >= 0 when the leader replica consumed a PUT message from the real-time topic and the VeniceWriter in the leader is sending it to the version topic, with the offset in the real-time topic as extra info.
logicalTs - a timestamp indicating when this record was produced, from the application's point of view.
putMetadata - a PutMetadata containing replication-metadata-related fields (can be null).
- Returns:
a CompletableFuture for the PubSubProduceResult that will be assigned to this record. Invoking get() on this future blocks until the associated request completes, then returns the result for the record or throws any exception that occurred while sending the record.
-
put
@Deprecated public Future<PubSubProduceResult> put(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
Deprecated.
Write a message with the Kafka message envelope (KME) passed in. This allows users to reuse an existing KME to speed up performance. If this is called, VeniceWriter will also reuse the existing DIV data (producer metadata). This is the "pass-through" mode. TODO: move pass-through support into a server-specific extension of VeniceWriter.
-
put
public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, PubSubMessageHeaders pubSubMessageHeaders)
Write a record with new DIV to a predetermined partition.
-
put
public CompletableFuture<PubSubProduceResult> put(byte[] serializedKey, byte[] serializedValue, int partition, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, boolean isGlobalRtDiv)
-
put
public CompletableFuture<PubSubProduceResult> put(byte[] serializedKey, byte[] serializedValue, int partition, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, boolean isGlobalRtDiv, PubSubMessageHeaders pubSubMessageHeaders)
Write a record with new DIV to a predetermined partition.
-
buildPutPayload
-
delete
public Future<PubSubProduceResult> delete(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
DIV pass-through mode for delete.
-
update
public Future<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback)
- Specified by:
update in class AbstractVeniceWriter<K,V,U>
-
update
public CompletableFuture<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, long logicalTimestamp, PubSubProducerCallback callback)
- Specified by:
update in class AbstractVeniceWriter<K,V,U>
-
update
public CompletableFuture<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback, long logicalTs)
-
broadcastStartOfPush
Parameters:
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
-
broadcastStartOfPush
-
broadcastStartOfPush
public void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Map<String, String> debugInfo)
Parameters:
sorted - whether the messages between the StartOfPush control message and the EndOfPush control message in the current topic partition are lexicographically sorted by key bytes.
chunked - whether the push has chunking support enabled, in which case all keys are appended with a ChunkedKeySuffix and a Put may contain either chunks or ChunkedValueManifest records, in addition to regular (small) values.
compressionStrategy - the store version's CompressionStrategy.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
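The sorted flag asserts a property of the data a producer writes between StartOfPush and EndOfPush. A minimal sketch of checking that property, assuming "lexicographically sorted by key bytes" means unsigned byte-wise order (the order Arrays.compareUnsigned implements). SortedKeysSketch and isSortedByKeyBytes are hypothetical names, not part of VeniceWriter.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not Venice code. Assumption: key order is unsigned byte-wise
// lexicographic order, so 0x80 sorts after 0x7F and a prefix sorts before its extensions.
class SortedKeysSketch {
  /** Returns true if every key is <= its successor in unsigned lexicographic byte order. */
  static boolean isSortedByKeyBytes(List<byte[]> keys) {
    for (int i = 1; i < keys.size(); i++) {
      if (Arrays.compareUnsigned(keys.get(i - 1), keys.get(i)) > 0) {
        return false;
      }
    }
    return true;
  }
}
```

Under signed comparison the key {(byte) 0x80} would sort before {0x7F}, which is why the byte interpretation matters when deciding whether a push qualifies as sorted.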
-
broadcastStartOfPush
public void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Optional<ByteBuffer> optionalCompressionDictionary, Map<String, String> debugInfo)
Parameters:
sorted - whether the messages between the StartOfPush control message and the EndOfPush control message in the current topic partition are lexicographically sorted by key bytes.
chunked - whether the push has chunking support enabled, in which case all keys are appended with a ChunkedKeySuffix and a Put may contain either chunks or ChunkedValueManifest records, in addition to regular (small) values.
compressionStrategy - the store version's CompressionStrategy.
optionalCompressionDictionary - the raw bytes of the dictionary used to compress/decompress records.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
-
broadcastEndOfPush
This function might need synchronized locking: segments could be ended while another thread is still broadcasting the End-of-Push (EOP) control message. We have not been able to trigger this scenario in tests, but it could surface once fully automatic stream reprocessing is supported, because users would call broadcastEndOfPush inside their Samza processors, which can run multiple threads. An example interleaving without synchronization:
- Thread A: finishes executing broadcastControlMessage.
- Thread B: is in the middle of executing broadcastControlMessage.
- Thread A: begins endAllSegments.
- The segments that Thread B was writing to have now been ended, so the consumer sees messages that are not inside any segment.
Parameters:
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
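One way to add the locking this description calls for is to have the broadcast and endAllSegments share a single monitor, so Thread A's endAllSegments can never run while Thread B is mid-broadcast. The sketch below models this with a hypothetical EopLockingSketch class that only tracks open segments and counts messages; it is not VeniceWriter's implementation.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the End-of-Push race, not Venice code. Every method that touches
// segment state synchronizes on the same monitor, so a broadcast cannot interleave with
// endAllSegments(). Remove the synchronized keywords and a message can land outside a segment.
class EopLockingSketch {
  private final int partitionCount;
  private final Set<Integer> openSegments = new HashSet<>();
  private int messagesWritten = 0;
  private int messagesOutsideSegments = 0;

  EopLockingSketch(int partitionCount) {
    this.partitionCount = partitionCount;
  }

  /** Writes one control message to every partition, opening a segment first if needed. */
  synchronized void broadcastControlMessage() {
    for (int p = 0; p < partitionCount; p++) {
      openSegments.add(p); // stands in for sending START_OF_SEGMENT when no segment is open
      send(p);
    }
  }

  /** Closes every open segment (stands in for sending END_OF_SEGMENT to each partition). */
  synchronized void endAllSegments() {
    openSegments.clear();
  }

  /** Broadcast and end segments as one atomic step; Java monitors are reentrant. */
  synchronized void broadcastEndOfPush() {
    broadcastControlMessage();
    endAllSegments();
  }

  private void send(int partition) {
    if (!openSegments.contains(partition)) {
      messagesOutsideSegments++; // the symptom the description above warns about
    }
    messagesWritten++;
  }

  synchronized int messagesWritten() { return messagesWritten; }

  synchronized int messagesOutsideSegments() { return messagesOutsideSegments; }

  /** Runs threads shaped like Thread A and Thread B and returns the out-of-segment count. */
  static int runTwoThreads(int partitions, int iterations) {
    EopLockingSketch writer = new EopLockingSketch(partitions);
    Thread a = new Thread(() -> { for (int i = 0; i < iterations; i++) writer.broadcastEndOfPush(); });
    Thread b = new Thread(() -> { for (int i = 0; i < iterations; i++) writer.broadcastControlMessage(); });
    a.start();
    b.start();
    try {
      a.join();
      b.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return writer.messagesOutsideSegments();
  }
}
```

The trade-off is that the lock is held across a whole broadcast, so concurrent data writes to the same writer would wait for it; whether that cost is acceptable is a design decision this sketch does not settle.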
-
broadcastTopicSwitch
-
broadcastVersionSwap
public void broadcastVersionSwap(@Nonnull String oldServingVersionTopic, @Nonnull String newServingVersionTopic, Map<String, String> debugInfo)
Broadcasts the control message to the real-time (RT) or version topic (VT) partitions. If it is broadcast to the RT, the leader consumes the Version Swap message and produces it to the VT, where the followers consume it. Partition high watermarks are left to the local Venice leader to prepare and are then produced to the version topic partition.
Parameters:
oldServingVersionTopic - the version topic the change capture consumer should switch from.
newServingVersionTopic - the version topic the change capture consumer should switch to.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
-
nonBlockingBroadcastVersionSwapWithRegionInfo
public List<CompletableFuture<PubSubProduceResult>> nonBlockingBroadcastVersionSwapWithRegionInfo(@Nonnull String oldServingVersionTopic, @Nonnull String newServingVersionTopic, @Nonnull String sourceRegion, @Nonnull String destinationRegion, long generationId, Map<String, String> debugInfo)
Similar to broadcastVersionSwap(String, String, Map), but with region info intended to guide the Venice change capture consumer to perform the version swap correctly in a true active/active (A/A) setup (with unique writers in each region). The broadcast is also non-blocking: it returns a list of futures, one for the control message write to each partition. The caller is responsible for waiting on the futures to ensure the control message is written to all partitions.
Parameters:
oldServingVersionTopic - the version topic the change capture consumer should switch from.
newServingVersionTopic - the version topic the change capture consumer should switch to.
sourceRegion - the region where the version swap event occurred.
destinationRegion - the region of the RT topic to which the original version swap message is being sent.
generationId - identifies this version swap event when there are multiple version swap events.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
Returns:
a list of futures, one for each partition the version swap message was sent to.
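Because the caller must wait on the returned futures itself, a small helper can turn the list into a single blocking wait with a timeout. A minimal sketch using only java.util.concurrent; AwaitVersionSwapSketch and awaitAll are hypothetical names, and T would be PubSubProduceResult for the list this method returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of VeniceWriter.
class AwaitVersionSwapSketch {
  /** Blocks until every per-partition write completes; throws if any write failed or the timeout elapses. */
  static <T> List<T> awaitAll(List<CompletableFuture<T>> futures, long timeout, TimeUnit unit) {
    try {
      CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for version swap writes", e);
    } catch (ExecutionException e) {
      throw new IllegalStateException("A version swap write failed", e.getCause());
    } catch (TimeoutException e) {
      throw new IllegalStateException("Timed out waiting for version swap writes", e);
    }
    // Every future is complete at this point, so join() returns without blocking.
    List<T> results = new ArrayList<>(futures.size());
    for (CompletableFuture<T> future : futures) {
      results.add(future.join());
    }
    return results;
  }
}
```

Note that CompletableFuture.allOf waits for all futures to finish even if one fails early, so a single slow partition still bounds the wait by the timeout rather than by the first failure.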
-
broadcastStartOfIncrementalPush
-
broadcastEndOfIncrementalPush
-
closePartition
public void closePartition(int partition)
Closes a single partition of this writer: sends a final EOS (END_OF_SEGMENT) control message to the partition and removes the partition from the segment map.
Parameters:
partition - the partition to be closed.
-
sendMessage
protected CompletableFuture<PubSubProduceResult> sendMessage(VeniceWriter.KeyProvider keyProvider, MessageType messageType, Object payload, int partition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs) Data messages such as PUT and DELETE should be sent through this API so that DIV checks are enabled. -
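The sendMessage description ties data messages to DIV (data integrity validation), which works by having the producer stamp each message with metadata a consumer can check. As a rough illustration only, the sketch below models one piece of that idea, per-partition sequence numbers, with hypothetical names (DivSequenceSketch, nextSequenceNumber, validate). Venice's actual DIV also involves segments and checksums, which this sketch omits.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified model of sequence-number based validation; not Venice code.
class DivSequenceSketch {
  enum Result { OK, DUPLICATE, GAP }

  private final Map<Integer, Integer> nextSeqByPartition = new HashMap<>();  // producer state
  private final Map<Integer, Integer> lastSeenByPartition = new HashMap<>(); // consumer state

  /** Producer side: assigns the next sequence number for the partition, starting at 0. */
  int nextSequenceNumber(int partition) {
    int seq = nextSeqByPartition.getOrDefault(partition, 0);
    nextSeqByPartition.put(partition, seq + 1);
    return seq;
  }

  /** Consumer side: classifies an incoming sequence number relative to the last one accepted. */
  Result validate(int partition, int seq) {
    int expected = lastSeenByPartition.getOrDefault(partition, -1) + 1;
    if (seq < expected) {
      return Result.DUPLICATE; // already seen; state is left unchanged
    }
    lastSeenByPartition.put(partition, seq);
    return seq == expected ? Result.OK : Result.GAP; // a jump means messages were lost
  }
}
```

This is also why messages that bypass the producer's numbering would break validation: the consumer could no longer tell a legitimate jump from data loss.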
putLargeValue
protected CompletableFuture<PubSubProduceResult> putLargeValue(byte[] serializedKey, byte[] serializedValue, int valueSchemaId, PubSubProducerCallback callback, int partition, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, boolean isGlobalRtDiv) This function implements chunking of a large value into many small values. -
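Chunking splits one oversized value into pieces that each fit within the per-message payload limit (the field summary above calls 950 KB of user payload "conservative enough"), and a reader concatenates them back. A minimal sketch of just the split-and-reassemble step; ChunkingSketch, split, and reassemble are hypothetical names, and the sketch does not reproduce Venice's chunk keys or ChunkedValueManifest format.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of value chunking; not Venice's wire format.
class ChunkingSketch {
  /** Splits value into consecutive slices of at most maxChunkSize bytes. */
  static List<byte[]> split(byte[] value, int maxChunkSize) {
    if (maxChunkSize <= 0) {
      throw new IllegalArgumentException("maxChunkSize must be positive");
    }
    List<byte[]> chunks = new ArrayList<>();
    for (int offset = 0; offset < value.length; offset += maxChunkSize) {
      int end = Math.min(value.length, offset + maxChunkSize);
      chunks.add(Arrays.copyOfRange(value, offset, end));
    }
    return chunks;
  }

  /** Concatenates the chunks in order, as a reader would after fetching every chunk. */
  static byte[] reassemble(List<byte[]> chunks) {
    int total = 0;
    for (byte[] chunk : chunks) {
      total += chunk.length;
    }
    byte[] out = new byte[total];
    int pos = 0;
    for (byte[] chunk : chunks) {
      System.arraycopy(chunk, 0, out, pos, chunk.length);
      pos += chunk.length;
    }
    return out;
  }
}
```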
sendStartOfSegment
Send a ControlMessageType.START_OF_SEGMENT control message into the designated partition.
Parameters:
partition - the Kafka partition to write to.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
-
sendControlMessageWithRetriesForNonExistentTopic
public void sendControlMessageWithRetriesForNonExistentTopic(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
This function sends a control message into the prescribed partition. If the Kafka topic does not exist, it retries for a total of maxAttemptsWhenTopicMissing attempts. This retry behavior does not happen in sendMessage(KeyProvider, MessageType, Object, int, PubSubProducerCallback, LeaderMetadataWrapper, long), because that function returns a Future, and it is Future.get() that throws the relevant exception. In any case, the topic should first be seeded with a ControlMessageType.START_OF_SEGMENT, so there should be no case where the topic has not been created yet and a data message is written before any control message. If a topic disappears later in the VeniceWriter's lifecycle, it is appropriate to let that Future fail. This function has a synchronized block because, if the retries are exercised, a DIV failure would occur if another message slipped in after the first attempt and before the eventually successful attempt.
Parameters:
controlMessage - a ControlMessage instance to persist into Kafka.
partition - the Kafka partition to write to.
debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
callback - callback to execute when the record has been acknowledged by the Kafka server (null means no callback).
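The retry-under-lock pattern described above can be sketched independently of Kafka: hold one lock across all attempts so no other message can be produced between a failed attempt and the successful one. TopicMissingException, BlockingSend, and RetryOnMissingTopicSketch are hypothetical stand-ins; in the real method the exception surfaces from Future.get(), which BlockingSend.sendAndWait() represents here. The sketch retries immediately, without any backoff.

```java
// Hypothetical stand-in for the error a producer raises when the topic does not exist.
class TopicMissingException extends RuntimeException {
  TopicMissingException(String message) {
    super(message);
  }
}

@FunctionalInterface
interface BlockingSend {
  /** Produces one control message and waits for the ack; throws TopicMissingException if the topic is absent. */
  void sendAndWait();
}

class RetryOnMissingTopicSketch {
  private final Object lock = new Object();

  /** Returns the number of attempts used; rethrows the last failure after maxAttempts failures. */
  int sendWithRetries(BlockingSend send, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    // Holding the lock across every attempt keeps other messages from slipping in between retries.
    synchronized (lock) {
      TopicMissingException lastFailure = null;
      for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
          send.sendAndWait();
          return attempt;
        } catch (TopicMissingException e) {
          lastFailure = e;
        }
      }
      throw lastFailure;
    }
  }
}
```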
-
sendControlMessage
public CompletableFuture<PubSubProduceResult> sendControlMessage(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper) Main function for sending control messages. Synchronization is minimal and there is no error checking. See also: sendControlMessageWithRetriesForNonExistentTopic(ControlMessage, int, Map, PubSubProducerCallback, LeaderMetadataWrapper) -
asyncSendControlMessage
public Future<PubSubProduceResult> asyncSendControlMessage(ControlMessage controlMessage, int partition, Map<String, String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper) This API should only be used in the Leader/Standby model for store ingestion. Producer DIV is recalculated (i.e., not DIV pass-through mode), and the checksum for the input partition in this producer is also updated. -
getHeartbeatKME
public static KafkaMessageEnvelope getHeartbeatKME(long originTimeStampMs, LeaderMetadataWrapper leaderMetadataWrapper, ControlMessage heartBeatMessage, String writerId) -
sendHeartbeat
public CompletableFuture<PubSubProduceResult> sendHeartbeat(PubSubTopicPartition topicPartition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs) -
sendHeartbeat
public CompletableFuture<PubSubProduceResult> sendHeartbeat(String topicName, int partitionNumber, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs) -
getKafkaMessageEnvelope
public KafkaMessageEnvelope getKafkaMessageEnvelope(MessageType messageType, boolean isEndOfSegment, int partition, boolean incrementSequenceNumber, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
A utility function to centralize some boilerplate code for the instantiation of SpecificRecord classes holding the content of our Kafka values. Note: the payloadUnion must be set on the returned object before producing into Kafka.
Parameters:
messageType - an instance of the MessageType enum.
Returns:
a KafkaMessageEnvelope for producing into Kafka.
-
getPartition
protected int getPartition(byte[] key)
Parameters:
key - the KafkaKey for which we want to get the partition.
Returns:
the partition number that the provided key belongs to.
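Mapping a key to a partition only needs to be deterministic and land in range. A minimal sketch under that assumption; KeyPartitionSketch uses Arrays.hashCode purely for illustration and is not how Venice's VenicePartitioner computes partitions.

```java
import java.util.Arrays;

// Hypothetical partitioner for illustration; NOT Venice's VenicePartitioner.
class KeyPartitionSketch {
  static int getPartition(byte[] serializedKey, int partitionCount) {
    if (partitionCount <= 0) {
      throw new IllegalArgumentException("partitionCount must be positive");
    }
    // Arrays.hashCode depends only on the byte contents, so equal keys map to the same partition;
    // floorMod keeps the result in [0, partitionCount) even when the hash is negative.
    return Math.floorMod(Arrays.hashCode(serializedKey), partitionCount);
  }
}
```

Whatever hash is used, every writer and reader of the topic must agree on it, otherwise a key would be written to one partition and looked up in another.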
-
getTime
-
getMaxRecordSizeBytes
public int getMaxRecordSizeBytes() -
isRecordTooLarge
public boolean isRecordTooLarge(int recordSize) -
isChunkingNeededForRecord
public boolean isChunkingNeededForRecord(int recordSize) -
getMaxSizeForUserPayloadPerMessageInBytes
public int getMaxSizeForUserPayloadPerMessageInBytes() -
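The size accessors above carry no descriptions. One plausible reading, based only on the field summary (maxRecordSizeBytes defaults to -1, meaning unlimited/unset, and roughly 950 KB of user payload fits in one message), is sketched below. RecordSizeSketch and UNLIMITED_MAX_RECORD_SIZE are hypothetical names, and the comparison rules are assumptions rather than confirmed VeniceWriter behavior.

```java
// Hypothetical sketch of the size checks; the semantics are assumptions based on the field summary.
class RecordSizeSketch {
  static final int UNLIMITED_MAX_RECORD_SIZE = -1; // assumed sentinel for "unlimited / unset"

  private final int maxRecordSizeBytes;
  private final int maxSizeForUserPayloadPerMessageInBytes;

  RecordSizeSketch(int maxRecordSizeBytes, int maxSizeForUserPayloadPerMessageInBytes) {
    this.maxRecordSizeBytes = maxRecordSizeBytes;
    this.maxSizeForUserPayloadPerMessageInBytes = maxSizeForUserPayloadPerMessageInBytes;
  }

  /** Assumed rule: reject only when a limit is configured and the record exceeds it. */
  boolean isRecordTooLarge(int recordSize) {
    return maxRecordSizeBytes != UNLIMITED_MAX_RECORD_SIZE && recordSize > maxRecordSizeBytes;
  }

  /** Assumed rule: chunk when the record does not fit in a single message's user payload. */
  boolean isChunkingNeededForRecord(int recordSize) {
    return recordSize > maxSizeForUserPayloadPerMessageInBytes;
  }
}
```

Under this reading the two checks are independent: a record can need chunking yet still be accepted, and only a configured maxRecordSizeBytes causes outright rejection.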
getDestination
Returns:
a string of the format topicName@brokerAddress.
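If a caller needs the two parts of that string back, splitting at the first '@' is enough, assuming the topic name itself never contains '@' (Kafka topic names are limited to ASCII letters, digits, '.', '_' and '-'). DestinationSketch and parse are hypothetical helpers, not VeniceWriter methods.

```java
// Hypothetical parser for the documented "topicName@brokerAddress" format.
class DestinationSketch {
  static final class Destination {
    final String topicName;
    final String brokerAddress;

    Destination(String topicName, String brokerAddress) {
      this.topicName = topicName;
      this.brokerAddress = brokerAddress;
    }
  }

  static Destination parse(String destination) {
    // Assumes the topic name has no '@', so the first '@' is the separator.
    int at = destination.indexOf('@');
    if (at <= 0 || at == destination.length() - 1) {
      throw new IllegalArgumentException("Expected topicName@brokerAddress but got: " + destination);
    }
    return new Destination(destination.substring(0, at), destination.substring(at + 1));
  }
}
```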
-
getProducerAdapter
-