Class ActiveActiveStoreIngestionTask
java.lang.Object
com.linkedin.davinci.kafka.consumer.StoreIngestionTask
com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
com.linkedin.davinci.kafka.consumer.ActiveActiveStoreIngestionTask
- All Implemented Interfaces:
Closeable, AutoCloseable, Runnable
This class contains the logic that storage nodes (SNs) must perform when a store version is running in Active/Active mode.
-
Nested Class Summary
Nested classes/interfaces inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
StoreIngestionTask.DelegateConsumerRecordResult
-
Field Summary
Fields inherited from class com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
hasChangeCaptureView, kafkaClusterIdToUrlMap, LEADER_OFFSET_LAG_FILTER, storeDeserializerCache, VALID_LAG, veniceWriter, veniceWriterForRealTime, viewWriters
Fields inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
aggKafkaConsumerService, availableSchemaIds, batchReportIncPushStatusEnabled, bootstrapTimeoutInMs, chunkAssembler, compressionStrategy, compressor, compressorFactory, consumerActionSequenceNumber, consumerActionsQueue, databaseSyncBytesIntervalForDeferredWriteMode, databaseSyncBytesIntervalForTransactionalMode, dataRecoverySourceVersionNumber, defaultReadyToServeChecker, deserializedSchemaIds, diskUsage, divErrorMetricCallback, emitMetrics, emptyPollSleepMs, errorPartitionId, gracefulShutdownLatch, hostLevelIngestionStats, hostName, hybridStoreConfig, idleCounter, ingestionNotificationDispatcher, ingestionTaskName, isChunked, isCurrentVersion, isDataRecovery, isDaVinciClient, isGlobalRtDivEnabled, isIsolatedIngestion, isRmdChunked, isRunning, isSeparatedRealtimeTopicEnabled, isWriteComputationEnabled, kafkaClusterUrlResolver, kafkaProps, kafkaVersionTopic, KILL_WAIT_TIME_MS, localKafkaClusterId, localKafkaServer, localKafkaServerSingletonSet, manifestSerializer, metaStoreWriter, parallelProcessingThreadPool, partitionConsumptionStateMap, partitionCount, partitionToPendingConsumerActionCountMap, pubSubTopicRepository, readCycleDelayMs, readOnlyForBatchOnlyStoreEnabled, realTimeTopic, recordLevelMetricEnabled, REDUNDANT_LOGGING_FILTER, resetErrorReplicaEnabled, SCHEMA_POLLING_DELAY_MS, schemaRepository, serverConfig, storageEngine, storageEngineRepository, storageMetadataService, storageService, STORE_VERSION_POLLING_DELAY_MS, storeBufferService, storeName, storeRepository, storeVersionPartitionCount, topicManagerRepository, versionedDIVStats, versionedIngestionStats, versionNumber, versionRole, versionTopic, WAITING_TIME_FOR_LAST_RECORD_TO_BE_PROCESSED, workloadType, writeComputeFailureCode, zkHelixAdmin
-
Constructor Summary
Constructor / Description
ActiveActiveStoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerConfig recordTransformerConfig, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
Method Summary
Modifier and Type / Method / Description
protected Map<String,Long> calculateLeaderUpstreamOffsetWithTopicSwitch(PartitionConsumptionState partitionConsumptionState, PubSubTopic newSourceTopic, List<CharSequence> unreachableBrokerList)
void consumerSubscribe(PubSubTopicPartition pubSubTopicPartition, long startOffset, String pubSubAddress)
Ensures the PubSub URL is present in the PubSub cluster URL-to-ID map before subscribing to a topic.
protected LeaderProducerCallback createProducerCallback(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
protected StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
The goal of this function is to possibly produce the incoming Kafka message consumed from the local VT, remote VT, RT, or SR topic to the local VT if needed.
protected IngestionBatchProcessor getIngestionBatchProcessor
static int getKeyLevelLockMaxPoolSizeBasedOnServerConfig(VeniceServerConfig serverConfig, int partitionCount)
protected long getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, String upstreamKafkaUrl)
Unlike the persisted upstream offset map in OffsetRecord, the latest consumed upstream offset map is maintained for each individual Kafka URL.
protected long getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, String upstreamKafkaUrl)
For A/A, there are multiple entries in upstreamOffsetMap during RT ingestion.
protected BiConsumer<ChunkAwareCallback,LeaderMetadataWrapper> getProduceToTopicFunction(PartitionConsumptionState partitionConsumptionState, byte[] key, ByteBuffer updatedValueBytes, ByteBuffer updatedRmdBytes, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, int valueSchemaId, boolean resultReuseInput)
long getRegionHybridOffsetLag(int regionId)
boolean isReadyToServeAnnouncedWithRTLag()
Used for metric purposes.
protected boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
boolean isTransientRecordBufferUsed()
For Active-Active this buffer is always used.
protected void leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic)
protected long measureRTOffsetLagForMultiRegions(Set<String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
For stores in aggregate mode this is optimistic and returns the minimum lag of all fabrics.
protected void processMessageAndMaybeProduceToKafka(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
Process a TopicSwitch control message at the given partition offset for a specific PartitionConsumptionState.
protected void produceToLocalKafka(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceFunction, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs)
protected void putInStorageEngine(int partition, byte[] keyBytes, Put put)
Persist Put record to storage engine.
protected void removeFromStorageEngine(int partition, byte[] keyBytes, Delete delete)
protected boolean shouldCheckLeaderCompleteStateInFollower()
For non-AA hybrid stores with AGGREGATE DRP, SIT reads from the parent RT while the HB is written to the child RTs.
protected void startConsumingAsLeader(PartitionConsumptionState partitionConsumptionState)
protected void updateLatestInMemoryLeaderConsumedRTOffset(PartitionConsumptionState pcs, String kafkaUrl, long offset)
protected void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun)
Maintain the latest offsets processed by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord.
Methods inherited from class com.linkedin.davinci.kafka.consumer.LeaderFollowerStoreIngestionTask
addPartitionConsumptionState, calculateAssembledRecordSizeRatio, checkAndHandleUpstreamOffsetRewind, checkAndLogIfLagIsAcceptableForHybridStore, checkLongRunningTaskState, checkWhetherToCloseUnusedVeniceWriter, closeVeniceViewWriters, closeVeniceWriters, constructVeniceWriter, consumerUnSubscribeAllTopics, demoteToStandby, getAndUpdateLeaderCompletedState, getBatchFollowerOffsetLag, getBatchLeaderOffsetLag, getBatchReplicationLag, getConsumptionSourceKafkaAddress, getFollowerOffsetLag, getHybridFollowerOffsetLag, getHybridLeaderOffsetLag, getLeaderOffsetLag, getMaxNearlineRecordSizeBytes, getMaxRecordSizeBytes, getRealTimeDataSourceKafkaAddress, getTopicPartitionOffsetByKafkaURL, getVeniceWriter, getWriteComputeErrorCode, hasViewWriters, isHybridFollower, isLeader, maybeCompressData, maybeSendIngestionHeartbeat, measureHybridOffsetLag, measureRTOffsetLagForSingleRegion, processConsumerAction, processControlMessageForViews, promoteToLeader, queueUpVersionTopicWritesWithViewWriters, recordAssembledRecordSizeRatio, recordHeartbeatReceived, recordProcessedRecordStats, recordWriterStats, reportIfCatchUpVersionTopicOffset, resubscribe, resubscribeAsFollower, resubscribeAsLeader, shouldCompressData, shouldNewLeaderSwitchToRemoteConsumption, shouldPersistRecord, shouldProcessRecord, shouldProduceToVersionTopic, startConsumingAsLeaderInTransitionFromStandby, syncConsumedUpstreamRTOffsetMapIfNeeded, syncTopicSwitchToIngestionMetadataService, updateLeaderTopicOnFollower, updateOffsetMetadataInOffsetRecord, updateOffsetsFromConsumerRecord, validateAndFilterOutDuplicateMessagesFromLeaderTopic, waitForAllMessageToBeProcessedFromTopicPartition, waitForLastLeaderPersistFuture
Methods inherited from class com.linkedin.davinci.kafka.consumer.StoreIngestionTask
checkIngestionProgress, cloneProducerStates, close, consumerBatchUnsubscribe, consumerHasAnySubscription, consumerHasSubscription, consumerResetOffset, consumerSubscribe, createKafkaConsumerProperties, disableMetricsEmission, dropStoragePartitionGracefully, dumpPartitionConsumptionStates, dumpStoreVersionState, enableMetricsEmission, getCompressionStrategy, getCompressor, getFailedIngestionPartitionCount, getHostLevelIngestionStats, getIngestionTaskName, getKafkaVersionTopic, getOffsetToOnlineLagThresholdPerPartition, getPartitionConsumptionState, getPartitionOffsetLagBasedOnMetrics, getSchemaRepo, getServerConfig, getStorageEngine, getStoragePartitionConfig, getStoragePartitionConfig, getStoreName, getTopicManager, getTopicPartitionEndOffSet, getVersionedDIVStats, getVersionIngestionStats, getVersionNumber, getVersionTopic, hasAllPartitionReportedCompleted, hasAnyPartitionConsumptionState, hasAnySubscription, hasPendingPartitionIngestionAction, isActiveActiveReplicationEnabled, isChunked, isCurrentVersion, isFutureVersion, isHybridMode, isIngestionTaskActive, isMetricsEmissionEnabled, isPartitionConsumingOrHasPendingIngestionAction, isProducingVersionTopicHealthy, isReadyToServe, isRunning, isSegmentControlMsg, isSeparatedRealtimeTopicEnabled, isStuckByMemoryConstraint, isUserSystemStore, kill, logStorageOperationWhileUnsubscribed, measureLagWithCallToPubSub, measureLagWithCallToPubSub, minZeroLag, nextSeqNum, processCommonConsumerAction, processConsumerRecord, processEndOfIncrementalPush, processEndOfPush, processStartOfIncrementalPush, produceToStoreBufferService, produceToStoreBufferServiceOrKafka, produceToStoreBufferServiceOrKafkaInBatch, recordAssembledRecordSize, recordChecksumVerificationFailure, reportError, resetPartitionConsumptionOffset, resolveSourceKafkaServersWithinTopicSwitch, run, setLastConsumerException, setLastStoreIngestionException, setPartitionConsumptionState, shouldUpdateUpstreamOffset, shutdownAndWait, subscribePartition, 
subscribePartition, throwIfNotRunning, throwOrLogStorageFailureDependingIfStillSubscribed, unsubscribeFromTopic, unSubscribePartition, unSubscribePartition, updateIngestionRoleIfStoreChanged, updateOffsetMetadataAndSync, updateOffsetMetadataAndSyncOffset, validateMessage, waitVersionStateAvailable
-
Constructor Details
-
ActiveActiveStoreIngestionTask
public ActiveActiveStoreIngestionTask(StorageService storageService, StoreIngestionTaskFactory.Builder builder, Store store, Version version, Properties kafkaConsumerProperties, BooleanSupplier isCurrentVersion, VeniceStoreVersionConfig storeConfig, int errorPartitionId, boolean isIsolatedIngestion, Optional<ObjectCacheBackend> cacheBackend, DaVinciRecordTransformerConfig recordTransformerConfig, Lazy<org.apache.helix.manager.zk.ZKHelixAdmin> zkHelixAdmin)
-
-
Method Details
-
getKeyLevelLockMaxPoolSizeBasedOnServerConfig
public static int getKeyLevelLockMaxPoolSizeBasedOnServerConfig(VeniceServerConfig serverConfig, int partitionCount)
-
delegateConsumerRecord
protected StoreIngestionTask.DelegateConsumerRecordResult delegateConsumerRecord(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingPerRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
Description copied from class: LeaderFollowerStoreIngestionTask
The goal of this function is to possibly produce the incoming Kafka message consumed from the local VT, remote VT, RT, or SR topic to the local VT if needed. The decision is based on the output of LeaderFollowerStoreIngestionTask.shouldProduceToVersionTopic(com.linkedin.davinci.kafka.consumer.PartitionConsumptionState) and on the message type. It also performs any necessary additional computation, such as for write-compute/update messages. It returns a StoreIngestionTask.DelegateConsumerRecordResult indicating what to do with the record. This function should be called as one of the first steps in the processing pipeline for all messages consumed from any Kafka topic. The caller should process the record further only if the return value is StoreIngestionTask.DelegateConsumerRecordResult.QUEUED_TO_DRAINER.
This function assumes that LeaderFollowerStoreIngestionTask.shouldProcessRecord(PubSubMessage) has already been called, which happens in StoreIngestionTask.produceToStoreBufferServiceOrKafka(Iterable, PubSubTopicPartition, String, int), and that it was decided that this record needs to be processed. It does not perform any validation check on the PartitionConsumptionState object, to keep the function simple and not overloaded. DIV validation is also done here if the message is received from the RT topic. For more information, see StoreIngestionTask#internalProcessConsumerRecord(PubSubMessage, PartitionConsumptionState, LeaderProducedRecordContext, int, String, long).
This function may modify the original record in the KME, so it is unsafe to use the payload from the KME directly after this function returns.
- Overrides: delegateConsumerRecord in class LeaderFollowerStoreIngestionTask
- Returns: a StoreIngestionTask.DelegateConsumerRecordResult indicating what to do with the record
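The caller contract for delegateConsumerRecord (continue processing only on QUEUED_TO_DRAINER) can be sketched as follows. This is a minimal illustration using a stand-in enum, not the Venice type: only the constant QUEUED_TO_DRAINER appears on this page, while the class DelegateResultSketch, the constant HANDLED_BY_DELEGATE, and the helper shouldContinueProcessing are hypothetical names introduced here.

```java
/** Illustrative stand-in; this is not the Venice DelegateConsumerRecordResult type. */
final class DelegateResultSketch {
  enum Result {
    QUEUED_TO_DRAINER,   // the only constant named on this page
    HANDLED_BY_DELEGATE  // hypothetical placeholder for every other outcome
  }

  /** Returns true only when the caller should keep processing the record itself. */
  static boolean shouldContinueProcessing(Result result) {
    return result == Result.QUEUED_TO_DRAINER;
  }

  public static void main(String[] args) {
    for (Result result : Result.values()) {
      System.out.println(result + " -> continue processing: " + shouldContinueProcessing(result));
    }
  }
}
```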
-
putInStorageEngine
Description copied from class: StoreIngestionTask
Persist Put record to storage engine.
- Overrides: putInStorageEngine in class StoreIngestionTask
-
removeFromStorageEngine
- Overrides: removeFromStorageEngine in class StoreIngestionTask
-
getRmdSerDe
-
getIngestionBatchProcessor
- Overrides: getIngestionBatchProcessor in class LeaderFollowerStoreIngestionTask
-
processMessageAndMaybeProduceToKafka
protected void processMessageAndMaybeProduceToKafka(PubSubMessageProcessedResultWrapper<KafkaKey, KafkaMessageEnvelope, Long> consumerRecordWrapper, PartitionConsumptionState partitionConsumptionState, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs, long beforeProcessingBatchRecordsTimestampMs)
- Overrides: processMessageAndMaybeProduceToKafka in class LeaderFollowerStoreIngestionTask
-
produceToLocalKafka
protected void produceToLocalKafka(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, BiConsumer<ChunkAwareCallback, LeaderMetadataWrapper> produceFunction, int partition, String kafkaUrl, int kafkaClusterId, long beforeProcessingRecordTimestampNs)
- Overrides: produceToLocalKafka in class LeaderFollowerStoreIngestionTask
-
calculateLeaderUpstreamOffsetWithTopicSwitch
protected Map<String,Long> calculateLeaderUpstreamOffsetWithTopicSwitch(PartitionConsumptionState partitionConsumptionState, PubSubTopic newSourceTopic, List<CharSequence> unreachableBrokerList)
- Overrides: calculateLeaderUpstreamOffsetWithTopicSwitch in class LeaderFollowerStoreIngestionTask
-
startConsumingAsLeader
- Overrides: startConsumingAsLeader in class LeaderFollowerStoreIngestionTask
-
consumerSubscribe
public void consumerSubscribe(PubSubTopicPartition pubSubTopicPartition, long startOffset, String pubSubAddress)
Ensures the PubSub URL is present in the PubSub cluster URL-to-ID map before subscribing to a topic. Prevents subscription to unknown PubSub URLs, which can cause issues during message consumption.
-
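The guard described for consumerSubscribe (check the URL-to-ID map before subscribing) can be illustrated with a small stand-alone sketch. Everything here is an assumption for illustration: the class SubscribeGuardSketch, its fields, and the choice to throw IllegalArgumentException are hypothetical, and this page does not say how the real method reacts to an unknown URL.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative sketch; names and the exception type are assumptions, not the Venice code. */
final class SubscribeGuardSketch {
  private final Map<String, Integer> clusterUrlToId;
  private final List<String> subscriptions = new ArrayList<>();

  SubscribeGuardSketch(Map<String, Integer> clusterUrlToId) {
    this.clusterUrlToId = new HashMap<>(clusterUrlToId);
  }

  /** Records a subscription only if the PubSub URL is already known. */
  void subscribe(String topicPartition, long startOffset, String pubSubAddress) {
    if (!clusterUrlToId.containsKey(pubSubAddress)) {
      // Assumption: reject unknown URLs by throwing.
      throw new IllegalArgumentException("Unknown PubSub URL: " + pubSubAddress);
    }
    subscriptions.add(topicPartition + "@" + startOffset + " via " + pubSubAddress);
  }

  /** Returns a copy of the recorded subscriptions. */
  List<String> subscriptions() {
    return new ArrayList<>(subscriptions);
  }
}
```

Validating before subscribing keeps the failure at the call site instead of surfacing later, when a consumed message cannot be mapped back to a cluster ID.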
leaderExecuteTopicSwitch
protected void leaderExecuteTopicSwitch(PartitionConsumptionState partitionConsumptionState, TopicSwitch topicSwitch, PubSubTopic newSourceTopic)
- Overrides: leaderExecuteTopicSwitch in class LeaderFollowerStoreIngestionTask
-
processTopicSwitch
protected boolean processTopicSwitch(ControlMessage controlMessage, int partition, long offset, PartitionConsumptionState partitionConsumptionState)
Process a TopicSwitch control message at the given partition offset for a specific PartitionConsumptionState. Returns whether an additional ready-to-serve check needs to be executed after this message is processed.
- Overrides: processTopicSwitch in class LeaderFollowerStoreIngestionTask
-
updateLatestInMemoryProcessedOffset
protected void updateLatestInMemoryProcessedOffset(PartitionConsumptionState partitionConsumptionState, PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, LeaderProducedRecordContext leaderProducedRecordContext, String kafkaUrl, boolean dryRun)
Description copied from class: StoreIngestionTask
Maintain the latest offsets processed by drainers in memory; most of the time, these offsets are ahead of the checkpoint offsets inside OffsetRecord. Before the offset is updated in memory, the underlying storage engine should have persisted the given record. Dry-run mode only performs the offset rewind check and does not update the processed offset.
- Overrides: updateLatestInMemoryProcessedOffset in class LeaderFollowerStoreIngestionTask
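The dry-run behavior of updateLatestInMemoryProcessedOffset can be sketched as below. This is a simplified model under stated assumptions, not the Venice implementation: the class ProcessedOffsetSketch, keying offsets by Kafka URL, reporting a rewind through a boolean return value, and recording the offset even when a rewind is detected are all hypothetical choices made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch of "check for rewind, update only when not in dry-run mode". */
final class ProcessedOffsetSketch {
  // Assumption: one in-memory processed offset per Kafka URL.
  private final Map<String, Long> latestProcessedOffsetByUrl = new HashMap<>();

  /**
   * Returns true if the offset is lower than the last one recorded for this URL (a rewind).
   * In dry-run mode only the check runs; the stored offset is left unchanged.
   */
  boolean update(String kafkaUrl, long offset, boolean dryRun) {
    Long previous = latestProcessedOffsetByUrl.get(kafkaUrl);
    boolean rewind = previous != null && offset < previous;
    if (!dryRun) {
      latestProcessedOffsetByUrl.put(kafkaUrl, offset);
    }
    return rewind;
  }

  /** Returns the last recorded offset for the URL, or null if none was recorded. */
  Long latest(String kafkaUrl) {
    return latestProcessedOffsetByUrl.get(kafkaUrl);
  }
}
```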
-
isRealTimeBufferReplayStarted
protected boolean isRealTimeBufferReplayStarted(PartitionConsumptionState partitionConsumptionState)
- Overrides: isRealTimeBufferReplayStarted in class LeaderFollowerStoreIngestionTask
-
getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement
protected long getLatestPersistedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, String upstreamKafkaUrl)
For A/A, there are multiple entries in upstreamOffsetMap during RT ingestion. If the current DataReplicationPolicy is Aggregate mode, A/A checks the upstream offset lags from all regions; otherwise, it checks only the upstream offset lag from the local region.
-
getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement
protected long getLatestConsumedUpstreamOffsetForHybridOffsetLagMeasurement(PartitionConsumptionState pcs, String upstreamKafkaUrl)
Unlike the persisted upstream offset map in OffsetRecord, the latest consumed upstream offset map is maintained for each individual Kafka URL.
-
updateLatestInMemoryLeaderConsumedRTOffset
protected void updateLatestInMemoryLeaderConsumedRTOffset(PartitionConsumptionState pcs, String kafkaUrl, long offset)
- Overrides: updateLatestInMemoryLeaderConsumedRTOffset in class LeaderFollowerStoreIngestionTask
-
isTransientRecordBufferUsed
public boolean isTransientRecordBufferUsed()
For Active-Active this buffer is always used.
- Overrides: isTransientRecordBufferUsed in class StoreIngestionTask
- Returns:
-
shouldCheckLeaderCompleteStateInFollower
protected boolean shouldCheckLeaderCompleteStateInFollower()
Description copied from class: LeaderFollowerStoreIngestionTask
For non-AA hybrid stores with AGGREGATE DRP, SIT reads from the parent RT while the HB is written to the child RTs. Once all hybrid stores are either AA (for cross-colo replication) or non-AA (otherwise), DRP and this extra check can be removed.
- Overrides: shouldCheckLeaderCompleteStateInFollower in class LeaderFollowerStoreIngestionTask
-
getRegionHybridOffsetLag
public long getRegionHybridOffsetLag(int regionId)
- Overrides: getRegionHybridOffsetLag in class LeaderFollowerStoreIngestionTask
-
measureRTOffsetLagForMultiRegions
protected long measureRTOffsetLagForMultiRegions(Set<String> sourceRealTimeTopicKafkaURLs, PartitionConsumptionState partitionConsumptionState, boolean shouldLogLag)
For stores in aggregate mode, this is optimistic and returns the minimum lag across all fabrics. In aggregate mode, duplicate messages are consumed from all fabrics, so it should be fine to consider the lowest lag.
For stores in active/active mode: if no fabric is unreachable, return the maximum lag of all fabrics; if only one fabric is unreachable, return the maximum lag of the other fabrics; if more than one fabric is unreachable, return Long.MAX_VALUE, which means the partition is not ready-to-serve.
TODO: For active/active incremental push stores or stores with only one Samza job, we should consider the weight of the unreachable fabric when making the decision. For example, we should not let a partition become ready-to-serve when the only source fabric is unreachable.
In non-aggregate mode of consumption, only the local fabric lag is returned.
- Overrides: measureRTOffsetLagForMultiRegions in class LeaderFollowerStoreIngestionTask
- Parameters:
sourceRealTimeTopicKafkaURLs -
partitionConsumptionState -
shouldLogLag -
- Returns:
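The lag rules described for measureRTOffsetLagForMultiRegions can be sketched as two small functions. This is a minimal model under stated assumptions rather than the actual implementation: the class MultiRegionLagSketch and its method names are hypothetical, an empty OptionalLong is used here to mark an unreachable fabric, and the results for an empty map or for no reachable fabric are choices made for the sketch.

```java
import java.util.Map;
import java.util.OptionalLong;

/** Illustrative sketch of the documented lag rules; not the Venice implementation. */
final class MultiRegionLagSketch {
  /** Aggregate mode: the minimum lag across fabrics (Long.MAX_VALUE for an empty map, by assumption). */
  static long aggregateModeLag(Map<String, Long> lagByFabric) {
    long min = Long.MAX_VALUE;
    for (long lag : lagByFabric.values()) {
      min = Math.min(min, lag);
    }
    return min;
  }

  /** Active/active mode; an empty OptionalLong marks an unreachable fabric (assumption). */
  static long activeActiveLag(Map<String, OptionalLong> lagByFabric) {
    int unreachable = 0;
    long maxReachableLag = 0L; // assumption: 0 when no fabric is reachable
    for (OptionalLong lag : lagByFabric.values()) {
      if (lag.isPresent()) {
        maxReachableLag = Math.max(maxReachableLag, lag.getAsLong());
      } else {
        unreachable++;
      }
    }
    // More than one unreachable fabric means the partition is not ready-to-serve.
    return unreachable > 1 ? Long.MAX_VALUE : maxReachableLag;
  }
}
```

In this sketch a single unreachable fabric is simply ignored, which also reproduces the weakness called out in the TODO: if that fabric is the only source, the reported lag is still low.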
-
isReadyToServeAnnouncedWithRTLag
public boolean isReadyToServeAnnouncedWithRTLag()
Used for metric purposes.
- Overrides: isReadyToServeAnnouncedWithRTLag in class LeaderFollowerStoreIngestionTask
-
getProduceToTopicFunction
protected BiConsumer<ChunkAwareCallback,LeaderMetadataWrapper> getProduceToTopicFunction(PartitionConsumptionState partitionConsumptionState, byte[] key, ByteBuffer updatedValueBytes, ByteBuffer updatedRmdBytes, ChunkedValueManifest oldValueManifest, ChunkedValueManifest oldRmdManifest, int valueSchemaId, boolean resultReuseInput)
-
createProducerCallback
protected LeaderProducerCallback createProducerCallback(PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long> consumerRecord, PartitionConsumptionState partitionConsumptionState, LeaderProducedRecordContext leaderProducedRecordContext, int partition, String kafkaUrl, long beforeProcessingRecordTimestampNs)
- Overrides: createProducerCallback in class LeaderFollowerStoreIngestionTask
-