Class VeniceWriter<K,V,U>

java.lang.Object
com.linkedin.venice.writer.AbstractVeniceWriter<K,V,U>
com.linkedin.venice.writer.VeniceWriter<K,V,U>
All Implemented Interfaces:
Closeable, AutoCloseable

public class VeniceWriter<K,V,U> extends AbstractVeniceWriter<K,V,U>
Class which acts as the primary writer API.
  • Field Details

    • VENICE_WRITER_CONFIG_PREFIX

      public static final String VENICE_WRITER_CONFIG_PREFIX
      See Also:
    • CLOSE_TIMEOUT_MS

      public static final String CLOSE_TIMEOUT_MS
      See Also:
    • CHECK_SUM_TYPE

      public static final String CHECK_SUM_TYPE
      See Also:
    • ENABLE_CHUNKING

      public static final String ENABLE_CHUNKING
      See Also:
    • ENABLE_RMD_CHUNKING

      public static final String ENABLE_RMD_CHUNKING
      See Also:
    • MAX_ATTEMPTS_WHEN_TOPIC_MISSING

      public static final String MAX_ATTEMPTS_WHEN_TOPIC_MISSING
      See Also:
    • MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS

      public static final String MAX_ELAPSED_TIME_FOR_SEGMENT_IN_MS
      A negative value or 0 will disable the feature.
      See Also:
    • MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES

      public static final String MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
      Chunk size. Default: 972800 N.B.: This must be configured in relation to the following configs: 1. Kafka broker's global setting: `message.max.bytes` (default is 1000012, or ~976 KB) 2. Kafka broker's topic setting: `max.message.bytes` (default is 1000012, or ~976 KB) 3. Kafka producer's setting: `max.request.size` (default is 1048576)
      See Also:
    • MAX_RECORD_SIZE_BYTES

      public static final String MAX_RECORD_SIZE_BYTES
      Maximum Venice record size. Default: -1 Large records can cause performance issues, so this setting is used to detect and prevent them. Not to be confused with Kafka record size (which is the ~1MB limit MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES is designed to comply with). Venice records refer to a Venice key-value pair, which can be spread across 1+ Kafka records / events. Basically: Chunking Not Needed < ~1MB < Chunking Needed < Max Record Size 1. If a batch push data contains records larger than this setting, the push job will fail. 2. If a partial update creates records larger than this setting, consumption will be paused and manual intervention will be necessary.
      See Also:
    • PRODUCER_THREAD_COUNT

      public static final String PRODUCER_THREAD_COUNT
      See Also:
    • PRODUCER_QUEUE_SIZE

      public static final String PRODUCER_QUEUE_SIZE
      See Also:
    • DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES

      public static final int DEFAULT_MAX_SIZE_FOR_USER_PAYLOAD_PER_MESSAGE_IN_BYTES
      950 KB for user payload should be conservative enough.
      See Also:
    • UNLIMITED_MAX_RECORD_SIZE

      public static final int UNLIMITED_MAX_RECORD_SIZE
      The default for maxRecordSizeBytes is unlimited / unset (-1) just to be safe. A more specific default value should be set using com.linkedin.venice.ConfigKeys#CONTROLLER_DEFAULT_MAX_RECORD_SIZE_BYTES the controller config on the cluster level.
      See Also:
    • DEFAULT_CLOSE_TIMEOUT_MS

      public static final int DEFAULT_CLOSE_TIMEOUT_MS
      This controls the Kafka producer's close timeout.
      See Also:
    • DEFAULT_CHECK_SUM_TYPE

      public static final String DEFAULT_CHECK_SUM_TYPE
      Default checksum type. N.B.: Only MD5 (and having no checksums) supports checkpointing mid-checksum.
    • DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING

      public static final int DEFAULT_MAX_ATTEMPTS_WHEN_TOPIC_MISSING
      Default number of attempts when trying to produce to a Kafka topic and an exception is caught saying the topic is missing.
      See Also:
    • DEFAULT_UPSTREAM_OFFSET

      public static final long DEFAULT_UPSTREAM_OFFSET
      The default value of the "upstreamOffset" field in avro record LeaderMetadata. Even though we have set the default value for "upstreamOffset" field as -1, the initial value for the long field "upstreamOffset" is still 0 when we construct a LeaderMetadata record. Default field values are primarily used when reading records that don't have those fields, typically when we deserialize a record from older version to newer version.
    • DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID

      public static final int DEFAULT_UPSTREAM_KAFKA_CLUSTER_ID
    • OPEN_VENICE_WRITER_COUNT

      public static final AtomicLong OPEN_VENICE_WRITER_COUNT
      A static counter shared by all VeniceWriter instances to track the number of active VeniceWriter
    • VENICE_WRITER_CLOSE_FAILED_COUNT

      public static final AtomicLong VENICE_WRITER_CLOSE_FAILED_COUNT
      A static counter shared by all VeniceWriter instances to track the number of VeniceWriter that fails to close
    • VENICE_DEFAULT_LOGICAL_TS

      public static final long VENICE_DEFAULT_LOGICAL_TS
      This sentinel value indicates that the venice samza apps do not support the logical timestamp.
      See Also:
    • APP_DEFAULT_LOGICAL_TS

      public static final long APP_DEFAULT_LOGICAL_TS
      This sentinel value indicates that the venice samza apps have not provided the logical timestamp.
      See Also:
    • VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID

      public static final int VENICE_DEFAULT_TIMESTAMP_METADATA_VERSION_ID
      This sentinel value indicates that metadata version id is not present.
      See Also:
    • VENICE_DEFAULT_VALUE_SCHEMA_ID

      public static final int VENICE_DEFAULT_VALUE_SCHEMA_ID
      This sentinel value indicates that value schema id is not present.
      See Also:
    • EMPTY_BYTE_ARRAY

      public static final byte[] EMPTY_BYTE_ARRAY
    • EMPTY_BYTE_BUFFER

      public static final ByteBuffer EMPTY_BYTE_BUFFER
    • DEFAULT_LEADER_METADATA_WRAPPER

      public static final LeaderMetadataWrapper DEFAULT_LEADER_METADATA_WRAPPER
  • Constructor Details

    • VeniceWriter

      public VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter)
    • VeniceWriter

      public VeniceWriter(VeniceWriterOptions params, VeniceProperties props, PubSubProducerAdapter producerAdapter, org.apache.avro.Schema overrideProtocolSchema)
      This constructor is currently used only in tests, in order to override the behavior of passing the protocol schema into the header of control messages.
      Parameters:
      overrideProtocolSchema - The schema to pass in CM headers, or null to omit that header entirely
  • Method Details

    • generateHeartbeatMessage

      public static ControlMessage generateHeartbeatMessage(CheckSumType checkSumType)
    • close

      public void close(boolean gracefulClose)
      Close the VeniceWriter
      Specified by:
      close in class AbstractVeniceWriter<K,V,U>
      Parameters:
      gracefulClose - whether to end the segments and send END_OF_SEGMENT control message.
    • closeAsync

      public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose)
    • closeAsync

      public CompletableFuture<VeniceResourceCloseResult> closeAsync(boolean gracefulClose, boolean retryOnGracefulCloseFailure)
    • close

      public void close()
    • flush

      public void flush()
      Call flush on the internal PubSubProducerAdapter.
      Specified by:
      flush in class AbstractVeniceWriter<K,V,U>
    • toString

      public String toString()
      Overrides:
      toString in class Object
    • getProducerGUID

      public GUID getProducerGUID()
    • getTopicName

      public String getTopicName()
      Overrides:
      getTopicName in class AbstractVeniceWriter<K,V,U>
      Returns:
      the Kafka topic name that this VeniceWriter instance writes into.
    • delete

      Execute a standard "delete" on the key.
      Parameters:
      key - - The key to delete in storage.
      callback - - callback will be executed after Kafka producer completes on sending the message.
      Returns:
      a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    • delete

      public CompletableFuture<PubSubProduceResult> delete(K key, long logicalTs, PubSubProducerCallback callback)
      Execute a standard "delete" on the key.
      Parameters:
      key - - The key to delete in storage.
      logicalTs - - An timestamp field to indicate when this record was produced from apps point of view.
      callback - - callback will be executed after Kafka producer completes on sending the message.
      Returns:
      a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    • delete

      public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
      Execute a standard "delete" on the key.
      Parameters:
      key - - The key to delete in storage.
      callback - - Callback function invoked by Kafka producer after sending the message.
      leaderMetadataWrapper - - The leader Metadata of this message in the source topic: -1: VeniceWriter is sending this message in a Samza app to the real-time topic; or it's sending the message in VPJ plugin to the version topic; >=0: Leader replica consumes a delete message from real-time topic, VeniceWriter in leader is sending this message to version topic with extra info: offset in the real-time topic.
      Returns:
      a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    • delete

      public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
      Execute a standard "delete" on the key.
      Parameters:
      key - - The key to delete in storage.
      callback - - callback will be executed after Kafka producer completes on sending the message.
      leaderMetadataWrapper - - The leader Metadata of this message in the source topic: -1: VeniceWriter is sending this message in a Samza app to the real-time topic; or it's sending the message in VPJ plugin to the version topic; >=0: Leader replica consumes a delete message from real-time topic, VeniceWriter in leader is sending this message to version topic with extra info: offset in the real-time topic.
      logicalTs - - An timestamp field to indicate when this record was produced from apps point of view.
      Returns:
      a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    • delete

      public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata)
      Execute a standard "delete" on the key.
      Parameters:
      key - - The key to delete in storage.
      callback - - callback will be executed after Kafka producer completes on sending the message.
      leaderMetadataWrapper - - The leader Metadata of this message in the source topic: -1: VeniceWriter is sending this message in a Samza app to the real-time topic; or it's sending the message in VPJ plugin to the version topic; >=0: Leader replica consumes a delete message from real-time topic, VeniceWriter in leader is sending this message to version topic with extra info: offset in the real-time topic.
      deleteMetadata - - a DeleteMetadata containing replication metadata related fields.
      Returns:
      a java.util.concurrent.CompletableFuture Future for the RecordMetadata that will be assigned to this record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    • delete

      public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, DeleteMetadata deleteMetadata)
      Specified by:
      delete in class AbstractVeniceWriter<K,V,U>
    • deleteDeprecatedChunk

      public void deleteDeprecatedChunk(byte[] serializedKey, int partition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, DeleteMetadata deleteMetadata)
      This method produces a DELETE request to a deprecated chunk key.
    • delete

      public CompletableFuture<PubSubProduceResult> delete(K key, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, DeleteMetadata deleteMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
      Execute a standard "delete" on the key.
      Parameters:
      key - - The key to delete in storage.
      callback - - Callback function invoked by Kafka producer after sending the message.
      leaderMetadataWrapper - - The leader Metadata of this message in the source topic: -1: VeniceWriter is sending this message in a Samza app to the real-time topic; or it's sending the message in VPJ plugin to the version topic; >=0: Leader replica consumes a DELETE message from real-time topic, VeniceWriter in leader is sending this message to version topic with extra info: offset in the real-time topic.
      logicalTs - - An timestamp field to indicate when this record was produced from apps point of view.
      deleteMetadata - - a DeleteMetadata containing replication metadata related fields (can be null).
      Returns:
      a java.util.concurrent.CompletableFuture. Future for the RecordMetadata that will be assigned to this record. Invoking java.util.concurrent.CompletableFuture's get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    • put

      public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback)
      Execute a standard "put" on the key.
      Specified by:
      put in class AbstractVeniceWriter<K,V,U>
      Parameters:
      key - - The key to put in storage.
      value - - The value to be associated with the given key
      valueSchemaId - - value schema id for the given value
      callback - - Callback function invoked by Kafka producer after sending the message
      Returns:
      a java.util.concurrent.Future. Future for the RecordMetadata that will be assigned to this record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    • put

      public Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, PutMetadata putMetadata)
      Specified by:
      put in class AbstractVeniceWriter<K,V,U>
    • put

      public Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, long logicalTs, PubSubProducerCallback callback)
      Execute a standard "put" on the key.
      Parameters:
      key - - The key to put in storage.
      value - - The value to be associated with the given key
      valueSchemaId - - value schema id for the given value
      logicalTs - - A timestamp field to indicate when this record was produced from apps view.
      callback - - Callback function invoked by Kafka producer after sending the message
      Returns:
      a java.util.concurrent.Future Future for the RecordMetadata that will be assigned to this record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    • put

      public Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
      VeniceWriter in the leader replica should call this API to fulfill extra metadata information --- upstreamOffset. UpstreamOffset is the offset of PUT message in the source topic: -1: VeniceWriter is sending this message in a Samza app to the real-time topic; or it's sending the message in VPJ plugin to the version topic; >=0: Leader replica consumes a put message from real-time topic, VeniceWriter in leader is sending this message to version topic with extra info: offset in the real-time topic.
    • put

      public Future<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
    • put

      public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata)
    • put

      public CompletableFuture<PubSubProduceResult> put(K key, V value, int valueSchemaId, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs, PutMetadata putMetadata, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest)
      Execute a standard "put" on the key. VeniceReducer and VeniceSystemProducer should call this API.
      Parameters:
      key - - The key to put in storage.
      value - - The value to be associated with the given key
      valueSchemaId - - value schema id for the given value
      callback - - Callback function invoked by Kafka producer after sending the message
      leaderMetadataWrapper - - The leader Metadata of this message in the source topic: -1: VeniceWriter is sending this message in a Samza app to the real-time topic; or it's sending the message in VPJ plugin to the version topic; >=0: Leader replica consumes a put message from real-time topic, VeniceWriter in leader is sending this message to version topic with extra info: offset in the real-time topic.
      logicalTs - - An timestamp field to indicate when this record was produced from apps view.
      putMetadata - - a PutMetadata containing replication metadata related fields (can be null).
      Returns:
      a java.util.concurrent.Future Future for the RecordMetadata that will be assigned to this record. Invoking java.util.concurrent.Future's get() on this future will block until the associated request completes and then return the metadata for the record or throw any exception that occurred while sending the record.
    • put

      @Deprecated public Future<PubSubProduceResult> put(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
      Deprecated.
      Write a message with the kafka message envelope (KME) passed in. This allows users re-using existing KME to speed up the performance. If this is called, VeniceWriter will also reuse the existing DIV data (producer metadata). It's the "pass-through" mode. TODO: move pass-through supports into a server-specific extension of VeniceWriter
    • delete

      public Future<PubSubProduceResult> delete(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, PubSubProducerCallback callback, int upstreamPartition, LeaderMetadataWrapper leaderMetadataWrapper)
      DIV pass-through mode for delete
    • update

      public Future<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback)
      Specified by:
      update in class AbstractVeniceWriter<K,V,U>
    • update

      public Future<PubSubProduceResult> update(K key, U update, int valueSchemaId, int derivedSchemaId, PubSubProducerCallback callback, long logicalTs)
    • broadcastStartOfPush

      public void broadcastStartOfPush(Map<String,String> debugInfo)
      Parameters:
      debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
    • broadcastStartOfPush

      public void broadcastStartOfPush(boolean sorted, Map<String,String> debugInfo)
    • broadcastStartOfPush

      public void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Map<String,String> debugInfo)
      Parameters:
      sorted - whether the messages between 'StartOfPush' control messages and 'EndOfPush' control message in current topic partition is lexicographically sorted by key bytes
      chunked - whether the push has chunking support enabled, in which case all keys are to be appended with ChunkedKeySuffix and the Put may contain either chunks or ChunkedValueManifest records, in addition to regular (small) values.
      compressionStrategy - the store-version's CompressionStrategy
      debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
    • broadcastStartOfPush

      public void broadcastStartOfPush(boolean sorted, boolean chunked, CompressionStrategy compressionStrategy, Optional<ByteBuffer> optionalCompressionDictionary, Map<String,String> debugInfo)
      Parameters:
      sorted - whether the messages between 'StartOfPush' control messages and 'EndOfPush' control message in current topic partition is lexicographically sorted by key bytes
      chunked - whether the push has chunking support enabled, in which case all keys are to be appended with ChunkedKeySuffix and the Put may contain either chunks or ChunkedValueManifest records, in addition to regular (small) values.
      compressionStrategy - the store-version's CompressionStrategy
      optionalCompressionDictionary - The raw bytes of dictionary used to compress/decompress records.
      debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
    • broadcastEndOfPush

      public void broadcastEndOfPush(Map<String,String> debugInfo)
      This function might need synchronized locking. It might be possible that segments are ended while another thread is broadcasting EOP. However, we could not trigger this scenario using tests. This issue could surface when we support fully auto stream reprocessing in future. Users would call broadcastEndOfPush inside their Samza processors which can have multiple threads. Here is an example without synchronization: - Thread A: broadcastControlMessage execution completes. - Thread B: is in the middle of executing broadcastControlMessage - Thread A: begins endAllSegments - Segments that Thread B was writing to have now been ended. Thus causing the consumer to see messages not inside segments.
      Parameters:
      debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
    • broadcastTopicSwitch

      public void broadcastTopicSwitch(@Nonnull List<CharSequence> sourceKafkaCluster, @Nonnull String sourceTopicName, @Nonnull Long rewindStartTimestamp, Map<String,String> debugInfo)
    • broadcastVersionSwap

      public void broadcastVersionSwap(@Nonnull String oldServingVersionTopic, @Nonnull String newServingVersionTopic, Map<String,String> debugInfo)
      Broadcast control message to real-time topic partition, to be consumed by venice leader. Partition high watermarks are left to local venice leader to prepare and then been produced to version topic partition.
      Parameters:
      oldServingVersionTopic - the version topic change capture consumer should switch from.
      newServingVersionTopic - the version topic change capture consumer should switch to.
      debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
    • broadcastStartOfIncrementalPush

      public void broadcastStartOfIncrementalPush(String version, Map<String,String> debugInfo)
    • broadcastEndOfIncrementalPush

      public void broadcastEndOfIncrementalPush(String version, Map<String,String> debugInfo)
    • closePartition

      public void closePartition(int partition)
      Close a single partition from this writer. It will send a final EOS to the partition and remove it from segment map.
      Parameters:
      partition - The partition to be closed.
    • getLeaderCompleteStateHeader

      public static PubSubMessageHeader getLeaderCompleteStateHeader(LeaderCompleteState leaderCompleteState)
    • sendStartOfSegment

      public void sendStartOfSegment(int partition, Map<String,String> debugInfo)
      Send a ControlMessageType.START_OF_SEGMENT control message into the designated partition.
      Parameters:
      partition - the Kafka partition to write to.
      debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
    • sendControlMessageWithRetriesForNonExistentTopic

      public void sendControlMessageWithRetriesForNonExistentTopic(ControlMessage controlMessage, int partition, Map<String,String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
      This function sends a control message into the prescribed partition. If the Kafka topic does not exist, this function will try again for a total of maxAttemptsWhenTopicMissing attempts. Note that this retry behavior does not happen in sendMessage(KeyProvider, MessageType, Object, int, PubSubProducerCallback, LeaderMetadataWrapper, long) because that function returns a Future, and it is Future.get() which throws the relevant exception. In any case, the topic should be seeded with a ControlMessageType.START_OF_SEGMENT at first, and therefore, there should be no cases where a topic has not been created yet, and we attempt to write a data message first, prior to a control message. If a topic did disappear later on in the VeniceWriter's lifecycle, then it would be appropriate to let that Future fail. This function has a synchronized block because if the retries need to be exercised, then it would cause a DIV failure if another message slipped in after the first attempt and before the eventually successful attempt.
      Parameters:
      controlMessage - a ControlMessage instance to persist into Kafka.
      partition - the Kafka partition to write to.
      debugInfo - arbitrary key/value pairs of information that will be propagated alongside the control message.
      callback - callback to execute when the record has been acknowledged by the Kafka server (null means no callback)
    • sendControlMessage

      public CompletableFuture<PubSubProduceResult> sendControlMessage(ControlMessage controlMessage, int partition, Map<String,String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
      Main function for sending control messages. Synchronization is most minimal and there is no error checking. See also: sendControlMessageWithRetriesForNonExistentTopic(ControlMessage, int, Map, PubSubProducerCallback, LeaderMetadataWrapper)
    • asyncSendControlMessage

      public Future<PubSubProduceResult> asyncSendControlMessage(ControlMessage controlMessage, int partition, Map<String,String> debugInfo, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper)
      This API should be only used in Leader/Standby model for store ingestion. Producer DIV will be recalculated (not DIV pass-through mode); checksum for the input partition in this producer will also be updated.
    • getHeartbeatKME

      public static KafkaMessageEnvelope getHeartbeatKME(long originTimeStampMs, LeaderMetadataWrapper leaderMetadataWrapper, ControlMessage heartBeatMessage, String writerId)
    • sendHeartbeat

      public CompletableFuture<PubSubProduceResult> sendHeartbeat(PubSubTopicPartition topicPartition, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs)
    • sendHeartbeat

      public CompletableFuture<PubSubProduceResult> sendHeartbeat(String topicName, int partitionNumber, PubSubProducerCallback callback, LeaderMetadataWrapper leaderMetadataWrapper, boolean addLeaderCompleteState, LeaderCompleteState leaderCompleteState, long originTimeStampMs)
    • getKafkaMessageEnvelope

      protected KafkaMessageEnvelope getKafkaMessageEnvelope(MessageType messageType, boolean isEndOfSegment, int partition, boolean incrementSequenceNumber, LeaderMetadataWrapper leaderMetadataWrapper, long logicalTs)
      A utility function to centralize some boiler plate code for the instantiation of SpecificRecord classes holding the content of our Kafka values. Note: the payloadUnion must be set on the return object before producing into Kafka.
      Parameters:
      messageType - an instance of the MessageType enum.
      Returns:
      A KafkaMessageEnvelope for producing into Kafka
    • getTime

      public Time getTime()
    • getMaxRecordSizeBytes

      public int getMaxRecordSizeBytes()
    • isRecordTooLarge

      public boolean isRecordTooLarge(int recordSize)
    • isChunkingNeededForRecord

      public boolean isChunkingNeededForRecord(int recordSize)
    • getMaxSizeForUserPayloadPerMessageInBytes

      public int getMaxSizeForUserPayloadPerMessageInBytes()
    • getDestination

      public String getDestination()
      Returns:
      Returns a string of format: topicName@brokerAddress
    • getProducerAdapter

      public PubSubProducerAdapter getProducerAdapter()