Package com.linkedin.venice.pubsub.api
Class PubSubProducerAdapterConcurrentDelegator
- java.lang.Object
-
- com.linkedin.venice.pubsub.api.PubSubProducerAdapterConcurrentDelegator
-
- All Implemented Interfaces:
PubSubProducerAdapter
public class PubSubProducerAdapterConcurrentDelegator extends java.lang.Object implements PubSubProducerAdapter
This class spins up multiple producer instances to speed up producing:
1. It maintains a buffer queue per producer.
2. Writes to the same partition are always routed to the same queue to guarantee ordering.
3. The PubSubProducer in each thread constantly polls its corresponding queue and writes to the broker.

Here are the main reasons we would like to leverage this new strategy in stream processing jobs:
1. In the batch push job, each reducer produces to a single Venice partition, so the multiple-thread-multiple-producer approach wouldn't help much.
2. In the Venice Server leader replica, it is natural to have multiple consumers (threads) producing to different Venice partitions, since it uses a partition-wise shared consumer assignment strategy. Multiple producers can help there, and that case will be covered by PubSubProducerAdapterDelegator.
3. The online writer shares a similar pattern with #2, but so far we haven't heard any complaints regarding online writer performance, so we will leave it out of scope for now.
4. A streaming job typically runs inside a container and is single-threaded to guarantee ordering, but its writes to a Venice store can belong to several Venice partitions. We observed that KafkaProducer compression introduces a lot of overhead for large records, which slows down Producer#send, and when Producer#send is slow, the streaming job throughput is heavily affected.

This strategy considers the following aspects:
a. We still want to keep compression on the producer end, as the streaming app has enough CPU resources.
b. We would like to utilize more CPU resources to improve the producing throughput.
c. We would like to get around the contention issue (KafkaProducer has several synchronized sections for sending messages to the broker) when running in a multi-threaded environment.

Here is how this strategy fulfills #4:
1. It executes the Producer#send logic in a multi-threaded environment to better utilize CPU resources.
2. Each thread has its own PubSubProducerAdapter to avoid contention.
3. The mapping between a Venice partition and a buffered producer is sticky, so ordering is still guaranteed per Venice partition.

The above analysis is correct at a high level, but there are some nuances that determine whether this new strategy can perform better than the default one in the VPJ reducer and in Venice Server leader replicas:
1. If the record processing time is comparable to the Producer#send latency, the new strategy delegates the producing logic to a separate thread, which can potentially improve the overall throughput if there are enough CPU resources.
2. If the record processing time is minimal compared with the Producer#send latency, this new strategy won't help much.

We will explore these cases after validating this new strategy in stream processing apps.
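The queue-per-producer design described above can be illustrated with a small, self-contained sketch. This is not the Venice implementation: the class name StickyQueueDelegatorSketch, the QueueNode type, the modulo-based routing, and the BiConsumer stand-in for a producer are all assumptions made for illustration. It only shows the general idea of one bounded queue and one drainer thread per producer, with a sticky partition-to-queue mapping that preserves per-partition ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

/** Hypothetical, simplified illustration; all names here are assumptions. */
public class StickyQueueDelegatorSketch {
  /** One buffered write: target partition, payload, and a future completed after the send. */
  static final class QueueNode {
    final int partition;
    final String payload;
    final CompletableFuture<Void> result = new CompletableFuture<>();

    QueueNode(int partition, String payload) {
      this.partition = partition;
      this.payload = payload;
    }
  }

  private final List<BlockingQueue<QueueNode>> queues = new ArrayList<>();
  private final List<Thread> drainers = new ArrayList<>();

  public StickyQueueDelegatorSketch(
      int threadCount,
      int queueSize,
      Supplier<BiConsumer<Integer, String>> producerSupplier) {
    for (int i = 0; i < threadCount; i++) {
      BlockingQueue<QueueNode> queue = new ArrayBlockingQueue<>(queueSize);
      // Each drainer thread gets its own stand-in producer to avoid contention.
      BiConsumer<Integer, String> producer = producerSupplier.get();
      Thread drainer = new Thread(() -> drain(queue, producer), "drainer-" + i);
      drainer.setDaemon(true);
      drainer.start();
      queues.add(queue);
      drainers.add(drainer);
    }
  }

  /** Polls one queue forever and hands each node to that queue's own producer. */
  private static void drain(BlockingQueue<QueueNode> queue, BiConsumer<Integer, String> producer) {
    try {
      while (true) {
        QueueNode node = queue.take();
        try {
          producer.accept(node.partition, node.payload);
          node.result.complete(null);
        } catch (RuntimeException e) {
          node.result.completeExceptionally(e);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // exit the loop when closed
    }
  }

  /** Sticky mapping (assumed to be modulo here): a partition always maps to the same queue. */
  public int queueIndexFor(int partition) {
    return Math.floorMod(partition, queues.size());
  }

  /** Enqueues a write; blocks while the target queue is full. */
  public CompletableFuture<Void> send(int partition, String payload) {
    QueueNode node = new QueueNode(partition, payload);
    try {
      queues.get(queueIndexFor(partition)).put(node);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      node.result.completeExceptionally(e);
    }
    return node.result;
  }

  /** Stops the drainer threads; pending nodes are not flushed in this sketch. */
  public void close() {
    drainers.forEach(Thread::interrupt);
  }
}
```

Because every write for a given partition lands in the same FIFO queue and each queue is drained by exactly one thread, per-partition ordering holds even though several threads are sending at once. A bounded queue also gives the caller back-pressure when a producer falls behind.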
-
-
Nested Class Summary
Nested Classes
Modifier and Type    Class    Description
static class    PubSubProducerAdapterConcurrentDelegator.ProducerQueueDrainer
static class    PubSubProducerAdapterConcurrentDelegator.ProducerQueueNode
-
Constructor Summary
Constructors
Constructor    Description
PubSubProducerAdapterConcurrentDelegator(java.lang.String producingTopic, int producerThreadCount, int producerQueueSize, java.util.function.Supplier<PubSubProducerAdapter> producerAdapterSupplier)
-
Method Summary
Modifier and Type    Method    Description
void    close(long closeTimeOutMs)
void    flush()
java.lang.String    getBrokerAddress()
it.unimi.dsi.fastutil.objects.Object2DoubleMap<java.lang.String>    getMeasurableProducerMetrics()
int    getNumberOfPartitions(java.lang.String topic)
    The support for the following two getNumberOfPartitions APIs will be removed.
boolean    hasAnyProducerStopped()
java.util.concurrent.CompletableFuture<PubSubProduceResult>    sendMessage(java.lang.String topic, java.lang.Integer partition, KafkaKey key, KafkaMessageEnvelope value, PubSubMessageHeaders pubSubMessageHeaders, PubSubProducerCallback pubSubProducerCallback)
    Sends a message to a PubSub topic asynchronously and returns a Future representing the result of the produce operation.
-
-
-
Constructor Detail
-
PubSubProducerAdapterConcurrentDelegator
public PubSubProducerAdapterConcurrentDelegator(java.lang.String producingTopic, int producerThreadCount, int producerQueueSize, java.util.function.Supplier<PubSubProducerAdapter> producerAdapterSupplier)
-
-
Method Detail
-
getNumberOfPartitions
public int getNumberOfPartitions(java.lang.String topic)
Description copied from interface: PubSubProducerAdapter
The support for the following two getNumberOfPartitions APIs will be removed.
- Specified by:
getNumberOfPartitions
in interface PubSubProducerAdapter
-
hasAnyProducerStopped
public boolean hasAnyProducerStopped()
-
sendMessage
public java.util.concurrent.CompletableFuture<PubSubProduceResult> sendMessage(java.lang.String topic, java.lang.Integer partition, KafkaKey key, KafkaMessageEnvelope value, PubSubMessageHeaders pubSubMessageHeaders, PubSubProducerCallback pubSubProducerCallback)
Description copied from interface: PubSubProducerAdapter
Sends a message to a PubSub topic asynchronously and returns a Future representing the result of the produce operation.
- Specified by:
sendMessage
in interface PubSubProducerAdapter
- Parameters:
topic - The name of the Kafka topic to which the message will be sent.
partition - The partition to which the message should be sent.
key - The key associated with the message, used for partitioning and message retrieval.
value - The message payload to be sent to the PubSub topic.
pubSubMessageHeaders - Additional headers to be included with the message.
pubSubProducerCallback - An optional callback to handle the result of the produce operation.
- Returns:
A Future representing the result of the produce operation.
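Since sendMessage returns a CompletableFuture, a caller will typically want to observe failures and wait for a batch of sends to finish. The sketch below shows one way to do that with standard java.util.concurrent calls only. The class ProduceResultHandlingSketch, the method sendAllAndCountFailures, and the sendFn parameter are hypothetical stand-ins for a real sendMessage call and are not part of the Venice API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper; sendFn stands in for an asynchronous send such as sendMessage(...). */
public class ProduceResultHandlingSketch {
  /**
   * Sends every payload through sendFn, waits until all sends have finished
   * (successfully or not), and returns how many of them failed.
   */
  public static int sendAllAndCountFailures(
      List<String> payloads,
      Function<String, CompletableFuture<Long>> sendFn) {
    AtomicInteger failures = new AtomicInteger();
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    for (String payload : payloads) {
      // handle() runs for both outcomes, so the combined future never fails.
      pending.add(sendFn.apply(payload).<Void>handle((result, error) -> {
        if (error != null) {
          failures.incrementAndGet();
        }
        return null;
      }));
    }
    // Block until every individual send has completed.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    return failures.get();
  }
}
```

Using handle instead of thenApply means one failed send does not short-circuit the wait for the remaining ones.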
-
flush
public void flush()
- Specified by:
flush
in interface PubSubProducerAdapter
-
close
public void close(long closeTimeOutMs)
- Specified by:
close
in interface PubSubProducerAdapter
-
getMeasurableProducerMetrics
public it.unimi.dsi.fastutil.objects.Object2DoubleMap<java.lang.String> getMeasurableProducerMetrics()
- Specified by:
getMeasurableProducerMetrics
in interface PubSubProducerAdapter
-
getBrokerAddress
public java.lang.String getBrokerAddress()
- Specified by:
getBrokerAddress
in interface PubSubProducerAdapter
-
-