Class ApacheKafkaUtils
-
Field Summary
Fields
Modifier and Type, Field, Description
static final org.apache.kafka.common.header.internals.RecordHeaders EMPTY_RECORD_HEADERS
KAFKA_SSL_MANDATORY_CONFIGS
Mandatory Kafka SSL configs when SSL is enabled.
-
Constructor Summary
Constructors
ApacheKafkaUtils()
-
Method Summary
Modifier and Type, Method, Description
static org.apache.kafka.common.header.internals.RecordHeaders convertToKafkaSpecificHeaders(PubSubMessageHeaders headers)
static Properties getValidKafkaClientProperties(VeniceProperties veniceProperties, PubSubSecurityProtocol securityProtocol, Set<String> validKafkaClientSpecificConfigKeys, Set<String> kafkaConfigPrefixes)
Extracts and returns only the valid Kafka client configuration properties from the provided VeniceProperties.
static boolean isKafkaProtocolValid(String kafkaProtocol)
static boolean validateAndCopyKafkaSSLConfig(PubSubSecurityProtocol securityProtocol, VeniceProperties veniceProperties, Properties properties)
Extracts the SSL-related config if Kafka SSL is enabled.
-
Field Details
-
EMPTY_RECORD_HEADERS
public static final org.apache.kafka.common.header.internals.RecordHeaders EMPTY_RECORD_HEADERS
-
KAFKA_SSL_MANDATORY_CONFIGS
Mandatory Kafka SSL configs when SSL is enabled.
-
-
Constructor Details
-
ApacheKafkaUtils
public ApacheKafkaUtils()
-
-
Method Details
-
convertToKafkaSpecificHeaders
public static org.apache.kafka.common.header.internals.RecordHeaders convertToKafkaSpecificHeaders(PubSubMessageHeaders headers)
-
getValidKafkaClientProperties
public static Properties getValidKafkaClientProperties(VeniceProperties veniceProperties, PubSubSecurityProtocol securityProtocol, Set<String> validKafkaClientSpecificConfigKeys, Set<String> kafkaConfigPrefixes)
Extracts and returns only the valid Kafka client configuration properties from the provided VeniceProperties.
This method filters the provided properties against a supplied set of valid configuration keys specific to a Kafka client type (e.g., ProducerConfig.configNames(), ConsumerConfig.configNames(), or AdminClientConfig.configNames()).
In addition to the client-specific configuration keys, this method always retains common SASL-related properties defined in KAFKA_SASL_CONFIGS. If the extracted configuration specifies a Kafka security protocol that implies SSL (e.g., SSL or SASL_SSL), it also validates that all required SSL configurations are present. These required keys are defined in KAFKA_SSL_MANDATORY_CONFIGS. If any mandatory SSL property is missing or an invalid security protocol is specified, a VeniceException is thrown.
This utility is intended for safely extracting Kafka configuration subsets suitable for initializing Kafka Producer, Consumer, or AdminClient instances.
- Parameters:
veniceProperties - The source VeniceProperties containing client configuration.
securityProtocol - The Kafka security protocol to be used (e.g., PLAINTEXT).
validKafkaClientSpecificConfigKeys - The set of config keys valid for the specific Kafka client type.
- Returns:
A Properties object containing only valid and required Kafka client configurations.
- Throws:
VeniceException - if required SSL configs are missing or an invalid protocol is specified.
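The filtering behaviour described for getValidKafkaClientProperties can be illustrated with a small, self-contained sketch. This is not the Venice implementation. The class KafkaPropertyFilterSketch, the method filter, and the contents of SASL_KEYS and SSL_MANDATORY_KEYS are hypothetical stand-ins for KAFKA_SASL_CONFIGS and KAFKA_SSL_MANDATORY_CONFIGS, whose actual contents are not listed on this page. Plain strings stand in for PubSubSecurityProtocol, IllegalStateException stands in for VeniceException, and the kafkaConfigPrefixes handling is left out because it is not described here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

// Illustrative sketch only; all names below are assumptions, not Venice APIs.
final class KafkaPropertyFilterSketch {
  // Kafka's four security protocol names.
  static final Set<String> KNOWN_PROTOCOLS =
      new HashSet<>(Arrays.asList("PLAINTEXT", "SSL", "SASL_PLAINTEXT", "SASL_SSL"));
  // Assumed stand-in for KAFKA_SASL_CONFIGS.
  static final Set<String> SASL_KEYS =
      new HashSet<>(Arrays.asList("sasl.mechanism", "sasl.jaas.config"));
  // Assumed stand-in for KAFKA_SSL_MANDATORY_CONFIGS.
  static final Set<String> SSL_MANDATORY_KEYS =
      new HashSet<>(Arrays.asList("ssl.truststore.location", "ssl.keystore.location"));

  static Properties filter(Properties source, String securityProtocol, Set<String> validClientKeys) {
    if (!KNOWN_PROTOCOLS.contains(securityProtocol)) {
      throw new IllegalStateException("Invalid security protocol: " + securityProtocol);
    }
    Properties result = new Properties();
    // Keep client-specific keys, and always keep SASL-related keys.
    for (String key : source.stringPropertyNames()) {
      if (validClientKeys.contains(key) || SASL_KEYS.contains(key)) {
        result.setProperty(key, source.getProperty(key));
      }
    }
    // SSL and SASL_SSL both imply SSL, so every mandatory SSL key must be present.
    if (securityProtocol.endsWith("SSL")) {
      for (String required : SSL_MANDATORY_KEYS) {
        String value = source.getProperty(required);
        if (value == null) {
          throw new IllegalStateException("Missing mandatory SSL config: " + required);
        }
        result.setProperty(required, value);
      }
    }
    return result;
  }
}
```

In real code, the third argument would typically be one of the key sets named above, such as ProducerConfig.configNames(), so that the resulting Properties contains nothing the target Kafka client would reject or warn about.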
-
validateAndCopyKafkaSSLConfig
public static boolean validateAndCopyKafkaSSLConfig(PubSubSecurityProtocol securityProtocol, VeniceProperties veniceProperties, Properties properties)
Extracts the SSL-related config if Kafka SSL is enabled.
- Parameters:
veniceProperties -
properties -
- Returns:
whether Kafka SSL is enabled or not
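The contract of validateAndCopyKafkaSSLConfig (copy the SSL settings into a caller-supplied Properties object and report whether SSL is enabled) might look roughly like the sketch below. It is an illustration under assumptions, not the Venice code. SslConfigCopySketch, validateAndCopy, and the two keys in SSL_MANDATORY_KEYS are hypothetical, a plain string stands in for PubSubSecurityProtocol, and throwing on a missing key is inferred from the word "validate" in the method name rather than stated on this page.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Illustrative sketch only; all names below are assumptions, not Venice APIs.
final class SslConfigCopySketch {
  // Assumed stand-in for KAFKA_SSL_MANDATORY_CONFIGS.
  static final List<String> SSL_MANDATORY_KEYS =
      Arrays.asList("ssl.truststore.location", "ssl.keystore.location");

  // Returns true when the protocol implies SSL, after copying the SSL keys into target.
  static boolean validateAndCopy(String securityProtocol, Properties source, Properties target) {
    boolean sslEnabled = "SSL".equals(securityProtocol) || "SASL_SSL".equals(securityProtocol);
    if (!sslEnabled) {
      return false; // target is left untouched
    }
    for (String key : SSL_MANDATORY_KEYS) {
      String value = source.getProperty(key);
      if (value == null) {
        // Assumed behaviour: a missing mandatory key is treated as an error.
        throw new IllegalStateException("Missing mandatory SSL config: " + key);
      }
      target.setProperty(key, value);
    }
    return true;
  }
}
```

Returning a flag while mutating the target lets a caller branch on whether SSL is active without inspecting the protocol a second time.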
-
isKafkaProtocolValid
-