Class PubSubProducerAdapterConcurrentDelegator

  • All Implemented Interfaces:
    PubSubProducerAdapter

    public class PubSubProducerAdapterConcurrentDelegator
    extends java.lang.Object
    implements PubSubProducerAdapter
    This class spins up multiple producer instances to speed up producing:
    1. It maintains one buffer queue per producer.
    2. Writes to the same partition are always routed to the same queue to guarantee ordering.
    3. The PubSubProducer in each thread constantly polls its corresponding queue and writes to the broker.

    Here are the main reasons we would like to leverage this new strategy in stream processing jobs:
    1. In the batch push job, each reducer only produces to a single Venice partition, so the multiple-thread-multiple-producer approach wouldn't help much.
    2. In the Venice Server leader replica, it is natural to have multiple consumers (threads) producing to different Venice partitions, as it uses a partition-wise shared consumer assignment strategy. Multiple producers can help there, and that case will be covered by PubSubProducerAdapterDelegator.
    3. The online writer shares a similar pattern with #2, but so far we haven't heard any complaints regarding online writer performance, so we will leave it out of scope for now.
    4. A streaming job typically runs inside a container and is single-threaded to guarantee ordering, but its writes to a Venice store can belong to several Venice partitions. We observed that KafkaProducer compression introduces a lot of overhead for large records, which slows down Producer#send, and if Producer#send is slow, the streaming job throughput is heavily affected.

    This strategy considers the following aspects:
    a. We still want to keep compression on the producer end, as the streaming app has enough CPU resources.
    b. We would like to utilize more CPU resources to improve the producing throughput.
    c. We would like to get around the contention issue (KafkaProducer has several synchronized sections for sending messages to the broker) when running in a multi-threaded environment.

    Here is how this strategy fulfills #4:
    1. Execute the Producer#send logic in a multi-threaded environment to better utilize CPU resources.
    2. Each thread has its own PubSubProducerAdapter to avoid contention.
    3. The mapping between a Venice partition and a buffered producer is sticky, so ordering is still guaranteed per Venice partition.

    The above analysis is correct at a high level, but there are still some nuances where this new strategy can perform better than the default one in the VPJ reducer and Venice Server leader replicas:
    1. If the record processing time is comparable to the Producer#send latency, the new strategy delegates the producing logic to a separate thread, which can potentially improve the overall throughput if there are enough CPU resources.
    2. If the record processing time is minimal compared with the Producer#send latency, this new strategy won't help much.

    We will explore these cases after validating this new strategy in stream processing apps.
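    The queue-per-producer design with sticky partition routing described above can be illustrated with a minimal, JDK-only sketch. Everything in it is an assumption made for illustration: the class name StickyPartitionRouterSketch, the modulo-based mapping from partition to queue, and the string "send" that stands in for a real PubSubProducerAdapter#sendMessage call. It is not the Venice implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of sticky partition-to-queue routing; not the Venice code.
final class StickyPartitionRouterSketch implements AutoCloseable {
  /** One queued write: target partition, payload, and the future completed once it is "sent". */
  private static final class Task {
    final int partition;
    final String payload;
    final CompletableFuture<String> result = new CompletableFuture<>();

    Task(int partition, String payload) {
      this.partition = partition;
      this.payload = payload;
    }
  }

  private final List<BlockingQueue<Task>> queues = new ArrayList<>();
  private final List<Thread> workers = new ArrayList<>();
  private volatile boolean running = true;

  StickyPartitionRouterSketch(int threadCount, int queueSize) {
    for (int i = 0; i < threadCount; i++) {
      BlockingQueue<Task> queue = new ArrayBlockingQueue<>(queueSize);
      queues.add(queue);
      final int workerId = i;
      // Each worker thread drains exactly one queue, so writes in a queue stay in order.
      Thread worker = new Thread(() -> drain(queue, workerId), "producer-sketch-" + i);
      worker.setDaemon(true);
      workers.add(worker);
      worker.start();
    }
  }

  /** Assumed mapping: a partition always lands on the same queue index. */
  static int queueIndexFor(int partition, int queueCount) {
    return Math.floorMod(partition, queueCount);
  }

  /** Enqueues a write; blocks while the target queue is full (back-pressure). */
  CompletableFuture<String> send(int partition, String payload) {
    Task task = new Task(partition, payload);
    try {
      queues.get(queueIndexFor(partition, queues.size())).put(task);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      task.result.completeExceptionally(e);
    }
    return task.result;
  }

  private void drain(BlockingQueue<Task> queue, int workerId) {
    try {
      // Keep polling until close() is called and the queue has been emptied.
      while (running || !queue.isEmpty()) {
        Task task = queue.poll(50, TimeUnit.MILLISECONDS);
        if (task != null) {
          // Stand-in for the per-thread producer's send call.
          task.result.complete("worker-" + workerId + ":p" + task.partition + ":" + task.payload);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public void close() {
    running = false;
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

    In this sketch the caller thread only pays for the enqueue, while the simulated send runs on the worker that owns the partition's queue. Because a partition never changes queues, per-partition ordering is preserved even though different partitions are handled in parallel.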
    • Constructor Detail

      • PubSubProducerAdapterConcurrentDelegator

        public PubSubProducerAdapterConcurrentDelegator​(java.lang.String producingTopic,
                                                        int producerThreadCount,
                                                        int producerQueueSize,
                                                        java.util.function.Supplier<PubSubProducerAdapter> producerAdapterSupplier)
    • Method Detail

      • hasAnyProducerStopped

        public boolean hasAnyProducerStopped()
      • sendMessage

        public java.util.concurrent.CompletableFuture<PubSubProduceResult> sendMessage​(java.lang.String topic,
                                                                                       java.lang.Integer partition,
                                                                                       KafkaKey key,
                                                                                       KafkaMessageEnvelope value,
                                                                                       PubSubMessageHeaders pubSubMessageHeaders,
                                                                                       PubSubProducerCallback pubSubProducerCallback)
        Description copied from interface: PubSubProducerAdapter
        Sends a message to a PubSub topic asynchronously and returns a Future representing the result of the produce operation.
        Specified by:
        sendMessage in interface PubSubProducerAdapter
        Parameters:
        topic - The name of the PubSub topic to which the message will be sent.
        partition - The partition to which the message should be sent.
        key - The key associated with the message, used for partitioning and message retrieval.
        value - The message payload to be sent to the PubSub topic.
        pubSubMessageHeaders - Additional headers to be included with the message.
        pubSubProducerCallback - An optional callback to handle the result of the produce operation.
        Returns:
        A Future representing the result of the produce operation.
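
        Since sendMessage returns a CompletableFuture, a caller can react to the outcome without blocking. The sketch below shows one way to do that with CompletableFuture#handle. FakeProduceResult and its partition and offset fields are hypothetical stand-ins; the accessors of the real PubSubProduceResult are not documented on this page.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical helper for illustration only; not part of the Venice API.
final class ProduceFutureSketch {
  /** Stand-in for PubSubProduceResult; the fields are assumptions. */
  static final class FakeProduceResult {
    final int partition;
    final long offset;

    FakeProduceResult(int partition, long offset) {
      this.partition = partition;
      this.offset = offset;
    }
  }

  /** Maps the produce outcome to a log-friendly string without blocking the caller. */
  static CompletableFuture<String> describe(CompletableFuture<FakeProduceResult> produceFuture) {
    return produceFuture.handle((result, error) ->
        error != null
            ? "failed: " + error.getMessage()
            : "acked partition=" + result.partition + " offset=" + result.offset);
  }
}
```

        With a real producer, the future passed to describe would be the one returned by sendMessage. The same outcome can also be observed through the optional pubSubProducerCallback parameter.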