Class LeaderFollowerStoreIngestionTask
- All Implemented Interfaces:
Closeable
,AutoCloseable
,Runnable
- Direct Known Subclasses:
ActiveActiveStoreIngestionTask
-
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
StoreIngestionTask.DelegateConsumerRecordResult
-
Field Summary
Modifier and TypeFieldDescriptionprotected final boolean
protected final it.unimi.dsi.fastutil.ints.Int2ObjectMap<String>
static final Predicate<? super PartitionConsumptionState>
protected final AvroStoreDeserializerCache
protected static final LongPredicate
protected Lazy<VeniceWriter<byte[],
byte[], byte[]>> N.B.: With L/F+native replication and many Leader partitions getting assigned to a single SN, this VeniceWriter
may be called from multiple threads simultaneously during the start of a batch push. protected final Lazy<VeniceWriter<byte[],
byte[], byte[]>> protected final Map<String,
VeniceViewWriter> Fields inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
aggKafkaConsumerService, availableSchemaIds, batchReportIncPushStatusEnabled, bootstrapTimeoutInMs, chunkAssembler, compressionStrategy, compressor, compressorFactory, consumerActionSequenceNumber, consumerActionsQueue, databaseSyncBytesIntervalForDeferredWriteMode, databaseSyncBytesIntervalForTransactionalMode, dataRecoverySourceVersionNumber, defaultReadyToServeChecker, deserializedSchemaIds, diskUsage, divErrorMetricCallback, emitMetrics, emptyPollSleepMs, errorPartitionId, gracefulShutdownLatch, hostLevelIngestionStats, hostName, hybridStoreConfig, idleCounter, ingestionNotificationDispatcher, ingestionTaskName, isChunked, isCurrentVersion, isDataRecovery, isDaVinciClient, isGlobalRtDivEnabled, isIsolatedIngestion, isRmdChunked, isRunning, isSeparatedRealtimeTopicEnabled, isWriteComputationEnabled, kafkaClusterUrlResolver, kafkaProps, kafkaVersionTopic, KILL_WAIT_TIME_MS, localKafkaClusterId, localKafkaServer, localKafkaServerSingletonSet, manifestSerializer, metaStoreWriter, parallelProcessingThreadPool, partitionConsumptionStateMap, partitionCount, partitionToPendingConsumerActionCountMap, pubSubTopicRepository, readCycleDelayMs, readOnlyForBatchOnlyStoreEnabled, realTimeTopic, recordLevelMetricEnabled, REDUNDANT_LOGGING_FILTER, resetErrorReplicaEnabled, SCHEMA_POLLING_DELAY_MS, schemaRepository, serverConfig, storageEngine, storageEngineRepository, storageMetadataService, storageService, STORE_VERSION_POLLING_DELAY_MS, storeBufferService, storeName, storeRepository, storeVersionPartitionCount, topicManagerRepository, versionedDIVStats, versionedIngestionStats, versionNumber, versionRole, versionTopic, WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED, workloadType, writeComputeFailureCode, zkHelixAdmin
-
Constructor Summary
ConstructorDescriptionLeaderFollowerStoreIngestionTask
(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerConfig recordTransformerConfig, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin) -
Method Summary
Modifier and TypeMethodDescriptionprotected void
addPartitionConsumptionState
(Integer partition, PartitionConsumptionState pcs) protected final double
calculateAssembledRecordSizeRatio
(long recordSize) calculateLeaderUpstreamOffsetWithTopicSwitch
(PartitionConsumptionState partitionConsumptionState, PubSubTopic newSourceTopic, List<CharSequence> unreachableBrokerList) protected static void
checkAndHandleUpstreamOffsetRewind
(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, long newUpstreamOffset, long previousUpstreamOffset, LeaderFollowerStoreIngestionTask ingestionTask) protected boolean
checkAndLogIfLagIsAcceptableForHybridStore
(PartitionConsumptionState pcs, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp) Checks whether the lag is acceptable for hybrid stores. protected void
The following function will be executed after processing all the quick actions in the consumer action queues, so that long-running actions do not block other partitions' consumer actions. protected static boolean
checkWhetherToCloseUnusedVeniceWriter
(Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazy, Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterForRealTimeLazy, Map<Integer, PartitionConsumptionState> partitionConsumptionStateMap, Runnable reInitializeVeniceWriterLazyRunnable, String versionTopicName) protected void
void
closeVeniceWriters
(boolean doFlush) static VeniceWriter<byte[],
byte[], byte[]> constructVeniceWriter
(VeniceWriterFactory veniceWriterFactory, String topic, Version version, boolean producerCompressionEnabled, int producerCnt) void
consumerUnSubscribeAllTopics
(PartitionConsumptionState partitionConsumptionState) Unsubscribe from all the topics being consumed for the partition in partitionConsumptionState. protected LeaderProducerCallback
createProducerCallback
(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs) delegateConsumerRecord
(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs) The goal of this function is to possibly produce the incoming Kafka message consumed from local VT, remote VT, RT or SR topic to local VT if needed. void
demoteToStandby
(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) protected void
getAndUpdateLeaderCompletedState
(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaValue, ControlMessage controlMessage, PubSubMessageHeaders pubSubMessageHeaders, PartitionConsumptionState partitionConsumptionState) HeartBeat SOS messages carry the leader completion state in the header.long
long
long
getConsumptionSourceKafkaAddress
(PartitionConsumptionState partitionConsumptionState) long
Measure the offset lag between follower and leader. long
long
protected IngestionBatchProcessor
protected long
getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement
(PartitionConsumptionState pcs, String ignoredKafkaUrl) For regular L/F stores without A/A enabled, there is always only one real-time source.protected long
getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement
(PartitionConsumptionState pcs, String ignoredUpstreamKafkaUrl) For L/F or NR, there is only one entry in upstreamOffsetMap whose key is NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY.long
protected int
protected int
getRealTimeDataSourceKafkaAddress
(PartitionConsumptionState partitionConsumptionState) long
getRegionHybridOffsetLag
(int regionId) protected long
getTopicPartitionOffsetByKafkaURL
(CharSequence kafkaURL, PubSubTopicPartition pubSubTopicPartition, long rewindStartTimestamp) protected Lazy<VeniceWriter<byte[],
byte[], byte[]>> getVeniceWriter
(PartitionConsumptionState partitionConsumptionState) int
protected boolean
protected boolean
isHybridFollower
(PartitionConsumptionState partitionConsumptionState) protected static boolean
isLeader
(PartitionConsumptionState partitionConsumptionState) boolean
protected boolean
isRealTimeBufferReplayStarted
(PartitionConsumptionState partitionConsumptionState) protected void
leaderExecuteTopicSwitch
(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic) protected ByteBuffer
maybeCompressData
(int partition, ByteBuffer data, PartitionConsumptionState partitionConsumptionState) Compresses data in a ByteBuffer when consuming from RT as a leader node and compression is enabled for the store version for which we're consuming data. For hybrid stores only, the leader periodically writes a special SOS message to the RT topic with the following properties:
1.protected long
measureHybridOffsetLag
(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag) For Leader/Follower state model, we already keep track of the consumption progress in leader, so directly calculate the lag with the real-time topic and the leader consumption offset.protected long
measureRTOffsetLagForMultiRegions
(Set<String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag) protected long
measureRTOffsetLagForSingleRegion
(String sourceRealTimeTopicKafkaURL, PartitionConsumptionState pcs, boolean shouldLog) This method fetches/calculates latest leader persisted offset and last offset in RT topic.protected void
processConsumerAction
(ConsumerAction message, Store store) protected void
processControlMessageForViews
(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState) This isn't used for ingestion outside L/F, so we NoOp here and rely on the actual implementation in LeaderFollowerStoreIngestionTask
protected void
processMessageAndMaybeProduceToKafka
(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs) protected boolean
processTopicSwitch
(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState) Process a TopicSwitch control message at the given partition offset for a specific PartitionConsumptionState. protected void
produceToLocalKafka
(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceFunction, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs) void
promoteToLeader
(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) protected void
queueUpVersionTopicWritesWithViewWriters
(PartitionConsumptionState partitionConsumptionState, Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor, Runnable versionTopicWrite) protected final void
recordAssembledRecordSizeRatio
(double ratio, long currentTimeMs) protected void
recordHeartbeatReceived
(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, String kafkaUrl) protected void
recordProcessedRecordStats
(PartitionConsumptionState partitionConsumptionState, int processedRecordSize) protected void
recordWriterStats
(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState) protected void
Check if the ingestion progress has reached the end of the version topic. protected void
resubscribe
(PartitionConsumptionState partitionConsumptionState) Resubscribe by passing the new version role and partition role to AggKafkaConsumerService. protected void
resubscribeAsFollower
(PartitionConsumptionState partitionConsumptionState) protected void
resubscribeAsLeader
(PartitionConsumptionState partitionConsumptionState) protected boolean
For non AA hybrid stores with AGGREGATE DRP, SIT reads from parent RT while the HB is written to the child RTs.protected boolean
shouldCompressData
(PartitionConsumptionState partitionConsumptionState) protected boolean
shouldNewLeaderSwitchToRemoteConsumption
(PartitionConsumptionState partitionConsumptionState) protected boolean
shouldPersistRecord
(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record, PartitionConsumptionState partitionConsumptionState) Additional safeguards in Leader/Follower ingestion: 1.protected boolean
For the Leader/Follower model, the follower should have the same kind of check as the Online/Offline model; the leader, however, may consume from the real-time topic or GF topic. protected boolean
shouldProduceToVersionTopic
(PartitionConsumptionState partitionConsumptionState) For the corresponding partition being tracked in `partitionConsumptionState`, if it's in LEADER state and it's not consuming from version topic, it should produce the new message to version topic; besides, if LEADER is consuming remotely, it should also produce to local fabric.protected void
startConsumingAsLeader
(PartitionConsumptionState partitionConsumptionState) protected void
startConsumingAsLeaderInTransitionFromStandby
(PartitionConsumptionState partitionConsumptionState) protected void
syncConsumedUpstreamRTOffsetMapIfNeeded
(PartitionConsumptionState pcs, Map<String, Long> upstreamStartOffsetByKafkaURL) protected void
syncTopicSwitchToIngestionMetadataService
(TopicSwitch topicSwitch, PartitionConsumptionState partitionConsumptionState) protected void
updateLatestInMemoryLeaderConsumedRTOffset
(PartitionConsumptionState pcs, String ignoredKafkaUrl, long offset) protected void
updateLatestInMemoryProcessedOffset
(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun) Maintain the latest processed offsets by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. void
updateLeaderTopicOnFollower
(PartitionConsumptionState partitionConsumptionState) protected void
updateOffsetMetadataInOffsetRecord
(PartitionConsumptionState partitionConsumptionState) Sync the metadata about offset in OffsetRecord. protected void
updateOffsetsFromConsumerRecord
(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateVersionTopicOffset updateVersionTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateUpstreamTopicOffset updateUpstreamTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.GetLastKnownUpstreamTopicOffset lastKnownUpstreamTopicOffsetSupplier, Supplier<String> sourceKafkaUrlSupplier, boolean dryRun) A helper function to update the latest in-memory offsets processed by drainers in PartitionConsumptionState after processing the given PubSubMessage. protected Iterable<PubSubMessage<KafkaKey,
KafkaMessageEnvelope, Long>> validateAndFilterOutDuplicateMessagesFromLeaderTopic
(Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records, String kafkaUrl, PubSubTopicPartition topicPartition) protected void
waitForAllMessageToBeProcessedFromTopicPartition
(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState) Besides draining messages in the drainer queue, wait for the last producer future.protected void
waitForLastLeaderPersistFuture
(PartitionConsumptionState partitionConsumptionState, String errorMsg) Methods inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
checkIngestionProgress, cloneProducerStates, close, consumerBatchUnsubscribe, consumerHasAnySubscription, consumerHasSubscription, consumerResetOffset, consumerSubscribe, createKafkaConsumerProperties, disableMetricsEmission, dropStoragePartitionGracefully, dumpPartitionConsumptionStates, dumpStoreVersionState, enableMetricsEmission, getCompressionStrategy, getCompressor, getFailedIngestionPartitionCount, getHostLevelIngestionStats, getIngestionTaskName, getKafkaVersionTopic, getOffsetToOnlineLagThresholdPerPartition, getPartitionConsumptionState, getPartitionOffsetLagBasedOnMetrics, getSchemaRepo, getServerConfig, getStorageEngine, getStoragePartitionConfig, getStoragePartitionConfig, getStoreName, getTopicManager, getTopicPartitionEndOffSet, getVersionedDIVStats, getVersionIngestionStats, getVersionNumber, getVersionTopic, hasAllPartitionReportedCompleted, hasAnyPartitionConsumptionState, hasAnySubscription, hasPendingPartitionIngestionAction, isActiveActiveReplicationEnabled, isChunked, isCurrentVersion, isFutureVersion, isHybridMode, isIngestionTaskActive, isMetricsEmissionEnabled, isPartitionConsumingOrHasPendingIngestionAction, isProducingVersionTopicHealthy, isReadyToServe, isRunning, isSegmentControlMsg, isSeparatedRealtimeTopicEnabled, isStuckByMemoryConstraint, isTransientRecordBufferUsed, isUserSystemStore, kill, logStorageOperationWhileUnsubscribed, measureLagWithCallToPubSub, measureLagWithCallToPubSub, minZeroLag, nextSeqNum, processCommonConsumerAction, processConsumerRecord, processEndOfIncrementalPush, processEndOfPush, processStartOfIncrementalPush, produceToStoreBufferService, produceToStoreBufferServiceOrKafka, produceToStoreBufferServiceOrKafkaInBatch, putInStorageEngine, recordAssembledRecordSize, recordChecksumVerificationFailure, removeFromStorageEngine, reportError, resetPartitionConsumptionOffset, resolveSourceKafkaServersWithinTopicSwitch, run, setLastConsumerException, setLastStoreIngestionException, setPartitionConsumptionState, 
shouldUpdateUpstreamOffset, shutdownAndWait, subscribePartition, subscribePartition, throwIfNotRunning, throwOrLogStorageFailureDependingIfStillSubscribed, unsubscribeFromTopic, unSubscribePartition, unSubscribePartition, updateIngestionRoleIfStoreChanged, updateOffsetMetadataAndSync, updateOffsetMetadataAndSyncOffset, validateMessage, waitVersionStateAvailable
-
Field Details
-
veniceWriter
N.B.: With L/F+native replication and many Leader partitions getting assigned to a single SN, this VeniceWriter
may be called from multiple threads simultaneously during the start of a batch push. Therefore, we wrap it in Lazy
to initialize it in a thread-safe way and to ensure that only one instance is created for the entire ingestion task. Important: do not use these writers directly; retrieve the writer from PartitionConsumptionState.getVeniceWriterLazyRef()
when producing to the local topic. -
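The thread-safety argument above can be illustrated with a minimal sketch of a lazily initialized holder. The class `LazySketch` and the helper `LazySketchDemo` are hypothetical names used only for illustration; this is not Venice's own `Lazy` class, whose internals are not shown here. The sketch assumes double-checked locking on a volatile field and a supplier that never returns null.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for a thread-safe lazy holder (assumption: not Venice's Lazy). */
final class LazySketch<T> {
  private final Supplier<T> supplier;
  // volatile so that a fully constructed value is visible to every thread
  private volatile T value;

  LazySketch(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  /** Returns the single shared instance, creating it on first use. */
  T get() {
    T local = value;
    if (local == null) {
      synchronized (this) {
        local = value;
        if (local == null) {
          // Only the first thread to enter the lock runs the supplier
          local = supplier.get();
          value = local;
        }
      }
    }
    return local;
  }
}

final class LazySketchDemo {
  /** Calls get() from several threads at once and returns how often the supplier ran. */
  static int countInitializations(int threadCount) {
    AtomicInteger initCount = new AtomicInteger();
    LazySketch<Object> lazyWriter = new LazySketch<>(() -> {
      initCount.incrementAndGet();
      return new Object(); // placeholder for an expensive writer
    });
    CountDownLatch startSignal = new CountDownLatch(1);
    Thread[] workers = new Thread[threadCount];
    for (int i = 0; i < threadCount; i++) {
      workers[i] = new Thread(() -> {
        try {
          startSignal.await(); // release all workers together
          lazyWriter.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
      workers[i].start();
    }
    startSignal.countDown();
    try {
      for (Thread worker : workers) {
        worker.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return initCount.get();
  }
}
```

With this shape, concurrent callers share one instance regardless of how many partitions request the writer at the same moment.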
veniceWriterForRealTime
-
kafkaClusterIdToUrlMap
-
viewWriters
-
hasChangeCaptureView
protected final boolean hasChangeCaptureView -
storeDeserializerCache
-
VALID_LAG
-
LEADER_OFFSET_LAG_FILTER
-
-
Constructor Details
-
LeaderFollowerStoreIngestionTask
public LeaderFollowerStoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerConfig recordTransformerConfig, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
-
Method Details
-
constructVeniceWriter
public static VeniceWriter<byte[], byte[], byte[]> constructVeniceWriter(VeniceWriterFactory veniceWriterFactory, String topic, Version version, boolean producerCompressionEnabled, int producerCnt) -
closeVeniceWriters
public void closeVeniceWriters(boolean doFlush) - Overrides:
closeVeniceWriters
in classStoreIngestionTask
-
closeVeniceViewWriters
protected void closeVeniceViewWriters()- Overrides:
closeVeniceViewWriters
in classStoreIngestionTask
-
getIngestionBatchProcessor
- Specified by:
getIngestionBatchProcessor
in classStoreIngestionTask
-
promoteToLeader
public void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) - Specified by:
promoteToLeader
in classStoreIngestionTask
-
demoteToStandby
public void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker) - Specified by:
demoteToStandby
in classStoreIngestionTask
-
processConsumerAction
protected void processConsumerAction(ConsumerAction message, Store store) throws InterruptedException - Specified by:
processConsumerAction
in classStoreIngestionTask
- Throws:
InterruptedException
-
checkLongRunningTaskState
The following function will be executed after processing all the quick actions in the consumer action queues, so that long-running actions do not block other partitions' consumer actions. For efficiency, this function contains no thread-sleeping operations; instead, it is invoked repeatedly in the main loop of the StoreIngestionTask to check whether some long-running actions can finish now. The only drawback is that for a regular batch push, the leader flag stays off for at least a few minutes after the leader consumes the last message (END_OF_PUSH), which is an acceptable trade-off for us in order to share and test the same code path between regular push job, hybrid store and reprocessing job. - Specified by:
checkLongRunningTaskState
in classStoreIngestionTask
- Throws:
InterruptedException
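The polling pattern described for checkLongRunningTaskState can be sketched as follows. This is an illustrative sketch only: `LongRunningCheckSketch` and its methods are hypothetical names, and the real task tracks partition state rather than generic callbacks. Each loop iteration first drains the quick actions, then asks every pending long-running action whether it can finish, without sleeping.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.function.BooleanSupplier;

/** Hypothetical main-loop skeleton; names are illustrative, not Venice APIs. */
final class LongRunningCheckSketch {
  private final Queue<Runnable> quickActions = new ArrayDeque<>();
  private final List<BooleanSupplier> pendingLongRunningChecks = new ArrayList<>();

  void submitQuickAction(Runnable action) {
    quickActions.add(action);
  }

  /** The supplier returns true once the long-running action has finished. */
  void submitLongRunningCheck(BooleanSupplier tryFinish) {
    pendingLongRunningChecks.add(tryFinish);
  }

  /** One iteration of the main loop; returns how many long-running actions finished. */
  int runOnce() {
    // 1. Quick actions go first so they are never stuck behind slow ones
    Runnable action;
    while ((action = quickActions.poll()) != null) {
      action.run();
    }
    // 2. Poll each long-running action once, with no sleeping or blocking
    int finished = 0;
    Iterator<BooleanSupplier> iterator = pendingLongRunningChecks.iterator();
    while (iterator.hasNext()) {
      if (iterator.next().getAsBoolean()) {
        iterator.remove();
        finished++;
      }
    }
    return finished;
  }

  int pendingCount() {
    return pendingLongRunningChecks.size();
  }
}
```

An action that is not ready simply stays in the pending list and is checked again on the next iteration.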
-
checkWhetherToCloseUnusedVeniceWriter
protected static boolean checkWhetherToCloseUnusedVeniceWriter(Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazy, Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterForRealTimeLazy, Map<Integer, PartitionConsumptionState> partitionConsumptionStateMap, Runnable reInitializeVeniceWriterLazyRunnable, String versionTopicName) -
startConsumingAsLeaderInTransitionFromStandby
protected void startConsumingAsLeaderInTransitionFromStandby(PartitionConsumptionState partitionConsumptionState) -
calculateLeaderUpstreamOffsetWithTopicSwitch
protected Map<String,Long> calculateLeaderUpstreamOffsetWithTopicSwitch(PartitionConsumptionState partitionConsumptionState, PubSubTopic newSourceTopic, List<CharSequence> unreachableBrokerList) -
startConsumingAsLeader
- Overrides:
startConsumingAsLeader
in classStoreIngestionTask
-
leaderExecuteTopicSwitch
protected void leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic) -
syncConsumedUpstreamRTOffsetMapIfNeeded
protected void syncConsumedUpstreamRTOffsetMapIfNeeded(PartitionConsumptionState pcs, Map<String, Long> upstreamStartOffsetByKafkaURL) -
waitForLastLeaderPersistFuture
protected void waitForLastLeaderPersistFuture(PartitionConsumptionState partitionConsumptionState, String errorMsg) -
getTopicPartitionOffsetByKafkaURL
protected long getTopicPartitionOffsetByKafkaURL(CharSequence kafkaURL, PubSubTopicPartition pubSubTopicPartition, long rewindStartTimestamp) -
getConsumptionSourceKafkaAddress
protected Set<String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState) - Specified by:
getConsumptionSourceKafkaAddress
in classStoreIngestionTask
-
getRealTimeDataSourceKafkaAddress
protected Set<String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState) - Overrides:
getRealTimeDataSourceKafkaAddress
in classStoreIngestionTask
-
shouldNewLeaderSwitchToRemoteConsumption
protected boolean shouldNewLeaderSwitchToRemoteConsumption(PartitionConsumptionState partitionConsumptionState) -
shouldProduceToVersionTopic
For the corresponding partition being tracked in `partitionConsumptionState`, if it's in LEADER state and it's not consuming from version topic, it should produce the new message to version topic; besides, if LEADER is consuming remotely, it should also produce to local fabric. If buffer replay is disabled, all replicas will stick to the version topic and none will produce any message. -
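The rule stated for shouldProduceToVersionTopic can be written as a small predicate. This is a simplified sketch derived only from the description above: the real method takes a PartitionConsumptionState, whereas the `Role` enum, the topic-name strings and the `consumingRemotely` flag here are hypothetical stand-ins.

```java
/** Hypothetical simplification of the produce-to-version-topic decision. */
final class ProduceDecisionSketch {
  enum Role { LEADER, STANDBY }

  static boolean shouldProduceToVersionTopic(
      Role role, String consumedTopic, String versionTopic, boolean consumingRemotely) {
    if (role != Role.LEADER) {
      return false; // only a leader ever produces
    }
    // A leader produces when its source is not the local version topic,
    // or when it reads the version topic from a remote fabric.
    return !consumedTopic.equals(versionTopic) || consumingRemotely;
  }
}
```

For example, a leader reading the real-time topic produces to the version topic, while a leader reading the local version topic does not.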
isLeader
-
processTopicSwitch
protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState) Process a TopicSwitch control message at the given partition offset for a specific PartitionConsumptionState.
Returns whether we need to execute an additional ready-to-serve check after this message is processed. - Overrides:
processTopicSwitch
in classStoreIngestionTask
-
syncTopicSwitchToIngestionMetadataService
protected void syncTopicSwitchToIngestionMetadataService(TopicSwitch topicSwitch, PartitionConsumptionState partitionConsumptionState) -
updateOffsetsFromConsumerRecord
protected void updateOffsetsFromConsumerRecord(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateVersionTopicOffset updateVersionTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateUpstreamTopicOffset updateUpstreamTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.GetLastKnownUpstreamTopicOffset lastKnownUpstreamTopicOffsetSupplier, Supplier<String> sourceKafkaUrlSupplier, boolean dryRun) A helper function to update the latest in-memory offsets processed by drainers in PartitionConsumptionState after processing the given PubSubMessage. When using this helper function:
"updateVersionTopicOffsetFunction" should try to update the VT offset in PartitionConsumptionState.
"updateUpstreamTopicOffsetFunction" should try to update the latest processed upstream offset map in PartitionConsumptionState.
In LeaderFollowerStoreIngestionTask, "sourceKafkaUrlSupplier" should always return OffsetRecord.NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY; in ActiveActiveStoreIngestionTask, "sourceKafkaUrlSupplier" should return the actual source Kafka URL of the "consumerRecord".
Dry-run mode only checks whether the offset rewind is benign; it does not persist the processed offset. -
updateOffsetMetadataInOffsetRecord
protected void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState) Description copied from class: StoreIngestionTask
Sync the metadata about offset in OffsetRecord. PartitionConsumptionState will pass through some information to OffsetRecord for persistence. Offset rewind/split brain has been guarded in StoreIngestionTask.updateLatestInMemoryProcessedOffset(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState, com.linkedin.venice.pubsub.api.PubSubMessage<com.linkedin.venice.message.KafkaKey, com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope, java.lang.Long>, com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext, java.lang.String, boolean). - Specified by:
updateOffsetMetadataInOffsetRecord
in classStoreIngestionTask
-
updateLatestInMemoryProcessedOffset
protected void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun) Description copied from class: StoreIngestionTask
Maintain the latest processed offsets by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Before the offset is updated in memory, the underlying storage engine should have persisted the given record. Dry-run mode only performs the offset rewind check and does not update the processed offset. - Specified by:
updateLatestInMemoryProcessedOffset
in classStoreIngestionTask
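The dry-run behaviour described above can be sketched with a tiny in-memory tracker. `OffsetTrackerSketch` is a hypothetical class for illustration; it assumes a single upstream offset and treats any offset lower than the last processed one as a rewind, which is a simplification of whatever the real rewind check does.

```java
/** Hypothetical in-memory offset tracker illustrating dry-run semantics. */
final class OffsetTrackerSketch {
  private long latestProcessedUpstreamOffset = -1L;

  /**
   * Checks for an offset rewind and, unless dryRun is set, records the new offset.
   * Returns true when the new offset is lower than the last processed one.
   */
  boolean update(long newUpstreamOffset, boolean dryRun) {
    boolean isRewind = newUpstreamOffset < latestProcessedUpstreamOffset;
    if (!dryRun) {
      // Only a real run mutates the in-memory state
      latestProcessedUpstreamOffset = newUpstreamOffset;
    }
    return isRewind;
  }

  long getLatestProcessedUpstreamOffset() {
    return latestProcessedUpstreamOffset;
  }
}
```

A dry run therefore reports a rewind without changing what the tracker considers the latest processed offset.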
-
checkAndHandleUpstreamOffsetRewind
protected static void checkAndHandleUpstreamOffsetRewind(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, long newUpstreamOffset, long previousUpstreamOffset, LeaderFollowerStoreIngestionTask ingestionTask) -
produceToLocalKafka
protected void produceToLocalKafka(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceFunction, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs) -
isRealTimeBufferReplayStarted
protected boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState) - Specified by:
isRealTimeBufferReplayStarted
in classStoreIngestionTask
-
measureHybridOffsetLag
protected long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag) For Leader/Follower state model, we already keep track of the consumption progress in leader, so directly calculate the lag with the real-time topic and the leader consumption offset.- Specified by:
measureHybridOffsetLag
in classStoreIngestionTask
-
measureRTOffsetLagForMultiRegions
protected long measureRTOffsetLagForMultiRegions(Set<String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag) -
isReadyToServeAnnouncedWithRTLag
public boolean isReadyToServeAnnouncedWithRTLag()- Overrides:
isReadyToServeAnnouncedWithRTLag
in classStoreIngestionTask
-
reportIfCatchUpVersionTopicOffset
Description copied from class: StoreIngestionTask
Check if the ingestion progress has reached the end of the version topic. This is currently only used by LeaderFollowerStoreIngestionTask. - Specified by:
reportIfCatchUpVersionTopicOffset
in classStoreIngestionTask
-
shouldProcessRecord
For the Leader/Follower model, the follower should have the same kind of check as the Online/Offline model; the leader, however, may consume from the real-time topic or GF topic. - Overrides:
shouldProcessRecord
in classStoreIngestionTask
-
shouldPersistRecord
protected boolean shouldPersistRecord(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> record, PartitionConsumptionState partitionConsumptionState) Additional safeguards in Leader/Follower ingestion: 1. Check whether the incoming messages are from the expected source topics. - Overrides:
shouldPersistRecord
in classStoreIngestionTask
-
recordWriterStats
protected void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState) - Specified by:
recordWriterStats
in classStoreIngestionTask
-
recordProcessedRecordStats
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize) - Overrides:
recordProcessedRecordStats
in classStoreIngestionTask
-
getMaxRecordSizeBytes
protected int getMaxRecordSizeBytes() -
getMaxNearlineRecordSizeBytes
protected int getMaxNearlineRecordSizeBytes() -
calculateAssembledRecordSizeRatio
protected final double calculateAssembledRecordSizeRatio(long recordSize) - Specified by:
calculateAssembledRecordSizeRatio
in classStoreIngestionTask
-
recordAssembledRecordSizeRatio
protected final void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs) - Specified by:
recordAssembledRecordSizeRatio
in classStoreIngestionTask
-
isHybridFollower
- Specified by:
isHybridFollower
in classStoreIngestionTask
-
shouldCheckLeaderCompleteStateInFollower
protected boolean shouldCheckLeaderCompleteStateInFollower() For non AA hybrid stores with AGGREGATE DRP, SIT reads from parent RT while the HB is written to the child RTs. Once all hybrid stores are either AA for cross colo replication and non AA otherwise, DRP and this extra check can also be removed. - Specified by:
shouldCheckLeaderCompleteStateInFollower
in classStoreIngestionTask
-
checkAndLogIfLagIsAcceptableForHybridStore
protected boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState pcs, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp) Checks whether the lag is acceptable for hybrid stores. If the instance is a hybrid standby or DaVinciClient, also check that:
1. the checkLeaderCompleteStateInFollower feature is enabled based on configs,
2. leaderCompleteStatus has the leader state=completed, and
3. the last update time was within the configured time interval, so that a stale leader state is not used: see ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS
- Specified by:
checkAndLogIfLagIsAcceptableForHybridStore
in classStoreIngestionTask
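The three conditions listed for checkAndLogIfLagIsAcceptableForHybridStore can be sketched as below. This is only an illustration, not the Venice implementation: the class name, the LeaderState enum, the parameter names, and the assumption that a lag is acceptable when it is at most the threshold are all hypothetical.

```java
/** Hypothetical sketch; none of these names are taken from the Venice code base. */
final class LeaderCompleteCheckSketch {
  /** Illustrative stand-in for the leader completion status tracked per partition. */
  enum LeaderState { LEADER_NOT_COMPLETED, LEADER_COMPLETED }

  /**
   * Returns true when the lag is within the threshold and, if the follower-side
   * check is enabled, the leader reported completion recently enough.
   */
  static boolean isLagAcceptable(
      long lag,
      long threshold,
      boolean checkEnabled,
      LeaderState leaderState,
      long lastLeaderStateUpdateMs,
      long nowMs,
      long validIntervalMs) {
    // Assumption: "acceptable" means lag <= threshold.
    boolean lagWithinThreshold = lag <= threshold;
    if (!checkEnabled) {
      // Condition 1 not met: the feature is off, so only the lag matters.
      return lagWithinThreshold;
    }
    // Condition 2: the leader must have reported the completed state.
    boolean leaderCompleted = leaderState == LeaderState.LEADER_COMPLETED;
    // Condition 3: the reported state must not be stale.
    boolean leaderStateFresh = nowMs - lastLeaderStateUpdateMs <= validIntervalMs;
    return lagWithinThreshold && leaderCompleted && leaderStateFresh;
  }
}
```

In this sketch a follower whose lag is small still reports the lag as unacceptable when the last leader state update is older than the valid interval.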
-
getAndUpdateLeaderCompletedState
protected void getAndUpdateLeaderCompletedState(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaValue, ControlMessage controlMessage, PubSubMessageHeaders pubSubMessageHeaders, PartitionConsumptionState partitionConsumptionState) HeartBeat SOS messages carry the leader completion state in the header. This function extracts the leader completion state from that header and updates the partitionConsumptionState accordingly. -
recordHeartbeatReceived
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, String kafkaUrl) - Overrides:
recordHeartbeatReceived
in classStoreIngestionTask
-
validateAndFilterOutDuplicateMessagesFromLeaderTopic
protected Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> validateAndFilterOutDuplicateMessagesFromLeaderTopic(Iterable<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> records, String kafkaUrl, PubSubTopicPartition topicPartition) - Specified by:
validateAndFilterOutDuplicateMessagesFromLeaderTopic
in classStoreIngestionTask
-
delegateConsumerRecord
protected StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs) The goal of this function is to possibly produce the incoming Kafka message consumed from local VT, remote VT, RT or SR topic to local VT if needed. The decision is based on the output of shouldProduceToVersionTopic(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState) and the message type. It also performs any necessary additional computation, such as for write-compute/update messages. The return value indicates whether the message was produced to Kafka or not. This function should be called as one of the first steps in the processing pipeline for all messages consumed from any Kafka topic. The caller should only process the record further if the return value is StoreIngestionTask.DelegateConsumerRecordResult.QUEUED_TO_DRAINER. This function assumes that shouldProcessRecord(PubSubMessage) has already been called, which happens in StoreIngestionTask.produceToStoreBufferServiceOrKafka(Iterable, PubSubTopicPartition, String, int), and that it was decided that this record needs to be processed. It does not perform any validation check on the PartitionConsumptionState object, to keep the goal of the function simple and not overloaded. DIV validation is also done here if the message is received from the RT topic. For more info, please see StoreIngestionTask#internalProcessConsumerRecord(PubSubMessage, PartitionConsumptionState, LeaderProducedRecordContext, int, String, long). This function may modify the original record in KME, and it is unsafe to use the payload from KME directly after this function. - Specified by:
delegateConsumerRecord
in classStoreIngestionTask
- Returns:
- a
StoreIngestionTask.DelegateConsumerRecordResult
indicating what to do with the record
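The caller contract described for delegateConsumerRecord (continue processing only when the result is QUEUED_TO_DRAINER) might look roughly like the sketch below. Only the QUEUED_TO_DRAINER name comes from this page; the Result enum, its second constant, and the helper method are hypothetical placeholders.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the caller-side contract; not the Venice implementation. */
final class DelegateResultSketch {
  /**
   * Illustrative stand-in for StoreIngestionTask.DelegateConsumerRecordResult.
   * HANDLED_ELSEWHERE is a made-up placeholder for "any other outcome".
   */
  enum Result { QUEUED_TO_DRAINER, HANDLED_ELSEWHERE }

  /** Keeps only the records whose delegation result asks the caller to continue. */
  static List<String> recordsToProcessFurther(List<String> records, List<Result> results) {
    List<String> toProcess = new ArrayList<>();
    for (int i = 0; i < records.size(); i++) {
      // The caller continues only for QUEUED_TO_DRAINER; every other outcome
      // means the record needs no further handling by the caller.
      if (results.get(i) == Result.QUEUED_TO_DRAINER) {
        toProcess.add(records.get(i));
      }
    }
    return toProcess;
  }
}
```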
-
waitForAllMessageToBeProcessedFromTopicPartition
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState) throws InterruptedException Besides draining messages in the drainer queue, wait for the last producer future. - Overrides:
waitForAllMessageToBeProcessedFromTopicPartition
in classStoreIngestionTask
- Parameters:
topicPartition
- for which to wait - Throws:
InterruptedException
-
getBatchReplicationLag
public long getBatchReplicationLag()- Specified by:
getBatchReplicationLag
in classStoreIngestionTask
-
getLeaderOffsetLag
public long getLeaderOffsetLag()- Specified by:
getLeaderOffsetLag
in classStoreIngestionTask
-
getBatchLeaderOffsetLag
public long getBatchLeaderOffsetLag()- Specified by:
getBatchLeaderOffsetLag
in classStoreIngestionTask
-
getHybridLeaderOffsetLag
public long getHybridLeaderOffsetLag()- Specified by:
getHybridLeaderOffsetLag
in classStoreIngestionTask
-
getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement
protected long getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, String ignoredUpstreamKafkaUrl) For L/F or NR, there is only one entry in upstreamOffsetMap whose key is NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY. Return the value of the entry. -
getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement
protected long getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, String ignoredKafkaUrl) For regular L/F stores without A/A enabled, there is always only one real-time source. -
updateLatestInMemoryLeaderConsumedRTOffset
protected void updateLatestInMemoryLeaderConsumedRTOffset(PartitionConsumptionState pcs, String ignoredKafkaUrl, long offset) -
getFollowerOffsetLag
public long getFollowerOffsetLag() Description copied from class: StoreIngestionTask
Measure the offset lag between follower and leader. - Specified by:
getFollowerOffsetLag
in classStoreIngestionTask
-
getBatchFollowerOffsetLag
public long getBatchFollowerOffsetLag()- Specified by:
getBatchFollowerOffsetLag
in classStoreIngestionTask
-
getHybridFollowerOffsetLag
public long getHybridFollowerOffsetLag()- Specified by:
getHybridFollowerOffsetLag
in classStoreIngestionTask
-
getRegionHybridOffsetLag
public long getRegionHybridOffsetLag(int regionId) - Specified by:
getRegionHybridOffsetLag
in classStoreIngestionTask
-
consumerUnSubscribeAllTopics
Unsubscribe from all the topics being consumed for the partition in partitionConsumptionState. - Specified by:
consumerUnSubscribeAllTopics
in classStoreIngestionTask
-
getWriteComputeErrorCode
public int getWriteComputeErrorCode()- Specified by:
getWriteComputeErrorCode
in classStoreIngestionTask
-
updateLeaderTopicOnFollower
- Specified by:
updateLeaderTopicOnFollower
in classStoreIngestionTask
-
maybeCompressData
protected ByteBuffer maybeCompressData(int partition, ByteBuffer data, PartitionConsumptionState partitionConsumptionState) Compresses data in a ByteBuffer when consuming from RT as a leader node and compression is enabled for the store version for which we're consuming data. - Parameters:
partition
- which partition we're acting on, so as to determine the PartitionConsumptionState
data
- the data that we might compress - Returns:
- a ByteBuffer that's either the original ByteBuffer or a new one, depending on whether we compressed it.
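The "original buffer or a new one" return contract of maybeCompressData can be illustrated with the sketch below. It is not the Venice implementation: the class name and boolean flags are hypothetical, and GZIP from the JDK is swapped in for whatever compression strategy the store version actually uses.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.zip.GZIPOutputStream;

/** Hypothetical sketch; GZIP stands in for the store's real compression strategy. */
final class MaybeCompressSketch {
  /**
   * Returns the same ByteBuffer instance when no compression applies, or a new
   * ByteBuffer holding the compressed bytes otherwise.
   */
  static ByteBuffer maybeCompress(
      ByteBuffer data, boolean isLeaderConsumingFromRt, boolean compressionEnabled) {
    if (!isLeaderConsumingFromRt || !compressionEnabled) {
      return data; // nothing to do: hand back the original buffer untouched
    }
    // Copy the readable bytes through a duplicate so the caller's position is preserved.
    byte[] raw = new byte[data.remaining()];
    data.duplicate().get(raw);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
      gzip.write(raw);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    // The stream is closed here, so the GZIP trailer has been written.
    return ByteBuffer.wrap(out.toByteArray());
  }
}
```

Callers in this sketch can tell whether compression happened by comparing the returned reference with the input.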
-
shouldCompressData
-
processMessageAndMaybeProduceToKafka
protected void processMessageAndMaybeProduceToKafka(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs) -
measureRTOffsetLagForSingleRegion
protected long measureRTOffsetLagForSingleRegion(String sourceRealTimeTopicKafkaURL, PartitionConsumptionState pcs, boolean shouldLog) This method fetches/calculates the latest leader persisted offset and the last offset in the RT topic. The method relies on getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState, String) to fetch the latest leader persisted offset for different data replication policies. - Returns:
- the lag (lastOffsetInRealTimeTopic - latestPersistedLeaderOffset)
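The lag formula above, combined with the single-entry upstream offset map described for getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement, can be sketched as follows. The class name, the key's string value, and the choice to throw when the entry is missing are assumptions for illustration; the real method obtains the RT end offset itself rather than taking it as a parameter.

```java
import java.util.Map;

/** Hypothetical sketch of the lag arithmetic; not the Venice implementation. */
final class RtOffsetLagSketch {
  /** Placeholder value; the real constant's value is not shown on this page. */
  static final String NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY = "hypothetical-single-upstream-key";

  /** Computes lastOffsetInRealTimeTopic - latestPersistedLeaderOffset. */
  static long measureLag(long lastOffsetInRealTimeTopic, Map<String, Long> upstreamOffsetMap) {
    // For L/F or NR there is a single entry under the non-AA key.
    Long latestPersistedLeaderOffset = upstreamOffsetMap.get(NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY);
    if (latestPersistedLeaderOffset == null) {
      // Assumption: a missing entry is treated as an error in this sketch.
      throw new IllegalStateException("no persisted upstream offset recorded");
    }
    return lastOffsetInRealTimeTopic - latestPersistedLeaderOffset;
  }
}
```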
-
processControlMessageForViews
protected void processControlMessageForViews(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaMessageEnvelope, ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState) Description copied from class: StoreIngestionTask
This isn't used for ingestion outside L/F, so we NoOp here and rely on the actual implementation in LeaderFollowerStoreIngestionTask
- Overrides:
processControlMessageForViews
in classStoreIngestionTask
-
createProducerCallback
protected LeaderProducerCallback createProducerCallback(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs) -
getVeniceWriter
protected Lazy<VeniceWriter<byte[], byte[], byte[]>> getVeniceWriter(PartitionConsumptionState partitionConsumptionState) -
addPartitionConsumptionState
-
maybeSendIngestionHeartbeat
For hybrid stores only, the leader periodically writes a special SOS message to the RT topic with the following properties:
1. Special key: This key contains constant bytes, allowing for compaction.
2. Fixed/known producer GUID: This GUID is dedicated to heartbeats and prevents DIV from breaking.
3. Special segment: This segment never contains data, eliminating the need for an EOS message.
Upon consuming the SOS message, the leader writes it to its local VT. Once the drainer processes the record, the leader updates its latest processed upstream RT topic offset. At this point, the offset reflects the correct position, regardless of trailing CMs or skippable data records due to DCR.
This heartbeat message does not send a leader completion header. This results in having the leader completion states only in VTs and not in the RT, avoiding the need to differentiate between heartbeats from leaders of different versions (backup/current/future) and colos.
- Specified by:
maybeSendIngestionHeartbeat
in classStoreIngestionTask
- Returns:
- the set of partitions that failed to send heartbeat (used for tests)
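The return contract of maybeSendIngestionHeartbeat (the set of partitions whose heartbeat failed) can be sketched as below. The method signature is not shown on this page, so everything here is hypothetical: the class name, the parameters, and the trySend callback that stands in for writing the SOS message to the RT topic.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.IntPredicate;

/** Hypothetical sketch of collecting failed heartbeat partitions; not the Venice implementation. */
final class HeartbeatSketch {
  /**
   * @param isHybrid whether the store is hybrid; non-hybrid stores send nothing
   * @param leaderByPartition partition id mapped to whether this replica is the leader
   * @param trySend stand-in for the heartbeat write; returns false on failure
   * @return the partitions for which the heartbeat could not be sent
   */
  static Set<Integer> sendHeartbeats(
      boolean isHybrid, Map<Integer, Boolean> leaderByPartition, IntPredicate trySend) {
    Set<Integer> failedPartitions = new TreeSet<>();
    if (!isHybrid) {
      return failedPartitions; // heartbeats apply to hybrid stores only
    }
    for (Map.Entry<Integer, Boolean> entry : leaderByPartition.entrySet()) {
      // Only the leader writes the heartbeat; followers are skipped.
      if (entry.getValue() && !trySend.test(entry.getKey())) {
        failedPartitions.add(entry.getKey());
      }
    }
    return failedPartitions;
  }
}
```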
-
resubscribe
protected void resubscribe(PartitionConsumptionState partitionConsumptionState) throws InterruptedException Resubscribes by passing the new version role and partition role to AggKafkaConsumerService. Leader and follower replicas are handled differently. - Specified by:
resubscribe
in classStoreIngestionTask
- Throws:
InterruptedException
-
resubscribeAsFollower
protected void resubscribeAsFollower(PartitionConsumptionState partitionConsumptionState) throws InterruptedException - Throws:
InterruptedException
-
resubscribeAsLeader
protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptionState) throws InterruptedException - Throws:
InterruptedException
-
queueUpVersionTopicWritesWithViewWriters
protected void queueUpVersionTopicWritesWithViewWriters(PartitionConsumptionState partitionConsumptionState, Function<VeniceViewWriter, CompletableFuture<PubSubProduceResult>> viewWriterRecordProcessor, Runnable versionTopicWrite) -
hasViewWriters
protected boolean hasViewWriters()
-