Class VeniceSystemProducer

java.lang.Object
com.linkedin.venice.samza.VeniceSystemProducer
All Implemented Interfaces:
Closeable, AutoCloseable, org.apache.samza.system.SystemProducer

public class VeniceSystemProducer extends Object implements org.apache.samza.system.SystemProducer, Closeable
VeniceSystemProducer defines the interfaces that Samza jobs use to send data to Venice stores. Depending on the aggregate mode config, a Samza job talks to either the parent or the child controller; VeniceSystemFactory makes that decision. The term "primary controller" refers to whichever controller the Samza job should talk to. The primary controller is:
  1. The parent controller when the Venice system is deployed in multi-colo mode and the Version.PushType is Version.PushType.BATCH or Version.PushType.STREAM_REPROCESSING.
  2. The child controller when either:
     a. The Venice system is deployed in single-colo mode; or
     b. The Version.PushType is Version.PushType.STREAM and the job is configured to write data in NON_AGGREGATE mode.
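The controller-selection rules above reduce to a small decision function. The following is a sketch under stated assumptions, not the VeniceSystemFactory implementation: the local PushType enum copies only the three Version.PushType values named in the rules, and the multiColo and aggregate booleans are hypothetical inputs standing in for whatever configuration the factory actually reads. The one combination the rules do not cover (a multi-colo STREAM job in aggregate mode) is rejected rather than guessed.

```java
// Hypothetical sketch of the documented primary-controller rules.
// PushType mirrors only the Version.PushType values named in the text;
// multiColo and aggregate are assumed stand-ins for real configuration.
final class PrimaryControllerSelector {
  enum PushType { BATCH, STREAM_REPROCESSING, STREAM }

  enum Controller { PARENT, CHILD }

  static Controller selectPrimaryController(boolean multiColo, PushType pushType, boolean aggregate) {
    // Rule 2a: single-colo deployments always use the child controller.
    if (!multiColo) {
      return Controller.CHILD;
    }
    switch (pushType) {
      case BATCH:
      case STREAM_REPROCESSING:
        // Rule 1: multi-colo batch or reprocessing pushes go to the parent.
        return Controller.PARENT;
      case STREAM:
        if (!aggregate) {
          // Rule 2b: streaming writes in NON_AGGREGATE mode go to the child.
          return Controller.CHILD;
        }
        break;
      default:
        break;
    }
    // Multi-colo STREAM in aggregate mode is not described by the rules above.
    throw new IllegalArgumentException(
        "Combination not covered by the documented rules: " + pushType + ", aggregate=" + aggregate);
  }

  public static void main(String[] args) {
    System.out.println(selectPrimaryController(true, PushType.BATCH, false));  // PARENT
    System.out.println(selectPrimaryController(false, PushType.BATCH, false)); // CHILD
    System.out.println(selectPrimaryController(true, PushType.STREAM, false)); // CHILD
  }
}
```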
  • Constructor Details

    • VeniceSystemProducer

      @Deprecated public VeniceSystemProducer(String primaryControllerColoD2ZKHost, String primaryControllerD2ServiceName, String storeName, Version.PushType pushType, String samzaJobId, String runningFabric, boolean verifyLatestProtocolPresent, VeniceSystemFactory factory, Optional<SSLFactory> sslFactory, Optional<String> partitioners)
      Deprecated.
    • VeniceSystemProducer

      public VeniceSystemProducer(String veniceChildD2ZkHost, String primaryControllerColoD2ZKHost, String primaryControllerD2ServiceName, String storeName, Version.PushType pushType, String samzaJobId, String runningFabric, boolean verifyLatestProtocolPresent, VeniceSystemFactory factory, Optional<SSLFactory> sslFactory, Optional<String> partitioners)
      Constructs a new instance of VeniceSystemProducer.

      This constructor is equivalent to calling the full constructor with SystemTime.INSTANCE as the time provider.

      Parameters:
      veniceChildD2ZkHost - ZK host for the Venice child fabric.
      primaryControllerColoD2ZKHost - ZK host for the primary controller colo.
      primaryControllerD2ServiceName - D2 service name of the primary controller.
      storeName - Name of the Venice store.
      pushType - The push type (e.g., batch, stream).
      samzaJobId - The Samza job ID.
      runningFabric - The name of the current fabric.
      verifyLatestProtocolPresent - Whether to verify that the latest protocol is present.
      factory - The system factory used to create producer components.
      sslFactory - Optional SSL factory for secure communication.
      partitioners - Optional partitioner class name(s) for custom partitioning.
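This constructor fills in SystemTime.INSTANCE and delegates to the full constructor that takes a Time argument. A minimal sketch of that delegation pattern follows; Time, SystemTime, and ExampleProducer here are hypothetical local stand-ins written for illustration, not the Venice classes, and the single getMilliseconds() method is an assumption chosen to keep the example small.

```java
// Hypothetical stand-in for the Time abstraction accepted by the full constructor.
interface Time {
  long getMilliseconds();
}

// Hypothetical stand-in for SystemTime.INSTANCE: reads the wall clock.
final class SystemTime implements Time {
  static final SystemTime INSTANCE = new SystemTime();

  private SystemTime() {}

  @Override
  public long getMilliseconds() {
    return System.currentTimeMillis();
  }
}

// Hypothetical producer that shows only the constructor-delegation pattern.
final class ExampleProducer {
  private final String storeName;
  private final Time time;

  // Shorter constructor: supplies the default time provider.
  ExampleProducer(String storeName) {
    this(storeName, SystemTime.INSTANCE);
  }

  // Full constructor: tests can inject a fake Time here.
  ExampleProducer(String storeName, Time time) {
    this.storeName = storeName;
    this.time = time;
  }

  String getStoreName() { return storeName; }

  Time getTime() { return time; }

  public static void main(String[] args) {
    ExampleProducer prod = new ExampleProducer("my-store");
    System.out.println(prod.getTime() == SystemTime.INSTANCE); // true

    Time fixedClock = () -> 42L; // fake clock for tests
    ExampleProducer test = new ExampleProducer("my-store", fixedClock);
    System.out.println(test.getTime().getMilliseconds()); // 42
  }
}
```

Keeping the time source as a constructor argument, rather than calling the system clock directly, is what lets tests control time-dependent behavior.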
    • VeniceSystemProducer

      @Deprecated public VeniceSystemProducer(String primaryControllerColoD2ZKHost, String primaryControllerD2ServiceName, String storeName, Version.PushType pushType, String samzaJobId, String runningFabric, boolean verifyLatestProtocolPresent, VeniceSystemFactory factory, Optional<SSLFactory> sslFactory, Optional<String> partitioners, Time time)
      Deprecated.
    • VeniceSystemProducer

      public VeniceSystemProducer(String veniceChildD2ZkHost, String primaryControllerColoD2ZKHost, String primaryControllerD2ServiceName, String storeName, Version.PushType pushType, String samzaJobId, String runningFabric, boolean verifyLatestProtocolPresent, VeniceSystemFactory factory, Optional<SSLFactory> sslFactory, Optional<String> partitioners, Time time)
      Constructs a new instance of VeniceSystemProducer.
      Parameters:
      veniceChildD2ZkHost - D2 ZooKeeper address at which the components in the child colo announce themselves.
      primaryControllerColoD2ZKHost - D2 ZooKeeper address of the colo where the primary controller resides.
      primaryControllerD2ServiceName - The service name that the primary controller uses to announce itself to D2.
      storeName - The store to write to.
      pushType - The Version.PushType used to write to the store.
      samzaJobId - A unique ID that identifies jobs that can write to the same store concurrently.
      runningFabric - The colo where the job is running. It is used to find the best destination for the written data.
      verifyLatestProtocolPresent - Whether to check that the protocol versions used at runtime are valid in the Venice backend.
      factory - The VeniceSystemFactory that was used to create this object.
      sslFactory - An optional SSLFactory used to communicate with other components over SSL.
      partitioners - A comma-separated list of supported partitioner class names.
      time - The Time implementation to use. Making it configurable is helpful for testing.
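The partitioners parameter is described as a comma-separated list of class names wrapped in an Optional. A minimal sketch of turning that value into a list of names follows; the PartitionerConfigParser class and parsePartitioners method are hypothetical, and trimming whitespace and skipping empty entries is an assumption about how such input should be cleaned, not documented Venice behavior.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

final class PartitionerConfigParser {
  // Hypothetical helper: splits the optional comma-separated value into
  // trimmed, non-empty class names. An absent value yields an empty list.
  static List<String> parsePartitioners(Optional<String> partitioners) {
    return partitioners
        .map(value -> Arrays.stream(value.split(","))
            .map(String::trim)
            .filter(name -> !name.isEmpty())
            .collect(Collectors.toList()))
        .orElse(Collections.emptyList());
  }

  public static void main(String[] args) {
    // Prints [com.example.APartitioner, com.example.BPartitioner]
    System.out.println(parsePartitioners(Optional.of("com.example.APartitioner, com.example.BPartitioner")));
    // Prints []
    System.out.println(parsePartitioners(Optional.empty()));
  }
}
```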
    • VeniceSystemProducer

      public VeniceSystemProducer(String discoveryUrl, String storeName, Version.PushType pushType, String samzaJobId, String runningFabric, boolean verifyLatestProtocolPresent, VeniceSystemFactory factory, Optional<SSLFactory> sslFactory, Optional<String> partitioners, Time time)
  • Method Details

    • applyAdditionalConfigs

      public void applyAdditionalConfigs(Map<String,String> additionalConfigs)
    • setRouterUrl

      public void setRouterUrl(String routerUrl)
    • getRunningFabric

      public String getRunningFabric()
    • controllerRequestWithRetry

      protected ControllerResponse controllerRequestWithRetry(Supplier<ControllerResponse> supplier, int retryLimit)
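The listing gives only the signature of controllerRequestWithRetry. Going by its name and parameters, one reading is that it invokes the supplied controller request up to retryLimit times and stops at the first successful response. The sketch below illustrates that pattern only; the Response class is a hypothetical stand-in for ControllerResponse, treating isError() as the failure signal is an assumption, and the real method may also sleep between attempts or handle exceptions.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for ControllerResponse: only the error flag matters here.
final class Response {
  private final String error;

  Response(String error) { this.error = error; }

  boolean isError() { return error != null; }

  String getError() { return error; }
}

final class RetryDemo {
  // Calls the supplier up to retryLimit times and returns the first
  // non-error response, or the last response if every attempt failed.
  static Response requestWithRetry(Supplier<Response> supplier, int retryLimit) {
    if (retryLimit < 1) {
      throw new IllegalArgumentException("retryLimit must be >= 1");
    }
    Response response = null;
    for (int attempt = 1; attempt <= retryLimit; attempt++) {
      response = supplier.get();
      if (!response.isError()) {
        return response;
      }
    }
    return response;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // Fails twice with a transient error, then succeeds on the third call.
    Response r = requestWithRetry(
        () -> ++calls[0] < 3 ? new Response("transient") : new Response(null), 5);
    System.out.println(r.isError() + " after " + calls[0] + " calls"); // false after 3 calls
  }
}
```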
    • getTopicName

      public String getTopicName()
    • getVeniceWriter

      protected AbstractVeniceWriter<byte[],byte[],byte[]> getVeniceWriter(VersionCreationResponse store)
      LinkedIn overrides this method internally and does not use this implementation. If you modify this method, update the overriding method accordingly.
    • extractConcurrentProducerConfig

      protected static void extractConcurrentProducerConfig(Properties veniceWriterProperties, VeniceWriterOptions.Builder builder)
    • getVeniceWriter

      protected AbstractVeniceWriter<byte[],byte[],byte[]> getVeniceWriter(VersionCreationResponse store, Properties veniceWriterProperties)
      Do not remove this method: LinkedIn still uses it internally.
    • setupClientsAndReInitProvider

      protected void setupClientsAndReInitProvider()
    • start

      public void start()
      Specified by:
      start in interface org.apache.samza.system.SystemProducer
    • close

      public void close()
      Specified by:
      close in interface AutoCloseable
      Specified by:
      close in interface Closeable
    • stop

      public void stop()
      Specified by:
      stop in interface org.apache.samza.system.SystemProducer
    • register

      public void register(String source)
      Specified by:
      register in interface org.apache.samza.system.SystemProducer
    • send

      public void send(String source, org.apache.samza.system.OutgoingMessageEnvelope outgoingMessageEnvelope)
      Specified by:
      send in interface org.apache.samza.system.SystemProducer
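The start, stop, register, and send methods above implement the Samza SystemProducer contract, which Samza drives in a fixed order: it typically registers each source, starts the producer, sends and flushes messages, and finally stops it. The sketch below only makes that call order visible; SimpleSystemProducer and RecordingProducer are hypothetical, simplified mirrors of the interface (the real send() takes an OutgoingMessageEnvelope rather than a key and value), not Samza or Venice classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified mirror of the Samza SystemProducer methods.
interface SimpleSystemProducer {
  void register(String source);
  void start();
  void send(String source, Object key, Object value);
  void flush(String source);
  void stop();
}

// Records each call so the lifecycle order can be inspected.
final class RecordingProducer implements SimpleSystemProducer {
  final List<String> calls = new ArrayList<>();
  private boolean started;

  public void register(String source) { calls.add("register:" + source); }

  public void start() {
    started = true;
    calls.add("start");
  }

  public void send(String source, Object key, Object value) {
    // A producer is expected to accept messages only after start().
    if (!started) {
      throw new IllegalStateException("send() called before start()");
    }
    calls.add("send:" + key);
  }

  public void flush(String source) { calls.add("flush:" + source); }

  public void stop() {
    started = false;
    calls.add("stop");
  }

  public static void main(String[] args) {
    RecordingProducer producer = new RecordingProducer();
    producer.register("task-0");
    producer.start();
    producer.send("task-0", "k1", "v1");
    producer.flush("task-0");
    producer.stop();
    // Prints [register:task-0, start, send:k1, flush:task-0, stop]
    System.out.println(producer.calls);
  }
}
```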
    • send

      protected CompletableFuture<Void> send(Object keyObject, Object valueObject)
    • put

      public CompletableFuture<Void> put(Object keyObject, Object valueObject)
    • delete

      public CompletableFuture<Void> delete(Object keyObject)
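put and delete return a CompletableFuture<Void> instead of blocking, so a caller that needs all of its writes acknowledged can collect the futures and wait on them together. The sketch below shows that caller-side pattern with CompletableFuture.allOf; FakeVeniceWriter is a hypothetical stand-in with the same put/delete shapes that completes every future immediately instead of writing to Venice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in with the same put/delete shapes as the methods above.
// It records each call and returns an already-completed future.
final class FakeVeniceWriter {
  final List<String> log = new ArrayList<>();

  CompletableFuture<Void> put(Object key, Object value) {
    log.add("put " + key + "=" + value);
    return CompletableFuture.completedFuture(null);
  }

  CompletableFuture<Void> delete(Object key) {
    log.add("delete " + key);
    return CompletableFuture.completedFuture(null);
  }

  public static void main(String[] args) {
    FakeVeniceWriter writer = new FakeVeniceWriter();
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    pending.add(writer.put("user-1", "alice"));
    pending.add(writer.put("user-2", "bob"));
    pending.add(writer.delete("user-3"));

    // Block until every write completes; join() throws a CompletionException
    // if any of the futures completed exceptionally.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    // Prints [put user-1=alice, put user-2=bob, delete user-3]
    System.out.println(writer.log);
  }
}
```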
    • flush

      public void flush(String s)
      Flushes data to the Venice store in case VeniceSystemProducer buffers messages.
      Specified by:
      flush in interface org.apache.samza.system.SystemProducer
      Parameters:
      s - The source of the messages. VeniceSystemProducer currently ignores this parameter.
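flush is described as pushing out any data the producer buffers, with the source argument currently unused. The toy sketch below illustrates those two points only; BufferingProducer is hypothetical, and its in-memory buffer and sink list are assumptions standing in for whatever buffering and writing VeniceSystemProducer actually performs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffering producer: send() only queues records and flush()
// hands everything queued to the sink. Like the documented flush(String s),
// it ignores the source argument.
final class BufferingProducer {
  private final List<String> buffer = new ArrayList<>();
  final List<String> sink = new ArrayList<>(); // stands in for the Venice store

  void send(String source, String record) {
    buffer.add(record);
  }

  void flush(String source) {
    // All buffered records are written, whichever source sent them.
    sink.addAll(buffer);
    buffer.clear();
  }

  int buffered() { return buffer.size(); }

  public static void main(String[] args) {
    BufferingProducer p = new BufferingProducer();
    p.send("task-0", "a");
    p.send("task-1", "b");
    System.out.println(p.buffered() + " buffered, " + p.sink.size() + " written"); // 2 buffered, 0 written
    p.flush("task-0");
    System.out.println(p.buffered() + " buffered, " + p.sink.size() + " written"); // 0 buffered, 2 written
  }
}
```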
    • convertPartialUpdateToFullPut

      protected Object convertPartialUpdateToFullPut(Pair<Integer,Integer> schemaIds, Object incomingWriteValueObject)
    • setExitMode

      public void setExitMode(SamzaExitMode exitMode)
    • getKafkaBootstrapServers

      public String getKafkaBootstrapServers()
      Test methods
    • getInternalWriter

      public AbstractVeniceWriter<byte[],byte[],byte[]> getInternalWriter()
    • setControllerClient

      protected void setControllerClient(D2ControllerClient controllerClient)