Class ApacheKafkaUtils

java.lang.Object
com.linkedin.venice.pubsub.adapter.kafka.ApacheKafkaUtils

public class ApacheKafkaUtils extends Object
  • Field Details

    • EMPTY_RECORD_HEADERS

      public static final org.apache.kafka.common.header.internals.RecordHeaders EMPTY_RECORD_HEADERS
    • KAFKA_SSL_MANDATORY_CONFIGS

      protected static final Set<String> KAFKA_SSL_MANDATORY_CONFIGS
      Mandatory Kafka SSL configs when SSL is enabled.
  • Constructor Details

    • ApacheKafkaUtils

      public ApacheKafkaUtils()
  • Method Details

    • convertToKafkaSpecificHeaders

      public static org.apache.kafka.common.header.internals.RecordHeaders convertToKafkaSpecificHeaders(PubSubMessageHeaders headers)
    • getValidKafkaClientProperties

      public static Properties getValidKafkaClientProperties(VeniceProperties veniceProperties, PubSubSecurityProtocol securityProtocol, Set<String> validKafkaClientSpecificConfigKeys, Set<String> kafkaConfigPrefixes)
      Extracts and returns only the valid Kafka client configuration properties from the provided VeniceProperties.

      This method filters the provided properties against a supplied set of valid configuration keys specific to a Kafka client type (e.g., ProducerConfig.configNames(), ConsumerConfig.configNames(), or AdminClientConfig.configNames()).

      In addition to the client-specific configuration keys, this method always retains common SASL-related properties defined in KAFKA_SASL_CONFIGS. If the extracted configuration specifies a Kafka security protocol that implies SSL (e.g., SSL or SASL_SSL), it also validates that all required SSL configurations are present. These required keys are defined in KAFKA_SSL_MANDATORY_CONFIGS. If any mandatory SSL property is missing or an invalid security protocol is specified, a VeniceException is thrown.

      This utility is intended for safely extracting Kafka configuration subsets suitable for initializing Kafka Producer, Consumer, or AdminClient instances.

      Parameters:
      veniceProperties - The source VeniceProperties containing client configuration.
      securityProtocol - The Kafka security protocol to be used (e.g., PLAINTEXT
      validKafkaClientSpecificConfigKeys - The set of config keys valid for the specific Kafka client type.
      Returns:
      A Properties object containing only valid and required Kafka client configurations.
      Throws:
      VeniceException - if required SSL configs are missing or an invalid protocol is specified.
    • validateAndCopyKafkaSSLConfig

      public static boolean validateAndCopyKafkaSSLConfig(PubSubSecurityProtocol securityProtocol, VeniceProperties veniceProperties, Properties properties)
      This function will extract SSL related config if Kafka SSL is enabled.
      Parameters:
      veniceProperties -
      properties -
      Returns:
      whether Kafka SSL is enabled or not0
    • isKafkaProtocolValid

      public static boolean isKafkaProtocolValid(String kafkaProtocol)