Class LeaderFollowerStoreIngestionTask
- java.lang.Object
- com.linkedin.davinci.kafka.consumer.StoreIngestionTask
- com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
- All Implemented Interfaces:
java.io.Closeable, java.lang.AutoCloseable, java.lang.Runnable
- Direct Known Subclasses:
ActiveActiveStoreIngestionTask
public class LeaderFollowerStoreIngestionTask extends StoreIngestionTask
This class contains the state transition work between leader and follower; both leader and follower will keep track of information like which topic the leader is consuming from and the corresponding offset, as well as the latest successfully consumed or produced offset in the version topic (VT).
State transitions:
1. OFFLINE -> STANDBY: Generate a SUBSCRIBE message in the consumer action queue; the logic here is the same as the Online/Offline model; all it needs to do is restore the checkpointed state from OffsetRecord.
2. STANDBY -> LEADER: The partition will be marked as in transition from STANDBY to LEADER and the action completes immediately; after processing the rest of the consumer actions in the queue, check whether any partition is in transition, and if so: (i) consume the latest messages from the version topic; (ii) drain all the messages in the drainer queue in order to update the latest consumed message replication metadata; (iii) check whether there has been at least 5 minutes (configurable) of inactivity for this partition (meaning no new messages); if so, turn on the LEADER flag for this partition.
3. LEADER -> STANDBY: a. if the leader is consuming from VT, just set the "isLeader" field to false and resume consumption; b. if the leader is consuming from anything other than VT, it needs to unsubscribe from the leader topic for this partition first, then drain all the messages in the drainer queue for this leader topic/partition so that it can get the last producer callback for the last message it produced to VT; block on getting the result from the callback to update the corresponding offset in the version topic, so that the new follower can subscribe back to VT using the recently updated VT offset.
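The STANDBY -> LEADER promotion check in transition 2 can be summarized with the following minimal sketch; the type and method names below are hypothetical stand-ins, not the real Venice API:

import java.time.Duration;
import java.time.Instant;

// Hypothetical, simplified sketch of the STANDBY -> LEADER promotion check described above.
final class LeaderPromotionSketch {
  // 5 minutes of inactivity by default; configurable in the real ingestion task.
  static final Duration INACTIVITY_THRESHOLD = Duration.ofMinutes(5);

  static boolean readyToBecomeLeader(Instant lastVersionTopicMessageTime, Instant now) {
    // Promote only after the latest VT messages have been consumed, the drainer queue
    // has been drained, and the partition has seen no new messages for the threshold.
    return Duration.between(lastVersionTopicMessageTime, now).compareTo(INACTIVITY_THRESHOLD) >= 0;
  }
}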
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
StoreIngestionTask.DelegateConsumerRecordResult
-
-
Field Summary
Fields
- protected boolean hasChangeCaptureView
- protected it.unimi.dsi.fastutil.ints.Int2ObjectMap<java.lang.String> kafkaClusterIdToUrlMap
- static java.util.function.Predicate<? super PartitionConsumptionState> LEADER_OFFSET_LAG_FILTER
- protected AvroStoreDeserializerCache storeDeserializerCache
- protected static java.util.function.LongPredicate VALID_LAG
- protected Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriter
  N.B.: With L/F + native replication and many leader partitions getting assigned to a single SN, this VeniceWriter may be called from multiple threads simultaneously during the start of a batch push.
- protected Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterForRealTime
- protected java.util.Map<java.lang.String,VeniceViewWriter> viewWriters
-
Fields inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
aggKafkaConsumerService, availableSchemaIds, batchReportIncPushStatusEnabled, bootstrapTimeoutInMs, chunkAssembler, compressionStrategy, compressor, compressorFactory, consumerActionSequenceNumber, consumerActionsQueue, databaseSyncBytesIntervalForDeferredWriteMode, databaseSyncBytesIntervalForTransactionalMode, dataRecoverySourceVersionNumber, defaultReadyToServeChecker, deserializedSchemaIds, diskUsage, divErrorMetricCallback, emitMetrics, emptyPollSleepMs, errorPartitionId, gracefulShutdownLatch, hostLevelIngestionStats, hostName, hybridStoreConfig, idleCounter, ingestionNotificationDispatcher, ingestionTaskName, isChunked, isCurrentVersion, isDataRecovery, isDaVinciClient, isGlobalRtDivEnabled, isIsolatedIngestion, isRmdChunked, isRunning, isSeparatedRealtimeTopicEnabled, isWriteComputationEnabled, kafkaClusterUrlResolver, kafkaProps, kafkaVersionTopic, KILL_WAIT_TIME_MS, localKafkaClusterId, localKafkaServer, localKafkaServerSingletonSet, manifestSerializer, metaStoreWriter, parallelProcessingThreadPool, partitionConsumptionStateMap, partitionCount, partitionToPendingConsumerActionCountMap, pubSubTopicRepository, readCycleDelayMs, readOnlyForBatchOnlyStoreEnabled, realTimeTopic, recordLevelMetricEnabled, REDUNDANT_LOGGING_FILTER, resetErrorReplicaEnabled, SCHEMA_POLLING_DELAY_MS, schemaRepository, serverConfig, storageEngine, storageEngineRepository, storageMetadataService, storageService, STORE_VERSION_POLLING_DELAY_MS, storeBufferService, storeName, storeRepository, storeVersionPartitionCount, topicManagerRepository, versionedDIVStats, versionedIngestionStats, versionNumber, versionRole, versionTopic, WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED, workloadType, writeComputeFailureCode, zkHelixAdmin
-
-
Constructor Summary
LeaderFollowerStoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, java.util.Properties kafkaConsumerProperties, java.util.function.BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
Method Summary
protected void
addPartitionConsumptionState(java.lang.Integer partition, PartitionConsumptionState pcs)
protected double
calculateAssembledRecordSizeRatio(long recordSize)
protected java.util.Map<java.lang.String,java.lang.Long>
calculateLeaderUpstreamOffsetWithTopicSwitch(PartitionConsumptionState partitionConsumptionState, PubSubTopic newSourceTopic, java.util.List<java.lang.CharSequence> unreachableBrokerList)
protected static void
checkAndHandleUpstreamOffsetRewind(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, long newUpstreamOffset, long previousUpstreamOffset, LeaderFollowerStoreIngestionTask ingestionTask)
protected boolean
checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState pcs, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp)
Checks whether the lag is acceptable for hybrid stores.
protected void
checkLongRunningTaskState()
The following function will be executed after processing all the quick actions in the consumer action queues, so that the long-running actions don't block other partitions' consumer actions.
protected static boolean
checkWhetherToCloseUnusedVeniceWriter(Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterLazy, Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterForRealTimeLazy, java.util.Map<java.lang.Integer,PartitionConsumptionState> partitionConsumptionStateMap, java.lang.Runnable reInitializeVeniceWriterLazyRunnable, java.lang.String versionTopicName)
protected void
closeVeniceViewWriters()
void
closeVeniceWriters(boolean doFlush)
static VeniceWriter<byte[],byte[],byte[]>
constructVeniceWriter(VeniceWriterFactory veniceWriterFactory, java.lang.String topic, Version version, boolean producerCompressionEnabled, int producerCnt)
void
consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
Unsubscribe from all the topics being consumed for the partition in partitionConsumptionState.
protected LeaderProducerCallback
createProducerCallback(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs)
protected StoreIngestionTask.DelegateConsumerRecordResult
delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
The goal of this function is to possibly produce the incoming kafka message consumed from local VT, remote VT, RT or SR topic to local VT if needed.
void
demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
protected void
getAndUpdateLeaderCompletedState(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaValue, ControlMessage controlMessage, PubSubMessageHeaders pubSubMessageHeaders, PartitionConsumptionState partitionConsumptionState)
HeartBeat SOS messages carry the leader completion state in the header.
long
getBatchFollowerOffsetLag()
long
getBatchLeaderOffsetLag()
long
getBatchReplicationLag()
protected java.util.Set<java.lang.String>
getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
long
getFollowerOffsetLag()
Measure the offset lag between follower and leader.
long
getHybridFollowerOffsetLag()
long
getHybridLeaderOffsetLag()
protected IngestionBatchProcessor
getIngestionBatchProcessor()
protected long
getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, java.lang.String ignoredKafkaUrl)
For regular L/F stores without A/A enabled, there is always only one real-time source.
protected long
getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, java.lang.String ignoredUpstreamKafkaUrl)
For L/F or NR, there is only one entry in upstreamOffsetMap whose key is NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY.
long
getLeaderOffsetLag()
protected int
getMaxNearlineRecordSizeBytes()
protected int
getMaxRecordSizeBytes()
protected java.util.Set<java.lang.String>
getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
long
getRegionHybridOffsetLag(int regionId)
protected long
getTopicPartitionOffsetByKafkaURL(java.lang.CharSequence kafkaURL, PubSubTopicPartition pubSubTopicPartition, long rewindStartTimestamp)
protected Lazy<VeniceWriter<byte[],byte[],byte[]>>
getVeniceWriter(PartitionConsumptionState partitionConsumptionState)
int
getWriteComputeErrorCode()
protected boolean
isHybridFollower(PartitionConsumptionState partitionConsumptionState)
protected static boolean
isLeader(PartitionConsumptionState partitionConsumptionState)
boolean
isReadyToServeAnnouncedWithRTLag()
protected boolean
isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
protected void
leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic)
protected java.nio.ByteBuffer
maybeCompressData(int partition, java.nio.ByteBuffer data, PartitionConsumptionState partitionConsumptionState)
Compresses data in a bytebuffer when consuming from RT as a leader node and compression is enabled for the store version for which we're consuming data.
protected java.util.Set<java.lang.String>
maybeSendIngestionHeartbeat()
For hybrid stores only, the leader periodically writes a special SOS heartbeat message to the RT topic; see the method detail below for its properties.
protected long
measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
For the Leader/Follower state model, we already keep track of the consumption progress in the leader, so directly calculate the lag between the real-time topic and the leader consumption offset.
protected long
measureRTOffsetLagForMultiRegions(java.util.Set<java.lang.String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
protected long
measureRTOffsetLagForSingleRegion(java.lang.String sourceRealTimeTopicKafkaURL, PartitionConsumptionState pcs, boolean shouldLog)
This method fetches/calculates the latest leader persisted offset and the last offset in the RT topic.
protected void
processConsumerAction(ConsumerAction message, Store store)
protected void
processMessageAndMaybeProduceToKafka(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
protected boolean
processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
Process TopicSwitch control message at given partition offset for a specific PartitionConsumptionState.
protected void
processVersionSwapMessage(ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
This isn't really used for ingestion outside of A/A, so we NoOp here and rely on the actual implementation in ActiveActiveStoreIngestionTask.
protected void
produceToLocalKafka(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, java.util.function.BiConsumer<ChunkAwareCallback,LeaderMetadataWrapper> produceFunction, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs)
void
promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
protected void
recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
protected void
recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, java.lang.String kafkaUrl)
protected void
recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
protected void
recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
protected void
reportIfCatchUpVersionTopicOffset(PartitionConsumptionState pcs)
Check if the ingestion progress has reached the end of the version topic.
protected void
resubscribe(PartitionConsumptionState partitionConsumptionState)
Resubscribe operation by passing new version role and partition role to AggKafkaConsumerService.
protected void
resubscribeAsFollower(PartitionConsumptionState partitionConsumptionState)
protected void
resubscribeAsLeader(PartitionConsumptionState partitionConsumptionState)
protected boolean
shouldCheckLeaderCompleteStateInFollower()
For non-AA hybrid stores with AGGREGATE DRP, SIT reads from the parent RT while the HB is written to the child RTs.
protected boolean
shouldCompressData(PartitionConsumptionState partitionConsumptionState)
protected boolean
shouldNewLeaderSwitchToRemoteConsumption(PartitionConsumptionState partitionConsumptionState)
protected boolean
shouldPersistRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record, PartitionConsumptionState partitionConsumptionState)
Additional safeguards in Leader/Follower ingestion: 1. Check whether the incoming messages are from the expected source topics.
protected boolean
shouldProcessRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record)
For the Leader/Follower model, the follower should have the same kind of check as the Online/Offline model; for the leader, it's possible that it consumes from the real-time topic or GF topic.
protected boolean
shouldProduceToVersionTopic(PartitionConsumptionState partitionConsumptionState)
For the corresponding partition being tracked in `partitionConsumptionState`, if it's in LEADER state and it's not consuming from version topic, it should produce the new message to version topic; besides, if LEADER is consuming remotely, it should also produce to local fabric.
protected void
startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
protected void
startConsumingAsLeaderInTransitionFromStandby(PartitionConsumptionState partitionConsumptionState)
protected void
syncConsumedUpstreamRTOffsetMapIfNeeded(PartitionConsumptionState pcs, java.util.Map<java.lang.String,java.lang.Long> upstreamStartOffsetByKafkaURL)
protected void
syncTopicSwitchToIngestionMetadataService(TopicSwitch topicSwitch, PartitionConsumptionState partitionConsumptionState)
protected void
updateLatestInMemoryLeaderConsumedRTOffset(PartitionConsumptionState pcs, java.lang.String ignoredKafkaUrl, long offset)
protected void
updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, java.lang.String kafkaUrl, boolean dryRun)
Maintain the latest offsets processed by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord.
void
updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
protected void
updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState)
Sync the metadata about offset in OffsetRecord.
protected void
updateOffsetsFromConsumerRecord(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateVersionTopicOffset updateVersionTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateUpstreamTopicOffset updateUpstreamTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.GetLastKnownUpstreamTopicOffset lastKnownUpstreamTopicOffsetSupplier, java.util.function.Supplier<java.lang.String> sourceKafkaUrlSupplier, boolean dryRun)
A helper function to update the latest in-memory offsets processed by drainers in PartitionConsumptionState, after processing the given PubSubMessage.
protected java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>>
validateAndFilterOutDuplicateMessagesFromLeaderTopic(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, java.lang.String kafkaUrl, PubSubTopicPartition topicPartition)
protected void
waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState)
Besides draining messages in the drainer queue, wait for the last producer future.
protected void
waitForLastLeaderPersistFuture(PartitionConsumptionState partitionConsumptionState, java.lang.String errorMsg)
-
Methods inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
checkIngestionProgress, cloneProducerStates, close, consumerBatchUnsubscribe, consumerHasAnySubscription, consumerHasSubscription, consumerResetOffset, consumerSubscribe, createKafkaConsumerProperties, disableMetricsEmission, dropStoragePartitionGracefully, dumpPartitionConsumptionStates, dumpStoreVersionState, enableMetricsEmission, getCompressionStrategy, getCompressor, getFailedIngestionPartitionCount, getHostLevelIngestionStats, getIngestionTaskName, getKafkaVersionTopic, getOffsetToOnlineLagThresholdPerPartition, getPartitionConsumptionState, getPartitionOffsetLagBasedOnMetrics, getSchemaRepo, getServerConfig, getStorageEngine, getStoragePartitionConfig, getStoragePartitionConfig, getStoreName, getTopicManager, getTopicPartitionEndOffSet, getVersionedDIVStats, getVersionIngestionStats, getVersionNumber, getVersionTopic, hasAllPartitionReportedCompleted, hasAnyPartitionConsumptionState, hasAnySubscription, hasPendingPartitionIngestionAction, isActiveActiveReplicationEnabled, isChunked, isCurrentVersion, isDaVinciClient, isFutureVersion, isHybridMode, isIngestionTaskActive, isMetricsEmissionEnabled, isPartitionConsumingOrHasPendingIngestionAction, isProducingVersionTopicHealthy, isReadyToServe, isRunning, isSegmentControlMsg, isSeparatedRealtimeTopicEnabled, isStuckByMemoryConstraint, isTransientRecordBufferUsed, isUserSystemStore, kill, logStorageOperationWhileUnsubscribed, measureLagWithCallToPubSub, measureLagWithCallToPubSub, minZeroLag, nextSeqNum, processCommonConsumerAction, processConsumerRecord, processEndOfIncrementalPush, processEndOfPush, processStartOfIncrementalPush, produceToStoreBufferService, produceToStoreBufferServiceOrKafka, produceToStoreBufferServiceOrKafkaInBatch, putInStorageEngine, recordAssembledRecordSize, recordChecksumVerificationFailure, removeFromStorageEngine, reportError, resetPartitionConsumptionOffset, resolveSourceKafkaServersWithinTopicSwitch, run, setLastConsumerException, setLastStoreIngestionException, setPartitionConsumptionState, shouldUpdateUpstreamOffset, shutdownAndWait, subscribePartition, subscribePartition, throwIfNotRunning, throwOrLogStorageFailureDependingIfStillSubscribed, unsubscribeFromTopic, unSubscribePartition, unSubscribePartition, updateIngestionRoleIfStoreChanged, updateOffsetMetadataAndSync, updateOffsetMetadataAndSyncOffset, validateMessage, waitVersionStateAvailable
-
-
-
-
Field Detail
-
veniceWriter
protected Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriter
N.B.: With L/F + native replication and many leader partitions getting assigned to a single SN, this VeniceWriter may be called from multiple threads simultaneously during the start of a batch push. Therefore, we wrap it in Lazy to initialize it in a thread-safe way and to ensure that only one instance is created for the entire ingestion task. Important: please don't use these writers directly; retrieve the writer from PartitionConsumptionState.getVeniceWriterLazyRef() when producing to the local topic.
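The thread-safe, initialize-once behavior that Lazy provides here follows the familiar double-checked-locking pattern; a minimal illustrative stand-in (not the actual Lazy class used by Venice) is sketched below:

import java.util.function.Supplier;

// Illustrative stand-in for a Lazy<T> holder: the supplier runs at most once,
// even when many leader partitions touch the writer concurrently.
final class LazySketch<T> {
  private final Supplier<T> supplier;
  private volatile T value;

  LazySketch(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  T get() {
    T result = value;
    if (result == null) {
      synchronized (this) {
        result = value;
        if (result == null) {
          value = result = supplier.get(); // single initialization under contention
        }
      }
    }
    return result;
  }
}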
-
veniceWriterForRealTime
protected final Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterForRealTime
-
kafkaClusterIdToUrlMap
protected final it.unimi.dsi.fastutil.ints.Int2ObjectMap<java.lang.String> kafkaClusterIdToUrlMap
-
viewWriters
protected final java.util.Map<java.lang.String,VeniceViewWriter> viewWriters
-
hasChangeCaptureView
protected final boolean hasChangeCaptureView
-
storeDeserializerCache
protected final AvroStoreDeserializerCache storeDeserializerCache
-
VALID_LAG
protected static final java.util.function.LongPredicate VALID_LAG
-
LEADER_OFFSET_LAG_FILTER
public static final java.util.function.Predicate<? super PartitionConsumptionState> LEADER_OFFSET_LAG_FILTER
-
-
Constructor Detail
-
LeaderFollowerStoreIngestionTask
public LeaderFollowerStoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, java.util.Properties kafkaConsumerProperties, java.util.function.BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, java.util.Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerFunctionalInterface recordTransformerFunction, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
-
Method Detail
-
constructVeniceWriter
public static VeniceWriter<byte[],byte[],byte[]> constructVeniceWriter(VeniceWriterFactory veniceWriterFactory, java.lang.String topic, Version version, boolean producerCompressionEnabled, int producerCnt)
-
closeVeniceWriters
public void closeVeniceWriters(boolean doFlush)
- Overrides:
closeVeniceWriters
in class StoreIngestionTask
-
closeVeniceViewWriters
protected void closeVeniceViewWriters()
- Overrides:
closeVeniceViewWriters
in class StoreIngestionTask
-
getIngestionBatchProcessor
protected IngestionBatchProcessor getIngestionBatchProcessor()
- Specified by:
getIngestionBatchProcessor
in class StoreIngestionTask
-
promoteToLeader
public void promoteToLeader(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- Specified by:
promoteToLeader
in class StoreIngestionTask
-
demoteToStandby
public void demoteToStandby(PubSubTopicPartition topicPartition, LeaderFollowerPartitionStateModel.LeaderSessionIdChecker checker)
- Specified by:
demoteToStandby
in class StoreIngestionTask
-
processConsumerAction
protected void processConsumerAction(ConsumerAction message, Store store) throws java.lang.InterruptedException
- Specified by:
processConsumerAction
in class StoreIngestionTask
- Throws:
java.lang.InterruptedException
-
checkLongRunningTaskState
protected void checkLongRunningTaskState() throws java.lang.InterruptedException
The following function will be executed after processing all the quick actions in the consumer action queues, so that the long-running actions don't block other partitions' consumer actions. Besides, there are no thread-sleeping operations in this function in order to be efficient; instead, this function will be invoked again and again in the main loop of the StoreIngestionTask to check whether some long-running actions can finish now. The only drawback is that for a regular batch push, the leader flag is not turned on until at least a few minutes after the leader consumes the last message (END_OF_PUSH), which is an acceptable trade-off for us in order to share and test the same code path between the regular push job, hybrid stores and the reprocessing job.
- Specified by:
checkLongRunningTaskState
in class StoreIngestionTask
- Throws:
java.lang.InterruptedException
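A hedged sketch of the loop shape implied by this description, with hypothetical method names rather than the real StoreIngestionTask internals:

// Illustrative main-loop shape: quick consumer actions first, then a
// non-blocking check of any in-flight long-running transitions.
interface IngestionLoopSketch {
  void processQuickConsumerActions() throws InterruptedException;

  // Never sleeps; simply re-checked on every pass of the loop.
  void checkLongRunningTaskState() throws InterruptedException;

  boolean isRunning();

  default void runLoop() throws InterruptedException {
    while (isRunning()) {
      processQuickConsumerActions();
      checkLongRunningTaskState();
    }
  }
}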
-
checkWhetherToCloseUnusedVeniceWriter
protected static boolean checkWhetherToCloseUnusedVeniceWriter(Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterLazy, Lazy<VeniceWriter<byte[],byte[],byte[]>> veniceWriterForRealTimeLazy, java.util.Map<java.lang.Integer,PartitionConsumptionState> partitionConsumptionStateMap, java.lang.Runnable reInitializeVeniceWriterLazyRunnable, java.lang.String versionTopicName)
-
startConsumingAsLeaderInTransitionFromStandby
protected void startConsumingAsLeaderInTransitionFromStandby(PartitionConsumptionState partitionConsumptionState)
-
calculateLeaderUpstreamOffsetWithTopicSwitch
protected java.util.Map<java.lang.String,java.lang.Long> calculateLeaderUpstreamOffsetWithTopicSwitch(PartitionConsumptionState partitionConsumptionState, PubSubTopic newSourceTopic, java.util.List<java.lang.CharSequence> unreachableBrokerList)
-
startConsumingAsLeader
protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
- Overrides:
startConsumingAsLeader
in class StoreIngestionTask
-
leaderExecuteTopicSwitch
protected void leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic)
-
syncConsumedUpstreamRTOffsetMapIfNeeded
protected void syncConsumedUpstreamRTOffsetMapIfNeeded(PartitionConsumptionState pcs, java.util.Map<java.lang.String,java.lang.Long> upstreamStartOffsetByKafkaURL)
-
waitForLastLeaderPersistFuture
protected void waitForLastLeaderPersistFuture(PartitionConsumptionState partitionConsumptionState, java.lang.String errorMsg)
-
getTopicPartitionOffsetByKafkaURL
protected long getTopicPartitionOffsetByKafkaURL(java.lang.CharSequence kafkaURL, PubSubTopicPartition pubSubTopicPartition, long rewindStartTimestamp)
-
getConsumptionSourceKafkaAddress
protected java.util.Set<java.lang.String> getConsumptionSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
- Specified by:
getConsumptionSourceKafkaAddress
in class StoreIngestionTask
-
getRealTimeDataSourceKafkaAddress
protected java.util.Set<java.lang.String> getRealTimeDataSourceKafkaAddress(PartitionConsumptionState partitionConsumptionState)
- Overrides:
getRealTimeDataSourceKafkaAddress
in class StoreIngestionTask
-
shouldNewLeaderSwitchToRemoteConsumption
protected boolean shouldNewLeaderSwitchToRemoteConsumption(PartitionConsumptionState partitionConsumptionState)
-
shouldProduceToVersionTopic
protected boolean shouldProduceToVersionTopic(PartitionConsumptionState partitionConsumptionState)
For the corresponding partition being tracked in `partitionConsumptionState`, if it's in LEADER state and it's not consuming from the version topic, it should produce the new message to the version topic; besides, if the LEADER is consuming remotely, it should also produce to the local fabric. If buffer replay is disabled, all replicas will stick to the version topic, and no one is going to produce any messages.
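A minimal sketch of this decision, assuming a simplified boolean view of the partition state (names are illustrative, not the real PartitionConsumptionState API):

// Hypothetical, simplified restatement of the decision above.
final class ProduceToVersionTopicSketch {
  static boolean shouldProduceToVersionTopic(boolean isLeader,
                                             boolean consumingFromVersionTopic,
                                             boolean consumingRemotely) {
    if (!isLeader) {
      return false; // followers never produce to the version topic
    }
    // Produce when not already reading the version topic, or when consuming remotely
    // (so the data also lands in the local fabric).
    return !consumingFromVersionTopic || consumingRemotely;
  }
}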
-
isLeader
protected static boolean isLeader(PartitionConsumptionState partitionConsumptionState)
-
processTopicSwitch
protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
Process TopicSwitch control message at given partition offset for a specific PartitionConsumptionState. Return whether we need to execute additional ready-to-serve check after this message is processed.
- Overrides:
processTopicSwitch
in class StoreIngestionTask
-
syncTopicSwitchToIngestionMetadataService
protected void syncTopicSwitchToIngestionMetadataService(TopicSwitch topicSwitch, PartitionConsumptionState partitionConsumptionState)
-
updateOffsetsFromConsumerRecord
protected void updateOffsetsFromConsumerRecord(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateVersionTopicOffset updateVersionTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.UpdateUpstreamTopicOffset updateUpstreamTopicOffsetFunction, com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask.GetLastKnownUpstreamTopicOffset lastKnownUpstreamTopicOffsetSupplier, java.util.function.Supplier<java.lang.String> sourceKafkaUrlSupplier, boolean dryRun)
A helper function to update the latest in-memory offsets processed by drainers in PartitionConsumptionState, after processing the given PubSubMessage. When using this helper function to update the latest in-memory offsets processed by drainers in PartitionConsumptionState: "updateVersionTopicOffsetFunction" should try to update the VT offset in PartitionConsumptionState; "updateRealtimeTopicOffsetFunction" should try to update the latest processed upstream offset map in PartitionConsumptionState. In LeaderFollowerStoreIngestionTask, "sourceKafkaUrlSupplier" should always return OffsetRecord.NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY; in ActiveActiveStoreIngestionTask, "sourceKafkaUrlSupplier" should return the actual source Kafka url of the "consumerRecordWrapper". Dry-run mode only checks whether the offset rewind is benign, instead of persisting the processed offset.
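A hedged sketch of the update/dry-run split described above, using plain LongConsumer callbacks instead of the real functional interfaces in the signature:

import java.util.function.LongConsumer;

// Hypothetical sketch: in dry-run mode only the rewind check happens;
// otherwise both the VT offset and the upstream offset callbacks are invoked.
final class UpdateOffsetsSketch {
  static void updateOffsets(long versionTopicOffset,
                            long upstreamOffset,
                            long lastKnownUpstreamOffset,
                            LongConsumer updateVersionTopicOffset,
                            LongConsumer updateUpstreamTopicOffset,
                            boolean dryRun) {
    if (upstreamOffset < lastKnownUpstreamOffset) {
      // The real code routes this case through checkAndHandleUpstreamOffsetRewind(...).
      System.out.println("Upstream offset rewind: " + upstreamOffset + " < " + lastKnownUpstreamOffset);
    }
    if (!dryRun) {
      // Only a real run persists the processed offsets.
      updateVersionTopicOffset.accept(versionTopicOffset);
      updateUpstreamTopicOffset.accept(upstreamOffset);
    }
  }
}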
-
updateOffsetMetadataInOffsetRecord
protected void updateOffsetMetadataInOffsetRecord(PartitionConsumptionState partitionConsumptionState)
Description copied from class: StoreIngestionTask
Sync the metadata about offset in OffsetRecord. PartitionConsumptionState will pass through some information to OffsetRecord for persistence, and offset rewind/split brain has been guarded in StoreIngestionTask.updateLatestInMemoryProcessedOffset(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState, com.linkedin.venice.pubsub.api.PubSubMessage<com.linkedin.venice.message.KafkaKey, com.linkedin.venice.kafka.protocol.KafkaMessageEnvelope, java.lang.Long>, com.linkedin.davinci.kafka.consumer.LeaderProducedRecordContext, java.lang.String, boolean).
- Specified by:
updateOffsetMetadataInOffsetRecord
in class StoreIngestionTask
-
updateLatestInMemoryProcessedOffset
protected void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, LeaderProducedRecordContext leaderProducedRecordContext, java.lang.String kafkaUrl, boolean dryRun)
Description copied from class: StoreIngestionTask
Maintain the latest offsets processed by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Prior to updating the offset in memory, the underlying storage engine should have persisted the given record. Dry-run mode will only do the offset rewind check; it won't update the processed offset.
- Specified by:
updateLatestInMemoryProcessedOffset
in class StoreIngestionTask
-
checkAndHandleUpstreamOffsetRewind
protected static void checkAndHandleUpstreamOffsetRewind(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, long newUpstreamOffset, long previousUpstreamOffset, LeaderFollowerStoreIngestionTask ingestionTask)
-
produceToLocalKafka
protected void produceToLocalKafka(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, java.util.function.BiConsumer<ChunkAwareCallback,LeaderMetadataWrapper> produceFunction, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs)
-
isRealTimeBufferReplayStarted
protected boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
- Specified by:
isRealTimeBufferReplayStarted
in class StoreIngestionTask
-
measureHybridOffsetLag
protected long measureHybridOffsetLag(PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
For the Leader/Follower state model, we already keep track of the consumption progress in the leader, so directly calculate the lag between the real-time topic and the leader consumption offset.
- Specified by:
measureHybridOffsetLag
in class StoreIngestionTask
-
measureRTOffsetLagForMultiRegions
protected long measureRTOffsetLagForMultiRegions(java.util.Set<java.lang.String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
-
isReadyToServeAnnouncedWithRTLag
public boolean isReadyToServeAnnouncedWithRTLag()
- Overrides:
isReadyToServeAnnouncedWithRTLag
in class StoreIngestionTask
-
reportIfCatchUpVersionTopicOffset
protected void reportIfCatchUpVersionTopicOffset(PartitionConsumptionState pcs)
Description copied from class: StoreIngestionTask
Check if the ingestion progress has reached the end of the version topic. This is currently only used by LeaderFollowerStoreIngestionTask.
- Specified by:
reportIfCatchUpVersionTopicOffset
in class StoreIngestionTask
-
shouldProcessRecord
protected boolean shouldProcessRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record)
For the Leader/Follower model, the follower should have the same kind of check as the Online/Offline model; for the leader, it's possible that it consumes from the real-time topic or GF topic.
- Overrides:
shouldProcessRecord
in class StoreIngestionTask
-
shouldPersistRecord
protected boolean shouldPersistRecord(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> record, PartitionConsumptionState partitionConsumptionState)
Additional safeguards in Leader/Follower ingestion: 1. Check whether the incoming messages are from the expected source topics.
- Overrides:
shouldPersistRecord
in class StoreIngestionTask
-
recordWriterStats
protected void recordWriterStats(long consumerTimestampMs, long producerBrokerLatencyMs, long brokerConsumerLatencyMs, PartitionConsumptionState partitionConsumptionState)
- Specified by:
recordWriterStats
in class StoreIngestionTask
-
recordProcessedRecordStats
protected void recordProcessedRecordStats(PartitionConsumptionState partitionConsumptionState, int processedRecordSize)
- Overrides:
recordProcessedRecordStats
in class StoreIngestionTask
-
getMaxRecordSizeBytes
protected int getMaxRecordSizeBytes()
-
getMaxNearlineRecordSizeBytes
protected int getMaxNearlineRecordSizeBytes()
-
calculateAssembledRecordSizeRatio
protected final double calculateAssembledRecordSizeRatio(long recordSize)
- Specified by:
calculateAssembledRecordSizeRatio
in class StoreIngestionTask
-
recordAssembledRecordSizeRatio
protected final void recordAssembledRecordSizeRatio(double ratio, long currentTimeMs)
- Specified by:
recordAssembledRecordSizeRatio
in class StoreIngestionTask
-
isHybridFollower
protected boolean isHybridFollower(PartitionConsumptionState partitionConsumptionState)
- Specified by:
isHybridFollower
in class StoreIngestionTask
-
shouldCheckLeaderCompleteStateInFollower
protected boolean shouldCheckLeaderCompleteStateInFollower()
For non-AA hybrid stores with AGGREGATE DRP, SIT reads from the parent RT while the HB is written to the child RTs. Once all hybrid stores are either AA (for cross-colo replication) or non-AA otherwise, DRP and this extra check can also be removed.
- Specified by:
shouldCheckLeaderCompleteStateInFollower
in class StoreIngestionTask
-
checkAndLogIfLagIsAcceptableForHybridStore
protected boolean checkAndLogIfLagIsAcceptableForHybridStore(PartitionConsumptionState pcs, long lag, long threshold, boolean shouldLogLag, LagType lagType, long latestConsumedProducerTimestamp)
Checks whether the lag is acceptable for hybrid stores. If the instance is a hybrid standby or DaVinciClient, also check that:
1. the checkLeaderCompleteStateInFollower feature is enabled based on configs;
2. leaderCompleteStatus has the leader state=completed; and
3. the last update time was within the configured time interval, so as not to use a stale leader state: check ConfigKeys.SERVER_LEADER_COMPLETE_STATE_CHECK_IN_FOLLOWER_VALID_INTERVAL_MS
- Specified by:
checkAndLogIfLagIsAcceptableForHybridStore
in class StoreIngestionTask
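A minimal sketch of the extra follower-side gate described above, with hypothetical parameters standing in for the config and header state:

// Illustrative sketch of the hybrid standby / DaVinciClient gate; not the real Venice API.
final class HybridFollowerLagGateSketch {
  static boolean lagIsAcceptable(boolean lagWithinThreshold,
                                 boolean isHybridFollowerOrDaVinci,
                                 boolean leaderCompleteCheckEnabled,
                                 boolean leaderReportedCompleted,
                                 long msSinceLeaderStateUpdate,
                                 long validIntervalMs) {
    if (!lagWithinThreshold) {
      return false;
    }
    if (!isHybridFollowerOrDaVinci || !leaderCompleteCheckEnabled) {
      return true; // no extra gate for leaders or when the feature is disabled
    }
    // Require a recent, completed leader state so the follower does not serve on stale info.
    return leaderReportedCompleted && msSinceLeaderStateUpdate <= validIntervalMs;
  }
}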
-
getAndUpdateLeaderCompletedState
protected void getAndUpdateLeaderCompletedState(KafkaKey kafkaKey, KafkaMessageEnvelope kafkaValue, ControlMessage controlMessage, PubSubMessageHeaders pubSubMessageHeaders, PartitionConsumptionState partitionConsumptionState)
HeartBeat SOS messages carry the leader completion state in the header. This function extracts the leader completion state from that header and updates the partitionConsumptionState accordingly.
-
recordHeartbeatReceived
protected void recordHeartbeatReceived(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, java.lang.String kafkaUrl)
- Overrides:
recordHeartbeatReceived
in classStoreIngestionTask
-
validateAndFilterOutDuplicateMessagesFromLeaderTopic
protected java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> validateAndFilterOutDuplicateMessagesFromLeaderTopic(java.lang.Iterable<PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long>> records, java.lang.String kafkaUrl, PubSubTopicPartition topicPartition)
- Specified by:
validateAndFilterOutDuplicateMessagesFromLeaderTopic
in class StoreIngestionTask
-
delegateConsumerRecord
protected StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
The goal of this function is to possibly produce the incoming kafka message consumed from local VT, remote VT, RT or SR topic to local VT if needed. This is decided based on the output of shouldProduceToVersionTopic(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState) and the message type. It also performs any necessary additional computation, such as for write-compute/update messages. It returns a StoreIngestionTask.DelegateConsumerRecordResult indicating whether the record was produced to kafka or not. This function should be called as one of the first steps in the processing pipeline for all messages consumed from any kafka topic. The caller of this function should only process this consumerRecord further if the return value is StoreIngestionTask.DelegateConsumerRecordResult.QUEUED_TO_DRAINER. This function assumes shouldProcessRecord(PubSubMessage) has already been called, which happens in StoreIngestionTask.produceToStoreBufferServiceOrKafka(Iterable, PubSubTopicPartition, String, int) before calling this, and that it was decided that this record needs to be processed. It does not perform any validation check on the PartitionConsumptionState object, to keep the goal of the function simple and avoid overloading it. Also, DIV validation is done here if the message is received from the RT topic. For more info please see StoreIngestionTask#internalProcessConsumerRecord(PubSubMessage, PartitionConsumptionState, LeaderProducedRecordContext, int, String, long). This function may modify the original record in KME, and it is unsafe to use the payload from KME directly after this function.
- Specified by:
delegateConsumerRecord
in class StoreIngestionTask
- Returns:
- a
StoreIngestionTask.DelegateConsumerRecordResult
indicating what to do with the record
-
waitForAllMessageToBeProcessedFromTopicPartition
protected void waitForAllMessageToBeProcessedFromTopicPartition(PubSubTopicPartition topicPartition, PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
Besides draining messages in the drainer queue, wait for the last producer future.
- Overrides:
waitForAllMessageToBeProcessedFromTopicPartition
in class StoreIngestionTask
- Parameters:
topicPartition - for which to wait
- Throws:
java.lang.InterruptedException
-
getBatchReplicationLag
public long getBatchReplicationLag()
- Specified by:
getBatchReplicationLag
in class StoreIngestionTask
-
getLeaderOffsetLag
public long getLeaderOffsetLag()
- Specified by:
getLeaderOffsetLag
in class StoreIngestionTask
-
getBatchLeaderOffsetLag
public long getBatchLeaderOffsetLag()
- Specified by:
getBatchLeaderOffsetLag
in class StoreIngestionTask
-
getHybridLeaderOffsetLag
public long getHybridLeaderOffsetLag()
- Specified by:
getHybridLeaderOffsetLag
in class StoreIngestionTask
-
getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement
protected long getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, java.lang.String ignoredUpstreamKafkaUrl)
For L/F or NR, there is only one entry in upstreamOffsetMap whose key is NON_AA_REPLICATION_UPSTREAM_OFFSET_MAP_KEY. Return the value of the entry.
-
getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement
protected long getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, java.lang.String ignoredKafkaUrl)
For regular L/F stores without A/A enabled, there is always only one real-time source.
-
updateLatestInMemoryLeaderConsumedRTOffset
protected void updateLatestInMemoryLeaderConsumedRTOffset(PartitionConsumptionState pcs, java.lang.String ignoredKafkaUrl, long offset)
-
getFollowerOffsetLag
public long getFollowerOffsetLag()
Description copied from class: StoreIngestionTask
Measure the offset lag between follower and leader.
- Specified by:
getFollowerOffsetLag
in class StoreIngestionTask
-
getBatchFollowerOffsetLag
public long getBatchFollowerOffsetLag()
- Specified by:
getBatchFollowerOffsetLag
in class StoreIngestionTask
-
getHybridFollowerOffsetLag
public long getHybridFollowerOffsetLag()
- Specified by:
getHybridFollowerOffsetLag
in class StoreIngestionTask
-
getRegionHybridOffsetLag
public long getRegionHybridOffsetLag(int regionId)
- Specified by:
getRegionHybridOffsetLag
in class StoreIngestionTask
-
consumerUnSubscribeAllTopics
public void consumerUnSubscribeAllTopics(PartitionConsumptionState partitionConsumptionState)
Unsubscribe from all the topics being consumed for the partition in partitionConsumptionState.
- Specified by:
consumerUnSubscribeAllTopics
in class StoreIngestionTask
-
getWriteComputeErrorCode
public int getWriteComputeErrorCode()
- Specified by:
getWriteComputeErrorCode
in class StoreIngestionTask
-
updateLeaderTopicOnFollower
public void updateLeaderTopicOnFollower(PartitionConsumptionState partitionConsumptionState)
- Specified by:
updateLeaderTopicOnFollower
in class StoreIngestionTask
-
maybeCompressData
protected java.nio.ByteBuffer maybeCompressData(int partition, java.nio.ByteBuffer data, PartitionConsumptionState partitionConsumptionState)
Compresses data in a bytebuffer when consuming from RT as a leader node and compression is enabled for the store version for which we're consuming data.
- Parameters:
partition - which partition we're acting on, so as to determine the PartitionConsumptionState
data - the data that we might compress
- Returns:
- a bytebuffer that's either the original bytebuffer or a new one, depending on whether we compressed it.
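A minimal sketch of the pass-through-or-compress behavior, assuming a generic compressor function rather than the store version's actual compressor:

import java.nio.ByteBuffer;
import java.util.function.UnaryOperator;

// Hypothetical sketch: only leaders consuming from the real-time topic compress,
// and only when the store version has compression enabled; otherwise pass through.
final class MaybeCompressSketch {
  static ByteBuffer maybeCompress(ByteBuffer data,
                                  boolean leaderConsumingFromRealTime,
                                  boolean compressionEnabled,
                                  UnaryOperator<ByteBuffer> compressor) {
    if (leaderConsumingFromRealTime && compressionEnabled) {
      return compressor.apply(data); // new buffer with the compressed payload
    }
    return data; // original buffer untouched
  }
}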
-
shouldCompressData
protected boolean shouldCompressData(PartitionConsumptionState partitionConsumptionState)
-
processMessageAndMaybeProduceToKafka
protected void processMessageAndMaybeProduceToKafka(PubSubMessageProcessedResultWrapper<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, java.lang.String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
-
measureRTOffsetLagForSingleRegion
protected long measureRTOffsetLagForSingleRegion(java.lang.String sourceRealTimeTopicKafkaURL, PartitionConsumptionState pcs, boolean shouldLog)
This method fetches/calculates the latest leader persisted offset and the last offset in the RT topic. The method relies on getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState, String) to fetch the latest leader persisted offset for different data replication policies.
- Returns:
- the lag (lastOffsetInRealTimeTopic - latestPersistedLeaderOffset)
-
processVersionSwapMessage
protected void processVersionSwapMessage(ControlMessage controlMessage, int partition, PartitionConsumptionState partitionConsumptionState)
Description copied from class: StoreIngestionTask
This isn't really used for ingestion outside of A/A, so we NoOp here and rely on the actual implementation in ActiveActiveStoreIngestionTask.
- Overrides:
processVersionSwapMessage
in class StoreIngestionTask
-
createProducerCallback
protected LeaderProducerCallback createProducerCallback(PubSubMessage<KafkaKey,KafkaMessageEnvelope,java.lang.Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, java.lang.String kafkaUrl, long beforeProcessingRecordTimestampNs)
-
getVeniceWriter
protected Lazy<VeniceWriter<byte[],byte[],byte[]>> getVeniceWriter(PartitionConsumptionState partitionConsumptionState)
-
addPartitionConsumptionState
protected void addPartitionConsumptionState(java.lang.Integer partition, PartitionConsumptionState pcs)
-
maybeSendIngestionHeartbeat
protected java.util.Set<java.lang.String> maybeSendIngestionHeartbeat()
For hybrid stores only, the leader periodically writes a special SOS message to the RT topic with the following properties:
1. Special key: This key contains constant bytes, allowing for compaction.
2. Fixed/known producer GUID: This GUID is dedicated to heartbeats and prevents DIV from breaking.
3. Special segment: This segment never contains data, eliminating the need for an EOS message.
Upon consuming the SOS message, the leader writes it to its local VT. Once the drainer processes the record, the leader updates its latest processed upstream RT topic offset. At this point, the offset reflects the correct position, regardless of trailing CMs or skippable data records due to DCR.
This heartbeat message does not send a leader completion header. This results in having the leader completion states only in VTs and not in the RT, avoiding the need to differentiate between heartbeats from leaders of different versions (backup/current/future) and colos.
- Specified by:
maybeSendIngestionHeartbeat
in class StoreIngestionTask
- Returns:
- the set of partitions that failed to send heartbeat (used for tests)
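A hedged sketch of the per-partition heartbeat pass implied by this description; the sender interface and naming are hypothetical, not the real VeniceWriter API:

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: only hybrid leaders send the heartbeat SOS to the RT topic,
// and partitions whose send fails are collected for the caller (used for tests).
final class IngestionHeartbeatSketch {
  interface HeartbeatSender {
    boolean sendHeartbeatToRealTimeTopic(int partition); // true on success
  }

  static Set<String> maybeSendHeartbeat(Map<Integer, Boolean> partitionToIsHybridLeader,
                                        HeartbeatSender sender,
                                        String realTimeTopicName) {
    Set<String> failedPartitions = new HashSet<>();
    partitionToIsHybridLeader.forEach((partition, isHybridLeader) -> {
      if (Boolean.TRUE.equals(isHybridLeader) && !sender.sendHeartbeatToRealTimeTopic(partition)) {
        failedPartitions.add(realTimeTopicName + "-" + partition);
      }
    });
    return failedPartitions;
  }
}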
-
resubscribe
protected void resubscribe(PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
Resubscribe operation by passing the new version role and partition role to AggKafkaConsumerService. The action for leader and follower replicas will be handled differently.
- Specified by:
resubscribe
in class StoreIngestionTask
- Throws:
java.lang.InterruptedException
-
resubscribeAsFollower
protected void resubscribeAsFollower(PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
resubscribeAsLeader
protected void resubscribeAsLeader(PartitionConsumptionState partitionConsumptionState) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
-