Package com.linkedin.venice.pubsub.api
Class PubSubProducerAdapterConcurrentDelegator
java.lang.Object
com.linkedin.venice.pubsub.api.PubSubProducerAdapterConcurrentDelegator
- All Implemented Interfaces:
PubSubProducerAdapter
public class PubSubProducerAdapterConcurrentDelegator
extends Object
implements PubSubProducerAdapter
This class will spin up multiple producer instances to speed up producing (see the illustrative sketch after this list).
1. Maintain a buffer queue per producer.
2. Writes to the same partition are always routed to the same queue to guarantee ordering.
3. The PubSubProducer in each thread constantly polls its corresponding queue and writes to the broker.
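The routing idea can be pictured with the sketch below. This is purely illustrative and not the actual implementation; the class and method names (StickyQueueRoutingSketch, route) are hypothetical, and only the per-producer bounded queue plus the sticky partition-to-queue mapping mirrors the description above.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Illustrative sketch only: one bounded buffer queue per producer thread, and a sticky
// partition -> queue mapping so that all writes for a partition stay ordered.
public class StickyQueueRoutingSketch {
  private final List<BlockingQueue<Runnable>> queues = new ArrayList<>();

  public StickyQueueRoutingSketch(int producerThreadCount, int producerQueueSize) {
    for (int i = 0; i < producerThreadCount; i++) {
      queues.add(new ArrayBlockingQueue<>(producerQueueSize));
    }
  }

  // All writes for a given partition always land in the same queue, hence the same
  // producer thread, so per-partition ordering is preserved.
  public void route(int partition, Runnable produceTask) throws InterruptedException {
    queues.get(Math.floorMod(partition, queues.size())).put(produceTask);
  }
}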
Here are the main reasons we would like to leverage this new strategy in stream processing jobs:
1. In the batch push job, each reducer only produces to one single Venice partition, so a multi-thread, multi-producer setup wouldn't help much.
2. In the Venice-Server leader replica, it is natural to have multiple consumers (threads) producing to different Venice partitions, since it uses a partition-wise shared consumer assignment strategy; multiple producers can help there, and that case is covered by PubSubProducerAdapterDelegator.
3. The online writer shares a similar pattern with #2, but so far we haven't heard any complaints regarding online writer performance, so we will leave it out of scope for now.
4. A streaming job typically runs inside a container and is single-threaded to guarantee ordering, but its writes to a Venice store can belong to several Venice partitions. We observed that KafkaProducer compression introduces a lot of overhead for large records, which slows down Producer#send, and if Producer#send is slow, the streaming job's throughput is heavily affected.
This strategy considers the following aspects:
a. We still want to keep compression on the producer end, as the streaming app has enough CPU resources.
b. We would like to utilize more CPU resources to improve producing throughput.
c. We would like to get around the contention issue (KafkaProducer has several synchronized sections for sending messages to the broker) when running in a multi-threaded environment.
Here is how this strategy fulfills #4:
1. Execute the Producer#send logic in a multi-threaded environment to better utilize CPU resources.
2. Each thread has its own PubSubProducerAdapter to avoid contention.
3. The mapping between Venice partition and buffered producer is sticky, so ordering is still guaranteed per Venice partition.
The above analysis is correct at a high level, but there are still some nuances regarding whether this new strategy can perform better than the default one used in the VPJ reducer and Venice-Server leader replicas:
1. If the record processing time is comparable to the Producer#send latency, the new strategy delegates the producing logic to a separate thread, which can potentially improve overall throughput if there are enough CPU resources.
2. If the record processing time is minimal compared with the Producer#send latency, this new strategy won't help much.
We will explore the above after validating this new strategy in a stream processing app.
Nested Class Summary
static class
static class
-
Constructor Summary
PubSubProducerAdapterConcurrentDelegator(String producingTopic, int producerThreadCount, int producerQueueSize, Supplier<PubSubProducerAdapter> producerAdapterSupplier)
-
Method Summary
void close(long closeTimeOutMs)
void flush()
String getBrokerAddress()
it.unimi.dsi.fastutil.objects.Object2DoubleMap<String> getMeasurableProducerMetrics()
int getNumberOfPartitions(String topic)
The support for the following two getNumberOfPartitions APIs will be removed.
boolean hasAnyProducerStopped()
CompletableFuture<PubSubProduceResult> sendMessage(String topic, Integer partition, KafkaKey key, KafkaMessageEnvelope value, PubSubMessageHeaders pubSubMessageHeaders, PubSubProducerCallback pubSubProducerCallback)
Sends a message to a PubSub topic asynchronously and returns a Future representing the result of the produce operation.
-
Constructor Details
-
PubSubProducerAdapterConcurrentDelegator
public PubSubProducerAdapterConcurrentDelegator(String producingTopic, int producerThreadCount, int producerQueueSize, Supplier<PubSubProducerAdapter> producerAdapterSupplier)
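A minimal construction sketch, assuming java.util.function.Supplier and the Venice pubsub classes are already imported. The helper createUnderlyingProducerAdapter() is hypothetical; any concrete PubSubProducerAdapter implementation (for example a Kafka-backed adapter) could be supplied, and the numeric values are arbitrary.

// Construction sketch; createUnderlyingProducerAdapter() is a hypothetical helper that
// builds whatever concrete PubSubProducerAdapter implementation is in use.
Supplier<PubSubProducerAdapter> producerAdapterSupplier = () -> createUnderlyingProducerAdapter();
PubSubProducerAdapter producer = new PubSubProducerAdapterConcurrentDelegator(
    "test_store_v1",          // producingTopic
    4,                        // producerThreadCount: one internal producer and queue per thread
    1000,                     // producerQueueSize: capacity of each per-producer buffer queue
    producerAdapterSupplier);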
-
Method Details
-
getNumberOfPartitions
public int getNumberOfPartitions(String topic)
Description copied from interface: PubSubProducerAdapter
The support for the following two getNumberOfPartitions APIs will be removed.
- Specified by:
getNumberOfPartitions in interface PubSubProducerAdapter
-
hasAnyProducerStopped
public boolean hasAnyProducerStopped()
-
sendMessage
public CompletableFuture<PubSubProduceResult> sendMessage(String topic, Integer partition, KafkaKey key, KafkaMessageEnvelope value, PubSubMessageHeaders pubSubMessageHeaders, PubSubProducerCallback pubSubProducerCallback)
Description copied from interface: PubSubProducerAdapter
Sends a message to a PubSub topic asynchronously and returns a Future representing the result of the produce operation.
- Specified by:
sendMessage in interface PubSubProducerAdapter
- Parameters:
topic - The name of the Kafka topic to which the message will be sent.
partition - The partition to which the message should be sent.
key - The key associated with the message, used for partitioning and message retrieval.
value - The message payload to be sent to the PubSubTopic topic.
pubSubMessageHeaders - Additional headers to be included with the message.
pubSubProducerCallback - An optional callback to handle the result of the produce operation.
- Returns:
A Future representing the result of the produce operation.
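A send sketch using the documented signature, assuming java.util.concurrent.CompletableFuture is imported, producer is the delegator built above, and kafkaKey / kafkaMessageEnvelope are pre-built KafkaKey and KafkaMessageEnvelope instances (their construction is application-specific and omitted here). The explicit partition 0 is arbitrary, and both the headers and the optional callback are left null.

// Asynchronous send; the returned CompletableFuture completes with the produce result.
CompletableFuture<PubSubProduceResult> produceFuture = producer.sendMessage(
    "test_store_v1",        // topic
    0,                      // partition (arbitrary in this sketch)
    kafkaKey,               // KafkaKey, assumed to be built elsewhere
    kafkaMessageEnvelope,   // KafkaMessageEnvelope payload, assumed to be built elsewhere
    null,                   // pubSubMessageHeaders: none in this sketch
    null);                  // pubSubProducerCallback: optional, omitted here

produceFuture.whenComplete((produceResult, throwable) -> {
  if (throwable != null) {
    // handle the produce failure
  }
});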
-
flush
public void flush()
- Specified by:
flush in interface PubSubProducerAdapter
-
close
public void close(long closeTimeOutMs)
- Specified by:
close in interface PubSubProducerAdapter
-
getMeasurableProducerMetrics
public it.unimi.dsi.fastutil.objects.Object2DoubleMap<String> getMeasurableProducerMetrics()
- Specified by:
getMeasurableProducerMetrics in interface PubSubProducerAdapter
-
getBrokerAddress
- Specified by:
getBrokerAddress in interface PubSubProducerAdapter