Class VeniceSystemFactory

java.lang.Object
com.linkedin.venice.samza.VeniceSystemFactory
All Implemented Interfaces:
Serializable, org.apache.samza.system.SystemFactory

public class VeniceSystemFactory extends Object implements org.apache.samza.system.SystemFactory, Serializable
Samza jobs talk to either the parent or the child controller, depending on the aggregate mode config. VeniceSystemFactory decides which controller to use. The term "primary controller" refers to whichever controller the Samza job should talk to. The primary controller should be:
  1. The parent controller when the Venice system is deployed in multi-colo mode and either:
     a. Version.PushType is Version.PushType.BATCH or Version.PushType.STREAM_REPROCESSING; or
     b. (Deprecated) Version.PushType is Version.PushType.STREAM and the job is configured to write data in AGGREGATE mode.
  2. The child controller when either:
     a. The Venice system is deployed in single-colo mode; or
     b. Version.PushType is Version.PushType.STREAM and the job is configured to write data in NON_AGGREGATE mode.
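The primary controller rules above can be sketched as a small decision function. This is only an illustration of the rules as written on this page, not the factory's actual code: PrimaryControllerSketch, its nested enums, and the multiColo flag are hypothetical stand-ins, and the local PushType enum covers only the values named above.

```java
import java.util.Objects;

// Illustrative sketch only; these types are local stand-ins, not Venice classes.
final class PrimaryControllerSketch {
  /** Stand-in for Version.PushType, limited to the values named in the rules. */
  enum PushType { BATCH, STREAM_REPROCESSING, STREAM }

  /** Which controller the Samza job should talk to. */
  enum Controller { PARENT, CHILD }

  /**
   * @param multiColo whether Venice is deployed in multi-colo mode (hypothetical flag)
   * @param pushType  the push type configured for the job
   * @param aggregate whether the job is configured to write in AGGREGATE mode
   */
  static Controller choosePrimaryController(boolean multiColo, PushType pushType, boolean aggregate) {
    Objects.requireNonNull(pushType, "pushType");
    if (!multiColo) {
      return Controller.CHILD; // rule 2a: single-colo always uses the child controller
    }
    switch (pushType) {
      case BATCH:
      case STREAM_REPROCESSING:
        return Controller.PARENT; // rule 1a
      case STREAM:
        // rule 1b (deprecated) when aggregate, rule 2b otherwise
        return aggregate ? Controller.PARENT : Controller.CHILD;
      default:
        throw new IllegalArgumentException("Unhandled push type: " + pushType);
    }
  }

  public static void main(String[] args) {
    System.out.println(choosePrimaryController(true, PushType.BATCH, false));
    System.out.println(choosePrimaryController(true, PushType.STREAM, false));
    System.out.println(choosePrimaryController(false, PushType.STREAM_REPROCESSING, true));
  }
}
```

Checking the deployment mode first keeps rule 2a independent of the push type, which matches how the rules are stated.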
  • Field Details

    • LEGACY_CHILD_D2_ZK_HOSTS_PROPERTY

      public static final String LEGACY_CHILD_D2_ZK_HOSTS_PROPERTY
    • SYSTEMS_PREFIX

      public static final String SYSTEMS_PREFIX
    • DOT

      public static final String DOT
    • DEPLOYMENT_ID

      public static final String DEPLOYMENT_ID
    • VENICE_PUSH_TYPE

      public static final String VENICE_PUSH_TYPE
    • VENICE_STORE

      public static final String VENICE_STORE
      Name of the Venice store that the Samza application will produce to.
    • VENICE_AGGREGATE

      public static final String VENICE_AGGREGATE
      Whether to talk to the parent region controller. Defaults to 'false'. This should only be set to true when using a non-STREAM push type in a multi-region setup.
    • VENICE_CHILD_D2_ZK_HOSTS

      public static final String VENICE_CHILD_D2_ZK_HOSTS
      D2 ZK hosts for Venice Child Cluster.
    • VENICE_CONTROLLER_DISCOVERY_URL

      public static final String VENICE_CONTROLLER_DISCOVERY_URL
    • VENICE_ROUTER_URL

      public static final String VENICE_ROUTER_URL
    • VENICE_PARENT_D2_ZK_HOSTS

      public static final String VENICE_PARENT_D2_ZK_HOSTS
      D2 ZK hosts for Venice Parent Cluster.
    • VENICE_CHILD_CONTROLLER_D2_SERVICE

      public static final String VENICE_CHILD_CONTROLLER_D2_SERVICE
    • VENICE_PARENT_CONTROLLER_D2_SERVICE

      public static final String VENICE_PARENT_CONTROLLER_D2_SERVICE
    • LEGACY_VENICE_CHILD_CONTROLLER_D2_SERVICE

      public static final String LEGACY_VENICE_CHILD_CONTROLLER_D2_SERVICE
    • LEGACY_VENICE_PARENT_CONTROLLER_D2_SERVICE

      public static final String LEGACY_VENICE_PARENT_CONTROLLER_D2_SERVICE
    • providedPrimaryControllerColoD2Client

      protected Optional<com.linkedin.d2.balancer.D2Client> providedPrimaryControllerColoD2Client
    • providedChildColoD2Client

      protected Optional<com.linkedin.d2.balancer.D2Client> providedChildColoD2Client
    • SSL_MANDATORY_CONFIGS

      protected static final List<String> SSL_MANDATORY_CONFIGS
      All the configs required to build an SSL factory.
  • Constructor Details

    • VeniceSystemFactory

      public VeniceSystemFactory()
  • Method Details

    • setProvidedD2Clients

      public void setProvidedD2Clients(Optional<com.linkedin.d2.balancer.D2Client> providedChildColoD2Client, Optional<com.linkedin.d2.balancer.D2Client> providedPrimaryControllerColoD2Client)
    • getAdmin

      public org.apache.samza.system.SystemAdmin getAdmin(String systemName, org.apache.samza.config.Config config)
      Specified by:
      getAdmin in interface org.apache.samza.system.SystemFactory
    • getConsumer

      public org.apache.samza.system.SystemConsumer getConsumer(String systemName, org.apache.samza.config.Config config, org.apache.samza.metrics.MetricsRegistry registry)
      Specified by:
      getConsumer in interface org.apache.samza.system.SystemFactory
    • getControllerDiscoveryUrl

      public Optional<String> getControllerDiscoveryUrl(org.apache.samza.config.Config config)
    • createSystemProducer

      @Deprecated protected org.apache.samza.system.SystemProducer createSystemProducer(String primaryControllerColoD2ZKHost, String primaryControllerD2Service, String storeName, Version.PushType venicePushType, String samzaJobId, String runningFabric, org.apache.samza.config.Config config, Optional<SSLFactory> sslFactory, Optional<String> partitioners)
      Deprecated.
      Left in to maintain backward compatibility
    • createSystemProducer

      @Deprecated protected org.apache.samza.system.SystemProducer createSystemProducer(String primaryControllerColoD2ZKHost, String primaryControllerD2Service, String storeName, Version.PushType venicePushType, String samzaJobId, String runningFabric, boolean verifyLatestProtocolPresent, org.apache.samza.config.Config config, Optional<SSLFactory> sslFactory, Optional<String> partitioners)
      Deprecated.
      Left in to maintain backward compatibility
    • createSystemProducer

      protected VeniceSystemProducer createSystemProducer(String veniceChildD2ZkHost, String primaryControllerColoD2ZKHost, String primaryControllerD2Service, String storeName, Version.PushType venicePushType, String samzaJobId, String runningFabric, boolean verifyLatestProtocolPresent, org.apache.samza.config.Config config, Optional<SSLFactory> sslFactory, Optional<String> partitioners)
      Constructs a new instance of VeniceSystemProducer.
      Parameters:
      veniceChildD2ZkHost - D2 ZooKeeper address where components in the child colo announce themselves.
      primaryControllerColoD2ZKHost - D2 ZooKeeper address of the colo where the primary controller resides.
      primaryControllerD2Service - The D2 service name used by the primary controller to register itself.
      storeName - The name of the Venice store to write to.
      venicePushType - The Version.PushType to use for writing to the store.
      samzaJobId - A unique identifier for jobs that may concurrently write to the same store.
      runningFabric - The colo where the job is running; used to determine the optimal destination for data writes.
      verifyLatestProtocolPresent - Whether to verify that the protocol versions used at runtime are valid in the Venice backend.
      config - A Config object that may be used by the factory to override and create a customized VeniceSystemProducer.
      sslFactory - Optional SSLFactory for communicating with other components over SSL.
      partitioners - Optional comma-separated list of supported partitioner class names.
      Returns:
      A configured VeniceSystemProducer instance.
    • createSystemProducer

      protected VeniceSystemProducer createSystemProducer(com.linkedin.d2.balancer.D2Client providedChildColoD2Client, com.linkedin.d2.balancer.D2Client providedPrimaryControllerColoD2Client, String primaryControllerD2Service, String storeName, Version.PushType venicePushType, String samzaJobId, String runningFabric, boolean verifyLatestProtocolPresent, org.apache.samza.config.Config config, Optional<SSLFactory> sslFactory, Optional<String> partitioners)
    • createSystemProducer

      protected VeniceSystemProducer createSystemProducer(String veniceChildD2ZkHost, String primaryControllerColoD2ZKHost, String primaryControllerD2Service, String storeName, Version.PushType venicePushType, String samzaJobId, String runningFabric, boolean verifyLatestProtocolPresent, Optional<SSLFactory> sslFactory, Optional<String> partitioners)
      Constructs a new instance of VeniceSystemProducer.
      Parameters:
      veniceChildD2ZkHost - D2 ZooKeeper address where components in the child colo announce themselves.
      primaryControllerColoD2ZKHost - D2 ZooKeeper address of the colo where the primary controller resides.
      primaryControllerD2Service - The D2 service name used by the primary controller to register itself.
      storeName - The name of the Venice store to write to.
      venicePushType - The Version.PushType to use for writing to the store.
      samzaJobId - A unique identifier for jobs that may concurrently write to the same store.
      runningFabric - The colo where the job is running; used to determine the optimal destination for writes.
      verifyLatestProtocolPresent - Whether to verify that the protocol versions used at runtime are valid in the Venice backend.
      sslFactory - Optional SSLFactory used for secure communication with other components.
      partitioners - Optional comma-separated list of partitioner class names to use.
      Returns:
      A configured VeniceSystemProducer instance.
    • getProducer

      public org.apache.samza.system.SystemProducer getProducer(String systemName, String storeName, boolean veniceAggregate, String pushTypeString, org.apache.samza.config.Config config)
      The Samza table writer calls this method directly to create a Venice system producer, instead of calling the general getProducer(String, Config, MetricsRegistry) method.
    • getProducer

      public org.apache.samza.system.SystemProducer getProducer(String systemName, org.apache.samza.config.Config config, org.apache.samza.metrics.MetricsRegistry registry)
      The core function of a SystemFactory; most Samza users would specify VeniceSystemFactory in the job config and Samza would invoke SystemFactory.getProducer(String, Config, MetricsRegistry) to create producers.
      Specified by:
      getProducer in interface org.apache.samza.system.SystemFactory
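Since most users name VeniceSystemFactory in the job config, the following sketch shows how per-system config keys might be composed for such a job. The `systems.<system-name>.samza.factory` key follows the standard Samza convention. The string values given to the stand-in constants below are assumptions, because this page lists the constant names but not their values. VeniceJobConfigSketch, key, and buildConfig are hypothetical names introduced for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch only; it does not depend on Venice or Samza classes.
final class VeniceJobConfigSketch {
  // Assumed values. The real constants are declared on VeniceSystemFactory and
  // their values are not shown on this page.
  static final String SYSTEMS_PREFIX = "systems.";
  static final String DOT = ".";
  static final String VENICE_STORE = "store";
  static final String VENICE_PUSH_TYPE = "push.type";
  static final String VENICE_AGGREGATE = "aggregate";

  /** Builds a key of the form systems.<systemName>.<suffix>. */
  static String key(String systemName, String suffix) {
    return SYSTEMS_PREFIX + systemName + DOT + suffix;
  }

  /** Assembles the entries a job might set for one Venice system. */
  static Map<String, String> buildConfig(String systemName, String storeName, String pushType) {
    Map<String, String> config = new LinkedHashMap<>();
    config.put(key(systemName, "samza.factory"), "com.linkedin.venice.samza.VeniceSystemFactory");
    config.put(key(systemName, VENICE_STORE), storeName);
    config.put(key(systemName, VENICE_PUSH_TYPE), pushType);
    config.put(key(systemName, VENICE_AGGREGATE), "false"); // documented default
    return config;
  }

  public static void main(String[] args) {
    buildConfig("venice", "my-store", "STREAM")
        .forEach((k, v) -> System.out.println(k + "=" + v));
  }
}
```

In a real job these entries would be passed to Samza through its Config object, and Samza would then invoke getProducer(String, Config, MetricsRegistry) on the factory.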
    • getClosableProducer

      public VeniceSystemProducer getClosableProducer(String systemName, org.apache.samza.config.Config config, org.apache.samza.metrics.MetricsRegistry registry)
      Convenience method to hide the ugliness of casting in just one place. Ideally, we would change the return type of getProducer(String, Config, MetricsRegistry) to VeniceSystemProducer but since there are existing users of this API, we are being extra careful not to disturb it. TODO: clean this up when we have the bandwidth to coordinate the refactoring with the existing users.
    • getNumberOfActiveSystemProducers

      public int getNumberOfActiveSystemProducers()
      Get the total number of active SystemProducers. A SystemProducer with push type STREAM or BATCH is always in the active state, so if the Samza task has any real-time SystemProducer, the task will not be stopped even after all the stream reprocessing jobs have completed. In addition, a Samza task cannot mix the BATCH and STREAM_REPROCESSING push types; otherwise, the task cannot be stopped automatically.
    • getOverallExecutionStatus

      public boolean getOverallExecutionStatus()
      Check whether all the stream reprocessing jobs have succeeded; return false if any of them fail.
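The two aggregate checks, getNumberOfActiveSystemProducers and getOverallExecutionStatus, can be pictured with a small status tracker. This is a minimal sketch under stated assumptions, not the factory's implementation: ProducerStatusSketch, Status, and register are hypothetical, the initial flag values are assumed, and every registered producer is treated as a stream reprocessing producer.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; a local stand-in, not the VeniceSystemFactory implementation.
final class ProducerStatusSketch {
  /** Per-producer flags. */
  static final class Status {
    boolean isActive = true;                          // assumed initial value
    boolean isStreamReprocessingJobSucceeded = false; // assumed initial value
  }

  private final List<Status> statuses = new ArrayList<>();

  /** Registers a new producer and returns its mutable status (hypothetical helper). */
  Status register() {
    Status status = new Status();
    statuses.add(status);
    return status;
  }

  /** Counts the producers that are still active. */
  int getNumberOfActiveSystemProducers() {
    int active = 0;
    for (Status status : statuses) {
      if (status.isActive) {
        active++;
      }
    }
    return active;
  }

  /** Returns false as soon as any tracked job has not succeeded. */
  boolean getOverallExecutionStatus() {
    for (Status status : statuses) {
      if (!status.isStreamReprocessingJobSucceeded) {
        return false;
      }
    }
    return true;
  }
}
```

A caller could stop the task once the active count reaches zero, then use the overall status to decide whether to report success.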
    • endStreamReprocessingSystemProducer

      public void endStreamReprocessingSystemProducer(org.apache.samza.system.SystemProducer systemProducer, boolean jobSucceed)
      RouterBasedPushMonitor updates the status of a SystemProducer with push type STREAM_REPROCESSING as follows:
        END_OF_PUSH_RECEIVED: isActive -> false; isStreamReprocessingJobSucceeded -> true
        COMPLETED: isActive -> false; isStreamReprocessingJobSucceeded -> true
        ERROR: isActive -> false; isStreamReprocessingJobSucceeded -> false
      For all other push job statuses, the SystemProducer status is not updated.
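The status transitions described for endStreamReprocessingSystemProducer can be written as a lookup from push job status to the resulting flags. The sketch below is an assumption-laden illustration: PushStatusMappingSketch, PushStatus, Update, and updateFor are hypothetical names, and STARTED stands in for any status that triggers no update.

```java
import java.util.Optional;

// Illustrative sketch only; these types are local stand-ins, not Venice classes.
final class PushStatusMappingSketch {
  /** Stand-in push job statuses; STARTED represents "any other status". */
  enum PushStatus { STARTED, END_OF_PUSH_RECEIVED, COMPLETED, ERROR }

  /** The pair of flags written to a producer's status. */
  static final class Update {
    final boolean isActive;
    final boolean isStreamReprocessingJobSucceeded;

    Update(boolean isActive, boolean isStreamReprocessingJobSucceeded) {
      this.isActive = isActive;
      this.isStreamReprocessingJobSucceeded = isStreamReprocessingJobSucceeded;
    }
  }

  /** Returns the update for a status, or empty when the status leaves the producer unchanged. */
  static Optional<Update> updateFor(PushStatus status) {
    switch (status) {
      case END_OF_PUSH_RECEIVED:
      case COMPLETED:
        return Optional.of(new Update(false, true));
      case ERROR:
        return Optional.of(new Update(false, false));
      default:
        return Optional.empty(); // all other statuses: no update
    }
  }
}
```

Returning an empty Optional for the remaining statuses makes the "no update" case explicit instead of overloading a default flag value.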
    • getSslProperties

      protected Properties getSslProperties(org.apache.samza.config.Config samzaConfig)
      Build SSL properties based on the Samza job config
    • isEmpty

      protected static boolean isEmpty(String input)