Class ApacheKafkaUtils
-
Field Summary
FieldsModifier and TypeFieldDescriptionstatic final org.apache.kafka.common.header.internals.RecordHeadersMandatory Kafka SSL configs when SSL is enabled. -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionstatic org.apache.kafka.common.header.internals.RecordHeadersstatic PropertiesgetValidKafkaClientProperties(VeniceProperties veniceProperties, PubSubSecurityProtocol securityProtocol, Set<String> validKafkaClientSpecificConfigKeys, Set<String> kafkaConfigPrefixes) Extracts and returns only the valid Kafka client configuration properties from the providedVeniceProperties.static booleanisKafkaProtocolValid(String kafkaProtocol) static booleanvalidateAndCopyKafkaSSLConfig(PubSubSecurityProtocol securityProtocol, VeniceProperties veniceProperties, Properties properties) This function will extract SSL related config if Kafka SSL is enabled.
-
Field Details
-
EMPTY_RECORD_HEADERS
public static final org.apache.kafka.common.header.internals.RecordHeaders EMPTY_RECORD_HEADERS -
KAFKA_SSL_MANDATORY_CONFIGS
Mandatory Kafka SSL configs when SSL is enabled.
-
-
Constructor Details
-
ApacheKafkaUtils
public ApacheKafkaUtils()
-
-
Method Details
-
convertToKafkaSpecificHeaders
public static org.apache.kafka.common.header.internals.RecordHeaders convertToKafkaSpecificHeaders(PubSubMessageHeaders headers) -
getValidKafkaClientProperties
public static Properties getValidKafkaClientProperties(VeniceProperties veniceProperties, PubSubSecurityProtocol securityProtocol, Set<String> validKafkaClientSpecificConfigKeys, Set<String> kafkaConfigPrefixes) Extracts and returns only the valid Kafka client configuration properties from the providedVeniceProperties.This method filters the provided properties against a supplied set of valid configuration keys specific to a Kafka client type (e.g.,
ProducerConfig.configNames(),ConsumerConfig.configNames(), orAdminClientConfig.configNames()).In addition to the client-specific configuration keys, this method always retains common SASL-related properties defined in
KAFKA_SASL_CONFIGS. If the extracted configuration specifies a Kafka security protocol that implies SSL (e.g.,SSLorSASL_SSL), it also validates that all required SSL configurations are present. These required keys are defined inKAFKA_SSL_MANDATORY_CONFIGS. If any mandatory SSL property is missing or an invalid security protocol is specified, aVeniceExceptionis thrown.This utility is intended for safely extracting Kafka configuration subsets suitable for initializing Kafka
Producer,Consumer, orAdminClientinstances.- Parameters:
veniceProperties- The sourceVenicePropertiescontaining client configuration.securityProtocol- The Kafka security protocol to be used (e.g.,PLAINTEXTvalidKafkaClientSpecificConfigKeys- The set of config keys valid for the specific Kafka client type.- Returns:
- A
Propertiesobject containing only valid and required Kafka client configurations. - Throws:
VeniceException- if required SSL configs are missing or an invalid protocol is specified.
-
validateAndCopyKafkaSSLConfig
public static boolean validateAndCopyKafkaSSLConfig(PubSubSecurityProtocol securityProtocol, VeniceProperties veniceProperties, Properties properties) This function will extract SSL related config if Kafka SSL is enabled.- Parameters:
veniceProperties-properties-- Returns:
- whether Kafka SSL is enabled or not0
-
isKafkaProtocolValid
-