Package com.linkedin.davinci.validation
Class KafkaDataIntegrityValidator
java.lang.Object
    com.linkedin.davinci.validation.KafkaDataIntegrityValidator

public class KafkaDataIntegrityValidator extends java.lang.Object
This class is a library that can validate Kafka messages during consumption, and can be used in Venice-Server, Da-Vinci, and ETL. At a high level, it keeps track of the messages produced by each producer and validates data integrity from four perspectives:

1. Whether a segment starts from a non-zero sequence number (UNREGISTERED_PRODUCER);
2. Whether there is a gap between segments or within a segment (MISSING);
3. Whether the data within a segment is corrupted (CORRUPT);
4. Whether producers have produced duplicate messages, which is fine and expected due to producer retries (DUPLICATE).
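As a rough, hedged illustration of the intended usage (not part of this Javadoc): the topic name, the onRecord/onUnsubscribe hooks, and the package paths of the imported types below are assumptions, and only methods documented on this page are invoked.

    import com.linkedin.davinci.validation.KafkaDataIntegrityValidator;
    import com.linkedin.davinci.validation.PartitionTracker;
    import com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope;
    import com.linkedin.venice.message.KafkaKey;
    import com.linkedin.venice.pubsub.api.PubSubMessage;
    import com.linkedin.venice.utils.lazy.Lazy;

    public final class DivConsumerSketch {
      // One validator per consumer task; "store_v1" is a made-up topic name.
      private final KafkaDataIntegrityValidator validator =
          new KafkaDataIntegrityValidator("store_v1");

      // Hypothetical hook, called for every record polled from the pub-sub consumer.
      void onRecord(
          PartitionTracker.TopicType type,
          PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record,
          boolean endOfPushReceived) {
        // Any DIV violation surfaces as a DataValidationException.
        validator.validateMessage(type, record, endOfPushReceived, Lazy.of(() -> false));
      }

      // Hypothetical hook, called when the task unsubscribes from a partition.
      void onUnsubscribe(int partition) {
        validator.clearPartition(partition); // drop the accumulated producer state
      }
    }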
Field Summary

static long DISABLED

protected java.util.function.IntFunction<PartitionTracker> partitionTrackerCreator

protected SparseConcurrentList<PartitionTracker> partitionTrackers
    Keeps track of every upstream producer this consumer task has seen so far, for each partition.
Constructor Summary

KafkaDataIntegrityValidator(java.lang.String topicName)

KafkaDataIntegrityValidator(java.lang.String topicName, long kafkaLogCompactionDelayInMs)
    This constructor is used by a proprietary ETL project.

KafkaDataIntegrityValidator(java.lang.String topicName, long kafkaLogCompactionDelayInMs, long maxAgeInMs)
Method Summary

void checkMissingMessage(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, java.util.Optional<PartitionTracker.DIVErrorMetricCallback> errorMetricCallback)
    Only check for a missing sequence number; a segment starting from a positive sequence number is acceptable, since a real-time buffer replay may start in the middle of a segment; the checksum is also ignored for the same reason.

void clearPartition(int partition)
    In some cases, such as when resetting offsets or unsubscribing from a partition, the PartitionTracker should forget the state that it has accumulated for a given partition.

void cloneProducerStates(int partition, KafkaDataIntegrityValidator newValidator)

void setPartitionState(PartitionTracker.TopicType type, int partition, OffsetRecord offsetRecord)

void updateOffsetRecordForPartition(PartitionTracker.TopicType type, int partition, OffsetRecord offsetRecord)
    For a given partition, find all the producers that have written to this partition and update the offsetRecord using segment information.

void validateMessage(PartitionTracker.TopicType type, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, boolean endOfPushReceived, Lazy<java.lang.Boolean> tolerateMissingMsgs)
    Run a thorough DIV check on the message, including UNREGISTERED_PRODUCER, MISSING, CORRUPT and DUPLICATE.
Field Detail

DISABLED

public static final long DISABLED

See Also:
    Constant Field Values

partitionTrackers

protected final SparseConcurrentList<PartitionTracker> partitionTrackers

Keeps track of every upstream producer this consumer task has seen so far, for each partition.

partitionTrackerCreator

protected final java.util.function.IntFunction<PartitionTracker> partitionTrackerCreator
Constructor Detail

KafkaDataIntegrityValidator

public KafkaDataIntegrityValidator(java.lang.String topicName)

KafkaDataIntegrityValidator

public KafkaDataIntegrityValidator(java.lang.String topicName, long kafkaLogCompactionDelayInMs)

This constructor is used by a proprietary ETL project. Do not clean up (yet)! TODO: Open source the ETL or make it stop depending on an exotic open source API.

KafkaDataIntegrityValidator

public KafkaDataIntegrityValidator(java.lang.String topicName, long kafkaLogCompactionDelayInMs, long maxAgeInMs)
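Purely for illustration, the three-argument form might be called as below from some initialization code; the values are made up, and using DISABLED to opt out of age-based state expiry is an assumption based on the field and parameter names:

    import java.util.concurrent.TimeUnit;

    // All values are hypothetical.
    KafkaDataIntegrityValidator validator = new KafkaDataIntegrityValidator(
        "store_v1",                            // topicName
        TimeUnit.HOURS.toMillis(24),           // kafkaLogCompactionDelayInMs
        KafkaDataIntegrityValidator.DISABLED); // maxAgeInMs (assumed: expiry disabled)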
Method Detail

clearPartition

public void clearPartition(int partition)

In some cases, such as when resetting offsets or unsubscribing from a partition, the PartitionTracker should forget the state that it has accumulated for a given partition.

Parameters:
    partition - the partition to clear state for

setPartitionState
public void setPartitionState(PartitionTracker.TopicType type, int partition, OffsetRecord offsetRecord)

validateMessage

public void validateMessage(PartitionTracker.TopicType type, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, boolean endOfPushReceived, Lazy<java.lang.Boolean> tolerateMissingMsgs) throws DataValidationException

Run a thorough DIV check on the message, including UNREGISTERED_PRODUCER, MISSING, CORRUPT and DUPLICATE.

Throws:
    DataValidationException
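A hedged sketch of a call site, continuing the hypothetical DivConsumerSketch example from the class description; the policy of tolerating missing messages only once the end of push has been received is an assumption of this example, not something the API mandates:

    static void validateOrThrow(
        KafkaDataIntegrityValidator validator,
        PartitionTracker.TopicType type,
        PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record,
        boolean endOfPushReceived) {
      // Lazy, so the tolerance policy is only computed if DIV needs it.
      Lazy<Boolean> tolerateMissingMsgs = Lazy.of(() -> endOfPushReceived);
      // Propagates DataValidationException on any failed check.
      validator.validateMessage(type, record, endOfPushReceived, tolerateMissingMsgs);
    }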
updateOffsetRecordForPartition

public void updateOffsetRecordForPartition(PartitionTracker.TopicType type, int partition, OffsetRecord offsetRecord)

For a given partition, find all the producers that have written to this partition and update the offsetRecord using segment information. Prior to this, any state which has expired according to maxAgeInMs will be cleared.

Parameters:
    partition - the partition to extract info for
    offsetRecord - the OffsetRecord to modify
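A hedged checkpoint/restore sketch, reusing the types from the class-level example: segment state is synced into the OffsetRecord before persisting it, and setPartitionState (documented above) is assumed, based on its signature, to load that state back after a restart; the persist step is hypothetical.

    // Checkpoint: state older than maxAgeInMs is pruned, then the surviving
    // segment information is written into the OffsetRecord.
    static void checkpoint(
        KafkaDataIntegrityValidator validator,
        PartitionTracker.TopicType type,
        int partition,
        OffsetRecord offsetRecord) {
      validator.updateOffsetRecordForPartition(type, partition, offsetRecord);
      // persist(offsetRecord); // hypothetical durable-storage step
    }

    // Restore: assumed inverse of the above, re-seeding the validator from a
    // previously persisted OffsetRecord.
    static void restore(
        KafkaDataIntegrityValidator validator,
        PartitionTracker.TopicType type,
        int partition,
        OffsetRecord savedOffsetRecord) {
      validator.setPartitionState(type, partition, savedOffsetRecord);
    }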
cloneProducerStates
public void cloneProducerStates(int partition, KafkaDataIntegrityValidator newValidator)

checkMissingMessage

public void checkMissingMessage(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, java.util.Optional<PartitionTracker.DIVErrorMetricCallback> errorMetricCallback) throws DataValidationException

Only check for a missing sequence number; a segment starting from a positive sequence number is acceptable, since a real-time buffer replay may start in the middle of a segment; the checksum is also ignored for the same reason. If a message is missing among data older than the Kafka log compaction lag threshold, no MISSING exception is thrown, because log compaction is expected to have compacted such old messages. However, if the data is fresh and a missing message is detected, a MISSING exception is thrown. This API is used by a proprietary ETL project. Do not clean up (yet)! TODO: Open source the ETL or make it stop depending on an exotic open source API.

Throws:
    DataValidationException
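A sketch of this lighter, ETL-style call, again reusing the types from the class-level example; passing Optional.empty() simply skips the error-metric callback, and a real caller could supply a PartitionTracker.DIVErrorMetricCallback instead:

    import java.util.Optional;

    static void etlCheck(
        KafkaDataIntegrityValidator validator,
        PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record) {
      // Only sequence-number gaps are checked; gaps older than the log
      // compaction lag threshold are forgiven, while fresh gaps throw MISSING.
      validator.checkMissingMessage(record, Optional.empty());
    }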