Package com.linkedin.davinci.validation
Class DataIntegrityValidator
java.lang.Object
com.linkedin.davinci.validation.DataIntegrityValidator
Colloquially known as DIV (Data Integrity Validator).
 This class is a library that validates topic messages during consumption. It can be used in
 Venice-Server/Da-Vinci and ETL. At a high level, it keeps track of messages produced by different producers
 and validates data integrity from four perspectives:
 1. Whether a segment starts from a non-zero sequence number (UNREGISTERED_PRODUCER);
 2. Whether there is a gap between segments or within a segment (MISSING);
 3. Whether data within a segment is corrupted (CORRUPT);
 4. Whether producers have produced duplicate messages, which is fine and expected due to producer retries (DUPLICATE).
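
The four checks above can be illustrated with a small, self-contained sketch. This is not the Venice implementation: the class `DivSketch`, its `validate` method, and the use of CRC32 as the segment checksum are all hypothetical and chosen only to show how per-producer segment and sequence tracking could map onto the four outcomes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

/** Hypothetical, simplified model of per-producer tracking; not the Venice implementation. */
final class DivSketch {
  enum Result { OK, UNREGISTERED_PRODUCER, MISSING, CORRUPT, DUPLICATE }

  /** Last state seen for one producer: current segment, last sequence number, running checksum. */
  private static final class ProducerState {
    int segment;
    int lastSequence;
    final CRC32 checksum = new CRC32(); // assumption: CRC32 stands in for the real checksum
  }

  private final Map<String, ProducerState> producers = new HashMap<>();

  /**
   * @param endOfSegmentChecksum checksum carried by the last message of a segment, or null otherwise
   */
  Result validate(String producerId, int segment, int sequence, byte[] payload, Long endOfSegmentChecksum) {
    ProducerState state = producers.get(producerId);
    if (state == null || segment > state.segment) {
      // A new segment: it must directly follow the previous one and start at sequence 0.
      if (state != null && segment > state.segment + 1) {
        return Result.MISSING; // gap between segments
      }
      if (sequence != 0) {
        return Result.UNREGISTERED_PRODUCER; // segment starts from a non-zero sequence number
      }
      state = new ProducerState();
      state.segment = segment;
      state.lastSequence = 0;
      state.checksum.update(payload);
      producers.put(producerId, state);
      return Result.OK;
    }
    if (segment < state.segment || sequence <= state.lastSequence) {
      return Result.DUPLICATE; // already seen, e.g. because of a producer retry
    }
    if (sequence > state.lastSequence + 1) {
      return Result.MISSING; // gap within the segment; state is left unchanged
    }
    state.lastSequence = sequence;
    state.checksum.update(payload);
    if (endOfSegmentChecksum != null && endOfSegmentChecksum.longValue() != state.checksum.getValue()) {
      return Result.CORRUPT; // accumulated checksum does not match the one sent by the producer
    }
    return Result.OK;
  }
}
```

In this sketch a DUPLICATE result is informational only, which mirrors the note above that duplicates are expected when producers retry.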

Field Summary
Modifier and Type | Field | Description
static final long | DISABLED |
protected final IntFunction<PartitionTracker> | partitionTrackerCreator |
protected final SparseConcurrentList<PartitionTracker> | partitionTrackers | Keeps track of every upstream producer this consumer task has seen so far for each partition.

Constructor Summary
Constructor | Description
DataIntegrityValidator(String topicName, PubSubPositionDeserializer pubSubPositionDeserializer) |
DataIntegrityValidator(String topicName, PubSubPositionDeserializer pubSubPositionDeserializer, long logCompactionDelayInMs) | This constructor is used by a proprietary ETL project.
DataIntegrityValidator(String topicName, PubSubPositionDeserializer pubSubPositionDeserializer, long logCompactionDelayInMs, long maxAgeInMs) |

Method Summary
Modifier and Type | Method | Description
void | checkMissingMessage(DefaultPubSubMessage consumerRecord, Optional<PartitionTracker.DIVErrorMetricCallback> errorMetricCallback) | Only checks for missing sequence numbers; a segment starting from a positive sequence number is acceptable because real-time buffer replay may start in the middle of a segment; the checksum is also ignored for the same reason.
void | clearPartition(int partition) | In some cases, such as when resetting offsets or unsubscribing from a partition, the PartitionTracker should forget about the state that it accumulated for a given partition.
void | clearRtSegments(int partition) |
 | cloneRtProducerStates(int partition, String brokerUrl) | Returns the RT DIV state for a given partition and broker URL pair.
 | cloneVtProducerStates(int partition) | Returns the VT DIV state and latest consumed VT offset for a given partition.
void | cloneVtProducerStates(int partition, DataIntegrityValidator newValidator) |
boolean | hasGlobalRtDivState(int partition) |
boolean | hasVtDivState(int partition) |
void | setPartitionState(PartitionTracker.TopicType type, int partition, OffsetRecord offsetRecord) |
void | setPartitionState(PartitionTracker.TopicType type, int partition, Map<CharSequence, ProducerPartitionState> producerPartitionStateMap) |
void | updateLatestConsumedVtPosition(int partition, PubSubPosition vtPosition) |
void | updateOffsetRecordForPartition(PartitionTracker.TopicType type, int partition, OffsetRecord offsetRecord) | For a given partition, find all the producers that have written to this partition and update the offsetRecord using segment information.
void | validateMessage(PartitionTracker.TopicType type, DefaultPubSubMessage consumerRecord, boolean endOfPushReceived, Lazy<Boolean> tolerateMissingMsgs) | Run a thorough DIV check on the message, including UNREGISTERED_PRODUCER, MISSING, CORRUPT and DUPLICATE.
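
Field Details

DISABLED
public static final long DISABLED

partitionTrackers
protected final SparseConcurrentList<PartitionTracker> partitionTrackers
Keeps track of every upstream producer this consumer task has seen so far for each partition.

partitionTrackerCreator
protected final IntFunction<PartitionTracker> partitionTrackerCreator
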
Constructor Details

DataIntegrityValidator
public DataIntegrityValidator(String topicName, PubSubPositionDeserializer pubSubPositionDeserializer)

DataIntegrityValidator
public DataIntegrityValidator(String topicName, PubSubPositionDeserializer pubSubPositionDeserializer, long logCompactionDelayInMs)
This constructor is used by a proprietary ETL project. Do not clean up (yet)! TODO: Open source the ETL or make it stop depending on an exotic open source API.

DataIntegrityValidator
public DataIntegrityValidator(String topicName, PubSubPositionDeserializer pubSubPositionDeserializer, long logCompactionDelayInMs, long maxAgeInMs)

Method Details

clearPartition
public void clearPartition(int partition)
In some cases, such as when resetting offsets or unsubscribing from a partition, the PartitionTracker should forget about the state that it accumulated for a given partition.
Parameters:
partition - to clear state for
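
As a rough illustration of per-partition state that is created on demand and forgotten on `clearPartition`, consider the sketch below. It is an assumption-laden stand-in: `PartitionStateSketch`, `tracker`, and `hasState` are hypothetical names, and a `ConcurrentHashMap` replaces the `SparseConcurrentList` used by the real class.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

/** Hypothetical holder of per-partition state; a ConcurrentHashMap stands in for SparseConcurrentList. */
final class PartitionStateSketch<T> {
  private final ConcurrentHashMap<Integer, T> trackers = new ConcurrentHashMap<>();
  private final IntFunction<T> trackerCreator;

  PartitionStateSketch(IntFunction<T> trackerCreator) {
    this.trackerCreator = trackerCreator;
  }

  /** Returns the tracker for a partition, creating it lazily on first access. */
  T tracker(int partition) {
    return trackers.computeIfAbsent(partition, p -> trackerCreator.apply(p));
  }

  /** Drops everything accumulated for the partition, e.g. after an offset reset or unsubscribe. */
  void clearPartition(int partition) {
    trackers.remove(partition);
  }

  boolean hasState(int partition) {
    return trackers.containsKey(partition);
  }
}
```

After `clearPartition`, the next call to `tracker` for the same partition builds a fresh instance, so no stale producer state carries over.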
 

clearRtSegments
public void clearRtSegments(int partition)

setPartitionState
public void setPartitionState(PartitionTracker.TopicType type, int partition, OffsetRecord offsetRecord)

setPartitionState
public void setPartitionState(PartitionTracker.TopicType type, int partition, Map<CharSequence, ProducerPartitionState> producerPartitionStateMap)

validateMessage
public void validateMessage(PartitionTracker.TopicType type, DefaultPubSubMessage consumerRecord, boolean endOfPushReceived, Lazy<Boolean> tolerateMissingMsgs) throws DataValidationException
Run a thorough DIV check on the message, including UNREGISTERED_PRODUCER, MISSING, CORRUPT and DUPLICATE.
Throws:
DataValidationException

updateOffsetRecordForPartition
public void updateOffsetRecordForPartition(PartitionTracker.TopicType type, int partition, OffsetRecord offsetRecord)
For a given partition, find all the producers that have written to this partition and update the offsetRecord using segment information. Prior to this, any state that has expired according to maxAgeInMs will be cleared.
Parameters:
partition - to extract info for
offsetRecord - to modify
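
The "expire first, then export" order described above might look like the following sketch. Everything here is hypothetical: `ExpirySketch`, `expireThenSnapshot`, and the choice of a last-seen timestamp per producer are illustrative assumptions, and the returned map merely stands in for the information written to the offsetRecord.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model: producer state is keyed by producer id and stamped with a last-seen time. */
final class ExpirySketch {
  /**
   * Removes producers whose state is older than maxAgeMs, then returns a copy of what remains.
   * The input map must be mutable because expired entries are removed in place.
   */
  static Map<String, Long> expireThenSnapshot(Map<String, Long> lastSeenMsByProducer, long maxAgeMs, long nowMs) {
    lastSeenMsByProducer.values().removeIf(lastSeenMs -> nowMs - lastSeenMs > maxAgeMs);
    return new HashMap<>(lastSeenMsByProducer);
  }
}
```

Clearing before exporting keeps long-gone producers from being persisted again on every update.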
 
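
cloneVtProducerStates
public void cloneVtProducerStates(int partition, DataIntegrityValidator newValidator)

cloneRtProducerStates
cloneRtProducerStates(int partition, String brokerUrl)
Returns the RT DIV state for a given partition and broker URL pair.

cloneVtProducerStates
cloneVtProducerStates(int partition)
Returns the VT DIV state and latest consumed VT offset for a given partition.
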
checkMissingMessage
public void checkMissingMessage(DefaultPubSubMessage consumerRecord, Optional<PartitionTracker.DIVErrorMetricCallback> errorMetricCallback) throws DataValidationException
Only checks for missing sequence numbers. A segment starting from a positive sequence number is acceptable because real-time buffer replay may start in the middle of a segment; the checksum is ignored for the same reason. If a message is missing but is older than the log compaction lag threshold, no MISSING exception is thrown, because log compaction is expected to compact old messages. If the data is fresh and a missing message is detected, a MISSING exception is thrown. This API is used by a proprietary ETL project. Do not clean up (yet)! TODO: Open source the ETL or make it stop depending on an exotic open source API.
Throws:
DataValidationException
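
The tolerance rule described for `checkMissingMessage` can be sketched as follows. This is a minimal model under stated assumptions, not the library's code: `MissingCheckSketch`, `MissingMessageException`, and the comparison of a message timestamp against the current time are hypothetical, and the sketch tracks a single producer only.

```java
/** Hypothetical single-producer model of a gap check that tolerates log-compacted data. */
final class MissingCheckSketch {
  static final class MissingMessageException extends RuntimeException {
    MissingMessageException(String message) {
      super(message);
    }
  }

  private final long logCompactionDelayMs;
  private int lastSequence = -1; // -1 means no message has been seen yet

  MissingCheckSketch(long logCompactionDelayMs) {
    this.logCompactionDelayMs = logCompactionDelayMs;
  }

  void check(int sequence, long messageTimestampMs, long nowMs) {
    // The first message may carry any sequence number: replay can start mid-segment.
    boolean gap = lastSequence >= 0 && sequence > lastSequence + 1;
    if (gap) {
      boolean oldEnoughToBeCompacted = nowMs - messageTimestampMs >= logCompactionDelayMs;
      if (!oldEnoughToBeCompacted) {
        throw new MissingMessageException(
            "Gap after sequence " + lastSequence + ": next message has sequence " + sequence);
      }
      // Otherwise the gap is attributed to log compaction and tolerated.
    }
    lastSequence = Math.max(lastSequence, sequence);
  }
}
```

Note that this sketch performs no checksum validation at all, matching the description that the checksum is ignored on this path.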
 

updateLatestConsumedVtPosition
public void updateLatestConsumedVtPosition(int partition, PubSubPosition vtPosition)

hasGlobalRtDivState
public boolean hasGlobalRtDivState(int partition)

hasVtDivState
public boolean hasVtDivState(int partition)